Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ run_all_normal: | run_glide_normal run_gdacs_normal run_dc_normal run_emdat_norm
@echo "Running all normalisation scripts.."

run_all_clean: | run_all_normal
@echo "Running all cleaning scripts.."
@echo "Running all cleaner scripts.."
@poetry run python -m src.utils.splitter

help:
Expand Down
10 changes: 3 additions & 7 deletions src/cerf/data_normalisation_cerf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
from src.data_consolidation.dictionary import (
CERF_MAPPING,
)
from src.utils.azure_blob_utils import read_blob_to_dataframe
from src.utils.util import (
from src.glide.data_normalisation_glide import (
change_data_type,
map_and_drop_columns,
normalize_event_type,
)
from src.utils.azure_blob_utils import read_blob_to_dataframe

SCHEMA_PATH_CERF = "./src/cerf/cerf_schema.json"
EVENT_CODE_CSV = "./static_data/event_code_table.csv"
Expand All @@ -38,11 +38,7 @@ def get_iso3_code(country_name: str) -> None:

cleaned1_df["Country_Code"] = cleaned1_df["Country"].apply(get_iso3_code)
cleaned2_df = change_data_type(cleaned1_df, cerf_schema)
cleaned2_df["Date"] = pd.to_datetime(
cleaned2_df["Date"],
errors="coerce",
dayfirst=True,
)
cleaned2_df["Date"] = pd.to_datetime(cleaned2_df["Date"], errors="coerce")
cleaned2_df = normalize_event_type(cleaned2_df, EVENT_CODE_CSV)
schema_order = list(cerf_schema["properties"].keys())
ordered_columns = [col for col in schema_order if col in cleaned2_df.columns]
Expand Down
4 changes: 2 additions & 2 deletions src/disaster_charter/data_normalisation_dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
from src.data_consolidation.dictionary import (
DISASTER_CHARTER_MAPPING,
)
from src.utils.azure_blob_utils import read_blob_to_dataframe
from src.utils.util import (
from src.glide.data_normalisation_glide import (
change_data_type,
map_and_drop_columns,
normalize_event_type,
)
from src.utils.azure_blob_utils import read_blob_to_dataframe

SCHEMA_PATH_DISASTER_CHARTER = "./src/disaster_charter/disaster_charter_schema.json"
BLOB_NAME = (
Expand Down
4 changes: 2 additions & 2 deletions src/emdat/data_normalisation_emdat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import pandas as pd

from src.data_consolidation.dictionary import EMDAT_MAPPING
from src.utils.azure_blob_utils import read_blob_to_dataframe
from src.utils.util import (
from src.glide.data_normalisation_glide import (
map_and_drop_columns,
normalize_event_type,
)
from src.utils.azure_blob_utils import read_blob_to_dataframe

EMDAT_INPUT_XLX_BLOB = (
"disaster-impact/raw/emdat/"
Expand Down
13 changes: 4 additions & 9 deletions src/gdacs/data_normalisation_gdacs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,15 @@
import pycountry

from src.data_consolidation.dictionary import GDACS_MAPPING
from src.utils.azure_blob_utils import combine_csvs_from_blob_dir
from src.utils.util import (
from src.glide.data_normalisation_glide import (
change_data_type,
map_and_drop_columns,
normalize_event_type,
)
from src.utils.azure_blob_utils import combine_csvs_from_blob_dir

EVENT_CODE_CSV = "./static_data/event_code_table.csv"
COORDINATE_PAIR_LENGTH = 2
SCHEMA_PATH_GDACS = "./src/gdacs/gdacs_schema.json"


def combine_csvs_from_blob(blob_dir: str) -> pd.DataFrame:
Expand Down Expand Up @@ -139,9 +138,9 @@ def get_iso3_from_country_name(country_name: str) -> None:
return df


def main() -> None:
"""Main function to clean the GDACS data and save it to a CSV file."""
if __name__ == "__main__":
blob_dir = "disaster-impact/raw/gdacs/v2/"
SCHEMA_PATH_GDACS = "./src/gdacs/gdacs_schema.json"

gdacs_df_raw = combine_csvs_from_blob(blob_dir)
gdacs_df_raw = split_coordinates(
Expand Down Expand Up @@ -174,7 +173,3 @@ def main() -> None:
Path("./data_mid_1/gdacs/").mkdir(parents=True, exist_ok=True)
output_file_path = "./data_mid_1/gdacs/gdacs_mid1.csv"
cleaned2_gdacs_df.to_csv(output_file_path, index=False)


if __name__ == "__main__":
main()
103 changes: 98 additions & 5 deletions src/glide/data_normalisation_glide.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@

from src.data_consolidation.dictionary import GLIDE_MAPPING
from src.utils.azure_blob_utils import read_blob_to_dataframe
from src.utils.util import (
change_data_type,
map_and_drop_columns,
normalize_event_type,
)

GLIDE_INPUT_BLOB = "disaster-impact/raw/glide/glide_data_combined_all.csv"
SCHEMA_PATH_GLIDE = "./src/glide/glide_schema.json"
Expand All @@ -24,6 +19,104 @@
glide_schema = json.load(schema_glide)


def map_and_drop_columns(raw_data: pd.DataFrame, dictionary: dict) -> pd.DataFrame:
    """Select and rename columns of *raw_data* according to *dictionary*.

    Args:
        raw_data (pd.DataFrame): Input DataFrame with source-specific columns.
        dictionary (dict): Mapping of new column name -> old column name.
            Entries whose value is falsy (e.g. empty string or None) are
            skipped entirely, so those columns are dropped.

    Returns:
        pd.DataFrame: A DataFrame containing only the mapped columns,
        renamed to the dictionary's keys.
    """
    # Invert the mapping to old -> new, ignoring unmapped entries.
    old_to_new = {}
    for new_name, old_name in dictionary.items():
        if old_name:
            old_to_new[old_name] = new_name
    selected = raw_data[list(old_to_new)]
    return selected.rename(columns=old_to_new)


def change_data_type(cleaned1_data: pd.DataFrame, json_schema: dict) -> pd.DataFrame:
    """Cast DataFrame columns to the types declared in a JSON schema.

    The schema's ``properties`` mapping drives the conversion. A column's
    ``type`` entry may be a single string or a list of strings (e.g.
    ``["string", "null"]``); membership tests cover both forms. Columns
    not present in the DataFrame are ignored.

    Note: the input DataFrame is modified in place and also returned.

    Args:
        cleaned1_data (pd.DataFrame): The DataFrame with data to be type-casted.
        json_schema (dict): The JSON schema defining
            the desired data types for each column.

    Returns:
        pd.DataFrame: The DataFrame with columns cast to the specified data types.
    """
    for column, properties in json_schema["properties"].items():
        if column not in cleaned1_data.columns:
            continue
        column_type = properties.get("type")
        if not column_type:
            # Fix: a schema entry without a "type" key previously crashed
            # with TypeError ("in" on None); leave such columns untouched.
            continue
        if "array" in column_type:
            # Flatten list values to a comma-joined string; NaN becomes "".
            cleaned1_data[column] = cleaned1_data[column].apply(
                lambda x: ",".join(map(str, x))
                if isinstance(x, list)
                else (str(x) if pd.notna(x) else ""),
            )
        elif "string" in column_type:
            cleaned1_data[column] = cleaned1_data[column].astype(str)
        elif "number" in column_type:
            # Unparseable values become NaN rather than raising.
            cleaned1_data[column] = pd.to_numeric(
                cleaned1_data[column],
                errors="coerce",
            )
        elif "integer" in column_type:
            # Nullable Int64 keeps NaN-coerced values representable.
            cleaned1_data[column] = pd.to_numeric(
                cleaned1_data[column],
                errors="coerce",
            ).astype("Int64")
        elif "null" in column_type:
            cleaned1_data[column] = cleaned1_data[column].where(
                cleaned1_data[column].notna(),
                None,
            )
    return cleaned1_data


def normalize_event_type(df: pd.DataFrame, event_code_csv: str) -> pd.DataFrame:
"""Normalizes the Event_Type.

The CSV file is expected to have two columns with headers:
- event_code: the normalized event type key.
- event_name: the event type description.

For each row in `df`, if the standardized Event_Type value matches a
description from the CSV, the corresponding normalized key is stored in a
new column, Event_Code. If no match is found, the original Event_Type value
is retained.

Args:
df (pd.DataFrame): The input DataFrame containing an 'Event_Type' column.
event_code_csv (str): The path to the CSV file containing the event code
mapping.

Returns:
pd.DataFrame: The DataFrame with an additional 'Event_Code' column.
"""
event_mapping_df = pd.read_csv(event_code_csv)
event_mapping_df["event_name"] = (
event_mapping_df["event_name"].str.strip().str.upper()
)
event_mapping_df["event_code"] = event_mapping_df["event_code"].str.strip()
mapping = dict(
zip(
event_mapping_df["event_name"],
event_mapping_df["event_code"],
strict=False,
),
)
df["Event_Code"] = (
df["Event_Type"]
.astype(str)
.str.strip()
.str.upper()
.map(mapping)
.fillna(df["Event_Type"])
)
return df


def main() -> None:
"""Main function to clean the GLIDE data and save it to a CSV file."""
glide_df_raw = read_blob_to_dataframe(GLIDE_INPUT_BLOB)
Expand Down
4 changes: 2 additions & 2 deletions src/idmc/data_normalisation_idmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import pandas as pd

from src.data_consolidation.dictionary import IDMC_MAPPING
from src.utils.azure_blob_utils import read_blob_to_json
from src.utils.util import (
from src.glide.data_normalisation_glide import (
change_data_type,
map_and_drop_columns,
normalize_event_type,
)
from src.utils.azure_blob_utils import read_blob_to_json

SCHEMA_PATH_IDMC = "./src/idmc/idmc_schema.json"
EVENT_CODE_CSV = "./static_data/event_code_table.csv"
Expand Down
4 changes: 2 additions & 2 deletions src/ifrc_eme/data_normalisation_ifrc_eme.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
import pandas as pd

from src.data_consolidation.dictionary import IFRC_EME_MAPPING
from src.utils.azure_blob_utils import read_blob_to_dataframe
from src.utils.util import (
from src.glide.data_normalisation_glide import (
change_data_type,
map_and_drop_columns,
normalize_event_type,
)
from src.utils.azure_blob_utils import read_blob_to_dataframe

IFRC_EME_INPUT_BLOB = "disaster-impact/raw/ifrc_dref/IFRC_emergencies.csv"
SCHEMA_PATH_IFRC_EME = "./src/ifrc_eme/ifrc_eme_schema.json"
Expand Down
101 changes: 0 additions & 101 deletions src/utils/util.py

This file was deleted.

Loading