Skip to content
This repository was archived by the owner on Oct 21, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/AggregatorGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ export class AggregatorGateway {

private static async setupSmt(smtStorage: ISmtStorage, aggregatorServerId: string): Promise<Smt> {
const smt = new SparseMerkleTree(new DataHasherFactory(HashAlgorithm.SHA256, NodeDataHasher));
const smtWrapper = await Smt.create(smt);
const smtWrapper = new Smt(smt);

let totalLeaves = 0;
const chunkSize = 1000;
Expand Down Expand Up @@ -465,8 +465,9 @@ export class AggregatorGateway {
}));

await this.smt.addLeaves(leavesToAdd);
const rootHash = await this.smt.rootHash();

logger.info(`Updated in-memory SMT for follower node, new root hash: ${(this.smt.rootHash).toString()}`);
logger.info(`Updated in-memory SMT for follower node, new root hash: ${rootHash.toString()}`);
});

logger.info(`BlockRecords change listener initialized for server ${this.serverId}`);
Expand Down
2 changes: 1 addition & 1 deletion src/AggregatorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class AggregatorService {

public async getInclusionProof(requestId: RequestId): Promise<InclusionProof> {
const record = await this.recordStorage.get(requestId);
const merkleTreePath = this.smt.getPath(requestId.toBitString().toBigInt());
const merkleTreePath = await this.smt.getPath(requestId.toBitString().toBigInt());

if (!record) {
return new InclusionProof(merkleTreePath, null, null);
Expand Down
2 changes: 1 addition & 1 deletion src/RoundManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class RoundManager {
}

let submitHashResponse;
const rootHash = this.smt.rootHash;
const rootHash = await this.smt.rootHash();
try {
loggerWithMetadata.info(`Submitting hash to BFT: ${rootHash.toString()}...`);
submitHashResponse = await this.bftClient.submitHash(rootHash);
Expand Down
3 changes: 2 additions & 1 deletion src/router/AggregatorRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ export function setupRouter(
app.get('/health', async (req: Request, res: Response) => {
let smtRootHash: string | null = null;
try {
smtRootHash = aggregatorService ? aggregatorService.getSmt().rootHash.toString() : null;
const hash = await aggregatorService?.getSmt().rootHash();
smtRootHash = hash?.toString() ?? null;
} catch (error) {
logger.error('Error getting SMT root hash in health endpoint:', error);
}
Expand Down
121 changes: 20 additions & 101 deletions src/smt/Smt.ts
Original file line number Diff line number Diff line change
@@ -1,140 +1,59 @@
import { DataHash } from '@unicitylabs/commons/lib/hash/DataHash.js';
import { LeafInBranchError } from '@unicitylabs/commons/lib/smt/LeafInBranchError.js';
import { MerkleTreePath } from '@unicitylabs/commons/lib/smt/MerkleTreePath.js';
import { MerkleTreeRootNode } from '@unicitylabs/commons/lib/smt/MerkleTreeRootNode.js';
import { SparseMerkleTree } from '@unicitylabs/commons/lib/smt/SparseMerkleTree.js';

import logger from '../logger.js';


/**
* Wrapper for SparseMerkleTree that provides concurrency control
* using a locking mechanism to ensure sequential execution of
* asynchronous operations.
*/
export class Smt {
private smtUpdateLock: boolean = false;
private waitingPromises: Array<{
resolve: () => void;
reject: (error: Error) => void;
timer: NodeJS.Timeout;
}> = [];

// Lock timeout in milliseconds (10 seconds)
private readonly LOCK_TIMEOUT_MS = 10000;

/**
* Creates a new SMT wrapper
* @param smt The SparseMerkleTree to wrap
* @param _root SparseMerkleTreeRoot representing the current state of the tree
*/
private constructor(
private readonly smt: SparseMerkleTree,
private _root: MerkleTreeRootNode,
) {}
public constructor(private readonly smt: SparseMerkleTree) {}
Comment thread
martti007 marked this conversation as resolved.

/**
* Gets the root hash of the tree
*/
public get rootHash(): DataHash {
return this._root.hash;
}

public static async create(smt: SparseMerkleTree): Promise<Smt> {
return new Smt(smt, await smt.calculateRoot());
public async rootHash(): Promise<DataHash> {
const root = await this.smt.calculateRoot();
return root.hash;
}

/**
* Adds a leaf to the SMT with locking to prevent concurrent updates
*/
public addLeaf(path: bigint, value: Uint8Array): Promise<void> {
Comment thread
martti007 marked this conversation as resolved.
return this.withSmtLock(async () => {
await this.smt.addLeaf(path, value);
this._root = await this.smt.calculateRoot();
});
return this.smt.addLeaf(path, value);
Comment thread
martti007 marked this conversation as resolved.
}

/**
* Gets a proof path for a leaf with locking to ensure consistent view
*/
public getPath(path: bigint): MerkleTreePath {
return this._root.getPath(path);
public async getPath(path: bigint): Promise<MerkleTreePath> {
Comment thread
martti007 marked this conversation as resolved.
const root = await this.smt.calculateRoot();
return root.getPath(path);
}
Comment thread
martti007 marked this conversation as resolved.

/**
* Adds multiple leaves atomically with a single lock
*/
public addLeaves(leaves: Array<{ path: bigint; value: Uint8Array }>): Promise<void> {
return this.withSmtLock(async () => {
await Promise.all(
leaves.map((leaf) =>
this.smt.addLeaf(leaf.path, leaf.value).catch((error) => {
if (error instanceof LeafInBranchError) {
logger.warn(`Leaf already exists in tree for path ${leaf.path} - skipping`);
} else {
throw error;
}
}),
),
);

this._root = await this.smt.calculateRoot();
});
}

/**
* Acquires a lock for SMT updates with a timeout
* @returns A promise that resolves when the lock is acquired
*/
private acquireSmtLock(): Promise<void> {
if (!this.smtUpdateLock) {
this.smtUpdateLock = true;
return Promise.resolve();
}

return new Promise<void>((resolve, reject) => {
// Create a timeout that will reject the promise if the lock isn't acquired in time
const timer = setTimeout(() => {
// Remove this waiting promise from the queue
const index = this.waitingPromises.findIndex((p) => p.timer === timer);
if (index !== -1) {
this.waitingPromises.splice(index, 1);
}

reject(new Error(`SMT lock acquisition timed out after ${this.LOCK_TIMEOUT_MS}ms`));
}, this.LOCK_TIMEOUT_MS);

this.waitingPromises.push({ resolve, reject, timer });
});
}

/**
* Releases the SMT update lock and resolves the next waiting promise
*/
private releaseSmtLock(): void {
if (this.waitingPromises.length > 0) {
const next = this.waitingPromises.shift();
// Clear the timeout since we're resolving this promise
if (next) {
clearTimeout(next.timer);
next.resolve();
}
} else {
this.smtUpdateLock = false;
}
}

/**
* Executes a function while holding the SMT lock
* @param fn The function to execute with the lock held
* @returns The result of the function
*/
public async withSmtLock<T>(fn: () => Promise<T>): Promise<T> {
await this.acquireSmtLock();
try {
return await fn();
} finally {
this.releaseSmtLock();
}
public async addLeaves(leaves: Array<{ path: bigint; value: Uint8Array }>): Promise<void> {
Comment thread
martti007 marked this conversation as resolved.
await Promise.all(
leaves.map((leaf) =>
this.smt.addLeaf(leaf.path, leaf.value).catch((error) => {
if (error instanceof LeafInBranchError) {
logger.warn(`Leaf already exists in tree for path ${leaf.path} - skipping`);
} else {
throw error;
}
}),
),
);
}
}
4 changes: 2 additions & 2 deletions tests/AggregatorServiceTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ describe('AggregatorService Tests', () => {
initialBlockHash: '185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969',
},
bftClient,
await Smt.create(smt),
new Smt(smt),
{} as never,
recordStorage,
{} as never,
Expand All @@ -95,7 +95,7 @@ describe('AggregatorService Tests', () => {

aggregatorService = new AggregatorService(
roundManager,
await Smt.create(smt),
new Smt(smt),
recordStorage,
blockStorage,
blockRecordsStorage,
Expand Down
5 changes: 2 additions & 3 deletions tests/RoundManagerUnitTest.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { HashAlgorithm } from '@unicitylabs/commons/lib/hash/HashAlgorithm.js';
import { DataHasherFactory } from '@unicitylabs/commons/lib/hash/DataHasherFactory.js';
import { HashAlgorithm } from '@unicitylabs/commons/lib/hash/HashAlgorithm.js';
import { NodeDataHasher } from '@unicitylabs/commons/lib/hash/NodeDataHasher.js';
import { SparseMerkleTree } from '@unicitylabs/commons/lib/smt/SparseMerkleTree.js';
import mongoose from 'mongoose';
Expand All @@ -9,7 +9,6 @@ import { CommitmentStorage } from '../src/commitment/CommitmentStorage.js';
import { MockBftClient } from './consensus/bft/MockBftClient.js';
import { connectToSharedMongo, disconnectFromSharedMongo, generateTestCommitments, clearAllCollections } from './TestUtils.js';
import { BlockStorage } from '../src/hashchain/BlockStorage.js';
import logger from '../src/logger.js';
import { AggregatorRecordStorage } from '../src/records/AggregatorRecordStorage.js';
import { BlockRecordsStorage } from '../src/records/BlockRecordsStorage.js';
import { RoundManager } from '../src/RoundManager.js';
Expand Down Expand Up @@ -56,7 +55,7 @@ describe('Round Manager Tests', () => {
blockRecordsStorage = await BlockRecordsStorage.create('test-server');
smtStorage = new SmtStorage();
smt = new SparseMerkleTree(new DataHasherFactory(HashAlgorithm.SHA256, NodeDataHasher));
const smtWrapper = await Smt.create(smt);
const smtWrapper = new Smt(smt);

roundManager = new RoundManager(
config,
Expand Down
2 changes: 1 addition & 1 deletion tests/benchmarks/BlockCreationBenchmarkTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ describe('Block Creation Performance Benchmarks', () => {
});

beforeEach(async () => {
smt = await Smt.create(new SparseMerkleTree(new DataHasherFactory(HashAlgorithm.SHA256, NodeDataHasher)));
smt = new Smt(new SparseMerkleTree(new DataHasherFactory(HashAlgorithm.SHA256, NodeDataHasher)));
mockBftClient = new MockBftClient();

const originalSubmitHash = mockBftClient.submitHash;
Expand Down
1 change: 0 additions & 1 deletion tests/benchmarks/SmtBenchmarkTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { v4 as uuidv4 } from 'uuid';

import logger from '../../src/logger.js';


interface ISmtBenchmarkResult {
testDescription: string;
treeSize: number;
Expand Down
2 changes: 1 addition & 1 deletion tests/smt/SmtChunkedLoadingTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe('SMT Chunked Loading Tests', () => {
logger.info('Verifying SMT root hash through round manager...');

const roundManager = gateway.getRoundManager();
const actualRootHash = roundManager.smt.rootHash;
const actualRootHash = await roundManager.smt.rootHash();

logger.info(`Actual root hash from gateway: ${actualRootHash.toString()}`);
logger.info(`Expected root hash: ${expectedRootHash.toString()}`);
Expand Down
Loading