Skip to content
This repository was archived by the owner on Apr 9, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions eidangservices/federator/server/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
GranularRequestStrategy, NetworkBulkRequestStrategy,
NetworkCombiningRequestStrategy, AdaptiveNetworkBulkRequestStrategy)
from eidangservices.federator.server.task import (
RawDownloadTask, RawSplitAndAlignTask, StationTextDownloadTask,
ETask, RawDownloadTask, RawSplitAndAlignTask, StationTextDownloadTask,
StationXMLDownloadTask, StationXMLNetworkCombinerTask,
WFCatalogSplitAndAlignTask)
StationTextInMemoryDownloadTask, WFCatalogSplitAndAlignTask)
from eidangservices.utils.error import ErrorWithTraceback
from eidangservices.utils.httperrors import FDSNHTTPError

Expand Down Expand Up @@ -704,7 +704,7 @@ def _request(self):
self._pool = mp.pool.ThreadPool(processes=pool_size)

self._results = self._strategy.request(
self._pool, tasks={'default': StationTextDownloadTask},
self._pool, tasks={'default': StationTextInMemoryDownloadTask},
query_params=self.query_params,
keep_tempfiles=self._keep_tempfiles,
http_method=self._http_method)
Expand Down Expand Up @@ -733,25 +733,21 @@ def __iter__(self):
yield '{}\n'.format(self.HEADER_CHANNEL)

self._sizes.append(_result.length)
self.logger.debug(
'Streaming from file {!r}.'.format(
_result.data))
try:
with open(_result.data, 'r',
encoding='utf-8') as fd:
for line in fd:
yield line
except Exception as err:
type_task = _result.extras['type_task']
except KeyError as err:
raise StreamingError(err)

if self._keep_tempfiles != KeepTempfiles.ALL:
self.logger.debug(
'Removing temporary file {!r} ...'.format(
_result.data))
try:
os.remove(_result.data)
except OSError as err:
RequestProcessorError(err)
if type_task == ETask.DOWNLOAD:
# return data from a temporary file
yield from self._generate_from_file(
_result.data)
elif type_task == ETask.DOWNLOAD_INMEM:
yield from self._generate_from_str(
_result.data)
else:
raise StreamingError(
'Invalid task type: {}'.format(type_task))

elif _result.status_code == 413:
self._handle_413(_result)
Expand Down Expand Up @@ -780,6 +776,29 @@ def __iter__(self):
self.logger.debug('GeneratorExit: Terminate ...')
self._terminate()

def _generate_from_file(self, path):
"""
Generator returning data line-by-line from a temporary file.
"""
self.logger.debug('Streaming from file {!r}.'.format(path))
try:
with open(path, 'r', encoding='utf-8') as fd:
for line in fd:
yield line
except Exception as err:
raise StreamingError(err)

if self._keep_tempfiles != KeepTempfiles.ALL:
self.logger.debug('Removing temporary file {!r} ...'.format(path))
try:
os.remove(path)
except OSError as err:
RequestProcessorError(err)

def _generate_from_str(self, lst):
for line in lst:
yield line


class WFCatalogRequestProcessor(RequestProcessor):
"""
Expand Down
104 changes: 104 additions & 0 deletions eidangservices/federator/server/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ETask(enum.Enum):
DOWNLOAD = 0
COMBINER = 1
SPLITALIGN = 2
DOWNLOAD_INMEM = 3


# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1037,3 +1038,106 @@ def __call__(self):

return Result.ok(data=self.path_tempfile, length=self._size,
extras={'type_task': self._TYPE})


# -----------------------------------------------------------------------------
class InMemoryTask(TaskBase):
"""
Base class for downloading tasks returning in-memory results.
"""

LOGGER = 'flask.app.federator.task_in_memory'

_TYPE = ETask.DOWNLOAD_INMEM

def __init__(self, request_handler, **kwargs):
"""
:param request_handler: Request handler to be used for downloading
:type request_handler: :py:class:`RequestHandlerBase`
"""

super().__init__(self.LOGGER, **kwargs)

self._request_handler = request_handler
self._http_get = kwargs.get('http_get', False)

self._size = 0

@catch_default_task_exception
@with_ctx_guard
def __call__(self):
return self._run()

def _run(self):
"""
Template method to be implemented by concrete implementations of
in-memory tasks.
"""
raise NotImplementedError

def _handle_error(self, err):
try:
resp = err.response
try:
data = err.response.text
except AttributeError:
return Result.error('RequestsError',
status_code=503,
warning=type(err),
data=str(err),
extras={'type_task': self._TYPE,
'req_handler':
self._request_handler})

except Exception as err:
return Result.error('InternalServerError',
status_code=500,
warning='Unhandled exception.',
data=str(err),
extras={'type_task': self._TYPE,
'req_handler': self._request_handler})
else:
if resp.status_code == 413:
data = self._request_handler

return Result.error(status='EndpointError',
status_code=resp.status_code,
warning=str(err), data=data,
extras={'type_task': self._TYPE,
'req_handler': self._request_handler})


class StationTextInMemoryDownloadTask(InMemoryTask):
"""
Download task performing a :code:`fdsnws-station` download and returns the
result in-memory (i.e. without using intermediate data persistance).
"""

def _run(self):
req = (self._request_handler.get()
if self._http_method == 'GET' else self._request_handler.post())

self.logger.debug(
'Downloading (url={}, stream_epochs={}, http_method={!r}) ...'.
format(self._request_handler.url,
self._request_handler.stream_epochs,
self._http_method))
try:
lines = []
with binary_request(req, logger=self.logger) as ifd:
for line in ifd:
self._size += len(line)
if line.startswith(b'#'):
continue
lines.append(line.decode('utf-8').strip() + '\n')

except RequestsError as err:
return self._handle_error(err)
else:
self.logger.debug(
'Download (url={}, stream_epochs={}) finished.'.format(
self._request_handler.url,
self._request_handler.stream_epochs))

return Result.ok(data=''.join(lines), length=self._size,
extras={'type_task': self._TYPE})