diff --git a/eidangservices/federator/server/process.py b/eidangservices/federator/server/process.py index f8404b5..f16f611 100644 --- a/eidangservices/federator/server/process.py +++ b/eidangservices/federator/server/process.py @@ -22,9 +22,9 @@ GranularRequestStrategy, NetworkBulkRequestStrategy, NetworkCombiningRequestStrategy, AdaptiveNetworkBulkRequestStrategy) from eidangservices.federator.server.task import ( - RawDownloadTask, RawSplitAndAlignTask, StationTextDownloadTask, + ETask, RawDownloadTask, RawSplitAndAlignTask, StationTextDownloadTask, StationXMLDownloadTask, StationXMLNetworkCombinerTask, - WFCatalogSplitAndAlignTask) + StationTextInMemoryDownloadTask, WFCatalogSplitAndAlignTask) from eidangservices.utils.error import ErrorWithTraceback from eidangservices.utils.httperrors import FDSNHTTPError @@ -704,7 +704,7 @@ def _request(self): self._pool = mp.pool.ThreadPool(processes=pool_size) self._results = self._strategy.request( - self._pool, tasks={'default': StationTextDownloadTask}, + self._pool, tasks={'default': StationTextInMemoryDownloadTask}, query_params=self.query_params, keep_tempfiles=self._keep_tempfiles, http_method=self._http_method) @@ -733,25 +733,21 @@ def __iter__(self): yield '{}\n'.format(self.HEADER_CHANNEL) self._sizes.append(_result.length) - self.logger.debug( - 'Streaming from file {!r}.'.format( - _result.data)) try: - with open(_result.data, 'r', - encoding='utf-8') as fd: - for line in fd: - yield line - except Exception as err: + type_task = _result.extras['type_task'] + except KeyError as err: raise StreamingError(err) - if self._keep_tempfiles != KeepTempfiles.ALL: - self.logger.debug( - 'Removing temporary file {!r} ...'.format( - _result.data)) - try: - os.remove(_result.data) - except OSError as err: - RequestProcessorError(err) + if type_task == ETask.DOWNLOAD: + # return data from a temporary file + yield from self._generate_from_file( + _result.data) + elif type_task == ETask.DOWNLOAD_INMEM: + yield from self._generate_from_str( + _result.data) + else: + raise StreamingError( + 'Invalid task type: {}'.format(type_task)) elif _result.status_code == 413: self._handle_413(_result) @@ -780,6 +776,29 @@ def __iter__(self): self.logger.debug('GeneratorExit: Terminate ...') self._terminate() + def _generate_from_file(self, path): + """ + Generator returning data line-by-line from a temporary file. + """ + self.logger.debug('Streaming from file {!r}.'.format(path)) + try: + with open(path, 'r', encoding='utf-8') as fd: + for line in fd: + yield line + except Exception as err: + raise StreamingError(err) + + if self._keep_tempfiles != KeepTempfiles.ALL: + self.logger.debug('Removing temporary file {!r} ...'.format(path)) + try: + os.remove(path) + except OSError as err: + RequestProcessorError(err) + + def _generate_from_str(self, lst): + for line in lst: + yield line + class WFCatalogRequestProcessor(RequestProcessor): """ diff --git a/eidangservices/federator/server/task.py b/eidangservices/federator/server/task.py index c0fdb64..587baed 100644 --- a/eidangservices/federator/server/task.py +++ b/eidangservices/federator/server/task.py @@ -30,6 +30,7 @@ class ETask(enum.Enum): DOWNLOAD = 0 COMBINER = 1 SPLITALIGN = 2 + DOWNLOAD_INMEM = 3 # ----------------------------------------------------------------------------- @@ -1037,3 +1038,106 @@ def __call__(self): return Result.ok(data=self.path_tempfile, length=self._size, extras={'type_task': self._TYPE}) + + +# ----------------------------------------------------------------------------- +class InMemoryTask(TaskBase): + """ + Base class for downloading tasks returning in-memory results. + """ + + LOGGER = 'flask.app.federator.task_in_memory' + + _TYPE = ETask.DOWNLOAD_INMEM + + def __init__(self, request_handler, **kwargs): + """ + :param request_handler: Request handler to be used for downloading + :type request_handler: :py:class:`RequestHandlerBase` + """ + + super().__init__(self.LOGGER, **kwargs) + + self._request_handler = request_handler + self._http_get = kwargs.get('http_get', False) + + self._size = 0 + + @catch_default_task_exception + @with_ctx_guard + def __call__(self): + return self._run() + + def _run(self): + """ + Template method to be implemented by concrete implementations of + in-memory tasks. + """ + raise NotImplementedError + + def _handle_error(self, err): + try: + resp = err.response + try: + data = err.response.text + except AttributeError: + return Result.error('RequestsError', + status_code=503, + warning=type(err), + data=str(err), + extras={'type_task': self._TYPE, + 'req_handler': + self._request_handler}) + + except Exception as err: + return Result.error('InternalServerError', + status_code=500, + warning='Unhandled exception.', + data=str(err), + extras={'type_task': self._TYPE, + 'req_handler': self._request_handler}) + else: + if resp.status_code == 413: + data = self._request_handler + + return Result.error(status='EndpointError', + status_code=resp.status_code, + warning=str(err), data=data, + extras={'type_task': self._TYPE, + 'req_handler': self._request_handler}) + + +class StationTextInMemoryDownloadTask(InMemoryTask): + """ + Download task performing a :code:`fdsnws-station` download and returns the + result in-memory (i.e. without using intermediate data persistance). + """ + + def _run(self): + req = (self._request_handler.get() + if self._http_method == 'GET' else self._request_handler.post()) + + self.logger.debug( + 'Downloading (url={}, stream_epochs={}, http_method={!r}) ...'. + format(self._request_handler.url, + self._request_handler.stream_epochs, + self._http_method)) + try: + lines = [] + with binary_request(req, logger=self.logger) as ifd: + for line in ifd: + self._size += len(line) + if line.startswith(b'#'): + continue + lines.append(line.decode('utf-8').strip() + '\n') + + except RequestsError as err: + return self._handle_error(err) + else: + self.logger.debug( + 'Download (url={}, stream_epochs={}) finished.'.format( + self._request_handler.url, + self._request_handler.stream_epochs)) + + return Result.ok(data=''.join(lines), length=self._size, + extras={'type_task': self._TYPE})