diff --git a/apsuite/optimization/__init__.py b/apsuite/optimization/__init__.py index 64134ea4..42d1744c 100644 --- a/apsuite/optimization/__init__.py +++ b/apsuite/optimization/__init__.py @@ -1,3 +1,4 @@ +""".""" from .genetic_algorithm import GA from .pso import PSO from .scanning import SimpleScan diff --git a/apsuite/simullib/touschek_loss_spots/__init__.py b/apsuite/simullib/touschek_loss_spots/__init__.py new file mode 100644 index 00000000..36a5003c --- /dev/null +++ b/apsuite/simullib/touschek_loss_spots/__init__.py @@ -0,0 +1,4 @@ +""".""" +from . import functions, tous_analysis + +__all__ = ["functions", "tous_analysis"] diff --git a/apsuite/simullib/touschek_loss_spots/functions.py b/apsuite/simullib/touschek_loss_spots/functions.py new file mode 100644 index 00000000..3a6fe39d --- /dev/null +++ b/apsuite/simullib/touschek_loss_spots/functions.py @@ -0,0 +1,575 @@ +"""Functions_for_TousAnalysis.""" +import matplotlib.pyplot as _plt +import numpy as _np +import pyaccel as _pyaccel +import scipy.integrate as _scyint +import scipy.special as _special +from mathphys.beam_optics import beam_rigidity as _beam_rigidity + + +def calc_amp(acc, energy_offsets, hmax, hmin): + """Calculates the amplitudes and gets the physical limitants. + + acc = accelerator model. + energy_offsets = energy deviation for calculate physical limitants. + hmax = horizontal max apperture. + hmin = horizontal min apperture. + """ + a_def = _np.zeros(energy_offsets.size) + indices = _np.zeros(energy_offsets.size) + try: + for idx, delta in enumerate(energy_offsets): + twi, *_ = _pyaccel.optics.calc_twiss( + accelerator=acc, energy_offset=delta, indices="closed" + ) + if _np.any(_np.isnan(twi[0].betax)): + raise _pyaccel.optics.OpticsException("error") + rx = twi.rx + betax = twi.betax + + a_sup = (hmax - rx) ** 2 / betax + a_inf = (hmin - rx) ** 2 / betax + + a_max = _np.minimum(a_sup, a_inf) + idx_min = _np.argmin(a_max) + indices[idx] = idx_min + a_def[idx] = a_max[idx_min] + except ( + _pyaccel.optics.OpticsException, + _pyaccel.tracking.TrackingException, + ): + pass + return _np.sqrt(a_def), indices + + +def track_eletrons_d( + deltas, n_turn, element_idx, model, pos_x=1e-5, pos_y=3e-6 +): + """Tracking simulation for touschek scattering that ocorred in element_idx. + + model = accelerator model. + deltas = energy deviation. + n_turn = number of turns desired. + pos_x = small pertubation in x. + pos_y = small pertubation in y. + """ + orb = _pyaccel.tracking.find_orbit6(model, indices=[0, element_idx]) + orb = orb[:, 1] + + rin = _np.zeros((6, deltas.size)) + rin += orb[:, None] + rin[0] += pos_x + rin[2] += pos_y + rin[4] += deltas + + track = _pyaccel.tracking.ring_pass( + model, + rin, + nr_turns=n_turn, + turn_by_turn=True, + element_offset=element_idx, + parallel=True, + ) + + _, _, turn_lost, element_lost, _ = track + + dic = {} + t_lost, e_lost, delta = [], [], [] + + for i, item in enumerate(turn_lost): + if item == n_turn and element_lost[i] == element_idx: + pass + else: + t_lost.append(item) + e_lost.append(element_lost[i]) + delta.append(deltas[i]) + + dic["turn_lost"] = _np.array(t_lost) + dic["element_lost"] = _np.intp(e_lost) + dic["energy_deviation"] = _np.array(delta) + + return dic + + +def plot_track_d( + acc, + dic_tracked, + index_list, + offs_list, + par, + element_idx, + accep, + delt, + f_dens, +): + """Plot the tracking results for a given s position. + + acc = accelerator model. + dic_tracked = dictionary of informations provided by tracking. + index_list = defines the physical limitants from linear model. + offs_list = list of e_dev used in tracking. + par = defines the analysis for positive or negative e_dev. + element_idx = defines initial conditions for tracking. + accep = touschek energy acceptance. + delt = energy deviation for the touschek loss rate density. + fdens = touschek loss rate density. + """ + # ---------------- + + turn_lost = dic_tracked["turn_lost"] + element_lost = dic_tracked["element_lost"] + deltas = dic_tracked["energy_deviation"] * 1e2 + + cm = 1 / 2.54 # 'poster' + twi0, *_ = _pyaccel.optics.calc_twiss(acc, indices="open") + betax = twi0.betax + betax = betax * (1 / 5) + spos = _pyaccel.lattice.find_spos(acc) + + fig = _plt.figure(figsize=(38.5 * cm, 18 * cm)) + gs = _plt.GridSpec( + 1, + 3, + left=0.1, + right=0.98, + wspace=0.03, + top=0.95, + bottom=0.1, + width_ratios=[2, 3, 8], + ) + a1 = fig.add_subplot(gs[0, 0]) + a2 = fig.add_subplot(gs[0, 1], sharey=a1) + a3 = fig.add_subplot(gs[0, 2], sharey=a1) + a1.tick_params(axis="both", labelsize=18) + a2.tick_params( + axis="y", + which="both", + left=False, + right=False, + labelleft=False, + labelsize=18, + ) + a3.tick_params( + axis="y", + which="both", + left=False, + right=False, + labelleft=False, + labelsize=18, + ) + a2.tick_params(axis="both", labelsize=18) + a3.tick_params(axis="both", labelsize=18) + a1.set_title(r"$\delta \times scat. rate$", fontsize=20) + a2.set_title(r"$\delta \times$ lost turn", fontsize=20) + a3.set_title(r"tracking ", fontsize=20) + + a1.set_xlabel(r"$\tau _T$ [1/s]", fontsize=25) + a2.set_xlabel(r"number of turns", fontsize=25) + a3.set_xlabel(r"$s$ [m]", fontsize=25) + + a1.grid(True, alpha=0.5, ls="--", color="k", axis="y") + a2.grid(True, alpha=0.5, ls="--", color="k", axis="y") + a3.grid(True, alpha=0.5, ls="--", color="k", axis="y") + _plt.subplots_adjust(wspace=0.1) + + if "pos" in par: + a1.set_ylabel(r"positive $\delta$ [%]", fontsize=25) + a3.plot(spos[element_lost], deltas, "r.", label="lost pos. (track)") + acp_s = accep[1][element_idx] + indx = _np.argmin(_np.abs(offs_list - acp_s)) + a3.plot( + spos[index_list][:indx], + offs_list[:indx] * 1e2, + "b.", + label=r"accep. limit", + alpha=0.25, + ) + + elif "neg" in par: + a1.set_ylabel(r"negative $\delta$ [%]", fontsize=25) + a3.plot(spos[element_lost], deltas, "r.", label="lost pos. (track)") + acp_s = accep[0][element_idx] + indx = _np.argmin(_np.abs(offs_list + acp_s)) + a3.plot( + spos[index_list][:indx], + -offs_list[:indx] * 1e2, + "b.", + label=r"accep. limit", + alpha=0.25, + ) + + a1.plot(f_dens, delt, label="Scattering touschek rate", color="black") + a2.plot(turn_lost, deltas, "k.") + stri = f"{acc[element_idx].fam_name}, ({spos[element_idx]:.2f} m)" + a3.plot(spos[element_idx], 0, "ko", label=stri) + a3.plot( + spos, _np.sqrt(betax), color="orange", label=r"$ \sqrt{\beta_x} $" + ) + _plt.hlines( + acp_s * 1e2, + spos[0], + spos[-1], + color="black", + linestyles="dashed", + alpha=0.5, + ) + _pyaccel.graphics.draw_lattice(acc, offset=-0.5, height=0.5, gca=True) + a3.legend(loc="upper right", ncol=1, fontsize=15) + fig.show() + + +# def t_list(elmnt): # não sei se isso é útil +# """.""" +# #this condition significates that if the input is only a number, then +# #the fucntion transforms it into a list to avoid errors. +# if isinstance(elmnt,(float,int)): +# return [elmnt] +# else: +# return list(elmnt) + + +def f_function_arg_mod(kappa, kappam, b1_, b2_, norm): + """Returns the touschek loss rate density. + + kappa = substitution that turns easier calculations. + kappam = kappa minimum that defines the lower integration limit. + b1_ = optical parameter obtained by accelerator model. + b2_ = optical parameter obtained by accelerator model. + norm = defines if the function will returns density or not. + """ + tau = (_np.tan(kappa) ** 2)[:, None] + taum = _np.tan(kappam) ** 2 + beta = _beam_rigidity(energy=3)[2] + ratio = tau / taum / (1 + tau) + arg = (2 * tau + 1) ** 2 * (ratio - 1) / tau + arg += tau - _np.sqrt(tau * taum * (1 + tau)) + arg -= (2 + 1 / (2 * tau)) * _np.log(ratio) + arg *= _np.sqrt(1 + tau) + # arg *= beta* _np.cos(kappa)[:, None]**2 + arg *= 1 / (2 * _np.sqrt(tau)) * 1 / (1 + tau) + arg *= 2 * beta * _np.sqrt(tau) + + bessel = _np.exp(-(b1_ - b2_) * tau) * _special.i0e(b2_ * tau) + + if norm: + pass + + else: + arg *= 2 * _np.sqrt(_np.pi * (b1_**2 - b2_**2)) * taum + + return arg * bessel + + +def get_scaccep(acc, accep): + """Returns the s position and the energy acceptance every 10 cm. + + acc = accelerator model used to acquire the optical parameters. + accep = positive and negative energy acceptance. + """ + spos = _pyaccel.lattice.find_spos(acc, indices="closed") + + npt = int((spos[-1] - spos[0]) / 0.1) + scalc = _np.linspace(spos[0], spos[-1], npt) + daccpp = _np.interp(scalc, spos, accep[1]) + daccpn = _np.interp(scalc, spos, accep[0]) + + return scalc, daccpp, daccpn + + +# Como discutido no dia 23.08.2023 corte para a definição da densidade +# de espalhamento Touschek ocorre na aceitância de energia para deter +# minado ponto. + + +def norm_cutacp(acc, lsps, npt, accep, norm=False): + """Plot loss rate for a list of s positions. + + acc = accelerator model used to acquire the optical parameters. + lsps = list of s positions to calculate loss density. + npt = number of points to a linspace array. + accep = positive and negative energy acceptance. + norm = parameter to define if the function will return density. + """ + dic = {} + + scalc, daccpp, daccpn = get_scaccep(acc, accep) + beta = _beam_rigidity(energy=3)[2] + + taum_p = (beta * daccpp) ** 2 + taum_n = (beta * daccpn) ** 2 + kappam_p = _np.arctan(_np.sqrt(taum_p)) + kappam_n = _np.arctan(_np.sqrt(taum_n)) + + ltime = _pyaccel.lifetime.Lifetime(acc) + b1 = ltime.touschek_data["touschek_coeffs"]["b1"] + b2 = ltime.touschek_data["touschek_coeffs"]["b2"] + + fdens_p, fdens_n = [], [] + deltasp, deltasn = [], [] + + for _, s in enumerate(lsps): + idx = _np.argmin(_np.abs(scalc - s)) + kappam_p0 = kappam_p[idx] + kappam_n0 = kappam_n[idx] + + kappap = _np.linspace(kappam_p0, _np.pi / 2, npt) + deltap = 1 / beta * _np.tan(kappap) + kappan = _np.linspace(kappam_n0, _np.pi / 2, npt) + deltan = 1 / beta * _np.tan(kappan) + + y_p = f_function_arg_mod( + kappa=kappap, kappam=kappam_p0, b1_=b1[idx], b2_=b2[idx], norm=norm + ).squeeze() + y_n = f_function_arg_mod( + kappa=kappan, kappam=kappam_n0, b1_=b1[idx], b2_=b2[idx], norm=norm + ).squeeze() + norm_facp = _scyint.trapz(y_p, deltap) + norm_facn = _scyint.trapz(y_n, deltan) + + y_p /= norm_facp + y_n /= norm_facn + + # eliminating the negative values from array + indp = _np.where(y_p < 0)[0] + indn = _np.where(y_n < 0)[0] + y_p[indp] = 0 + y_n[indn] = 0 + + fdens_p.append(y_p) + fdens_n.append(y_n) + deltasp.append(deltap) + deltasn.append(deltan) + + fdens_p = _np.array(fdens_p) + fdens_n = _np.array(fdens_n) + deltasp = _np.array(deltasp) + deltasn = _np.array(deltasn) + + dic["fdensp"] = fdens_p + dic["fdensn"] = fdens_n + dic["deltasp"] = deltasp + dic["deltasn"] = deltasn + + return dic + + +def create_particles(cov_matrix, num_part): + """Creates the beam to realize the Monte-Carlo simulation. + + cov_matrix = covariant matrix to generate beam distribution. + num_part = particles' number for M.C. simulation. + """ + # permute indices to change the order of the columns: + # [rx, px, ry, py, de, dl]^T -> [px, py, de, rx, ry, dl]^T + idcs = [1, 3, 4, 0, 2, 5] + # [px, py, de, rx, ry, dl]^T -> [rx, px, ry, py, de, dl]^T + idcs_r = [3, 0, 4, 1, 2, 5] + + cov_matrix = cov_matrix[:, idcs][idcs, :] + + sig_xx = cov_matrix[:3, :3] + sig_xy = cov_matrix[:3, 3:] + sig_yx = cov_matrix[3:, :3] + sig_yy = cov_matrix[3:, 3:] + inv_yy = _np.linalg.inv(sig_yy) + + part1 = _np.random.Generator.multivariate_normal( + _np.zeros(6), cov_matrix, num_part + ).T + + part2 = part1.copy() + + vec_a = part2[3:] + new_mean = (sig_xy @ inv_yy) @ vec_a + new_cov = sig_xx - sig_xy @ inv_yy @ sig_yx + + part2[:3] = _np.random.Generator.multivariate_normal( + _np.zeros(3), new_cov, num_part + ).T + part2[:3] += new_mean + + part2 = part2[idcs_r, :] + part1 = part1[idcs_r, :] + + return part1, part2 + + +def get_cross_section_distribution(psim, npts=3000): + """Calculates the Moller's cross section. + + psim = minimum scattering angle (lower integration limitant). + npts = number of point for a logspace function. + + if beta_bar equals zero then the total cross section diverges when using + piwinski's relation for the total cross section by the 1/beta_bar + dependence. Therefore, is necessary to neglect this factor contribution. + """ + beta_bar = 0 + psi = _np.logspace(_np.log10(_np.pi / 2 - psim), 0, npts) + psi = _np.pi / 2 - psi + psi = psi[::-1] + cpsi = _np.cos(psi) + cross = _np.zeros(cpsi.size) + if beta_bar > 1e-19: + cross += (1 + 1 / beta_bar**2) ** 2 * ( + 2 * (1 + cpsi**2) / cpsi**3 - 3 / cpsi + ) + cross += 4 / cpsi + 1 + cross *= _np.sin(psi) + cross = _scyint.cumtrapz(cross, x=psi, initial=0.0) + cross /= cross[-1] + return psi, cross + + +def cross_section_draw_samples(psim, num_part): + """Interpolates the cross section effect. + + psim = minimum scat. angle that remains the electrons' orbit stable. + num_part = number of particles for simulation. + """ + psi, cross = get_cross_section_distribution(psim) + crs = _np.random.Generator.rand(num_part) + return _np.interp(crs, cross, psi) + + +def scatter_particles(part1, part2, de_min): + """M.C. simulation of the Touschek scattering process. + + part1 = 1st partcile's coordinates. + part2 = 2nd partcile's coordinates. + de_min = mimimum energy deviation. + """ + gamma = 3e9 / 0.510e6 + beta = _np.sqrt(1 - 1 / gamma / gamma) + num_part = part1.shape[1] + + xl1, yl1, de1 = part1[1], part1[3], part1[4] + xl2, yl2, de2 = part2[1], part2[3], part2[4] + + # calculating the changing base matrix for every particle + pz1 = _np.sqrt((1 + de1) ** 2 - xl1 * xl1 - yl1 * yl1) + pz2 = _np.sqrt((1 + de2) ** 2 - xl2 * xl2 - yl2 * yl2) + + # desired vectors to construct the transformation matrix + p_1 = _np.vstack([xl1, yl1, pz1]) + p_2 = _np.vstack([xl2, yl2, pz2]) + + # new coordinate system + p_j = p_1 + p_2 + p_j /= _np.linalg.norm(p_j, axis=0) # sum + p_k = _np.cross(p_1.T, p_2.T).T + p_k /= _np.linalg.norm(p_k, axis=0) # cross product + p_l = _np.cross(p_j.T, p_k.T).T + + jkl2xyz = _np.zeros([num_part, 3, 3]) + jkl2xyz[:, :, 0] = p_j.T + jkl2xyz[:, :, 1] = p_k.T + jkl2xyz[:, :, 2] = p_l.T + + theta = xl1 - xl2 + zeta = yl1 - yl2 + chi = _np.sqrt(zeta**2 + theta**2) / 2 + + # draw the scattering angles from uniform distribution: + phi = _np.random.Generator.rand(num_part) * 2 * _np.pi + psi = _np.random.Generator.rand(num_part) * _np.pi / 2 + + # draw the psi angle from the cross section probability density: + # we need to define a maximum angle to normalize the cross section + # distribution because it diverges when the full interval [0, pi/2] + # is considered. + # To estimate this angle we define a threshold for the minimum energy + # deviation we care about (de_min) and consider the worst case scenario + # in terms of chi that could create such scattering. + # The worst case happens when chi is maximum, because in this way a small + # scattering angle would create a larger energy deviation. + # We considered here a chi twice as large as the maximum chi draw from + # the particles distribution. + # This method of doing things should be tested and thought about very + # carefully, though. + psim = _np.arccos(de_min / gamma / (chi.max() * 2)) + fact = psim * 2 / _np.pi + print(psim * 2 / _np.pi) + psi = cross_section_draw_samples(psim, num_part) + + # new momentum in j,k,l (eq. 16 of Piwinski paper) + gammat = gamma / _np.sqrt(1 + beta * beta * gamma * gamma * chi * chi) + dp_ = _np.sin(chi[None, :]) * _np.vstack( + [ + gammat * _np.cos(psi), + _np.sin(psi) * _np.cos(phi), + _np.sin(psi) * _np.sin(phi), + ] + ) + p_prime1 = _np.zeros((3, chi.size)) + p_prime1[0] = _np.cos(chi) + p_prime2 = p_prime1.copy() + p_prime1 += dp_ + p_prime2 -= dp_ + + # returning to momentum x,y,z + pnew_1 = (jkl2xyz @ p_prime1.T[:, :, None]).squeeze().T + pnew_2 = (jkl2xyz @ p_prime2.T[:, :, None]).squeeze().T + + delta1 = _np.linalg.norm(pnew_1, axis=0) - 1 + delta2 = _np.linalg.norm(pnew_2, axis=0) - 1 + + part1_new = part1.copy() + part1_new[1] = pnew_1[0] + part1_new[3] = pnew_1[1] + part1_new[4] = delta1 + + part2_new = part2.copy() + part2_new[1] = pnew_2[0] + part2_new[3] = pnew_2[1] + part2_new[4] = delta2 + + return part1_new, part2_new, fact + + +def histgms(acc, l_spos, num_part, accep, de_min, cutaccep): + """Calculates the touschek scattering densities. + + l_spos = list of positions. + num_part = number of particles for M.C. sim. + accep = touschek energy acceptance. + de_min = minimum energy deviation for. + cutaccep = defines the cutoff on the energy acceptance. + """ + envelopes = _pyaccel.optics.calc_beamenvelope(acc) + spos = _pyaccel.lattice.find_spos(acc, indices="closed") + scalc, daccpp, daccpn = get_scaccep(acc, accep) + + histsp1, histsp2, indices = [], [], [] + + for iten in l_spos: + idx_model = _np.argmin(_np.abs(spos - iten)) + idx = _np.argmin(_np.abs(scalc - iten)) + indices.append(idx_model) + + env = envelopes[idx_model] + + part1, part2 = create_particles(env, num_part) + part1_new, part2_new, _ = scatter_particles(part1, part2, de_min) + + if cutaccep: # if true the cutoff is the accp at the s + acpp, acpn = daccpp[idx], daccpn[idx] + check1 = acpp - part1_new[4] + check2 = -(acpn - part2_new[4]) + + ind1 = _np.intp(_np.where(check1 < 0)[0]) + ind2 = _np.intp(_np.where(check2 < 0)[0]) + + histsp1.append(part1_new[4][ind1] * 1e2) + histsp2.append(part2_new[4][ind2] * 1e2) + + else: # ximenes cutoff + ind1 = _np.intp(_np.where(part1_new[4] >= 0.01)[0]) + ind2 = _np.intp(_np.where(part2_new[4] <= -0.01)[0]) + + histsp1.append(part1_new[4][ind1] * 1e2) + histsp2.append(part2_new[4][ind2] * 1e2) + + indices = _np.array(indices) + + return histsp1, histsp2, indices diff --git a/apsuite/simullib/touschek_loss_spots/tous_analysis.py b/apsuite/simullib/touschek_loss_spots/tous_analysis.py new file mode 100644 index 00000000..efae9fb9 --- /dev/null +++ b/apsuite/simullib/touschek_loss_spots/tous_analysis.py @@ -0,0 +1,739 @@ +"""tous_analysis.""" + +import matplotlib.pyplot as _mplt +import numpy as _np +import pandas as _pd +import pyaccel as _pa +import scipy.integrate as _sp_int +from mathphys.beam_optics import beam_rigidity as _beam_rigidity +from mathphys.functions import load as _load, save as _save +from pymodels import si as _si + +from . import functions as _to_fu + + +class TousAnalysis: + """Class for the analysis of electron losses along the ring.""" + + def __init__( + self, accelerator, energies_off=None, beam_energy=None, n_turns=7 + ): + """Parameters necessary to define the class.""" + if energies_off is None: + energy_off = _np.linspace(0, 0.046, 460) # physical limitants + deltas = _np.linspace(0, 0.1, 400) # energy off for tracking + + if beam_energy is None: # defining beta factor + beam_energy = _beam_rigidity(energy=3)[2] + beta = beam_energy + + self._model_fit = accelerator + self._model = _si.create_accelerator() + + self._sc_accps = None + self._accep = None + self._inds_pos = None + self._inds_neg = None + self._amps_pos = None + self._amps_neg = None + self.num_part = 50000 + self.energy_dev_min = 1e-4 + + self.beta = beta # beta factor + self.h_pos = _pa.lattice.get_attribute( + self._model_fit, "hmax", indices="closed" + ) + self.h_neg = _pa.lattice.get_attribute( + self._model_fit, "hmin", indices="closed" + ) + self.ltime = _pa.lifetime.Lifetime(self._model_fit) + self._off_energy = energy_off # (linear model) en_dev to amplitudes + self.nturns = n_turns + self._deltas = deltas + self.spos = _pa.lattice.find_spos(self._model_fit, indices="closed") + self.scraph_inds = _pa.lattice.find_indices( + self._model, "fam_name", "SHVC" + ) + self.scrapv_inds = _pa.lattice.find_indices( + self._model, "fam_name", "SVVC" + ) + + @property + def accelerator(self): + """.""" + return self._model_fit + + @accelerator.setter + def accelerator(self, new_model): + """.""" + self._model_fit = new_model + + @property + def nom_model(self): + """Defines nominal model. + + Some calculus involves nominal model without coupling and vertical + dispersion corretion. + """ + return self._model + + @property + def accep(self): + """Defines Touschek energy acceptance.""" + if self._accep is None: + self._accep = _pa.optics.calc_touschek_energy_acceptance( + self.accelerator + ) + return self._accep + + @property + def s_calc(self): + """Defines property. + + Defines s position and get the energy accpetance both at each 10 + meters. + """ + if self._sc_accps is None: + self._sc_accps = _to_fu.get_scaccep(self.accelerator, self.accep) + return self._sc_accps + + @property + def linear_lim_indices(self): + """Defines 4 properties. + + Positive and negative amplitudes. + physical limitants for positive and negative e_dev + """ + + try: + self._inds_pos = _load("phy_lim_pos.pickle") + self._inds_neg = _load("phy_lim_neg.pickle") + self._amp_pos = _load("amps_pos.pickle") + self._amp_neg = _load("amps_neg.pickle") + except Exception: + self._model.cavity_on = False + self._model.radiation_on = False + + self._amps_pos, self._inds_pos = _to_fu.calc_amp( + self._model, self.off_energy, self.h_pos, self.h_neg + ) + + self._amps_neg, self._inds_neg = _to_fu.calc_amp( + self._model, -self.off_energy, self.h_pos, self.h_neg + ) + + _save(data=self._inds_pos, fname="phy_lim_pos.pickle") + _save(data=self._inds_neg, fname="phy_lim_neg.pickle") + _save(data=self._amps_pos, fname="amps_pos.pickle") + _save(data=self._amps_neg, fname="amps_neg.pickle") + + return self._inds_pos, self._inds_neg + + @property + def off_energy(self): + """.""" + return self._off_energy + + @off_energy.setter + def off_energy(self, accep): + """If necessary redefines the e_dev for tracking. + + The property defines the cutoof e_dev with tous_accep + accep = new touschek acceptance. + """ + accep_pos, accep_neg = accep + accep_lim = _np.max(_np.maximum(accep_pos, _np.abs(accep_neg))) + steps = int(accep_lim * 10000) # choosen to be the number of steps + self._off_energy = _np.linspace(0, accep_lim, steps) + + @property + def deltas(self): + """.""" + return self._deltas + + @deltas.setter + def deltas(self, dev_percent, steps=400): + """Redefines the e_dev for tracking. + + dev_percent = energy deviation in percents [%]. + """ + dev_percent /= 100 + self._deltas = _np.linspace(0, dev_percent, steps) + + # the four properties defining below are to be static + @property + def inds_pos(self): + """.""" + return self._inds_pos + + @property + def inds_neg(self): + """.""" + return self._inds_neg + + @property + def amp_pos(self): + """.""" + return self._amps_pos + + @property # negative amplitudes from linear model + def amp_neg(self): + """.""" + return self._amps_neg + + def get_amps_idxs(self): # Defines various parameters + """Defines 3 self params at same time.""" + return self.linear_lim_indices, self.accep, self.s_calc + + def set_vchamber_scraper(self, vchamber): + """Function for setting the vchamber apperture.""" + model = self._model + scph_inds = self.scraph_inds + scpv_inds = self.scrapv_inds + + for iten in scph_inds: + model[iten].hmin = vchamber[0] + model[iten].hmax = vchamber[1] + for iten in scpv_inds: + model[iten].vmin = vchamber[2] + model[iten].vmax = vchamber[3] + + def _single_pos_track(self, single_spos, par): + """Single position tracking.""" + self._model.cavity_on = True + self._model.radiation_on = True + self._model.vchamber_on = True + s = self.spos + + index = _np.argmin(_np.abs(s - single_spos)) + if "pos" in par: + res = _to_fu.track_eletrons_d( + self.deltas, + self.nturns, + index, + self._model, + pos_x=1e-5, + pos_y=3e-6, + ) + elif "neg" in par: + res = _to_fu.track_eletrons_d( + -self.deltas, + self.nturns, + index, + self._model, + pos_x=1e-5, + pos_y=3e-6, + ) + + return res + + def _get_weighting_tous(self, single_spos, npt=5000): + """.""" + scalc, daccp, daccn = _to_fu.get_scaccep(self.accelerator, self.accep) + bf = self.beta # bf:beta factor + lt = self.ltime + b1 = lt.touschek_data["touschek_coeffs"]["b1"] + b2 = lt.touschek_data["touschek_coeffs"]["b2"] + + taup, taun = (bf * daccp) ** 2, (bf * daccn) ** 2 + idx = _np.argmin(_np.abs(scalc - single_spos)) + taup_0, taun_0 = taup[idx], taun[idx] + kappap_0 = _np.arctan(_np.sqrt(taup_0)) + kappan_0 = _np.arctan(_np.sqrt(taun_0)) + + kappa_pos = _np.linspace(kappap_0, _np.pi / 2, npt) + kappa_neg = _np.linspace(kappan_0, _np.pi / 2, npt) + + deltp = _np.tan(kappa_pos) / bf + deltn = _np.tan(kappa_neg) / bf + fdensp = _to_fu.f_function_arg_mod( + kappa_pos, kappap_0, b1[idx], b2[idx], norm=False + ) + + fdensn = _to_fu.f_function_arg_mod( + kappa_neg, kappan_0, b1[idx], b2[idx], norm=False + ) + + # eliminating negative values + indp = _np.where(fdensp < 0)[0] + indn = _np.where(fdensn < 0)[0] + fdensp[indp] = 0 + fdensn[indn] = 0 + + ind = _np.intp(_np.where(fdensp > 1e-2)[0]) + fdensp = fdensp[ind] + deltp = deltp[ind] + ind = _np.intp(_np.where(fdensn > 1e-2)[0]) + fdensn = fdensn[ind] + deltn = deltn[ind] + + self.deltas = deltp[-1] * 1e2 + + return fdensp, fdensn, deltp, deltn + + def _get_trackndens(self, single_spos, par): + """Concatenates tracking and touschek loss dens.""" + # if len(_to_fu.t_list(single_spos)) != 1: # Não sei se isso é útil + # raise ValueError('This function suports only one s position') + + fdensp, fdensn, deltp, deltn = self._get_weighting_tous(single_spos) + + fp = fdensp.squeeze() + fn = fdensn.squeeze() + + dic = self._single_pos_track(single_spos, par) + deltas = dic["energy_deviation"] + delta_ = _np.abs(_np.diff(deltas)[0]) + + if "pos" in par: + return dic, fp * delta_, deltp * 1e2 + elif "neg" in par: + return dic, fn * delta_, -deltn * 1e2 + + # this function plot the graphic of tracking and the touschek scattering + # distribution for one single position + + def plot_track_lossdens(self, single_spos, par, accep): + """Plot the results of tracking and the tous. scat. loss density. + + single_spos = single possition. + par = defines the analysis (positive or negative). + accep = touschek scattering acceptance. + """ + dic, fp, dp = self._get_trackndens(single_spos, par) + s = self.spos + + if "pos" in par: + inds = _np.intp(self.inds_pos) + elif "neg" in par: + inds = _np.intp(self.inds_neg) + index = _np.argmin(_np.abs(s - single_spos)) + + _to_fu.plot_track_d( + self.accelerator, + dic, + inds, + self.off_energy, + par, + index, + accep, + dp, + fp, + ) + + def plot_normtousd(self, spos): + """Touschek scattering loss density. + + spos = desired s positions (list or numpy.array) + """ + spos_ring = self.spos + dic = _to_fu.norm_cutacp( + self._model_fit, spos, 5000, self._accep, norm=True + ) + + fdensp, fdensn = dic["fdensp"], dic["fdensn"] + deltasp, deltasn = dic["deltasp"], dic["deltasn"] + + _, ax = _mplt.subplots(figsize=(10, 5)) + ax.set_title( + "Probability density analytically calculated", fontsize=20 + ) + ax.grid(True, alpha=0.5, ls="--", color="k") + ax.xaxis.grid(False) + ax.set_xlabel(r"$\delta$ [%]", fontsize=25) + ax.set_ylabel("PDF", fontsize=25) + ax.tick_params(axis="both", labelsize=28) + + for idx, _ in enumerate(spos): + array_fdens = fdensp[idx] + # pega o primeiro item onde ocorre a condição passada + # como argumento para a função numpy.where + index = _np.intp(_np.where(array_fdens <= 1e-2)[0][1]) + + # this block selects the best index + # for plot the density distribution + if not idx: + best_index = index + else: + if best_index < index: # eu não entendi direito porquê disso + best_index = index + else: + pass + + for idx, s in enumerate(spos): + mod_ind = _np.argmin(_np.abs(spos_ring - s)) + + fdenspi = fdensp[idx][:best_index] + fdensni = fdensn[idx][:best_index] + deltaspi = deltasp[idx][:best_index] * 1e2 + deltasni = -deltasn[idx][:best_index] * 1e2 + + color = _mplt.cm.gist_rainbow(idx / len(spos)) + not_desired = [ + "calc_mom_accep", + "mia", + "mib", + "mip", + "mb1", + "mb2", + "mc", + ] + + while self._model_fit[mod_ind].fam_name in not_desired: + mod_ind += 1 + + fam_name = self._model_fit[mod_ind].fam_name + s_stri = _np.round(spos_ring[mod_ind], 2) + stri = f"{fam_name} em {s_stri} m" + + ax.plot(deltaspi, fdenspi, label=stri, color=color) + ax.plot(deltasni, fdensni, color=color) + + ax.legend(loc="best", fontsize=20) + + def plot_histograms(self, l_spos): + """Touschek scattering density from Monte-Carlo simulation. + + l_spos = desired s positions (list or array). + """ + s = self.spos + accep = self.accep + model = self._model_fit + + tup = _to_fu.histgms( + self._model_fit, + l_spos, + self.num_part, + accep, + self.energy_dev_min, + cutaccep=False, + ) + + hp, hn, idx_model = tup + + fig, ax = _mplt.subplots( + ncols=len(l_spos), nrows=1, figsize=(30, 10), sharey=True + ) + fig.suptitle( + "Probability density calculated by Monte-Carlo simulation", + fontsize=20, + ) + + for index, iten in enumerate(idx_model): + color = _mplt.cm.jet(index / len(idx_model)) + ay = ax[index] + if not index: + ay.set_ylabel("PDF", fontsize=25) + + ay.grid(True, alpha=0.5, ls="--", color="k") + ay.xaxis.grid(False) + ay.set_xlabel(r"$\delta$ [%]", fontsize=25) + ay.tick_params(axis="both", labelsize=18) + + stri = f"{model[iten].fam_name:s}, {s[iten]:.2f}" + ay.hist(hp[index], density=True, bins=200, color=color, label=stri) + ay.hist(hn[index], density=True, bins=200, color=color) + _mplt.tight_layout() + ay.legend() + + def _get_track_def(self, l_scattered_pos, scrap, vchamber): + """Tracking for getting the loss profile along the ring. + + l_scattered_pos = scattered positions (list or numpy.array). + scrap = if True, the vchamber's height will be changed. + vchmaber = defines the new vchamber's apperture. + """ + all_track = [] + indices = [] + spos = self.spos + + self._model.radiation_on = True + self._model.cavity_on = True + self._model.vchamber_on = True + + if scrap: + self.set_vchamber_scraper(vchamber) + + for _, scattered_pos in enumerate(l_scattered_pos): + index = _np.argmin(_np.abs(scattered_pos - spos)) + indices.append(index) + dic = _to_fu.track_eletrons_d( + self._deltas, self.nturns, index, self._model + ) + all_track.append(dic) + + hx = self._model_fit[self.scraph_inds[0]].hmax + hn = self._model_fit[self.scraph_inds[0]].hmin + vx = self._model_fit[self.scrapv_inds[0]].vmax + vn = self._model_fit[self.scrapv_inds[0]].vmin + vchamber = [hx, hn, vx, vn] + + self.set_vchamber_scraper(vchamber) + + return all_track, indices + + def _concat_track_lossrate(self, l_scattered_pos, scrap, vchamber): + # não consegui resolvero erro que o ruff indicou nessa função + """Generating the data for the plot.""" + all_track, indices = self._get_track_def( + l_scattered_pos, scrap, vchamber + ) + spos = self.spos + fact = 0.03 + + tous_rate = self.ltime.touschek_data["rate"] # scattering rate + prob, lostp, all_lostp = [], [], [] + + # comentar os dois métodos implementados. + # um deles parece ser mais rápido. + # for j, single_track in enumerate(all_track): + for j, dic in enumerate(all_track): + index = indices[j] + + lostinds = dic["element_lost"] + deltas = dic["energy_deviation"] + + # lostinds = _np.zeros(len(single_track)) + # deltas = _np.zeros(len(single_track)) + # for idx, iten in enumerate(single_track): + # _, ellost, delta = iten + # lostinds[idx] = ellost + # deltas[idx] = delta + # lostinds = _np.intp(lostinds) + + lost_positions = _np.round(spos[lostinds], 2) + + step = int((deltas[0] + deltas[-1]) / fact) + itv_track = _np.linspace(deltas[0], deltas[-1], step) + + data = _pd.DataFrame({"lost_pos_by_tracking": lost_positions}) + # dataframe that storages the tracking data + lost_pos_column = ( + data.groupby("lost_pos_by_tracking").groups + ).keys() + data = _pd.DataFrame({"lost_pos_by_tracking": lost_pos_column}) + # this step agroups the lost_positions + + itv_delta = [] + for current, next_iten in zip(itv_track, itv_track[1:]): + stri = f"{current*1e2:.2f} % < delta < {next_iten*1e2:.2f} %" + data[stri] = _np.zeros(len(list(lost_pos_column))) # this step + # creates new columns in the dataframe and fill with zeros + + itv_delta.append((current, next_iten)) + # Next step must calculate each matrix element from the + # dataframe + + var = list(data.index) + if var == lost_pos_column: + pass + else: + data = data.set_index("lost_pos_by_tracking") + + for idx, lost_pos in enumerate(lost_positions): # essas duas + # estruturas de repetição são responsáveis por calcular + # o percentual dos eletrons que possuem um determinado desvio + # de energia e se perdem em um intervalo de desvio de energia + # específico + delta = deltas[idx] + # lps = [] + for i, interval in enumerate(itv_delta): + if not i: # subtle difference: <= in first iteraction + if interval[0] <= delta <= interval[1]: + stri = f"{interval[0]*1e2:.2f} % < delta < {interval[1]*1e2:.2f} %" + data.loc[lost_pos, stri] += 1 + + else: + if interval[0] < delta <= interval[1]: + stri = f"{interval[0]*1e2:.2f} % < delta < {interval[1]*1e2:.2f} %" + data.loc[lost_pos, stri] += 1 + + data = data / len(deltas) + + npt = int((spos[-1] - spos[0]) / 0.1) + + scalc = _np.linspace(spos[0], spos[-1], npt) + rate_nom_lattice = _np.interp(spos, scalc, tous_rate) + + lost_pos_df = [] + part_prob = [] + # Calculates the loss probablity by tracking + for indx, iten in data.iterrows(): + t_prob = 0 + for idx, m in enumerate(iten): + t_prob += m + if idx == iten.count() - 1: + # appends the probability after sum + part_prob.append(t_prob) + lost_pos_df.append(indx) + + lost_pos_df = _np.array(lost_pos_df) + part_prob = _np.array(part_prob) + + # Calculates the absolute probability for electron loss + # by touschek scattering rate + prob.append(part_prob * rate_nom_lattice[index]) + lostp.append(lost_pos_df) + + # Aqui eu pego as posições em que os elétrons foram perdidos + # e armazeno todas em uma grande lista sem repetição de qualquer + # posição perdida + if not j: + all_lostp = lost_pos_df + else: + boolean_array = _np.isin(lost_pos_df, all_lostp) + for ind, boolean_indc in enumerate(boolean_array): + if not boolean_indc: + all_lostp = _np.append(all_lostp, lost_pos_df[ind]) + + return all_lostp, prob, lostp + + def _f_scat_table(self, l_scattered_pos, scrap, vchamber): + """Generates the heat map of loss positions.""" + dic_res = {} + all_lostp, prob, lostp = self._concat_track_lossrate( + l_scattered_pos, scrap, vchamber + ) + n_scat = _np.round(l_scattered_pos, 2) + + for idx, scattered_pos in enumerate(n_scat): + scat_data = [] + bool_array = _np.isin(all_lostp, lostp[idx]) + + for j, boolean in enumerate(bool_array): + if boolean: + index = _np.intp( + _np.where(lostp[idx] == all_lostp[j])[0][0] + ) + scat_data.append(prob[idx][index]) + else: + scat_data.append(0) + + if not idx: + dic_res["lost_positions"] = all_lostp + stri = f"{scattered_pos}" + dic_res[stri] = scat_data + else: + stri = f"{scattered_pos}" + dic_res[stri] = scat_data + + # df = _pd.DataFrame(dic_res) + + return dic_res + + def get_scat_dict(self, l_scattered_pos, reording_key, scrap, vchamber): + """Get the reordered dictionary.""" + dic = self._f_scat_table(l_scattered_pos, scrap, vchamber) + + zip_tuples = zip(*[dic[chave] for chave in dic]) + new_tuples = sorted( + zip_tuples, key=lambda x: x[list(dic.keys()).index(reording_key)] + ) + zip_ordered = zip(*new_tuples) + + new_dict = { + chave: list(valores) + for chave, valores in zip(dic.keys(), zip_ordered) + } + + return new_dict + + def get_loss_profile(self, dic): + """Integrates the lost positions for all scattering points. + + dic = cointains the lost positions and the scattered points. + """ + spos = self.spos + + df = _pd.DataFrame(dic) + a = df.set_index("lost_positions") + + scat_pos = _np.array(a.columns, dtype=float) + + indices = [] + for iten in scat_pos: + ind = _np.argmin(_np.abs(spos - iten)) + indices.append(ind) + + summed = [] + for idx, _ in a.iterrows(): + sum_row = _sp_int.trapz(a.loc[idx], spos[indices]) + summed.append(sum_row) + + _, ax = _mplt.subplots( + figsize=(13, 7), gridspec_kw={"hspace": 0.2, "wspace": 0.2} + ) + ax.set_title("loss rate integral along the ring", fontsize=16) + + ax.set_xlabel("lost position [m]", fontsize=16) + ax.set_ylabel("loss rate [1/s]", fontsize=16) + ax.tick_params(axis="both", labelsize=16) + + ax.plot(list(a.index), summed, color="navy") + _pa.graphics.draw_lattice( + self._model_fit, offset=-1e-6, height=1e-6, gca=True + ) + + def get_loss_profilel(self, l_dic): + """Comparing distinct loss profiles. + + l_dic = list of dictionaries of loss profiles. + """ + lista = [] + for dic in l_dic: + s = self.spos + + df = _pd.DataFrame(dic) + a = df.set_index("lost_positions") + + scat_pos = _np.array(a.columns, dtype=float) + + indices = [] + for iten in scat_pos: + ind = _np.argmin(_np.abs(s - iten)) + indices.append(ind) + + summed = [] + for idx, _ in a.iterrows(): + sum_row = _sp_int.trapz(a.loc[idx], s[indices]) + summed.append(sum_row) + + lista.append((a.index, summed)) + + return lista + + def plot_scat_dict(self, new_dic): + """Heatmap plot indicating the warm points of loss along the ring. + + new_dic = contains the reordered dict with lost positions and + scattered points. + """ + s = self.spos + + df = _pd.DataFrame(new_dic) + df = df.set_index("lost_positions") + + val = df.values.copy() + idx = val != 0.0 + val[idx] = _np.log10(val[idx]) + val[~idx] = val[idx].min() + + fig, ax = _mplt.subplots(figsize=(10, 10)) + + y = _np.linspace(0, s[-1], df.shape[0] + 1) + x = _np.linspace(0, s[-1], df.shape[1] + 1) + x_mesh, y_mesh = _np.meshgrid(x, y) + + heatmp = ax.pcolor(x_mesh, y_mesh, val, cmap="jet", shading="flat") + + cbar = _mplt.colorbar(heatmp) + cbar.set_label("Loss rate [1/s] in logarithmic scale", rotation=90) + + ax.set_title("Loss profile", fontsize=16) + + ax.set_xlabel("scattered positions [m]", fontsize=16) + ax.set_ylabel("lost positions [m]", fontsize=16) + + fig.tight_layout() + _mplt.gca().set_aspect("equal") + _mplt.show()