 from .weekday import Weekday
 
 
-def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".", start_date=None, end_date=None):
+def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None):
     """Write sensor values to csv.
 
     Args:
@@ -47,15 +47,15 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
     assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
     assert not df["se"].isna().any(), " se contains nan values"
     if write_se:
-        logging.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
+        logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
     else:
         df.loc[:, "se"] = np.nan
 
     assert not df["val"].isna().any(), " val contains nan values"
     suspicious_val_mask = df["val"].gt(90)
     if not df[suspicious_val_mask].empty:
         for geo in df.loc[suspicious_val_mask, "geo_id"]:
-            logging.warning("value suspiciously high, {0}: {1}".format(
+            logger.warning("value suspiciously high, {0}: {1}".format(
                 geo, out_name
             ))
 
@@ -68,10 +68,10 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
         sensor=out_name,
         write_empty_days=True
     )
-    logging.debug("wrote {0} rows for {1} {2}".format(
+    logger.debug("wrote {0} rows for {1} {2}".format(
         df.size, df["geo_id"].unique().size, geo_level
     ))
-    logging.debug("wrote files to {0}".format(output_path))
+    logger.debug("wrote files to {0}".format(output_path))
     return dates
 
 
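The hunks above replace module-level `logging` calls inside `write_to_csv` with calls on a `logger` passed in by the caller. A minimal sketch of that injection pattern, with illustrative names that are not taken from this diff:

import logging

def write_report(rows, out_name, logger):
    # Log through the injected logger instead of the root logging module.
    logger.info("writing %d rows to %s", len(rows), out_name)

logging.basicConfig(level=logging.INFO)
write_report([1, 2, 3], "smoothed_outpatient_covid", logging.getLogger("chc.run"))

Injecting the logger this way lets the caller choose handlers, formatting, and per-run log files without the module mutating global logging state.
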
@@ -87,7 +87,8 @@ def __init__(self,
                  weekday,
                  numtype,
                  se,
-                 wip_signal):
+                 wip_signal,
+                 logger):
         """Init Sensor Updator.
 
         Args:
@@ -100,7 +101,9 @@ def __init__(self,
             numtype: type of count data used, one of ["covid", "cli"]
             se: boolean to write out standard errors, if true, use an obfuscated name
             wip_signal: Prefix for WIP signals
+            logger: the structured logger
         """
+        self.logger = logger
         self.startdate, self.enddate, self.dropdate = [
             pd.to_datetime(t) for t in (startdate, enddate, dropdate)]
         # handle dates
@@ -149,7 +152,7 @@ def geo_reindex(self, data):
         geo = self.geo
         gmpr = GeoMapper()
         if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}:
-            logging.error("{0} is invalid, pick one of 'county', "
+            self.logger.error("{0} is invalid, pick one of 'county', "
                           "'state', 'msa', 'hrr', 'hhs', 'nation'".format(geo))
             return False
         if geo == "county":
@@ -201,12 +204,12 @@ def update_sensor(self,
                 sub_data.reset_index(level=0, inplace=True)
                 if self.weekday:
                     sub_data = Weekday.calc_adjustment(wd_params, sub_data)
-                res = CHCSensor.fit(sub_data, self.burnindate, geo_id)
+                res = CHCSensor.fit(sub_data, self.burnindate, geo_id, self.logger)
                 res = pd.DataFrame(res).loc[final_sensor_idxs]
                 dfs.append(res)
         else:
             n_cpu = min(10, cpu_count())
-            logging.debug("starting pool with {0} workers".format(n_cpu))
+            self.logger.debug("starting pool with {0} workers".format(n_cpu))
             with Pool(n_cpu) as pool:
                 pool_results = []
                 for geo_id, sub_data in data_frame.groupby(level=0, as_index=False):
@@ -215,7 +218,7 @@ def update_sensor(self,
                         sub_data = Weekday.calc_adjustment(wd_params, sub_data)
                     pool_results.append(
                         pool.apply_async(
-                            CHCSensor.fit, args=(sub_data, self.burnindate, geo_id),
+                            CHCSensor.fit, args=(sub_data, self.burnindate, geo_id, self.logger),
                         )
                     )
                 pool_results = [proc.get() for proc in pool_results]
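Passing `self.logger` through `pool.apply_async` means the logger object is pickled and sent to each worker. A standard `logging.Logger` pickles by name since Python 3.7, so the pattern works as sketched below; `fit_one_geo` and the data are illustrative stand-ins, not part of this diff. Note that with the default fork start method on Linux the workers inherit the parent's handler configuration, while under spawn each worker would need to configure handlers itself.

import logging
from multiprocessing import Pool, cpu_count

def fit_one_geo(values, geo_id, logger):
    # Toy stand-in for CHCSensor.fit: log via the injected logger, return a summary.
    logger.debug("fitting %s", geo_id)
    return geo_id, sum(values) / len(values)

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("chc")
    data = {"ca": [1.0, 2.0], "ny": [3.0, 4.0]}
    with Pool(min(10, cpu_count())) as pool:
        results = [
            pool.apply_async(fit_one_geo, args=(vals, geo, logger))
            for geo, vals in data.items()
        ]
        print([r.get() for r in results])
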
@@ -244,7 +247,8 @@ def update_sensor(self,
                 write_se=self.se,
                 day_shift=Config.DAY_SHIFT,
                 out_name=signal,
-                output_path=output_path
+                output_path=output_path,
+                logger=self.logger
             )
             if len(dates) > 0:
                 stats.append((max(dates), len(dates)))
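With the constructor change above, the caller now has to build the logger and hand it in. A minimal sketch of that wiring, assuming the class being modified is the changehc `CHCSensorUpdator` and that the run module uses `delphi_utils.get_structured_logger` as other covidcast indicators do; the constructor values are illustrative and the actual run-module changes are not part of this diff.

from delphi_utils import get_structured_logger

logger = get_structured_logger(__name__)
updator = CHCSensorUpdator(
    startdate="2020-02-01",  # illustrative dates, not from the diff
    enddate="2020-05-01",
    dropdate="2020-05-04",
    geo="county",
    parallel=False,
    weekday=False,
    numtype="covid",
    se=False,
    wip_signal="",
    logger=logger,
)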