From 2011ce1312e16c4316e9eb658506eb7552a5bf56 Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Fri, 27 Feb 2026 17:18:39 +0100
Subject: [PATCH 01/14] add support for setting pipeline resources
---
src/glassflow/etl/models/pipeline.py | 2 +
src/glassflow/etl/models/resources.py | 78 +++++++++++++++++++++++++++
2 files changed, 80 insertions(+)
create mode 100644 src/glassflow/etl/models/resources.py
diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py
index 61404e6..521f709 100644
--- a/src/glassflow/etl/models/pipeline.py
+++ b/src/glassflow/etl/models/pipeline.py
@@ -7,6 +7,7 @@
from .filter import FilterConfig, FilterConfigPatch
from .join import JoinConfig, JoinConfigPatch
from .metadata import MetadataConfig
+from .resources import PipelineResourcesConfig
from .schema import Schema
from .sink import SinkConfig, SinkConfigPatch
from .source import SourceConfig, SourceConfigPatch
@@ -41,6 +42,7 @@ class PipelineConfig(BaseModel):
stateless_transformation: Optional[StatelessTransformationConfig] = Field(
default=StatelessTransformationConfig()
)
+ pipeline_resources: Optional[PipelineResourcesConfig] = Field(default=None)
@field_validator("pipeline_id")
@classmethod
diff --git a/src/glassflow/etl/models/resources.py b/src/glassflow/etl/models/resources.py
new file mode 100644
index 0000000..c29be32
--- /dev/null
+++ b/src/glassflow/etl/models/resources.py
@@ -0,0 +1,78 @@
+from typing import Annotated, Optional
+
+from pydantic import BaseModel, Field, WithJsonSchema
+
+
+class JetStreamResources(BaseModel):
+ max_age: Annotated[Optional[str], WithJsonSchema({"immuttable": True})] = Field(
+ default=None
+ )
+ max_bytes: Annotated[Optional[str], WithJsonSchema({"immuttable": True})] = Field(
+ default=None
+ )
+
+
+class NATSResources(BaseModel):
+ stream: Optional[JetStreamResources] = Field(default=None)
+
+
+class Resources(BaseModel):
+ memory: Annotated[Optional[str], WithJsonSchema({"immuttable": False})] = Field(
+ default=None
+ )
+ cpu: Annotated[Optional[str], WithJsonSchema({"immuttable": False})] = Field(
+ default=None
+ )
+
+
+class StorageResources(BaseModel):
+ size: Annotated[Optional[str], WithJsonSchema({"immuttable": True})] = Field(
+ default=None
+ )
+
+
+class TransformResources(BaseModel):
+ storage: Optional[StorageResources] = Field(default=None)
+ replicas: Annotated[Optional[int], WithJsonSchema({"immuttable": True})] = Field(
+ default=None
+ )
+ requests: Optional[Resources] = Field(default=None)
+ limits: Optional[Resources] = Field(default=None)
+
+
+class SinkResources(BaseModel):
+ replicas: Annotated[Optional[int], WithJsonSchema({"immuttable": True})] = Field(
+ default=None
+ )
+ requests: Optional[Resources] = Field(default=None)
+ limits: Optional[Resources] = Field(default=None)
+
+
+class JoinResources(BaseModel):
+ limits: Optional[Resources] = Field(default=None)
+ requests: Optional[Resources] = Field(default=None)
+ replicas: Annotated[Optional[int], WithJsonSchema({"immuttable": True})] = Field(
+ default=None
+ )
+
+
+class IngestorPodResources(BaseModel):
+ replicas: Annotated[Optional[int], WithJsonSchema({"immuttable": False})] = Field(
+ default=None
+ )
+ requests: Optional[Resources] = Field(default=None)
+ limits: Optional[Resources] = Field(default=None)
+
+
+class IngestorResources(BaseModel):
+ base: Optional[IngestorPodResources] = Field(default=None)
+ left: Optional[IngestorPodResources] = Field(default=None)
+ right: Optional[IngestorPodResources] = Field(default=None)
+
+
+class PipelineResourcesConfig(BaseModel):
+ nats: Optional[NATSResources] = Field(default=None)
+ sink: Optional[SinkResources] = Field(default=None)
+ ingestor: Optional[IngestorResources] = Field(default=None)
+ transform: Optional[TransformResources] = Field(default=None)
+ join: Optional[JoinResources] = Field(default=None)
From 384d0dc0614fb4ab6c81e3d42ce12d7aae4c9c9c Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Fri, 27 Feb 2026 17:51:59 +0100
Subject: [PATCH 02/14] use allow_mutation property to validate field updates
---
src/glassflow/etl/models/__init__.py | 2 +
src/glassflow/etl/models/resources.py | 142 ++++++++++++++++++++------
2 files changed, 112 insertions(+), 32 deletions(-)
diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py
index 683f900..66ac1d1 100644
--- a/src/glassflow/etl/models/__init__.py
+++ b/src/glassflow/etl/models/__init__.py
@@ -10,6 +10,7 @@
)
from .metadata import MetadataConfig
from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
+from .resources import PipelineResourcesConfig
from .schema import Schema, SchemaField
from .sink import SinkConfig, SinkConfigPatch, SinkType
from .source import (
@@ -69,4 +70,5 @@
"StatelessTransformationType",
"ExpressionConfig",
"Transformation",
+ "PipelineResourcesConfig",
]
diff --git a/src/glassflow/etl/models/resources.py b/src/glassflow/etl/models/resources.py
index c29be32..2776790 100644
--- a/src/glassflow/etl/models/resources.py
+++ b/src/glassflow/etl/models/resources.py
@@ -1,78 +1,156 @@
-from typing import Annotated, Optional
+from typing import Optional
-from pydantic import BaseModel, Field, WithJsonSchema
+from pydantic import BaseModel, Field
class JetStreamResources(BaseModel):
- max_age: Annotated[Optional[str], WithJsonSchema({"immuttable": True})] = Field(
- default=None
- )
- max_bytes: Annotated[Optional[str], WithJsonSchema({"immuttable": True})] = Field(
- default=None
- )
+ max_age: Optional[str] = Field(default=None, allow_mutation=False)
+ max_bytes: Optional[str] = Field(default=None, allow_mutation=False)
+
+ def update(self, patch: "JetStreamResources") -> "JetStreamResources":
+ """Apply a patch to this jetstream resources config."""
+ updated_config = self.model_copy(deep=True)
+ if patch.max_age is not None:
+ updated_config.max_age = patch.max_age
+ if patch.max_bytes is not None:
+ updated_config.max_bytes = patch.max_bytes
+ return updated_config
class NATSResources(BaseModel):
- stream: Optional[JetStreamResources] = Field(default=None)
+ stream: Optional[JetStreamResources] = Field(default=None, allow_mutation=False)
+ def update(self, patch: "NATSResources") -> "NATSResources":
+ """Apply a patch to this jetstream resources config."""
+ updated_config = self.model_copy(deep=True)
+ if patch.stream is not None:
+ updated_config.stream = updated_config.stream.update(patch.stream)
+ return updated_config
class Resources(BaseModel):
- memory: Annotated[Optional[str], WithJsonSchema({"immuttable": False})] = Field(
- default=None
- )
- cpu: Annotated[Optional[str], WithJsonSchema({"immuttable": False})] = Field(
- default=None
- )
+ memory: Optional[str] = Field(default=None, allow_mutation=True)
+ cpu: Optional[str] = Field(default=None, allow_mutation=True)
+
+ def update(self, patch: "Resources") -> "Resources":
+ """Apply a patch to this resources config."""
+ updated_config = self.model_copy(deep=True)
+ if patch.memory is not None:
+ updated_config.memory = patch.memory
+ if patch.cpu is not None:
+ updated_config.cpu = patch.cpu
+ return updated_config
class StorageResources(BaseModel):
- size: Annotated[Optional[str], WithJsonSchema({"immuttable": True})] = Field(
- default=None
- )
+ size: Optional[str] = Field(default=None, allow_mutation=False)
+ def update(self, patch: "StorageResources") -> "StorageResources":
+ """Apply a patch to this storage resources config."""
+ updated_config = self.model_copy(deep=True)
+ if patch.size is not None:
+ updated_config.size = patch.size
+ return updated_config
class TransformResources(BaseModel):
- storage: Optional[StorageResources] = Field(default=None)
- replicas: Annotated[Optional[int], WithJsonSchema({"immuttable": True})] = Field(
- default=None
- )
+ storage: Optional[StorageResources] = Field(default=None, allow_mutation=False)
+ replicas: Optional[int] = Field(default=None, allow_mutation=False)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)
+ def update(self, patch: "TransformResources") -> "TransformResources":
+ """Apply a patch to this transform resources config."""
+ updated_config = self.model_copy(deep=True)
+ if patch.storage is not None:
+ updated_config.storage = updated_config.storage.update(patch.storage)
+ if patch.replicas is not None:
+ updated_config.replicas = patch.replicas
+ return updated_config
class SinkResources(BaseModel):
- replicas: Annotated[Optional[int], WithJsonSchema({"immuttable": True})] = Field(
- default=None
- )
+ replicas: Optional[int] = Field(default=None, allow_mutation=False)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)
+ def update(self, patch: "SinkResources") -> "SinkResources":
+ """Apply a patch to this sink resources config."""
+ updated_config = self.model_copy(deep=True)
+ if patch.replicas is not None:
+ updated_config.replicas = patch.replicas
+ if patch.requests is not None:
+ updated_config.requests = updated_config.requests.update(patch.requests)
+ if patch.limits is not None:
+ updated_config.limits = updated_config.limits.update(patch.limits)
+ return updated_config
class JoinResources(BaseModel):
limits: Optional[Resources] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
- replicas: Annotated[Optional[int], WithJsonSchema({"immuttable": True})] = Field(
- default=None
- )
+ replicas: Optional[int] = Field(default=None, allow_mutation=False)
+
+ def update(self, patch: "JoinResources") -> "JoinResources":
+ """Apply a patch to this join resources config."""
+ updated_config = self.model_copy(deep=True)
+ if patch.limits is not None:
+ updated_config.limits = updated_config.limits.update(patch.limits)
+ if patch.requests is not None:
+ updated_config.requests = updated_config.requests.update(patch.requests)
+ if patch.replicas is not None:
+ updated_config.replicas = patch.replicas
class IngestorPodResources(BaseModel):
- replicas: Annotated[Optional[int], WithJsonSchema({"immuttable": False})] = Field(
- default=None
- )
+ replicas: Optional[int] = Field(default=None, allow_mutation=True)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)
+ def update(self, patch: "IngestorPodResources") -> "IngestorPodResources":
+ """Apply a patch to this ingestor pod resources config."""
+ updated_config = self.model_copy(deep=True)
+ if patch.replicas is not None:
+ updated_config.replicas = patch.replicas
+ if patch.requests is not None:
+ updated_config.requests = updated_config.requests.update(patch.requests)
+ if patch.limits is not None:
+ updated_config.limits = updated_config.limits.update(patch.limits)
+ return updated_config
+
class IngestorResources(BaseModel):
base: Optional[IngestorPodResources] = Field(default=None)
left: Optional[IngestorPodResources] = Field(default=None)
right: Optional[IngestorPodResources] = Field(default=None)
+ def update(self, patch: "IngestorResources") -> "IngestorResources":
+ """Apply a patch to this ingestor resources config."""
+ updated_config = self.model_copy(deep=True)
+
+ if patch.base is not None:
+ updated_config.base = updated_config.base.update(patch.base)
+ if patch.left is not None:
+ updated_config.left = updated_config.left.update(patch.left)
+ if patch.right is not None:
+ updated_config.right = updated_config.right.update(patch.right)
+ return updated_config
class PipelineResourcesConfig(BaseModel):
- nats: Optional[NATSResources] = Field(default=None)
+ nats: Optional[NATSResources] = Field(default=None, allow_mutation=False)
sink: Optional[SinkResources] = Field(default=None)
ingestor: Optional[IngestorResources] = Field(default=None)
transform: Optional[TransformResources] = Field(default=None)
join: Optional[JoinResources] = Field(default=None)
+
+ def update(self, patch: "PipelineResourcesConfig") -> "PipelineResourcesConfig":
+ """Apply a patch to this pipeline resources config."""
+ updated_config = self.model_copy(deep=True)
+
+ if patch.nats is not None:
+ updated_config.nats = updated_config.nats.update(patch.nats)
+ if patch.sink is not None:
+ updated_config.sink = updated_config.sink.update(patch.sink)
+ if patch.ingestor is not None:
+ updated_config.ingestor = updated_config.ingestor.update(patch.ingestor)
+ if patch.transform is not None:
+ updated_config.transform = updated_config.transform.update(patch.transform)
+ if patch.join is not None:
+ updated_config.join = updated_config.join.update(patch.join)
+ return updated_config
From 195bf97639c30134daf0de627c35fee56fc6a463 Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Fri, 27 Feb 2026 17:57:05 +0100
Subject: [PATCH 03/14] use frozen instead of deprecated allow_mutation
---
src/glassflow/etl/models/resources.py | 28 +++++++++++++++------------
1 file changed, 16 insertions(+), 12 deletions(-)
diff --git a/src/glassflow/etl/models/resources.py b/src/glassflow/etl/models/resources.py
index 2776790..15376de 100644
--- a/src/glassflow/etl/models/resources.py
+++ b/src/glassflow/etl/models/resources.py
@@ -4,8 +4,8 @@
class JetStreamResources(BaseModel):
- max_age: Optional[str] = Field(default=None, allow_mutation=False)
- max_bytes: Optional[str] = Field(default=None, allow_mutation=False)
+ max_age: Optional[str] = Field(default=None, frozen=True)
+ max_bytes: Optional[str] = Field(default=None, frozen=True)
def update(self, patch: "JetStreamResources") -> "JetStreamResources":
"""Apply a patch to this jetstream resources config."""
@@ -18,7 +18,7 @@ def update(self, patch: "JetStreamResources") -> "JetStreamResources":
class NATSResources(BaseModel):
- stream: Optional[JetStreamResources] = Field(default=None, allow_mutation=False)
+ stream: Optional[JetStreamResources] = Field(default=None)
def update(self, patch: "NATSResources") -> "NATSResources":
"""Apply a patch to this jetstream resources config."""
@@ -28,8 +28,8 @@ def update(self, patch: "NATSResources") -> "NATSResources":
return updated_config
class Resources(BaseModel):
- memory: Optional[str] = Field(default=None, allow_mutation=True)
- cpu: Optional[str] = Field(default=None, allow_mutation=True)
+ memory: Optional[str] = Field(default=None)
+ cpu: Optional[str] = Field(default=None)
def update(self, patch: "Resources") -> "Resources":
"""Apply a patch to this resources config."""
@@ -42,7 +42,7 @@ def update(self, patch: "Resources") -> "Resources":
class StorageResources(BaseModel):
- size: Optional[str] = Field(default=None, allow_mutation=False)
+ size: Optional[str] = Field(default=None, frozen=True)
def update(self, patch: "StorageResources") -> "StorageResources":
"""Apply a patch to this storage resources config."""
@@ -52,8 +52,8 @@ def update(self, patch: "StorageResources") -> "StorageResources":
return updated_config
class TransformResources(BaseModel):
- storage: Optional[StorageResources] = Field(default=None, allow_mutation=False)
- replicas: Optional[int] = Field(default=None, allow_mutation=False)
+ storage: Optional[StorageResources] = Field(default=None)
+ replicas: Optional[int] = Field(default=None, frozen=True)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)
@@ -64,10 +64,14 @@ def update(self, patch: "TransformResources") -> "TransformResources":
updated_config.storage = updated_config.storage.update(patch.storage)
if patch.replicas is not None:
updated_config.replicas = patch.replicas
+ if patch.requests is not None:
+ updated_config.requests = updated_config.requests.update(patch.requests)
+ if patch.limits is not None:
+ updated_config.limits = updated_config.limits.update(patch.limits)
return updated_config
class SinkResources(BaseModel):
- replicas: Optional[int] = Field(default=None, allow_mutation=False)
+ replicas: Optional[int] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)
@@ -85,7 +89,7 @@ def update(self, patch: "SinkResources") -> "SinkResources":
class JoinResources(BaseModel):
limits: Optional[Resources] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
- replicas: Optional[int] = Field(default=None, allow_mutation=False)
+ replicas: Optional[int] = Field(default=None, frozen=True)
def update(self, patch: "JoinResources") -> "JoinResources":
"""Apply a patch to this join resources config."""
@@ -99,7 +103,7 @@ def update(self, patch: "JoinResources") -> "JoinResources":
class IngestorPodResources(BaseModel):
- replicas: Optional[int] = Field(default=None, allow_mutation=True)
+ replicas: Optional[int] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)
@@ -133,7 +137,7 @@ def update(self, patch: "IngestorResources") -> "IngestorResources":
return updated_config
class PipelineResourcesConfig(BaseModel):
- nats: Optional[NATSResources] = Field(default=None, allow_mutation=False)
+ nats: Optional[NATSResources] = Field(default=None)
sink: Optional[SinkResources] = Field(default=None)
ingestor: Optional[IngestorResources] = Field(default=None)
transform: Optional[TransformResources] = Field(default=None)
From 8d67bc542419668e4fbab8ef69f5585aede40dff Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Fri, 27 Feb 2026 17:57:23 +0100
Subject: [PATCH 04/14] format code
---
src/glassflow/etl/models/resources.py | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/src/glassflow/etl/models/resources.py b/src/glassflow/etl/models/resources.py
index 15376de..8c2889c 100644
--- a/src/glassflow/etl/models/resources.py
+++ b/src/glassflow/etl/models/resources.py
@@ -27,6 +27,7 @@ def update(self, patch: "NATSResources") -> "NATSResources":
updated_config.stream = updated_config.stream.update(patch.stream)
return updated_config
+
class Resources(BaseModel):
memory: Optional[str] = Field(default=None)
cpu: Optional[str] = Field(default=None)
@@ -51,6 +52,7 @@ def update(self, patch: "StorageResources") -> "StorageResources":
updated_config.size = patch.size
return updated_config
+
class TransformResources(BaseModel):
storage: Optional[StorageResources] = Field(default=None)
replicas: Optional[int] = Field(default=None, frozen=True)
@@ -70,6 +72,7 @@ def update(self, patch: "TransformResources") -> "TransformResources":
updated_config.limits = updated_config.limits.update(patch.limits)
return updated_config
+
class SinkResources(BaseModel):
replicas: Optional[int] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
@@ -86,6 +89,7 @@ def update(self, patch: "SinkResources") -> "SinkResources":
updated_config.limits = updated_config.limits.update(patch.limits)
return updated_config
+
class JoinResources(BaseModel):
limits: Optional[Resources] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
@@ -136,6 +140,7 @@ def update(self, patch: "IngestorResources") -> "IngestorResources":
updated_config.right = updated_config.right.update(patch.right)
return updated_config
+
class PipelineResourcesConfig(BaseModel):
nats: Optional[NATSResources] = Field(default=None)
sink: Optional[SinkResources] = Field(default=None)
From f9847317d67da0ab9bc646666a0089c9708f617a Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Fri, 27 Feb 2026 16:58:26 +0000
Subject: [PATCH 05/14] docs: update coverage badge
---
README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/README.md b/README.md
index 70186d2..cb8988a 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
-
+
From 8f4a2204ed674b74e1d7f27bf3829120caa5ce71 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Fri, 27 Feb 2026 17:04:02 +0000
Subject: [PATCH 06/14] chore: bump version to 3.8.0
---
VERSION | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/VERSION b/VERSION
index c1e43e6..1981190 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.7.3
+3.8.0
From 8546915e24bf83dce4f8212e0e4986e4c39545a3 Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Fri, 27 Feb 2026 18:10:40 +0100
Subject: [PATCH 07/14] add resources to tests
---
tests/conftest.py | 6 +++
tests/data/pipeline_configs.py | 37 ++++++++++++++
tests/test_models/test_pipeline_config.py | 61 +++++++++++++++++++++++
3 files changed, 104 insertions(+)
diff --git a/tests/conftest.py b/tests/conftest.py
index c37af4b..128c428 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -55,6 +55,12 @@ def valid_config_without_joins() -> dict:
return pipeline_configs.get_valid_config_without_joins()
+@pytest.fixture
+def valid_config_with_pipeline_resources() -> dict:
+ """Fixture for a valid pipeline configuration including pipeline_resources."""
+ return pipeline_configs.get_valid_config_with_pipeline_resources()
+
+
@pytest.fixture
def valid_config_with_dedup_disabled() -> dict:
"""Fixture for a valid pipeline configuration with deduplication disabled."""
diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py
index a0f9ca6..36b1863 100644
--- a/tests/data/pipeline_configs.py
+++ b/tests/data/pipeline_configs.py
@@ -222,6 +222,43 @@ def get_valid_config_without_joins() -> dict:
}
+def get_valid_config_with_pipeline_resources() -> dict:
+ """Get a valid pipeline configuration including pipeline_resources."""
+ config = copy.deepcopy(get_valid_pipeline_config())
+ config["pipeline_resources"] = {
+ "nats": {
+ "stream": {
+ "max_age": "72h",
+ "max_bytes": "1GB",
+ },
+ },
+ "sink": {
+ "replicas": 2,
+ "requests": {"memory": "256Mi", "cpu": "100m"},
+ "limits": {"memory": "512Mi", "cpu": "500m"},
+ },
+ "transform": {
+ "storage": {"size": "10Gi"},
+ "replicas": 1,
+ "requests": {"memory": "128Mi", "cpu": "50m"},
+ "limits": {"memory": "256Mi", "cpu": "200m"},
+ },
+ "join": {
+ "replicas": 1,
+ "requests": {"memory": "64Mi", "cpu": "25m"},
+ "limits": {"memory": "128Mi", "cpu": "100m"},
+ },
+ "ingestor": {
+ "base": {
+ "replicas": 2,
+ "requests": {"memory": "128Mi", "cpu": "50m"},
+ "limits": {"memory": "256Mi", "cpu": "200m"},
+ },
+ },
+ }
+ return config
+
+
def get_valid_config_with_dedup_disabled() -> dict:
"""Get a valid pipeline configuration with deduplication disabled."""
config = copy.deepcopy(get_valid_pipeline_config())
diff --git a/tests/test_models/test_pipeline_config.py b/tests/test_models/test_pipeline_config.py
index cfe85e7..dd744cc 100644
--- a/tests/test_models/test_pipeline_config.py
+++ b/tests/test_models/test_pipeline_config.py
@@ -115,3 +115,64 @@ def test_pipeline_config_pipeline_name_not_provided(self, valid_config):
)
assert config.pipeline_id == "test-pipeline"
assert config.name == "Test Pipeline"
+
+ def test_pipeline_config_creation_with_pipeline_resources(
+ self, valid_config_with_pipeline_resources
+ ):
+ """Test PipelineConfig creation and validation with pipeline_resources."""
+ config = models.PipelineConfig(**valid_config_with_pipeline_resources)
+ assert config.pipeline_id == "test-pipeline"
+ assert config.pipeline_resources is not None
+
+ resources = config.pipeline_resources
+ # NATS / JetStream
+ assert resources.nats is not None
+ assert resources.nats.stream is not None
+ assert resources.nats.stream.max_age == "72h"
+ assert resources.nats.stream.max_bytes == "1GB"
+
+ # Sink
+ assert resources.sink is not None
+ assert resources.sink.replicas == 2
+ assert resources.sink.requests is not None
+ assert resources.sink.requests.memory == "256Mi"
+ assert resources.sink.requests.cpu == "100m"
+ assert resources.sink.limits is not None
+ assert resources.sink.limits.memory == "512Mi"
+ assert resources.sink.limits.cpu == "500m"
+
+ # Transform
+ assert resources.transform is not None
+ assert resources.transform.storage is not None
+ assert resources.transform.storage.size == "10Gi"
+ assert resources.transform.replicas == 1
+ assert resources.transform.requests.memory == "128Mi"
+ assert resources.transform.limits.cpu == "200m"
+
+ # Join
+ assert resources.join is not None
+ assert resources.join.replicas == 1
+ assert resources.join.requests.memory == "64Mi"
+ assert resources.join.limits.cpu == "100m"
+
+ # Ingestor
+ assert resources.ingestor is not None
+ assert resources.ingestor.base is not None
+ assert resources.ingestor.base.replicas == 2
+ assert resources.ingestor.base.requests.memory == "128Mi"
+
+ def test_pipeline_config_pipeline_resources_optional(self, valid_config):
+ """Test that pipeline_resources is optional and defaults to None."""
+ config = models.PipelineConfig(**valid_config)
+ assert config.pipeline_id == "test-pipeline"
+ assert config.pipeline_resources is None
+
+ def test_pipeline_config_pipeline_resources_partial(self, valid_config):
+ """Test PipelineConfig with partial pipeline_resources (only some sections)."""
+ config_data = {**valid_config, "pipeline_resources": {"sink": {"replicas": 3}}}
+ config = models.PipelineConfig(**config_data)
+ assert config.pipeline_resources is not None
+ assert config.pipeline_resources.sink is not None
+ assert config.pipeline_resources.sink.replicas == 3
+ assert config.pipeline_resources.nats is None
+ assert config.pipeline_resources.transform is None
From d4e0e9cefaa63c0bab5f7e0a03df872624e496b9 Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Mon, 2 Mar 2026 11:15:59 +0100
Subject: [PATCH 08/14] add deprecation warning in sources.topics[].replicas
---
src/glassflow/etl/models/source.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py
index dbeeb8d..c1b1500 100644
--- a/src/glassflow/etl/models/source.py
+++ b/src/glassflow/etl/models/source.py
@@ -95,7 +95,10 @@ class TopicConfig(BaseModel):
consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST
name: str
deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig())
- replicas: Optional[int] = Field(default=1)
+ replicas: Optional[int] = Field(
+ default=1,
+ deprecated="Use pipeline_resources.ingestor..replicas instead",
+ )
@field_validator("replicas")
@classmethod
From c1f6918b7889765f15e1931c183665f8907c8caa Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Tue, 3 Mar 2026 12:01:03 +0100
Subject: [PATCH 09/14] support update resources
---
src/glassflow/etl/errors.py | 7 ++
src/glassflow/etl/models/pipeline.py | 7 ++
src/glassflow/etl/models/resources.py | 33 ++++---
tests/test_models/test_resources.py | 122 ++++++++++++++++++++++++++
4 files changed, 157 insertions(+), 12 deletions(-)
create mode 100644 tests/test_models/test_resources.py
diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py
index 05102f1..b3651f7 100644
--- a/src/glassflow/etl/errors.py
+++ b/src/glassflow/etl/errors.py
@@ -54,6 +54,13 @@ class PipelineInvalidConfigurationError(APIError):
"""Raised when a pipeline configuration is invalid."""
+class ImmutableResourceError(GlassFlowError):
+ """
+ Raised when attempting to update a pipeline resource field that is immutable
+ (frozen) and cannot be changed after pipeline creation.
+ """
+
+
class InvalidDataTypeMappingError(GlassFlowError):
"""Exception raised when a data type mapping is invalid."""
diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py
index 521f709..e2b7af3 100644
--- a/src/glassflow/etl/models/pipeline.py
+++ b/src/glassflow/etl/models/pipeline.py
@@ -167,6 +167,12 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
or StatelessTransformationConfig()
).update(config_patch.stateless_transformation)
+ # Update pipeline resources if provided
+ if config_patch.pipeline_resources is not None:
+ updated_config.pipeline_resources = (
+ updated_config.pipeline_resources or PipelineResourcesConfig()
+ ).update(config_patch.pipeline_resources)
+
return updated_config
@@ -181,4 +187,5 @@ class PipelineConfigPatch(BaseModel):
stateless_transformation: Optional[StatelessTransformationConfigPatch] = Field(
default=None
)
+ pipeline_resources: Optional[PipelineResourcesConfig] = Field(default=None)
version: Optional[str] = Field(default=None)
diff --git a/src/glassflow/etl/models/resources.py b/src/glassflow/etl/models/resources.py
index 8c2889c..739f975 100644
--- a/src/glassflow/etl/models/resources.py
+++ b/src/glassflow/etl/models/resources.py
@@ -2,6 +2,8 @@
from pydantic import BaseModel, Field
+from glassflow.etl.errors import ImmutableResourceError
+
class JetStreamResources(BaseModel):
max_age: Optional[str] = Field(default=None, frozen=True)
@@ -9,12 +11,13 @@ class JetStreamResources(BaseModel):
def update(self, patch: "JetStreamResources") -> "JetStreamResources":
"""Apply a patch to this jetstream resources config."""
- updated_config = self.model_copy(deep=True)
- if patch.max_age is not None:
- updated_config.max_age = patch.max_age
- if patch.max_bytes is not None:
- updated_config.max_bytes = patch.max_bytes
- return updated_config
+ if patch.max_age is not None or patch.max_bytes is not None:
+ raise ImmutableResourceError(
+ "Cannot update pipeline resources: 'max_age' and 'max_bytes' in "
+ "nats.stream are immutable and cannot be changed after pipeline "
+ "creation."
+ )
+ return self.model_copy(deep=True)
class NATSResources(BaseModel):
@@ -47,15 +50,17 @@ class StorageResources(BaseModel):
def update(self, patch: "StorageResources") -> "StorageResources":
"""Apply a patch to this storage resources config."""
- updated_config = self.model_copy(deep=True)
if patch.size is not None:
- updated_config.size = patch.size
- return updated_config
+ raise ImmutableResourceError(
+ "Cannot update pipeline resources: 'size' in transform.storage is "
+ "immutable and cannot be changed after pipeline creation."
+ )
+ return self.model_copy(deep=True)
class TransformResources(BaseModel):
storage: Optional[StorageResources] = Field(default=None)
- replicas: Optional[int] = Field(default=None, frozen=True)
+ replicas: Optional[int] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)
@@ -97,13 +102,17 @@ class JoinResources(BaseModel):
def update(self, patch: "JoinResources") -> "JoinResources":
"""Apply a patch to this join resources config."""
+ if patch.replicas is not None:
+ raise ImmutableResourceError(
+ "Cannot update pipeline resources: 'replicas' in join is immutable "
+ "and cannot be changed after pipeline creation."
+ )
updated_config = self.model_copy(deep=True)
if patch.limits is not None:
updated_config.limits = updated_config.limits.update(patch.limits)
if patch.requests is not None:
updated_config.requests = updated_config.requests.update(patch.requests)
- if patch.replicas is not None:
- updated_config.replicas = patch.replicas
+ return updated_config
class IngestorPodResources(BaseModel):
diff --git a/tests/test_models/test_resources.py b/tests/test_models/test_resources.py
new file mode 100644
index 0000000..e437e9a
--- /dev/null
+++ b/tests/test_models/test_resources.py
@@ -0,0 +1,122 @@
+import pytest
+
+from glassflow.etl.errors import ImmutableResourceError
+from glassflow.etl.models.resources import (
+ JetStreamResources,
+ JoinResources,
+ NATSResources,
+ PipelineResourcesConfig,
+ Resources,
+ StorageResources,
+ TransformResources,
+)
+
+
+class TestImmutableResourceErrors:
+ """
+ Tests for ImmutableResourceError when updating frozen pipeline resource fields.
+ """
+
+ def test_jetstream_resources_update_immutable_max_age_raises(self):
+ """
+ Updating nats.stream max_age raises ImmutableResourceError with clear message.
+ """
+ current = JetStreamResources(max_age="24h", max_bytes="512Mi")
+ patch = JetStreamResources(max_age="72h", max_bytes=None)
+ with pytest.raises(ImmutableResourceError) as exc_info:
+ current.update(patch)
+ assert "max_age" in str(exc_info.value)
+ assert "max_bytes" in str(exc_info.value)
+ assert "nats.stream" in str(exc_info.value)
+ assert "immutable" in str(exc_info.value).lower()
+
+ def test_jetstream_resources_update_immutable_max_bytes_raises(self):
+ """Updating nats.stream max_bytes raises ImmutableResourceError."""
+ current = JetStreamResources(max_age="24h", max_bytes="512Mi")
+ patch = JetStreamResources(max_age=None, max_bytes="1GB")
+ with pytest.raises(ImmutableResourceError) as exc_info:
+ current.update(patch)
+ assert "immutable" in str(exc_info.value).lower()
+
+ def test_jetstream_resources_update_empty_patch_succeeds(self):
+ """Updating with no frozen fields changed does not raise."""
+ current = JetStreamResources(max_age="24h", max_bytes="512Mi")
+ patch = JetStreamResources()
+ result = current.update(patch)
+ assert result.max_age == "24h"
+ assert result.max_bytes == "512Mi"
+
+ def test_storage_resources_update_immutable_size_raises(self):
+ """
+ Updating transform.storage size raises ImmutableResourceError.
+ """
+ current = StorageResources(size="5Gi")
+ patch = StorageResources(size="10Gi")
+ with pytest.raises(ImmutableResourceError) as exc_info:
+ current.update(patch)
+ assert "size" in str(exc_info.value)
+ assert "transform.storage" in str(exc_info.value)
+ assert "immutable" in str(exc_info.value).lower()
+
+ def test_storage_resources_update_empty_patch_succeeds(self):
+ """Updating storage with no size change does not raise."""
+ current = StorageResources(size="5Gi")
+ patch = StorageResources()
+ result = current.update(patch)
+ assert result.size == "5Gi"
+
+ def test_join_resources_update_immutable_replicas_raises(self):
+ """Updating join replicas raises ImmutableResourceError with clear message."""
+ current = JoinResources(replicas=1, limits=Resources(memory="64Mi"))
+ patch = JoinResources(replicas=2, limits=None, requests=None)
+ with pytest.raises(ImmutableResourceError) as exc_info:
+ current.update(patch)
+ assert "replicas" in str(exc_info.value)
+ assert "join" in str(exc_info.value)
+ assert "immutable" in str(exc_info.value).lower()
+
+ def test_join_resources_update_limits_and_requests_succeeds(self):
+ """Updating join limits/requests (mutable) does not raise."""
+ current = JoinResources(
+ replicas=1,
+ limits=Resources(memory="64Mi"),
+ requests=Resources(memory="32Mi"),
+ )
+ patch = JoinResources(
+ replicas=None,
+ limits=Resources(memory="128Mi", cpu="100m"),
+ requests=Resources(cpu="50m"),
+ )
+ result = current.update(patch)
+ assert result.replicas == 1
+ assert result.limits is not None
+ assert result.limits.memory == "128Mi"
+ assert result.limits.cpu == "100m"
+ assert result.requests is not None
+ assert result.requests.cpu == "50m"
+
+ def test_pipeline_resources_config_update_nats_immutable_raises(self):
+ """Updating pipeline_resources with nats.stream immutable fields raises."""
+ current = PipelineResourcesConfig(
+ nats=NATSResources(
+ stream=JetStreamResources(max_age="24h", max_bytes="512Mi")
+ )
+ )
+ patch = PipelineResourcesConfig(
+ nats=NATSResources(stream=JetStreamResources(max_age="72h", max_bytes=None))
+ )
+ with pytest.raises(ImmutableResourceError) as exc_info:
+ current.update(patch)
+ assert "nats.stream" in str(exc_info.value)
+
+ def test_pipeline_resources_config_update_transform_storage_immutable_raises(self):
+ """Updating pipeline_resources with transform.storage size raises."""
+ current = PipelineResourcesConfig(
+ transform=TransformResources(storage=StorageResources(size="5Gi"))
+ )
+ patch = PipelineResourcesConfig(
+ transform=TransformResources(storage=StorageResources(size="10Gi"))
+ )
+ with pytest.raises(ImmutableResourceError) as exc_info:
+ current.update(patch)
+ assert "transform.storage" in str(exc_info.value)
From 931fe07c92a9636db66d56857465f430fddb05fc Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Tue, 3 Mar 2026 11:01:59 +0000
Subject: [PATCH 10/14] docs: update coverage badge
---
README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/README.md b/README.md
index cb8988a..e3d8a17 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
-
+
From 0b834f46414b08e7f83a9b5013ec6fd278e4475d Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Fri, 6 Mar 2026 16:22:30 +0100
Subject: [PATCH 11/14] unify update logic
---
src/glassflow/etl/models/filter.py | 12 +++----
src/glassflow/etl/models/join.py | 14 ++++----
src/glassflow/etl/models/sink.py | 32 +++++++++----------
src/glassflow/etl/models/source.py | 14 ++++----
.../etl/models/stateless_transformation.py | 17 ++++------
5 files changed, 39 insertions(+), 50 deletions(-)
diff --git a/src/glassflow/etl/models/filter.py b/src/glassflow/etl/models/filter.py
index 70fa603..7d63dca 100644
--- a/src/glassflow/etl/models/filter.py
+++ b/src/glassflow/etl/models/filter.py
@@ -1,4 +1,4 @@
-from typing import Any, Optional
+from typing import Optional
from pydantic import BaseModel, Field
@@ -9,17 +9,15 @@ class FilterConfig(BaseModel):
def update(self, patch: "FilterConfigPatch") -> "FilterConfig":
"""Apply a patch to this filter config."""
- update_dict: dict[str, Any] = {}
+ update_dict = self.model_copy(deep=True)
# Check each field explicitly to handle model instances properly
if patch.enabled is not None:
- update_dict["enabled"] = patch.enabled
+ update_dict.enabled = patch.enabled
if patch.expression is not None:
- update_dict["expression"] = patch.expression
+ update_dict.expression = patch.expression
- if update_dict:
- return self.model_copy(update=update_dict)
- return self
+ return update_dict
class FilterConfigPatch(BaseModel):
diff --git a/src/glassflow/etl/models/join.py b/src/glassflow/etl/models/join.py
index 90cd71f..e0e697f 100644
--- a/src/glassflow/etl/models/join.py
+++ b/src/glassflow/etl/models/join.py
@@ -1,4 +1,4 @@
-from typing import Any, List, Optional
+from typing import List, Optional
from pydantic import BaseModel, Field, ValidationInfo, field_validator
@@ -66,18 +66,16 @@ def validate_type(
def update(self, patch: "JoinConfigPatch") -> "JoinConfig":
"""Apply a patch to this join config."""
- update_dict: dict[str, Any] = {}
+ update_dict = self.model_copy(deep=True)
if patch.enabled is not None:
- update_dict["enabled"] = patch.enabled
+ update_dict.enabled = patch.enabled
if patch.type is not None:
- update_dict["type"] = patch.type
+ update_dict.type = patch.type
if patch.sources is not None:
- update_dict["sources"] = patch.sources
+ update_dict.sources = patch.sources
- if update_dict:
- return self.model_copy(update=update_dict)
- return self
+ return update_dict
class JoinConfigPatch(BaseModel):
diff --git a/src/glassflow/etl/models/sink.py b/src/glassflow/etl/models/sink.py
index 36a3be2..29ec290 100644
--- a/src/glassflow/etl/models/sink.py
+++ b/src/glassflow/etl/models/sink.py
@@ -1,4 +1,4 @@
-from typing import Any, Optional
+from typing import Optional
from pydantic import BaseModel, Field
@@ -26,39 +26,37 @@ class SinkConfig(BaseModel):
def update(self, patch: "SinkConfigPatch") -> "SinkConfig":
"""Apply a patch to this sink config."""
- update_dict: dict[str, Any] = {}
+ update_dict = self.model_copy(deep=True)
# Check each field explicitly to handle model instances properly
if patch.provider is not None:
- update_dict["provider"] = patch.provider
+ update_dict.provider = patch.provider
if patch.host is not None:
- update_dict["host"] = patch.host
+ update_dict.host = patch.host
if patch.port is not None:
- update_dict["port"] = patch.port
+ update_dict.port = patch.port
if patch.http_port is not None:
- update_dict["http_port"] = patch.http_port
+ update_dict.http_port = patch.http_port
if patch.database is not None:
- update_dict["database"] = patch.database
+ update_dict.database = patch.database
if patch.username is not None:
- update_dict["username"] = patch.username
+ update_dict.username = patch.username
if patch.password is not None:
- update_dict["password"] = patch.password
+ update_dict.password = patch.password
if patch.secure is not None:
- update_dict["secure"] = patch.secure
+ update_dict.secure = patch.secure
if patch.skip_certificate_verification is not None:
- update_dict["skip_certificate_verification"] = (
+ update_dict.skip_certificate_verification = (
patch.skip_certificate_verification
)
if patch.max_batch_size is not None:
- update_dict["max_batch_size"] = patch.max_batch_size
+ update_dict.max_batch_size = patch.max_batch_size
if patch.max_delay_time is not None:
- update_dict["max_delay_time"] = patch.max_delay_time
+ update_dict.max_delay_time = patch.max_delay_time
if patch.table is not None:
- update_dict["table"] = patch.table
+ update_dict.table = patch.table
- if update_dict:
- return self.model_copy(update=update_dict)
- return self
+ return update_dict
class SinkConfigPatch(BaseModel):
diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py
index c1b1500..c642a65 100644
--- a/src/glassflow/etl/models/source.py
+++ b/src/glassflow/etl/models/source.py
@@ -147,26 +147,24 @@ class SourceConfig(BaseModel):
def update(self, patch: "SourceConfigPatch") -> "SourceConfig":
"""Apply a patch to this source config."""
- update_dict: dict[str, Any] = {}
+ update_dict = self.model_copy(deep=True)
if patch.type is not None:
- update_dict["type"] = patch.type
+ update_dict.type = patch.type
if patch.provider is not None:
- update_dict["provider"] = patch.provider
+ update_dict.provider = patch.provider
# Handle connection_params patch
if patch.connection_params is not None:
- update_dict["connection_params"] = self.connection_params.update(
+ update_dict.connection_params = self.connection_params.update(
patch.connection_params
)
# Handle topics patch - full replacement only if provided
if patch.topics is not None:
- update_dict["topics"] = patch.topics
+ update_dict.topics = patch.topics
- if update_dict:
- return self.model_copy(update=update_dict)
- return self
+ return update_dict
class DeduplicationConfigPatch(BaseModel):
diff --git a/src/glassflow/etl/models/stateless_transformation.py b/src/glassflow/etl/models/stateless_transformation.py
index f41ff84..5568488 100644
--- a/src/glassflow/etl/models/stateless_transformation.py
+++ b/src/glassflow/etl/models/stateless_transformation.py
@@ -1,4 +1,4 @@
-from typing import Any, List, Optional
+from typing import List, Optional
from pydantic import BaseModel, Field, model_validator
@@ -63,21 +63,18 @@ def update(
self, patch: "StatelessTransformationConfigPatch"
) -> "StatelessTransformationConfig":
"""Apply a patch to this stateless transformation config."""
- update_dict: dict[str, Any] = {}
+ update_dict = self.model_copy(deep=True)
if patch.enabled is not None:
- update_dict["enabled"] = patch.enabled
+ update_dict.enabled = patch.enabled
if patch.id is not None:
- update_dict["id"] = patch.id
+ update_dict.id = patch.id
if patch.type is not None:
- update_dict["type"] = patch.type
+ update_dict.type = patch.type
if patch.config is not None:
- update_dict["config"] = patch.config
+ update_dict.config = patch.config
- if update_dict:
- return self.model_copy(update=update_dict)
-
- return self
+ return update_dict
class StatelessTransformationConfigPatch(BaseModel):
From d2f39c995d75b65fb6de938cd1a621c8f3c80e61 Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Fri, 6 Mar 2026 16:41:26 +0100
Subject: [PATCH 12/14] not allow update resources of transform component if
deduplication is enabled
---
src/glassflow/etl/models/__init__.py | 18 ++++++++-
src/glassflow/etl/models/pipeline.py | 46 ++++++++++++++++++++---
tests/test_models/test_pipeline_config.py | 14 +++++++
3 files changed, 71 insertions(+), 7 deletions(-)
diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py
index 66ac1d1..4a65b18 100644
--- a/src/glassflow/etl/models/__init__.py
+++ b/src/glassflow/etl/models/__init__.py
@@ -10,7 +10,16 @@
)
from .metadata import MetadataConfig
from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
-from .resources import PipelineResourcesConfig
+from .resources import (
+ IngestorResources,
+ JoinResources,
+ NATSResources,
+ PipelineResourcesConfig,
+ Resources,
+ SinkResources,
+ StorageResources,
+ TransformResources,
+)
from .schema import Schema, SchemaField
from .sink import SinkConfig, SinkConfigPatch, SinkType
from .source import (
@@ -71,4 +80,11 @@
"ExpressionConfig",
"Transformation",
"PipelineResourcesConfig",
+ "TransformResources",
+ "JoinResources",
+ "NATSResources",
+ "SinkResources",
+ "IngestorResources",
+ "StorageResources",
+ "Resources",
]
diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py
index e2b7af3..a9138f2 100644
--- a/src/glassflow/etl/models/pipeline.py
+++ b/src/glassflow/etl/models/pipeline.py
@@ -3,6 +3,7 @@
from pydantic import BaseModel, Field, field_validator, model_validator
+from ..errors import ImmutableResourceError
from .base import CaseInsensitiveStrEnum
from .filter import FilterConfig, FilterConfigPatch
from .join import JoinConfig, JoinConfigPatch
@@ -116,6 +117,16 @@ def validate_config(self) -> "PipelineConfig":
return self
+ def _has_deduplication_enabled(self) -> bool:
+ """
+ Check if the pipeline has deduplication enabled.
+ """
+ return any(
+ topic.deduplication and topic.deduplication.enabled
+ for topic in self.source.topics
+ if topic.deduplication is not None
+ )
+
def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
"""
Apply a patch configuration to this pipeline configuration.
@@ -133,6 +144,23 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
if config_patch.name is not None:
updated_config.name = config_patch.name
+ # Update pipeline resources if provided
+ if config_patch.pipeline_resources is not None:
+ if (
+ config_patch.pipeline_resources.transform is not None
+ and config_patch.pipeline_resources.transform.replicas is not None
+ and self._has_deduplication_enabled()
+ and not config_patch._has_deduplication_disabled()
+ ):
+ raise ImmutableResourceError(
+ "Cannot update pipeline resources of a transform component if the "
+ "pipeline has deduplication enabled"
+ )
+
+ updated_config.pipeline_resources = (
+ updated_config.pipeline_resources or PipelineResourcesConfig()
+ ).update(config_patch.pipeline_resources)
+
# Update source if provided
if config_patch.source is not None:
updated_config.source = updated_config.source.update(config_patch.source)
@@ -167,12 +195,6 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
or StatelessTransformationConfig()
).update(config_patch.stateless_transformation)
- # Update pipeline resources if provided
- if config_patch.pipeline_resources is not None:
- updated_config.pipeline_resources = (
- updated_config.pipeline_resources or PipelineResourcesConfig()
- ).update(config_patch.pipeline_resources)
-
return updated_config
@@ -189,3 +211,15 @@ class PipelineConfigPatch(BaseModel):
)
pipeline_resources: Optional[PipelineResourcesConfig] = Field(default=None)
version: Optional[str] = Field(default=None)
+
+ def _has_deduplication_disabled(self) -> bool:
+ """
+ Check if the pipeline has deduplication disabled.
+ """
+ disabled = False
+ if self.source is not None and self.source.topics is not None:
+ for topic in self.source.topics:
+ if topic.deduplication and not topic.deduplication.enabled:
+ disabled = True
+ break
+ return disabled
diff --git a/tests/test_models/test_pipeline_config.py b/tests/test_models/test_pipeline_config.py
index dd744cc..d4875c1 100644
--- a/tests/test_models/test_pipeline_config.py
+++ b/tests/test_models/test_pipeline_config.py
@@ -1,6 +1,7 @@
import pytest
from glassflow.etl import models
+from glassflow.etl.errors import ImmutableResourceError
class TestPipelineConfig:
@@ -176,3 +177,16 @@ def test_pipeline_config_pipeline_resources_partial(self, valid_config):
assert config.pipeline_resources.sink.replicas == 3
assert config.pipeline_resources.nats is None
assert config.pipeline_resources.transform is None
+
+ def test_pipeline_config_resources_update_transform_replicas_immutable_raises(
+ self, valid_config
+ ):
+ """Updating pipeline_resources with transform.replicas raises."""
+ config = models.PipelineConfig(**valid_config)
+ patch = models.PipelineConfigPatch(
+ pipeline_resources=models.PipelineResourcesConfig(
+ transform=models.TransformResources(replicas=2)
+ )
+ )
+ with pytest.raises(ImmutableResourceError) as _:
+ config.update(patch)
From 93a54eb48192681f6fb7fa8fb412520f052f4b35 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Fri, 6 Mar 2026 15:42:38 +0000
Subject: [PATCH 13/14] docs: update coverage badge
---
README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/README.md b/README.md
index e3d8a17..a4a5790 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
-
+
From c6c35875678dc0cffb38e6ae7d8c8f5d8f45748b Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Tue, 10 Mar 2026 14:10:51 +0100
Subject: [PATCH 14/14] fix maxAge and maxBytes name
---
src/glassflow/etl/models/resources.py | 6 +++---
tests/data/pipeline_configs.py | 4 ++--
tests/test_models/test_pipeline_config.py | 2 +-
tests/test_models/test_resources.py | 18 +++++++++---------
4 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/src/glassflow/etl/models/resources.py b/src/glassflow/etl/models/resources.py
index 739f975..254b881 100644
--- a/src/glassflow/etl/models/resources.py
+++ b/src/glassflow/etl/models/resources.py
@@ -6,14 +6,14 @@
class JetStreamResources(BaseModel):
- max_age: Optional[str] = Field(default=None, frozen=True)
- max_bytes: Optional[str] = Field(default=None, frozen=True)
+ max_age: Optional[str] = Field(default=None, frozen=True, alias="maxAge")
+ max_bytes: Optional[str] = Field(default=None, frozen=True, alias="maxBytes")
def update(self, patch: "JetStreamResources") -> "JetStreamResources":
"""Apply a patch to this jetstream resources config."""
if patch.max_age is not None or patch.max_bytes is not None:
raise ImmutableResourceError(
- "Cannot update pipeline resources: 'max_age' and 'max_bytes' in "
+ "Cannot update pipeline resources: 'maxAge' and 'maxBytes' in "
"nats.stream are immutable and cannot be changed after pipeline "
"creation."
)
diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py
index 36b1863..e1f4e82 100644
--- a/tests/data/pipeline_configs.py
+++ b/tests/data/pipeline_configs.py
@@ -228,8 +228,8 @@ def get_valid_config_with_pipeline_resources() -> dict:
config["pipeline_resources"] = {
"nats": {
"stream": {
- "max_age": "72h",
- "max_bytes": "1GB",
+ "maxAge": "72h",
+ "maxBytes": "1Gi",
},
},
"sink": {
diff --git a/tests/test_models/test_pipeline_config.py b/tests/test_models/test_pipeline_config.py
index d4875c1..f9659f1 100644
--- a/tests/test_models/test_pipeline_config.py
+++ b/tests/test_models/test_pipeline_config.py
@@ -130,7 +130,7 @@ def test_pipeline_config_creation_with_pipeline_resources(
assert resources.nats is not None
assert resources.nats.stream is not None
assert resources.nats.stream.max_age == "72h"
- assert resources.nats.stream.max_bytes == "1GB"
+ assert resources.nats.stream.max_bytes == "1Gi"
# Sink
assert resources.sink is not None
diff --git a/tests/test_models/test_resources.py b/tests/test_models/test_resources.py
index e437e9a..6822da1 100644
--- a/tests/test_models/test_resources.py
+++ b/tests/test_models/test_resources.py
@@ -21,26 +21,26 @@ def test_jetstream_resources_update_immutable_max_age_raises(self):
"""
Updating nats.stream max_age raises ImmutableResourceError with clear message.
"""
- current = JetStreamResources(max_age="24h", max_bytes="512Mi")
- patch = JetStreamResources(max_age="72h", max_bytes=None)
+ current = JetStreamResources(maxAge="24h", maxBytes="512Mi")
+ patch = JetStreamResources(maxAge="72h", maxBytes=None)
with pytest.raises(ImmutableResourceError) as exc_info:
current.update(patch)
- assert "max_age" in str(exc_info.value)
- assert "max_bytes" in str(exc_info.value)
+ assert "maxAge" in str(exc_info.value)
+ assert "maxBytes" in str(exc_info.value)
assert "nats.stream" in str(exc_info.value)
assert "immutable" in str(exc_info.value).lower()
def test_jetstream_resources_update_immutable_max_bytes_raises(self):
"""Updating nats.stream max_bytes raises ImmutableResourceError."""
- current = JetStreamResources(max_age="24h", max_bytes="512Mi")
- patch = JetStreamResources(max_age=None, max_bytes="1GB")
+ current = JetStreamResources(maxAge="24h", maxBytes="512Mi")
+ patch = JetStreamResources(maxAge=None, maxBytes="1GB")
with pytest.raises(ImmutableResourceError) as exc_info:
current.update(patch)
assert "immutable" in str(exc_info.value).lower()
def test_jetstream_resources_update_empty_patch_succeeds(self):
"""Updating with no frozen fields changed does not raise."""
- current = JetStreamResources(max_age="24h", max_bytes="512Mi")
+ current = JetStreamResources(maxAge="24h", maxBytes="512Mi")
patch = JetStreamResources()
result = current.update(patch)
assert result.max_age == "24h"
@@ -99,11 +99,11 @@ def test_pipeline_resources_config_update_nats_immutable_raises(self):
"""Updating pipeline_resources with nats.stream immutable fields raises."""
current = PipelineResourcesConfig(
nats=NATSResources(
- stream=JetStreamResources(max_age="24h", max_bytes="512Mi")
+ stream=JetStreamResources(maxAge="24h", maxBytes="512Mi")
)
)
patch = PipelineResourcesConfig(
- nats=NATSResources(stream=JetStreamResources(max_age="72h", max_bytes=None))
+ nats=NATSResources(stream=JetStreamResources(maxAge="72h", maxBytes=None))
)
with pytest.raises(ImmutableResourceError) as exc_info:
current.update(patch)