From b71340fd25223ff02303a7bd990652bbe0b49c5c Mon Sep 17 00:00:00 2001 From: Dzmitry Zhybryk Date: Mon, 29 May 2023 16:25:56 +0700 Subject: [PATCH 1/2] add try/except for ClietPyloadError and ClientConnectorError --- aiohttp_sse_client/client.py | 59 +++++++++++++++++++----------------- setup.py | 2 +- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/aiohttp_sse_client/client.py b/aiohttp_sse_client/client.py index f3edbc5..a03bf59 100644 --- a/aiohttp_sse_client/client.py +++ b/aiohttp_sse_client/client.py @@ -6,7 +6,7 @@ from typing import Optional, Dict, Any import attr -from aiohttp import hdrs, ClientSession, ClientConnectionError +from aiohttp import hdrs, ClientSession, ClientConnectionError, ClientPayloadError, ClientConnectorError from multidict import MultiDict from yarl import URL @@ -56,6 +56,7 @@ class EventSource: .. seealso:: https://www.w3.org/TR/eventsource/#eventsource """ + def __init__(self, url: str, option: Optional[Dict[str, Any]] = None, reconnection_time: timedelta = DEFAULT_RECONNECTION_TIME, @@ -154,29 +155,33 @@ async def __anext__(self) -> MessageEvent: # async for ... in StreamReader only split line by \n while self._response.status != 204: - async for line_in_bytes in self._response.content: - line = line_in_bytes.decode('utf8') # type: str - line = line.rstrip('\n').rstrip('\r') - - if line == '': - # empty line - event = self._dispatch_event() - if event is not None: - return event - continue - - if line[0] == ':': - # comment line, ignore - continue - - if ':' in line: - # contains ':' - fields = line.split(':', 1) - field_name = fields[0] - field_value = fields[1].lstrip(' ') - self._process_field(field_name, field_value) - else: - self._process_field(line, '') + try: + async for line_in_bytes in self._response.content: + line = line_in_bytes.decode('utf8') # type: str + line = line.rstrip('\n').rstrip('\r') + + if line == '': + # empty line + event = self._dispatch_event() + if event is not None: + return event + continue + + if line[0] == ':': + # comment line, ignore + continue + + if ':' in line: + # contains ':' + fields = line.split(':', 1) + field_name = fields[0] + field_value = fields[1].lstrip(' ') + self._process_field(field_name, field_value) + else: + self._process_field(line, '') + except ClientPayloadError: + pass + self._ready_state = READY_STATE_CONNECTING if self._on_error: self._on_error() @@ -216,7 +221,7 @@ async def connect(self, retry=0): self._url, **self._kwargs ) - except ClientConnectionError: + except (ClientConnectionError, ClientConnectorError): if retry <= 0 or self._ready_state == READY_STATE_CLOSED: await self._fail_connect() raise @@ -252,8 +257,8 @@ async def connect(self, retry=0): if response.content_type != CONTENT_TYPE_EVENT_STREAM: error_message = \ - 'fetch {} failed with wrong Content-Type: {}'.format( - self._url, response.headers.get(hdrs.CONTENT_TYPE)) + 'fetch {} failed with wrong Content-Type: {}'.format( + self._url, response.headers.get(hdrs.CONTENT_TYPE)) _LOGGER.error(error_message) await self._fail_connect() diff --git a/setup.py b/setup.py index 062bb08..42fb5bf 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,6 @@ test_suite='tests', tests_require=test_requirements, url='https://github.com/rtfol/aiohttp-sse-client', - version='0.2.1', + version='0.2.2', zip_safe=False, ) From 4c4759829b8185c44d3c6fc7c4ae64ce41cdbf6c Mon Sep 17 00:00:00 2001 From: Dzmitry Zhybryk Date: Wed, 31 May 2023 08:42:49 +0700 Subject: [PATCH 2/2] bug fix --- aiohttp_sse_client/client.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/aiohttp_sse_client/client.py b/aiohttp_sse_client/client.py index a03bf59..8d6610f 100644 --- a/aiohttp_sse_client/client.py +++ b/aiohttp_sse_client/client.py @@ -6,7 +6,7 @@ from typing import Optional, Dict, Any import attr -from aiohttp import hdrs, ClientSession, ClientConnectionError, ClientPayloadError, ClientConnectorError +from aiohttp import hdrs, ClientSession, ClientPayloadError, ClientConnectionError, ClientConnectorError from multidict import MultiDict from yarl import URL @@ -185,12 +185,12 @@ async def __anext__(self) -> MessageEvent: self._ready_state = READY_STATE_CONNECTING if self._on_error: self._on_error() - self._reconnection_time *= 2 _LOGGER.debug('wait %s seconds for retry', self._reconnection_time.total_seconds()) await asyncio.sleep( self._reconnection_time.total_seconds()) - await self.connect() + self._reconnection_time *= 2 + await self.connect(self._max_connect_retry) raise StopAsyncIteration async def connect(self, retry=0): @@ -229,11 +229,11 @@ async def connect(self, retry=0): self._ready_state = READY_STATE_CONNECTING if self._on_error: self._on_error() - self._reconnection_time *= 2 _LOGGER.debug('wait %s seconds for retry', self._reconnection_time.total_seconds()) await asyncio.sleep( self._reconnection_time.total_seconds()) + self._reconnection_time *= 2 await self.connect(retry - 1) return @@ -341,3 +341,18 @@ def _process_field(self, field_name, field_value): pass pass + +# if __name__ == '__main__': +# import asyncio +# +# +# async def main(): +# async with EventSource("http://10.10.10.103:30106/api/v1/stream-events/", max_connect_retry=10) as event_source: +# try: +# async for event in event_source: +# print(event) +# except ConnectionError: +# pass +# +# +# asyncio.run(main())