diff --git a/src/saluki/consume.py b/src/saluki/consume.py index 5054bac..78014b0 100644 --- a/src/saluki/consume.py +++ b/src/saluki/consume.py @@ -29,6 +29,11 @@ def consume( { "bootstrap.servers": broker, "group.id": "saluki", + "session.timeout.ms": 6000, + "auto.offset.reset": "latest", + "enable.auto.offset.store": False, + "enable.auto.commit": False, + "metadata.max.age.ms": 6000 } ) diff --git a/src/saluki/listen.py b/src/saluki/listen.py index 861e9ab..b8c22ed 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -18,6 +18,8 @@ def listen(broker: str, topic: str, partition: int | None = None) -> None: { "bootstrap.servers": broker, "group.id": "saluki", + "auto.offset.reset": "latest", + "enable.auto.commit": False, } ) c.subscribe([topic]) diff --git a/src/saluki/main.py b/src/saluki/main.py index 8e38d2f..ee0a44a 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -4,7 +4,7 @@ import logging from logging import FileHandler - +from typing import Tuple from saluki.consume import consume from saluki.listen import listen @@ -17,7 +17,7 @@ _CONSUME = "consume" -def parse_kafka_uri(uri: str) -> (str, str): +def parse_kafka_uri(uri: str) -> Tuple[str, str]: """Parse Kafka connection URI. A broker hostname/ip must be present.