From a04f20e3afe5644101fefabab32e7b6a4b6ddfd6 Mon Sep 17 00:00:00 2001 From: Benoit Brayer Date: Mon, 30 Sep 2019 10:00:34 +0200 Subject: [PATCH 1/5] Add support to "AT_TIMESTAMP" ShardIteratorType As documented here: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.get_shard_iterator I think those lines may be enough to support AT_TIMESTAMP behavior. It is not tested yet. It should be used this way: ```python3 from kinesis.consumer import KinesisConsumer consumer = KinesisConsumer(stream_name='my-stream', start_at_timestamp='20190901') for message in consumer: print "Received message: {0}".format(message) ``` Regards, M0dM --- src/kinesis/consumer.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/kinesis/consumer.py b/src/kinesis/consumer.py index d2a3d1b..aeb9605 100644 --- a/src/kinesis/consumer.py +++ b/src/kinesis/consumer.py @@ -80,7 +80,7 @@ class KinesisConsumer(object): """ LOCK_DURATION = 30 - def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None): + def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None, start_at_timestamp=None): self.stream_name = stream_name self.error_queue = multiprocessing.Queue() self.record_queue = multiprocessing.Queue() @@ -91,7 +91,8 @@ def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_tim self.state = state self.reader_sleep_time = reader_sleep_time - + self.start_at_timestamp = start_at_timestamp + self.shards = {} self.stream_data = None self.run = True @@ -136,7 +137,15 @@ def setup_shards(self): iterator_args = self.state.get_iterator_args(self.state_shard_id(shard_data['ShardId'])) except AttributeError: # no self.state - iterator_args = dict(ShardIteratorType='LATEST') + if self.start_at_timestamp is not None: + iterator_args = { + "ShardIteratorType"='AT_TIMESTAMP', + "Timestamp"=dateutil.parser.parse( + self.start_at_timestamp + ) + } + else: + iterator_args = dict(ShardIteratorType='LATEST') log.info("%s iterator arguments: %s", shard_data['ShardId'], iterator_args) From 954c9e431902eb207ca0cb10536a0f6dbd5cf604 Mon Sep 17 00:00:00 2001 From: Benoit Brayer Date: Mon, 30 Sep 2019 10:11:41 +0200 Subject: [PATCH 2/5] Fix dict typo --- src/kinesis/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/kinesis/consumer.py b/src/kinesis/consumer.py index aeb9605..1c020e7 100644 --- a/src/kinesis/consumer.py +++ b/src/kinesis/consumer.py @@ -139,8 +139,8 @@ def setup_shards(self): # no self.state if self.start_at_timestamp is not None: iterator_args = { - "ShardIteratorType"='AT_TIMESTAMP', - "Timestamp"=dateutil.parser.parse( + "ShardIteratorType": 'AT_TIMESTAMP', + "Timestamp": dateutil.parser.parse( self.start_at_timestamp ) } From ce4257c3167a3a013d563957ca870959c71a0406 Mon Sep 17 00:00:00 2001 From: Benoit Brayer Date: Mon, 30 Sep 2019 15:31:29 +0200 Subject: [PATCH 3/5] Fix missing dateutil.parser import --- src/kinesis/consumer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/kinesis/consumer.py b/src/kinesis/consumer.py index 1c020e7..7403d19 100644 --- a/src/kinesis/consumer.py +++ b/src/kinesis/consumer.py @@ -4,6 +4,7 @@ import multiprocessing import time +import dateutil.parser import boto3 import six.moves.queue from botocore.exceptions import ClientError From e30b81191917a25542a7540585513f890b359471 Mon Sep 17 00:00:00 2001 From: Benoit Brayer Date: Mon, 30 Sep 2019 15:33:38 +0200 Subject: [PATCH 4/5] Add python-dateutil missing dependency --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 3f87011..cf40d56 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ url='https://github.com/NerdWalletOSS/kinesis-python', install_requires=[ + 'python-dateutil', 'boto3>=1.4.4,<2.0', 'offspring>=0.0.3,<1.0', 'six>=1.11.0,<2.0', From 3c526ee1e1b0626022ef8ba610d7769925aa3ea8 Mon Sep 17 00:00:00 2001 From: Benoit Brayer Date: Mon, 30 Sep 2019 15:34:07 +0200 Subject: [PATCH 5/5] python-dateutil is now a direct dependency. --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b9b8e74..a87a443 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,6 @@ botocore==1.8.9 # via boto3, s3transfer docutils==0.14 # via botocore jmespath==0.9.3 # via boto3, botocore offspring==0.0.3 -python-dateutil==2.6.1 # via botocore +python-dateutil==2.6.1 s3transfer==0.1.12 # via boto3 six==1.11.0 # via python-dateutil