🚀 Feature
Add an optional `max_cache_size` at the `CombinedStreamingDataset` level that enforces a single total cache budget across all child `StreamingDataset`s, instead of only per-dataset cache limits.
Today, `max_cache_size` is only defined on `StreamingDataset` and passed into `Cache` -> `BinaryReader` -> `PrepareChunksThread`, where eviction is triggered based on the size of that dataset's cache directory.
Motivation
When composing many streaming datasets (e.g. 50+), each `StreamingDataset` can independently grow its cache up to its own `max_cache_size` (default is `"100GB"`).
In a `CombinedStreamingDataset`, this can easily lead to runaway disk usage because eviction is enforced per dataset cache dir, not across the combined set:

- `CombinedStreamingDataset` simply holds a list of `StreamingDataset`s and instantiates iterators for each (`self._dataset_iters = [iter(dataset) for dataset in datasets]`).
- Each `StreamingDataset` lazily creates its own `Cache(...)` with `max_cache_size=self.max_cache_size`.
- Eviction happens when `_get_folder_size(self._config._cache_dir, ...) >= self._max_cache_size` inside `PrepareChunksThread`.

So, with N datasets you can effectively consume ~N × `max_cache_size` on local disk. With 50 datasets and defaults, that's an upper bound on the order of terabytes (even if you "only meant" to budget ~100GB total).
This is especially painful in multi-node / shared environments where local scratch space is limited, and it's easy to miss because each dataset looks "correctly configured" in isolation.
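To make the scale concrete, a quick back-of-the-envelope check with the numbers above (50 datasets, the default 100GB per-dataset budget):

```python
# Worst case: every child StreamingDataset fills its own cache to the
# default per-dataset budget, since eviction never looks across datasets.
num_datasets = 50
per_dataset_budget_gb = 100  # StreamingDataset default max_cache_size ("100GB")

worst_case_gb = num_datasets * per_dataset_budget_gb
print(f"worst case: ~{worst_case_gb / 1000:.1f}TB of local scratch")
# -> worst case: ~5.0TB of local scratch
```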
Pitch
Introduce max_cache_size (and optionally an allocation strategy) on CombinedStreamingDataset, and have it enforce a global cache budget by distributing that budget across the child datasets before their caches are instantiated.
Proposed API:
```python
CombinedStreamingDataset(
    datasets=[...],
    seed=42,
    iterate_over_all=True,
    batching_method="stratified",
    max_cache_size="200GB",           # NEW: total budget across all datasets
    cache_allocation="proportional",  # optional: "equal" | "proportional"
)
```

Behavior:
- If `max_cache_size` is not provided: keep current behavior (backward compatible).
- If provided: compute a per-dataset budget and apply it to each child dataset's `StreamingDataset.max_cache_size` before calling `iter(dataset)` (since iterator construction triggers cache creation).
Allocation strategies:
- `equal`: `per_ds = total_budget / num_datasets`
- `proportional` (recommended default): allocate by the combined sampling weights (already computed in `CombinedStreamingDataset.__init__`), e.g. `per_ds_i = total_budget * weight_i`. If a dataset is sampled more often, it benefits more from cache headroom.
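A minimal sketch of the two allocation strategies; the helper name `compute_budgets` and the byte-valued budgets are assumptions for illustration, not existing litdata API:

```python
from typing import Literal


def compute_budgets(
    total_budget: int,
    weights: list[float],
    strategy: Literal["equal", "proportional"] = "proportional",
) -> list[int]:
    """Split a total cache budget (in bytes) across the child datasets."""
    n = len(weights)
    if strategy == "equal":
        # equal: per_ds = total_budget / num_datasets
        return [total_budget // n] * n
    # proportional: per_ds_i = total_budget * weight_i (weights normalized)
    total_weight = sum(weights)
    return [int(total_budget * w / total_weight) for w in weights]


# 200GB total across three datasets sampled with relative weights 5:3:2
budgets = compute_budgets(200_000_000_000, [5, 3, 2])
print(budgets)  # [100000000000, 60000000000, 40000000000]
```

Normalizing the weights inside the helper keeps it usable whether the caller passes raw sampling counts or already-normalized probabilities.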
Implementation details (minimal-intrusion approach):
- Add `max_cache_size: int | str | None = None` and `cache_allocation: Literal["equal", "proportional"] = "proportional"` to `CombinedStreamingDataset.__init__`.
- In `CombinedStreamingDataset.__iter__`, before creating `_CombinedDatasetIterator`, apply the computed per-dataset budget: set `dataset.max_cache_size = per_ds_budget` for each `StreamingDataset` child. `StreamingDataset` uses `self.max_cache_size` when constructing `Cache(...)`, which passes it down to `BinaryReader(..., max_cache_size=...)` (the thing `PrepareChunksThread` uses for eviction).
Edge cases / notes:
- If a child dataset already has a user-specified `max_cache_size`, you can either:
  - override it when the combined-level budget is set, or
  - allow optional `cache_allocation="cap_only"` semantics (the combined-level value acts as an upper cap but doesn't override smaller values).
- If `iterate_over_all=True` and datasets are removed/re-added during iteration, you could keep it simple and allocate once based on the initial set, or recompute the allocation when the active datasets change.
Eviction is already implemented and robust at the per-directory level in `PrepareChunksThread`. This proposal only needs to keep the sum over directories bounded, which it achieves by controlling each directory's budget.
Alternatives
- Manually set `max_cache_size` on every `StreamingDataset`:
  - Becomes brittle and tedious with large mixtures.
  - Users still need to manually update configs when the number of datasets changes.
- Use a single shared `cache_dir`:
  - Risky: different datasets may clash in the same directory (and even if they don't today, it's not an advertised contract).
  - Still doesn't solve budgeting unless eviction becomes global over that directory.
- Implement a true global eviction policy across dataset cache dirs:
  - More correct but significantly more complex:
    - need global accounting of chunk sizes across dirs
    - need a consistent definition of "oldest" or "least needed" across datasets
    - need safe deletion under concurrent workers/processes (locks already exist per-chunk, but cross-dataset coordination is non-trivial).
  - The proposed budget-distribution approach solves the practical disk-exhaustion problem with minimal churn.
Additional context
Relevant code paths showing where the current per-dataset-only limit is enforced:

- `CombinedStreamingDataset` constructs iterators for each dataset (no cache budgeting logic at this level today).
- `StreamingDataset` accepts `max_cache_size` (default `"100GB"`) and stores it.
- `StreamingDataset._create_cache()` passes `max_cache_size=self.max_cache_size` into `Cache`.
- `Cache` passes `max_cache_size` into `BinaryReader`, which spawns `PrepareChunksThread(max_cache_size=...)`.
- `PrepareChunksThread` triggers deletion based on `_get_folder_size(self._config._cache_dir) >= self._max_cache_size` (per cache dir).
This feature would prevent accidental disk exhaustion in large multi-dataset training setups while remaining fully backward compatible and minimally invasive to the existing cache/eviction architecture.