diff --git a/op-alt-da/cli.go b/op-alt-da/cli.go index 84364e47952a7..ac56b54f9ca06 100644 --- a/op-alt-da/cli.go +++ b/op-alt-da/cli.go @@ -16,6 +16,7 @@ var ( PutTimeoutFlagName = altDAFlags("put-timeout") GetTimeoutFlagName = altDAFlags("get-timeout") MaxConcurrentRequestsFlagName = altDAFlags("max-concurrent-da-requests") + BatchedCommitmentsFlagName = altDAFlags("batched-commitments") ) // altDAFlags returns the flag names for altDA @@ -77,6 +78,13 @@ func CLIFlags(envPrefix string, category string) []cli.Flag { EnvVars: altDAEnvs(envPrefix, "MAX_CONCURRENT_DA_REQUESTS"), Category: category, }, + &cli.BoolFlag{ + Name: BatchedCommitmentsFlagName, + Usage: "Use Batched Commitments", + Value: false, + EnvVars: altDAEnvs(envPrefix, "BATCHED_COMMITMENTS"), + Category: category, + }, } } @@ -88,6 +96,7 @@ type CLIConfig struct { PutTimeout time.Duration GetTimeout time.Duration MaxConcurrentRequests uint64 + BatchedCommitments bool } func (c CLIConfig) Check() error { @@ -115,5 +124,6 @@ func ReadCLIConfig(c *cli.Context) CLIConfig { PutTimeout: c.Duration(PutTimeoutFlagName), GetTimeout: c.Duration(GetTimeoutFlagName), MaxConcurrentRequests: c.Uint64(MaxConcurrentRequestsFlagName), + BatchedCommitments: c.Bool(BatchedCommitmentsFlagName), } } diff --git a/op-alt-da/commitment.go b/op-alt-da/commitment.go index 157eb8b185434..32704b2b83d49 100644 --- a/op-alt-da/commitment.go +++ b/op-alt-da/commitment.go @@ -2,9 +2,11 @@ package altda import ( "bytes" + "encoding/binary" "encoding/hex" "errors" "fmt" + "io" "github.com/ethereum-optimism/optimism/op-node/rollup/derive/params" "github.com/ethereum/go-ethereum/crypto" @@ -25,6 +27,8 @@ func CommitmentTypeFromString(s string) (CommitmentType, error) { return Keccak256CommitmentType, nil case GenericCommitmentString: return GenericCommitmentType, nil + case BatchedCommitmentString: + return BatchedCommitmentType, nil default: return 0, fmt.Errorf("invalid commitment type: %s", s) } @@ -36,8 +40,10 @@ func CommitmentTypeFromString(s string) (CommitmentType, error) { const ( Keccak256CommitmentType CommitmentType = 0 GenericCommitmentType CommitmentType = 1 + BatchedCommitmentType CommitmentType = 2 KeccakCommitmentString string = "KeccakCommitment" GenericCommitmentString string = "GenericCommitment" + BatchedCommitmentString string = "BatchedCommitment" ) // CommitmentData is the binary representation of a commitment. 
@@ -81,6 +87,8 @@ func DecodeCommitmentData(input []byte) (CommitmentData, error) {
 		return DecodeKeccak256(data)
 	case GenericCommitmentType:
 		return DecodeGenericCommitment(data)
+	case BatchedCommitmentType:
+		return DecodeBatchedCommitment(data)
 	default:
 		return nil, ErrInvalidCommitment
 	}
@@ -167,3 +175,152 @@ func (c GenericCommitment) Verify(input []byte) error {
 func (c GenericCommitment) String() string {
 	return hex.EncodeToString(c.Encode())
 }
+
+// BatchedCommitment represents a collection of commitments that all share the same sub-commitment type (e.g. Keccak256 or Generic)
+type BatchedCommitment []byte
+
+// NewBatchedCommitment creates a new batched commitment from the given commitments
+func NewBatchedCommitment(comms []CommitmentData) (BatchedCommitment, error) {
+	if len(comms) == 0 {
+		return nil, ErrInvalidCommitment
+	}
+
+	// Verify all commitments are of the same type
+	subType := comms[0].CommitmentType()
+	for i, comm := range comms {
+		if comm.CommitmentType() != subType {
+			return nil, fmt.Errorf("commitment at index %d has different type than first commitment", i)
+		}
+	}
+
+	// Calculate total size needed: 1 byte for subcommitment type + (2 bytes length + raw data) for each commitment
+	totalSize := 1 // subcommitment type byte
+	for _, comm := range comms {
+		rawData := comm.Encode()[1:]  // Skip the type byte since we'll store it once at the start
+		totalSize += 2 + len(rawData) // 2 bytes for length + raw data
+	}
+
+	result := make([]byte, totalSize)
+
+	// Write the subcommitment type byte
+	result[0] = byte(subType)
+	pos := 1
+
+	for _, comm := range comms {
+		rawData := comm.Encode()[1:] // Skip the type byte
+		// Write length (2 bytes, big endian)
+		result[pos] = byte(len(rawData) >> 8)
+		result[pos+1] = byte(len(rawData))
+		pos += 2
+
+		// Write raw commitment data (without type byte)
+		copy(result[pos:], rawData)
+		pos += len(rawData)
+	}
+
+	return BatchedCommitment(result), nil
+}
+
+// DecodeBatchedCommitment validates and casts the commitment into a BatchedCommitment
+func DecodeBatchedCommitment(commitment []byte) (BatchedCommitment, error) {
+	// Need at least: 1 byte for type + 2 bytes for first commitment length
+	if len(commitment) < 3 {
+		return nil, ErrInvalidCommitment
+	}
+
+	// Skip the subcommitment type byte when validating the structure
+	reader := bytes.NewReader(commitment[1:])
+	for reader.Len() > 0 {
+		// Read length (2 bytes)
+		var length uint16
+		if err := binary.Read(reader, binary.BigEndian, &length); err != nil {
+			return nil, ErrInvalidCommitment
+		}
+
+		// Ensure we have enough bytes for this commitment
+		if reader.Len() < int(length) {
+			return nil, ErrInvalidCommitment
+		}
+
+		// Skip the commitment data
+		if _, err := reader.Seek(int64(length), io.SeekCurrent); err != nil {
+			return nil, ErrInvalidCommitment
+		}
+	}
+
+	return BatchedCommitment(commitment), nil
+}
+
+// CommitmentType returns the commitment type of BatchedCommitment
+func (c BatchedCommitment) CommitmentType() CommitmentType {
+	return BatchedCommitmentType
+}
+
+// Encode adds a commitment type prefix that describes the commitment
+func (c BatchedCommitment) Encode() []byte {
+	return append([]byte{byte(BatchedCommitmentType)}, c...)
+}
+
+// TxData adds an extra version byte to signal it's a commitment
+func (c BatchedCommitment) TxData() []byte {
+	return append([]byte{params.DerivationVersion1}, c.Encode()...)
+} + +// Verify checks if any of the batched commitments match the given input +func (c BatchedCommitment) Verify(input []byte) error { + commitments, err := c.GetCommitments() + if err != nil { + return err + } + + for _, comm := range commitments { + if err := comm.Verify(input); err == nil { + return nil // Found a matching commitment + } + } + return ErrCommitmentMismatch +} + +// GetCommitments returns the individual commitments in the batch +func (c BatchedCommitment) GetCommitments() ([]CommitmentData, error) { + if len(c) < 1 { + return nil, ErrInvalidCommitment + } + + // First byte is the subcommitment type + subType := CommitmentType(c[0]) + reader := bytes.NewReader(c[1:]) + var commitments []CommitmentData + + for reader.Len() > 0 { + // Read length (2 bytes) + var length uint16 + if err := binary.Read(reader, binary.BigEndian, &length); err != nil { + return nil, ErrInvalidCommitment + } + + // Read commitment data + data := make([]byte, length) + if _, err := io.ReadFull(reader, data); err != nil { + return nil, ErrInvalidCommitment + } + + // Reconstruct full commitment with type byte + fullData := append([]byte{byte(subType)}, data...) + + // Decode the commitment + comm, err := DecodeCommitmentData(fullData) + if err != nil { + return nil, err + } + commitments = append(commitments, comm) + } + + return commitments, nil +} + +func (c BatchedCommitment) String() string { + return hex.EncodeToString(c.Encode()) +} + + diff --git a/op-alt-da/commitment_test.go b/op-alt-da/commitment_test.go index e4656133d69e0..7ebaebade8fb2 100644 --- a/op-alt-da/commitment_test.go +++ b/op-alt-da/commitment_test.go @@ -4,43 +4,57 @@ import ( "testing" "github.com/ethereum-optimism/optimism/op-node/rollup/derive/params" + "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/require" ) +func encodeCommitmentData(commitmentType CommitmentType, data []byte) []byte { + return append([]byte{byte(commitmentType)}, data...) +} + // TestCommitmentData tests the CommitmentData type and its implementations, // by encoding and decoding the commitment data and verifying the input data. func TestCommitmentData(t *testing.T) { + t.Parallel() type tcase struct { name string commType CommitmentType + commInput []byte commData []byte expectedErr error } + input := []byte{0} + hash := crypto.Keccak256(input) + testCases := []tcase{ { name: "valid keccak256 commitment", commType: Keccak256CommitmentType, - commData: []byte("abcdefghijklmnopqrstuvwxyz012345"), - expectedErr: ErrInvalidCommitment, + commInput: input, + commData: encodeCommitmentData(Keccak256CommitmentType, hash), + expectedErr: nil, }, { name: "invalid keccak256 commitment", commType: Keccak256CommitmentType, - commData: []byte("ab_baddata_yz012345"), + commInput: input, + commData: encodeCommitmentData(Keccak256CommitmentType, []byte("ab_baddata_yz012345")), expectedErr: ErrInvalidCommitment, }, { name: "valid generic commitment", commType: GenericCommitmentType, - commData: []byte("any length of data! wow, that's so generic!"), - expectedErr: ErrInvalidCommitment, + commInput: []byte("any input works"), + commData: encodeCommitmentData(GenericCommitmentType, []byte("any length of data! 
wow, that's so generic!")), + expectedErr: nil, // This should actually be valid now }, { name: "invalid commitment type", commType: 9, - commData: []byte("abcdefghijklmnopqrstuvwxyz012345"), + commInput: []byte("some input"), + commData: encodeCommitmentData(CommitmentType(9), []byte("abcdefghijklmnopqrstuvwxyz012345")), expectedErr: ErrInvalidCommitment, }, } @@ -58,7 +72,7 @@ func TestCommitmentData(t *testing.T) { require.Equal(t, append([]byte{params.DerivationVersion1}, tc.commData...), comm.TxData()) // Test that Verify() returns no error for the correct data - require.NoError(t, comm.Verify(tc.commData)) + require.NoError(t, comm.Verify(tc.commInput)) // Test that Verify() returns error for the incorrect data // don't do this for GenericCommitmentType, which does not do any verification if tc.commType != GenericCommitmentType { @@ -68,3 +82,128 @@ func TestCommitmentData(t *testing.T) { }) } } + + +func TestBatchedCommitment(t *testing.T) { + t.Parallel() + + t.Run("empty batch", func(t *testing.T) { + _, err := NewBatchedCommitment(nil) + require.ErrorIs(t, err, ErrInvalidCommitment) + }) + + t.Run("mixed types", func(t *testing.T) { + comms := []CommitmentData{ + NewKeccak256Commitment([]byte("data1")), + NewGenericCommitment([]byte("data2")), + } + _, err := NewBatchedCommitment(comms) + require.Error(t, err) + }) + + t.Run("valid keccak batch", func(t *testing.T) { + inputs := [][]byte{ + []byte("data1"), + []byte("data2"), + []byte("data3"), + } + comms := make([]CommitmentData, len(inputs)) + for i, input := range inputs { + comms[i] = NewKeccak256Commitment(input) + } + + // Create batch + batch, err := NewBatchedCommitment(comms) + require.NoError(t, err) + + // Decode batch + decoded, err := batch.GetCommitments() + require.NoError(t, err) + require.Equal(t, len(comms), len(decoded)) + + // Verify each commitment matches and can verify its input + for i, comm := range decoded { + require.Equal(t, Keccak256CommitmentType, comm.CommitmentType()) + require.NoError(t, comm.Verify(inputs[i])) + require.Equal(t, comms[i].Encode(), comm.Encode()) + } + }) + + t.Run("valid generic batch", func(t *testing.T) { + datas := [][]byte{ + []byte("generic1"), + []byte("generic2"), + []byte("generic3"), + } + comms := make([]CommitmentData, len(datas)) + for i, data := range datas { + comms[i] = NewGenericCommitment(data) + } + + // Create batch + batch, err := NewBatchedCommitment(comms) + require.NoError(t, err) + + // Test batch encoding/decoding + decoded, err := batch.GetCommitments() + require.NoError(t, err) + require.Equal(t, len(comms), len(decoded)) + + // Verify each commitment matches + for i, comm := range decoded { + require.Equal(t, GenericCommitmentType, comm.CommitmentType()) + require.Equal(t, comms[i].Encode(), comm.Encode()) + } + }) + + t.Run("malformed batch data", func(t *testing.T) { + testCases := []struct { + name string + data []byte + }{ + {"empty data", []byte{}}, + {"only type byte", []byte{byte(Keccak256CommitmentType)}}, + {"incomplete length", []byte{byte(Keccak256CommitmentType), 0}}, + {"length with no data", []byte{byte(Keccak256CommitmentType), 0, 32}}, + {"invalid type", []byte{255, 0, 32, 1, 2, 3}}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, err := DecodeBatchedCommitment(tc.data) + require.ErrorIs(t, err, ErrInvalidCommitment) + }) + } + }) + + t.Run("batch roundtrip", func(t *testing.T) { + // Create a batch + comms := []CommitmentData{ + NewKeccak256Commitment([]byte("data1")), + 
NewKeccak256Commitment([]byte("data2")), + } + batch, err := NewBatchedCommitment(comms) + require.NoError(t, err) + + // Encode it + encoded := batch.Encode() + + // Decode it + decoded, err := DecodeCommitmentData(encoded) + require.NoError(t, err) + + // Verify it's a batched commitment + batchComm, ok := decoded.(BatchedCommitment) + require.True(t, ok) + + // Get the individual commitments + decodedComms, err := batchComm.GetCommitments() + require.NoError(t, err) + + // Verify they match the original + require.Equal(t, len(comms), len(decodedComms)) + for i := range comms { + require.Equal(t, comms[i].Encode(), decodedComms[i].Encode()) + } + }) +} diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go index 6b936c112d346..fdd09b7f7b0e8 100644 --- a/op-batcher/batcher/channel.go +++ b/op-batcher/batcher/channel.go @@ -132,14 +132,14 @@ func (c *channel) ID() derive.ChannelID { // NextTxData should only be called after HasTxData returned true. func (c *channel) NextTxData() txData { nf := c.cfg.MaxFramesPerTx() - txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs} + txdata := txData{frames: make([]frameData, 0, nf), daType: c.cfg.DaType} for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ { frame := c.channelBuilder.NextFrame() txdata.frames = append(txdata.frames, frame) } id := txdata.ID().String() - c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob) + c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "da_type", txdata.daType) c.pendingTransactions[id] = txdata return txdata @@ -147,7 +147,7 @@ func (c *channel) NextTxData() txData { func (c *channel) HasTxData() bool { if c.IsFull() || // If the channel is full, we should start to submit it - !c.cfg.UseBlobs { // If using calldata, we only send one frame per tx + c.cfg.DaType == DaTypeCalldata { // If using calldata, we only send one frame per tx return c.channelBuilder.HasPendingFrame() } // Collect enough frames if channel is not full yet diff --git a/op-batcher/batcher/channel_config.go b/op-batcher/batcher/channel_config.go index bf0f5ffb4adbe..5054bb3989507 100644 --- a/op-batcher/batcher/channel_config.go +++ b/op-batcher/batcher/channel_config.go @@ -46,9 +46,12 @@ type ChannelConfig struct { // BatchType indicates whether the channel uses SingularBatch or SpanBatch. BatchType uint - // UseBlobs indicates that this channel should be sent as a multi-blob - // transaction with one blob per frame. - UseBlobs bool + // DaType indicates how the frames in this channel should be sent to the L1. + DaType DaType +} + +func (cc ChannelConfig) UseBlobs() bool { + return cc.DaType == DaTypeBlob } // ChannelConfig returns a copy of the receiver. 
@@ -93,7 +96,7 @@ func (cc *ChannelConfig) ReinitCompressorConfig() { } func (cc *ChannelConfig) MaxFramesPerTx() int { - if !cc.UseBlobs { + if cc.DaType == DaTypeCalldata { return 1 } return cc.TargetNumFrames diff --git a/op-batcher/batcher/channel_config_provider_test.go b/op-batcher/batcher/channel_config_provider_test.go index 95e51a921e5fd..fccc26d649219 100644 --- a/op-batcher/batcher/channel_config_provider_test.go +++ b/op-batcher/batcher/channel_config_provider_test.go @@ -31,11 +31,12 @@ func TestDynamicEthChannelConfig_ChannelConfig(t *testing.T) { calldataCfg := ChannelConfig{ MaxFrameSize: 120_000 - 1, TargetNumFrames: 1, + DaType: DaTypeCalldata, } blobCfg := ChannelConfig{ MaxFrameSize: eth.MaxBlobDataSize - 1, TargetNumFrames: 3, // gets closest to amortized fixed tx costs - UseBlobs: true, + DaType: DaTypeBlob, } tests := []struct { diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 1ea412c4b4337..0fad16927e300 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -207,16 +207,16 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool) (txData, erro newCfg := s.cfgProvider.ChannelConfig(isPectra) // No change: - if newCfg.UseBlobs == s.defaultCfg.UseBlobs { + if newCfg.UseBlobs() == s.defaultCfg.UseBlobs() { s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type", - "useBlobs", s.defaultCfg.UseBlobs) + "useBlobs", s.defaultCfg.UseBlobs()) return s.nextTxData(channel) } // Change: s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...", - "useBlobsBefore", s.defaultCfg.UseBlobs, - "useBlobsAfter", newCfg.UseBlobs) + "useBlobsBefore", s.defaultCfg.UseBlobs(), + "useBlobsAfter", newCfg.UseBlobs()) // Invalidate the channel so its blocks // get requeued: @@ -317,7 +317,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { "compression_algo", cfg.CompressorConfig.CompressionAlgo, "target_num_frames", cfg.TargetNumFrames, "max_frame_size", cfg.MaxFrameSize, - "use_blobs", cfg.UseBlobs, + "da_type", cfg.DaType, ) s.metr.RecordChannelOpened(pc.ID(), s.pendingBlocks()) diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index d7c8abcd87e91..9205b6eae2e8d 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -290,11 +290,12 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger, calldataCfg := ChannelConfig{ MaxFrameSize: 120_000 - 1, TargetNumFrames: 1, + DaType: DaTypeCalldata, } blobCfg := ChannelConfig{ MaxFrameSize: eth.MaxBlobDataSize - 1, TargetNumFrames: 3, // gets closest to amortized fixed tx costs - UseBlobs: true, + DaType: DaTypeBlob, } calldataCfg.InitNoneCompressor() blobCfg.InitNoneCompressor() @@ -348,7 +349,7 @@ func TestChannelManager_TxData(t *testing.T) { cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs) + require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.DaType == DaTypeBlob) // Seed channel manager with a block rng := rand.New(rand.NewSource(99)) @@ -385,8 +386,8 @@ func TestChannelManager_TxData(t *testing.T) { } require.Equal(t, tc.numExpectedAssessments, cfg.assessments) - require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob) - require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, 
m.defaultCfg.UseBlobs) + require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.daType == DaTypeBlob) + require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.DaType == DaTypeBlob) }) } diff --git a/op-batcher/batcher/channel_test.go b/op-batcher/batcher/channel_test.go index b36ce9311bcea..5e1a4414ada63 100644 --- a/op-batcher/batcher/channel_test.go +++ b/op-batcher/batcher/channel_test.go @@ -131,7 +131,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { const n = 6 lgr := testlog.Logger(t, log.LevelWarn) ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{ - UseBlobs: false, + DaType: DaTypeCalldata, TargetNumFrames: n, CompressorConfig: compressor.Config{ CompressionAlgo: derive.Zlib, @@ -172,7 +172,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { const n = eth.MaxBlobsPerBlobTx lgr := testlog.Logger(t, log.LevelWarn) ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{ - UseBlobs: true, + DaType: DaTypeBlob, TargetNumFrames: n, CompressorConfig: compressor.Config{ CompressionAlgo: derive.Zlib, diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 20ca4eb054818..94a60f8edd5b3 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive/params" "github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/txmgr" @@ -780,22 +781,10 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh // publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1. func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) { - // sanity checks - if nf := len(txdata.frames); nf != 1 { - l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) - } - if txdata.asBlob { - l.Log.Crit("Unexpected blob txdata with AltDA enabled") - } - // when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop // since it may take a while for the request to return. goroutineSpawned := daGroup.TryGo(func() error { - // TODO: probably shouldn't be using the global shutdownCtx here, see https://go.dev/blog/context-and-structs - // but sendTransaction receives l.killCtx as an argument, which currently is only canceled after waiting for the main loop - // to exit, which would wait on this DA call to finish, which would take a long time. - // So we prefer to mimic the behavior of txmgr and cancel all pending DA/txmgr requests when the batcher is stopped. - comm, err := l.AltDA.SetInput(l.shutdownCtx, txdata.CallData()) + comm, err := l.createAltDACommitment(txdata) if err != nil { // Don't log context cancelled events because they are expected, // and can happen after tests complete which causes a panic. 
@@ -809,6 +798,7 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t } return nil } + l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID()) candidate := l.calldataTxCandidate(comm.TxData()) l.sendTx(txdata, false, candidate, queue, receiptsCh) @@ -822,21 +812,57 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t } } +func (l *BatchSubmitter) createAltDACommitment(txdata txData) (altda.CommitmentData, error) { + inputs := l.prepareAltDAInputs(txdata) + comms := make([]altda.CommitmentData, 0, len(inputs)) + + for _, input := range inputs { + // TODO: probably shouldn't be using the global shutdownCtx here, see https://go.dev/blog/context-and-structs + // but sendTransaction receives l.killCtx as an argument, which currently is only canceled after waiting for the main loop + // to exit, which would wait on this DA call to finish, which would take a long time. + // So we prefer to mimic the behavior of txmgr and cancel all pending DA/txmgr requests when the batcher is stopped. + comm, err := l.AltDA.SetInput(l.shutdownCtx, input) + if err != nil { + return nil, err + } + comms = append(comms, comm) + } + + // Return single or batched commitment based on how many frames we had + if len(comms) == 1 { + return comms[0], nil + } + return altda.NewBatchedCommitment(comms) +} + +func (l *BatchSubmitter) prepareAltDAInputs(txdata txData) [][]byte { + if !l.Config.UseBatchedCommitments { + return [][]byte{txdata.CallData()} + } + + result := make([][]byte, len(txdata.frames)) + for i, f := range txdata.frames { + result[i] = append([]byte{params.DerivationVersion0}, f.data...) + } + return result +} + // sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`. // This call will block if the txmgr queue is at the max-pending limit. // The method will block if the queue's MaxPendingTransactions is exceeded. func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error { var err error - - // if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment. - if l.Config.UseAltDA { + var candidate *txmgr.TxCandidate + switch txdata.daType { + case DaTypeAltDA: + if !l.Config.UseAltDA { + l.Log.Crit("Received AltDA type txdata without AltDA being enabled") + } + // if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment. l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup) // we return nil to allow publishStateToL1 to keep processing the next txdata return nil - } - - var candidate *txmgr.TxCandidate - if txdata.asBlob { + case DaTypeBlob: if candidate, err = l.blobTxCandidate(txdata); err != nil { // We could potentially fall through and try a calldata tx instead, but this would // likely result in the chain spending more in gas fees than it is tuned for, so best @@ -844,12 +870,14 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef // or configuration issue. 
return fmt.Errorf("could not create blob tx candidate: %w", err) } - } else { + case DaTypeCalldata: // sanity check if nf := len(txdata.frames); nf != 1 { l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf) } candidate = l.calldataTxCandidate(txdata.CallData()) + default: + l.Log.Crit("Unknown DA type", "da_type", txdata.daType) } l.sendTx(txdata, false, candidate, queue, receiptsCh) @@ -867,7 +895,7 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T candidate.GasLimit = intrinsicGas } - queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh) + queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.daType == DaTypeBlob}, *candidate, receiptsCh) } func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) { diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index f884c57b3eabd..ddf927cbaf5c0 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -39,6 +39,9 @@ type BatcherConfig struct { // UseAltDA is true if the rollup config has a DA challenge address so the batcher // will post inputs to the DA server and post commitments to blobs or calldata. UseAltDA bool + + UseBatchedCommitments bool + // maximum number of concurrent blob put requests to the DA server MaxConcurrentDARequests uint64 @@ -218,30 +221,40 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { TargetNumFrames: cfg.TargetNumFrames, SubSafetyMargin: cfg.SubSafetyMargin, BatchType: cfg.BatchType, + // DaType: set below } - switch cfg.DataAvailabilityType { - case flags.BlobsType, flags.AutoType: - if !cfg.TestUseMaxTxSizeForBlobs { - // account for version byte prefix - cc.MaxFrameSize = eth.MaxBlobDataSize - 1 + if bs.UseAltDA { + if cfg.DataAvailabilityType == flags.CalldataType { + cc.DaType = DaTypeAltDA + } else { + return fmt.Errorf("altDA is currently only supported with calldata DA Type") } - cc.UseBlobs = true - case flags.CalldataType: // do nothing - default: - return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType) - } + if cc.MaxFrameSize > altda.MaxInputSize { + return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize) + } + } else { - if bs.UseAltDA && cc.MaxFrameSize > altda.MaxInputSize { - return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize) + switch cfg.DataAvailabilityType { + case flags.BlobsType, flags.AutoType: + if !cfg.TestUseMaxTxSizeForBlobs { + // account for version byte prefix + cc.MaxFrameSize = eth.MaxBlobDataSize - 1 + } + cc.DaType = DaTypeBlob + case flags.CalldataType: // do nothing + cc.DaType = DaTypeCalldata + default: + return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType) + } } cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo) - if cc.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { + if cc.UseBlobs() && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { return errors.New("cannot use Blobs before Ecotone") } - if !cc.UseBlobs && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { + if !cc.UseBlobs() && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) { bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!") } @@ -273,7 +286,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { calldataCC := cc 
 	calldataCC.TargetNumFrames = 1
 	calldataCC.MaxFrameSize = 120_000
-	calldataCC.UseBlobs = false
+	calldataCC.DaType = DaTypeCalldata
 	calldataCC.ReinitCompressorConfig()
 
 	bs.ChannelConfig = NewDynamicEthChannelConfig(bs.Log, 10*time.Second, bs.TxManager, cc, calldataCC)
@@ -375,6 +388,7 @@ func (bs *BatcherService) initAltDA(cfg *CLIConfig) error {
 	}
 	bs.AltDA = config.NewDAClient()
 	bs.UseAltDA = config.Enabled
+	bs.UseBatchedCommitments = config.BatchedCommitments
 	return nil
 }
diff --git a/op-batcher/batcher/test_batch_submitter.go b/op-batcher/batcher/test_batch_submitter.go
index 93083aa0dc6d7..f497a81209dc2 100644
--- a/op-batcher/batcher/test_batch_submitter.go
+++ b/op-batcher/batcher/test_batch_submitter.go
@@ -28,7 +28,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
 	var candidate *txmgr.TxCandidate
 	var err error
 	cc := l.channelMgr.cfgProvider.ChannelConfig(true)
-	if cc.UseBlobs {
+	if cc.UseBlobs() {
 		candidate = l.calldataTxCandidate([]byte{})
 	} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
 		return err
diff --git a/op-batcher/batcher/tx_data.go b/op-batcher/batcher/tx_data.go
index 0165f85f079ed..7f8ff538be1c6 100644
--- a/op-batcher/batcher/tx_data.go
+++ b/op-batcher/batcher/tx_data.go
@@ -9,6 +9,15 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/eth"
 )
 
+// DaType determines how txData is submitted to L1.
+type DaType int
+
+const (
+	DaTypeCalldata DaType = iota
+	DaTypeBlob
+	DaTypeAltDA
+)
+
 // txData represents the data for a single transaction.
 //
 // Note: The batcher currently sends exactly one frame per transaction. This
@@ -16,7 +25,7 @@ import (
 // different channels.
 type txData struct {
 	frames []frameData
-	asBlob bool // indicates whether this should be sent as blob
+	daType DaType
 }
 
 func singleFrameTxData(frame frameData) txData {
diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go
index dee068cdb8f45..405fa1fdf81f7 100644
--- a/op-batcher/flags/flags.go
+++ b/op-batcher/flags/flags.go
@@ -82,8 +82,10 @@ var (
 		EnvVars: prefixEnvVars("MAX_BLOCKS_PER_SPAN_BATCH"),
 	}
 	TargetNumFramesFlag = &cli.IntFlag{
-		Name:    "target-num-frames",
-		Usage:   "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.",
+		Name: "target-num-frames",
+		Usage: "The target number of frames to create per channel. " +
+			"Controls number of blobs per blob tx, if using Blob DA, " +
+			"or the number of AltDA commitments batched into a single tx, if using batched commitments.",
 		Value:   1,
 		EnvVars: prefixEnvVars("TARGET_NUM_FRAMES"),
 	}
diff --git a/op-e2e/actions/altda/altda_batched_test.go b/op-e2e/actions/altda/altda_batched_test.go
new file mode 100644
index 0000000000000..5d602c97580b2
--- /dev/null
+++ b/op-e2e/actions/altda/altda_batched_test.go
@@ -0,0 +1,440 @@
+package altda
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/ethereum-optimism/optimism/op-e2e/config"
+	"github.com/ethereum-optimism/optimism/op-node/rollup"
+	"github.com/ethereum-optimism/optimism/op-node/rollup/event"
+
+	"github.com/ethereum-optimism/optimism/op-e2e/actions/helpers"
+	"github.com/stretchr/testify/require"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
+
+	altda "github.com/ethereum-optimism/optimism/op-alt-da"
+	"github.com/ethereum-optimism/optimism/op-alt-da/bindings"
+	"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
+	"github.com/ethereum-optimism/optimism/op-service/sources"
+	"github.com/ethereum-optimism/optimism/op-service/testlog"
+)
+
+// L2AltDA is a test harness for manipulating AltDA state.
+
+type AltDAParamBatched func(p *e2eutils.TestParams)
+
+// Same as NewL2AltDA in altda_test.go, but with a batched-commitments batcher config
+func NewL2AltDABatched(t helpers.Testing, params ...AltDAParamBatched) *L2AltDA {
+	p := &e2eutils.TestParams{
+		MaxSequencerDrift:   40,
+		SequencerWindowSize: 12,
+		ChannelTimeout:      12,
+		L1BlockTime:         12,
+		UseAltDA:            true,
+		AllocType:           config.AllocTypeAltDA,
+	}
+	for _, apply := range params {
+		apply(p)
+	}
+	log := testlog.Logger(t, log.LvlDebug)
+
+	dp := e2eutils.MakeDeployParams(t, p)
+	sd := e2eutils.Setup(t, dp, helpers.DefaultAlloc)
+
+	require.True(t, sd.RollupCfg.AltDAEnabled())
+
+	miner := helpers.NewL1Miner(t, log, sd.L1Cfg)
+	l1Client := miner.EthClient()
+
+	jwtPath := e2eutils.WriteDefaultJWT(t)
+	engine := helpers.NewL2Engine(t, log, sd.L2Cfg, jwtPath)
+	engCl := engine.EngineClient(t, sd.RollupCfg)
+
+	storage := &altda.DAErrFaker{Client: altda.NewMockDAClient(log)}
+
+	l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
+	require.NoError(t, err)
+
+	altDACfg, err := sd.RollupCfg.GetOPAltDAConfig()
+	require.NoError(t, err)
+
+	daMgr := altda.NewAltDAWithStorage(log, altDACfg, storage, &altda.NoopMetrics{})
+
+	sequencer := helpers.NewL2Sequencer(t, log, l1F, miner.BlobStore(), daMgr, engCl, sd.RollupCfg, 0)
+	miner.ActL1SetFeeRecipient(common.Address{'A'})
+	sequencer.ActL2PipelineFull(t)
+
+	batcher := helpers.NewL2Batcher(log, sd.RollupCfg, helpers.BatchedCommsBatcherCfg(dp, storage), sequencer.RollupClient(), l1Client, engine.EthClient(), engCl)
+
+	addresses := e2eutils.CollectAddresses(sd, dp)
+	cl := engine.EthClient()
+	l2UserEnv := &helpers.BasicUserEnv[*helpers.L2Bindings]{
+		EthCl:          cl,
+		Signer:         types.LatestSigner(sd.L2Cfg.Config),
+		AddressCorpora: addresses,
+		Bindings:       helpers.NewL2Bindings(t, cl, engine.GethClient()),
+	}
+	alice := helpers.NewCrossLayerUser(log, dp.Secrets.Alice, rand.New(rand.NewSource(0xa57b)), p.AllocType)
+	alice.L2.SetUserEnv(l2UserEnv)
+
+	contract, err := bindings.NewDataAvailabilityChallenge(sd.RollupCfg.AltDAConfig.DAChallengeAddress, l1Client)
+	require.NoError(t, err)
+
+	challengeWindow, err := contract.ChallengeWindow(nil)
+	require.NoError(t, err)
+	require.Equal(t, altDACfg.ChallengeWindow, challengeWindow.Uint64())
+
+	resolveWindow, err := contract.ResolveWindow(nil)
+	require.NoError(t, err)
+	require.Equal(t, altDACfg.ResolveWindow, resolveWindow.Uint64())
+
+	return &L2AltDA{
+		log:       log,
+		storage:   storage,
+		daMgr:     daMgr,
+		altDACfg:  altDACfg,
+		contract:  contract,
+		batcher:   batcher,
+		sequencer: sequencer,
+		engine:    engine,
+		engCl:     engCl,
+		sd:        sd,
+		dp:        dp,
+		miner:     miner,
+		alice:     alice,
+	}
+}
+
+func (a *L2AltDA) ActSequencerIncludeBigTxs(t helpers.Testing, n int) {
+	rng := rand.New(rand.NewSource(555))
+
+	a.sequencer.ActL2StartBlock(t)
+	// build an L2 block with n large txs of random data (each should take a whole frame)
+	for i := 0; i < n; i++ {
+		data := make([]byte, 120_000) // very large L2 txs, as large as the tx-pool will accept
+		_, err := rng.Read(data[:])   // fill with random bytes, to make compression ineffective
+		require.NoError(t, err)
+
+		a.alice.L2.ActResetTxOpts(t)
+		a.alice.L2.ActSetTxToAddr(&a.dp.Addresses.Bob)(t)
+		a.alice.L2.ActSetTxCalldata(data)(t)
+		a.alice.L2.ActMakeTx(t)
+		a.engine.ActL2IncludeTx(a.alice.Address())(t)
+	}
+	a.sequencer.ActL2EndBlock(t)
+}
+
+func (a *L2AltDA) ActSubmitBatchedCommitments(t helpers.Testing, n int) {
+	a.ActSequencerIncludeBigTxs(t, n)
+
+	// This should buffer 1 block, which will be consumed as n frames because of the tx sizes
+	a.batcher.ActBufferAll(t)
+
+	// close the channel
+	a.batcher.ActL2ChannelClose(t)
+
+	// Batch submit the n commitments in a single tx
+	a.batcher.ActL2SubmitBatchedCommitments(t, n, func(tx *types.DynamicFeeTx) {
+		// skip the txdata version byte, decode the batch, and store the last commitment
+		comm, err := altda.DecodeCommitmentData(tx.Data[1:])
+		require.NoError(t, err)
+
+		if batchedComm, ok := comm.(altda.BatchedCommitment); ok {
+			// The commitment is a BatchedCommitment
+			comms, err := batchedComm.GetCommitments()
+			require.NoError(t, err)
+
+			require.Equal(t, len(comms), n)
+
+			// Store last commitment
+			a.lastComm = comms[n-1].Encode()
+		} else {
+			require.Fail(t, "Decoded commitment is not BatchedCommitment")
+		}
+	})
+
+	// Include batched commitments in L1 block
+	a.miner.ActL1StartBlock(12)(t)
+	a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t)
+	a.miner.ActL1EndBlock(t)
+
+	a.lastCommBn = a.miner.L1Chain().CurrentBlock().Number.Uint64()
+}
+
+
+func TestAltDABatched_Derivation(gt *testing.T) {
+	t := helpers.NewDefaultTesting(gt)
+	harness := NewL2AltDABatched(t)
+	verifier := harness.NewVerifier(t)
+
+	harness.ActSubmitBatchedCommitments(t, 2)
+
+	// Send a head signal to the verifier
+	verifier.ActL1HeadSignal(t)
+	harness.sequencer.ActL1HeadSignal(t)
+
+	verifier.ActL2PipelineFull(t)
+	harness.sequencer.ActL2PipelineFull(t)
+
+	require.Equal(t, harness.sequencer.SyncStatus().UnsafeL2, verifier.SyncStatus().SafeL2, "verifier synced sequencer data")
+}
+
+// Commitment is challenged but never resolved, chain reorgs when challenge window expires.
+func TestAltDABatched_ChallengeExpired(gt *testing.T) {
+	t := helpers.NewDefaultTesting(gt)
+	harness := NewL2AltDABatched(t)
+
+	// generate enough initial l1 blocks to have a finalized head.
+	harness.ActL1Blocks(t, 5)
+
+	// Include a new l2 transaction, submitting 2 batched commitments to the l1.
+	harness.ActSubmitBatchedCommitments(t, 2)
+
+	// Challenge the input commitment on the l1 challenge contract.
+	harness.ActChallengeLastInput(t)
+
+	blk := harness.GetLastTxBlock(t)
+
+	// catch up the sequencer derivation pipeline with the new l1 blocks.
+ harness.sequencer.ActL2PipelineFull(t) + + // create enough l1 blocks to expire the resolve window. + harness.ActExpireLastInput(t) + + // catch up the sequencer derivation pipeline with the new l1 blocks. + harness.sequencer.ActL2PipelineFull(t) + + // the L1 finalized signal should trigger altDA to finalize the engine queue. + harness.ActL1Finalized(t) + + // move one more block for engine controller to update. + harness.ActL1Blocks(t, 1) + harness.sequencer.ActL2PipelineFull(t) + + // get new block with same number to compare + newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number()) + require.NoError(t, err) + + // reorg happened even though data was available + require.NotEqual(t, blk.Hash(), newBlk.Hash()) + + // now delete the data from the storage service so it is not available at all + // to the verifier derivation pipeline. + harness.ActDeleteLastInput(t) + + syncStatus := harness.sequencer.SyncStatus() + + // verifier is able to sync with expired missing data + verifier := harness.NewVerifier(t) + verifier.ActL2PipelineFull(t) + verifier.ActL1FinalizedSignal(t) + + verifSyncStatus := verifier.SyncStatus() + + require.Equal(t, syncStatus.FinalizedL2, verifSyncStatus.FinalizedL2) +} + + +// Commitment is challenged after sequencer derived the chain but data disappears. A verifier +// derivation pipeline stalls until the challenge is resolved and then resumes with data from the contract. +func TestAltDABatched_ChallengeResolved(gt *testing.T) { + t := helpers.NewDefaultTesting(gt) + harness := NewL2AltDABatched(t) + + harness.ActSubmitBatchedCommitments(t, 2) + + // generate 3 l1 blocks. + harness.ActL1Blocks(t, 3) + + // challenge the input commitment for that l2 transaction on the l1 challenge contract. + harness.ActChallengeLastInput(t) + + // catch up sequencer derivation pipeline. + // this syncs the latest event within the AltDA manager. + harness.sequencer.ActL2PipelineFull(t) + + // resolve the challenge on the l1 challenge contract. + harness.ActResolveLastChallenge(t) + + // catch up the sequencer derivation pipeline with the new l1 blocks. + // this syncs the resolved status and input data within the AltDA manager. + harness.sequencer.ActL2PipelineFull(t) + + // finalize l1 + harness.ActL1Finalized(t) + + // delete the data from the storage service so it is not available at all + // to the verifier derivation pipeline. + harness.ActDeleteLastInput(t) + + syncStatus := harness.sequencer.SyncStatus() + + // new verifier is able to sync and resolve the input from calldata + verifier := harness.NewVerifier(t) + verifier.ActL2PipelineFull(t) + verifier.ActL1FinalizedSignal(t) + + verifSyncStatus := verifier.SyncStatus() + + require.Equal(t, syncStatus.SafeL2, verifSyncStatus.SafeL2) +} + +// DA storage service goes offline while sequencer keeps making blocks. When storage comes back online, it should be able to catch up. +func TestAltDABatched_StorageError(gt *testing.T) { + t := helpers.NewDefaultTesting(gt) + harness := NewL2AltDABatched(t) + + harness.ActSubmitBatchedCommitments(t, 2) + + txBlk := harness.GetLastTxBlock(t) + + // mock a storage client error when trying to get the pre-image. + // this simulates the storage service going offline for example. + harness.storage.ActGetPreImageFail() + + // try to derive the l2 chain from the submitted inputs commitments. + // the storage call will fail the first time then succeed. 
+ harness.sequencer.ActL2PipelineFull(t) + + // sequencer derivation was able to sync to latest l1 origin + syncStatus := harness.sequencer.SyncStatus() + require.Equal(t, uint64(1), syncStatus.SafeL2.Number) + require.Equal(t, txBlk.Hash(), syncStatus.SafeL2.Hash) +} + +// L1 chain reorgs a resolved challenge so it expires instead causing +// the l2 chain to reorg as well. +func TestAltDABatched_ChallengeReorg(gt *testing.T) { + t := helpers.NewDefaultTesting(gt) + harness := NewL2AltDABatched(t) + + harness.ActSubmitBatchedCommitments(t, 2) + + // add a buffer of L1 blocks + harness.ActL1Blocks(t, 3) + + // challenge the input commitment + harness.ActChallengeLastInput(t) + + // keep track of the block where the L2 tx was included + blk := harness.GetLastTxBlock(t) + + // progress derivation pipeline + harness.sequencer.ActL2PipelineFull(t) + + // resolve the challenge so pipeline can progress + harness.ActResolveLastChallenge(t) + + // derivation marks the challenge as resolve, chain is not impacted + harness.sequencer.ActL2PipelineFull(t) + + // Rewind the L1, essentially reorging the challenge resolution + harness.miner.ActL1RewindToParent(t) + + // Now the L1 chain advances without the challenge resolution + // so the challenge is expired. + harness.ActExpireLastInput(t) + + // derivation pipeline reorgs the commitment out of the chain + harness.sequencer.ActL2PipelineFull(t) + + newBlk, err := harness.engine.EthClient().BlockByNumber(t.Ctx(), blk.Number()) + require.NoError(t, err) + + // confirm the reorg did happen + require.NotEqual(t, blk.Hash(), newBlk.Hash()) +} + +// Sequencer stalls as data is not available, batcher keeps posting, untracked commitments are +// challenged and resolved, then sequencer resumes and catches up. +func TestAltDABatched_SequencerStalledMultiChallenges(gt *testing.T) { + t := helpers.NewDefaultTesting(gt) + a := NewL2AltDABatched(t) + + a.ActSubmitBatchedCommitments(t, 2) + + // keep track of the related commitment (second batched commitment) + comm1 := a.lastComm + input1, err := a.storage.GetInput(t.Ctx(), altda.Keccak256Commitment(comm1[1:])) + bn1 := a.lastCommBn + require.NoError(t, err) + + // delete it from the DA provider so the pipeline cannot verify it + a.ActDeleteLastInput(t) + + // build more empty l2 unsafe blocks as the l1 origin progresses + a.ActL1Blocks(t, 10) + a.sequencer.ActBuildToL1HeadUnsafe(t) + + // build another L2 block without advancing derivation + a.alice.L2.ActResetTxOpts(t) + a.alice.L2.ActSetTxToAddr(&a.dp.Addresses.Bob)(t) + a.alice.L2.ActMakeTx(t) + + a.sequencer.ActL2StartBlock(t) + a.engine.ActL2IncludeTx(a.alice.Address())(t) + a.sequencer.ActL2EndBlock(t) + + a.batcher.ActL2BatchBuffer(t) + a.batcher.ActL2ChannelClose(t) + a.batcher.ActL2BatchSubmit(t, func(tx *types.DynamicFeeTx) { + a.lastComm = tx.Data[1:] + }) + + // include it in L1 + a.miner.ActL1StartBlock(12)(t) + a.miner.ActL1IncludeTx(a.dp.Addresses.Batcher)(t) + a.miner.ActL1EndBlock(t) + + a.sequencer.ActL1HeadSignal(t) + + unsafe := a.sequencer.L2Unsafe() + unsafeBlk, err := a.engine.EthClient().BlockByHash(t.Ctx(), unsafe.Hash) + require.NoError(t, err) + + // advance the pipeline until it errors out as it is still stuck + // on deriving the first commitment + a.sequencer.ActL2EventsUntil(t, func(ev event.Event) bool { + x, ok := ev.(rollup.EngineTemporaryErrorEvent) + if ok { + require.ErrorContains(t, x.Err, "failed to fetch input data") + } + return ok + }, 100, false) + + // keep track of the second commitment + comm2 := a.lastComm + _, err = 
a.storage.GetInput(t.Ctx(), altda.Keccak256Commitment(comm2[1:]))
+	require.NoError(t, err)
+	a.lastCommBn = a.miner.L1Chain().CurrentBlock().Number.Uint64()
+
+	// ensure the second commitment is distinct from the first
+	require.NotEqual(t, comm1, comm2)
+
+	// challenge the last commitment while the pipeline is stuck on the first
+	a.ActChallengeLastInput(t)
+
+	// resolve the latest commitment before the first one is even challenged.
+	a.ActResolveLastChallenge(t)
+
+	// now we delete it to force the pipeline to resolve the second commitment
+	// from the challenge data.
+	a.ActDeleteLastInput(t)
+
+	// finally challenge the first commitment
+	a.ActChallengeInput(t, comm1, bn1)
+
+	// resolve it immediately so we can resume derivation
+	a.ActResolveInput(t, comm1, input1, bn1)
+
+	// pipeline can go on
+	a.sequencer.ActL2PipelineFull(t)
+
+	// verify that the chain did not reorg out
+	safeBlk, err := a.engine.EthClient().BlockByNumber(t.Ctx(), unsafeBlk.Number())
+	require.NoError(t, err)
+	require.Equal(t, unsafeBlk.Hash(), safeBlk.Hash())
+}
diff --git a/op-e2e/actions/helpers/l2_batcher.go b/op-e2e/actions/helpers/l2_batcher.go
index f270bddb717a0..e84ae9c36aeb3 100644
--- a/op-e2e/actions/helpers/l2_batcher.go
+++ b/op-e2e/actions/helpers/l2_batcher.go
@@ -61,6 +61,7 @@ type BatcherCfg struct {
 	ForceSubmitSingularBatch bool
 	ForceSubmitSpanBatch     bool
 	UseAltDA                 bool
+	UseBatchedCommitments    bool
 
 	DataAvailabilityType batcherFlags.DataAvailabilityType
 	AltDA                AltDAInputSetter
@@ -86,6 +87,18 @@ func AltDABatcherCfg(dp *e2eutils.DeployParams, altDA AltDAInputSetter) *Batcher
 	}
 }
 
+func BatchedCommsBatcherCfg(dp *e2eutils.DeployParams, altDA AltDAInputSetter) *BatcherCfg {
+	return &BatcherCfg{
+		MinL1TxSize:           0,
+		MaxL1TxSize:           128_000,
+		BatcherKey:            dp.Secrets.Batcher,
+		DataAvailabilityType:  batcherFlags.CalldataType,
+		AltDA:                 altDA,
+		UseAltDA:              true,
+		UseBatchedCommitments: true,
+	}
+}
+
 type L2BlockRefs interface {
 	L2BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L2BlockRef, error)
 }
@@ -383,6 +396,87 @@ func (s *L2Batcher) ActL2BatchSubmitRaw(t Testing, payload []byte, txOpts ...fun
 	s.LastSubmitted = tx
 }
 
+func (s *L2Batcher) ActL2SubmitBatchedCommitments(t Testing, numFrames int, txOpts ...func(tx *types.DynamicFeeTx)) {
+	if !s.l2BatcherCfg.UseAltDA || !s.l2BatcherCfg.UseBatchedCommitments {
+		t.InvalidAction("ActL2SubmitBatchedCommitments only available for Alt DA type with BatchedCommitments enabled")
+		return
+	}
+
+	if s.L2ChannelOut == nil {
+		t.InvalidAction("need to buffer data first, cannot batch submit with empty buffer")
+		return
+	}
+
+	// Load and encode the commitment content (1 frame per commitment)
+	inputs := make([][]byte, numFrames)
+	for i := 0; i < numFrames; i++ {
+		if s.L2ChannelOut == nil {
+			break
+		}
+		data := new(bytes.Buffer)
+		data.WriteByte(derive_params.DerivationVersion0)
+		// subtract one, to account for the version byte
+		if _, err := s.L2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF {
+			s.l2Submitting = false
+			if i < numFrames-1 {
+				t.Fatalf("failed to read %d frames, only read %d", numFrames, i+1)
+			}
+			s.L2ChannelOut = nil
+		} else if err != nil {
+			s.l2Submitting = false
+			t.Fatalf("failed to output channel data to frame: %v", err)
+		}
+
+		inputs[i] = data.Bytes()
+	}
+	s.log.Debug("Number of commitments to batch", "len", len(inputs))
+
+	// Iterate over encoded frames and set the input for the da client
+	comms := make([]altda.CommitmentData, numFrames)
+	for i, calldata := range inputs {
+		comm, err := s.l2BatcherCfg.AltDA.SetInput(t.Ctx(), calldata)
+		require.NoError(t, err, "failed to set input for altda")
+		s.log.Debug("Set altda input", "commitment", common.Bytes2Hex(comm.Encode()))
+		comms[i] = comm
+	}
+
+	batchedComm, err := altda.NewBatchedCommitment(comms)
+	require.NoError(t, err, "error creating batched commitment")
+
+	nonce, err := s.l1.PendingNonceAt(t.Ctx(), s.BatcherAddr)
+	require.NoError(t, err, "need batcher nonce")
+
+	gasTipCap := big.NewInt(2 * params.GWei)
+	pendingHeader, err := s.l1.HeaderByNumber(t.Ctx(), big.NewInt(-1))
+	require.NoError(t, err, "need l1 pending header for gas price estimation")
+	gasFeeCap := new(big.Int).Add(gasTipCap, new(big.Int).Mul(pendingHeader.BaseFee, big.NewInt(2)))
+
+	var txData types.TxData
+	rawTx := &types.DynamicFeeTx{
+		ChainID:   s.rollupCfg.L1ChainID,
+		Nonce:     nonce,
+		To:        &s.rollupCfg.BatchInboxAddress,
+		GasTipCap: gasTipCap,
+		GasFeeCap: gasFeeCap,
+		Data:      batchedComm.TxData(),
+	}
+	for _, opt := range txOpts {
+		opt(rawTx)
+	}
+
+	gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true, false)
+	require.NoError(t, err, "need to compute intrinsic gas")
+	rawTx.Gas = gas
+	txData = rawTx
+
+	tx, err := types.SignNewTx(s.l2BatcherCfg.BatcherKey, s.l1Signer, txData)
+	require.NoError(t, err, "need to sign tx")
+
+	err = s.l1.SendTransaction(t.Ctx(), tx)
+	require.NoError(t, err, "need to send tx")
+	s.LastSubmitted = tx
+}
+
 func (s *L2Batcher) ActL2BatchSubmitMultiBlob(t Testing, numBlobs int) {
 	if s.l2BatcherCfg.DataAvailabilityType != batcherFlags.BlobsType {
 		t.InvalidAction("ActL2BatchSubmitMultiBlob only available for Blobs DA type")
diff --git a/op-node/rollup/derive/altda_data_source.go b/op-node/rollup/derive/altda_data_source.go
index 2945a2a9e57b2..bbfe864896ec7 100644
--- a/op-node/rollup/derive/altda_data_source.go
+++ b/op-node/rollup/derive/altda_data_source.go
@@ -19,8 +19,8 @@ type AltDADataSource struct {
 	fetcher AltDAInputFetcher
 	l1      L1Fetcher
 	id      eth.L1BlockRef
-	// keep track of a pending commitment so we can keep trying to fetch the input.
-	comm altda.CommitmentData
+	// keep track of pending commitments so we can keep trying to fetch the input.
+	comms []altda.CommitmentData
 }
 
 func NewAltDADataSource(log log.Logger, src DataIter, l1 L1Fetcher, fetcher AltDAInputFetcher, id eth.L1BlockRef) *AltDADataSource {
@@ -45,7 +45,7 @@ func (s *AltDADataSource) Next(ctx context.Context) (eth.Data, error) {
 		return nil, NewTemporaryError(fmt.Errorf("failed to advance altDA L1 origin: %w", err))
 	}
 
-	if s.comm == nil {
+	if len(s.comms) == 0 {
 		// the l1 source returns the input commitment for the batch.
 		data, err := s.src.Next(ctx)
 		if err != nil {
@@ -68,10 +68,24 @@ func (s *AltDADataSource) Next(ctx context.Context) (eth.Data, error) {
 			s.log.Warn("invalid commitment", "commitment", data, "err", err)
 			return nil, NotEnoughData
 		}
-		s.comm = comm
+
+		if batchedComm, ok := comm.(altda.BatchedCommitment); ok {
+			// The commitment is a BatchedCommitment: unpack its sub-commitments
+			s.comms, err = batchedComm.GetCommitments()
+			if err != nil {
+				s.log.Warn("invalid commitment", "commitment", data, "err", err)
+				return nil, NotEnoughData
+			}
+		} else {
+			// The commitment is not a BatchedCommitment
+			s.comms = []altda.CommitmentData{comm}
+		}
 	}
+
+	currComm := s.comms[0]
+
 	// use the commitment to fetch the input from the AltDA provider.
- data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id) + data, err := s.fetcher.GetInput(ctx, s.l1, currComm, s.id) // GetInput may call for a reorg if the pipeline is stalled and the AltDA manager // continued syncing origins detached from the pipeline origin. if errors.Is(err, altda.ErrReorgRequired) { @@ -79,26 +93,27 @@ func (s *AltDADataSource) Next(ctx context.Context) (eth.Data, error) { return nil, NewResetError(err) } else if errors.Is(err, altda.ErrExpiredChallenge) { // this commitment was challenged and the challenge expired. - s.log.Warn("challenge expired, skipping batch", "comm", s.comm) - s.comm = nil - // skip the input + s.log.Warn("challenge expired, skipping batch", "comm", currComm) + // skip this commitment + s.comms = s.comms[1:] return s.Next(ctx) } else if errors.Is(err, altda.ErrMissingPastWindow) { - return nil, NewCriticalError(fmt.Errorf("data for comm %s not available: %w", s.comm, err)) + return nil, NewCriticalError(fmt.Errorf("data for comm %s not available: %w", currComm, err)) } else if errors.Is(err, altda.ErrPendingChallenge) { // continue stepping without slowing down. return nil, NotEnoughData } else if err != nil { // return temporary error so we can keep retrying. - return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %s from da service: %w", s.comm, err)) + return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %s from da service: %w", currComm, err)) } // inputs are limited to a max size to ensure they can be challenged in the DA contract. - if s.comm.CommitmentType() == altda.Keccak256CommitmentType && len(data) > altda.MaxInputSize { + // TODO: maybe abstract this into a CommitmentData ValidateInput function? + if currComm.CommitmentType() == altda.Keccak256CommitmentType && len(data) > altda.MaxInputSize { s.log.Warn("input data exceeds max size", "size", len(data), "max", altda.MaxInputSize) - s.comm = nil + s.comms = s.comms[1:] return s.Next(ctx) } // reset the commitment so we can fetch the next one from the source at the next iteration. - s.comm = nil + s.comms = s.comms[1:] return data, nil }
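
For reference, and not part of the patch itself: a minimal sketch of how the BatchedCommitment API introduced in op-alt-da/commitment.go round-trips. It only uses identifiers that appear in this diff (NewKeccak256Commitment, NewBatchedCommitment, DecodeCommitmentData, GetCommitments, Verify); the example program and its inputs are illustrative.

package main

import (
	"fmt"

	altda "github.com/ethereum-optimism/optimism/op-alt-da"
)

func main() {
	// One keccak256 sub-commitment per frame that was posted to the DA server.
	comms := []altda.CommitmentData{
		altda.NewKeccak256Commitment([]byte("frame-1")),
		altda.NewKeccak256Commitment([]byte("frame-2")),
	}

	// Batch them: a single sub-commitment type byte, then a 2-byte big-endian
	// length followed by the raw commitment data for each entry.
	batch, err := altda.NewBatchedCommitment(comms)
	if err != nil {
		panic(err)
	}

	// Encode() prepends the BatchedCommitmentType byte; TxData() would additionally
	// prepend DerivationVersion1 for the L1 batcher tx.
	encoded := batch.Encode()

	// A reader (e.g. the AltDA data source) first decodes the generic envelope,
	// then unpacks the individual commitments and fetches each input in turn.
	decoded, err := altda.DecodeCommitmentData(encoded)
	if err != nil {
		panic(err)
	}
	subComms, err := decoded.(altda.BatchedCommitment).GetCommitments()
	if err != nil {
		panic(err)
	}
	fmt.Println("sub-commitments:", len(subComms))                                       // 2
	fmt.Println("first verifies frame-1:", subComms[0].Verify([]byte("frame-1")) == nil) // true
}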