@@ -188,8 +188,9 @@ async def append(self, data: bytes) -> None:
i.e. `self.offset` bytes relative to the beginning of the object.

This method sends the provided `data` to the GCS server in chunks
and persists data in GCS at every `_MAX_BUFFER_SIZE_BYTES` bytes by
calling `self.simple_flush`.
and persists data in GCS at every `_DEFAULT_FLUSH_INTERVAL_BYTES` bytes
or at the last chunk, whichever is earlier. Persisting is done by setting
`flush=True` on the request.

:type data: bytes
:param data: The bytes to append to the object.
@@ -214,20 +215,33 @@ async def append(self, data: bytes) -> None:
while start_idx < total_bytes:
end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
data_chunk = data[start_idx:end_idx]
is_last_chunk = end_idx == total_bytes
chunk_size = end_idx - start_idx
await self.write_obj_stream.send(
_storage_v2.BidiWriteObjectRequest(
write_offset=self.offset,
checksummed_data=_storage_v2.ChecksummedData(
content=data_chunk,
crc32c=int.from_bytes(Checksum(data_chunk).digest(), "big"),
),
state_lookup=is_last_chunk,
flush=is_last_chunk
or (
self.bytes_appended_since_last_flush + chunk_size
>= self.flush_interval
),
)
)
chunk_size = end_idx - start_idx
self.offset += chunk_size
self.bytes_appended_since_last_flush += chunk_size

if self.bytes_appended_since_last_flush >= self.flush_interval:
await self.simple_flush()
self.bytes_appended_since_last_flush = 0

if is_last_chunk:
response = await self.write_obj_stream.recv()
self.persisted_size = response.persisted_size
self.offset = self.persisted_size
self.bytes_appended_since_last_flush = 0
start_idx = end_idx
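
For context, the following is a minimal, self-contained sketch of the per-chunk flush planning this hunk implements. It is not the library's code: the constant values, the `plan_chunks` helper, and the reset-on-flush bookkeeping are illustrative assumptions.

```python
# Illustrative sketch only: the constants and helper below are assumptions
# for demonstration, not the library's actual implementation.
_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024  # assumed per-message chunk size
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024  # assumed flush interval


def plan_chunks(total_bytes: int):
    """Yield (start, end, flush, state_lookup) for each chunk of one append.

    `flush` is set on the last chunk, or whenever the bytes accumulated
    since the previous flush would reach the interval. `state_lookup` is
    requested only on the last chunk, so a single response carrying
    `persisted_size` is read back per append call.
    """
    buffered = 0  # bytes sent since the last flushed chunk (assumed reset)
    start = 0
    while start < total_bytes:
        end = min(start + _MAX_CHUNK_SIZE_BYTES, total_bytes)
        chunk = end - start
        is_last = end == total_bytes
        flush = is_last or (buffered + chunk >= _DEFAULT_FLUSH_INTERVAL_BYTES)
        yield start, end, flush, is_last
        buffered = 0 if flush else buffered + chunk
        start = end


# A 38 MiB append (as in the system test below) plans two interval-driven
# flushes plus a final flush-and-state-lookup on the last chunk.
flushes = sum(1 for _, _, flush, _ in plan_chunks(38 * 1024 * 1024) if flush)
assert flushes == 3
```

Note also that the `crc32c` field in each request carries the chunk checksum as a big-endian integer, which is what the `int.from_bytes(Checksum(data_chunk).digest(), "big")` call in the hunk above produces.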

@@ -292,20 +306,24 @@ async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]
raise ValueError("Stream is not open. Call open() before close().")

if finalize_on_close:
await self.finalize()
else:
await self.flush()
return await self.finalize()

await self.write_obj_stream.close()

self._is_stream_open = False
self.offset = None
return self.object_resource if finalize_on_close else self.persisted_size
return self.persisted_size

async def finalize(self) -> _storage_v2.Object:
"""Finalizes the Appendable Object.

Note: Once finalized no more data can be appended.
This method is different from `close`: if `.close()` is called, data
may still be appended to the object at a later point in time by
reopening with the generation number
(i.e. `open(..., generation=<object_generation_number>)`).
However, once `.finalize()` is called, no more data can be appended
to the object.

:rtype: google.cloud.storage_v2.types.Object
:returns: The finalized object resource.
@@ -322,6 +340,10 @@ async def finalize(self) -> _storage_v2.Object:
response = await self.write_obj_stream.recv()
self.object_resource = response.resource
self.persisted_size = self.object_resource.size
await self.write_obj_stream.close()

self._is_stream_open = False
self.offset = None
return self.object_resource

# helper methods.
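To make the `close` / `finalize` distinction concrete, here is a hypothetical usage sketch. It assumes the writer API shown in this diff and in the system test below; the bucket name, object name, and generation value are placeholders.

```python
# Hypothetical usage sketch, assuming the writer API shown in this diff.
async def demo(grpc_client):
    writer = AsyncAppendableObjectWriter(grpc_client, "my-bucket", "my-object")
    await writer.open()
    await writer.append(b"first part")
    persisted = await writer.close()  # flushes; object stays appendable

    # Later: reopen the same object generation and keep appending
    # (generation placeholder elided here).
    writer2 = AsyncAppendableObjectWriter(grpc_client, "my-bucket", "my-object")
    await writer2.open(generation=...)
    await writer2.append(b"second part")

    # finalize() (or close(finalize_on_close=True)) seals the object;
    # no further appends are possible afterwards.
    resource = await writer2.finalize()
    return persisted, resource
```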
9 changes: 8 additions & 1 deletion noxfile.py
@@ -192,7 +192,14 @@ def system(session):
# 2021-05-06: defer installing 'google-cloud-*' to after this package,
# in order to work around Python 2.7 googleapis-common-protos
# issue.
session.install("mock", "pytest", "pytest-rerunfailures", "-c", constraints_path)
session.install(
"mock",
"pytest",
"pytest-rerunfailures",
"pytest-asyncio",
"-c",
constraints_path,
)
session.install("-e", ".", "-c", constraints_path)
session.install(
"google-cloud-testutils",
53 changes: 53 additions & 0 deletions tests/system/test_zonal.py
@@ -307,3 +307,56 @@ async def _read_and_verify(expected_content, generation=None):
del mrd
del mrd_2
gc.collect()


@pytest.mark.asyncio
async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete):
"""
System test for AsyncAppendableObjectWriter, verifying flushing behavior
for both small and large appends.
"""
object_name = f"test-append-flush-varied-size-{uuid.uuid4()}"
grpc_client = AsyncGrpcClient().grpc_client
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)

# Schedule for cleanup
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

# --- Part 1: Test with small data ---
small_data = b"small data"

await writer.open()
assert writer._is_stream_open

await writer.append(small_data)
persisted_size = await writer.state_lookup()
assert persisted_size == len(small_data)

# --- Part 2: Test with large data ---
large_data = os.urandom(38 * 1024 * 1024)

# Append data larger than the default flush interval (16 MiB).
# This should trigger the interval-based flushing logic.
await writer.append(large_data)

# Verify the total data has been persisted.
total_size = len(small_data) + len(large_data)
persisted_size = await writer.state_lookup()
assert persisted_size == total_size

# --- Part 3: Finalize and verify ---
final_object = await writer.close(finalize_on_close=True)

assert not writer._is_stream_open
assert final_object.size == total_size

# Verify the full content of the object.
full_data = small_data + large_data
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
content = buffer.getvalue()
assert content == full_data
@@ -206,7 +206,9 @@ def test_update_state_from_response(self):
strategy.update_state_from_response(response2, state)
self.assertEqual(write_state.persisted_size, 1024)

final_resource = storage_type.Object(name="test-object", bucket="b", size=2048, finalize_time=datetime.now())
final_resource = storage_type.Object(
name="test-object", bucket="b", size=2048, finalize_time=datetime.now()
)
response3 = storage_type.BidiWriteObjectResponse(resource=final_resource)
strategy.update_state_from_response(response3, state)
self.assertEqual(write_state.persisted_size, 2048)
85 changes: 68 additions & 17 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
@@ -364,6 +364,7 @@ async def test_close(mock_write_object_stream, mock_client):
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
writer._is_stream_open = True
writer.offset = 1024
writer.persisted_size = 1024
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
mock_stream.recv = mock.AsyncMock(
@@ -435,16 +436,20 @@ async def test_finalize(mock_write_object_stream, mock_client):
mock_stream.recv = mock.AsyncMock(
return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource)
)
mock_stream.close = mock.AsyncMock()

gcs_object = await writer.finalize()

mock_stream.send.assert_awaited_once_with(
_storage_v2.BidiWriteObjectRequest(finish_write=True)
)
mock_stream.recv.assert_awaited_once()
mock_stream.close.assert_awaited_once()
assert writer.object_resource == mock_resource
assert writer.persisted_size == 123
assert gcs_object == mock_resource
assert writer._is_stream_open is False
assert writer.offset is None


@pytest.mark.asyncio
@@ -501,30 +506,39 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client
writer.persisted_size = 100
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()

data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1)
mock_stream.recv = mock.AsyncMock(
return_value=_storage_v2.BidiWriteObjectResponse(
persisted_size=100 + len(data)
)
)

await writer.append(data)

assert mock_stream.send.await_count == 2
first_call = mock_stream.send.await_args_list[0]
second_call = mock_stream.send.await_args_list[1]
first_request = mock_stream.send.await_args_list[0].args[0]
second_request = mock_stream.send.await_args_list[1].args[0]

# First chunk
assert first_call[0][0].write_offset == 100
assert len(first_call[0][0].checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES
assert first_call[0][0].checksummed_data.crc32c == int.from_bytes(
assert first_request.write_offset == 100
assert len(first_request.checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES
assert first_request.checksummed_data.crc32c == int.from_bytes(
Checksum(data[:_MAX_CHUNK_SIZE_BYTES]).digest(), byteorder="big"
)
# Second chunk
assert second_call[0][0].write_offset == 100 + _MAX_CHUNK_SIZE_BYTES
assert len(second_call[0][0].checksummed_data.content) == 1
assert second_call[0][0].checksummed_data.crc32c == int.from_bytes(
assert not first_request.flush
assert not first_request.state_lookup

# Second chunk (last chunk)
assert second_request.write_offset == 100 + _MAX_CHUNK_SIZE_BYTES
assert len(second_request.checksummed_data.content) == 1
assert second_request.checksummed_data.crc32c == int.from_bytes(
Checksum(data[_MAX_CHUNK_SIZE_BYTES:]).digest(), byteorder="big"
)
assert second_request.flush
assert second_request.state_lookup

assert writer.offset == 100 + len(data)
writer.simple_flush.assert_not_awaited()


@pytest.mark.asyncio
@@ -541,12 +555,25 @@ async def test_append_flushes_when_buffer_is_full(
writer.persisted_size = 0
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()
mock_stream.recv = mock.AsyncMock()

data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES
await writer.append(data)

writer.simple_flush.assert_awaited_once()
num_chunks = _DEFAULT_FLUSH_INTERVAL_BYTES // _MAX_CHUNK_SIZE_BYTES
assert mock_stream.send.await_count == num_chunks

# All but the last request should not have flush or state_lookup set.
for i in range(num_chunks - 1):
request = mock_stream.send.await_args_list[i].args[0]
assert not request.flush
assert not request.state_lookup

# The last request should have flush and state_lookup set.
last_request = mock_stream.send.await_args_list[-1].args[0]
assert last_request.flush
assert last_request.state_lookup
assert writer.bytes_appended_since_last_flush == 0


@pytest.mark.asyncio
@@ -561,12 +588,18 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
writer.persisted_size = 0
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()
mock_stream.recv = mock.AsyncMock()

data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1)
await writer.append(data)

assert writer.simple_flush.await_count == 2
flushed_requests = [
call.args[0] for call in mock_stream.send.await_args_list if call.args[0].flush
]
assert len(flushed_requests) == 3

last_request = mock_stream.send.await_args_list[-1].args[0]
assert last_request.state_lookup


@pytest.mark.asyncio
@@ -584,17 +617,35 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client):
writer.persisted_size = 0
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()

data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10)
mock_stream.recv = mock.AsyncMock(
return_value=_storage_v2.BidiWriteObjectResponse(
            persisted_size=len(data1)
)
)
await writer.append(data1)

assert mock_stream.send.await_count == 2
last_request_data1 = mock_stream.send.await_args_list[-1].args[0]
assert last_request_data1.flush
assert last_request_data1.state_lookup

data2 = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20)
mock_stream.recv = mock.AsyncMock(
return_value=_storage_v2.BidiWriteObjectResponse(
            persisted_size=len(data2) + len(data1)
)
)
await writer.append(data2)

assert mock_stream.send.await_count == 4
last_request_data2 = mock_stream.send.await_args_list[-1].args[0]
assert last_request_data2.flush
assert last_request_data2.state_lookup

total_data_length = len(data1) + len(data2)
assert writer.offset == total_data_length
assert writer.simple_flush.await_count == 0


@pytest.mark.asyncio