353 lines
7.7 KiB
Go
353 lines
7.7 KiB
Go
// Package mongo provides a simple interface to the database.
|
|
package mongo
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
"fmt"
|
|
"encoding/json"
|
|
|
|
"go.mongodb.org/mongo-driver/v2/event"
|
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
|
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
|
|
// "git.gsuntres.com/boxtep/boxtep/core"
|
|
"git.gsuntres.com/general/sys"
|
|
)
|
|
|
|
// WithiSessionFunc will run operations on the database within the same session.
|
|
// If this functions returns an error, the system will rollback the transaction.
|
|
// Sessions require a resplica set.
|
|
type WithinSessionFunc func(context.Context, *mongo.Session) (any, error)
|
|
|
|
type IMongoClient interface {
|
|
// WithinSession takes a context, the database and the collection and excecutes the callback function provided.
|
|
WithinSession(context.Context, string, string, WithinSessionFunc) (any, error)
|
|
|
|
// InsertOne will insert data to the specified namespace.
|
|
InsertOne(context.Context, string, string, bson.M) (bson.M, error)
|
|
|
|
// Find
|
|
Find(context.Context, string, string, bson.M) ([]bson.M, error)
|
|
|
|
// GetCollection returns the requested collection within an account. It will create it if it doesn't exist.
|
|
GetCollection(database, name string) *mongo.Collection
|
|
}
|
|
|
|
// CollectionDefinition describes one registered collection. Values are decoded
// from BSON by MongoClient.AddDefinition and kept in MongoClient.Registry.
//
// Struct tags must be written as `bson:"key"` with no space after the colon;
// a tag written as `bson: "key"` does not follow the conventional format and
// is not seen by reflect.StructTag lookups, so the key would be ignored.
type CollectionDefinition struct {
	// Name is the collection name; it is the key used in MongoClient.Registry.
	Name string `bson:"_name"`
	// Singular is stored under the "singular" key.
	Singular string `bson:"singular"`
	// Plural is stored under the "plural" key.
	Plural string `bson:"plural"`
	// IdPrefix is the value returned by MongoClient.GetIdPrefix.
	IdPrefix string `bson:"idPrefix"`
	// IndexSpecs holds the index specifications of the collection.
	IndexSpecs []map[string]any `bson:"indexSpecs"`
	// Schema, when non-nil, is installed as the collection's $jsonSchema
	// validator by ApplySchema.
	Schema map[string]any `bson:"schema"`
}
|
|
|
|
// func (cd *CollectionDefinition) GetSchema(name string)
|
|
|
|
// MongoClient
|
|
type MongoClient struct {
|
|
// Client the actual connected instance of mongo client.
|
|
Client *mongo.Client
|
|
// Debug set to true for dislaying info level logs.
|
|
Debug bool
|
|
// DebugQuery set to true to log all queries done.
|
|
DebugQuery bool
|
|
// Limit the default limit to use in queries.
|
|
Limit int64
|
|
// DBPrefix optinal string literal to distinquise
|
|
DBPrefix string
|
|
// Registry holds critical information about collection's structure, like schema and indexes
|
|
Registry map[string]*CollectionDefinition
|
|
}
|
|
|
|
func (c *MongoClient) GetIdPrefix(name string) string {
|
|
def, ok := c.Registry[name]
|
|
if ok {
|
|
return def.IdPrefix
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
// ADD_DEFINITION_SCHEMA is the JSON Schema that AddDefinition validates
// incoming collection definitions against. "_name", "singular" and "plural"
// are mandatory; any additional properties are accepted.
//
// The literal must stay a valid JSON document: nothing but JSON and
// whitespace may appear between the backticks.
const ADD_DEFINITION_SCHEMA = `
{
  "type": "object",
  "properties": {
    "_name": {
      "type": "string"
    },
    "idPrefix": {
      "type": "string"
    },
    "indexSpecs": {
      "type": "array"
    },
    "singular": {
      "type": "string"
    },
    "system": {
      "type": "boolean"
    },
    "plural": {
      "type": "string"
    }
  },
  "required": ["_name", "singular", "plural"],
  "additionalProperties": true
}
`
|
|
|
|
func (c *MongoClient) AddDefinition(data map[string]any) {
|
|
if valid := commons.Validate(ADD_DEFINITION_SCHEMA, data); valid != nil {
|
|
log.Printf("failed to register data: %v", valid)
|
|
|
|
return
|
|
}
|
|
|
|
log.Printf("Registering %s", data["_name"])
|
|
|
|
b, err := bson.Marshal(data)
|
|
if err != nil {
|
|
log.Printf("failed to marshal: %v", err)
|
|
|
|
return
|
|
}
|
|
|
|
var cd CollectionDefinition
|
|
bson.Unmarshal(b, &cd)
|
|
|
|
if len(cd.Name) > 0 {
|
|
c.Registry[cd.Name] = &cd
|
|
}
|
|
}
|
|
|
|
func (c *MongoClient) GetCollection(database, name string) *mongo.Collection {
|
|
if c.Debug {
|
|
log.Printf("Using collection: %s.%s", database, name)
|
|
}
|
|
|
|
db := c.Client.Database(database)
|
|
|
|
// 1. List existing collections
|
|
names, err := db.ListCollectionNames(context.TODO(), bson.D{})
|
|
|
|
if err != nil {
|
|
log.Printf("Failed to list collections: %#v", err.Error())
|
|
|
|
return nil
|
|
}
|
|
|
|
// 2. If collection exist return it, otherwise create it and then return it
|
|
if slices.Contains(names, name) {
|
|
return db.Collection(name)
|
|
} else {
|
|
opts := options.CreateCollection()
|
|
|
|
// maybe get from schema
|
|
cdef, ok := c.Registry[name]
|
|
if ok {
|
|
log.Printf("Schema found for %s; will use it", name)
|
|
|
|
ApplySchema(cdef, opts)
|
|
} else {
|
|
log.Printf("No schema for %s", name)
|
|
}
|
|
|
|
if err := db.CreateCollection(context.TODO(), name, opts); err != nil {
|
|
log.Printf("Failed to create collection: %#v", err)
|
|
|
|
return nil
|
|
}
|
|
|
|
collection := db.Collection(name)
|
|
|
|
c.CreateIndexes(collection, cdef)
|
|
|
|
return collection
|
|
}
|
|
}
|
|
|
|
func ApplySchema(cdef *CollectionDefinition, opts *options.CreateCollectionOptionsBuilder) {
|
|
// Add schema validation
|
|
if cdef.Schema != nil {
|
|
schemaBson, err := bson.Marshal(cdef.Schema)
|
|
if err != nil {
|
|
log.Printf("failed to parse schema: %v", err)
|
|
} else {
|
|
var validatorSchema bson.M
|
|
bson.Unmarshal(schemaBson, &validatorSchema)
|
|
validator := bson.M{
|
|
"$jsonSchema": validatorSchema,
|
|
}
|
|
|
|
opts.SetValidator(validator)
|
|
}
|
|
} else {
|
|
log.Printf("Invalid schema, do nothing.")
|
|
}
|
|
}
|
|
|
|
var client *MongoClient = &MongoClient{
|
|
Limit: 10,
|
|
Registry: make(map[string]*CollectionDefinition, 0),
|
|
}
|
|
|
|
func GetMongoClient() *MongoClient {
|
|
return client
|
|
}
|
|
|
|
// validSchema_StartProps is the JSON Schema that Start validates its
// MongoStartProps argument against. Only "MongoUri" is mandatory.
//
// The literal must stay a valid JSON document: nothing but JSON and
// whitespace may appear between the backticks.
const validSchema_StartProps = `
{
  "type": "object",
  "properties": {
    "MongoUri": {
      "type": "string"
    },
    "MongoUser": {
      "type": "string"
    },
    "MongoPass": {
      "type": "string"
    }
  },
  "required": ["MongoUri"]
}
`
|
|
|
|
// MongoStartProps carries the settings consumed by Start.
type MongoStartProps struct {
	// MongoUri is the connection string applied to the client options.
	MongoUri string
	// MongoUser is the username placed in the auth credential.
	MongoUser string
	// MongoPass is the password placed in the auth credential.
	MongoPass string
	// MongoDebugQuery, when true, makes Start install a command monitor that
	// logs started, succeeded and failed commands.
	MongoDebugQuery bool
	// MongoDBPrefix is copied to the package-level client's DBPrefix.
	MongoDBPrefix string
}
|
|
|
|
func Start(props *MongoStartProps) error {
|
|
if err := commons.Validate(validSchema_StartProps, props); err != nil {
|
|
return err
|
|
}
|
|
|
|
uri := props.MongoUri
|
|
user := props.MongoUser
|
|
pass := props.MongoPass
|
|
|
|
// Create a new client and connect to the server
|
|
var err error
|
|
|
|
cOptions := options.Client().
|
|
ApplyURI(uri).
|
|
SetAuth(options.Credential{
|
|
Username: user,
|
|
Password: pass,
|
|
}).
|
|
SetConnectTimeout(5 * time.Second).
|
|
SetServerAPIOptions(&options.ServerAPIOptions{
|
|
ServerAPIVersion: options.ServerAPIVersion1,
|
|
}).
|
|
SetBSONOptions(&options.BSONOptions{
|
|
DefaultDocumentM: true,
|
|
UseJSONStructTags: false,
|
|
NilSliceAsEmpty: true,
|
|
NilMapAsEmpty: true,
|
|
}).
|
|
SetRegistry(GetCustomRegistry())
|
|
|
|
client.DebugQuery = props.MongoDebugQuery
|
|
if client.DebugQuery {
|
|
// Debug queries
|
|
monitor := &event.CommandMonitor{
|
|
Started: func(_ context.Context, e *event.CommandStartedEvent) {
|
|
log.Printf("%d@Start %s#%s %s", e.RequestID, e.DatabaseName, e.CommandName, e.Command)
|
|
},
|
|
Succeeded: func(_ context.Context, e *event.CommandSucceededEvent) {
|
|
log.Printf("%d@OK in %s", e.RequestID, e.Reply)
|
|
},
|
|
Failed: func(_ context.Context, e *event.CommandFailedEvent) {
|
|
log.Printf("%d@Fail in %s", e.RequestID, e.Failure)
|
|
},
|
|
}
|
|
|
|
cOptions = cOptions.SetMonitor(monitor)
|
|
}
|
|
|
|
// set DBPrefix
|
|
client.DBPrefix = props.MongoDBPrefix
|
|
|
|
if err := cOptions.Validate(); err != nil {
|
|
log.Fatalf("Failed to validate mongo options: %+v", err.Error())
|
|
|
|
return err
|
|
}
|
|
|
|
client.Client, err = mongo.Connect(cOptions)
|
|
|
|
if err != nil {
|
|
log.Fatalf("Failed to connect: %+v", err.Error())
|
|
|
|
return err
|
|
}
|
|
|
|
sys.OnExit(func () {
|
|
if err := client.Client.Disconnect(context.TODO()); err != nil {
|
|
log.Fatalf("failed to disconnect: %v", err)
|
|
}
|
|
})
|
|
|
|
// Register defaults.go
|
|
var structfulDef bson.M
|
|
json.Unmarshal(structfulJson, &structfulDef)
|
|
|
|
client.AddDefinition(structfulDef)
|
|
|
|
return nil
|
|
}
|
|
|
|
func GetClient() *mongo.Client {
|
|
return client.Client
|
|
}
|
|
|
|
func (c *MongoClient) GetName(account string) string {
|
|
return fmt.Sprintf("%s%s", c.DBPrefix, account)
|
|
}
|
|
|
|
func (c *MongoClient) ExtractAccount(name string) string {
|
|
v, ok := strings.CutPrefix(name, c.DBPrefix)
|
|
|
|
if !ok {
|
|
return ""
|
|
}
|
|
|
|
return v
|
|
}
|
|
|
|
func Stop() error {
|
|
if err := client.Client.Disconnect(context.TODO()); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("Successfully stopped mongo.")
|
|
|
|
return nil
|
|
}
|
|
|
|
func MapToBsonD(m map[string]any) (bson.D, error) {
|
|
data, err := bson.Marshal(m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var d bson.D
|
|
|
|
err = bson.Unmarshal(data, &d)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return d, nil
|
|
}
|