diff --git a/pytrickle/protocol.py b/pytrickle/protocol.py index 79d675e..6e24b76 100644 --- a/pytrickle/protocol.py +++ b/pytrickle/protocol.py @@ -422,6 +422,7 @@ async def control_loop(self, done: asyncio.Event) -> AsyncGenerator[dict, None]: data = json.loads(params) if data == keepalive_message: # Ignore periodic keepalive messages + logger.info("Received control keepalive message") continue logger.info("Received control message with params: %s", data) diff --git a/pytrickle/subscriber.py b/pytrickle/subscriber.py index 0a3ba81..dbd582b 100644 --- a/pytrickle/subscriber.py +++ b/pytrickle/subscriber.py @@ -82,7 +82,19 @@ async def preconnect(self) -> Optional[aiohttp.ClientResponse]: url = f"{self.base_url}/{idx}" logger.info(f"Trickle sub resetting index to leading edge {url}") resp.release() - # Continue immediately + # Continue immediately after small timeout for control url + if "control" in self.base_url: + await asyncio.sleep(1*attempt) + if attempt > 1: + if idx != -1: + idx = str(int(idx) + 1) + resp2 = await self.session.get(f"{self.base_url}/{idx}", headers={'Connection': 'close'}) + if resp2.status == 200: + resp.release() + return resp2 + else: + resp2.release() + await asyncio.sleep(1 * attempt) continue body = await resp.text()