Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions strax/processors/post_office.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ def register_producer(
else:
# Multi-output producer, recurse
for sub_topic in topic:
self._multi_output_topics[sub_topic] = topic
# if sub_topic is already registered as loader,
# we can not handle it as _multi_output_topics
if sub_topic not in registered:
self._multi_output_topics[sub_topic] = topic
self.register_producer(iterator, sub_topic)
return
assert isinstance(topic, str)
Expand Down Expand Up @@ -264,7 +266,10 @@ def _fetch_new(self, topic):
# This is what our caller wants
desired_sub_msg = sub_msg
log.debug(f"Got submessage {sub_msg} for sub topic {sub_msg_topic}")
self._ack_msg_produced(sub_msg, sub_msg_topic)
# sub_msg_topic not in self._multi_output_topics means
# it already has a loader (as producer)
if sub_msg_topic in self._multi_output_topics:
self._ack_msg_produced(sub_msg, sub_msg_topic)
return desired_sub_msg

def _ack_msg_produced(self, msg, topic):
Expand Down