diff --git a/quixstreams/app.py b/quixstreams/app.py
index 30d71ab4e..7a7e3a81c 100644
--- a/quixstreams/app.py
+++ b/quixstreams/app.py
@@ -7,6 +7,8 @@
 import uuid
 import warnings
 from collections import defaultdict
+from datetime import datetime
+from itertools import chain
 from pathlib import Path
 from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union, cast
@@ -30,6 +32,8 @@
 from .logging import LogLevel, configure_logging
 from .models import (
     DeserializerType,
+    MessageContext,
+    Row,
     SerializerType,
     TimestampExtractor,
     Topic,
@@ -152,6 +156,7 @@ def __init__(
         topic_create_timeout: float = 60,
         processing_guarantee: ProcessingGuarantee = "at-least-once",
         max_partition_buffer_size: int = 10000,
+        wall_clock_interval: float = 0.0,
     ):
         """
         :param broker_address: Connection settings for Kafka.
@@ -220,6 +225,12 @@ def __init__(
             It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
             Lower value decreases the memory use, but increases the latency.
             Default - `10000`.
+        :param wall_clock_interval: the interval (in seconds) at which to invoke
+            the registered wall clock logic.
+            The wall clock timing starts counting from the application start.
+            TODO: Save and respect the last wall clock timestamp.
+            If the value is 0, no wall clock logic is invoked.
+            Default - `0.0`.
 
         ***Error Handlers***
         To handle errors, `Application` accepts callbacks triggered when
@@ -371,6 +382,10 @@ def __init__(
             recovery_manager=recovery_manager,
         )
 
+        self._wall_clock_active = wall_clock_interval > 0
+        self._wall_clock_interval = wall_clock_interval
+        self._wall_clock_last_sent = datetime.now().timestamp()
+
         self._source_manager = SourceManager()
         self._sink_manager = SinkManager()
         self._dataframe_registry = DataFrameRegistry()
@@ -900,6 +915,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
         processing_context = self._processing_context
         source_manager = self._source_manager
         process_message = self._process_message
+        process_wall_clock = self._process_wall_clock
        printer = self._processing_context.printer
         run_tracker = self._run_tracker
         consumer = self._consumer
@@ -912,6 +928,9 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
         )
         dataframes_composed = self._dataframe_registry.compose_all(sink=sink)
+        wall_clock_executors = self._dataframe_registry.compose_wall_clock()
+        if not wall_clock_executors:
+            self._wall_clock_active = False
 
         processing_context.init_checkpoint()
         run_tracker.set_as_running()
@@ -923,6 +942,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
                     run_tracker.timeout_refresh()
                 else:
                     process_message(dataframes_composed)
+                    process_wall_clock(wall_clock_executors)
                     processing_context.commit_checkpoint()
                     consumer.resume_backpressured()
                     source_manager.raise_for_error()
@@ -1006,6 +1026,97 @@ def _process_message(self, dataframe_composed):
         if self._on_message_processed is not None:
             self._on_message_processed(topic_name, partition, offset)
 
+    def _process_wall_clock(self, wall_clock_executors):
+        # Emit time-based "ticks" when the wall clock interval elapses.
+        # For each executor (grouped by topics), select one partition per partition id
+        # and determine an offset to include in the MessageContext.
+        if not self._wall_clock_active:
+            return
+
+        # Rate-limit by interval; skip until enough time has elapsed since the last send.
+        now = datetime.now().timestamp()
+        if self._wall_clock_last_sent > now - self._wall_clock_interval:
+            return
+
+        # Synthetic "tick" payload (no value/key, empty headers, timestamp in ms).
+        value, key, timestamp, headers = None, None, int(now * 1000), {}
+
+        # In-flight processed offsets within the current (open) checkpoint.
+        processed_offsets = self._processing_context.checkpoint.tp_offsets
+        # Only consider the currently assigned topic-partitions.
+        assigned_tps = self._consumer.assignment()
+        # Cache resolved offsets to avoid resolving them again for other executors.
+        # Keyed by (topic, partition) to avoid relying on TopicPartition instance identity.
+        known_offsets: dict[tuple[str, int], int] = {}
+
+        for topics, executor in wall_clock_executors:
+            # candidate_partitions: partitions that still need an offset resolved
+            candidate_partitions: dict[int, set[TopicPartition]] = defaultdict(set)
+            # selected_partitions: final partition id -> (topic, offset)
+            selected_partitions: dict[int, tuple[str, int]] = {}
+
+            for tp in assigned_tps:
+                if tp.topic not in topics or tp.partition in selected_partitions:
+                    continue
+
+                known_offset = known_offsets.get((tp.topic, tp.partition))
+                if known_offset is not None:
+                    selected_partitions[tp.partition] = (tp.topic, known_offset)
+                    continue
+
+                # Prefer the most recent processed offset if available.
+                processed_offset = processed_offsets.get((tp.topic, tp.partition))
+                if processed_offset is not None:
+                    # Use the offset from the in-flight checkpoint.
+                    selected_partitions[tp.partition] = (tp.topic, processed_offset)
+                    known_offsets[(tp.topic, tp.partition)] = processed_offset
+                else:
+                    # Resolve via committed offsets below.
+                    candidate_partitions[tp.partition].add(tp)
+
+            if candidate_partitions:
+                # Fetch committed offsets in one batch for the unresolved partitions.
+                committed_tps = self._consumer.committed(
+                    list(chain(*candidate_partitions.values())), timeout=30
+                )
+                for tp in committed_tps:
+                    if tp.error:
+                        raise RuntimeError(
+                            f"Failed to get committed offsets for "
+                            f'"{tp.topic}[{tp.partition}]" from the broker: {tp.error}'
+                        )
+                    if tp.partition not in selected_partitions:
+                        # The committed offset is the "next to consume" offset;
+                        # the last processed one is offset - 1.
+                        # The "invalid/unset" broker offset is negative.
+                        offset = tp.offset - 1 if tp.offset >= 0 else tp.offset
+                        selected_partitions[tp.partition] = (tp.topic, offset)
+                        known_offsets[(tp.topic, tp.partition)] = offset
+
+            # Execute the callback for each selected topic-partition with its offset.
+            for partition, (topic, offset) in selected_partitions.items():
+                row = Row(
+                    value=value,
+                    key=key,
+                    timestamp=timestamp,
+                    context=MessageContext(
+                        topic=topic,
+                        partition=partition,
+                        offset=offset,
+                        size=-1,
+                    ),
+                    headers=headers,
+                )
+                context = copy_context()
+                context.run(set_message_context, row.context)
+                try:
+                    context.run(executor, value, key, timestamp, headers)
+                except Exception as exc:
+                    to_suppress = self._on_processing_error(exc, row, logger)
+                    if not to_suppress:
+                        raise
+
+        # Record the emission time for rate-limiting.
+        self._wall_clock_last_sent = now
+
     def _on_assign(self, _, topic_partitions: List[TopicPartition]):
         """
         Assign new topic partitions to consumer and state.
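Note (illustration, not part of the patch): `_process_wall_clock` prefers the in-flight checkpoint offset for a partition and only then falls back to the broker's committed offset, which points at the *next* message to consume, so the last processed offset is `committed - 1` (negative values are the broker's "invalid/unset" markers and pass through unchanged). A minimal standalone sketch of that preference, using a hypothetical helper name and plain dict inputs:

# Illustrative sketch only; mirrors the offset preference implemented above.
from typing import Mapping, Optional


def resolve_last_processed_offset(
    topic: str,
    partition: int,
    in_flight: Mapping[tuple, int],      # checkpoint.tp_offsets-like mapping
    committed: Optional[int],            # broker committed offset, None if unknown
) -> Optional[int]:
    # 1. Prefer the offset already processed within the open checkpoint.
    offset = in_flight.get((topic, partition))
    if offset is not None:
        return offset
    # 2. Fall back to the committed offset: it is the "next to consume" offset,
    #    so the last processed one is committed - 1. Negative values are the
    #    broker's "invalid/unset" markers and are passed through unchanged.
    if committed is None:
        return None
    return committed - 1 if committed >= 0 else committed


assert resolve_last_processed_offset("t", 0, {("t", 0): 41}, committed=100) == 41
assert resolve_last_processed_offset("t", 0, {}, committed=100) == 99
assert resolve_last_processed_offset("t", 0, {}, committed=-1001) == -1001
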
diff --git a/quixstreams/checkpointing/checkpoint.py b/quixstreams/checkpointing/checkpoint.py
index 7bdb09044..0dc0c4bdb 100644
--- a/quixstreams/checkpointing/checkpoint.py
+++ b/quixstreams/checkpointing/checkpoint.py
@@ -1,7 +1,8 @@
 import logging
 import time
 from abc import abstractmethod
-from typing import Dict, Tuple
+from types import MappingProxyType
+from typing import Dict, Mapping, Tuple
 
 from confluent_kafka import KafkaException, TopicPartition
 
@@ -55,6 +56,15 @@ def __init__(
         self._commit_every = commit_every
         self._total_offsets_processed = 0
 
+    @property
+    def tp_offsets(self) -> Mapping[Tuple[str, int], int]:
+        """
+        Read-only view of processed (but not yet committed) offsets in the current checkpoint.
+
+        :return: a read-only mapping {(topic, partition): last_processed_offset}
+        """
+        return MappingProxyType(self._tp_offsets)
+
     def expired(self) -> bool:
         """
         Returns `True` if checkpoint deadline has expired OR
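Note (illustration, not part of the patch): the new `tp_offsets` property wraps the checkpoint's internal dict in a `MappingProxyType`, so callers such as `_process_wall_clock` get a live but read-only view. A small sketch of that behaviour with a plain dict and illustrative names:

from types import MappingProxyType

_tp_offsets: dict[tuple[str, int], int] = {("input", 0): 10}
view = MappingProxyType(_tp_offsets)

assert view[("input", 0)] == 10
_tp_offsets[("input", 1)] = 7   # later updates show through the proxy
assert view[("input", 1)] == 7
try:
    view[("input", 2)] = 99     # the proxy itself rejects writes
except TypeError:
    pass
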
diff --git a/quixstreams/core/stream/stream.py b/quixstreams/core/stream/stream.py
index f538f5307..d472b9379 100644
--- a/quixstreams/core/stream/stream.py
+++ b/quixstreams/core/stream/stream.py
@@ -249,11 +249,21 @@ def add_update(
         return self._add(update_func)
 
     @overload
-    def add_transform(self, func: TransformCallback, *, expand: Literal[False] = False):
+    def add_transform(
+        self,
+        func: TransformCallback,
+        *,
+        expand: Literal[False] = False,
+    ):
         pass
 
     @overload
-    def add_transform(self, func: TransformExpandedCallback, *, expand: Literal[True]):
+    def add_transform(
+        self,
+        func: TransformExpandedCallback,
+        *,
+        expand: Literal[True],
+    ):
         pass
 
     def add_transform(
diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py
index 53e90c767..946d05d0b 100644
--- a/quixstreams/dataframe/dataframe.py
+++ b/quixstreams/dataframe/dataframe.py
@@ -1696,6 +1696,10 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
             stream=merged_stream, stream_id=merged_stream_id
         )
 
+    def concat_wall_clock(self, stream: Stream) -> "StreamingDataFrame":
+        """
+        Register the given Stream as wall clock logic for this StreamingDataFrame's
+        topics and merge it into the underlying Stream.
+        """
+        self._registry.register_wall_clock(self, stream)
+        return self.__dataframe_clone__(stream=self.stream.merge(stream))
+
     def join_asof(
         self,
         right: "StreamingDataFrame",
diff --git a/quixstreams/dataframe/registry.py b/quixstreams/dataframe/registry.py
index dd7138e0b..0e2a187c9 100644
--- a/quixstreams/dataframe/registry.py
+++ b/quixstreams/dataframe/registry.py
@@ -22,6 +22,7 @@ class DataFrameRegistry:
 
     def __init__(self) -> None:
         self._registry: dict[str, Stream] = {}
+        self._wall_clock_registry: dict[Stream, tuple[str, ...]] = {}
         self._topics: list[Topic] = []
         self._repartition_origins: set[str] = set()
         self._topics_to_stream_ids: dict[str, set[str]] = {}
@@ -69,6 +70,12 @@ def register_root(
         self._topics.append(topic)
         self._registry[topic.name] = dataframe.stream
 
+    def register_wall_clock(
+        self, dataframe: "StreamingDataFrame", stream: Stream
+    ) -> None:
+        # Store the topic names as an immutable tuple for stable typing
+        self._wall_clock_registry[stream] = tuple(t.name for t in dataframe.topics)
+
     def register_groupby(
         self,
         source_sdf: "StreamingDataFrame",
@@ -123,6 +130,17 @@ def compose_all(
             executors[topic] = root_executors[root_stream]
         return executors
 
+    def compose_wall_clock(self) -> list[tuple[tuple[str, ...], VoidExecutor]]:
+        """
+        Compose all registered wall clock Streams.
+
+        :return: a list of `(topic_names, executor)` tuples, one per wall clock Stream.
+        """
+        executors = []
+        for root_stream, topics in self._wall_clock_registry.items():
+            root_executors = root_stream.compose()
+            executors.append((topics, root_executors[root_stream]))
+        return executors
+
     def register_stream_id(self, stream_id: str, topic_names: list[str]):
         """
         Register a mapping between the stream_id and topic names.
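Note (illustration, not part of the patch): a possible end-to-end wiring of the feature, assuming the pieces added in this diff plus the existing public API. The wall clock `Stream` is built directly with `Stream.add_update(..., metadata=True)` because the diff does not add a dedicated helper for creating it; the broker address, topic name, and callback are placeholders:

from quixstreams import Application
from quixstreams.core.stream import Stream

app = Application(
    broker_address="localhost:9092",   # placeholder
    wall_clock_interval=5.0,           # invoke the wall clock logic roughly every 5 seconds
)
topic = app.topic("input-topic")       # placeholder topic name
sdf = app.dataframe(topic)


def on_tick(value, key, timestamp, headers):
    # value and key are None for wall clock ticks; the message context
    # points at the last processed (or last committed) offset of the partition.
    print("wall clock tick at", timestamp)


wall_clock_stream = Stream().add_update(on_tick, metadata=True)
sdf = sdf.concat_wall_clock(wall_clock_stream)

app.run()

On each tick, the registered callback runs once per assigned partition of the dataframe's topics, with a `MessageContext` built by `Application._process_wall_clock`.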