From 9368b63fe7999bfb4d95a14a3b701bf67ba18da0 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Thu, 28 Aug 2025 17:58:09 -0400 Subject: [PATCH 1/4] add RVDSS endpoint and python client support --- src/client/delphi_epidata.py | 42 +++++++++++++ src/server/_query.py | 14 +++-- src/server/endpoints/rvdss.py | 108 ++++++++++++++++++++++++++++++++++ 3 files changed, 160 insertions(+), 4 deletions(-) create mode 100644 src/server/endpoints/rvdss.py diff --git a/src/client/delphi_epidata.py b/src/client/delphi_epidata.py index 998c85281..eb7802c07 100644 --- a/src/client/delphi_epidata.py +++ b/src/client/delphi_epidata.py @@ -675,6 +675,48 @@ def covid_hosp_facility_lookup(state=None, ccn=None, city=None, zip=None, fips_c # Make the API call return Epidata._request("covid_hosp_facility_lookup", params) + # Fetch Canadian respiratory RVDSS data + @staticmethod + def rvdss( + geo_type, + time_values, + geo_value, + as_of=None, + issues=None, + ): + """Fetch RVDSS data.""" + # Check parameters + if None in (geo_type, time_values, geo_value): + raise EpidataBadRequestException( + "`geo_type`, `time_values`, and `geo_value` are all required" + ) + + # Set up request + params = { + "data_source": data_source, + "signals": Epidata._list(signals), + "time_type": time_type, + "geo_type": geo_type, + "time_values": Epidata._list(time_values), + # Fake a time type param so that we can use some helper functions later. + "time_type": "week", + } + + if isinstance(geo_value, (list, tuple)): + params["geo_values"] = ",".join(geo_value) + else: + params["geo_value"] = geo_value + if as_of is not None: + params["as_of"] = as_of + if issues is not None: + params["issues"] = Epidata._list(issues) + # if lag is not None: + # params["lag"] = lag + + # Make the API call + return Epidata._request("rvdss", params) + + @staticmethod def async_epidata(param_list, batch_size=50): """[DEPRECATED] Make asynchronous Epidata calls for a list of parameters.""" diff --git a/src/server/_query.py b/src/server/_query.py index 267a78eb1..cb6797a8a 100644 --- a/src/server/_query.py +++ b/src/server/_query.py @@ -483,15 +483,21 @@ def apply_issues_filter(self, history_table: str, issues: Optional[TimeValues]) self.where_integers("issue", issues) return self - def apply_as_of_filter(self, history_table: str, as_of: Optional[int]) -> "QueryBuilder": + def apply_as_of_filter(self, history_table: str, as_of: Optional[int], use_source_signal: Optional[bool] = TRUE) -> "QueryBuilder": if as_of is not None: self.retable(history_table) sub_condition_asof = "(issue <= :as_of)" self.params["as_of"] = as_of - sub_fields = "max(issue) max_issue, time_type, time_value, `source`, `signal`, geo_type, geo_value" - sub_group = "time_type, time_value, `source`, `signal`, geo_type, geo_value" + alias = self.alias - sub_condition = f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type AND x.time_value = {alias}.time_value AND x.source = {alias}.source AND x.signal = {alias}.signal AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value" + source_signal_plain = source_signal_alias = "" + if use_source_signal: + source_signal_plain = ", `source`, `signal`" + source_signal_alias = f"AND x.source = {alias}.source AND x.signal = {alias}.signal" + + sub_fields = f"max(issue) max_issue, time_type, time_value{source_signal_plain}, geo_type, geo_value" + sub_group = f"time_type, time_value{source_signal_plain}, geo_type, geo_value" + sub_condition = f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type AND x.time_value = {alias}.time_value{source_signal_alias} AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value" self.subquery = f"JOIN (SELECT {sub_fields} FROM {self.table} WHERE {self.conditions_clause} AND {sub_condition_asof} GROUP BY {sub_group}) x ON {sub_condition}" return self diff --git a/src/server/endpoints/rvdss.py b/src/server/endpoints/rvdss.py new file mode 100644 index 000000000..208f908a3 --- /dev/null +++ b/src/server/endpoints/rvdss.py @@ -0,0 +1,108 @@ +from flask import Blueprint, request + +from .._params import extract_integers, extract_strings, extract_date, extract_dates +from .._query import execute_query, QueryBuilder +from .._validate import require_all + +bp = Blueprint("rvdss", __name__) + +db_table_name = "rvdss" + +def parse_time_set_epiweeks() -> TimeSet: + require_all(request, "epiweeks") + time_values = extract_dates("epiweeks") + if time_values == ["*"]: + return TimeSet("week", True) + return TimeSet("week", time_values) + + return parse_time_arg() + + +@bp.route("/", methods=("GET", "POST")) +def handle(): + require_all(request, "epiweeks", "geo_type", "geo_values") + + # epiweeks = extract_integers("epiweeks") + time_set = parse_time_set_epiweeks() + geo_sets = parse_geo_sets() + # issues = extract_integers("issues") # TODO + issues = extract_dates("issues") + # as_of = extract_date("as_of") + + # basic query info + q = QueryBuilder(db_table_name, "rv") + + fields_string = [ + "geo_type", + "geo_value", + "region", + ] + fields_int = [ + "epiweek", + "week", + "weekorder", + "year", + "time_value", + "issue", + ] + fields_float = [ + "sarscov2_tests", + "sarscov2_positive_tests", + "sarscov2_pct_positive", + "flu_tests", + "flua_tests", + "flub_tests", + "flu_positive_tests", + "flu_pct_positive", + "fluah1n1pdm09_positive_tests", + "fluah3_positive_tests", + "fluauns_positive_tests", + "flua_positive_tests", + "flua_pct_positive", + "flub_positive_tests", + "flub_pct_positive", + "rsv_tests", + "rsv_positive_tests", + "rsv_pct_positive", + "hpiv_tests", + "hpiv1_positive_tests", + "hpiv2_positive_tests", + "hpiv3_positive_tests", + "hpiv4_positive_tests", + "hpivother_positive_tests", + "hpiv_positive_tests", + "hpiv_pct_positive", + "adv_tests", + "adv_positive_tests", + "adv_pct_positive", + "hmpv_tests", + "hmpv_positive_tests", + "hmpv_pct_positive", + "evrv_tests", + "evrv_positive_tests", + "evrv_pct_positive", + "hcov_tests", + "hcov_positive_tests", + "hcov_pct_positive", + ] + + q.set_sort_order("epiweek", "time_value" "geo_type", "geo_value", "issue") + q.set_fields(fields_string, fields_int, fields_float) + + # q.where_strings("geo_type", geo_type) + q.apply_geo_filters("geo_type", "geo_value", geo_sets) + # TODO: can only use this if have a time_type field in data + # q.apply_time_filter("time_type", "time_value", time_set) + q.where_integers("epiweek", epiweeks) + + q.apply_issues_filter(db_table_name, issues) + # TODO: can only use this if have a time_type field in data + # q.apply_as_of_filter(db_table_name, as_of, use_source_signal = FALSE) + + # if issues is not None: + # q.where_integers("issue", issues) + # else: + # q.with_max_issue("epiweek", "geo_value") + + # send query + return execute_query(str(q), q.params, fields_string, fields_int, fields_float) From 7b70f5f15470385a8d5fa1cb3ec97113cb970279 Mon Sep 17 00:00:00 2001 From: nmdefries <42820733+nmdefries@users.noreply.github.com> Date: Tue, 16 Sep 2025 17:12:41 -0400 Subject: [PATCH 2/4] review feedback. simplify and clean up rvdss server code --- src/client/delphi_epidata.py | 13 ++--- src/server/_query.py | 12 +++- src/server/endpoints/rvdss.py | 107 +++++++++++++++------------------- 3 files changed, 62 insertions(+), 70 deletions(-) diff --git a/src/client/delphi_epidata.py b/src/client/delphi_epidata.py index eb7802c07..b64ee2e55 100644 --- a/src/client/delphi_epidata.py +++ b/src/client/delphi_epidata.py @@ -688,30 +688,25 @@ def rvdss( # Check parameters if None in (geo_type, time_values, geo_value): raise EpidataBadRequestException( - "`geo_type`, `time_values`, and `geo_value` are all required" + "`geo_type`, `geo_value`, and `time_values` are all required" ) # Set up request params = { - "data_source": data_source, - "signals": Epidata._list(signals), - "time_type": time_type, - "geo_type": geo_type, - "time_values": Epidata._list(time_values), # Fake a time type param so that we can use some helper functions later. "time_type": "week", + "geo_type": geo_type, + "time_values": Epidata._list(time_values), } if isinstance(geo_value, (list, tuple)): params["geo_values"] = ",".join(geo_value) else: - params["geo_value"] = geo_value + params["geo_values"] = geo_value if as_of is not None: params["as_of"] = as_of if issues is not None: params["issues"] = Epidata._list(issues) - # if lag is not None: - # params["lag"] = lag # Make the API call return Epidata._request("rvdss", params) diff --git a/src/server/_query.py b/src/server/_query.py index cb6797a8a..58be121b2 100644 --- a/src/server/_query.py +++ b/src/server/_query.py @@ -483,7 +483,14 @@ def apply_issues_filter(self, history_table: str, issues: Optional[TimeValues]) self.where_integers("issue", issues) return self - def apply_as_of_filter(self, history_table: str, as_of: Optional[int], use_source_signal: Optional[bool] = TRUE) -> "QueryBuilder": + # Note: This function was originally only used by covidcast, so it assumed + # that `source` and `signal` columns are always available. To make this usable + # for rvdss, too, which doesn't have the `source` or `signal` columns, add the + # `use_source_signal` flag to toggle inclusion of `source` and `signal` columns. + # + # `use_source_signal` defaults to True so if not passed, the function + # retains the historical behavior. + def apply_as_of_filter(self, history_table: str, as_of: Optional[int], use_source_signal: Optional[bool] = True) -> "QueryBuilder": if as_of is not None: self.retable(history_table) sub_condition_asof = "(issue <= :as_of)" @@ -491,9 +498,10 @@ def apply_as_of_filter(self, history_table: str, as_of: Optional[int], use_sourc alias = self.alias source_signal_plain = source_signal_alias = "" + # If `use_source_signal` toggled on, append `source` and `signal` columns to group-by list and join list. if use_source_signal: source_signal_plain = ", `source`, `signal`" - source_signal_alias = f"AND x.source = {alias}.source AND x.signal = {alias}.signal" + source_signal_alias = f" AND x.source = {alias}.source AND x.signal = {alias}.signal" sub_fields = f"max(issue) max_issue, time_type, time_value{source_signal_plain}, geo_type, geo_value" sub_group = f"time_type, time_value{source_signal_plain}, geo_type, geo_value" diff --git a/src/server/endpoints/rvdss.py b/src/server/endpoints/rvdss.py index 208f908a3..2e9198f01 100644 --- a/src/server/endpoints/rvdss.py +++ b/src/server/endpoints/rvdss.py @@ -1,6 +1,11 @@ from flask import Blueprint, request -from .._params import extract_integers, extract_strings, extract_date, extract_dates +from .._params import ( + extract_date, + extract_dates, + extract_strings, + parse_time_set, +) from .._query import execute_query, QueryBuilder from .._validate import require_all @@ -8,26 +13,15 @@ db_table_name = "rvdss" -def parse_time_set_epiweeks() -> TimeSet: - require_all(request, "epiweeks") - time_values = extract_dates("epiweeks") - if time_values == ["*"]: - return TimeSet("week", True) - return TimeSet("week", time_values) - - return parse_time_arg() - - @bp.route("/", methods=("GET", "POST")) def handle(): - require_all(request, "epiweeks", "geo_type", "geo_values") + require_all(request, "time_type", "time_values", "geo_type", "geo_values") - # epiweeks = extract_integers("epiweeks") - time_set = parse_time_set_epiweeks() - geo_sets = parse_geo_sets() - # issues = extract_integers("issues") # TODO + time_set = parse_time_set() + geo_type = extract_strings("geo_type") + geo_values = extract_strings("geo_values") issues = extract_dates("issues") - # as_of = extract_date("as_of") + as_of = extract_date("as_of") # basic query info q = QueryBuilder(db_table_name, "rv") @@ -36,73 +30,68 @@ def handle(): "geo_type", "geo_value", "region", + "time_type", ] fields_int = [ "epiweek", + "time_value", + "issue", "week", "weekorder", "year", - "time_value", - "issue", ] fields_float = [ - "sarscov2_tests", - "sarscov2_positive_tests", - "sarscov2_pct_positive", + "adv_pct_positive", + "adv_positive_tests", + "adv_tests", + "evrv_pct_positive", + "evrv_positive_tests", + "evrv_tests", + "flu_pct_positive", + "flu_positive_tests", "flu_tests", + "flua_pct_positive", + "flua_positive_tests", "flua_tests", - "flub_tests", - "flu_positive_tests", - "flu_pct_positive", "fluah1n1pdm09_positive_tests", "fluah3_positive_tests", "fluauns_positive_tests", - "flua_positive_tests", - "flua_pct_positive", - "flub_positive_tests", "flub_pct_positive", - "rsv_tests", - "rsv_positive_tests", - "rsv_pct_positive", - "hpiv_tests", + "flub_positive_tests", + "flub_tests", + "hcov_pct_positive", + "hcov_positive_tests", + "hcov_tests", + "hmpv_pct_positive", + "hmpv_positive_tests", + "hmpv_tests", "hpiv1_positive_tests", "hpiv2_positive_tests", "hpiv3_positive_tests", "hpiv4_positive_tests", - "hpivother_positive_tests", - "hpiv_positive_tests", "hpiv_pct_positive", - "adv_tests", - "adv_positive_tests", - "adv_pct_positive", - "hmpv_tests", - "hmpv_positive_tests", - "hmpv_pct_positive", - "evrv_tests", - "evrv_positive_tests", - "evrv_pct_positive", - "hcov_tests", - "hcov_positive_tests", - "hcov_pct_positive", + "hpiv_positive_tests", + "hpiv_tests", + "hpivother_positive_tests", + "rsv_pct_positive", + "rsv_positive_tests", + "rsv_tests", + "sarscov2_pct_positive", + "sarscov2_positive_tests", + "sarscov2_tests", ] - q.set_sort_order("epiweek", "time_value" "geo_type", "geo_value", "issue") + q.set_sort_order("epiweek", "time_value", "geo_type", "geo_value", "issue") q.set_fields(fields_string, fields_int, fields_float) - # q.where_strings("geo_type", geo_type) - q.apply_geo_filters("geo_type", "geo_value", geo_sets) - # TODO: can only use this if have a time_type field in data - # q.apply_time_filter("time_type", "time_value", time_set) - q.where_integers("epiweek", epiweeks) + q.where_strings("geo_type", geo_type) + # Only apply geo_values filter if wildcard "*" was NOT used. + if not (len(geo_values) == 1 and geo_values[0] == "*"): + q.where_strings("geo_value", geo_values) + q.apply_time_filter("time_type", "time_value", time_set) q.apply_issues_filter(db_table_name, issues) - # TODO: can only use this if have a time_type field in data - # q.apply_as_of_filter(db_table_name, as_of, use_source_signal = FALSE) - - # if issues is not None: - # q.where_integers("issue", issues) - # else: - # q.with_max_issue("epiweek", "geo_value") + q.apply_as_of_filter(db_table_name, as_of, use_source_signal = False) # send query return execute_query(str(q), q.params, fields_string, fields_int, fields_float) From 35f04ea0020ef1097369dd1d7b64a2d5700f323c Mon Sep 17 00:00:00 2001 From: nmdefries <42820733+nmdefries@users.noreply.github.com> Date: Tue, 16 Sep 2025 17:34:51 -0400 Subject: [PATCH 3/4] export rvdss endpoint --- src/server/endpoints/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/server/endpoints/__init__.py b/src/server/endpoints/__init__.py index 6dad05bf3..c0a095475 100644 --- a/src/server/endpoints/__init__.py +++ b/src/server/endpoints/__init__.py @@ -25,6 +25,7 @@ nowcast, paho_dengue, quidel, + rvdss, sensors, twitter, wiki, @@ -59,6 +60,7 @@ nowcast, paho_dengue, quidel, + rvdss, sensors, twitter, wiki, From 7de9698f0f7a73ecb590b60248de85c3a9a85cb2 Mon Sep 17 00:00:00 2001 From: nmdefries <42820733+nmdefries@users.noreply.github.com> Date: Tue, 16 Sep 2025 17:47:33 -0400 Subject: [PATCH 4/4] update actions/cache version --- .github/workflows/ci.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fd2d5fb4b..2bdb932b4 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -65,7 +65,7 @@ jobs: docker network create --driver bridge delphi-net docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata docker run --rm -d -p 6379:6379 --network delphi-net --env "REDIS_PASSWORD=1234" --name delphi_redis delphi_redis - + - run: | wget https://raw.githubusercontent.com/eficode/wait-for/master/wait-for @@ -103,7 +103,7 @@ jobs: with: node-version: '16.x' - name: Cache Node.js modules - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.npm # npm cache files are stored in `~/.npm` on Linux/macOS key: ${{ runner.OS }}-node2-${{ hashFiles('**/package-lock.json') }}