Skip to content

Commit 5fa4f19

Browse files
vm-001webhookx-x
authored andcommitted
perf(worker): store event in MessageData to avoid querying from database
1 parent acb856c commit 5fa4f19

File tree

5 files changed

+51
-38
lines changed

5 files changed

+51
-38
lines changed

db/entities/attempt.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ type Attempt struct {
2222
Request *AttemptRequest `json:"request" db:"request"`
2323
Response *AttemptResponse `json:"response" db:"response"`
2424

25+
Event *Event `json:"-" db:"-"`
26+
2527
BaseModel
2628
}
2729

dispatcher/dispatcher.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88
"github.com/webhookx-io/webhookx/db/query"
99
"github.com/webhookx-io/webhookx/eventbus"
1010
"github.com/webhookx-io/webhookx/mcache"
11-
"github.com/webhookx-io/webhookx/model"
1211
"github.com/webhookx-io/webhookx/pkg/metrics"
1312
"github.com/webhookx-io/webhookx/pkg/taskqueue"
1413
"github.com/webhookx-io/webhookx/pkg/tracing"
1514
"github.com/webhookx-io/webhookx/pkg/types"
1615
"github.com/webhookx-io/webhookx/utils"
16+
"github.com/webhookx-io/webhookx/worker"
1717
"go.opentelemetry.io/otel/trace"
1818
"go.uber.org/zap"
1919
"time"
@@ -114,6 +114,7 @@ func fanout(event *entities.Event, endpoints []*entities.Endpoint, mode entities
114114
AttemptNumber: 1,
115115
ScheduledAt: types.NewTime(now.Add(time.Second * time.Duration(delay))),
116116
TriggerMode: mode,
117+
Event: event,
117118
}
118119
attempt.WorkspaceId = event.WorkspaceId
119120
attempts = append(attempts, attempt)
@@ -141,10 +142,11 @@ func (d *Dispatcher) sendToQueue(ctx context.Context, attempts []*entities.Attem
141142
tasks = append(tasks, &taskqueue.TaskMessage{
142143
ID: attempt.ID,
143144
ScheduledAt: attempt.ScheduledAt.Time,
144-
Data: &model.MessageData{
145+
Data: &worker.MessageData{
145146
EventID: attempt.EventId,
146147
EndpointId: attempt.EndpointId,
147148
Attempt: attempt.AttemptNumber,
149+
Event: string(attempt.Event.Data),
148150
},
149151
})
150152
ids = append(ids, attempt.ID)

test/delivery/delivery_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ var _ = Describe("delivery", Ordered, func() {
6060
resp, err := proxyClient.R().
6161
SetBody(`{
6262
"event_type": "foo.bar",
63-
"data": {
64-
"key": "value"
65-
}
63+
"data": {"key": "value"}
6664
}`).
6765
Post("/")
6866
return err == nil && resp.StatusCode() == 200
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
package model
1+
package worker
22

33
type MessageData struct {
44
EventID string `json:"event_id"`
55
EndpointId string `json:"endpoint_id"`
66
Attempt int `json:"attempt"`
7+
Event string `json:"event"`
78
}

worker/worker.go

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/webhookx-io/webhookx/db/entities"
1111
"github.com/webhookx-io/webhookx/eventbus"
1212
"github.com/webhookx-io/webhookx/mcache"
13-
"github.com/webhookx-io/webhookx/model"
1413
"github.com/webhookx-io/webhookx/pkg/metrics"
1514
"github.com/webhookx-io/webhookx/pkg/plugin"
1615
plugintypes "github.com/webhookx-io/webhookx/pkg/plugin/types"
@@ -127,7 +126,7 @@ func (w *Worker) run() {
127126
defer span.End()
128127
ctx = tracingCtx
129128
}
130-
task.Data = &model.MessageData{}
129+
task.Data = &MessageData{}
131130
err = task.UnmarshalData(task.Data)
132131
if err != nil {
133132
w.log.Errorf("[worker] failed to unmarshal task: %v", err)
@@ -195,13 +194,19 @@ func (w *Worker) processRequeue() {
195194

196195
tasks := make([]*taskqueue.TaskMessage, 0, len(attempts))
197196
for _, attempt := range attempts {
197+
event, err := w.DB.Events.Get(ctx, attempt.EventId)
198+
if err != nil {
199+
w.log.Errorf("[worker] failed to get event: %v", err)
200+
break
201+
}
198202
task := &taskqueue.TaskMessage{
199203
ID: attempt.ID,
200204
ScheduledAt: attempt.ScheduledAt.Time,
201-
Data: &model.MessageData{
205+
Data: &MessageData{
202206
EventID: attempt.EventId,
203207
EndpointId: attempt.EndpointId,
204208
Attempt: attempt.AttemptNumber,
209+
Event: string(event.Data),
205210
},
206211
}
207212
tasks = append(tasks, task)
@@ -232,7 +237,7 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er
232237
defer span.End()
233238
ctx = tracingCtx
234239
}
235-
data := task.Data.(*model.MessageData)
240+
data := task.Data.(*MessageData)
236241

237242
// verify endpoint
238243
cacheKey := constants.EndpointCacheKey.Build(data.EndpointId)
@@ -247,15 +252,18 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er
247252
return w.DB.Attempts.UpdateErrorCode(ctx, task.ID, entities.AttemptStatusCanceled, entities.AttemptErrorCodeEndpointDisabled)
248253
}
249254

250-
// verify event
251-
cacheKey = constants.EventCacheKey.Build(data.EventID)
252-
opts := &mcache.LoadOptions{DisableLRU: true}
253-
event, err := mcache.Load(ctx, cacheKey, opts, w.DB.Events.Get, data.EventID)
254-
if err != nil {
255-
return err
256-
}
257-
if event == nil {
258-
return w.DB.Attempts.UpdateErrorCode(ctx, task.ID, entities.AttemptStatusCanceled, entities.AttemptErrorCodeUnknown)
255+
if data.Event == "" { // backward compatibility
256+
// verify event
257+
cacheKey = constants.EventCacheKey.Build(data.EventID)
258+
opts := &mcache.LoadOptions{DisableLRU: true}
259+
event, err := mcache.Load(ctx, cacheKey, opts, w.DB.Events.Get, data.EventID)
260+
if err != nil {
261+
return err
262+
}
263+
if event == nil {
264+
return w.DB.Attempts.UpdateErrorCode(ctx, task.ID, entities.AttemptStatusCanceled, entities.AttemptErrorCodeUnknown)
265+
}
266+
data.Event = string(event.Data)
259267
}
260268

261269
plugins, err := listEndpointPlugins(ctx, w.DB, endpoint.ID)
@@ -273,7 +281,7 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er
273281
URL: endpoint.Request.URL,
274282
Method: endpoint.Request.Method,
275283
Headers: endpoint.Request.Headers,
276-
Payload: event.Data,
284+
Payload: []byte(data.Event),
277285
}
278286
if pluginReq.Headers == nil {
279287
pluginReq.Headers = make(map[string]string)
@@ -326,22 +334,24 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er
326334
return err
327335
}
328336

329-
attemptDetail := &entities.AttemptDetail{
330-
ID: task.ID,
331-
RequestHeaders: utils.HeaderMap(request.Request.Header),
332-
RequestBody: utils.Pointer(string(request.Payload)),
333-
}
334-
if len(response.Header) > 0 {
335-
attemptDetail.ResponseHeaders = utils.Pointer(entities.Headers(utils.HeaderMap(response.Header)))
336-
}
337-
if response.ResponseBody != nil {
338-
attemptDetail.ResponseBody = utils.Pointer(string(response.ResponseBody))
339-
}
340-
attemptDetail.WorkspaceId = endpoint.WorkspaceId
341-
err = w.DB.AttemptDetails.Insert(ctx, attemptDetail)
342-
if err != nil {
343-
return err
344-
}
337+
go func() {
338+
attemptDetail := &entities.AttemptDetail{
339+
ID: task.ID,
340+
RequestHeaders: utils.HeaderMap(request.Request.Header),
341+
RequestBody: utils.Pointer(string(request.Payload)),
342+
}
343+
if len(response.Header) > 0 {
344+
attemptDetail.ResponseHeaders = utils.Pointer(entities.Headers(utils.HeaderMap(response.Header)))
345+
}
346+
if response.ResponseBody != nil {
347+
attemptDetail.ResponseBody = utils.Pointer(string(response.ResponseBody))
348+
}
349+
attemptDetail.WorkspaceId = endpoint.WorkspaceId
350+
err = w.DB.AttemptDetails.Insert(ctx, attemptDetail)
351+
if err != nil {
352+
w.log.Errorf("[worker] failed to insert attempt detail: %v", err)
353+
}
354+
}()
345355

346356
if result.Status == entities.AttemptStatusSuccess {
347357
return nil
@@ -355,7 +365,7 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er
355365
delay := endpoint.Retry.Config.Attempts[data.Attempt]
356366
nextAttempt := &entities.Attempt{
357367
ID: utils.KSUID(),
358-
EventId: event.ID,
368+
EventId: data.EventID,
359369
EndpointId: endpoint.ID,
360370
Status: entities.AttemptStatusInit,
361371
AttemptNumber: data.Attempt + 1,
@@ -372,7 +382,7 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er
372382
task = &taskqueue.TaskMessage{
373383
ID: nextAttempt.ID,
374384
ScheduledAt: nextAttempt.ScheduledAt.Time,
375-
Data: &model.MessageData{
385+
Data: &MessageData{
376386
EventID: data.EventID,
377387
EndpointId: data.EndpointId,
378388
Attempt: nextAttempt.AttemptNumber,

0 commit comments

Comments
 (0)