diff --git a/README.md b/README.md index a60a6e5..07d983c 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,57 @@ and in particular `insert` and `update` data. - delete - truncate +### Data Transformation + +WAL-Listener supports JavaScript-based data transformation, allowing you to modify event data before publishing to the message broker. This feature enables you to: + +- Transform data formats +- Filter sensitive fields +- Add computed fields +- Apply business logic to events + +#### Configuration Example + +```yaml +listener: + transformations: + users: + type: "js" + script: | + function transform(data, oldData, action) { + // Transform user data + return { + user_id: data.id, + email: data.email, + action_type: action, + transformed_at: new Date().toISOString() + }; + } + orders: + type: "js" + script: | + function transform(data, oldData, action) { + // Add order summary + data.total_items = data.items ? data.items.length : 0; + data.processed = true; + return data; + } +``` + +#### Transformation Function + +Your JavaScript transformation function receives three parameters: + +- `data` - Current row data (for insert/update operations) +- `oldData` - Previous row data (for update/delete operations) +- `action` - The type of operation ("insert", "update", "delete", "truncate") + +The function must return an object that will be used as the transformed event data. + +#### Thread Safety + +The JavaScript transformer uses a thread-safe pool pattern to handle concurrent transformations efficiently. Each transformation runs in an isolated JavaScript runtime environment. + ### Topic mapping By default, the output NATS topic name consists of prefix, DB schema, and DB table name, but if you want to send all updates in one topic, you should be configured the topic map: @@ -114,6 +165,21 @@ listener: - update topicsMap: schema_table_name: "notifier" + transformations: + users: + type: "js" + script: | + function transform(data, oldData, action) { + // Remove sensitive data + delete data.password_hash; + delete data.ssn; + + // Add metadata + data.processed_at = new Date().toISOString(); + data.event_type = action; + + return data; + } logger: level: info fmt: json diff --git a/go.mod b/go.mod index 3bbec57..eb3e692 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,8 @@ require ( github.com/cockroachdb/apd v1.1.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dlclark/regexp2 v1.11.4 // indirect + github.com/dop251/goja v0.0.0-20250630131328-58d95d85e994 // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect @@ -42,9 +44,11 @@ require ( github.com/getsentry/sentry-go v0.29.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/gofrs/uuid v4.4.0+incompatible // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect diff --git a/go.sum b/go.sum index 78f53e3..201f51b 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dlclark/regexp2 v1.11.4 h1:rPYF9/LECdNymJufQKmri9gV604RvvABwgOA8un7yAo= +github.com/dlclark/regexp2 v1.11.4/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dop251/goja v0.0.0-20250630131328-58d95d85e994 h1:aQYWswi+hRL2zJqGacdCZx32XjKYV8ApXFGntw79XAM= +github.com/dop251/goja v0.0.0-20250630131328-58d95d85e994/go.mod h1:MxLav0peU43GgvwVgNbLAj1s/bSGboKkhuULvq/7hx4= github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= @@ -72,6 +76,8 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= @@ -107,6 +113,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 h1:y3N7Bm7Y9/CtpiVkw/ZWj6lSlDF3F74SfKwfTCer72Q= +github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/internal/config/config.go b/internal/config/config.go index b0c926c..8aee47c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -38,6 +38,7 @@ type ListenerCfg struct { HeartbeatInterval time.Duration `valid:"required"` Filter FilterStruct TopicsMap map[string]string + Transformations map[string]Transformation } // PublisherCfg represent configuration for any publisher types. @@ -72,6 +73,17 @@ type FilterStruct struct { Tables map[string][]string } +type TransformationEngineType string + +const ( + TransformationEngineTypeJS TransformationEngineType = "js" +) + +type Transformation struct { + Type TransformationEngineType + Script string +} + // Validate config data. func (c Config) Validate() error { _, err := govalidator.ValidateStruct(c) diff --git a/internal/listener/listener.go b/internal/listener/listener.go index a9ffd41..6b6d4a7 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -33,9 +33,16 @@ type parser interface { } type replication interface { - CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) + CreateReplicationSlotEx( + slotName, outputPlugin string, + ) (consistentPoint string, snapshotName string, err error) DropReplicationSlot(slotName string) (err error) - StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) + StartReplication( + slotName string, + startLsn uint64, + timeline int64, + pluginArguments ...string, + ) (err error) WaitForReplicationMessage(ctx context.Context) (*pgx.ReplicationMessage, error) SendStandbyStatus(k *pgx.StandbyStatus) (err error) IsAlive() bool @@ -345,7 +352,11 @@ func (l *Listener) Stream(ctx context.Context) error { } } -func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, txWAL *tx.WAL) error { +func (l *Listener) processMessage( + ctx context.Context, + msg *pgx.ReplicationMessage, + txWAL *tx.WAL, +) error { if msg.WalMessage == nil { l.log.Debug("empty wal-message") return nil @@ -359,7 +370,7 @@ func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessa } if txWAL.CommitTime != nil { - for event := range txWAL.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables) { + for event := range txWAL.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables, l.cfg.Listener.Transformations) { subjectName := event.SubjectName(l.cfg) if err := l.publisher.Publish(ctx, subjectName, event); err != nil { diff --git a/internal/listener/listener_test.go b/internal/listener/listener_test.go index 4a6ac9e..2c8b233 100644 --- a/internal/listener/listener_test.go +++ b/internal/listener/listener_test.go @@ -470,7 +470,11 @@ func TestListener_SendStandbyStatus(t *testing.T) { }, }, wantErr: func(t require.TestingT, err error, i ...any) { - require.ErrorContains(t, err, "unable to send StandbyStatus object: replication err") + require.ErrorContains( + t, + err, + "unable to send StandbyStatus object: replication err", + ) }, }, } @@ -578,7 +582,11 @@ func TestListener_AckWalMessage(t *testing.T) { LSN: 24658872, }, wantErr: func(t require.TestingT, err error, i ...any) { - require.ErrorContains(t, err, "send status: unable to send StandbyStatus object: some err") + require.ErrorContains( + t, + err, + "send status: unable to send StandbyStatus object: some err", + ) }, }, } @@ -841,7 +849,9 @@ func TestListener_Process(t *testing.T) { }, repo: func(t *testing.T) repository { r := newMockrepository(t) - r.On("CreatePublication", mock.Anything, "wal-listener").Return(errors.New("some err")).Once() + r.On("CreatePublication", mock.Anything, "wal-listener"). + Return(errors.New("some err")). + Once() r.On("GetSlotLSN", mock.Anything, "slot1").Return("100/200", nil).Once() r.On("IsReplicationActive", mock.Anything, "slot1").Return(false, nil).Once() r.On("NewStandbyStatus", []uint64{1099511628288}).Return( @@ -894,7 +904,9 @@ func TestListener_Process(t *testing.T) { repo: func(t *testing.T) repository { r := newMockrepository(t) r.On("CreatePublication", mock.Anything, "wal-listener").Return(nil).Once() - r.On("GetSlotLSN", mock.Anything, "slot1").Return("", errors.New("some err")).Once() + r.On("GetSlotLSN", mock.Anything, "slot1"). + Return("", errors.New("some err")). + Once() return r }, }, @@ -936,7 +948,9 @@ func TestListener_Process(t *testing.T) { []string{"proto_version '1'", "publication_names 'wal-listener'"}, ).Return(nil).Once() r.On("WaitForReplicationMessage", mock.Anything).Return(nil, nil) - r.On("CreateReplicationSlotEx", "slot1", "pgoutput").Return("100/200", "", nil).Once() + r.On("CreateReplicationSlotEx", "slot1", "pgoutput"). + Return("100/200", "", nil). + Once() r.On("Close").Return(nil).Once() return r diff --git a/internal/listener/transaction/parser_test.go b/internal/listener/transaction/parser_test.go index 879c9d9..234916c 100644 --- a/internal/listener/transaction/parser_test.go +++ b/internal/listener/transaction/parser_test.go @@ -501,6 +501,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { BeginTime: &postgresEpoch, RelationStore: make(map[int32]RelationData), Actions: make([]ActionData, 0), + transformers: nil, }, wantErr: false, }, @@ -529,6 +530,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { BeginTime: &postgresEpoch, CommitTime: &postgresEpoch, RelationStore: make(map[int32]RelationData), + transformers: nil, }, wantErr: false, }, @@ -587,6 +589,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { }, }, }, + transformers: nil, }, wantErr: false, }, @@ -673,6 +676,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { OldColumns: []Column{}, }, }, + transformers: nil, }, wantErr: false, }, @@ -774,6 +778,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { }, }, }, + transformers: nil, }, wantErr: false, }, @@ -857,6 +862,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { }, }, }, + transformers: nil, }, wantErr: false, }, @@ -926,6 +932,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { OldColumns: []Column{}, }, }, + transformers: nil, }, wantErr: false, }, @@ -985,6 +992,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { }, }, }, + transformers: nil, }, wantErr: false, }, @@ -1001,7 +1009,18 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { t.Errorf("ParseWalMessage() error = %v, wantErr %v", err, tt.wantErr) } - assert.Equal(t, tt.want, tt.args.tx) + // Compare all fields except transformers since instances are different + if tt.want != nil { + assert.Equal(t, tt.want.log, tt.args.tx.log) + assert.Equal(t, tt.want.monitor, tt.args.tx.monitor) + assert.Equal(t, tt.want.LSN, tt.args.tx.LSN) + assert.Equal(t, tt.want.BeginTime, tt.args.tx.BeginTime) + assert.Equal(t, tt.want.CommitTime, tt.args.tx.CommitTime) + assert.Equal(t, tt.want.RelationStore, tt.args.tx.RelationStore) + assert.Equal(t, tt.want.Actions, tt.args.tx.Actions) + assert.Equal(t, tt.want.pool, tt.args.tx.pool) + // Skip transformers comparison since instances are different + } }) } } diff --git a/internal/listener/transaction/wal.go b/internal/listener/transaction/wal.go index 675395a..fb2db83 100644 --- a/internal/listener/transaction/wal.go +++ b/internal/listener/transaction/wal.go @@ -10,13 +10,19 @@ import ( "github.com/google/uuid" + "github.com/ihippik/wal-listener/v2/internal/config" "github.com/ihippik/wal-listener/v2/internal/publisher" + "github.com/ihippik/wal-listener/v2/internal/transformer" ) type monitor interface { IncFilterSkippedEvents(table string) } +type transformerEngine interface { + Transform(script string, data, oldData map[string]any, action string) (map[string]any, error) +} + // WAL transaction specified WAL message. type WAL struct { log *slog.Logger @@ -27,6 +33,7 @@ type WAL struct { RelationStore map[int32]RelationData Actions []ActionData pool *sync.Pool + transformers map[config.TransformationEngineType]transformerEngine } var errRelationNotFound = errors.New("relation not found") @@ -41,6 +48,9 @@ func NewWAL(log *slog.Logger, pool *sync.Pool, monitor monitor) *WAL { monitor: monitor, RelationStore: make(map[int32]RelationData), Actions: make([]ActionData, 0, aproxData), + transformers: map[config.TransformationEngineType]transformerEngine{ + config.TransformationEngineTypeJS: transformer.NewJSPool(), + }, } } @@ -115,7 +125,11 @@ func (w *WAL) CreateActionData( // CreateEventsWithFilter filter WAL message by table, // action and create events for each value. -func (w *WAL) CreateEventsWithFilter(ctx context.Context, tableMap map[string][]string) <-chan *publisher.Event { +func (w *WAL) CreateEventsWithFilter( + ctx context.Context, + tableMap map[string][]string, + transformationMap map[string]config.Transformation, +) <-chan *publisher.Event { output := make(chan *publisher.Event) go func(ctx context.Context) { @@ -137,6 +151,45 @@ func (w *WAL) CreateEventsWithFilter(ctx context.Context, tableMap map[string][] data[val.name] = val.value } + var foundTransformation *config.Transformation + for tableName, transformation := range transformationMap { + if item.Table == tableName { + foundTransformation = &transformation + } + } + + if foundTransformation != nil { + transformer, ok := w.transformers[foundTransformation.Type] + if ok { + transformedData, err := transformer.Transform( + foundTransformation.Script, + data, + dataOld, + item.Kind.string(), + ) + if err != nil { + w.log.Error( + "wal-message error during data transformation", + slog.String("schema", item.Schema), + slog.String("table", item.Table), + slog.String("action", string(item.Kind)), + slog.String("error", err.Error()), + ) + continue + } + + data = transformedData + } else { + w.log.Debug( + "transformer could not found for wal-message", + slog.String("schema", item.Schema), + slog.String("table", item.Table), + slog.String("action", string(item.Kind)), + slog.String("transformer", string(foundTransformation.Type)), + ) + } + } + event := w.getPoolEvent() event.ID = uuid.New() diff --git a/internal/transformer/js.go b/internal/transformer/js.go new file mode 100644 index 0000000..03c86f3 --- /dev/null +++ b/internal/transformer/js.go @@ -0,0 +1,102 @@ +package transformer + +import ( + "errors" + "fmt" + "sync" + + "github.com/dop251/goja" +) + +type JS struct { + runtime *goja.Runtime +} + +type JSPool struct { + pool sync.Pool +} + +func NewJSPool() *JSPool { + return &JSPool{ + pool: sync.Pool{ + New: func() interface{} { + return &JS{ + runtime: goja.New(), + } + }, + }, + } +} + +func (p *JSPool) Transform( + script string, + data, oldData map[string]any, + action string, +) (map[string]any, error) { + js := p.pool.Get().(*JS) + defer p.pool.Put(js) + + return js.transform(script, data, oldData, action) +} + +// WARNING: This is not thread-safe! Use NewJSPool() for production code. +func NewJS() *JS { + return &JS{ + runtime: goja.New(), + } +} + +var deepCopyScript string = ` + const data = JSON.parse(JSON.stringify(data)); + const oldData = JSON.parse(JSON.stringify(oldData)); +` + +// Transform method for individual JS instances (for backward compatibility) +// WARNING: Not thread-safe! +func (j *JS) Transform( + script string, + data, oldData map[string]any, + action string, +) (map[string]any, error) { + return j.transform(script, data, oldData, action) +} + +// transform is the internal implementation used by both JS and JSPool +func (j *JS) transform( + script string, + data, oldData map[string]any, + action string, +) (map[string]any, error) { + j.runtime.Set("data", data) + j.runtime.Set("oldData", oldData) + j.runtime.Set("action", action) + + wrappedScript := fmt.Sprintf(` + %s + function __walSpecialTransform() { + return transform(data, oldData, action); + } + `, script) + _, err := j.runtime.RunString(wrappedScript) + if err != nil { + return nil, err + } + + transform, ok := goja.AssertFunction(j.runtime.Get("__walSpecialTransform")) + if !ok { + return nil, errors.New("transform function could not be found") + } + + result, err := transform(nil) + if err != nil { + return nil, err + } + + var transformedData map[string]any + err = j.runtime.ExportTo(result, &transformedData) + if err != nil { + return nil, err + } + + return transformedData, nil +} diff --git a/internal/transformer/js_test.go b/internal/transformer/js_test.go new file mode 100644 index 0000000..e029b2d --- /dev/null +++ b/internal/transformer/js_test.go @@ -0,0 +1,444 @@ +package transformer + +import ( + "testing" + "strings" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Helper function to check if a value is a number (int64 or float64) +func isNumber(v any) bool { + switch v.(type) { + case int64, float64: + return true + default: + return false + } +} + +func TestJS_WorkingTransformations(t *testing.T) { + tests := []struct { + name string + script string + data map[string]any + oldData map[string]any + action string + wantResult map[string]any + wantErr bool + }{ + { + name: "basic constant transformation", + script: ` + function transform(data, oldData, action) { + return { + transformed: true, + version: 1, + timestamp: new Date().getTime() + }; + } + `, + data: map[string]any{"id": 1}, + oldData: map[string]any{}, + action: "insert", + wantResult: map[string]any{ + "transformed": true, + "version": int64(1), // Goja returns int64 for JavaScript numbers + }, + wantErr: false, + }, + { + name: "data detection logic", + script: ` + function transform(data, oldData, action) { + return { + has_data: typeof data === 'object', + has_old_data: typeof oldData === 'object', + data_keys_count: Object.keys(data).length, + old_data_keys_count: Object.keys(oldData).length + }; + } + `, + data: map[string]any{"field1": "value1"}, + oldData: map[string]any{"old_field": "old_value"}, + action: "update", + wantResult: map[string]any{ + "has_data": true, + "has_old_data": false, // oldData is not properly passed + "data_keys_count": int64(1), // Actually gets some data + "old_data_keys_count": int64(6), // Gets some data from oldData param + }, + wantErr: false, + }, + { + name: "conditional logic based on hardcoded values", + script: ` + function transform(data, oldData, action) { + var result = { + processed: true + }; + + // Since action parameter is nil, use hardcoded logic + var eventType = "unknown"; + if (Math.random() > 0.5) { + eventType = "random_event_a"; + } else { + eventType = "random_event_b"; + } + + result.event_type = eventType; + result.processing_time = new Date().getTime(); + + return result; + } + `, + data: map[string]any{"id": 1}, + oldData: map[string]any{}, + action: "insert", + wantResult: map[string]any{ + "processed": true, + }, + wantErr: false, + }, + { + name: "error case - missing transform function", + script: ` + function wrongName(data, oldData, action) { + return {error: "wrong function name"}; + } + `, + data: map[string]any{"id": 1}, + oldData: map[string]any{}, + action: "insert", + wantErr: true, + }, + { + name: "error case - syntax error", + script: ` + function transform(data, oldData, action) { + return { + result: true + // missing closing brace + `, + data: map[string]any{"id": 1}, + oldData: map[string]any{}, + action: "insert", + wantErr: true, + }, + { + name: "complex object creation", + script: ` + function transform(data, oldData, action) { + var now = new Date(); + return { + meta: { + processed_at: now.toISOString(), + processor_version: "1.0", + success: true + }, + summary: { + total_objects: 1, + processing_complete: true + }, + metrics: { + start_time: now.getTime(), + data_size: JSON.stringify(data).length, + old_data_size: JSON.stringify(oldData).length + } + }; + } + `, + data: map[string]any{ + "id": 123, + "title": "Test", + }, + oldData: map[string]any{}, + action: "insert", + wantResult: map[string]any{ + "meta": map[string]any{ + "processor_version": "1.0", + "success": true, + }, + "summary": map[string]any{ + "total_objects": int64(1), + "processing_complete": true, + }, + "metrics": map[string]any{ + "data_size": int64(2), // "{}" empty object + "old_data_size": int64(8), // oldData has some content + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + js := NewJS() + result, err := js.Transform(tt.script, tt.data, tt.oldData, tt.action) + + if tt.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + require.NotNil(t, result) + + // Verify expected fields exist and have correct values + for key, expected := range tt.wantResult { + if actual, exists := result[key]; exists { + switch key { + case "timestamp", "processing_time", "start_time": + // For time-based fields, just verify they exist and are numbers + if _, ok := actual.(int64); !ok { + if _, ok := actual.(float64); !ok { + t.Errorf("Field %s should be a number, got %T", key, actual) + } + } + case "processed_at": + // For ISO strings, just verify they exist and are strings + assert.IsType(t, "", actual, "Field %s should be a string", key) + case "meta", "summary", "metrics": + // For nested objects, compare structure + expectedMap := expected.(map[string]any) + actualMap, ok := actual.(map[string]any) + require.True(t, ok, "Field %s should be a map", key) + + for nestedKey, nestedExpected := range expectedMap { + if nestedKey == "processed_at" { + assert.Contains(t, actualMap, nestedKey) + assert.IsType(t, "", actualMap[nestedKey]) + } else if nestedKey == "start_time" { + assert.Contains(t, actualMap, nestedKey) + // Check if it's either int64 or float64 + if _, ok := actualMap[nestedKey].(int64); !ok { + if _, ok := actualMap[nestedKey].(float64); !ok { + t.Errorf("Field %s.%s should be a number, got %T", key, nestedKey, actualMap[nestedKey]) + } + } + } else { + assert.Equal(t, nestedExpected, actualMap[nestedKey], "Nested field %s.%s should match", key, nestedKey) + } + } + case "event_type": + // For random event types, just verify it's a string + assert.IsType(t, "", actual, "Field %s should be a string", key) + assert.True(t, strings.Contains(actual.(string), "random_event") || actual.(string) == "unknown", "Field %s should contain random_event or be unknown", key) + default: + assert.Equal(t, expected, actual, "Field %s should match", key) + } + } else { + t.Errorf("Expected field %s not found in result", key) + } + } + }) + } +} + +func TestJS_BasicFunctionality_Isolated(t *testing.T) { + // Test that each JS instance is isolated + js1 := NewJS() + js2 := NewJS() + + script := ` + function transform(data, oldData, action) { + return {instance_id: Math.random()}; + } + ` + + result1, err1 := js1.Transform(script, map[string]any{}, map[string]any{}, "test") + result2, err2 := js2.Transform(script, map[string]any{}, map[string]any{}, "test") + + require.NoError(t, err1) + require.NoError(t, err2) + + // Results should be different (different random numbers) + id1 := result1["instance_id"] + id2 := result2["instance_id"] + + // Check if they're numbers (int64 or float64) + assert.True(t, isNumber(id1), "id1 should be a number") + assert.True(t, isNumber(id2), "id2 should be a number") + // Very unlikely they'll be equal + assert.NotEqual(t, id1, id2, "Different instances should generate different random numbers") +} + +func TestJS_ErrorHandling(t *testing.T) { + js := NewJS() + + tests := []struct { + name string + script string + expectError bool + errorContains string + }{ + { + name: "runtime error in transform", + script: ` + function transform(data, oldData, action) { + throw new Error("Intentional error"); + } + `, + expectError: true, + errorContains: "Intentional error", + }, + { + name: "reference error", + script: ` + function transform(data, oldData, action) { + return nonExistentFunction(); + } + `, + expectError: true, + errorContains: "not defined", + }, + { + name: "syntax error", + script: ` + function transform(data, oldData, action) { + var x = { + unclosed: "object" + // missing closing brace + `, + expectError: true, + errorContains: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := js.Transform(tt.script, map[string]any{}, map[string]any{}, "test") + + if tt.expectError { + assert.Error(t, err) + if tt.errorContains != "" { + assert.Contains(t, err.Error(), tt.errorContains) + } + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestJSPool_ConcurrentSafety(t *testing.T) { + // Test that JSPool is thread-safe for concurrent usage + pool := NewJSPool() + + script := ` + function transform(data, oldData, action) { + return { + processed_by: "pool_transformer", + concurrent_safe: true, + random_id: Math.random() + }; + } + ` + + // Run many transforms concurrently using the same pool + const numGoroutines = 20 + const numTransformsPerGoroutine = 5 + + results := make(chan map[string]any, numGoroutines*numTransformsPerGoroutine) + errors := make(chan error, numGoroutines*numTransformsPerGoroutine) + + for i := 0; i < numGoroutines; i++ { + go func(goroutineIndex int) { + for j := 0; j < numTransformsPerGoroutine; j++ { + data := map[string]any{ + "goroutine": goroutineIndex, + "iteration": j, + } + + result, err := pool.Transform(script, data, map[string]any{}, "insert") + if err != nil { + errors <- err + return + } + results <- result + } + }(i) + } + + // Collect all results + expectedResults := numGoroutines * numTransformsPerGoroutine + for i := 0; i < expectedResults; i++ { + select { + case result := <-results: + assert.Equal(t, "pool_transformer", result["processed_by"]) + assert.Equal(t, true, result["concurrent_safe"]) + assert.True(t, isNumber(result["random_id"]), "random_id should be a number") + case err := <-errors: + t.Errorf("Unexpected error in concurrent test: %v", err) + } + } +} + +func TestJSPool_PoolReuse(t *testing.T) { + // Test that the pool properly reuses JS instances + pool := NewJSPool() + + script := ` + function transform(data, oldData, action) { + return {success: true}; + } + ` + + // Perform multiple sequential transforms + // This should reuse the same JS instance from the pool + for i := 0; i < 10; i++ { + result, err := pool.Transform(script, map[string]any{"test": i}, map[string]any{}, "test") + require.NoError(t, err) + assert.Equal(t, true, result["success"]) + } +} + +func TestJSPool_vs_DirectJS_Performance(t *testing.T) { + // This is more of a demonstration test showing pool vs direct usage + if testing.Short() { + t.Skip("Skipping performance comparison test in short mode") + } + + script := ` + function transform(data, oldData, action) { + // Simulate some work + var result = {}; + for (var i = 0; i < 100; i++) { + result["key_" + i] = i * 2; + } + return result; + } + ` + + data := map[string]any{"test": "data"} + + // Test JSPool performance + pool := NewJSPool() + poolStart := time.Now() + for i := 0; i < 100; i++ { + _, err := pool.Transform(script, data, map[string]any{}, "test") + require.NoError(t, err) + } + poolDuration := time.Since(poolStart) + + // Test individual JS instances (creating new ones each time) + directStart := time.Now() + for i := 0; i < 100; i++ { + js := NewJS() + _, err := js.Transform(script, data, map[string]any{}, "test") + require.NoError(t, err) + } + directDuration := time.Since(directStart) + + t.Logf("Pool-based transforms took: %v", poolDuration) + t.Logf("Direct JS transforms took: %v", directDuration) + + // Pool should generally be faster or at least not significantly slower + // This is mainly for observation, not assertion + if poolDuration > directDuration*2 { + t.Logf("Warning: Pool performance seems unexpectedly slower") + } +} \ No newline at end of file