 when the module is run with `python -m MODULE_NAME`.
 """
 from datetime import datetime, date, timedelta
-from itertools import product
 from os.path import join
+from os import remove, listdir
+from shutil import copy
 
 import numpy as np
 import pandas as pd
-from delphi_utils import read_params
+from delphi_utils import read_params, S3ArchiveDiffer
 
 from .pull import pull_nchs_mortality_data
 from .export import export_csv
 
 # global constants
 METRICS = [
-    'covid_deaths', 'total_deaths', 'pneumonia_deaths',
-    'pneumonia_and_covid_deaths', 'influenza_deaths',
+    'covid_deaths', 'total_deaths', 'percent_of_expected_deaths',
+    'pneumonia_deaths', 'pneumonia_and_covid_deaths', 'influenza_deaths',
     'pneumonia_influenza_or_covid_19_deaths'
 ]
 SENSORS = [
@@ -37,27 +38,109 @@ def run_module():
             days=date.today().weekday() + 2)
         export_start_date = export_start_date.strftime('%Y-%m-%d')
     export_dir = params["export_dir"]
+    daily_export_dir = params["daily_export_dir"]
+    cache_dir = params["cache_dir"]
+    daily_cache_dir = params["daily_cache_dir"]
     static_file_dir = params["static_file_dir"]
     token = params["token"]
+    test_mode = params["mode"]
+
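+    # Daily archive differ: diffs today's CSVs in daily_export_dir against the
+    # cached output of the previous run and (in the daily step below) uploads
+    # changed files to the "nchs_mortality" prefix of the configured S3 bucket;
+    # update_cache() first pulls that cache down from S3.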
+    daily_arch_diff = S3ArchiveDiffer(
+        daily_cache_dir, daily_export_dir,
+        params["bucket_name"], "nchs_mortality",
+        params["aws_credentials"])
+    daily_arch_diff.update_cache()
 
     map_df = pd.read_csv(
         join(static_file_dir, "state_pop.csv"), dtype={"fips": int}
     )
 
-    df = pull_nchs_mortality_data(token, map_df)
-    for metric, sensor in product(METRICS, SENSORS):
-        print(metric, sensor)
-        if sensor == "num":
+    df = pull_nchs_mortality_data(token, map_df, test_mode)
+    for metric in METRICS:
+        if metric == 'percent_of_expected_deaths':
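+            # Already a percentage, so export it once as-is rather than as
+            # separate "num"/"prop" sensors.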
+            print(metric)
             df["val"] = df[metric]
+            df["se"] = np.nan
+            df["sample_size"] = np.nan
+            sensor_name = "_".join(["wip", metric])
+            export_csv(
+                df,
+                geo_name=geo_res,
+                export_dir=daily_export_dir,
+                start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+                sensor=sensor_name,
+            )
         else:
-            df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
-        df["se"] = np.nan
-        df["sample_size"] = np.nan
-        sensor_name = "_".join(["wip", metric, sensor])
-        export_csv(
-            df,
-            geo_name=geo_res,
-            export_dir=export_dir,
-            start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
-            sensor=sensor_name,
-        )
+            for sensor in SENSORS:
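+                # "num" exports the raw count; the proportion sensor rescales
+                # it by state population to a rate per INCIDENCE_BASE.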
+                print(metric, sensor)
+                if sensor == "num":
+                    df["val"] = df[metric]
+                else:
+                    df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
+                df["se"] = np.nan
+                df["sample_size"] = np.nan
+                sensor_name = "_".join(["wip", metric, sensor])
+                export_csv(
+                    df,
+                    geo_name=geo_res,
+                    export_dir=daily_export_dir,
+                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+                    sensor=sensor_name,
+                )
+
+    # Weekly run of archive utility on Monday
+    # - Does not upload to S3; that is handled by the daily run of the archive utility
+    # - Exports issues into receiving for the API
+    if datetime.today().weekday() == 0:
+        # Copy today's raw output to receiving
+        for output_file in listdir(daily_export_dir):
+            copy(
+                join(daily_export_dir, output_file),
+                join(export_dir, output_file))
+
+        weekly_arch_diff = S3ArchiveDiffer(
+            cache_dir, export_dir,
+            params["bucket_name"], "nchs_mortality",
+            params["aws_credentials"])
+
+        # Don't update the cache from S3 (it holds daily files); just simulate an update_cache() call
+        weekly_arch_diff._cache_updated = True
+
+        # Diff exports, and make incremental versions
+        _, common_diffs, new_files = weekly_arch_diff.diff_exports()
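+        # common_diffs maps each previously archived export file to its
+        # incremental diff file (or to None when nothing changed); new_files
+        # are exports with no archived counterpart.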
+
+        # Archive changed and new files only
+        to_archive = [f for f, diff in common_diffs.items() if diff is not None]
+        to_archive += new_files
+        _, fails = weekly_arch_diff.archive_exports(to_archive, update_s3=False)
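+        # update_s3=False archives into the weekly cache only; uploading to S3
+        # is left to the daily differ below.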
+
+        # Filter existing exports to exclude those that failed to archive
+        succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
+        weekly_arch_diff.filter_exports(succ_common_diffs)
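+        # Replaces the full exports in receiving with their diff versions, so
+        # only rows that actually changed are issued to the API.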
+
+        # Report failures: someone should probably look at them
+        for exported_file in fails:
+            print(f"Failed to archive (weekly) '{exported_file}'")
+
+    # Daily run of archiving utility
+    # - Uploads changed files to S3
+    # - Does not export any issues into receiving
+
+    # Diff exports, and make incremental versions
+    _, common_diffs, new_files = daily_arch_diff.diff_exports()
+
+    # Archive changed and new files only
+    to_archive = [f for f, diff in common_diffs.items() if diff is not None]
+    to_archive += new_files
+    _, fails = daily_arch_diff.archive_exports(to_archive)
+
+    # Daily outputs are not needed anymore; remove them
+    for exported_file in new_files:
+        remove(exported_file)
+    for exported_file, diff_file in common_diffs.items():
+        remove(exported_file)
+        remove(diff_file)
+
+    # Report failures: someone should probably look at them
+    for exported_file in fails:
+        print(f"Failed to archive (daily) '{exported_file}'")