diff --git a/requirements.txt b/requirements.txt index b9b8e74..a87a443 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,6 @@ botocore==1.8.9 # via boto3, s3transfer docutils==0.14 # via botocore jmespath==0.9.3 # via boto3, botocore offspring==0.0.3 -python-dateutil==2.6.1 # via botocore +python-dateutil==2.6.1 s3transfer==0.1.12 # via boto3 six==1.11.0 # via python-dateutil diff --git a/setup.py b/setup.py index 3f87011..cf40d56 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ url='https://github.com/NerdWalletOSS/kinesis-python', install_requires=[ + 'python-dateutil', 'boto3>=1.4.4,<2.0', 'offspring>=0.0.3,<1.0', 'six>=1.11.0,<2.0', diff --git a/src/kinesis/consumer.py b/src/kinesis/consumer.py index d2a3d1b..7403d19 100644 --- a/src/kinesis/consumer.py +++ b/src/kinesis/consumer.py @@ -4,6 +4,7 @@ import multiprocessing import time +import dateutil.parser import boto3 import six.moves.queue from botocore.exceptions import ClientError @@ -80,7 +81,7 @@ class KinesisConsumer(object): """ LOCK_DURATION = 30 - def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None): + def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None, start_at_timestamp=None): self.stream_name = stream_name self.error_queue = multiprocessing.Queue() self.record_queue = multiprocessing.Queue() @@ -91,7 +92,8 @@ def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_tim self.state = state self.reader_sleep_time = reader_sleep_time - + self.start_at_timestamp = start_at_timestamp + self.shards = {} self.stream_data = None self.run = True @@ -136,7 +138,15 @@ def setup_shards(self): iterator_args = self.state.get_iterator_args(self.state_shard_id(shard_data['ShardId'])) except AttributeError: # no self.state - iterator_args = dict(ShardIteratorType='LATEST') + if self.start_at_timestamp is not None: + iterator_args = { + "ShardIteratorType": 'AT_TIMESTAMP', + "Timestamp": dateutil.parser.parse( + self.start_at_timestamp + ) + } + else: + iterator_args = dict(ShardIteratorType='LATEST') log.info("%s iterator arguments: %s", shard_data['ShardId'], iterator_args)