11from shutil import move
2+ import os
23import time
34
45import delphi .operations .secrets as secrets
78from delphi .epidata .acquisition .covidcast .csv_importer import CsvImporter
89
910SENSOR_CSV_PATH = "/common/covidcast_nowcast/receiving/"
11+ SUCCESS_DIR = "archive/successful"
12+ FAIL_DIR = "archive/failed"
1013TABLE_NAME = "covidcast_nowcast"
1114DB_NAME = "epidata"
1215CSV_DTYPES = {"sensor_name" : str , "geo_value" : str , "value" : float }
@@ -33,17 +36,20 @@ def main(csv_path: str = SENSOR_CSV_PATH) -> None:
3336 """
3437 user , pw = secrets .db .epi
3538 engine = sqlalchemy .create_engine (f"mysql+pymysql://{ user } :{ pw } @{ secrets .db .host } /{ DB_NAME } " )
36- for filepath , attributes in CsvImporter .find_csv_files (csv_path ):
37- if attributes is None :
38- move (filepath , filepath . replace ( "receiving" , "archive/failed" ) )
39+ for filepath , attribute in CsvImporter .find_csv_files (csv_path ):
40+ if attribute is None :
41+ _move_after_processing (filepath , success = False )
3942 continue
4043 try :
41- data = load_and_prepare_file (filepath , attributes )
42- data .to_sql (TABLE_NAME , engine , if_exists = "append" , index = False )
44+ data = load_and_prepare_file (filepath , attribute )
45+ conn = engine .connect ()
46+ with conn .begin ():
47+ method = _create_upsert_method (sqlalchemy .MetaData (conn ))
48+ data .to_sql (TABLE_NAME , engine , if_exists = "append" , method = method , index = False )
4349 except Exception :
44- move (filepath , filepath . replace ( "receiving" , "archive/failed" ) )
50+ _move_after_processing (filepath , success = False )
4551 raise
46- move (filepath , filepath . replace ( "receiving" , "archive/successful" ) )
52+ _move_after_processing (filepath , success = True )
4753
4854
4955def load_and_prepare_file (filepath : str , attributes : tuple ) -> pd .DataFrame :
@@ -75,5 +81,23 @@ def load_and_prepare_file(filepath: str, attributes: tuple) -> pd.DataFrame:
7581 return data
7682
7783
84+ def _move_after_processing (filepath , success ):
85+ archive_dir = SUCCESS_DIR if success else FAIL_DIR
86+ new_dir = os .path .dirname (filepath ).replace (
87+ "receiving" , archive_dir )
88+ os .makedirs (new_dir , exist_ok = True )
89+ move (filepath , filepath .replace ("receiving" , archive_dir ))
90+ print (f"{ filepath } moved to { archive_dir } " )
91+
92+
93+ def _create_upsert_method (meta ):
94+ def method (table , conn , keys , data_iter ):
95+ sql_table = sqlalchemy .Table (table .name , meta , autoload = True )
96+ insert_stmt = sqlalchemy .dialects .mysql .insert (sql_table ).values ([dict (zip (keys , data )) for data in data_iter ])
97+ upsert_stmt = insert_stmt .on_duplicate_key_update ({x .name : x for x in insert_stmt .inserted })
98+ conn .execute (upsert_stmt )
99+ return method
100+
101+
78102if __name__ == "__main__" :
79103 main ()
0 commit comments