-
Notifications
You must be signed in to change notification settings - Fork 52
implement js transformer with thread-safe #62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Hey Muhammed! Thanks for your contribution! Do you think it’s possible to use JS for the sake of flexibility? But do we need flexibility, because the number of cases is limited, and we can express them in pseudo-language in the YAML configuration? What do you think? I also wanted to do load testing to understand the costs. P.S.: At one time, I thought about Go native plugins for solving such problems. But your solution is really elegant! |
|
Hey, thank you for your attention. In first step I implemented same solution like below; transformations:
users:
fields:
email:
action: mask
pattern: "***@***.com"
created_at:
action: rename
to: "timestamp"
id:
action: removeI found the above solution restrictive. I am agree with you, this kind of request is not very common. I thought this could solve multiple issues, so I proceeded with current solution. But If it is not ok for you I can replace with the other one. I didn't apply any load test, sorry about it, give me some time, I will do it and share results. |
|
I don’t have much free time, but if you can run the load testing yourself, please do — I’m just curious. When I have time, I’ll review how it works and adjust your pull request to match the general style, then merge it. Thanks a lot! |
|
I'm very sorry, I couldn't find the time, I hope I can do it tonight. Thanks for your attention again 🙏 |
|
it's not urgent ;) |
I did this because in other implementation we have to marshal unmarshal data on js because it was using reference. This usage is not holding reference
|
Hey I completed load tests and made an update on implementation. Preparation; CREATE TABLE public.users (
id serial4 NOT NULL,
"name" varchar(100) NULL,
email varchar(255) NULL,
created_at timestamp DEFAULT now() NULL,
CONSTRAINT users_pkey PRIMARY KEY (id)
);Added pprof to main.go _ "net/http/pprof"
...
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()Initial config like below transformations:
users:
type: js
script: |
function transform(data, oldData, action) {
data.email = data.email.replace(/(.{3}).*(@.*)/, '$1***$2');
delete data.id;
data.processed_at = new Date().toISOString();
return data;
}Started the app with go run ~ go run ./cmd/wal-listener/... -config config.ymlStarted to watch wal-listener with top ~ top | grep wal-listenerInitial memory was 9MB I created insert function CREATE OR REPLACE FUNCTION fast_bulk_insert(
record_count INTEGER DEFAULT 10000
) RETURNS INTEGER AS $$
BEGIN
INSERT INTO users (name, email, created_at)
SELECT
'Fast User ' || n,
'fastuser' || n || '@test.com',
NOW() + (random() * interval '365 days') - interval '182 days'
FROM generate_series(1, record_count) AS n;
RETURN record_count;
END;
$$ LANGUAGE plpgsql;Tested with 100k insert SELECT bulk_insert_test_data(100000);Memory ~104MB peak File: wal-listener
Type: inuse_space
Time: 2025-09-04 22:09:02 +03
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 49.53MB, 98.02% of 50.53MB total
Showing top 10 nodes out of 74
flat flat% sum% cum cum%
24.01MB 47.51% 47.51% 34.51MB 68.29% github.com/ihippik/wal-listener/v2/internal/listener/transaction.(*WAL).CreateActionData
10.50MB 20.78% 68.29% 10.50MB 20.78% github.com/ihippik/wal-listener/v2/internal/listener/transaction.(*Column).AssertValue
10.01MB 19.81% 88.10% 44.51MB 88.10% github.com/ihippik/wal-listener/v2/internal/listener/transaction.(*BinaryParser).ParseWalMessage
2MB 3.97% 92.07% 2MB 3.97% runtime.allocm
0.50MB 1% 93.06% 0.50MB 1% golang.org/x/net/trace.init
0.50MB 0.99% 94.06% 0.50MB 0.99% regexp.(*bitState).reset
0.50MB 0.99% 95.05% 0.50MB 0.99% regexp/syntax.(*compiler).inst
0.50MB 0.99% 96.04% 0.50MB 0.99% cloud.google.com/go/iam/apiv1/iampb.init
0.50MB 0.99% 97.03% 0.50MB 0.99% runtime.malg
0.50MB 0.99% 98.02% 0.50MB 0.99% net/http.ListenAndServeCpu File: wal-listener
Type: cpu
Time: 2025-09-04 22:10:33 +03
Duration: 30.09s, Total samples = 40.21s (133.65%)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 38.67s, 96.17% of 40.21s total
Dropped 289 nodes (cum <= 0.20s)
Showing top 10 nodes out of 94
flat flat% sum% cum cum%
15.81s 39.32% 39.32% 15.81s 39.32% runtime.kevent
9.71s 24.15% 63.47% 9.71s 24.15% syscall.syscall
9.29s 23.10% 86.57% 9.29s 23.10% runtime.pthread_cond_wait
2.39s 5.94% 92.51% 2.39s 5.94% runtime.pthread_cond_signal
0.66s 1.64% 94.16% 0.66s 1.64% runtime.usleep
0.27s 0.67% 94.83% 0.27s 0.67% runtime.pthread_kill
0.25s 0.62% 95.45% 1.01s 2.51% runtime.scanobject
0.18s 0.45% 95.90% 0.39s 0.97% runtime.greyobject
0.06s 0.15% 96.05% 0.21s 0.52% runtime.mallocgcSmallScanNoHeader
0.05s 0.12% 96.17% 2.01s 5.00% runtime.gcDrainGoroutine File: wal-listener
Type: goroutine
Time: 2025-09-04 22:09:48 +03
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 28, 100% of 28 total
Showing top 10 nodes out of 79
flat flat% sum% cum cum%
26 92.86% 92.86% 26 92.86% runtime.gopark
1 3.57% 96.43% 1 3.57% runtime.goroutineProfileWithLabels
1 3.57% 100% 1 3.57% runtime.sigNoteSleep
0 0% 100% 2 7.14% github.com/IBM/sarama.(*Broker).responseReceiver
0 0% 100% 1 3.57% github.com/IBM/sarama.(*asyncProducer).dispatcher
0 0% 100% 1 3.57% github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1
0 0% 100% 1 3.57% github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func2
0 0% 100% 1 3.57% github.com/IBM/sarama.(*asyncProducer).retryHandler
0 0% 100% 1 3.57% github.com/IBM/sarama.(*brokerProducer).run
0 0% 100% 1 3.57% github.com/IBM/sarama.(*client).backgroundMetadataUpdaterResults without transformer (I commented whole transform implementation); Memory ~104MB peak File: wal-listener
Type: inuse_space
Time: 2025-09-04 22:14:10 +03
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 49898.18kB, 100% of 49898.18kB total
Showing top 10 nodes out of 53
flat flat% sum% cum cum%
20996.48kB 42.08% 42.08% 31748.66kB 63.63% github.com/ihippik/wal-listener/v2/internal/listener/transaction.(*WAL).CreateActionData
11552kB 23.15% 65.23% 43300.66kB 86.78% github.com/ihippik/wal-listener/v2/internal/listener/transaction.(*BinaryParser).ParseWalMessage
10752.18kB 21.55% 86.78% 10752.18kB 21.55% github.com/ihippik/wal-listener/v2/internal/listener/transaction.(*Column).AssertValue
3078kB 6.17% 92.95% 3590.22kB 7.20% runtime.allocm
908.20kB 1.82% 94.77% 908.20kB 1.82% github.com/goccy/go-json/internal/encoder.initEncoder.func1
544.67kB 1.09% 95.86% 544.67kB 1.09% regexp/syntax.(*compiler).inst
525.43kB 1.05% 96.91% 525.43kB 1.05% github.com/russross/blackfriday/v2.map.init.0
516.76kB 1.04% 97.95% 516.76kB 1.04% runtime.procresize
512.23kB 1.03% 98.97% 512.23kB 1.03% google.golang.org/protobuf/internal/filedesc.newRawFile
512.22kB 1.03% 100% 512.22kB 1.03% runtime.malgCpu File: wal-listener
Type: cpu
Time: 2025-09-04 22:24:15 +03
Duration: 30.17s, Total samples = 48.63s (161.21%)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 47.89s, 98.48% of 48.63s total
Dropped 114 nodes (cum <= 0.24s)
Showing top 10 nodes out of 69
flat flat% sum% cum cum%
15.29s 31.44% 31.44% 15.29s 31.44% runtime.kevent
14.72s 30.27% 61.71% 14.72s 30.27% runtime.pthread_cond_wait
12.79s 26.30% 88.01% 12.80s 26.32% syscall.syscall
3.76s 7.73% 95.74% 3.76s 7.73% runtime.pthread_cond_signal
1.26s 2.59% 98.33% 1.26s 2.59% runtime.usleep
0.02s 0.041% 98.38% 0.71s 1.46% runtime.gcDrain
0.02s 0.041% 98.42% 0.66s 1.36% runtime.stealWork
0.01s 0.021% 98.44% 30.95s 63.64% runtime.findRunnable
0.01s 0.021% 98.46% 0.65s 1.34% runtime.lock2
0.01s 0.021% 98.48% 14.73s 30.29% runtime.notesleepGoroutine File: wal-listener
Type: goroutine
Time: 2025-09-04 22:25:39 +03
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 28, 100% of 28 total
Showing top 10 nodes out of 79
flat flat% sum% cum cum%
26 92.86% 92.86% 26 92.86% runtime.gopark
1 3.57% 96.43% 1 3.57% runtime.goroutineProfileWithLabels
1 3.57% 100% 1 3.57% runtime.sigNoteSleep
0 0% 100% 2 7.14% github.com/IBM/sarama.(*Broker).responseReceiver
0 0% 100% 1 3.57% github.com/IBM/sarama.(*asyncProducer).dispatcher
0 0% 100% 1 3.57% github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1
0 0% 100% 1 3.57% github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func2
0 0% 100% 1 3.57% github.com/IBM/sarama.(*asyncProducer).retryHandler
0 0% 100% 1 3.57% github.com/IBM/sarama.(*brokerProducer).run
0 0% 100% 1 3.57% github.com/IBM/sarama.(*client).backgroundMetadataUpdater |
|
Thank you very much, man! will check soon. |
hey @ihippik
I have added the ability to transform database events using JavaScript. There was a potential thread safety issue and I implemented a pool for it, but after inspecting the WAL action loop, I see it's not concurrent, which means we don't need the pool for now.
Why people need this kind of feature:
I already added the usage to the README, but here's an example:
WDYT?