diff --git a/packages/matter-server/src/MatterServer.ts b/packages/matter-server/src/MatterServer.ts index 7b5670d5..bf3ecfed 100644 --- a/packages/matter-server/src/MatterServer.ts +++ b/packages/matter-server/src/MatterServer.ts @@ -151,6 +151,7 @@ async function start() { disableOtaProvider: cliOptions.disableOta, serverId: legacyData.serverId, serverVersion: MATTER_SERVER_VERSION, + enableTimeSync: cliOptions.enableTimeSync, }, legacyServerData, ); diff --git a/packages/matter-server/src/cli.ts b/packages/matter-server/src/cli.ts index 2bf40221..d6a5de63 100644 --- a/packages/matter-server/src/cli.ts +++ b/packages/matter-server/src/cli.ts @@ -62,6 +62,9 @@ export interface CliOptions { disableOta: boolean; otaProviderDir: string | null; + // Time synchronization configuration + enableTimeSync: boolean; + // Dashboard configuration disableDashboard: boolean; productionMode: boolean; @@ -164,6 +167,16 @@ export function parseCliArgs(argv?: string[]): CliOptions { .env("DISABLE_OTA"), ) .addOption(new Option("--ota-provider-dir ", "Directory for OTA Provider files").env("OTA_PROVIDER_DIR")) + .addOption( + new Option( + "--enable-time-sync [value]", + "Enable time synchronization for nodes with the TimeSynchronization cluster. Only enable when host NTP is reliable.", + ) + .argParser(parseBooleanEnv) + .preset(true) + .default(false) + .env("ENABLE_TIME_SYNC"), + ) .addOption( new Option("--disable-dashboard [value]", "Disable the web dashboard") .argParser(parseBooleanEnv) @@ -245,6 +258,7 @@ export function parseCliArgs(argv?: string[]): CliOptions { bluetoothAdapter: opts.bluetoothAdapter ?? null, disableOta: opts.disableOta, otaProviderDir: opts.otaProviderDir ?? null, + enableTimeSync: opts.enableTimeSync, disableDashboard: opts.disableDashboard, productionMode: opts.productionMode, }; diff --git a/packages/ws-controller/src/controller/ControllerCommandHandler.ts b/packages/ws-controller/src/controller/ControllerCommandHandler.ts index 8ce8b55e..2750a07b 100644 --- a/packages/ws-controller/src/controller/ControllerCommandHandler.ts +++ b/packages/ws-controller/src/controller/ControllerCommandHandler.ts @@ -18,6 +18,7 @@ import { Minutes, NodeId, Observable, + ObserverGroup, Seconds, ServerAddress, ServerAddressUdp, @@ -35,6 +36,7 @@ import { BridgedDeviceBasicInformation, GeneralCommissioning, OperationalCredentials, + TimeSynchronization, } from "@matter/main/clusters"; import { DecodedAttributeReportValue, @@ -99,12 +101,17 @@ import { formatNodeId } from "../util/formatNodeId.js"; import { pingIp } from "../util/network.js"; import { CustomClusterPoller } from "./CustomClusterPoller.js"; import { Nodes } from "./Nodes.js"; +import { TimeSyncManager } from "./TimeSyncManager.js"; const logger = Logger.get("ControllerCommandHandler"); /** After this duration in Reconnecting state, declare the node unavailable */ const RECONNECT_TIMEOUT = Minutes(3); +// timeFailure event ID within TimeSynchronization cluster (0x0038) +const TIME_SYNC_CLUSTER_ID = 0x0038; +const TIME_FAILURE_EVENT_ID = 0x03; + /** * Cluster IDs whose attribute changes should trigger a full node_updated broadcast. * BasicInformation (0x28) covers firmware version, product name, etc. @@ -156,6 +163,10 @@ export class ControllerCommandHandler { #availableUpdates = new Map(); /** Poller for custom cluster attributes (Eve energy, etc.) */ #customClusterPoller: CustomClusterPoller; + /** Manages time synchronization for nodes with the TimeSynchronization cluster */ + #timeSyncManager?: TimeSyncManager; + /** Per-node ObserverGroups for cleanup on decommission */ + #nodeObservers = new Map(); /** Per-node timers that fire when Reconnecting state exceeds the timeout */ #reconnectTimers = new Map(); /** Track in-flight invoke-commands for deduplication across all WebSocket connections */ @@ -175,7 +186,12 @@ export class ControllerCommandHandler { }; #peers?: PeerSet; - constructor(controllerInstance: CommissioningController, bleEnabled: boolean, otaEnabled: boolean) { + constructor( + controllerInstance: CommissioningController, + bleEnabled: boolean, + otaEnabled: boolean, + timeSyncEnabled = false, + ) { this.#controller = controllerInstance; this.#bleEnabled = bleEnabled; @@ -189,6 +205,14 @@ export class ControllerCommandHandler { handleReadAttributes: (peer, paths, fabricFiltered) => this.handleReadAttributes(peer.nodeId, paths, fabricFiltered), }); + + if (timeSyncEnabled) { + logger.info("Time synchronization enabled"); + this.#timeSyncManager = new TimeSyncManager({ + syncTime: peer => this.#syncNodeTime(peer.nodeId), + nodeConnected: peer => !!(this.#nodes.has(peer.nodeId) && this.#nodes.get(peer.nodeId).isConnected), + }); + } } /** @@ -283,12 +307,33 @@ export class ControllerCommandHandler { } } + /** + * Send a setUtcTime command to a node's TimeSynchronization cluster. + */ + async #syncNodeTime(nodeId: NodeId): Promise { + await this.#invokeCommand(this.#nodes.get(nodeId).node, { + endpoint: EndpointNumber(0), + cluster: TimeSynchronization.Cluster, + command: "setUtcTime", + fields: { + utcTime: Time.nowMs * 1000, + granularity: TimeSynchronization.Granularity.MillisecondsGranularity, + timeSource: TimeSynchronization.TimeSource.Admin, + }, + }); + } + async close() { for (const timer of this.#reconnectTimers.values()) { timer.stop(); } this.#reconnectTimers.clear(); await this.#customClusterPoller.stop(); + await this.#timeSyncManager?.stop(); + for (const observers of this.#nodeObservers.values()) { + observers.close(); + } + this.#nodeObservers.clear(); if (!this.#started) { return; } @@ -299,11 +344,15 @@ export class ControllerCommandHandler { const node = await this.#controller.getNode(nodeId); const attributeCache = this.#nodes.attributeCache; + // Per-node ObserverGroup so all subscriptions are cleaned up on decommission + const nodeObservers = new ObserverGroup(); + this.#nodeObservers.set(nodeId, nodeObservers); + // Wire all Events to the Event emitters // Track if a BasicInformation or BridgedDeviceBasicInformation attribute changed during // a subscription batch. When the batch ends (connectionAlive), emit a full node_updated. let basicInfoChangedInBatch = false; - node.events.attributeChanged.on(data => { + nodeObservers.on(node.events.attributeChanged, data => { // Update the attribute cache with the new value in WebSocket format attributeCache.updateAttribute(nodeId, data); // Then emit the event for listeners @@ -313,21 +362,33 @@ export class ControllerCommandHandler { basicInfoChangedInBatch = true; } }); - node.events.connectionAlive.on(() => { + nodeObservers.on(node.events.connectionAlive, () => { if (basicInfoChangedInBatch) { basicInfoChangedInBatch = false; logger.info(`Node ${this.formatNode(nodeId)} basic information changed, sending full node_updated`); this.events.nodeStructureChanged.emit(nodeId); } }); - node.events.eventTriggered.on(data => this.events.eventChanged.emit(nodeId, data)); - node.events.stateChanged.on(state => { + nodeObservers.on(node.events.eventTriggered, data => { + this.events.eventChanged.emit(nodeId, data); + // Filter timeFailure events to trigger time sync + if ( + this.#timeSyncManager !== undefined && + data.path.clusterId === TIME_SYNC_CLUSTER_ID && + data.path.eventId === TIME_FAILURE_EVENT_ID + ) { + logger.debug(`Received timeFailure event from node ${this.formatNode(nodeId)}, triggering time sync`); + this.#timeSyncManager.syncNode(this.#peerOf(nodeId)); + } + }); + nodeObservers.on(node.events.stateChanged, state => { // Only refresh cache on Connected state if (state === NodeStates.Connected) { attributeCache.update(node); const attributes = attributeCache.get(nodeId); if (attributes) { this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes); + this.#timeSyncManager?.registerNode(this.#peerOf(nodeId), attributes); } } @@ -367,7 +428,7 @@ export class ControllerCommandHandler { this.events.nodeAvailabilityChanged.emit(nodeId, result.available); } }); - node.events.structureChanged.on(() => { + nodeObservers.on(node.events.structureChanged, () => { // Structure changed means endpoints may have been added/removed, refresh cache if (node.isConnected) { attributeCache.update(node); @@ -380,12 +441,16 @@ export class ControllerCommandHandler { this.events.nodeEndpointAdded.emit(nodeId, endpointId); } }); - node.events.decommissioned.on(() => { + nodeObservers.on(node.events.decommissioned, () => { this.#cleanupNodeAfterRemoval(nodeId); this.events.nodeDecommissioned.emit(nodeId); }); - node.events.nodeEndpointAdded.on(endpointId => this.#nodes.queueEndpointAdded(nodeId, endpointId)); - node.events.nodeEndpointRemoved.on(endpointId => this.events.nodeEndpointRemoved.emit(nodeId, endpointId)); + nodeObservers.on(node.events.nodeEndpointAdded, endpointId => + this.#nodes.queueEndpointAdded(nodeId, endpointId), + ); + nodeObservers.on(node.events.nodeEndpointRemoved, endpointId => + this.events.nodeEndpointRemoved.emit(nodeId, endpointId), + ); // Store the node for direct access this.#nodes.set(nodeId, node); @@ -395,10 +460,10 @@ export class ControllerCommandHandler { // Initialize attribute cache if node is already initialized if (node.initialized) { attributeCache.add(node); - // Register for custom cluster polling (e.g., Eve energy) const attributes = attributeCache.get(nodeId); if (attributes) { this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes); + this.#timeSyncManager?.registerNode(this.#peerOf(nodeId), attributes); } } @@ -1051,8 +1116,11 @@ export class ControllerCommandHandler { #cleanupNodeAfterRemoval(nodeId: NodeId) { this.#reconnectTimers.get(nodeId)?.stop(); this.#reconnectTimers.delete(nodeId); + this.#nodeObservers.get(nodeId)?.close(); + this.#nodeObservers.delete(nodeId); this.#nodes.delete(nodeId); this.#customClusterPoller.unregisterNode(this.#peerOf(nodeId)); + this.#timeSyncManager?.unregisterNode(this.#peerOf(nodeId)); this.#availableUpdates.delete(nodeId); } diff --git a/packages/ws-controller/src/controller/CustomClusterPoller.ts b/packages/ws-controller/src/controller/CustomClusterPoller.ts index 92b2eec2..800b6a6e 100644 --- a/packages/ws-controller/src/controller/CustomClusterPoller.ts +++ b/packages/ws-controller/src/controller/CustomClusterPoller.ts @@ -10,10 +10,11 @@ * a custom cluster without standard Matter subscription support. */ -import { CancelablePromise, Duration, Logger, Millis, Time, Timer } from "@matter/main"; +import { Logger } from "@matter/main"; import { PeerAddress, PeerAddressMap } from "@matter/main/protocol"; import { AttributesData } from "../types/CommandHandler.js"; import { formatNodeId } from "../util/formatNodeId.js"; +import { NodeProcessor } from "./NodeProcessor.js"; const logger = Logger.get("CustomClusterPoller"); @@ -37,8 +38,8 @@ const ELECTRICAL_POWER_MEASUREMENT_CLUSTER_ID = 0x0090; // 144 // Polling interval in milliseconds (60 seconds) const POLLING_INTERVAL_MS = 60_000; -// Maximum initial delay in milliseconds (random 30-60s to stagger startup) -const MAX_INITIAL_DELAY_MS = 30_000; +// Initial delay range: random 30-60s to stagger startup +const INITIAL_DELAY_MS = 30_000; // Attribute path format: endpoint/cluster/attribute type AttributePath = string; @@ -117,19 +118,14 @@ export function checkPolledAttributes(attributes: AttributesData): Set>(); - #pollerTimer: Timer; - #attributeReader: NodeAttributeReader; - #isPolling = false; - #currentDelayPromise?: CancelablePromise; + readonly #attributeReader: NodeAttributeReader; #currentReadPromise?: Promise; - #closed = false; constructor(attributeReader: NodeAttributeReader) { + super("eve-poller", INITIAL_DELAY_MS + Math.random() * INITIAL_DELAY_MS, POLLING_INTERVAL_MS); this.#attributeReader = attributeReader; - const delay = Millis(MAX_INITIAL_DELAY_MS + Math.random() * MAX_INITIAL_DELAY_MS); - this.#pollerTimer = Time.getTimer("eve-poller", delay, () => this.#pollAllNodes()); } /** @@ -140,126 +136,47 @@ export class CustomClusterPoller { const attributesToPoll = checkPolledAttributes(attributes); if (attributesToPoll.size === 0) { - // Remove from polling if it was previously registered this.unregisterNode(peer); return; } this.#polledAttributes.set(peer, attributesToPoll); - logger.info( - `Registered node ${formatNodeId(peer)} for custom attribute polling: ${Array.from(attributesToPoll).join(", ")}`, - ); + if (this.registerPeer(peer)) { + logger.info( + `Registered node ${formatNodeId(peer)} for custom attribute polling: ${Array.from(attributesToPoll).join(", ")}`, + ); + } - // Start the poller if not already running - this.#schedulePoller(); + this.scheduleIfNeeded(); } /** * Unregister a node from polling (e.g., when decommissioned or disconnected). */ unregisterNode(peer: PeerAddress): void { - if (this.#polledAttributes.delete(peer)) { + this.#polledAttributes.delete(peer); + if (this.unregisterPeer(peer)) { logger.info(`Unregistered node ${formatNodeId(peer)} from custom attribute polling`); } - if (this.#polledAttributes.size === 0) { - this.#pollerTimer.stop(); - } } - /** - * Stop all polling and cleanup. Awaits any in-flight read operation. - */ - async stop(): Promise { - this.#closed = true; - this.#currentDelayPromise?.cancel(new Error("Close")); - this.#pollerTimer?.stop(); - this.#polledAttributes.clear(); + override async stop(): Promise { + await super.stop(); if (this.#currentReadPromise) { await this.#currentReadPromise; } + this.#polledAttributes.clear(); logger.info("Custom attribute poller stopped"); } - /** - * Schedule the next polling cycle. - * Uses a random initial delay (0-30s) on first run to stagger startup, - * then polls every 30s thereafter. - */ - #schedulePoller(): void { - // No schedule if no nodes to poll - if (this.#polledAttributes.size === 0 || this.#closed) { - return; - } - - // Don't schedule if already scheduled - if (this.#pollerTimer?.isRunning || this.#isPolling) { - return; - } - - // Set the new interval - this.#pollerTimer.start(); + protected override shouldProcess(peer: PeerAddress): boolean { + return this.#attributeReader.nodeConnected(peer); } - /** - * Poll all registered nodes for their custom attributes. - */ - async #pollAllNodes(): Promise { - if (this.#isPolling) { - // Already polling, schedule next cycle - return; - } + protected override async processNode(peer: PeerAddress): Promise { + const attributePaths = this.#polledAttributes.get(peer); + if (!attributePaths) return; - const targetInterval = Millis(POLLING_INTERVAL_MS); - if (this.#pollerTimer.interval !== targetInterval) { - this.#pollerTimer.interval = targetInterval; - } - - this.#isPolling = true; - - let polledNodes = 0; - try { - const entries = Array.from(this.#polledAttributes.entries()); - for (let i = 0; i < entries.length; i++) { - if (this.#closed) { - break; - } - const [peer, attributePaths] = entries[i]; - if (!this.#polledAttributes.has(peer)) { - // Node was removed, so skip it - continue; - } - polledNodes++; - await this.#pollNode(peer, attributePaths); - // Small delay between nodes to avoid overwhelming the network - // Only add this delay if there are more nodes remaining to be polled - if (i < entries.length - 1) { - this.#currentDelayPromise = Time.sleep("sleep", Millis(2_000)).finally(() => { - this.#currentDelayPromise = undefined; - }); - await this.#currentDelayPromise; - } - } - } finally { - this.#isPolling = false; - // Schedule next polling cycle - this.#schedulePoller(); - } - if (polledNodes > 0) { - logger.info( - `Polled ${polledNodes} nodes for energy data. Scheduling next poll in ${Duration.format(this.#pollerTimer.interval)}`, - ); - } - } - - /** - * Poll a single node for its custom attributes. - * The read will automatically trigger change events through the normal attribute flow. - */ - async #pollNode(peer: PeerAddress, attributePaths: Set): Promise { - if (!this.#attributeReader.nodeConnected(peer)) { - logger.debug(`Node ${formatNodeId(peer)} not connected, skipping custom attribute polling`); - return; - } const paths = Array.from(attributePaths); logger.debug(`Polling ${paths.length} custom attributes for node ${formatNodeId(peer)}`); @@ -278,4 +195,10 @@ export class CustomClusterPoller { this.#currentReadPromise = undefined; } } + + protected override onCycleComplete(processedCount: number, intervalFormatted: string): void { + if (processedCount > 0) { + logger.info(`Polled ${processedCount} nodes for energy data. Next poll in ${intervalFormatted}`); + } + } } diff --git a/packages/ws-controller/src/controller/MatterController.ts b/packages/ws-controller/src/controller/MatterController.ts index 16da9c23..684a5112 100644 --- a/packages/ws-controller/src/controller/MatterController.ts +++ b/packages/ws-controller/src/controller/MatterController.ts @@ -47,6 +47,8 @@ export interface MatterControllerOptions { serverId?: string; /** Server version string (e.g., "0.2.10" or "0.2.10-alpha.0"). Used for BasicInformation cluster. */ serverVersion?: string; + /** Enable time synchronization for nodes with the TimeSynchronization cluster. Only enable when host NTP is reliable. */ + enableTimeSync?: boolean; } /** @@ -76,6 +78,7 @@ export class MatterController { #legacyCommissionedDates?: Map; #enableTestNetDcl = false; #disableOtaProvider = true; + #enableTimeSync = false; readonly #borderRouterDiscovery: BorderRouterDiscovery; static async create( @@ -152,6 +155,7 @@ export class MatterController { this.#serverVersion = options.serverVersion ?? "0.0.0"; this.#enableTestNetDcl = options.enableTestNetDcl ?? this.#enableTestNetDcl; this.#disableOtaProvider = options.disableOtaProvider ?? this.#disableOtaProvider; + this.#enableTimeSync = options.enableTimeSync ?? this.#enableTimeSync; } protected async initialize( @@ -192,6 +196,7 @@ export class MatterController { this.#controllerInstance, this.#env.vars.get("ble.enable", false), !this.#disableOtaProvider, + this.#enableTimeSync, ); this.#commandHandler.events.started.once(async () => { diff --git a/packages/ws-controller/src/controller/NodeProcessor.ts b/packages/ws-controller/src/controller/NodeProcessor.ts new file mode 100644 index 00000000..d5193b32 --- /dev/null +++ b/packages/ws-controller/src/controller/NodeProcessor.ts @@ -0,0 +1,105 @@ +/** + * @license + * Copyright 2025-2026 Open Home Foundation + * SPDX-License-Identifier: Apache-2.0 + */ + +import { CancelablePromise, Duration, Millis, Time, Timer } from "@matter/main"; +import { PeerAddress, PeerAddressSet } from "@matter/main/protocol"; + +/** + * Abstract base class for timer-driven periodic processing of registered nodes. + * Handles timer lifecycle, node registration, and the per-node processing loop + * with inter-node delay. Subclasses provide the actual processing logic. + */ +export abstract class NodeProcessor { + readonly #targetInterval: Duration; + readonly #timer: Timer; + #peers = new PeerAddressSet(); + #isProcessing = false; + #currentDelayPromise?: CancelablePromise; + #closed = false; + + constructor(timerName: string, initialDelay: number, targetInterval: number) { + this.#targetInterval = Millis(targetInterval); + this.#timer = Time.getTimer(timerName, Millis(initialDelay), () => void this.#processAll()); + } + + protected get closed(): boolean { + return this.#closed; + } + + /** Register a peer. Returns true if this was a new registration. */ + protected registerPeer(peer: PeerAddress): boolean { + const isNew = !this.#peers.has(peer); + this.#peers.add(peer); + return isNew; + } + + /** Unregister a peer. Stops the timer if no peers remain. Returns true if was registered. */ + protected unregisterPeer(peer: PeerAddress): boolean { + const removed = this.#peers.delete(peer); + if (removed && this.#peers.size === 0) { + this.#timer.stop(); + } + return removed; + } + + protected hasPeer(peer: PeerAddress): boolean { + return this.#peers.has(peer); + } + + /** Start the timer if there are registered peers and it is not already running. */ + protected scheduleIfNeeded(): void { + if (this.#peers.size === 0 || this.#closed) return; + if (this.#timer.isRunning || this.#isProcessing) return; + this.#timer.start(); + } + + async stop(): Promise { + this.#closed = true; + this.#currentDelayPromise?.cancel(new Error("Close")); + this.#timer.stop(); + } + + /** Returns false if this peer should be skipped during processing (e.g. not connected). */ + protected abstract shouldProcess(peer: PeerAddress): boolean; + + /** Perform work for a single peer. Must handle its own errors. */ + protected abstract processNode(peer: PeerAddress): Promise; + + /** Called after a full processing cycle completes. Override for cycle-complete logging. */ + protected onCycleComplete(_processedCount: number, _intervalFormatted: string): void {} + + async #processAll(): Promise { + if (this.#isProcessing) return; + + if (this.#timer.interval !== this.#targetInterval) { + this.#timer.interval = this.#targetInterval; + } + + this.#isProcessing = true; + let processedCount = 0; + + try { + const peers = Array.from(this.#peers); + for (let i = 0; i < peers.length; i++) { + const peer = peers[i]; + if (!this.#peers.has(peer) || !this.shouldProcess(peer)) continue; + processedCount++; + await this.processNode(peer); + if (i < peers.length - 1 && !this.#closed) { + this.#currentDelayPromise = Time.sleep("node-processor-delay", Millis(2_000)).finally(() => { + this.#currentDelayPromise = undefined; + }); + await this.#currentDelayPromise; + } + } + } finally { + this.#isProcessing = false; + this.scheduleIfNeeded(); + } + + this.onCycleComplete(processedCount, Duration.format(this.#timer.interval)); + } +} diff --git a/packages/ws-controller/src/controller/TimeSyncManager.ts b/packages/ws-controller/src/controller/TimeSyncManager.ts new file mode 100644 index 00000000..d152c61e --- /dev/null +++ b/packages/ws-controller/src/controller/TimeSyncManager.ts @@ -0,0 +1,159 @@ +/** + * @license + * Copyright 2025-2026 Open Home Foundation + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Handles time synchronization for nodes with the TimeSynchronization cluster. + * Syncs UTC time on two triggers: + * 1. Node connects/reconnects after startup (immediate, once startup window has elapsed) + * 2. Periodic resync every 24 hours + * + * A startup window of 30–60 minutes prevents syncing during initial node connection + * while the host's NTP is still stabilizing. This manager must only be enabled when + * the host time source is known to be reliable (see --enable-time-sync CLI flag). + */ + +import { Duration, Hours, Logger, Minutes } from "@matter/main"; +import { PeerAddress, PeerAddressMap } from "@matter/main/protocol"; +import { AttributesData } from "../types/CommandHandler.js"; +import { formatNodeId } from "../util/formatNodeId.js"; +import { NodeProcessor } from "./NodeProcessor.js"; + +const logger = Logger.get("TimeSyncManager"); + +// TimeSynchronization cluster ID (0x0038 = 56 decimal) +const TIME_SYNC_CLUSTER_ID = 0x0038; + +// Periodic resync interval: 24 hours +const RESYNC_INTERVAL = Hours(24); + +export interface TimeSyncConnector { + syncTime(peer: PeerAddress): Promise; + nodeConnected(peer: PeerAddress): boolean; +} + +/** + * Check if a node has the TimeSynchronization cluster based on its attribute cache. + * The cluster is always on endpoint 0 per the Matter spec. + */ +export function hasTimeSyncCluster(attributes: AttributesData): boolean { + const prefix = `0/${TIME_SYNC_CLUSTER_ID}/`; + for (const key of Object.keys(attributes)) { + if (key.startsWith(prefix)) { + return true; + } + } + return false; +} + +/** + * Manages time synchronization for nodes with the TimeSynchronization cluster. + */ +export class TimeSyncManager extends NodeProcessor { + readonly #connector: TimeSyncConnector; + // Tracks in-flight immediate syncs per node to prevent parallel syncs + #inFlightSyncs = new PeerAddressMap>(); + // True after the first periodic resync cycle, enabling immediate syncs on reconnect + #startupComplete = false; + + constructor(connector: TimeSyncConnector) { + // Startup window: random 30–60 minutes to stagger across server restarts and + // allow NTP to stabilize before pushing time to devices + const startupDelayMs = Minutes(30) + Math.floor(Math.random() * Minutes(30)); + super("time-sync-resync", startupDelayMs, RESYNC_INTERVAL); + this.#connector = connector; + } + + /** + * Register a node for time sync if it has the TimeSynchronization cluster. + * Call this after a node connects and its attributes are available. + * Immediate sync is skipped during the startup window to avoid traffic while + * the server is initializing all nodes. + */ + registerNode(peer: PeerAddress, attributes: AttributesData): void { + if (!hasTimeSyncCluster(attributes)) { + this.unregisterNode(peer); + return; + } + + if (this.registerPeer(peer)) { + logger.info(`Registered node ${formatNodeId(peer)} for time synchronization`); + } + + // Only sync immediately if the startup window has elapsed. During startup, + // the first periodic resync handles all nodes once NTP has stabilized. + if (this.#startupComplete) { + this.syncNode(peer); + } + + this.scheduleIfNeeded(); + } + + /** + * Unregister a node from time sync tracking. + */ + unregisterNode(peer: PeerAddress): void { + if (this.unregisterPeer(peer)) { + logger.info(`Unregistered node ${formatNodeId(peer)} from time synchronization`); + } + } + + /** + * Trigger an immediate time sync for a node (fire-and-forget with deduplication). + * Called externally when a timeFailure event is received from the node. + */ + syncNode(peer: PeerAddress): void { + if (this.closed || !this.hasPeer(peer) || !this.#connector.nodeConnected(peer)) return; + if (this.#inFlightSyncs.has(peer)) { + logger.debug(`Time sync already in progress for node ${formatNodeId(peer)}, skipping`); + return; + } + const promise = this.#connector + .syncTime(peer) + .then(() => logger.info(`Synced time on node ${formatNodeId(peer)}`)) + .catch(error => logger.warn(`Failed to sync time on node ${formatNodeId(peer)}:`, error)) + .finally(() => { + this.#inFlightSyncs.delete(peer); + }); + this.#inFlightSyncs.set(peer, promise); + } + + /** For testing: advance past the startup window to enable immediate syncs. */ + completeStartup(): void { + this.#startupComplete = true; + } + + override async stop(): Promise { + await super.stop(); + await Promise.allSettled(this.#inFlightSyncs.values()); + this.#inFlightSyncs.clear(); + logger.info("Time sync manager stopped"); + } + + protected override shouldProcess(peer: PeerAddress): boolean { + return this.#connector.nodeConnected(peer) && !this.#inFlightSyncs.has(peer); + } + + protected override async processNode(peer: PeerAddress): Promise { + try { + await this.#connector.syncTime(peer); + logger.info(`Periodic resync: synced time on node ${formatNodeId(peer)}`); + } catch (error) { + logger.warn(`Periodic resync: failed to sync time on node ${formatNodeId(peer)}:`, error); + } + } + + protected override onCycleComplete(processedCount: number, _intervalFormatted: string): void { + if (!this.#startupComplete) { + this.#startupComplete = true; + logger.info("Time sync startup window complete, immediate syncs enabled on reconnect"); + } + if (processedCount > 0) { + logger.info( + `Periodic resync complete: synced ${processedCount} nodes. Next resync in ${Duration.format(RESYNC_INTERVAL)}`, + ); + } + } +} diff --git a/packages/ws-controller/test/TimeSyncManagerTest.ts b/packages/ws-controller/test/TimeSyncManagerTest.ts new file mode 100644 index 00000000..ec20f2b6 --- /dev/null +++ b/packages/ws-controller/test/TimeSyncManagerTest.ts @@ -0,0 +1,299 @@ +/** + * @license + * Copyright 2025-2026 Open Home Foundation + * SPDX-License-Identifier: Apache-2.0 + */ + +import { FabricIndex, NodeId } from "@matter/main"; +import { PeerAddress, PeerAddressSet } from "@matter/main/protocol"; +import { hasTimeSyncCluster, TimeSyncConnector, TimeSyncManager } from "../src/controller/TimeSyncManager.js"; +import { AttributesData } from "../src/types/CommandHandler.js"; + +const TIME_SYNC_CLUSTER_ID = 0x0038; // 56 decimal +const ONE_MINUTE_MS = 60_000; +const ONE_DAY_MS = 24 * 60 * ONE_MINUTE_MS; + +// Startup delay is random 30–60 min; advancing 61 min always fires it +const PAST_STARTUP_MS = 61 * ONE_MINUTE_MS; + +const PEER_1 = PeerAddress({ fabricIndex: FabricIndex(1), nodeId: NodeId(1) }); +const PEER_2 = PeerAddress({ fabricIndex: FabricIndex(1), nodeId: NodeId(2) }); + +function makeTimeSyncAttrs(): AttributesData { + return { [`0/${TIME_SYNC_CLUSTER_ID}/0`]: 1 }; +} + +class StubConnector implements TimeSyncConnector { + readonly syncCalls: PeerAddress[] = []; + private readonly _connected = new PeerAddressSet(); + slowSync = false; + readonly syncResolvers: Array<() => void> = []; + + setConnected(peer: PeerAddress): void { + this._connected.add(peer); + } + + nodeConnected(peer: PeerAddress): boolean { + return this._connected.has(peer); + } + + async syncTime(peer: PeerAddress): Promise { + if (this.slowSync) { + await new Promise(resolve => this.syncResolvers.push(resolve)); + } + this.syncCalls.push(peer); + } + + resolveAll(): void { + const resolvers = this.syncResolvers.splice(0); + resolvers.forEach(r => r()); + } +} + +describe("hasTimeSyncCluster", () => { + it("returns true when TimeSynchronization cluster attributes are present", () => { + expect(hasTimeSyncCluster({ [`0/${TIME_SYNC_CLUSTER_ID}/0`]: 1 })).to.equal(true); + }); + + it("returns true for any attribute index on the cluster", () => { + expect(hasTimeSyncCluster({ [`0/${TIME_SYNC_CLUSTER_ID}/255`]: "x" })).to.equal(true); + }); + + it("returns false when no attributes are present", () => { + expect(hasTimeSyncCluster({})).to.equal(false); + }); + + it("returns false for attributes on a different cluster", () => { + expect(hasTimeSyncCluster({ "0/40/0": 1 })).to.equal(false); + }); + + it("only matches endpoint 0 per Matter spec", () => { + expect(hasTimeSyncCluster({ [`1/${TIME_SYNC_CLUSTER_ID}/0`]: 1 })).to.equal(false); + }); +}); + +describe("TimeSyncManager", () => { + let connector: StubConnector; + let manager: TimeSyncManager; + + beforeEach(() => { + MockTime.reset(); + connector = new StubConnector(); + manager = new TimeSyncManager(connector); + }); + + afterEach(async () => { + connector.resolveAll(); // unblock any pending slow syncs so stop() doesn't hang + await manager.stop(); + }); + + describe("registerNode", () => { + it("does not sync during the startup window even when connected", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("does not sync when node is not connected, even after startup", async () => { + manager.completeStartup(); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("syncs immediately once startupComplete is set and node is connected", async () => { + connector.setConnected(PEER_1); + manager.completeStartup(); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + }); + + it("does not sync for a node without the TimeSynchronization cluster", async () => { + connector.setConnected(PEER_1); + manager.completeStartup(); + manager.registerNode(PEER_1, { "0/40/0": 1 }); // no time sync cluster + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("unregisters the peer when re-registered without TimeSynchronization cluster", async () => { + manager.registerNode(PEER_1, makeTimeSyncAttrs()); // register + manager.registerNode(PEER_1, { "0/40/0": 1 }); // no longer has cluster + + manager.completeStartup(); + connector.setConnected(PEER_1); + manager.syncNode(PEER_1); // should be no-op since unregistered + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + }); + + describe("syncNode", () => { + beforeEach(() => { + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.completeStartup(); + }); + + it("calls syncTime when peer is registered and connected", async () => { + connector.setConnected(PEER_1); + manager.syncNode(PEER_1); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + }); + + it("does not call syncTime when peer is not connected", async () => { + manager.syncNode(PEER_1); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("does not call syncTime for an unregistered peer", async () => { + connector.setConnected(PEER_2); + manager.syncNode(PEER_2); // PEER_2 not registered + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("deduplicates when a sync is already in flight", async () => { + connector.slowSync = true; + connector.setConnected(PEER_1); + + manager.syncNode(PEER_1); // starts in-flight sync + manager.syncNode(PEER_1); // duplicate — dropped + manager.syncNode(PEER_1); // duplicate — dropped + + connector.resolveAll(); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + }); + + it("allows a new sync after the previous one completes", async () => { + connector.slowSync = true; + connector.setConnected(PEER_1); + + manager.syncNode(PEER_1); + connector.resolveAll(); + await MockTime.yield3(); + + manager.syncNode(PEER_1); + connector.resolveAll(); + await MockTime.yield3(); + + expect(connector.syncCalls.length).to.equal(2); + }); + }); + + describe("unregisterNode", () => { + it("makes syncNode a no-op for the removed peer", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.unregisterNode(PEER_1); + manager.completeStartup(); + manager.syncNode(PEER_1); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + }); + + describe("periodic resync (via NodeProcessor timer)", () => { + it("syncs connected nodes after the startup delay fires", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + + await MockTime.advance(PAST_STARTUP_MS); + await MockTime.yield3(); + + expect(connector.syncCalls.length).to.equal(1); + }); + + it("skips disconnected nodes during resync", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.registerNode(PEER_2, makeTimeSyncAttrs()); // PEER_2 not connected + + await MockTime.advance(PAST_STARTUP_MS); + await MockTime.yield3(); + + expect(connector.syncCalls.length).to.equal(1); + expect(connector.syncCalls[0]).to.deep.equal(PEER_1); + }); + + it("skips peers that already have an in-flight sync", async () => { + connector.slowSync = true; + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.completeStartup(); + + manager.syncNode(PEER_1); // start in-flight sync + await MockTime.yield(); + + await MockTime.advance(PAST_STARTUP_MS); // periodic cycle fires + await MockTime.yield3(); + + // Only 1 syncTime call total — the periodic cycle skipped PEER_1 + connector.resolveAll(); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + }); + + it("enables immediate syncs on reconnect after the startup cycle completes", async () => { + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + + await MockTime.advance(PAST_STARTUP_MS); // first cycle, no connected nodes + await MockTime.yield3(); + + // After startup, re-registering a connected node syncs immediately + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await MockTime.yield3(); + + expect(connector.syncCalls.length).to.equal(1); + }); + + it("resyncs again after 24 hours", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + + await MockTime.advance(PAST_STARTUP_MS); // first cycle + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + + await MockTime.advance(ONE_DAY_MS); // 24h resync + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(2); + }); + }); + + describe("stop", () => { + it("completes cleanly when no nodes are registered", async () => { + await manager.stop(); + }); + + it("completes cleanly when nodes are registered but no syncs are in flight", async () => { + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await manager.stop(); + }); + + it("awaits in-flight syncs before completing", async () => { + connector.slowSync = true; + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.completeStartup(); + manager.syncNode(PEER_1); + + let stopped = false; + const stopPromise = manager.stop().then(() => { + stopped = true; + }); + + await MockTime.yield(); + expect(stopped).to.equal(false); // still waiting on in-flight sync + + connector.resolveAll(); + await stopPromise; + expect(stopped).to.equal(true); + }); + }); +});