Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/spf13/cobra"
"github.com/unicitynetwork/bft-core/keyvaluedb"
"github.com/unicitynetwork/bft-core/keyvaluedb/boltdb"
"github.com/unicitynetwork/bft-core/rootchain/consensus/trustbase"
"github.com/unicitynetwork/bft-go-base/types"
Expand Down Expand Up @@ -135,6 +134,7 @@ func runNode(ctx context.Context, flags *cliFlags, cmd *cobra.Command) error {
if err != nil {
return err
}
defer shardConfDB.Close()
shardConfStore, err := partition.NewShardConfStore(shardConfDB, log)
if err != nil {
return err
Expand Down Expand Up @@ -174,6 +174,7 @@ func runNode(ctx context.Context, flags *cliFlags, cmd *cobra.Command) error {
if err != nil {
return err
}
defer trustBaseDB.Close()
trustBaseStore, err := trustbase.NewTrustBaseStore(trustBaseDB, log)
if err != nil {
return fmt.Errorf("failed to create trust base store: %w", err)
Expand All @@ -191,6 +192,7 @@ func runNode(ctx context.Context, flags *cliFlags, cmd *cobra.Command) error {
if err != nil {
return err
}
defer blockDB.Close()

bootstrapConnectRetry := &network.BootstrapConnectRetry{
Count: flags.BootstrapConnectRetryCount,
Expand Down Expand Up @@ -238,7 +240,7 @@ func (f *cliFlags) pathWithDefault(path string, defaultFileName string) string {
return filepath.Join(f.HomeDir, defaultFileName)
}

func (f *cliFlags) initDB(path string, defaultFileName string) (keyvaluedb.KeyValueDB, error) {
func (f *cliFlags) initDB(path string, defaultFileName string) (*boltdb.BoltDB, error) {
path = f.pathWithDefault(path, defaultFileName)
db, err := boltdb.New(path)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ require (
github.com/spf13/viper v1.21.0
github.com/stretchr/testify v1.11.1
// use trust base store and generic storage implementation from bft-core for now
github.com/unicitynetwork/bft-core v1.0.2-0.20260316094447-b26e124a6938
github.com/unicitynetwork/bft-go-base v1.0.3-0.20260316092951-afcfbc83f42f
github.com/unicitynetwork/bft-core v1.1.0
github.com/unicitynetwork/bft-go-base v1.1.0
golang.org/x/sync v0.19.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,10 @@ github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/unicitynetwork/bft-core v1.0.2-0.20260316094447-b26e124a6938 h1:eK9OeFmqILTQTp9XNGonBDewxmVJH+hyP/c6+K/vE1g=
github.com/unicitynetwork/bft-core v1.0.2-0.20260316094447-b26e124a6938/go.mod h1:yYVnx6iuEJ1VC2D2ax/GYM/n2ECeAUnxO4yUetRoTaQ=
github.com/unicitynetwork/bft-go-base v1.0.3-0.20260316092951-afcfbc83f42f h1:HiqLjm+aM0VX5oLJLAEiqZCUnQhf1KrL6rCpco9FMFA=
github.com/unicitynetwork/bft-go-base v1.0.3-0.20260316092951-afcfbc83f42f/go.mod h1:hBnOG52VRy/vpgIBUulTgk7PBTwODZ2xkVjCEu5yRcQ=
github.com/unicitynetwork/bft-core v1.1.0 h1:VBoC0XyEsQoT7FTa2r3zJ+EL/CFERvxZFFnhJbhly08=
github.com/unicitynetwork/bft-core v1.1.0/go.mod h1:ish6bE4r7/XxhL1G9T7PSNBiRIj5W63QCYVYXoukJPc=
github.com/unicitynetwork/bft-go-base v1.1.0 h1:x1+kX0X+n4CmNibBs0f8oWwoZ5UCW40Gaq6WYhUcUZQ=
github.com/unicitynetwork/bft-go-base v1.1.0/go.mod h1:hBnOG52VRy/vpgIBUulTgk7PBTwODZ2xkVjCEu5yRcQ=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
Expand Down
5 changes: 5 additions & 0 deletions network/cbor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func serializeMsg(msg any) ([]byte, error) {
return append(lengthBytes[:bytesWritten], data...), nil
}

const maxMsgSize = 16 * 1024 * 1024 // 16 MB
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The maxMsgSize of 16MB might be too restrictive when combined with the default maxReturnBlocks of 1000 in ledger replication. If blocks are larger than ~16KB on average, the resulting LedgerReplicationResponse will exceed this limit and be rejected by the receiver. Consider either increasing this limit or ensuring that the replication logic can handle splitting large responses.


func deserializeMsg(r io.Reader, msg any) error {
src := bufio.NewReader(r)
// read data length
Expand All @@ -30,6 +32,9 @@ func deserializeMsg(r io.Reader, msg any) error {
if length64 == 0 {
return fmt.Errorf("unexpected data length zero")
}
if length64 > maxMsgSize {
return fmt.Errorf("message size %d exceeds maximum %d", length64, maxMsgSize)
}

lengthInt64 := int64(length64) /* #nosec G115 its unlikely that value of length64 exceeds int64 max value */
if err := types.Cbor.Decode(io.LimitReader(src, lengthInt64), msg); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions network/cbor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package network

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"slices"
"testing"

Expand Down Expand Up @@ -115,4 +117,14 @@ func Test_deserializeMsg(t *testing.T) {
err := deserializeMsg(bytes.NewReader(data), &dest)
require.EqualError(t, err, `unexpected data length zero`)
})

t.Run("length exceeds maximum message size", func(t *testing.T) {
// encode a length value that exceeds maxMsgSize
var buf [binary.MaxVarintLen64]byte
n := binary.PutUvarint(buf[:], maxMsgSize+1)

var dest testMsg
err := deserializeMsg(bytes.NewReader(buf[:n]), &dest)
require.EqualError(t, err, fmt.Sprintf("message size %d exceeds maximum %d", maxMsgSize+1, maxMsgSize))
})
}
5 changes: 5 additions & 0 deletions network/libp2p_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func (n *LibP2PNetwork) Send(ctx context.Context, msg any, receivers ...peer.ID)
func (n *LibP2PNetwork) SendMsgs(ctx context.Context, messages MsgQueue, receiver peer.ID) (resErr error) {
var stream libp2pNetwork.Stream
var err error
defer func() {
if stream != nil {
stream.Close()
}
}()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue #2 mentioned a second bug alongside the stream leak: SendMsgs still return nils at the end of the function (line 140), discarding any resErr accumulated when serialization of an individual message fails. Should be return resErr so the caller sees partial-failure errors.

for messages.Len() > 0 {
msg := messages.PopFront()
// create a stream with first message
Expand Down
3 changes: 3 additions & 0 deletions network/protocol/replication/ledger_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
WrongShard
BlocksNotFound
Unknown
Busy
)

var (
Expand Down Expand Up @@ -113,6 +114,8 @@ func (s Status) String() string {
return "Wrong Partition or Shard Identifier"
case Unknown:
return "Unknown"
case Busy:
return "Busy"
}
return "Unknown Status Code"
}
13 changes: 12 additions & 1 deletion partition/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ type (
CommittedUC() *types.UnicityCertificate
}

replicationRequest struct {
ctx context.Context
req *replication.LedgerReplicationRequest
}

roundTimer struct {
isRunning atomic.Bool
cancel context.CancelFunc
Expand All @@ -81,6 +86,7 @@ type (
// ---- recovery ----
recoveryLastProp *blockproposal.BlockProposal
lastLedgerReqTime time.Time
replicationCh chan replicationRequest

// ---- persistence ----
blockStore keyvaluedb.KeyValueDB
Expand All @@ -102,7 +108,7 @@ type (

func (n *Node) Run(ctx context.Context) error {
if err := n.network.RegisterValidatorProtocols(); err != nil {
n.log.ErrorContext(ctx, "Failed to register validator protocols", logger.Error(err))
return fmt.Errorf("failed to register validator protocols: %w", err)
}
n.sendHandshake(ctx)

Expand All @@ -114,6 +120,11 @@ func (n *Node) Run(ctx context.Context) error {
return err
})

g.Go(func() error {
n.replicationLoop(ctx)
return nil
})

return g.Wait()
}

Expand Down
1 change: 1 addition & 0 deletions partition/node_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewNode(ctx context.Context, txSystem TransactionSystem, conf *NodeConf, lo
trustBaseStore: conf.trustBaseStore,
network: conf.validatorNetwork,
lastLedgerReqTime: time.Time{},
replicationCh: make(chan replicationRequest, 1),
log: log,
}

Expand Down
107 changes: 68 additions & 39 deletions partition/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,49 +128,78 @@ func (n *Node) handleLedgerReplicationRequest(ctx context.Context, lr *replicati
return n.sendLedgerReplicationResponse(ctx, resp, lr.NodeID)
}
n.log.DebugContext(ctx, fmt.Sprintf("Preparing replication response from block %d", startBlock))
go func() {
blocks := make([]*types.Block, 0)
blockCnt := uint64(0)
dbIt := n.blockStore.Find(util.Uint64ToBytes(startBlock))
defer func() {
if err := dbIt.Close(); err != nil {
n.log.WarnContext(ctx, "closing DB iterator", logger.Error(err))
}
}()
var firstFetchedBlockNumber uint64
var lastFetchedBlockNumber uint64
var lastFetchedBlock *types.Block
for ; dbIt.Valid(); dbIt.Next() {
var bl types.Block
roundNo := util.BytesToUint64(dbIt.Key())
if err := dbIt.Value(&bl); err != nil {
n.log.WarnContext(ctx, fmt.Sprintf("Ledger replication reply incomplete, failed to read block %d", roundNo), logger.Error(err))
break
}
lastFetchedBlock = &bl
if firstFetchedBlockNumber == 0 {
firstFetchedBlockNumber = roundNo
}
lastFetchedBlockNumber = roundNo
blocks = append(blocks, lastFetchedBlock)
blockCnt++
if blockCnt >= n.conf.replicationConfig.maxReturnBlocks ||
(roundNo >= lr.EndBlockNumber && lr.EndBlockNumber > 0) {
break
}
}
select {
case n.replicationCh <- replicationRequest{ctx: ctx, req: lr}:
// worker will process
default:
resp := &replication.LedgerReplicationResponse{
UUID: lr.UUID,
Status: replication.Ok,
Blocks: blocks,
FirstBlockNumber: firstFetchedBlockNumber,
LastBlockNumber: lastFetchedBlockNumber,
UUID: lr.UUID,
Status: replication.Busy,
Message: "Too many concurrent replication requests",
}
return n.sendLedgerReplicationResponse(ctx, resp, lr.NodeID)
}
Comment on lines +131 to +141
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The replicationCh has a capacity of 1, meaning only one replication request can be queued while another is being processed. In a network with many nodes, this might lead to frequent Busy responses during recovery. Consider increasing the channel capacity or implementing a more flexible worker pool if high concurrency is expected.

return nil
}

func (n *Node) replicationLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case r := <-n.replicationCh:
n.processReplicationRequest(r.ctx, r.req)
}
if err := n.sendLedgerReplicationResponse(ctx, resp, lr.NodeID); err != nil {
n.log.WarnContext(ctx, fmt.Sprintf("Problem sending ledger replication response, %s", resp.Pretty()), logger.Error(err))
}
}

func (n *Node) processReplicationRequest(ctx context.Context, lr *replication.LedgerReplicationRequest) {
startBlock := lr.BeginBlockNumber
blocks := make([]*types.Block, 0)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It is more efficient to pre-allocate the blocks slice with the maximum expected capacity to avoid multiple re-allocations during the loop.

Suggested change
blocks := make([]*types.Block, 0)
blocks := make([]*types.Block, 0, n.conf.replicationConfig.maxReturnBlocks)

blockCnt := uint64(0)
dbIt := n.blockStore.Find(util.Uint64ToBytes(startBlock))
defer func() {
if err := dbIt.Close(); err != nil {
n.log.WarnContext(ctx, "closing DB iterator", logger.Error(err))
}
}()
return nil
var firstFetchedBlockNumber uint64
var lastFetchedBlockNumber uint64
var lastFetchedBlock *types.Block
for ; dbIt.Valid(); dbIt.Next() {
select {
case <-ctx.Done():
return
default:
}
var bl types.Block
roundNo := util.BytesToUint64(dbIt.Key())
if err := dbIt.Value(&bl); err != nil {
n.log.WarnContext(ctx, fmt.Sprintf("Ledger replication reply incomplete, failed to read block %d", roundNo), logger.Error(err))
break
}
lastFetchedBlock = &bl
if firstFetchedBlockNumber == 0 {
firstFetchedBlockNumber = roundNo
}
lastFetchedBlockNumber = roundNo
blocks = append(blocks, lastFetchedBlock)
blockCnt++
if blockCnt >= n.conf.replicationConfig.maxReturnBlocks ||
(roundNo >= lr.EndBlockNumber && lr.EndBlockNumber > 0) {
break
}
}
resp := &replication.LedgerReplicationResponse{
UUID: lr.UUID,
Status: replication.Ok,
Blocks: blocks,
FirstBlockNumber: firstFetchedBlockNumber,
LastBlockNumber: lastFetchedBlockNumber,
}
if err := n.sendLedgerReplicationResponse(ctx, resp, lr.NodeID); err != nil {
n.log.WarnContext(ctx, fmt.Sprintf("Problem sending ledger replication response, %s", resp.Pretty()), logger.Error(err))
}
}

// handleLedgerReplicationResponse handles ledger replication responses from other partition nodes.
Expand Down
Loading