Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2011ce1
add support for setting pipeline resources
PabloPardoGarcia Feb 27, 2026
384d0dc
use allow_mutation property to validate field updates
PabloPardoGarcia Feb 27, 2026
195bf97
use frozen instead of deprecated allow_mutation
PabloPardoGarcia Feb 27, 2026
8d67bc5
format code
PabloPardoGarcia Feb 27, 2026
6dc00fa
Merge 8d67bc542419668e4fbab8ef69f5585aede40dff into b7ec931f88834fc7c…
PabloPardoGarcia Feb 27, 2026
f984731
docs: update coverage badge
github-actions[bot] Feb 27, 2026
0d73434
Merge f9847317d67da0ab9bc646666a0089c9708f617a into b7ec931f88834fc7c…
PabloPardoGarcia Feb 27, 2026
8f4a220
chore: bump version to 3.8.0
github-actions[bot] Feb 27, 2026
8546915
add resources to tests
PabloPardoGarcia Feb 27, 2026
f4700ea
Merge branch 'add-support-for-pipeline-resources' of github.com:glass…
PabloPardoGarcia Feb 27, 2026
d4e0e9c
add deprecation warning in sources.topics[].replicas
PabloPardoGarcia Mar 2, 2026
c1f6918
support update resources
PabloPardoGarcia Mar 3, 2026
4d0fb00
Merge c1f6918b7889765f15e1931c183665f8907c8caa into b7ec931f88834fc7c…
PabloPardoGarcia Mar 3, 2026
931fe07
docs: update coverage badge
github-actions[bot] Mar 3, 2026
0b834f4
unify update logic
PabloPardoGarcia Mar 6, 2026
d2f39c9
not allow update resources of transform component if deduplication is…
PabloPardoGarcia Mar 6, 2026
19d7237
Merge branch 'add-support-for-pipeline-resources' of github.com:glass…
PabloPardoGarcia Mar 6, 2026
8de7018
Merge 19d7237cf786ce7669472bb9365683fca0ad92f5 into b7ec931f88834fc7c…
PabloPardoGarcia Mar 6, 2026
93a54eb
docs: update coverage badge
github-actions[bot] Mar 6, 2026
c6c3587
fix maxAge and maxBytes name
PabloPardoGarcia Mar 10, 2026
719a20a
Merge branch 'add-support-for-pipeline-resources' of github.com:glass…
PabloPardoGarcia Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<img src="https://github.com/glassflow/glassflow-python-sdk/workflows/Test/badge.svg?labelColor=&color=e69e3a">
</a>
<!-- Pytest Coverage Comment:Begin -->
<img src=https://img.shields.io/badge/coverage-92%25-brightgreen>
<img src=https://img.shields.io/badge/coverage-90%25-brightgreen>
<!-- Pytest Coverage Comment:End -->
</p>

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.7.3
3.8.0
7 changes: 7 additions & 0 deletions src/glassflow/etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ class PipelineInvalidConfigurationError(APIError):
"""Raised when a pipeline configuration is invalid."""


class ImmutableResourceError(GlassFlowError):
"""
Raised when attempting to update a pipeline resource field that is immutable
(frozen) and cannot be changed after pipeline creation.
"""


class InvalidDataTypeMappingError(GlassFlowError):
"""Exception raised when a data type mapping is invalid."""

Expand Down
18 changes: 18 additions & 0 deletions src/glassflow/etl/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@
)
from .metadata import MetadataConfig
from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
from .resources import (
IngestorResources,
JoinResources,
NATSResources,
PipelineResourcesConfig,
Resources,
SinkResources,
StorageResources,
TransformResources,
)
from .schema import Schema, SchemaField
from .sink import SinkConfig, SinkConfigPatch, SinkType
from .source import (
Expand Down Expand Up @@ -69,4 +79,12 @@
"StatelessTransformationType",
"ExpressionConfig",
"Transformation",
"PipelineResourcesConfig",
"TransformResources",
"JoinResources",
"NATSResources",
"SinkResources",
"IngestorResources",
"StorageResources",
"Resources",
]
12 changes: 5 additions & 7 deletions src/glassflow/etl/models/filter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Optional

from pydantic import BaseModel, Field

Expand All @@ -9,17 +9,15 @@ class FilterConfig(BaseModel):

def update(self, patch: "FilterConfigPatch") -> "FilterConfig":
"""Apply a patch to this filter config."""
update_dict: dict[str, Any] = {}
update_dict = self.model_copy(deep=True)

# Check each field explicitly to handle model instances properly
if patch.enabled is not None:
update_dict["enabled"] = patch.enabled
update_dict.enabled = patch.enabled
if patch.expression is not None:
update_dict["expression"] = patch.expression
update_dict.expression = patch.expression

if update_dict:
return self.model_copy(update=update_dict)
return self
return update_dict


class FilterConfigPatch(BaseModel):
Expand Down
14 changes: 6 additions & 8 deletions src/glassflow/etl/models/join.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import List, Optional

from pydantic import BaseModel, Field, ValidationInfo, field_validator

Expand Down Expand Up @@ -66,18 +66,16 @@ def validate_type(

def update(self, patch: "JoinConfigPatch") -> "JoinConfig":
"""Apply a patch to this join config."""
update_dict: dict[str, Any] = {}
update_dict = self.model_copy(deep=True)

if patch.enabled is not None:
update_dict["enabled"] = patch.enabled
update_dict.enabled = patch.enabled
if patch.type is not None:
update_dict["type"] = patch.type
update_dict.type = patch.type
if patch.sources is not None:
update_dict["sources"] = patch.sources
update_dict.sources = patch.sources

if update_dict:
return self.model_copy(update=update_dict)
return self
return update_dict


class JoinConfigPatch(BaseModel):
Expand Down
43 changes: 43 additions & 0 deletions src/glassflow/etl/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

from pydantic import BaseModel, Field, field_validator, model_validator

from ..errors import ImmutableResourceError
from .base import CaseInsensitiveStrEnum
from .filter import FilterConfig, FilterConfigPatch
from .join import JoinConfig, JoinConfigPatch
from .metadata import MetadataConfig
from .resources import PipelineResourcesConfig
from .schema import Schema
from .sink import SinkConfig, SinkConfigPatch
from .source import SourceConfig, SourceConfigPatch
Expand Down Expand Up @@ -41,6 +43,7 @@ class PipelineConfig(BaseModel):
stateless_transformation: Optional[StatelessTransformationConfig] = Field(
default=StatelessTransformationConfig()
)
pipeline_resources: Optional[PipelineResourcesConfig] = Field(default=None)

@field_validator("pipeline_id")
@classmethod
Expand Down Expand Up @@ -114,6 +117,16 @@ def validate_config(self) -> "PipelineConfig":

return self

def _has_deduplication_enabled(self) -> bool:
"""
Check if the pipeline has deduplication enabled.
"""
return any(
topic.deduplication and topic.deduplication.enabled
for topic in self.source.topics
if topic.deduplication is not None
)

def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
"""
Apply a patch configuration to this pipeline configuration.
Expand All @@ -131,6 +144,23 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
if config_patch.name is not None:
updated_config.name = config_patch.name

# Update pipeline resources if provided
if config_patch.pipeline_resources is not None:
if (
config_patch.pipeline_resources.transform is not None
and config_patch.pipeline_resources.transform.replicas is not None
and self._has_deduplication_enabled()
and not config_patch._has_deduplication_disabled()
):
raise ImmutableResourceError(
"Cannot update pipeline resources of a transform component if the "
"pipeline has deduplication enabled"
)

updated_config.pipeline_resources = (
updated_config.pipeline_resources or PipelineResourcesConfig()
).update(config_patch.pipeline_resources)

# Update source if provided
if config_patch.source is not None:
updated_config.source = updated_config.source.update(config_patch.source)
Expand Down Expand Up @@ -179,4 +209,17 @@ class PipelineConfigPatch(BaseModel):
stateless_transformation: Optional[StatelessTransformationConfigPatch] = Field(
default=None
)
pipeline_resources: Optional[PipelineResourcesConfig] = Field(default=None)
version: Optional[str] = Field(default=None)

def _has_deduplication_disabled(self) -> bool:
"""
Check if the pipeline has deduplication disabled.
"""
disabled = False
if self.source is not None and self.source.topics is not None:
for topic in self.source.topics:
if topic.deduplication and not topic.deduplication.enabled:
disabled = True
break
return disabled
174 changes: 174 additions & 0 deletions src/glassflow/etl/models/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from typing import Optional

from pydantic import BaseModel, Field

from glassflow.etl.errors import ImmutableResourceError


class JetStreamResources(BaseModel):
max_age: Optional[str] = Field(default=None, frozen=True, alias="maxAge")
max_bytes: Optional[str] = Field(default=None, frozen=True, alias="maxBytes")

def update(self, patch: "JetStreamResources") -> "JetStreamResources":
"""Apply a patch to this jetstream resources config."""
if patch.max_age is not None or patch.max_bytes is not None:
raise ImmutableResourceError(
"Cannot update pipeline resources: 'maxAge' and 'maxBytes' in "
"nats.stream are immutable and cannot be changed after pipeline "
"creation."
)
return self.model_copy(deep=True)


class NATSResources(BaseModel):
stream: Optional[JetStreamResources] = Field(default=None)

def update(self, patch: "NATSResources") -> "NATSResources":
"""Apply a patch to this jetstream resources config."""
updated_config = self.model_copy(deep=True)
if patch.stream is not None:
updated_config.stream = updated_config.stream.update(patch.stream)
return updated_config


class Resources(BaseModel):
memory: Optional[str] = Field(default=None)
cpu: Optional[str] = Field(default=None)

def update(self, patch: "Resources") -> "Resources":
"""Apply a patch to this resources config."""
updated_config = self.model_copy(deep=True)
if patch.memory is not None:
updated_config.memory = patch.memory
if patch.cpu is not None:
updated_config.cpu = patch.cpu
return updated_config


class StorageResources(BaseModel):
size: Optional[str] = Field(default=None, frozen=True)

def update(self, patch: "StorageResources") -> "StorageResources":
"""Apply a patch to this storage resources config."""
if patch.size is not None:
raise ImmutableResourceError(
"Cannot update pipeline resources: 'size' in transform.storage is "
"immutable and cannot be changed after pipeline creation."
)
return self.model_copy(deep=True)


class TransformResources(BaseModel):
storage: Optional[StorageResources] = Field(default=None)
replicas: Optional[int] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)

def update(self, patch: "TransformResources") -> "TransformResources":
"""Apply a patch to this transform resources config."""
updated_config = self.model_copy(deep=True)
if patch.storage is not None:
updated_config.storage = updated_config.storage.update(patch.storage)
if patch.replicas is not None:
updated_config.replicas = patch.replicas
if patch.requests is not None:
updated_config.requests = updated_config.requests.update(patch.requests)
if patch.limits is not None:
updated_config.limits = updated_config.limits.update(patch.limits)
return updated_config


class SinkResources(BaseModel):
replicas: Optional[int] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)

def update(self, patch: "SinkResources") -> "SinkResources":
"""Apply a patch to this sink resources config."""
updated_config = self.model_copy(deep=True)
if patch.replicas is not None:
updated_config.replicas = patch.replicas
if patch.requests is not None:
updated_config.requests = updated_config.requests.update(patch.requests)
if patch.limits is not None:
updated_config.limits = updated_config.limits.update(patch.limits)
return updated_config


class JoinResources(BaseModel):
limits: Optional[Resources] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
replicas: Optional[int] = Field(default=None, frozen=True)

def update(self, patch: "JoinResources") -> "JoinResources":
"""Apply a patch to this join resources config."""
if patch.replicas is not None:
raise ImmutableResourceError(
"Cannot update pipeline resources: 'replicas' in join is immutable "
"and cannot be changed after pipeline creation."
)
updated_config = self.model_copy(deep=True)
if patch.limits is not None:
updated_config.limits = updated_config.limits.update(patch.limits)
if patch.requests is not None:
updated_config.requests = updated_config.requests.update(patch.requests)
return updated_config


class IngestorPodResources(BaseModel):
replicas: Optional[int] = Field(default=None)
requests: Optional[Resources] = Field(default=None)
limits: Optional[Resources] = Field(default=None)

def update(self, patch: "IngestorPodResources") -> "IngestorPodResources":
"""Apply a patch to this ingestor pod resources config."""
updated_config = self.model_copy(deep=True)
if patch.replicas is not None:
updated_config.replicas = patch.replicas
if patch.requests is not None:
updated_config.requests = updated_config.requests.update(patch.requests)
if patch.limits is not None:
updated_config.limits = updated_config.limits.update(patch.limits)
return updated_config


class IngestorResources(BaseModel):
base: Optional[IngestorPodResources] = Field(default=None)
left: Optional[IngestorPodResources] = Field(default=None)
right: Optional[IngestorPodResources] = Field(default=None)

def update(self, patch: "IngestorResources") -> "IngestorResources":
"""Apply a patch to this ingestor resources config."""
updated_config = self.model_copy(deep=True)

if patch.base is not None:
updated_config.base = updated_config.base.update(patch.base)
if patch.left is not None:
updated_config.left = updated_config.left.update(patch.left)
if patch.right is not None:
updated_config.right = updated_config.right.update(patch.right)
return updated_config


class PipelineResourcesConfig(BaseModel):
nats: Optional[NATSResources] = Field(default=None)
sink: Optional[SinkResources] = Field(default=None)
ingestor: Optional[IngestorResources] = Field(default=None)
transform: Optional[TransformResources] = Field(default=None)
join: Optional[JoinResources] = Field(default=None)

def update(self, patch: "PipelineResourcesConfig") -> "PipelineResourcesConfig":
"""Apply a patch to this pipeline resources config."""
updated_config = self.model_copy(deep=True)

if patch.nats is not None:
updated_config.nats = updated_config.nats.update(patch.nats)
if patch.sink is not None:
updated_config.sink = updated_config.sink.update(patch.sink)
if patch.ingestor is not None:
updated_config.ingestor = updated_config.ingestor.update(patch.ingestor)
if patch.transform is not None:
updated_config.transform = updated_config.transform.update(patch.transform)
if patch.join is not None:
updated_config.join = updated_config.join.update(patch.join)
return updated_config
Loading
Loading