From ee68caff4e28a752f03e59d836beb694094f0f3a Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Mon, 13 Apr 2026 12:27:46 +0300 Subject: [PATCH 1/5] git add partition/node_init_test.go --- partition/node_init_test.go | 214 ++++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 partition/node_init_test.go diff --git a/partition/node_init_test.go b/partition/node_init_test.go new file mode 100644 index 0000000..32a81cb --- /dev/null +++ b/partition/node_init_test.go @@ -0,0 +1,214 @@ +package partition + +import ( + "context" + "crypto" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/unicitynetwork/bft-core/keyvaluedb/memorydb" + "github.com/unicitynetwork/bft-core/rootchain/consensus/trustbase" + "github.com/unicitynetwork/bft-go-base/types" + "github.com/unicitynetwork/bft-go-base/util" + + testcertificates "github.com/unicitynetwork/finality-gadget/internal/testutils/certificates" + testlogger "github.com/unicitynetwork/finality-gadget/internal/testutils/logger" + testsig "github.com/unicitynetwork/finality-gadget/internal/testutils/sig" + testtrustbase "github.com/unicitynetwork/finality-gadget/internal/testutils/trustbase" + "github.com/unicitynetwork/finality-gadget/txsystem/state" +) + +type MockTransactionSystem struct { + committedUC *types.UnicityCertificate + currentState []byte +} + +func (m *MockTransactionSystem) StateSummary() (*state.Summary, error) { + return state.NewStateSummary(m.currentState, []byte{}, 0, nil), nil +} + +func (m *MockTransactionSystem) LeaderPropose(ctx context.Context, round uint64) (*state.Summary, error) { + return nil, nil +} + +func (m *MockTransactionSystem) FollowerVerify(ctx context.Context, round uint64, proposedRoot []byte) (*state.Summary, error) { + return state.NewStateSummary(proposedRoot, []byte{}, 0, nil), nil +} + +func (m *MockTransactionSystem) UpdateConfig(shardConf *types.PartitionDescriptionRecord) error { + return nil +} + +func (m *MockTransactionSystem) RestoreState(ctx context.Context, uc *types.UnicityCertificate) error { + m.committedUC = uc + m.currentState = uc.GetStateHash() + return nil +} + +func (m *MockTransactionSystem) Revert() {} + +func (m *MockTransactionSystem) Commit(uc *types.UnicityCertificate) error { + m.committedUC = uc + m.currentState = uc.GetStateHash() + return nil +} + +func (m *MockTransactionSystem) CommittedUC() *types.UnicityCertificate { + if m.committedUC == nil { + return &types.UnicityCertificate{ + Version: 1, + InputRecord: &types.InputRecord{RoundNumber: 0}, + } + } + return m.committedUC +} + +func TestNodeStartupAndRecoveryEquivalence(t *testing.T) { + log := testlogger.New(t) + + keyConf, nodeInfo := createKeyConf(t) + shardConf := &types.PartitionDescriptionRecord{ + Version: 1, + NetworkID: 3, + PartitionID: 3, + ShardID: types.ShardID{}, + PartitionTypeID: 3, + TypeIDLen: 8, + UnitIDLen: 256, + T2Timeout: 2500 * time.Millisecond, + Epoch: 0, + EpochStart: 1, + Validators: []*types.NodeInfo{nodeInfo}, + } + signer, _ := testsig.CreateSignerAndVerifier(t) + trustBase := testtrustbase.NewTrustBase(t, signer) + + shardDB := memorydb.New() + shardConfStore, err := NewShardConfStore(shardDB, log) + require.NoError(t, err) + require.NoError(t, shardConfStore.Store(shardConf)) + + trustBaseStore, err := trustbase.NewTrustBaseStore(memorydb.New(), log) + require.NoError(t, err) + require.NoError(t, trustBaseStore.Store(trustBase)) + + // Create 3 historical blocks + var blocks []*types.Block + prevBlockHash := []byte("genesis") + prevStateHash := []byte(nil) + for i := 1; i <= 3; i++ { + h := &types.Header{ + Version: 1, + PartitionID: 3, + PreviousBlockHash: prevBlockHash, + ProposerID: nodeInfo.NodeID, + } + txs := make([]*types.TransactionRecord, 0) + stateHash := []byte{byte(i)} + + blockHash, err := types.BlockHash(crypto.SHA256, h, []*types.TransactionRecord{}, stateHash, prevStateHash) + require.NoError(t, err) + + ir := &types.InputRecord{ + RoundNumber: uint64(i), + Hash: stateHash, + PreviousHash: prevStateHash, + BlockHash: blockHash, + Epoch: 0, + SummaryValue: []byte{}, + ETHash: []byte{}, + Timestamp: uint64(time.Now().UnixMilli()), + } + + uc := testcertificates.CreateUnicityCertificate(t, signer, ir, shardConf, uint64(i), nil, make([]byte, 32)) + ucBytes, err := types.Cbor.Marshal(uc) + require.NoError(t, err) + + b := &types.Block{ + Header: h, + Transactions: txs, + UnicityCertificate: ucBytes, + } + blocks = append(blocks, b) + + prevBlockHash = uc.GetBlockHash() + prevStateHash = uc.GetStateHash() + } + + // 1. Simulate Startup Initialization (Fast-forward via blockStore.Last) + blockDBInit := memorydb.New() + for i, b := range blocks { + err := blockDBInit.Write(util.Uint64ToBytes(uint64(i+1)), b) + require.NoError(t, err) + } + + confInit, err := NewNodeConf(keyConf, shardConfStore, trustBaseStore, + WithBlockDB(blockDBInit), + WithUnicityCertificateValidator(&AlwaysValidCertificateValidator{}), + ) + require.NoError(t, err) + + txSystemInit := &MockTransactionSystem{currentState: []byte("genesis")} + nodeInit := &Node{ + conf: confInit, + transactionSystem: txSystemInit, + blockStore: confInit.blockDB, + state: &ConsensusState{status: initializing}, + shardConfStore: confInit.shardConfStore, + trustBaseStore: confInit.trustBaseStore, + log: log, + } + err = nodeInit.initState(context.Background()) + require.NoError(t, err) + + require.Equal(t, uint64(3), txSystemInit.CommittedUC().GetRoundNumber()) + require.Equal(t, []byte{3}, txSystemInit.currentState) + + // 2. Simulate Network Recovery (Empty DB initially, catch up via handleBlock) + blockDBRec := memorydb.New() + confRec, err := NewNodeConf(keyConf, shardConfStore, trustBaseStore, + WithBlockDB(blockDBRec), + WithUnicityCertificateValidator(&AlwaysValidCertificateValidator{}), + ) + require.NoError(t, err) + + // Initial UC (round 0) + initialUC := &types.UnicityCertificate{ + Version: 1, + InputRecord: &types.InputRecord{ + RoundNumber: 0, + BlockHash: []byte("genesis"), + }, + } + txSystemRec := &MockTransactionSystem{ + currentState: []byte(nil), + committedUC: initialUC, + } + + nodeRec := &Node{ + conf: confRec, + transactionSystem: txSystemRec, + blockStore: confRec.blockDB, + state: &ConsensusState{status: recovering}, + shardConfStore: confRec.shardConfStore, + trustBaseStore: confRec.trustBaseStore, + log: log, + } + nodeRec.state.fuc = initialUC + nodeRec.state.luc = initialUC + nodeRec.shardConf.Store(shardConf) + + // Apply blocks sequentially + for _, b := range blocks { + err := nodeRec.handleBlock(context.Background(), b) + require.NoError(t, err) + } + + require.Equal(t, uint64(3), txSystemRec.CommittedUC().GetRoundNumber()) + require.Equal(t, []byte{3}, txSystemRec.currentState) + + // Compare that the end states of transaction systems are identical + require.Equal(t, txSystemInit.CommittedUC().GetRoundNumber(), txSystemRec.CommittedUC().GetRoundNumber()) + require.Equal(t, txSystemInit.currentState, txSystemRec.currentState) +} From eb5aa9d863b0650eaf595c7347fb6837521bb9b1 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Mon, 13 Apr 2026 13:58:59 +0300 Subject: [PATCH 2/5] improve reliability of ipc_client.go --- pow/ipc_client.go | 85 +++++++++++++++-------------------------------- 1 file changed, 27 insertions(+), 58 deletions(-) diff --git a/pow/ipc_client.go b/pow/ipc_client.go index 6b83a36..1fbe79e 100644 --- a/pow/ipc_client.go +++ b/pow/ipc_client.go @@ -8,7 +8,8 @@ import ( "io" "net" "net/http" - "regexp" + "sync/atomic" + "time" "github.com/unicitynetwork/finality-gadget/pow/types" ) @@ -20,6 +21,7 @@ import ( type IPCClient struct { socketPath string httpClient *http.Client + nextID atomic.Int64 } // NewIPCClient creates a new client that connects to the PoW node via a Unix socket. @@ -27,10 +29,12 @@ func NewIPCClient(socketPath string) *IPCClient { return &IPCClient{ socketPath: socketPath, httpClient: &http.Client{ + Timeout: 5 * time.Second, // Configure the HTTP client to dial a unix socket instead of tcp Transport: &http.Transport{ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return net.Dial("unix", socketPath) + var d net.Dialer + return d.DialContext(ctx, "unix", socketPath) }, }, }, @@ -52,48 +56,46 @@ type jsonRpcResponse struct { ID int `json:"id"` } -// callRaw sends a JSON-RPC request and returns the raw response body. -func (c *IPCClient) callRaw(ctx context.Context, method string, params []interface{}) ([]byte, error) { +// call sends a JSON-RPC request to the PoW node. +func (c *IPCClient) call(ctx context.Context, method string, params []interface{}, result interface{}) error { + reqID := int(c.nextID.Add(1)) reqBody := &jsonRpcRequest{ JSONRPC: "2.0", Method: method, Params: params, - ID: 1, + ID: reqID, } payload, err := json.Marshal(reqBody) if err != nil { - return nil, fmt.Errorf("failed to marshal request: %w", err) + return fmt.Errorf("failed to marshal request: %w", err) } // The URL doesn't strictly matter since the dialer forces the unix socket req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost/", bytes.NewReader(payload)) if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) + return fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/json") resp, err := c.httpClient.Do(req) if err != nil { - return nil, fmt.Errorf("rpc call failed: %w", err) + return fmt.Errorf("rpc call failed: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected http status: %s", resp.Status) + return fmt.Errorf("unexpected http status: %s", resp.Status) } if resp.Body == nil { - return []byte{}, nil + return nil } - return io.ReadAll(resp.Body) -} -// call sends a JSON-RPC request to the PoW node. -func (c *IPCClient) call(ctx context.Context, method string, params []interface{}, result interface{}) error { - bodyBytes, err := c.callRaw(ctx, method, params) + // Limit response size to 5MB to prevent OOM + bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 5*1024*1024)) if err != nil { - return err + return fmt.Errorf("failed to read response body: %w", err) } var rpcResp jsonRpcResponse @@ -101,6 +103,11 @@ func (c *IPCClient) call(ctx context.Context, method string, params []interface{ return fmt.Errorf("failed to decode response: %w. Response body: %s", err, string(bodyBytes)) } + // TODO PoW node always returns ID=0 + //if rpcResp.ID != reqID { + // return fmt.Errorf("rpc response ID mismatch: expected %d, got %d", reqID, rpcResp.ID) + //} + if rpcResp.Error != nil { return fmt.Errorf("rpc error: %v", rpcResp.Error) } @@ -117,24 +124,9 @@ func (c *IPCClient) call(ctx context.Context, method string, params []interface{ // GetTip returns the current tip of the active PoW chain. func (c *IPCClient) GetTip(ctx context.Context) (*types.BlockHeader, error) { // 1. Call getbestblockhash to get the hash of the current tip - bodyBytes, err := c.callRaw(ctx, "getbestblockhash", []interface{}{}) - if err != nil { - return nil, fmt.Errorf("failed to get best block hash: %w", err) - } - bodyBytes = fixUnquotedResultString(bodyBytes) - - var rpcResp jsonRpcResponse - if err := json.Unmarshal(bodyBytes, &rpcResp); err != nil { - return nil, fmt.Errorf("failed to decode getbestblockhash response: %w. Response body: %s", err, string(bodyBytes)) - } - - if rpcResp.Error != nil { - return nil, fmt.Errorf("getbestblockhash rpc error: %v", rpcResp.Error) - } - var hash string - if err := json.Unmarshal(rpcResp.Result, &hash); err != nil { - return nil, fmt.Errorf("failed to unmarshal best block hash: %w", err) + if err := c.call(ctx, "getbestblockhash", []interface{}{}, &hash); err != nil { + return nil, fmt.Errorf("failed to get best block hash: %w", err) } // 2. Call getblockheader to retrieve the block's details @@ -167,24 +159,9 @@ func (c *IPCClient) GetChainTips(ctx context.Context) ([]types.ChainTip, error) // GetBlockHeaderByHeight returns the block at a specific height in the active chain. func (c *IPCClient) GetBlockHeaderByHeight(ctx context.Context, height uint64) (*types.BlockHeader, error) { // 1. Get the hash of the block at this height - bodyBytes, err := c.callRaw(ctx, "getblockhash", []interface{}{height}) - if err != nil { - return nil, fmt.Errorf("failed to get block hash for height %d: %w", height, err) - } - bodyBytes = fixUnquotedResultString(bodyBytes) - - var rpcResp jsonRpcResponse - if err := json.Unmarshal(bodyBytes, &rpcResp); err != nil { - return nil, fmt.Errorf("failed to decode getblockhash response: %w. Response body: %s", err, string(bodyBytes)) - } - - if rpcResp.Error != nil { - return nil, fmt.Errorf("getblockhash rpc error: %v", rpcResp.Error) - } - var hash string - if err := json.Unmarshal(rpcResp.Result, &hash); err != nil { - return nil, fmt.Errorf("failed to unmarshal block hash: %w", err) + if err := c.call(ctx, "getblockhash", []interface{}{height}, &hash); err != nil { + return nil, fmt.Errorf("failed to get block hash for height %d: %w", height, err) } // 2. Fetch the block info using the hash @@ -195,11 +172,3 @@ func (c *IPCClient) GetBlockHeaderByHeight(ctx context.Context, height uint64) ( return block, nil } - -var unquotedHexResultRe = regexp.MustCompile(`("result"\s*:\s*)([a-fA-F0-9]+)(\s*[,}])`) - -// fixUnquotedResultString works around a bug in the PoW C++ node where -// certain RPCs return an unquoted string, e.g. {"result":ead522...,"error":null,"id":0} -func fixUnquotedResultString(bodyBytes []byte) []byte { - return unquotedHexResultRe.ReplaceAll(bodyBytes, []byte(`${1}"${2}"${3}`)) -} From d92ad73a38a5d6acf98fc0105859471755bc3b26 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Mon, 13 Apr 2026 14:30:29 +0300 Subject: [PATCH 3/5] add usage example to readme --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 3360fd5..1c5043e 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,11 @@ The FGP is a partition under Unicity BFT. It is a headers-only partition (with no units or transactions, and no state or block data other than the hash of the PoW block being finalized). +The finality gadget operates by periodically querying the underlying Proof-of-Work (PoW) `unicity-node` to fetch the +current best block header. By default, every 2.4 hours, the FGP leader proposes a new state transition that includes the +hash of this PoW block. Once certified by the BFT partition via a Unicity Certificate (UC), this PoW block is considered +finalized and added to the partition state, preventing deep reorgs on the base layer. + See Sec 6.3 Finality Gadget of the YP for more details. ## Prerequisites @@ -68,3 +73,19 @@ Or via Docker: ```bash docker run unicity-fgp:local --help ``` + +### Example Run Command + +The following is an example of how to start an FGP node locally. By default, the node assumes the underlying PoW socket +is located at `$HOME/.unicity/node.sock` (configurable via `--pow-socket-path`). + +```bash +build/fgp run \ + --home "test-nodes/fgp1" \ + --trust-base test-nodes/trust-base.json \ + --shard-conf test-nodes/shard-conf-3_0.json \ + --address "/ip4/127.0.0.1/tcp/30666" \ + --bootnodes "/ip4/127.0.0.1/tcp/26662/p2p/16Uiu2HAmQMpRWSCskWgsHnAPqHCcUHqe9hHS3Sxta3ziAsz7yU1h" \ + --log-format text \ + --log-level info +``` From 61a9cb310389a9e5c93db19ac165c912be264a22 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Mon, 13 Apr 2026 14:34:37 +0300 Subject: [PATCH 4/5] consolidate T1 timeout handling --- partition/node.go | 8 ++++++-- partition/node_init.go | 1 - partition/round.go | 40 +++++++++++++++++++++++++++++----------- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/partition/node.go b/partition/node.go index 6f349f8..b5cdfae 100644 --- a/partition/node.go +++ b/partition/node.go @@ -60,8 +60,9 @@ type ( } roundTimer struct { - stop atomic.Value - event chan struct{} + isRunning atomic.Bool + cancel context.CancelFunc + event chan struct{} } // Node represents a member in the partition and implements an instance of a specific TransactionSystem. @@ -151,6 +152,9 @@ func (n *Node) loop(ctx context.Context) error { case <-ticker.C: n.handleMonitoring(ctx, lastUCReceived, lastBlockReceived) } + + // central location to manage T1 timeout + n.ensureT1TimerState(ctx) } } diff --git a/partition/node_init.go b/partition/node_init.go index 7aca686..553589e 100644 --- a/partition/node_init.go +++ b/partition/node_init.go @@ -40,7 +40,6 @@ func NewNode(ctx context.Context, txSystem TransactionSystem, conf *NodeConf, lo lastLedgerReqTime: time.Time{}, log: log, } - n.timer.stop.Store(func() {}) shardConf, err := n.shardConfStore.GetFirst() if err != nil { diff --git a/partition/round.go b/partition/round.go index 6b20f59..c70809d 100644 --- a/partition/round.go +++ b/partition/round.go @@ -9,25 +9,45 @@ import ( func (n *Node) startNewRound(ctx context.Context) error { n.resetProposal() - n.startT1Timer(ctx) // not a fatal issue, but log anyway n.deletePendingProposal(ctx) return nil } -func (n *Node) startT1Timer(ctx context.Context) { - // stop existing timer - if stopFunc, ok := n.timer.stop.Load().(func()); ok && stopFunc != nil { - stopFunc() +func (n *Node) ensureT1TimerState(ctx context.Context) { + isLeader := n.state.leader == n.peer.ID() + shouldBeRunning := isLeader && n.state.status != recovering + isRunning := n.timer.isRunning.Load() + + if shouldBeRunning && !isRunning { + n.startT1Timer(ctx) + } else if !shouldBeRunning && isRunning { + n.stopT1Timer() } +} + +func (n *Node) stopT1Timer() { + if n.timer.cancel != nil { + n.timer.cancel() + n.timer.cancel = nil + } + n.timer.isRunning.Store(false) +} + +func (n *Node) startT1Timer(ctx context.Context) { + n.stopT1Timer() txCtx, txCancel := context.WithCancel(ctx) - n.timer.stop.Store(func() { txCancel() }) + n.timer.cancel = txCancel + n.timer.isRunning.Store(true) go func() { select { case <-time.After(n.conf.t1Timeout): + // Mark as not running so that ensureT1TimerState can restart the timer + n.timer.isRunning.Store(false) + // Rather than call handleT1TimeoutEvent directly send signal to main // loop - helps to avoid concurrency issues with (repeat) UC handling. select { @@ -40,11 +60,6 @@ func (n *Node) startT1Timer(ctx context.Context) { } func (n *Node) handleT1TimeoutEvent(ctx context.Context) { - if stopFunc, ok := n.timer.stop.Load().(func()); ok && stopFunc != nil { - stopFunc() - } - n.timer.stop.Store(func() {}) - if n.state.status == recovering { n.log.InfoContext(ctx, "T1 timeout: node is recovering") return @@ -67,10 +82,13 @@ func (n *Node) handleT1TimeoutEvent(ctx context.Context) { if err := n.sendBlockProposal(ctx, stateSummary.Root()); err != nil { n.log.WarnContext(ctx, "Failed to send BlockProposal", logger.Error(err)) + n.transactionSystem.Revert() return } if err := n.sendCertificationRequest(ctx, n.peer.ID().String(), stateSummary); err != nil { n.log.WarnContext(ctx, "Failed to send certification request", logger.Error(err)) + n.transactionSystem.Revert() + return } } From 8b578d952adcebdacea9ad5a81bcb474599c4522 Mon Sep 17 00:00:00 2001 From: Lennart Ploom Date: Mon, 13 Apr 2026 17:22:38 +0300 Subject: [PATCH 5/5] use http.MaxBytesReader over io.LimitReader --- pow/ipc_client.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pow/ipc_client.go b/pow/ipc_client.go index 1fbe79e..58cbf7e 100644 --- a/pow/ipc_client.go +++ b/pow/ipc_client.go @@ -92,8 +92,11 @@ func (c *IPCClient) call(ctx context.Context, method string, params []interface{ return nil } - // Limit response size to 5MB to prevent OOM - bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 5*1024*1024)) + // wrap resp.Body with MaxBytesReader to limit response size + resp.Body = http.MaxBytesReader(nil, resp.Body, 5*1024*1024) + + // read all bytes to be able to log the response if error occurs + bodyBytes, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("failed to read response body: %w", err) }