Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,57 @@ and in particular `insert` and `update` data.
- delete
- truncate

### Data Transformation

WAL-Listener supports JavaScript-based data transformation, allowing you to modify event data before publishing to the message broker. This feature enables you to:

- Transform data formats
- Filter sensitive fields
- Add computed fields
- Apply business logic to events

#### Configuration Example

```yaml
listener:
transformations:
users:
type: "js"
script: |
function transform(data, oldData, action) {
// Transform user data
return {
user_id: data.id,
email: data.email,
action_type: action,
transformed_at: new Date().toISOString()
};
}
orders:
type: "js"
script: |
function transform(data, oldData, action) {
// Add order summary
data.total_items = data.items ? data.items.length : 0;
data.processed = true;
return data;
}
```

#### Transformation Function

Your JavaScript transformation function receives three parameters:

- `data` - Current row data (for insert/update operations)
- `oldData` - Previous row data (for update/delete operations)
- `action` - The type of operation ("insert", "update", "delete", "truncate")

The function must return an object that will be used as the transformed event data.

#### Thread Safety

The JavaScript transformer uses a thread-safe pool pattern to handle concurrent transformations efficiently. Each transformation runs in an isolated JavaScript runtime environment.

### Topic mapping
By default, the output NATS topic name consists of prefix, DB schema, and DB table name,
but if you want to send all updates in one topic, you should be configured the topic map:
Expand Down Expand Up @@ -114,6 +165,21 @@ listener:
- update
topicsMap:
schema_table_name: "notifier"
transformations:
users:
type: "js"
script: |
function transform(data, oldData, action) {
// Remove sensitive data
delete data.password_hash;
delete data.ssn;

// Add metadata
data.processed_at = new Date().toISOString();
data.event_type = action;

return data;
}
logger:
level: info
fmt: json
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ require (
github.com/cockroachdb/apd v1.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dlclark/regexp2 v1.11.4 // indirect
github.com/dop251/goja v0.0.0-20250630131328-58d95d85e994 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
Expand All @@ -42,9 +44,11 @@ require (
github.com/getsentry/sentry-go v0.29.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.11.4 h1:rPYF9/LECdNymJufQKmri9gV604RvvABwgOA8un7yAo=
github.com/dlclark/regexp2 v1.11.4/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/dop251/goja v0.0.0-20250630131328-58d95d85e994 h1:aQYWswi+hRL2zJqGacdCZx32XjKYV8ApXFGntw79XAM=
github.com/dop251/goja v0.0.0-20250630131328-58d95d85e994/go.mod h1:MxLav0peU43GgvwVgNbLAj1s/bSGboKkhuULvq/7hx4=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
Expand Down Expand Up @@ -72,6 +76,8 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
Expand Down Expand Up @@ -107,6 +113,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 h1:y3N7Bm7Y9/CtpiVkw/ZWj6lSlDF3F74SfKwfTCer72Q=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down
12 changes: 12 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ListenerCfg struct {
HeartbeatInterval time.Duration `valid:"required"`
Filter FilterStruct
TopicsMap map[string]string
Transformations map[string]Transformation
}

// PublisherCfg represent configuration for any publisher types.
Expand Down Expand Up @@ -72,6 +73,17 @@ type FilterStruct struct {
Tables map[string][]string
}

type TransformationEngineType string

const (
TransformationEngineTypeJS TransformationEngineType = "js"
)

type Transformation struct {
Type TransformationEngineType
Script string
}

// Validate config data.
func (c Config) Validate() error {
_, err := govalidator.ValidateStruct(c)
Expand Down
19 changes: 15 additions & 4 deletions internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@ type parser interface {
}

type replication interface {
CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error)
CreateReplicationSlotEx(
slotName, outputPlugin string,
) (consistentPoint string, snapshotName string, err error)
DropReplicationSlot(slotName string) (err error)
StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error)
StartReplication(
slotName string,
startLsn uint64,
timeline int64,
pluginArguments ...string,
) (err error)
WaitForReplicationMessage(ctx context.Context) (*pgx.ReplicationMessage, error)
SendStandbyStatus(k *pgx.StandbyStatus) (err error)
IsAlive() bool
Expand Down Expand Up @@ -345,7 +352,11 @@ func (l *Listener) Stream(ctx context.Context) error {
}
}

func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, txWAL *tx.WAL) error {
func (l *Listener) processMessage(
ctx context.Context,
msg *pgx.ReplicationMessage,
txWAL *tx.WAL,
) error {
if msg.WalMessage == nil {
l.log.Debug("empty wal-message")
return nil
Expand All @@ -359,7 +370,7 @@ func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessa
}

if txWAL.CommitTime != nil {
for event := range txWAL.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables) {
for event := range txWAL.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables, l.cfg.Listener.Transformations) {
subjectName := event.SubjectName(l.cfg)

if err := l.publisher.Publish(ctx, subjectName, event); err != nil {
Expand Down
24 changes: 19 additions & 5 deletions internal/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,11 @@ func TestListener_SendStandbyStatus(t *testing.T) {
},
},
wantErr: func(t require.TestingT, err error, i ...any) {
require.ErrorContains(t, err, "unable to send StandbyStatus object: replication err")
require.ErrorContains(
t,
err,
"unable to send StandbyStatus object: replication err",
)
},
},
}
Expand Down Expand Up @@ -578,7 +582,11 @@ func TestListener_AckWalMessage(t *testing.T) {
LSN: 24658872,
},
wantErr: func(t require.TestingT, err error, i ...any) {
require.ErrorContains(t, err, "send status: unable to send StandbyStatus object: some err")
require.ErrorContains(
t,
err,
"send status: unable to send StandbyStatus object: some err",
)
},
},
}
Expand Down Expand Up @@ -841,7 +849,9 @@ func TestListener_Process(t *testing.T) {
},
repo: func(t *testing.T) repository {
r := newMockrepository(t)
r.On("CreatePublication", mock.Anything, "wal-listener").Return(errors.New("some err")).Once()
r.On("CreatePublication", mock.Anything, "wal-listener").
Return(errors.New("some err")).
Once()
r.On("GetSlotLSN", mock.Anything, "slot1").Return("100/200", nil).Once()
r.On("IsReplicationActive", mock.Anything, "slot1").Return(false, nil).Once()
r.On("NewStandbyStatus", []uint64{1099511628288}).Return(
Expand Down Expand Up @@ -894,7 +904,9 @@ func TestListener_Process(t *testing.T) {
repo: func(t *testing.T) repository {
r := newMockrepository(t)
r.On("CreatePublication", mock.Anything, "wal-listener").Return(nil).Once()
r.On("GetSlotLSN", mock.Anything, "slot1").Return("", errors.New("some err")).Once()
r.On("GetSlotLSN", mock.Anything, "slot1").
Return("", errors.New("some err")).
Once()
return r
},
},
Expand Down Expand Up @@ -936,7 +948,9 @@ func TestListener_Process(t *testing.T) {
[]string{"proto_version '1'", "publication_names 'wal-listener'"},
).Return(nil).Once()
r.On("WaitForReplicationMessage", mock.Anything).Return(nil, nil)
r.On("CreateReplicationSlotEx", "slot1", "pgoutput").Return("100/200", "", nil).Once()
r.On("CreateReplicationSlotEx", "slot1", "pgoutput").
Return("100/200", "", nil).
Once()
r.On("Close").Return(nil).Once()

return r
Expand Down
21 changes: 20 additions & 1 deletion internal/listener/transaction/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
BeginTime: &postgresEpoch,
RelationStore: make(map[int32]RelationData),
Actions: make([]ActionData, 0),
transformers: nil,
},
wantErr: false,
},
Expand Down Expand Up @@ -529,6 +530,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
BeginTime: &postgresEpoch,
CommitTime: &postgresEpoch,
RelationStore: make(map[int32]RelationData),
transformers: nil,
},
wantErr: false,
},
Expand Down Expand Up @@ -587,6 +589,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
},
},
},
transformers: nil,
},
wantErr: false,
},
Expand Down Expand Up @@ -673,6 +676,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
OldColumns: []Column{},
},
},
transformers: nil,
},
wantErr: false,
},
Expand Down Expand Up @@ -774,6 +778,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
},
},
},
transformers: nil,
},
wantErr: false,
},
Expand Down Expand Up @@ -857,6 +862,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
},
},
},
transformers: nil,
},
wantErr: false,
},
Expand Down Expand Up @@ -926,6 +932,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
OldColumns: []Column{},
},
},
transformers: nil,
},
wantErr: false,
},
Expand Down Expand Up @@ -985,6 +992,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
},
},
},
transformers: nil,
},
wantErr: false,
},
Expand All @@ -1001,7 +1009,18 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) {
t.Errorf("ParseWalMessage() error = %v, wantErr %v", err, tt.wantErr)
}

assert.Equal(t, tt.want, tt.args.tx)
// Compare all fields except transformers since instances are different
if tt.want != nil {
assert.Equal(t, tt.want.log, tt.args.tx.log)
assert.Equal(t, tt.want.monitor, tt.args.tx.monitor)
assert.Equal(t, tt.want.LSN, tt.args.tx.LSN)
assert.Equal(t, tt.want.BeginTime, tt.args.tx.BeginTime)
assert.Equal(t, tt.want.CommitTime, tt.args.tx.CommitTime)
assert.Equal(t, tt.want.RelationStore, tt.args.tx.RelationStore)
assert.Equal(t, tt.want.Actions, tt.args.tx.Actions)
assert.Equal(t, tt.want.pool, tt.args.tx.pool)
// Skip transformers comparison since instances are different
}
})
}
}
Loading