Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions indexer/pkg/common/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,12 @@ type MessageMetadata struct {
IngestionTimestamp time.Time `json:"ingestionTimestamp"`
LastErr string `json:"-"` // we want to exclude potentionally sensitive errors
}

const SequenceNumberNotSupported = -1

type DiscoveryBatch struct {
Messages []MessageWithMetadata
Verifications []VerifierResultWithMetadata
DiscoveryLocation string
SequenceNumber int
}
4 changes: 2 additions & 2 deletions indexer/pkg/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type IndexerStorageWriter interface {
VerifierResultsStorageWriter
MessageStorageWriter
DiscoveryStateWriter
// PersistDiscoveryBatch atomically persists messages, verifications, and the discovery sequence number.
PersistDiscoveryBatch(ctx context.Context, batch DiscoveryBatch) error
}

// VerifierResultsStorageReader provides the interface to retrieve verification results from storage.
Expand Down Expand Up @@ -69,6 +71,4 @@ type DiscoveryStateReader interface {
type DiscoveryStateWriter interface {
// CreateDiscoveryState creates a new record containing metadata about the discovery source.
CreateDiscoveryState(ctx context.Context, discoveryLocation string, startingSequenceNumber int) error
// UpdateDiscoverySequenceNumber updates the latest sequence number for that discovery source.
UpdateDiscoverySequenceNumber(ctx context.Context, discoveryLocation string, sequenceNumber int) error
}
68 changes: 25 additions & 43 deletions indexer/pkg/discovery/message_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,8 @@ func (a *AggregatorMessageDiscovery) validate() error {

func (a *AggregatorMessageDiscovery) Start(ctx context.Context) chan common.VerifierResultWithMetadata {
childCtx, cancelFunc := context.WithCancel(ctx)
a.wg.Add(2)
a.wg.Add(1)
go a.run(childCtx)
go a.updateSequenceNumber(childCtx)
a.cancelFunc = cancelFunc
a.logger.Info("MessageDiscovery Started")

Expand Down Expand Up @@ -184,30 +183,6 @@ func (a *AggregatorMessageDiscovery) run(ctx context.Context) {
}
}

func (a *AggregatorMessageDiscovery) updateSequenceNumber(ctx context.Context) {
defer a.wg.Done()

ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
a.logger.Info("updateSequenceNumber stopped due to context cancellation")
return
case <-ticker.C:
latestSequenceNumber, supports := a.aggregatorReader.GetSinceValue()
if !supports {
a.logger.Warnw("unable to update sequence number as reader does not support this.", "discoveryLocation", a.config.Address)
}

if err := a.storageSink.UpdateDiscoverySequenceNumber(ctx, a.config.Address, int(latestSequenceNumber)); err != nil {
a.logger.Errorf("unable to update sequence number: %w", err)
}
}
}
}

func (a *AggregatorMessageDiscovery) consumeReader(ctx context.Context) {
// We can be in a situation where multiple calls to consumeReader are running concurrently due to the ticker.
// This might happen during high load, or other situations where the ticker is running faster than the reader.
Expand Down Expand Up @@ -290,23 +265,9 @@ func (a *AggregatorMessageDiscovery) callReader(ctx context.Context) (bool, erro
allVerifications = append(allVerifications, verifierResultWithMetadata)
}

// Save all messages we've seen from the discovery source, if we're unable to persist them.
// We'll set the sequence value on the reader back to it's original value.
// This means we won't continue ingesting new messages until these ones are saved.
//
// This ensures that we won't miss a message.
if err := a.storageSink.BatchInsertMessages(ctx, messages); err != nil {
a.logger.Warn("Unable to save messages to storage, will retry")
if ableToSetSinceValue {
a.aggregatorReader.SetSinceValue(startingSequence)
}
return false, err
}

if len(persistedVerifications) > 0 {
err := a.storageSink.BatchInsertCCVData(ctx, persistedVerifications)
if err != nil {
a.logger.Warn("Unable to save verifications to storage, will retry")
if len(messages) > 0 || len(persistedVerifications) > 0 {
if err := a.persistBatch(ctx, messages, persistedVerifications, ableToSetSinceValue); err != nil {
a.logger.Warnw("Unable to persist discovery batch, will retry", "error", err)
if ableToSetSinceValue {
a.aggregatorReader.SetSinceValue(startingSequence)
}
Expand All @@ -327,6 +288,27 @@ func (a *AggregatorMessageDiscovery) callReader(ctx context.Context) (bool, erro
return len(queryResponse) > 0, nil
}

func (a *AggregatorMessageDiscovery) persistBatch(
ctx context.Context,
messages []common.MessageWithMetadata,
verifications []common.VerifierResultWithMetadata,
ableToSetSinceValue bool,
) error {
sequenceNumber := common.SequenceNumberNotSupported
if ableToSetSinceValue {
if currentSequence, supports := a.aggregatorReader.GetSinceValue(); supports {
sequenceNumber = int(currentSequence)
}
}

return a.storageSink.PersistDiscoveryBatch(ctx, common.DiscoveryBatch{
Messages: messages,
Verifications: verifications,
DiscoveryLocation: a.config.Address,
SequenceNumber: sequenceNumber,
})
}

func (a *AggregatorMessageDiscovery) isCircuitBreakerOpen() bool {
return a.aggregatorReader.GetDiscoveryCircuitBreakerState() == circuitbreaker.OpenState
}
Expand Down
Loading
Loading