Initial import
This commit is contained in:
352
main.go
Normal file
352
main.go
Normal file
@@ -0,0 +1,352 @@
|
||||
// Package mongo provides a simple interface to the database.
|
||||
package mongo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
"fmt"
|
||||
"encoding/json"
|
||||
|
||||
"go.mongodb.org/mongo-driver/v2/event"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
|
||||
"git.gsuntres.com/boxtep/boxtep/core"
|
||||
"git.gsuntres.com/boxtep/boxtep/sys"
|
||||
)
|
||||
|
||||
// WithiSessionFunc will run operations on the database within the same session.
|
||||
// If this functions returns an error, the system will rollback the transaction.
|
||||
// Sessions require a resplica set.
|
||||
type WithinSessionFunc func(context.Context, *mongo.Session) (any, error)
|
||||
|
||||
type IMongoClient interface {
|
||||
// WithinSession takes a context, the database and the collection and excecutes the callback function provided.
|
||||
WithinSession(context.Context, string, string, WithinSessionFunc) (any, error)
|
||||
|
||||
// InsertOne will insert data to the specified namespace.
|
||||
InsertOne(context.Context, string, string, bson.M) (bson.M, error)
|
||||
|
||||
// Find
|
||||
Find(context.Context, string, string, bson.M) ([]bson.M, error)
|
||||
|
||||
// GetCollection returns the requested collection within an account. It will create it if it doesn't exist.
|
||||
GetCollection(database, name string) *mongo.Collection
|
||||
}
|
||||
|
||||
type CollectionDefinition struct {
|
||||
Name string `bson:"_name"`
|
||||
Singular string `bson:"singular"`
|
||||
Plural string `bson: "plural"`
|
||||
IdPrefix string `bson: "idPrefix"`
|
||||
IndexSpecs []map[string]any `bson: "indexSpecs"`
|
||||
Schema map[string]any `bson: "schema"`
|
||||
}
|
||||
|
||||
// func (cd *CollectionDefinition) GetSchema(name string)
|
||||
|
||||
// MongoClient
|
||||
type MongoClient struct {
|
||||
// Client the actual connected instance of mongo client.
|
||||
Client *mongo.Client
|
||||
// Debug set to true for dislaying info level logs.
|
||||
Debug bool
|
||||
// DebugQuery set to true to log all queries done.
|
||||
DebugQuery bool
|
||||
// Limit the default limit to use in queries.
|
||||
Limit int64
|
||||
// DBPrefix optinal string literal to distinquise
|
||||
DBPrefix string
|
||||
// Registry holds critical information about collection's structure, like schema and indexes
|
||||
Registry map[string]*CollectionDefinition
|
||||
}
|
||||
|
||||
func (c *MongoClient) GetIdPrefix(name string) string {
|
||||
def, ok := c.Registry[name]
|
||||
if ok {
|
||||
return def.IdPrefix
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
const ADD_DEFINITION_SCHEMA = `
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"_name": {
|
||||
"type": "string"
|
||||
},
|
||||
"idPrefix": {
|
||||
"type": "string"
|
||||
},
|
||||
"indexSpecs": {
|
||||
"type": "array"
|
||||
},
|
||||
"singular": {
|
||||
"type": "string"
|
||||
},
|
||||
"system": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"plural": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["_name", "singular", "plural"],
|
||||
"additionalProperties": true
|
||||
}
|
||||
`
|
||||
|
||||
func (c *MongoClient) AddDefinition(data map[string]any) {
|
||||
if valid := core.Validate(ADD_DEFINITION_SCHEMA, data); valid != nil {
|
||||
log.Printf("failed to register data: %v", valid)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Registering %s", data["_name"])
|
||||
|
||||
b, err := bson.Marshal(data)
|
||||
if err != nil {
|
||||
log.Printf("failed to marshal: %v", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
var cd CollectionDefinition
|
||||
bson.Unmarshal(b, &cd)
|
||||
|
||||
if len(cd.Name) > 0 {
|
||||
c.Registry[cd.Name] = &cd
|
||||
}
|
||||
}
|
||||
|
||||
func (c *MongoClient) GetCollection(database, name string) *mongo.Collection {
|
||||
if c.Debug {
|
||||
log.Printf("Using collection: %s.%s", database, name)
|
||||
}
|
||||
|
||||
db := c.Client.Database(database)
|
||||
|
||||
// 1. List existing collections
|
||||
names, err := db.ListCollectionNames(context.TODO(), bson.D{})
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Failed to list collections: %#v", err.Error())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2. If collection exist return it, otherwise create it and then return it
|
||||
if slices.Contains(names, name) {
|
||||
return db.Collection(name)
|
||||
} else {
|
||||
opts := options.CreateCollection()
|
||||
|
||||
// maybe get from schema
|
||||
cdef, ok := c.Registry[name]
|
||||
if ok {
|
||||
log.Printf("Schema found for %s; will use it", name)
|
||||
|
||||
ApplySchema(cdef, opts)
|
||||
} else {
|
||||
log.Printf("No schema for %s", name)
|
||||
}
|
||||
|
||||
if err := db.CreateCollection(context.TODO(), name, opts); err != nil {
|
||||
log.Printf("Failed to create collection: %#v", err)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
collection := db.Collection(name)
|
||||
|
||||
c.CreateIndexes(collection, cdef)
|
||||
|
||||
return collection
|
||||
}
|
||||
}
|
||||
|
||||
func ApplySchema(cdef *CollectionDefinition, opts *options.CreateCollectionOptionsBuilder) {
|
||||
// Add schema validation
|
||||
if cdef.Schema != nil {
|
||||
schemaBson, err := bson.Marshal(cdef.Schema)
|
||||
if err != nil {
|
||||
log.Printf("failed to parse schema: %v", err)
|
||||
} else {
|
||||
var validatorSchema bson.M
|
||||
bson.Unmarshal(schemaBson, &validatorSchema)
|
||||
validator := bson.M{
|
||||
"$jsonSchema": validatorSchema,
|
||||
}
|
||||
|
||||
opts.SetValidator(validator)
|
||||
}
|
||||
} else {
|
||||
log.Printf("Invalid schema, do nothing.")
|
||||
}
|
||||
}
|
||||
|
||||
var client *MongoClient = &MongoClient{
|
||||
Limit: 10,
|
||||
Registry: make(map[string]*CollectionDefinition, 0),
|
||||
}
|
||||
|
||||
func GetMongoClient() *MongoClient {
|
||||
return client
|
||||
}
|
||||
|
||||
const validSchema_StartProps = `
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"MongoUri": {
|
||||
"type": "string"
|
||||
},
|
||||
"MongoUser": {
|
||||
"type": "string"
|
||||
},
|
||||
"MongoPass": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["MongoUri"]
|
||||
}
|
||||
`
|
||||
|
||||
type MongoStartProps struct {
|
||||
MongoUri string
|
||||
MongoUser string
|
||||
MongoPass string
|
||||
MongoDebugQuery bool
|
||||
MongoDBPrefix string
|
||||
}
|
||||
|
||||
func Start(props *MongoStartProps) error {
|
||||
if err := core.Validate(validSchema_StartProps, props); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uri := props.MongoUri
|
||||
user := props.MongoUser
|
||||
pass := props.MongoPass
|
||||
|
||||
// Create a new client and connect to the server
|
||||
var err error
|
||||
|
||||
cOptions := options.Client().
|
||||
ApplyURI(uri).
|
||||
SetAuth(options.Credential{
|
||||
Username: user,
|
||||
Password: pass,
|
||||
}).
|
||||
SetConnectTimeout(5 * time.Second).
|
||||
SetServerAPIOptions(&options.ServerAPIOptions{
|
||||
ServerAPIVersion: options.ServerAPIVersion1,
|
||||
}).
|
||||
SetBSONOptions(&options.BSONOptions{
|
||||
DefaultDocumentM: true,
|
||||
UseJSONStructTags: false,
|
||||
NilSliceAsEmpty: true,
|
||||
NilMapAsEmpty: true,
|
||||
}).
|
||||
SetRegistry(GetCustomRegistry())
|
||||
|
||||
client.DebugQuery = props.MongoDebugQuery
|
||||
if client.DebugQuery {
|
||||
// Debug queries
|
||||
monitor := &event.CommandMonitor{
|
||||
Started: func(_ context.Context, e *event.CommandStartedEvent) {
|
||||
log.Printf("%d@Start %s#%s %s", e.RequestID, e.DatabaseName, e.CommandName, e.Command)
|
||||
},
|
||||
Succeeded: func(_ context.Context, e *event.CommandSucceededEvent) {
|
||||
log.Printf("%d@OK in %s", e.RequestID, e.Reply)
|
||||
},
|
||||
Failed: func(_ context.Context, e *event.CommandFailedEvent) {
|
||||
log.Printf("%d@Fail in %s", e.RequestID, e.Failure)
|
||||
},
|
||||
}
|
||||
|
||||
cOptions = cOptions.SetMonitor(monitor)
|
||||
}
|
||||
|
||||
// set DBPrefix
|
||||
client.DBPrefix = props.MongoDBPrefix
|
||||
|
||||
if err := cOptions.Validate(); err != nil {
|
||||
log.Fatalf("Failed to validate mongo options: %+v", err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
client.Client, err = mongo.Connect(cOptions)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect: %+v", err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
sys.OnExit(func () {
|
||||
if err := client.Client.Disconnect(context.TODO()); err != nil {
|
||||
log.Fatalf("failed to disconnect: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Register defaults.go
|
||||
var structfulDef bson.M
|
||||
json.Unmarshal(structfulJson, &structfulDef)
|
||||
|
||||
client.AddDefinition(structfulDef)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetClient() *mongo.Client {
|
||||
return client.Client
|
||||
}
|
||||
|
||||
func (c *MongoClient) GetName(account string) string {
|
||||
return fmt.Sprintf("%s%s", c.DBPrefix, account)
|
||||
}
|
||||
|
||||
func (c *MongoClient) ExtractAccount(name string) string {
|
||||
v, ok := strings.CutPrefix(name, c.DBPrefix)
|
||||
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
|
||||
return v
|
||||
}
|
||||
|
||||
func Stop() error {
|
||||
if err := client.Client.Disconnect(context.TODO()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Successfully stopped mongo.")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func MapToBsonD(m map[string]any) (bson.D, error) {
|
||||
data, err := bson.Marshal(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var d bson.D
|
||||
|
||||
err = bson.Unmarshal(data, &d)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return d, nil
|
||||
}
|
||||
Reference in New Issue
Block a user