From 736434f9a6745c76caedab144b87c1f7730a851c Mon Sep 17 00:00:00 2001 From: nugrahaibnu-ipb Date: Mon, 12 Jan 2026 20:55:34 +0700 Subject: [PATCH] feat(DB Event): DB Event, CDC, Bulog (Log management) --- .gitignore | 2 +- config.cpo | 3 ++ data.log | 0 docs/Bulog.md | 16 ++++++ docs/DB Event.md | 32 +++++++++++ docs/config cpo.md | 11 ++++ example.sawit | Bin 0 -> 8192 bytes examples/create_local/main.go | 13 ++--- examples/dbevent/main.go | 31 +++++++++++ examples/logs/sawit.cpo | 16 ++++++ examples/logs/sawit.log | 0 internal/app/bootstrap.go | 39 ++++++++++++++ internal/config/config.go | 66 +++++++++++++++++++++++ internal/constant/constant.go | 19 +++++++ internal/engine/dbevent.go | 10 ++++ internal/engine/engine.go | 68 +++++++++++++++++------ internal/entity/engine_option.go | 5 ++ internal/event/dbevent_handler.go | 87 ++++++++++++++++++++++++++++++ internal/server/server.go | 5 +- internal/utils/bulog.go | 86 +++++++++++++++++++++++++++++ 20 files changed, 480 insertions(+), 29 deletions(-) create mode 100644 config.cpo create mode 100644 data.log create mode 100644 docs/Bulog.md create mode 100644 docs/DB Event.md create mode 100644 docs/config cpo.md create mode 100644 example.sawit create mode 100644 examples/dbevent/main.go create mode 100644 examples/logs/sawit.cpo create mode 100644 examples/logs/sawit.log create mode 100644 internal/app/bootstrap.go create mode 100644 internal/config/config.go create mode 100644 internal/constant/constant.go create mode 100644 internal/engine/dbevent.go create mode 100644 internal/entity/engine_option.go create mode 100644 internal/event/dbevent_handler.go create mode 100644 internal/utils/bulog.go diff --git a/.gitignore b/.gitignore index fbef8db..0786743 100644 --- a/.gitignore +++ b/.gitignore @@ -33,7 +33,7 @@ go.work.sum # env file .env - +./examples/logs/*.cpo # Editor/IDE # .idea/ # .vscode/ diff --git a/config.cpo b/config.cpo new file mode 100644 index 0000000..30120ba --- /dev/null +++ b/config.cpo @@ -0,0 +1,3 @@ +CDC_ADAPTER=cpo +CDC_FILE_PATH=examples/logs/sawit.cpo +LOG_PATH=examples/logs/sawit.log \ No newline at end of file diff --git a/data.log b/data.log new file mode 100644 index 0000000..e69de29 diff --git a/docs/Bulog.md b/docs/Bulog.md new file mode 100644 index 0000000..6bc63de --- /dev/null +++ b/docs/Bulog.md @@ -0,0 +1,16 @@ +**Bulog** is module to create Log easier and pretty with JSON format. With this format Log can be analyzed more and easy to create aggregation log. + +## Log Level +| Method | Level | Description| +|---------|------|-------------| +| Kabarin | Info | create log as info | +| Awas | Warning | create log as Warning | +| Kacau | Error | create log as Error | +| Dolog | Debug | create log as debug | + +Output example: +``` +{"level":"Info","message":"Give info to user","usage":80,"timestamp":"2026-01-09T11:37:23.392Z"} +{"level":"Warning","message":"CPU usage has been exceeded","usage":90,"timestamp":"2026-01-09T11:37:23.398Z"} +{"level":"Error","message":"No disk space available","usage":100,"timestamp":"2026-01-09T11:37:23.398Z"} +``` \ No newline at end of file diff --git a/docs/DB Event.md b/docs/DB Event.md new file mode 100644 index 0000000..467ad24 --- /dev/null +++ b/docs/DB Event.md @@ -0,0 +1,32 @@ + +**DB Event** is a feature for `sawitDB` to provide Event Drivent capabilities especially regarding Change Data Capture. This feature is listening for event below: +| Event Name | Description| +|---------|-------------------| +| OnTableCreated | Triggered when new table is crated | +| OnTableDropped | Triggered when a table is dropped | +| OnTableSelected | Triggered when select record from a table | +| OnTableInserted | Triggered when insert record into a table | +| OnTableUpdated | Triggered when a table updataed rows | +| OnTableDeleted | Triggered when rows deleted from a table | + +## Enabled CDC (Change Data Capture) +SawitDB is supported for CDC, to enabled CDC you need follow these step +1. Determined **CDC Adapter**, currently we still only support **CPO** adapter. +2. **CDC Adapter** captures change on SawitDB and write as AQL to file `.cpo`, all changed will be recorded on this file. Below how to config CDC and Adapter on `config.cpo` for server side + +``` +CDC_FILE_PATH=./examples/logs/sawit.cpo +CDC_ADAPTER=cpo +``` +Next will be supported for Kafka +## Disabled CDC +If you don't want use CDC you just need let `CDC_ADAPTER` empty on `config.cpo` +## Custome Event Handler +By default EventHandler provide by `./internal/event/dbevent_handler.go` but you can use custom event handler as below: +``` +const event = require('./dbeventHandlerExample') +const db = new SawitDB(dbPath,{dbevent:new event()}); +``` +Full example can view on `./examples/dbevent/main.go`. +To see output CDC on `./examples/logs/sawit.cpo` + diff --git a/docs/config cpo.md b/docs/config cpo.md new file mode 100644 index 0000000..1621852 --- /dev/null +++ b/docs/config cpo.md @@ -0,0 +1,11 @@ +**SawitDB** used file `.cpo` as config file, eg: `config.cpo` to use it similar to `.env` file, here you can store all configuration variable for entire database. + +## Config +To load configuration it is provided in `internal/config/config.go` below is an example: +``` + if err := config.LoadEnv("config.cpo"); err != nil { + log.Fatal(err) + } +``` +## CPO vs ENV +you can use `.env` in service level such as to config the service like port, endpoint, IP etc. `.cpo` should be used for confi in database engine level like how to enabled/disabled feature, set path file storage, security, etc. `.cpo` similar to `.cfg` in Mysql \ No newline at end of file diff --git a/example.sawit b/example.sawit new file mode 100644 index 0000000000000000000000000000000000000000..2f20261230b08dde8190aaa2fa9eb4da2af33d1a GIT binary patch literal 8192 zcmeIup$dR707cR1r#y@cDp;270};%Mh6N4clI-72g1_LnFW>Uc len(results) { endIndex = len(results) } + if sendEvent { + db.event.OnTableSelected(table, results[startIndex:endIndex], db.query) + } return results[startIndex:endIndex], nil } @@ -587,7 +609,11 @@ func (db *SawitDB) scanTable(entry *TableEntry, criteria *parser.Criteria) ([]ma return results, nil } -func (db *SawitDB) delete(table string, criteria *parser.Criteria) (string, error) { +/** +* this function is called recrusively +* so I add sendEvent as param in order to inform the function should trigger event or not + */ +func (db *SawitDB) delete(table string, criteria *parser.Criteria, sendEvent bool) (string, error) { entry, err := db.findTableEntry(table) if err != nil { return "", err @@ -595,6 +621,11 @@ func (db *SawitDB) delete(table string, criteria *parser.Criteria) (string, erro if entry == nil { return "", fmt.Errorf("Kebun '%s' tidak ditemukan.", table) } + var dataDeleted []map[string]interface{} + if sendEvent { + recordDeleted, _ := db._select(table, criteria, nil, nil, nil, false) + dataDeleted = recordDeleted + } currentPageId := entry.StartPage deletedCount := 0 @@ -656,12 +687,14 @@ func (db *SawitDB) delete(table string, criteria *parser.Criteria) (string, erro } currentPageId = binary.LittleEndian.Uint32(pData[0:]) } - + if sendEvent { + db.event.OnTableDeleted(table, dataDeleted, db.query) + } return fmt.Sprintf("Berhasil menggusur %d bibit.", deletedCount), nil } func (db *SawitDB) update(table string, updates map[string]interface{}, criteria *parser.Criteria) (string, error) { - records, err := db._select(table, criteria, nil, nil, nil) + records, err := db._select(table, criteria, nil, nil, nil, false) // dont send event if err != nil { return "", err } @@ -670,16 +703,17 @@ func (db *SawitDB) update(table string, updates map[string]interface{}, criteria } // Inefficient: Delete then Insert - db.delete(table, criteria) + db.delete(table, criteria, false) // dont sent event count := 0 for _, rec := range records { for k, v := range updates { rec[k] = v } - db.insert(table, rec) + db.insert(table, rec, false) // don't send event count++ } + db.event.OnTableUpdated(table, records, db.query) return fmt.Sprintf("Berhasil memupuk %d bibit.", count), nil } @@ -702,7 +736,7 @@ func (db *SawitDB) createIndex(table string, field string) (string, error) { index.KeyField = field // Build - records, _ := db._select(table, nil, nil, nil, nil) // All + records, _ := db._select(table, nil, nil, nil, nil, false) // All for _, rec := range records { if val, ok := rec[field]; ok { index.Insert(val, rec) @@ -734,7 +768,7 @@ func (db *SawitDB) showIndexes(table string) (interface{}, error) { } func (db *SawitDB) aggregate(table string, fn string, field string, criteria *parser.Criteria, groupBy string) (interface{}, error) { - records, err := db._select(table, criteria, nil, nil, nil) + records, err := db._select(table, criteria, nil, nil, nil, false) if err != nil { return nil, err } diff --git a/internal/entity/engine_option.go b/internal/entity/engine_option.go new file mode 100644 index 0000000..399aa19 --- /dev/null +++ b/internal/entity/engine_option.go @@ -0,0 +1,5 @@ +package entity + +type EngineOption struct { + IsEvent bool +} diff --git a/internal/event/dbevent_handler.go b/internal/event/dbevent_handler.go new file mode 100644 index 0000000..7e0ef93 --- /dev/null +++ b/internal/event/dbevent_handler.go @@ -0,0 +1,87 @@ +package event + +import ( + "os" + "sync" + + "github.com/WowoEngine/SawitDB-Go/internal/utils" +) + +type DBEventHandler struct { + log *utils.Bulog + file *os.File + mu sync.Mutex +} + +func NewDBEventHandler(path string, bulog *utils.Bulog) (*DBEventHandler, error) { + if path != "" { + file, err := os.OpenFile( + path, + os.O_APPEND|os.O_CREATE|os.O_WRONLY, + 0644, + ) + if err != nil { + return nil, err + } + + return &DBEventHandler{ + log: bulog, + file: file, + }, err + } + + return &DBEventHandler{ + log: bulog, + file: nil, + }, nil +} + +func (w *DBEventHandler) write(aql string) { + w.mu.Lock() + defer w.mu.Unlock() + + line := aql + "\n" + + w.file.WriteString(line) +} + +func (w *DBEventHandler) Close() error { + return w.file.Close() +} +func (e *DBEventHandler) OnTableCreated(name string, index int, aql string) { + e.log.Kabarin(map[string]interface{}{"OnTableCreated": name, "index": index, "aql": aql}) + if e.file != nil { + e.write(aql) + } + +} +func (e *DBEventHandler) OnTableDropped(name string, index int, aql string) { + e.log.Kabarin(map[string]interface{}{"OnTableDropped": name, "index": index, "aql": aql}) + if e.file != nil { + e.write(aql) + } +} +func (e *DBEventHandler) OnTableSelected(name string, data []map[string]interface{}, aql string) { + e.log.Kabarin(map[string]interface{}{"OnTableSelected": name, "data": data, "aql": aql}) + if e.file != nil { + e.write(aql) + } +} +func (e *DBEventHandler) OnTableInserted(name string, data map[string]interface{}, aql string) { + e.log.Kabarin(map[string]interface{}{"OnTableInserted": name, "data": data, "aql": aql}) + if e.file != nil { + e.write(aql) + } +} +func (e *DBEventHandler) OnTableDeleted(name string, data []map[string]interface{}, aql string) { + e.log.Kabarin(map[string]interface{}{"OnTableDeleted": name, "data": data, "aql": aql}) + if e.file != nil { + e.write(aql) + } +} +func (e *DBEventHandler) OnTableUpdated(name string, data []map[string]interface{}, aql string) { + e.log.Kabarin(map[string]interface{}{"OnTableUpdated": name, "data": data, "aql": aql}) + if e.file != nil { + e.write(aql) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index b8cd514..1cd9be5 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -13,6 +13,7 @@ import ( "time" "github.com/WowoEngine/SawitDB-Go/internal/engine" + "github.com/WowoEngine/SawitDB-Go/internal/entity" ) type Config struct { @@ -23,6 +24,8 @@ type Config struct { QueryTimeout time.Duration LogLevel string Auth map[string]string + Event engine.DBEvent + Options entity.EngineOption } type SawitServer struct { @@ -223,7 +226,7 @@ func (s *SawitServer) getOrCreateDatabase(name string) (*engine.SawitDB, error) } dbPath := filepath.Join(s.Config.DataDir, name+".sawit") - db, err := engine.NewSawitDB(dbPath) + db, err := engine.NewSawitDB(dbPath, s.Config.Event, &s.Config.Options) if err != nil { return nil, err } diff --git a/internal/utils/bulog.go b/internal/utils/bulog.go new file mode 100644 index 0000000..0535305 --- /dev/null +++ b/internal/utils/bulog.go @@ -0,0 +1,86 @@ +package utils + +import ( + "encoding/json" + "os" + "sync" + "time" + + "github.com/WowoEngine/SawitDB-Go/internal/constant" +) + +type LogEntry struct { + Timestamp string `json:"timestamp"` + Level constant.LogLevel `json:"level"` + Message interface{} `json:"message"` +} + +type Bulog struct { + file *os.File + mu sync.Mutex + isWriteToFile bool +} + +// NewLogger membuat logger JSON +func NewBulog(path string) (*Bulog, error) { + + filePath := path + write := false + if filePath != "" { + file, err := os.OpenFile( + filePath, + os.O_APPEND|os.O_CREATE|os.O_WRONLY, + 0666, + ) + if err == nil { + write = true + } + return &Bulog{ + file: file, + isWriteToFile: write, + }, nil + } + + return &Bulog{ + file: nil, + isWriteToFile: write, + }, nil +} + +func (l *Bulog) log(level constant.LogLevel, message interface{}) { + l.mu.Lock() + defer l.mu.Unlock() + + entry := LogEntry{ + Level: level, + Message: message, + Timestamp: time.Now().Format(time.RFC3339), + } + + jsonData, _ := json.Marshal(entry) + + // write to file + if l.isWriteToFile { + l.file.Write(jsonData) + l.file.Write([]byte("\n")) + } + + os.Stdout.Write(jsonData) + os.Stdout.Write([]byte("\n")) +} + +func (l *Bulog) Kabarin(msg interface{}) { + l.log(constant.LOG_INFO, msg) +} + +func (l *Bulog) Awas(msg interface{}) { + l.log(constant.LOG_WARN, msg) +} + +func (l *Bulog) Kacau(msg interface{}) { + l.log(constant.LOG_ERROR, msg) +} + +func (l *Bulog) Dolog(msg interface{}) { + l.log(constant.LOG_DEBUG, msg) +}