Know: Design Thoughts
We propose here two simple but powerful interfaces for the problem of reading and writing multi-channel streams.
Here we view the stream as an iterable of "slabs" of data. A slab is the data for a given interval of time. A sequence of slabs might look like this:
```
{'bt': 1000, 'tt': 1100, 'switch': 'off', 'audio_bytes': <bytes>, ...}
...
{'bt': 2200, 'tt': 2250, 'switch': 'on', 'audio_bytes': <bytes>, 'video_bytes': <bytes>, ...}
...
```
where the keys represent "channels" of information that could be of any type (categorical, numerical,...) or nature (time index, state, sensor information).
The intervals of time are not necessarily regular in size or step, nor do the channels have to be fixed -- though of course, any regularity is good to know about and allows for some optimizations and code simplifications. That said, two useful conventions might be:
- the data carried by a slab is relevant to that interval of time in some well defined way
- the corresponding intervals of time are strictly increasing in some well defined interval ordering (one possible check is sketched below)
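For instance, under the (assumed) convention that slabs carry `bt`/`tt` bounds as in the example above, and taking "interval A comes before interval B iff A ends no later than B starts" as the ordering, a small check could look like this:

```python
def intervals_strictly_increase(slabs, bt_key='bt', tt_key='tt'):
    """True iff each slab's interval starts no earlier than the previous one ends."""
    prev_tt = float('-inf')
    for slab in slabs:
        if slab[bt_key] < prev_tt:
            return False
        prev_tt = slab[tt_key]
    return True

assert intervals_strictly_increase(
    [{'bt': 1000, 'tt': 1100}, {'bt': 2200, 'tt': 2250}]
)
```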
The role of SlabsIter is both to source the stream and specify what to do with it, computing "stuff" on the way (in effect, creating further stream channels) and ending up with some target data services (storage, visualization, etc.).
We can view the live signal source as publishers and services (e.g. storage, visualization) that consume the data as subscribers.
This was the initial direction our architectural thinking took (see Appendix: Thoughts on live_src storage interface options and Appendix: Thoughts on live_src services).
Though the publisher-subscriber pattern can be useful here, the way SlabsIter does it is to consider the whole system as functions connected in a computational DAG (Directed Acyclic Graph).
This DAG view unifies publishers, subscribers, and any intermediates as functions. The root functions are "stream readers" (or "cursor functions") that each either provide the next piece of data for the channel(s) they control, or a sentinel indicating that there's no new data. Together, what these root functions provide is a "slab". The other functions can be considered the services, or simply helper functions that enable the flow of information that ends up in a service.
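To make this concrete, here is a minimal sketch of the pattern (an illustration of the idea only, not the actual SlabsIter implementation; the `reader` helper and the sentinel are made up for the example):

```python
no_new_data = object()  # sentinel meaning "nothing new on this channel right now"

def slabs_iter(readers, consumers):
    """Assemble slabs from root "cursor" functions and feed them to the other functions.

    readers: {channel_name: zero-arg function returning the next datum, or the sentinel}
    consumers: functions taking a slab dict (services, or helpers computing new channels)
    """
    while True:
        slab = {name: read() for name, read in readers.items()}
        slab = {k: v for k, v in slab.items() if v is not no_new_data}
        if not slab:  # every channel said "no new data": stop
            return
        for consume in consumers:
            consume(slab)
        yield slab

def reader(iterator):
    """Turn an iterator into a zero-arg cursor function."""
    return lambda: next(iterator, no_new_data)

# Hypothetical usage: one 'tick' channel and a print "service"
for slab in slabs_iter({'tick': reader(iter(range(3)))}, consumers=[print]):
    pass  # prints {'tick': 0}, {'tick': 1}, {'tick': 2}
```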
The second interface, FlowMatrix, allows one to use the form `streams[channel_index, time_index]` for reading and writing streams.
One could stretch the use of SlabsIter far, using buffers and such -- and indeed, this iterative process is what may be happening behind the scenes -- but for some use cases, being able to address channels and time directly (and simply) will lead to cleaner code.
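Purely as an illustration of that addressing style (a toy stand-in, not the FlowMatrix implementation), one might picture something like this:

```python
from collections import defaultdict

class ChannelTimeIndex:
    """Toy stand-in for the streams[channel_index, time_index] addressing style.

    Stores (channel, time) points in plain dicts; a slice over time selects a window.
    """
    def __init__(self):
        self._data = defaultdict(dict)  # channel -> {time: value}

    def __setitem__(self, key, value):
        channel, time_index = key
        self._data[channel][time_index] = value

    def __getitem__(self, key):
        channel, time_index = key
        if isinstance(time_index, slice):
            return {t: v for t, v in self._data[channel].items()
                    if time_index.start <= t < time_index.stop}
        return self._data[channel][time_index]

streams = ChannelTimeIndex()
streams['switch', 1000] = 'off'
streams['switch', 2200] = 'on'
assert streams['switch', 1000:2000] == {1000: 'off'}
```

A real implementation would of course be backed by the stream sources themselves and deal with interval semantics, not just point lookups; the point here is only the read/write indexing form.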
SlabsIter is useful when we can think of the process as an iterative, one-slab-at-a-time flow.
Think imperative programming within a single iteration's time slice.
FlowMatrix is useful when it's more convenient to think of the process as a set of functions connecting (channel × interval) data -- this makes it easier to express computational relations in time that would otherwise require some more complex buffer engineering.
More like declarative programming within a (possibly larger) time frame.
A Contexts object takes care of entering all of the context managers it is given when it is itself entered.
```python
contexts = Contexts(live_src, store)
with contexts:
    live_src  # is in the enter now! -- don't even need to do a contexts.live_src!
```

Now available (as `ContextFanout`) in `i2.multi_object`.
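For illustration only, here is a minimal sketch of what such a context fan-out could look like (the real thing is `ContextFanout`; this just shows the idea):

```python
class Contexts:
    """Enter all given context managers on __enter__, exit them all on __exit__."""
    def __init__(self, *context_managers):
        self.context_managers = context_managers

    def __enter__(self):
        for cm in self.context_managers:
            cm.__enter__()
        return self

    def __exit__(self, *exc_info):
        for cm in reversed(self.context_managers):
            cm.__exit__(*exc_info)
```

In practice one would probably lean on `contextlib.ExitStack` to get the exception-safety details (unwinding already-entered contexts if a later `__enter__` fails) for free.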
The following assumes live_src is some kind of container (probably an iterable or iterator) that provides the connection to the live data we want to store.
If live_src is a mapping, we're just copying data from live_src to store, so the appropriate instruction would be:
```python
store.update(live_src)
```

`live_src` doesn't have to be a mapping for `update` to work.
An iterable of (key, value) pairs would suffice.
So one solution is to transform live_src to be such a key-value iterable then use update.
Transforming a stream into a mapping is here only meant to service the store's concern.
We shouldn't have live_src adapt itself.
Either we adapt the store (see later) or, at the very least, we do this work in the middleware:
```python
from typing import Callable, Iterable, Tuple, TypeVar

Item = TypeVar('Item')
KT = TypeVar('KT')
VT = TypeVar('VT')

Items2Kvs = Callable[[Iterable[Item]], Iterable[Tuple[KT, VT]]]
Items2Kvs.__doc__ = (
    "Function transforming an iterable of items to an iterable of (key, value) pairs"
)

items2kvs: Items2Kvs = enumerate  # a nice default for any stream
live_kvs = items2kvs(live_src)
store.update(live_kvs)
```

Tip: If you have a (single-to-single) `item2kv` function, you can "iterize" it like this:

```python
import functools

items2kvs = functools.partial(map, item2kv)
```
Another useful Items2Kvs:
```python
from time import time, sleep

def add_time(iterable):
    for item in iterable:
        yield time(), item

# Try it with
def add_delay(iterable, pre_delay_s=0.5, post_delay_s=0.5):
    for item in iterable:
        sleep(pre_delay_s)
        yield item
        sleep(post_delay_s)

it = add_time(add_delay(range(5)))
for x in it:
    print(x)
```

If instead we adapted the store to receive a stream of items,
it would make sense for the store to have an extend method
(see dol.appendable for tools to do that):
```python
store.extend(live_src)
```

Critique: An `extend` is never destructive. If updating `store` with the `live_src` feed could have the effect of replacing some values of existing keys, `extend` may be less appropriate.
How about update here too?
Both mappings and sets have an update, and both are able to "replace" in some sense, but the lines are a bit blurred there, so we might avoid that.
One could also consider store += live_src, but again, + is ambiguous, so is this a good option?
In the context of live stream storage, we usually index by strictly monotonic time, so we shouldn't ever have to replace anything in store.
For that reason, extend might be appropriate enough.
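For instance, a hypothetical wrapper giving a mapping-backed store an `extend` that keys each incoming item by its arrival time (so nothing ever gets replaced) could look like this:

```python
from itertools import count
from time import time

class TimeKeyedExtender:
    """Wrap a mutable mapping, adding an extend that keys each item by arrival time."""
    def __init__(self, store):
        self.store = store
        self._tiebreak = count()  # guards against time() resolution collisions

    def extend(self, items):
        for item in items:
            self.store[(time(), next(self._tiebreak))] = item

store = TimeKeyedExtender({})
store.extend(['slab_1', 'slab_2'])
assert list(store.store.values()) == ['slab_1', 'slab_2']
```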
Another option, one that would hide the detail of whether it's a mapping or a non-mapping iterable:

```python
# add a `__call__` to `store` and do:
for slab in live_src:
    store(slab)
# i.e. this pattern: map(store, live_src)
```

Storing a live data stream is only one example of what we might want to do with the live data: one amongst possibly several other live data services. How should the more general setting be organized?
Considerations:
- Should `live_src` be an object with multiple reading services (services pull data from it), or an iterable that's pushed to the services?
- Should the consumption be a parallel/async thing, or serial?
- Should services be able to "write" on `live_src` as well? (It's important to note: we don't mean write on the buffer that the original sources (sensors) wrote on. The `live_src` is a wrapper of the original streamed data, offering the possibility to write in a separate space while presenting a uniform view of both the raw stream data and the new streams.)
Consider the following code:
```python
from dataclasses import dataclass
from typing import Iterable, Callable

@dataclass
class MultiService:
    services: Iterable[Callable]

    def __call__(self, *args, **kwargs):
        for service in self.services:
            service(*args, **kwargs)
```

This could be used:

- To push live data items to all services in one call: here, a `MultiService` instance would be called in a loop.
- To give all services the live object from which they'd pull data according to their own rules: here, each service would be called once.
- Services could be blocking or detached.
Now available (as `FuncFanout`) in `i2.multi_object`.
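For instance, using the `MultiService` above to push slabs to two toy "services" (the names here are just for the example):

```python
stored = []
push_to_services = MultiService([stored.append, print])

for slab in [{'bt': 1000, 'tt': 1100, 'switch': 'off'},
             {'bt': 2200, 'tt': 2250, 'switch': 'on'}]:
    push_to_services(slab)  # every service gets every slab

assert len(stored) == 2
```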
A `multi_streams` object can be the layer we talk to in order to get our stream data.
Perhaps we should separate out the way we consume the streams to generate slabs (a walk over them):
```python
slabs = walk(multi_streams)
for slab in slabs:
    for service in services:
        service(slab)
```

In some cases though, we may want to have the services decide how and when they need to read from the streams. They may also need to write some (different) streams of their own. In this case we may want to just launch a bunch of non-blocking services:
```python
process_info = [launch(service, multi_streams) for service in services]
monitor(process_info)  # to monitor, and possibly kill, the processes
```
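As a sketch of what `launch` and `monitor` could be, here is a thread-based version (one choice among several; processes or async tasks would do too, and the toy `services` below are just placeholders):

```python
from threading import Thread

def launch(service, multi_streams):
    """Run service(multi_streams) in a daemon thread; return the thread as process info."""
    thread = Thread(target=service, args=(multi_streams,), daemon=True)
    thread.start()
    return thread

def monitor(process_info, timeout_per_service_s=1.0):
    """Wait (up to a timeout each) on the launched services; report who is still running."""
    for thread in process_info:
        thread.join(timeout=timeout_per_service_s)
    return [thread.is_alive() for thread in process_info]

# Hypothetical usage: two services pulling from the same (here, trivial) multi_streams
multi_streams = range(3)
services = [lambda streams: sum(streams), lambda streams: list(streams)]
process_info = [launch(service, multi_streams) for service in services]
print(monitor(process_info))  # [False, False] once both services have finished
```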