Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 162 additions & 6 deletions pyflowdiagnostics/flow_diagnostics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import logging
import numpy as np
Expand Down Expand Up @@ -72,6 +73,7 @@ def __init__(self, file_path: str) -> None:
self.wells = {}
self.injectors = []
self.producers = []
self._json_results = {}


def execute(self, time_step_id: int) -> None:
Expand All @@ -84,11 +86,14 @@ def execute(self, time_step_id: int) -> None:
logging.info(f"Working on time step {time_step_id}.")

self.time_step_id = time_step_id
self._json_results = {}
self._read_simulator_dynamic_output()

if self._compute_TOF_and_tracer():
self._compute_flow_allocations()
self._compute_and_write_arrival_time_at_producers()
self._compute_other_diagnostics()
self._write_combined_json()
else:
logging.error("Failed to compute TOF and tracer concentrations.")

Expand Down Expand Up @@ -634,6 +639,40 @@ def _compute_flow_allocations(self) -> None:
self._write_allocation_factors(FlowAllocI, FlowAllocP)


def _compute_and_write_arrival_time_at_producers(self) -> None:
"""Computes arrival time of fluids at each producer well and writes to JSON.

For each producer, finds the minimum time-of-flight from injectors (tofI)
across its OPEN completion cells. This represents the earliest arrival
of injected fluid at that producer.
"""
if not self.producers or self.tofI is None:
logging.debug("Skipping arrival time computation: no producers or tofI.")
return

arrival_times = []
for well in self.producers:
if well.type != "PRD":
continue
tof_values = []
for cmpl in well.completions:
if cmpl.status == "OPEN":
tof_val = self.tofI[cmpl.IJK - 1]
if not np.isnan(tof_val):
tof_values.append(float(tof_val))
min_tof = float(np.min(tof_values)) if tof_values else None
arrival_times.append({"producer": well.name, "arrival_time": min_tof})

self._json_results["arrival_time"] = {
"description": (
"Earliest arrival time of injected fluid at each producer well (days). "
"Computed as the minimum time-of-flight from injectors across the producer's completion cells. "
"Lower values indicate faster breakthrough; null if no injector fluid reaches the producer."
),
"producer_arrival_times": arrival_times,
}


def _compute_flow_allocation(self, wells, C_matrix, target_names):
"""Helper function to compute flow allocation matrices.

Expand All @@ -656,17 +695,35 @@ def _compute_flow_allocation(self, wells, C_matrix, target_names):


def _write_allocation_factors(self, df_inj, df_prd) -> None:
    """Persists the inter-well flow allocation factors.

    The factors are written twice: to a per-time-step Excel workbook with
    one sheet per direction, and to the in-memory JSON result store that is
    later flushed to disk.

    Args:
        df_inj (pd.DataFrame): DataFrame of injector flow allocation (rows=injectors, cols=producers).
        df_prd (pd.DataFrame): DataFrame of producer flow allocation (rows=producers, cols=injectors).
    """
    xlsx_path = os.path.join(self.output_dir, f"Allocation_Factor_{self.time_step_id}.xlsx")
    with pd.ExcelWriter(xlsx_path, engine="xlsxwriter") as writer:
        df_inj.to_excel(writer, sheet_name="Injector Flow Allocation")
        df_prd.to_excel(writer, sheet_name="Producer Flow Allocation")
    logging.info(f"Flow allocation factors saved to {xlsx_path}")

    # Flatten each matrix into a list of records, row-major (index outer,
    # columns inner), so the JSON ordering mirrors the DataFrame layout.
    injector_allocation = []
    for inj, row in df_inj.iterrows():
        for prd, value in row.items():
            injector_allocation.append({"injector": inj, "producer": prd, "allocation": float(value)})

    producer_allocation = []
    for prd, row in df_prd.iterrows():
        for inj, value in row.items():
            producer_allocation.append({"producer": prd, "injector": inj, "allocation": float(value)})

    self._json_results["allocation"] = {
        "description": (
            "Flow allocation factors: fraction of fluid from each injector that reaches each producer "
            "(injector_allocation) and fraction of each producer's production that originates from each "
            "injector (producer_allocation). Values range 0-1; higher values indicate stronger connectivity."
        ),
        "injector_allocation": injector_allocation,
        "producer_allocation": producer_allocation,
    }


def _write_grid_flow_diagnostics(self) -> bool:
Expand Down Expand Up @@ -755,6 +812,20 @@ def _write_other_flow_diagnostics(self) -> None:
df_tracer_matrix = pd.DataFrame({'Time PVI': self.tDM[:-1], 'Tracer Concentration': self.CM})
df_tracer_matrix.to_csv(os.path.join(self.output_dir, f"Tracer_Matrix_{self.time_step_id}.csv"), index=False)

lorenz_matrix = self.compute_Lorenz(self.FM, self.PhiM)
lorenz_fracture = self.compute_Lorenz(self.FF, self.PhiF)
self._json_results["lorenz"] = {
"description": (
"Lorenz coefficient: measure of flow heterogeneity (0=uniform, 1=highly heterogeneous). "
"Derived from F-Phi (flow-storage capacity) curves. global: reservoir-wide; well_pairs: "
"per injector-producer streamtube. Higher values indicate more heterogeneous flow."
),
"global": {
"matrix": float(lorenz_matrix),
"fracture": float(lorenz_fracture),
},
}

else:

df_FPhi = pd.DataFrame({'Storage Capacity': self.Phi, 'Flow Capacity': self.F})
Expand All @@ -763,6 +834,91 @@ def _write_other_flow_diagnostics(self) -> None:
df_tracer = pd.DataFrame({'Time PVI': self.tD[:-1], 'Tracer Concentration': self.C})
df_tracer.to_csv(os.path.join(self.output_dir, f"Tracer_{self.time_step_id}.csv"), index=False)

lorenz = self.compute_Lorenz(self.F, self.Phi)
self._json_results["lorenz"] = {
"description": (
"Lorenz coefficient: measure of flow heterogeneity (0=uniform, 1=highly heterogeneous). "
"Derived from F-Phi (flow-storage capacity) curves. global: reservoir-wide; well_pairs: "
"per injector-producer streamtube. Higher values indicate more heterogeneous flow."
),
"global": float(lorenz),
}

self._compute_well_pair_lorenz()

def _compute_well_pair_lorenz(self) -> None:
    """Computes Lorenz coefficient per injector-producer well pair and appends to JSON results.

    For each well pair (cells where partI and partP identify the dominant injector and producer),
    builds F-Phi from pore volume and TOF, then computes the Lorenz coefficient. Results are
    only attached when the "lorenz" entry already exists in self._json_results and at least
    one pair produced a coefficient.
    """
    # Pairs supported by fewer cells than this cannot form a meaningful F-Phi curve.
    MIN_CELLS = 2
    if self.partI is None or self.partP is None or not self.injectors or not self.producers:
        return

    # Replace NaN time-of-flight entries with the array maximum so unswept cells rank last;
    # initial=0 keeps nanmax well-defined when every entry is NaN.
    tof_inj = np.where(np.isnan(self.tofI), np.nanmax(self.tofI, initial=0), self.tofI)
    tof_prod = np.where(np.isnan(self.tofP), np.nanmax(self.tofP, initial=0), self.tofP)
    # Column 0 = injector TOF, column 1 = producer TOF. NOTE(review): compute_F_and_Phi
    # presumably combines both columns into a total residence time — confirm against
    # its implementation.
    tof_valid = np.column_stack([tof_inj, tof_prod])

    # Keep only active cells whose partition labels map to a known injector AND producer;
    # labels are 1-based indices into self.injectors / self.producers.
    valid_mask = (
        self.grid.actnum_bool
        & ~np.isnan(self.partI)
        & ~np.isnan(self.partP)
        & (self.partI >= 1)
        & (self.partI <= len(self.injectors))
        & (self.partP >= 1)
        & (self.partP <= len(self.producers))
    )
    well_pairs = np.column_stack([self.partI[valid_mask], self.partP[valid_mask]])
    unique_pairs = np.unique(well_pairs.astype(int), axis=0)

    well_pair_lorenz = []
    for part_i, part_p in unique_pairs:
        # Cells dominated by this specific injector/producer combination.
        pair_mask = valid_mask & (self.partI == part_i) & (self.partP == part_p)
        n_cells = np.sum(pair_mask)
        if n_cells < MIN_CELLS:
            continue
        porv_sub = self.grid.porv[pair_mask]
        tof_sub = tof_valid[pair_mask]
        try:
            F, Phi = self.compute_F_and_Phi(porv_sub, tof_sub)
            lc = float(self.compute_Lorenz(F, Phi))
        except Exception:
            # Best-effort per pair: a degenerate F-Phi curve for one pair must not
            # abort the remaining pairs.
            continue
        inj_name = self.injectors[int(part_i) - 1].name
        prd_name = self.producers[int(part_p) - 1].name
        well_pair_lorenz.append({
            "injector": inj_name,
            "producer": prd_name,
            "lorenz_coefficient": lc,
            "n_cells": int(n_cells),
        })

    if well_pair_lorenz and "lorenz" in self._json_results:
        self._json_results["lorenz"]["well_pairs"] = well_pair_lorenz

def _write_combined_json(self) -> None:
    """Writes all flow diagnostics metrics to a single JSON file with descriptions.

    Bundles every metric accumulated in ``self._json_results`` together with a
    ``time`` block identifying the report step, and writes the result to
    ``FlowDiagnostics_<time_step_id>.json`` in the output directory. Nothing is
    written when no metrics were computed for this time step.
    """
    if not self._json_results:
        return
    time_block = {"time_step_id": self.time_step_id}
    if isinstance(self.binary_reader, EclReader):
        try:
            time_info = self.binary_reader.get_report_step_date(self.time_step_id)
            if time_info is not None:
                time_block["date"] = time_info["date"]
                time_block["days"] = time_info["days"]
                time_block["description"] = "Report step date (YYYY-MM-DD) and cumulative days since simulation start."
        except Exception as exc:
            # Date lookup is best-effort: keep writing the JSON, but leave a
            # trace instead of silently discarding the failure.
            logging.debug(f"Could not resolve report step date for time step {self.time_step_id}: {exc}")
    combined = {
        "time": time_block,
        "metrics": self._json_results,
    }
    file_path = os.path.join(self.output_dir, f"FlowDiagnostics_{self.time_step_id}.json")
    with open(file_path, "w") as f:
        json.dump(combined, f, indent=2)
    logging.info(f"Flow diagnostics metrics saved to {file_path}")

# ---- Static Methods ---------------------------------------------------------------------------------------------

Expand Down
Loading
Loading