-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
54 lines (40 loc) · 1.31 KB
/
utils.py
File metadata and controls
54 lines (40 loc) · 1.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import asyncio
import json
import os
from collections import defaultdict, namedtuple
from os.path import exists, isdir, join, dirname
from pathlib import Path
from app_state import state
from loguru import logger
def makedirs(*parts):
    """Join *parts* into one path, make sure that directory exists, return it.

    Missing intermediate directories are created as well. Calling this for a
    directory that already exists is not an error.

    Args:
        *parts: Path components, combined with ``os.path.join``.

    Returns:
        The joined path (absolute only if the given components make it so).

    Raises:
        OSError: If the directory cannot be created, e.g. the path already
            exists but is not a directory.
        TypeError: If called without any path component.
    """
    target = join(*parts)
    # exist_ok=True only tolerates the "already a directory" case; any other
    # failure (including a regular file sitting at the path) still raises.
    os.makedirs(target, exist_ok=True)
    return target
# Module-level cache: region_number -> station_number -> station record.
_stations = defaultdict(dict)


def stations():
    """Return the station registry, loading ``stations.json`` on first use.

    The JSON file lives next to this module and is expected to hold an array
    of objects. Each object is stored under
    ``registry[region_number][station_number]``; an object lacking either key
    is filed under ``None`` for that level. The file is parsed only while the
    cache is empty, so later calls return the very same mapping object.

    Returns:
        The shared ``defaultdict(dict)`` cache. Because it is a defaultdict,
        indexing it with an unknown region inserts an empty dict for it.

    Raises:
        OSError: If ``stations.json`` cannot be opened.
        ValueError: If the file does not contain valid JSON
            (``json.JSONDecodeError``).
    """
    if not _stations:
        source = Path(__file__).parent / 'stations.json'
        # The context manager closes the handle deterministically; the
        # explicit encoding keeps parsing independent of the platform locale.
        with open(source, encoding='utf-8') as fh:
            records = json.load(fh)
        for record in records:
            region = record.get('region_number')
            station = record.get('station_number')
            _stations[region][station] = record
    return _stations
async def as_completed(tasks):
    """Asynchronously yield ``(count, task)`` pairs as *tasks* finish.

    Args:
        tasks: Iterable of awaitables. Existing Task/Future objects are used
            as they are; bare coroutines are scheduled as tasks.

    Yields:
        ``(n, task)`` where ``task`` is a finished task (it may hold a
        result, hold an exception, or be cancelled) and ``n`` is the running
        number of finished tasks yielded so far, starting at 1. Tasks that
        finish within the same wake-up are yielded in arbitrary order.
    """
    n = 0
    # ensure_future() returns Task/Future inputs unchanged and wraps bare
    # coroutines, which asyncio.wait() refuses to accept since Python 3.11.
    pending = [asyncio.ensure_future(t) for t in tasks]
    while pending:
        # wait() returns after at most 10 seconds even when nothing has
        # finished; `done` is then empty and the loop simply waits again.
        done, pending = await asyncio.wait(
            pending, timeout=10, return_when=asyncio.FIRST_COMPLETED)
        # Mark exceptions of the whole batch as retrieved *before* yielding,
        # so that a consumer that stops iterating early does not cause a
        # flood of "exception was never retrieved" log messages.
        for x in done:
            # exception() raises CancelledError for a cancelled task, which
            # would abort this generator; cancelled tasks have nothing to
            # consume, so skip them and let the consumer inspect the task.
            if not x.cancelled():
                x.exception()
        for x in done:
            n += 1
            yield n, x
def sigint_handler(*a):
    """Log a warning, raise the shared stop flag and stop the running loop.

    Args:
        *a: Accepted and ignored, so the function can be installed wherever
            a handler is called with positional arguments.

    Raises:
        RuntimeError: If no event loop is running in the current thread
            (raised by ``asyncio.get_running_loop``), after the flag has
            already been set.
    """
    logger.warning('Got SIGINT. Stopping event loop.')

    # This flag is checked in aiopool
    # See https://github.com/MagicStack/uvloop/issues/243
    state._stopping = True

    running_loop = asyncio.get_running_loop()
    running_loop.stop()