Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ botocore==1.8.9 # via boto3, s3transfer
docutils==0.14 # via botocore
jmespath==0.9.3 # via boto3, botocore
offspring==0.0.3
python-dateutil==2.6.1 # via botocore
python-dateutil==2.6.1
s3transfer==0.1.12 # via boto3
six==1.11.0 # via python-dateutil
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
url='https://github.com/NerdWalletOSS/kinesis-python',

install_requires=[
'python-dateutil',
'boto3>=1.4.4,<2.0',
'offspring>=0.0.3,<1.0',
'six>=1.11.0,<2.0',
Expand Down
16 changes: 13 additions & 3 deletions src/kinesis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import multiprocessing
import time

import dateutil.parser
import boto3
import six.moves.queue
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -80,7 +81,7 @@ class KinesisConsumer(object):
"""
LOCK_DURATION = 30

def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None):
def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_time=None, start_at_timestamp=None):
self.stream_name = stream_name
self.error_queue = multiprocessing.Queue()
self.record_queue = multiprocessing.Queue()
Expand All @@ -91,7 +92,8 @@ def __init__(self, stream_name, boto3_session=None, state=None, reader_sleep_tim
self.state = state

self.reader_sleep_time = reader_sleep_time

self.start_at_timestamp = start_at_timestamp

self.shards = {}
self.stream_data = None
self.run = True
Expand Down Expand Up @@ -136,7 +138,15 @@ def setup_shards(self):
iterator_args = self.state.get_iterator_args(self.state_shard_id(shard_data['ShardId']))
except AttributeError:
# no self.state
iterator_args = dict(ShardIteratorType='LATEST')
if self.start_at_timestamp is not None:
iterator_args = {
"ShardIteratorType": 'AT_TIMESTAMP',
"Timestamp": dateutil.parser.parse(
self.start_at_timestamp
)
}
else:
iterator_args = dict(ShardIteratorType='LATEST')

log.info("%s iterator arguments: %s", shard_data['ShardId'], iterator_args)

Expand Down