Skip to content

Commit 922eb60

Browse files
committed
feat: adjust event processing to batch mode
- merge the processing logic for adding and modifying events - merge the processing logic for renaming and deleting events - optimize the batch event processing flow - refactor the event processing failure handling logic
1 parent f687814 commit 922eb60

File tree

6 files changed

+832
-195
lines changed

6 files changed

+832
-195
lines changed

cmd/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func main() {
136136
schedulerService := service.NewScheduler(syncRepo, scanRepo, storageManager, appLogger)
137137
fileScanService := service.NewFileScanService(workspaceRepo, eventRepo, scanRepo, storageManager, codebaseEmbeddingRepo, appLogger)
138138
uploadService := service.NewUploadService(schedulerService, syncRepo, appLogger, syncServiceConfig)
139-
embeddingProcessService := service.NewEmbeddingProcessService(workspaceRepo, eventRepo, codebaseEmbeddingRepo, uploadService, appLogger)
139+
embeddingProcessService := service.NewEmbeddingProcessService(workspaceRepo, eventRepo, codebaseEmbeddingRepo, uploadService, syncRepo, appLogger)
140140
embeddingStatusService := service.NewEmbeddingStatusService(codebaseEmbeddingRepo, workspaceRepo, eventRepo, syncRepo, appLogger)
141141

142142
// 创建存储
@@ -168,7 +168,7 @@ func main() {
168168

169169
// Initialize job layer
170170
fileScanJob := job.NewFileScanJob(fileScanService, storageManager, syncRepo, appLogger, 5*time.Minute)
171-
eventProcessorJob := job.NewEventProcessorJob(appLogger, syncRepo, embeddingProcessService, codegraphProcessor, storageManager)
171+
eventProcessorJob := job.NewEventProcessorJob(appLogger, syncRepo, embeddingProcessService, codegraphProcessor, 10*time.Second, storageManager)
172172
statusCheckerJob := job.NewStatusCheckerJob(embeddingStatusService, storageManager, syncRepo, appLogger, 5*time.Second)
173173
eventCleanerJob := job.NewEventCleanerJob(eventRepo, appLogger)
174174
indexCleanJob := job.NewIndexCleanJob(appLogger, indexer, workspaceRepo)

internal/job/event_processor_job.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@ import (
1313

1414
// EventProcessorJob 事件处理任务
1515
type EventProcessorJob struct {
16-
httpSync repository.SyncInterface
17-
embedding service.EmbeddingProcessService
18-
codegraph service.CodegraphProcessService
19-
storage repository.StorageInterface
20-
logger logger.Logger
21-
ctx context.Context
22-
cancel context.CancelFunc
16+
httpSync repository.SyncInterface
17+
embedding service.EmbeddingProcessService
18+
codegraph service.CodegraphProcessService
19+
storage repository.StorageInterface
20+
logger logger.Logger
21+
embeddingInterval time.Duration
22+
ctx context.Context
23+
cancel context.CancelFunc
2324
}
2425

2526
// NewEventProcessorJob 创建事件处理任务
@@ -28,17 +29,19 @@ func NewEventProcessorJob(
2829
httpSync repository.SyncInterface,
2930
embedding service.EmbeddingProcessService,
3031
codegraph service.CodegraphProcessService,
32+
embeddingInterval time.Duration,
3133
storage repository.StorageInterface,
3234
) *EventProcessorJob {
3335
ctx, cancel := context.WithCancel(context.Background())
3436
return &EventProcessorJob{
35-
httpSync: httpSync,
36-
embedding: embedding,
37-
codegraph: codegraph,
38-
storage: storage,
39-
logger: logger,
40-
ctx: ctx,
41-
cancel: cancel,
37+
httpSync: httpSync,
38+
embedding: embedding,
39+
codegraph: codegraph,
40+
storage: storage,
41+
logger: logger,
42+
embeddingInterval: embeddingInterval,
43+
ctx: ctx,
44+
cancel: cancel,
4245
}
4346
}
4447

@@ -53,7 +56,7 @@ func (j *EventProcessorJob) Start(ctx context.Context) {
5356

5457
// 立即执行一次事件处理
5558
authInfo := config.GetAuthInfo()
56-
if authInfo.ClientId != "" && authInfo.Token != "" && authInfo.ServerURL != "" {
59+
if j.embeddingInterval > 0 && authInfo.ClientId != "" && authInfo.Token != "" && authInfo.ServerURL != "" {
5760
j.embeddingProcessWorkspaces(ctx)
5861
}
5962

@@ -63,7 +66,7 @@ func (j *EventProcessorJob) Start(ctx context.Context) {
6366
j.logger.Error("recovered from panic in embedding processor: %v", r)
6467
}
6568
}()
66-
ticker := time.NewTicker(5 * time.Second)
69+
ticker := time.NewTicker(j.embeddingInterval)
6770
defer ticker.Stop()
6871

6972
for {

internal/repository/event.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type EventRepository interface {
4949
BatchCreateEvents(events []*model.Event) error
5050
// BatchDeleteEvents 批量删除事件
5151
BatchDeleteEvents(ids []int64) error
52+
// UpdateEventsEmbedding batch-updates the embedding information of the given events
53+
UpdateEventsEmbedding(events []*model.Event) error
54+
// UpdateEventsEmbeddingStatus 批量更新事件嵌入状态
55+
UpdateEventsEmbeddingStatus(eventIDs []int64, status int) error
5256
// GetExpiredEventIDs 获取过期事件的ID列表
5357
GetExpiredEventIDs(cutoffTime time.Time) ([]int64, error)
5458
// GetTableName 获取表名
@@ -1359,6 +1363,102 @@ func (r *eventRepository) BatchDeleteEvents(ids []int64) error {
13591363
return nil
13601364
}
13611365

1366+
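// UpdateEventsEmbedding batch-updates the embedding status, sync ID and file hash of the given events in a single transaction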
1367+
func (r *eventRepository) UpdateEventsEmbedding(events []*model.Event) error {
1368+
if len(events) == 0 {
1369+
return nil
1370+
}
1371+
1372+
tx, err := r.db.GetDB().Begin()
1373+
if err != nil {
1374+
return fmt.Errorf("failed to begin transaction: %w", err)
1375+
}
1376+
defer func() {
1377+
if err != nil {
1378+
tx.Rollback()
1379+
}
1380+
}()
1381+
1382+
query := `
1383+
UPDATE events
1384+
SET embedding_status = ?, sync_id = ?, file_hash = ?, updated_at = ?
1385+
WHERE id = ?
1386+
`
1387+
1388+
nowTime := time.Now()
1389+
stmt, err := tx.Prepare(query)
1390+
if err != nil {
1391+
return fmt.Errorf("failed to prepare statement: %w", err)
1392+
}
1393+
defer stmt.Close()
1394+
1395+
for _, event := range events {
1396+
_, err = stmt.Exec(
1397+
event.EmbeddingStatus,
1398+
event.SyncId,
1399+
event.FileHash,
1400+
nowTime,
1401+
event.ID,
1402+
)
1403+
if err != nil {
1404+
r.logger.Error("Failed to update event %d: %v", event.ID, err)
1405+
return fmt.Errorf("failed to update event %d: %w", event.ID, err)
1406+
}
1407+
}
1408+
1409+
if err = tx.Commit(); err != nil {
1410+
return fmt.Errorf("failed to commit transaction: %w", err)
1411+
}
1412+
1413+
r.logger.Info("Successfully updated %d events", len(events))
1414+
return nil
1415+
}
1416+
1417+
// UpdateEventsEmbeddingStatus 批量更新事件嵌入状态
1418+
func (r *eventRepository) UpdateEventsEmbeddingStatus(eventIDs []int64, status int) error {
1419+
if len(eventIDs) == 0 {
1420+
return nil
1421+
}
1422+
1423+
tx, err := r.db.GetDB().Begin()
1424+
if err != nil {
1425+
return fmt.Errorf("failed to begin transaction: %w", err)
1426+
}
1427+
defer func() {
1428+
if err != nil {
1429+
tx.Rollback()
1430+
}
1431+
}()
1432+
1433+
query := `
1434+
UPDATE events
1435+
SET embedding_status = ?, updated_at = ?
1436+
WHERE id = ?
1437+
`
1438+
1439+
nowTime := time.Now()
1440+
stmt, err := tx.Prepare(query)
1441+
if err != nil {
1442+
return fmt.Errorf("failed to prepare statement: %w", err)
1443+
}
1444+
defer stmt.Close()
1445+
1446+
for _, id := range eventIDs {
1447+
_, err = stmt.Exec(status, nowTime, id)
1448+
if err != nil {
1449+
r.logger.Error("Failed to update event status for ID %d: %v", id, err)
1450+
return fmt.Errorf("failed to update event status for ID %d: %w", id, err)
1451+
}
1452+
}
1453+
1454+
if err = tx.Commit(); err != nil {
1455+
return fmt.Errorf("failed to commit transaction: %w", err)
1456+
}
1457+
1458+
r.logger.Info("Successfully updated status for %d events", len(eventIDs))
1459+
return nil
1460+
}
1461+
13621462
// GetExpiredEventIDs 获取过期事件的ID列表
13631463
func (r *eventRepository) GetExpiredEventIDs(cutoffTime time.Time) ([]int64, error) {
13641464
query := `

0 commit comments

Comments
 (0)