From 301e8a3dc29175f92f53dce9cd382f816290c2b9 Mon Sep 17 00:00:00 2001 From: "Simon B.Robert" Date: Mon, 16 Feb 2026 14:01:53 -0500 Subject: [PATCH 1/2] Insert messages, verification and sequence number as a single transaction --- indexer/pkg/common/metadata.go | 9 + indexer/pkg/common/storage.go | 4 +- indexer/pkg/discovery/message_discovery.go | 68 ++--- .../pkg/discovery/message_discovery_test.go | 278 ++++++++---------- indexer/pkg/storage/postgres.go | 214 +++++++++----- internal/mocks/mock_IndexerStorage.go | 95 +++--- 6 files changed, 336 insertions(+), 332 deletions(-) diff --git a/indexer/pkg/common/metadata.go b/indexer/pkg/common/metadata.go index 9d0d0ef27..e1c12e04b 100644 --- a/indexer/pkg/common/metadata.go +++ b/indexer/pkg/common/metadata.go @@ -27,3 +27,12 @@ type MessageMetadata struct { IngestionTimestamp time.Time `json:"ingestionTimestamp"` LastErr string `json:"-"` // we want to exclude potentionally sensitive errors } + +const SequenceNumberNotSupported = -1 + +type DiscoveryBatch struct { + Messages []MessageWithMetadata + Verifications []VerifierResultWithMetadata + DiscoveryLocation string + SequenceNumber int +} diff --git a/indexer/pkg/common/storage.go b/indexer/pkg/common/storage.go index b27178537..3440c2ab0 100644 --- a/indexer/pkg/common/storage.go +++ b/indexer/pkg/common/storage.go @@ -23,6 +23,8 @@ type IndexerStorageWriter interface { VerifierResultsStorageWriter MessageStorageWriter DiscoveryStateWriter + // PersistDiscoveryBatch atomically persists messages, verifications, and the discovery sequence number. + PersistDiscoveryBatch(ctx context.Context, batch DiscoveryBatch) error } // VerifierResultsStorageReader provides the interface to retrieve verification results from storage. @@ -69,6 +71,4 @@ type DiscoveryStateReader interface { type DiscoveryStateWriter interface { // CreateDiscoveryState creates a new record containing metadata about the discovery source. 
CreateDiscoveryState(ctx context.Context, discoveryLocation string, startingSequenceNumber int) error - // UpdateDiscoverySequenceNumber updates the latest sequence number for that discovery source. - UpdateDiscoverySequenceNumber(ctx context.Context, discoveryLocation string, sequenceNumber int) error } diff --git a/indexer/pkg/discovery/message_discovery.go b/indexer/pkg/discovery/message_discovery.go index ba0685260..0a3fbebcb 100644 --- a/indexer/pkg/discovery/message_discovery.go +++ b/indexer/pkg/discovery/message_discovery.go @@ -131,9 +131,8 @@ func (a *AggregatorMessageDiscovery) validate() error { func (a *AggregatorMessageDiscovery) Start(ctx context.Context) chan common.VerifierResultWithMetadata { childCtx, cancelFunc := context.WithCancel(ctx) - a.wg.Add(2) + a.wg.Add(1) go a.run(childCtx) - go a.updateSequenceNumber(childCtx) a.cancelFunc = cancelFunc a.logger.Info("MessageDiscovery Started") @@ -184,30 +183,6 @@ func (a *AggregatorMessageDiscovery) run(ctx context.Context) { } } -func (a *AggregatorMessageDiscovery) updateSequenceNumber(ctx context.Context) { - defer a.wg.Done() - - ticker := time.NewTicker(time.Second * 5) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - a.logger.Info("updateSequenceNumber stopped due to context cancellation") - return - case <-ticker.C: - latestSequenceNumber, supports := a.aggregatorReader.GetSinceValue() - if !supports { - a.logger.Warnw("unable to update sequence number as reader does not support this.", "discoveryLocation", a.config.Address) - } - - if err := a.storageSink.UpdateDiscoverySequenceNumber(ctx, a.config.Address, int(latestSequenceNumber)); err != nil { - a.logger.Errorf("unable to update sequence number: %w", err) - } - } - } -} - func (a *AggregatorMessageDiscovery) consumeReader(ctx context.Context) { // We can be in a situation where multiple calls to consumeReader are running concurrently due to the ticker. 
// This might happen during high load, or other situations where the ticker is running faster than the reader. @@ -290,23 +265,9 @@ func (a *AggregatorMessageDiscovery) callReader(ctx context.Context) (bool, erro allVerifications = append(allVerifications, verifierResultWithMetadata) } - // Save all messages we've seen from the discovery source, if we're unable to persist them. - // We'll set the sequence value on the reader back to it's original value. - // This means we won't continue ingesting new messages until these ones are saved. - // - // This ensures that we won't miss a message. - if err := a.storageSink.BatchInsertMessages(ctx, messages); err != nil { - a.logger.Warn("Unable to save messages to storage, will retry") - if ableToSetSinceValue { - a.aggregatorReader.SetSinceValue(startingSequence) - } - return false, err - } - - if len(persistedVerifications) > 0 { - err := a.storageSink.BatchInsertCCVData(ctx, persistedVerifications) - if err != nil { - a.logger.Warn("Unable to save verifications to storage, will retry") + if len(messages) > 0 || len(persistedVerifications) > 0 { + if err := a.persistBatch(ctx, messages, persistedVerifications, ableToSetSinceValue); err != nil { + a.logger.Warnw("Unable to persist discovery batch, will retry", "error", err) if ableToSetSinceValue { a.aggregatorReader.SetSinceValue(startingSequence) } @@ -327,6 +288,27 @@ func (a *AggregatorMessageDiscovery) callReader(ctx context.Context) (bool, erro return len(queryResponse) > 0, nil } +func (a *AggregatorMessageDiscovery) persistBatch( + ctx context.Context, + messages []common.MessageWithMetadata, + verifications []common.VerifierResultWithMetadata, + ableToSetSinceValue bool, +) error { + sequenceNumber := common.SequenceNumberNotSupported + if ableToSetSinceValue { + if currentSequence, supports := a.aggregatorReader.GetSinceValue(); supports { + sequenceNumber = int(currentSequence) + } + } + + return a.storageSink.PersistDiscoveryBatch(ctx, common.DiscoveryBatch{ + 
Messages: messages, + Verifications: verifications, + DiscoveryLocation: a.config.Address, + SequenceNumber: sequenceNumber, + }) +} + func (a *AggregatorMessageDiscovery) isCircuitBreakerOpen() bool { return a.aggregatorReader.GetDiscoveryCircuitBreakerState() == circuitbreaker.OpenState } diff --git a/indexer/pkg/discovery/message_discovery_test.go b/indexer/pkg/discovery/message_discovery_test.go index 9770f5ef5..236a98302 100644 --- a/indexer/pkg/discovery/message_discovery_test.go +++ b/indexer/pkg/discovery/message_discovery_test.go @@ -68,38 +68,20 @@ func (ts *testSetup) CapturedCCVData() []common.VerifierResultWithMetadata { return result } -// CapturedSeqNumber returns the last captured sequence number for an address. -func (ts *testSetup) CapturedSeqNumber(address string) (int, bool) { - ts.mu.Lock() - defer ts.mu.Unlock() - v, ok := ts.capturedSeqNumbers[address] - return v, ok -} - // newMockStorage creates a MockIndexerStorage with default permissive expectations // that capture all written data for test assertions. func newMockStorage(t *testing.T, ts *testSetup) *mocks.MockIndexerStorage { t.Helper() store := mocks.NewMockIndexerStorage(t) - store.EXPECT().BatchInsertMessages(mock.Anything, mock.Anything). - Run(func(_ context.Context, msgs []common.MessageWithMetadata) { - ts.mu.Lock() - ts.capturedMessages = append(ts.capturedMessages, msgs...) - ts.mu.Unlock() - }).Return(nil).Maybe() - - store.EXPECT().BatchInsertCCVData(mock.Anything, mock.Anything). - Run(func(_ context.Context, data []common.VerifierResultWithMetadata) { + store.EXPECT().PersistDiscoveryBatch(mock.Anything, mock.Anything). + Run(func(_ context.Context, batch common.DiscoveryBatch) { ts.mu.Lock() - ts.capturedCCVData = append(ts.capturedCCVData, data...) - ts.mu.Unlock() - }).Return(nil).Maybe() - - store.EXPECT().UpdateDiscoverySequenceNumber(mock.Anything, mock.Anything, mock.Anything). 
- Run(func(_ context.Context, addr string, seq int) { - ts.mu.Lock() - ts.capturedSeqNumbers[addr] = seq + ts.capturedMessages = append(ts.capturedMessages, batch.Messages...) + ts.capturedCCVData = append(ts.capturedCCVData, batch.Verifications...) + if batch.SequenceNumber != common.SequenceNumberNotSupported { + ts.capturedSeqNumbers[batch.DiscoveryLocation] = batch.SequenceNumber + } ts.mu.Unlock() }).Return(nil).Maybe() @@ -726,110 +708,6 @@ func TestMessageDiscovery_NewMessageEmittedAndSaved(t *testing.T) { require.Len(t, storedMessages, 1, "exactly one message should be stored") } -// setupMessageDiscoveryTestWithSequenceNumberSupport creates a test setup with a reader that supports sequence numbers. -func setupMessageDiscoveryTestWithSequenceNumberSupport(t *testing.T, discoveryAddress string, initialSequenceNumber int64) *testSetup { - t.Helper() - cfg := config.DiscoveryConfig{ - AggregatorReaderConfig: config.AggregatorReaderConfig{ - Address: discoveryAddress, - }, - PollInterval: 50, - Timeout: 500, - } - return setupMessageDiscoveryTestWithSequenceNumberSupportAndConfig(t, cfg, initialSequenceNumber, 5*time.Second) -} - -// setupMessageDiscoveryTestWithSequenceNumberSupportAndConfig creates a test setup with sequence number support and custom config. 
-func setupMessageDiscoveryTestWithSequenceNumberSupportAndConfig(t *testing.T, cfg config.DiscoveryConfig, initialSequenceNumber int64, timeout time.Duration) *testSetup { - t.Helper() - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - - lggr := logger.Test(t) - mon := monitoring.NewNoopIndexerMonitoring() - - ts := &testSetup{ - Logger: lggr, - Monitor: mon, - Context: ctx, - Cancel: cancel, - capturedSeqNumbers: make(map[string]int), - } - - store := newMockStorage(t, ts) - ts.Storage = store - - store.EXPECT().CreateDiscoveryState(mock.Anything, cfg.Address, int(initialSequenceNumber)).Return(nil).Maybe() - - mockReader := internal.NewMockReader(internal.MockReaderConfig{ - EmitEmptyResponses: true, - }) - mockReader.SetSinceValue(initialSequenceNumber) - - timeProvider := mocks.NewMockTimeProvider(t) - timeProvider.EXPECT().GetTime().Return(time.Now().UTC()).Maybe() - - resilientReader := readers.NewResilientReader(mockReader, lggr, readers.DefaultResilienceConfig()) - - registry := registry.NewVerifierRegistry() - - discovery, _ := NewAggregatorMessageDiscovery( - WithLogger(lggr), - WithRegistry(registry), - WithTimeProvider(timeProvider), - WithMonitoring(mon), - WithStorage(store), - WithAggregator(resilientReader), - WithConfig(cfg), - ) - - ts.Discovery = discovery.(*AggregatorMessageDiscovery) - ts.Reader = resilientReader - ts.MockReader = mockReader - - return ts -} - -// TestUpdateSequenceNumber_UpdatesPeriodically tests that sequence numbers are updated periodically. 
-func TestUpdateSequenceNumber_UpdatesPeriodically(t *testing.T) { - discoveryAddress := "test-discovery-address" - initialSequenceNumber := int64(100) - newSequenceNumber := int64(150) - - ts := setupMessageDiscoveryTestWithSequenceNumberSupportAndConfig(t, config.DiscoveryConfig{ - AggregatorReaderConfig: config.AggregatorReaderConfig{ - Address: discoveryAddress, - }, - PollInterval: 50, - Timeout: 500, - }, initialSequenceNumber, 8*time.Second) - defer ts.Cleanup() - - messageCh := ts.Discovery.Start(ts.Context) - - go func() { - for { - select { - case <-ts.Context.Done(): - return - case _, ok := <-messageCh: - if !ok { - return - } - } - } - }() - - ts.MockReader.SetSinceValue(newSequenceNumber) - - // Wait for the update to happen (ticker runs every 5 seconds) - time.Sleep(6 * time.Second) - - seq, ok := ts.CapturedSeqNumber(discoveryAddress) - require.True(t, ok, "sequence number should have been updated") - assert.Equal(t, int(newSequenceNumber), seq, "sequence number should be updated in storage") -} - // TestMessageDiscovery_DiscoveryOnlyNotPersisted tests that discovery-only verifications // (those with MessageDiscoveryVersion prefix in CCVData) are emitted and saved as messages, // but NOT persisted as verifications. @@ -913,42 +791,126 @@ func createTestCCVDataWithCCVData(uniqueID int, timestamp int64, sourceChain, de } } -// TestUpdateSequenceNumber_StopsOnContextCancellation tests that the update goroutine stops on context cancellation. 
-func TestUpdateSequenceNumber_StopsOnContextCancellation(t *testing.T) { - discoveryAddress := "test-discovery-address" - initialSequenceNumber := int64(100) +func TestCallReader_PersistDiscoveryBatch(t *testing.T) { + tests := []struct { + name string + maxMessages int + preExhaust bool + persistErr error + initialSequence int64 + expectFound bool + expectErr bool + expectSinceReset bool + expectBatchCall bool + }{ + { + name: "happy_path_batch_committed_with_sequence", + maxMessages: 1, + initialSequence: 10, + expectFound: true, + expectBatchCall: true, + }, + { + name: "batch_failure_resets_since_value", + maxMessages: 1, + persistErr: errors.New("db error"), + initialSequence: 10, + expectErr: true, + expectSinceReset: true, + expectBatchCall: true, + }, + { + name: "empty_response_no_batch_call", + maxMessages: 1, + preExhaust: true, + }, + } - ts := setupMessageDiscoveryTestWithSequenceNumberSupport(t, discoveryAddress, initialSequenceNumber) - defer ts.Cleanup() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + discoveryAddress := "test-address" + cfg := config.DiscoveryConfig{ + AggregatorReaderConfig: config.AggregatorReaderConfig{ + Address: discoveryAddress, + }, + PollInterval: 50, + Timeout: 500, + } - // Start discovery - messageCh := ts.Discovery.Start(ts.Context) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - // Drain message channel in background to prevent blocking - go func() { - for range messageCh { - // - } - }() + lggr := logger.Test(t) + mon := monitoring.NewNoopIndexerMonitoring() - // Wait a moment for goroutines to start - time.Sleep(100 * time.Millisecond) + store := mocks.NewMockIndexerStorage(t) - // Cancel context - ts.Cancel() + var capturedBatch *common.DiscoveryBatch + if tt.expectBatchCall { + store.EXPECT().PersistDiscoveryBatch(mock.Anything, mock.Anything). 
+ Run(func(_ context.Context, batch common.DiscoveryBatch) { + capturedBatch = &batch + }).Return(tt.persistErr).Once() + } - // Wait for goroutines to stop - time.Sleep(100 * time.Millisecond) + mockReader := internal.NewMockReader(internal.MockReaderConfig{ + EmitEmptyResponses: true, + MaxMessages: tt.maxMessages, + MessageGenerator: func(n int) common.VerifierResultWithMetadata { + return createTestCCVData(n, time.Now().UnixMilli(), 1, 2) + }, + }) - // Verify discovery stopped - err := ts.Discovery.Close() - assert.NoError(t, err, "close should complete successfully") + mockReader.SetSinceValue(tt.initialSequence) - // Verify context is canceled - select { - case <-ts.Context.Done(): - // Expected - default: - t.Fatal("context should be cancelled") + if tt.preExhaust { + _, _ = mockReader.ReadCCVData(ctx) + } + + resilientReader := readers.NewResilientReader(mockReader, lggr, readers.DefaultResilienceConfig()) + + timeProvider := mocks.NewMockTimeProvider(t) + timeProvider.EXPECT().GetTime().Return(time.Now().UTC()).Maybe() + + reg := registry.NewVerifierRegistry() + + disc, err := NewAggregatorMessageDiscovery( + WithLogger(lggr), + WithRegistry(reg), + WithTimeProvider(timeProvider), + WithMonitoring(mon), + WithStorage(store), + WithAggregator(resilientReader), + WithConfig(cfg), + ) + require.NoError(t, err) + + aggDisc := disc.(*AggregatorMessageDiscovery) + aggDisc.messageCh = make(chan common.VerifierResultWithMetadata, 10) + + found, callErr := aggDisc.callReader(ctx) + + if tt.expectErr { + require.Error(t, callErr) + } else { + require.NoError(t, callErr) + } + + assert.Equal(t, tt.expectFound, found) + + if tt.expectBatchCall && tt.persistErr == nil { + require.NotNil(t, capturedBatch) + assert.Len(t, capturedBatch.Messages, 1) + assert.Len(t, capturedBatch.Verifications, 1) + assert.Equal(t, discoveryAddress, capturedBatch.DiscoveryLocation) + assert.NotEqual(t, common.SequenceNumberNotSupported, capturedBatch.SequenceNumber) + } + + if 
tt.expectSinceReset { + currentSeq, ok := resilientReader.GetSinceValue() + require.True(t, ok) + assert.Equal(t, tt.initialSequence, currentSeq) + } + }) } } diff --git a/indexer/pkg/storage/postgres.go b/indexer/pkg/storage/postgres.go index f76fcb8ff..ea0d055c0 100644 --- a/indexer/pkg/storage/postgres.go +++ b/indexer/pkg/storage/postgres.go @@ -22,16 +22,16 @@ import ( var _ common.IndexerStorage = (*PostgresStorage)(nil) const ( - opGetCCVData = "GetCCVData" - opQueryCCVData = "QueryCCVData" - opInsertCCVData = "InsertCCVData" - opBatchInsertCCVData = "BatchInsertCCVData" - opBatchInsertMessages = "BatchInsertMessages" - opInsertMessage = "InsertMessage" - opQueryMessages = "QueryMessages" - opUpdateMessageStatus = "UpdateMessageStatus" - opCreateDiscoveryState = "CreateDiscoveryState" - opUpdateDiscoverySequenceNumber = "UpdateDiscoverySequenceNumber" + opGetCCVData = "GetCCVData" + opQueryCCVData = "QueryCCVData" + opInsertCCVData = "InsertCCVData" + opBatchInsertCCVData = "BatchInsertCCVData" + opBatchInsertMessages = "BatchInsertMessages" + opInsertMessage = "InsertMessage" + opQueryMessages = "QueryMessages" + opUpdateMessageStatus = "UpdateMessageStatus" + opCreateDiscoveryState = "CreateDiscoveryState" + opPersistDiscoveryBatch = "PersistDiscoveryBatch" ) type PostgresStorage struct { @@ -277,17 +277,7 @@ func (d *PostgresStorage) InsertCCVData(ctx context.Context, ccvData common.Veri return nil } -// BatchInsertCCVData inserts multiple CCVData entries into the database efficiently using a batch insert. 
-func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList []common.VerifierResultWithMetadata) error { - if len(ccvDataList) == 0 { - return nil - } - - startInsertMetric := time.Now() - d.mu.Lock() - defer d.mu.Unlock() - - // Build batch insert query with multiple value sets +func buildBatchInsertCCVDataQuery(ccvDataList []common.VerifierResultWithMetadata) (string, []any, error) { var query strings.Builder query.WriteString(` INSERT INTO indexer.verifier_results ( @@ -295,41 +285,35 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] verifier_source_address, verifier_dest_address, attestation_timestamp, - ingestion_timestamp, + ingestion_timestamp, source_chain_selector, dest_chain_selector, ccv_data, message, message_ccv_addresses, message_executor_address - ) VALUES - `) + ) VALUES `) args := make([]any, 0, len(ccvDataList)*11) valueClauses := make([]string, 0, len(ccvDataList)) for i, ccvData := range ccvDataList { - // Serialize message to JSON messageJSON, err := json.Marshal(ccvData.VerifierResult.Message) if err != nil { - d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opBatchInsertCCVData) - return fmt.Errorf("failed to marshal message to JSON at index %d: %w", i, err) + return "", nil, fmt.Errorf("failed to marshal message to JSON at index %d: %w", i, err) } - // Convert CCV addresses to hex string array ccvAddressesHex := make([]string, len(ccvData.VerifierResult.MessageCCVAddresses)) for j, addr := range ccvData.VerifierResult.MessageCCVAddresses { ccvAddressesHex[j] = addr.String() } - // Calculate parameter positions for this row baseIdx := i * 11 valueClause := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", baseIdx+1, baseIdx+2, baseIdx+3, baseIdx+4, baseIdx+5, baseIdx+6, baseIdx+7, baseIdx+8, baseIdx+9, baseIdx+10, baseIdx+11) valueClauses = append(valueClauses, valueClause) - // Add arguments for this row args = append(args, 
ccvData.VerifierResult.MessageID.String(), ccvData.VerifierResult.VerifierSourceAddress.String(), @@ -345,7 +329,6 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] ) } - // Complete the query with all value clauses and conflict resolution for i, vc := range valueClauses { if i > 0 { query.WriteString(", ") @@ -354,8 +337,26 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] } query.WriteString(" ON CONFLICT (message_id, verifier_source_address, verifier_dest_address) DO NOTHING") - // Execute the batch insert - result, err := d.execContext(ctx, query.String(), args...) + return query.String(), args, nil +} + +// BatchInsertCCVData inserts multiple CCVData entries into the database efficiently using a batch insert. +func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList []common.VerifierResultWithMetadata) error { + if len(ccvDataList) == 0 { + return nil + } + + startInsertMetric := time.Now() + d.mu.Lock() + defer d.mu.Unlock() + + query, args, err := buildBatchInsertCCVDataQuery(ccvDataList) + if err != nil { + d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opBatchInsertCCVData) + return err + } + + result, err := d.execContext(ctx, query, args...) 
if err != nil { d.lggr.Errorw("Failed to batch insert CCV data", "error", err, "count", len(ccvDataList)) d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opBatchInsertCCVData) @@ -370,13 +371,11 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] d.lggr.Debugw("Batch insert completed", "requested", len(ccvDataList), "inserted", rowsAffected) } - // Track unique messages and update metrics uniqueMessages := make(map[string]bool) for _, ccvData := range ccvDataList { uniqueMessages[ccvData.VerifierResult.MessageID.String()] = true } - // Check which message IDs are new for messageID := range uniqueMessages { msgBytes32, err := protocol.NewBytes32FromString(messageID) if err != nil { @@ -387,7 +386,6 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] } } - // Increment the verification records counter by the number of rows actually inserted for range rowsAffected { d.monitoring.Metrics().IncrementVerificationRecordsCounter(ctx) } @@ -396,17 +394,7 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] return nil } -// BatchInsertMessages implements common.IndexerStorage. 
-func (d *PostgresStorage) BatchInsertMessages(ctx context.Context, messages []common.MessageWithMetadata) error { - if len(messages) == 0 { - return nil - } - - startInsertMetric := time.Now() - d.mu.Lock() - defer d.mu.Unlock() - - // Build batch insert query with multiple value sets +func buildBatchInsertMessagesQuery(messages []common.MessageWithMetadata) (string, []any, error) { var query strings.Builder query.WriteString(` INSERT INTO indexer.messages ( @@ -417,27 +405,22 @@ func (d *PostgresStorage) BatchInsertMessages(ctx context.Context, messages []co source_chain_selector, dest_chain_selector, ingestion_timestamp - ) VALUES - `) + ) VALUES `) args := make([]any, 0, len(messages)*7) valueClauses := make([]string, 0, len(messages)) for i, msg := range messages { - // Serialize message to JSON messageJSON, err := json.Marshal(msg.Message) if err != nil { - d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opBatchInsertMessages) - return fmt.Errorf("failed to marshal message to JSON at index %d: %w", i, err) + return "", nil, fmt.Errorf("failed to marshal message to JSON at index %d: %w", i, err) } - // Calculate parameter positions for this row baseIdx := i * 7 valueClause := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)", baseIdx+1, baseIdx+2, baseIdx+3, baseIdx+4, baseIdx+5, baseIdx+6, baseIdx+7) valueClauses = append(valueClauses, valueClause) - // Add arguments for this row args = append(args, msg.Message.MustMessageID().String(), messageJSON, @@ -449,7 +432,6 @@ func (d *PostgresStorage) BatchInsertMessages(ctx context.Context, messages []co ) } - // Complete the query with all value clauses and conflict resolution for i, vc := range valueClauses { if i > 0 { query.WriteString(", ") @@ -458,8 +440,26 @@ func (d *PostgresStorage) BatchInsertMessages(ctx context.Context, messages []co } query.WriteString(" ON CONFLICT (message_id) DO NOTHING") - // Execute the batch insert - result, err := d.execContext(ctx, query.String(), args...) 
+ return query.String(), args, nil +} + +// BatchInsertMessages implements common.IndexerStorage. +func (d *PostgresStorage) BatchInsertMessages(ctx context.Context, messages []common.MessageWithMetadata) error { + if len(messages) == 0 { + return nil + } + + startInsertMetric := time.Now() + d.mu.Lock() + defer d.mu.Unlock() + + query, args, err := buildBatchInsertMessagesQuery(messages) + if err != nil { + d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opBatchInsertMessages) + return err + } + + result, err := d.execContext(ctx, query, args...) if err != nil { d.lggr.Errorw("Failed to batch insert messages", "error", err, "count", len(messages)) d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opBatchInsertMessages) @@ -715,38 +715,90 @@ func (d *PostgresStorage) CreateDiscoveryState(ctx context.Context, discoveryLoc return nil } -func (d *PostgresStorage) UpdateDiscoverySequenceNumber(ctx context.Context, discoveryLocation string, sequenceNumber int) error { - startUpdateMetric := time.Now() - d.mu.Lock() - defer d.mu.Unlock() +func (d *PostgresStorage) PersistDiscoveryBatch(ctx context.Context, batch common.DiscoveryBatch) error { + if len(batch.Messages) == 0 && len(batch.Verifications) == 0 { + return nil + } - query := ` - UPDATE indexer.discovery_state - SET last_sequence_number = $1 - WHERE discovery_location = $2 - ` + startMetric := time.Now() - result, err := d.execContext(ctx, query, sequenceNumber, discoveryLocation) - if err != nil { - d.lggr.Errorw("Failed to update discovery state record", "error", err, "discoveryLocation", discoveryLocation, "sequenceNumber", sequenceNumber) - d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opUpdateDiscoverySequenceNumber) - d.monitoring.Metrics().RecordStorageWriteDuration(ctx, time.Since(startUpdateMetric)) - return fmt.Errorf("failed to update discovery state record: %w", err) - } + var verificationsInserted int64 - rowsAffected, err := result.RowsAffected() + err := 
sqlutil.TransactDataSource(ctx, d.ds, nil, func(tx sqlutil.DataSource) error { + if len(batch.Messages) > 0 { + query, args, err := buildBatchInsertMessagesQuery(batch.Messages) + if err != nil { + return err + } + if _, err := tx.ExecContext(ctx, query, args...); err != nil { + return fmt.Errorf("failed to batch insert messages: %w", err) + } + } + + if len(batch.Verifications) > 0 { + query, args, err := buildBatchInsertCCVDataQuery(batch.Verifications) + if err != nil { + return err + } + result, err := tx.ExecContext(ctx, query, args...) + if err != nil { + return fmt.Errorf("failed to batch insert CCV data: %w", err) + } + verificationsInserted, err = result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected for CCV insert: %w", err) + } + } + + if batch.SequenceNumber != common.SequenceNumberNotSupported { + query := `UPDATE indexer.discovery_state SET last_sequence_number = $1 WHERE discovery_location = $2` + result, err := tx.ExecContext(ctx, query, batch.SequenceNumber, batch.DiscoveryLocation) + if err != nil { + return fmt.Errorf("failed to update discovery sequence: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected for discovery sequence: %w", err) + } + if rowsAffected == 0 { + return fmt.Errorf("discovery record not found: %s", batch.DiscoveryLocation) + } + } + + return nil + }) if err != nil { - d.lggr.Warnw("Failed to get rows affected", "error", err) - d.monitoring.Metrics().RecordStorageWriteDuration(ctx, time.Since(startUpdateMetric)) - return fmt.Errorf("failed to get rows affected: %w", err) + d.lggr.Errorw("Failed to persist discovery batch", "error", err) + d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opPersistDiscoveryBatch) + d.monitoring.Metrics().RecordStorageWriteDuration(ctx, time.Since(startMetric)) + return err } - if rowsAffected == 0 { - d.monitoring.Metrics().RecordStorageWriteDuration(ctx, 
time.Since(startUpdateMetric)) - return fmt.Errorf("discovery record not found: %s", discoveryLocation) + if len(batch.Verifications) > 0 { + uniqueMessages := make(map[string]bool) + for _, ccvData := range batch.Verifications { + uniqueMessages[ccvData.VerifierResult.MessageID.String()] = true + } + for messageID := range uniqueMessages { + msgBytes32, err := protocol.NewBytes32FromString(messageID) + if err != nil { + continue + } + if err := d.trackUniqueMessage(ctx, msgBytes32); err != nil { + d.lggr.Warnw("Failed to track unique message", "error", err, "messageID", messageID) + } + } + for range verificationsInserted { + d.monitoring.Metrics().IncrementVerificationRecordsCounter(ctx) + } } - d.monitoring.Metrics().RecordStorageWriteDuration(ctx, time.Since(startUpdateMetric)) + d.lggr.Debugw("Discovery batch persisted", + "messages", len(batch.Messages), + "verifications", len(batch.Verifications), + "sequenceNumber", batch.SequenceNumber, + ) + d.monitoring.Metrics().RecordStorageWriteDuration(ctx, time.Since(startMetric)) return nil } diff --git a/internal/mocks/mock_IndexerStorage.go b/internal/mocks/mock_IndexerStorage.go index 26e041533..b724dab6e 100644 --- a/internal/mocks/mock_IndexerStorage.go +++ b/internal/mocks/mock_IndexerStorage.go @@ -434,6 +434,53 @@ func (_c *MockIndexerStorage_InsertMessage_Call) RunAndReturn(run func(context.C return _c } +// PersistDiscoveryBatch provides a mock function with given fields: ctx, batch +func (_m *MockIndexerStorage) PersistDiscoveryBatch(ctx context.Context, batch common.DiscoveryBatch) error { + ret := _m.Called(ctx, batch) + + if len(ret) == 0 { + panic("no return value specified for PersistDiscoveryBatch") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, common.DiscoveryBatch) error); ok { + r0 = rf(ctx, batch) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIndexerStorage_PersistDiscoveryBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit 
version for method 'PersistDiscoveryBatch' +type MockIndexerStorage_PersistDiscoveryBatch_Call struct { + *mock.Call +} + +// PersistDiscoveryBatch is a helper method to define mock.On call +// - ctx context.Context +// - batch common.DiscoveryBatch +func (_e *MockIndexerStorage_Expecter) PersistDiscoveryBatch(ctx interface{}, batch interface{}) *MockIndexerStorage_PersistDiscoveryBatch_Call { + return &MockIndexerStorage_PersistDiscoveryBatch_Call{Call: _e.mock.On("PersistDiscoveryBatch", ctx, batch)} +} + +func (_c *MockIndexerStorage_PersistDiscoveryBatch_Call) Run(run func(ctx context.Context, batch common.DiscoveryBatch)) *MockIndexerStorage_PersistDiscoveryBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.DiscoveryBatch)) + }) + return _c +} + +func (_c *MockIndexerStorage_PersistDiscoveryBatch_Call) Return(_a0 error) *MockIndexerStorage_PersistDiscoveryBatch_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIndexerStorage_PersistDiscoveryBatch_Call) RunAndReturn(run func(context.Context, common.DiscoveryBatch) error) *MockIndexerStorage_PersistDiscoveryBatch_Call { + _c.Call.Return(run) + return _c +} + // QueryCCVData provides a mock function with given fields: ctx, start, end, sourceChainSelectors, destChainSelectors, limit, offset func (_m *MockIndexerStorage) QueryCCVData(ctx context.Context, start int64, end int64, sourceChainSelectors []protocol.ChainSelector, destChainSelectors []protocol.ChainSelector, limit uint64, offset uint64) (map[string][]common.VerifierResultWithMetadata, error) { ret := _m.Called(ctx, start, end, sourceChainSelectors, destChainSelectors, limit, offset) @@ -562,54 +609,6 @@ func (_c *MockIndexerStorage_QueryMessages_Call) RunAndReturn(run func(context.C return _c } -// UpdateDiscoverySequenceNumber provides a mock function with given fields: ctx, discoveryLocation, sequenceNumber -func (_m *MockIndexerStorage) UpdateDiscoverySequenceNumber(ctx context.Context, 
discoveryLocation string, sequenceNumber int) error { - ret := _m.Called(ctx, discoveryLocation, sequenceNumber) - - if len(ret) == 0 { - panic("no return value specified for UpdateDiscoverySequenceNumber") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, int) error); ok { - r0 = rf(ctx, discoveryLocation, sequenceNumber) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockIndexerStorage_UpdateDiscoverySequenceNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateDiscoverySequenceNumber' -type MockIndexerStorage_UpdateDiscoverySequenceNumber_Call struct { - *mock.Call -} - -// UpdateDiscoverySequenceNumber is a helper method to define mock.On call -// - ctx context.Context -// - discoveryLocation string -// - sequenceNumber int -func (_e *MockIndexerStorage_Expecter) UpdateDiscoverySequenceNumber(ctx interface{}, discoveryLocation interface{}, sequenceNumber interface{}) *MockIndexerStorage_UpdateDiscoverySequenceNumber_Call { - return &MockIndexerStorage_UpdateDiscoverySequenceNumber_Call{Call: _e.mock.On("UpdateDiscoverySequenceNumber", ctx, discoveryLocation, sequenceNumber)} -} - -func (_c *MockIndexerStorage_UpdateDiscoverySequenceNumber_Call) Run(run func(ctx context.Context, discoveryLocation string, sequenceNumber int)) *MockIndexerStorage_UpdateDiscoverySequenceNumber_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(int)) - }) - return _c -} - -func (_c *MockIndexerStorage_UpdateDiscoverySequenceNumber_Call) Return(_a0 error) *MockIndexerStorage_UpdateDiscoverySequenceNumber_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockIndexerStorage_UpdateDiscoverySequenceNumber_Call) RunAndReturn(run func(context.Context, string, int) error) *MockIndexerStorage_UpdateDiscoverySequenceNumber_Call { - _c.Call.Return(run) - return _c -} - // UpdateMessageStatus provides a mock function with given fields: ctx, 
messageID, status, lastErr func (_m *MockIndexerStorage) UpdateMessageStatus(ctx context.Context, messageID protocol.Bytes32, status common.MessageStatus, lastErr string) error { ret := _m.Called(ctx, messageID, status, lastErr) From e608e8c4d3b8d6190831585c2d2bdebc1e472d4e Mon Sep 17 00:00:00 2001 From: "Simon B.Robert" Date: Mon, 16 Feb 2026 15:53:50 -0500 Subject: [PATCH 2/2] Small improvements --- indexer/pkg/storage/postgres.go | 51 +++++++++++---------------------- 1 file changed, 16 insertions(+), 35 deletions(-) diff --git a/indexer/pkg/storage/postgres.go b/indexer/pkg/storage/postgres.go index ea0d055c0..161638fea 100644 --- a/indexer/pkg/storage/postgres.go +++ b/indexer/pkg/storage/postgres.go @@ -246,7 +246,7 @@ func (d *PostgresStorage) InsertCCVData(ctx context.Context, ccvData common.Veri ) if err != nil { d.lggr.Errorw("Failed to insert CCV data", "error", err, "messageID", ccvData.VerifierResult.MessageID.String()) - d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opBatchInsertCCVData) + d.monitoring.Metrics().RecordStorageInsertErrorsCounter(ctx, opInsertCCVData) d.monitoring.Metrics().RecordStorageWriteDuration(ctx, time.Since(startInsertMetric)) return fmt.Errorf("failed to insert CCV data: %w", err) } @@ -264,11 +264,7 @@ func (d *PostgresStorage) InsertCCVData(ctx context.Context, ccvData common.Veri return ErrDuplicateCCVData } - // Check if this is a new unique message - if err := d.trackUniqueMessage(ctx, ccvData.VerifierResult.MessageID); err != nil { - d.lggr.Warnw("Failed to track unique message", "error", err, "messageID", ccvData.VerifierResult.MessageID.String()) - // Don't fail the insert if we can't track the unique message - } + d.trackUniqueMessages(ctx, []common.VerifierResultWithMetadata{ccvData}) // Increment the verification records counter d.monitoring.Metrics().IncrementVerificationRecordsCounter(ctx) @@ -371,20 +367,7 @@ func (d *PostgresStorage) BatchInsertCCVData(ctx context.Context, ccvDataList [] 
d.lggr.Debugw("Batch insert completed", "requested", len(ccvDataList), "inserted", rowsAffected) } - uniqueMessages := make(map[string]bool) - for _, ccvData := range ccvDataList { - uniqueMessages[ccvData.VerifierResult.MessageID.String()] = true - } - - for messageID := range uniqueMessages { - msgBytes32, err := protocol.NewBytes32FromString(messageID) - if err != nil { - continue - } - if err := d.trackUniqueMessage(ctx, msgBytes32); err != nil { - d.lggr.Warnw("Failed to track unique message", "error", err, "messageID", messageID) - } - } + d.trackUniqueMessages(ctx, ccvDataList) for range rowsAffected { d.monitoring.Metrics().IncrementVerificationRecordsCounter(ctx) @@ -775,19 +758,7 @@ func (d *PostgresStorage) PersistDiscoveryBatch(ctx context.Context, batch commo } if len(batch.Verifications) > 0 { - uniqueMessages := make(map[string]bool) - for _, ccvData := range batch.Verifications { - uniqueMessages[ccvData.VerifierResult.MessageID.String()] = true - } - for messageID := range uniqueMessages { - msgBytes32, err := protocol.NewBytes32FromString(messageID) - if err != nil { - continue - } - if err := d.trackUniqueMessage(ctx, msgBytes32); err != nil { - d.lggr.Warnw("Failed to track unique message", "error", err, "messageID", messageID) - } - } + d.trackUniqueMessages(ctx, batch.Verifications) for range verificationsInserted { d.monitoring.Metrics().IncrementVerificationRecordsCounter(ctx) } @@ -822,8 +793,18 @@ func (d *PostgresStorage) GetDiscoverySequenceNumber(ctx context.Context, discov return sequenceNumber, nil } -// trackUniqueMessage checks if this is the first time we're seeing this message ID -// and increments the unique messages counter if so. 
+func (d *PostgresStorage) trackUniqueMessages(ctx context.Context, verifications []common.VerifierResultWithMetadata) { // Best-effort helper: tracking errors are logged below and never returned.
+	seen := make(map[protocol.Bytes32]struct{}, len(verifications)) // Set of the distinct message IDs present in verifications.
+	for _, v := range verifications {
+		seen[v.VerifierResult.MessageID] = struct{}{}
+	}
+	for messageID := range seen { // One trackUniqueMessage call per distinct message ID, in map iteration order.
+		if err := d.trackUniqueMessage(ctx, messageID); err != nil {
+			d.lggr.Warnw("Failed to track unique message", "error", err, "messageID", messageID.String()) // Warn and continue with the remaining IDs.
+		}
+	}
+}
+// trackUniqueMessage checks if this is the first time we're seeing this message ID and increments the unique messages counter if so.
 func (d *PostgresStorage) trackUniqueMessage(ctx context.Context, messageID protocol.Bytes32) error {
 	// Check whether exactly one row exists for this message_id. If so,
 	// this indicates the message was first-seen by the storage insert that preceded this call.