This repository was archived by the owner on Mar 30, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlightwave3.py
More file actions
461 lines (379 loc) · 19 KB
/
lightwave3.py
File metadata and controls
461 lines (379 loc) · 19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
"""
lightwave3.py - Python 3.11-compatible LightWaveRF Gen2 WebSocket client.
Drop-in replacement for lightwave2.LWLink2 as used by lwrf_pi.py.
Changes vs lightwave2.py
------------------------
1. @asyncio.coroutine / yield from removed (deleted in Python 3.11).
_consumer_handler is now a plain async def with await.
2. async_connect() is now iterative (not recursive) with a configurable
max_tries cap. Raises ConnectionError on exhaustion instead of looping
forever. Caller (LightwaveWorker.reconnect) will report the failure and
the watchdog will retry at a higher level.
3. _async_sendmessage guards against _websocket being None after a failed
reconnect attempt, raising ConnectionError instead of AttributeError.
4. Consumer task is started inside async_connect() via asyncio.create_task()
so it always has a proper Task context (required by Python 3.11 asyncio).
5. Feature objects (_LWRFFeature with .id and ._state) replace bare
[feature_id, value] lists. This matches the API that lwrf_pi.py
actually uses (feat.id, feat._state).
6. Callback signature: func(feature_name, feature_id, prev_value, new_value)
matching _on_state_change in lwrf_pi.py.
Roll back: change `import lightwave3 as lightwave2`
back to `import lightwave2`
in lwrf_pi.py.
"""
import asyncio
import json
import uuid
import logging
import aiohttp
_LOGGER = logging.getLogger(__name__)
AUTH_SERVER = "https://auth.lightwaverf.com/v2/lightwaverf/autouserlogin/lwapps"
TRANS_SERVER = "wss://v1-linkplus-app.lightwaverf.com"
VERSION = "1.6.8"
MAX_RETRIES = 5 # send retries per message
MAX_CONNECT_TRIES = 8 # connection attempts before giving up (~4 min of backoff)
# ------------------------------------------------------------------ #
# Wire protocol helpers #
# ------------------------------------------------------------------ #
class _LWRFMessage:
_tran_id = 0
_sender_id = str(uuid.uuid4())
def __init__(self, opclass: str, operation: str):
self._message = {
"class": opclass,
"operation": operation,
"version": 1,
"senderId": self._sender_id,
"transactionId": _LWRFMessage._tran_id,
"direction": "request",
"items": [],
}
_LWRFMessage._tran_id += 1
def additem(self, item: "_LWRFMessageItem"):
self._message["items"].append(item._item)
def json(self) -> str:
return json.dumps(self._message)
class _LWRFMessageItem:
_item_id = 0
def __init__(self, payload: dict = None):
self._item = {"itemId": _LWRFMessageItem._item_id, "payload": payload or {}}
_LWRFMessageItem._item_id += 1
# ------------------------------------------------------------------ #
# Domain model #
# ------------------------------------------------------------------ #
class _LWRFFeature:
"""One addressable capability on a device (switch, dimLevel, power…)."""
def __init__(self, feature_id: str, initial_state=None):
self.id = feature_id
self._state = initial_state
class _LWRFFeatureSet:
"""One logical device (socket, dimmer, sensor…)."""
def __init__(self):
self.featureset_id = None
self.name = None
self.product_code = None
self.features: dict[str, _LWRFFeature] = {}
self._switchable = False
self._dimmable = False
self._climate = False
self._gen2 = False
self._power_reporting = False
self._cover = False
def is_switch(self): return self._switchable and not self._dimmable
def is_light(self): return self._dimmable
def is_climate(self): return self._climate
def is_cover(self): return self._cover
def is_gen2(self): return self._gen2
def reports_power(self): return self._power_reporting
# ------------------------------------------------------------------ #
# Main client class #
# ------------------------------------------------------------------ #
class LWLink2:
"""
LightWaveRF Gen2 local-app WebSocket client.
Typical usage (mirrors lwrf_pi.py):
link = LWLink2(email, password)
await link.async_connect()
await link.async_get_hierarchy()
await link.async_register_callback(my_callback)
# my_callback(feature_name, feature_id, prev_value, new_value)
await link.async_turn_on_by_featureset_id(fs_id)
"""
def __init__(self, username: str = None, password: str = None):
self.featuresets: dict[str, _LWRFFeatureSet] = {}
self._username = username
self._password = password
self._authtoken = None
self._device_id = str(uuid.uuid4())
self._session: aiohttp.ClientSession | None = None
self._websocket = None
self._callbacks: list = []
self._group_ids: list = []
# Synchronise request/response within a single asyncio Task
self._transaction = None
self._waitingforresponse = asyncio.Event()
self._response = None
self._consumer_task: asyncio.Task | None = None
# ---------------------------------------------------------------- #
# Public API #
# ---------------------------------------------------------------- #
async def async_connect(self, max_tries: int = MAX_CONNECT_TRIES):
"""
Open WebSocket and authenticate.
Retries with exponential backoff (2s, 4s, 8s … capped at 120s).
Raises ConnectionError if every attempt fails.
"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
for attempt in range(max_tries):
try:
if not self._websocket or self._websocket.closed:
self._websocket = await self._session.ws_connect(TRANS_SERVER)
# Consumer must be running before _authenticate() so it can
# receive and dispatch the auth response message.
if self._consumer_task is None or self._consumer_task.done():
self._consumer_task = asyncio.create_task(
self._consumer_handler(), name="LWRFConsumer"
)
await self._authenticate()
_LOGGER.info("async_connect: connected (attempt %d)", attempt + 1)
return
except Exception as exc:
delay = min(2 ** (attempt + 1), 120)
_LOGGER.warning(
"async_connect: attempt %d/%d failed (%s). Retrying in %ds",
attempt + 1, max_tries, exc, delay,
)
await asyncio.sleep(delay)
raise ConnectionError(
f"async_connect: failed after {max_tries} attempts"
)
async def async_get_hierarchy(self):
"""Fetch full device/featureset hierarchy and populate self.featuresets."""
_LOGGER.debug("async_get_hierarchy: fetching root groups")
msg = _LWRFMessage("user", "rootGroups")
msg.additem(_LWRFMessageItem())
resp = await self._async_sendmessage(msg)
self._group_ids = []
for item in resp["items"]:
self._group_ids += item["payload"]["groupIds"]
await self._async_read_groups()
await self._async_update_featureset_states()
async def async_register_callback(self, callback):
"""
Register a state-change callback.
Signature: callback(feature_name: str, feature_id: str,
prev_value, new_value)
"""
self._callbacks.append(callback)
async def async_turn_on_by_featureset_id(self, featureset_id: str):
feat_id = self.featuresets[featureset_id].features["switch"].id
await self._async_write_feature(feat_id, 1)
async def async_turn_off_by_featureset_id(self, featureset_id: str):
feat_id = self.featuresets[featureset_id].features["switch"].id
await self._async_write_feature(feat_id, 0)
async def async_set_brightness_by_featureset_id(self, featureset_id: str, level: int):
feat_id = self.featuresets[featureset_id].features["dimLevel"].id
await self._async_write_feature(feat_id, level)
async def async_set_temperature_by_featureset_id(self, featureset_id: str, level: float):
feat_id = self.featuresets[featureset_id].features["targetTemperature"].id
await self._async_write_feature(feat_id, int(level * 10))
async def async_cover_open_by_featureset_id(self, featureset_id: str):
feat_id = self.featuresets[featureset_id].features["threeWayRelay"].id
await self._async_write_feature(feat_id, 1)
async def async_cover_close_by_featureset_id(self, featureset_id: str):
feat_id = self.featuresets[featureset_id].features["threeWayRelay"].id
await self._async_write_feature(feat_id, 2)
async def async_cover_stop_by_featureset_id(self, featureset_id: str):
feat_id = self.featuresets[featureset_id].features["threeWayRelay"].id
await self._async_write_feature(feat_id, 0)
# ---- helpers kept for any caller that still uses the sync variants ----
def get_switches(self):
return [(fs.featureset_id, fs.name)
for fs in self.featuresets.values() if fs.is_switch()]
def get_lights(self):
return [(fs.featureset_id, fs.name)
for fs in self.featuresets.values() if fs.is_light()]
def get_climates(self):
return [(fs.featureset_id, fs.name)
for fs in self.featuresets.values() if fs.is_climate()]
def get_covers(self):
return [(fs.featureset_id, fs.name)
for fs in self.featuresets.values() if fs.is_cover()]
# ---------------------------------------------------------------- #
# Internal — messaging #
# ---------------------------------------------------------------- #
async def _async_sendmessage(self, message: _LWRFMessage, _retry: int = 1):
if not self._websocket or self._websocket.closed:
_LOGGER.debug("_async_sendmessage: websocket closed, reconnecting")
await self.async_connect()
# Guard: if still not available after reconnect, fail loudly
if not self._websocket or self._websocket.closed:
raise ConnectionError(
"_async_sendmessage: websocket unavailable after reconnect"
)
_LOGGER.debug("Sending: %s", message.json())
await self._websocket.send_str(message.json())
self._transaction = message._message["transactionId"]
self._waitingforresponse.clear()
await self._waitingforresponse.wait()
if self._response:
return self._response
if _retry >= MAX_RETRIES:
raise ConnectionError("_async_sendmessage: MAX_RETRIES exceeded")
_LOGGER.debug("_async_sendmessage: no response, retry %d", _retry + 1)
await self.async_connect()
return await self._async_sendmessage(message, _retry + 1)
async def _authenticate(self):
if not self._authtoken:
await self._get_access_token()
msg = _LWRFMessage("user", "authenticate")
msg.additem(_LWRFMessageItem({
"token": self._authtoken,
"clientDeviceId": self._device_id,
}))
resp = await self._async_sendmessage(msg)
if not resp or resp["items"][0]["success"]:
return
err = resp["items"][0]["error"]
code = err.get("code")
text = str(err.get("message", ""))
if code in ("200", 200):
pass # "Already authenticated" — fine
elif code in (405, "405") or "expired" in text or "not valid" in text:
self._authtoken = None
await self._authenticate()
else:
_LOGGER.warning("Unhandled auth error: %s", err)
async def _get_access_token(self):
auth = {"email": self._username, "password": self._password, "version": VERSION}
async with self._session.post(
AUTH_SERVER, headers={"x-lwrf-appid": "ios-01"}, json=auth
) as req:
if req.status == 200:
self._authtoken = (await req.json())["tokens"]["access_token"]
else:
raise ConnectionError(
f"Authentication failed (HTTP {req.status}): {await req.text()}"
)
# ---------------------------------------------------------------- #
# Internal — consumer Task #
# ---------------------------------------------------------------- #
async def _consumer_handler(self):
"""
Receive loop. Runs as an asyncio Task (created inside async_connect
so it always has a proper Task context). Handles reconnect by
scheduling a new async_connect Task rather than awaiting it directly,
so the consumer can keep running while the reconnect proceeds.
"""
_LOGGER.debug("_consumer_handler: started")
while True:
try:
if not self._websocket or self._websocket.closed:
await asyncio.sleep(1)
continue
msg = await self._websocket.receive()
if msg.type == aiohttp.WSMsgType.TEXT:
await self._dispatch(msg.json())
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
_LOGGER.info("_consumer_handler: websocket %s — scheduling reconnect",
"closed" if msg.type == aiohttp.WSMsgType.CLOSED else "error")
self._response = None
self._websocket = None
self._waitingforresponse.set() # unblock any pending send
asyncio.create_task(self.async_connect(), name="LWRFReconnect")
await asyncio.sleep(1)
except asyncio.CancelledError:
_LOGGER.debug("_consumer_handler: cancelled")
return
except Exception as exc:
_LOGGER.warning("_consumer_handler: unexpected exception: %s", exc)
await asyncio.sleep(1)
async def _dispatch(self, message: dict):
"""Route a decoded JSON message from the cloud."""
# lightwave2 quirk: feature read/write responses use itemId, not transactionId
if (message.get("class") == "feature"
and message.get("operation") in ("write", "read")
and message.get("items")):
message["transactionId"] = message["items"][0]["itemId"]
# Transaction response — unblocks _async_sendmessage
if message.get("transactionId") == self._transaction:
self._response = message
self._waitingforresponse.set()
return
direction = message.get("direction")
opclass = message.get("class")
operation = message.get("operation")
# Group topology changed — reload hierarchy
if direction == "notification" and opclass == "group" and operation == "event":
asyncio.create_task(self.async_get_hierarchy(), name="LWRFHierarchy")
return
# Device state change notification
if direction == "notification" and operation == "event":
payload = message["items"][0].get("payload", {}) if message.get("items") else {}
if "featureId" in payload:
feat_id = payload["featureId"]
new_value = payload["value"]
feat_name, fs = self._find_feature(feat_id)
if fs and feat_name:
prev_value = fs.features[feat_name]._state
fs.features[feat_name]._state = new_value
for cb in self._callbacks:
cb(feat_name, feat_id, prev_value, new_value)
else:
_LOGGER.warning("_dispatch: unhandled event: %s", message)
return
_LOGGER.debug("_dispatch: unhandled message: %s", message)
# ---------------------------------------------------------------- #
# Internal — hierarchy loading #
# ---------------------------------------------------------------- #
async def _async_read_groups(self):
self.featuresets = {}
for gid in self._group_ids:
msg = _LWRFMessage("group", "read")
msg.additem(_LWRFMessageItem({
"groupId": gid, "blocks": True, "devices": True,
"features": True, "scripts": True,
"subgroups": True, "subgroupDepth": 10,
}))
resp = await self._async_sendmessage(msg)
for feat_data in resp["items"][0]["payload"]["features"].values():
for fs_id in feat_data["groups"]:
if fs_id not in self.featuresets:
fs = _LWRFFeatureSet()
fs.featureset_id = fs_id
fs.name = feat_data["name"]
fs.product_code = "unknown"
self.featuresets[fs_id] = fs
fs = self.featuresets[fs_id]
ftype = feat_data["attributes"]["type"]
fs.features[ftype] = _LWRFFeature(
feat_data["featureId"],
feat_data["attributes"].get("value"),
)
if ftype == "switch": fs._switchable = True
if ftype == "dimLevel": fs._dimmable = True
if ftype == "targetTemperature": fs._climate = True
if ftype == "identify": fs._gen2 = True
if ftype == "power": fs._power_reporting = True
if ftype == "threeWayRelay": fs._cover = True
async def _async_update_featureset_states(self):
for fs in self.featuresets.values():
for feat in fs.features.values():
msg = _LWRFMessage("feature", "read")
msg.additem(_LWRFMessageItem({"featureId": feat.id}))
resp = await self._async_sendmessage(msg)
if resp:
feat._state = resp["items"][0]["payload"]["value"]
async def _async_write_feature(self, feature_id: str, value):
msg = _LWRFMessage("feature", "write")
msg.additem(_LWRFMessageItem({"featureId": feature_id, "value": value}))
await self._async_sendmessage(msg)
def _find_feature(self, feature_id: str) -> tuple:
"""Return (feature_name, featureset) for feature_id, or (None, None)."""
for fs in self.featuresets.values():
for name, feat in fs.features.items():
if feat.id == feature_id:
return name, fs
return None, None