diff --git a/news/flatfield b/news/flatfield new file mode 100644 index 0000000..d037671 --- /dev/null +++ b/news/flatfield @@ -0,0 +1,13 @@ +**Added:** + +* Flatfield data analysis pipeline + +**Changed:** None + +**Deprecated:** None + +**Removed:** None + +**Fixed:** None + +**Security:** None diff --git a/xpdan/pipelines/flatfield.py b/xpdan/pipelines/flatfield.py new file mode 100644 index 0000000..18586b3 --- /dev/null +++ b/xpdan/pipelines/flatfield.py @@ -0,0 +1,115 @@ +import os + +from bluesky.callbacks.broker import LiveImage +from shed.translation import FromEventStream +from xpdan.callbacks import StartStopCallback +from xpdan.pipelines.pipeline_utils import (_timestampstr, + Filler) +from xpdconf.conf import glbl_dict +from xpdconf.conf import XPD_SHUTTER_CONF +from xpdtools.calib import _save_calib_param +from xpdtools.pipelines.flatfield import * +from shed.translation import ToEventStream +from xpdtools.tools import z_score_image, overlay_mask + +image_name = glbl_dict['image_field'] +shutter_name = glbl_dict['shutter_field'] +db = glbl_dict['exp_db'] +calibration_md_folder = {'folder': 'xpdAcq_calib_info.yml'} + +filler = Filler(db=db) +# Build the general pipeline from the raw_pipeline +raw_source = Stream(stream_name='raw source') + +# Fill the raw event stream +source = (raw_source + # Filler returns None for resource/datum data + .starmap(filler).filter(lambda x: x is not None)) + +# Get all the documents +start_docs = FromEventStream('start', (), source) +descriptor_docs = FromEventStream('descriptor', (), source, + event_stream_name='primary') +event_docs = FromEventStream('event', (), source, event_stream_name='primary') +all_docs = (event_docs + .combine_latest(start_docs, descriptor_docs, emit_on=0) + .starmap(lambda e, s, d: {'raw_event': e, 'raw_start': s, + 'raw_descriptor': d, + 'human_timestamp': _timestampstr( + s['time'])})) + +# If new calibration uid invalidate our current calibration cache +(FromEventStream('start', ('detector_calibration_client_uid',), source) + .unique(history=1) + .map(lambda x: geometry_img_shape.lossless_buffer.clear())) + +# Calibration information +(FromEventStream('start', ('bt_wavelength',), source) + .unique(history=1) + .connect(wavelength)) +(FromEventStream('start', ('dSpacing',), source) + .unique(history=1) + .connect(calibrant)) +(FromEventStream('start', ('detector',), source) + .unique(history=1) + .connect(detector)) + +(FromEventStream('start', (), source). + map(lambda x: 'detector_calibration_server_uid' in x). + connect(is_calibration_img)) +# Only pass through new calibrations (prevents us from recalculating cals) +(FromEventStream('start', ('calibration_md',), source). + unique(history=1). + connect(geo_input)) + +start_timestamp = FromEventStream('start', ('time',), source) + +# Clean out the cached darks and backgrounds on start +# so that this will run regardless of background/dark status +# note that we get the proper data (if it exists downstream) +start_docs.sink(lambda x: raw_background_dark.emit(0.0)) +start_docs.sink(lambda x: raw_background.emit(0.0)) +start_docs.sink(lambda x: raw_foreground_dark.emit(0.0)) + +# Get foreground dark +((FromEventStream('event', ('data', image_name), raw_source, + event_stream_name='dark') + .map(np.float32) + .connect(raw_foreground_dark))) + +# Get foreground +FromEventStream('event', ('seq_num',), source, stream_name='seq_num', + event_stream_name='primary' + ).connect(img_counter) +(FromEventStream('event', ('data', image_name), source, principle=True, + event_stream_name='primary', + stream_name='raw_foreground') + .map(np.float32) + .connect(raw_foreground)) + +# Save out calibration data to special place +h_timestamp = start_timestamp.map(_timestampstr) +(gen_geo_cal.pluck(0) + .zip_latest(h_timestamp) + .starsink(lambda x, y: _save_calib_param(x, y, os.path.join( + glbl_dict['config_base'], glbl_dict['calib_config_name'])))) + +raw_source.starsink(StartStopCallback()) + +ave_ff.map(np.nan_to_num).sink( + LiveImage('image', cmap='viridis', + limit_func=lambda x: (np.nanpercentile(x, .1), + np.nanpercentile(x, 99.9)), + # norm=SymLogNorm(.1), + window_title='percent off').update) +L = ave_ff.sink_to_list() +z_score = ( + pol_corrected_img. + combine_latest(binner, emit_on=0). + starmap(z_score_image, stream_name='z score'). + combine_latest(mask, emit_on=0).starmap(overlay_mask)) + +# Zscore +z_score_plot = ToEventStream(z_score, ('z_score',)).starsink( + LiveImage('z_score', cmap='viridis', window_title='z score', + limit_func=lambda im: (-2, 2)), stream_name='z score vis') \ No newline at end of file diff --git a/xpdan/tests/utils.py b/xpdan/tests/utils.py index a4fb225..fbe396d 100644 --- a/xpdan/tests/utils.py +++ b/xpdan/tests/utils.py @@ -93,7 +93,7 @@ def insert_imgs(RE, reg, n, shape, save_dir, **kwargs): reg=reg) light_det = sim.SynSignalWithRegistry(name='pe1_image', func=lambda: (np.random.random( - shape) * 65535) .astype('uint16'), + shape) * 65535).astype('uint16'), reg=reg) beamtime_uid = str(uuid4()) base_md = dict(beamtime_uid=beamtime_uid,