Skip to content

Commit f868bb4

Browse files
authored
Add _len_ function to AIOProducer (#2140)
1 parent bee7343 commit f868bb4

File tree

2 files changed

+118
-0
lines changed

2 files changed

+118
-0
lines changed

src/confluent_kafka/experimental/aio/producer/_AIOProducer.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,27 @@ def __del__(self) -> None:
125125
if hasattr(self, '_buffer_timeout_manager'):
126126
self._buffer_timeout_manager.stop_timeout_monitoring()
127127

128+
def __len__(self) -> int:
129+
"""Return the total number of pending messages.
130+
131+
This includes:
132+
- Messages in librdkafka's output queue (waiting to be delivered to broker)
133+
- Messages in the async batch buffer (waiting to be sent to librdkafka)
134+
135+
Returns:
136+
int: Total number of pending messages across both queues
137+
"""
138+
if self._is_closed:
139+
return 0
140+
141+
# Count messages in librdkafka queue
142+
librdkafka_count = len(self._producer)
143+
144+
# Count messages in async batch buffer
145+
buffer_count = self._batch_processor.get_buffer_size()
146+
147+
return librdkafka_count + buffer_count
148+
128149
# ========================================================================
129150
# CORE PRODUCER OPERATIONS - Main public API
130151
# ========================================================================

tests/test_AIOProducer.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,3 +534,100 @@ async def test_edge_cases_batching(self, mock_producer, mock_common, basic_confi
534534
assert mock_flush.call_count >= 1 # At least one flush
535535

536536
await producer.close()
537+
538+
@pytest.mark.asyncio
539+
async def test_aio_producer_len_with_buffered_messages(self, mock_producer, mock_common, basic_config):
540+
"""Test that __len__ counts messages in async batch buffer"""
541+
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)
542+
543+
# Produce 5 messages (less than batch_size, so they stay in buffer)
544+
with patch.object(producer, '_flush_buffer') as mock_flush: # Prevent auto-flush
545+
for i in range(5):
546+
await producer.produce('test-topic', value=f'msg-{i}'.encode())
547+
548+
# Verify flush was not called (messages stayed in buffer)
549+
mock_flush.assert_not_called()
550+
551+
# len() should count messages in buffer
552+
assert len(producer) == 5
553+
assert producer._batch_processor.get_buffer_size() == 5
554+
assert len(producer._producer) == 0 # Nothing in librdkafka yet
555+
# Verify len() equals the sum
556+
assert len(producer) == producer._batch_processor.get_buffer_size() + len(producer._producer)
557+
558+
await producer.close()
559+
560+
@pytest.mark.asyncio
561+
async def test_aio_producer_len_after_flush(self, mock_producer, mock_common, basic_config):
562+
"""Test that __len__ counts messages after flush to librdkafka"""
563+
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)
564+
565+
# Produce and flush
566+
with patch.object(producer, '_flush_buffer'): # Prevent auto-flush
567+
for i in range(5):
568+
await producer.produce('test-topic', value=f'msg-{i}'.encode())
569+
570+
# Flush to move messages to librdkafka
571+
await producer.flush()
572+
573+
# After flush, messages move to librdkafka queue
574+
# Verify len() correctly equals the sum of buffer + librdkafka
575+
buffer_count = producer._batch_processor.get_buffer_size()
576+
librdkafka_count = len(producer._producer)
577+
total_len = len(producer)
578+
579+
# Buffer should be empty after flush
580+
assert buffer_count == 0
581+
# Verify len() equals the sum (this validates the __len__ implementation)
582+
assert total_len == buffer_count + librdkafka_count
583+
# Messages should be in librdkafka queue after flush
584+
assert total_len == librdkafka_count
585+
586+
await producer.close()
587+
588+
@pytest.mark.asyncio
589+
async def test_aio_producer_len_closed_producer(self, mock_producer, mock_common, basic_config):
590+
"""Test that __len__ returns 0 for closed producer"""
591+
producer = AIOProducer(basic_config, batch_size=10, buffer_timeout=0)
592+
593+
# Produce some messages
594+
with patch.object(producer, '_flush_buffer'): # Prevent auto-flush
595+
for i in range(3):
596+
await producer.produce('test-topic', value=f'msg-{i}'.encode())
597+
598+
# Verify messages are there
599+
assert len(producer) == 3
600+
601+
# Close producer
602+
await producer.close()
603+
604+
# len() should return 0 for closed producer
605+
assert len(producer) == 0
606+
607+
@pytest.mark.asyncio
608+
async def test_aio_producer_len_mixed_state(self, mock_producer, mock_common, basic_config):
609+
"""Test __len__ when messages are in both buffer and librdkafka queue"""
610+
producer = AIOProducer(basic_config, batch_size=5, buffer_timeout=0)
611+
612+
# Produce 7 messages - first 5 should flush (batch_size=5), last 2 stay in buffer
613+
with patch.object(producer, '_flush_buffer') as mock_flush:
614+
for i in range(7):
615+
await producer.produce('test-topic', value=f'msg-{i}'.encode())
616+
617+
# With batch_size=5, flush should be called after 5th message
618+
# Verify flush was called (at least once when batch_size reached)
619+
assert mock_flush.call_count >= 1
620+
621+
# After batch_size messages, some may have flushed
622+
# Total should be sum of buffer + librdkafka queue
623+
buffer_count = producer._batch_processor.get_buffer_size()
624+
librdkafka_count = len(producer._producer)
625+
total_count = len(producer)
626+
627+
# Verify len() correctly equals the sum (this validates the __len__ implementation)
628+
assert total_count == buffer_count + librdkafka_count
629+
# At least the messages beyond batch_size should be in buffer
630+
# (exact count depends on flush behavior)
631+
assert total_count >= 2 # At least the last 2 should be pending
632+
633+
await producer.close()

0 commit comments

Comments
 (0)