From e16570c88fdee41fcf70086de8fe3dc38bf255be Mon Sep 17 00:00:00 2001 From: dachengx Date: Sun, 6 Jul 2025 11:53:44 -0500 Subject: [PATCH 1/2] Do not assign `_multi_output_topics` when the topic is from a loader --- strax/processors/post_office.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/strax/processors/post_office.py b/strax/processors/post_office.py index 306d8dbc9..8ff8887a8 100644 --- a/strax/processors/post_office.py +++ b/strax/processors/post_office.py @@ -132,8 +132,10 @@ def register_producer( else: # Multi-output producer, recurse for sub_topic in topic: - self._multi_output_topics[sub_topic] = topic + # if sub_topic is already registered as loader, + # we can not handle it as _multi_output_topics if sub_topic not in registered: + self._multi_output_topics[sub_topic] = topic self.register_producer(iterator, sub_topic) return assert isinstance(topic, str) @@ -264,7 +266,10 @@ def _fetch_new(self, topic): # This is what our caller wants desired_sub_msg = sub_msg log.debug(f"Got submessage {sub_msg} for sub topic {sub_msg_topic}") - self._ack_msg_produced(sub_msg, sub_msg_topic) + # sub_msg_topic not in self._multi_output_topics means + # it already has a loader (as producer) + if sub_msg_topic not in self._multi_output_topics: + self._ack_msg_produced(sub_msg, sub_msg_topic) return desired_sub_msg def _ack_msg_produced(self, msg, topic): From b86cc45abbaa6b3553a05a92a6cebcc2e803057e Mon Sep 17 00:00:00 2001 From: dachengx Date: Sun, 6 Jul 2025 12:39:10 -0500 Subject: [PATCH 2/2] Bug fix --- strax/processors/post_office.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/strax/processors/post_office.py b/strax/processors/post_office.py index 8ff8887a8..42866f8be 100644 --- a/strax/processors/post_office.py +++ b/strax/processors/post_office.py @@ -268,7 +268,7 @@ def _fetch_new(self, topic): log.debug(f"Got submessage {sub_msg} for sub topic {sub_msg_topic}") # sub_msg_topic not in self._multi_output_topics means # it already has a loader (as producer) - if sub_msg_topic not in self._multi_output_topics: + if sub_msg_topic in self._multi_output_topics: self._ack_msg_produced(sub_msg, sub_msg_topic) return desired_sub_msg