Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/matter-server/src/MatterServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ async function start() {
disableOtaProvider: cliOptions.disableOta,
serverId: legacyData.serverId,
serverVersion: MATTER_SERVER_VERSION,
enableTimeSync: cliOptions.enableTimeSync,
},
legacyServerData,
);
Expand Down
14 changes: 14 additions & 0 deletions packages/matter-server/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export interface CliOptions {
disableOta: boolean;
otaProviderDir: string | null;

// Time synchronization configuration
enableTimeSync: boolean;

// Dashboard configuration
disableDashboard: boolean;
productionMode: boolean;
Expand Down Expand Up @@ -164,6 +167,16 @@ export function parseCliArgs(argv?: string[]): CliOptions {
.env("DISABLE_OTA"),
)
.addOption(new Option("--ota-provider-dir <path>", "Directory for OTA Provider files").env("OTA_PROVIDER_DIR"))
.addOption(
new Option(
"--enable-time-sync [value]",
"Enable time synchronization for nodes with the TimeSynchronization cluster. Only enable when host NTP is reliable.",
)
.argParser(parseBooleanEnv)
.preset(true)
.default(false)
.env("ENABLE_TIME_SYNC"),
)
.addOption(
new Option("--disable-dashboard [value]", "Disable the web dashboard")
.argParser(parseBooleanEnv)
Expand Down Expand Up @@ -245,6 +258,7 @@ export function parseCliArgs(argv?: string[]): CliOptions {
bluetoothAdapter: opts.bluetoothAdapter ?? null,
disableOta: opts.disableOta,
otaProviderDir: opts.otaProviderDir ?? null,
enableTimeSync: opts.enableTimeSync,
disableDashboard: opts.disableDashboard,
productionMode: opts.productionMode,
};
Expand Down
88 changes: 78 additions & 10 deletions packages/ws-controller/src/controller/ControllerCommandHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
Minutes,
NodeId,
Observable,
ObserverGroup,
Seconds,
ServerAddress,
ServerAddressUdp,
Expand All @@ -35,6 +36,7 @@ import {
BridgedDeviceBasicInformation,
GeneralCommissioning,
OperationalCredentials,
TimeSynchronization,
} from "@matter/main/clusters";
import {
DecodedAttributeReportValue,
Expand Down Expand Up @@ -99,12 +101,17 @@ import { formatNodeId } from "../util/formatNodeId.js";
import { pingIp } from "../util/network.js";
import { CustomClusterPoller } from "./CustomClusterPoller.js";
import { Nodes } from "./Nodes.js";
import { TimeSyncManager } from "./TimeSyncManager.js";

const logger = Logger.get("ControllerCommandHandler");

/** After this duration in Reconnecting state, declare the node unavailable */
const RECONNECT_TIMEOUT = Minutes(3);

// TimeSynchronization cluster ID (0x0038) and its timeFailure event ID (0x03).
// Used to recognize incoming timeFailure events and trigger a time resync
// via the TimeSyncManager.
const TIME_SYNC_CLUSTER_ID = 0x0038;
const TIME_FAILURE_EVENT_ID = 0x03;

/**
* Cluster IDs whose attribute changes should trigger a full node_updated broadcast.
* BasicInformation (0x28) covers firmware version, product name, etc.
Expand Down Expand Up @@ -156,6 +163,10 @@ export class ControllerCommandHandler {
#availableUpdates = new Map<NodeId, SoftwareUpdateInfo>();
/** Poller for custom cluster attributes (Eve energy, etc.) */
#customClusterPoller: CustomClusterPoller;
/** Manages time synchronization for nodes with the TimeSynchronization cluster */
#timeSyncManager?: TimeSyncManager;
/** Per-node ObserverGroups for cleanup on decommission */
#nodeObservers = new Map<NodeId, ObserverGroup>();
/** Per-node timers that fire when Reconnecting state exceeds the timeout */
#reconnectTimers = new Map<NodeId, Timer>();
/** Track in-flight invoke-commands for deduplication across all WebSocket connections */
Expand All @@ -175,7 +186,12 @@ export class ControllerCommandHandler {
};
#peers?: PeerSet;

constructor(controllerInstance: CommissioningController, bleEnabled: boolean, otaEnabled: boolean) {
constructor(
controllerInstance: CommissioningController,
bleEnabled: boolean,
otaEnabled: boolean,
timeSyncEnabled = false,
) {
this.#controller = controllerInstance;

this.#bleEnabled = bleEnabled;
Expand All @@ -189,6 +205,14 @@ export class ControllerCommandHandler {
handleReadAttributes: (peer, paths, fabricFiltered) =>
this.handleReadAttributes(peer.nodeId, paths, fabricFiltered),
});

if (timeSyncEnabled) {
logger.info("Time synchronization enabled");
this.#timeSyncManager = new TimeSyncManager({
syncTime: peer => this.#syncNodeTime(peer.nodeId),
nodeConnected: peer => !!(this.#nodes.has(peer.nodeId) && this.#nodes.get(peer.nodeId).isConnected),
});
}
}

/**
Expand Down Expand Up @@ -283,12 +307,33 @@ export class ControllerCommandHandler {
}
}

/**
 * Push the controller's current UTC time to a node by invoking the
 * setUtcTime command of the TimeSynchronization cluster on endpoint 0.
 *
 * Wired into TimeSyncManager as its syncTime callback (see constructor),
 * so it runs on node registration / periodic resync and when the node
 * reports a timeFailure event.
 *
 * @param nodeId - Target node. Assumes the node is present in this.#nodes;
 *   the lookup is not null-checked, so an unknown nodeId will throw.
 */
async #syncNodeTime(nodeId: NodeId): Promise<void> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's more needed for it ... check home-assistant/core#166133 which basically implements everything correctly ... and we might also need a trigger on DST changes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reference. The HA PR uses ObserverGroup for event cleanup (which we've now adopted). Regarding DST changes: the 24-hour periodic resync is sufficient for DST correction. Pushing UTC epoch time to Matter nodes means DST is irrelevant on the device side — the device converts UTC to local time itself. No additional DST trigger is needed.

await this.#invokeCommand(this.#nodes.get(nodeId).node, {
// TimeSynchronization is served from the root endpoint
endpoint: EndpointNumber(0),
cluster: TimeSynchronization.Cluster,
command: "setUtcTime",
fields: {
// NOTE(review): Time.nowMs * 1000 yields microseconds since the Unix
// epoch (1970-01-01), while the Matter UTCTime type counts microseconds
// since the Matter epoch (2000-01-01). Confirm matter.js's TLV epoch-us
// encoding performs this conversion; otherwise the pushed time is off
// by ~30 years.
utcTime: Time.nowMs * 1000,
// Source resolution is milliseconds (Time.nowMs), hence this granularity
granularity: TimeSynchronization.Granularity.MillisecondsGranularity,
// Admin: presumably "time set administratively by the controller" —
// verify against the Matter spec TimeSourceEnum.
timeSource: TimeSynchronization.TimeSource.Admin,
},
});
}

async close() {
for (const timer of this.#reconnectTimers.values()) {
timer.stop();
}
this.#reconnectTimers.clear();
await this.#customClusterPoller.stop();
await this.#timeSyncManager?.stop();
for (const observers of this.#nodeObservers.values()) {
observers.close();
}
this.#nodeObservers.clear();
if (!this.#started) {
return;
}
Expand All @@ -299,11 +344,15 @@ export class ControllerCommandHandler {
const node = await this.#controller.getNode(nodeId);
const attributeCache = this.#nodes.attributeCache;

// Per-node ObserverGroup so all subscriptions are cleaned up on decommission
const nodeObservers = new ObserverGroup();
this.#nodeObservers.set(nodeId, nodeObservers);

// Wire all Events to the Event emitters
// Track if a BasicInformation or BridgedDeviceBasicInformation attribute changed during
// a subscription batch. When the batch ends (connectionAlive), emit a full node_updated.
let basicInfoChangedInBatch = false;
node.events.attributeChanged.on(data => {
nodeObservers.on(node.events.attributeChanged, data => {
// Update the attribute cache with the new value in WebSocket format
attributeCache.updateAttribute(nodeId, data);
// Then emit the event for listeners
Expand All @@ -313,21 +362,33 @@ export class ControllerCommandHandler {
basicInfoChangedInBatch = true;
}
});
node.events.connectionAlive.on(() => {
nodeObservers.on(node.events.connectionAlive, () => {
if (basicInfoChangedInBatch) {
basicInfoChangedInBatch = false;
logger.info(`Node ${this.formatNode(nodeId)} basic information changed, sending full node_updated`);
this.events.nodeStructureChanged.emit(nodeId);
}
});
node.events.eventTriggered.on(data => this.events.eventChanged.emit(nodeId, data));
node.events.stateChanged.on(state => {
nodeObservers.on(node.events.eventTriggered, data => {
this.events.eventChanged.emit(nodeId, data);
// Filter timeFailure events to trigger time sync
if (
this.#timeSyncManager !== undefined &&
data.path.clusterId === TIME_SYNC_CLUSTER_ID &&
data.path.eventId === TIME_FAILURE_EVENT_ID
) {
logger.debug(`Received timeFailure event from node ${this.formatNode(nodeId)}, triggering time sync`);
this.#timeSyncManager.syncNode(this.#peerOf(nodeId));
}
});
nodeObservers.on(node.events.stateChanged, state => {
// Only refresh cache on Connected state
if (state === NodeStates.Connected) {
attributeCache.update(node);
const attributes = attributeCache.get(nodeId);
if (attributes) {
this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes);
this.#timeSyncManager?.registerNode(this.#peerOf(nodeId), attributes);
}
}

Expand Down Expand Up @@ -367,7 +428,7 @@ export class ControllerCommandHandler {
this.events.nodeAvailabilityChanged.emit(nodeId, result.available);
}
});
node.events.structureChanged.on(() => {
nodeObservers.on(node.events.structureChanged, () => {
// Structure changed means endpoints may have been added/removed, refresh cache
if (node.isConnected) {
attributeCache.update(node);
Expand All @@ -380,12 +441,16 @@ export class ControllerCommandHandler {
this.events.nodeEndpointAdded.emit(nodeId, endpointId);
}
});
node.events.decommissioned.on(() => {
nodeObservers.on(node.events.decommissioned, () => {
this.#cleanupNodeAfterRemoval(nodeId);
this.events.nodeDecommissioned.emit(nodeId);
});
node.events.nodeEndpointAdded.on(endpointId => this.#nodes.queueEndpointAdded(nodeId, endpointId));
node.events.nodeEndpointRemoved.on(endpointId => this.events.nodeEndpointRemoved.emit(nodeId, endpointId));
nodeObservers.on(node.events.nodeEndpointAdded, endpointId =>
this.#nodes.queueEndpointAdded(nodeId, endpointId),
);
nodeObservers.on(node.events.nodeEndpointRemoved, endpointId =>
this.events.nodeEndpointRemoved.emit(nodeId, endpointId),
);

// Store the node for direct access
this.#nodes.set(nodeId, node);
Expand All @@ -395,10 +460,10 @@ export class ControllerCommandHandler {
// Initialize attribute cache if node is already initialized
if (node.initialized) {
attributeCache.add(node);
// Register for custom cluster polling (e.g., Eve energy)
const attributes = attributeCache.get(nodeId);
if (attributes) {
this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes);
this.#timeSyncManager?.registerNode(this.#peerOf(nodeId), attributes);
}
}

Expand Down Expand Up @@ -1051,8 +1116,11 @@ export class ControllerCommandHandler {
/**
 * Drop all per-node state after a node is removed or decommissioned:
 * reconnect timer, event ObserverGroup, cached node handle, custom-cluster
 * poller and time-sync registrations, and any pending software-update info.
 */
#cleanupNodeAfterRemoval(nodeId: NodeId) {
// Stop and discard the Reconnecting-timeout timer, if one is running
this.#reconnectTimers.get(nodeId)?.stop();
this.#reconnectTimers.delete(nodeId);
// Close the node's ObserverGroup so all event subscriptions are released
this.#nodeObservers.get(nodeId)?.close();
this.#nodeObservers.delete(nodeId);
this.#nodes.delete(nodeId);
// NOTE(review): #peerOf is called after the #nodes entry is deleted —
// confirm #peerOf does not depend on #nodes, or unregistration may
// receive a stale/missing peer.
this.#customClusterPoller.unregisterNode(this.#peerOf(nodeId));
this.#timeSyncManager?.unregisterNode(this.#peerOf(nodeId));
this.#availableUpdates.delete(nodeId);
}

Expand Down
Loading