Skip to content

Commit 31940e1

Browse files
committed
perf: batch insert attemptDetail
1 parent 18bdf78 commit 31940e1

File tree

12 files changed

+138
-93
lines changed

12 files changed

+138
-93
lines changed

app/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (app *Application) initialize() error {
132132
PoolConcurrency: int(cfg.Worker.Pool.Concurrency),
133133
}
134134
deliverer := deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer)
135-
app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer, app.bus)
135+
app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer, app.bus, app.dispatcher)
136136
}
137137

138138
// admin

db/dao/attempt_detail_dao.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,25 @@ func NewAttemptDetailDao(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) At
3030
}
3131
}
3232

33-
func (dao *attemptDetailDao) Insert(ctx context.Context, attemptDetail *entities.AttemptDetail) error {
34-
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
33+
func (dao *attemptDetailDao) BatchInsert(ctx context.Context, entities []*entities.AttemptDetail) error {
34+
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.batch_insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
3535
defer span.End()
3636

3737
now := time.Now()
38-
values := []interface{}{attemptDetail.ID, attemptDetail.RequestHeaders, attemptDetail.RequestBody, attemptDetail.ResponseHeaders, attemptDetail.ResponseBody, now, now, attemptDetail.WorkspaceId}
3938

40-
sql := `INSERT INTO attempt_details (id, request_headers, request_body, response_headers, response_body, created_at, updated_at, ws_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
41-
ON CONFLICT (id) DO UPDATE SET
42-
request_headers = EXCLUDED.request_headers,
43-
request_body = EXCLUDED.request_body,
44-
response_headers = EXCLUDED.response_headers,
45-
response_body = EXCLUDED.response_body,
46-
updated_at = EXCLUDED.updated_at`
47-
48-
result, err := dao.DB(ctx).ExecContext(ctx, sql, values...)
39+
builder := psql.Insert(dao.opts.Table).Columns("id", "request_headers", "request_body", "response_headers", "response_body", "created_at", "updated_at", "ws_id")
40+
for _, entity := range entities {
41+
builder = builder.Values(entity.ID, entity.RequestHeaders, entity.RequestBody, entity.ResponseHeaders, entity.ResponseBody, now, now, entity.WorkspaceId)
42+
}
43+
sql, args := builder.Suffix(`
44+
ON CONFLICT (id) DO UPDATE SET
45+
request_headers = EXCLUDED.request_headers,
46+
request_body = EXCLUDED.request_body,
47+
response_headers = EXCLUDED.response_headers,
48+
response_body = EXCLUDED.response_body,
49+
updated_at = EXCLUDED.updated_at`).MustSql()
50+
dao.debugSQL(sql, args)
51+
result, err := dao.DB(ctx).ExecContext(ctx, sql, args...)
4952
if err != nil {
5053
return err
5154
}

db/dao/daos.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type SourceDAO interface {
5050

5151
type AttemptDetailDAO interface {
5252
BaseDAO[entities.AttemptDetail]
53-
Insert(ctx context.Context, attemptDetail *entities.AttemptDetail) error
53+
BatchInsert(ctx context.Context, entities []*entities.AttemptDetail) error
5454
}
5555

5656
type PluginDAO interface {

db/entities/types.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package entities
33
import (
44
"database/sql/driver"
55
"encoding/json"
6+
"errors"
67
"github.com/lib/pq"
78
"github.com/webhookx-io/webhookx/pkg/types"
89
)
@@ -26,7 +27,14 @@ type BaseModel struct {
2627
type Headers map[string]string
2728

2829
func (m *Headers) Scan(src interface{}) error {
29-
return json.Unmarshal(src.([]byte), m)
30+
switch v := src.(type) {
31+
case string:
32+
return json.Unmarshal([]byte(v), m)
33+
case []byte:
34+
return json.Unmarshal(v, m)
35+
default:
36+
return errors.New("unknown type")
37+
}
3038
}
3139

3240
func (m Headers) Value() (driver.Value, error) {
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN request_headers TYPE JSONB USING request_headers::JSONB;
2+
ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN response_headers TYPE JSONB USING response_headers::JSONB;
3+
CREATE INDEX idx_attempt_details_ws_id ON attempt_details (ws_id);
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN request_headers TYPE TEXT;
2+
ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN response_headers TYPE TEXT;
3+
DROP INDEX idx_attempt_details_ws_id;

dispatcher/dispatcher.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/webhookx-io/webhookx/pkg/tracing"
1414
"github.com/webhookx-io/webhookx/pkg/types"
1515
"github.com/webhookx-io/webhookx/utils"
16-
"github.com/webhookx-io/webhookx/worker"
1716
"go.opentelemetry.io/otel/trace"
1817
"go.uber.org/zap"
1918
"time"
@@ -96,7 +95,7 @@ func (d *Dispatcher) dispatchBatch(ctx context.Context, events []*entities.Event
9695
return d.db.Attempts.BatchInsert(ctx, attempts)
9796
})
9897
if err == nil {
99-
go d.sendToQueue(context.WithoutCancel(ctx), attempts)
98+
go d.SendToQueue(context.WithoutCancel(ctx), attempts)
10099
}
101100
return n, err
102101
}
@@ -130,19 +129,19 @@ func (d *Dispatcher) DispatchEndpoint(ctx context.Context, event *entities.Event
130129
return err
131130
}
132131

133-
d.sendToQueue(ctx, attempts)
132+
d.SendToQueue(ctx, attempts)
134133

135134
return nil
136135
}
137136

138-
func (d *Dispatcher) sendToQueue(ctx context.Context, attempts []*entities.Attempt) {
137+
func (d *Dispatcher) SendToQueue(ctx context.Context, attempts []*entities.Attempt) {
139138
tasks := make([]*taskqueue.TaskMessage, 0, len(attempts))
140139
ids := make([]string, 0, len(attempts))
141140
for _, attempt := range attempts {
142141
tasks = append(tasks, &taskqueue.TaskMessage{
143142
ID: attempt.ID,
144143
ScheduledAt: attempt.ScheduledAt.Time,
145-
Data: &worker.MessageData{
144+
Data: &taskqueue.MessageData{
146145
EventID: attempt.EventId,
147146
EndpointId: attempt.EndpointId,
148147
Attempt: attempt.AttemptNumber,

pkg/taskqueue/queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ type TaskMessage struct {
1313
data []byte
1414
}
1515

16+
type MessageData struct {
17+
EventID string `json:"event_id"`
18+
EndpointId string `json:"endpoint_id"`
19+
Attempt int `json:"attempt"`
20+
Event string `json:"event"`
21+
}
22+
1623
func (t *TaskMessage) String() string {
1724
return t.ID + ":" + string(t.data)
1825
}

test/tracing/worker_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,12 @@ var _ = Describe("tracing worker", Ordered, func() {
5353
"github.com/webhookx-io/webhookx",
5454
}
5555
expectedScopeSpans := map[string]map[string]string{
56-
"worker.submit": {},
57-
"worker.handle_task": {},
58-
"dao.endpoints.get": {},
59-
"dao.plugins.list": {},
60-
"worker.deliver": {},
61-
"dao.attempt_details.insert": {},
62-
"taskqueue.redis.delete": {},
56+
"worker.submit": {},
57+
"worker.handle_task": {},
58+
"dao.endpoints.get": {},
59+
"dao.plugins.list": {},
60+
"worker.deliver": {},
61+
"taskqueue.redis.delete": {},
6362
}
6463

6564
n, err := helper.FileCountLine(helper.OtelCollectorTracesFile)

test/worker/requeue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ var _ = Describe("processRequeue", Ordered, func() {
4545
assert.NoError(GinkgoT(), err)
4646
w = worker.NewWorker(worker.WorkerOptions{
4747
RequeueJobInterval: time.Second,
48-
}, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics, tracer, mocks.MockBus{})
48+
}, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics, tracer, mocks.MockBus{}, nil)
4949

5050
// data
5151
ws := utils.Must(db.Workspaces.GetDefault(context.TODO()))

0 commit comments

Comments
 (0)