
Commit 19b3737

Jingjing Tang authored and committed
set default pull start date to 2020-01-05
1 parent (92a750c) · commit 19b3737

3 files changed: 61 additions & 41 deletions


google_health/delphi_google_health/constants.py

Lines changed: 2 additions & 0 deletions
@@ -10,3 +10,5 @@
 
 SIGNALS = [RAW, SMOOTHED]
 GEO_TYPES = [STATE, HRR, MSA, DMA]
+
+PULL_START_DATE = "2020-01-05"
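
The new constant pins the earliest date requested from the Google Health Trends API, independent of the run's configured export start date. It is stored as a plain "YYYY-MM-DD" string and parsed with `datetime.strptime` wherever a date object is needed, as `export.py` does below. A minimal sketch of that round trip (illustrative only, not code from this commit):

```python
from datetime import datetime

PULL_START_DATE = "2020-01-05"

# strptime raises ValueError if the constant ever drifts from the
# "YYYY-MM-DD" format that export.py expects.
pull_start = datetime.strptime(PULL_START_DATE, "%Y-%m-%d")
print(pull_start)  # 2020-01-05 00:00:00
```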

google_health/delphi_google_health/export.py

Lines changed: 17 additions & 9 deletions
@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 """Function to export the dataset in the format expected of the API.
 """
+from datetime import datetime
+
 import numpy as np
 import pandas as pd
 
@@ -10,7 +12,8 @@
 
 
 def export_csv(
-    df: pd.DataFrame, geo_name: str, sensor: str, smooth: bool, receiving_dir: str
+    df: pd.DataFrame, geo_name: str, sensor: str, smooth: bool,
+    start_date: str, receiving_dir: str
 ) -> None:
     """Export data set in format expected for injestion by the API
 
@@ -27,6 +30,8 @@ def export_csv(
         name of the sensor; only used for naming the output file
     smooth: bool
         should the signal in "val" be smoothed?
+    start_date: str
+        Output start date as a string formated as "YYYY-MM-DD"
     receiving_dir: str
         path to location where the output CSV files to be uploaded should be stored
     """
@@ -39,13 +44,16 @@ def export_csv(
     df["val"] /= RESCALE_VAL
     df["se"] = np.nan
     df["sample_size"] = np.nan
+
+    start_date = datetime.strptime(start_date, "%Y-%m-%d")
 
     for date in df["timestamp"].unique():
-        date_short = date.replace("-", "")
-        export_fn = f"{date_short}_{geo_name}_{sensor}.csv"
-        df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size"]].to_csv(
-            f"{receiving_dir}/{export_fn}",
-            index=False,
-            na_rep="NA",
-            float_format="%.8f",
-        )
+        if datetime.strptime(date, "%Y-%m-%d") >= start_date:
+            date_short = date.replace("-", "")
+            export_fn = f"{date_short}_{geo_name}_{sensor}.csv"
+            df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size"]].to_csv(
+                f"{receiving_dir}/{export_fn}",
+                index=False,
+                na_rep="NA",
+                float_format="%.8f",
+            )
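
The behavioral change is the per-date guard: each unique timestamp is now compared against the parsed `start_date` before its CSV is written, presumably so the extra history pulled from `PULL_START_DATE` onward never reaches the receiving directory. A self-contained sketch of just that filter (toy data, and a print in place of `to_csv`; not the full `export_csv`, whose upstream column handling is outside this diff):

```python
from datetime import datetime

import pandas as pd

# Toy frame with the columns the filter touches.
df = pd.DataFrame({
    "geo_id": ["ca", "ca", "ny"],
    "timestamp": ["2020-01-04", "2020-02-05", "2020-02-05"],
    "val": [1.0, 2.0, 3.0],
})

start_date = datetime.strptime("2020-02-01", "%Y-%m-%d")

# Mirrors the loop in export_csv: one output file per retained date.
for date in df["timestamp"].unique():
    if datetime.strptime(date, "%Y-%m-%d") >= start_date:
        date_short = date.replace("-", "")
        print(f"would write {date_short}_state_raw_search.csv")
# Only 2020-02-05 passes; 2020-01-04 precedes start_date and is skipped.
```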

google_health/delphi_google_health/run.py

Lines changed: 42 additions & 32 deletions
@@ -17,7 +17,9 @@
 from .pull_api import GoogleHealthTrends, get_counts_states, get_counts_dma
 from .map_values import derived_counts_from_dma
 from .export import export_csv
-from .constants import SIGNALS, RAW, SMOOTHED, MSA, HRR, STATE, DMA
+from .constants import (SIGNALS, RAW, SMOOTHED,
+                        MSA, HRR, STATE, DMA,
+                        PULL_START_DATE)
 
 
 def run_module():
@@ -39,12 +41,12 @@ def run_module():
     wip_signal = params["wip_signal"]
     cache_dir = params["cache_dir"]
 
-    arch_diff = S3ArchiveDiffer(
-        cache_dir, export_dir,
-        params["bucket_name"], "ght",
-        params["aws_credentials"])
-    arch_diff.update_cache()
-    print(arch_diff)
+    # arch_diff = S3ArchiveDiffer(
+    #     cache_dir, export_dir,
+    #     params["bucket_name"], "ght",
+    #     params["aws_credentials"])
+    # arch_diff.update_cache()
+    # print(arch_diff)
     # if missing start_date, set to today (GMT) minus 5 days
     if start_date == "":
         now = datetime.datetime.now(datetime.timezone.utc)
@@ -69,10 +71,10 @@ def run_module():
 
     # read data frame version of the data
     df_state = get_counts_states(
-        ght, start_date, end_date, static_dir=static_dir, data_dir=data_dir
+        ght, PULL_START_DATE, end_date, static_dir=static_dir, data_dir=data_dir
     )
     df_dma = get_counts_dma(
-        ght, start_date, end_date, static_dir=static_dir, data_dir=data_dir
+        ght, PULL_START_DATE, end_date, static_dir=static_dir, data_dir=data_dir
     )
     df_hrr, df_msa = derived_counts_from_dma(df_dma, static_dir=static_dir)
 
@@ -81,27 +83,35 @@ def run_module():
     for signal in signal_names:
         if signal.endswith(SMOOTHED):
             # export each geographic region, with both smoothed and unsmoothed data
-            export_csv(df_state, STATE, signal, smooth=True, receiving_dir=export_dir)
-            export_csv(df_dma, DMA, signal, smooth=True, receiving_dir=export_dir)
-            export_csv(df_hrr, HRR, signal, smooth=True, receiving_dir=export_dir)
-            export_csv(df_msa, MSA, signal, smooth=True, receiving_dir=export_dir)
+            export_csv(df_state, STATE, signal, smooth=True,
+                       start_date=start_date, receiving_dir=export_dir)
+            export_csv(df_dma, DMA, signal, smooth=True,
+                       start_date=start_date, receiving_dir=export_dir)
+            export_csv(df_hrr, HRR, signal, smooth=True,
+                       start_date=start_date, receiving_dir=export_dir)
+            export_csv(df_msa, MSA, signal, smooth=True,
+                       start_date=start_date, receiving_dir=export_dir)
         elif signal.endswith(RAW):
-            export_csv(df_state, STATE, signal, smooth=False, receiving_dir=export_dir)
-            export_csv(df_dma, DMA, signal, smooth=False, receiving_dir=export_dir)
-            export_csv(df_hrr, HRR, signal, smooth=False, receiving_dir=export_dir)
-            export_csv(df_msa, MSA, signal, smooth=False, receiving_dir=export_dir)
-    # Diff exports, and make incremental versions
-    _, common_diffs, new_files = arch_diff.diff_exports()
-
-    # Archive changed and new files only
-    to_archive = [f for f, diff in common_diffs.items() if diff is not None]
-    to_archive += new_files
-    _, fails = arch_diff.archive_exports(to_archive)
-
-    # Filter existing exports to exclude those that failed to archive
-    succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
-    arch_diff.filter_exports(succ_common_diffs)
-
-    # Report failures: someone should probably look at them
-    for exported_file in fails:
-        print(f"Failed to archive '{exported_file}'")
+            export_csv(df_state, STATE, signal, smooth=False,
+                       start_date=start_date, receiving_dir=export_dir)
+            export_csv(df_dma, DMA, signal, smooth=False,
+                       start_date=start_date, receiving_dir=export_dir)
+            export_csv(df_hrr, HRR, signal, smooth=False,
+                       start_date=start_date, receiving_dir=export_dir)
+            export_csv(df_msa, MSA, signal, smooth=False,
+                       start_date=start_date, receiving_dir=export_dir)
+    # # Diff exports, and make incremental versions
+    # _, common_diffs, new_files = arch_diff.diff_exports()
+    #
+    # # Archive changed and new files only
+    # to_archive = [f for f, diff in common_diffs.items() if diff is not None]
+    # to_archive += new_files
+    # _, fails = arch_diff.archive_exports(to_archive)
+    #
+    # # Filter existing exports to exclude those that failed to archive
+    # succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
+    # arch_diff.filter_exports(succ_common_diffs)
+    #
+    # # Report failures: someone should probably look at them
+    # for exported_file in fails:
+    #     print(f"Failed to archive '{exported_file}'")
