From e72bf8d5f5ab76c62f44eb116e1f92fdff5d6b24 Mon Sep 17 00:00:00 2001 From: Mark van Proctor <6936351+markvp@users.noreply.github.com> Date: Sat, 2 May 2026 20:11:19 -0700 Subject: [PATCH 1/5] Add TimeSyncManager to sync UTC time on nodes with TimeSynchronization cluster Introduces opt-in time synchronization via --enable-time-sync / ENABLE_TIME_SYNC. Adds NodeProcessor abstract base class for timer-driven periodic node processing, and TimeSyncManager which extends it to sync UTC time using PeerAddress (matching the CustomClusterPoller upstream migration). Syncs on reconnect after startup window and periodically every 24 hours. Co-Authored-By: Claude Sonnet 4.6 --- packages/matter-server/src/MatterServer.ts | 1 + packages/matter-server/src/cli.ts | 14 ++ .../controller/ControllerCommandHandler.ts | 84 +++++++-- .../src/controller/MatterController.ts | 5 + .../src/controller/NodeProcessor.ts | 105 ++++++++++++ .../src/controller/TimeSyncManager.ts | 159 ++++++++++++++++++ 6 files changed, 358 insertions(+), 10 deletions(-) create mode 100644 packages/ws-controller/src/controller/NodeProcessor.ts create mode 100644 packages/ws-controller/src/controller/TimeSyncManager.ts diff --git a/packages/matter-server/src/MatterServer.ts b/packages/matter-server/src/MatterServer.ts index 7b5670d5..bf3ecfed 100644 --- a/packages/matter-server/src/MatterServer.ts +++ b/packages/matter-server/src/MatterServer.ts @@ -151,6 +151,7 @@ async function start() { disableOtaProvider: cliOptions.disableOta, serverId: legacyData.serverId, serverVersion: MATTER_SERVER_VERSION, + enableTimeSync: cliOptions.enableTimeSync, }, legacyServerData, ); diff --git a/packages/matter-server/src/cli.ts b/packages/matter-server/src/cli.ts index 2bf40221..d6a5de63 100644 --- a/packages/matter-server/src/cli.ts +++ b/packages/matter-server/src/cli.ts @@ -62,6 +62,9 @@ export interface CliOptions { disableOta: boolean; otaProviderDir: string | null; + // Time synchronization configuration + enableTimeSync: boolean; + // Dashboard configuration disableDashboard: boolean; productionMode: boolean; @@ -164,6 +167,16 @@ export function parseCliArgs(argv?: string[]): CliOptions { .env("DISABLE_OTA"), ) .addOption(new Option("--ota-provider-dir ", "Directory for OTA Provider files").env("OTA_PROVIDER_DIR")) + .addOption( + new Option( + "--enable-time-sync [value]", + "Enable time synchronization for nodes with the TimeSynchronization cluster. Only enable when host NTP is reliable.", + ) + .argParser(parseBooleanEnv) + .preset(true) + .default(false) + .env("ENABLE_TIME_SYNC"), + ) .addOption( new Option("--disable-dashboard [value]", "Disable the web dashboard") .argParser(parseBooleanEnv) @@ -245,6 +258,7 @@ export function parseCliArgs(argv?: string[]): CliOptions { bluetoothAdapter: opts.bluetoothAdapter ?? null, disableOta: opts.disableOta, otaProviderDir: opts.otaProviderDir ?? null, + enableTimeSync: opts.enableTimeSync, disableDashboard: opts.disableDashboard, productionMode: opts.productionMode, }; diff --git a/packages/ws-controller/src/controller/ControllerCommandHandler.ts b/packages/ws-controller/src/controller/ControllerCommandHandler.ts index 8ce8b55e..10f09d8a 100644 --- a/packages/ws-controller/src/controller/ControllerCommandHandler.ts +++ b/packages/ws-controller/src/controller/ControllerCommandHandler.ts @@ -18,6 +18,7 @@ import { Minutes, NodeId, Observable, + ObserverGroup, Seconds, ServerAddress, ServerAddressUdp, @@ -35,6 +36,7 @@ import { BridgedDeviceBasicInformation, GeneralCommissioning, OperationalCredentials, + TimeSynchronization, } from "@matter/main/clusters"; import { DecodedAttributeReportValue, @@ -99,12 +101,17 @@ import { formatNodeId } from "../util/formatNodeId.js"; import { pingIp } from "../util/network.js"; import { CustomClusterPoller } from "./CustomClusterPoller.js"; import { Nodes } from "./Nodes.js"; +import { TimeSyncManager } from "./TimeSyncManager.js"; const logger = Logger.get("ControllerCommandHandler"); /** After this duration in Reconnecting state, declare the node unavailable */ const RECONNECT_TIMEOUT = Minutes(3); +// timeFailure event ID within TimeSynchronization cluster (0x0038) +const TIME_SYNC_CLUSTER_ID = 0x0038; +const TIME_FAILURE_EVENT_ID = 0x03; + /** * Cluster IDs whose attribute changes should trigger a full node_updated broadcast. * BasicInformation (0x28) covers firmware version, product name, etc. @@ -156,6 +163,10 @@ export class ControllerCommandHandler { #availableUpdates = new Map(); /** Poller for custom cluster attributes (Eve energy, etc.) */ #customClusterPoller: CustomClusterPoller; + /** Manages time synchronization for nodes with the TimeSynchronization cluster */ + #timeSyncManager?: TimeSyncManager; + /** Per-node ObserverGroups for cleanup on decommission */ + #nodeObservers = new Map(); /** Per-node timers that fire when Reconnecting state exceeds the timeout */ #reconnectTimers = new Map(); /** Track in-flight invoke-commands for deduplication across all WebSocket connections */ @@ -175,7 +186,12 @@ export class ControllerCommandHandler { }; #peers?: PeerSet; - constructor(controllerInstance: CommissioningController, bleEnabled: boolean, otaEnabled: boolean) { + constructor( + controllerInstance: CommissioningController, + bleEnabled: boolean, + otaEnabled: boolean, + timeSyncEnabled = false, + ) { this.#controller = controllerInstance; this.#bleEnabled = bleEnabled; @@ -189,6 +205,14 @@ export class ControllerCommandHandler { handleReadAttributes: (peer, paths, fabricFiltered) => this.handleReadAttributes(peer.nodeId, paths, fabricFiltered), }); + + if (timeSyncEnabled) { + logger.info("Time synchronization enabled"); + this.#timeSyncManager = new TimeSyncManager({ + syncTime: peer => this.#syncNodeTime(peer.nodeId), + nodeConnected: peer => !!(this.#nodes.has(peer.nodeId) && this.#nodes.get(peer.nodeId).isConnected), + }); + } } /** @@ -283,12 +307,29 @@ export class ControllerCommandHandler { } } + /** + * Send a setUtcTime command to a node's TimeSynchronization cluster. + */ + async #syncNodeTime(nodeId: NodeId): Promise { + const client = this.#nodes.clusterClientByIdFor(nodeId, EndpointNumber(0), TimeSynchronization.Cluster.id); + await client.commands.setUtcTime({ + utcTime: Time.nowMs * 1000, + granularity: TimeSynchronization.Granularity.MillisecondsGranularity, + timeSource: TimeSynchronization.TimeSource.Admin, + }); + } + async close() { for (const timer of this.#reconnectTimers.values()) { timer.stop(); } this.#reconnectTimers.clear(); await this.#customClusterPoller.stop(); + await this.#timeSyncManager?.stop(); + for (const observers of this.#nodeObservers.values()) { + observers.close(); + } + this.#nodeObservers.clear(); if (!this.#started) { return; } @@ -299,11 +340,15 @@ export class ControllerCommandHandler { const node = await this.#controller.getNode(nodeId); const attributeCache = this.#nodes.attributeCache; + // Per-node ObserverGroup so all subscriptions are cleaned up on decommission + const nodeObservers = new ObserverGroup(); + this.#nodeObservers.set(nodeId, nodeObservers); + // Wire all Events to the Event emitters // Track if a BasicInformation or BridgedDeviceBasicInformation attribute changed during // a subscription batch. When the batch ends (connectionAlive), emit a full node_updated. let basicInfoChangedInBatch = false; - node.events.attributeChanged.on(data => { + nodeObservers.on(node.events.attributeChanged, data => { // Update the attribute cache with the new value in WebSocket format attributeCache.updateAttribute(nodeId, data); // Then emit the event for listeners @@ -313,21 +358,33 @@ export class ControllerCommandHandler { basicInfoChangedInBatch = true; } }); - node.events.connectionAlive.on(() => { + nodeObservers.on(node.events.connectionAlive, () => { if (basicInfoChangedInBatch) { basicInfoChangedInBatch = false; logger.info(`Node ${this.formatNode(nodeId)} basic information changed, sending full node_updated`); this.events.nodeStructureChanged.emit(nodeId); } }); - node.events.eventTriggered.on(data => this.events.eventChanged.emit(nodeId, data)); - node.events.stateChanged.on(state => { + nodeObservers.on(node.events.eventTriggered, data => { + this.events.eventChanged.emit(nodeId, data); + // Filter timeFailure events to trigger time sync + if ( + this.#timeSyncManager !== undefined && + data.path.clusterId === TIME_SYNC_CLUSTER_ID && + data.path.eventId === TIME_FAILURE_EVENT_ID + ) { + logger.debug(`Received timeFailure event from node ${this.formatNode(nodeId)}, triggering time sync`); + this.#timeSyncManager.syncNode(this.#peerOf(nodeId)); + } + }); + nodeObservers.on(node.events.stateChanged, state => { // Only refresh cache on Connected state if (state === NodeStates.Connected) { attributeCache.update(node); const attributes = attributeCache.get(nodeId); if (attributes) { this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes); + this.#timeSyncManager?.registerNode(this.#peerOf(nodeId), attributes); } } @@ -367,7 +424,7 @@ export class ControllerCommandHandler { this.events.nodeAvailabilityChanged.emit(nodeId, result.available); } }); - node.events.structureChanged.on(() => { + nodeObservers.on(node.events.structureChanged, () => { // Structure changed means endpoints may have been added/removed, refresh cache if (node.isConnected) { attributeCache.update(node); @@ -380,12 +437,16 @@ export class ControllerCommandHandler { this.events.nodeEndpointAdded.emit(nodeId, endpointId); } }); - node.events.decommissioned.on(() => { + nodeObservers.on(node.events.decommissioned, () => { this.#cleanupNodeAfterRemoval(nodeId); this.events.nodeDecommissioned.emit(nodeId); }); - node.events.nodeEndpointAdded.on(endpointId => this.#nodes.queueEndpointAdded(nodeId, endpointId)); - node.events.nodeEndpointRemoved.on(endpointId => this.events.nodeEndpointRemoved.emit(nodeId, endpointId)); + nodeObservers.on(node.events.nodeEndpointAdded, endpointId => + this.#nodes.queueEndpointAdded(nodeId, endpointId), + ); + nodeObservers.on(node.events.nodeEndpointRemoved, endpointId => + this.events.nodeEndpointRemoved.emit(nodeId, endpointId), + ); // Store the node for direct access this.#nodes.set(nodeId, node); @@ -395,10 +456,10 @@ export class ControllerCommandHandler { // Initialize attribute cache if node is already initialized if (node.initialized) { attributeCache.add(node); - // Register for custom cluster polling (e.g., Eve energy) const attributes = attributeCache.get(nodeId); if (attributes) { this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes); + this.#timeSyncManager?.registerNode(this.#peerOf(nodeId), attributes); } } @@ -1051,8 +1112,11 @@ export class ControllerCommandHandler { #cleanupNodeAfterRemoval(nodeId: NodeId) { this.#reconnectTimers.get(nodeId)?.stop(); this.#reconnectTimers.delete(nodeId); + this.#nodeObservers.get(nodeId)?.close(); + this.#nodeObservers.delete(nodeId); this.#nodes.delete(nodeId); this.#customClusterPoller.unregisterNode(this.#peerOf(nodeId)); + this.#timeSyncManager?.unregisterNode(this.#peerOf(nodeId)); this.#availableUpdates.delete(nodeId); } diff --git a/packages/ws-controller/src/controller/MatterController.ts b/packages/ws-controller/src/controller/MatterController.ts index 16da9c23..684a5112 100644 --- a/packages/ws-controller/src/controller/MatterController.ts +++ b/packages/ws-controller/src/controller/MatterController.ts @@ -47,6 +47,8 @@ export interface MatterControllerOptions { serverId?: string; /** Server version string (e.g., "0.2.10" or "0.2.10-alpha.0"). Used for BasicInformation cluster. */ serverVersion?: string; + /** Enable time synchronization for nodes with the TimeSynchronization cluster. Only enable when host NTP is reliable. */ + enableTimeSync?: boolean; } /** @@ -76,6 +78,7 @@ export class MatterController { #legacyCommissionedDates?: Map; #enableTestNetDcl = false; #disableOtaProvider = true; + #enableTimeSync = false; readonly #borderRouterDiscovery: BorderRouterDiscovery; static async create( @@ -152,6 +155,7 @@ export class MatterController { this.#serverVersion = options.serverVersion ?? "0.0.0"; this.#enableTestNetDcl = options.enableTestNetDcl ?? this.#enableTestNetDcl; this.#disableOtaProvider = options.disableOtaProvider ?? this.#disableOtaProvider; + this.#enableTimeSync = options.enableTimeSync ?? this.#enableTimeSync; } protected async initialize( @@ -192,6 +196,7 @@ export class MatterController { this.#controllerInstance, this.#env.vars.get("ble.enable", false), !this.#disableOtaProvider, + this.#enableTimeSync, ); this.#commandHandler.events.started.once(async () => { diff --git a/packages/ws-controller/src/controller/NodeProcessor.ts b/packages/ws-controller/src/controller/NodeProcessor.ts new file mode 100644 index 00000000..d5193b32 --- /dev/null +++ b/packages/ws-controller/src/controller/NodeProcessor.ts @@ -0,0 +1,105 @@ +/** + * @license + * Copyright 2025-2026 Open Home Foundation + * SPDX-License-Identifier: Apache-2.0 + */ + +import { CancelablePromise, Duration, Millis, Time, Timer } from "@matter/main"; +import { PeerAddress, PeerAddressSet } from "@matter/main/protocol"; + +/** + * Abstract base class for timer-driven periodic processing of registered nodes. + * Handles timer lifecycle, node registration, and the per-node processing loop + * with inter-node delay. Subclasses provide the actual processing logic. + */ +export abstract class NodeProcessor { + readonly #targetInterval: Duration; + readonly #timer: Timer; + #peers = new PeerAddressSet(); + #isProcessing = false; + #currentDelayPromise?: CancelablePromise; + #closed = false; + + constructor(timerName: string, initialDelay: number, targetInterval: number) { + this.#targetInterval = Millis(targetInterval); + this.#timer = Time.getTimer(timerName, Millis(initialDelay), () => void this.#processAll()); + } + + protected get closed(): boolean { + return this.#closed; + } + + /** Register a peer. Returns true if this was a new registration. */ + protected registerPeer(peer: PeerAddress): boolean { + const isNew = !this.#peers.has(peer); + this.#peers.add(peer); + return isNew; + } + + /** Unregister a peer. Stops the timer if no peers remain. Returns true if was registered. */ + protected unregisterPeer(peer: PeerAddress): boolean { + const removed = this.#peers.delete(peer); + if (removed && this.#peers.size === 0) { + this.#timer.stop(); + } + return removed; + } + + protected hasPeer(peer: PeerAddress): boolean { + return this.#peers.has(peer); + } + + /** Start the timer if there are registered peers and it is not already running. */ + protected scheduleIfNeeded(): void { + if (this.#peers.size === 0 || this.#closed) return; + if (this.#timer.isRunning || this.#isProcessing) return; + this.#timer.start(); + } + + async stop(): Promise { + this.#closed = true; + this.#currentDelayPromise?.cancel(new Error("Close")); + this.#timer.stop(); + } + + /** Returns false if this peer should be skipped during processing (e.g. not connected). */ + protected abstract shouldProcess(peer: PeerAddress): boolean; + + /** Perform work for a single peer. Must handle its own errors. */ + protected abstract processNode(peer: PeerAddress): Promise; + + /** Called after a full processing cycle completes. Override for cycle-complete logging. */ + protected onCycleComplete(_processedCount: number, _intervalFormatted: string): void {} + + async #processAll(): Promise { + if (this.#isProcessing) return; + + if (this.#timer.interval !== this.#targetInterval) { + this.#timer.interval = this.#targetInterval; + } + + this.#isProcessing = true; + let processedCount = 0; + + try { + const peers = Array.from(this.#peers); + for (let i = 0; i < peers.length; i++) { + const peer = peers[i]; + if (!this.#peers.has(peer) || !this.shouldProcess(peer)) continue; + processedCount++; + await this.processNode(peer); + if (i < peers.length - 1 && !this.#closed) { + this.#currentDelayPromise = Time.sleep("node-processor-delay", Millis(2_000)).finally(() => { + this.#currentDelayPromise = undefined; + }); + await this.#currentDelayPromise; + } + } + } finally { + this.#isProcessing = false; + this.scheduleIfNeeded(); + } + + this.onCycleComplete(processedCount, Duration.format(this.#timer.interval)); + } +} diff --git a/packages/ws-controller/src/controller/TimeSyncManager.ts b/packages/ws-controller/src/controller/TimeSyncManager.ts new file mode 100644 index 00000000..a580df6c --- /dev/null +++ b/packages/ws-controller/src/controller/TimeSyncManager.ts @@ -0,0 +1,159 @@ +/** + * @license + * Copyright 2025-2026 Open Home Foundation + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Handles time synchronization for nodes with the TimeSynchronization cluster. + * Syncs UTC time on two triggers: + * 1. Node connects/reconnects after startup (immediate, once startup window has elapsed) + * 2. Periodic resync every 24 hours + * + * A startup window of 30–60 minutes prevents syncing during initial node connection + * while the host's NTP is still stabilizing. This manager must only be enabled when + * the host time source is known to be reliable (see --enable-time-sync CLI flag). + */ + +import { Duration, Hours, Logger, Minutes } from "@matter/main"; +import { PeerAddress, PeerAddressMap } from "@matter/main/protocol"; +import { AttributesData } from "../types/CommandHandler.js"; +import { formatNodeId } from "../util/formatNodeId.js"; +import { NodeProcessor } from "./NodeProcessor.js"; + +const logger = Logger.get("TimeSyncManager"); + +// TimeSynchronization cluster ID (0x0038 = 56 decimal) +const TIME_SYNC_CLUSTER_ID = 0x0038; + +// Periodic resync interval: 24 hours +const RESYNC_INTERVAL = Hours(24); + +export interface TimeSyncConnector { + syncTime(peer: PeerAddress): Promise; + nodeConnected(peer: PeerAddress): boolean; +} + +/** + * Check if a node has the TimeSynchronization cluster based on its attribute cache. + * The cluster is always on endpoint 0 per the Matter spec. + */ +export function hasTimeSyncCluster(attributes: AttributesData): boolean { + const prefix = `0/${TIME_SYNC_CLUSTER_ID}/`; + for (const key of Object.keys(attributes)) { + if (key.startsWith(prefix)) { + return true; + } + } + return false; +} + +/** + * Manages time synchronization for nodes with the TimeSynchronization cluster. + */ +export class TimeSyncManager extends NodeProcessor { + readonly #connector: TimeSyncConnector; + // Tracks in-flight immediate syncs per node to prevent parallel syncs + #inFlightSyncs = new PeerAddressMap>(); + // True after the first periodic resync cycle, enabling immediate syncs on reconnect + #startupComplete = false; + + constructor(connector: TimeSyncConnector) { + // Startup window: random 30–60 minutes to stagger across server restarts and + // allow NTP to stabilize before pushing time to devices + const startupDelayMs = Minutes(30) + Math.random() * Minutes(30); + super("time-sync-resync", startupDelayMs, RESYNC_INTERVAL); + this.#connector = connector; + } + + /** + * Register a node for time sync if it has the TimeSynchronization cluster. + * Call this after a node connects and its attributes are available. + * Immediate sync is skipped during the startup window to avoid traffic while + * the server is initializing all nodes. + */ + registerNode(peer: PeerAddress, attributes: AttributesData): void { + if (!hasTimeSyncCluster(attributes)) { + this.unregisterNode(peer); + return; + } + + if (this.registerPeer(peer)) { + logger.info(`Registered node ${formatNodeId(peer)} for time synchronization`); + } + + // Only sync immediately if the startup window has elapsed. During startup, + // the first periodic resync handles all nodes once NTP has stabilized. + if (this.#startupComplete) { + this.syncNode(peer); + } + + this.scheduleIfNeeded(); + } + + /** + * Unregister a node from time sync tracking. + */ + unregisterNode(peer: PeerAddress): void { + if (this.unregisterPeer(peer)) { + logger.info(`Unregistered node ${formatNodeId(peer)} from time synchronization`); + } + } + + /** + * Trigger an immediate time sync for a node (fire-and-forget with deduplication). + * Called externally when a timeFailure event is received from the node. + */ + syncNode(peer: PeerAddress): void { + if (this.closed || !this.hasPeer(peer) || !this.#connector.nodeConnected(peer)) return; + if (this.#inFlightSyncs.has(peer)) { + logger.debug(`Time sync already in progress for node ${formatNodeId(peer)}, skipping`); + return; + } + const promise = this.#connector + .syncTime(peer) + .then(() => logger.info(`Synced time on node ${formatNodeId(peer)}`)) + .catch(error => logger.warn(`Failed to sync time on node ${formatNodeId(peer)}:`, error)) + .finally(() => { + this.#inFlightSyncs.delete(peer); + }); + this.#inFlightSyncs.set(peer, promise); + } + + /** For testing: advance past the startup window to enable immediate syncs. */ + completeStartup(): void { + this.#startupComplete = true; + } + + override async stop(): Promise { + await super.stop(); + await Promise.allSettled(this.#inFlightSyncs.values()); + this.#inFlightSyncs.clear(); + logger.info("Time sync manager stopped"); + } + + protected override shouldProcess(peer: PeerAddress): boolean { + return this.#connector.nodeConnected(peer) && !this.#inFlightSyncs.has(peer); + } + + protected override async processNode(peer: PeerAddress): Promise { + try { + await this.#connector.syncTime(peer); + logger.info(`Periodic resync: synced time on node ${formatNodeId(peer)}`); + } catch (error) { + logger.warn(`Periodic resync: failed to sync time on node ${formatNodeId(peer)}:`, error); + } + } + + protected override onCycleComplete(processedCount: number, _intervalFormatted: string): void { + if (!this.#startupComplete) { + this.#startupComplete = true; + logger.info("Time sync startup window complete, immediate syncs enabled on reconnect"); + } + if (processedCount > 0) { + logger.info( + `Periodic resync complete: synced ${processedCount} nodes. Next resync in ${Duration.format(RESYNC_INTERVAL)}`, + ); + } + } +} From 32a81ef1dc64a1ef1a3f65f0ecfb8b9387b865b8 Mon Sep 17 00:00:00 2001 From: Mark van Proctor <6936351+markvp@users.noreply.github.com> Date: Sat, 2 May 2026 20:19:44 -0700 Subject: [PATCH 2/5] Fix #syncNodeTime to use invokeCommand instead of non-existent clusterClientByIdFor Co-Authored-By: Claude Sonnet 4.6 --- .../src/controller/ControllerCommandHandler.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/ws-controller/src/controller/ControllerCommandHandler.ts b/packages/ws-controller/src/controller/ControllerCommandHandler.ts index 10f09d8a..2750a07b 100644 --- a/packages/ws-controller/src/controller/ControllerCommandHandler.ts +++ b/packages/ws-controller/src/controller/ControllerCommandHandler.ts @@ -311,11 +311,15 @@ export class ControllerCommandHandler { * Send a setUtcTime command to a node's TimeSynchronization cluster. */ async #syncNodeTime(nodeId: NodeId): Promise { - const client = this.#nodes.clusterClientByIdFor(nodeId, EndpointNumber(0), TimeSynchronization.Cluster.id); - await client.commands.setUtcTime({ - utcTime: Time.nowMs * 1000, - granularity: TimeSynchronization.Granularity.MillisecondsGranularity, - timeSource: TimeSynchronization.TimeSource.Admin, + await this.#invokeCommand(this.#nodes.get(nodeId).node, { + endpoint: EndpointNumber(0), + cluster: TimeSynchronization.Cluster, + command: "setUtcTime", + fields: { + utcTime: Time.nowMs * 1000, + granularity: TimeSynchronization.Granularity.MillisecondsGranularity, + timeSource: TimeSynchronization.TimeSource.Admin, + }, }); } From 78d1d3a9a72ef9384ff969caafe8fa645036a380 Mon Sep 17 00:00:00 2001 From: Mark van Proctor <6936351+markvp@users.noreply.github.com> Date: Sat, 2 May 2026 20:31:38 -0700 Subject: [PATCH 3/5] Refactor CustomClusterPoller to extend NodeProcessor base class Eliminates duplicated timer/loop boilerplate (~50 lines) by extending the shared NodeProcessor abstraction. Per-node attribute paths stay in the subclass; shouldProcess, processNode, and onCycleComplete implement the polling logic. Co-Authored-By: Claude Sonnet 4.6 --- .../src/controller/CustomClusterPoller.ts | 135 ++++-------------- 1 file changed, 29 insertions(+), 106 deletions(-) diff --git a/packages/ws-controller/src/controller/CustomClusterPoller.ts b/packages/ws-controller/src/controller/CustomClusterPoller.ts index 92b2eec2..800b6a6e 100644 --- a/packages/ws-controller/src/controller/CustomClusterPoller.ts +++ b/packages/ws-controller/src/controller/CustomClusterPoller.ts @@ -10,10 +10,11 @@ * a custom cluster without standard Matter subscription support. */ -import { CancelablePromise, Duration, Logger, Millis, Time, Timer } from "@matter/main"; +import { Logger } from "@matter/main"; import { PeerAddress, PeerAddressMap } from "@matter/main/protocol"; import { AttributesData } from "../types/CommandHandler.js"; import { formatNodeId } from "../util/formatNodeId.js"; +import { NodeProcessor } from "./NodeProcessor.js"; const logger = Logger.get("CustomClusterPoller"); @@ -37,8 +38,8 @@ const ELECTRICAL_POWER_MEASUREMENT_CLUSTER_ID = 0x0090; // 144 // Polling interval in milliseconds (60 seconds) const POLLING_INTERVAL_MS = 60_000; -// Maximum initial delay in milliseconds (random 30-60s to stagger startup) -const MAX_INITIAL_DELAY_MS = 30_000; +// Initial delay range: random 30-60s to stagger startup +const INITIAL_DELAY_MS = 30_000; // Attribute path format: endpoint/cluster/attribute type AttributePath = string; @@ -117,19 +118,14 @@ export function checkPolledAttributes(attributes: AttributesData): Set>(); - #pollerTimer: Timer; - #attributeReader: NodeAttributeReader; - #isPolling = false; - #currentDelayPromise?: CancelablePromise; + readonly #attributeReader: NodeAttributeReader; #currentReadPromise?: Promise; - #closed = false; constructor(attributeReader: NodeAttributeReader) { + super("eve-poller", INITIAL_DELAY_MS + Math.random() * INITIAL_DELAY_MS, POLLING_INTERVAL_MS); this.#attributeReader = attributeReader; - const delay = Millis(MAX_INITIAL_DELAY_MS + Math.random() * MAX_INITIAL_DELAY_MS); - this.#pollerTimer = Time.getTimer("eve-poller", delay, () => this.#pollAllNodes()); } /** @@ -140,126 +136,47 @@ export class CustomClusterPoller { const attributesToPoll = checkPolledAttributes(attributes); if (attributesToPoll.size === 0) { - // Remove from polling if it was previously registered this.unregisterNode(peer); return; } this.#polledAttributes.set(peer, attributesToPoll); - logger.info( - `Registered node ${formatNodeId(peer)} for custom attribute polling: ${Array.from(attributesToPoll).join(", ")}`, - ); + if (this.registerPeer(peer)) { + logger.info( + `Registered node ${formatNodeId(peer)} for custom attribute polling: ${Array.from(attributesToPoll).join(", ")}`, + ); + } - // Start the poller if not already running - this.#schedulePoller(); + this.scheduleIfNeeded(); } /** * Unregister a node from polling (e.g., when decommissioned or disconnected). */ unregisterNode(peer: PeerAddress): void { - if (this.#polledAttributes.delete(peer)) { + this.#polledAttributes.delete(peer); + if (this.unregisterPeer(peer)) { logger.info(`Unregistered node ${formatNodeId(peer)} from custom attribute polling`); } - if (this.#polledAttributes.size === 0) { - this.#pollerTimer.stop(); - } } - /** - * Stop all polling and cleanup. Awaits any in-flight read operation. - */ - async stop(): Promise { - this.#closed = true; - this.#currentDelayPromise?.cancel(new Error("Close")); - this.#pollerTimer?.stop(); - this.#polledAttributes.clear(); + override async stop(): Promise { + await super.stop(); if (this.#currentReadPromise) { await this.#currentReadPromise; } + this.#polledAttributes.clear(); logger.info("Custom attribute poller stopped"); } - /** - * Schedule the next polling cycle. - * Uses a random initial delay (0-30s) on first run to stagger startup, - * then polls every 30s thereafter. - */ - #schedulePoller(): void { - // No schedule if no nodes to poll - if (this.#polledAttributes.size === 0 || this.#closed) { - return; - } - - // Don't schedule if already scheduled - if (this.#pollerTimer?.isRunning || this.#isPolling) { - return; - } - - // Set the new interval - this.#pollerTimer.start(); + protected override shouldProcess(peer: PeerAddress): boolean { + return this.#attributeReader.nodeConnected(peer); } - /** - * Poll all registered nodes for their custom attributes. - */ - async #pollAllNodes(): Promise { - if (this.#isPolling) { - // Already polling, schedule next cycle - return; - } + protected override async processNode(peer: PeerAddress): Promise { + const attributePaths = this.#polledAttributes.get(peer); + if (!attributePaths) return; - const targetInterval = Millis(POLLING_INTERVAL_MS); - if (this.#pollerTimer.interval !== targetInterval) { - this.#pollerTimer.interval = targetInterval; - } - - this.#isPolling = true; - - let polledNodes = 0; - try { - const entries = Array.from(this.#polledAttributes.entries()); - for (let i = 0; i < entries.length; i++) { - if (this.#closed) { - break; - } - const [peer, attributePaths] = entries[i]; - if (!this.#polledAttributes.has(peer)) { - // Node was removed, so skip it - continue; - } - polledNodes++; - await this.#pollNode(peer, attributePaths); - // Small delay between nodes to avoid overwhelming the network - // Only add this delay if there are more nodes remaining to be polled - if (i < entries.length - 1) { - this.#currentDelayPromise = Time.sleep("sleep", Millis(2_000)).finally(() => { - this.#currentDelayPromise = undefined; - }); - await this.#currentDelayPromise; - } - } - } finally { - this.#isPolling = false; - // Schedule next polling cycle - this.#schedulePoller(); - } - if (polledNodes > 0) { - logger.info( - `Polled ${polledNodes} nodes for energy data. Scheduling next poll in ${Duration.format(this.#pollerTimer.interval)}`, - ); - } - } - - /** - * Poll a single node for its custom attributes. - * The read will automatically trigger change events through the normal attribute flow. - */ - async #pollNode(peer: PeerAddress, attributePaths: Set): Promise { - if (!this.#attributeReader.nodeConnected(peer)) { - logger.debug(`Node ${formatNodeId(peer)} not connected, skipping custom attribute polling`); - return; - } const paths = Array.from(attributePaths); logger.debug(`Polling ${paths.length} custom attributes for node ${formatNodeId(peer)}`); @@ -278,4 +195,10 @@ export class CustomClusterPoller { this.#currentReadPromise = undefined; } } + + protected override onCycleComplete(processedCount: number, intervalFormatted: string): void { + if (processedCount > 0) { + logger.info(`Polled ${processedCount} nodes for energy data. Next poll in ${intervalFormatted}`); + } + } } From 7a0e3e34861bdca89dd2dd6547518fcde9bf7fe7 Mon Sep 17 00:00:00 2001 From: Mark van Proctor <6936351+markvp@users.noreply.github.com> Date: Sat, 2 May 2026 20:44:08 -0700 Subject: [PATCH 4/5] Add TimeSyncManagerTest covering hasTimeSyncCluster and TimeSyncManager Tests cover: registration (startup window guard, connectivity check, cluster detection), syncNode (immediate fire, deduplication, in-flight tracking), unregisterNode, periodic resync via NodeProcessor timer (startup delay, 24h cycle, skip disconnected/in-flight peers), and stop() awaiting in-flight syncs. Co-Authored-By: Claude Sonnet 4.6 --- .../ws-controller/test/TimeSyncManagerTest.ts | 299 ++++++++++++++++++ 1 file changed, 299 insertions(+) create mode 100644 packages/ws-controller/test/TimeSyncManagerTest.ts diff --git a/packages/ws-controller/test/TimeSyncManagerTest.ts b/packages/ws-controller/test/TimeSyncManagerTest.ts new file mode 100644 index 00000000..ec20f2b6 --- /dev/null +++ b/packages/ws-controller/test/TimeSyncManagerTest.ts @@ -0,0 +1,299 @@ +/** + * @license + * Copyright 2025-2026 Open Home Foundation + * SPDX-License-Identifier: Apache-2.0 + */ + +import { FabricIndex, NodeId } from "@matter/main"; +import { PeerAddress, PeerAddressSet } from "@matter/main/protocol"; +import { hasTimeSyncCluster, TimeSyncConnector, TimeSyncManager } from "../src/controller/TimeSyncManager.js"; +import { AttributesData } from "../src/types/CommandHandler.js"; + +const TIME_SYNC_CLUSTER_ID = 0x0038; // 56 decimal +const ONE_MINUTE_MS = 60_000; +const ONE_DAY_MS = 24 * 60 * ONE_MINUTE_MS; + +// Startup delay is random 30–60 min; advancing 61 min always fires it +const PAST_STARTUP_MS = 61 * ONE_MINUTE_MS; + +const PEER_1 = PeerAddress({ fabricIndex: FabricIndex(1), nodeId: NodeId(1) }); +const PEER_2 = PeerAddress({ fabricIndex: FabricIndex(1), nodeId: NodeId(2) }); + +function makeTimeSyncAttrs(): AttributesData { + return { [`0/${TIME_SYNC_CLUSTER_ID}/0`]: 1 }; +} + +class StubConnector implements TimeSyncConnector { + readonly syncCalls: PeerAddress[] = []; + private readonly _connected = new PeerAddressSet(); + slowSync = false; + readonly syncResolvers: Array<() => void> = []; + + setConnected(peer: PeerAddress): void { + this._connected.add(peer); + } + + nodeConnected(peer: PeerAddress): boolean { + return this._connected.has(peer); + } + + async syncTime(peer: PeerAddress): Promise { + if (this.slowSync) { + await new Promise(resolve => this.syncResolvers.push(resolve)); + } + this.syncCalls.push(peer); + } + + resolveAll(): void { + const resolvers = this.syncResolvers.splice(0); + resolvers.forEach(r => r()); + } +} + +describe("hasTimeSyncCluster", () => { + it("returns true when TimeSynchronization cluster attributes are present", () => { + expect(hasTimeSyncCluster({ [`0/${TIME_SYNC_CLUSTER_ID}/0`]: 1 })).to.equal(true); + }); + + it("returns true for any attribute index on the cluster", () => { + expect(hasTimeSyncCluster({ [`0/${TIME_SYNC_CLUSTER_ID}/255`]: "x" })).to.equal(true); + }); + + it("returns false when no attributes are present", () => { + expect(hasTimeSyncCluster({})).to.equal(false); + }); + + it("returns false for attributes on a different cluster", () => { + expect(hasTimeSyncCluster({ "0/40/0": 1 })).to.equal(false); + }); + + it("only matches endpoint 0 per Matter spec", () => { + expect(hasTimeSyncCluster({ [`1/${TIME_SYNC_CLUSTER_ID}/0`]: 1 })).to.equal(false); + }); +}); + +describe("TimeSyncManager", () => { + let connector: StubConnector; + let manager: TimeSyncManager; + + beforeEach(() => { + MockTime.reset(); + connector = new StubConnector(); + manager = new TimeSyncManager(connector); + }); + + afterEach(async () => { + connector.resolveAll(); // unblock any pending slow syncs so stop() doesn't hang + await manager.stop(); + }); + + describe("registerNode", () => { + it("does not sync during the startup window even when connected", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("does not sync when node is not connected, even after startup", async () => { + manager.completeStartup(); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("syncs immediately once startupComplete is set and node is connected", async () => { + connector.setConnected(PEER_1); + manager.completeStartup(); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + }); + + it("does not sync for a node without the TimeSynchronization cluster", async () => { + connector.setConnected(PEER_1); + manager.completeStartup(); + manager.registerNode(PEER_1, { "0/40/0": 1 }); // no time sync cluster + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("unregisters the peer when re-registered without TimeSynchronization cluster", async () => { + manager.registerNode(PEER_1, makeTimeSyncAttrs()); // register + manager.registerNode(PEER_1, { "0/40/0": 1 }); // no longer has cluster + + manager.completeStartup(); + connector.setConnected(PEER_1); + manager.syncNode(PEER_1); // should be no-op since unregistered + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + }); + + describe("syncNode", () => { + beforeEach(() => { + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.completeStartup(); + }); + + it("calls syncTime when peer is registered and connected", async () => { + connector.setConnected(PEER_1); + manager.syncNode(PEER_1); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + }); + + it("does not call syncTime when peer is not connected", async () => { + manager.syncNode(PEER_1); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("does not call syncTime for an unregistered peer", async () => { + connector.setConnected(PEER_2); + manager.syncNode(PEER_2); // PEER_2 not registered + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + + it("deduplicates when a sync is already in flight", async () => { + connector.slowSync = true; + connector.setConnected(PEER_1); + + manager.syncNode(PEER_1); // starts in-flight sync + manager.syncNode(PEER_1); // duplicate — dropped + manager.syncNode(PEER_1); // duplicate — dropped + + connector.resolveAll(); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + }); + + it("allows a new sync after the previous one completes", async () => { + connector.slowSync = true; + connector.setConnected(PEER_1); + + manager.syncNode(PEER_1); + connector.resolveAll(); + await MockTime.yield3(); + + manager.syncNode(PEER_1); + connector.resolveAll(); + await MockTime.yield3(); + + expect(connector.syncCalls.length).to.equal(2); + }); + }); + + describe("unregisterNode", () => { + it("makes syncNode a no-op for the removed peer", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.unregisterNode(PEER_1); + manager.completeStartup(); + manager.syncNode(PEER_1); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(0); + }); + }); + + describe("periodic resync (via NodeProcessor timer)", () => { + it("syncs connected nodes after the startup delay fires", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + + await MockTime.advance(PAST_STARTUP_MS); + await MockTime.yield3(); + + expect(connector.syncCalls.length).to.equal(1); + }); + + it("skips disconnected nodes during resync", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.registerNode(PEER_2, makeTimeSyncAttrs()); // PEER_2 not connected + + await MockTime.advance(PAST_STARTUP_MS); + await MockTime.yield3(); + + expect(connector.syncCalls.length).to.equal(1); + expect(connector.syncCalls[0]).to.deep.equal(PEER_1); + }); + + it("skips peers that already have an in-flight sync", async () => { + connector.slowSync = true; + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.completeStartup(); + + manager.syncNode(PEER_1); // start in-flight sync + await MockTime.yield(); + + await MockTime.advance(PAST_STARTUP_MS); // periodic cycle fires + await MockTime.yield3(); + + // Only 1 syncTime call total — the periodic cycle skipped PEER_1 + connector.resolveAll(); + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + }); + + it("enables immediate syncs on reconnect after the startup cycle completes", async () => { + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + + await MockTime.advance(PAST_STARTUP_MS); // first cycle, no connected nodes + await MockTime.yield3(); + + // After startup, re-registering a connected node syncs immediately + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await MockTime.yield3(); + + expect(connector.syncCalls.length).to.equal(1); + }); + + it("resyncs again after 24 hours", async () => { + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + + await MockTime.advance(PAST_STARTUP_MS); // first cycle + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(1); + + await MockTime.advance(ONE_DAY_MS); // 24h resync + await MockTime.yield3(); + expect(connector.syncCalls.length).to.equal(2); + }); + }); + + describe("stop", () => { + it("completes cleanly when no nodes are registered", async () => { + await manager.stop(); + }); + + it("completes cleanly when nodes are registered but no syncs are in flight", async () => { + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + await manager.stop(); + }); + + it("awaits in-flight syncs before completing", async () => { + connector.slowSync = true; + connector.setConnected(PEER_1); + manager.registerNode(PEER_1, makeTimeSyncAttrs()); + manager.completeStartup(); + manager.syncNode(PEER_1); + + let stopped = false; + const stopPromise = manager.stop().then(() => { + stopped = true; + }); + + await MockTime.yield(); + expect(stopped).to.equal(false); // still waiting on in-flight sync + + connector.resolveAll(); + await stopPromise; + expect(stopped).to.equal(true); + }); + }); +}); From 41d31c69628debbb9687fc64c0592e72d38f3517 Mon Sep 17 00:00:00 2001 From: Mark van Proctor <6936351+markvp@users.noreply.github.com> Date: Sun, 3 May 2026 00:38:29 -0700 Subject: [PATCH 5/5] fix: floor startup delay to integer ms to satisfy MockTimer Math.random() produces a float; @matter/testing's MockTimer requires integer milliseconds and throws on fractional values. Co-Authored-By: Claude Sonnet 4.6 --- packages/ws-controller/src/controller/TimeSyncManager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/ws-controller/src/controller/TimeSyncManager.ts b/packages/ws-controller/src/controller/TimeSyncManager.ts index a580df6c..d152c61e 100644 --- a/packages/ws-controller/src/controller/TimeSyncManager.ts +++ b/packages/ws-controller/src/controller/TimeSyncManager.ts @@ -61,7 +61,7 @@ export class TimeSyncManager extends NodeProcessor { constructor(connector: TimeSyncConnector) { // Startup window: random 30–60 minutes to stagger across server restarts and // allow NTP to stabilize before pushing time to devices - const startupDelayMs = Minutes(30) + Math.random() * Minutes(30); + const startupDelayMs = Minutes(30) + Math.floor(Math.random() * Minutes(30)); super("time-sync-resync", startupDelayMs, RESYNC_INTERVAL); this.#connector = connector; }