diff --git a/.gitignore b/.gitignore index e138c70f..5ed84a96 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,6 @@ coverage.txt # conduit example cmd/conduit/data + +# python virtualenv +.venv diff --git a/.goreleaser.yaml b/.goreleaser.yaml index a7612a90..6577d50c 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -1,5 +1,5 @@ env: - - DOCKER_NAME=algorand/conduit + - DOCKER_GITHUB_NAME=algorand/conduit before: hooks: @@ -37,8 +37,8 @@ dockers: goos: linux goarch: amd64 image_templates: - - "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-amd64" - - "{{ .Env.DOCKER_NAME }}:{{ .Version }}-amd64" + - "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-amd64" + - "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}-amd64" build_flag_templates: - --platform=linux/amd64 - --label=org.opencontainers.image.title={{ .ProjectName }} @@ -52,8 +52,8 @@ dockers: goos: linux goarch: arm64 image_templates: - - "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-arm64" - - "{{ .Env.DOCKER_NAME }}:{{ .Version }}-arm64" + - "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-arm64" + - "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}-arm64" build_flag_templates: - --platform=linux/arm64 - --label=org.opencontainers.image.title={{ .ProjectName }} @@ -66,14 +66,14 @@ dockers: # automatically select amd64/arm64 when using image. 
docker_manifests: - - name_template: "{{ .Env.DOCKER_NAME }}:{{ .Version }}" + - name_template: "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}" image_templates: - - "{{ .Env.DOCKER_NAME }}:{{ .Version }}-amd64" - - "{{ .Env.DOCKER_NAME }}:{{ .Version }}-arm64" - - name_template: "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}" + - "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}-amd64" + - "{{ .Env.DOCKER_GITHUB_NAME }}:{{ .Version }}-arm64" + - name_template: "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}" image_templates: - - "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-amd64" - - "{{ .Env.DOCKER_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-arm64" + - "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-amd64" + - "{{ .Env.DOCKER_GITHUB_NAME }}:latest{{ if .IsSnapshot }}-snapshot{{ end }}-arm64" archives: - name_template: >- @@ -94,3 +94,12 @@ changelog: - '^chore:' - '^docs:' - '^test:' + +release: + draft: true + header: | + ![GitHub Logo](https://raw.githubusercontent.com/algorand/go-algorand/master/release/release-banner.jpg) + footer: | + **Full Changelog**: https://github.com/{{ .Env.DOCKER_GITHUB_NAME }}/compare/{{ .PreviousTag }}...{{ .Tag }} + --- + [Docker images for this release are available on Docker Hub.](https://hub.docker.com/r/algorand/conduit) diff --git a/conduit/pipeline/common.go b/conduit/pipeline/common.go index 4c31a3ff..d579a714 100644 --- a/conduit/pipeline/common.go +++ b/conduit/pipeline/common.go @@ -27,7 +27,7 @@ type pluginOutput interface { pluginInput | empty } -// Retries is a wrapper for retrying a function call f() with a cancellation context, +// retries is a wrapper for retrying a function call f() with a cancellation context, // a delay and a max retry count. It attempts to call the wrapped function at least once // and only after the first attempt will pay attention to a context cancellation. 
// This can allow the pipeline to receive a cancellation and guarantee attempting to finish @@ -45,7 +45,7 @@ type pluginOutput interface { // - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries // - when p.cfg.retryCount == 0, the error will be the last error encountered // - the returned duration dur is the total time spent in the function, including retries -func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) { +func retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) { start := time.Now() for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ { @@ -74,9 +74,9 @@ func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipe return } -// RetriesNoOutput applies the same logic as Retries, but for functions that return no output. -func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { - _, d, err := Retries(func(x X) (empty, error) { +// retriesNoOutput applies the same logic as retries, but for functions that return no output. 
+func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { + _, d, err := retries(func(x X) (empty, error) { return empty{}, f(x) }, a, p, msg) return d, err diff --git a/conduit/pipeline/common_test.go b/conduit/pipeline/common_test.go index cd9dcd4a..164cdcb9 100644 --- a/conduit/pipeline/common_test.go +++ b/conduit/pipeline/common_test.go @@ -106,8 +106,8 @@ func TestRetries(t *testing.T) { for _, tc := range cases { tc := tc - // run cases for Retries() - t.Run("Retries() "+tc.name, func(t *testing.T) { + // run cases for retries() + t.Run("retries() "+tc.name, func(t *testing.T) { t.Parallel() ctx, ccf := context.WithCancelCause(context.Background()) p := &pipelineImpl{ @@ -127,7 +127,7 @@ func TestRetries(t *testing.T) { yChan := make(chan uint64) errChan := make(chan error) go func() { - y, _, err := Retries(succeedAfter, 0, p, "test") + y, _, err := retries(succeedAfter, 0, p, "test") yChan <- y errChan <- err }() @@ -144,7 +144,7 @@ func TestRetries(t *testing.T) { return } - y, _, err := Retries(succeedAfter, 0, p, "test") + y, _, err := retries(succeedAfter, 0, p, "test") if tc.retryCount == 0 { // WLOG tc.neverSucceed == false require.NoError(t, err, tc.name) @@ -163,8 +163,8 @@ func TestRetries(t *testing.T) { } }) - // run cases for RetriesNoOutput() - t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) { + // run cases for retriesNoOutput() + t.Run("retriesNoOutput() "+tc.name, func(t *testing.T) { t.Parallel() ctx, ccf := context.WithCancelCause(context.Background()) p := &pipelineImpl{ @@ -183,7 +183,7 @@ func TestRetries(t *testing.T) { errChan := make(chan error) go func() { - _, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") + _, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test") errChan <- err }() time.Sleep(5 * time.Millisecond) @@ -197,7 +197,7 @@ func TestRetries(t *testing.T) { return } - _, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") + _, err := 
retriesNoOutput(succeedAfterNoOutput, 0, p, "test") if tc.retryCount == 0 { // WLOG tc.neverSucceed == false require.NoError(t, err, tc.name) } else { // retryCount > 0 so doesn't retry forever diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 61e95da6..5a1adf43 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -481,7 +481,7 @@ func (p *pipelineImpl) importerHandler(importer importers.Importer, roundChan <- totalSelectWait += waitTime p.logger.Tracef("importer handler waited %dms to receive round %d", waitTime.Milliseconds(), rnd) - blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name) + blkData, importTime, lastError := retries(importer.GetBlock, rnd, p, importer.Metadata().Name) if lastError != nil { p.cancelWithProblem(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError)) return @@ -533,7 +533,7 @@ func (p *pipelineImpl) processorHandler(idx int, proc processors.Processor, blkI var procTime time.Duration var lastError error - blk, procTime, lastError = Retries(proc.Process, blk, p, proc.Metadata().Name) + blk, procTime, lastError = retries(proc.Process, blk, p, proc.Metadata().Name) if lastError != nil { p.cancelWithProblem(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError)) return @@ -598,7 +598,7 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug } var exportTime time.Duration - exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName) + exportTime, lastError = retriesNoOutput(exporter.Receive, blk, p, eName) if lastError != nil { lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRound, lastError) return @@ -640,16 +640,15 @@ func (p *pipelineImpl) 
exporterHandler(exporter exporters.Exporter, blkChan plug // WARNING: removing/re-log-levelling the following will BREAK: // - the E2E test (Search for "Pipeline round" in subslurp.py) // - the internal tools logstats collector (See func ConduitCollector in logstats.go of internal-tools repo) - p.logger.Infof(logstatsE2Elog(nextRound, lastRound, len(blk.Payset), exportTime)) + p.logger.Infof(logstatsE2Elog(lastRound, len(blk.Payset), exportTime)) } } }() } -func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string { +func logstatsE2Elog(lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string { return fmt.Sprintf( - "UPDATED Pipeline NextRound=%d. FINISHED Pipeline round r=%d (%d txn) exported in %s", - nextRound, + "FINISHED Pipeline round r=%d (%d txn) exported in %s", lastRound, topLevelTxnCount, exportTime, @@ -696,8 +695,6 @@ func (p *pipelineImpl) Start() { } } }(p.pipelineMetadata.NextRound) - - <-p.ctx.Done() } func (p *pipelineImpl) Wait() { diff --git a/conduit/pipeline/pipeline_bench_test.go b/conduit/pipeline/pipeline_bench_test.go index 5b48bc7f..f0aef54a 100644 --- a/conduit/pipeline/pipeline_bench_test.go +++ b/conduit/pipeline/pipeline_bench_test.go @@ -18,8 +18,8 @@ import ( ) const ( - logLevel = log.ErrorLevel // log.DebugLevel // log.InfoLevel // log.TraceLevel // - retryCount = 3 // math.MaxUint64 + logLevel = log.ErrorLevel + retryCount = 3 ) type sleepingImporter struct { diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index ea8db42c..eac08f84 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -990,13 +990,12 @@ func TestMetrics(t *testing.T) { } func TestLogStatsE2Elog(t *testing.T) { - nextRound := uint64(1337) round := uint64(42) numTxns := 13 duration := 12345600 * time.Microsecond - expectedLog := "UPDATED Pipeline NextRound=1337. 
FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" - log := logstatsE2Elog(nextRound, round, numTxns, duration) + expectedLog := "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" + log := logstatsE2Elog(round, numTxns, duration) require.Equal(t, expectedLog, log) logstatsRex, err := regexp.Compile(`round r=(\d+) \((\d+) txn\) exported in (.*)`) diff --git a/conduit/plugins/exporters/filewriter/README.md b/conduit/plugins/exporters/filewriter/README.md index 524aef31..c91e5c6e 100644 --- a/conduit/plugins/exporters/filewriter/README.md +++ b/conduit/plugins/exporters/filewriter/README.md @@ -2,7 +2,7 @@ Write block data to files. This plugin works with the file rerader plugin to create a simple file-based pipeine. -The genesis is always exported to a plain JSON file named `genesis.json` regardless of the `FilenamePattern`. +The genesis file is always exported to a plain JSON file named `genesis.json` regardless of the `FilenamePattern`. ## Configuration diff --git a/conduit/plugins/exporters/filewriter/file_exporter_test.go b/conduit/plugins/exporters/filewriter/file_exporter_test.go index 3c12ead7..9ee054f6 100644 --- a/conduit/plugins/exporters/filewriter/file_exporter_test.go +++ b/conduit/plugins/exporters/filewriter/file_exporter_test.go @@ -49,8 +49,10 @@ func getConfigWithPattern(t *testing.T, pattern string) (config, tempdir string) } func TestDefaults(t *testing.T) { - require.Equal(t, defaultEncodingFormat, MessagepackFormat) - require.Equal(t, defaultIsGzip, true) + format, gzip, err := ParseFilenamePattern(FilePattern) + require.NoError(t, err) + require.Equal(t, format, defaultEncodingFormat) + require.Equal(t, gzip, defaultIsGzip) } func TestExporterMetadata(t *testing.T) { diff --git a/conduit/plugins/exporters/filewriter/util.go b/conduit/plugins/exporters/filewriter/util.go index a552baf4..0e787ecd 100644 --- a/conduit/plugins/exporters/filewriter/util.go +++ b/conduit/plugins/exporters/filewriter/util.go @@ -61,7 +61,7 @@ func 
ParseFilenamePattern(pattern string) (EncodingFormat, bool, error) { return blockFormat, gzip, nil } -// EncodeToFile enocods an object to a file using a given format and possible gzip compression. +// EncodeToFile encodes an object to a file using a given format and possible gzip compression. func EncodeToFile(filename string, v interface{}, format EncodingFormat, isGzip bool) error { file, err := os.Create(filename) if err != nil { @@ -91,7 +91,7 @@ func Encode(format EncodingFormat, writer io.Writer, v interface{}) error { case MessagepackFormat: handle = msgpack.LenientCodecHandle default: - return fmt.Errorf("EncodeToFile(): unhandled format %d", format) + return fmt.Errorf("Encode(): unhandled format %d", format) } return codec.NewEncoder(writer, handle).Encode(v) } diff --git a/conduit/plugins/exporters/postgresql/postgresql_exporter.go b/conduit/plugins/exporters/postgresql/postgresql_exporter.go index ad8ee82b..c701b6d1 100644 --- a/conduit/plugins/exporters/postgresql/postgresql_exporter.go +++ b/conduit/plugins/exporters/postgresql/postgresql_exporter.go @@ -59,7 +59,7 @@ func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginCon if err := cfg.UnmarshalConfig(&eCfg); err != nil { return nil, nil, eCfg, fmt.Errorf("connect failure in unmarshalConfig: %v", err) } - logger.Debugf("createIndexerDB: eCfg.Delete=%+v", eCfg.Delete) + logger.Debugf("createIndexerDB: eCfg.MaxConns=%d, eCfg.Test=%t, eCfg.Delete=%+v", eCfg.MaxConns, eCfg.Test, eCfg.Delete) // Inject a dummy db for unit testing dbName := "postgres" @@ -67,7 +67,7 @@ func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginCon dbName = "dummy" } var opts idb.IndexerDbOptions - opts.MaxConn = eCfg.MaxConn + opts.MaxConns = eCfg.MaxConns opts.ReadOnly = readonly // for some reason when ConnectionString is empty, it's automatically diff --git a/conduit/plugins/exporters/postgresql/postgresql_exporter_config.go 
b/conduit/plugins/exporters/postgresql/postgresql_exporter_config.go index 4ccb4530..6a0212bd 100644 --- a/conduit/plugins/exporters/postgresql/postgresql_exporter_config.go +++ b/conduit/plugins/exporters/postgresql/postgresql_exporter_config.go @@ -17,10 +17,11 @@ type ExporterConfig struct { See https://github.com/jackc/pgconn for more details */ ConnectionString string `yaml:"connection-string"` - /* max-conn specifies the maximum connection number for the connection pool.
+ /* max-conns specifies the maximum connection number for the connection pool.
This means the total number of active queries that can be running concurrently can never be more than this. */ - MaxConn uint32 `yaml:"max-conn"` + + MaxConns int32 `yaml:"max-conns"` /* test will replace an actual DB connection being created via the connection string, with a mock DB for unit testing. */ diff --git a/conduit/plugins/exporters/postgresql/postgresql_exporter_test.go b/conduit/plugins/exporters/postgresql/postgresql_exporter_test.go index c1bdd801..11c1b884 100644 --- a/conduit/plugins/exporters/postgresql/postgresql_exporter_test.go +++ b/conduit/plugins/exporters/postgresql/postgresql_exporter_test.go @@ -111,7 +111,7 @@ func TestUnmarshalConfigsContainingDeleteTask(t *testing.T) { pgsqlExp := postgresqlExporter{} ecfg := ExporterConfig{ ConnectionString: "", - MaxConn: 0, + MaxConns: 0, Test: true, Delete: util.PruneConfigurations{ Rounds: 3000, @@ -133,7 +133,7 @@ func TestUnmarshalConfigsContainingDeleteTask(t *testing.T) { pgsqlExp := postgresqlExporter{} cfg := ExporterConfig{ ConnectionString: "", - MaxConn: 0, + MaxConns: 0, Test: true, Delete: util.PruneConfigurations{}, } @@ -152,7 +152,7 @@ func TestUnmarshalConfigsContainingDeleteTask(t *testing.T) { pgsqlExp := postgresqlExporter{} cfg := ExporterConfig{ ConnectionString: "", - MaxConn: 0, + MaxConns: 0, Test: true, Delete: util.PruneConfigurations{ Rounds: 1, diff --git a/conduit/plugins/importers/all/all.go b/conduit/plugins/importers/all/all.go index 7222c549..a6450a47 100644 --- a/conduit/plugins/importers/all/all.go +++ b/conduit/plugins/importers/all/all.go @@ -4,4 +4,5 @@ import ( // Call package wide init function _ "github.com/algorand/conduit/conduit/plugins/importers/algod" _ "github.com/algorand/conduit/conduit/plugins/importers/filereader" + _ "github.com/algorand/conduit/conduit/plugins/importers/noop" ) diff --git a/conduit/plugins/importers/filereader/filereader_test.go b/conduit/plugins/importers/filereader/filereader_test.go index b7e7e1dd..09a1ff57 100644 --- 
a/conduit/plugins/importers/filereader/filereader_test.go +++ b/conduit/plugins/importers/filereader/filereader_test.go @@ -40,8 +40,10 @@ func init() { } func TestDefaults(t *testing.T) { - require.Equal(t, defaultEncodingFormat, filewriter.MessagepackFormat) - require.Equal(t, defaultIsGzip, true) + format, gzip, err := filewriter.ParseFilenamePattern(filewriter.FilePattern) + require.NoError(t, err) + require.Equal(t, format, defaultEncodingFormat) + require.Equal(t, gzip, defaultIsGzip) } func TestImporterorterMetadata(t *testing.T) { @@ -64,9 +66,7 @@ func initializeTestData(t *testing.T, dir string, numRounds int) sdk.Genesis { Timestamp: 1234, } - genesisFilename := filewriter.GenesisFilename - - err := filewriter.EncodeToFile(path.Join(dir, genesisFilename), genesisA, filewriter.JSONFormat, false) + err := filewriter.EncodeToFile(path.Join(dir, filewriter.GenesisFilename), genesisA, filewriter.JSONFormat, false) require.NoError(t, err) for i := 0; i < numRounds; i++ { diff --git a/conduit/plugins/importers/noop/noop_importer.go b/conduit/plugins/importers/noop/noop_importer.go new file mode 100644 index 00000000..8fb429e3 --- /dev/null +++ b/conduit/plugins/importers/noop/noop_importer.go @@ -0,0 +1,81 @@ +package noop + +import ( + "context" + _ "embed" // used to embed config + "fmt" + "time" + + "github.com/sirupsen/logrus" + + sdk "github.com/algorand/go-algorand-sdk/v2/types" + + "github.com/algorand/conduit/conduit/data" + "github.com/algorand/conduit/conduit/plugins" + "github.com/algorand/conduit/conduit/plugins/importers" +) + +// PluginName to use when configuring. +var PluginName = "noop" + +const sleepForGetBlock = 100 * time.Millisecond + +// `noopImporter`s will function without ever erroring. This means they will also process out of order blocks +// which may or may not be desirable for different use cases--it can hide errors in actual importers expecting in order +// block processing. 
+// The `noopImporter` will maintain `Round` state according to the round of the last block it processed. +// It also sleeps 100 milliseconds between blocks to slow down the pipeline. +type noopImporter struct { + round uint64 + cfg ImporterConfig +} + +//go:embed sample.yaml +var sampleConfig string + +var metadata = plugins.Metadata{ + Name: PluginName, + Description: "noop importer", + Deprecated: false, + SampleConfig: sampleConfig, +} + +func (imp *noopImporter) Metadata() plugins.Metadata { + return metadata +} + +func (imp *noopImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error { + if err := cfg.UnmarshalConfig(&imp.cfg); err != nil { + return fmt.Errorf("init failure in unmarshalConfig: %v", err) + } + imp.round = imp.cfg.Round + return nil +} + +func (imp *noopImporter) Close() error { + return nil +} + +func (imp *noopImporter) GetGenesis() (*sdk.Genesis, error) { + return &sdk.Genesis{}, nil +} + +func (imp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) { + time.Sleep(sleepForGetBlock) + imp.round = rnd + return data.BlockData{ + BlockHeader: sdk.BlockHeader{ + Round: sdk.Round(rnd), + }, + }, nil +} + +func (imp *noopImporter) Round() uint64 { + return imp.round +} + +func init() { + importers.Register(PluginName, importers.ImporterConstructorFunc(func() importers.Importer { + return &noopImporter{} + })) +} diff --git a/conduit/plugins/importers/noop/noop_importer_config.go b/conduit/plugins/importers/noop/noop_importer_config.go new file mode 100644 index 00000000..f49964e5 --- /dev/null +++ b/conduit/plugins/importers/noop/noop_importer_config.go @@ -0,0 +1,7 @@ +package noop + +// ImporterConfig specific to the noop importer +type ImporterConfig struct { + // Optionally specify the round to start on + Round uint64 `yaml:"round"` +} diff --git a/conduit/plugins/importers/noop/sample.yaml b/conduit/plugins/importers/noop/sample.yaml new file mode 100644 index 00000000..a4e99563 --- 
/dev/null +++ b/conduit/plugins/importers/noop/sample.yaml @@ -0,0 +1,3 @@ +name: noop +# noop has no config +config: diff --git a/e2e_tests/src/e2e_conduit/subslurp.py b/e2e_tests/src/e2e_conduit/subslurp.py index 796982ce..97598394 100644 --- a/e2e_tests/src/e2e_conduit/subslurp.py +++ b/e2e_tests/src/e2e_conduit/subslurp.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) # Matches conduit log output: -# "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" +# "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s" FINISH_ROUND: re.Pattern = re.compile(b"FINISHED Pipeline round r=(\d+)") diff --git a/examples/Makefile b/examples/Makefile new file mode 100644 index 00000000..30ac0406 --- /dev/null +++ b/examples/Makefile @@ -0,0 +1,10 @@ +CDATA = pypolars_data + +conduit: clean build + ../cmd/conduit/conduit -d $(CDATA) + +build: + cd .. && make + +clean: + rm -f $(CDATA)/metadata.json \ No newline at end of file diff --git a/go.mod b/go.mod index 92a65b89..88d87d4f 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/algorand/conduit go 1.20 +replace github.com/algorand/indexer/v3 => github.com/algorand/indexer/v3 v3.1.1-0.20230905234129-74a692078e66 + require ( github.com/algorand/go-algorand-sdk/v2 v2.2.0 github.com/algorand/go-codec/codec v1.1.10 diff --git a/go.sum b/go.sum index 14295a01..28ea684d 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ github.com/algorand/go-algorand-sdk/v2 v2.2.0 h1:zWwK+k/WArtZJUSkDXTDj4a0GUik2iO github.com/algorand/go-algorand-sdk/v2 v2.2.0/go.mod h1:+3+4EZmMUcQk6bgmtC5Ic5kKZE/g6SmfiW098tYLkPE= github.com/algorand/go-codec/codec v1.1.10 h1:zmWYU1cp64jQVTOG8Tw8wa+k0VfwgXIPbnDfiVa+5QA= github.com/algorand/go-codec/codec v1.1.10/go.mod h1:YkEx5nmr/zuCeaDYOIhlDg92Lxju8tj2d2NrYqP7g7k= -github.com/algorand/indexer/v3 v3.0.0 h1:FxQVt1KdwvJrKUAhJPeo+YAOygnJzgjKT8MUEawH+zc= -github.com/algorand/indexer/v3 v3.0.0/go.mod h1:P+RpgLu0lR/6RT8ZwspLHBNKVeAMzwRfCSMVsfiwf40= 
+github.com/algorand/indexer/v3 v3.1.1-0.20230905234129-74a692078e66 h1:Bl8WhnudNAoV+69+XWwadDRzPP+HY50qe2gbc9LRPAo= +github.com/algorand/indexer/v3 v3.1.1-0.20230905234129-74a692078e66/go.mod h1:DZ4i4kpH8CJDlK2bqugSfC+FbVMdf81go7nDsqJpchI= github.com/algorand/oapi-codegen v1.12.0-algorand.0 h1:W9PvED+wAJc+9EeXPONnA+0zE9UhynEqoDs4OgAxKhk= github.com/algorand/oapi-codegen v1.12.0-algorand.0/go.mod h1:tIWJ9K/qrLDVDt5A1p82UmxZIEGxv2X+uoujdhEAL48= github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= diff --git a/performance/.gitignore b/performance/.gitignore new file mode 100644 index 00000000..becaa2ad --- /dev/null +++ b/performance/.gitignore @@ -0,0 +1,3 @@ +conduit*.log +pg.log* +metadata.json \ No newline at end of file diff --git a/performance/Makefile b/performance/Makefile new file mode 100644 index 00000000..2e42763b --- /dev/null +++ b/performance/Makefile @@ -0,0 +1,121 @@ +random := $(shell echo $$RANDOM) +PGCONT = performance_pg +PGUSER = algorand +PGDB = performance_db # postgres +PGLOGS = pg.log +CONDUIT = ../cmd/conduit/conduit +PGCONN = postgresql://$(PGUSER):$(PGUSER)@localhost:65432/$(PGDB) + +echo: + @echo "random--->$(random)" + @echo "PGCONT--->$(PGCONT)" + @echo "PGUSER--->$(PGUSER)" + @echo "PGDB--->$(PGDB)" + @echo "PGLOGS--->$(PGLOGS)" + @echo "CONDUIT--->$(CONDUIT)" + @echo "PGCONN--->$(PGCONN)" + +nuke: pg-down clean + rm conduit_*.log || true + rm pg.log_* || true + +nuke-and-run: nuke perf-run + +nuke-and-debug: nuke pg-up + mv ../cmd/conduit/conduit.log "../cmd/conduit/conduit_$(random).log" || true + +perf-run: pg-up run-conduit + +clean: # pg-down should be called manually + rm conduit_data/metadata.json || true + +clean-go-cache: + cd .. 
&& go clean -cache -testcache -modcache + +pg-up: save-logs + docker-compose up -d + sleep 5 + make pg-logs-tail + docker exec -it $(PGCONT) psql -U algorand -d postgres -c "create database $(PGDB);" + make pg-query QUERY="-c \"CREATE EXTENSION pg_stat_statements;\"" + +pg-down: + docker-compose down + +pg-logs-tail: + docker-compose logs -f > $(PGLOGS) & + +save-logs: + mv $(PGLOGS) "$(PGLOGS)_$(random)" || true + mv conduit.log "conduit_$(random).log" || true + +build: + cd .. && go mod tidy && make + +pg-enter: + docker exec -it $(PGCONT) psql -U algorand -d postgres + +run-conduit: build + $(CONDUIT) -d conduit_data + +# - for query hackery... prefer pg_stats.ipynb + +QUERY_COL = substring(trim(regexp_replace(regexp_replace(query, '--.*?$$', '', 'gn'), '\\s+', ' ', 'g')), 1, 100) AS query +TOTAL_SECS_COL = round((total_exec_time/1000)::numeric, 3) AS tot_s +MEAN_SECS_COL = round((mean_exec_time/1000)::numeric, 3) AS mean_s +MIN_SECS_COL = round((min_exec_time/1000)::numeric, 3) AS min_s +MAX_SECS_COL = round((max_exec_time/1000)::numeric, 3) AS max_s +CPU_COL = round((100 * total_exec_time / sum(total_exec_time::numeric) OVER ())::numeric, 2) AS \"cpu%\" +LIMIT = 15 + +define QUERY_TOTAL_TIME +-c "SELECT dbid, $(QUERY_COL), $(TOTAL_SECS_COL), calls, $(MEAN_SECS_COL), $(CPU_COL) \ +FROM pg_stat_statements \ +ORDER BY total_exec_time DESC \ +LIMIT $(LIMIT);" +endef + +define QUERY_SLOWEST +-c "SELECT dbid, $(QUERY_COL), calls, $(TOTAL_SECS_COL), $(MIN_SECS_COL), $(MAX_SECS_COL), $(MEAN_SECS_COL) \ +FROM pg_stat_statements \ +ORDER BY mean_exec_time DESC \ +LIMIT $(LIMIT);" +endef + +define QUERY_MEMHOG +-c "SELECT dbid, $(QUERY_COL), (shared_blks_hit+shared_blks_dirtied) as mem \ +FROM pg_stat_statements \ +ORDER BY (shared_blks_hit+shared_blks_dirtied) DESC \ +LIMIT $(LIMIT);" +endef + +QUERY := -c "SELECT * FROM pg_stat_statements LIMIT 0;" +pg-query: + psql $(PGCONN) $(QUERY) + + +pg-txn-stats: QUERY=-c "SELECT max(round) AS max_round, count(*) AS txn_count 
from txn;" +pg-txn-stats: pg-query + +pg-conn: QUERY= +pg-conn: pg-query + +pg-stats: + make pg-stats-1 + make pg-stats-2 + make pg-stats-3 + +pg-stats-1: QUERY=$(QUERY_TOTAL_TIME) +pg-stats-1: pg-query + +pg-stats-2: QUERY=$(QUERY_SLOWEST) +pg-stats-2: pg-query + +pg-stats-3: QUERY=$(QUERY_MEMHOG) +pg-stats-3: pg-query + +pg-blocking-vac: QUERY=-c "VACUUM FULL ANALYZE;" +pg-blocking-vac: pg-query + +pg-vac: QUERY=-c "VACUUM;" +pg-vac: pg-query \ No newline at end of file diff --git a/performance/conduit_data/conduit.yml b/performance/conduit_data/conduit.yml new file mode 100644 index 00000000..9fdf4ab8 --- /dev/null +++ b/performance/conduit_data/conduit.yml @@ -0,0 +1,28 @@ +log-level: debug +log-file: conduit.log +retry-count: 1 +retry-delay: "1s" +#pid-filepath: /path/to/pidfile +hide-banner: true +metrics: + mode: ON + addr: ":9999" + prefix: "conduit" +importer: + name: file_reader + config: + # assumes we run conduit inside of the `performance/` directory: + block-dir: "filereader_blocks" + filename-pattern: "%[1]d_block.msgp.gz" +processors: +exporter: + name: postgresql + config: + connection-string: "host=localhost user=algorand password=algorand dbname=performance_db port=65432 sslmode=disable" + max-conns: 20 +telemetry: + enabled: false + # uri: "" + # index: "" + # username: "" + # password: "" diff --git a/performance/docker-compose.yml b/performance/docker-compose.yml new file mode 100644 index 00000000..620c852a --- /dev/null +++ b/performance/docker-compose.yml @@ -0,0 +1,20 @@ +version: '3' + +services: + postgres: + image: postgres + container_name: performance_pg + ports: + - "65432:5432" + environment: + POSTGRES_PASSWORD: algorand + POSTGRES_USER: algorand + POSTGRES_DB: postgres + POSTGRES_HOST_AUTH_METHOD: trust + command: + -c shared_preload_libraries='pg_stat_statements' + # pg logs for the stress test with 10 blocks was 3 GB which + # interfered with performance measurements + # -c log_statement='all' + # -c log_duration=on + # -c 
log_min_duration_statement=0 diff --git a/performance/filereader_blocks/.gitignore b/performance/filereader_blocks/.gitignore new file mode 100644 index 00000000..694556cb --- /dev/null +++ b/performance/filereader_blocks/.gitignore @@ -0,0 +1 @@ +*.msgp.gz diff --git a/performance/filereader_blocks/genesis.json b/performance/filereader_blocks/genesis.json new file mode 100644 index 00000000..4d171267 --- /dev/null +++ b/performance/filereader_blocks/genesis.json @@ -0,0 +1,30 @@ +{ + "alloc": [ + { + "addr": "AEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAKE3PRHE", + "comment": "", + "state": { + "algo": 1000000000000 + } + }, + { + "addr": "AIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4", + "comment": "", + "state": { + "algo": 1000000000000 + } + }, + { + "addr": "AMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAANVWEXNA", + "comment": "", + "state": { + "algo": 1000000000000 + } + } + ], + "fees": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAVIOOBQA", + "id": "v1", + "network": "generated-network", + "proto": "future", + "rwd": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABFFF5B2Y" +} \ No newline at end of file diff --git a/performance/pg_stats.ipynb b/performance/pg_stats.ipynb new file mode 100644 index 00000000..24c78da6 --- /dev/null +++ b/performance/pg_stats.ipynb @@ -0,0 +1,3317 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Imports" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "import json\n", + "from pathlib import Path\n", + "import re\n", + "\n", + "import pandas as pd\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Constants" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + 
"CONDUIT_LOG=PosixPath('/Users/zeph/github/algorand/conduit/performance/conduit.log')\n" + ] + } + ], + "source": [ + "CONDUIT_LOG = Path.cwd() / \"conduit.log\"\n", + "\n", + "PGCONN = \"postgresql://algorand:algorand@localhost:65432/performance_db\"\n", + "QUERY_CHARS = 1000\n", + "LIMIT = 15\n", + "\n", + "# Query columns\n", + "QUERY_COL = f\"substring(trim(regexp_replace(regexp_replace(query, '--.*?$', '', 'gn'), '\\\\s+', ' ', 'g')), 1, {QUERY_CHARS}) AS query\"\n", + "TOTAL_SECS_COL = \"round((total_exec_time/1000)::numeric, 3) AS tot_s\"\n", + "MEAN_SECS_COL = \"round((mean_exec_time/1000)::numeric, 3) AS mean_s\"\n", + "MIN_SECS_COL = \"round((min_exec_time/1000)::numeric, 3) AS min_s\"\n", + "MAX_SECS_COL = \"round((max_exec_time/1000)::numeric, 3) AS max_s\"\n", + "CPU_COL = \"round((100 * total_exec_time / sum(total_exec_time::numeric) OVER ())::numeric, 2) AS cpu_pct\"\n", + "\n", + "# Queries\n", + "QUERY_TOTAL_TIME = f\"\"\"SELECT dbid, {QUERY_COL}, {TOTAL_SECS_COL}, calls, {MEAN_SECS_COL}, {CPU_COL}\n", + "FROM pg_stat_statements\n", + "ORDER BY total_exec_time DESC\n", + "LIMIT {LIMIT}\"\"\"\n", + "\n", + "QUERY_SLOWEST = f\"\"\"SELECT dbid, {QUERY_COL}, calls, {TOTAL_SECS_COL}, {MIN_SECS_COL}, {MAX_SECS_COL}, {MEAN_SECS_COL}\n", + "FROM pg_stat_statements\n", + "ORDER BY mean_exec_time DESC\n", + "LIMIT {LIMIT}\"\"\"\n", + "\n", + "QUERY_MEMHOG = f\"\"\"SELECT dbid, {QUERY_COL}, (shared_blks_hit+shared_blks_dirtied) as mem\n", + "FROM pg_stat_statements\n", + "ORDER BY (shared_blks_hit+shared_blks_dirtied) DESC\n", + "LIMIT {LIMIT}\"\"\"\n", + "\n", + "print(f\"{CONDUIT_LOG=}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Parse the log\n", + "\n", + "## Overall" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Start Time: 2023-08-31 13:14:22.006342-05:00\n", + "Finish Time: 2023-08-31 
13:15:29.770026-05:00\n", + "Log Rounds: 10\n", + "Total Export Time: 0:01:07.763684\n", + "Mean Export Time: 6.7763684 seconds\n" + ] + } + ], + "source": [ + "with open(CONDUIT_LOG) as f:\n", + " log_content = f.read()\n", + "\n", + "lines = log_content.strip().split(\"\\n\")\n", + "\n", + "\n", + "# Regular expressions for extracting required data\n", + "start_time_pattern = re.compile(r'Block 1 read time')\n", + "finish_time_pattern = re.compile(r'round r=(\\d+) .* exported in')\n", + "time_pattern = re.compile(\n", + " r'(?P