From 328c4b513f08e565c76f28e8c07ff89a4cb22204 Mon Sep 17 00:00:00 2001 From: Vlad <53381472+VladChernenko@users.noreply.github.com> Date: Sun, 30 Nov 2025 04:58:50 +0100 Subject: [PATCH] Adjust leader rotation proof eligibility --- entrypoint.go | 7 +- handlers/handlers.go | 7 + threads/finalization_thread.go | 423 +++++++++++++++++++++++++++++++++ websocket_pack/routes.go | 150 ++++++------ 4 files changed, 516 insertions(+), 71 deletions(-) create mode 100644 threads/finalization_thread.go diff --git a/entrypoint.go b/entrypoint.go index ecc0780..6cbcae0 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -44,10 +44,13 @@ func RunBlockchain() { //✅ 4.Start a separate thread to work with voting for blocks in a sync way (for security) go threads.LeaderRotationThread() - //✅ 5.Logical thread to build the temporary sequence of blocks to execute them (prepare for execution thread) + //✅ 5.Start finalization thread to aggregate leader rotation proofs + go threads.FinalizationThread() + + //✅ 6.Logical thread to build the temporary sequence of blocks to execute them (prepare for execution thread) go threads.SequenceAlignmentThread() - //✅ 6.Start execution process - take blocks and execute transactions + //✅ 7.Start execution process - take blocks and execute transactions go threads.ExecutionThread() //___________________ RUN SERVERS - WEBSOCKET AND HTTP __________________ diff --git a/handlers/handlers.go b/handlers/handlers.go index 9202375..1a0b185 100644 --- a/handlers/handlers.go +++ b/handlers/handlers.go @@ -18,6 +18,13 @@ var APPROVEMENT_THREAD_METADATA = struct { }, } +var FINALIZATION_THREAD_CACHE = struct { + RWMutex sync.RWMutex + EpochHandlers map[int]structures.EpochDataHandler +}{ + EpochHandlers: make(map[int]structures.EpochDataHandler), +} + var EXECUTION_THREAD_METADATA = struct { RWMutex sync.RWMutex Handler structures.ExecutionThreadMetadataHandler diff --git a/threads/finalization_thread.go b/threads/finalization_thread.go new file mode 100644 index 0000000..125f952 --- /dev/null +++ b/threads/finalization_thread.go @@ -0,0 +1,423 @@ +package threads + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/modulrcloud/modulr-core/cryptography" + "github.com/modulrcloud/modulr-core/databases" + "github.com/modulrcloud/modulr-core/handlers" + "github.com/modulrcloud/modulr-core/structures" + "github.com/modulrcloud/modulr-core/utils" + "github.com/modulrcloud/modulr-core/websocket_pack" + + "github.com/gorilla/websocket" +) + +type leaderRotationCache struct { + AfpForFirstBlock structures.AggregatedFinalizationProof + SkipData structures.LeaderVotingStat + Proofs map[string]string +} + +type epochRotationState struct { + caches map[string]*leaderRotationCache + wsConns map[string]*websocket.Conn + waiter *utils.QuorumWaiter +} + +var ( + leaderRotationMutex = sync.Mutex{} + leaderRotationStates = make(map[int]*epochRotationState) + processingEpochId = -1 + defaultHash = structures.NewLeaderVotingStatTemplate().Hash + finalizationProgressKey = []byte("ALRP_PROGRESS") +) + +func FinalizationThread() { + + for { + + if processingEpochId == -1 { + processingEpochId = loadFinalizationProgress() + } + + processingHandler := getOrLoadEpochHandler(processingEpochId) + networkParams := getNetworkParameters() + + if processingHandler == nil { + time.Sleep(200 * time.Millisecond) + continue + } + + state := ensureLeaderRotationState(processingHandler) + majority := utils.GetQuorumMajority(processingHandler) + + for _, leaderIndex := range leadersReadyForAlrp(processingHandler, &networkParams) { + tryCollectLeaderRotationProofs(processingHandler, leaderIndex, majority, state) + } + + if allLeaderRotationProofsCollected(processingHandler) { + processingEpochId++ + persistFinalizationProgress(processingEpochId) + } + + time.Sleep(200 * time.Millisecond) + } +} + +func ensureLeaderRotationState(epochHandler *structures.EpochDataHandler) *epochRotationState { + + leaderRotationMutex.Lock() + defer leaderRotationMutex.Unlock() + + if state, ok := leaderRotationStates[epochHandler.Id]; ok { + return state + } + + state := &epochRotationState{ + caches: make(map[string]*leaderRotationCache), + wsConns: make(map[string]*websocket.Conn), + waiter: utils.NewQuorumWaiter(len(epochHandler.Quorum)), + } + + utils.OpenWebsocketConnectionsWithQuorum(epochHandler.Quorum, state.wsConns) + leaderRotationStates[epochHandler.Id] = state + + return state +} + +func getOrLoadEpochHandler(epochId int) *structures.EpochDataHandler { + + handlers.FINALIZATION_THREAD_CACHE.RWMutex.RLock() + handler, ok := handlers.FINALIZATION_THREAD_CACHE.EpochHandlers[epochId] + handlers.FINALIZATION_THREAD_CACHE.RWMutex.RUnlock() + + if ok { + h := handler + return &h + } + + key := []byte("EPOCH_HANDLER:" + strconv.Itoa(epochId)) + raw, err := databases.EPOCH_DATA.Get(key, nil) + if err != nil { + return nil + } + + var loaded structures.EpochDataHandler + if json.Unmarshal(raw, &loaded) != nil { + return nil + } + + handlers.FINALIZATION_THREAD_CACHE.RWMutex.Lock() + handlers.FINALIZATION_THREAD_CACHE.EpochHandlers[epochId] = loaded + handlers.FINALIZATION_THREAD_CACHE.RWMutex.Unlock() + + return &loaded +} + +func loadFinalizationProgress() int { + + if raw, err := databases.FINALIZATION_VOTING_STATS.Get(finalizationProgressKey, nil); err == nil { + if idx, convErr := strconv.Atoi(string(raw)); convErr == nil { + return idx + } + } + + return 0 +} + +func persistFinalizationProgress(epochId int) { + + _ = databases.FINALIZATION_VOTING_STATS.Put(finalizationProgressKey, []byte(strconv.Itoa(epochId)), nil) +} + +func getNetworkParameters() structures.NetworkParameters { + + handlers.APPROVEMENT_THREAD_METADATA.RWMutex.RLock() + params := handlers.APPROVEMENT_THREAD_METADATA.Handler.NetworkParameters + handlers.APPROVEMENT_THREAD_METADATA.RWMutex.RUnlock() + + return params +} + +func leadersReadyForAlrp(epochHandler *structures.EpochDataHandler, networkParams *structures.NetworkParameters) []int { + + ready := make([]int, 0) + + for idx := range epochHandler.LeadersSequence { + + if leaderHasAlrp(epochHandler.Id, epochHandler.LeadersSequence[idx]) { + continue + } + + leaderFinished := idx < epochHandler.CurrentLeaderIndex || + (idx == epochHandler.CurrentLeaderIndex && leaderTimeIsOut(epochHandler, networkParams, idx)) + + if leaderFinished { + ready = append(ready, idx) + } + } + + return ready +} + +func leaderHasAlrp(epochId int, leader string) bool { + + key := []byte(fmt.Sprintf("ALRP:%d:%s", epochId, leader)) + _, err := databases.FINALIZATION_VOTING_STATS.Get(key, nil) + return err == nil +} + +func allLeaderRotationProofsCollected(epochHandler *structures.EpochDataHandler) bool { + + for _, leader := range epochHandler.LeadersSequence { + if !leaderHasAlrp(epochHandler.Id, leader) { + return false + } + } + + return true +} + +func leaderTimeIsOut(epochHandler *structures.EpochDataHandler, networkParams *structures.NetworkParameters, leaderIndex int) bool { + + leadershipTimeframe := networkParams.LeadershipDuration + return utils.GetUTCTimestampInMilliSeconds() >= int64(epochHandler.StartTimestamp)+(int64(leaderIndex)+1)*leadershipTimeframe +} + +func tryCollectLeaderRotationProofs(epochHandler *structures.EpochDataHandler, leaderIndex, majority int, state *epochRotationState) { + + leaderPubKey := epochHandler.LeadersSequence[leaderIndex] + epochFullID := epochHandler.Hash + "#" + strconv.Itoa(epochHandler.Id) + + if leaderHasAlrp(epochHandler.Id, leaderPubKey) || state.waiter == nil { + return + } + + cache := ensureLeaderRotationCache(state, epochHandler.Id, leaderPubKey) + + request := websocket_pack.WsLeaderRotationProofRequest{ + Route: "get_leader_rotation_proof", + IndexOfLeaderToRotate: leaderIndex, + AfpForFirstBlock: cache.AfpForFirstBlock, + SkipData: cache.SkipData, + } + + message, err := json.Marshal(request) + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + responses, ok := state.waiter.SendAndWait(ctx, message, epochHandler.Quorum, state.wsConns, majority) + if !ok { + return + } + + for _, raw := range responses { + handleLeaderRotationResponse(raw, epochHandler, leaderPubKey, epochFullID, state) + } + + leaderRotationMutex.Lock() + proofsCount := len(cache.Proofs) + leaderRotationMutex.Unlock() + + if proofsCount >= majority { + persistAggregatedLeaderRotationProof(cache, epochHandler.Id, leaderPubKey) + } +} + +func ensureLeaderRotationCache(state *epochRotationState, epochId int, leaderPubKey string) *leaderRotationCache { + + key := fmt.Sprintf("%d:%s", epochId, leaderPubKey) + + leaderRotationMutex.Lock() + defer leaderRotationMutex.Unlock() + + if cache, ok := state.caches[key]; ok { + return cache + } + + cache := &leaderRotationCache{ + SkipData: loadLeaderSkipData(epochId, leaderPubKey), + Proofs: make(map[string]string), + } + + cache.AfpForFirstBlock = loadAfpForFirstBlock(epochId, leaderPubKey) + + state.caches[key] = cache + + return cache +} + +func loadLeaderSkipData(epochId int, leaderPubKey string) structures.LeaderVotingStat { + + skipData := structures.NewLeaderVotingStatTemplate() + key := []byte(fmt.Sprintf("%d:%s", epochId, leaderPubKey)) + + if raw, err := databases.FINALIZATION_VOTING_STATS.Get(key, nil); err == nil { + _ = json.Unmarshal(raw, &skipData) + } + + return skipData +} + +func loadAfpForFirstBlock(epochId int, leaderPubKey string) structures.AggregatedFinalizationProof { + + var afp structures.AggregatedFinalizationProof + key := []byte("AFP:" + fmt.Sprintf("%d:%s:0", epochId, leaderPubKey)) + + if raw, err := databases.EPOCH_DATA.Get(key, nil); err == nil { + _ = json.Unmarshal(raw, &afp) + } + + return afp +} + +func handleLeaderRotationResponse(raw []byte, epochHandler *structures.EpochDataHandler, leaderPubKey, epochFullID string, state *epochRotationState) { + + var statusHolder map[string]any + + if err := json.Unmarshal(raw, &statusHolder); err != nil { + return + } + + status, ok := statusHolder["status"].(string) + if !ok { + return + } + + switch status { + case "OK": + var response websocket_pack.WsLeaderRotationProofResponseOk + if json.Unmarshal(raw, &response) == nil { + handleLeaderRotationOk(response, epochHandler, leaderPubKey, epochFullID, state) + } + case "UPGRADE": + var response websocket_pack.WsLeaderRotationProofResponseUpgrade + if json.Unmarshal(raw, &response) == nil { + handleLeaderRotationUpgrade(response, epochHandler, leaderPubKey, state) + } + } +} + +func handleLeaderRotationOk(response websocket_pack.WsLeaderRotationProofResponseOk, epochHandler *structures.EpochDataHandler, leaderPubKey, epochFullID string, state *epochRotationState) { + + if response.ForLeaderPubkey != leaderPubKey { + return + } + + cache := ensureLeaderRotationCache(state, epochHandler.Id, leaderPubKey) + firstBlockHash := defaultHash + + if cache.SkipData.Index >= 0 && cache.AfpForFirstBlock.BlockHash != "" { + firstBlockHash = cache.AfpForFirstBlock.BlockHash + } + + dataToVerify := strings.Join([]string{"LEADER_ROTATION_PROOF", leaderPubKey, firstBlockHash, strconv.Itoa(cache.SkipData.Index), cache.SkipData.Hash, epochFullID}, ":") + + quorumMap := make(map[string]bool) + for _, pk := range epochHandler.Quorum { + quorumMap[strings.ToLower(pk)] = true + } + + if cryptography.VerifySignature(dataToVerify, response.Voter, response.Sig) { + lowered := strings.ToLower(response.Voter) + leaderRotationMutex.Lock() + if quorumMap[lowered] { + cache.Proofs[response.Voter] = response.Sig + } + leaderRotationMutex.Unlock() + } +} + +func handleLeaderRotationUpgrade(response websocket_pack.WsLeaderRotationProofResponseUpgrade, epochHandler *structures.EpochDataHandler, leaderPubKey string, state *epochRotationState) { + + if response.ForLeaderPubkey != leaderPubKey { + return + } + + if !validateUpgradePayload(response, epochHandler, leaderPubKey) { + return + } + + cache := ensureLeaderRotationCache(state, epochHandler.Id, leaderPubKey) + + leaderRotationMutex.Lock() + cache.SkipData = response.SkipData + cache.AfpForFirstBlock = response.AfpForFirstBlock + cache.Proofs = make(map[string]string) + leaderRotationMutex.Unlock() +} + +func validateUpgradePayload(response websocket_pack.WsLeaderRotationProofResponseUpgrade, epochHandler *structures.EpochDataHandler, leaderPubKey string) bool { + + if response.SkipData.Index >= 0 { + + parts := strings.Split(response.SkipData.Afp.BlockId, ":") + + if len(parts) != 3 { + return false + } + + indexFromId, err := strconv.Atoi(parts[2]) + if err != nil || indexFromId != response.SkipData.Index || parts[0] != strconv.Itoa(epochHandler.Id) || parts[1] != leaderPubKey { + return false + } + + if response.SkipData.Hash != response.SkipData.Afp.BlockHash { + return false + } + + if !utils.VerifyAggregatedFinalizationProof(&response.SkipData.Afp, epochHandler) { + return false + } + } + + if response.AfpForFirstBlock.BlockId != "" { + + parts := strings.Split(response.AfpForFirstBlock.BlockId, ":") + + if len(parts) != 3 || parts[0] != strconv.Itoa(epochHandler.Id) || parts[1] != leaderPubKey || parts[2] != "0" { + return false + } + + if !utils.VerifyAggregatedFinalizationProof(&response.AfpForFirstBlock, epochHandler) { + return false + } + } + + return true +} + +func persistAggregatedLeaderRotationProof(cache *leaderRotationCache, epochId int, leaderPubKey string) { + + leaderRotationMutex.Lock() + defer leaderRotationMutex.Unlock() + + firstBlockHash := defaultHash + + if cache.SkipData.Index >= 0 && cache.AfpForFirstBlock.BlockHash != "" { + firstBlockHash = cache.AfpForFirstBlock.BlockHash + } + + aggregated := structures.AggregatedLeaderRotationProof{ + FirstBlockHash: firstBlockHash, + SkipIndex: cache.SkipData.Index, + SkipHash: cache.SkipData.Hash, + Proofs: cache.Proofs, + } + + key := []byte(fmt.Sprintf("ALRP:%d:%s", epochId, leaderPubKey)) + if value, err := json.Marshal(aggregated); err == nil { + _ = databases.FINALIZATION_VOTING_STATS.Put(key, value, nil) + } +} diff --git a/websocket_pack/routes.go b/websocket_pack/routes.go index f80210c..7741c00 100644 --- a/websocket_pack/routes.go +++ b/websocket_pack/routes.go @@ -211,47 +211,46 @@ func GetLeaderRotationProof(parsedRequest WsLeaderRotationProofRequest, connecti } handlers.APPROVEMENT_THREAD_METADATA.RWMutex.RLock() + currentApprovementHandler := handlers.APPROVEMENT_THREAD_METADATA.Handler + handlers.APPROVEMENT_THREAD_METADATA.RWMutex.RUnlock() - defer handlers.APPROVEMENT_THREAD_METADATA.RWMutex.RUnlock() + epochIndex := detectEpochIndexForRotation(parsedRequest, currentApprovementHandler.EpochDataHandler.Id) + epochHandler := getEpochHandlerForRotation(epochIndex) + if epochHandler == nil || parsedRequest.IndexOfLeaderToRotate >= len(epochHandler.LeadersSequence) { + return + } - epochHandler := &handlers.APPROVEMENT_THREAD_METADATA.Handler.EpochDataHandler + epochMatchesApprovement := currentApprovementHandler.EpochDataHandler.Id == epochHandler.Id + epochAlreadyPassed := currentApprovementHandler.EpochDataHandler.Id > epochHandler.Id + rotationAllowed := epochAlreadyPassed || (epochMatchesApprovement && parsedRequest.IndexOfLeaderToRotate < currentApprovementHandler.EpochDataHandler.CurrentLeaderIndex) - epochIndex := epochHandler.Id + if !rotationAllowed { + return + } epochFullID := epochHandler.Hash + "#" + strconv.Itoa(epochIndex) - leaderToRotate := epochHandler.LeadersSequence[parsedRequest.IndexOfLeaderToRotate] + maxRotatedLeaderIndex := epochHandler.CurrentLeaderIndex + if epochAlreadyPassed { + maxRotatedLeaderIndex = len(epochHandler.LeadersSequence) + } else if epochMatchesApprovement { + maxRotatedLeaderIndex = currentApprovementHandler.EpochDataHandler.CurrentLeaderIndex + } - if epochHandler.CurrentLeaderIndex > parsedRequest.IndexOfLeaderToRotate { - + if maxRotatedLeaderIndex > parsedRequest.IndexOfLeaderToRotate { localVotingData := structures.NewLeaderVotingStatTemplate() - localVotingDataRaw, err := databases.FINALIZATION_VOTING_STATS.Get([]byte(strconv.Itoa(epochIndex)+":"+leaderToRotate), nil) - if err == nil { - json.Unmarshal(localVotingDataRaw, &localVotingData) - } - propSkipData := parsedRequest.SkipData - if localVotingData.Index > propSkipData.Index { - - // Try to return with AFP for the first block - firstBlockID := strconv.Itoa(epochHandler.Id) + ":" + leaderToRotate + ":0" - afpForFirstBlockBytes, err := databases.EPOCH_DATA.Get([]byte("AFP:"+firstBlockID), nil) - if err == nil { - var afpForFirstBlock structures.AggregatedFinalizationProof - err := json.Unmarshal(afpForFirstBlockBytes, &afpForFirstBlock) - if err == nil { - responseData := WsLeaderRotationProofResponseUpgrade{ Voter: globals.CONFIGURATION.PublicKey, ForLeaderPubkey: leaderToRotate, @@ -259,113 +258,126 @@ func GetLeaderRotationProof(parsedRequest WsLeaderRotationProofRequest, connecti AfpForFirstBlock: afpForFirstBlock, SkipData: localVotingData, } - jsonResponse, err := json.Marshal(responseData) - if err == nil { - connection.WriteMessage(gws.OpcodeText, jsonResponse) - } - } - } - } else { - - //________________________________________________ Verify the proposed AFP ________________________________________________ - afpIsOk := false - parts := strings.Split(propSkipData.Afp.BlockId, ":") - if len(parts) != 3 { return } - indexOfBlockInAfp, err := strconv.Atoi(parts[2]) - if err != nil { return } - if propSkipData.Index > -1 { - if propSkipData.Hash == propSkipData.Afp.BlockHash && propSkipData.Index == indexOfBlockInAfp { - afpIsOk = utils.VerifyAggregatedFinalizationProof(&propSkipData.Afp, epochHandler) - } - } else { - afpIsOk = true } - if afpIsOk { - dataToSignForLeaderRotation, firstBlockAfpIsOk := "", false - if parsedRequest.SkipData.Index == -1 { - dataToSignForLeaderRotation = "LEADER_ROTATION_PROOF:" + leaderToRotate dataToSignForLeaderRotation += ":0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef:-1" dataToSignForLeaderRotation += ":0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef:" + epochFullID - firstBlockAfpIsOk = true - } else if parsedRequest.SkipData.Index >= 0 { - blockIdOfFirstBlock := strconv.Itoa(epochIndex) + ":" + leaderToRotate + ":0" - if parsedRequest.AfpForFirstBlock.BlockId == blockIdOfFirstBlock && utils.VerifyAggregatedFinalizationProof(&parsedRequest.AfpForFirstBlock, epochHandler) { - firstBlockHash := parsedRequest.AfpForFirstBlock.BlockHash - dataToSignForLeaderRotation = "LEADER_ROTATION_PROOF:" + leaderToRotate + ":" + firstBlockHash + ":" + strconv.Itoa(propSkipData.Index) + ":" + propSkipData.Hash + ":" + epochFullID - firstBlockAfpIsOk = true - } - } + if firstBlockAfpIsOk { + leaderRotationProofMessage := WsLeaderRotationProofResponseOk{ + Voter: globals.CONFIGURATION.PublicKey, + ForLeaderPubkey: leaderToRotate, + Status: "OK", + Sig: cryptography.GenerateSignature(globals.CONFIGURATION.PrivateKey, dataToSignForLeaderRotation), + } + jsonResponse, err := json.Marshal(leaderRotationProofMessage) + if err == nil { + connection.WriteMessage(gws.OpcodeText, jsonResponse) + } + } + } + } + } +} - // If proof is ok - generate LRP(leader rotation proof) +func detectEpochIndexForRotation(parsedRequest WsLeaderRotationProofRequest, fallback int) int { - if firstBlockAfpIsOk { + if parsedRequest.SkipData.Afp.BlockId != "" { + if idx := epochIndexFromBlockId(parsedRequest.SkipData.Afp.BlockId); idx >= 0 { + return idx + } + } - leaderRotationProofMessage := WsLeaderRotationProofResponseOk{ + if parsedRequest.AfpForFirstBlock.BlockId != "" { + if idx := epochIndexFromBlockId(parsedRequest.AfpForFirstBlock.BlockId); idx >= 0 { + return idx + } + } - Voter: globals.CONFIGURATION.PublicKey, + return fallback +} - ForLeaderPubkey: leaderToRotate, +func epochIndexFromBlockId(blockId string) int { - Status: "OK", + parts := strings.Split(blockId, ":") + if len(parts) == 0 { + return -1 + } - Sig: cryptography.GenerateSignature(globals.CONFIGURATION.PrivateKey, dataToSignForLeaderRotation), - } + if idx, err := strconv.Atoi(parts[0]); err == nil { + return idx + } - jsonResponse, err := json.Marshal(leaderRotationProofMessage) + return -1 +} - if err == nil { +func getEpochHandlerForRotation(epochIndex int) *structures.EpochDataHandler { - connection.WriteMessage(gws.OpcodeText, jsonResponse) + handlers.APPROVEMENT_THREAD_METADATA.RWMutex.RLock() + currentHandler := handlers.APPROVEMENT_THREAD_METADATA.Handler.EpochDataHandler + handlers.APPROVEMENT_THREAD_METADATA.RWMutex.RUnlock() - } + if currentHandler.Id == epochIndex { + return ¤tHandler + } - } + handlers.FINALIZATION_THREAD_CACHE.RWMutex.RLock() + cached, ok := handlers.FINALIZATION_THREAD_CACHE.EpochHandlers[epochIndex] + handlers.FINALIZATION_THREAD_CACHE.RWMutex.RUnlock() - } + if ok { + return &cached + } + key := []byte("EPOCH_HANDLER:" + strconv.Itoa(epochIndex)) + if raw, err := databases.EPOCH_DATA.Get(key, nil); err == nil { + var stored structures.EpochDataHandler + if json.Unmarshal(raw, &stored) == nil { + handlers.FINALIZATION_THREAD_CACHE.RWMutex.Lock() + handlers.FINALIZATION_THREAD_CACHE.EpochHandlers[epochIndex] = stored + handlers.FINALIZATION_THREAD_CACHE.RWMutex.Unlock() + return &stored } - } + return nil } func GetBlockWithProof(parsedRequest WsBlockWithAfpRequest, connection *gws.Conn) {