Skip to content

Commit f193f53

Browse files
committed
Have pandas upsert to db and clean up archive code
1 parent fe195f7 commit f193f53

File tree

2 files changed

+67
-9
lines changed

2 files changed

+67
-9
lines changed

integrations/acquisition/covidcast_nowcast/test_csv_uploading.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ def test_uploading(self):
7272
success_dir = '/common/covidcast_nowcast/archive/successful/src/'
7373
failed_dir = '/common/covidcast_nowcast/archive/failed/src/'
7474
os.makedirs(receiving_dir, exist_ok=True)
75-
os.makedirs(success_dir, exist_ok=True)
76-
os.makedirs(failed_dir, exist_ok=True)
7775

7876
# valid
7977
with open(receiving_dir + '20200419_state_sig.csv', 'w') as f:
@@ -118,3 +116,39 @@ def test_uploading(self):
118116
}],
119117
'message': 'success',
120118
})
119+
120+
@patch('delphi.epidata.acquisition.covidcast_nowcast.load_sensors.CsvImporter.find_csv_files',
121+
new=FIXED_ISSUE_IMPORTER)
122+
def test_duplicate_row(self):
123+
"""Test duplicate unique keys are updated."""
124+
125+
# print full diff if something unexpected comes out
126+
self.maxDiff=None
127+
128+
receiving_dir = '/common/covidcast_nowcast/receiving/src/'
129+
os.makedirs(receiving_dir, exist_ok=True)
130+
131+
with open(receiving_dir + '20200419_state_sig.csv', 'w') as f:
132+
f.write('sensor_name,geo_value,value\n')
133+
f.write('testsensor,ca,1\n')
134+
main()
135+
with open(receiving_dir + '20200419_state_sig.csv', 'w') as f:
136+
f.write('sensor_name,geo_value,value\n')
137+
f.write('testsensor,ca,2\n')
138+
main()
139+
140+
# most most recent value is the one stored
141+
response = Epidata.covidcast_nowcast(
142+
'src', 'sig', 'testsensor', 'day', 'state', 20200419, 'ca')
143+
self.assertEqual(response, {
144+
'result': 1,
145+
'epidata': [{
146+
'time_value': 20200419,
147+
'geo_value': 'ca',
148+
'value': 2,
149+
'issue': 20200421,
150+
'lag': 2,
151+
'signal': 'sig',
152+
}],
153+
'message': 'success',
154+
})

src/acquisition/covidcast_nowcast/load_sensors.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from shutil import move
2+
import os
23
import time
34

45
import delphi.operations.secrets as secrets
@@ -7,6 +8,8 @@
78
from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter
89

910
SENSOR_CSV_PATH = "/common/covidcast_nowcast/receiving/"
11+
SUCCESS_DIR = "archive/successful"
12+
FAIL_DIR = "archive/failed"
1013
TABLE_NAME = "covidcast_nowcast"
1114
DB_NAME = "epidata"
1215
CSV_DTYPES = {"sensor_name": str, "geo_value": str, "value": float}
@@ -33,17 +36,20 @@ def main(csv_path: str = SENSOR_CSV_PATH) -> None:
3336
"""
3437
user, pw = secrets.db.epi
3538
engine = sqlalchemy.create_engine(f"mysql+pymysql://{user}:{pw}@{secrets.db.host}/{DB_NAME}")
36-
for filepath, attributes in CsvImporter.find_csv_files(csv_path):
37-
if attributes is None:
38-
move(filepath, filepath.replace("receiving", "archive/failed"))
39+
for filepath, attr in CsvImporter.find_csv_files(csv_path):
40+
if attr is None:
41+
_move_after_processing(filepath, success=False)
3942
continue
4043
try:
41-
data = load_and_prepare_file(filepath, attributes)
42-
data.to_sql(TABLE_NAME, engine, if_exists="append", index=False)
44+
data = load_and_prepare_file(filepath, attr)
45+
conn = engine.connect()
46+
with conn.begin():
47+
method = _create_upsert_method(sqlalchemy.MetaData(conn))
48+
data.to_sql(TABLE_NAME, engine, if_exists="append", method=method, index=False)
4349
except Exception:
44-
move(filepath, filepath.replace("receiving", "archive/failed"))
50+
_move_after_processing(filepath, success=False)
4551
raise
46-
move(filepath, filepath.replace("receiving", "archive/successful"))
52+
_move_after_processing(filepath, success=True)
4753

4854

4955
def load_and_prepare_file(filepath: str, attributes: tuple) -> pd.DataFrame:
@@ -75,5 +81,23 @@ def load_and_prepare_file(filepath: str, attributes: tuple) -> pd.DataFrame:
7581
return data
7682

7783

84+
def _move_after_processing(filepath, success):
85+
archive_dir = SUCCESS_DIR if success else FAIL_DIR
86+
new_dir = os.path.dirname(filepath).replace(
87+
"receiving", archive_dir)
88+
os.makedirs(new_dir, exist_ok=True)
89+
move(filepath, filepath.replace("receiving", archive_dir))
90+
print(f"{filepath} moved to {archive_dir}")
91+
92+
93+
def _create_upsert_method(meta):
94+
def method(table, conn, keys, data_iter):
95+
sql_table = sqlalchemy.Table(table.name, meta, autoload=True)
96+
insert_stmt = sqlalchemy.dialects.mysql.insert(sql_table).values([dict(zip(keys, data)) for data in data_iter])
97+
upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted})
98+
conn.execute(upsert_stmt)
99+
return method
100+
101+
78102
if __name__ == "__main__":
79103
main()

0 commit comments

Comments
 (0)