Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type FC, useMemo } from "react";
import { type FC, useEffect, useMemo } from "react";
import { PortalPeerId } from "@/components/Box/components/PortalPeerId";
import { RelayReconnectingAlert } from "@/components/Box/components/RelayReconnectingAlert";
import { ShareAccessButton as ShareAccessButtonDumb } from "@/components/Box/components/ShareAccessButton";
Expand All @@ -19,6 +19,7 @@ import { NavigationAwayBlocker } from "../commons/components/NavigationAwayBlock
import { PageTitle } from "../commons/components/PageTitle";
import { useDataChannelSendMessages } from "./dataChannelSendMessages";
import { useSendKeyToLeader } from "./hooks";
import { keySharingWorkflowManager } from "./keySharingWorkflow";
import { useJoinLockedBoxConnection } from "./useJoinLockedBoxConnection";

export const JoinLockedBox: FC = () => {
Expand Down Expand Up @@ -48,6 +49,14 @@ export const JoinLockedBox: FC = () => {
};

const JoinLockedBoxContent: React.FC = () => {
useEffect(() => {
keySharingWorkflowManager.startAllWorkflows();

return () => {
keySharingWorkflowManager.reset();
};
}, []);

const state = useJoinLockedBoxStore((state) => state.state);
const roomToken = useJoinLockedBoxStore((state) => state.roomToken);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { loggerGate } from "@icod2/protocols";
import { WorkflowBuilder, WorkflowManager } from "@/services/libp2p/workflows";
import { useJoinLockedBoxStore } from "@/stores/boxStore/joinLockedBoxStore";
import {
type FollowerSendsPartialStateMessage,
isLeaderSendsPartialStateMessage,
isLeaderWelcome,
type LeaderSendsPartialStateMessage,
} from "../commons/leader-keyholder-interface";
import { usePeerToHolderMapRef } from "../commons/usePeerToHolderMapRef";

export const keySharingWorkflowManager = new WorkflowManager();

const createKeySharingWorkflow = () => {
return new WorkflowBuilder("keySharing")
.waitForWithCondition(isLeaderWelcome, () => {
const { shareAccessKeyByKeyHolderId } = useJoinLockedBoxStore.getState();

const isThereAnyShared = Object.values(shareAccessKeyByKeyHolderId).some(
(x) => x === true,
);

return isThereAnyShared;
})
.waitFor(isLeaderSendsPartialStateMessage, (message, _, proto) => {
const { connectedLeaderId, shareAccessKeyByKeyHolderId } =
useJoinLockedBoxStore.getState();

if (!connectedLeaderId) {
return;
}

const { onlineKeyHolders } = message as LeaderSendsPartialStateMessage;
const onlineKeyHoldersIds =
onlineKeyHolders?.map((keyHolder) => keyHolder.id) ?? [];

const keyHoldersIdsToSharedKeyWith = Object.entries(
shareAccessKeyByKeyHolderId,
)
.filter(
([keyHolderId, shared]) =>
shared && onlineKeyHoldersIds.includes(keyHolderId),
)
.map(([keyHolderId]) => keyHolderId);

const leaderPeerId = usePeerToHolderMapRef
.getValue()
.getPeerId(connectedLeaderId);

if (!leaderPeerId) {
loggerGate.canError && console.error("Leader peer ID not found");
return;
}

proto.sendMessageToPeer(leaderPeerId, {
type: "follower:send-partial-state",
keyHoldersIdsToSharedKeyWith,
} satisfies FollowerSendsPartialStateMessage);
})
.onError((error) => {
console.error("Key sharing workflow error:", error);

const state = useJoinLockedBoxStore.getState();
state.actions.setError(`Workflow error: ${error.message}`);
});
};

keySharingWorkflowManager.defineWorkflow(createKeySharingWorkflow().build());
keySharingWorkflowManager.startWorkflow("keySharing");
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { useJoinLockedBoxStore } from "@/stores/boxStore/joinLockedBoxStore";
import { usePeerToHolderMapRef } from "../commons/usePeerToHolderMapRef";
import { router } from "./dataChannelRouter";
import { useDataChannelSendMessages } from "./dataChannelSendMessages";
import { keySharingWorkflowManager } from "./keySharingWorkflow";
import { useOnChangeShareablePartOfState } from "./useSelectiveStatePusher";

export type JoinBoxConnectionError = ReturnType<
Expand All @@ -28,11 +29,18 @@ export function useJoinLockedBoxConnection({

useEffect(() => {
routerMng.addRouter("join-unlock-box", router.router);
routerMng.addRouter(
"join-unlock-box-workflows",
keySharingWorkflowManager.router,
);

console.log("useEffect in useJoinLockedBoxConnection");

return () => {
routerMng.removeRouter("join-unlock-box");
routerMng.removeRouter("join-unlock-box-workflows");
};
}, [routerMng]);
}, [routerMng.addRouter, routerMng.removeRouter]);

useEffect(() => {
useJoinLockedBoxStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export const OpenLockedBoxContent: FC = () => {
return () => {
routerMng.removeRouter("open-locked-box");
};
}, [routerMng]);
}, [routerMng.addRouter, routerMng.removeRouter]);

const { sendKey } = useSendMessageProto({
peerMessageProtoRef: messageProto.peerMessageProtoRef,
Expand Down
47 changes: 47 additions & 0 deletions fe/src/services/libp2p/workflows/WorkflowBuilder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { EventHandler, EventPredicate, WorkflowDefinition } from "./types";

export class WorkflowBuilder<
TMessage extends Record<string, unknown> = Record<string, unknown>,
> {
private workflow: Pick<WorkflowDefinition<TMessage>, "steps"> &
Partial<Omit<WorkflowDefinition<TMessage>, "steps">> = {
steps: [],
};

constructor(name: string) {
this.workflow.name = name;
}

waitFor<TSpecificMessage extends TMessage>(
event: EventPredicate<TSpecificMessage>,
handler?: EventHandler<TSpecificMessage>,
): WorkflowBuilder<TMessage> {
this.workflow.steps.push({
event: event as EventPredicate<TMessage>,
handler: handler as EventHandler<TMessage> | undefined,
});
return this;
}

waitForWithCondition<TSpecificMessage extends TMessage>(
event: EventPredicate<TSpecificMessage>,
condition: () => boolean,
handler?: EventHandler<TSpecificMessage>,
): WorkflowBuilder<TMessage> {
this.workflow.steps.push({
event: event as EventPredicate<TMessage>,
condition,
handler: handler as EventHandler<TMessage> | undefined,
});
return this;
}

onError(handler: (error: Error) => void): WorkflowBuilder<TMessage> {
this.workflow.onError = handler;
return this;
}

public build(): WorkflowDefinition<TMessage> {
return this.workflow as WorkflowDefinition<TMessage>;
}
}
153 changes: 153 additions & 0 deletions fe/src/services/libp2p/workflows/WorkflowManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import type { BasicProtoInterface, RouterItem } from "@/services/libp2p";
import type { WorkflowDefinition, WorkflowState } from "./types";

export class WorkflowManager<
TMessage extends Record<string, unknown> = Record<string, unknown>,
TProto extends BasicProtoInterface<TMessage> = BasicProtoInterface<TMessage>,
> {
private workflows: Map<string, WorkflowDefinition<TMessage>> = new Map();
private workflowStates: Map<string, WorkflowState> = new Map();
private eventBuffer: Array<{ peerId: string; message: TMessage }> = [];
private isProcessing = false;
private lastReferencedProto: TProto | undefined;
readonly router: RouterItem<TMessage, TProto>;

constructor() {
this.router = (peerId, message, proto) => {
this.lastReferencedProto = proto;
this.handleIncomingMessage(peerId, message);
};
}

defineWorkflow(workflow: WorkflowDefinition<TMessage>): void {
this.workflows.set(workflow.name, workflow);
this.workflowStates.set(workflow.name, {
currentStepIndex: 0,
isActive: false,
data: {},
});
}

startWorkflow(name: string, initialData?: Record<string, unknown>): void {
const workflow = this.workflows.get(name);
if (!workflow) {
throw new Error(`Workflow "${name}" not found`);
}

const state = this.workflowStates.get(name);
if (!state) {
throw new Error(`Workflow state for "${name}" not found`);
}

state.isActive = true;
state.currentStepIndex = 0;
state.data = { ...initialData };

this.processEventBuffer();
}

startAllWorkflows(): void {
this.workflowStates.forEach((state, name) => {
if (!state.isActive) {
this.startWorkflow(name);
}
});
}

stopWorkflow(name: string): void {
const state = this.workflowStates.get(name);
if (state) {
state.isActive = false;
state.currentStepIndex = 0;
state.data = {};
}
}

reset(): void {
this.workflowStates.forEach((state) => {
state.isActive = false;
state.currentStepIndex = 0;
state.data = {};
});
this.eventBuffer = [];
}

getWorkflowState(name: string): WorkflowState | undefined {
return this.workflowStates.get(name);
}

updateWorkflowData(name: string, data: Record<string, unknown>): void {
const state = this.workflowStates.get(name);
if (state) {
state.data = { ...state.data, ...data };
}
}

private handleIncomingMessage(peerId: string, message: TMessage): void {
this.eventBuffer.push({ peerId, message });

if (!this.isProcessing) {
this.processEventBuffer();
}
}

private async processEventBuffer(): Promise<void> {
if (this.isProcessing || this.eventBuffer.length === 0) {
return;
}

this.isProcessing = true;

while (this.eventBuffer.length > 0) {
const event = this.eventBuffer.shift();
if (!event) continue;

await this.processEvent(event.peerId, event.message);
}

this.isProcessing = false;
}

private async processEvent(peerId: string, message: TMessage): Promise<void> {
for (const [workflowName, workflow] of this.workflows.entries()) {
const state = this.workflowStates.get(workflowName);
if (!state || !state.isActive) continue;

const currentStep = workflow.steps[state.currentStepIndex];
if (!currentStep) continue;

try {
if (!currentStep.event(message)) continue;

if (currentStep.condition && !currentStep.condition()) continue;

state.data._lastPeerId = peerId;
state.data._lastMessage = message;

if (currentStep.handler) {
await currentStep.handler(
message,
workflow,
// biome-ignore lint/style/noNonNullAssertion: I'm positive it will always be not null
this.lastReferencedProto!,
);
}

state.currentStepIndex++;

if (state.currentStepIndex >= workflow.steps.length) {
state.isActive = false;
state.currentStepIndex = 0;
}
} catch (error) {
if (workflow.onError) {
workflow.onError(error as Error);
}

state.isActive = false;
state.currentStepIndex = 0;
state.data = {};
}
}
}
}
3 changes: 3 additions & 0 deletions fe/src/services/libp2p/workflows/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "./types";
export { WorkflowBuilder } from "./WorkflowBuilder";
export { WorkflowManager } from "./WorkflowManager";
27 changes: 27 additions & 0 deletions fe/src/services/libp2p/workflows/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import type { BasicProtoInterface } from "../types";

export type EventPredicate<T = unknown> = (data: T) => boolean;
export type EventHandler<T = unknown, TProto = BasicProtoInterface<T>> = (
data: T,
workflow: WorkflowDefinition<T>,
proto: TProto,
) => void | Promise<void>;

export interface WorkflowStep<T = unknown> {
event: EventPredicate<T>;
condition?: () => boolean;
handler?: EventHandler<T>;
}

export interface WorkflowDefinition<T = unknown> {
name: string;
steps: WorkflowStep<T>[];
onComplete?: () => void | Promise<void>;
onError?: (error: Error) => void;
}

export interface WorkflowState {
currentStepIndex: number;
isActive: boolean;
data: Record<string, unknown>;
}