
Commit 589d466

first stab at cleaning up load/parse with support for local backup
1 parent: 6ce938c

3 files changed: 98 additions, 28 deletions

src/pyff/api.py

Lines changed: 6 additions & 4 deletions

@@ -2,7 +2,7 @@
 import threading
 from datetime import datetime, timedelta
 from json import dumps
-from typing import Any, List, Mapping
+from typing import Any, List, Mapping, Iterator
 
 import pkg_resources
 import pyramid.httpexceptions as exc
@@ -389,19 +389,21 @@ def resources_handler(request):
     :return: a JSON representation of the set of resources currently loaded by the server
     """
 
+    def _infos(resources: Iterator[Resource]) -> List[Mapping[str, Any]]:
+        return list(filter(lambda i: 'State' in i and i['State'] is not None, [_info(r) for r in resources]))
+
     def _info(r: Resource) -> List[Mapping[str, Any]]:
         nfo = r.info
         nfo['Valid'] = r.is_valid()
         nfo['Parser'] = r.last_parser
         if r.last_seen is not None:
             nfo['Last Seen'] = r.last_seen
         if len(r.children) > 0:
-            nfo['Children'] = [_info(cr) for cr in r.children]
+            nfo['Children'] = _infos(r.children)
 
         return nfo
 
-    _resources = [_info(r) for r in request.registry.md.rm.children]
-    response = Response(dumps(_resources, default=json_serializer))
+    response = Response(dumps(_infos(request.registry.md.rm.children), default=json_serializer))
     response.headers['Content-Type'] = 'application/json'
 
     return response

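The practical effect of the new _infos() helper is that /api/resources now omits entries whose info dict has no non-None 'State', i.e. resources that have not yet been fetched or parsed. A minimal standalone sketch of that filtering rule; the sample entries below are hypothetical, not output from a live pyff instance:

# Illustrative sketch only: mirrors the filter applied by _infos() in resources_handler.
from typing import Any, Dict, List


def filter_ready(infos: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # keep only entries that carry a non-None 'State'
    return [i for i in infos if i.get('State') is not None]


infos = [
    {'Resource': 'https://mds.example.org/feed.xml', 'State': 'Ready'},  # hypothetical URL
    {'Resource': 'https://idp.example.org/metadata', 'State': None},     # never fetched
]
assert filter_ready(infos) == [infos[0]]
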
src/pyff/constants.py

Lines changed: 8 additions & 1 deletion

@@ -185,7 +185,7 @@ def __init__(self, *args, **kwargs):
 
     def __get__(self, instance, owner):
         if len(self.args) > 0:
-            return self.args[0].__get__(self)
+            return self.args[0].__get__(instance, owner)
         else:
             return None
 
@@ -454,6 +454,13 @@ class Config(object):
 
     logger = S('logger', typeconv=as_string, info="python logger config - overides all other logging directives")
 
+    local_copy_dir = S(
+        'local_copy_dir',
+        typeconv=as_string,
+        info="the directory where local backup copies of metadata is stored",
+        default="/var/run/pyff/backup",
+    )
+
     @property
     def base_url(self):
         if self.public_url:

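The new local_copy_dir setting is consumed by the local_copy_fn property added in resource.py, which joins it with hash_id(self.url, 'sha256', False). A rough sketch of the resulting backup path, assuming hash_id() boils down to a hex sha256 digest of the URL (its exact output format is defined in pyff.utils) and using a hypothetical metadata URL:

# Illustrative sketch only: approximates Resource.local_copy_fn.
import hashlib
import os

local_copy_dir = "/var/run/pyff/backup"   # default of the new local_copy_dir setting
url = "https://mds.example.org/feed.xml"  # hypothetical metadata URL

backup_fn = os.path.join(local_copy_dir, hashlib.sha256(url.encode("utf-8")).hexdigest())
print(backup_fn)  # /var/run/pyff/backup/<64 hex characters>
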
src/pyff/resource.py

Lines changed: 84 additions & 23 deletions

@@ -10,7 +10,7 @@
 from copy import deepcopy
 from datetime import datetime
 from threading import Condition, Lock
-from typing import Optional
+from typing import Optional, Dict
 
 import requests
 
@@ -19,7 +19,19 @@
 from .fetch import make_fetcher
 from .logs import get_log
 from .parse import parse_resource
-from .utils import Watchable, hex_digest, img_to_data, non_blocking_lock, url_get, utc_now
+
+from .utils import (
+    Watchable,
+    hex_digest,
+    img_to_data,
+    non_blocking_lock,
+    url_get,
+    utc_now,
+    resource_string,
+    resource_filename,
+    safe_write,
+    hash_id,
+)
 
 requests.packages.urllib3.disable_warnings()
 
@@ -169,6 +181,10 @@ def __getstate__(self):
     def __setstate__(self, state):
         raise ValueError("this object should not be unpickled")
 
+    @property
+    def local_copy_fn(self):
+        return os.path.join(config.local_copy_dir, hash_id(self.url, 'sha256', False))
+
     @property
     def post(self):
         return self.opts['via']
@@ -231,8 +247,12 @@ def is_expired(self) -> bool:
     def is_valid(self) -> bool:
         return not self.is_expired() and self.last_seen is not None and self.last_parser is not None
 
-    def add_info(self, info):
+    def add_info(self) -> Dict:
+        info = dict()
+        info['State'] = None
+        info['Resource'] = self.url
         self._infos.append(info)
+        return info
 
     def _replace(self, r):
         for i in range(0, len(self.children)):
@@ -275,35 +295,76 @@ def errors(self):
         else:
             return []
 
-    def parse(self, getter):
-        info = dict()
-        info['Resource'] = self.url
-        self.add_info(info)
-        data = None
-        log.debug("getting {}".format(self.url))
+    def load_backup(self, r):
+        try:
+            return resource_string(self.local_copy_fn)
+            log.warn("Got status={:d} while getting {}. Fallback to local copy.".format(r.status_code, self.url))
+        except IOError as ex:
+            log.warn(
+                "Caught an exception trying to load local backup for {} via {}: {}".format(
+                    r.url, self.local_copy_fn, ex
+                )
+            )
+            return None
 
-        r = getter(self.url)
+    def load_resource(self, getter):
+        info = self.add_info()
+        data: Optional[str] = None
+        status: Optional[int] = None
 
-        info['HTTP Response Headers'] = r.headers
-        log.debug(
-            "got status_code={:d}, encoding={} from_cache={} from {}".format(
-                r.status_code, r.encoding, getattr(r, "from_cache", False), self.url
-            )
-        )
-        info['Status Code'] = str(r.status_code)
-        info['Reason'] = r.reason
+        log.debug("Loading resource {}".format(self.url))
 
-        if r.ok:
-            data = r.text
-        else:
+        try:
+            r = getter(self.url)
+
+            info['HTTP Response Headers'] = r.headers
+            log.debug(
+                "got status_code={:d}, encoding={} from_cache={} from {}".format(
+                    r.status_code, r.encoding, getattr(r, "from_cache", False), self.url
+                )
+            )
+            status = r.status_code
+            info['Reason'] = r.reason
+
+            if r.ok:
+                data = r.text
+                self.etag = r.headers.get('ETag', None) or hex_digest(r.text, 'sha256')
+            elif self.local_copy_fn is not None:
+                data = self.load_backup(r)
+                if data is not None and len(data) > 0:
+                    info['Reason'] = "Retrieved from local cache because status: {} != 200".format(status)
+                    status = 218
+
+            info['Status Code'] = str(status)
+
+        except IOError as ex:
+            log.warn("caught exception from {}: {}".format(self.url, ex))
+            if self.local_copy_fn is not None:
+                data = self.load_backup(r)
+                if data is not None and len(data) > 0:
+                    info['Reason'] = "Retrieved from local cache because exception: {}".format(ex)
+                    status = 218
+
+        if data is None or not len(data) > 0:
             raise ResourceException("Got status={:d} while getting {}".format(r.status_code, self.url))
 
+        if status == 200:
+            self.last_seen = utc_now().replace(microsecond=0)
+            safe_write(self.local_copy_fn, data, True)
+
+        info['State'] = 'Fetched'
+
+        return data, info
+
+    def parse(self, getter):
+        data, info = self.load_resource(getter)
+        info['State'] = 'Parsing'
         parse_info = parse_resource(self, data)
         if parse_info is not None and isinstance(parse_info, dict):
             info.update(parse_info)
 
+        info['State'] = 'Parsed'
         if self.t is not None:
-            self.last_seen = utc_now().replace(microsecond=0)
             if self.post and isinstance(self.post, list):
                 for cb in self.post:
                     if self.t is not None:
@@ -318,6 +379,6 @@ def parse(self, getter):
             for (eid, error) in list(info['Validation Errors'].items()):
                 log.error(error)
 
-        self.etag = r.headers.get('ETag', None) or hex_digest(r.text, 'sha256')
+        info['State'] = 'Ready'
 
         return self.children

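For orientation, a simplified standalone sketch of the fetch-with-local-fallback flow that the new load_resource()/load_backup() pair implements: fetch over HTTP, fall back to the on-disk copy when the fetch raises or returns a bad status (reporting the synthetic status 218), and refresh the backup only after a clean 200. It uses requests and plain file I/O in place of pyff's fetcher, safe_write() and resource_string() helpers; the URL and backup path are hypothetical.

# Illustrative sketch only, not the pyff implementation.
import os
from typing import Optional, Tuple

import requests


def fetch_with_backup(url: str, backup_fn: str) -> Tuple[str, int]:
    data: Optional[str] = None
    status: int = 0
    try:
        r = requests.get(url, timeout=30)
        status = r.status_code
        if r.ok:
            data = r.text
    except IOError:
        # requests exceptions derive from IOError, so network errors land here
        pass

    if data is None and os.path.exists(backup_fn):
        # fall back to the last good copy and flag it with the synthetic 218 status
        with open(backup_fn) as fd:
            data = fd.read()
        status = 218

    if not data:
        raise IOError("Got status={} while getting {}".format(status, url))

    if status == 200:
        # refresh the local backup only after a successful fetch
        with open(backup_fn, "w") as fd:
            fd.write(data)

    return data, status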