From 5fb5202c9b033d58b60fccd8ff37a910cd51387a Mon Sep 17 00:00:00 2001 From: christopher Date: Sun, 10 Jun 2018 17:57:48 -0400 Subject: [PATCH 1/7] ENH: add flatfield pipeline --- xpdan/pipelines/flatfield.py | 106 +++++++++++++++++++++++++++++++++++ xpdan/tests/utils.py | 2 +- 2 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 xpdan/pipelines/flatfield.py diff --git a/xpdan/pipelines/flatfield.py b/xpdan/pipelines/flatfield.py new file mode 100644 index 0000000..21f8526 --- /dev/null +++ b/xpdan/pipelines/flatfield.py @@ -0,0 +1,106 @@ +import os + +from bluesky.callbacks.broker import LiveImage +from shed.translation import FromEventStream +from xpdan.callbacks import StartStopCallback +from xpdan.pipelines.pipeline_utils import (_timestampstr, + Filler) +from xpdconf.conf import glbl_dict +from xpdconf.conf import XPD_SHUTTER_CONF +from xpdtools.calib import _save_calib_param +from xpdtools.pipelines.flatfield import * + +image_name = glbl_dict['image_field'] +shutter_name = glbl_dict['shutter_field'] +db = glbl_dict['exp_db'] +calibration_md_folder = {'folder': 'xpdAcq_calib_info.yml'} + +filler = Filler(db=db) +# Build the general pipeline from the raw_pipeline +raw_source = Stream(stream_name='raw source') + +# Fill the raw event stream +source = (raw_source + # Filler returns None for resource/datum data + .starmap(filler).filter(lambda x: x is not None)) + +# Get all the documents +start_docs = FromEventStream('start', (), source) +descriptor_docs = FromEventStream('descriptor', (), source, + event_stream_name='primary') +event_docs = FromEventStream('event', (), source, event_stream_name='primary') +all_docs = (event_docs + .combine_latest(start_docs, descriptor_docs, emit_on=0) + .starmap(lambda e, s, d: {'raw_event': e, 'raw_start': s, + 'raw_descriptor': d, + 'human_timestamp': _timestampstr( + s['time'])})) + +# If new calibration uid invalidate our current calibration cache +(FromEventStream('start', ('detector_calibration_client_uid',), source) + .unique(history=1) + .map(lambda x: geometry_img_shape.lossless_buffer.clear())) + +# Calibration information +(FromEventStream('start', ('bt_wavelength',), source) + .unique(history=1) + .connect(wavelength)) +(FromEventStream('start', ('dSpacing',), source) + .unique(history=1) + .connect(calibrant)) +(FromEventStream('start', ('detector',), source) + .unique(history=1) + .connect(detector)) + +(FromEventStream('start', (), source). + map(lambda x: 'detector_calibration_server_uid' in x). + connect(is_calibration_img)) +# Only pass through new calibrations (prevents us from recalculating cals) +(FromEventStream('start', ('calibration_md',), source). + unique(history=1). + connect(geo_input)) + +start_timestamp = FromEventStream('start', ('time',), source) + +# Clean out the cached darks and backgrounds on start +# so that this will run regardless of background/dark status +# note that we get the proper data (if it exists downstream) +start_docs.sink(lambda x: raw_background_dark.emit(0.0)) +start_docs.sink(lambda x: raw_background.emit(0.0)) +start_docs.sink(lambda x: raw_foreground_dark.emit(0.0)) + +# Shutter position +shutter_position = FromEventStream('event', ('data', shutter_name), raw_source) + +# Get foreground dark +((FromEventStream('event', ('data', image_name), raw_source) + .zip(shutter_position) + .filter(lambda x: x[1] == XPD_SHUTTER_CONF['close']) + .map(np.float32) + .connect(raw_foreground_dark))) + +# Get foreground +FromEventStream('event', ('seq_num',), source, stream_name='seq_num' + ).connect(img_counter) +(FromEventStream('event', ('data', image_name), source, principle=True, + stream_name='raw_foreground') + .zip(shutter_position) + .filter(lambda x: x[1] == XPD_SHUTTER_CONF['open']) + .map(np.float32) + .connect(raw_foreground)) + +# Save out calibration data to special place +h_timestamp = start_timestamp.map(_timestampstr) +(gen_geo_cal.pluck(0) + .zip_latest(h_timestamp) + .starsink(lambda x, y: _save_calib_param(x, y, os.path.join( + glbl_dict['config_base'], glbl_dict['calib_config_name'])))) + +raw_source.starsink(StartStopCallback()) + +ave_ff.map(np.nan_to_num).sink( + LiveImage('image', cmap='viridis', + limit_func=lambda x: (np.nanpercentile(x, .1), + np.nanpercentile(x, 99.9)), + # norm=SymLogNorm(.1), + window_title='percent off').update) diff --git a/xpdan/tests/utils.py b/xpdan/tests/utils.py index a4fb225..fbe396d 100644 --- a/xpdan/tests/utils.py +++ b/xpdan/tests/utils.py @@ -93,7 +93,7 @@ def insert_imgs(RE, reg, n, shape, save_dir, **kwargs): reg=reg) light_det = sim.SynSignalWithRegistry(name='pe1_image', func=lambda: (np.random.random( - shape) * 65535) .astype('uint16'), + shape) * 65535).astype('uint16'), reg=reg) beamtime_uid = str(uuid4()) base_md = dict(beamtime_uid=beamtime_uid, From f5fa56e3f0fb206ea28142fd0bb658062fdbddba Mon Sep 17 00:00:00 2001 From: christopher Date: Sun, 10 Jun 2018 19:09:22 -0400 Subject: [PATCH 2/7] news --- news/flatfield | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 news/flatfield diff --git a/news/flatfield b/news/flatfield new file mode 100644 index 0000000..d037671 --- /dev/null +++ b/news/flatfield @@ -0,0 +1,13 @@ +**Added:** + +* Flatfield data analysis pipeline + +**Changed:** None + +**Deprecated:** None + +**Removed:** None + +**Fixed:** None + +**Security:** None From 280c1dd9344481c04f61b4eecc045228d3ed46df Mon Sep 17 00:00:00 2001 From: christopher Date: Mon, 11 Jun 2018 16:00:43 -0400 Subject: [PATCH 3/7] FIX: pull from different streams --- xpdan/pipelines/flatfield.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/xpdan/pipelines/flatfield.py b/xpdan/pipelines/flatfield.py index 21f8526..3fc5e8a 100644 --- a/xpdan/pipelines/flatfield.py +++ b/xpdan/pipelines/flatfield.py @@ -69,23 +69,19 @@ start_docs.sink(lambda x: raw_background.emit(0.0)) start_docs.sink(lambda x: raw_foreground_dark.emit(0.0)) -# Shutter position -shutter_position = FromEventStream('event', ('data', shutter_name), raw_source) - # Get foreground dark -((FromEventStream('event', ('data', image_name), raw_source) - .zip(shutter_position) - .filter(lambda x: x[1] == XPD_SHUTTER_CONF['close']) +((FromEventStream('event', ('data', image_name), raw_source, + event_stream_name='dark') .map(np.float32) .connect(raw_foreground_dark))) # Get foreground -FromEventStream('event', ('seq_num',), source, stream_name='seq_num' +FromEventStream('event', ('seq_num',), source, stream_name='seq_num', + event_stream_name='primary' ).connect(img_counter) (FromEventStream('event', ('data', image_name), source, principle=True, + event_stream_name='primary', stream_name='raw_foreground') - .zip(shutter_position) - .filter(lambda x: x[1] == XPD_SHUTTER_CONF['open']) .map(np.float32) .connect(raw_foreground)) From 6b8e2cd3a0d4c39b88abb46526177deb1048d955 Mon Sep 17 00:00:00 2001 From: christopher Date: Mon, 11 Jun 2018 17:52:57 -0400 Subject: [PATCH 4/7] ENH: z_score vis --- xpdan/pipelines/flatfield.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/xpdan/pipelines/flatfield.py b/xpdan/pipelines/flatfield.py index 3fc5e8a..18586b3 100644 --- a/xpdan/pipelines/flatfield.py +++ b/xpdan/pipelines/flatfield.py @@ -9,6 +9,8 @@ from xpdconf.conf import XPD_SHUTTER_CONF from xpdtools.calib import _save_calib_param from xpdtools.pipelines.flatfield import * +from shed.translation import ToEventStream +from xpdtools.tools import z_score_image, overlay_mask image_name = glbl_dict['image_field'] shutter_name = glbl_dict['shutter_field'] @@ -100,3 +102,14 @@ np.nanpercentile(x, 99.9)), # norm=SymLogNorm(.1), window_title='percent off').update) +L = ave_ff.sink_to_list() +z_score = ( + pol_corrected_img. + combine_latest(binner, emit_on=0). + starmap(z_score_image, stream_name='z score'). + combine_latest(mask, emit_on=0).starmap(overlay_mask)) + +# Zscore +z_score_plot = ToEventStream(z_score, ('z_score',)).starsink( + LiveImage('z_score', cmap='viridis', window_title='z score', + limit_func=lambda im: (-2, 2)), stream_name='z score vis') \ No newline at end of file From 44909854df74aeb39a42770e67dcb476371df882 Mon Sep 17 00:00:00 2001 From: christopher Date: Sat, 30 Jun 2018 21:53:48 -0400 Subject: [PATCH 5/7] WIP: build saxs_waxs pipeline --- xpdan/pipelines/main.py | 191 +++++++++++++++++++++------------------- 1 file changed, 99 insertions(+), 92 deletions(-) diff --git a/xpdan/pipelines/main.py b/xpdan/pipelines/main.py index 3ae86cc..bf35ff4 100644 --- a/xpdan/pipelines/main.py +++ b/xpdan/pipelines/main.py @@ -1,35 +1,18 @@ import os import numpy as np -from bluesky.callbacks.broker import LiveImage -from shed.translation import FromEventStream, ToEventStream -from skbeam.io import save_output -from skbeam.io.fit2d import fit2d_save +from shed.translation import FromEventStream from streamz_ext import Stream -from tifffile import imsave +from xpdan.callbacks import StartStopCallback from xpdan.db_utils import query_background, query_dark, temporal_prox -from xpdan.formatters import render, clean_template -from xpdan.io import pdf_saver, dump_yml from xpdan.pipelines.pipeline_utils import (_timestampstr, - clear_combine_latest, Filler, - base_template) -from xpdan.callbacks import StartStopCallback + clear_combine_latest, Filler) from xpdconf.conf import glbl_dict from xpdtools.calib import _save_calib_param -# from xpdtools.pipelines.raw_pipeline import (geometry_img_shape, -# iq_comp, composition, wavelength, -# calibrant, detector, -# is_calibration_img, geo_input, -# raw_background_dark, -# raw_foreground_dark, img_counter, -# raw_foreground, gen_geo_cal, -# dark_corrected_foreground, q, -# mean, tth, mask, pdf, fq, sq, -# pol_corrected_img, raw_background) -from xpdtools.pipelines.raw_pipeline import * -from xpdtools.pipelines.raw_pipeline import (mask_setting, # noqa: F401 - ) -from xpdtools.tools import overlay_mask +from xpdtools.pipelines.raw_pipeline import make_pipeline, mask_setting + +pdf_pipeline = make_pipeline() +saxs_pipeline = make_pipeline() image_name = glbl_dict['image_field'] db = glbl_dict['exp_db'] @@ -62,43 +45,18 @@ 'human_timestamp': _timestampstr( s['time'])})) +# TODO: pull this from descriptor! # If new calibration uid invalidate our current calibration cache -(FromEventStream('start', ('detector_calibration_client_uid',), source) - .unique(history=1) - .map(lambda x: geometry_img_shape.lossless_buffer.clear())) - -# Clear composition every start document -(FromEventStream('start', (), source) - .sink(lambda x: clear_combine_latest(iq_comp, 1))) -FromEventStream('start', ('composition_string',), source).connect(composition) - -# Calibration information -(FromEventStream('start', ('bt_wavelength',), source) - .unique(history=1) - .connect(wavelength)) -(FromEventStream('start', ('dSpacing',), source) - .unique(history=1) - .connect(calibrant)) -(FromEventStream('start', ('detector',), source) - .unique(history=1) - .connect(detector)) - -(FromEventStream('start', (), source). - map(lambda x: 'detector_calibration_server_uid' in x). - connect(is_calibration_img)) -# Only pass through new calibrations (prevents us from recalculating cals) -(FromEventStream('start', ('calibration_md',), source). - unique(history=1). - connect(geo_input)) - +unique_geo = ( + FromEventStream('start', ('detector_calibration_client_uid',), source) + .unique(history=1) + ) +wavelength = (FromEventStream('start', ('bt_wavelength',), source) + .unique(history=1)) +composition = FromEventStream('start', ('composition_string',), source) +dspacing = FromEventStream('start', ('dSpacing',), source).unique(history=1) start_timestamp = FromEventStream('start', ('time',), source) - -# Clean out the cached darks and backgrounds on start -# so that this will run regardless of background/dark status -# note that we get the proper data (if it exists downstream) -start_docs.sink(lambda x: raw_background_dark.emit(0.0)) -start_docs.sink(lambda x: raw_background.emit(0.0)) -start_docs.sink(lambda x: raw_foreground_dark.emit(0.0)) +h_timestamp = start_timestamp.map(_timestampstr) bg_query = (start_docs.map(query_background, db=db)) bg_docs = (bg_query @@ -110,41 +68,90 @@ # Get bg dark bg_dark_query = (FromEventStream('start', (), bg_docs) - .map(query_dark, db=db) - ) -(FromEventStream('event', ('data', image_name), - bg_dark_query.map(lambda x: x[0].documents(fill=True)) - .flatten()).map(np.float32) - .connect(raw_background_dark)) - -# Get background -(FromEventStream('event', ('data', image_name), bg_docs).map(np.float32) - .connect(raw_background)) - -# Get foreground dark + .map(query_dark, db=db)) + fg_dark_query = (start_docs.map(query_dark, db=db)) fg_dark_query.filter(lambda x: x != [] and isinstance(x, list)).sink(print) fg_dark_query.filter(lambda x: x == []).sink(lambda x: print('No dark found!')) -(FromEventStream('event', ('data', image_name), - fg_dark_query.filter(lambda x: x != []) - .map(lambda x: x if not isinstance(x, list) else x[0]) - .map(lambda x: x.documents(fill=True)).flatten() - ).map(np.float32) - .connect(raw_foreground_dark)) - -# Get foreground -FromEventStream('event', ('seq_num',), source, stream_name='seq_num' - ).connect(img_counter) -(FromEventStream('event', ('data', image_name), source, principle=True, - stream_name='raw_foreground').map(np.float32) - .connect(raw_foreground)) - -# Save out calibration data to special place -h_timestamp = start_timestamp.map(_timestampstr) -(gen_geo_cal.pluck(0) - .zip_latest(h_timestamp) - .starsink(lambda x, y: _save_calib_param(x, y, os.path.join( - glbl_dict['config_base'], glbl_dict['calib_config_name'])))) + +for p, descriptor in zip([pdf_pipeline, saxs_pipeline], ['primary', 'saxs']): + unique_geo.map(lambda x: p['geometry_img_shape'].lossless_buffer.clear()) + + # Clear composition every start document + (start_docs + .sink(lambda x: clear_combine_latest(p['iq_comp'], 1))) + composition.connect(p['composition']) + + # Calibration information + (wavelength.connect(p['wavelength'])) + (dspacing.connect(p['calibrant'])) + + # TODO: pull from descriptor + (FromEventStream('start', ('detector',), source) + .unique(history=1) + .connect(p['detector'])) + + (FromEventStream('start', (), source). + map(lambda x: 'detector_calibration_server_uid' in x). + connect(p['is_calibration_img'])) + + # TODO: pull this from the descriptor + # Only pass through new calibrations (prevents us from recalculating cals) + (FromEventStream('start', ('calibration_md',), source). + unique(history=1). + connect(p['geo_input'])) + + # Clean out the cached darks and backgrounds on start + # so that this will run regardless of background/dark status + # note that we get the proper data (if it exists downstream) + start_docs.sink(lambda x: p['raw_background_dark'].emit(0.0)) + start_docs.sink(lambda x: p['raw_background'].emit(0.0)) + start_docs.sink(lambda x: p['raw_foreground_dark'].emit(0.0)) + + (FromEventStream('event', ('data', image_name), + bg_dark_query.map(lambda x: x[0].documents(fill=True)) + .flatten(), event_stream_name=descriptor).map(np.float32) + .connect(p['raw_background_dark'])) + + # Get background + (FromEventStream('event', ('data', image_name), bg_docs, + event_stream_name=descriptor).map(np.float32) + .connect(p['raw_background'])) + + # Get foreground dark + (FromEventStream('event', ('data', image_name), + fg_dark_query.filter(lambda x: x != []) + .map(lambda x: x if not isinstance(x, list) else x[0]) + .map(lambda x: x.documents(fill=True)).flatten(), + event_stream_name=descriptor + ).map(np.float32) + .connect(p['raw_foreground_dark'])) + + # Pull dark live if possible + # NOTE: THIS ASSUMES THAT THE DARK STREAMS ARE NAMED saxs_dark + (FromEventStream('event', ('data', image_name), + event_stream_name='{}_{}'.format(descriptor, 'dark') + ) + .map(np.float32) + .connect(p['raw_foreground_dark'])) + + # Get foreground + FromEventStream('event', ('seq_num',), source, stream_name='seq_num', + event_stream_name=descriptor + ).connect(p['img_counter']) + (FromEventStream('event', ('data', image_name), source, principle=True, + stream_name='raw_foreground', + event_stream_name=descriptor).map(np.float32) + .connect(p['raw_foreground'])) + + # TODO: save this in such a way that we can get at it for each calibration + # Save out calibration data to special place + (p['gen_geo_cal'].pluck(0) + .zip_latest(h_timestamp) + .starsink(lambda x, y: _save_calib_param(x, y, os.path.join( + glbl_dict['config_base'], + '{}_{}'.format(descriptor, glbl_dict['calib_config_name']))))) raw_source.starsink(StartStopCallback()) -# raw_source.visualize(os.path.expanduser('~/mystream.png'), source_node=True) +# raw_source.visualize(os.path.expanduser('~/mystream_new.png'), +# source_node=True) From 6741097cdf58605dad150cd41ec8508c04b0eb57 Mon Sep 17 00:00:00 2001 From: christopher Date: Tue, 10 Jul 2018 13:07:06 -0400 Subject: [PATCH 6/7] WIP: black --- xpdan/pipelines/main.py | 251 +++++++++++++++++++++++++--------------- xpdan/pipelines/save.py | 5 + 2 files changed, 161 insertions(+), 95 deletions(-) diff --git a/xpdan/pipelines/main.py b/xpdan/pipelines/main.py index bf35ff4..6442b66 100644 --- a/xpdan/pipelines/main.py +++ b/xpdan/pipelines/main.py @@ -5,8 +5,11 @@ from streamz_ext import Stream from xpdan.callbacks import StartStopCallback from xpdan.db_utils import query_background, query_dark, temporal_prox -from xpdan.pipelines.pipeline_utils import (_timestampstr, - clear_combine_latest, Filler) +from xpdan.pipelines.pipeline_utils import ( + _timestampstr, + clear_combine_latest, + Filler, +) from xpdconf.conf import glbl_dict from xpdtools.calib import _save_calib_param from xpdtools.pipelines.raw_pipeline import make_pipeline, mask_setting @@ -14,143 +17,201 @@ pdf_pipeline = make_pipeline() saxs_pipeline = make_pipeline() -image_name = glbl_dict['image_field'] -db = glbl_dict['exp_db'] -calibration_md_folder = {'folder': 'xpdAcq_calib_info.yml'} +image_name = glbl_dict["image_field"] +db = glbl_dict["exp_db"] +calibration_md_folder = {"folder": "xpdAcq_calib_info.yml"} filler = Filler(db=db) # Build the general pipeline from the raw_pipeline -raw_source = Stream(stream_name='raw source') +raw_source = Stream(stream_name="raw source") # TODO: change this when new dark logic comes # Check that the data isn't a dark -dk_uid = (FromEventStream('start', (), upstream=raw_source) - .map(lambda x: 'sc_dk_field_uid' in x)) +dk_uid = FromEventStream("start", (), upstream=raw_source).map( + lambda x: "sc_dk_field_uid" in x +) # Fill the raw event stream -source = (raw_source - .combine_latest(dk_uid) - .filter(lambda x: x[1]) - .pluck(0) - # Filler returns None for resource/datum data - .starmap(filler).filter(lambda x: x is not None)) +source = ( + raw_source.combine_latest(dk_uid) + .filter(lambda x: x[1]) + .pluck(0) + # Filler returns None for resource/datum data + .starmap(filler) + .filter(lambda x: x is not None) +) # Get all the documents -start_docs = FromEventStream('start', (), source) -descriptor_docs = FromEventStream('descriptor', (), source, - event_stream_name='primary') -event_docs = FromEventStream('event', (), source, event_stream_name='primary') -all_docs = (event_docs - .combine_latest(start_docs, descriptor_docs, emit_on=0) - .starmap(lambda e, s, d: {'raw_event': e, 'raw_start': s, - 'raw_descriptor': d, - 'human_timestamp': _timestampstr( - s['time'])})) +start_docs = FromEventStream("start", (), source) +descriptor_docs = FromEventStream( + "descriptor", (), source, event_stream_name="primary" +) +event_docs = FromEventStream("event", (), source, event_stream_name="primary") +all_docs = event_docs.combine_latest( + start_docs, descriptor_docs, emit_on=0 +).starmap( + lambda e, s, d: { + "raw_event": e, + "raw_start": s, + "raw_descriptor": d, + "human_timestamp": _timestampstr(s["time"]), + } +) # TODO: pull this from descriptor! # If new calibration uid invalidate our current calibration cache -unique_geo = ( - FromEventStream('start', ('detector_calibration_client_uid',), source) - .unique(history=1) - ) -wavelength = (FromEventStream('start', ('bt_wavelength',), source) - .unique(history=1)) -composition = FromEventStream('start', ('composition_string',), source) -dspacing = FromEventStream('start', ('dSpacing',), source).unique(history=1) -start_timestamp = FromEventStream('start', ('time',), source) +unique_geo = FromEventStream( + "start", ("detector_calibration_client_uid",), source +).unique(history=1) +wavelength = FromEventStream("start", ("bt_wavelength",), source).unique( + history=1 +) +composition = FromEventStream("start", ("composition_string",), source) +dspacing = FromEventStream("start", ("dSpacing",), source).unique(history=1) +start_timestamp = FromEventStream("start", ("time",), source) h_timestamp = start_timestamp.map(_timestampstr) -bg_query = (start_docs.map(query_background, db=db)) -bg_docs = (bg_query - .zip(start_docs) - .starmap(temporal_prox) - .filter(lambda x: x != []) - .map(lambda x: x[0].documents(fill=True)) - .flatten()) +bg_query = start_docs.map(query_background, db=db) +bg_docs = ( + bg_query.zip(start_docs) + .starmap(temporal_prox) + .filter(lambda x: x != []) + .map(lambda x: x[0].documents(fill=True)) + .flatten() +) # Get bg dark -bg_dark_query = (FromEventStream('start', (), bg_docs) - .map(query_dark, db=db)) +bg_dark_query = FromEventStream("start", (), bg_docs).map(query_dark, db=db) -fg_dark_query = (start_docs.map(query_dark, db=db)) +fg_dark_query = start_docs.map(query_dark, db=db) fg_dark_query.filter(lambda x: x != [] and isinstance(x, list)).sink(print) -fg_dark_query.filter(lambda x: x == []).sink(lambda x: print('No dark found!')) +fg_dark_query.filter(lambda x: x == []).sink(lambda x: print("No dark found!")) -for p, descriptor in zip([pdf_pipeline, saxs_pipeline], ['primary', 'saxs']): - unique_geo.map(lambda x: p['geometry_img_shape'].lossless_buffer.clear()) +for p, descriptor in zip([pdf_pipeline, saxs_pipeline], ["primary", "saxs"]): + unique_geo.map(lambda x: p["geometry_img_shape"].lossless_buffer.clear()) # Clear composition every start document - (start_docs - .sink(lambda x: clear_combine_latest(p['iq_comp'], 1))) - composition.connect(p['composition']) + (start_docs.sink(lambda x: clear_combine_latest(p["iq_comp"], 1))) + composition.connect(p["composition"]) # Calibration information - (wavelength.connect(p['wavelength'])) - (dspacing.connect(p['calibrant'])) + (wavelength.connect(p["wavelength"])) + (dspacing.connect(p["calibrant"])) # TODO: pull from descriptor - (FromEventStream('start', ('detector',), source) - .unique(history=1) - .connect(p['detector'])) + ( + FromEventStream("start", ("detector",), source) + .unique(history=1) + .connect(p["detector"]) + ) - (FromEventStream('start', (), source). - map(lambda x: 'detector_calibration_server_uid' in x). - connect(p['is_calibration_img'])) + ( + FromEventStream("start", (), source) + .map(lambda x: "detector_calibration_server_uid" in x) + .connect(p["is_calibration_img"]) + ) # TODO: pull this from the descriptor # Only pass through new calibrations (prevents us from recalculating cals) - (FromEventStream('start', ('calibration_md',), source). - unique(history=1). - connect(p['geo_input'])) + ( + FromEventStream("start", ("calibration_md",), source) + .unique(history=1) + .connect(p["geo_input"]) + ) # Clean out the cached darks and backgrounds on start # so that this will run regardless of background/dark status # note that we get the proper data (if it exists downstream) - start_docs.sink(lambda x: p['raw_background_dark'].emit(0.0)) - start_docs.sink(lambda x: p['raw_background'].emit(0.0)) - start_docs.sink(lambda x: p['raw_foreground_dark'].emit(0.0)) - - (FromEventStream('event', ('data', image_name), - bg_dark_query.map(lambda x: x[0].documents(fill=True)) - .flatten(), event_stream_name=descriptor).map(np.float32) - .connect(p['raw_background_dark'])) + start_docs.sink(lambda x: p["raw_background_dark"].emit(0.0)) + start_docs.sink(lambda x: p["raw_background"].emit(0.0)) + start_docs.sink(lambda x: p["raw_foreground_dark"].emit(0.0)) + + ( + FromEventStream( + "event", + ("data", image_name), + bg_dark_query.map(lambda x: x[0].documents(fill=True)).flatten(), + event_stream_name=descriptor, + ) + .map(np.float32) + .connect(p["raw_background_dark"]) + ) # Get background - (FromEventStream('event', ('data', image_name), bg_docs, - event_stream_name=descriptor).map(np.float32) - .connect(p['raw_background'])) + ( + FromEventStream( + "event", + ("data", image_name), + bg_docs, + event_stream_name=descriptor, + ) + .map(np.float32) + .connect(p["raw_background"]) + ) # Get foreground dark - (FromEventStream('event', ('data', image_name), - fg_dark_query.filter(lambda x: x != []) - .map(lambda x: x if not isinstance(x, list) else x[0]) - .map(lambda x: x.documents(fill=True)).flatten(), - event_stream_name=descriptor - ).map(np.float32) - .connect(p['raw_foreground_dark'])) + ( + FromEventStream( + "event", + ("data", image_name), + fg_dark_query.filter(lambda x: x != []) + .map(lambda x: x if not isinstance(x, list) else x[0]) + .map(lambda x: x.documents(fill=True)) + .flatten(), + event_stream_name=descriptor, + ) + .map(np.float32) + .connect(p["raw_foreground_dark"]) + ) # Pull dark live if possible # NOTE: THIS ASSUMES THAT THE DARK STREAMS ARE NAMED saxs_dark - (FromEventStream('event', ('data', image_name), - event_stream_name='{}_{}'.format(descriptor, 'dark') - ) - .map(np.float32) - .connect(p['raw_foreground_dark'])) + ( + FromEventStream( + "event", + ("data", image_name), + event_stream_name="{}_{}".format(descriptor, "dark"), + ) + .map(np.float32) + .connect(p["raw_foreground_dark"]) + ) # Get foreground - FromEventStream('event', ('seq_num',), source, stream_name='seq_num', - event_stream_name=descriptor - ).connect(p['img_counter']) - (FromEventStream('event', ('data', image_name), source, principle=True, - stream_name='raw_foreground', - event_stream_name=descriptor).map(np.float32) - .connect(p['raw_foreground'])) + FromEventStream( + "event", + ("seq_num",), + source, + stream_name="seq_num", + event_stream_name=descriptor, + ).connect(p["img_counter"]) + ( + FromEventStream( + "event", + ("data", image_name), + source, + principle=True, + stream_name="raw_foreground", + event_stream_name=descriptor, + ) + .map(np.float32) + .connect(p["raw_foreground"]) + ) # TODO: save this in such a way that we can get at it for each calibration # Save out calibration data to special place - (p['gen_geo_cal'].pluck(0) - .zip_latest(h_timestamp) - .starsink(lambda x, y: _save_calib_param(x, y, os.path.join( - glbl_dict['config_base'], - '{}_{}'.format(descriptor, glbl_dict['calib_config_name']))))) + ( + p["gen_geo_cal"] + .pluck(0) + .zip_latest(h_timestamp) + .starsink( + lambda x, y: _save_calib_param( + x, + y, + os.path.join( + glbl_dict["config_base"], + "{}_{}".format(descriptor, glbl_dict["calib_config_name"]), + ), + ) + ) + ) raw_source.starsink(StartStopCallback()) # raw_source.visualize(os.path.expanduser('~/mystream_new.png'), diff --git a/xpdan/pipelines/save.py b/xpdan/pipelines/save.py index 66dc787..c712be4 100644 --- a/xpdan/pipelines/save.py +++ b/xpdan/pipelines/save.py @@ -1,3 +1,5 @@ +from xpdan.formatters import render, clean_template +from xpdan.io import dump_yml from xpdan.pipelines.main import * # ''' @@ -8,6 +10,9 @@ # This could be done by having each saver inside a callback which takes both # analyzed and raw documents, and creates the path from those two. +from xpdan.pipelines.pipeline_utils import base_template + +from xpdconf.conf import glbl_dict start_yaml_string = (start_docs.map(lambda s: {'raw_start': s, 'ext': '.yaml', From 7c98f7a945ba27e1871f0f848bcf151747d3fb1d Mon Sep 17 00:00:00 2001 From: christopher Date: Tue, 10 Jul 2018 17:28:09 -0400 Subject: [PATCH 7/7] WIP: convert event pipeline to link based system --- xpdan/pipelines/factory.py | 30 ++++ xpdan/pipelines/main.py | 340 ++++++++++++++++++++----------------- 2 files changed, 210 insertions(+), 160 deletions(-) create mode 100644 xpdan/pipelines/factory.py diff --git a/xpdan/pipelines/factory.py b/xpdan/pipelines/factory.py new file mode 100644 index 0000000..56c8f48 --- /dev/null +++ b/xpdan/pipelines/factory.py @@ -0,0 +1,30 @@ +from bluesky.callbacks.core import CallbackBase + + +ex_pipeline_func_dict = { + "PDF": { + "func": pdf_pipeline_factory, + "input": "raw_input", + "cache_mod": pdf_cache_mod, + } +} + + +class PipelineFactory(CallbackBase): + def __init__(self, pipeline_func_dict): + self.pipelines = pipeline_func_dict + + def start(self, doc): + # Ask the experiment for the desired analysis + pipeline_name = doc["pipeline_name"] + if pipeline_name not in self.pipelines: + return None + # if novel make new pipeline, else use current pipeline + # Note that this doesn't support multiple starts (which requires + pipeline = self.pipelines[pipeline_name].get( + "pipeline", self.pipelines[pipeline_name]["func"]() + ) + # modify the cache + self.pipelines[pipeline_name]["cache_mod"](pipeline) + # return the input + return pipeline[self.pipelines[pipeline_name]["input"]] diff --git a/xpdan/pipelines/main.py b/xpdan/pipelines/main.py index 6442b66..d449882 100644 --- a/xpdan/pipelines/main.py +++ b/xpdan/pipelines/main.py @@ -3,203 +3,212 @@ import numpy as np from shed.translation import FromEventStream from streamz_ext import Stream +from streamz_ext.link import link from xpdan.callbacks import StartStopCallback from xpdan.db_utils import query_background, query_dark, temporal_prox from xpdan.pipelines.pipeline_utils import ( _timestampstr, - clear_combine_latest, Filler, ) from xpdconf.conf import glbl_dict from xpdtools.calib import _save_calib_param -from xpdtools.pipelines.raw_pipeline import make_pipeline, mask_setting - -pdf_pipeline = make_pipeline() -saxs_pipeline = make_pipeline() +from xpdtools.pipelines.raw_pipeline import ( + make_pipeline as make_raw_pipeline, +) image_name = glbl_dict["image_field"] db = glbl_dict["exp_db"] calibration_md_folder = {"folder": "xpdAcq_calib_info.yml"} -filler = Filler(db=db) -# Build the general pipeline from the raw_pipeline -raw_source = Stream(stream_name="raw source") -# TODO: change this when new dark logic comes -# Check that the data isn't a dark -dk_uid = FromEventStream("start", (), upstream=raw_source).map( - lambda x: "sc_dk_field_uid" in x -) -# Fill the raw event stream -source = ( - raw_source.combine_latest(dk_uid) - .filter(lambda x: x[1]) - .pluck(0) - # Filler returns None for resource/datum data - .starmap(filler) - .filter(lambda x: x is not None) -) -# Get all the documents -start_docs = FromEventStream("start", (), source) -descriptor_docs = FromEventStream( - "descriptor", (), source, event_stream_name="primary" -) -event_docs = FromEventStream("event", (), source, event_stream_name="primary") -all_docs = event_docs.combine_latest( - start_docs, descriptor_docs, emit_on=0 -).starmap( - lambda e, s, d: { - "raw_event": e, - "raw_start": s, - "raw_descriptor": d, - "human_timestamp": _timestampstr(s["time"]), - } -) +def make_general_pipeline(): + filler = Filler(db=db) + # Build the general pipeline from the raw_pipeline + raw_source = Stream(stream_name="raw source") -# TODO: pull this from descriptor! -# If new calibration uid invalidate our current calibration cache -unique_geo = FromEventStream( - "start", ("detector_calibration_client_uid",), source -).unique(history=1) -wavelength = FromEventStream("start", ("bt_wavelength",), source).unique( - history=1 -) -composition = FromEventStream("start", ("composition_string",), source) -dspacing = FromEventStream("start", ("dSpacing",), source).unique(history=1) -start_timestamp = FromEventStream("start", ("time",), source) -h_timestamp = start_timestamp.map(_timestampstr) - -bg_query = start_docs.map(query_background, db=db) -bg_docs = ( - bg_query.zip(start_docs) - .starmap(temporal_prox) - .filter(lambda x: x != []) - .map(lambda x: x[0].documents(fill=True)) - .flatten() -) - -# Get bg dark -bg_dark_query = FromEventStream("start", (), bg_docs).map(query_dark, db=db) + # TODO: change this when new dark logic comes + # Check that the data isn't a dark + dk_uid = FromEventStream("start", (), upstream=raw_source).map( + lambda x: "sc_dk_field_uid" in x + ) + # Fill the raw event stream + source = ( + raw_source.combine_latest(dk_uid) + .filter(lambda x: x[1]) + .pluck(0) + # Filler returns None for resource/datum data + .starmap(filler) + .filter(lambda x: x is not None) + ) + # Get all the documents + start_docs = FromEventStream("start", (), source) + descriptor_docs = FromEventStream( + "descriptor", (), source, event_stream_name="primary" + ) + event_docs = FromEventStream( + "event", (), source, event_stream_name="primary" + ) + all_docs = event_docs.combine_latest( + start_docs, descriptor_docs, emit_on=0 + ).starmap( + lambda e, s, d: { + "raw_event": e, + "raw_start": s, + "raw_descriptor": d, + "human_timestamp": _timestampstr(s["time"]), + } + ) -fg_dark_query = start_docs.map(query_dark, db=db) -fg_dark_query.filter(lambda x: x != [] and isinstance(x, list)).sink(print) -fg_dark_query.filter(lambda x: x == []).sink(lambda x: print("No dark found!")) + # If new calibration uid invalidate our current calibration cache + wavelength = FromEventStream("start", ("bt_wavelength",), source).unique( + history=1 + ) + composition = FromEventStream("start", ("composition_string",), source) + dspacing = FromEventStream("start", ("dSpacing",), source).unique( + history=1 + ) + start_timestamp = FromEventStream("start", ("time",), source) + h_timestamp = start_timestamp.map(_timestampstr) + + bg_docs = ( + start_docs.map(query_background, db=db) + .zip(start_docs) + .starmap(temporal_prox) + .filter(lambda x: x != []) + .map(lambda x: x[0].documents(fill=True)) + .flatten() + ) -for p, descriptor in zip([pdf_pipeline, saxs_pipeline], ["primary", "saxs"]): - unique_geo.map(lambda x: p["geometry_img_shape"].lossless_buffer.clear()) + # Get bg dark + bg_dark_query = FromEventStream("start", (), bg_docs).map( + query_dark, db=db + ) - # Clear composition every start document - (start_docs.sink(lambda x: clear_combine_latest(p["iq_comp"], 1))) - composition.connect(p["composition"]) + fg_dark_query = start_docs.map(query_dark, db=db) + fg_dark_query.filter(lambda x: x != [] and isinstance(x, list)).sink(print) + fg_dark_query.filter(lambda x: x == []).sink( + lambda x: print("No dark found!") + ) + raw_source.starsink(StartStopCallback()) + + return { + "raw_source": raw_source, + "source": source, + "start_docs": start_docs, + "descriptor_docs": descriptor_docs, + "event_docs": event_docs, + "all_docs": all_docs, + "wavelength": wavelength, + "composition": composition, + "dspacing": dspacing, + "h_timestamp": h_timestamp, + "bg_dark_query": bg_dark_query, + "fg_dark_query": fg_dark_query, + } - # Calibration information - (wavelength.connect(p["wavelength"])) - (dspacing.connect(p["calibrant"])) - # TODO: pull from descriptor - ( - FromEventStream("start", ("detector",), source) - .unique(history=1) - .connect(p["detector"]) - ) +def make_spec_pipeline(descriptor="primary"): + start_docs = Stream() + source = Stream() + unique_geo = FromEventStream( + "descriptor", + ("detector_calibration_client_uid",), + source, + event_stream_name=descriptor, + ).unique(history=1) + detector = FromEventStream( + "descriptor", ("detector",), source, event_stream_name=descriptor + ).unique(history=1) - ( - FromEventStream("start", (), source) - .map(lambda x: "detector_calibration_server_uid" in x) - .connect(p["is_calibration_img"]) - ) + is_calibration_img = FromEventStream( + "descriptor", (), source, event_stream_name=descriptor + ).map(lambda x: "detector_calibration_server_uid" in x) # TODO: pull this from the descriptor # Only pass through new calibrations (prevents us from recalculating cals) - ( - FromEventStream("start", ("calibration_md",), source) - .unique(history=1) - .connect(p["geo_input"]) - ) - - # Clean out the cached darks and backgrounds on start - # so that this will run regardless of background/dark status - # note that we get the proper data (if it exists downstream) - start_docs.sink(lambda x: p["raw_background_dark"].emit(0.0)) - start_docs.sink(lambda x: p["raw_background"].emit(0.0)) - start_docs.sink(lambda x: p["raw_foreground_dark"].emit(0.0)) - - ( - FromEventStream( - "event", - ("data", image_name), - bg_dark_query.map(lambda x: x[0].documents(fill=True)).flatten(), - event_stream_name=descriptor, - ) - .map(np.float32) - .connect(p["raw_background_dark"]) - ) + geo_input = FromEventStream( + "descriptor", ("calibration_md",), source, event_stream_name=descriptor + ).unique(history=1) + bg_dark_query = Stream() + raw_background_dark = FromEventStream( + "event", + ("data", image_name), + bg_dark_query.map(lambda x: x[0].documents(fill=True)).flatten(), + event_stream_name=descriptor, + ).map(np.float32) + bg_docs = Stream() # Get background - ( - FromEventStream( - "event", - ("data", image_name), - bg_docs, - event_stream_name=descriptor, - ) - .map(np.float32) - .connect(p["raw_background"]) - ) + raw_background = FromEventStream( + "event", ("data", image_name), bg_docs, event_stream_name=descriptor + ).map(np.float32) # Get foreground dark - ( - FromEventStream( - "event", - ("data", image_name), - fg_dark_query.filter(lambda x: x != []) - .map(lambda x: x if not isinstance(x, list) else x[0]) - .map(lambda x: x.documents(fill=True)) - .flatten(), - event_stream_name=descriptor, - ) - .map(np.float32) - .connect(p["raw_foreground_dark"]) - ) + fg_dark_query = Stream() + # XXX: this is double defined, because there are two things putting data + # into this node + raw_foreground_dark1 = FromEventStream( + "event", + ("data", image_name), + fg_dark_query.filter(lambda x: x != []) + .map(lambda x: x if not isinstance(x, list) else x[0]) + .map(lambda x: x.documents(fill=True)) + .flatten(), + event_stream_name=descriptor, + ).map(np.float32) # Pull dark live if possible - # NOTE: THIS ASSUMES THAT THE DARK STREAMS ARE NAMED saxs_dark - ( - FromEventStream( - "event", - ("data", image_name), - event_stream_name="{}_{}".format(descriptor, "dark"), - ) - .map(np.float32) - .connect(p["raw_foreground_dark"]) - ) + # NOTE: THIS ASSUMES THAT THE DARK STREAMS ARE NAMED _dark + raw_foreground_dark2 = FromEventStream( + "event", + ("data", image_name), + event_stream_name="{}_{}".format(descriptor, "dark"), + ).map(np.float32) + raw_foreground_dark = raw_foreground_dark1.union(raw_foreground_dark2) # Get foreground - FromEventStream( + img_counter = FromEventStream( "event", ("seq_num",), source, stream_name="seq_num", event_stream_name=descriptor, - ).connect(p["img_counter"]) - ( - FromEventStream( - "event", - ("data", image_name), - source, - principle=True, - stream_name="raw_foreground", - event_stream_name=descriptor, - ) - .map(np.float32) - .connect(p["raw_foreground"]) ) + raw_foreground = FromEventStream( + "event", + ("data", image_name), + source, + principle=True, + stream_name="raw_foreground", + event_stream_name=descriptor, + ).map(np.float32) + + raw_source = Stream() + return { + "raw_source": raw_source, + "start_docs": start_docs, + "unique_geo": unique_geo, + "detector": detector, + "is_calibration_img": is_calibration_img, + "geo_input": geo_input, + "bg_dark_query": bg_dark_query, + "raw_background_dark": raw_background_dark, + "bg_docs": bg_docs, + "raw_background": raw_background, + "fg_dark_query": fg_dark_query, + "raw_foreground_dark": raw_foreground_dark, + "img_counter": img_counter, + "raw_foreground": raw_foreground, + } + - # TODO: save this in such a way that we can get at it for each calibration +def make_save_cal_pipeline(descriptor): + gen_geo_cal = Stream() + # TODO: move to independent saver factory # Save out calibration data to special place + h_timestamp = Stream() ( - p["gen_geo_cal"] - .pluck(0) + gen_geo_cal.pluck(0) .zip_latest(h_timestamp) .starsink( lambda x, y: _save_calib_param( @@ -212,7 +221,18 @@ ) ) ) - -raw_source.starsink(StartStopCallback()) -# raw_source.visualize(os.path.expanduser('~/mystream_new.png'), -# source_node=True) + return {"gen_geo_cal": gen_geo_cal, "h_timestamp": h_timestamp} + + +def make_pipeline(descriptors=("primary", )): + out = [] + general = make_general_pipeline() + for desc in descriptors: + raw_pipeline = make_raw_pipeline() + ev_raw_pipeline = link( + make_spec_pipeline(desc), + raw_pipeline, + make_save_cal_pipeline(desc), + ) + out.append(link(general, ev_raw_pipeline)) + return out