From 84632f252d0197def277cca18389c85247154874 Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Mon, 23 Feb 2026 12:18:16 +0200 Subject: [PATCH 01/11] Add adaptive metadata upload batch sizing --- dagshub/common/config.py | 13 +++ dagshub/data_engine/model/datasource.py | 102 ++++++++++++++++++++++-- tests/data_engine/test_datasource.py | 45 +++++++++++ 3 files changed, 153 insertions(+), 7 deletions(-) diff --git a/dagshub/common/config.py b/dagshub/common/config.py index e82b0063..60084863 100644 --- a/dagshub/common/config.py +++ b/dagshub/common/config.py @@ -60,6 +60,19 @@ def set_host(new_host: str): DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_KEY = "DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE" dataengine_metadata_upload_batch_size = int(os.environ.get(DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_KEY, 15000)) +DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MIN_KEY = "DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE_MIN" +dataengine_metadata_upload_batch_size_min = int(os.environ.get(DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MIN_KEY, 150)) + +DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_INITIAL_KEY = "DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE_INITIAL" +dataengine_metadata_upload_batch_size_initial = int( + os.environ.get(DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_INITIAL_KEY, dataengine_metadata_upload_batch_size_min) +) + +DATAENGINE_METADATA_UPLOAD_TARGET_BATCH_TIME_KEY = "DAGSHUB_DE_METADATA_UPLOAD_TARGET_BATCH_TIME" +dataengine_metadata_upload_target_batch_time = float( + os.environ.get(DATAENGINE_METADATA_UPLOAD_TARGET_BATCH_TIME_KEY, 5.0) +) + DISABLE_ANALYTICS_KEY = "DAGSHUB_DISABLE_ANALYTICS" disable_analytics = "DAGSHUB_DISABLE_ANALYTICS" in os.environ diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index 255bb76d..491edcda 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -755,16 +755,104 @@ def _upload_metadata(self, metadata_entries: List[DatapointMetadataUpdateEntry]) progress = get_rich_progress(rich.progress.MofNCompleteColumn()) - upload_batch_size = dagshub.common.config.dataengine_metadata_upload_batch_size + max_batch_size = max(1, dagshub.common.config.dataengine_metadata_upload_batch_size) + min_batch_size = max( + 1, + min(dagshub.common.config.dataengine_metadata_upload_batch_size_min, max_batch_size), + ) + current_batch_size = max( + min_batch_size, + min(dagshub.common.config.dataengine_metadata_upload_batch_size_initial, max_batch_size), + ) + target_batch_time = max(dagshub.common.config.dataengine_metadata_upload_target_batch_time, 0.01) + + def _next_batch_after_success(batch_size: int, bad_batch_size: Optional[int]) -> int: + # Keep expanding quickly until we find an upper bound, then binary-search between good and bad. 
+ if bad_batch_size is not None and batch_size < bad_batch_size: + next_batch_size = batch_size + max(1, (bad_batch_size - batch_size) // 2) + else: + next_batch_size = batch_size * 2 + + next_batch_size = min(max_batch_size, next_batch_size) + if next_batch_size <= batch_size and batch_size < max_batch_size: + next_batch_size = batch_size + 1 + return max(min_batch_size, next_batch_size) + + def _next_batch_after_bad( + batch_size: int, + good_batch_size: Optional[int], + bad_batch_size: Optional[int], + ) -> int: + upper_bound = bad_batch_size if bad_batch_size is not None else batch_size + + if good_batch_size is not None and good_batch_size < upper_bound: + next_batch_size = good_batch_size + max(1, (upper_bound - good_batch_size) // 2) + else: + next_batch_size = upper_bound // 2 + + next_batch_size = max(min_batch_size, min(max_batch_size, next_batch_size)) + if next_batch_size >= batch_size: + next_batch_size = max(min_batch_size, batch_size - 1) + return next_batch_size + total_entries = len(metadata_entries) - total_task = progress.add_task(f"Uploading metadata (batch size {upload_batch_size})...", total=total_entries) + total_task = progress.add_task( + f"Uploading metadata (adaptive batch {current_batch_size}-{max_batch_size})...", + total=total_entries, + ) + + last_good_batch_size: Optional[int] = None + last_bad_batch_size: Optional[int] = None with progress: - for start in range(0, total_entries, upload_batch_size): - entries = metadata_entries[start : start + upload_batch_size] - logger.debug(f"Uploading {len(entries)} metadata entries...") - self.source.client.update_metadata(self, entries) - progress.update(total_task, advance=upload_batch_size) + start = 0 + while start < total_entries: + entries_left = total_entries - start + batch_size = min(current_batch_size, entries_left) + entries = metadata_entries[start : start + batch_size] + + progress.update( + total_task, + description=f"Uploading metadata (batch size {batch_size})...", + ) + logger.debug(f"Uploading {batch_size} metadata entries...") + + start_time = time.monotonic() + try: + self.source.client.update_metadata(self, entries) + except Exception as exc: + if batch_size <= min_batch_size: + logger.error( + f"Metadata upload failed at minimum batch size ({min_batch_size}); aborting.", + exc_info=True, + ) + raise + + last_bad_batch_size = ( + batch_size if last_bad_batch_size is None else min(last_bad_batch_size, batch_size) + ) + current_batch_size = _next_batch_after_bad(batch_size, last_good_batch_size, last_bad_batch_size) + logger.warning( + f"Metadata upload failed for batch size {batch_size} " + f"({exc.__class__.__name__}: {exc}). Retrying with batch size {current_batch_size}." 
+ ) + continue + + elapsed = time.monotonic() - start_time + start += batch_size + progress.update(total_task, advance=batch_size) + + if elapsed <= target_batch_time: + last_good_batch_size = ( + batch_size if last_good_batch_size is None else max(last_good_batch_size, batch_size) + ) + current_batch_size = _next_batch_after_success(batch_size, last_bad_batch_size) + else: + last_bad_batch_size = ( + batch_size if last_bad_batch_size is None else min(last_bad_batch_size, batch_size) + ) + current_batch_size = _next_batch_after_bad(batch_size, last_good_batch_size, last_bad_batch_size) + progress.update(total_task, completed=total_entries, refresh=True) # Update the status from dagshub, so we get back the new metadata columns diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index bd1f1912..49359f59 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -8,6 +8,7 @@ import pandas as pd import pytest +import dagshub.common.config from dagshub.common.util import wrap_bytes from dagshub.data_engine.annotation import MetadataAnnotations from dagshub.data_engine.client.models import MetadataFieldSchema @@ -19,6 +20,10 @@ from tests.data_engine.util import add_string_fields, add_document_fields, add_annotation_fields +def _uploaded_batch_sizes(ds: Datasource): + return [len(call.args[1]) for call in ds.source.client.update_metadata.call_args_list] + + @pytest.fixture def metadata_df(): data_dict = { @@ -142,6 +147,46 @@ def test_uploading_to_document_turns_into_blob(ds): client_mock.update_metadata.assert_called_with(ds, expected_data_upload) +def test_upload_metadata_starts_small_and_grows(ds, mocker): + entries = [ + DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(14) + ] + + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 16) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + + ds._upload_metadata(entries) + + assert _uploaded_batch_sizes(ds) == [2, 4, 8] + + +def test_upload_metadata_retries_with_smaller_batch_after_failure(ds, mocker): + entries = [ + DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(10) + ] + + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + + has_failed = {"value": False} + + def _flaky_upload(_ds, upload_entries): + if len(upload_entries) == 8 and not has_failed["value"]: + has_failed["value"] = True + raise RuntimeError("simulated timeout") + + ds.source.client.update_metadata.side_effect = _flaky_upload + + ds._upload_metadata(entries) + + assert has_failed["value"] + assert _uploaded_batch_sizes(ds) == [8, 4, 6] + + def test_pandas_timestamp(ds): data_dict = { "file": ["test1", "test2"], From ded500751edf69b9f66d9dc0ca53a5426c66f211 Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Tue, 3 Mar 2026 16:12:46 +0200 Subject: [PATCH 02/11] Handle non-retryable metadata upload failures --- 
dagshub/data_engine/model/datasource.py | 23 ++++++++++++++++ tests/data_engine/test_datasource.py | 35 ++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index 491edcda..d9b20475 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -16,7 +16,10 @@ import rich.progress from dataclasses_json import DataClassJsonMixin, LetterCase, config +from gql.transport.exceptions import TransportConnectionFailed, TransportServerError from pathvalidate import sanitize_filepath +from requests.exceptions import ConnectionError as RequestsConnectionError +from requests.exceptions import Timeout as RequestsTimeout import dagshub.common.config from dagshub.common import rich_console @@ -42,6 +45,7 @@ from dagshub.data_engine.model.datapoint import Datapoint from dagshub.data_engine.model.datasource_state import DatasourceState from dagshub.data_engine.model.errors import ( + DataEngineGqlError, DatasetFieldComparisonError, DatasetNotFoundError, FieldNotFoundError, @@ -795,6 +799,21 @@ def _next_batch_after_bad( next_batch_size = max(min_batch_size, batch_size - 1) return next_batch_size + def _is_retryable_upload_error(exc: Exception) -> bool: + if isinstance(exc, DataEngineGqlError): + return isinstance(exc.original_exception, (TransportServerError, TransportConnectionFailed)) + return isinstance( + exc, + ( + TransportServerError, + TransportConnectionFailed, + TimeoutError, + ConnectionError, + RequestsConnectionError, + RequestsTimeout, + ), + ) + total_entries = len(metadata_entries) total_task = progress.add_task( f"Uploading metadata (adaptive batch {current_batch_size}-{max_batch_size})...", @@ -821,6 +840,10 @@ def _next_batch_after_bad( try: self.source.client.update_metadata(self, entries) except Exception as exc: + if not _is_retryable_upload_error(exc): + logger.error("Metadata upload failed with a non-retryable error; aborting.", exc_info=True) + raise + if batch_size <= min_batch_size: logger.error( f"Metadata upload failed at minimum batch size ({min_batch_size}); aborting.", diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index 49359f59..a25741fb 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -177,7 +177,7 @@ def test_upload_metadata_retries_with_smaller_batch_after_failure(ds, mocker): def _flaky_upload(_ds, upload_entries): if len(upload_entries) == 8 and not has_failed["value"]: has_failed["value"] = True - raise RuntimeError("simulated timeout") + raise TimeoutError("simulated timeout") ds.source.client.update_metadata.side_effect = _flaky_upload @@ -187,6 +187,39 @@ def _flaky_upload(_ds, upload_entries): assert _uploaded_batch_sizes(ds) == [8, 4, 6] +def test_upload_metadata_slow_success_reduces_batch_size(ds, mocker): + entries = [ + DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(12) + ] + + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1.0) + mocker.patch("dagshub.data_engine.model.datasource.time.monotonic", side_effect=[0.0, 2.0, 3.0, 3.1]) + + 
ds._upload_metadata(entries) + + assert _uploaded_batch_sizes(ds) == [8, 4] + + +def test_upload_metadata_non_retryable_error_does_not_retry(ds, mocker): + entries = [ + DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(10) + ] + + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + ds.source.client.update_metadata.side_effect = ValueError("simulated validation error") + + with pytest.raises(ValueError, match="simulated validation error"): + ds._upload_metadata(entries) + + assert _uploaded_batch_sizes(ds) == [8] + + def test_pandas_timestamp(ds): data_dict = { "file": ["test1", "test2"], From 567c8306f060d4d7cb1b56a3ce856fef0ac25b47 Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Tue, 3 Mar 2026 16:20:13 +0200 Subject: [PATCH 03/11] Allow retry shrink on partial metadata batches --- dagshub/data_engine/model/datasource.py | 11 ++++++++--- tests/data_engine/test_datasource.py | 25 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index d9b20475..da44c944 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -787,6 +787,11 @@ def _next_batch_after_bad( good_batch_size: Optional[int], bad_batch_size: Optional[int], ) -> int: + # If we're already below the configured minimum (for example, last partial chunk), + # keep shrinking until we reach 1. 
+ if batch_size <= min_batch_size: + return max(1, batch_size - 1) + upper_bound = bad_batch_size if bad_batch_size is not None else batch_size if good_batch_size is not None and good_batch_size < upper_bound: @@ -796,7 +801,7 @@ def _next_batch_after_bad( next_batch_size = max(min_batch_size, min(max_batch_size, next_batch_size)) if next_batch_size >= batch_size: - next_batch_size = max(min_batch_size, batch_size - 1) + next_batch_size = max(1, batch_size - 1) return next_batch_size def _is_retryable_upload_error(exc: Exception) -> bool: @@ -844,9 +849,9 @@ def _is_retryable_upload_error(exc: Exception) -> bool: logger.error("Metadata upload failed with a non-retryable error; aborting.", exc_info=True) raise - if batch_size <= min_batch_size: + if batch_size <= 1: logger.error( - f"Metadata upload failed at minimum batch size ({min_batch_size}); aborting.", + "Metadata upload failed at minimum possible batch size (1); aborting.", exc_info=True, ) raise diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index a25741fb..f059df6c 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -220,6 +220,31 @@ def test_upload_metadata_non_retryable_error_does_not_retry(ds, mocker): assert _uploaded_batch_sizes(ds) == [8] +def test_upload_metadata_retries_partial_batch_below_min(ds, mocker): + entries = [ + DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(10) + ] + + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 4) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + + has_failed = {"value": False} + + def _flaky_upload(_ds, upload_entries): + if len(upload_entries) == 2 and not has_failed["value"]: + has_failed["value"] = True + raise TimeoutError("simulated timeout") + + ds.source.client.update_metadata.side_effect = _flaky_upload + + ds._upload_metadata(entries) + + assert has_failed["value"] + assert _uploaded_batch_sizes(ds) == [8, 2, 1, 1] + + def test_pandas_timestamp(ds): data_dict = { "file": ["test1", "test2"], From 4a767ab55ea32d3e09a1fb746ca079bfd2fc47d4 Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Tue, 3 Mar 2026 16:40:04 +0200 Subject: [PATCH 04/11] Add retry backoff for metadata upload failures --- dagshub/data_engine/model/datasource.py | 6 ++++++ tests/data_engine/test_datasource.py | 25 +++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index da44c944..9bf7b223 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -827,6 +827,7 @@ def _is_retryable_upload_error(exc: Exception) -> bool: last_good_batch_size: Optional[int] = None last_bad_batch_size: Optional[int] = None + consecutive_retryable_failures = 0 with progress: start = 0 @@ -856,6 +857,10 @@ def _is_retryable_upload_error(exc: Exception) -> bool: ) raise + consecutive_retryable_failures += 1 + retry_delay_sec = min(5.0, 0.25 * (2 ** min(consecutive_retryable_failures - 1, 4))) + time.sleep(retry_delay_sec) + last_bad_batch_size = ( batch_size if last_bad_batch_size is None else min(last_bad_batch_size, batch_size) ) @@ -867,6 +872,7 @@ def 
_is_retryable_upload_error(exc: Exception) -> bool: continue elapsed = time.monotonic() - start_time + consecutive_retryable_failures = 0 start += batch_size progress.update(total_task, advance=batch_size) diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index f059df6c..63cc4962 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -245,6 +245,31 @@ def _flaky_upload(_ds, upload_entries): assert _uploaded_batch_sizes(ds) == [8, 2, 1, 1] +def test_upload_metadata_backoff_resets_after_success(ds, mocker): + entries = [ + DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(12) + ] + + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + sleep_mock = mocker.patch("dagshub.data_engine.model.datasource.time.sleep") + + call_idx = {"value": 0} + + def _flaky_upload(_ds, _upload_entries): + call_idx["value"] += 1 + if call_idx["value"] in {1, 3}: + raise TimeoutError("simulated timeout") + + ds.source.client.update_metadata.side_effect = _flaky_upload + + ds._upload_metadata(entries) + + assert [c.args[0] for c in sleep_mock.call_args_list] == [0.25, 0.25] + + def test_pandas_timestamp(ds): data_dict = { "file": ["test1", "test2"], From 0f5e9d74dfc4dbfbb2355e5bd0aac25ab01070b2 Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Tue, 3 Mar 2026 17:50:47 +0200 Subject: [PATCH 05/11] Honor min batch size on retry failures --- dagshub/data_engine/model/datasource.py | 8 ++++---- tests/data_engine/test_datasource.py | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index 9bf7b223..af639b25 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -789,7 +789,7 @@ def _next_batch_after_bad( ) -> int: # If we're already below the configured minimum (for example, last partial chunk), # keep shrinking until we reach 1. 
- if batch_size <= min_batch_size: + if batch_size < min_batch_size: return max(1, batch_size - 1) upper_bound = bad_batch_size if bad_batch_size is not None else batch_size @@ -801,7 +801,7 @@ def _next_batch_after_bad( next_batch_size = max(min_batch_size, min(max_batch_size, next_batch_size)) if next_batch_size >= batch_size: - next_batch_size = max(1, batch_size - 1) + next_batch_size = max(min_batch_size, batch_size - 1) return next_batch_size def _is_retryable_upload_error(exc: Exception) -> bool: @@ -850,9 +850,9 @@ def _is_retryable_upload_error(exc: Exception) -> bool: logger.error("Metadata upload failed with a non-retryable error; aborting.", exc_info=True) raise - if batch_size <= 1: + if batch_size <= 1 or batch_size == min_batch_size: logger.error( - "Metadata upload failed at minimum possible batch size (1); aborting.", + f"Metadata upload failed at minimum batch size ({batch_size}); aborting.", exc_info=True, ) raise diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index 63cc4962..446928d3 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -270,6 +270,25 @@ def _flaky_upload(_ds, _upload_entries): assert [c.args[0] for c in sleep_mock.call_args_list] == [0.25, 0.25] +def test_upload_metadata_aborts_on_failure_at_min_batch(ds, mocker): + entries = [ + DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(6) + ] + + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + sleep_mock = mocker.patch("dagshub.data_engine.model.datasource.time.sleep") + ds.source.client.update_metadata.side_effect = TimeoutError("simulated timeout") + + with pytest.raises(TimeoutError, match="simulated timeout"): + ds._upload_metadata(entries) + + assert _uploaded_batch_sizes(ds) == [2] + sleep_mock.assert_not_called() + + def test_pandas_timestamp(ds): data_dict = { "file": ["test1", "test2"], From 82e4503f4d01236818a76470cd9f07da8ca3bc97 Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Wed, 4 Mar 2026 00:34:31 +0200 Subject: [PATCH 06/11] Mock retry backoff sleep in upload tests --- tests/data_engine/test_datasource.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index 446928d3..27b567e2 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -171,6 +171,7 @@ def test_upload_metadata_retries_with_smaller_batch_after_failure(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch("dagshub.data_engine.model.datasource.time.sleep", return_value=None) has_failed = {"value": False} @@ -229,6 +230,7 @@ def test_upload_metadata_retries_partial_batch_below_min(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 4) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) 
mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch("dagshub.data_engine.model.datasource.time.sleep", return_value=None) has_failed = {"value": False} From 996b7fc44d690afde8aede5d5baaeaa43081c9aa Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Wed, 4 Mar 2026 00:57:07 +0200 Subject: [PATCH 07/11] Avoid reusing known-bad upload batch size --- dagshub/data_engine/model/datasource.py | 4 ++++ tests/data_engine/test_datasource.py | 26 +++++++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index af639b25..edf31754 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -778,8 +778,12 @@ def _next_batch_after_success(batch_size: int, bad_batch_size: Optional[int]) -> next_batch_size = batch_size * 2 next_batch_size = min(max_batch_size, next_batch_size) + if bad_batch_size is not None and bad_batch_size > min_batch_size and next_batch_size >= bad_batch_size: + next_batch_size = bad_batch_size - 1 if next_batch_size <= batch_size and batch_size < max_batch_size: next_batch_size = batch_size + 1 + if bad_batch_size is not None and bad_batch_size > min_batch_size and next_batch_size >= bad_batch_size: + next_batch_size = bad_batch_size - 1 return max(min_batch_size, next_batch_size) def _next_batch_after_bad( diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index 27b567e2..1733c162 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -188,6 +188,32 @@ def _flaky_upload(_ds, upload_entries): assert _uploaded_batch_sizes(ds) == [8, 4, 6] +def test_upload_metadata_does_not_retry_known_bad_batch_size(ds, mocker): + entries = [ + DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(32) + ] + + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 16) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch("dagshub.data_engine.model.datasource.time.sleep", return_value=None) + + has_failed = {"value": False} + + def _flaky_upload(_ds, upload_entries): + if len(upload_entries) == 8 and not has_failed["value"]: + has_failed["value"] = True + raise TimeoutError("simulated timeout") + + ds.source.client.update_metadata.side_effect = _flaky_upload + + ds._upload_metadata(entries) + + assert has_failed["value"] + assert _uploaded_batch_sizes(ds) == [8, 4, 6, 7, 7, 7, 1] + + def test_upload_metadata_slow_success_reduces_batch_size(ds, mocker): entries = [ DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(12) From 2338c949fd6b68c38417a3e1d6bac0a2ba8111db Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Wed, 4 Mar 2026 10:51:40 +0200 Subject: [PATCH 08/11] Extract metadata upload retry backoff constants --- dagshub/data_engine/model/datasource.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index edf31754..553b068f 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -90,6 +90,10 @@ 
MLFLOW_DATASOURCE_TAG_NAME = "dagshub.datasets.datasource_id" MLFLOW_DATASET_TAG_NAME = "dagshub.datasets.dataset_id" +METADATA_UPLOAD_RETRY_BACKOFF_BASE_SECONDS = 0.25 +METADATA_UPLOAD_RETRY_BACKOFF_MAX_SECONDS = 5.0 +METADATA_UPLOAD_RETRY_BACKOFF_EXPONENT_CAP = 4 + @dataclass class DatapointDeleteMetadataEntry(DataClassJsonMixin): @@ -862,7 +866,12 @@ def _is_retryable_upload_error(exc: Exception) -> bool: raise consecutive_retryable_failures += 1 - retry_delay_sec = min(5.0, 0.25 * (2 ** min(consecutive_retryable_failures - 1, 4))) + # Bounded exponential backoff: 0.25s, 0.5s, 1s, 2s, 4s, then capped at 5s. + retry_delay_sec = min( + METADATA_UPLOAD_RETRY_BACKOFF_MAX_SECONDS, + METADATA_UPLOAD_RETRY_BACKOFF_BASE_SECONDS + * (2 ** min(consecutive_retryable_failures - 1, METADATA_UPLOAD_RETRY_BACKOFF_EXPONENT_CAP)), + ) time.sleep(retry_delay_sec) last_bad_batch_size = ( From b884e9ea3867711daeac64ad5c28a8e29e9b6a01 Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Wed, 4 Mar 2026 11:02:51 +0200 Subject: [PATCH 09/11] Align metadata upload backoff cap with schedule --- dagshub/data_engine/model/datasource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index 553b068f..37491e51 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -91,7 +91,7 @@ MLFLOW_DATASET_TAG_NAME = "dagshub.datasets.dataset_id" METADATA_UPLOAD_RETRY_BACKOFF_BASE_SECONDS = 0.25 -METADATA_UPLOAD_RETRY_BACKOFF_MAX_SECONDS = 5.0 +METADATA_UPLOAD_RETRY_BACKOFF_MAX_SECONDS = 4.0 METADATA_UPLOAD_RETRY_BACKOFF_EXPONENT_CAP = 4 @@ -866,7 +866,7 @@ def _is_retryable_upload_error(exc: Exception) -> bool: raise consecutive_retryable_failures += 1 - # Bounded exponential backoff: 0.25s, 0.5s, 1s, 2s, 4s, then capped at 5s. + # Bounded exponential backoff: 0.25s, 0.5s, 1s, 2s, then capped at 4s. 
retry_delay_sec = min( METADATA_UPLOAD_RETRY_BACKOFF_MAX_SECONDS, METADATA_UPLOAD_RETRY_BACKOFF_BASE_SECONDS From 4c1132c69820588bc6e6eef73212e7884676bd9f Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Thu, 5 Mar 2026 16:20:41 +0200 Subject: [PATCH 10/11] Refactor adaptive metadata upload sizing and retries --- dagshub/common/config.py | 20 ++- dagshub/data_engine/model/datasource.py | 114 +++++------------ .../model/metadata/upload_batching.py | 116 ++++++++++++++++++ tests/data_engine/test_datasource.py | 26 ++-- 4 files changed, 176 insertions(+), 100 deletions(-) create mode 100644 dagshub/data_engine/model/metadata/upload_batching.py diff --git a/dagshub/common/config.py b/dagshub/common/config.py index 60084863..9a2e75f6 100644 --- a/dagshub/common/config.py +++ b/dagshub/common/config.py @@ -58,10 +58,16 @@ def set_host(new_host: str): recommended_annotate_limit = int(os.environ.get(RECOMMENDED_ANNOTATE_LIMIT_KEY, 1e5)) DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_KEY = "DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE" -dataengine_metadata_upload_batch_size = int(os.environ.get(DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_KEY, 15000)) +DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MAX_KEY = "DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE_MAX" +dataengine_metadata_upload_batch_size = int( + os.environ.get( + DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MAX_KEY, + os.environ.get(DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_KEY, 15000), + ) +) DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MIN_KEY = "DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE_MIN" -dataengine_metadata_upload_batch_size_min = int(os.environ.get(DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MIN_KEY, 150)) +dataengine_metadata_upload_batch_size_min = int(os.environ.get(DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_MIN_KEY, 1)) DATAENGINE_METADATA_UPLOAD_BATCH_SIZE_INITIAL_KEY = "DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE_INITIAL" dataengine_metadata_upload_batch_size_initial = int( @@ -69,9 +75,15 @@ def set_host(new_host: str): ) DATAENGINE_METADATA_UPLOAD_TARGET_BATCH_TIME_KEY = "DAGSHUB_DE_METADATA_UPLOAD_TARGET_BATCH_TIME" -dataengine_metadata_upload_target_batch_time = float( - os.environ.get(DATAENGINE_METADATA_UPLOAD_TARGET_BATCH_TIME_KEY, 5.0) +DATAENGINE_METADATA_UPLOAD_TARGET_BATCH_TIME_SECONDS_KEY = "DAGSHUB_DE_METADATA_UPLOAD_TARGET_BATCH_TIME_SECONDS" +dataengine_metadata_upload_target_batch_time_seconds = float( + os.environ.get( + DATAENGINE_METADATA_UPLOAD_TARGET_BATCH_TIME_SECONDS_KEY, + os.environ.get(DATAENGINE_METADATA_UPLOAD_TARGET_BATCH_TIME_KEY, 5.0), + ) ) +# Backwards compatibility for code that imports the old module attribute name. 
+dataengine_metadata_upload_target_batch_time = dataengine_metadata_upload_target_batch_time_seconds DISABLE_ANALYTICS_KEY = "DAGSHUB_DISABLE_ANALYTICS" disable_analytics = "DAGSHUB_DISABLE_ANALYTICS" in os.environ diff --git a/dagshub/data_engine/model/datasource.py b/dagshub/data_engine/model/datasource.py index 37491e51..447f58ab 100644 --- a/dagshub/data_engine/model/datasource.py +++ b/dagshub/data_engine/model/datasource.py @@ -16,10 +16,7 @@ import rich.progress from dataclasses_json import DataClassJsonMixin, LetterCase, config -from gql.transport.exceptions import TransportConnectionFailed, TransportServerError from pathvalidate import sanitize_filepath -from requests.exceptions import ConnectionError as RequestsConnectionError -from requests.exceptions import Timeout as RequestsTimeout import dagshub.common.config from dagshub.common import rich_console @@ -45,7 +42,6 @@ from dagshub.data_engine.model.datapoint import Datapoint from dagshub.data_engine.model.datasource_state import DatasourceState from dagshub.data_engine.model.errors import ( - DataEngineGqlError, DatasetFieldComparisonError, DatasetNotFoundError, FieldNotFoundError, @@ -57,6 +53,13 @@ run_preupload_transforms, validate_uploading_metadata, ) +from dagshub.data_engine.model.metadata.upload_batching import ( + AdaptiveUploadBatchConfig, + get_retry_delay_seconds, + is_retryable_metadata_upload_error, + next_batch_after_retryable_failure, + next_batch_after_success, +) from dagshub.data_engine.model.metadata.dtypes import DatapointMetadataUpdateEntry from dagshub.data_engine.model.metadata.transforms import DatasourceFieldInfo, _add_metadata from dagshub.data_engine.model.metadata_field_builder import MetadataFieldBuilder @@ -90,10 +93,6 @@ MLFLOW_DATASOURCE_TAG_NAME = "dagshub.datasets.datasource_id" MLFLOW_DATASET_TAG_NAME = "dagshub.datasets.dataset_id" -METADATA_UPLOAD_RETRY_BACKOFF_BASE_SECONDS = 0.25 -METADATA_UPLOAD_RETRY_BACKOFF_MAX_SECONDS = 4.0 -METADATA_UPLOAD_RETRY_BACKOFF_EXPONENT_CAP = 4 - @dataclass class DatapointDeleteMetadataEntry(DataClassJsonMixin): @@ -763,73 +762,17 @@ def _upload_metadata(self, metadata_entries: List[DatapointMetadataUpdateEntry]) progress = get_rich_progress(rich.progress.MofNCompleteColumn()) - max_batch_size = max(1, dagshub.common.config.dataengine_metadata_upload_batch_size) - min_batch_size = max( - 1, - min(dagshub.common.config.dataengine_metadata_upload_batch_size_min, max_batch_size), - ) - current_batch_size = max( - min_batch_size, - min(dagshub.common.config.dataengine_metadata_upload_batch_size_initial, max_batch_size), + batch_config = AdaptiveUploadBatchConfig.from_values( + max_batch_size=dagshub.common.config.dataengine_metadata_upload_batch_size, + min_batch_size=dagshub.common.config.dataengine_metadata_upload_batch_size_min, + initial_batch_size=dagshub.common.config.dataengine_metadata_upload_batch_size_initial, + target_batch_time_seconds=dagshub.common.config.dataengine_metadata_upload_target_batch_time_seconds, ) - target_batch_time = max(dagshub.common.config.dataengine_metadata_upload_target_batch_time, 0.01) - - def _next_batch_after_success(batch_size: int, bad_batch_size: Optional[int]) -> int: - # Keep expanding quickly until we find an upper bound, then binary-search between good and bad. 
- if bad_batch_size is not None and batch_size < bad_batch_size: - next_batch_size = batch_size + max(1, (bad_batch_size - batch_size) // 2) - else: - next_batch_size = batch_size * 2 - - next_batch_size = min(max_batch_size, next_batch_size) - if bad_batch_size is not None and bad_batch_size > min_batch_size and next_batch_size >= bad_batch_size: - next_batch_size = bad_batch_size - 1 - if next_batch_size <= batch_size and batch_size < max_batch_size: - next_batch_size = batch_size + 1 - if bad_batch_size is not None and bad_batch_size > min_batch_size and next_batch_size >= bad_batch_size: - next_batch_size = bad_batch_size - 1 - return max(min_batch_size, next_batch_size) - - def _next_batch_after_bad( - batch_size: int, - good_batch_size: Optional[int], - bad_batch_size: Optional[int], - ) -> int: - # If we're already below the configured minimum (for example, last partial chunk), - # keep shrinking until we reach 1. - if batch_size < min_batch_size: - return max(1, batch_size - 1) - - upper_bound = bad_batch_size if bad_batch_size is not None else batch_size - - if good_batch_size is not None and good_batch_size < upper_bound: - next_batch_size = good_batch_size + max(1, (upper_bound - good_batch_size) // 2) - else: - next_batch_size = upper_bound // 2 - - next_batch_size = max(min_batch_size, min(max_batch_size, next_batch_size)) - if next_batch_size >= batch_size: - next_batch_size = max(min_batch_size, batch_size - 1) - return next_batch_size - - def _is_retryable_upload_error(exc: Exception) -> bool: - if isinstance(exc, DataEngineGqlError): - return isinstance(exc.original_exception, (TransportServerError, TransportConnectionFailed)) - return isinstance( - exc, - ( - TransportServerError, - TransportConnectionFailed, - TimeoutError, - ConnectionError, - RequestsConnectionError, - RequestsTimeout, - ), - ) + current_batch_size = batch_config.initial_batch_size total_entries = len(metadata_entries) total_task = progress.add_task( - f"Uploading metadata (adaptive batch {current_batch_size}-{max_batch_size})...", + f"Uploading metadata (adaptive batch {batch_config.min_batch_size}-{batch_config.max_batch_size})...", total=total_entries, ) @@ -854,11 +797,11 @@ def _is_retryable_upload_error(exc: Exception) -> bool: try: self.source.client.update_metadata(self, entries) except Exception as exc: - if not _is_retryable_upload_error(exc): + if not is_retryable_metadata_upload_error(exc): logger.error("Metadata upload failed with a non-retryable error; aborting.", exc_info=True) raise - if batch_size <= 1 or batch_size == min_batch_size: + if batch_size <= 1: logger.error( f"Metadata upload failed at minimum batch size ({batch_size}); aborting.", exc_info=True, @@ -866,18 +809,18 @@ def _is_retryable_upload_error(exc: Exception) -> bool: raise consecutive_retryable_failures += 1 - # Bounded exponential backoff: 0.25s, 0.5s, 1s, 2s, then capped at 4s. 
- retry_delay_sec = min( - METADATA_UPLOAD_RETRY_BACKOFF_MAX_SECONDS, - METADATA_UPLOAD_RETRY_BACKOFF_BASE_SECONDS - * (2 ** min(consecutive_retryable_failures - 1, METADATA_UPLOAD_RETRY_BACKOFF_EXPONENT_CAP)), - ) + retry_delay_sec = get_retry_delay_seconds(consecutive_retryable_failures) time.sleep(retry_delay_sec) last_bad_batch_size = ( batch_size if last_bad_batch_size is None else min(last_bad_batch_size, batch_size) ) - current_batch_size = _next_batch_after_bad(batch_size, last_good_batch_size, last_bad_batch_size) + current_batch_size = next_batch_after_retryable_failure( + batch_size, + batch_config, + last_good_batch_size, + last_bad_batch_size, + ) logger.warning( f"Metadata upload failed for batch size {batch_size} " f"({exc.__class__.__name__}: {exc}). Retrying with batch size {current_batch_size}." @@ -889,16 +832,21 @@ def _is_retryable_upload_error(exc: Exception) -> bool: start += batch_size progress.update(total_task, advance=batch_size) - if elapsed <= target_batch_time: + if elapsed <= batch_config.target_batch_time_seconds: last_good_batch_size = ( batch_size if last_good_batch_size is None else max(last_good_batch_size, batch_size) ) - current_batch_size = _next_batch_after_success(batch_size, last_bad_batch_size) + current_batch_size = next_batch_after_success(batch_size, batch_config, last_bad_batch_size) else: last_bad_batch_size = ( batch_size if last_bad_batch_size is None else min(last_bad_batch_size, batch_size) ) - current_batch_size = _next_batch_after_bad(batch_size, last_good_batch_size, last_bad_batch_size) + current_batch_size = next_batch_after_retryable_failure( + batch_size, + batch_config, + last_good_batch_size, + last_bad_batch_size, + ) progress.update(total_task, completed=total_entries, refresh=True) diff --git a/dagshub/data_engine/model/metadata/upload_batching.py b/dagshub/data_engine/model/metadata/upload_batching.py new file mode 100644 index 00000000..6394ff03 --- /dev/null +++ b/dagshub/data_engine/model/metadata/upload_batching.py @@ -0,0 +1,116 @@ +from dataclasses import dataclass +from types import SimpleNamespace +from typing import Optional + +from gql.transport.exceptions import TransportConnectionFailed, TransportServerError +from requests.exceptions import ConnectionError as RequestsConnectionError +from requests.exceptions import Timeout as RequestsTimeout +from tenacity import wait_exponential + +from dagshub.data_engine.model.errors import DataEngineGqlError + +MIN_TARGET_BATCH_TIME_SECONDS = 0.01 +BATCH_GROWTH_FACTOR = 10 +RETRY_BACKOFF_BASE_SECONDS = 0.25 +RETRY_BACKOFF_MAX_SECONDS = 4.0 + +_retry_delay_strategy = wait_exponential( + multiplier=RETRY_BACKOFF_BASE_SECONDS, + min=RETRY_BACKOFF_BASE_SECONDS, + max=RETRY_BACKOFF_MAX_SECONDS, +) + + +@dataclass(frozen=True) +class AdaptiveUploadBatchConfig: + max_batch_size: int + min_batch_size: int + initial_batch_size: int + target_batch_time_seconds: float + + @classmethod + def from_values( + cls, + max_batch_size: int, + min_batch_size: int, + initial_batch_size: int, + target_batch_time_seconds: float, + ) -> "AdaptiveUploadBatchConfig": + normalized_max_batch_size = max(1, max_batch_size) + normalized_min_batch_size = max(1, min(min_batch_size, normalized_max_batch_size)) + normalized_initial_batch_size = max( + normalized_min_batch_size, + min(initial_batch_size, normalized_max_batch_size), + ) + normalized_target_batch_time_seconds = max(target_batch_time_seconds, MIN_TARGET_BATCH_TIME_SECONDS) + return cls( + max_batch_size=normalized_max_batch_size, + 
min_batch_size=normalized_min_batch_size, + initial_batch_size=normalized_initial_batch_size, + target_batch_time_seconds=normalized_target_batch_time_seconds, + ) + + +def _midpoint(lower_bound: int, upper_bound: int) -> int: + return lower_bound + max(1, (upper_bound - lower_bound) // 2) + + +def next_batch_after_success( + batch_size: int, + config: AdaptiveUploadBatchConfig, + bad_batch_size: Optional[int], +) -> int: + if bad_batch_size is not None and batch_size < bad_batch_size: + next_batch_size = _midpoint(batch_size, bad_batch_size) + next_batch_size = min(next_batch_size, bad_batch_size - 1) + else: + next_batch_size = batch_size * BATCH_GROWTH_FACTOR + + next_batch_size = min(config.max_batch_size, next_batch_size) + if next_batch_size <= batch_size and batch_size < config.max_batch_size: + next_batch_size = min(config.max_batch_size, batch_size + 1) + if bad_batch_size is not None: + next_batch_size = min(next_batch_size, bad_batch_size - 1) + + return max(config.min_batch_size, next_batch_size) + + +def next_batch_after_retryable_failure( + batch_size: int, + config: AdaptiveUploadBatchConfig, + good_batch_size: Optional[int], + bad_batch_size: Optional[int], +) -> int: + if batch_size <= 1: + return 1 + + upper_bound = min(batch_size, bad_batch_size) if bad_batch_size is not None else batch_size + if good_batch_size is not None and good_batch_size < upper_bound: + next_batch_size = _midpoint(good_batch_size, upper_bound) + else: + next_batch_size = batch_size // 2 + + next_batch_size = min(next_batch_size, upper_bound - 1, batch_size - 1, config.max_batch_size) + return max(1, next_batch_size) + + +def is_retryable_metadata_upload_error(exc: Exception) -> bool: + if isinstance(exc, DataEngineGqlError): + return isinstance(exc.original_exception, (TransportServerError, TransportConnectionFailed)) + + return isinstance( + exc, + ( + TransportServerError, + TransportConnectionFailed, + TimeoutError, + ConnectionError, + RequestsConnectionError, + RequestsTimeout, + ), + ) + + +def get_retry_delay_seconds(consecutive_retryable_failures: int) -> float: + retry_state = SimpleNamespace(attempt_number=max(1, consecutive_retryable_failures)) + return float(_retry_delay_strategy(retry_state)) diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index 1733c162..7e09801b 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -155,11 +155,11 @@ def test_upload_metadata_starts_small_and_grows(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 16) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 2) - mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1000.0) ds._upload_metadata(entries) - assert _uploaded_batch_sizes(ds) == [2, 4, 8] + assert _uploaded_batch_sizes(ds) == [2, 12] def test_upload_metadata_retries_with_smaller_batch_after_failure(ds, mocker): @@ -170,7 +170,7 @@ def test_upload_metadata_retries_with_smaller_batch_after_failure(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) mocker.patch.object(dagshub.common.config, 
"dataengine_metadata_upload_batch_size_initial", 8) - mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1000.0) mocker.patch("dagshub.data_engine.model.datasource.time.sleep", return_value=None) has_failed = {"value": False} @@ -196,7 +196,7 @@ def test_upload_metadata_does_not_retry_known_bad_batch_size(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 16) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) - mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1000.0) mocker.patch("dagshub.data_engine.model.datasource.time.sleep", return_value=None) has_failed = {"value": False} @@ -211,7 +211,7 @@ def _flaky_upload(_ds, upload_entries): ds._upload_metadata(entries) assert has_failed["value"] - assert _uploaded_batch_sizes(ds) == [8, 4, 6, 7, 7, 7, 1] + assert _uploaded_batch_sizes(ds) == [8, 4, 6, 7, 7, 1] def test_upload_metadata_slow_success_reduces_batch_size(ds, mocker): @@ -222,7 +222,7 @@ def test_upload_metadata_slow_success_reduces_batch_size(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) - mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1.0) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1.0) mocker.patch("dagshub.data_engine.model.datasource.time.monotonic", side_effect=[0.0, 2.0, 3.0, 3.1]) ds._upload_metadata(entries) @@ -238,7 +238,7 @@ def test_upload_metadata_non_retryable_error_does_not_retry(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) - mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1000.0) ds.source.client.update_metadata.side_effect = ValueError("simulated validation error") with pytest.raises(ValueError, match="simulated validation error"): @@ -255,7 +255,7 @@ def test_upload_metadata_retries_partial_batch_below_min(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 4) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) - mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1000.0) mocker.patch("dagshub.data_engine.model.datasource.time.sleep", return_value=None) has_failed = {"value": False} @@ -281,7 +281,7 @@ def 
test_upload_metadata_backoff_resets_after_success(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 8) - mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1000.0) sleep_mock = mocker.patch("dagshub.data_engine.model.datasource.time.sleep") call_idx = {"value": 0} @@ -298,7 +298,7 @@ def _flaky_upload(_ds, _upload_entries): assert [c.args[0] for c in sleep_mock.call_args_list] == [0.25, 0.25] -def test_upload_metadata_aborts_on_failure_at_min_batch(ds, mocker): +def test_upload_metadata_retries_below_configured_min_before_aborting(ds, mocker): entries = [ DatapointMetadataUpdateEntry(f"dp-{i}", "field", str(i), MetadataFieldType.INTEGER) for i in range(6) ] @@ -306,15 +306,15 @@ def test_upload_metadata_aborts_on_failure_at_min_batch(ds, mocker): mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size", 8) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_min", 2) mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_batch_size_initial", 2) - mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time", 1000.0) + mocker.patch.object(dagshub.common.config, "dataengine_metadata_upload_target_batch_time_seconds", 1000.0) sleep_mock = mocker.patch("dagshub.data_engine.model.datasource.time.sleep") ds.source.client.update_metadata.side_effect = TimeoutError("simulated timeout") with pytest.raises(TimeoutError, match="simulated timeout"): ds._upload_metadata(entries) - assert _uploaded_batch_sizes(ds) == [2] - sleep_mock.assert_not_called() + assert _uploaded_batch_sizes(ds) == [2, 1] + assert [c.args[0] for c in sleep_mock.call_args_list] == [0.25] def test_pandas_timestamp(ds): From 6ae0a5828a5b5f753f0c1984c6f93661ac38df8c Mon Sep 17 00:00:00 2001 From: Guy Smoilovsky Date: Thu, 5 Mar 2026 16:34:43 +0200 Subject: [PATCH 11/11] Fix adaptive upload expected batch sequence test --- tests/data_engine/test_datasource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data_engine/test_datasource.py b/tests/data_engine/test_datasource.py index 7e09801b..5fe2ea00 100644 --- a/tests/data_engine/test_datasource.py +++ b/tests/data_engine/test_datasource.py @@ -211,7 +211,7 @@ def _flaky_upload(_ds, upload_entries): ds._upload_metadata(entries) assert has_failed["value"] - assert _uploaded_batch_sizes(ds) == [8, 4, 6, 7, 7, 1] + assert _uploaded_batch_sizes(ds) == [8, 4, 6, 7, 7, 7, 1] def test_upload_metadata_slow_success_reduces_batch_size(ds, mocker):
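
A quick way to exercise the new adaptive-upload knobs end to end. The environment variable names below come from dagshub/common/config.py as it stands at the end of this series (the old DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE and DAGSHUB_DE_METADATA_UPLOAD_TARGET_BATCH_TIME keys are still honored as fallbacks); the concrete values are arbitrary and only for illustration:

    import os

    # dagshub.common.config reads the environment at import time, so set these
    # before importing it (the tests in this series instead patch the module
    # attributes directly with mocker.patch.object).
    os.environ["DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE_MIN"] = "10"
    os.environ["DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE_INITIAL"] = "100"
    os.environ["DAGSHUB_DE_METADATA_UPLOAD_BATCH_SIZE_MAX"] = "5000"
    os.environ["DAGSHUB_DE_METADATA_UPLOAD_TARGET_BATCH_TIME_SECONDS"] = "3.0"

    import dagshub.common.config as config

    print(config.dataengine_metadata_upload_batch_size)                 # 5000 (the max)
    print(config.dataengine_metadata_upload_batch_size_min)             # 10
    print(config.dataengine_metadata_upload_batch_size_initial)         # 100
    print(config.dataengine_metadata_upload_target_batch_time_seconds)  # 3.0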