From 563a6fa9d55f88f4eecd21ea85ed9af4cf9ca3ec Mon Sep 17 00:00:00 2001
From: Joseph Sawaya
Date: Mon, 31 Mar 2025 10:02:17 -0400
Subject: [PATCH] feat: add version metadata in ta_cache_rollups

We want to be able to evolve the schema of the rollups, and we can do
that by including a version tag in the GCS object metadata.

In this case we're making the change in the worker first, but in the
future we should modify the reading code to handle a new schema before
modifying the write code: if we deploy both reader and writer at the
same time, it's possible that a rollup written by a new version of the
writer is read by an old version of the reader, which doesn't understand
the new format.
---
 services/test_analytics/ta_cache_rollups.py   | 43 ++++++++++++++++---
 ..._cache_rollups__cache_test_rollups__0.json |  4 ++
 ...test_rollups_use_timeseries_branch__0.json |  4 ++
 ...e_test_rollups_use_timeseries_main__0.json |  4 ++
 .../tests/test_ta_cache_rollups.py            | 27 +++++++++---
 5 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/services/test_analytics/ta_cache_rollups.py b/services/test_analytics/ta_cache_rollups.py
index fdca2c7a4..fdffcc110 100644
--- a/services/test_analytics/ta_cache_rollups.py
+++ b/services/test_analytics/ta_cache_rollups.py
@@ -1,5 +1,6 @@
 from datetime import UTC
 from io import BytesIO
+from typing import cast
 
 import polars as pl
 import shared.storage
@@ -24,8 +25,33 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str:
     )
 
 
-POLARS_SCHEMA = [
+# version number that the cache rollup task will be writing to GCS
+# if you're creating a new version of the schema, increment this
+VERSION = "1"
+
+# list of schemas, you should leave the old ones here as a reference for now
+# old schemas should basically be expired after 60 days, since there would be
+# no relevant data included in those files after that amount of time
+
+# so from the time you deprecate an old schema, you only have to keep handling it
+# for 60 days
+NO_VERSION_POLARS_SCHEMA = [
+    "computed_name",
+    ("flags", pl.List(pl.String)),
+    "failing_commits",
+    "last_duration",
+    "avg_duration",
+    "pass_count",
+    "fail_count",
+    "flaky_fail_count",
+    "skip_count",
+    ("updated_at", pl.Datetime(time_zone=UTC)),
+    "timestamp_bin",
+]
+
+V1_POLARS_SCHEMA = [
     "computed_name",
+    "testsuite",
     ("flags", pl.List(pl.String)),
     "failing_commits",
     "last_duration",
@@ -40,7 +66,6 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str:
 
 
 def cache_rollups(repoid: int, branch: str | None = None):
-    storage_service = shared.storage.get_appropriate_storage_service(repoid)
     serialized_table: BytesIO
 
     with read_rollups_from_db_summary.labels("new").time():
@@ -55,6 +80,7 @@ def cache_rollups(repoid: int, branch: str | None = None):
         data = [
             {
                 "computed_name": summary.computed_name,
+                "testsuite": summary.testsuite,
                 "flags": summary.flags,
                 "failing_commits": summary.failing_commits,
                 "last_duration": summary.last_duration_seconds,
@@ -69,15 +95,20 @@ def cache_rollups(repoid: int, branch: str | None = None):
             for summary in summaries
         ]
 
-        serialized_table = pl.DataFrame(
+        df = pl.DataFrame(
             data,
-            POLARS_SCHEMA,
+            V1_POLARS_SCHEMA,
             orient="row",
-        ).write_ipc(None)
+        )
+        serialized_table = df.write_ipc(None)
 
     serialized_table.seek(0)
+    storage_service = shared.storage.get_appropriate_storage_service(repoid)
     storage_service.write_file(
-        settings.GCS_BUCKET_NAME, rollup_blob_path(repoid, branch), serialized_table
+        cast(str, settings.GCS_BUCKET_NAME),
+        rollup_blob_path(repoid, branch),
+        serialized_table,
+        metadata={"version": VERSION},
     )
 
     rollup_size_summary.labels("new").observe(serialized_table.tell())
diff --git a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json
index 987b14e08..916b0ce54 100644
--- a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json
+++ b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json
@@ -3,6 +3,10 @@
     "computed_name2",
     "computed_name"
   ],
+  "testsuite": [
+    "testsuite2",
+    "testsuite"
+  ],
   "flags": [
     [
       "test-rollups2"
diff --git a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json
index 6798c8832..714804777 100644
--- a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json
+++ b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json
@@ -3,6 +3,10 @@
     "computed_name",
     "computed_name2"
   ],
+  "testsuite": [
+    "testsuite",
+    "testsuite2"
+  ],
   "flags": [
     [
       "test-rollups"
diff --git a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json
index 987b14e08..916b0ce54 100644
--- a/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json
+++ b/services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json
@@ -3,6 +3,10 @@
     "computed_name2",
     "computed_name"
   ],
+  "testsuite": [
+    "testsuite2",
+    "testsuite"
+  ],
   "flags": [
     [
       "test-rollups2"
diff --git a/services/test_analytics/tests/test_ta_cache_rollups.py b/services/test_analytics/tests/test_ta_cache_rollups.py
index c8cf3b7b0..4d0c15b2a 100644
--- a/services/test_analytics/tests/test_ta_cache_rollups.py
+++ b/services/test_analytics/tests/test_ta_cache_rollups.py
@@ -1,4 +1,5 @@
 import datetime as dt
+from typing import cast
 
 import polars as pl
 import pytest
@@ -8,14 +9,22 @@
     TestrunBranchSummary,
     TestrunSummary,
 )
+from shared.storage.minio import MinioStorageService
 
+from services.test_analytics.ta_cache_rollups import VERSION
 from services.test_analytics.utils import calc_test_id
 from tasks.cache_test_rollups import CacheTestRollupsTask
 
 
-def read_table(storage, storage_path: str):
+def read_table(
+    storage: MinioStorageService,
+    storage_path: str,
+    meta_container: dict[str, str] | None = None,
+):
     decompressed_table: bytes = storage.read_file(
-        get_config("services", "minio", "bucket", default="archive"), storage_path
+        cast(str, get_config("services", "minio", "bucket", default="archive")),
+        storage_path,
+        metadata_container=meta_container,
     )
 
     return pl.read_ipc(decompressed_table)
@@ -82,8 +91,11 @@ def test_cache_test_rollups(storage, snapshot):
         branch=None,
         impl_type="new",
     )
-
-    table = read_table(storage, "test_analytics/repo_rollups/1.arrow")
+    meta = {}
+    table = read_table(
+        storage, "test_analytics/repo_rollups/1.arrow", meta_container=meta
+    )
+    assert meta["version"] == VERSION
     table_dict = table.to_dict(as_series=False)
     del table_dict["timestamp_bin"]
     del table_dict["updated_at"]
@@ -174,8 +186,11 @@ def test_cache_test_rollups_use_timeseries_main(storage, snapshot):
         branch="main",
         impl_type="new",
     )
-
-    table = read_table(storage, "test_analytics/branch_rollups/1/main.arrow")
+    meta = {}
+    table = read_table(
+        storage, "test_analytics/branch_rollups/1/main.arrow", meta_container=meta
+    )
+    assert meta["version"] == VERSION
     table_dict = table.to_dict(as_series=False)
     del table_dict["timestamp_bin"]
     del table_dict["updated_at"]
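
Note on the read side, which is not part of this patch: the commit message
argues the reader should learn to dispatch on the "version" metadata before
the writer changes. The sketch below only illustrates that idea; the
`read_rollup` helper is hypothetical, the `django.conf` settings import is
an assumption about where the bucket name lives, and the null-backfill
strategy for unversioned rollups is illustrative rather than the
repository's actual reading code. `rollup_blob_path`, `VERSION`, and the
`metadata_container` argument are the ones used in the diff above.

    import polars as pl
    import shared.storage
    from django.conf import settings  # assumption: same settings source as the writer

    from services.test_analytics.ta_cache_rollups import VERSION, rollup_blob_path


    def read_rollup(repoid: int, branch: str | None = None) -> pl.DataFrame:
        # Hypothetical version-aware reader for the rollup files written above.
        storage_service = shared.storage.get_appropriate_storage_service(repoid)
        metadata: dict[str, str] = {}
        raw = storage_service.read_file(
            settings.GCS_BUCKET_NAME,
            rollup_blob_path(repoid, branch),
            metadata_container=metadata,
        )
        df = pl.read_ipc(raw)
        if metadata.get("version") != VERSION:
            # No (or unknown) version tag: the file was written by the old
            # writer using NO_VERSION_POLARS_SCHEMA, which lacks "testsuite".
            # Backfill the column as nulls so callers can rely on the v1
            # shape; note the column order may differ from a native v1 file.
            # Per the comments above, this branch can be dropped roughly 60
            # days after the new writer is deployed.
            df = df.with_columns(pl.lit(None, dtype=pl.String).alias("testsuite"))
        return df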