From 0ee6a3a8101b5950d3c1041a2877034ecdc2226b Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Wed, 15 Apr 2026 15:59:45 +0300 Subject: [PATCH 1/8] bump bft-core --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index c818fdf..50ca2d8 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,8 @@ require ( github.com/spf13/viper v1.21.0 github.com/stretchr/testify v1.11.1 // use trust base store and generic storage implementation from bft-core for now - github.com/unicitynetwork/bft-core v1.0.2-0.20260316094447-b26e124a6938 - github.com/unicitynetwork/bft-go-base v1.0.3-0.20260316092951-afcfbc83f42f + github.com/unicitynetwork/bft-core v1.1.0 + github.com/unicitynetwork/bft-go-base v1.1.0 golang.org/x/sync v0.19.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index dedf4eb..1ad0b33 100644 --- a/go.sum +++ b/go.sum @@ -448,10 +448,10 @@ github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= -github.com/unicitynetwork/bft-core v1.0.2-0.20260316094447-b26e124a6938 h1:eK9OeFmqILTQTp9XNGonBDewxmVJH+hyP/c6+K/vE1g= -github.com/unicitynetwork/bft-core v1.0.2-0.20260316094447-b26e124a6938/go.mod h1:yYVnx6iuEJ1VC2D2ax/GYM/n2ECeAUnxO4yUetRoTaQ= -github.com/unicitynetwork/bft-go-base v1.0.3-0.20260316092951-afcfbc83f42f h1:HiqLjm+aM0VX5oLJLAEiqZCUnQhf1KrL6rCpco9FMFA= -github.com/unicitynetwork/bft-go-base v1.0.3-0.20260316092951-afcfbc83f42f/go.mod h1:hBnOG52VRy/vpgIBUulTgk7PBTwODZ2xkVjCEu5yRcQ= +github.com/unicitynetwork/bft-core v1.1.0 h1:VBoC0XyEsQoT7FTa2r3zJ+EL/CFERvxZFFnhJbhly08= +github.com/unicitynetwork/bft-core v1.1.0/go.mod h1:ish6bE4r7/XxhL1G9T7PSNBiRIj5W63QCYVYXoukJPc= +github.com/unicitynetwork/bft-go-base v1.1.0 h1:x1+kX0X+n4CmNibBs0f8oWwoZ5UCW40Gaq6WYhUcUZQ= +github.com/unicitynetwork/bft-go-base v1.1.0/go.mod h1:hBnOG52VRy/vpgIBUulTgk7PBTwODZ2xkVjCEu5yRcQ= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= From 6f0d5969c61374baf010f5bfd196260ad4c505a7 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Wed, 15 Apr 2026 16:14:08 +0300 Subject: [PATCH 2/8] cap libp2p message sizes --- network/cbor.go | 5 +++++ network/cbor_test.go | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/network/cbor.go b/network/cbor.go index b6fc118..bbd7916 100644 --- a/network/cbor.go +++ b/network/cbor.go @@ -20,6 +20,8 @@ func serializeMsg(msg any) ([]byte, error) { return append(lengthBytes[:bytesWritten], data...), nil } +const maxMsgSize = 16 * 1024 * 1024 // 16 MB + func deserializeMsg(r io.Reader, msg any) error { src := bufio.NewReader(r) // read data length @@ -30,6 +32,9 @@ func deserializeMsg(r io.Reader, msg any) error { if length64 == 0 { return fmt.Errorf("unexpected data length zero") } + if length64 > maxMsgSize { + return fmt.Errorf("message size %d exceeds maximum %d", length64, maxMsgSize) + } lengthInt64 := int64(length64) /* #nosec G115 its unlikely that value of length64 exceeds int64 max value */ if err := types.Cbor.Decode(io.LimitReader(src, lengthInt64), msg); err != nil { diff --git a/network/cbor_test.go b/network/cbor_test.go index be58bdd..14f16e0 100644 --- a/network/cbor_test.go +++ b/network/cbor_test.go @@ -2,7 +2,9 @@ package network import ( "bytes" + "encoding/binary" "errors" + "fmt" "slices" "testing" @@ -115,4 +117,14 @@ func Test_deserializeMsg(t *testing.T) { err := deserializeMsg(bytes.NewReader(data), &dest) require.EqualError(t, err, `unexpected data length zero`) }) + + t.Run("length exceeds maximum message size", func(t *testing.T) { + // encode a length value that exceeds maxMsgSize + var buf [binary.MaxVarintLen64]byte + n := binary.PutUvarint(buf[:], maxMsgSize+1) + + var dest testMsg + err := deserializeMsg(bytes.NewReader(buf[:n]), &dest) + require.EqualError(t, err, fmt.Sprintf("message size %d exceeds maximum %d", maxMsgSize+1, maxMsgSize)) + }) } From 924a01a1553a9bfcd0c1a8bf6bb42b7003c71753 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Wed, 15 Apr 2026 16:18:40 +0300 Subject: [PATCH 3/8] close stream in SendMsgs --- network/libp2p_network.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/network/libp2p_network.go b/network/libp2p_network.go index 2afcd35..c2e2a69 100644 --- a/network/libp2p_network.go +++ b/network/libp2p_network.go @@ -105,6 +105,11 @@ func (n *LibP2PNetwork) Send(ctx context.Context, msg any, receivers ...peer.ID) func (n *LibP2PNetwork) SendMsgs(ctx context.Context, messages MsgQueue, receiver peer.ID) (resErr error) { var stream libp2pNetwork.Stream var err error + defer func() { + if stream != nil { + stream.Close() + } + }() for messages.Len() > 0 { msg := messages.PopFront() // create a stream with first message From f4156d309091fa8da1af32e73172265a15fa4517 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Wed, 15 Apr 2026 17:26:48 +0300 Subject: [PATCH 4/8] refactor node recovery Handle replication requests on dedicated replication thread, this prevents possible goroutine exhaustion attack vector and cleanly handles shutdown. --- .../replication/ledger_replication.go | 3 + partition/node.go | 11 + partition/node_init.go | 1 + partition/recovery.go | 107 ++++--- partition/recovery_test.go | 300 ++++++++++++++++++ 5 files changed, 383 insertions(+), 39 deletions(-) create mode 100644 partition/recovery_test.go diff --git a/network/protocol/replication/ledger_replication.go b/network/protocol/replication/ledger_replication.go index 10b2060..345fb71 100644 --- a/network/protocol/replication/ledger_replication.go +++ b/network/protocol/replication/ledger_replication.go @@ -14,6 +14,7 @@ const ( WrongShard BlocksNotFound Unknown + Busy ) var ( @@ -113,6 +114,8 @@ func (s Status) String() string { return "Wrong Partition or Shard Identifier" case Unknown: return "Unknown" + case Busy: + return "Busy" } return "Unknown Status Code" } diff --git a/partition/node.go b/partition/node.go index b5cdfae..93e00a7 100644 --- a/partition/node.go +++ b/partition/node.go @@ -59,6 +59,11 @@ type ( CommittedUC() *types.UnicityCertificate } + replicationRequest struct { + ctx context.Context + req *replication.LedgerReplicationRequest + } + roundTimer struct { isRunning atomic.Bool cancel context.CancelFunc @@ -81,6 +86,7 @@ type ( // ---- recovery ---- recoveryLastProp *blockproposal.BlockProposal lastLedgerReqTime time.Time + replicationCh chan replicationRequest // ---- persistence ---- blockStore keyvaluedb.KeyValueDB @@ -114,6 +120,11 @@ func (n *Node) Run(ctx context.Context) error { return err }) + g.Go(func() error { + n.replicationLoop(ctx) + return nil + }) + return g.Wait() } diff --git a/partition/node_init.go b/partition/node_init.go index 553589e..c67df7d 100644 --- a/partition/node_init.go +++ b/partition/node_init.go @@ -38,6 +38,7 @@ func NewNode(ctx context.Context, txSystem TransactionSystem, conf *NodeConf, lo trustBaseStore: conf.trustBaseStore, network: conf.validatorNetwork, lastLedgerReqTime: time.Time{}, + replicationCh: make(chan replicationRequest, 1), log: log, } diff --git a/partition/recovery.go b/partition/recovery.go index d71d72e..11b5f31 100644 --- a/partition/recovery.go +++ b/partition/recovery.go @@ -128,49 +128,78 @@ func (n *Node) handleLedgerReplicationRequest(ctx context.Context, lr *replicati return n.sendLedgerReplicationResponse(ctx, resp, lr.NodeID) } n.log.DebugContext(ctx, fmt.Sprintf("Preparing replication response from block %d", startBlock)) - go func() { - blocks := make([]*types.Block, 0) - blockCnt := uint64(0) - dbIt := n.blockStore.Find(util.Uint64ToBytes(startBlock)) - defer func() { - if err := dbIt.Close(); err != nil { - n.log.WarnContext(ctx, "closing DB iterator", logger.Error(err)) - } - }() - var firstFetchedBlockNumber uint64 - var lastFetchedBlockNumber uint64 - var lastFetchedBlock *types.Block - for ; dbIt.Valid(); dbIt.Next() { - var bl types.Block - roundNo := util.BytesToUint64(dbIt.Key()) - if err := dbIt.Value(&bl); err != nil { - n.log.WarnContext(ctx, fmt.Sprintf("Ledger replication reply incomplete, failed to read block %d", roundNo), logger.Error(err)) - break - } - lastFetchedBlock = &bl - if firstFetchedBlockNumber == 0 { - firstFetchedBlockNumber = roundNo - } - lastFetchedBlockNumber = roundNo - blocks = append(blocks, lastFetchedBlock) - blockCnt++ - if blockCnt >= n.conf.replicationConfig.maxReturnBlocks || - (roundNo >= lr.EndBlockNumber && lr.EndBlockNumber > 0) { - break - } - } + select { + case n.replicationCh <- replicationRequest{ctx: ctx, req: lr}: + // worker will process + default: resp := &replication.LedgerReplicationResponse{ - UUID: lr.UUID, - Status: replication.Ok, - Blocks: blocks, - FirstBlockNumber: firstFetchedBlockNumber, - LastBlockNumber: lastFetchedBlockNumber, + UUID: lr.UUID, + Status: replication.Busy, + Message: "Too many concurrent replication requests", + } + return n.sendLedgerReplicationResponse(ctx, resp, lr.NodeID) + } + return nil +} + +func (n *Node) replicationLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case r := <-n.replicationCh: + n.processReplicationRequest(r.ctx, r.req) } - if err := n.sendLedgerReplicationResponse(ctx, resp, lr.NodeID); err != nil { - n.log.WarnContext(ctx, fmt.Sprintf("Problem sending ledger replication response, %s", resp.Pretty()), logger.Error(err)) + } +} + +func (n *Node) processReplicationRequest(ctx context.Context, lr *replication.LedgerReplicationRequest) { + startBlock := lr.BeginBlockNumber + blocks := make([]*types.Block, 0) + blockCnt := uint64(0) + dbIt := n.blockStore.Find(util.Uint64ToBytes(startBlock)) + defer func() { + if err := dbIt.Close(); err != nil { + n.log.WarnContext(ctx, "closing DB iterator", logger.Error(err)) } }() - return nil + var firstFetchedBlockNumber uint64 + var lastFetchedBlockNumber uint64 + var lastFetchedBlock *types.Block + for ; dbIt.Valid(); dbIt.Next() { + select { + case <-ctx.Done(): + return + default: + } + var bl types.Block + roundNo := util.BytesToUint64(dbIt.Key()) + if err := dbIt.Value(&bl); err != nil { + n.log.WarnContext(ctx, fmt.Sprintf("Ledger replication reply incomplete, failed to read block %d", roundNo), logger.Error(err)) + break + } + lastFetchedBlock = &bl + if firstFetchedBlockNumber == 0 { + firstFetchedBlockNumber = roundNo + } + lastFetchedBlockNumber = roundNo + blocks = append(blocks, lastFetchedBlock) + blockCnt++ + if blockCnt >= n.conf.replicationConfig.maxReturnBlocks || + (roundNo >= lr.EndBlockNumber && lr.EndBlockNumber > 0) { + break + } + } + resp := &replication.LedgerReplicationResponse{ + UUID: lr.UUID, + Status: replication.Ok, + Blocks: blocks, + FirstBlockNumber: firstFetchedBlockNumber, + LastBlockNumber: lastFetchedBlockNumber, + } + if err := n.sendLedgerReplicationResponse(ctx, resp, lr.NodeID); err != nil { + n.log.WarnContext(ctx, fmt.Sprintf("Problem sending ledger replication response, %s", resp.Pretty()), logger.Error(err)) + } } // handleLedgerReplicationResponse handles ledger replication responses from other partition nodes. diff --git a/partition/recovery_test.go b/partition/recovery_test.go new file mode 100644 index 0000000..eda229d --- /dev/null +++ b/partition/recovery_test.go @@ -0,0 +1,300 @@ +package partition + +import ( + "context" + gocrypto "crypto" + "testing" + "time" + + "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + "github.com/unicitynetwork/bft-core/keyvaluedb" + "github.com/unicitynetwork/bft-core/keyvaluedb/memorydb" + "github.com/unicitynetwork/bft-core/rootchain/consensus/trustbase" + abcrypto "github.com/unicitynetwork/bft-go-base/crypto" + "github.com/unicitynetwork/bft-go-base/types" + "github.com/unicitynetwork/bft-go-base/util" + + testcertificates "github.com/unicitynetwork/finality-gadget/internal/testutils/certificates" + testlogger "github.com/unicitynetwork/finality-gadget/internal/testutils/logger" + testpeer "github.com/unicitynetwork/finality-gadget/internal/testutils/peer" + testsig "github.com/unicitynetwork/finality-gadget/internal/testutils/sig" + testtrustbase "github.com/unicitynetwork/finality-gadget/internal/testutils/trustbase" + "github.com/unicitynetwork/finality-gadget/network/protocol/replication" +) + +type MockValidatorNetwork struct { + sentMessages []any +} + +func (m *MockValidatorNetwork) Send(_ context.Context, msg any, _ ...peer.ID) error { + m.sentMessages = append(m.sentMessages, msg) + return nil +} + +func (m *MockValidatorNetwork) ReceivedChannel() <-chan any { return nil } + +func (m *MockValidatorNetwork) RegisterValidatorProtocols() error { return nil } + +func (m *MockValidatorNetwork) lastResponse() *replication.LedgerReplicationResponse { + if len(m.sentMessages) == 0 { + return nil + } + return m.sentMessages[len(m.sentMessages)-1].(*replication.LedgerReplicationResponse) +} + +type testNodeOpts struct { + blockCount int + maxReturnBlocks uint64 + committedRound uint64 + firstRound uint64 + partitionID types.PartitionID +} + +func createTestNode(t *testing.T, opts testNodeOpts) (*Node, *MockValidatorNetwork) { + t.Helper() + log := testlogger.New(t) + + if opts.maxReturnBlocks == 0 { + opts.maxReturnBlocks = 1000 + } + if opts.partitionID == 0 { + opts.partitionID = 3 + } + + signer, verifier := testsig.CreateSignerAndVerifier(t) + peerIDs := testpeer.GeneratePeerIDs(t, 1) + nodeInfo := testtrustbase.NewNodeInfoFromVerifier(t, peerIDs[0].String(), verifier) + trustBase := testtrustbase.NewTrustBase(t, signer) + + shardConf := &types.PartitionDescriptionRecord{ + Version: 1, + NetworkID: 3, + PartitionID: opts.partitionID, + ShardID: types.ShardID{}, + PartitionTypeID: 3, + TypeIDLen: 8, + UnitIDLen: 256, + T2Timeout: 2500 * time.Millisecond, + Epoch: 0, + EpochStart: 1, + Validators: []*types.NodeInfo{nodeInfo}, + } + + blockDB := memorydb.New() + var lastUC *types.UnicityCertificate + if opts.blockCount > 0 { + lastUC = createTestBlocks(t, blockDB, signer, shardConf, opts.blockCount) + } + + shardDB := memorydb.New() + shardConfStore, err := NewShardConfStore(shardDB, log) + require.NoError(t, err) + require.NoError(t, shardConfStore.Store(shardConf)) + + trustBaseStore, err := trustbase.NewTrustBaseStore(memorydb.New(), log) + require.NoError(t, err) + require.NoError(t, trustBaseStore.Store(trustBase)) + + committedRound := opts.committedRound + if committedRound == 0 && opts.blockCount > 0 { + committedRound = uint64(opts.blockCount) + } + + committedUC := &types.UnicityCertificate{ + Version: 1, + InputRecord: &types.InputRecord{RoundNumber: committedRound}, + } + if lastUC != nil && committedRound == uint64(opts.blockCount) { + committedUC = lastUC + } + + mockNet := &MockValidatorNetwork{} + + conf := &NodeConf{ + replicationConfig: ledgerReplicationConfig{ + maxReturnBlocks: opts.maxReturnBlocks, + maxFetchBlocks: 1000, + timeout: 1500 * time.Millisecond, + }, + shardConfStore: shardConfStore, + trustBaseStore: trustBaseStore, + } + + fuc := &types.UnicityCertificate{ + Version: 1, + InputRecord: &types.InputRecord{RoundNumber: opts.firstRound}, + } + + node := &Node{ + conf: conf, + transactionSystem: &MockTransactionSystem{committedUC: committedUC}, + blockStore: blockDB, + state: &ConsensusState{status: normal, fuc: fuc}, + network: mockNet, + replicationCh: make(chan replicationRequest, 1), + log: log, + } + node.shardConf.Store(shardConf) + + return node, mockNet +} + +func createTestBlocks(t *testing.T, db keyvaluedb.KeyValueDB, signer abcrypto.Signer, shardConf *types.PartitionDescriptionRecord, count int) *types.UnicityCertificate { + t.Helper() + prevBlockHash := []byte("genesis") + var prevStateHash []byte + var lastUC *types.UnicityCertificate + + for i := 1; i <= count; i++ { + h := &types.Header{ + Version: 1, + PartitionID: shardConf.PartitionID, + PreviousBlockHash: prevBlockHash, + ProposerID: shardConf.Validators[0].NodeID, + } + stateHash := []byte{byte(i)} + blockHash, err := types.BlockHash(gocrypto.SHA256, h, []*types.TransactionRecord{}, stateHash, prevStateHash) + require.NoError(t, err) + + ir := &types.InputRecord{ + RoundNumber: uint64(i), + Hash: stateHash, + PreviousHash: prevStateHash, + BlockHash: blockHash, + Epoch: 0, + SummaryValue: []byte{}, + ETHash: []byte{}, + Timestamp: uint64(time.Now().UnixMilli()), + } + + uc := testcertificates.CreateUnicityCertificate(t, signer, ir, shardConf, uint64(i), nil, make([]byte, 32)) + ucBytes, err := types.Cbor.Marshal(uc) + require.NoError(t, err) + + b := &types.Block{ + Header: h, + Transactions: make([]*types.TransactionRecord, 0), + UnicityCertificate: ucBytes, + } + require.NoError(t, db.Write(util.Uint64ToBytes(uint64(i)), b)) + + prevBlockHash = uc.GetBlockHash() + prevStateHash = uc.GetStateHash() + lastUC = uc + } + return lastUC +} + +func newReplicationRequest(partitionID types.PartitionID, shardID types.ShardID, nodeID string, begin, end uint64) *replication.LedgerReplicationRequest { + return &replication.LedgerReplicationRequest{ + UUID: uuid.New(), + PartitionID: partitionID, + ShardID: shardID, + NodeID: nodeID, + BeginBlockNumber: begin, + EndBlockNumber: end, + } +} + +func TestProcessReplicationRequest_ReturnsRequestedBlocks(t *testing.T) { + // when node has blocks 1-5 + node, mockNet := createTestNode(t, testNodeOpts{blockCount: 5}) + peerIDs := testpeer.GeneratePeerIDs(t, 1) + + // and request asks for blocks 2-4 + req := newReplicationRequest(node.PartitionID(), node.ShardID(), peerIDs[0].String(), 2, 4) + node.processReplicationRequest(t.Context(), req) + + // then exactly blocks 2-4 are returned + resp := mockNet.lastResponse() + require.NotNil(t, resp) + require.Equal(t, replication.Ok, resp.Status) + require.Len(t, resp.Blocks, 3) + require.Equal(t, uint64(2), resp.FirstBlockNumber) + require.Equal(t, uint64(4), resp.LastBlockNumber) +} + +func TestProcessReplicationRequest_RespectsMaxReturnBlocks(t *testing.T) { + // when node is configured to return 3 blocks at most + node, mockNet := createTestNode(t, testNodeOpts{blockCount: 10, maxReturnBlocks: 3}) + peerIDs := testpeer.GeneratePeerIDs(t, 1) + + // and request asks for 10 blocks + req := newReplicationRequest(node.PartitionID(), node.ShardID(), peerIDs[0].String(), 1, 10) + node.processReplicationRequest(t.Context(), req) + + // then only 3 blocks are returned + resp := mockNet.lastResponse() + require.NotNil(t, resp) + require.Equal(t, replication.Ok, resp.Status) + require.Len(t, resp.Blocks, 3) + require.Equal(t, uint64(1), resp.FirstBlockNumber) + require.Equal(t, uint64(3), resp.LastBlockNumber) +} + +func TestProcessReplicationRequest_CancelledContext(t *testing.T) { + // when context is already cancelled + node, mockNet := createTestNode(t, testNodeOpts{blockCount: 5}) + peerIDs := testpeer.GeneratePeerIDs(t, 1) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + // and a replication request is processed + req := newReplicationRequest(node.PartitionID(), node.ShardID(), peerIDs[0].String(), 1, 5) + node.processReplicationRequest(ctx, req) + + // then no response is sent + require.Empty(t, mockNet.sentMessages) +} + +func TestHandleLedgerReplicationRequest_BusyWhenChannelFull(t *testing.T) { + // when replication worker is already busy + node, mockNet := createTestNode(t, testNodeOpts{blockCount: 5}) + peerIDs := testpeer.GeneratePeerIDs(t, 1) + node.replicationCh <- replicationRequest{} + + // and another replication request arrives + req := newReplicationRequest(node.PartitionID(), node.ShardID(), peerIDs[0].String(), 1, 5) + err := node.handleLedgerReplicationRequest(t.Context(), req) + require.NoError(t, err) + + // then the request is rejected with Busy status + resp := mockNet.lastResponse() + require.NotNil(t, resp) + require.Equal(t, replication.Busy, resp.Status) +} + +func TestHandleLedgerReplicationRequest_BlocksNotFound(t *testing.T) { + // when node has blocks up to round 3 + node, mockNet := createTestNode(t, testNodeOpts{blockCount: 3, committedRound: 3}) + peerIDs := testpeer.GeneratePeerIDs(t, 1) + + // and request asks for blocks starting at round 5 + req := newReplicationRequest(node.PartitionID(), node.ShardID(), peerIDs[0].String(), 5, 10) + err := node.handleLedgerReplicationRequest(t.Context(), req) + require.NoError(t, err) + + // then BlocksNotFound is returned + resp := mockNet.lastResponse() + require.NotNil(t, resp) + require.Equal(t, replication.BlocksNotFound, resp.Status) +} + +func TestHandleLedgerReplicationRequest_WrongShard(t *testing.T) { + // when node belongs to partition 3 + node, mockNet := createTestNode(t, testNodeOpts{blockCount: 3, partitionID: 3}) + peerIDs := testpeer.GeneratePeerIDs(t, 1) + + // and request is for partition 99 + req := newReplicationRequest(99, node.ShardID(), peerIDs[0].String(), 1, 3) + err := node.handleLedgerReplicationRequest(t.Context(), req) + require.NoError(t, err) + + // then WrongShard is returned + resp := mockNet.lastResponse() + require.NotNil(t, resp) + require.Equal(t, replication.WrongShard, resp.Status) +} From 0bc2ed48841db328d11e806c17b1e3f9ba36f18d Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Wed, 15 Apr 2026 21:57:05 +0300 Subject: [PATCH 5/8] prohibit dFG=0 --- txsystem/fgp_txsystem.go | 14 ++++++++++++++ txsystem/fgp_txsystem_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/txsystem/fgp_txsystem.go b/txsystem/fgp_txsystem.go index ef43460..bc44c71 100644 --- a/txsystem/fgp_txsystem.go +++ b/txsystem/fgp_txsystem.go @@ -43,6 +43,9 @@ func NewFGPTxSystem(shardConf types.PartitionDescriptionRecord, powClient powtyp if err != nil { return nil, fmt.Errorf("non numeric dFG defined in shard conf: %w", err) } + if dFG == 0 { + return nil, errors.New("dFG must be greater than 0") + } return &FGPTxSystem{ log: log, shardConf: shardConf, @@ -65,6 +68,9 @@ func (s *FGPTxSystem) UpdateConfig(shardConf *types.PartitionDescriptionRecord) if err != nil { return fmt.Errorf("non numeric dFG defined in shard conf: %w", err) } + if dFG == 0 { + return errors.New("dFG must be greater than 0") + } s.mu.Lock() defer s.mu.Unlock() @@ -146,6 +152,10 @@ func (s *FGPTxSystem) LeaderPropose(ctx context.Context, round uint64) (*state.S if t.Status == "active" { continue } + // sanity check: in a valid PoW chain, branchlen can never exceed height + if t.BranchLen > t.Height { + return nil, fmt.Errorf("invalid chain tip: branch length %d exceeds height %d", t.BranchLen, t.Height) + } forkHeight := t.Height - t.BranchLen if t.Height >= candidateHeight && forkHeight < candidateHeight { return nil, fmt.Errorf("competing fork visible: branch at height %d diverged at %d", t.Height, forkHeight) @@ -193,6 +203,10 @@ func (s *FGPTxSystem) FollowerVerify(ctx context.Context, round uint64, proposed } // 4. Ensure the block has sufficient confirmations + // sanity check: tip should never be behind a block that the node returned as active + if tip.Height < block.Height { + return nil, fmt.Errorf("proposed block height %d is ahead of tip height %d", block.Height, tip.Height) + } if tip.Height-block.Height < s.dFG-1 { return nil, fmt.Errorf("proposed block does not have sufficient confirmations (depth %d, required %d)", tip.Height-block.Height, s.dFG) } diff --git a/txsystem/fgp_txsystem_test.go b/txsystem/fgp_txsystem_test.go index 69b8999..5808f26 100644 --- a/txsystem/fgp_txsystem_test.go +++ b/txsystem/fgp_txsystem_test.go @@ -14,6 +14,31 @@ import ( powtypes "github.com/unicitynetwork/finality-gadget/pow/types" ) +func TestNewFGPTxSystem_dFGZero(t *testing.T) { + log := logger.New(t) + mockClient := &mockPowClient{} + + _, err := NewFGPTxSystem(types.PartitionDescriptionRecord{ + PartitionParams: map[string]string{"dFG": "0"}, + }, mockClient, log) + require.ErrorContains(t, err, "dFG must be greater than 0") +} + +func TestUpdateConfig_dFGZero(t *testing.T) { + log := logger.New(t) + mockClient := &mockPowClient{} + + s, err := NewFGPTxSystem(types.PartitionDescriptionRecord{ + PartitionParams: map[string]string{"dFG": "1"}, + }, mockClient, log) + require.NoError(t, err) + + err = s.UpdateConfig(&types.PartitionDescriptionRecord{ + PartitionParams: map[string]string{"dFG": "0"}, + }) + require.ErrorContains(t, err, "dFG must be greater than 0") +} + func TestProposeBlock(t *testing.T) { log := logger.New(t) From 7cc4c1b8b4ae1715605eeb0476738cbb6f7b2d59 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Wed, 15 Apr 2026 23:14:16 +0300 Subject: [PATCH 6/8] better genesis check --- txsystem/fgp_txsystem.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/txsystem/fgp_txsystem.go b/txsystem/fgp_txsystem.go index bc44c71..5ec3dde 100644 --- a/txsystem/fgp_txsystem.go +++ b/txsystem/fgp_txsystem.go @@ -22,7 +22,6 @@ type FGPTxSystem struct { log *slog.Logger shardConf types.PartitionDescriptionRecord powClient powtypes.Client - genesisHash []byte // genesis hash is nil dFG uint64 // number of PoW confirmations required for finalization committedStateHash []byte // PoW block hash committedStateHeight uint64 // PoW block height @@ -52,7 +51,6 @@ func NewFGPTxSystem(shardConf types.PartitionDescriptionRecord, powClient powtyp powClient: powClient, dFG: dFG, committedStateHash: nil, - genesisHash: nil, // initial UC hash is nil summaryValue: []byte{}, // always empty non-nil constant for FGP, nil is not allowed by the BFT nodes sumOfEarnedFees: 0, // always 0 for FGP etHash: nil, // always nil for FGP @@ -111,7 +109,7 @@ func (s *FGPTxSystem) LeaderPropose(ctx context.Context, round uint64) (*state.S // 2. Find the height of the latest FG-certified PoW block (since we do not store them in blocks we have to query it) // if no certified blocks yet (genesis block) then set hFG=0 var hFG uint64 - if !bytes.Equal(s.pendingStateHash, s.genesisHash) { + if s.committedStateHash != nil { hexHash := hex.EncodeToString(s.committedStateHash) lastCertBlock, err := s.powClient.GetBlockHeaderByHash(ctx, hexHash) if err != nil { From 1718933eb97cb50c66c7e4874f070a825c33b595 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Wed, 15 Apr 2026 23:24:11 +0300 Subject: [PATCH 7/8] close databases --- cmd/run.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 6296243..d75b6cd 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -9,7 +9,6 @@ import ( "time" "github.com/spf13/cobra" - "github.com/unicitynetwork/bft-core/keyvaluedb" "github.com/unicitynetwork/bft-core/keyvaluedb/boltdb" "github.com/unicitynetwork/bft-core/rootchain/consensus/trustbase" "github.com/unicitynetwork/bft-go-base/types" @@ -135,6 +134,7 @@ func runNode(ctx context.Context, flags *cliFlags, cmd *cobra.Command) error { if err != nil { return err } + defer shardConfDB.Close() shardConfStore, err := partition.NewShardConfStore(shardConfDB, log) if err != nil { return err @@ -174,6 +174,7 @@ func runNode(ctx context.Context, flags *cliFlags, cmd *cobra.Command) error { if err != nil { return err } + defer trustBaseDB.Close() trustBaseStore, err := trustbase.NewTrustBaseStore(trustBaseDB, log) if err != nil { return fmt.Errorf("failed to create trust base store: %w", err) @@ -191,6 +192,7 @@ func runNode(ctx context.Context, flags *cliFlags, cmd *cobra.Command) error { if err != nil { return err } + defer blockDB.Close() bootstrapConnectRetry := &network.BootstrapConnectRetry{ Count: flags.BootstrapConnectRetryCount, @@ -238,7 +240,7 @@ func (f *cliFlags) pathWithDefault(path string, defaultFileName string) string { return filepath.Join(f.HomeDir, defaultFileName) } -func (f *cliFlags) initDB(path string, defaultFileName string) (keyvaluedb.KeyValueDB, error) { +func (f *cliFlags) initDB(path string, defaultFileName string) (*boltdb.BoltDB, error) { path = f.pathWithDefault(path, defaultFileName) db, err := boltdb.New(path) if err != nil { From 45560988e9b5a9f0d80fc8633b8cd2a88614cb00 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Wed, 15 Apr 2026 23:51:48 +0300 Subject: [PATCH 8/8] return fatal error instead of logging --- partition/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/partition/node.go b/partition/node.go index 93e00a7..4f8652a 100644 --- a/partition/node.go +++ b/partition/node.go @@ -108,7 +108,7 @@ type ( func (n *Node) Run(ctx context.Context) error { if err := n.network.RegisterValidatorProtocols(); err != nil { - n.log.ErrorContext(ctx, "Failed to register validator protocols", logger.Error(err)) + return fmt.Errorf("failed to register validator protocols: %w", err) } n.sendHandshake(ctx)