diff --git a/pyflowdiagnostics/flow_diagnostics.py b/pyflowdiagnostics/flow_diagnostics.py index a414445..d84f7ea 100644 --- a/pyflowdiagnostics/flow_diagnostics.py +++ b/pyflowdiagnostics/flow_diagnostics.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import os import logging import numpy as np @@ -72,6 +73,7 @@ def __init__(self, file_path: str) -> None: self.wells = {} self.injectors = [] self.producers = [] + self._json_results = {} def execute(self, time_step_id: int) -> None: @@ -84,11 +86,14 @@ def execute(self, time_step_id: int) -> None: logging.info(f"Working on time step {time_step_id}.") self.time_step_id = time_step_id + self._json_results = {} self._read_simulator_dynamic_output() if self._compute_TOF_and_tracer(): self._compute_flow_allocations() + self._compute_and_write_arrival_time_at_producers() self._compute_other_diagnostics() + self._write_combined_json() else: logging.error("Failed to compute TOF and tracer concentrations.") @@ -634,6 +639,40 @@ def _compute_flow_allocations(self) -> None: self._write_allocation_factors(FlowAllocI, FlowAllocP) + def _compute_and_write_arrival_time_at_producers(self) -> None: + """Computes arrival time of fluids at each producer well and writes to JSON. + + For each producer, finds the minimum time-of-flight from injectors (tofI) + across its OPEN completion cells. This represents the earliest arrival + of injected fluid at that producer. + """ + if not self.producers or self.tofI is None: + logging.debug("Skipping arrival time computation: no producers or tofI.") + return + + arrival_times = [] + for well in self.producers: + if well.type != "PRD": + continue + tof_values = [] + for cmpl in well.completions: + if cmpl.status == "OPEN": + tof_val = self.tofI[cmpl.IJK - 1] + if not np.isnan(tof_val): + tof_values.append(float(tof_val)) + min_tof = float(np.min(tof_values)) if tof_values else None + arrival_times.append({"producer": well.name, "arrival_time": min_tof}) + + self._json_results["arrival_time"] = { + "description": ( + "Earliest arrival time of injected fluid at each producer well (days). " + "Computed as the minimum time-of-flight from injectors across the producer's completion cells. " + "Lower values indicate faster breakthrough; null if no injector fluid reaches the producer." + ), + "producer_arrival_times": arrival_times, + } + + def _compute_flow_allocation(self, wells, C_matrix, target_names): """Helper function to compute flow allocation matrices. @@ -656,17 +695,35 @@ def _compute_flow_allocation(self, wells, C_matrix, target_names): def _write_allocation_factors(self, df_inj, df_prd) -> None: - """Saves flow allocation factors to an Excel file. + """Saves flow allocation factors to Excel and JSON files. Args: - df_inj (pd.DataFrame): DataFrame of injector flow allocation. - df_prd (pd.DataFrame): DataFrame of producer flow allocation. + df_inj (pd.DataFrame): DataFrame of injector flow allocation (rows=injectors, cols=producers). + df_prd (pd.DataFrame): DataFrame of producer flow allocation (rows=producers, cols=injectors). """ - file_path = os.path.join(self.output_dir, "Allocation_Factor.xlsx") - with pd.ExcelWriter(file_path, engine="xlsxwriter") as writer: + file_path_xlsx = os.path.join(self.output_dir, f"Allocation_Factor_{self.time_step_id}.xlsx") + with pd.ExcelWriter(file_path_xlsx, engine="xlsxwriter") as writer: df_inj.to_excel(writer, sheet_name="Injector Flow Allocation") df_prd.to_excel(writer, sheet_name="Producer Flow Allocation") - logging.info(f"Flow allocation factors saved to {file_path}") + logging.info(f"Flow allocation factors saved to {file_path_xlsx}") + + injector_allocation = [ + {"injector": inj, "producer": prd, "allocation": float(df_inj.loc[inj, prd])} + for inj in df_inj.index for prd in df_inj.columns + ] + producer_allocation = [ + {"producer": prd, "injector": inj, "allocation": float(df_prd.loc[prd, inj])} + for prd in df_prd.index for inj in df_prd.columns + ] + self._json_results["allocation"] = { + "description": ( + "Flow allocation factors: fraction of fluid from each injector that reaches each producer " + "(injector_allocation) and fraction of each producer's production that originates from each " + "injector (producer_allocation). Values range 0-1; higher values indicate stronger connectivity." + ), + "injector_allocation": injector_allocation, + "producer_allocation": producer_allocation, + } def _write_grid_flow_diagnostics(self) -> bool: @@ -755,6 +812,20 @@ def _write_other_flow_diagnostics(self) -> None: df_tracer_matrix = pd.DataFrame({'Time PVI': self.tDM[:-1], 'Tracer Concentration': self.CM}) df_tracer_matrix.to_csv(os.path.join(self.output_dir, f"Tracer_Matrix_{self.time_step_id}.csv"), index=False) + lorenz_matrix = self.compute_Lorenz(self.FM, self.PhiM) + lorenz_fracture = self.compute_Lorenz(self.FF, self.PhiF) + self._json_results["lorenz"] = { + "description": ( + "Lorenz coefficient: measure of flow heterogeneity (0=uniform, 1=highly heterogeneous). " + "Derived from F-Phi (flow-storage capacity) curves. global: reservoir-wide; well_pairs: " + "per injector-producer streamtube. Higher values indicate more heterogeneous flow." + ), + "global": { + "matrix": float(lorenz_matrix), + "fracture": float(lorenz_fracture), + }, + } + else: df_FPhi = pd.DataFrame({'Storage Capacity': self.Phi, 'Flow Capacity': self.F}) @@ -763,6 +834,91 @@ def _write_other_flow_diagnostics(self) -> None: df_tracer = pd.DataFrame({'Time PVI': self.tD[:-1], 'Tracer Concentration': self.C}) df_tracer.to_csv(os.path.join(self.output_dir, f"Tracer_{self.time_step_id}.csv"), index=False) + lorenz = self.compute_Lorenz(self.F, self.Phi) + self._json_results["lorenz"] = { + "description": ( + "Lorenz coefficient: measure of flow heterogeneity (0=uniform, 1=highly heterogeneous). " + "Derived from F-Phi (flow-storage capacity) curves. global: reservoir-wide; well_pairs: " + "per injector-producer streamtube. Higher values indicate more heterogeneous flow." + ), + "global": float(lorenz), + } + + self._compute_well_pair_lorenz() + + def _compute_well_pair_lorenz(self) -> None: + """Computes Lorenz coefficient per injector-producer well pair and appends to JSON results. + + For each well pair (cells where partI and partP identify the dominant injector and producer), + builds F-Phi from pore volume and TOF, then computes the Lorenz coefficient. + """ + MIN_CELLS = 2 + if self.partI is None or self.partP is None or not self.injectors or not self.producers: + return + + tof_inj = np.where(np.isnan(self.tofI), np.nanmax(self.tofI, initial=0), self.tofI) + tof_prod = np.where(np.isnan(self.tofP), np.nanmax(self.tofP, initial=0), self.tofP) + tof_valid = np.column_stack([tof_inj, tof_prod]) + + valid_mask = ( + self.grid.actnum_bool + & ~np.isnan(self.partI) + & ~np.isnan(self.partP) + & (self.partI >= 1) + & (self.partI <= len(self.injectors)) + & (self.partP >= 1) + & (self.partP <= len(self.producers)) + ) + well_pairs = np.column_stack([self.partI[valid_mask], self.partP[valid_mask]]) + unique_pairs = np.unique(well_pairs.astype(int), axis=0) + + well_pair_lorenz = [] + for part_i, part_p in unique_pairs: + pair_mask = valid_mask & (self.partI == part_i) & (self.partP == part_p) + n_cells = np.sum(pair_mask) + if n_cells < MIN_CELLS: + continue + porv_sub = self.grid.porv[pair_mask] + tof_sub = tof_valid[pair_mask] + try: + F, Phi = self.compute_F_and_Phi(porv_sub, tof_sub) + lc = float(self.compute_Lorenz(F, Phi)) + except Exception: + continue + inj_name = self.injectors[int(part_i) - 1].name + prd_name = self.producers[int(part_p) - 1].name + well_pair_lorenz.append({ + "injector": inj_name, + "producer": prd_name, + "lorenz_coefficient": lc, + "n_cells": int(n_cells), + }) + + if well_pair_lorenz and "lorenz" in self._json_results: + self._json_results["lorenz"]["well_pairs"] = well_pair_lorenz + + def _write_combined_json(self) -> None: + """Writes all flow diagnostics metrics to a single JSON file with descriptions.""" + if not self._json_results: + return + time_block = {"time_step_id": self.time_step_id} + if isinstance(self.binary_reader, EclReader): + try: + time_info = self.binary_reader.get_report_step_date(self.time_step_id) + if time_info is not None: + time_block["date"] = time_info["date"] + time_block["days"] = time_info["days"] + time_block["description"] = "Report step date (YYYY-MM-DD) and cumulative days since simulation start." + except Exception: + pass + combined = { + "time": time_block, + "metrics": self._json_results, + } + file_path = os.path.join(self.output_dir, f"FlowDiagnostics_{self.time_step_id}.json") + with open(file_path, "w") as f: + json.dump(combined, f, indent=2) + logging.info(f"Flow diagnostics metrics saved to {file_path}") # ---- Static Methods --------------------------------------------------------------------------------------------- diff --git a/pyflowdiagnostics/readers/ecl_bin_reader.py b/pyflowdiagnostics/readers/ecl_bin_reader.py index 8752876..221460d 100644 --- a/pyflowdiagnostics/readers/ecl_bin_reader.py +++ b/pyflowdiagnostics/readers/ecl_bin_reader.py @@ -115,12 +115,15 @@ def read_rst(self, keys: list = None, tstep_id: int = None) -> dict: data = self.read_unrst(self.unrst_file_path, keys) self._unrst_data[keys_combined] = data + # For UNRST: index 0 is often initial step (no wells); report steps start at 1. + # So tstep_id=1 (first report step) maps to index 1. + idx = tstep_id d_out = {} for key in keys: - if tstep_id >= len(data.get(key, [])): + if idx < 0 or idx >= len(data.get(key, [])): d_out[key] = np.array([]) else: - d_out[key] = data[key][tstep_id] + d_out[key] = data[key][idx] return d_out return self.read_rst_step(keys, tstep_id) @@ -132,96 +135,225 @@ def read_rst_step(self, keys: list = None, tstep_id: int = None) -> dict: raise FileNotFoundError(f"Restart file not found: {file_path}") return self._read_bin(file_path, keys) + def get_report_step_date(self, tstep_id: int) -> dict | None: + """Returns date and cumulative days for a report step from INTEHEAD. + + Args: + tstep_id (int): Report step ID (1-based). + + Returns: + dict | None: {"date": "YYYY-MM-DD", "days": float} or None if unavailable. + days is cumulative days since simulation start (initial step). + """ + def _intehead_to_date(intehead): + if intehead is None or len(intehead) < 67: + return None + IDAY, IMON, IYEAR = int(intehead[64]), int(intehead[65]), int(intehead[66]) + return datetime(IYEAR, IMON, IDAY) + + try: + results = self.read_rst(keys=["INTEHEAD"], tstep_id=tstep_id) + except (ValueError, FileNotFoundError): + return None + intehead = results.get("INTEHEAD") + if intehead is None or not isinstance(intehead, np.ndarray) or len(intehead) < 67: + return None + date = _intehead_to_date(intehead) + if date is None: + return None + + if self.unrst_file_path is not None: + data = self.read_unrst(self.unrst_file_path, keys=["INTEHEAD"]) + inteheads = data.get("INTEHEAD", []) + ref_intehead = inteheads[0] if inteheads else None + ref_date = _intehead_to_date(ref_intehead) if ref_intehead is not None else None + days = round((date - ref_date).total_seconds() / 86400.0, 2) if ref_date else None + else: + ref_date = None + x0000_path = f"{self.input_file_path_base}.X0000" + if os.path.exists(x0000_path): + ref_results = self._read_bin(x0000_path, ["INTEHEAD"]) + ref_intehead = ref_results.get("INTEHEAD") + ref_date = _intehead_to_date(ref_intehead) if ref_intehead is not None else None + if ref_date is None and tstep_id > 1: + ref_results = self.read_rst_step(keys=["INTEHEAD"], tstep_id=1) + ref_intehead = ref_results.get("INTEHEAD") + ref_date = _intehead_to_date(ref_intehead) if ref_intehead is not None else None + days = round((date - ref_date).total_seconds() / 86400.0, 2) if ref_date else None + + return {"date": date.strftime("%Y-%m-%d"), "days": days} + def read_unrst(self, file_path: str, keys: list = None, tstep_id: int = None) -> dict: - def read_one_timestep(fid, pos, endian, keys): - """Reads a full timestep starting at position `pos`.""" - fid.seek(pos) - result_tmp = {} + """Read UNRST using pattern matching (robust for OPM Flow and Eclipse formats). - while True: - data, _, key = self._load_vector(fid, endian) - key = key.strip() + Returns {key: [val_t0, val_t1, ...]} for each requested key, or single timestep + when tstep_id is specified (used internally by read_unrst with tstep_id=None). + """ + if keys is None: + keys = [] - if key == "INTEHEAD": - IDAY, IMON, IYEAR = data[64], data[65], data[66] - result_tmp["DATE"] = datetime(IYEAR, IMON, IDAY) - result_tmp["INTEHEAD"] = data # Keep as np.ndarray - - elif isinstance(data, np.ndarray): - result_tmp[key] = data - elif isinstance(data, (bytes, str)): - result_tmp[key] = data.decode(errors="ignore").strip() if isinstance(data, bytes) else data.strip() - else: - result_tmp[key] = np.array([data]) + all_results = {} + file_size = os.path.getsize(file_path) - if fid.tell() >= os.fstat(fid.fileno()).st_size: - break + with open(file_path, "rb") as fid: + file_data = fid.read() - peek_pos = fid.tell() - try: - _, _, next_key = self._load_vector(fid, endian) - if next_key.strip() == "SEQNUM": - break - except Exception: - break - fid.seek(peek_pos) + first_int = struct.unpack(" 0 and first_int < 1000) else ">" - if keys is not None: - result_tmp = {k: v for k, v in result_tmp.items() if k in keys or k == "DATE"} + # Find all INTEHEAD positions (timestep boundaries) + intehead_positions = [] + pos = 0 + while True: + pos = file_data.find(b"INTEHEAD", pos) + if pos == -1: + break + intehead_positions.append(pos - 4) + pos += 8 - return result_tmp + # Find positions for each requested key + key_positions = {key: [] for key in keys} if keys else {} + for key in keys: + pos = 0 + while True: + pos = file_data.find(key.encode("ascii"), pos) + if pos == -1: + break + if pos >= 4: + try: + header_size = struct.unpack(endian + "i", file_data[pos - 4:pos])[0] + if 8 <= header_size <= 1000: + key_positions[key].append(pos - 4) + except (struct.error, IndexError): + pass + pos += len(key) + + with open(file_path, "rb") as fid: + dates = [] + for intehead_pos in intehead_positions: + fid.seek(intehead_pos) + try: + header_size = struct.unpack(endian + "i", fid.read(4))[0] + key = fid.read(8).decode("ascii", errors="ignore").strip() + data_count = struct.unpack(endian + "i", fid.read(4))[0] + data_type = fid.read(4).decode("ascii", errors="ignore").strip() + end_size = struct.unpack(endian + "i", fid.read(4))[0] + if key == "INTEHEAD" and header_size == end_size: + raw_data = bytearray() + read_count = 0 + while read_count < data_count: + chunk_size = struct.unpack(endian + "i", fid.read(4))[0] + chunk_data = fid.read(chunk_size) + chunk_end = struct.unpack(endian + "i", fid.read(4))[0] + if chunk_size != chunk_end: + break + raw_data.extend(chunk_data) + read_count += chunk_size // 4 + if len(raw_data) >= data_count * 4: + data = np.frombuffer(raw_data, dtype=endian + "i4") + if len(data) > 66: + IDAY, IMON, IYEAR = data[64], data[65], data[66] + dates.append(datetime(IYEAR, IMON, IDAY)) + else: + dates.append(None) + else: + dates.append(None) + else: + dates.append(None) + except Exception: + dates.append(None) - result_dict = defaultdict(list) - time_steps = [] + for timestep_idx, (intehead_pos, date) in enumerate(zip(intehead_positions, dates)): + result = {} + if date is not None: + result["DATE"] = date - with open(file_path, 'rb') as fid: - endian = self._detect_endian(fid) - file_size = os.fstat(fid.fileno()).st_size + with open(file_path, "rb") as fid: + try: + fid.seek(intehead_pos) + header_size = struct.unpack(endian + "i", fid.read(4))[0] + key = fid.read(8).decode("ascii", errors="ignore").strip() + data_count = struct.unpack(endian + "i", fid.read(4))[0] + data_type = fid.read(4).decode("ascii", errors="ignore").strip() + end_size = struct.unpack(endian + "i", fid.read(4))[0] + if key == "INTEHEAD" and header_size == end_size: + raw_data = bytearray() + read_count = 0 + while read_count < data_count: + chunk_size = struct.unpack(endian + "i", fid.read(4))[0] + chunk_data = fid.read(chunk_size) + chunk_end = struct.unpack(endian + "i", fid.read(4))[0] + if chunk_size != chunk_end: + break + raw_data.extend(chunk_data) + read_count += chunk_size // 4 + if len(raw_data) >= data_count * 4: + result["INTEHEAD"] = np.frombuffer(raw_data, dtype=endian + "i4") + except Exception: + pass - while fid.tell() < file_size: - pos = fid.tell() - data, _, key = self._load_vector(fid, endian) - if key.strip() == "SEQNUM": - time_steps.append((data[0], pos)) + next_intehead = intehead_positions[timestep_idx + 1] if timestep_idx + 1 < len(intehead_positions) else file_size - # Initialize result_dict with empty lists for all requested keys - if keys: + for key in keys: + if key == "INTEHEAD": + continue + key_pos = None + if key in key_positions: + for p in key_positions[key]: + if intehead_pos < p < next_intehead: + key_pos = p + break + if key_pos is None: + result[key] = np.array([]) + continue + with open(file_path, "rb") as fid: + fid.seek(key_pos) + try: + header_size = struct.unpack(endian + "i", fid.read(4))[0] + key_name = fid.read(8).decode("ascii", errors="ignore").strip() + data_count = struct.unpack(endian + "i", fid.read(4))[0] + data_type = fid.read(4).decode("ascii", errors="ignore").strip() + end_size = struct.unpack(endian + "i", fid.read(4))[0] + if key_name != key or header_size != end_size: + result[key] = np.array([]) + continue + bytes_per_element = 8 if data_type == "CHAR" else 4 + total_bytes = data_count * bytes_per_element + raw_data = bytearray() + bytes_read = 0 + while bytes_read < total_bytes: + chunk_size = struct.unpack(endian + "i", fid.read(4))[0] + chunk_data = fid.read(chunk_size) + chunk_end = struct.unpack(endian + "i", fid.read(4))[0] + if chunk_size != chunk_end: + break + raw_data.extend(chunk_data) + bytes_read += chunk_size + if len(raw_data) >= total_bytes: + if data_type == "CHAR": + char_data = np.frombuffer(raw_data, dtype="S1").reshape((-1, 8)) + result[key] = np.char.decode(char_data, encoding="ascii").astype(str) + else: + dtype_map = {"REAL": "f4", "DOUB": "f8", "INTE": "i4", "LOGI": "i4"} + dtype = dtype_map.get(data_type, "f4") + result[key] = np.frombuffer(raw_data, dtype=endian + dtype) + else: + result[key] = np.array([]) + except Exception: + result[key] = np.array([]) + + all_results[timestep_idx] = result + + # Convert to {key: [val_t0, val_t1, ...]} for read_rst + result_dict = defaultdict(list) + for idx in sorted(all_results.keys()): + r = all_results[idx] for k in keys: + result_dict[k].append(r.get(k, np.array([]))) + for k in keys: + if k not in result_dict: result_dict[k] = [] - - if tstep_id is not None: - match = [t for t in time_steps if t[0] == tstep_id] - if not match: - raise ValueError(f"Timestep {tstep_id} not found in {file_path}") - with open(file_path, 'rb') as fid: - return read_one_timestep(fid, match[0][1], endian, keys) - - cumulative_days = 0 - previous_date = None - - for step, pos in time_steps: - with open(file_path, 'rb') as fid_inner: - result = read_one_timestep(fid_inner, pos, endian, keys) - - if "DATE" in result: - current_date = result["DATE"] - if previous_date: - cumulative_days += (current_date - previous_date).days - result_dict["TIME"].append(cumulative_days) - result_dict["DATE"].append((current_date.year, current_date.month, current_date.day, 0, 0)) - previous_date = current_date - - for k in keys or result.keys(): - if k in result: - result_dict[k].append(result[k]) - - # Ensure all explicitly requested keys that were never found remain empty - if keys: - for k in keys: - if k not in result_dict: - result_dict[k] = [] - return dict(result_dict)