diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index b4f40b423..c808cb52a 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -188,8 +188,9 @@ async def append(self, data: bytes) -> None: ie. `self.offset` bytes relative to the begining of the object. This method sends the provided `data` to the GCS server in chunks. - and persists data in GCS at every `_MAX_BUFFER_SIZE_BYTES` bytes by - calling `self.simple_flush`. + and persists data in GCS at every `_DEFAULT_FLUSH_INTERVAL_BYTES` bytes + or at the last chunk, whichever is earlier. Persisting is done by setting + `flush=True` on the request. :type data: bytes :param data: The bytes to append to the object. @@ -214,6 +215,8 @@ async def append(self, data: bytes) -> None: while start_idx < total_bytes: end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes) data_chunk = data[start_idx:end_idx] + is_last_chunk = end_idx == total_bytes + chunk_size = end_idx - start_idx await self.write_obj_stream.send( _storage_v2.BidiWriteObjectRequest( write_offset=self.offset, @@ -221,13 +224,24 @@ async def append(self, data: bytes) -> None: content=data_chunk, crc32c=int.from_bytes(Checksum(data_chunk).digest(), "big"), ), + state_lookup=is_last_chunk, + flush=is_last_chunk + or ( + self.bytes_appended_since_last_flush + chunk_size + >= self.flush_interval + ), ) ) - chunk_size = end_idx - start_idx self.offset += chunk_size self.bytes_appended_since_last_flush += chunk_size + if self.bytes_appended_since_last_flush >= self.flush_interval: - await self.simple_flush() + self.bytes_appended_since_last_flush = 0 + + if is_last_chunk: + response = await self.write_obj_stream.recv() + self.persisted_size = response.persisted_size + self.offset = self.persisted_size 
self.bytes_appended_since_last_flush = 0 start_idx = end_idx @@ -292,20 +306,24 @@ async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object] raise ValueError("Stream is not open. Call open() before close().") if finalize_on_close: - await self.finalize() - else: - await self.flush() + return await self.finalize() await self.write_obj_stream.close() self._is_stream_open = False self.offset = None - return self.object_resource if finalize_on_close else self.persisted_size + return self.persisted_size async def finalize(self) -> _storage_v2.Object: """Finalizes the Appendable Object. Note: Once finalized no more data can be appended. + This method is different from `close`. If `.close()` is called, data may + still be appended to the object at a later point in time by opening with + a generation number + (i.e. `open(..., generation=<generation>)`). + However, if `.finalize()` is called, no more data can be appended to the + object. rtype: google.cloud.storage_v2.types.Object returns: The finalized object resource. @@ -322,6 +340,10 @@ async def finalize(self) -> _storage_v2.Object: response = await self.write_obj_stream.recv() self.object_resource = response.resource self.persisted_size = self.object_resource.size + await self.write_obj_stream.close() + + self._is_stream_open = False + self.offset = None return self.object_resource # helper methods. diff --git a/noxfile.py b/noxfile.py index 4f77d1e2d..1cef2a75f 100644 --- a/noxfile.py +++ b/noxfile.py @@ -192,7 +192,14 @@ def system(session): # 2021-05-06: defer installing 'google-cloud-*' to after this package, # in order to work around Python 2.7 googolapis-common-protos # issue. 
- session.install("mock", "pytest", "pytest-rerunfailures", "-c", constraints_path) + session.install( + "mock", + "pytest", + "pytest-rerunfailures", + "pytest-asyncio", + "-c", + constraints_path, + ) session.install("-e", ".", "-c", constraints_path) session.install( "google-cloud-testutils", diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index 8dc1c5af9..2ade654ad 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -307,3 +307,56 @@ async def _read_and_verify(expected_content, generation=None): del mrd del mrd_2 gc.collect() + + +@pytest.mark.asyncio +async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete): + """ + System test for AsyncAppendableObjectWriter, verifying flushing behavior + for both small and large appends. + """ + object_name = f"test-append-flush-varied-size-{uuid.uuid4()}" + grpc_client = AsyncGrpcClient().grpc_client + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + + # Schedule for cleanup + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + + # --- Part 1: Test with small data --- + small_data = b"small data" + + await writer.open() + assert writer._is_stream_open + + await writer.append(small_data) + persisted_size = await writer.state_lookup() + assert persisted_size == len(small_data) + + # --- Part 2: Test with large data --- + large_data = os.urandom(38 * 1024 * 1024) + + # Append data larger than the default flush interval (16 MiB). + # This should trigger the interval-based flushing logic. + await writer.append(large_data) + + # Verify the total data has been persisted. 
+ total_size = len(small_data) + len(large_data) + persisted_size = await writer.state_lookup() + assert persisted_size == total_size + + # --- Part 3: Finalize and verify --- + final_object = await writer.close(finalize_on_close=True) + + assert not writer._is_stream_open + assert final_object.size == total_size + + # Verify the full content of the object. + full_data = small_data + large_data + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + content = buffer.getvalue() + assert content == full_data diff --git a/tests/unit/asyncio/retry/test_writes_resumption_strategy.py b/tests/unit/asyncio/retry/test_writes_resumption_strategy.py index a68578636..7d8b7934e 100644 --- a/tests/unit/asyncio/retry/test_writes_resumption_strategy.py +++ b/tests/unit/asyncio/retry/test_writes_resumption_strategy.py @@ -206,7 +206,9 @@ def test_update_state_from_response(self): strategy.update_state_from_response(response2, state) self.assertEqual(write_state.persisted_size, 1024) - final_resource = storage_type.Object(name="test-object", bucket="b", size=2048, finalize_time=datetime.now()) + final_resource = storage_type.Object( + name="test-object", bucket="b", size=2048, finalize_time=datetime.now() + ) response3 = storage_type.BidiWriteObjectResponse(resource=final_resource) strategy.update_state_from_response(response3, state) self.assertEqual(write_state.persisted_size, 2048) diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 31013f9a7..07f7047d8 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -364,6 +364,7 @@ async def test_close(mock_write_object_stream, mock_client): writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) 
writer._is_stream_open = True writer.offset = 1024 + writer.persisted_size = 1024 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() mock_stream.recv = mock.AsyncMock( @@ -435,6 +436,7 @@ async def test_finalize(mock_write_object_stream, mock_client): mock_stream.recv = mock.AsyncMock( return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) ) + mock_stream.close = mock.AsyncMock() gcs_object = await writer.finalize() @@ -442,9 +444,12 @@ async def test_finalize(mock_write_object_stream, mock_client): _storage_v2.BidiWriteObjectRequest(finish_write=True) ) mock_stream.recv.assert_awaited_once() + mock_stream.close.assert_awaited_once() assert writer.object_resource == mock_resource assert writer.persisted_size == 123 assert gcs_object == mock_resource + assert writer._is_stream_open is False + assert writer.offset is None @pytest.mark.asyncio @@ -501,30 +506,39 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client writer.persisted_size = 100 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - writer.simple_flush = mock.AsyncMock() data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1) + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse( + persisted_size=100 + len(data) + ) + ) + await writer.append(data) assert mock_stream.send.await_count == 2 - first_call = mock_stream.send.await_args_list[0] - second_call = mock_stream.send.await_args_list[1] + first_request = mock_stream.send.await_args_list[0].args[0] + second_request = mock_stream.send.await_args_list[1].args[0] # First chunk - assert first_call[0][0].write_offset == 100 - assert len(first_call[0][0].checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES - assert first_call[0][0].checksummed_data.crc32c == int.from_bytes( + assert first_request.write_offset == 100 + assert len(first_request.checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES + assert 
first_request.checksummed_data.crc32c == int.from_bytes( Checksum(data[:_MAX_CHUNK_SIZE_BYTES]).digest(), byteorder="big" ) - # Second chunk - assert second_call[0][0].write_offset == 100 + _MAX_CHUNK_SIZE_BYTES - assert len(second_call[0][0].checksummed_data.content) == 1 - assert second_call[0][0].checksummed_data.crc32c == int.from_bytes( + assert not first_request.flush + assert not first_request.state_lookup + + # Second chunk (last chunk) + assert second_request.write_offset == 100 + _MAX_CHUNK_SIZE_BYTES + assert len(second_request.checksummed_data.content) == 1 + assert second_request.checksummed_data.crc32c == int.from_bytes( Checksum(data[_MAX_CHUNK_SIZE_BYTES:]).digest(), byteorder="big" ) + assert second_request.flush + assert second_request.state_lookup assert writer.offset == 100 + len(data) - writer.simple_flush.assert_not_awaited() @pytest.mark.asyncio @@ -541,12 +555,25 @@ async def test_append_flushes_when_buffer_is_full( writer.persisted_size = 0 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - writer.simple_flush = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock() data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES await writer.append(data) - writer.simple_flush.assert_awaited_once() + num_chunks = _DEFAULT_FLUSH_INTERVAL_BYTES // _MAX_CHUNK_SIZE_BYTES + assert mock_stream.send.await_count == num_chunks + + # All but the last request should not have flush or state_lookup set. + for i in range(num_chunks - 1): + request = mock_stream.send.await_args_list[i].args[0] + assert not request.flush + assert not request.state_lookup + + # The last request should have flush and state_lookup set. 
+ last_request = mock_stream.send.await_args_list[-1].args[0] + assert last_request.flush + assert last_request.state_lookup + assert writer.bytes_appended_since_last_flush == 0 @pytest.mark.asyncio @@ -561,12 +588,18 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client): writer.persisted_size = 0 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - writer.simple_flush = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock() data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1) await writer.append(data) - assert writer.simple_flush.await_count == 2 + flushed_requests = [ + call.args[0] for call in mock_stream.send.await_args_list if call.args[0].flush + ] + assert len(flushed_requests) == 3 + + last_request = mock_stream.send.await_args_list[-1].args[0] + assert last_request.state_lookup @pytest.mark.asyncio @@ -584,17 +617,35 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client): writer.persisted_size = 0 mock_stream = mock_write_object_stream.return_value mock_stream.send = mock.AsyncMock() - writer.simple_flush = mock.AsyncMock() data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10) + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse( + persisted_size=len(data1) + ) + ) await writer.append(data1) + assert mock_stream.send.await_count == 2 + last_request_data1 = mock_stream.send.await_args_list[-1].args[0] + assert last_request_data1.flush + assert last_request_data1.state_lookup + data2 = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20) + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse( + persisted_size=len(data2) + len(data1) + ) + ) await writer.append(data2) + assert mock_stream.send.await_count == 4 + last_request_data2 = mock_stream.send.await_args_list[-1].args[0] + assert last_request_data2.flush + assert last_request_data2.state_lookup + total_data_length = len(data1) + len(data2) assert writer.offset == 
total_data_length - assert writer.simple_flush.await_count == 0 @pytest.mark.asyncio