Add timeseries, discriminator

This commit is contained in:
George Suntres
2026-04-17 11:38:12 -04:00
parent cba3e326e8
commit 99b36e577e
18 changed files with 897 additions and 130 deletions

28
.test/event.json Normal file
View File

@@ -0,0 +1,28 @@
{
"_group": "system_collections",
"_name": "event",
"_version": "202507",
"singular": "event",
"plural": "events",
"idPrefix": "e",
"indexSpecs": [],
"timeseries": {
"timeField": "createdAt",
"metaField": "name",
"granularity": "minutes"
},
"schema": {
"type": "object",
"properties": {
"_id": {
"bsonType": "string"
},
"name": {
"bsonType": "string"
},
"createdAt": {
"bsonType": "timestamp"
}
}
}
}

35
.test/offer.json Normal file
View File

@@ -0,0 +1,35 @@
{
"_group": "system_collections",
"_name": "offer",
"_version": "202507",
"singular": "offer",
"plural": "offers",
"idPrefix": "ofr",
"indexSpecs": [{
"name": "name_1",
"keys": { "name": 1 },
"unique": true
}],
"discriminator": {
"field": "store",
"ctxField": "store",
"collection": "store"
},
"schema": {
"type": "object",
"properties": {
"_id": {
"bsonType": "string"
},
"name": {
"bsonType": "string"
},
"store": {
"bsonType": "string"
},
"createdAt": {
"bsonType": "date"
}
}
}
}

36
.test/store.json Normal file
View File

@@ -0,0 +1,36 @@
{
"_group": "system_collections",
"_name": "store",
"_version": "202507",
"singular": "store",
"plural": "stores",
"idPrefix": "str",
"indexSpecs": [{
"name": "code_1",
"keys": { "code": 1 },
"unique": true
}],
"schema": {
"type": "object",
"properties": {
"_id": {
"bsonType": "string"
},
"code": {
"bsonType": "string"
},
"name": {
"bsonType": "string"
},
"created_at": {
"bsonType": "date"
},
"updated_at": {
"bsonType": "date"
},
"archived_at": {
"bsonType": "date"
}
}
}
}

47
discrimination.go Normal file
View File

@@ -0,0 +1,47 @@
package mongo
import (
"log"
"fmt"
"context"
"go.mongodb.org/mongo-driver/v2/bson"
)
func (c *MongoClient) DiscriminatorCheckAndApplyToData(ctx context.Context, name string, data map[string]any) error {
cdef, ok := c.Registry[name]
if ok && cdef.Discriminator != nil {
log.Printf("Discriminator found for %s; will use it", name)
// get from context
vAny := ctx.Value(cdef.Discriminator.CtxField)
if vAny == nil {
return fmt.Errorf("discriminator field required for %s", name)
}
// update payload
v := vAny.(string)
data[cdef.Discriminator.Field] = v
}
return nil
}
func (c *MongoClient) DiscriminatorCheckAndApplyToFilter(ctx context.Context, name string, filter bson.M) error {
cdef, ok := c.Registry[name]
if ok && cdef.Discriminator != nil {
log.Printf("Discriminator found for %s; will use it", name)
// get from context
vAny := ctx.Value(cdef.Discriminator.CtxField)
if vAny == nil {
return fmt.Errorf("discriminator field required for %s", name)
}
// update payload
v := vAny.(string)
filter[cdef.Discriminator.Field] = bson.M{"$eq": v}
}
return nil
}

72
ensure.go Normal file
View File

@@ -0,0 +1,72 @@
package mongo
import (
"time"
"fmt"
"go.mongodb.org/mongo-driver/v2/bson"
"github.com/matoous/go-nanoid/v2"
)
const alphabet = "0123456789abcdefghijclmnopqrstuvwxyz"
const maxLen = 10
// ensureId adds the id property when missing or when it's an empty string.
func ensureId(data bson.M, idPrefix string) string {
maybeId, hasId := data["_id"]
var id, finalId string
if !hasId || maybeId == "" {
id, _ = gonanoid.Generate(alphabet, maxLen)
if idPrefix != "" {
finalId = fmt.Sprintf("%s_%s", idPrefix, id)
} else {
finalId = id
}
data["_id"] = finalId
}
return finalId
}
func ensureNoId(data bson.M) {
if _, ok := data["_id"]; ok {
delete(data, "_id")
}
}
func ensureExistingCreatedAt(data bson.M, old bson.M) time.Time {
var cAt time.Time
cAtAny, ok := old["createdAt"]
if ok {
switch cAtAny.(type) {
case bson.DateTime:
cAtBson := cAtAny.(bson.DateTime)
cAt = cAtBson.Time()
default:
cAt = cAtAny.(time.Time)
}
data["createdAt"] = cAt
}
return cAt
}
func ensureCreatedAt(data bson.M) time.Time {
now := time.Now().UTC().Truncate(time.Millisecond)
data["createdAt"] = now
return now
}
func ensureUpdatedAt(data bson.M) time.Time {
now := time.Now().UTC().Truncate(time.Millisecond)
data["updatedAt"] = now
return now
}

90
ensure_test.go Normal file
View File

@@ -0,0 +1,90 @@
package mongo
import (
"testing"
"time"
"testing/synctest"
"go.mongodb.org/mongo-driver/v2/bson"
)
func TestEnsureId(t *testing.T) {
data := map[string]any {
"Name": "My Name Is",
}
ensureId(data, "")
if id, okid := data["_id"]; !okid || id == "" {
t.Fatal("Failed to add Id")
}
}
func TestEnsureId_EmptyId(t *testing.T) {
data := map[string]any {
"_id": "myidxxxxxx",
"name": "My Name Is",
}
ensureId(data, "")
if data["_id"] == "" {
t.Fatal("Id was updated")
}
}
func TestEnsureId_ExistingId(t *testing.T) {
data := map[string]any {
"_id": "",
"name": "My Name Is",
}
ensureId(data, "")
if id, okid := data["_id"]; !okid || id == "" {
t.Fatal("Failed to add Id")
}
}
func TestEnsureCreatedAt(t *testing.T) {
data := map[string]any {
"name": "My Name Is",
}
synctest.Test(t, func(t *testing.T) {
now := ensureCreatedAt(data)
if createdAt, _ := data["createdAt"]; createdAt != now {
t.Fatal("Failed to add CreatedAt")
}
})
}
func TestEnsureExistingCreatedAt(t *testing.T) {
tm, err := time.Parse(time.RFC3339, "2025-12-24T13:00:11Z")
if err != nil { t.Fatal(err) }
// t.Fatalf("==> %T", tm)
old := bson.M{"createdAt": tm}
data := bson.M{}
ensureExistingCreatedAt(data, old)
if data["createdAt"] != tm {
t.Fatal("wrong date")
}
}
func TestEnsureExistingCreatedAt_NoField(t *testing.T) {
old := bson.M{"name": "I have no created at"}
data := bson.M{}
ensureExistingCreatedAt(data, old)
if _, ok := data["createdAt"]; ok {
t.Fatal("Should had no createdAt")
}
}

View File

@@ -16,6 +16,10 @@ func (c *MongoClient) Find(ctx context.Context, database, name string, filter bs
sort := bson.M{"_id": 1}
if err := c.DiscriminatorCheckAndApplyToFilter(ctx, name, filter); err != nil {
return nil, err
}
pipeline := BuildPaginationPipeline(0, pageSize + 1, filter, sort)
// 2. Query

View File

@@ -1,10 +1,14 @@
package mongo
import (
"os"
"context"
"testing"
"encoding/json"
"go.mongodb.org/mongo-driver/v2/bson"
"git.gsuntres.com/general/commons"
)
func TestFind_Default(t *testing.T) {
@@ -49,3 +53,69 @@ func TestFind_Default(t *testing.T) {
t.Fatalf("Expected total to be 1 but found %d", total)
}
}
func TestFind_Discriminator(t *testing.T) {
// 1. Register schemas
schemaStore, err := os.ReadFile("./.test/store.json")
if err != nil { t.Fatal(err) }
schemaOffer, err := os.ReadFile("./.test/offer.json")
if err != nil { t.Fatal(err) }
var store bson.M
if err := json.Unmarshal(schemaStore, &store); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaStore), schemaStore[:4])
}
var offer bson.M
if err := json.Unmarshal(schemaOffer, &offer); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaOffer), schemaOffer[:4])
}
client := GetMongoClient()
client.AddDefinition(store)
client.AddDefinition(offer)
// Save two offers with the similar name in different stores each.
ctx1 := context.Background()
ctx1 = context.WithValue(ctx1, "account", "xxxxxx")
ctx1 = context.WithValue(ctx1, "store", "str_1234")
// One offer in str_1234
offer1 := map[string]any { "name": "OSRAM 1" }
_, err = client.InsertOne(ctx1, "mydb", "offer", offer1)
if err != nil { t.Fatalf("Failed to insertOne %#v", err) }
// The other in str_4321
ctx2 := context.Background()
ctx2 = context.WithValue(ctx2, "account", "xxxxxx")
ctx2 = context.WithValue(ctx2, "store", "str_4321")
offer2 := map[string]any { "name": "OSRAM 2" }
_, err = client.InsertOne(ctx2, "mydb", "offer", offer2)
if err != nil { t.Fatalf("Failed to insertOne %#v", err) }
// Now searching in store str_1234 for OSRAM should return only one
filter := bson.M{"name": bson.M{"$regex": "OSRAM*"}}
findResult, err := client.Find(ctx1, "mydb", "offer", filter, 0)
if err != nil { t.Fatalf("Failed to find %#v", err) }
dataAny, hasData := findResult["data"]
if !hasData { t.Fatal("no data") }
data := dataAny.(bson.A)
if len(data) != 1 {
t.Fatalf("Expected to return 1 document but got %d", len(data))
}
arr, _ := commons.BsonAToSlice(data)
o := arr[0]
name := o["name"]
if name != "OSRAM 1" {
t.Fatalf("Expected OSRAM 1 not %s", name)
}
}

8
go.mod
View File

@@ -3,14 +3,14 @@ module git.gsuntres.com/general/mongo
go 1.25.0
require (
git.gsuntres.com/general/commons v0.0.0-20260329160148-434ecef67a94
git.gsuntres.com/general/commons v0.0.0-20260416141603-7a6c5b6c3c8c
git.gsuntres.com/general/sys v0.0.0-20260329160429-49966ca31027
github.com/go-viper/mapstructure/v2 v2.5.0
github.com/google/go-cmp v0.7.0
github.com/matoous/go-nanoid/v2 v2.1.0
github.com/testcontainers/testcontainers-go v0.41.0
github.com/testcontainers/testcontainers-go/modules/mongodb v0.41.0
go.mongodb.org/mongo-driver/v2 v2.5.0
go.mongodb.org/mongo-driver/v2 v2.5.1
)
require (
@@ -69,9 +69,9 @@ require (
go.opentelemetry.io/otel/metric v1.41.0 // indirect
go.opentelemetry.io/otel/trace v1.41.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/text v0.36.0 // indirect
google.golang.org/grpc v1.79.1 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

16
go.sum
View File

@@ -1,7 +1,7 @@
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
git.gsuntres.com/general/commons v0.0.0-20260329160148-434ecef67a94 h1:U0R2Mg00oC9dWNkXWZMRCBj8Fx6out3HHNIR24srWJ8=
git.gsuntres.com/general/commons v0.0.0-20260329160148-434ecef67a94/go.mod h1:gVqoj8oD7D81CnU7vWZbv2jbSYXQDtHBcXs4t6E3rWM=
git.gsuntres.com/general/commons v0.0.0-20260416141603-7a6c5b6c3c8c h1:3jh+CfW0j4JQOa1tRN7UFWcbv5LxBRdHAeM8SoPDbJM=
git.gsuntres.com/general/commons v0.0.0-20260416141603-7a6c5b6c3c8c/go.mod h1:ZawSPCI/Irjx7P83qJRcknKGuLLJ9c7hhP4OXgILnCY=
git.gsuntres.com/general/sys v0.0.0-20260329160429-49966ca31027 h1:4pmcjxEDM4rzv+iimQ7wTgCAQ1VnAoeGiHLuf6wC6Fw=
git.gsuntres.com/general/sys v0.0.0-20260329160429-49966ca31027/go.mod h1:OVs7w4/tJO1GT7cLIeEsb90LuZqH2xYIVQODI5P1GJs=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
@@ -132,8 +132,8 @@ github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfS
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.mongodb.org/mongo-driver/v2 v2.5.0 h1:yXUhImUjjAInNcpTcAlPHiT7bIXhshCTL3jVBkF3xaE=
go.mongodb.org/mongo-driver/v2 v2.5.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0=
go.mongodb.org/mongo-driver/v2 v2.5.1 h1:j2U/Qp+wvueSpqitLCSZPT/+ZpVc1xzuwdHWwl7d8ro=
go.mongodb.org/mongo-driver/v2 v2.5.1/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
@@ -164,8 +164,8 @@ golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -185,8 +185,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -2,18 +2,10 @@ package mongo
import (
"context"
"time"
"fmt"
"go.mongodb.org/mongo-driver/v2/bson"
"github.com/matoous/go-nanoid/v2"
// "git.gsuntres.com/boxtep/boxtep/core"
)
const alphabet = "0123456789abcdefghijclmnopqrstuvwxyz"
const maxLen = 10
// InsertOneWithStruct can be used to insert defined structs.
func (c *MongoClient) InsertOneFromStruct(ctx context.Context, database, name string, data any) (bson.M, error) {
@@ -31,6 +23,10 @@ func (c *MongoClient) InsertOne(ctx context.Context, database, name string, data
prepareForInsert(data, c.GetIdPrefix(name))
if err := c.DiscriminatorCheckAndApplyToData(ctx, name, data); err != nil {
return nil, err
}
if _, err := collection.InsertOne(ctx, data); err != nil {
return nil, err
}
@@ -47,37 +43,4 @@ func prepareForInsert(data bson.M, idPrefix string) {
ensureUpdatedAt(data)
}
// ensureId adds the id property when missing or when it's an empty string.
func ensureId(data bson.M, idPrefix string) string {
maybeId, hasId := data["_id"]
var id, finalId string
if !hasId || maybeId == "" {
id, _ = gonanoid.Generate(alphabet, maxLen)
if idPrefix != "" {
finalId = fmt.Sprintf("%s_%s", idPrefix, id)
} else {
finalId = id
}
data["_id"] = finalId
}
return finalId
}
func ensureCreatedAt(data bson.M) time.Time {
now := time.Now().UTC().Truncate(time.Millisecond)
data["created_at"] = now
return now
}
func ensureUpdatedAt(data bson.M) time.Time {
now := time.Now().UTC().Truncate(time.Millisecond)
data["updated_at"] = now
return now
}

View File

@@ -5,7 +5,6 @@ import (
"context"
"strings"
"testing"
"testing/synctest"
"encoding/json"
"go.mongodb.org/mongo-driver/v2/bson"
@@ -122,54 +121,118 @@ func TestPrepareForInsert_ExistingId(t *testing.T) {
}
}
func TestEnsureId(t *testing.T) {
data := map[string]any {
"Name": "My Name Is",
func TestInsertOne_Discriminate(t *testing.T) {
// 1. Register schemas
schemaStore, err := os.ReadFile("./.test/store.json")
if err != nil { t.Fatal(err) }
schemaOffer, err := os.ReadFile("./.test/offer.json")
if err != nil { t.Fatal(err) }
var store bson.M
if err := json.Unmarshal(schemaStore, &store); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaStore), schemaStore[:4])
}
ensureId(data, "")
if id, okid := data["_id"]; !okid || id == "" {
t.Fatal("Failed to add Id")
var offer bson.M
if err := json.Unmarshal(schemaOffer, &offer); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaOffer), schemaOffer[:4])
}
client := GetMongoClient()
client.AddDefinition(store)
client.AddDefinition(offer)
// Test
ctx := context.Background()
ctx = context.WithValue(ctx, "account", "xxxxxx")
ctx = context.WithValue(ctx, "store", "str_1234")
// 2. Insert data
offer1 := map[string]any {
"name": "Offer 1",
}
saved, err := client.InsertOne(ctx, "mydb", "offer", offer1)
if err != nil { t.Fatalf("Failed to insertOne %#v", err) }
v, ok := saved["store"]
if !ok || v != "str_1234" {
t.Fatalf("Should have set store not %s", v)
}
}
func TestEnsureId_EmptyId(t *testing.T) {
data := map[string]any {
"_id": "myidxxxxxx",
"name": "My Name Is",
func TestInsertOne_Discriminate_NoStore(t *testing.T) {
// 1. Register schemas
schemaStore, err := os.ReadFile("./.test/store.json")
if err != nil { t.Fatal(err) }
schemaOffer, err := os.ReadFile("./.test/offer.json")
if err != nil { t.Fatal(err) }
var store bson.M
if err := json.Unmarshal(schemaStore, &store); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaStore), schemaStore[:4])
}
ensureId(data, "")
if data["_id"] == "" {
t.Fatal("Id was updated")
var offer bson.M
if err := json.Unmarshal(schemaOffer, &offer); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaOffer), schemaOffer[:4])
}
client := GetMongoClient()
client.AddDefinition(store)
client.AddDefinition(offer)
// Test
ctx := context.Background()
// 2. Insert data
offer1 := map[string]any {
"name": "Offer 1",
}
_, err = client.InsertOne(ctx, "mydb", "offer", offer1)
if err == nil {
t.Fatal("Should have required store")
}
}
func TestEnsureId_ExistingId(t *testing.T) {
data := map[string]any {
"_id": "",
"name": "My Name Is",
func TestInsertOne_Discriminate_EnsureStore(t *testing.T) {
// 1. Register schemas
schemaStore, err := os.ReadFile("./.test/store.json")
if err != nil { t.Fatal(err) }
schemaOffer, err := os.ReadFile("./.test/offer.json")
if err != nil { t.Fatal(err) }
var store bson.M
if err := json.Unmarshal(schemaStore, &store); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaStore), schemaStore[:4])
}
ensureId(data, "")
if id, okid := data["_id"]; !okid || id == "" {
t.Fatal("Failed to add Id")
var offer bson.M
if err := json.Unmarshal(schemaOffer, &offer); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaOffer), schemaOffer[:4])
}
}
func TestEnsureCreatedAt(t *testing.T) {
data := map[string]any {
"name": "My Name Is",
}
synctest.Test(t, func(t *testing.T) {
now := ensureCreatedAt(data)
if createdAt, _ := data["created_at"]; createdAt != now {
t.Fatal("Failed to add CreatedAt")
}
})
client := GetMongoClient()
client.AddDefinition(store)
client.AddDefinition(offer)
// Test
ctx := context.Background()
ctx = context.WithValue(ctx, "store", "str_1234")
// 2. Insert data
offer1 := map[string]any {
"name": "Offer 100",
"store": "str_0000",
}
saved, err := client.InsertOne(ctx, "mydb", "offer", offer1)
if saved["store"] != "str_1234" {
t.Fatal("Wrong store")
}
}

147
main.go
View File

@@ -19,6 +19,10 @@ import (
"git.gsuntres.com/general/sys"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
}
// WithiSessionFunc will run operations on the database within the same session.
// If this functions returns an error, the system will rollback the transaction.
// Sessions require a resplica set.
@@ -38,14 +42,29 @@ type IMongoClient interface {
GetCollection(database, name string) *mongo.Collection
}
type Timeseries struct {
TimeField string `bson:"timeField"`
MetaField string `bson:"metaField"`
Granularity string `bson:"granularity"`
}
type Discriminator struct {
Field string `bson:"field"`
CtxField string `bson:"ctxField"`
Collection string `bson:"collection"`
Required bool `bson:"required,omitempty"`
}
type CollectionDefinition struct {
Name string `bson:"_name"`
Singular string `bson:"singular"`
Plural string `bson:"plural"`
IdPrefix string `bson:"idPrefix"`
IndexSpecs []map[string]any `bson:"indexSpecs"`
Schema map[string]any `bson:"schema"`
Views map[string]any `bson:"views"`
Name string `bson:"_name"`
Singular string `bson:"singular"`
Plural string `bson:"plural"`
IdPrefix string `bson:"idPrefix"`
IndexSpecs []map[string]any `bson:"indexSpecs"`
Schema map[string]any `bson:"schema"`
Views map[string]any `bson:"views"`
Timeseries *Timeseries `bson:"timeseries,omitempty"`
Discriminator *Discriminator
}
// func (cd *CollectionDefinition) GetSchema(name string)
@@ -75,34 +94,6 @@ func (c *MongoClient) GetIdPrefix(name string) string {
return ""
}
const ADD_DEFINITION_SCHEMA = `
{
"type": "object",
"properties": {
"_name": {
"type": "string"
},
"idPrefix": {
"type": "string"
},
"indexSpecs": {
"type": "array"
},
"singular": {
"type": "string"
},
"system": {
"type": "boolean"
},
"plural": {
"type": "string"
}
},
"required": ["_name", "singular", "plural"],
"additionalProperties": true
}
`
func (c *MongoClient) AddDefinition(data map[string]any) {
if valid := commons.Validate(ADD_DEFINITION_SCHEMA, data); valid != nil {
log.Printf("failed to register data: %v", valid)
@@ -154,6 +145,7 @@ func (c *MongoClient) GetCollection(database, name string) *mongo.Collection {
if ok {
log.Printf("Schema found for %s; will use it", name)
ApplyTimeSeries(cdef, opts)
ApplySchema(cdef, opts)
} else {
log.Printf("No schema for %s", name)
@@ -168,16 +160,28 @@ func (c *MongoClient) GetCollection(database, name string) *mongo.Collection {
collection := db.Collection(name)
c.CreateIndexes(collection, cdef)
c.CreateViews(db, cdef)
return collection
}
}
func ApplyTimeSeries(cdef *CollectionDefinition, opts *options.CreateCollectionOptionsBuilder) {
if cdef.Timeseries != nil {
tsOpts := options.TimeSeries().
SetTimeField(cdef.Timeseries.TimeField).
SetMetaField(cdef.Timeseries.MetaField).
SetGranularity(cdef.Timeseries.Granularity)
opts.SetTimeSeriesOptions(tsOpts)
} else {
log.Printf("No timeseries")
}
}
func ApplySchema(cdef *CollectionDefinition, opts *options.CreateCollectionOptionsBuilder) {
// Add schema validation
if cdef.Schema != nil {
if cdef.Schema != nil && cdef.Timeseries == nil {
schemaBson, err := bson.Marshal(cdef.Schema)
if err != nil {
log.Printf("failed to parse schema: %v", err)
@@ -191,7 +195,7 @@ func ApplySchema(cdef *CollectionDefinition, opts *options.CreateCollectionOptio
opts.SetValidator(validator)
}
} else {
log.Printf("Invalid schema, do nothing.")
log.Printf("Validation disabled")
}
}
@@ -353,3 +357,70 @@ func MapToBsonD(m map[string]any) (bson.D, error) {
return d, nil
}
const ADD_DEFINITION_SCHEMA = `
{
"type": "object",
"properties": {
"_name": {
"type": "string"
},
"idPrefix": {
"type": "string"
},
"indexSpecs": {
"type": "array"
},
"singular": {
"type": "string"
},
"system": {
"type": "boolean"
},
"plural": {
"type": "string"
},
"schema": {
"type": "object"
},
"views": {
"type": "object"
},
"timeseries": {
"type": "object",
"properties": {
"timeField": {
"type": "string"
},
"metaField": {
"type": "string"
},
"granularity": {
"enum": [ "seconds", "minutes", "hours"]
}
},
"required": ["timeField", "metaField", "granularity"]
},
"discriminator": {
"type": "object",
"properties": {
"field": {
"type": "string"
},
"ctxField": {
"type": "string"
},
"collection": {
"type": "string"
},
"required": {
"type": "boolean"
}
},
"required": ["field", "ctxField", "collection"]
}
},
"required": ["_name", "singular", "plural"],
"additionalProperties": true
}
`

View File

@@ -1,9 +1,12 @@
package mongo
import (
"fmt"
"os"
"context"
"testing"
"time"
"encoding/json"
"go.mongodb.org/mongo-driver/v2/bson"
)
@@ -54,3 +57,59 @@ func TestCreateIndexes(t *testing.T) {
t.Fatal("Should have register index idx_1")
}
}
func TestCreateTimeSeries(t *testing.T) {
// 1. Register schemas
schemaEvent, err := os.ReadFile("./.test/event.json")
if err != nil { t.Fatal(err) }
var event bson.M
if err := json.Unmarshal(schemaEvent, &event); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaEvent), schemaEvent[:4])
}
client := GetMongoClient()
client.AddDefinition(event)
// 2. Insert data
event1 := map[string]any {
"name": "event1",
"createdAt": time.Now(),
}
_, err = client.InsertOne(context.Background(), "mydb", "event", event1)
if err != nil { t.Fatalf("Failed to insertOne %#v", err) }
c := client.GetCollection("mydb", "event")
db := c.Database()
cmd := bson.D{
{Key: "listCollections", Value: 1},
{Key: "filter", Value: bson.D{
{Key: "name", Value: "event"},
}},
}
var result struct {
Cursor struct {
FirstBatch []bson.M `bson:"firstBatch"`
} `bson:"cursor"`
}
err = db.RunCommand(context.Background(), cmd).Decode(&result)
if err != nil {
t.Fatal(err)
}
coll := result.Cursor.FirstBatch[0]
if coll["type"] != "timeseries" {
fmt.Println("❌ time series collection")
}
if opts, ok := coll["options"].(bson.M); ok {
if _, ok := opts["timeseries"]; !ok {
t.Fatal("❌ time series collection")
}
}
}

View File

@@ -51,6 +51,7 @@ func TestMain(m *testing.M) {
MongoUri: endpoint,
MongoUser: user,
MongoPass: pass,
MongoDebugQuery: false,
})
// 3. Run tests

View File

@@ -14,7 +14,7 @@ import (
// CreateViews will create views for the given collection and definition.
func (c *MongoClient) CreateViews(db *mongo.Database, cdef *CollectionDefinition) {
if cdef == nil || cdef.Views == nil {
log.Printf("No definitions will not create views")
log.Printf("No definition for views found.")
return
}

47
replace.go Normal file
View File

@@ -0,0 +1,47 @@
package mongo
import (
"context"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
)
func (c *MongoClient) Replace(ctx context.Context, database, name string, id string, data bson.M) (bson.M, error) {
collection := c.GetCollection(database, name)
filter := map[string]any { "_id": id }
var found bson.M
err := collection.FindOne(ctx, filter).Decode(&found)
if err != nil {
return nil, err
}
prepareForReplace(data, found)
if err := c.DiscriminatorCheckAndApplyToData(ctx, name, data); err != nil {
return nil, err
}
updateResult, err := collection.ReplaceOne(ctx, filter, data)
if err != nil {
return nil, err
}
PostReplace(updateResult, data, id)
return data, nil
}
func PostReplace(updateResult *mongo.UpdateResult, data bson.M, id string) {
if updateResult.ModifiedCount == 1 {
data["_id"] = id
}
}
func prepareForReplace(data bson.M, old bson.M) {
ensureNoId(data)
ensureUpdatedAt(data)
ensureExistingCreatedAt(data, old)
}

181
replace_test.go Normal file
View File

@@ -0,0 +1,181 @@
package mongo
import (
"os"
"context"
"testing"
"encoding/json"
"go.mongodb.org/mongo-driver/v2/bson"
)
func TestReplace(t *testing.T) {
data := map[string]any {
"_id": "su_2b3c00",
"name": "Ava Thompson",
"age": int32(26),
}
ctx := context.Background()
o, err := client.InsertOne(ctx, "mydb", "mycollection", data)
if err != nil { t.Fatalf("Failed to insertOne %#v", err) }
id := o["_id"].(string)
// first we retrieve the entity
fetched, err := client.GetOne(ctx, "mydb", "mycollection", id)
if err != nil { t.Fatalf("Failed to fetch %#v", err) }
fetched["name"] = "Noah Patel"
replaced, err := client.Replace(ctx, "mydb", "mycollection", id, fetched)
if err != nil { t.Fatalf("Failed to replace %#v", err) }
// t.Fatalf("-> %v", replaced)
if replaced["_id"] != fetched["_id"] {
t.Fatalf("Not the same entity")
}
if replaced["name"] != "Noah Patel" {
t.Fatal("unexpected name")
}
}
// Make sure discrimination field is used when replacing.
func TestReplace_Discrimination(t *testing.T) {
// 1. Register schemas
schemaStore, err := os.ReadFile("./.test/store.json")
if err != nil { t.Fatal(err) }
schemaOffer, err := os.ReadFile("./.test/offer.json")
if err != nil { t.Fatal(err) }
var store bson.M
if err := json.Unmarshal(schemaStore, &store); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaStore), schemaStore[:4])
}
var offer bson.M
if err := json.Unmarshal(schemaOffer, &offer); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaOffer), schemaOffer[:4])
}
client := GetMongoClient()
client.AddDefinition(store)
client.AddDefinition(offer)
// Save data
ctx1 := context.Background()
ctx1 = context.WithValue(ctx1, "account", "xxxxxx")
ctx1 = context.WithValue(ctx1, "store", "str_1234")
// Save offer
offer1 := map[string]any { "name": "OSRAM 10" }
saved, err := client.InsertOne(ctx1, "mydb", "offer", offer1)
if err != nil { t.Fatalf("Failed to insertOne %#v", err) }
id := saved["_id"].(string)
// // Now replace it
saved["name"] = "** OSRAM 10 CHANGED **"
replaced, err := client.Replace(ctx1, "mydb", "offer", id, saved)
if err != nil { t.Fatalf("Failed to replace %#v", err) }
if replaced["name"] != "** OSRAM 10 CHANGED **" {
t.Fatal("Unexpected replaced object")
}
found, err := client.GetOne(ctx1, "mydb", "offer", id)
if err != nil { t.Fatalf("Failed to find %#v", err) }
if found["name"] != "** OSRAM 10 CHANGED **" {
t.Fatal("Updates did not persist")
}
}
func TestReplace_Discrimination_NoStore(t *testing.T) {
// 1. Register schemas
schemaStore, err := os.ReadFile("./.test/store.json")
if err != nil { t.Fatal(err) }
schemaOffer, err := os.ReadFile("./.test/offer.json")
if err != nil { t.Fatal(err) }
var store bson.M
if err := json.Unmarshal(schemaStore, &store); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaStore), schemaStore[:4])
}
var offer bson.M
if err := json.Unmarshal(schemaOffer, &offer); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaOffer), schemaOffer[:4])
}
client := GetMongoClient()
client.AddDefinition(store)
client.AddDefinition(offer)
// Save data
ctx1 := context.Background()
ctx1 = context.WithValue(ctx1, "store", "str_1234")
// Save offer
offer1 := map[string]any { "name": "OSRAM 11" }
saved, err := client.InsertOne(ctx1, "mydb", "offer", offer1)
if err != nil { t.Fatalf("Failed to insertOne %#v", err) }
id := saved["_id"].(string)
// // Now replace it
saved["name"] = "** OSRAM 11 CHANGED **"
_, err = client.Replace(context.Background(), "mydb", "offer", id, saved)
if err == nil {
t.Fatal("should have required store")
}
}
// Should save the right store regardless of the input
func TestReplace_Discrimination_EnsureStore(t *testing.T) {
// 1. Register schemas
schemaStore, err := os.ReadFile("./.test/store.json")
if err != nil { t.Fatal(err) }
schemaOffer, err := os.ReadFile("./.test/offer.json")
if err != nil { t.Fatal(err) }
var store bson.M
if err := json.Unmarshal(schemaStore, &store); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaStore), schemaStore[:4])
}
var offer bson.M
if err := json.Unmarshal(schemaOffer, &offer); err != nil {
t.Fatalf("Length: %d, First bytes: %x\n", len(schemaOffer), schemaOffer[:4])
}
client := GetMongoClient()
client.AddDefinition(store)
client.AddDefinition(offer)
// Save data
ctx1 := context.Background()
ctx1 = context.WithValue(ctx1, "store", "str_1234")
// Save offer
offer1 := map[string]any { "name": "OSRAM 12" }
saved, err := client.InsertOne(ctx1, "mydb", "offer", offer1)
if err != nil { t.Fatalf("Failed to insertOne %#v", err) }
id := saved["_id"].(string)
// // Now replace it
saved["store"] = "str_0000"
changed, err := client.Replace(ctx1, "mydb", "offer", id, saved)
if err != nil { t.Fatalf("Failed to replace %v", err) }
if changed["store"] != "str_1234" {
t.Fatal("Should have ensured store")
}
}