diff --git a/VERSION b/VERSION index 0b2eb36..c1e43e6 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.7.2 +3.7.3 diff --git a/src/glassflow/etl/dlq.py b/src/glassflow/etl/dlq.py index a484fe8..ebb55d4 100644 --- a/src/glassflow/etl/dlq.py +++ b/src/glassflow/etl/dlq.py @@ -40,9 +40,9 @@ def consume(self, batch_size: int = 100) -> List[Dict[str, Any]]: "GET", f"{self.endpoint}/consume", params={"batch_size": batch_size} ) response.raise_for_status() - if response.status_code != 204: - return response.json() - return [] + if response.status_code == 204 or not response.content: + return [] + return response.json() except errors.UnprocessableContentError as e: raise InvalidBatchSizeError( f"Invalid batch size: batch size should be larger than 1 " diff --git a/src/glassflow/etl/models/stateless_transformation.py b/src/glassflow/etl/models/stateless_transformation.py index 07a7f28..f41ff84 100644 --- a/src/glassflow/etl/models/stateless_transformation.py +++ b/src/glassflow/etl/models/stateless_transformation.py @@ -16,7 +16,9 @@ class Transformation(BaseModel): class ExpressionConfig(BaseModel): - transform: List[Transformation] = Field(description="The transformation expression") + transform: Optional[List[Transformation]] = Field( + description="The transformation expression", default=None + ) class StatelessTransformationConfig(BaseModel): @@ -50,6 +52,11 @@ def validate(self) -> "StatelessTransformationConfig": raise ValueError( "config is required when stateless transformation is enabled" ) + else: + if not self.config.transform: + raise ValueError( + "transform is required when stateless transformation is enabled" + ) return self def update( diff --git a/tests/test_dlq.py b/tests/test_dlq.py index 9ce4243..a4230e4 100644 --- a/tests/test_dlq.py +++ b/tests/test_dlq.py @@ -33,6 +33,45 @@ def test_consume_success(self, dlq, mock_success): {"id": "msg2", "content": "test message 2"}, ] + def test_consume_returns_empty_list_when_no_messages(self, dlq, mock_success): + """Test DLQ consume returns empty list when there are no messages to consume.""" + with mock_success(json_payloads=[[]]) as mock_get: + result = dlq.consume(batch_size=50) + + mock_get.assert_called_once_with( + "GET", f"{dlq.endpoint}/consume", params={"batch_size": 50} + ) + assert result == [] + + def test_consume_returns_empty_list_on_204_no_content(self, dlq): + """Test DLQ consume returns empty list when API responds with 204 No Content.""" + mock_response = mock_responses.create_mock_response_factory()( + status_code=204, + json_data=None, + ) + with patch("httpx.Client.request", return_value=mock_response) as mock_get: + result = dlq.consume(batch_size=50) + + mock_get.assert_called_once_with( + "GET", f"{dlq.endpoint}/consume", params={"batch_size": 50} + ) + assert result == [] + + def test_consume_returns_empty_list_on_200_empty_body(self, dlq): + """Test DLQ consume returns empty list when API returns 200 with empty body.""" + mock_response = mock_responses.create_mock_response_factory()( + status_code=200, + json_data=None, + ) + mock_response.content = b"" + with patch("httpx.Client.request", return_value=mock_response) as mock_get: + result = dlq.consume(batch_size=50) + + mock_get.assert_called_once_with( + "GET", f"{dlq.endpoint}/consume", params={"batch_size": 50} + ) + assert result == [] + @pytest.mark.parametrize( "scenario", [