diff --git a/strax/context.py b/strax/context.py index 152eef795..ded55ace5 100644 --- a/strax/context.py +++ b/strax/context.py @@ -1545,6 +1545,15 @@ def get_iter( # (otherwise potentially overwritten in temp-plugin) targets_list = targets + if processor is None: + processor = list(self.processors)[0] + + if isinstance(processor, str): + processor = self.processors[processor] + + if not hasattr(processor, "iter"): + raise ValueError("Processors must implement a iter methed.") + is_superrun = run_id.startswith("_") # If multiple targets of the same kind, create a MergeOnlyPlugin @@ -1557,7 +1566,7 @@ def get_iter( p = type(temp_name, (strax.MergeOnlyPlugin,), dict(depends_on=tuple(targets))) self.register(p) targets = (temp_name,) - elif not allow_multiple: + elif not allow_multiple or processor is strax.SingleThreadProcessor: raise RuntimeError("Cannot automerge different data kinds!") elif self.context_config["timeout"] > 7200 or ( self.context_config["allow_lazy"] and not self.context_config["allow_multiprocess"] @@ -1582,15 +1591,6 @@ def get_iter( if k.startswith("_temp"): del self._plugin_class_registry[k] - if processor is None: - processor = list(self.processors)[0] - - if isinstance(processor, str): - processor = self.processors[processor] - - if not hasattr(processor, "iter"): - raise ValueError("Processors must implement a iter methed.") - seen_a_chunk = False generator = processor( components, diff --git a/strax/processors/__init__.py b/strax/processors/__init__.py index 346fa5d35..f723a478a 100644 --- a/strax/processors/__init__.py +++ b/strax/processors/__init__.py @@ -8,7 +8,7 @@ from .single_thread import SingleThreadProcessor PROCESSORS = { - "default": ThreadedMailboxProcessor, + "default": SingleThreadProcessor, "threaded_mailbox": ThreadedMailboxProcessor, "single_thread": SingleThreadProcessor, } diff --git a/tests/test_core.py b/tests/test_core.py index ebbeabf16..cef201753 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -426,13 +426,15 @@ def test_allow_multiple(targets=("peaks", "records")): raise ValueError(f"{function} could run with allow_multiple") try: - mystrax.make(run_id=run_id, targets=targets) + mystrax.make(run_id=run_id, targets=targets, processor="threaded_mailbox") except RuntimeError: # Great, we shouldn't be allowed pass assert not mystrax.is_stored(run_id, "peaks") - mystrax.make(run_id=run_id, allow_multiple=True, targets=targets) + mystrax.make( + run_id=run_id, allow_multiple=True, targets=targets, processor="threaded_mailbox" + ) for t in targets: assert mystrax.is_stored(run_id, t) diff --git a/tests/test_inline_plugin.py b/tests/test_inline_plugin.py index 3f9759319..08a3f597a 100644 --- a/tests/test_inline_plugin.py +++ b/tests/test_inline_plugin.py @@ -56,6 +56,7 @@ def test_inline(self, **make_kwargs): run_id, targets, allow_multiple=True, + processor="threaded_mailbox", **make_kwargs, ) for target in list(st._plugin_class_registry.keys()): diff --git a/tests/test_loop_plugins.py b/tests/test_loop_plugins.py index df6765659..446ee8bc6 100644 --- a/tests/test_loop_plugins.py +++ b/tests/test_loop_plugins.py @@ -40,6 +40,7 @@ def _loop_test_inner( target="added_thing", time_selection="fully_contained", force_value_error=False, + processor=None, ): """Test loop plugins for random data. For this test we are going to setup two plugins that will be looped over and combined into a loop plugin (depending on the target, this may be a multi @@ -161,7 +162,7 @@ def compute_loop(self, big_kinda_data, small_kinda_data): st.register((BigThing, SmallThing, AddBigToSmall, AddBigToSmallMultiOutput)) # Make small thing in order to allow re-chunking - st.make(run_id="some_run", targets="small_thing") + st.make(run_id="some_run", targets="small_thing", processor=processor) # Make the loop plugin result = st.get_array(run_id="some_run", targets=target) @@ -212,16 +213,18 @@ def test_loop_plugin_multi_output( ) @settings(deadline=None) @example(big_data=np.array([], dtype=full_dt_dtype), nchunks=2) -def test_value_error_for_loop_plugin(big_data, nchunks): +def test_error_for_loop_plugin(big_data, nchunks): """Make sure that we are are getting the right ValueError.""" - try: - _loop_test_inner(big_data, nchunks, force_value_error=True) - raise RuntimeError( - "did not run into ValueError despite the fact we are having multiple none-type chunks" - ) - except ValueError: - # Good we got the ValueError we wanted - pass + for error, processor in zip([RuntimeError, ValueError], ["single_thread", "threaded_mailbox"]): + try: + _loop_test_inner(big_data, nchunks, force_value_error=True, processor=processor) + raise RuntimeError( + f"Did not run into {error.__name__} despite the fact " + "we are having multiple none-type chunks" + ) + except error: + # Good we got the Error we wanted + pass @given( diff --git a/tests/test_mongo_frontend.py b/tests/test_mongo_frontend.py index 518f75cc7..c939e2522 100644 --- a/tests/test_mongo_frontend.py +++ b/tests/test_mongo_frontend.py @@ -149,7 +149,9 @@ def test_allow_incomplete(self): self.log.info(f"Starting with empty db {self.chunk_summary}") # Get the iterator separately and complete with "next(iterator) - iterator = self.st.get_iter(self.test_run_id, self.mongo_target) + iterator = self.st.get_iter( + self.test_run_id, self.mongo_target, processor="threaded_mailbox" + ) self.log.info(f"Got iterator, still no data?: {self.chunk_summary}") # Chunk 0