diff --git a/src/kinesis/consumer.py b/src/kinesis/consumer.py index d2a3d1b..1471878 100644 --- a/src/kinesis/consumer.py +++ b/src/kinesis/consumer.py @@ -21,19 +21,20 @@ class ShardReader(SubprocessLoop): # this can be influeced per-reader instance via the sleep_time arg DEFAULT_SLEEP_TIME = 1.0 - def __init__(self, shard_id, shard_iter, record_queue, error_queue, boto3_session=None, sleep_time=None): + def __init__(self, shard_id, shard_iter, record_queue, error_queue, boto3_session=None, sleep_time=None, endpoint_url=None): self.shard_id = shard_id self.shard_iter = shard_iter self.record_queue = record_queue self.error_queue = error_queue self.boto3_session = boto3_session or boto3.Session() + self.endpoint_url = endpoint_url self.sleep_time = sleep_time or self.DEFAULT_SLEEP_TIME self.start() def begin(self): """Begin the shard reader main loop""" log.info("Shard reader for %s starting", self.shard_id) - self.client = self.boto3_session.client('kinesis') + self.client = self.boto3_session.client('kinesis', endpoint_url=self.endpoint_url) self.retries = 0 def loop(self): @@ -80,13 +81,14 @@ class KinesisConsumer(object): """ LOCK_DURATION = 30 - def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None): + def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None, endpoint_url=None): self.stream_name = stream_name self.error_queue = multiprocessing.Queue() self.record_queue = multiprocessing.Queue() self.boto3_session = boto3_session or boto3.Session() - self.kinesis_client = self.boto3_session.client('kinesis') + self.endpoint_url = endpoint_url + self.kinesis_client = self.boto3_session.client('kinesis', endpoint_url=endpoint_url) self.state = state @@ -154,6 +156,7 @@ def setup_shards(self): self.error_queue, boto3_session=self.boto3_session, sleep_time=self.reader_sleep_time, + endpoint_url=self.endpoint_url ) else: log.debug( @@ -198,6 +201,7 @@ def __iter__(self): if not self.run: break + item['ShardId']=shard_id log.debug(item) yield item diff --git a/src/kinesis/producer.py b/src/kinesis/producer.py index 0ca9816..fe3554f 100644 --- a/src/kinesis/producer.py +++ b/src/kinesis/producer.py @@ -59,7 +59,7 @@ class AsyncProducer(SubprocessLoop): MAX_SIZE = (2 ** 20) MAX_COUNT = 500 - def __init__(self, stream_name, buffer_time, queue, max_count=None, max_size=None, boto3_session=None): + def __init__(self, stream_name, buffer_time, queue, max_count=None, max_size=None, boto3_session=None, endpoint_url=None): self.stream_name = stream_name self.buffer_time = buffer_time self.queue = queue @@ -71,7 +71,8 @@ def __init__(self, stream_name, buffer_time, queue, max_count=None, max_size=Non if boto3_session is None: boto3_session = boto3.Session() - self.client = boto3_session.client('kinesis') + self.endpoint_url = endpoint_url + self.client = boto3_session.client('kinesis', endpoint_url=endpoint_url) self.start() @@ -135,10 +136,10 @@ def flush_records(self): class KinesisProducer(object): """Produce to Kinesis streams via an AsyncProducer""" - def __init__(self, stream_name, buffer_time=0.5, max_count=None, max_size=None, boto3_session=None): + def __init__(self, stream_name, buffer_time=0.5, max_count=None, max_size=None, boto3_session=None, endpoint_url=None): self.queue = multiprocessing.Queue() self.async_producer = AsyncProducer(stream_name, buffer_time, self.queue, max_count=max_count, - max_size=max_size, boto3_session=boto3_session) + max_size=max_size, boto3_session=boto3_session, endpoint_url=endpoint_url) def put(self, data, explicit_hash_key=None, partition_key=None): self.queue.put((data, explicit_hash_key, partition_key)) diff --git a/src/kinesis/state.py b/src/kinesis/state.py index 9808fc4..a16ab64 100644 --- a/src/kinesis/state.py +++ b/src/kinesis/state.py @@ -13,10 +13,10 @@ class DynamoDB(object): - def __init__(self, table_name, boto3_session=None): + def __init__(self, table_name, boto3_session=None, endpoint_url=None): self.boto3_session = boto3_session or boto3.Session() - self.dynamo_resource = self.boto3_session.resource('dynamodb') + self.dynamo_resource = self.boto3_session.resource('dynamodb', endpoint_url=endpoint_url) self.dynamo_table = self.dynamo_resource.Table(table_name) self.shards = {} diff --git a/test/test_consumer.py b/test/test_consumer.py index e160cc4..677312d 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -12,7 +12,7 @@ def test_setup_shards(mocker): consumer = KinesisConsumer('testing', boto3_session=mock_boto3_session) - mock_boto3_session.client.assert_called_with('kinesis') + mock_boto3_session.client.assert_called_with('kinesis', endpoint_url=None) consumer.kinesis_client.describe_stream.return_value = { 'StreamDescription': { @@ -42,5 +42,6 @@ def test_setup_shards(mocker): consumer.record_queue, consumer.error_queue, boto3_session=consumer.boto3_session, + endpoint_url=None, sleep_time=consumer.reader_sleep_time ) diff --git a/test/test_producer.py b/test/test_producer.py index 0eafb5e..35a9c0a 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -12,7 +12,8 @@ def test_producer(mocker): producer.queue, max_count=None, max_size=None, - boto3_session=None + boto3_session=None, + endpoint_url=None ) mocked_queue = mocker.patch.object(producer, 'queue')