diff --git a/src/ess/sans/beam_center_finder.py b/src/ess/sans/beam_center_finder.py index ef9e82ef..4613be2e 100644 --- a/src/ess/sans/beam_center_finder.py +++ b/src/ess/sans/beam_center_finder.py @@ -37,6 +37,128 @@ def _xy_extrema(pos: sc.Variable) -> sc.Variable: return sc.concat([x_min, x_max, y_min, y_max], dim='extremes') +def _find_beam_center( + data, + sample_holder_radius, + sample_holder_arm_width, +): + ''' + Each iteration the center of mass of the remaining intensity is computed + and assigned to be the current beam center guess ``c``. + Then three symmetrical masks are created to make sure that the remaining intensity + distribution does not extend outside of the detector and that the sample holder + does not make the remaining intensity asymmetrical. + + The three masks are: + + - one "outer" circular mask with radius less than the minimal distance + from the current beam center guess to the border of the detector + - one "inner" circular mask with radius larger than the sample holder + - one "arm" rectangular mask with width wider than the sample holder arm + + The "outer" mask radius is found from the detector size. + The "inner" mask radius is supplied by the caller. + The "arm" mask slope is determined by the direction of minimum intensity + around the current beam center guess, the "arm" mask width is an argument + supplied by the caller. + ''' + m = data.copy() + m.masks.clear() + s = m.bins.sum() + + for i in range(20): + c = (s.coords['position'] * sc.values(s)).sum() / sc.values(s).sum() + d = s.coords['position'] - c.data + + outer = 0.9 * min( + sc.abs(d.fields.x.min()), + sc.abs(d.fields.x.max()), + sc.abs(d.fields.y.min()), + sc.abs(d.fields.y.max()), + ) + s.masks['_outer'] = d.fields.x**2 + d.fields.y**2 > outer**2 + s.masks['_inner'] = d.fields.x**2 + d.fields.y**2 < sample_holder_radius**2 + + if i > 10: + s.coords['th'] = sc.where( + d.fields.x > sc.scalar(0.0, unit='m'), + sc.atan2(y=d.fields.y, x=d.fields.x), + sc.scalar(sc.constants.pi.value, unit='rad') + - sc.atan2(y=d.fields.y, x=-d.fields.x), + ) + h = s.drop_masks(['_arm'] if '_arm' in s.masks else []).hist(th=100) + th = h.coords['th'][np.argmin(h.values)] + + slope = sc.tan(th) + s.masks['_arm'] = ( + d.fields.y < slope * d.fields.x + sample_holder_arm_width + ) & (d.fields.y > slope * d.fields.x - sample_holder_arm_width) + return c.data + + +def beam_center_from_center_of_mass_alternative( + workflow, + sample_holder_radius=None, + sample_holder_arm_width=None, +) -> BeamCenter: + """ + Estimate the beam center via the center-of-mass of the data counts. + + We are assuming the intensity distribution is symmetric around the beam center. + Even if the intensity distribution is symmetric around the beam center + the intensity distribution in the detector might not be, because + + - the detector has a finite extent, + - and there is a sample holder covering part of the detector. + + To deal with the limited size of the detector a mask can be applied that is small + enough so that the the remaining intensity is entirely inside the detector. + To deal with the sample holder we can mask the region of the detector that the + sample holder covers. + + But to preserve the symmetry of the intensity around the beam center the masks + also need to be symmetical around the beam center. + The problem is, the beam center is unknown. + However, if the beam center was known to us, and we applied symmetrical masks + that covered the regions of the detector where the intensity distribution is + asymmetrical, + then the center of mass of the remaining intensity would equal the beam center. + Conversely, if we apply symmetrical masks around a point that is not the beam center + the center of mass of the remaining intensity will (likely) not equal the original + point. + This suggests the beam center can be found using a fixed point iteration where each + iteration we + + 1. Compute the center of mass of the remaining intensity and assign it to be our + current estimate of the beam center. + 2. Create symmetrical masks around the current estimate of the beam center. + 3. Repeat from 1. until convergence. + + Parameters + ---------- + workflow: + The reduction workflow to compute MaskedData[SampleRun]. + + Returns + ------- + : + The beam center position as a vector. + """ + + if sample_holder_radius is None: + sample_holder_radius = sc.scalar(0.05, unit='m') + if sample_holder_arm_width is None: + sample_holder_arm_width = sc.scalar(0.02, unit='m') + + try: + beam_center = workflow.compute(BeamCenter) + except sciline.UnsatisfiedRequirement: + beam_center = sc.vector([0.0, 0.0, 0.0], unit='m') + workflow[BeamCenter] = beam_center + data = workflow.compute(MaskedData[SampleRun]) + return _find_beam_center(data, sample_holder_radius, sample_holder_arm_width) + + def beam_center_from_center_of_mass(workflow: sciline.Pipeline) -> BeamCenter: """ Estimate the beam center via the center-of-mass of the data counts.