Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 55 additions & 7 deletions mibiscreen/data/check_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ def standard_names(name_list,

Returns:
-------
tuple: three list containing names of
tuple: three lists containing names of
list with identified quantities in data (but not standardized names)
list with unknown quantities in data (not in list of standardized names)
list with standard names of identified quantities
+ one dictionary mapping identified quantities to their
standard values for fast name transformation

Raises:
-------
Expand All @@ -60,7 +62,6 @@ def standard_names(name_list,
names_unknown = []
names_transform = {}


if isinstance(name_list, str):
name_list = [name_list]
elif isinstance(name_list, list):
Expand All @@ -76,17 +77,13 @@ def standard_names(name_list,
**contaminants_analysis,
}
dict_names=_generate_dict_other_names(properties_all)

other_names_contaminants = _generate_dict_other_names(properties_contaminants)
other_names_isotopes = _generate_dict_other_names(properties_isotopes)

# dict_names= other_names_all.copy()

for x in name_list:
y = dict_names.get(x, False)
x_isotope = x.split('-')[0]
y_isotopes = other_names_isotopes.get(x_isotope.lower(), False)

if y_isotopes is not False:
x_molecule = x.removeprefix(x_isotope+'-')
y_molecule = other_names_contaminants.get(x_molecule.lower(), False)
Expand Down Expand Up @@ -246,6 +243,21 @@ def check_columns(data_frame,
column_names_unknown = results[2]
column_names_transform = results[3]

### check on duplicates in column names after name standardization
### (i.e. in case the same quantity has been provided with different non-standard names
duplicates_indices = _check_duplicates_in_list(column_names_standard)

if duplicates_indices:
# duplicates_indices is NOT empty — do something here
print("WARNING: Duplicates found in list of standard names:")
for name, indices in duplicates_indices.items():
print(f"'{name}' occur has been provided as:")
for i in indices:
# Print the entry from other_list at index i
print(f" - '{column_names_known[i]}' (index {i})")
print('Remove or rename duplicate quantities in data.')
print('________________________________________________________________')

if standardize:
data.columns = [column_names_transform.get(x, x) for x in data.columns]

Expand All @@ -258,7 +270,7 @@ def check_columns(data_frame,
print('----------------------------------')
for i,name in enumerate(column_names_known):
print(name," --> ",column_names_standard[i])
print('----------------------------------')
print('---------------------------------------------------------')
if standardize:
print("Identified column names have been standardized")
else:
Expand Down Expand Up @@ -342,6 +354,7 @@ def check_units(data,
**properties_contaminants,
**properties_metabolites,
**properties_isotopes,
**contaminants_analysis,
}

### run through all quantity columns and check their units
Expand All @@ -354,6 +367,8 @@ def check_units(data,
else:
col_not_checked.append(quantity)
continue

### check on given unit (also considering alternative unit names)
if standard_unit != names.unit_less:
other_names_unit = properties_units[standard_unit]['other_names']
if str(units[quantity][0]).lower() not in other_names_unit:
Expand All @@ -370,6 +385,7 @@ def check_units(data,
print(" Quantities not in requested units:")
print(*col_check_list, sep='\n')
if len(col_not_checked) != 0:
print('________________________________________________________________')
print(" Quantities not identified (and thus not checked on units):")
print(*col_not_checked, sep='\n')
print('================================================================')
Expand Down Expand Up @@ -645,3 +661,35 @@ def _generate_dict_other_names(name_dict,
other_names_dict[other_name] = key

return other_names_dict

def _check_duplicates_in_list(name_list):
    """Locate entries that occur more than once in a list of names.

    Parameters:
    -----------
        name_list : list of str
            Names to scan for repeated entries.

    Returns:
    --------
        dict
            Maps every name that appears at least twice to the list of
            positions (in ascending order) at which it occurs in the input.
            Names occurring only once are left out; keys follow the order
            of first appearance in the input.

    Example:
    --------
    >>> _check_duplicates_in_list(['a', 'b', 'a', 'c', 'b'])
    {'a': [0, 2], 'b': [1, 4]}
    """
    # Collect the positions of every name in a single pass over the list.
    positions_by_name = {}
    for position, entry in enumerate(name_list):
        positions_by_name.setdefault(entry, []).append(position)

    # Only names seen at two or more positions count as duplicates.
    return {
        entry: positions
        for entry, positions in positions_by_name.items()
        if len(positions) >= 2
    }
Binary file not shown.
72 changes: 60 additions & 12 deletions mibiscreen/data/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
@author: Alraune Zech
"""
import os.path
import re
import numpy as np
import pandas as pd

Expand All @@ -29,7 +30,7 @@ def load_excel(
store_provenance: Boolean
To add!
**kwargs: optional keyword arguments to pass to pandas' routine
read_excel()
read_excel(), e.g. sep = ',' or sep = ';'

Returns:
-------
Expand All @@ -51,6 +52,11 @@ def load_excel(
>>> load_excel(example_data.xlsx)

"""
if verbose:
print('===================================')
print(" Running function 'load_excel()'")
print('===================================')

if file_path is None:
raise ValueError('Specify file path and file name!')
if not os.path.isfile(file_path):
Expand All @@ -59,18 +65,16 @@ def load_excel(
data = pd.read_excel(file_path,
sheet_name = sheet_name,
**kwargs)
if ";" in data.iloc[1].iloc[0]:
data = pd.read_excel(file_path,
sep=";",
sheet_name = sheet_name,
**kwargs)

if verbose:
print("Reading data from file: {}".format(file_path))
print('------------------------------------------------------------------')

_check_duplicates_in_df(data)

units = data.drop(labels = np.arange(1,data.shape[0]))

if verbose:
print('==============================================================')
print(" Running function 'load_excel()' on data file ", file_path)
print('==============================================================')
print("Unit of quantities:")
print('-------------------')
print(units)
Expand Down Expand Up @@ -118,20 +122,29 @@ def load_csv(
>>> load_excel(example_data.csv)

"""
if verbose:
print('==================================')
print(" Running function 'load_csv()'")
print('==================================')

if file_path is None:
raise ValueError('Specify file path and file name!')
if not os.path.isfile(file_path):
raise OSError('Cannot access file at : ',file_path)

if verbose:
print("Reading data from file: {}".format(file_path))
print('------------------------------------------------------------------')

data = pd.read_csv(file_path, encoding="unicode_escape")
if ";" in data.iloc[1].iloc[0]:
data = pd.read_csv(file_path, sep=";", encoding="unicode_escape")

_check_duplicates_in_df(data)

units = data.drop(labels = np.arange(1,data.shape[0]))

if verbose:
print('================================================================')
print(" Running function 'load_csv()' on data file ", file_path)
print('================================================================')
print("Units of quantities:")
print('-------------------')
print(units)
Expand All @@ -142,3 +155,38 @@ def load_csv(
print('================================================================')

return data, units

def _check_duplicates_in_df(data):
    """Warn about column names that look like pandas-renamed duplicates.

    When a data source contains identical column names, pandas renames the
    repeated ones by appending a numeric suffix (e.g., 'Column', 'Column.1',
    'Column.2'). This function looks for column names ending in such a
    suffix (`.1`, `.2`, etc.) and prints a warning listing them.

    The check is a heuristic on the name pattern only: any string column
    name ending in '.<digits>' is reported.

    Args:
    -----
        data (pd.DataFrame): The DataFrame to check for renamed duplicate columns.

    Returns:
    --------
        None
    """
    # Pattern matching a name that ends in '.<digits>' (pandas rename suffix)
    renamed_pattern = re.compile(r"^(.*)\.(\d+)$")

    # Group suspicious columns by their base name so that renamed variants
    # of the same original column are listed together in the warning.
    duplicate_columns = {}
    for col in data.columns:
        # Column labels are not necessarily strings (e.g. numeric headers);
        # the regex only works on strings, and a suffix-renamed column is
        # a string, so other label types are skipped.
        if not isinstance(col, str):
            continue
        match = renamed_pattern.match(col)
        if match:
            duplicate_columns.setdefault(match.group(1), []).append(col)

    if duplicate_columns:
        print("WARNING: Looks like duplicate column names detected.")
        print("         They were automatically renamed by pandas into:")
        for renamed_list in duplicate_columns.values():
            for renamed in renamed_list:
                print(f"    - '{renamed}'")
        print("Duplicate column names will not be identified as standard names.")
        print("Consider renaming them.")
        print('------------------------------------------------------------------')
Loading
Loading