From aae1b9cf553ee82a547e4249a4856f1d93dd1afc Mon Sep 17 00:00:00 2001 From: Evangelos <157492053+ediakatos@users.noreply.github.com> Date: Thu, 13 Mar 2025 12:26:49 +0000 Subject: [PATCH 1/5] Revert "Updated the disaster charter to resolve warning" This reverts commit 0fc561101976437ee3b00c091c983c624131953d. --- Makefile | 2 +- src/cerf/data_normalisation_cerf.py | 10 +++------- src/disaster_charter/data_normalisation_dc.py | 4 ++-- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/Makefile b/Makefile index 05cdf32..3d49da5 100644 --- a/Makefile +++ b/Makefile @@ -56,7 +56,7 @@ run_all_normal: | run_glide_normal run_gdacs_normal run_dc_normal run_emdat_norm @echo "Running all normalisation scripts.." run_all_clean: | run_all_normal - @echo "Running all cleaning scripts.." + @echo "Running all cleaner scripts.." @poetry run python -m src.utils.splitter help: diff --git a/src/cerf/data_normalisation_cerf.py b/src/cerf/data_normalisation_cerf.py index ed262ab..0eb0ea2 100644 --- a/src/cerf/data_normalisation_cerf.py +++ b/src/cerf/data_normalisation_cerf.py @@ -9,12 +9,12 @@ from src.data_consolidation.dictionary import ( CERF_MAPPING, ) -from src.utils.azure_blob_utils import read_blob_to_dataframe -from src.utils.util import ( +from src.glide.data_normalisation_glide import ( change_data_type, map_and_drop_columns, normalize_event_type, ) +from src.utils.azure_blob_utils import read_blob_to_dataframe SCHEMA_PATH_CERF = "./src/cerf/cerf_schema.json" EVENT_CODE_CSV = "./static_data/event_code_table.csv" @@ -38,11 +38,7 @@ def get_iso3_code(country_name: str) -> None: cleaned1_df["Country_Code"] = cleaned1_df["Country"].apply(get_iso3_code) cleaned2_df = change_data_type(cleaned1_df, cerf_schema) - cleaned2_df["Date"] = pd.to_datetime( - cleaned2_df["Date"], - errors="coerce", - dayfirst=True, - ) + cleaned2_df["Date"] = pd.to_datetime(cleaned2_df["Date"], errors="coerce") cleaned2_df = normalize_event_type(cleaned2_df, EVENT_CODE_CSV) schema_order = list(cerf_schema["properties"].keys()) ordered_columns = [col for col in schema_order if col in cleaned2_df.columns] diff --git a/src/disaster_charter/data_normalisation_dc.py b/src/disaster_charter/data_normalisation_dc.py index 60e0599..4c31e7b 100644 --- a/src/disaster_charter/data_normalisation_dc.py +++ b/src/disaster_charter/data_normalisation_dc.py @@ -9,12 +9,12 @@ from src.data_consolidation.dictionary import ( DISASTER_CHARTER_MAPPING, ) -from src.utils.azure_blob_utils import read_blob_to_dataframe -from src.utils.util import ( +from src.glide.data_normalisation_glide import ( change_data_type, map_and_drop_columns, normalize_event_type, ) +from src.utils.azure_blob_utils import read_blob_to_dataframe SCHEMA_PATH_DISASTER_CHARTER = "./src/disaster_charter/disaster_charter_schema.json" BLOB_NAME = ( From fe313af2033ff2f3c3d83cfa77fd05825ea0a11c Mon Sep 17 00:00:00 2001 From: Evangelos <157492053+ediakatos@users.noreply.github.com> Date: Thu, 13 Mar 2025 12:26:49 +0000 Subject: [PATCH 2/5] Revert "Path changes for emdat" This reverts commit 8b1c91838d2cf1eb8f5d7ca99707191c8611b8fa. --- src/emdat/data_normalisation_emdat.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emdat/data_normalisation_emdat.py b/src/emdat/data_normalisation_emdat.py index f21fe3d..1a01096 100644 --- a/src/emdat/data_normalisation_emdat.py +++ b/src/emdat/data_normalisation_emdat.py @@ -7,11 +7,11 @@ import pandas as pd from src.data_consolidation.dictionary import EMDAT_MAPPING -from src.utils.azure_blob_utils import read_blob_to_dataframe -from src.utils.util import ( +from src.glide.data_normalisation_glide import ( map_and_drop_columns, normalize_event_type, ) +from src.utils.azure_blob_utils import read_blob_to_dataframe EMDAT_INPUT_XLX_BLOB = ( "disaster-impact/raw/emdat/" From 457b38f7f289a3fbb95e3a426bdd476adab61c87 Mon Sep 17 00:00:00 2001 From: Evangelos <157492053+ediakatos@users.noreply.github.com> Date: Thu, 13 Mar 2025 12:26:49 +0000 Subject: [PATCH 3/5] Revert "Changed the path from idmc" This reverts commit c37aafaf1087b608f44bab400e0740098a9cb80c. --- src/glide/data_normalisation_glide.py | 6 +----- src/idmc/data_normalisation_idmc.py | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/glide/data_normalisation_glide.py b/src/glide/data_normalisation_glide.py index b3b63a9..a524878 100644 --- a/src/glide/data_normalisation_glide.py +++ b/src/glide/data_normalisation_glide.py @@ -10,11 +10,7 @@ from src.data_consolidation.dictionary import GLIDE_MAPPING from src.utils.azure_blob_utils import read_blob_to_dataframe -from src.utils.util import ( - change_data_type, - map_and_drop_columns, - normalize_event_type, -) +from src.utils.util import change_data_type, map_and_drop_columns, normalize_event_type GLIDE_INPUT_BLOB = "disaster-impact/raw/glide/glide_data_combined_all.csv" SCHEMA_PATH_GLIDE = "./src/glide/glide_schema.json" diff --git a/src/idmc/data_normalisation_idmc.py b/src/idmc/data_normalisation_idmc.py index 11bacf6..1db47db 100644 --- a/src/idmc/data_normalisation_idmc.py +++ b/src/idmc/data_normalisation_idmc.py @@ -8,12 +8,12 @@ import pandas as pd from src.data_consolidation.dictionary import IDMC_MAPPING -from src.utils.azure_blob_utils import read_blob_to_json -from src.utils.util import ( +from src.glide.data_normalisation_glide import ( change_data_type, map_and_drop_columns, normalize_event_type, ) +from src.utils.azure_blob_utils import read_blob_to_json SCHEMA_PATH_IDMC = "./src/idmc/idmc_schema.json" EVENT_CODE_CSV = "./static_data/event_code_table.csv" From ae1abe290d9ba7c531684ae8a1f7b61f432ba959 Mon Sep 17 00:00:00 2001 From: Evangelos <157492053+ediakatos@users.noreply.github.com> Date: Thu, 13 Mar 2025 12:26:49 +0000 Subject: [PATCH 4/5] Revert "Changed uitls path for ifrc emergencies" This reverts commit cd61bea14a4bfa95fe99ac8fc8d3b898edc2f409. --- src/ifrc_eme/data_normalisation_ifrc_eme.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ifrc_eme/data_normalisation_ifrc_eme.py b/src/ifrc_eme/data_normalisation_ifrc_eme.py index 079c1bb..df368b2 100644 --- a/src/ifrc_eme/data_normalisation_ifrc_eme.py +++ b/src/ifrc_eme/data_normalisation_ifrc_eme.py @@ -10,12 +10,12 @@ import pandas as pd from src.data_consolidation.dictionary import IFRC_EME_MAPPING -from src.utils.azure_blob_utils import read_blob_to_dataframe -from src.utils.util import ( +from src.glide.data_normalisation_glide import ( change_data_type, map_and_drop_columns, normalize_event_type, ) +from src.utils.azure_blob_utils import read_blob_to_dataframe IFRC_EME_INPUT_BLOB = "disaster-impact/raw/ifrc_dref/IFRC_emergencies.csv" SCHEMA_PATH_IFRC_EME = "./src/ifrc_eme/ifrc_eme_schema.json" From 71414afafce18dc9513687b5c64fa99bcdd36594 Mon Sep 17 00:00:00 2001 From: Evangelos <157492053+ediakatos@users.noreply.github.com> Date: Thu, 13 Mar 2025 12:26:49 +0000 Subject: [PATCH 5/5] Revert "Created and updated a util py script to offload some functions from main scripts and for better life cycle" This reverts commit 25099672795cf956db22cb36c450eea3b1665d90. --- src/gdacs/data_normalisation_gdacs.py | 13 +--- src/glide/data_normalisation_glide.py | 99 ++++++++++++++++++++++++- src/utils/util.py | 101 -------------------------- 3 files changed, 102 insertions(+), 111 deletions(-) delete mode 100644 src/utils/util.py diff --git a/src/gdacs/data_normalisation_gdacs.py b/src/gdacs/data_normalisation_gdacs.py index 004edaf..2300bae 100644 --- a/src/gdacs/data_normalisation_gdacs.py +++ b/src/gdacs/data_normalisation_gdacs.py @@ -9,16 +9,15 @@ import pycountry from src.data_consolidation.dictionary import GDACS_MAPPING -from src.utils.azure_blob_utils import combine_csvs_from_blob_dir -from src.utils.util import ( +from src.glide.data_normalisation_glide import ( change_data_type, map_and_drop_columns, normalize_event_type, ) +from src.utils.azure_blob_utils import combine_csvs_from_blob_dir EVENT_CODE_CSV = "./static_data/event_code_table.csv" COORDINATE_PAIR_LENGTH = 2 -SCHEMA_PATH_GDACS = "./src/gdacs/gdacs_schema.json" def combine_csvs_from_blob(blob_dir: str) -> pd.DataFrame: @@ -139,9 +138,9 @@ def get_iso3_from_country_name(country_name: str) -> None: return df -def main() -> None: - """Main function to clean the GDACS data and save it to a CSV file.""" +if __name__ == "__main__": blob_dir = "disaster-impact/raw/gdacs/v2/" + SCHEMA_PATH_GDACS = "./src/gdacs/gdacs_schema.json" gdacs_df_raw = combine_csvs_from_blob(blob_dir) gdacs_df_raw = split_coordinates( @@ -174,7 +173,3 @@ def main() -> None: Path("./data_mid_1/gdacs/").mkdir(parents=True, exist_ok=True) output_file_path = "./data_mid_1/gdacs/gdacs_mid1.csv" cleaned2_gdacs_df.to_csv(output_file_path, index=False) - - -if __name__ == "__main__": - main() diff --git a/src/glide/data_normalisation_glide.py b/src/glide/data_normalisation_glide.py index a524878..3aa50d5 100644 --- a/src/glide/data_normalisation_glide.py +++ b/src/glide/data_normalisation_glide.py @@ -10,7 +10,6 @@ from src.data_consolidation.dictionary import GLIDE_MAPPING from src.utils.azure_blob_utils import read_blob_to_dataframe -from src.utils.util import change_data_type, map_and_drop_columns, normalize_event_type GLIDE_INPUT_BLOB = "disaster-impact/raw/glide/glide_data_combined_all.csv" SCHEMA_PATH_GLIDE = "./src/glide/glide_schema.json" @@ -20,6 +19,104 @@ glide_schema = json.load(schema_glide) +def map_and_drop_columns(raw_data: pd.DataFrame, dictionary: dict) -> pd.DataFrame: + """Renames columns in the raw_data DataFrame based. + + Args: + raw_data (pd.DataFrame): The input DataFrame with raw data. + dictionary (dict): A dictionary where keys are + the new column names and values are the old column names. + + Returns: + pd.DataFrame: A DataFrame with columns renamed and unnecessary columns dropped. + """ + rename_mapping = {value: key for key, value in dictionary.items() if value} + return raw_data[list(rename_mapping.keys())].rename(columns=rename_mapping) + + +def change_data_type(cleaned1_data: pd.DataFrame, json_schema: dict) -> pd.DataFrame: + """Change the data types of columns in a DataFrame based on a JSON schema. + + Args: + cleaned1_data (pd.DataFrame): The DataFrame with data to be type-casted. + json_schema (dict): The JSON schema defining + the desired data types for each column. + + Returns: + pd.DataFrame: The DataFrame with columns cast to the specified data types. + """ + for column, properties in json_schema["properties"].items(): + if column in cleaned1_data.columns: + column_type = properties.get("type") + if "array" in column_type: + cleaned1_data[column] = cleaned1_data[column].apply( + lambda x: ",".join(map(str, x)) + if isinstance(x, list) + else (str(x) if pd.notna(x) else ""), + ) + elif "string" in column_type: + cleaned1_data[column] = cleaned1_data[column].astype(str) + elif "number" in column_type: + cleaned1_data[column] = pd.to_numeric( + cleaned1_data[column], + errors="coerce", + ) + elif "integer" in column_type: + cleaned1_data[column] = pd.to_numeric( + cleaned1_data[column], + errors="coerce", + ).astype("Int64") + elif "null" in column_type: + cleaned1_data[column] = cleaned1_data[column].where( + cleaned1_data[column].notna(), + None, + ) + return cleaned1_data + + +def normalize_event_type(df: pd.DataFrame, event_code_csv: str) -> pd.DataFrame: + """Normalizes the Event_Type. + + The CSV file is expected to have two columns with headers: + - event_code: the normalized event type key. + - event_name: the event type description. + + For each row in `df`, if the standardized Event_Type value matches a + description from the CSV, the corresponding normalized key is stored in a + new column, Event_Code. If no match is found, the original Event_Type value + is retained. + + Args: + df (pd.DataFrame): The input DataFrame containing an 'Event_Type' column. + event_code_csv (str): The path to the CSV file containing the event code + mapping. + + Returns: + pd.DataFrame: The DataFrame with an additional 'Event_Code' column. + """ + event_mapping_df = pd.read_csv(event_code_csv) + event_mapping_df["event_name"] = ( + event_mapping_df["event_name"].str.strip().str.upper() + ) + event_mapping_df["event_code"] = event_mapping_df["event_code"].str.strip() + mapping = dict( + zip( + event_mapping_df["event_name"], + event_mapping_df["event_code"], + strict=False, + ), + ) + df["Event_Code"] = ( + df["Event_Type"] + .astype(str) + .str.strip() + .str.upper() + .map(mapping) + .fillna(df["Event_Type"]) + ) + return df + + def main() -> None: """Main function to clean the GLIDE data and save it to a CSV file.""" glide_df_raw = read_blob_to_dataframe(GLIDE_INPUT_BLOB) diff --git a/src/utils/util.py b/src/utils/util.py deleted file mode 100644 index 0731cfe..0000000 --- a/src/utils/util.py +++ /dev/null @@ -1,101 +0,0 @@ -"""Utility functions for the project.""" - -import pandas as pd - - -def map_and_drop_columns(raw_data: pd.DataFrame, dictionary: dict) -> pd.DataFrame: - """Renames columns in the raw_data DataFrame based. - - Args: - raw_data (pd.DataFrame): The input DataFrame with raw data. - dictionary (dict): A dictionary where keys are - the new column names and values are the old column names. - - Returns: - pd.DataFrame: A DataFrame with columns renamed and unnecessary columns dropped. - """ - rename_mapping = {value: key for key, value in dictionary.items() if value} - return raw_data[list(rename_mapping.keys())].rename(columns=rename_mapping) - - -def change_data_type(cleaned1_data: pd.DataFrame, json_schema: dict) -> pd.DataFrame: - """Change the data types of columns in a DataFrame based on a JSON schema. - - Args: - cleaned1_data (pd.DataFrame): The DataFrame with data to be type-casted. - json_schema (dict): The JSON schema defining - the desired data types for each column. - - Returns: - pd.DataFrame: The DataFrame with columns cast to the specified data types. - """ - for column, properties in json_schema["properties"].items(): - if column in cleaned1_data.columns: - column_type = properties.get("type") - if "array" in column_type: - cleaned1_data[column] = cleaned1_data[column].apply( - lambda x: ",".join(map(str, x)) - if isinstance(x, list) - else (str(x) if pd.notna(x) else ""), - ) - elif "string" in column_type: - cleaned1_data[column] = cleaned1_data[column].astype(str) - elif "number" in column_type: - cleaned1_data[column] = pd.to_numeric( - cleaned1_data[column], - errors="coerce", - ) - elif "integer" in column_type: - cleaned1_data[column] = pd.to_numeric( - cleaned1_data[column], - errors="coerce", - ).astype("Int64") - elif "null" in column_type: - cleaned1_data[column] = cleaned1_data[column].where( - cleaned1_data[column].notna(), - None, - ) - return cleaned1_data - - -def normalize_event_type(df: pd.DataFrame, event_code_csv: str) -> pd.DataFrame: - """Normalizes the Event_Type. - - The CSV file is expected to have two columns with headers: - - event_code: the normalized event type key. - - event_name: the event type description. - - For each row in `df`, if the standardized Event_Type value matches a - description from the CSV, the corresponding normalized key is stored in a - new column, Event_Code. If no match is found, the original Event_Type value - is retained. - - Args: - df (pd.DataFrame): The input DataFrame containing an 'Event_Type' column. - event_code_csv (str): The path to the CSV file containing the event code - mapping. - - Returns: - pd.DataFrame: The DataFrame with an additional 'Event_Code' column. - """ - event_mapping_df = pd.read_csv(event_code_csv) - event_mapping_df["event_name"] = ( - event_mapping_df["event_name"].str.strip().str.upper() - ) - event_mapping_df["event_code"] = event_mapping_df["event_code"].str.strip() - mapping = dict( - zip( - event_mapping_df["event_name"], - event_mapping_df["event_code"], - strict=False, - ), - ) - df["Event_Code"] = ( - df["Event_Type"] - .astype(str) - .str.strip() - .str.upper() - .map(mapping) - .fillna(df["Event_Type"]) - ) - return df