Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ coverage.txt

# conduit example
cmd/conduit/data

# python virtualenv
.venv
31 changes: 20 additions & 11 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
env:
- DOCKER_NAME=algorand/conduit
- DOCKER_GITHUB_NAME=algorand/conduit

before:
hooks:
Expand Down Expand Up @@ -37,8 +37,8 @@ dockers:
goos: linux
goarch: amd64
image_templates:
- "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-amd64"
- "{{ .Env.DOCKER_NAME }}:{{ .Version }}-amd64"
- "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-amd64"
- "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}-amd64"
build_flag_templates:
- --platform=linux/amd64
- --label=org.opencontainers.image.title={{ .ProjectName }}
Expand All @@ -52,8 +52,8 @@ dockers:
goos: linux
goarch: arm64
image_templates:
- "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-arm64"
- "{{ .Env.DOCKER_NAME }}:{{ .Version }}-arm64"
- "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-arm64"
- "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}-arm64"
build_flag_templates:
- --platform=linux/arm64
- --label=org.opencontainers.image.title={{ .ProjectName }}
Expand All @@ -66,14 +66,14 @@ dockers:

# automatically select amd64/arm64 when using image.
docker_manifests:
- name_template: "{{ .Env.DOCKER_NAME }}:{{ .Version }}"
- name_template: "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}"
image_templates:
- "{{ .Env.DOCKER_NAME }}:{{ .Version }}-amd64"
- "{{ .Env.DOCKER_NAME }}:{{ .Version }}-arm64"
- name_template: "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}"
- "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}-amd64"
- "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}-arm64"
- name_template: "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}"
image_templates:
- "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-amd64"
- "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-arm64"
- "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-amd64"
- "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-arm64"

archives:
- name_template: >-
Expand All @@ -94,3 +94,12 @@ changelog:
- '^chore:'
- '^docs:'
- '^test:'

release:
draft: true
header: |
![GitHub Logo](https://raw.githubusercontent.com/algorand/go-algorand/master/release/release-banner.jpg)
footer: |
**Full Changelog**: https://github.com/{{ .Env.DOCKER_GITHUB_NAME }}/compare{{ .PreviousTag }}...{{ .Tag }}
---
[Docker images for this release are available on Docker Hub.](https://hub.docker.com/r/algorand/conduit)
10 changes: 5 additions & 5 deletions conduit/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type pluginOutput interface {
pluginInput | empty
}

// Retries is a wrapper for retrying a function call f() with a cancellation context,
// retries is a wrapper for retrying a function call f() with a cancellation context,
// a delay and a max retry count. It attempts to call the wrapped function at least once
// and only after the first attempt will pay attention to a context cancellation.
// This can allow the pipeline to receive a cancellation and guarantee attempting to finish
Expand All @@ -45,7 +45,7 @@ type pluginOutput interface {
// - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries
// - when p.cfg.retryCount == 0, the error will be the last error encountered
// - the returned duration dur is the total time spent in the function, including retries
func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
func retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
start := time.Now()

for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ {
Expand Down Expand Up @@ -74,9 +74,9 @@ func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipe
return
}

// RetriesNoOutput applies the same logic as Retries, but for functions that return no output.
func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := Retries(func(x X) (empty, error) {
// retriesNoOutput applies the same logic as Retries, but for functions that return no output.
func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := retries(func(x X) (empty, error) {
return empty{}, f(x)
}, a, p, msg)
return d, err
Expand Down
16 changes: 8 additions & 8 deletions conduit/pipeline/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func TestRetries(t *testing.T) {
for _, tc := range cases {
tc := tc

// run cases for Retries()
t.Run("Retries() "+tc.name, func(t *testing.T) {
// run cases for retries()
t.Run("retries() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
Expand All @@ -127,7 +127,7 @@ func TestRetries(t *testing.T) {
yChan := make(chan uint64)
errChan := make(chan error)
go func() {
y, _, err := Retries(succeedAfter, 0, p, "test")
y, _, err := retries(succeedAfter, 0, p, "test")
yChan <- y
errChan <- err
}()
Expand All @@ -144,7 +144,7 @@ func TestRetries(t *testing.T) {
return
}

y, _, err := Retries(succeedAfter, 0, p, "test")
y, _, err := retries(succeedAfter, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)

Expand All @@ -163,8 +163,8 @@ func TestRetries(t *testing.T) {
}
})

// run cases for RetriesNoOutput()
t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) {
// run cases for retriesNoOutput()
t.Run("retriesNoOutput() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
Expand All @@ -183,7 +183,7 @@ func TestRetries(t *testing.T) {

errChan := make(chan error)
go func() {
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
_, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test")
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
Expand All @@ -197,7 +197,7 @@ func TestRetries(t *testing.T) {
return
}

_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
_, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)
} else { // retryCount > 0 so doesn't retry forever
Expand Down
15 changes: 6 additions & 9 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (p *pipelineImpl) importerHandler(importer importers.Importer, roundChan <-
totalSelectWait += waitTime
p.logger.Tracef("importer handler waited %dms to receive round %d", waitTime.Milliseconds(), rnd)

blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
blkData, importTime, lastError := retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
if lastError != nil {
p.cancelWithProblem(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError))
return
Expand Down Expand Up @@ -533,7 +533,7 @@ func (p *pipelineImpl) processorHandler(idx int, proc processors.Processor, blkI

var procTime time.Duration
var lastError error
blk, procTime, lastError = Retries(proc.Process, blk, p, proc.Metadata().Name)
blk, procTime, lastError = retries(proc.Process, blk, p, proc.Metadata().Name)
if lastError != nil {
p.cancelWithProblem(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError))
return
Expand Down Expand Up @@ -598,7 +598,7 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug
}

var exportTime time.Duration
exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName)
exportTime, lastError = retriesNoOutput(exporter.Receive, blk, p, eName)
if lastError != nil {
lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRound, lastError)
return
Expand Down Expand Up @@ -640,16 +640,15 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug
// WARNING: removing/re-log-levelling the following will BREAK:
// - the E2E test (Search for "Pipeline round" in subslurp.py)
// - the internal tools logstats collector (See func ConduitCollector in logstats.go of internal-tools repo)
p.logger.Infof(logstatsE2Elog(nextRound, lastRound, len(blk.Payset), exportTime))
p.logger.Infof(logstatsE2Elog(lastRound, len(blk.Payset), exportTime))
}
}
}()
}

func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string {
func logstatsE2Elog(lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string {
return fmt.Sprintf(
"UPDATED Pipeline NextRound=%d. FINISHED Pipeline round r=%d (%d txn) exported in %s",
nextRound,
"FINISHED Pipeline round r=%d (%d txn) exported in %s",
lastRound,
topLevelTxnCount,
exportTime,
Expand Down Expand Up @@ -696,8 +695,6 @@ func (p *pipelineImpl) Start() {
}
}
}(p.pipelineMetadata.NextRound)

<-p.ctx.Done()
}

func (p *pipelineImpl) Wait() {
Expand Down
4 changes: 2 additions & 2 deletions conduit/pipeline/pipeline_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
)

const (
logLevel = log.ErrorLevel // log.DebugLevel // log.InfoLevel // log.TraceLevel //
retryCount = 3 // math.MaxUint64
logLevel = log.ErrorLevel
retryCount = 3
)

type sleepingImporter struct {
Expand Down
5 changes: 2 additions & 3 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,13 +990,12 @@ func TestMetrics(t *testing.T) {
}

func TestLogStatsE2Elog(t *testing.T) {
nextRound := uint64(1337)
round := uint64(42)
numTxns := 13
duration := 12345600 * time.Microsecond

expectedLog := "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
log := logstatsE2Elog(nextRound, round, numTxns, duration)
expectedLog := "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
log := logstatsE2Elog(round, numTxns, duration)
require.Equal(t, expectedLog, log)

logstatsRex, err := regexp.Compile(`round r=(\d+) \((\d+) txn\) exported in (.*)`)
Expand Down
2 changes: 1 addition & 1 deletion conduit/plugins/exporters/filewriter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Write block data to files. This plugin works with the file rerader plugin to create a simple file-based pipeine.

The genesis is always exported to a plain JSON file named `genesis.json` regardless of the `FilenamePattern`.
The genesis file is always exported to a plain JSON file named `genesis.json` regardless of the `FilenamePattern`.

## Configuration

Expand Down
6 changes: 4 additions & 2 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ func getConfigWithPattern(t *testing.T, pattern string) (config, tempdir string)
}

func TestDefaults(t *testing.T) {
require.Equal(t, defaultEncodingFormat, MessagepackFormat)
require.Equal(t, defaultIsGzip, true)
format, gzip, err := ParseFilenamePattern(FilePattern)
require.NoError(t, err)
require.Equal(t, format, defaultEncodingFormat)
require.Equal(t, gzip, defaultIsGzip)
}

func TestExporterMetadata(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/filewriter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ParseFilenamePattern(pattern string) (EncodingFormat, bool, error) {
return blockFormat, gzip, nil
}

// EncodeToFile enocods an object to a file using a given format and possible gzip compression.
// EncodeToFile encodes an object to a file using a given format and possible gzip compression.
func EncodeToFile(filename string, v interface{}, format EncodingFormat, isGzip bool) error {
file, err := os.Create(filename)
if err != nil {
Expand Down Expand Up @@ -91,7 +91,7 @@ func Encode(format EncodingFormat, writer io.Writer, v interface{}) error {
case MessagepackFormat:
handle = msgpack.LenientCodecHandle
default:
return fmt.Errorf("EncodeToFile(): unhandled format %d", format)
return fmt.Errorf("Encode(): unhandled format %d", format)
}
return codec.NewEncoder(writer, handle).Encode(v)
}
Expand Down
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/postgresql/postgresql_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginCon
if err := cfg.UnmarshalConfig(&eCfg); err != nil {
return nil, nil, eCfg, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
}
logger.Debugf("createIndexerDB: eCfg.Delete=%+v", eCfg.Delete)
logger.Debugf("createIndexerDB: eCfg.MaxConns=%d, eCfg.Test=%t, eCfg.Delete=%+v", eCfg.MaxConns, eCfg.Test, eCfg.Delete)

// Inject a dummy db for unit testing
dbName := "postgres"
if eCfg.Test {
dbName = "dummy"
}
var opts idb.IndexerDbOptions
opts.MaxConn = eCfg.MaxConn
opts.MaxConns = eCfg.MaxConns
opts.ReadOnly = readonly

// for some reason when ConnectionString is empty, it's automatically
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ type ExporterConfig struct {
See https://github.com/jackc/pgconn for more details
*/
ConnectionString string `yaml:"connection-string"`
/* <code>max-conn</code> specifies the maximum connection number for the connection pool.<br/>
/* <code>max-conns</code> specifies the maximum connection number for the connection pool.<br/>
This means the total number of active queries that can be running concurrently can never be more than this.
*/
MaxConn uint32 `yaml:"max-conn"`

MaxConns int32 `yaml:"max-conns"`
/* <code>test</code> will replace an actual DB connection being created via the connection string,
with a mock DB for unit testing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestUnmarshalConfigsContainingDeleteTask(t *testing.T) {
pgsqlExp := postgresqlExporter{}
ecfg := ExporterConfig{
ConnectionString: "",
MaxConn: 0,
MaxConns: 0,
Test: true,
Delete: util.PruneConfigurations{
Rounds: 3000,
Expand All @@ -133,7 +133,7 @@ func TestUnmarshalConfigsContainingDeleteTask(t *testing.T) {
pgsqlExp := postgresqlExporter{}
cfg := ExporterConfig{
ConnectionString: "",
MaxConn: 0,
MaxConns: 0,
Test: true,
Delete: util.PruneConfigurations{},
}
Expand All @@ -152,7 +152,7 @@ func TestUnmarshalConfigsContainingDeleteTask(t *testing.T) {
pgsqlExp := postgresqlExporter{}
cfg := ExporterConfig{
ConnectionString: "",
MaxConn: 0,
MaxConns: 0,
Test: true,
Delete: util.PruneConfigurations{
Rounds: 1,
Expand Down
1 change: 1 addition & 0 deletions conduit/plugins/importers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ import (
// Call package wide init function
_ "github.com/algorand/conduit/conduit/plugins/importers/algod"
_ "github.com/algorand/conduit/conduit/plugins/importers/filereader"
_ "github.com/algorand/conduit/conduit/plugins/importers/noop"
)
10 changes: 5 additions & 5 deletions conduit/plugins/importers/filereader/filereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ func init() {
}

func TestDefaults(t *testing.T) {
require.Equal(t, defaultEncodingFormat, filewriter.MessagepackFormat)
require.Equal(t, defaultIsGzip, true)
format, gzip, err := filewriter.ParseFilenamePattern(filewriter.FilePattern)
require.NoError(t, err)
require.Equal(t, format, defaultEncodingFormat)
require.Equal(t, gzip, defaultIsGzip)
}

func TestImporterorterMetadata(t *testing.T) {
Expand All @@ -64,9 +66,7 @@ func initializeTestData(t *testing.T, dir string, numRounds int) sdk.Genesis {
Timestamp: 1234,
}

genesisFilename := filewriter.GenesisFilename

err := filewriter.EncodeToFile(path.Join(dir, genesisFilename), genesisA, filewriter.JSONFormat, false)
err := filewriter.EncodeToFile(path.Join(dir, filewriter.GenesisFilename), genesisA, filewriter.JSONFormat, false)
require.NoError(t, err)

for i := 0; i < numRounds; i++ {
Expand Down
Loading