20 changes: 10 additions & 10 deletions strax/context.py
@@ -1545,6 +1545,15 @@ def get_iter(
         # (otherwise potentially overwritten in temp-plugin)
         targets_list = targets
 
+        if processor is None:
+            processor = list(self.processors)[0]
+
+        if isinstance(processor, str):
+            processor = self.processors[processor]
+
+        if not hasattr(processor, "iter"):
+            raise ValueError("Processors must implement an iter method.")
+
         is_superrun = run_id.startswith("_")
 
         # If multiple targets of the same kind, create a MergeOnlyPlugin
@@ -1557,7 +1566,7 @@ def get_iter(
                 p = type(temp_name, (strax.MergeOnlyPlugin,), dict(depends_on=tuple(targets)))
                 self.register(p)
                 targets = (temp_name,)
-            elif not allow_multiple:
+            elif not allow_multiple or processor is strax.SingleThreadProcessor:
                 raise RuntimeError("Cannot automerge different data kinds!")
             elif self.context_config["timeout"] > 7200 or (
                 self.context_config["allow_lazy"] and not self.context_config["allow_multiprocess"]
@@ -1582,15 +1591,6 @@ def get_iter(
             if k.startswith("_temp"):
                 del self._plugin_class_registry[k]
 
-        if processor is None:
-            processor = list(self.processors)[0]
-
-        if isinstance(processor, str):
-            processor = self.processors[processor]
-
-        if not hasattr(processor, "iter"):
-            raise ValueError("Processors must implement an iter method.")
-
         seen_a_chunk = False
         generator = processor(
             components,
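Not part of the PR itself, but for orientation: with the resolution block now at the top of get_iter, the processor argument can be left out, given as a name, or given as a processor class. A minimal sketch of all three, assuming the Records test plugin from strax.testutils and an illustrative scratch directory and run id (none of these names come from this diff):

import strax
from strax.testutils import Records  # test plugin shipped with strax, used purely for illustration
from strax.processors import ThreadedMailboxProcessor

st = strax.Context(
    storage=[strax.DataDirectory("./strax_data_sketch")],  # illustrative scratch directory
    register=[Records],
)

# processor=None: fall back to the first entry of st.processors,
# which after this PR is the "default" -> SingleThreadProcessor mapping.
st.make(run_id="0", targets="records")

# processor as a string: looked up in st.processors.
st.make(run_id="0", targets="records", processor="threaded_mailbox")

# processor as a class: used as-is, provided it implements an iter method.
st.make(run_id="0", targets="records", processor=ThreadedMailboxProcessor)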
2 changes: 1 addition & 1 deletion strax/processors/__init__.py
@@ -8,7 +8,7 @@
 from .single_thread import SingleThreadProcessor
 
 PROCESSORS = {
-    "default": ThreadedMailboxProcessor,
+    "default": SingleThreadProcessor,
     "threaded_mailbox": ThreadedMailboxProcessor,
     "single_thread": SingleThreadProcessor,
 }
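In other words (an editor's restatement, not part of the diff): the "default" key now resolves to the single-threaded processor, while the threaded mailbox stays available under its own name, which is how the tests below request it explicitly.

from strax.processors import PROCESSORS, SingleThreadProcessor, ThreadedMailboxProcessor

# After this change, a context that is not given an explicit processor
# ends up with the single-threaded implementation ...
assert PROCESSORS["default"] is SingleThreadProcessor

# ... while the previous default remains selectable by name.
assert PROCESSORS["threaded_mailbox"] is ThreadedMailboxProcessor
assert PROCESSORS["single_thread"] is SingleThreadProcessor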
6 changes: 4 additions & 2 deletions tests/test_core.py
@@ -426,13 +426,15 @@ def test_allow_multiple(targets=("peaks", "records")):
             raise ValueError(f"{function} could run with allow_multiple")
 
     try:
-        mystrax.make(run_id=run_id, targets=targets)
+        mystrax.make(run_id=run_id, targets=targets, processor="threaded_mailbox")
     except RuntimeError:
         # Great, we shouldn't be allowed
         pass
 
     assert not mystrax.is_stored(run_id, "peaks")
-    mystrax.make(run_id=run_id, allow_multiple=True, targets=targets)
+    mystrax.make(
+        run_id=run_id, allow_multiple=True, targets=targets, processor="threaded_mailbox"
+    )
 
     for t in targets:
         assert mystrax.is_stored(run_id, t)
1 change: 1 addition & 0 deletions tests/test_inline_plugin.py
@@ -56,6 +56,7 @@ def test_inline(self, **make_kwargs):
             run_id,
             targets,
             allow_multiple=True,
+            processor="threaded_mailbox",
             **make_kwargs,
         )
         for target in list(st._plugin_class_registry.keys()):
23 changes: 13 additions & 10 deletions tests/test_loop_plugins.py
@@ -40,6 +40,7 @@ def _loop_test_inner(
     target="added_thing",
     time_selection="fully_contained",
     force_value_error=False,
+    processor=None,
 ):
     """Test loop plugins for random data. For this test we are going to setup two plugins that will
     be looped over and combined into a loop plugin (depending on the target, this may be a multi
@@ -161,7 +162,7 @@ def compute_loop(self, big_kinda_data, small_kinda_data):
     st.register((BigThing, SmallThing, AddBigToSmall, AddBigToSmallMultiOutput))
 
     # Make small thing in order to allow re-chunking
-    st.make(run_id="some_run", targets="small_thing")
+    st.make(run_id="some_run", targets="small_thing", processor=processor)
 
     # Make the loop plugin
     result = st.get_array(run_id="some_run", targets=target)
@@ -212,16 +213,18 @@ def test_loop_plugin_multi_output(
 )
 @settings(deadline=None)
 @example(big_data=np.array([], dtype=full_dt_dtype), nchunks=2)
-def test_value_error_for_loop_plugin(big_data, nchunks):
+def test_error_for_loop_plugin(big_data, nchunks):
     """Make sure that we are getting the right error."""
-    try:
-        _loop_test_inner(big_data, nchunks, force_value_error=True)
-        raise RuntimeError(
-            "did not run into ValueError despite the fact we are having multiple none-type chunks"
-        )
-    except ValueError:
-        # Good we got the ValueError we wanted
-        pass
+    for error, processor in zip([RuntimeError, ValueError], ["single_thread", "threaded_mailbox"]):
+        try:
+            _loop_test_inner(big_data, nchunks, force_value_error=True, processor=processor)
+            raise RuntimeError(
+                f"Did not run into {error.__name__} despite the fact "
+                "we are having multiple none-type chunks"
+            )
+        except error:
+            # Good, we got the error we wanted
+            pass
 
 
 @given(
4 changes: 3 additions & 1 deletion tests/test_mongo_frontend.py
@@ -149,7 +149,9 @@ def test_allow_incomplete(self):
 
         self.log.info(f"Starting with empty db {self.chunk_summary}")
         # Get the iterator separately and complete with "next(iterator)"
-        iterator = self.st.get_iter(self.test_run_id, self.mongo_target)
+        iterator = self.st.get_iter(
+            self.test_run_id, self.mongo_target, processor="threaded_mailbox"
+        )
 
         self.log.info(f"Got iterator, still no data?: {self.chunk_summary}")
         # Chunk 0
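For readers less familiar with the pattern this test relies on: get_iter hands back a generator of chunks that can be advanced manually with next(), which is what lets the test inspect the database between chunks. A rough, self-contained sketch using the Records test plugin (the plugin, run id, and in-memory setup are illustrative, not taken from this diff):

import strax
from strax.testutils import Records  # illustration only

st = strax.Context(storage=[], register=[Records])

# get_iter yields strax.Chunk objects one at a time.
iterator = st.get_iter("0", "records", processor="threaded_mailbox")

first_chunk = next(iterator)  # compute and receive a single chunk
print(first_chunk.start, first_chunk.end, len(first_chunk.data))

for chunk in iterator:  # drain whatever is left
    print(f"{len(chunk.data)} records in [{chunk.start}, {chunk.end})")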