2020from .weekday import Weekday
2121
2222
23- def write_to_csv (df , geo_level , write_se , day_shift , out_name , output_path = "." , start_date = None , end_date = None ):
23+ def write_to_csv (df , geo_level , write_se , day_shift , out_name , logger , output_path = "." , start_date = None , end_date = None ):
2424 """Write sensor values to csv.
2525
2626 Args:
@@ -43,15 +43,15 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
4343 assert df [suspicious_se_mask ].empty , " se contains suspiciously large values"
4444 assert not df ["se" ].isna ().any (), " se contains nan values"
4545 if write_se :
46- logging .info ("========= WARNING: WRITING SEs TO {0} =========" .format (out_name ))
46+ logger .info ("========= WARNING: WRITING SEs TO {0} =========" .format (out_name ))
4747 else :
4848 df ["se" ] = np .nan
4949
5050 assert not df ["val" ].isna ().any (), " val contains nan values"
5151 suspicious_val_mask = df ["val" ].gt (90 )
5252 if not df [suspicious_val_mask ].empty :
5353 for geo in df .loc [suspicious_val_mask , "geo_id" ]:
54- logging .warning ("value suspiciously high, {0}: {1}" .format (
54+ logger .warning ("value suspiciously high, {0}: {1}" .format (
5555 geo , out_name
5656 ))
5757
@@ -64,10 +64,10 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
6464 sensor = out_name ,
6565 write_empty_days = True
6666 )
67- logging .debug ("wrote {0} rows for {1} {2}" .format (
67+ logger .debug ("wrote {0} rows for {1} {2}" .format (
6868 df .size , df ["geo_id" ].unique ().size , geo_level
6969 ))
70- logging .debug ("wrote files to {0}" .format (output_path ))
70+ logger .debug ("wrote files to {0}" .format (output_path ))
7171 return dates
7272
7373
@@ -83,7 +83,8 @@ def __init__(self,
8383 weekday ,
8484 numtype ,
8585 se ,
86- wip_signal ):
86+ wip_signal ,
87+ logger ):
8788 """Init Sensor Updater.
8889
8990 Args:
@@ -96,7 +97,9 @@ def __init__(self,
9697 numtype: type of count data used, one of ["covid", "cli"]
9798 se: boolean to write out standard errors, if true, use an obfuscated name
9899 wip_signal: Prefix for WIP signals
100+ logger: the structured logger
99101 """
102+ self .logger = logger
100103 self .startdate , self .enddate , self .dropdate = [
101104 pd .to_datetime (t ) for t in (startdate , enddate , dropdate )]
102105 # handle dates
@@ -145,7 +148,7 @@ def geo_reindex(self, data):
145148 geo = self .geo
146149 gmpr = GeoMapper ()
147150 if geo not in {"county" , "state" , "msa" , "hrr" , "nation" , "hhs" }:
148- logging .error ("{0} is invalid, pick one of 'county', "
151+ self . logger .error ("{0} is invalid, pick one of 'county', "
149152 "'state', 'msa', 'hrr', 'hss','nation'" .format (geo ))
150153 return False
151154 if geo == "county" :
@@ -197,12 +200,12 @@ def update_sensor(self,
197200 sub_data .reset_index (level = 0 ,inplace = True )
198201 if self .weekday :
199202 sub_data = Weekday .calc_adjustment (wd_params , sub_data )
200- res = CHCSensor .fit (sub_data , self .burnindate , geo_id )
203+ res = CHCSensor .fit (sub_data , self .burnindate , geo_id , self . logger )
201204 res = pd .DataFrame (res ).loc [final_sensor_idxs ]
202205 dfs .append (res )
203206 else :
204207 n_cpu = min (10 , cpu_count ())
205- logging .debug ("starting pool with {0} workers" .format (n_cpu ))
208+ self . logger .debug ("starting pool with {0} workers" .format (n_cpu ))
206209 with Pool (n_cpu ) as pool :
207210 pool_results = []
208211 for geo_id , sub_data in data_frame .groupby (level = 0 ,as_index = False ):
@@ -211,7 +214,7 @@ def update_sensor(self,
211214 sub_data = Weekday .calc_adjustment (wd_params , sub_data )
212215 pool_results .append (
213216 pool .apply_async (
214- CHCSensor .fit , args = (sub_data , self .burnindate , geo_id ,),
217+ CHCSensor .fit , args = (sub_data , self .burnindate , geo_id , self . logger ),
215218 )
216219 )
217220 pool_results = [proc .get () for proc in pool_results ]
0 commit comments