diff --git a/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/JoinLockedBox.tsx b/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/JoinLockedBox.tsx index 9beb34c..2a2f4c7 100644 --- a/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/JoinLockedBox.tsx +++ b/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/JoinLockedBox.tsx @@ -1,4 +1,4 @@ -import { type FC, useMemo } from "react"; +import { type FC, useEffect, useMemo } from "react"; import { PortalPeerId } from "@/components/Box/components/PortalPeerId"; import { RelayReconnectingAlert } from "@/components/Box/components/RelayReconnectingAlert"; import { ShareAccessButton as ShareAccessButtonDumb } from "@/components/Box/components/ShareAccessButton"; @@ -19,6 +19,7 @@ import { NavigationAwayBlocker } from "../commons/components/NavigationAwayBlock import { PageTitle } from "../commons/components/PageTitle"; import { useDataChannelSendMessages } from "./dataChannelSendMessages"; import { useSendKeyToLeader } from "./hooks"; +import { keySharingWorkflowManager } from "./keySharingWorkflow"; import { useJoinLockedBoxConnection } from "./useJoinLockedBoxConnection"; export const JoinLockedBox: FC = () => { @@ -48,6 +49,14 @@ export const JoinLockedBox: FC = () => { }; const JoinLockedBoxContent: React.FC = () => { + useEffect(() => { + keySharingWorkflowManager.startAllWorkflows(); + + return () => { + keySharingWorkflowManager.reset(); + }; + }, []); + const state = useJoinLockedBoxStore((state) => state.state); const roomToken = useJoinLockedBoxStore((state) => state.roomToken); diff --git a/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/keySharingWorkflow.ts b/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/keySharingWorkflow.ts new file mode 100644 index 0000000..d9a05c6 --- /dev/null +++ b/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/keySharingWorkflow.ts @@ -0,0 +1,69 @@ +import { loggerGate } from "@icod2/protocols"; +import { WorkflowBuilder, WorkflowManager } from "@/services/libp2p/workflows"; +import { useJoinLockedBoxStore } from "@/stores/boxStore/joinLockedBoxStore"; +import { + type FollowerSendsPartialStateMessage, + isLeaderSendsPartialStateMessage, + isLeaderWelcome, + type LeaderSendsPartialStateMessage, +} from "../commons/leader-keyholder-interface"; +import { usePeerToHolderMapRef } from "../commons/usePeerToHolderMapRef"; + +export const keySharingWorkflowManager = new WorkflowManager(); + +const createKeySharingWorkflow = () => { + return new WorkflowBuilder("keySharing") + .waitForWithCondition(isLeaderWelcome, () => { + const { shareAccessKeyByKeyHolderId } = useJoinLockedBoxStore.getState(); + + const isThereAnyShared = Object.values(shareAccessKeyByKeyHolderId).some( + (x) => x === true, + ); + + return isThereAnyShared; + }) + .waitFor(isLeaderSendsPartialStateMessage, (message, _, proto) => { + const { connectedLeaderId, shareAccessKeyByKeyHolderId } = + useJoinLockedBoxStore.getState(); + + if (!connectedLeaderId) { + return; + } + + const { onlineKeyHolders } = message as LeaderSendsPartialStateMessage; + const onlineKeyHoldersIds = + onlineKeyHolders?.map((keyHolder) => keyHolder.id) ?? []; + + const keyHoldersIdsToSharedKeyWith = Object.entries( + shareAccessKeyByKeyHolderId, + ) + .filter( + ([keyHolderId, shared]) => + shared && onlineKeyHoldersIds.includes(keyHolderId), + ) + .map(([keyHolderId]) => keyHolderId); + + const leaderPeerId = usePeerToHolderMapRef + .getValue() + .getPeerId(connectedLeaderId); + + if (!leaderPeerId) { + loggerGate.canError && console.error("Leader peer ID not found"); + return; + } + + proto.sendMessageToPeer(leaderPeerId, { + type: "follower:send-partial-state", + keyHoldersIdsToSharedKeyWith, + } satisfies FollowerSendsPartialStateMessage); + }) + .onError((error) => { + console.error("Key sharing workflow error:", error); + + const state = useJoinLockedBoxStore.getState(); + state.actions.setError(`Workflow error: ${error.message}`); + }); +}; + +keySharingWorkflowManager.defineWorkflow(createKeySharingWorkflow().build()); +keySharingWorkflowManager.startWorkflow("keySharing"); diff --git a/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/useJoinLockedBoxConnection.ts b/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/useJoinLockedBoxConnection.ts index 8918eb9..09679af 100644 --- a/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/useJoinLockedBoxConnection.ts +++ b/fe/src/components/Box/sub-pages/RestoreBoxes/JoinLockedBox/useJoinLockedBoxConnection.ts @@ -4,6 +4,7 @@ import { useJoinLockedBoxStore } from "@/stores/boxStore/joinLockedBoxStore"; import { usePeerToHolderMapRef } from "../commons/usePeerToHolderMapRef"; import { router } from "./dataChannelRouter"; import { useDataChannelSendMessages } from "./dataChannelSendMessages"; +import { keySharingWorkflowManager } from "./keySharingWorkflow"; import { useOnChangeShareablePartOfState } from "./useSelectiveStatePusher"; export type JoinBoxConnectionError = ReturnType< @@ -28,11 +29,18 @@ export function useJoinLockedBoxConnection({ useEffect(() => { routerMng.addRouter("join-unlock-box", router.router); + routerMng.addRouter( + "join-unlock-box-workflows", + keySharingWorkflowManager.router, + ); + + console.log("useEffect in useJoinLockedBoxConnection"); return () => { routerMng.removeRouter("join-unlock-box"); + routerMng.removeRouter("join-unlock-box-workflows"); }; - }, [routerMng]); + }, [routerMng.addRouter, routerMng.removeRouter]); useEffect(() => { useJoinLockedBoxStore diff --git a/fe/src/components/Box/sub-pages/RestoreBoxes/OpenLockedBox/OpenLockedBox.tsx b/fe/src/components/Box/sub-pages/RestoreBoxes/OpenLockedBox/OpenLockedBox.tsx index 43d2d74..91aca2b 100644 --- a/fe/src/components/Box/sub-pages/RestoreBoxes/OpenLockedBox/OpenLockedBox.tsx +++ b/fe/src/components/Box/sub-pages/RestoreBoxes/OpenLockedBox/OpenLockedBox.tsx @@ -73,7 +73,7 @@ export const OpenLockedBoxContent: FC = () => { return () => { routerMng.removeRouter("open-locked-box"); }; - }, [routerMng]); + }, [routerMng.addRouter, routerMng.removeRouter]); const { sendKey } = useSendMessageProto({ peerMessageProtoRef: messageProto.peerMessageProtoRef, diff --git a/fe/src/services/libp2p/workflows/WorkflowBuilder.ts b/fe/src/services/libp2p/workflows/WorkflowBuilder.ts new file mode 100644 index 0000000..194d3b7 --- /dev/null +++ b/fe/src/services/libp2p/workflows/WorkflowBuilder.ts @@ -0,0 +1,47 @@ +import type { EventHandler, EventPredicate, WorkflowDefinition } from "./types"; + +export class WorkflowBuilder< + TMessage extends Record = Record, +> { + private workflow: Pick, "steps"> & + Partial, "steps">> = { + steps: [], + }; + + constructor(name: string) { + this.workflow.name = name; + } + + waitFor( + event: EventPredicate, + handler?: EventHandler, + ): WorkflowBuilder { + this.workflow.steps.push({ + event: event as EventPredicate, + handler: handler as EventHandler | undefined, + }); + return this; + } + + waitForWithCondition( + event: EventPredicate, + condition: () => boolean, + handler?: EventHandler, + ): WorkflowBuilder { + this.workflow.steps.push({ + event: event as EventPredicate, + condition, + handler: handler as EventHandler | undefined, + }); + return this; + } + + onError(handler: (error: Error) => void): WorkflowBuilder { + this.workflow.onError = handler; + return this; + } + + public build(): WorkflowDefinition { + return this.workflow as WorkflowDefinition; + } +} diff --git a/fe/src/services/libp2p/workflows/WorkflowManager.ts b/fe/src/services/libp2p/workflows/WorkflowManager.ts new file mode 100644 index 0000000..3b02d17 --- /dev/null +++ b/fe/src/services/libp2p/workflows/WorkflowManager.ts @@ -0,0 +1,153 @@ +import type { BasicProtoInterface, RouterItem } from "@/services/libp2p"; +import type { WorkflowDefinition, WorkflowState } from "./types"; + +export class WorkflowManager< + TMessage extends Record = Record, + TProto extends BasicProtoInterface = BasicProtoInterface, +> { + private workflows: Map> = new Map(); + private workflowStates: Map = new Map(); + private eventBuffer: Array<{ peerId: string; message: TMessage }> = []; + private isProcessing = false; + private lastReferencedProto: TProto | undefined; + readonly router: RouterItem; + + constructor() { + this.router = (peerId, message, proto) => { + this.lastReferencedProto = proto; + this.handleIncomingMessage(peerId, message); + }; + } + + defineWorkflow(workflow: WorkflowDefinition): void { + this.workflows.set(workflow.name, workflow); + this.workflowStates.set(workflow.name, { + currentStepIndex: 0, + isActive: false, + data: {}, + }); + } + + startWorkflow(name: string, initialData?: Record): void { + const workflow = this.workflows.get(name); + if (!workflow) { + throw new Error(`Workflow "${name}" not found`); + } + + const state = this.workflowStates.get(name); + if (!state) { + throw new Error(`Workflow state for "${name}" not found`); + } + + state.isActive = true; + state.currentStepIndex = 0; + state.data = { ...initialData }; + + this.processEventBuffer(); + } + + startAllWorkflows(): void { + this.workflowStates.forEach((state, name) => { + if (!state.isActive) { + this.startWorkflow(name); + } + }); + } + + stopWorkflow(name: string): void { + const state = this.workflowStates.get(name); + if (state) { + state.isActive = false; + state.currentStepIndex = 0; + state.data = {}; + } + } + + reset(): void { + this.workflowStates.forEach((state) => { + state.isActive = false; + state.currentStepIndex = 0; + state.data = {}; + }); + this.eventBuffer = []; + } + + getWorkflowState(name: string): WorkflowState | undefined { + return this.workflowStates.get(name); + } + + updateWorkflowData(name: string, data: Record): void { + const state = this.workflowStates.get(name); + if (state) { + state.data = { ...state.data, ...data }; + } + } + + private handleIncomingMessage(peerId: string, message: TMessage): void { + this.eventBuffer.push({ peerId, message }); + + if (!this.isProcessing) { + this.processEventBuffer(); + } + } + + private async processEventBuffer(): Promise { + if (this.isProcessing || this.eventBuffer.length === 0) { + return; + } + + this.isProcessing = true; + + while (this.eventBuffer.length > 0) { + const event = this.eventBuffer.shift(); + if (!event) continue; + + await this.processEvent(event.peerId, event.message); + } + + this.isProcessing = false; + } + + private async processEvent(peerId: string, message: TMessage): Promise { + for (const [workflowName, workflow] of this.workflows.entries()) { + const state = this.workflowStates.get(workflowName); + if (!state || !state.isActive) continue; + + const currentStep = workflow.steps[state.currentStepIndex]; + if (!currentStep) continue; + + try { + if (!currentStep.event(message)) continue; + + if (currentStep.condition && !currentStep.condition()) continue; + + state.data._lastPeerId = peerId; + state.data._lastMessage = message; + + if (currentStep.handler) { + await currentStep.handler( + message, + workflow, + // biome-ignore lint/style/noNonNullAssertion: I'm positive it will always be not null + this.lastReferencedProto!, + ); + } + + state.currentStepIndex++; + + if (state.currentStepIndex >= workflow.steps.length) { + state.isActive = false; + state.currentStepIndex = 0; + } + } catch (error) { + if (workflow.onError) { + workflow.onError(error as Error); + } + + state.isActive = false; + state.currentStepIndex = 0; + state.data = {}; + } + } + } +} diff --git a/fe/src/services/libp2p/workflows/index.ts b/fe/src/services/libp2p/workflows/index.ts new file mode 100644 index 0000000..3c6c483 --- /dev/null +++ b/fe/src/services/libp2p/workflows/index.ts @@ -0,0 +1,3 @@ +export * from "./types"; +export { WorkflowBuilder } from "./WorkflowBuilder"; +export { WorkflowManager } from "./WorkflowManager"; diff --git a/fe/src/services/libp2p/workflows/types.ts b/fe/src/services/libp2p/workflows/types.ts new file mode 100644 index 0000000..0494205 --- /dev/null +++ b/fe/src/services/libp2p/workflows/types.ts @@ -0,0 +1,27 @@ +import type { BasicProtoInterface } from "../types"; + +export type EventPredicate = (data: T) => boolean; +export type EventHandler> = ( + data: T, + workflow: WorkflowDefinition, + proto: TProto, +) => void | Promise; + +export interface WorkflowStep { + event: EventPredicate; + condition?: () => boolean; + handler?: EventHandler; +} + +export interface WorkflowDefinition { + name: string; + steps: WorkflowStep[]; + onComplete?: () => void | Promise; + onError?: (error: Error) => void; +} + +export interface WorkflowState { + currentStepIndex: number; + isActive: boolean; + data: Record; +}