From 8db77ccdfd2bee6f36da1eff8a22771dc6dc27e7 Mon Sep 17 00:00:00 2001 From: Brad P Date: Thu, 27 Nov 2025 08:40:51 -0600 Subject: [PATCH 1/6] use -1 if subscriber returns no data on index --- pytrickle/subscriber.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytrickle/subscriber.py b/pytrickle/subscriber.py index 0a3ba81..64c84cd 100644 --- a/pytrickle/subscriber.py +++ b/pytrickle/subscriber.py @@ -78,7 +78,8 @@ async def preconnect(self) -> Optional[aiohttp.ClientResponse]: if resp.status == 470: # Channel exists but no data at this index, so reset - idx = resp.headers.get('Lp-Trickle-Latest') or '-1' + #idx = resp.headers.get('Lp-Trickle-Latest') or '-1' + idx = -1 url = f"{self.base_url}/{idx}" logger.info(f"Trickle sub resetting index to leading edge {url}") resp.release() From 945b2263ee7f3a0b363bbff6c2d28da0d05bcf6d Mon Sep 17 00:00:00 2001 From: Brad P Date: Thu, 27 Nov 2025 09:52:49 -0600 Subject: [PATCH 2/6] try timeout on control --- pytrickle/subscriber.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pytrickle/subscriber.py b/pytrickle/subscriber.py index 64c84cd..9063c66 100644 --- a/pytrickle/subscriber.py +++ b/pytrickle/subscriber.py @@ -83,7 +83,9 @@ async def preconnect(self) -> Optional[aiohttp.ClientResponse]: url = f"{self.base_url}/{idx}" logger.info(f"Trickle sub resetting index to leading edge {url}") resp.release() - # Continue immediately + # Continue immediately after small timeout for control url + if "control" in self.base_url: + await asyncio.sleep(0.2) continue body = await resp.text() From 92cb05d7ed6d5bdd978b2a9ff64d8568b51fdff3 Mon Sep 17 00:00:00 2001 From: Brad P Date: Thu, 27 Nov 2025 10:20:27 -0600 Subject: [PATCH 3/6] try a longer wait on control retries --- pytrickle/subscriber.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytrickle/subscriber.py b/pytrickle/subscriber.py index 9063c66..7048f67 100644 --- a/pytrickle/subscriber.py +++ b/pytrickle/subscriber.py @@ -85,7 +85,7 @@ async def preconnect(self) -> Optional[aiohttp.ClientResponse]: resp.release() # Continue immediately after small timeout for control url if "control" in self.base_url: - await asyncio.sleep(0.2) + await asyncio.sleep(1*attempt) continue body = await resp.text() From a0c8a50f25eebe5a99c63102eeccdbd12da4c327 Mon Sep 17 00:00:00 2001 From: Brad P Date: Thu, 27 Nov 2025 11:11:50 -0600 Subject: [PATCH 4/6] try another idx first --- pytrickle/subscriber.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pytrickle/subscriber.py b/pytrickle/subscriber.py index 7048f67..ce2d0c7 100644 --- a/pytrickle/subscriber.py +++ b/pytrickle/subscriber.py @@ -78,14 +78,23 @@ async def preconnect(self) -> Optional[aiohttp.ClientResponse]: if resp.status == 470: # Channel exists but no data at this index, so reset - #idx = resp.headers.get('Lp-Trickle-Latest') or '-1' - idx = -1 + idx = resp.headers.get('Lp-Trickle-Latest') or '-1' url = f"{self.base_url}/{idx}" logger.info(f"Trickle sub resetting index to leading edge {url}") resp.release() # Continue immediately after small timeout for control url if "control" in self.base_url: await asyncio.sleep(1*attempt) + if attempt > 1: + if idx != -1: + idx = idx + 1 + resp2 = await self.session.get(f"{self.base_url}/{idx}", headers={'Connection': 'close'}) + if resp2.status == 200: + resp.release() + return resp2 + else: + resp2.release() + await asyncio.sleep(1 * attempt) continue body = await resp.text() From 0544334f8736101ca94fb5a7e82d6329113d3944 Mon Sep 17 00:00:00 2001 From: Brad P Date: Thu, 27 Nov 2025 11:47:35 -0600 Subject: [PATCH 5/6] fix str to int --- pytrickle/subscriber.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytrickle/subscriber.py b/pytrickle/subscriber.py index ce2d0c7..dbd582b 100644 --- a/pytrickle/subscriber.py +++ b/pytrickle/subscriber.py @@ -87,7 +87,7 @@ async def preconnect(self) -> Optional[aiohttp.ClientResponse]: await asyncio.sleep(1*attempt) if attempt > 1: if idx != -1: - idx = idx + 1 + idx = str(int(idx) + 1) resp2 = await self.session.get(f"{self.base_url}/{idx}", headers={'Connection': 'close'}) if resp2.status == 200: resp.release() From af947bd30baaadd605d9e388c4347df74c607a10 Mon Sep 17 00:00:00 2001 From: Brad P Date: Thu, 27 Nov 2025 16:15:05 -0600 Subject: [PATCH 6/6] add log line for control keep alive msg --- pytrickle/protocol.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pytrickle/protocol.py b/pytrickle/protocol.py index 79d675e..6e24b76 100644 --- a/pytrickle/protocol.py +++ b/pytrickle/protocol.py @@ -422,6 +422,7 @@ async def control_loop(self, done: asyncio.Event) -> AsyncGenerator[dict, None]: data = json.loads(params) if data == keepalive_message: # Ignore periodic keepalive messages + logger.info("Received control keepalive message") continue logger.info("Received control message with params: %s", data)