From ae4c868ca31c54f4ed37fecbe5233e53c3270604 Mon Sep 17 00:00:00 2001 From: dachengx Date: Sun, 13 Oct 2024 16:03:50 -0500 Subject: [PATCH 1/5] Set `SingleThreadProcessor` as the default processor --- strax/processors/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/strax/processors/__init__.py b/strax/processors/__init__.py index 346fa5d35..f723a478a 100644 --- a/strax/processors/__init__.py +++ b/strax/processors/__init__.py @@ -8,7 +8,7 @@ from .single_thread import SingleThreadProcessor PROCESSORS = { - "default": ThreadedMailboxProcessor, + "default": SingleThreadProcessor, "threaded_mailbox": ThreadedMailboxProcessor, "single_thread": SingleThreadProcessor, } From 345e706c15b297bd22e6f942ac5854dac85c3d97 Mon Sep 17 00:00:00 2001 From: dachengx Date: Wed, 16 Oct 2024 10:17:29 -0500 Subject: [PATCH 2/5] Disallow multiple `data_kind`s processing with single thread processor --- strax/context.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/strax/context.py b/strax/context.py index 152eef795..ded55ace5 100644 --- a/strax/context.py +++ b/strax/context.py @@ -1545,6 +1545,15 @@ def get_iter( # (otherwise potentially overwritten in temp-plugin) targets_list = targets + if processor is None: + processor = list(self.processors)[0] + + if isinstance(processor, str): + processor = self.processors[processor] + + if not hasattr(processor, "iter"): + raise ValueError("Processors must implement a iter methed.") + is_superrun = run_id.startswith("_") # If multiple targets of the same kind, create a MergeOnlyPlugin @@ -1557,7 +1566,7 @@ def get_iter( p = type(temp_name, (strax.MergeOnlyPlugin,), dict(depends_on=tuple(targets))) self.register(p) targets = (temp_name,) - elif not allow_multiple: + elif not allow_multiple or processor is strax.SingleThreadProcessor: raise RuntimeError("Cannot automerge different data kinds!") elif self.context_config["timeout"] > 7200 or ( self.context_config["allow_lazy"] and not self.context_config["allow_multiprocess"] @@ -1582,15 +1591,6 @@ def get_iter( if k.startswith("_temp"): del self._plugin_class_registry[k] - if processor is None: - processor = list(self.processors)[0] - - if isinstance(processor, str): - processor = self.processors[processor] - - if not hasattr(processor, "iter"): - raise ValueError("Processors must implement a iter methed.") - seen_a_chunk = False generator = processor( components, From 97c14699ab339681991a9f088093dc0f021e28e5 Mon Sep 17 00:00:00 2001 From: dachengx Date: Wed, 16 Oct 2024 14:56:19 -0500 Subject: [PATCH 3/5] Fix bug --- tests/test_loop_plugins.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/tests/test_loop_plugins.py b/tests/test_loop_plugins.py index df6765659..446ee8bc6 100644 --- a/tests/test_loop_plugins.py +++ b/tests/test_loop_plugins.py @@ -40,6 +40,7 @@ def _loop_test_inner( target="added_thing", time_selection="fully_contained", force_value_error=False, + processor=None, ): """Test loop plugins for random data. For this test we are going to setup two plugins that will be looped over and combined into a loop plugin (depending on the target, this may be a multi @@ -161,7 +162,7 @@ def compute_loop(self, big_kinda_data, small_kinda_data): st.register((BigThing, SmallThing, AddBigToSmall, AddBigToSmallMultiOutput)) # Make small thing in order to allow re-chunking - st.make(run_id="some_run", targets="small_thing") + st.make(run_id="some_run", targets="small_thing", processor=processor) # Make the loop plugin result = st.get_array(run_id="some_run", targets=target) @@ -212,16 +213,18 @@ def test_loop_plugin_multi_output( ) @settings(deadline=None) @example(big_data=np.array([], dtype=full_dt_dtype), nchunks=2) -def test_value_error_for_loop_plugin(big_data, nchunks): +def test_error_for_loop_plugin(big_data, nchunks): """Make sure that we are are getting the right ValueError.""" - try: - _loop_test_inner(big_data, nchunks, force_value_error=True) - raise RuntimeError( - "did not run into ValueError despite the fact we are having multiple none-type chunks" - ) - except ValueError: - # Good we got the ValueError we wanted - pass + for error, processor in zip([RuntimeError, ValueError], ["single_thread", "threaded_mailbox"]): + try: + _loop_test_inner(big_data, nchunks, force_value_error=True, processor=processor) + raise RuntimeError( + f"Did not run into {error.__name__} despite the fact " + "we are having multiple none-type chunks" + ) + except error: + # Good we got the Error we wanted + pass @given( From c16e2fca679ffa5794658a827b1f9eaa6bd29aac Mon Sep 17 00:00:00 2001 From: dachengx Date: Wed, 16 Oct 2024 15:09:45 -0500 Subject: [PATCH 4/5] Fix bugs --- tests/test_core.py | 6 ++++-- tests/test_inline_plugin.py | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index ebbeabf16..cef201753 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -426,13 +426,15 @@ def test_allow_multiple(targets=("peaks", "records")): raise ValueError(f"{function} could run with allow_multiple") try: - mystrax.make(run_id=run_id, targets=targets) + mystrax.make(run_id=run_id, targets=targets, processor="threaded_mailbox") except RuntimeError: # Great, we shouldn't be allowed pass assert not mystrax.is_stored(run_id, "peaks") - mystrax.make(run_id=run_id, allow_multiple=True, targets=targets) + mystrax.make( + run_id=run_id, allow_multiple=True, targets=targets, processor="threaded_mailbox" + ) for t in targets: assert mystrax.is_stored(run_id, t) diff --git a/tests/test_inline_plugin.py b/tests/test_inline_plugin.py index 3f9759319..08a3f597a 100644 --- a/tests/test_inline_plugin.py +++ b/tests/test_inline_plugin.py @@ -56,6 +56,7 @@ def test_inline(self, **make_kwargs): run_id, targets, allow_multiple=True, + processor="threaded_mailbox", **make_kwargs, ) for target in list(st._plugin_class_registry.keys()): From db7b1c7181bdda6e2738c60712aeec8657286055 Mon Sep 17 00:00:00 2001 From: dachengx Date: Wed, 16 Oct 2024 15:36:21 -0500 Subject: [PATCH 5/5] Debug --- tests/test_mongo_frontend.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_mongo_frontend.py b/tests/test_mongo_frontend.py index 518f75cc7..c939e2522 100644 --- a/tests/test_mongo_frontend.py +++ b/tests/test_mongo_frontend.py @@ -149,7 +149,9 @@ def test_allow_incomplete(self): self.log.info(f"Starting with empty db {self.chunk_summary}") # Get the iterator separately and complete with "next(iterator) - iterator = self.st.get_iter(self.test_run_id, self.mongo_target) + iterator = self.st.get_iter( + self.test_run_id, self.mongo_target, processor="threaded_mailbox" + ) self.log.info(f"Got iterator, still no data?: {self.chunk_summary}") # Chunk 0