diff --git a/replayor/.dockerignore b/replayor/.dockerignore index eb13b439..7b481709 100644 --- a/replayor/.dockerignore +++ b/replayor/.dockerignore @@ -1,3 +1,5 @@ geth-data/ geth-data-archive/ reth-data/ +reth-data-templates/ +multi-replayor/ diff --git a/replayor/.gitignore b/replayor/.gitignore index 1f2bc975..ffb52d91 100644 --- a/replayor/.gitignore +++ b/replayor/.gitignore @@ -11,6 +11,8 @@ reth-data/ tps.log test_tps.log genesis.json +/multi-replayor +/reth-data-templates # Environment configuration files (keep .example files) *.env diff --git a/replayor/Dockerfile b/replayor/Dockerfile index 6455f342..ec7ca09f 100644 --- a/replayor/Dockerfile +++ b/replayor/Dockerfile @@ -9,7 +9,8 @@ WORKDIR /app COPY go.mod go.sum ./ RUN go mod download -COPY . . +COPY ./cmd ./cmd +COPY ./packages ./packages RUN go build -o bin/replayor ./cmd/replayor/main.go WORKDIR /app @@ -25,4 +26,4 @@ WORKDIR /app COPY --from=builder /app/bin/replayor /bin/replayor -CMD ["/app/replayor.sh"] \ No newline at end of file +CMD ["/app/replayor.sh"] diff --git a/replayor/Makefile b/replayor/Makefile index 2c178af2..9c558726 100644 --- a/replayor/Makefile +++ b/replayor/Makefile @@ -17,6 +17,10 @@ build: go build ./cmd/replayor/main.go .PHONY: build +build-multi-replayor: + go build -o multi-replayor ./cmd/multi-replayor/main.go +.PHONY: build-multi-replayor + test: go test ./... 
.PHONY: test diff --git a/replayor/cmd/multi-replayor/common.go b/replayor/cmd/multi-replayor/common.go new file mode 100644 index 00000000..69b9bd53 --- /dev/null +++ b/replayor/cmd/multi-replayor/common.go @@ -0,0 +1,291 @@ +package main + +import ( + "fmt" + "io" + "os" + "path/filepath" + "regexp" + "sort" + "strconv" + "strings" + + "github.com/urfave/cli/v2" +) + +// Shared flags used by both run and gen-template commands +var ( + SourceNodeDataFlag = &cli.StringFlag{ + Name: "source-node-data", + Usage: "The source data directory to copy for each partition", + EnvVars: []string{"SOURCE_NODE_DATA"}, + } + SegmentsFlag = &cli.StringFlag{ + Name: "segments", + Usage: "Block number segments, e.g. --segments=0-10,10-20,20-30", + Required: true, + EnvVars: []string{"SEGMENTS"}, + } + WorkDirFlag = &cli.StringFlag{ + Name: "work-dir", + Usage: "Working directory to create partition directories", + Value: "./multi-replayor-work", + EnvVars: []string{"WORK_DIR"}, + } + RollupConfigPathFlag = &cli.StringFlag{ + Name: "rollup-config-path", + Usage: "Path to rollup.json file", + Value: "./rollup.json", + EnvVars: []string{"ROLLUP_CONFIG_PATH"}, + } + JwtSecretPathFlag = &cli.StringFlag{ + Name: "jwt-secret-path", + Usage: "Path to jwt.txt file", + Value: "./jwt.txt", + EnvVars: []string{"JWT_SECRET_PATH"}, + } + GenesisJsonPathFlag = &cli.StringFlag{ + Name: "genesis-json-path", + Usage: "Path to genesis.json file (optional)", + Value: "./genesis.json", + EnvVars: []string{"GENESIS_JSON_PATH"}, + } + ChainFlag = &cli.StringFlag{ + Name: "chain", + Usage: "empty or xlayer-testnet or xlayer-mainnet", + Value: "", + EnvVars: []string{"CHAIN"}, + } +) + +// SharedFlags returns the common flags used by multiple commands +func SharedFlags() []cli.Flag { + return []cli.Flag{ + SourceNodeDataFlag, + SegmentsFlag, + WorkDirFlag, + RollupConfigPathFlag, + JwtSecretPathFlag, + GenesisJsonPathFlag, + ChainFlag, + } +} + +// PartitionRange represents a block range for a partition 
+type PartitionRange struct { + ID int + StartBlock int + EndBlock int + BlockCount int +} + +// ParseSegments parses a segments string like "0-10,10-20,20-30" into []PartitionRange +func ParseSegments(segmentsStr string) ([]PartitionRange, error) { + parts := strings.Split(segmentsStr, ",") + if len(parts) == 0 { + return nil, fmt.Errorf("segments string is empty") + } + + ranges := make([]PartitionRange, 0, len(parts)) + for i, part := range parts { + part = strings.TrimSpace(part) + if part == "" { + continue + } + dashIdx := strings.Index(part, "-") + if dashIdx <= 0 { + return nil, fmt.Errorf("invalid segment format %q, expected start-end (e.g. 0-10)", part) + } + startStr := part[:dashIdx] + endStr := part[dashIdx+1:] + + startBlock, err := strconv.Atoi(startStr) + if err != nil { + return nil, fmt.Errorf("invalid start block in segment %q: %w", part, err) + } + endBlock, err := strconv.Atoi(endStr) + if err != nil { + return nil, fmt.Errorf("invalid end block in segment %q: %w", part, err) + } + if startBlock < 0 { + return nil, fmt.Errorf("start block must be non-negative in segment %q", part) + } + if endBlock <= startBlock { + return nil, fmt.Errorf("end block must be greater than start block in segment %q", part) + } + + ranges = append(ranges, PartitionRange{ + ID: i, + StartBlock: startBlock, + EndBlock: endBlock, + BlockCount: endBlock - startBlock, + }) + } + + if len(ranges) == 0 { + return nil, fmt.Errorf("no valid segments found") + } + + return ranges, nil +} + +// GetUniqueTemplateBlocks returns unique block numbers for template generation +// This includes all partition start blocks plus end blocks +func GetUniqueTemplateBlocks(ranges []PartitionRange) []int { + // Use a map to deduplicate + blockSet := make(map[int]bool) + for _, r := range ranges { + blockSet[r.StartBlock] = true + blockSet[r.EndBlock] = true + } + + // Convert to sorted slice + blocks := make([]int, 0, len(blockSet)) + for b := range blockSet { + blocks = append(blocks, b) + 
} + sort.Ints(blocks) + + return blocks +} + +// TemplateDir represents a template directory with its block number +type TemplateDir struct { + Path string + BlockNumber int +} + +// FindTemplateDirectories scans a directory for reth-{block} template directories +func FindTemplateDirectories(dir string) ([]TemplateDir, error) { + entries, err := os.ReadDir(dir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + + pattern := regexp.MustCompile(`^reth-(\d+)$`) + var templates []TemplateDir + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + matches := pattern.FindStringSubmatch(entry.Name()) + if matches == nil { + continue + } + + blockNum, err := strconv.Atoi(matches[1]) + if err != nil { + continue + } + + templates = append(templates, TemplateDir{ + Path: filepath.Join(dir, entry.Name()), + BlockNumber: blockNum, + }) + } + + // Sort by block number + sort.Slice(templates, func(i, j int) bool { + return templates[i].BlockNumber < templates[j].BlockNumber + }) + + return templates, nil +} + +// FindBestTemplate finds the smallest template directory with block number >= targetBlock +// Returns the template path, block number, and whether a template was found +func FindBestTemplate(templateDir string, targetBlock int) (string, int, bool) { + templates, err := FindTemplateDirectories(templateDir) + if err != nil || len(templates) == 0 { + return "", 0, false + } + + // Find the smallest template >= targetBlock + for _, t := range templates { + if t.BlockNumber >= targetBlock { + return t.Path, t.BlockNumber, true + } + } + + return "", 0, false +} + +// TemplateDataDir returns the path to the template data directory +func TemplateDataDir(workDir string, blockNumber int) string { + return filepath.Join(workDir, fmt.Sprintf("reth-%d", blockNumber)) +} + +// FileExists checks if a file exists +func FileExists(path string) bool { + _, err := os.Stat(path) + return err == nil +} + +// CopyFileIfExists copies 
a file from src to dst if it exists +func CopyFileIfExists(src, dst string) error { + if !FileExists(src) { + return nil // File doesn't exist, skip + } + + // Ensure destination directory exists + if err := os.MkdirAll(filepath.Dir(dst), 0755); err != nil { + return fmt.Errorf("failed to create destination directory: %w", err) + } + + srcFile, err := os.Open(src) + if err != nil { + return err + } + defer srcFile.Close() + + dstFile, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer dstFile.Close() + + _, err = io.Copy(dstFile, srcFile) + return err +} + +// CopyDirectory copies a directory recursively from src to dst +func CopyDirectory(src, dst string) error { + return filepath.Walk(src, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + relPath, err := filepath.Rel(src, path) + if err != nil { + return err + } + + dstPath := filepath.Join(dst, relPath) + + if info.IsDir() { + return os.MkdirAll(dstPath, info.Mode()) + } + + // Copy file + srcFile, err := os.Open(path) + if err != nil { + return err + } + defer srcFile.Close() + + dstFile, err := os.OpenFile(dstPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, info.Mode()) + if err != nil { + return err + } + defer dstFile.Close() + + _, err = io.Copy(dstFile, srcFile) + return err + }) +} + diff --git a/replayor/cmd/multi-replayor/embed/Dockerfile b/replayor/cmd/multi-replayor/embed/Dockerfile new file mode 100644 index 00000000..6455f342 --- /dev/null +++ b/replayor/cmd/multi-replayor/embed/Dockerfile @@ -0,0 +1,28 @@ +FROM okexchain/go-builder:v1 AS builder + +RUN apt-get install -y wget \ + && wget -qO- https://get.docker.com | sh \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . 
+RUN go build -o bin/replayor ./cmd/replayor/main.go + +WORKDIR /app + +FROM okexchain/go-builder:v1 + +# Install certs +# RUN apk add --no-cache ca-certificates +# COPY ./oplabs.crt /usr/local/share/ca-certificates/oplabs.crt +# RUN chmod 644 /usr/local/share/ca-certificates/oplabs.crt && update-ca-certificates + +WORKDIR /app + +COPY --from=builder /app/bin/replayor /bin/replayor + +CMD ["/app/replayor.sh"] \ No newline at end of file diff --git a/replayor/cmd/multi-replayor/embed/docker-compose.yml b/replayor/cmd/multi-replayor/embed/docker-compose.yml new file mode 100644 index 00000000..1fb90b75 --- /dev/null +++ b/replayor/cmd/multi-replayor/embed/docker-compose.yml @@ -0,0 +1,71 @@ +networks: + default: + name: ${DOCKER_NETWORK:-dev-replayor} + +services: +# node: # this is Optimism's geth client w/ a patch to not reject txns as part of FCU +# build: ./geth +# ports: +# - 8545:8545 +# - 8551:8551 +# command: [ "/app/geth" ] +# volumes: +# - ./geth-data:/data +# - ./secret:/secret +# env_file: "test-configs/base-mainnet-replay.env" + node-unwind: # op-reth client used to unwind chain state to a target block (runs unwind.sh) + image: "op-reth:latest" + ports: + - 9123:9123 + - 8553:8553 + environment: + - ENV_FILE=reth.docker.env + working_dir: /app + entrypoint: [ "bash", "/app/unwind.sh" ] + volumes: + - ./:/app + - ${GENESIS_JSON_PATH:-./genesis.json}:/app/genesis.json:ro + - ${ROLLUP_JSON_PATH:-./rollup.json}:/app/rollup.json:ro + - ${JWT_TXT_PATH:-./jwt.txt}:/app/jwt.txt:ro + - ${RETH_DATA_PATH:-./reth-data}:/app/reth-data + node-init: # op-reth client used to initialize the reth data directory (runs op-reth-init.sh) + image: "op-reth:latest" + ports: + - 9123:9123 + - 8553:8553 + environment: + - ENV_FILE=reth.docker.env + working_dir: /app + entrypoint: [ "bash", "/app/op-reth-init.sh" ] + volumes: + - ./:/app + - ${GENESIS_JSON_PATH:-./genesis.json}:/app/genesis.json:ro + - ${ROLLUP_JSON_PATH:-./rollup.json}:/app/rollup.json:ro + - 
${JWT_TXT_PATH:-./jwt.txt}:/app/jwt.txt:ro + - ${RETH_DATA_PATH:-./reth-data}:/app/reth-data + node: # op-reth execution client serving HTTP/WS and the Engine API (runs reth.sh) + image: "op-reth:latest" + ports: + - 9123:9123 + - 8553:8553 + environment: + - ENV_FILE=reth.docker.env + working_dir: /app + entrypoint: [ "bash", "/app/reth.sh" ] + volumes: + - ./:/app + - ${GENESIS_JSON_PATH:-./genesis.json}:/app/genesis.json:ro + - ${ROLLUP_JSON_PATH:-./rollup.json}:/app/rollup.json:ro + - ${JWT_TXT_PATH:-./jwt.txt}:/app/jwt.txt:ro + - ${RETH_DATA_PATH:-./reth-data}:/app/reth-data + replayor: + image: "replayor:latest" + command: [ "bash", "/app/replayor.sh" ] + working_dir: /app + extra_hosts: + - "host.docker.internal:host-gateway" + environment: + - ENV_FILE=replayor.docker.env + volumes: + - ./:/app + - ${ROLLUP_JSON_PATH:-./rollup.json}:/app/rollup.json:ro diff --git a/replayor/cmd/multi-replayor/embed/replayor.docker.env.example b/replayor/cmd/multi-replayor/embed/replayor.docker.env.example new file mode 100644 index 00000000..6dd57e11 --- /dev/null +++ b/replayor/cmd/multi-replayor/embed/replayor.docker.env.example @@ -0,0 +1,36 @@ +# Replayor Configuration +# Copy this file and modify the values according to your setup + +# Path to the replayor binary +REPLAYOR_BINARY=replayor + +# Enable continuous mode (true/false) +CONTINUOUS_MODE=true + +# Engine API JWT secret (hex string) +ENGINE_API_SECRET=0c00f14247582fcd3c837311148cda1f56e7c2caa42fb1ba8a3cc7843603846e + +# Engine API URL (authenticated RPC endpoint) +ENGINE_API_URL=http://node:8553 + +# Execution client URL (HTTP RPC endpoint) +EXECUTION_URL=http://node:9123 + +# Source node URL (where to fetch blocks from) +SOURCE_NODE_URL=http://host.docker.internal:8123 + +# Strategy (replay, stress, etc.) +STRATEGY=replay + +# Path to rollup configuration file +ROLLUP_CONFIG_PATH=./rollup.json + +# Path to store results +DISK_PATH=./result + +# Storage type (disk, s3, etc.) 
+STORAGE_TYPE=disk + +# Number of blocks to process +BLOCK_COUNT=1 + diff --git a/replayor/cmd/multi-replayor/embed/replayor.sh b/replayor/cmd/multi-replayor/embed/replayor.sh new file mode 100755 index 00000000..6458972d --- /dev/null +++ b/replayor/cmd/multi-replayor/embed/replayor.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +set -e +set -x + +# Load environment variables from .env file +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ENV_FILE="${ENV_FILE:-${SCRIPT_DIR}/replayor.env}" + +if [ -f "$ENV_FILE" ]; then + echo "Loading configuration from $ENV_FILE" + set -a + source "$ENV_FILE" + set +a +else + echo "Warning: $ENV_FILE not found, using default values" +fi + +# Set default values if not provided in .env +REPLAYOR_BINARY="${REPLAYOR_BINARY:-replayor}" +ENGINE_API_SECRET="${ENGINE_API_SECRET:-0c00f14247582fcd3c837311148cda1f56e7c2caa42fb1ba8a3cc7843603846e}" +ENGINE_API_URL="${ENGINE_API_URL:-http://127.0.0.1:8553}" +EXECUTION_URL="${EXECUTION_URL:-http://127.0.0.1:9123}" +SOURCE_NODE_URL="${SOURCE_NODE_URL:-http://127.0.0.1:8123}" +STRATEGY="${STRATEGY:-replay}" +ROLLUP_CONFIG_PATH="${ROLLUP_CONFIG_PATH:-./rollup.json}" +DISK_PATH="${DISK_PATH:-./result}" +STORAGE_TYPE="${STORAGE_TYPE:-disk}" +BLOCK_COUNT="${BLOCK_COUNT:-1}" + +echo "Starting Replayor in $(pwd)" +echo "$CONTINUOUS_MODE" +if [ "$CONTINUOUS_MODE" = "true" ]; then + exec "$REPLAYOR_BINARY" \ + --engine-api-secret="$ENGINE_API_SECRET" \ + --engine-api-url="$ENGINE_API_URL" \ + --execution-url="$EXECUTION_URL" \ + --source-node-url="$SOURCE_NODE_URL" \ + --strategy="$STRATEGY" \ + --rollup-config-path="$ROLLUP_CONFIG_PATH" \ + --disk-path="$DISK_PATH" \ + --storage-type="$STORAGE_TYPE" \ + --log.level warn \ + --continuous +else + exec "$REPLAYOR_BINARY" \ + --engine-api-secret="$ENGINE_API_SECRET" \ + --engine-api-url="$ENGINE_API_URL" \ + --execution-url="$EXECUTION_URL" \ + --source-node-url="$SOURCE_NODE_URL" \ + --strategy="$STRATEGY" \ + --rollup-config-path="$ROLLUP_CONFIG_PATH" \ 
+ --disk-path="$DISK_PATH" \ + --storage-type="$STORAGE_TYPE" \ + --log.level warn \ + --block-count="$BLOCK_COUNT" +fi + diff --git a/replayor/cmd/multi-replayor/embed/reth.docker.env.example b/replayor/cmd/multi-replayor/embed/reth.docker.env.example new file mode 100644 index 00000000..75450a1f --- /dev/null +++ b/replayor/cmd/multi-replayor/embed/reth.docker.env.example @@ -0,0 +1,43 @@ +# Reth Node Configuration +# Copy this file and modify the values according to your setup + +# Path to the reth binary (op-reth or reth) +#RETH_BINARY=/Users/oker/go/bin/code/ethereum/reth/target/debug/op-reth +RETH_BINARY=op-reth + +# Data directory for reth node +RETH_DATA_DIR=./reth-data + +# Chain configuration file (genesis.json or rollup.json) +#RETH_CHAIN=/Users/oker/go/bin/code/opstack-old/xlayer-toolkit-fusaka/devnet/config-op/genesis.json +RETH_CHAIN=./genesis.json + +# JWT secret file for authenticated RPC +#RETH_JWT_SECRET=/Users/oker/go/bin/code/opstack-old/xlayer-toolkit-fusaka/devnet/config-op/jwt.txt +RETH_JWT_SECRET=./jwt.txt + + +# HTTP RPC port +RETH_HTTP_PORT=9123 + +# WebSocket RPC port +RETH_WS_PORT=9124 + +# Authenticated RPC port (for consensus client communication) +RETH_AUTHRPC_PORT=8553 + +# P2P networking port +RETH_P2P_PORT=50505 + +# Verbosity level (-v, -vv, -vvv, -vvvv) +RETH_VERBOSITY=-vvvv + +# HTTP API modules (comma-separated) +RETH_HTTP_API=web3,debug,eth,txpool,net,miner,admin + +# WebSocket API modules (comma-separated) +RETH_WS_API=web3,debug,eth,txpool,net + +# Unwind target block (optional, used by unwind.sh) +UNWIND_TO_BLOCK=8602500 + diff --git a/replayor/cmd/multi-replayor/embed/reth.sh b/replayor/cmd/multi-replayor/embed/reth.sh new file mode 100755 index 00000000..50d4fc14 --- /dev/null +++ b/replayor/cmd/multi-replayor/embed/reth.sh @@ -0,0 +1,47 @@ +#!/bin/bash + +set -e + +# Load environment variables from .env file +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ENV_FILE="${ENV_FILE:-${SCRIPT_DIR}/reth.env}" + +if 
[ -f "$ENV_FILE" ]; then + echo "Loading configuration from $ENV_FILE" + set -a + source "$ENV_FILE" + set +a +else + echo "Warning: $ENV_FILE not found, using default values" +fi + +# Set default values if not provided in .env +RETH_BINARY="${RETH_BINARY:-op-reth}" +RETH_DATA_DIR="${RETH_DATA_DIR:-./reth-data}" +RETH_CHAIN="${RETH_CHAIN:-./rollup.json}" +RETH_JWT_SECRET="${RETH_JWT_SECRET:-./jwt.txt}" +RETH_HTTP_PORT="${RETH_HTTP_PORT:-9123}" +RETH_WS_PORT="${RETH_WS_PORT:-9124}" +RETH_AUTHRPC_PORT="${RETH_AUTHRPC_PORT:-8553}" +RETH_P2P_PORT="${RETH_P2P_PORT:-50505}" +RETH_VERBOSITY="${RETH_VERBOSITY:--vvvv}" +RETH_HTTP_API="${RETH_HTTP_API:-web3,debug,eth,txpool,net,miner,admin}" +RETH_WS_API="${RETH_WS_API:-web3,debug,eth,txpool,net}" + +exec "$RETH_BINARY" node --color=never -vv \ + --datadir="$RETH_DATA_DIR" \ + --chain="$RETH_CHAIN" \ + --http \ + --http.corsdomain=* \ + --http.port="$RETH_HTTP_PORT" \ + --http.addr=0.0.0.0 \ + --http.api="$RETH_HTTP_API" \ + --ws \ + --ws.addr=0.0.0.0 \ + --ws.port="$RETH_WS_PORT" \ + --ws.origins=* \ + --ws.api="$RETH_WS_API" \ + --authrpc.addr=0.0.0.0 \ + --authrpc.port="$RETH_AUTHRPC_PORT" \ + --port "$RETH_P2P_PORT" \ + --authrpc.jwtsecret="$RETH_JWT_SECRET" diff --git a/replayor/cmd/multi-replayor/embed/unwind.sh b/replayor/cmd/multi-replayor/embed/unwind.sh new file mode 100755 index 00000000..31f53c04 --- /dev/null +++ b/replayor/cmd/multi-replayor/embed/unwind.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +set -e +set -x + +# Load environment variables from .env file +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ENV_FILE="${ENV_FILE:-${SCRIPT_DIR}/reth.env}" + +if [ -f "$ENV_FILE" ]; then + echo "Loading configuration from $ENV_FILE" + set -a + source "$ENV_FILE" + set +a +else + echo "Warning: $ENV_FILE not found, using default values" +fi + +# Set default values if not provided in .env +RETH_BINARY="${RETH_BINARY:-op-reth}" +RETH_DATA_DIR="${RETH_DATA_DIR:-./reth-data}" 
+RETH_CHAIN="${RETH_CHAIN:-./rollup.json}" + +# Get target block from command line argument or environment variable +TARGET_BLOCK="${1:-${UNWIND_TO_BLOCK:-8594000}}" + +if [ -z "$TARGET_BLOCK" ]; then + echo "Error: No target block specified" + echo "Usage: $0 <target-block>" + echo " or: UNWIND_TO_BLOCK=<target-block> $0" + exit 1 +fi + +echo "Unwinding to block $TARGET_BLOCK" + +exec "$RETH_BINARY" stage unwind --color=never \ + --datadir="$RETH_DATA_DIR" \ + --chain="$RETH_CHAIN" \ + to-block "$TARGET_BLOCK" diff --git a/replayor/cmd/multi-replayor/gen.go b/replayor/cmd/multi-replayor/gen.go new file mode 100644 index 00000000..f13b2f54 --- /dev/null +++ b/replayor/cmd/multi-replayor/gen.go @@ -0,0 +1,365 @@ +package main + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "os/signal" + "path/filepath" + "strings" + "sync" + "syscall" + + oplog "github.com/ethereum-optimism/optimism/op-service/log" + "github.com/ethereum/go-ethereum/log" + "github.com/urfave/cli/v2" +) + +// GenTemplateCommand returns the gen-template subcommand +func GenTemplateCommand() *cli.Command { + flags := SharedFlags() + flags = append(flags, oplog.CLIFlags("MULTI_REPLAYOR")...)
+ + return &cli.Command{ + Name: "gen-template", + Usage: "Generate reth data templates for each partition start block", + Flags: flags, + Action: func(cliCtx *cli.Context) error { + return genTemplateAction(cliCtx) + }, + } +} + +func genTemplateAction(cliCtx *cli.Context) error { + logger := oplog.NewLogger(oplog.AppOut(cliCtx), oplog.ReadCLIConfig(cliCtx)) + + sourceNodeData := cliCtx.String("source-node-data") + segmentsStr := cliCtx.String("segments") + workDir := cliCtx.String("work-dir") + rollupConfigPath := cliCtx.String("rollup-config-path") + jwtSecretPath := cliCtx.String("jwt-secret-path") + genesisJsonPath := cliCtx.String("genesis-json-path") + chain := cliCtx.String("chain") + + // Validate inputs + if sourceNodeData == "" { + return fmt.Errorf("--source-node-data is required") + } + + // Parse segments + segments, err := ParseSegments(segmentsStr) + if err != nil { + return fmt.Errorf("invalid --segments: %w", err) + } + + // Validate source data directory + if _, err := os.Stat(sourceNodeData); os.IsNotExist(err) { + return fmt.Errorf("source-node-data directory does not exist: %s", sourceNodeData) + } + + // Calculate unique template blocks + templateBlocks := GetUniqueTemplateBlocks(segments) + + logger.Info("Starting template generation", + "source-node-data", sourceNodeData, + "segments", segmentsStr, + "work-dir", workDir, + "template-blocks", templateBlocks) + + // Create work directory + if err := os.MkdirAll(workDir, 0755); err != nil { + return fmt.Errorf("failed to create work directory: %w", err) + } + + // Create context for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + go func() { + <-sigChan + logger.Info("Received shutdown signal, stopping template generation") + cancel() + }() + + // Generate templates in parallel + var wg sync.WaitGroup + var mu sync.Mutex + var 
errors []error + + for i, blockNum := range templateBlocks { + wg.Add(1) + go func(idx int, targetBlock int) { + defer wg.Done() + + logger.Info("Generating template", + "index", idx, + "target-block", targetBlock) + + err := generateTemplate(ctx, logger, genTemplateConfig{ + workDir: workDir, + templateID: idx, + sourceNodeData: sourceNodeData, + targetBlock: targetBlock, + rollupConfigPath: rollupConfigPath, + jwtSecretPath: jwtSecretPath, + genesisJsonPath: genesisJsonPath, + chain: chain, + }) + if err != nil { + mu.Lock() + errors = append(errors, fmt.Errorf("template %d (block %d) failed: %w", idx, targetBlock, err)) + mu.Unlock() + logger.Error("Template generation failed", + "index", idx, + "target-block", targetBlock, + "error", err) + } else { + logger.Info("Template generated successfully", + "index", idx, + "target-block", targetBlock) + } + }(i, blockNum) + } + + // Wait for all template generations to complete + wg.Wait() + + if len(errors) > 0 { + logger.Error("Some template generations failed", "error-count", len(errors)) + for _, err := range errors { + logger.Error("Error", "error", err) + } + return fmt.Errorf("%d template generation(s) failed", len(errors)) + } + + logger.Info("All templates generated successfully", + "template-count", len(templateBlocks), + "work-dir", workDir) + return nil +} + +type genTemplateConfig struct { + workDir string + templateID int + sourceNodeData string + targetBlock int + rollupConfigPath string + jwtSecretPath string + genesisJsonPath string + chain string +} + +func generateTemplate(ctx context.Context, logger log.Logger, cfg genTemplateConfig) error { + // Template directory is named reth-{blockNumber} + templateDir := TemplateDataDir(cfg.workDir, cfg.targetBlock) + + // Check if template already exists + if FileExists(templateDir) { + logger.Info("Template already exists, skipping", + "template-id", cfg.templateID, + "target-block", cfg.targetBlock, + "path", templateDir) + return nil + } + + // Create a 
temporary working directory for this template generation + tempDir := filepath.Join(cfg.workDir, fmt.Sprintf(".gen-template-%d", cfg.templateID)) + if err := os.MkdirAll(tempDir, 0755); err != nil { + return fmt.Errorf("failed to create temp directory: %w", err) + } + defer os.RemoveAll(tempDir) // Clean up temp directory when done + + // Calculate ports (unique per template to avoid conflicts) + authrpcPort := 17700 + cfg.templateID + wsPort := 17800 + cfg.templateID + httpPort := 17900 + cfg.templateID + + logger.Info("Setting up template generation", + "template-id", cfg.templateID, + "target-block", cfg.targetBlock, + "temp-dir", tempDir, + "authrpc-port", authrpcPort) + + // Copy source data to temp directory + tempDataDir := filepath.Join(tempDir, "reth-data") + if err := CopyDirectory(cfg.sourceNodeData, tempDataDir); err != nil { + return fmt.Errorf("failed to copy source data: %w", err) + } + + // Copy required configuration files + rollupExists := FileExists(cfg.rollupConfigPath) + if rollupExists { + if err := CopyFileIfExists(cfg.rollupConfigPath, filepath.Join(tempDir, "rollup.json")); err != nil { + return fmt.Errorf("failed to copy rollup.json: %w", err) + } + } + + jwtExists := FileExists(cfg.jwtSecretPath) + if jwtExists { + if err := CopyFileIfExists(cfg.jwtSecretPath, filepath.Join(tempDir, "jwt.txt")); err != nil { + return fmt.Errorf("failed to copy jwt.txt: %w", err) + } + } + + genesisExists := FileExists(cfg.genesisJsonPath) + if genesisExists { + if err := CopyFileIfExists(cfg.genesisJsonPath, filepath.Join(tempDir, "genesis.json")); err != nil { + logger.Warn("failed to copy genesis.json", "error", err) + genesisExists = false + } + } + + if !rollupExists { + return fmt.Errorf("rollup.json not found at %s", cfg.rollupConfigPath) + } + if !jwtExists { + return fmt.Errorf("jwt.txt not found at %s", cfg.jwtSecretPath) + } + if !genesisExists { + logger.Warn("genesis.json not found, continuing without it", "path", cfg.genesisJsonPath) + } + + 
// Generate configuration files for unwind + if err := generateTemplateConfigFiles(tempDir, cfg.templateID, cfg.targetBlock, authrpcPort, wsPort, httpPort, genesisExists, cfg.chain); err != nil { + return fmt.Errorf("failed to generate config files: %w", err) + } + + // Setup log file + unwindLog, err := os.Create(filepath.Join(tempDir, "unwind.log")) + if err != nil { + return fmt.Errorf("failed to create unwind.log: %w", err) + } + defer unwindLog.Close() + + // Run unwind + logger.Info("Running unwind for template", + "template-id", cfg.templateID, + "target-block", cfg.targetBlock) + + unwindCmd := exec.CommandContext(ctx, "docker", "compose", "-f", "docker-compose.yml", "up", "node-unwind") + unwindCmd.Dir = tempDir + unwindCmd.Stdout = io.MultiWriter(unwindLog, os.Stdout) + unwindCmd.Stderr = io.MultiWriter(unwindLog, os.Stderr) + unwindCmdErr := unwindCmd.Run() + + // Check container exit code + unwindExitCode, unwindCheckErr := checkContainerExitCode(ctx, tempDir, "node-unwind", cfg.templateID, logger) + if unwindCheckErr == nil { + if unwindExitCode != 0 { + logger.Error("node-unwind container exited with non-zero code", + "template-id", cfg.templateID, + "exit-code", unwindExitCode) + return fmt.Errorf("template %d: node-unwind container exited with status %d", cfg.templateID, unwindExitCode) + } + logger.Info("node-unwind container exited successfully", + "template-id", cfg.templateID, + "exit-code", unwindExitCode) + } else { + logger.Warn("Failed to check container exit code", + "template-id", cfg.templateID, + "error", unwindCheckErr) + } + + if unwindCmdErr != nil { + if err := checkCommandExitStatus(unwindCmdErr, fmt.Sprintf("template-%d node-unwind", cfg.templateID)); err != nil { + logger.Error("node-unwind command failed", + "template-id", cfg.templateID, + "error", err) + return fmt.Errorf("template %d: node-unwind failed: %w", cfg.templateID, err) + } + } + + // Clean up docker compose resources + stopCmd := exec.CommandContext(ctx, "docker", 
"compose", "-f", "docker-compose.yml", "down") + stopCmd.Dir = tempDir + stopCmd.Run() // Ignore errors on cleanup + + // Move the reth-data to the final template directory + if err := os.Rename(tempDataDir, templateDir); err != nil { + // If rename fails (e.g., cross-device), try copy and delete + if err := CopyDirectory(tempDataDir, templateDir); err != nil { + return fmt.Errorf("failed to move reth-data to template directory: %w", err) + } + os.RemoveAll(tempDataDir) + } + + logger.Info("Template generated", + "template-id", cfg.templateID, + "target-block", cfg.targetBlock, + "path", templateDir) + + return nil +} + +func generateTemplateConfigFiles(tempDir string, templateID int, targetBlock, authrpcPort, wsPort, httpPort int, genesisExists bool, chain string) error { + // Generate docker-compose.yml + dockerComposePath := filepath.Join(tempDir, "docker-compose.yml") + dockerComposeContent := dockerComposeTemplate + + // Replace ports + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "9123:9123", fmt.Sprintf("%d:%d", httpPort, httpPort)) + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "8553:8553", fmt.Sprintf("%d:%d", authrpcPort, authrpcPort)) + + // Replace network name + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${DOCKER_NETWORK:-dev-replayor}", fmt.Sprintf("gen-template-%d", templateID)) + + // Handle genesis.json + if !genesisExists { + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, " - ${GENESIS_JSON_PATH:-./genesis.json}:/app/genesis.json:ro\n", "") + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "- ${GENESIS_JSON_PATH:-./genesis.json}:/app/genesis.json:ro", "") + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${GENESIS_JSON_PATH:-./genesis.json}", "") + } else { + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${GENESIS_JSON_PATH:-./genesis.json}", "./genesis.json") + } + + // Ensure other paths are relative + 
dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${ROLLUP_JSON_PATH:-./rollup.json}", "./rollup.json") + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${JWT_TXT_PATH:-./jwt.txt}", "./jwt.txt") + dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${RETH_DATA_PATH:-./reth-data}", "./reth-data") + + if err := os.WriteFile(dockerComposePath, []byte(dockerComposeContent), 0644); err != nil { + return fmt.Errorf("failed to write docker-compose.yml: %w", err) + } + + // Generate reth.docker.env + rethEnvPath := filepath.Join(tempDir, "reth.docker.env") + rethEnvContent := strings.ReplaceAll(rethDockerEnvTemplate, "RETH_HTTP_PORT=9123", fmt.Sprintf("RETH_HTTP_PORT=%d", httpPort)) + rethEnvContent = strings.ReplaceAll(rethEnvContent, "RETH_WS_PORT=9124", fmt.Sprintf("RETH_WS_PORT=%d", wsPort)) + rethEnvContent = strings.ReplaceAll(rethEnvContent, "RETH_AUTHRPC_PORT=8553", fmt.Sprintf("RETH_AUTHRPC_PORT=%d", authrpcPort)) + rethEnvContent = strings.ReplaceAll(rethEnvContent, "RETH_DATA_DIR=./reth-data", "RETH_DATA_DIR=./reth-data") + if chain == "xlayer-testnet" || chain == "xlayer-mainnet" { + rethEnvContent = strings.ReplaceAll(rethEnvContent, "RETH_CHAIN=./genesis.json", fmt.Sprintf("RETH_CHAIN=%s", chain)) + } + // Set unwind target block + rethEnvContent = strings.ReplaceAll(rethEnvContent, "UNWIND_TO_BLOCK=8602500", fmt.Sprintf("UNWIND_TO_BLOCK=%d", targetBlock)) + if err := os.WriteFile(rethEnvPath, []byte(rethEnvContent), 0644); err != nil { + return fmt.Errorf("failed to write reth.docker.env: %w", err) + } + + // Copy shell scripts + scripts := map[string]string{ + "unwind.sh": unwindShTemplate, + "reth.sh": rethShTemplate, + "replayor.sh": replayorShTemplate, + } + + for name, content := range scripts { + scriptPath := filepath.Join(tempDir, name) + if err := os.WriteFile(scriptPath, []byte(content), 0755); err != nil { + return fmt.Errorf("failed to write %s: %w", name, err) + } + } + + // Copy Dockerfile + 
dockerfilePath := filepath.Join(tempDir, "Dockerfile") + if err := os.WriteFile(dockerfilePath, []byte(dockerfileTemplate), 0644); err != nil { + return fmt.Errorf("failed to write Dockerfile: %w", err) + } + + return nil +} diff --git a/replayor/cmd/multi-replayor/main.go b/replayor/cmd/multi-replayor/main.go new file mode 100644 index 00000000..74455b3b --- /dev/null +++ b/replayor/cmd/multi-replayor/main.go @@ -0,0 +1,56 @@ +package main + +import ( + _ "embed" + "fmt" + "os" + + oplog "github.com/ethereum-optimism/optimism/op-service/log" + "github.com/ethereum/go-ethereum/log" + "github.com/urfave/cli/v2" +) + +var ( + Version = "v0.0.1" + GitCommit = "" + GitDate = "" +) + +//go:embed embed/docker-compose.yml +var dockerComposeTemplate string + +//go:embed embed/replayor.docker.env.example +var replayorDockerEnvTemplate string + +//go:embed embed/reth.docker.env.example +var rethDockerEnvTemplate string + +//go:embed embed/unwind.sh +var unwindShTemplate string + +//go:embed embed/reth.sh +var rethShTemplate string + +//go:embed embed/replayor.sh +var replayorShTemplate string + +//go:embed embed/Dockerfile +var dockerfileTemplate string + +func main() { + oplog.SetupDefaults() + app := cli.NewApp() + app.Version = fmt.Sprintf("%s-%s-%s", Version, GitCommit, GitDate) + app.Name = "multi-replayor" + app.Description = "Utility to schedule and run multiple replayor instances in parallel using docker-compose" + app.Commands = []*cli.Command{ + RunCommand(), + GenTemplateCommand(), + } + + err := app.Run(os.Args) + if err != nil { + log.Crit("Application failed", "message", err) + os.Exit(1) + } +} diff --git a/replayor/cmd/multi-replayor/run.go b/replayor/cmd/multi-replayor/run.go new file mode 100644 index 00000000..5d9f8542 --- /dev/null +++ b/replayor/cmd/multi-replayor/run.go @@ -0,0 +1,632 @@ +package main + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "os/signal" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + oplog 
"github.com/ethereum-optimism/optimism/op-service/log"
	"github.com/ethereum/go-ethereum/log"
	"github.com/urfave/cli/v2"
)

// RunCommand returns the run subcommand, which launches one replayor pipeline
// per configured block segment, all in parallel.
func RunCommand() *cli.Command {
	flags := []cli.Flag{
		&cli.StringFlag{
			Name:     "source-node-url",
			Usage:    "The URL of the source node to fetch transactions from",
			Required: true,
			EnvVars:  []string{"SOURCE_NODE_URL"},
		},
		&cli.StringFlag{
			Name:    "template-dir",
			Usage:   "Directory containing reth-{block} template directories (defaults to work-dir)",
			EnvVars: []string{"TEMPLATE_DIR"},
		},
		&cli.StringFlag{
			Name:    "reth-image",
			Usage:   "Docker image for op-reth node",
			Value:   "op-reth:latest",
			EnvVars: []string{"RETH_IMAGE"},
		},
	}
	flags = append(flags, SharedFlags()...)
	flags = append(flags, oplog.CLIFlags("MULTI_REPLAYOR")...)

	return &cli.Command{
		Name:  "run",
		Usage: "Run multiple replayor instances in parallel",
		Flags: flags,
		Action: func(cliCtx *cli.Context) error {
			return runAction(cliCtx)
		},
	}
}

// runAction validates the CLI configuration, then runs one partition per
// parsed segment concurrently, collecting per-partition failures and
// returning a summary error if any partition failed.
func runAction(cliCtx *cli.Context) error {
	logger := oplog.NewLogger(oplog.AppOut(cliCtx), oplog.ReadCLIConfig(cliCtx))

	sourceNodeUrl := cliCtx.String("source-node-url")
	sourceNodeData := cliCtx.String("source-node-data")
	segmentsStr := cliCtx.String("segments")
	workDir := cliCtx.String("work-dir")
	templateDir := cliCtx.String("template-dir")
	rethImage := cliCtx.String("reth-image")
	rollupConfigPath := cliCtx.String("rollup-config-path")
	jwtSecretPath := cliCtx.String("jwt-secret-path")
	genesisJsonPath := cliCtx.String("genesis-json-path")
	chain := cliCtx.String("chain")

	// Use work-dir as template-dir if not specified
	if templateDir == "" {
		templateDir = workDir
	}

	// Parse segments
	ranges, err := ParseSegments(segmentsStr)
	if err != nil {
		return fmt.Errorf("invalid --segments: %w", err)
	}

	// Check if we have templates or source-node-data; template discovery is
	// best-effort, so its error is deliberately ignored here.
	templates, _ := FindTemplateDirectories(templateDir)
	hasTemplates := len(templates) > 0

	if !hasTemplates && sourceNodeData == "" {
		return fmt.Errorf("either --source-node-data or templates in --template-dir must be provided")
	}

	if sourceNodeData != "" {
		if _, err := os.Stat(sourceNodeData); os.IsNotExist(err) {
			return fmt.Errorf("source-node-data directory does not exist: %s", sourceNodeData)
		}
	}

	logger.Info("Starting multi-replayor",
		"source-node-url", sourceNodeUrl,
		"source-node-data", sourceNodeData,
		"template-dir", templateDir,
		"has-templates", hasTemplates,
		"segments", segmentsStr,
		"work-dir", workDir)

	logger.Info("Parsed segments",
		"segment-count", len(ranges))

	// Create work directory
	if err := os.MkdirAll(workDir, 0755); err != nil {
		return fmt.Errorf("failed to create work directory: %w", err)
	}

	// Create context for graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Setup signal handling: a SIGINT/SIGTERM cancels ctx, which in turn
	// terminates the docker compose child processes of every partition.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigChan
		logger.Info("Received shutdown signal, stopping all docker-compose instances")
		cancel()
	}()

	// Start all partitions; errs is guarded by mu since goroutines append
	// concurrently. (Renamed from `errors` to avoid shadowing the stdlib
	// package name.)
	var wg sync.WaitGroup
	var mu sync.Mutex
	var errs []error

	for _, r := range ranges {
		wg.Add(1)
		go func(pr PartitionRange) {
			defer wg.Done()

			logger.Info("Starting partition",
				"partition", pr.ID,
				"start-block", pr.StartBlock,
				"end-block", pr.EndBlock,
				"block-count", pr.BlockCount)

			err := runPartition(ctx, logger, runPartitionConfig{
				workDir:          workDir,
				templateDir:      templateDir,
				partitionID:      pr.ID,
				sourceNodeUrl:    sourceNodeUrl,
				sourceNodeData:   sourceNodeData,
				startBlock:       pr.StartBlock,
				blockCount:       pr.BlockCount,
				rollupConfigPath: rollupConfigPath,
				jwtSecretPath:    jwtSecretPath,
				genesisJsonPath:  genesisJsonPath,
				chain:            chain,
				rethImage:        rethImage,
			})
			if err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("partition %d failed: %w", pr.ID, err))
				mu.Unlock()
				logger.Error("Partition failed",
					"partition", pr.ID,
					"error", err)
			} else {
				logger.Info("Partition completed",
					"partition", pr.ID)
			}
		}(r)
	}

	// Wait for all partitions to complete
	wg.Wait()

	if len(errs) > 0 {
		logger.Error("Some partitions failed", "error-count", len(errs))
		for _, err := range errs {
			logger.Error("Error", "error", err)
		}
		return fmt.Errorf("%d partition(s) failed", len(errs))
	}

	logger.Info("All partitions completed successfully")
	return nil
}

// runPartitionConfig carries everything one partition needs to set up its
// working directory and drive its docker compose lifecycle.
type runPartitionConfig struct {
	workDir          string // root under which the partition directory is created
	templateDir      string // where pre-unwound reth-{block} templates live
	partitionID      int    // unique id, also used to derive host ports
	sourceNodeUrl    string
	sourceNodeData   string // fallback data dir when no template matches
	startBlock       int
	blockCount       int
	rollupConfigPath string
	jwtSecretPath    string
	genesisJsonPath  string
	chain            string
	rethImage        string
}

// runPartition prepares the working directory for one block-range partition
// (copying data from a template or the source node), then runs the compose
// services in order: optional node-unwind, node, replayor, teardown.
func runPartition(ctx context.Context, logger log.Logger, cfg runPartitionConfig) error {
	partitionDir := filepath.Join(cfg.workDir, fmt.Sprintf("multi-replayor-%d", cfg.partitionID))

	// Create partition directory
	if err := os.MkdirAll(partitionDir, 0755); err != nil {
		return fmt.Errorf("failed to create partition directory: %w", err)
	}

	// Calculate ports (offset by partition id so partitions never collide)
	authrpcPort := 16700 + cfg.partitionID
	wsPort := 16800 + cfg.partitionID
	httpPort := 16900 + cfg.partitionID

	logger.Info("Setting up partition",
		"partition", cfg.partitionID,
		"directory", partitionDir,
		"authrpc-port", authrpcPort,
		"ws-port", wsPort,
		"http-port", httpPort)

	// Determine data source: template or source-node-data
	destDataDir := filepath.Join(partitionDir, "reth-data")
	var unwindToBlock int
	var needsUnwind bool

	// Try to find a suitable template
	templatePath, templateBlock, foundTemplate := FindBestTemplate(cfg.templateDir, cfg.startBlock)
	if foundTemplate {
		logger.Info("Found template for partition",
			"partition", cfg.partitionID,
			"template-path", templatePath,
			"template-block", templateBlock,
			"target-block",
cfg.startBlock)

		// Copy from template
		if err := CopyDirectory(templatePath, destDataDir); err != nil {
			return fmt.Errorf("failed to copy template data: %w", err)
		}

		// Check if we need additional unwind: a template above the target
		// still has to be rolled back to the partition's start block.
		if templateBlock > cfg.startBlock {
			needsUnwind = true
			unwindToBlock = cfg.startBlock
			logger.Info("Template block > target, will unwind",
				"partition", cfg.partitionID,
				"from-template-block", templateBlock,
				"to-target-block", cfg.startBlock)
		} else {
			needsUnwind = false
			logger.Info("Template block matches target, no unwind needed",
				"partition", cfg.partitionID)
		}
	} else {
		// No template found, use source-node-data (always needs an unwind,
		// since the source data's head block is unknown here).
		if cfg.sourceNodeData == "" {
			return fmt.Errorf("no suitable template found and source-node-data not provided")
		}

		logger.Info("No template found, using source-node-data",
			"partition", cfg.partitionID,
			"source", cfg.sourceNodeData)

		if err := CopyDirectory(cfg.sourceNodeData, destDataDir); err != nil {
			return fmt.Errorf("failed to copy source data: %w", err)
		}

		needsUnwind = true
		unwindToBlock = cfg.startBlock
	}

	// Copy required configuration files and track which ones exist
	rollupExists := FileExists(cfg.rollupConfigPath)
	if rollupExists {
		if err := CopyFileIfExists(cfg.rollupConfigPath, filepath.Join(partitionDir, "rollup.json")); err != nil {
			return fmt.Errorf("failed to copy rollup.json: %w", err)
		}
	}

	jwtExists := FileExists(cfg.jwtSecretPath)
	if jwtExists {
		if err := CopyFileIfExists(cfg.jwtSecretPath, filepath.Join(partitionDir, "jwt.txt")); err != nil {
			return fmt.Errorf("failed to copy jwt.txt: %w", err)
		}
	}

	// genesis.json is optional: a failed copy downgrades to "absent" rather
	// than aborting the partition.
	genesisExists := FileExists(cfg.genesisJsonPath)
	if genesisExists {
		if err := CopyFileIfExists(cfg.genesisJsonPath, filepath.Join(partitionDir, "genesis.json")); err != nil {
			logger.Warn("failed to copy genesis.json", "error", err)
			genesisExists = false
		}
	}

	if !rollupExists {
		return fmt.Errorf("rollup.json not found at %s",
			cfg.rollupConfigPath)
	}
	if !jwtExists {
		return fmt.Errorf("jwt.txt not found at %s", cfg.jwtSecretPath)
	}
	if !genesisExists {
		logger.Warn("genesis.json not found, continuing without it", "path", cfg.genesisJsonPath)
	}

	// Generate configuration files
	if err := generateConfigFiles(partitionDir, cfg.partitionID, cfg.sourceNodeUrl, cfg.startBlock, cfg.blockCount, authrpcPort, wsPort, httpPort, genesisExists, cfg.chain, cfg.rethImage); err != nil {
		return fmt.Errorf("failed to generate config files: %w", err)
	}

	// Setup log files (one per compose step, kept inside the partition dir)
	unwindLog, err := os.Create(filepath.Join(partitionDir, "unwind.log"))
	if err != nil {
		return fmt.Errorf("failed to create unwind.log: %w", err)
	}
	defer unwindLog.Close()

	nodeLog, err := os.Create(filepath.Join(partitionDir, "node.log"))
	if err != nil {
		return fmt.Errorf("failed to create node.log: %w", err)
	}
	defer nodeLog.Close()

	replayorLog, err := os.Create(filepath.Join(partitionDir, "replayor.log"))
	if err != nil {
		return fmt.Errorf("failed to create replayor.log: %w", err)
	}
	defer replayorLog.Close()

	// Step 1: Run node-unwind (if needed)
	if needsUnwind {
		logger.Info("Running node-unwind", "partition", cfg.partitionID, "unwind-to", unwindToBlock)
		wd, _ := os.Getwd()
		logger.Info("Running replayor with docker compose file", "partition", cfg.partitionID, "directory", filepath.Join(partitionDir, "docker-compose.yml"), "wd", wd)

		// Update unwind target in reth.docker.env: generateConfigFiles wrote
		// UNWIND_TO_BLOCK=<startBlock>, rewrite it to the actual target here.
		rethEnvPath := filepath.Join(partitionDir, "reth.docker.env")
		rethEnvContent, err := os.ReadFile(rethEnvPath)
		if err != nil {
			return fmt.Errorf("failed to read reth.docker.env: %w", err)
		}
		// Update UNWIND_TO_BLOCK value
		updatedContent := strings.ReplaceAll(string(rethEnvContent),
			fmt.Sprintf("UNWIND_TO_BLOCK=%d", cfg.startBlock),
			fmt.Sprintf("UNWIND_TO_BLOCK=%d", unwindToBlock))
		if err := os.WriteFile(rethEnvPath, []byte(updatedContent), 0644); err != nil {
			return fmt.Errorf("failed to update reth.docker.env: %w", err)
		}

		unwindCmd := exec.CommandContext(ctx, "docker", "compose", "-f", "docker-compose.yml", "up", "node-unwind")
		unwindCmd.Dir = partitionDir
		unwindCmd.Stdout = io.MultiWriter(unwindLog, os.Stdout)
		unwindCmd.Stderr = io.MultiWriter(unwindLog, os.Stderr)
		// Run the command - even if it returns success, we need to check container exit code
		unwindCmdErr := unwindCmd.Run()

		// Always check container exit code, even if docker compose command succeeded
		unwindExitCode, unwindCheckErr := checkContainerExitCode(ctx, partitionDir, "node-unwind", cfg.partitionID, logger)
		if unwindCheckErr == nil {
			if unwindExitCode != 0 {
				logger.Error("node-unwind container exited with non-zero code", "partition", cfg.partitionID, "exit-code", unwindExitCode)
				return fmt.Errorf("partition %d: node-unwind container exited with status %d", cfg.partitionID, unwindExitCode)
			}
			logger.Info("node-unwind container exited successfully", "partition", cfg.partitionID, "exit-code", unwindExitCode)
		} else {
			logger.Warn("Failed to check container exit code", "partition", cfg.partitionID, "error", unwindCheckErr)
		}

		// Also check if docker compose command itself failed
		if unwindCmdErr != nil {
			if err := checkCommandExitStatus(unwindCmdErr, fmt.Sprintf("partition-%d node-unwind", cfg.partitionID)); err != nil {
				logger.Error("node-unwind command failed", "partition", cfg.partitionID, "error", err)
				return fmt.Errorf("partition %d: node-unwind failed: %w", cfg.partitionID, err)
			}
		}
	} else {
		logger.Info("Skipping node-unwind (using pre-unwound template)", "partition", cfg.partitionID)
	}

	// Step 2: Run node (in background)
	logger.Info("Starting node", "partition", cfg.partitionID)
	nodeCmd := exec.CommandContext(ctx, "docker", "compose", "-f", "docker-compose.yml", "up", "-d", "node")
	nodeCmd.Dir = partitionDir
	nodeCmd.Stdout = io.MultiWriter(nodeLog, os.Stdout)
	nodeCmd.Stderr = io.MultiWriter(nodeLog, os.Stderr)
	if err := checkCommandExitStatus(nodeCmd.Run(), fmt.Sprintf("partition-%d node start", cfg.partitionID)); err != nil {
		logger.Error("node start command failed", "partition", cfg.partitionID, "error", err)
		return fmt.Errorf("partition %d: node start failed: %w", cfg.partitionID, err)
	}

	// Stream node logs in background; the goroutine ends when ctx is
	// cancelled or the node container stops.
	go func() {
		logsCmd := exec.CommandContext(ctx, "docker", "compose", "-f", "docker-compose.yml", "logs", "-f", "node")
		logsCmd.Dir = partitionDir
		logsCmd.Stdout = io.MultiWriter(nodeLog, os.Stdout)
		logsCmd.Stderr = io.MultiWriter(nodeLog, os.Stderr)
		logsCmd.Run()
	}()

	// Wait for node to be ready by checking HTTP endpoint
	logger.Info("Waiting for node to be ready", "partition", cfg.partitionID, "http-port", httpPort)
	if err := waitForNodeReady(ctx, logger, httpPort, 60); err != nil {
		logger.Error("node did not become ready", "partition", cfg.partitionID, "error", err)
		return fmt.Errorf("partition %d: node did not become ready: %w", cfg.partitionID, err)
	}
	logger.Info("Node is ready", "partition", cfg.partitionID)

	// Step 3: Run replayor
	logger.Info("Running replayor", "partition", cfg.partitionID)
	replayorCmd := exec.CommandContext(ctx, "docker", "compose", "-f", "docker-compose.yml", "up", "replayor")
	replayorCmd.Dir = partitionDir
	replayorCmd.Stdout = io.MultiWriter(replayorLog, os.Stdout)
	replayorCmd.Stderr = io.MultiWriter(replayorLog, os.Stderr)
	// Run the command - even if it returns success, we need to check container exit code
	replayorCmdErr := replayorCmd.Run()

	// Always check container exit code, even if docker compose command succeeded
	replayorExitCode, replayorCheckErr := checkContainerExitCode(ctx, partitionDir, "replayor", cfg.partitionID, logger)
	if replayorCheckErr == nil {
		if replayorExitCode != 0 {
			logger.Error("replayor container exited with non-zero code", "partition", cfg.partitionID, "exit-code", replayorExitCode)
			return fmt.Errorf("partition %d: replayor container exited with status %d", cfg.partitionID, replayorExitCode)
		}
		logger.Info("replayor container exited successfully", "partition", cfg.partitionID, "exit-code", replayorExitCode)
	} else {
		logger.Warn("Failed to check container exit code", "partition", cfg.partitionID, "error", replayorCheckErr)
	}

	// Also check if docker compose command itself failed
	if replayorCmdErr != nil {
		if err := checkCommandExitStatus(replayorCmdErr, fmt.Sprintf("partition-%d replayor", cfg.partitionID)); err != nil {
			logger.Error("replayor command failed", "partition", cfg.partitionID, "error", err)
			return fmt.Errorf("partition %d: replayor failed: %w", cfg.partitionID, err)
		}
	}

	// Stop node after replayor completes
	logger.Info("Stopping node", "partition", cfg.partitionID)
	stopCmd := exec.CommandContext(ctx, "docker", "compose", "-f", "docker-compose.yml", "down")
	stopCmd.Dir = partitionDir
	stopCmd.Run() // Ignore errors on cleanup

	return nil
}

// generateConfigFiles renders the embedded compose/env/script templates into
// partitionDir, rewriting image, ports, network name and replayor settings
// for this partition.
func generateConfigFiles(partitionDir string, partitionID int, sourceNodeUrl string, startBlock, blockCount, authrpcPort, wsPort, httpPort int, genesisExists bool, chain string, rethImage string) error {
	// Generate docker-compose.yml
	dockerComposePath := filepath.Join(partitionDir, "docker-compose.yml")
	dockerComposeContent := dockerComposeTemplate

	// Replace reth image
	dockerComposeContent = strings.ReplaceAll(dockerComposeContent, `"op-reth:latest"`, fmt.Sprintf(`"%s"`, rethImage))

	// Replace ports in all services (node-unwind, node-init, node)
	// Format: "9123:9123" -> "httpPort:httpPort" and "8553:8553" -> "authrpcPort:authrpcPort"
	dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "9123:9123", fmt.Sprintf("%d:%d", httpPort, httpPort))
	dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "8553:8553", fmt.Sprintf("%d:%d", authrpcPort, authrpcPort))

	// Replace network name
	dockerComposeContent =
strings.ReplaceAll(dockerComposeContent, "${DOCKER_NETWORK:-dev-replayor}", fmt.Sprintf("dev-replayor-%d", partitionID))

	// Remove genesis.json volume mount if file doesn't exist
	if !genesisExists {
		// Remove the genesis.json volume line from all services
		dockerComposeContent = strings.ReplaceAll(dockerComposeContent, " - ${GENESIS_JSON_PATH:-./genesis.json}:/app/genesis.json:ro\n", "")
		// Also handle cases where it might be on the same line or different format
		dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "- ${GENESIS_JSON_PATH:-./genesis.json}:/app/genesis.json:ro", "")
		// Set environment variable to empty to avoid Docker trying to resolve it
		dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${GENESIS_JSON_PATH:-./genesis.json}", "")
	} else {
		// Ensure the path is relative to partition directory
		dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${GENESIS_JSON_PATH:-./genesis.json}", "./genesis.json")
	}

	// Ensure other paths are also relative
	dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${ROLLUP_JSON_PATH:-./rollup.json}", "./rollup.json")
	dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${JWT_TXT_PATH:-./jwt.txt}", "./jwt.txt")
	dockerComposeContent = strings.ReplaceAll(dockerComposeContent, "${RETH_DATA_PATH:-./reth-data}", "./reth-data")

	if err := os.WriteFile(dockerComposePath, []byte(dockerComposeContent), 0644); err != nil {
		return fmt.Errorf("failed to write docker-compose.yml: %w", err)
	}

	// Generate reth.docker.env, rewriting the template's default ports.
	rethEnvPath := filepath.Join(partitionDir, "reth.docker.env")
	rethEnvContent := strings.ReplaceAll(rethDockerEnvTemplate, "RETH_HTTP_PORT=9123", fmt.Sprintf("RETH_HTTP_PORT=%d", httpPort))
	rethEnvContent = strings.ReplaceAll(rethEnvContent, "RETH_WS_PORT=9124", fmt.Sprintf("RETH_WS_PORT=%d", wsPort))
	rethEnvContent = strings.ReplaceAll(rethEnvContent, "RETH_AUTHRPC_PORT=8553", fmt.Sprintf("RETH_AUTHRPC_PORT=%d", authrpcPort))
	// NOTE: the former RETH_DATA_DIR self-replacement (./reth-data ->
	// ./reth-data) was a no-op and has been removed.
	if chain == "xlayer-testnet" || chain == "xlayer-mainnet" {
		rethEnvContent = strings.ReplaceAll(rethEnvContent, "RETH_CHAIN=./genesis.json", fmt.Sprintf("RETH_CHAIN=%s", chain))
	}
	// Set unwind target block to startBlock (clamped at 0). The old comment
	// claimed "startBlock - 1", which did not match the code; runPartition
	// later rewrites this value when a template requires a deeper unwind.
	unwindBlock := startBlock
	if unwindBlock < 0 {
		unwindBlock = 0
	}
	rethEnvContent = strings.ReplaceAll(rethEnvContent, "UNWIND_TO_BLOCK=8602500", fmt.Sprintf("UNWIND_TO_BLOCK=%d", unwindBlock))
	if err := os.WriteFile(rethEnvPath, []byte(rethEnvContent), 0644); err != nil {
		return fmt.Errorf("failed to write reth.docker.env: %w", err)
	}

	// Generate replayor.docker.env
	replayorEnvPath := filepath.Join(partitionDir, "replayor.docker.env")
	replayorEnvContent := strings.ReplaceAll(replayorDockerEnvTemplate, "SOURCE_NODE_URL=http://host.docker.internal:8123", fmt.Sprintf("SOURCE_NODE_URL=%s", sourceNodeUrl))
	replayorEnvContent = strings.ReplaceAll(replayorEnvContent, "ENGINE_API_URL=http://node:8553", fmt.Sprintf("ENGINE_API_URL=http://node:%d", authrpcPort))
	replayorEnvContent = strings.ReplaceAll(replayorEnvContent, "EXECUTION_URL=http://node:9123", fmt.Sprintf("EXECUTION_URL=http://node:%d", httpPort))
	replayorEnvContent = strings.ReplaceAll(replayorEnvContent, "CONTINUOUS_MODE=true", "CONTINUOUS_MODE=false")
	replayorEnvContent = strings.ReplaceAll(replayorEnvContent, "BLOCK_COUNT=1", fmt.Sprintf("BLOCK_COUNT=%d", blockCount))
	// Add benchmark start block
	replayorEnvContent += fmt.Sprintf("\nBENCHMARK_START_BLOCK=%d\n", startBlock)
	if err := os.WriteFile(replayorEnvPath, []byte(replayorEnvContent), 0644); err != nil {
		return fmt.Errorf("failed to write replayor.docker.env: %w", err)
	}

	// Copy shell scripts (embedded) with execute permission.
	scripts := map[string]string{
		"unwind.sh":   unwindShTemplate,
		"reth.sh":     rethShTemplate,
		"replayor.sh": replayorShTemplate,
	}

	for name, content := range scripts {
		scriptPath := filepath.Join(partitionDir, name)
		if err := os.WriteFile(scriptPath, []byte(content), 0755); err != nil {
			return fmt.Errorf("failed to write %s: %w", name, err)
		}
	}

	// Copy Dockerfile
	dockerfilePath := filepath.Join(partitionDir, "Dockerfile")
	if err := os.WriteFile(dockerfilePath, []byte(dockerfileTemplate), 0644); err != nil {
		return fmt.Errorf("failed to write Dockerfile: %w", err)
	}

	return nil
}

// waitForNodeReady polls the node's JSON-RPC endpoint (eth_blockNumber every
// 2s) until it answers 200 OK, the deadline of maxWaitSeconds passes, or ctx
// is cancelled.
func waitForNodeReady(ctx context.Context, logger log.Logger, httpPort int, maxWaitSeconds int) error {
	url := fmt.Sprintf("http://127.0.0.1:%d", httpPort)
	client := &http.Client{
		Timeout: 2 * time.Second,
	}

	deadline := time.Now().Add(time.Duration(maxWaitSeconds) * time.Second)
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for time.Now().Before(deadline) {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Try to call eth_blockNumber to check if node is ready
			req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}`))
			if err != nil {
				continue
			}
			req.Header.Set("Content-Type", "application/json")

			resp, err := client.Do(req)
			if err != nil {
				logger.Debug("Node not ready yet", "port", httpPort, "error", err)
				continue
			}
			// Drain before closing so the transport can reuse the connection.
			io.Copy(io.Discard, resp.Body)
			resp.Body.Close()

			if resp.StatusCode == http.StatusOK {
				logger.Info("Node is ready", "port", httpPort)
				return nil
			}
		}
	}

	return fmt.Errorf("node did not become ready within %d seconds", maxWaitSeconds)
}

// checkCommandExitStatus normalizes a command error: nil stays nil, an
// *exec.ExitError becomes a descriptive error carrying the exit code (and
// captured stderr when present), and any other error (e.g. context
// cancellation) is returned unchanged.
func checkCommandExitStatus(err error, cmdName string) error {
	if err == nil {
		return nil
	}

	if exitErr, ok := err.(*exec.ExitError); ok {
		// ExitCode() is portable — no syscall.WaitStatus assertion needed;
		// it reports -1 when the process was terminated by a signal.
		exitCode := exitErr.ExitCode()
		if stderr := string(exitErr.Stderr); stderr != "" {
			return fmt.Errorf("%s exited with status %d\nstderr: %s", cmdName, exitCode, stderr)
		}
		return fmt.Errorf("%s exited with status %d", cmdName, exitCode)
	}

	// For other types of errors (e.g., context cancellation), return as-is
	return err
}

// checkContainerExitCode resolves the container behind a docker compose
// service (project name derived from the partition directory) and returns
// its recorded exit code via docker inspect.
func checkContainerExitCode(ctx context.Context, partitionDir, serviceName string, partitionID int, logger log.Logger) (int, error) {
	// Get the project name from the directory name
	projectName := filepath.Base(partitionDir)

	// First, try to get container ID using docker compose ps
	psCmd := exec.CommandContext(ctx, "docker", "compose", "-f", "docker-compose.yml", "-p", projectName, "ps", "-a", "-q", serviceName)
	psCmd.Dir = partitionDir
	containerIDBytes, err := psCmd.Output()
	if err != nil {
		logger.Debug("Failed to get container ID with docker compose", "partition", partitionID, "service", serviceName, "error", err)
		return 0, fmt.Errorf("failed to get container ID: %w", err)
	}

	containerID := strings.TrimSpace(string(containerIDBytes))
	if containerID == "" {
		return 0, fmt.Errorf("container not found for service %s", serviceName)
	}

	// Use docker inspect to get the exit code
	inspectCmd := exec.CommandContext(ctx, "docker", "inspect", "--format", "{{.State.ExitCode}}", containerID)
	exitCodeBytes, err := inspectCmd.Output()
	if err != nil {
		return 0, fmt.Errorf("failed to inspect container: %w", err)
	}

	var exitCode int
	if _, err := fmt.Sscanf(strings.TrimSpace(string(exitCodeBytes)), "%d", &exitCode); err != nil {
		return 0, fmt.Errorf("failed to parse exit code: %w", err)
	}

	return exitCode, nil
}
diff --git a/replayor/packages/replayor/benchmark.go b/replayor/packages/replayor/benchmark.go
index 01170d20..372fd7db 100644
--- a/replayor/packages/replayor/benchmark.go
+++ b/replayor/packages/replayor/benchmark.go
@@ -344,10 +344,20 @@ func (r *Benchmark) addBlock(ctx context.Context, currentBlock strategies.BlockC
 	// Record TPS for FCU with transactions
 	r.tpsTracker.RecordFCU(len(txns))
 
-	envelope, err := r.clients.EngineApi.GetPayload(ctx, eth.PayloadInfo{
-		ID:        *result.PayloadID,
-		Timestamp: uint64(currentBlock.Time),
-	})
+	var envelope *eth.ExecutionPayloadEnvelope
+	for i := 0; i < 5; i++ {
+		envelope, err = r.clients.EngineApi.GetPayload(ctx, eth.PayloadInfo{
+			ID:        *result.PayloadID,
+			Timestamp: uint64(currentBlock.Time),
+		})
+		if err != nil {
+			l.Warn("get payload failed", "err", err, "payloadId", *result.PayloadID)
+			time.Sleep(5 * time.Second)
+		} else {
+			break
+		}
+	}
+
 	if err != nil {
 		l.Crit("get payload failed", "err", err, "payloadId", *result.PayloadID)
 	}
@@ -582,3 +592,4 @@ func NewBenchmark(
 
 	return r
 }
+
diff --git a/replayor/rollup.json b/replayor/rollup.json
index 676fc88e..c64084de 100644
--- a/replayor/rollup.json
+++ b/replayor/rollup.json
@@ -1,19 +1,19 @@
 {
   "genesis": {
     "l1": {
-      "hash": "0xacab6b84b0f0e6c15a59dc3f5f86e465c8ee661a57bd8d0cf4418ed701bade2a",
-      "number": 19
+      "hash": "0xd6058858cd52906bf6025167275128e1b37627dfb25e84165a1e7dacab6b1a48",
+      "number": 23668700
     },
     "l2": {
-      "hash": "0xa4a5a697f30162412228e21f6ddf961c94d27d98a032a9c19a3631b349dec422",
-      "number": 8593921
+      "hash": "0xdc33d8c0ec9de14fc2c21bd6077309a0a856df22821bd092a2513426e096a789",
+      "number": 42810021
     },
-    "l2_time": 1764733959,
+    "l2_time": 1761579057,
     "system_config": {
-      "batcherAddr": "0x3c44cdddb6a900fa2b585dd299e03d12fa4293bc",
+      "batcherAddr": "0xdfd6c636dcb5a013c2431316c4a0762b84e70a5d",
       "overhead":
"0x0000000000000000000000000000000000000000000000000000000000000000", "scalar": "0x010000000000000000000000000000000000000000000000000c3c9d00000558", - "gasLimit": 200000000, + "gasLimit": 50000000, "eip1559Params": "0x0000000000000000", "operatorFeeParams": "0x0000000000000000000000000000000000000000000000000000000000000000", "minBaseFee": 0, @@ -24,8 +24,8 @@ "max_sequencer_drift": 600, "seq_window_size": 7200, "channel_timeout": 300, - "l1_chain_id": 1337, - "l2_chain_id": 195, + "l1_chain_id": 1, + "l2_chain_id": 196, "regolith_time": 0, "canyon_time": 0, "delta_time": 0, @@ -34,14 +34,14 @@ "granite_time": 0, "holocene_time": 0, "isthmus_time": 0, - "jovian_time": 0, - "batch_inbox_address": "0x006d918f650e2b4a9f360977c4447e6376eb632e", - "deposit_contract_address": "0x54fe0e35f556c164037a6b194033628b530f4f88", - "l1_system_config_address": "0x41d1ae422398231f16476f17583edca3b4f28927", - "protocol_versions_address": "0x0165878a594ca255338adfa4d48449f69242eb8f", + "jovian_time": 1764691201, + "batch_inbox_address": "0x002bde9b0c0857aee2cffdea6b8723eaf5989449", + "deposit_contract_address": "0x64057ad1ddac804d0d26a7275b193d9daca19993", + "l1_system_config_address": "0x5065809af286321a05fbf85713b5d5de7c8f0433", + "protocol_versions_address": "0xc1fb115d8249a7e6b27c8bc6914cab7edf0b0f7e", "chain_op_config": { - "eip1559Elasticity": 6, - "eip1559Denominator": 50, - "eip1559DenominatorCanyon": 250 + "eip1559Elasticity": 1, + "eip1559Denominator": 100000000, + "eip1559DenominatorCanyon": 100000000 } -} +} \ No newline at end of file