// Package mongo provides a simple interface to the database. package mongo import ( "context" "log" "slices" "strings" "time" "fmt" "encoding/json" "go.mongodb.org/mongo-driver/v2/event" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/bson" "git.gsuntres.com/general/commons" "git.gsuntres.com/general/sys" ) func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) } // WithiSessionFunc will run operations on the database within the same session. // If this functions returns an error, the system will rollback the transaction. // Sessions require a resplica set. type WithinSessionFunc func(context.Context, *mongo.Session) (any, error) type IMongoClient interface { // WithinSession takes a context, the database and the collection and excecutes the callback function provided. WithinSession(context.Context, string, string, WithinSessionFunc) (any, error) // InsertOne will insert data to the specified namespace. InsertOne(context.Context, string, string, bson.M) (bson.M, error) // Find Find(context.Context, string, string, bson.M) ([]bson.M, error) // GetCollection returns the requested collection within an account. It will create it if it doesn't exist. GetCollection(database, name string) *mongo.Collection } type Timeseries struct { TimeField string `bson:"timeField"` MetaField string `bson:"metaField"` Granularity string `bson:"granularity"` } type Discriminator struct { Field string `bson:"field"` CtxField string `bson:"ctxField"` Collection string `bson:"collection"` Required bool `bson:"required,omitempty"` } type CollectionDefinition struct { Name string `bson:"_name"` Singular string `bson:"singular"` Plural string `bson:"plural"` IdPrefix string `bson:"idPrefix"` IndexSpecs []map[string]any `bson:"indexSpecs"` Schema map[string]any `bson:"schema"` Views map[string]any `bson:"views"` Timeseries *Timeseries `bson:"timeseries,omitempty"` Discriminator *Discriminator } // MongoClient type MongoClient struct { // Client the actual connected instance of mongo client. Client *mongo.Client // Debug set to true for dislaying info level logs. Debug bool // DebugQuery set to true to log all queries done. DebugQuery bool // Limit the default limit to use in queries. Limit int64 // DBPrefix optinal string literal to distinquise DBPrefix string // Registry holds critical information about collection's structure, like schema and indexes Registry map[string]*CollectionDefinition } func (c *MongoClient) GetIdPrefix(name string) string { def, ok := c.Registry[name] if ok { return def.IdPrefix } return "" } func (c *MongoClient) AddDefinition(data map[string]any) { if valid := commons.Validate(ADD_DEFINITION_SCHEMA, data); valid != nil { log.Printf("failed to register data: %v", valid) return } log.Printf("Registering %s", data["_name"]) b, err := bson.Marshal(data) if err != nil { log.Printf("failed to marshal: %v", err) return } var cd CollectionDefinition bson.Unmarshal(b, &cd) if len(cd.Name) > 0 { c.Registry[cd.Name] = &cd } } func (c *MongoClient) DropDatabase_DANGER(database string) bool { log.Printf("DANGER attempt to drop database: %s", database) db := c.Client.Database(database) if err := db.Drop(context.Background()); err != nil { log.Printf("Failed to drop database: %v", err) return false } log.Printf("Database %s deleted", database) return true } func (c *MongoClient) GetCollection(database, name string) *mongo.Collection { if c.Debug { log.Printf("Using collection: %s.%s", database, name) } db := c.Client.Database(database) // 1. List existing collections names, err := db.ListCollectionNames(context.TODO(), bson.D{}) if err != nil { log.Printf("Failed to list collections: %#v", err.Error()) return nil } // 2. If collection exist return it, otherwise create it and then return it if slices.Contains(names, name) { return db.Collection(name) } else { opts := options.CreateCollection() // maybe get from schema cdef, ok := c.Registry[name] if ok { log.Printf("Schema found for %s; will use it", name) ApplyTimeSeries(cdef, opts) ApplySchema(cdef, opts) } else { log.Printf("No schema for %s", name) } if err := db.CreateCollection(context.TODO(), name, opts); err != nil { log.Printf("Failed to create collection: %#v", err) return nil } collection := db.Collection(name) c.CreateIndexes(collection, cdef) c.CreateViews(db, cdef) return collection } } func ApplyTimeSeries(cdef *CollectionDefinition, opts *options.CreateCollectionOptionsBuilder) { if cdef.Timeseries != nil { tsOpts := options.TimeSeries(). SetTimeField(cdef.Timeseries.TimeField). SetMetaField(cdef.Timeseries.MetaField). SetGranularity(cdef.Timeseries.Granularity) opts.SetTimeSeriesOptions(tsOpts) } else { log.Printf("No timeseries") } } func ApplySchema(cdef *CollectionDefinition, opts *options.CreateCollectionOptionsBuilder) { // Add schema validation if cdef.Schema != nil && cdef.Timeseries == nil { schemaBson, err := bson.Marshal(cdef.Schema) if err != nil { log.Printf("failed to parse schema: %v", err) } else { var validatorSchema bson.M bson.Unmarshal(schemaBson, &validatorSchema) validator := bson.M{ "$jsonSchema": validatorSchema, } opts.SetValidator(validator) } } else { log.Printf("Validation disabled") } } var client *MongoClient = &MongoClient{ Limit: 10, Registry: make(map[string]*CollectionDefinition, 0), } func GetMongoClient() *MongoClient { return client } const validSchema_StartProps = ` { "type": "object", "properties": { "MongoUri": { "type": "string" }, "MongoUser": { "type": "string" }, "MongoPass": { "type": "string" } }, "required": ["MongoUri"] } ` type MongoStartProps struct { MongoUri string MongoUser string MongoPass string MongoDebugQuery bool MongoDBPrefix string AllowTruncatingDoubles bool } func Start(props *MongoStartProps) error { if err := commons.Validate(validSchema_StartProps, props); err != nil { return err } uri := props.MongoUri user := props.MongoUser pass := props.MongoPass // Create a new client and connect to the server var err error cOptions := options.Client(). ApplyURI(uri). SetAuth(options.Credential{ Username: user, Password: pass, }). SetConnectTimeout(5 * time.Second). SetServerAPIOptions(&options.ServerAPIOptions{ ServerAPIVersion: options.ServerAPIVersion1, }). SetBSONOptions(&options.BSONOptions{ DefaultDocumentM: true, UseJSONStructTags: false, NilSliceAsEmpty: true, NilMapAsEmpty: true, AllowTruncatingDoubles: props.AllowTruncatingDoubles, }). SetRegistry(GetCustomRegistry()) client.DebugQuery = props.MongoDebugQuery if client.DebugQuery { // Debug queries monitor := &event.CommandMonitor{ Started: func(_ context.Context, e *event.CommandStartedEvent) { log.Printf("%d@Start %s#%s %s", e.RequestID, e.DatabaseName, e.CommandName, e.Command) }, Succeeded: func(_ context.Context, e *event.CommandSucceededEvent) { log.Printf("%d@OK in %s", e.RequestID, e.Reply) }, Failed: func(_ context.Context, e *event.CommandFailedEvent) { log.Printf("%d@Fail in %s", e.RequestID, e.Failure) }, } cOptions = cOptions.SetMonitor(monitor) } // set DBPrefix client.DBPrefix = props.MongoDBPrefix if err := cOptions.Validate(); err != nil { log.Fatalf("Failed to validate mongo options: %+v", err.Error()) return err } client.Client, err = mongo.Connect(cOptions) if err != nil { log.Fatalf("Failed to connect: %+v", err.Error()) return err } sys.OnExit(func () { if err := client.Client.Disconnect(context.TODO()); err != nil { log.Fatalf("failed to disconnect: %v", err) } }) // Register defaults.go var structfulDef bson.M json.Unmarshal(structfulJson, &structfulDef) client.AddDefinition(structfulDef) return nil } func GetClient() *mongo.Client { return client.Client } func (c *MongoClient) GetName(account string) string { return fmt.Sprintf("%s%s", c.DBPrefix, account) } func (c *MongoClient) ExtractAccount(name string) string { v, ok := strings.CutPrefix(name, c.DBPrefix) if !ok { return "" } return v } func Stop() error { if err := client.Client.Disconnect(context.TODO()); err != nil { return err } log.Printf("Successfully stopped mongo.") return nil } func MapToBsonD(m map[string]any) (bson.D, error) { data, err := bson.Marshal(m) if err != nil { return nil, err } var d bson.D err = bson.Unmarshal(data, &d) if err != nil { return nil, err } return d, nil } const ADD_DEFINITION_SCHEMA = ` { "type": "object", "properties": { "_name": { "type": "string" }, "idPrefix": { "type": "string" }, "indexSpecs": { "type": "array" }, "singular": { "type": "string" }, "system": { "type": "boolean" }, "plural": { "type": "string" }, "schema": { "type": "object" }, "views": { "type": "object" }, "timeseries": { "type": "object", "properties": { "timeField": { "type": "string" }, "metaField": { "type": "string" }, "granularity": { "enum": [ "seconds", "minutes", "hours"] } }, "required": ["timeField", "metaField", "granularity"] }, "discriminator": { "type": "object", "properties": { "field": { "type": "string" }, "ctxField": { "type": "string" }, "collection": { "type": "string" }, "required": { "type": "boolean" } }, "required": ["field", "ctxField", "collection"] } }, "required": ["_name", "singular", "plural"], "additionalProperties": true } `