Files
mongo/main.go
George Suntres cba3e326e8 Add views
2026-04-16 11:17:16 -04:00

356 lines
7.7 KiB
Go

// Package mongo provides a simple interface to the database.
package mongo
import (
"context"
"log"
"slices"
"strings"
"time"
"fmt"
"encoding/json"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/bson"
"git.gsuntres.com/general/commons"
"git.gsuntres.com/general/sys"
)
// WithiSessionFunc will run operations on the database within the same session.
// If this functions returns an error, the system will rollback the transaction.
// Sessions require a resplica set.
type WithinSessionFunc func(context.Context, *mongo.Session) (any, error)
type IMongoClient interface {
// WithinSession takes a context, the database and the collection and excecutes the callback function provided.
WithinSession(context.Context, string, string, WithinSessionFunc) (any, error)
// InsertOne will insert data to the specified namespace.
InsertOne(context.Context, string, string, bson.M) (bson.M, error)
// Find
Find(context.Context, string, string, bson.M) ([]bson.M, error)
// GetCollection returns the requested collection within an account. It will create it if it doesn't exist.
GetCollection(database, name string) *mongo.Collection
}
type CollectionDefinition struct {
Name string `bson:"_name"`
Singular string `bson:"singular"`
Plural string `bson:"plural"`
IdPrefix string `bson:"idPrefix"`
IndexSpecs []map[string]any `bson:"indexSpecs"`
Schema map[string]any `bson:"schema"`
Views map[string]any `bson:"views"`
}
// func (cd *CollectionDefinition) GetSchema(name string)
// MongoClient
type MongoClient struct {
// Client the actual connected instance of mongo client.
Client *mongo.Client
// Debug set to true for dislaying info level logs.
Debug bool
// DebugQuery set to true to log all queries done.
DebugQuery bool
// Limit the default limit to use in queries.
Limit int64
// DBPrefix optinal string literal to distinquise
DBPrefix string
// Registry holds critical information about collection's structure, like schema and indexes
Registry map[string]*CollectionDefinition
}
func (c *MongoClient) GetIdPrefix(name string) string {
def, ok := c.Registry[name]
if ok {
return def.IdPrefix
}
return ""
}
const ADD_DEFINITION_SCHEMA = `
{
"type": "object",
"properties": {
"_name": {
"type": "string"
},
"idPrefix": {
"type": "string"
},
"indexSpecs": {
"type": "array"
},
"singular": {
"type": "string"
},
"system": {
"type": "boolean"
},
"plural": {
"type": "string"
}
},
"required": ["_name", "singular", "plural"],
"additionalProperties": true
}
`
func (c *MongoClient) AddDefinition(data map[string]any) {
if valid := commons.Validate(ADD_DEFINITION_SCHEMA, data); valid != nil {
log.Printf("failed to register data: %v", valid)
return
}
log.Printf("Registering %s", data["_name"])
b, err := bson.Marshal(data)
if err != nil {
log.Printf("failed to marshal: %v", err)
return
}
var cd CollectionDefinition
bson.Unmarshal(b, &cd)
if len(cd.Name) > 0 {
c.Registry[cd.Name] = &cd
}
}
func (c *MongoClient) GetCollection(database, name string) *mongo.Collection {
if c.Debug {
log.Printf("Using collection: %s.%s", database, name)
}
db := c.Client.Database(database)
// 1. List existing collections
names, err := db.ListCollectionNames(context.TODO(), bson.D{})
if err != nil {
log.Printf("Failed to list collections: %#v", err.Error())
return nil
}
// 2. If collection exist return it, otherwise create it and then return it
if slices.Contains(names, name) {
return db.Collection(name)
} else {
opts := options.CreateCollection()
// maybe get from schema
cdef, ok := c.Registry[name]
if ok {
log.Printf("Schema found for %s; will use it", name)
ApplySchema(cdef, opts)
} else {
log.Printf("No schema for %s", name)
}
if err := db.CreateCollection(context.TODO(), name, opts); err != nil {
log.Printf("Failed to create collection: %#v", err)
return nil
}
collection := db.Collection(name)
c.CreateIndexes(collection, cdef)
c.CreateViews(db, cdef)
return collection
}
}
func ApplySchema(cdef *CollectionDefinition, opts *options.CreateCollectionOptionsBuilder) {
// Add schema validation
if cdef.Schema != nil {
schemaBson, err := bson.Marshal(cdef.Schema)
if err != nil {
log.Printf("failed to parse schema: %v", err)
} else {
var validatorSchema bson.M
bson.Unmarshal(schemaBson, &validatorSchema)
validator := bson.M{
"$jsonSchema": validatorSchema,
}
opts.SetValidator(validator)
}
} else {
log.Printf("Invalid schema, do nothing.")
}
}
var client *MongoClient = &MongoClient{
Limit: 10,
Registry: make(map[string]*CollectionDefinition, 0),
}
func GetMongoClient() *MongoClient {
return client
}
const validSchema_StartProps = `
{
"type": "object",
"properties": {
"MongoUri": {
"type": "string"
},
"MongoUser": {
"type": "string"
},
"MongoPass": {
"type": "string"
}
},
"required": ["MongoUri"]
}
`
type MongoStartProps struct {
MongoUri string
MongoUser string
MongoPass string
MongoDebugQuery bool
MongoDBPrefix string
}
func Start(props *MongoStartProps) error {
if err := commons.Validate(validSchema_StartProps, props); err != nil {
return err
}
uri := props.MongoUri
user := props.MongoUser
pass := props.MongoPass
// Create a new client and connect to the server
var err error
cOptions := options.Client().
ApplyURI(uri).
SetAuth(options.Credential{
Username: user,
Password: pass,
}).
SetConnectTimeout(5 * time.Second).
SetServerAPIOptions(&options.ServerAPIOptions{
ServerAPIVersion: options.ServerAPIVersion1,
}).
SetBSONOptions(&options.BSONOptions{
DefaultDocumentM: true,
UseJSONStructTags: false,
NilSliceAsEmpty: true,
NilMapAsEmpty: true,
}).
SetRegistry(GetCustomRegistry())
client.DebugQuery = props.MongoDebugQuery
if client.DebugQuery {
// Debug queries
monitor := &event.CommandMonitor{
Started: func(_ context.Context, e *event.CommandStartedEvent) {
log.Printf("%d@Start %s#%s %s", e.RequestID, e.DatabaseName, e.CommandName, e.Command)
},
Succeeded: func(_ context.Context, e *event.CommandSucceededEvent) {
log.Printf("%d@OK in %s", e.RequestID, e.Reply)
},
Failed: func(_ context.Context, e *event.CommandFailedEvent) {
log.Printf("%d@Fail in %s", e.RequestID, e.Failure)
},
}
cOptions = cOptions.SetMonitor(monitor)
}
// set DBPrefix
client.DBPrefix = props.MongoDBPrefix
if err := cOptions.Validate(); err != nil {
log.Fatalf("Failed to validate mongo options: %+v", err.Error())
return err
}
client.Client, err = mongo.Connect(cOptions)
if err != nil {
log.Fatalf("Failed to connect: %+v", err.Error())
return err
}
sys.OnExit(func () {
if err := client.Client.Disconnect(context.TODO()); err != nil {
log.Fatalf("failed to disconnect: %v", err)
}
})
// Register defaults.go
var structfulDef bson.M
json.Unmarshal(structfulJson, &structfulDef)
client.AddDefinition(structfulDef)
return nil
}
func GetClient() *mongo.Client {
return client.Client
}
func (c *MongoClient) GetName(account string) string {
return fmt.Sprintf("%s%s", c.DBPrefix, account)
}
func (c *MongoClient) ExtractAccount(name string) string {
v, ok := strings.CutPrefix(name, c.DBPrefix)
if !ok {
return ""
}
return v
}
func Stop() error {
if err := client.Client.Disconnect(context.TODO()); err != nil {
return err
}
log.Printf("Successfully stopped mongo.")
return nil
}
func MapToBsonD(m map[string]any) (bson.D, error) {
data, err := bson.Marshal(m)
if err != nil {
return nil, err
}
var d bson.D
err = bson.Unmarshal(data, &d)
if err != nil {
return nil, err
}
return d, nil
}