diff --git a/README.md b/README.md index 70186d2..a4a5790 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ - +

diff --git a/VERSION b/VERSION index c1e43e6..1981190 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.7.3 +3.8.0 diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index 05102f1..b3651f7 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -54,6 +54,13 @@ class PipelineInvalidConfigurationError(APIError): """Raised when a pipeline configuration is invalid.""" +class ImmutableResourceError(GlassFlowError): + """ + Raised when attempting to update a pipeline resource field that is immutable + (frozen) and cannot be changed after pipeline creation. + """ + + class InvalidDataTypeMappingError(GlassFlowError): """Exception raised when a data type mapping is invalid.""" diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py index 683f900..4a65b18 100644 --- a/src/glassflow/etl/models/__init__.py +++ b/src/glassflow/etl/models/__init__.py @@ -10,6 +10,16 @@ ) from .metadata import MetadataConfig from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus +from .resources import ( + IngestorResources, + JoinResources, + NATSResources, + PipelineResourcesConfig, + Resources, + SinkResources, + StorageResources, + TransformResources, +) from .schema import Schema, SchemaField from .sink import SinkConfig, SinkConfigPatch, SinkType from .source import ( @@ -69,4 +79,12 @@ "StatelessTransformationType", "ExpressionConfig", "Transformation", + "PipelineResourcesConfig", + "TransformResources", + "JoinResources", + "NATSResources", + "SinkResources", + "IngestorResources", + "StorageResources", + "Resources", ] diff --git a/src/glassflow/etl/models/filter.py b/src/glassflow/etl/models/filter.py index 70fa603..7d63dca 100644 --- a/src/glassflow/etl/models/filter.py +++ b/src/glassflow/etl/models/filter.py @@ -1,4 +1,4 @@ -from typing import Any, Optional +from typing import Optional from pydantic import BaseModel, Field @@ -9,17 +9,15 @@ class FilterConfig(BaseModel): def update(self, patch: "FilterConfigPatch") -> "FilterConfig": """Apply a patch to this filter config.""" - update_dict: dict[str, Any] = {} + update_dict = self.model_copy(deep=True) # Check each field explicitly to handle model instances properly if patch.enabled is not None: - update_dict["enabled"] = patch.enabled + update_dict.enabled = patch.enabled if patch.expression is not None: - update_dict["expression"] = patch.expression + update_dict.expression = patch.expression - if update_dict: - return self.model_copy(update=update_dict) - return self + return update_dict class FilterConfigPatch(BaseModel): diff --git a/src/glassflow/etl/models/join.py b/src/glassflow/etl/models/join.py index 90cd71f..e0e697f 100644 --- a/src/glassflow/etl/models/join.py +++ b/src/glassflow/etl/models/join.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import List, Optional from pydantic import BaseModel, Field, ValidationInfo, field_validator @@ -66,18 +66,16 @@ def validate_type( def update(self, patch: "JoinConfigPatch") -> "JoinConfig": """Apply a patch to this join config.""" - update_dict: dict[str, Any] = {} + update_dict = self.model_copy(deep=True) if patch.enabled is not None: - update_dict["enabled"] = patch.enabled + update_dict.enabled = patch.enabled if patch.type is not None: - update_dict["type"] = patch.type + update_dict.type = patch.type if patch.sources is not None: - update_dict["sources"] = patch.sources + update_dict.sources = patch.sources - if update_dict: - return self.model_copy(update=update_dict) - return self + return update_dict class JoinConfigPatch(BaseModel): diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index 61404e6..a9138f2 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -3,10 +3,12 @@ from pydantic import BaseModel, Field, field_validator, model_validator +from ..errors import ImmutableResourceError from .base import CaseInsensitiveStrEnum from .filter import FilterConfig, FilterConfigPatch from .join import JoinConfig, JoinConfigPatch from .metadata import MetadataConfig +from .resources import PipelineResourcesConfig from .schema import Schema from .sink import SinkConfig, SinkConfigPatch from .source import SourceConfig, SourceConfigPatch @@ -41,6 +43,7 @@ class PipelineConfig(BaseModel): stateless_transformation: Optional[StatelessTransformationConfig] = Field( default=StatelessTransformationConfig() ) + pipeline_resources: Optional[PipelineResourcesConfig] = Field(default=None) @field_validator("pipeline_id") @classmethod @@ -114,6 +117,16 @@ def validate_config(self) -> "PipelineConfig": return self + def _has_deduplication_enabled(self) -> bool: + """ + Check if the pipeline has deduplication enabled. + """ + return any( + topic.deduplication and topic.deduplication.enabled + for topic in self.source.topics + if topic.deduplication is not None + ) + def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig": """ Apply a patch configuration to this pipeline configuration. @@ -131,6 +144,23 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig": if config_patch.name is not None: updated_config.name = config_patch.name + # Update pipeline resources if provided + if config_patch.pipeline_resources is not None: + if ( + config_patch.pipeline_resources.transform is not None + and config_patch.pipeline_resources.transform.replicas is not None + and self._has_deduplication_enabled() + and not config_patch._has_deduplication_disabled() + ): + raise ImmutableResourceError( + "Cannot update pipeline resources of a transform component if the " + "pipeline has deduplication enabled" + ) + + updated_config.pipeline_resources = ( + updated_config.pipeline_resources or PipelineResourcesConfig() + ).update(config_patch.pipeline_resources) + # Update source if provided if config_patch.source is not None: updated_config.source = updated_config.source.update(config_patch.source) @@ -179,4 +209,17 @@ class PipelineConfigPatch(BaseModel): stateless_transformation: Optional[StatelessTransformationConfigPatch] = Field( default=None ) + pipeline_resources: Optional[PipelineResourcesConfig] = Field(default=None) version: Optional[str] = Field(default=None) + + def _has_deduplication_disabled(self) -> bool: + """ + Check if the pipeline has deduplication disabled. + """ + disabled = False + if self.source is not None and self.source.topics is not None: + for topic in self.source.topics: + if topic.deduplication and not topic.deduplication.enabled: + disabled = True + break + return disabled diff --git a/src/glassflow/etl/models/resources.py b/src/glassflow/etl/models/resources.py new file mode 100644 index 0000000..254b881 --- /dev/null +++ b/src/glassflow/etl/models/resources.py @@ -0,0 +1,174 @@ +from typing import Optional + +from pydantic import BaseModel, Field + +from glassflow.etl.errors import ImmutableResourceError + + +class JetStreamResources(BaseModel): + max_age: Optional[str] = Field(default=None, frozen=True, alias="maxAge") + max_bytes: Optional[str] = Field(default=None, frozen=True, alias="maxBytes") + + def update(self, patch: "JetStreamResources") -> "JetStreamResources": + """Apply a patch to this jetstream resources config.""" + if patch.max_age is not None or patch.max_bytes is not None: + raise ImmutableResourceError( + "Cannot update pipeline resources: 'maxAge' and 'maxBytes' in " + "nats.stream are immutable and cannot be changed after pipeline " + "creation." + ) + return self.model_copy(deep=True) + + +class NATSResources(BaseModel): + stream: Optional[JetStreamResources] = Field(default=None) + + def update(self, patch: "NATSResources") -> "NATSResources": + """Apply a patch to this jetstream resources config.""" + updated_config = self.model_copy(deep=True) + if patch.stream is not None: + updated_config.stream = updated_config.stream.update(patch.stream) + return updated_config + + +class Resources(BaseModel): + memory: Optional[str] = Field(default=None) + cpu: Optional[str] = Field(default=None) + + def update(self, patch: "Resources") -> "Resources": + """Apply a patch to this resources config.""" + updated_config = self.model_copy(deep=True) + if patch.memory is not None: + updated_config.memory = patch.memory + if patch.cpu is not None: + updated_config.cpu = patch.cpu + return updated_config + + +class StorageResources(BaseModel): + size: Optional[str] = Field(default=None, frozen=True) + + def update(self, patch: "StorageResources") -> "StorageResources": + """Apply a patch to this storage resources config.""" + if patch.size is not None: + raise ImmutableResourceError( + "Cannot update pipeline resources: 'size' in transform.storage is " + "immutable and cannot be changed after pipeline creation." + ) + return self.model_copy(deep=True) + + +class TransformResources(BaseModel): + storage: Optional[StorageResources] = Field(default=None) + replicas: Optional[int] = Field(default=None) + requests: Optional[Resources] = Field(default=None) + limits: Optional[Resources] = Field(default=None) + + def update(self, patch: "TransformResources") -> "TransformResources": + """Apply a patch to this transform resources config.""" + updated_config = self.model_copy(deep=True) + if patch.storage is not None: + updated_config.storage = updated_config.storage.update(patch.storage) + if patch.replicas is not None: + updated_config.replicas = patch.replicas + if patch.requests is not None: + updated_config.requests = updated_config.requests.update(patch.requests) + if patch.limits is not None: + updated_config.limits = updated_config.limits.update(patch.limits) + return updated_config + + +class SinkResources(BaseModel): + replicas: Optional[int] = Field(default=None) + requests: Optional[Resources] = Field(default=None) + limits: Optional[Resources] = Field(default=None) + + def update(self, patch: "SinkResources") -> "SinkResources": + """Apply a patch to this sink resources config.""" + updated_config = self.model_copy(deep=True) + if patch.replicas is not None: + updated_config.replicas = patch.replicas + if patch.requests is not None: + updated_config.requests = updated_config.requests.update(patch.requests) + if patch.limits is not None: + updated_config.limits = updated_config.limits.update(patch.limits) + return updated_config + + +class JoinResources(BaseModel): + limits: Optional[Resources] = Field(default=None) + requests: Optional[Resources] = Field(default=None) + replicas: Optional[int] = Field(default=None, frozen=True) + + def update(self, patch: "JoinResources") -> "JoinResources": + """Apply a patch to this join resources config.""" + if patch.replicas is not None: + raise ImmutableResourceError( + "Cannot update pipeline resources: 'replicas' in join is immutable " + "and cannot be changed after pipeline creation." + ) + updated_config = self.model_copy(deep=True) + if patch.limits is not None: + updated_config.limits = updated_config.limits.update(patch.limits) + if patch.requests is not None: + updated_config.requests = updated_config.requests.update(patch.requests) + return updated_config + + +class IngestorPodResources(BaseModel): + replicas: Optional[int] = Field(default=None) + requests: Optional[Resources] = Field(default=None) + limits: Optional[Resources] = Field(default=None) + + def update(self, patch: "IngestorPodResources") -> "IngestorPodResources": + """Apply a patch to this ingestor pod resources config.""" + updated_config = self.model_copy(deep=True) + if patch.replicas is not None: + updated_config.replicas = patch.replicas + if patch.requests is not None: + updated_config.requests = updated_config.requests.update(patch.requests) + if patch.limits is not None: + updated_config.limits = updated_config.limits.update(patch.limits) + return updated_config + + +class IngestorResources(BaseModel): + base: Optional[IngestorPodResources] = Field(default=None) + left: Optional[IngestorPodResources] = Field(default=None) + right: Optional[IngestorPodResources] = Field(default=None) + + def update(self, patch: "IngestorResources") -> "IngestorResources": + """Apply a patch to this ingestor resources config.""" + updated_config = self.model_copy(deep=True) + + if patch.base is not None: + updated_config.base = updated_config.base.update(patch.base) + if patch.left is not None: + updated_config.left = updated_config.left.update(patch.left) + if patch.right is not None: + updated_config.right = updated_config.right.update(patch.right) + return updated_config + + +class PipelineResourcesConfig(BaseModel): + nats: Optional[NATSResources] = Field(default=None) + sink: Optional[SinkResources] = Field(default=None) + ingestor: Optional[IngestorResources] = Field(default=None) + transform: Optional[TransformResources] = Field(default=None) + join: Optional[JoinResources] = Field(default=None) + + def update(self, patch: "PipelineResourcesConfig") -> "PipelineResourcesConfig": + """Apply a patch to this pipeline resources config.""" + updated_config = self.model_copy(deep=True) + + if patch.nats is not None: + updated_config.nats = updated_config.nats.update(patch.nats) + if patch.sink is not None: + updated_config.sink = updated_config.sink.update(patch.sink) + if patch.ingestor is not None: + updated_config.ingestor = updated_config.ingestor.update(patch.ingestor) + if patch.transform is not None: + updated_config.transform = updated_config.transform.update(patch.transform) + if patch.join is not None: + updated_config.join = updated_config.join.update(patch.join) + return updated_config diff --git a/src/glassflow/etl/models/sink.py b/src/glassflow/etl/models/sink.py index 36a3be2..29ec290 100644 --- a/src/glassflow/etl/models/sink.py +++ b/src/glassflow/etl/models/sink.py @@ -1,4 +1,4 @@ -from typing import Any, Optional +from typing import Optional from pydantic import BaseModel, Field @@ -26,39 +26,37 @@ class SinkConfig(BaseModel): def update(self, patch: "SinkConfigPatch") -> "SinkConfig": """Apply a patch to this sink config.""" - update_dict: dict[str, Any] = {} + update_dict = self.model_copy(deep=True) # Check each field explicitly to handle model instances properly if patch.provider is not None: - update_dict["provider"] = patch.provider + update_dict.provider = patch.provider if patch.host is not None: - update_dict["host"] = patch.host + update_dict.host = patch.host if patch.port is not None: - update_dict["port"] = patch.port + update_dict.port = patch.port if patch.http_port is not None: - update_dict["http_port"] = patch.http_port + update_dict.http_port = patch.http_port if patch.database is not None: - update_dict["database"] = patch.database + update_dict.database = patch.database if patch.username is not None: - update_dict["username"] = patch.username + update_dict.username = patch.username if patch.password is not None: - update_dict["password"] = patch.password + update_dict.password = patch.password if patch.secure is not None: - update_dict["secure"] = patch.secure + update_dict.secure = patch.secure if patch.skip_certificate_verification is not None: - update_dict["skip_certificate_verification"] = ( + update_dict.skip_certificate_verification = ( patch.skip_certificate_verification ) if patch.max_batch_size is not None: - update_dict["max_batch_size"] = patch.max_batch_size + update_dict.max_batch_size = patch.max_batch_size if patch.max_delay_time is not None: - update_dict["max_delay_time"] = patch.max_delay_time + update_dict.max_delay_time = patch.max_delay_time if patch.table is not None: - update_dict["table"] = patch.table + update_dict.table = patch.table - if update_dict: - return self.model_copy(update=update_dict) - return self + return update_dict class SinkConfigPatch(BaseModel): diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index dbeeb8d..c642a65 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -95,7 +95,10 @@ class TopicConfig(BaseModel): consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST name: str deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig()) - replicas: Optional[int] = Field(default=1) + replicas: Optional[int] = Field( + default=1, + deprecated="Use pipeline_resources.ingestor..replicas instead", + ) @field_validator("replicas") @classmethod @@ -144,26 +147,24 @@ class SourceConfig(BaseModel): def update(self, patch: "SourceConfigPatch") -> "SourceConfig": """Apply a patch to this source config.""" - update_dict: dict[str, Any] = {} + update_dict = self.model_copy(deep=True) if patch.type is not None: - update_dict["type"] = patch.type + update_dict.type = patch.type if patch.provider is not None: - update_dict["provider"] = patch.provider + update_dict.provider = patch.provider # Handle connection_params patch if patch.connection_params is not None: - update_dict["connection_params"] = self.connection_params.update( + update_dict.connection_params = self.connection_params.update( patch.connection_params ) # Handle topics patch - full replacement only if provided if patch.topics is not None: - update_dict["topics"] = patch.topics + update_dict.topics = patch.topics - if update_dict: - return self.model_copy(update=update_dict) - return self + return update_dict class DeduplicationConfigPatch(BaseModel): diff --git a/src/glassflow/etl/models/stateless_transformation.py b/src/glassflow/etl/models/stateless_transformation.py index f41ff84..5568488 100644 --- a/src/glassflow/etl/models/stateless_transformation.py +++ b/src/glassflow/etl/models/stateless_transformation.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import List, Optional from pydantic import BaseModel, Field, model_validator @@ -63,21 +63,18 @@ def update( self, patch: "StatelessTransformationConfigPatch" ) -> "StatelessTransformationConfig": """Apply a patch to this stateless transformation config.""" - update_dict: dict[str, Any] = {} + update_dict = self.model_copy(deep=True) if patch.enabled is not None: - update_dict["enabled"] = patch.enabled + update_dict.enabled = patch.enabled if patch.id is not None: - update_dict["id"] = patch.id + update_dict.id = patch.id if patch.type is not None: - update_dict["type"] = patch.type + update_dict.type = patch.type if patch.config is not None: - update_dict["config"] = patch.config + update_dict.config = patch.config - if update_dict: - return self.model_copy(update=update_dict) - - return self + return update_dict class StatelessTransformationConfigPatch(BaseModel): diff --git a/tests/conftest.py b/tests/conftest.py index c37af4b..128c428 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -55,6 +55,12 @@ def valid_config_without_joins() -> dict: return pipeline_configs.get_valid_config_without_joins() +@pytest.fixture +def valid_config_with_pipeline_resources() -> dict: + """Fixture for a valid pipeline configuration including pipeline_resources.""" + return pipeline_configs.get_valid_config_with_pipeline_resources() + + @pytest.fixture def valid_config_with_dedup_disabled() -> dict: """Fixture for a valid pipeline configuration with deduplication disabled.""" diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py index a0f9ca6..e1f4e82 100644 --- a/tests/data/pipeline_configs.py +++ b/tests/data/pipeline_configs.py @@ -222,6 +222,43 @@ def get_valid_config_without_joins() -> dict: } +def get_valid_config_with_pipeline_resources() -> dict: + """Get a valid pipeline configuration including pipeline_resources.""" + config = copy.deepcopy(get_valid_pipeline_config()) + config["pipeline_resources"] = { + "nats": { + "stream": { + "maxAge": "72h", + "maxBytes": "1Gi", + }, + }, + "sink": { + "replicas": 2, + "requests": {"memory": "256Mi", "cpu": "100m"}, + "limits": {"memory": "512Mi", "cpu": "500m"}, + }, + "transform": { + "storage": {"size": "10Gi"}, + "replicas": 1, + "requests": {"memory": "128Mi", "cpu": "50m"}, + "limits": {"memory": "256Mi", "cpu": "200m"}, + }, + "join": { + "replicas": 1, + "requests": {"memory": "64Mi", "cpu": "25m"}, + "limits": {"memory": "128Mi", "cpu": "100m"}, + }, + "ingestor": { + "base": { + "replicas": 2, + "requests": {"memory": "128Mi", "cpu": "50m"}, + "limits": {"memory": "256Mi", "cpu": "200m"}, + }, + }, + } + return config + + def get_valid_config_with_dedup_disabled() -> dict: """Get a valid pipeline configuration with deduplication disabled.""" config = copy.deepcopy(get_valid_pipeline_config()) diff --git a/tests/test_models/test_pipeline_config.py b/tests/test_models/test_pipeline_config.py index cfe85e7..f9659f1 100644 --- a/tests/test_models/test_pipeline_config.py +++ b/tests/test_models/test_pipeline_config.py @@ -1,6 +1,7 @@ import pytest from glassflow.etl import models +from glassflow.etl.errors import ImmutableResourceError class TestPipelineConfig: @@ -115,3 +116,77 @@ def test_pipeline_config_pipeline_name_not_provided(self, valid_config): ) assert config.pipeline_id == "test-pipeline" assert config.name == "Test Pipeline" + + def test_pipeline_config_creation_with_pipeline_resources( + self, valid_config_with_pipeline_resources + ): + """Test PipelineConfig creation and validation with pipeline_resources.""" + config = models.PipelineConfig(**valid_config_with_pipeline_resources) + assert config.pipeline_id == "test-pipeline" + assert config.pipeline_resources is not None + + resources = config.pipeline_resources + # NATS / JetStream + assert resources.nats is not None + assert resources.nats.stream is not None + assert resources.nats.stream.max_age == "72h" + assert resources.nats.stream.max_bytes == "1Gi" + + # Sink + assert resources.sink is not None + assert resources.sink.replicas == 2 + assert resources.sink.requests is not None + assert resources.sink.requests.memory == "256Mi" + assert resources.sink.requests.cpu == "100m" + assert resources.sink.limits is not None + assert resources.sink.limits.memory == "512Mi" + assert resources.sink.limits.cpu == "500m" + + # Transform + assert resources.transform is not None + assert resources.transform.storage is not None + assert resources.transform.storage.size == "10Gi" + assert resources.transform.replicas == 1 + assert resources.transform.requests.memory == "128Mi" + assert resources.transform.limits.cpu == "200m" + + # Join + assert resources.join is not None + assert resources.join.replicas == 1 + assert resources.join.requests.memory == "64Mi" + assert resources.join.limits.cpu == "100m" + + # Ingestor + assert resources.ingestor is not None + assert resources.ingestor.base is not None + assert resources.ingestor.base.replicas == 2 + assert resources.ingestor.base.requests.memory == "128Mi" + + def test_pipeline_config_pipeline_resources_optional(self, valid_config): + """Test that pipeline_resources is optional and defaults to None.""" + config = models.PipelineConfig(**valid_config) + assert config.pipeline_id == "test-pipeline" + assert config.pipeline_resources is None + + def test_pipeline_config_pipeline_resources_partial(self, valid_config): + """Test PipelineConfig with partial pipeline_resources (only some sections).""" + config_data = {**valid_config, "pipeline_resources": {"sink": {"replicas": 3}}} + config = models.PipelineConfig(**config_data) + assert config.pipeline_resources is not None + assert config.pipeline_resources.sink is not None + assert config.pipeline_resources.sink.replicas == 3 + assert config.pipeline_resources.nats is None + assert config.pipeline_resources.transform is None + + def test_pipeline_config_resources_update_transform_replicas_immutable_raises( + self, valid_config + ): + """Updating pipeline_resources with transform.replicas raises.""" + config = models.PipelineConfig(**valid_config) + patch = models.PipelineConfigPatch( + pipeline_resources=models.PipelineResourcesConfig( + transform=models.TransformResources(replicas=2) + ) + ) + with pytest.raises(ImmutableResourceError) as _: + config.update(patch) diff --git a/tests/test_models/test_resources.py b/tests/test_models/test_resources.py new file mode 100644 index 0000000..6822da1 --- /dev/null +++ b/tests/test_models/test_resources.py @@ -0,0 +1,122 @@ +import pytest + +from glassflow.etl.errors import ImmutableResourceError +from glassflow.etl.models.resources import ( + JetStreamResources, + JoinResources, + NATSResources, + PipelineResourcesConfig, + Resources, + StorageResources, + TransformResources, +) + + +class TestImmutableResourceErrors: + """ + Tests for ImmutableResourceError when updating frozen pipeline resource fields. + """ + + def test_jetstream_resources_update_immutable_max_age_raises(self): + """ + Updating nats.stream max_age raises ImmutableResourceError with clear message. + """ + current = JetStreamResources(maxAge="24h", maxBytes="512Mi") + patch = JetStreamResources(maxAge="72h", maxBytes=None) + with pytest.raises(ImmutableResourceError) as exc_info: + current.update(patch) + assert "maxAge" in str(exc_info.value) + assert "maxBytes" in str(exc_info.value) + assert "nats.stream" in str(exc_info.value) + assert "immutable" in str(exc_info.value).lower() + + def test_jetstream_resources_update_immutable_max_bytes_raises(self): + """Updating nats.stream max_bytes raises ImmutableResourceError.""" + current = JetStreamResources(maxAge="24h", maxBytes="512Mi") + patch = JetStreamResources(maxAge=None, maxBytes="1GB") + with pytest.raises(ImmutableResourceError) as exc_info: + current.update(patch) + assert "immutable" in str(exc_info.value).lower() + + def test_jetstream_resources_update_empty_patch_succeeds(self): + """Updating with no frozen fields changed does not raise.""" + current = JetStreamResources(maxAge="24h", maxBytes="512Mi") + patch = JetStreamResources() + result = current.update(patch) + assert result.max_age == "24h" + assert result.max_bytes == "512Mi" + + def test_storage_resources_update_immutable_size_raises(self): + """ + Updating transform.storage size raises ImmutableResourceError. + """ + current = StorageResources(size="5Gi") + patch = StorageResources(size="10Gi") + with pytest.raises(ImmutableResourceError) as exc_info: + current.update(patch) + assert "size" in str(exc_info.value) + assert "transform.storage" in str(exc_info.value) + assert "immutable" in str(exc_info.value).lower() + + def test_storage_resources_update_empty_patch_succeeds(self): + """Updating storage with no size change does not raise.""" + current = StorageResources(size="5Gi") + patch = StorageResources() + result = current.update(patch) + assert result.size == "5Gi" + + def test_join_resources_update_immutable_replicas_raises(self): + """Updating join replicas raises ImmutableResourceError with clear message.""" + current = JoinResources(replicas=1, limits=Resources(memory="64Mi")) + patch = JoinResources(replicas=2, limits=None, requests=None) + with pytest.raises(ImmutableResourceError) as exc_info: + current.update(patch) + assert "replicas" in str(exc_info.value) + assert "join" in str(exc_info.value) + assert "immutable" in str(exc_info.value).lower() + + def test_join_resources_update_limits_and_requests_succeeds(self): + """Updating join limits/requests (mutable) does not raise.""" + current = JoinResources( + replicas=1, + limits=Resources(memory="64Mi"), + requests=Resources(memory="32Mi"), + ) + patch = JoinResources( + replicas=None, + limits=Resources(memory="128Mi", cpu="100m"), + requests=Resources(cpu="50m"), + ) + result = current.update(patch) + assert result.replicas == 1 + assert result.limits is not None + assert result.limits.memory == "128Mi" + assert result.limits.cpu == "100m" + assert result.requests is not None + assert result.requests.cpu == "50m" + + def test_pipeline_resources_config_update_nats_immutable_raises(self): + """Updating pipeline_resources with nats.stream immutable fields raises.""" + current = PipelineResourcesConfig( + nats=NATSResources( + stream=JetStreamResources(maxAge="24h", maxBytes="512Mi") + ) + ) + patch = PipelineResourcesConfig( + nats=NATSResources(stream=JetStreamResources(maxAge="72h", maxBytes=None)) + ) + with pytest.raises(ImmutableResourceError) as exc_info: + current.update(patch) + assert "nats.stream" in str(exc_info.value) + + def test_pipeline_resources_config_update_transform_storage_immutable_raises(self): + """Updating pipeline_resources with transform.storage size raises.""" + current = PipelineResourcesConfig( + transform=TransformResources(storage=StorageResources(size="5Gi")) + ) + patch = PipelineResourcesConfig( + transform=TransformResources(storage=StorageResources(size="10Gi")) + ) + with pytest.raises(ImmutableResourceError) as exc_info: + current.update(patch) + assert "transform.storage" in str(exc_info.value)