diff --git a/app_state.py b/app_state.py index ebce676..ec406fa 100644 --- a/app_state.py +++ b/app_state.py @@ -48,7 +48,7 @@ def platewells_by_sample_id(self) -> Dict[str, List[PlateWell]]: result = {} for plate in self.plate_registry: for well in plate.plate_wells: - sid = well.sample.sample_id + sid = well.get_sample(self).file_sample_identifier if sid: result.setdefault(sid, []).append(well) return result diff --git a/index.py b/index.py index 896f1c9..ceb699e 100644 --- a/index.py +++ b/index.py @@ -32,6 +32,8 @@ EMPTY_BORDER_PX, make_tab, get_tab_id, + build_standard_curve_figure, + build_computed_molarity ) pd.set_option('future.no_silent_downcasting', True) @@ -529,6 +531,54 @@ def plate_mapper(plate_ids, objects): raise Exception("ERROR RAISED INTENTIONALLY - Reached end of manipulate_plates without handling trigger... major underlying issue to resolve.") +@app.callback( + Output({"type": "pcr-curve-graph", "index": MATCH}, "figure"), + Output({"type": "pcr-curve-table", "index": MATCH}, "children"), + Output({"type": "pcr-curve-store", "index": MATCH}, "data"), + Input("app_data_state", "data"), + State({"type": "datatable-plate", "index": MATCH}, "rowData"), + State({"type": "datatable-plate", "index": MATCH}, "id"), + prevent_initial_call=True, +) +def update_standard_curve(app_state, row_data, plate_id): + + if not row_data: + return no_update, no_update, no_update + + else: + app_state = AppState.from_json(app_state) + plate = app_state.plate_by_id.get(plate_id.get("index")) + df = plate.dataset(app_state, tidy=True).replace(['None', 'none', 'NaN', 'nan', ''], np.nan).infer_objects(copy=False) + + # grab only samples which are defined as standards + standards_df = df[df['sample_type'] == 'Standard'].copy() + + # ensure numeric and drop rows with missing/invalid Standard or Cq + standards_df["Standard"] = pd.to_numeric(standards_df["Standard"], errors="coerce") + standards_df = standards_df.loc[standards_df["Standard"].notna() & standards_df["Cq"].notna()] + + pcr_fig, slope, intercept, efficiency, r_squared = build_standard_curve_figure(standards_df) + store_data = { + "slope": slope, + "intercept": intercept, + "efficiency": efficiency, + "r_squared": r_squared + } + + pcr_table_children = [ + html.Tr([html.Th("PCR Efficiency"), html.Th("R-Squared"), html.Th("Slope"), html.Th("Intercept")]), + html.Tr([ + html.Td(f"{efficiency:.1f}%"), + html.Td(f"{r_squared:.3f}"), + html.Td(f"{slope:.3f}"), + html.Td(f"{intercept:.3f}") + ]) + ] + return pcr_fig, pcr_table_children, store_data + + + + @app.callback( [ Output("dynamic-tabs", "children", allow_duplicate=True), @@ -766,10 +816,14 @@ def sync_plate_style(cell_evt, row_data, virtual_row_data, fig): Output({"type": "datatable-plate", "index": MATCH}, "rowData"), Input("update-rows-trigger-div", "children"), # broadcast trigger Input("app_data_state", "data"), # source of truth + Input({"type": "pcr-curve-store", "index": MATCH}, "data"), # for qPCR plates, to show efficiency etc + # Input({"type": "phix-control-dropdown", "index": MATCH}, "value"), # to highlight the selected phix control value State({"type": "datatable-plate", "index": MATCH}, "id"), # gives us the plate_id (via id["index"]) prevent_initial_call=True ) -def update_rows_callback(_, app_data_state_json, table_id): +# def update_rows_callback(_, app_data_state_json, pcr_store, phix_control_value, table_id): +def update_rows_callback(_, app_data_state_json, pcr_store, table_id): + if not app_data_state_json or not table_id: raise PreventUpdate @@ -778,14 +832,23 @@ def update_rows_callback(_, app_data_state_json, table_id): # find the plate by invariant key plate = app_state.plate_by_id.get(plate_id) + + slope, intercept = pcr_store.get("slope"), pcr_store.get("intercept") if pcr_store else (None, None) + + phix_correction_factor = 1.0 + + # if phix_control_value is None: + # phix_correction_factor = 1.0 + # else: + # phix_correction_factor = 10 / float(phix_control_value) + if not plate: return None - print("Processing Plate ID:", plate_id) - is_qpcr = (plate.plate_type == "qPCR") df = plate.dataset(app_state, tidy=False).replace(['None', 'none', 'NaN', 'nan', ''], np.nan).infer_objects(copy=False) + df = build_computed_molarity(app_state, plate, slope, intercept, phix_correction_factor) # Keep columns tidy, but do not drop keys AG Grid needs (e.g., well_position_id if you use getRowId) if is_qpcr: @@ -842,7 +905,7 @@ def update_plate_buttons(app_data_state_json, selected_tab): colum_value = row.get(region_molarity_col) plate_type_display = "TS" label = f"{plate_type_display} - PhiX - {well} - {colum_value}" - value = f"{plate_type_display} - {well} - {colum_value}" + value = str(colum_value) dropdown_options.append({"label": label, "value": value}) return html.Div( diff --git a/models.py b/models.py index 52c690b..e89a72d 100644 --- a/models.py +++ b/models.py @@ -45,9 +45,11 @@ class Sample(BaseModel): container: Optional[str] = "" file_sample_identifier: Optional[str] = "" # File-specific column for identifying the sample sample_type: Optional[SampleType] = SampleType.LIBRARY + computed_molarity: Optional[float | str] = " - " # Computed molarity for qPCR samples + computed_standard_error: Optional[float | str] = " - " # Standard error for qPCR samples def get_plate_wells(self, app_state): - return app_state.platewells_by_sample_id.get(self.sample_id, []) + return app_state.platewells_by_sample_id.get(self.file_sample_identifier, []) ##################################################### @@ -157,6 +159,10 @@ def get_well_by_position(self, position: str) -> Optional["PlateWell"]: def to_json(self) -> str: return self.model_dump_json() + + def get_samples(self, app_state) -> List[Sample]: + sample_ids = {well.sample_identifier for well in self.plate_wells} + return [app_state.sample_by_id[sid] for sid in sample_ids if sid in app_state.sample_by_id] @classmethod def from_json(cls, json_str: str) -> "Plate": @@ -260,7 +266,7 @@ def from_file(cls, input_file: "File", app_state) -> "Plate": app_state.sample_registry.append(sample) - print("created sample", sample) + # print("created sample", sample) for well_position, well_df in df_slice.groupby(input_file.well_position_id): well_kwargs = {} if input_file.plate_specific_columns: diff --git a/utils/plate_render_utils.py b/utils/plate_render_utils.py index a7170a3..aad9fe5 100644 --- a/utils/plate_render_utils.py +++ b/utils/plate_render_utils.py @@ -10,6 +10,14 @@ pd.set_option('future.no_silent_downcasting', True) +def is_numeric(val): + try: + float(val) + return True + except (ValueError, TypeError): + return False + + SAMPLE_TYPE_COLORS = { "Negative Control": "#ef5350", # red "Positive Control": "#2979ff", # blue @@ -58,6 +66,7 @@ EMPTY_BORDER = "#d0d0d0" EMPTY_BORDER_PX = 1 +QPCR_SAMPLE_DILUTION_COEFFICIENT = 10000 def make_tab(plate, app_state): @@ -77,6 +86,152 @@ def get_tab_id(tab_like): return tid if tid is not None else (tab_like or {}).get("props", {}).get("tab_id") +def build_computed_molarity(app_state, plate, slope, intercept, phix_correction=1.0): + + """ + For qPCR plates, compute molarity for each sample based on Cq values and the standard curve. + Updates each Sample's computed_molarity attribute in place. + """ + + if plate.plate_type == "qPCR": + if slope is None or intercept is None: + return plate.dataset(app_state, tidy=False).replace(['None', 'none', 'NaN', 'nan', ''], np.nan).infer_objects(copy=False) + + # Compute molarity for all qPCR samples based on Cq + for sample in plate.get_samples(app_state): + + if sample.sample_type == "Standard": + continue + + sample_specific_cq_values = [] + for plate_well in sample.get_plate_wells(app_state): + for measurement in plate_well.measurements: + if measurement.measurement_type.name == "Cq" and measurement.value is not None: + # Check if the value can be converted to float + if is_numeric(measurement.value): + sample_specific_cq_values.append(measurement.value) + + if sample_specific_cq_values and slope not in (0, None) and intercept not in (0, None): + + avg_cq = np.mean(sample_specific_cq_values) + log_conc = (avg_cq - intercept) / slope + + computed_molarity = 10 ** log_conc if not np.isnan(log_conc) else None + computed_standard_error = np.std(sample_specific_cq_values) / np.sqrt(len(sample_specific_cq_values)) + + qpcr_adjusted_molarity = computed_molarity * QPCR_SAMPLE_DILUTION_COEFFICIENT + adjusted_for_phix = qpcr_adjusted_molarity * phix_correction + + sample.computed_molarity = str(round(adjusted_for_phix, 3)) + " nM" + sample.computed_standard_error = str(round(computed_standard_error, 3)) if not np.isnan(computed_standard_error) else " - " + # print(f"Computed molarity for sample '{sample.file_sample_identifier}': {computed_molarity} (Avg Cq: {avg_cq})") + + else: + sample.computed_molarity = " - " + sample.computed_standard_error = " - " + # print(f"No valid Cq values for sample '{sample.file_sample_identifier}' or invalid slope/intercept.") + else: + for sample in plate.get_samples(app_state): + well_pos = sample.get_plate_wells(app_state) + if not well_pos: + sample.computed_molarity = " - " + else: + unit_multiplier_map = {"pmol/l": 1000, "nmol/l": 1} + sample_specific_molarity_values = [] + for plate_well in sample.get_plate_wells(app_state): + for measurement in plate_well.measurements: + if measurement.measurement_type.id == "region_molarity" and measurement.value is not None: + if is_numeric(measurement.value): + sample_specific_molarity_values.append(measurement.value / unit_multiplier_map.get(measurement.unit.id, 1)) + if sample_specific_molarity_values: + avg_molarity = np.mean(sample_specific_molarity_values) + sample.computed_molarity = str(round(avg_molarity, 3)) + " nM" + else: + sample.computed_molarity = " - " + + sample.computed_standard_error = " - " + + df = plate.dataset(app_state, tidy=False).replace(['None', 'none', 'NaN', 'nan', ''], np.nan).infer_objects(copy=False) + + return df + + +def build_standard_curve_figure(standards_df): + + def make_error_figure(msg="Not enough standard points to build a PCR curve."): + fig = go.Figure() + fig.add_annotation( + text=msg, + xref="paper", yref="paper", + x=0.5, y=0.5, + showarrow=False, + font=dict(size=16, color="red"), + align="center" + ) + fig.update_layout( + xaxis=dict(visible=False), + yaxis=dict(visible=False), + plot_bgcolor="white" + ) + return fig + + if standards_df.empty or len(standards_df) < 6: + return make_error_figure(), *[0]*4 + + # --- Extract replicate-level data --- + # Use "Cq" for Ct values (not "Mean") + ct_values = standards_df["Cq"].astype(float).values + concentrations = standards_df["Standard"].astype(float).values + + # Transform concentration to -log10 + x_vals = np.log10(concentrations) + y_vals = ct_values + + # --- Regression (manual OLS) --- + x_mean, y_mean = np.mean(x_vals), np.mean(y_vals) + slope = np.sum((x_vals - x_mean) * (y_vals - y_mean)) / np.sum((x_vals - x_mean) ** 2) + intercept = y_mean - slope * x_mean + y_pred = slope * x_vals + intercept + + # Fit quality + ss_res = np.sum((y_vals - y_pred) ** 2) + ss_tot = np.sum((y_vals - y_mean) ** 2) + r_squared = 1 - ss_res / ss_tot if ss_tot != 0 else np.nan + + # PCR efficiency (formula: efficiency = (10^(-1/slope) - 1) * 100) + efficiency = (10 ** (-1 / slope) - 1) * 100 if slope != 0 else np.nan + + # --- Regression line for plotting --- + x_fit = np.linspace(min(x_vals) - 0.2, max(x_vals) + 0.2, 100) + y_fit = slope * x_fit + intercept + + # --- Build figure --- + pcr_fig = go.Figure() + pcr_fig.add_trace(go.Scatter( + x=x_vals, + y=y_vals, + mode='markers', + marker=dict(color='blue'), + name='Standards' + )) + pcr_fig.add_trace(go.Scatter( + x=x_fit, + y=y_fit, + mode='lines', + line=dict(color='red', dash='dot'), + name='Fit' + )) + pcr_fig.update_layout( + xaxis_title='log10(Concentration)', + yaxis_title='Ct Value', + template='plotly_white', + showlegend=False, + margin=dict(l=30, r=10, t=30, b=30), + ) + + return pcr_fig, slope, intercept, efficiency, r_squared + + def _build_plate_figure_for_plate(plate, app_state): """ Returns a Plotly Figure for a single plate, colored by SAMPLE_TYPE_COLORS. @@ -213,9 +368,54 @@ def build_tab_body(plate, app_state): is_qpcr = (plate.plate_type == "qPCR") # --- DataFrame cleanup (same logic you had) --- - df = (plate.dataset(app_state, tidy=False) - .replace(['None', 'none', 'NaN', 'nan', ''], np.nan) - .infer_objects(copy=False)) + df = ( + plate.dataset(app_state, tidy=False) + .replace(['None', 'none', 'NaN', 'nan', ''], np.nan) + .infer_objects(copy=False) + ) + + # Render first PCR curve plot, and assign computed sample molarities to df + if is_qpcr: + + # grab only samples which are defined as standards + standards_df = df[df['sample_type'] == 'Standard'].copy() + + # ensure numeric and drop rows with missing/invalid Standard or Cq + standards_df["Standard"] = pd.to_numeric(standards_df["Standard"], errors="coerce") + standards_df = standards_df.loc[standards_df["Standard"].notna() & standards_df["Cq"].notna()] + + pcr_fig, slope, intercept, efficiency, r_squared = build_standard_curve_figure(standards_df) + + pcr_curve_div = dbc.Container([ + html.Table( + id={"type": "pcr-curve-table", "index": pid}, + children=[ + html.Tr([html.Th("PCR Efficiency"), html.Th("R-Squared"), html.Th("Slope"), html.Th("Intercept")]), + html.Tr([ + html.Td(f"{efficiency:.1f}%"), + html.Td(f"{r_squared:.3f}"), + html.Td(f"{slope:.3f}"), + html.Td(f"{intercept:.3f}") + ]) + ], style={"marginBottom": "20px", "border": "1px solid black", "borderCollapse": "collapse", + "width": "100%", "textAlign": "center"} + ), + dcc.Graph( + figure=pcr_fig, + id={"type": "pcr-curve-graph", "index": pid}, + style={ + "maxWidth": "100%", + "height": f"{GRAPH_H-100}px" + } + ), + ]) + + + else: + pcr_fig, slope, intercept, efficiency, r_squared = html.Div(), None, None, None, None + pcr_curve_div = html.Div() # empty div for non-qPCR plates + + df = build_computed_molarity(app_state, plate, slope, intercept) if is_qpcr: df = df.dropna(axis=1, how="all") @@ -441,9 +641,45 @@ def _is_num(s): }, style={"width": f"{GRAPH_W}px", "height": f"{GRAPH_H}px"}, ) - plate_wrap = html.Div([html.Div(plate_graph, style={"width": f"{GRAPH_W}px"})], - id={"type": "tab-plate-wrap", "index": pid}, - style={"display": "none"}) + + analysis_data = dcc.Store( + id={"type": "pcr-curve-store", "index": pid}, + data={ + "slope": slope, + "intercept": intercept, + "efficiency": efficiency, + "r_squared": r_squared + } + ) + + # --- side-by-side layout --- + plate_wrap = html.Div( + [ + html.Div( + [ + plate_graph, + analysis_data + ], + style={ + "width": f"{GRAPH_W}px", + "height": f"{GRAPH_H}px", + "display": "inline-block", + "verticalAlign": "top", + "marginRight": "20px", + }, + ), + html.Div( + pcr_curve_div, + style={ + "width": "420px", + "display": "inline-block", + "verticalAlign": "top", + }, + ), + ], + id={"type": "tab-plate-wrap", "index": pid}, + style={"display": "none", "whiteSpace": "nowrap"}, + ) # --- Modals --- delete_plate_modal = html.Div([