Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/kinesis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ class ShardReader(SubprocessLoop):
# this can be influenced per-reader instance via the sleep_time arg
DEFAULT_SLEEP_TIME = 1.0

def __init__(self, shard_id, shard_iter, record_queue, error_queue, boto3_session=None, sleep_time=None):
def __init__(self, shard_id, shard_iter, record_queue, error_queue, boto3_session=None, sleep_time=None, endpoint_url=None):
self.shard_id = shard_id
self.shard_iter = shard_iter
self.record_queue = record_queue
self.error_queue = error_queue
self.boto3_session = boto3_session or boto3.Session()
self.endpoint_url = endpoint_url
self.sleep_time = sleep_time or self.DEFAULT_SLEEP_TIME
self.start()

def begin(self):
"""Begin the shard reader main loop"""
log.info("Shard reader for %s starting", self.shard_id)
self.client = self.boto3_session.client('kinesis')
self.client = self.boto3_session.client('kinesis', endpoint_url=self.endpoint_url)
self.retries = 0

def loop(self):
Expand Down Expand Up @@ -80,13 +81,14 @@ class KinesisConsumer(object):
"""
LOCK_DURATION = 30

def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None):
def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None, endpoint_url=None):
self.stream_name = stream_name
self.error_queue = multiprocessing.Queue()
self.record_queue = multiprocessing.Queue()

self.boto3_session = boto3_session or boto3.Session()
self.kinesis_client = self.boto3_session.client('kinesis')
self.endpoint_url = endpoint_url
self.kinesis_client = self.boto3_session.client('kinesis', endpoint_url=endpoint_url)

self.state = state

Expand Down Expand Up @@ -154,6 +156,7 @@ def setup_shards(self):
self.error_queue,
boto3_session=self.boto3_session,
sleep_time=self.reader_sleep_time,
endpoint_url=self.endpoint_url
)
else:
log.debug(
Expand Down Expand Up @@ -198,6 +201,7 @@ def __iter__(self):
if not self.run:
break

item['ShardId']=shard_id
log.debug(item)
yield item
Comment on lines +204 to 206
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not like the idea of mutating the item.

Instead of adding the ShardId to the item, what do you think about adding another option to `__init__` (something like `return_shard_id=False`) that, if true, would change the yield to:

if self.return_shard_id:
    yield shard_id, item
else:
    yield item


Expand Down
9 changes: 5 additions & 4 deletions src/kinesis/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class AsyncProducer(SubprocessLoop):
MAX_SIZE = (2 ** 20)
MAX_COUNT = 500

def __init__(self, stream_name, buffer_time, queue, max_count=None, max_size=None, boto3_session=None):
def __init__(self, stream_name, buffer_time, queue, max_count=None, max_size=None, boto3_session=None, endpoint_url=None):
self.stream_name = stream_name
self.buffer_time = buffer_time
self.queue = queue
Expand All @@ -71,7 +71,8 @@ def __init__(self, stream_name, buffer_time, queue, max_count=None, max_size=Non

if boto3_session is None:
boto3_session = boto3.Session()
self.client = boto3_session.client('kinesis')
self.endpoint_url = endpoint_url
self.client = boto3_session.client('kinesis', endpoint_url=endpoint_url)

self.start()

Expand Down Expand Up @@ -135,10 +136,10 @@ def flush_records(self):
class KinesisProducer(object):
"""Produce to Kinesis streams via an AsyncProducer"""

def __init__(self, stream_name, buffer_time=0.5, max_count=None, max_size=None, boto3_session=None):
def __init__(self, stream_name, buffer_time=0.5, max_count=None, max_size=None, boto3_session=None, endpoint_url=None):
self.queue = multiprocessing.Queue()
self.async_producer = AsyncProducer(stream_name, buffer_time, self.queue, max_count=max_count,
max_size=max_size, boto3_session=boto3_session)
max_size=max_size, boto3_session=boto3_session, endpoint_url=endpoint_url)

def put(self, data, explicit_hash_key=None, partition_key=None):
self.queue.put((data, explicit_hash_key, partition_key))
4 changes: 2 additions & 2 deletions src/kinesis/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@


class DynamoDB(object):
def __init__(self, table_name, boto3_session=None):
def __init__(self, table_name, boto3_session=None, endpoint_url=None):
self.boto3_session = boto3_session or boto3.Session()

self.dynamo_resource = self.boto3_session.resource('dynamodb')
self.dynamo_resource = self.boto3_session.resource('dynamodb', endpoint_url=endpoint_url)
self.dynamo_table = self.dynamo_resource.Table(table_name)

self.shards = {}
Expand Down
3 changes: 2 additions & 1 deletion test/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def test_setup_shards(mocker):

consumer = KinesisConsumer('testing', boto3_session=mock_boto3_session)

mock_boto3_session.client.assert_called_with('kinesis')
mock_boto3_session.client.assert_called_with('kinesis', endpoint_url=None)

consumer.kinesis_client.describe_stream.return_value = {
'StreamDescription': {
Expand Down Expand Up @@ -42,5 +42,6 @@ def test_setup_shards(mocker):
consumer.record_queue,
consumer.error_queue,
boto3_session=consumer.boto3_session,
endpoint_url=None,
sleep_time=consumer.reader_sleep_time
)
3 changes: 2 additions & 1 deletion test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ def test_producer(mocker):
producer.queue,
max_count=None,
max_size=None,
boto3_session=None
boto3_session=None,
endpoint_url=None
)

mocked_queue = mocker.patch.object(producer, 'queue')
Expand Down