diff --git a/ANMN/NRS_AIMS/REALTIME/anmn_nrs_aims.py b/ANMN/NRS_AIMS/REALTIME/anmn_nrs_aims.py index d7cf77f4..ccaa7d98 100755 --- a/ANMN/NRS_AIMS/REALTIME/anmn_nrs_aims.py +++ b/ANMN/NRS_AIMS/REALTIME/anmn_nrs_aims.py @@ -30,281 +30,389 @@ import datetime import logging import os -import sys import re import shutil -import traceback +import sys import unittest as data_validation_test - -from netCDF4 import Dataset from itertools import groupby -from tendo import singleton - -from aims_realtime_util import (convert_time_cf_to_imos, - create_list_of_dates_to_download, download_channel, - fix_data_code_from_filename, - fix_provider_code_from_filename, - has_var_only_fill_value, - is_no_data_found, is_time_monotonic, - is_time_var_empty, logging_aims, md5, - modify_aims_netcdf, parse_aims_xml, - remove_dimension_from_netcdf, - remove_end_date_from_filename, save_channel_info, - set_up, rm_tmp_dir, get_main_netcdf_var, - list_recursively_files_abs_path) +from pathlib import Path + +from aims_realtime_util import ( + convert_time_cf_to_imos, + create_list_of_dates_to_download, + download_channel, + fix_data_code_from_filename, + fix_provider_code_from_filename, + get_main_netcdf_var, + has_var_only_fill_value, + is_no_data_found, + is_time_monotonic, + is_time_var_empty, + list_recursively_files_abs_path, + logging_aims, + md5, + modify_aims_netcdf, + parse_aims_xml, + remove_dimension_from_netcdf, + remove_end_date_from_filename, + rm_tmp_dir, + save_channel_info, + set_up, +) from dest_path import get_anmn_nrs_site_name +from netCDF4 import Dataset +from tendo import singleton from util import pass_netcdf_checker -DATA_WIP_PATH = os.path.join(os.environ.get('WIP_DIR'), 'ANMN', 'NRS_AIMS_Darwin_Yongala_data_rss_download_temporary') -ANMN_NRS_INCOMING_DIR = os.path.join(os.environ.get('INCOMING_DIR'), 'AODN', 'ANMN_NRS_DAR_YON') -ANMN_NRS_ERROR_DIR = os.path.join(os.environ['ERROR_DIR'], 'ANMN_NRS_DAR_YON') +MD5_EXPECTED_VALUE = "ba3bcf5d61134a338ee62c8f98033d00" +# MD5_EXPECTED_VALUE = "a6207e053f1cc0e00d171701f0cdb186" + +DATA_WIP_PATH = os.path.join( + os.environ.get("WIP_DIR"), + "ANMN", + "NRS_AIMS_Darwin_Yongala_data_rss_download_temporary", +) +ANMN_NRS_INCOMING_DIR = os.path.join( + os.environ.get("INCOMING_DIR"), "AODN", "ANMN_NRS_DAR_YON" +) +ANMN_NRS_ERROR_DIR = os.path.join(os.environ["ERROR_DIR"], "ANMN_NRS_DAR_YON") def modify_anmn_nrs_netcdf(netcdf_file_path, channel_id_info): - """ Modify the downloaded netCDF file so it passes both CF and IMOS checker - input: - netcdf_file_path(str) : path of netcdf file to modify - channel_id_index(tupple) : information from xml for the channel """ + Refines ANMN NRS specific metadata and coordinate variables. 
+ """ + # First pass: Generic AIMS modifications modify_aims_netcdf(netcdf_file_path, channel_id_info) - netcdf_file_obj = Dataset(netcdf_file_path, 'a', format='NETCDF4') - netcdf_file_obj.aims_channel_id = int(channel_id_info['channel_id']) - - if 'Yongala' in channel_id_info['site_name']: - netcdf_file_obj.site_code = 'NRSYON' - netcdf_file_obj.platform_code = 'Yongala NRS Buoy' - elif 'Darwin' in channel_id_info['site_name']: - netcdf_file_obj.site_code = 'NRSDAR' - netcdf_file_obj.platform_code = 'Darwin NRS Buoy' - elif 'Beagle' in channel_id_info['site_name']: - netcdf_file_obj.site_code = 'DARBGF' - netcdf_file_obj.platform_code = 'Beagle Gulf Mooring' - else: - return False - - if not (channel_id_info['metadata_uuid'] == 'Not Available'): - netcdf_file_obj.metadata_uuid = channel_id_info['metadata_uuid'] - - # some weather stations channels don't have a depth variable if sensor above water - if 'depth' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['depth'] - var.long_name = 'nominal depth' - var.positive = 'down' - var.axis = 'Z' - var.reference_datum = 'sea surface' - var.valid_min = -10.0 - var.valid_max = 30.0 - var.units = 'm' # some channels put degrees celcius instead ... - netcdf_file_obj.renameVariable('depth', 'NOMINAL_DEPTH') - - if 'DEPTH' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['DEPTH'] - var.coordinates = "TIME LATITUDE LONGITUDE NOMINAL_DEPTH" - var.long_name = 'actual depth' - var.reference_datum = 'sea surface' - var.positive = 'down' - var.valid_min = -10.0 - var.valid_max = 30.0 - var.units = 'm' # some channels put degrees celcius instead ... - - netcdf_file_obj.close() - netcdf_file_obj = Dataset(netcdf_file_path, 'a', format='NETCDF4') # need to close to save to file. 
as we call get_main_var just after - main_var = get_main_netcdf_var(netcdf_file_path) - # DEPTH, LATITUDE and LONGITUDE are not dimensions, so we make them into auxiliary cooordinate variables by adding this attribute - if 'NOMINAL_DEPTH' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.variables[main_var].coordinates = "TIME LATITUDE LONGITUDE NOMINAL_DEPTH" - else: - netcdf_file_obj.variables[main_var].coordinates = "TIME LATITUDE LONGITUDE" - - netcdf_file_obj.close() - + site_map = { + "Yongala": ("NRSYON", "Yongala NRS Buoy"), + "Darwin": ("NRSDAR", "Darwin NRS Buoy"), + "Beagle": ("DARBGF", "Beagle Gulf Mooring"), + } + + site_name = channel_id_info.get("site_name", "") + site_data = next((v for k, v in site_map.items() if k in site_name), None) + + if not site_data: + return False # Site not recognised + + with Dataset(netcdf_file_path, "a") as nc: + nc.site_code, nc.platform_code = site_data + nc.aims_channel_id = int(channel_id_info["channel_id"]) + + if channel_id_info.get("metadata_uuid") != "Not Available": + nc.metadata_uuid = channel_id_info["metadata_uuid"] + + # Depth Variable Attributes (Common configurations) + depth_attrs = { + "positive": "down", + "axis": "Z", + "reference_datum": "sea surface", + "valid_min": -10.0, + "valid_max": 30.0, + "units": "m", + } + + # Handle 'depth' + if "depth" in nc.variables: + var = nc.variables["depth"] + for k, v in depth_attrs.items(): + setattr(var, k, v) + var.long_name = "nominal depth" + nc.renameVariable("depth", "NOMINAL_DEPTH") + + # Handle 'DEPTH' (actual depth) + if "DEPTH" in nc.variables: + var = nc.variables["DEPTH"] + # Standard depth attributes plus coordinates + for k, v in depth_attrs.items(): + setattr(var, k, v) + var.long_name = "actual depth" + var.coordinates = "TIME LATITUDE LONGITUDE NOMINAL_DEPTH" + + # Coordinate String Assignment + # We close the file above so that the next functions see the changes + main_var = get_main_netcdf_var(netcdf_file_path) + + with Dataset(netcdf_file_path, "a") as nc: + if main_var in nc.variables: + coords = "TIME LATITUDE LONGITUDE" + if "NOMINAL_DEPTH" in nc.variables: + coords += " NOMINAL_DEPTH" + nc.variables[main_var].coordinates = coords + + # Final transformations if not convert_time_cf_to_imos(netcdf_file_path): return False - remove_dimension_from_netcdf(netcdf_file_path) # last modification to do in this order! + # This MUST be last as it reshapes the file + remove_dimension_from_netcdf(netcdf_file_path) + return True def move_to_tmp_incoming(netcdf_path): - # [org_filename withouth creation date].[md5].nc to have unique filename in - new_filename = '%s.%s.nc' % (os.path.splitext(os.path.basename(remove_end_date_from_filename(netcdf_path)))[0], md5(netcdf_path)) + """ + Renames the NetCDF to include its MD5 hash, moves it to the manifest directory, + and cleans up the now-empty source directory. 
+ """ + logger = logging.getLogger(__name__) + + source_file = Path(netcdf_path) + source_dir = source_file.parent - os.chmod(netcdf_path, 0o0664) # change to 664 for pipeline v2 - shutil.move(netcdf_path, os.path.join(TMP_MANIFEST_DIR, new_filename)) + # Construct the new filename: [name_without_date].[md5].nc + # remove_end_date_from_filename returns a string, so we wrap it in Path + name_no_date = Path(remove_end_date_from_filename(str(source_file))).stem + file_hash = md5(str(source_file)) + new_filename = f"{name_no_date}.{file_hash}.nc" + + destination = Path(TMP_MANIFEST_DIR) / new_filename + + try: + # Apply permissions (664) + source_file.chmod(0o664) + + # Perform the move + shutil.move(str(source_file), str(destination)) + logger.info(f"Moved {source_file.name} to {destination}") + + # Cleanup: Delete the source directory if it is now empty + try: + source_dir.rmdir() + logger.debug(f"Cleaned up empty directory: {source_dir}") + except OSError: + logger.debug(f"Source directory not empty; skipping cleanup: {source_dir}") + + except Exception as e: + logger.error(f"Failed to move {source_file} to incoming: {e}") + raise def process_monthly_channel(channel_id, aims_xml_info, level_qc): - """ Downloads all the data available for one channel_id and moves the file to a wip_path dir - channel_id(str) - aims_xml_info(tuple) - level_qc(int) + """ + Downloads all the data available for one channel_id and moves the file to a wip_path dir aims_service : 1 -> FAIMMS data 100 -> SOOP TRV data 300 -> NRS DATA for monthly data download, only 1 and 300 should be use """ - logger.info('QC{level_qc} - Processing channel {channel_id}'.format(channel_id=str(channel_id), - level_qc=str(level_qc))) + contact_aims_msg = "Process of channel aborted - CONTACT AIMS" + wip_path = Path(os.environ.get("data_wip_path", "")) + + HL = "\x1b[1;35m" # Bold Magenta + RS = "\x1b[0m" + GREEN = "\033[92m" + ORANGE = "\033[38;5;208m" + RESET = "\033[0m" + YELLOW = "\033[33m" + + logger.info( + f"QC{level_qc} - {YELLOW}Processing channel{YELLOW} {HL}{channel_id}{RS}" + ) + channel_id_info = aims_xml_info[channel_id] - from_date = channel_id_info['from_date'] - thru_date = channel_id_info['thru_date'] - [start_dates, end_dates] = create_list_of_dates_to_download(channel_id, level_qc, from_date, thru_date) - - if len(start_dates) != 0: - # download monthly file - for start_date, end_date in zip(start_dates, end_dates): - start_date = start_date.strftime("%Y-%m-%dT%H:%M:%SZ") - end_date = end_date.strftime("%Y-%m-%dT%H:%M:%SZ") - netcdf_tmp_file_path = download_channel(channel_id, start_date, end_date, level_qc) - contact_aims_msg = "Process of channel aborted - CONTACT AIMS" - - if netcdf_tmp_file_path is None: - logger.error(' Channel %s - not valid zip file - %s' % (str(channel_id), contact_aims_msg)) - break - - # NO_DATA_FOUND file only means there is no data for the selected time period. 
Could be some data afterwards - if is_no_data_found(netcdf_tmp_file_path): - logger.info('Channel {channel_id}: No data for the time period:[{start_date} - {end_date}]'.format( - channel_id=str(channel_id), - start_date=start_date, - end_date=end_date)) - shutil.rmtree(os.path.dirname(netcdf_tmp_file_path)) - else: - if is_time_var_empty(netcdf_tmp_file_path): - logger.error('Channel {channel_id}: No values in TIME variable - {message}'.format( - channel_id=str(channel_id), - message=contact_aims_msg)) - shutil.rmtree(os.path.dirname(netcdf_tmp_file_path)) - break - - if not modify_anmn_nrs_netcdf(netcdf_tmp_file_path, channel_id_info): - logger.error('Channel{channel_id}: Could not modify the NetCDF file - Process of channel aborted'. - format(channel_id=str(channel_id))) - shutil.rmtree(os.path.dirname(netcdf_tmp_file_path)) - break - - main_var = get_main_netcdf_var(netcdf_tmp_file_path) - if has_var_only_fill_value(netcdf_tmp_file_path, main_var): - logger.error('Channel {channel_id}: _Fillvalues only in main variable - {message}'.format( - channel_id=str(channel_id), - message=contact_aims_msg)) - shutil.rmtree(os.path.dirname(netcdf_tmp_file_path)) - break - - if get_anmn_nrs_site_name(netcdf_tmp_file_path) == []: - logger.error('Channel {channel_id}: Unknown site_code gatt value - {message}'.format( - channel_id=str(channel_id), - message=contact_aims_msg)) - shutil.rmtree(os.path.dirname(netcdf_tmp_file_path)) - break - - if not is_time_monotonic(netcdf_tmp_file_path): - logger.error('Channel {channel_id}: TIME value is not strictly monotonic \ - - {message}'.format(channel_id=str(channel_id), - message=contact_aims_msg)) - shutil.rmtree(os.path.dirname(netcdf_tmp_file_path)) - break - - # check every single file of the list. We don't assume that if one passes, all pass ... past proved this - wip_path = os.environ.get('data_wip_path') - checker_retval = pass_netcdf_checker(netcdf_tmp_file_path, tests=['cf:1.6', 'imos:1.3']) - if not checker_retval: - logger.error('Channel {channel_id}: File does not pass CF/IMOS compliance checker - Process of channel aborted' - .format(channel_id=str(channel_id))) - shutil.copy(netcdf_tmp_file_path, os.path.join(wip_path, 'errors')) - - logger.error('File copied to {path} for debugging'.format( - path=os.path.join(wip_path, 'errors', os.path.basename(netcdf_tmp_file_path)))) - shutil.rmtree(os.path.dirname(netcdf_tmp_file_path)) - break - - netcdf_tmp_file_path = fix_data_code_from_filename(netcdf_tmp_file_path) - netcdf_tmp_file_path = fix_provider_code_from_filename(netcdf_tmp_file_path, 'IMOS_ANMN') - - if re.search('IMOS_ANMN_[A-Z]{1}_', netcdf_tmp_file_path) is None: - logger.error(' Channel %s - File name Data code does not pass REGEX - Process of channel aborted' % str(channel_id)) - shutil.copy(netcdf_tmp_file_path, os.path.join(wip_path, 'errors')) - logger.error(' File copied to %s for debugging' % (os.path.join(wip_path, 'errors', os.path.basename(netcdf_tmp_file_path)))) - shutil.rmtree(os.path.dirname(netcdf_tmp_file_path)) - break - - move_to_tmp_incoming(netcdf_tmp_file_path) - - if TESTING: - # The 2 next lines download the first month only for every single channel. 
This is only used for testing - save_channel_info(channel_id, aims_xml_info, level_qc, end_date) - break - - save_channel_info(channel_id, aims_xml_info, level_qc, end_date) + from_date = channel_id_info["from_date"] + thru_date = channel_id_info["thru_date"] + + # [start_dates, end_dates] generation + start_dates, end_dates = create_list_of_dates_to_download( + channel_id, level_qc, from_date, thru_date + ) + + if not start_dates: + logger.info( + f"{GREEN}QC{level_qc} - Channel {channel_id}: already up to date{RESET}" + ) + return + + # download monthly file + for start_dt, end_dt in zip(start_dates, end_dates): + start_date = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") + end_date = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") + + netcdf_tmp_file_path = download_channel( + channel_id, start_date, end_date, level_qc + ) + + if netcdf_tmp_file_path is None: + logger.error( + f" Channel {channel_id} - not valid zip file - {contact_aims_msg}" + ) + break + + tmp_dir = Path(netcdf_tmp_file_path).parent + + # NO_DATA_FOUND file only means there is no data for the selected time period. + # Could be some data afterwards + if is_no_data_found(netcdf_tmp_file_path): + logger.info( + f"{ORANGE}Channel {channel_id}: No data for the time period:[{start_date} - {end_date}]{RESET}" + ) + shutil.rmtree(tmp_dir) + continue # Move to next month + + # Start of validation sequence + error_occurred = False + + if is_time_var_empty(netcdf_tmp_file_path): + logger.error( + f"Channel {channel_id}: No values in TIME variable - {contact_aims_msg}" + ) + error_occurred = True + + elif not modify_anmn_nrs_netcdf(netcdf_tmp_file_path, channel_id_info): + logger.error( + f"Channel {channel_id}: Could not modify the NetCDF file - Process of channel aborted" + ) + error_occurred = True - else: - logger.info('QC{level_qc} - Channel {channel_id}: already up to date'.format(channel_id=str(channel_id), - level_qc=str(level_qc))) + else: + main_var = get_main_netcdf_var(netcdf_tmp_file_path) + if has_var_only_fill_value(netcdf_tmp_file_path, main_var): + logger.error( + f"Channel {channel_id}: _Fillvalues only in main variable - {contact_aims_msg}" + ) + error_occurred = True + elif not get_anmn_nrs_site_name(netcdf_tmp_file_path): + logger.error( + f"Channel {channel_id}: Unknown site_code gatt value - {contact_aims_msg}" + ) + error_occurred = True + elif not is_time_monotonic(netcdf_tmp_file_path): + logger.error( + f"Channel {channel_id}: TIME value is not strictly monotonic - {contact_aims_msg}" + ) + error_occurred = True + + if error_occurred: + shutil.rmtree(tmp_dir) + break + + # check every single file of the list. We don't assume that if one passes, all pass ... 
past proved this
+        if not pass_netcdf_checker(netcdf_tmp_file_path, tests=["cf:1.6", "imos:1.3"]):
+            logger.error(
+                f"Channel {channel_id}: File does not pass CF/IMOS compliance checker - Process of channel aborted"
+            )
+
+            err_dest = wip_path / "errors" / os.path.basename(netcdf_tmp_file_path)
+            shutil.copy(netcdf_tmp_file_path, err_dest)
+
+            logger.error(f"File copied to {err_dest} for debugging")
+            shutil.rmtree(tmp_dir)
+            break
+
+        netcdf_tmp_file_path = fix_data_code_from_filename(netcdf_tmp_file_path)
+        netcdf_tmp_file_path = fix_provider_code_from_filename(
+            netcdf_tmp_file_path, "IMOS_ANMN"
+        )
+
+        if not re.search(r"IMOS_ANMN_[A-Z]{1}_", netcdf_tmp_file_path):
+            logger.error(
+                f" Channel {channel_id} - File name Data code does not pass REGEX - Process of channel aborted"
+            )
+
+            err_dest = wip_path / "errors" / os.path.basename(netcdf_tmp_file_path)
+            shutil.copy(netcdf_tmp_file_path, err_dest)
+
+            logger.error(f" File copied to {err_dest} for debugging")
+            shutil.rmtree(tmp_dir)
+            break
+
+        move_to_tmp_incoming(netcdf_tmp_file_path)
+
+        # Update tracking
+        save_channel_info(channel_id, aims_xml_info, level_qc, end_date)
+
+        if TESTING:
+            # Testing mode: download only the first month of each channel.
+            # save_channel_info has already been called above,
+            # so we can simply stop here.
+            break
 
 
 def process_qc_level(level_qc):
-    """ Downloads all channels for a QC level
-    level_qc(int) : 0 or 1
     """
+    Downloads all channels for a specific QC level (0 or 1).
+    """
+    logger.info(
+        f"Process ANMN NRS download from AIMS web service - QC level {level_qc}"
+    )
+
+    xml_url = (
+        f"https://data.aims.gov.au/gbroosdata/services/rss/netcdf/level{level_qc}/300"
+    )
 
-    logger.info('Process ANMN NRS download from AIMS web service - QC level {level_qc}'.format(level_qc=level_qc))
-    xml_url = 'https://data.aims.gov.au/gbroosdata/services/rss/netcdf/level{level_qc}/300'.format(level_qc=level_qc)
     try:
         aims_xml_info = parse_aims_xml(xml_url)
-    except Exception as err:
-        logger.critical('RSS feed not available')
+    except Exception:
+        # Use exc_info=True to automatically attach the stack trace to the log
+        logger.critical(f"RSS feed not available at {xml_url}", exc_info=True)
         exit(1)
 
-    for channel_id in aims_xml_info.keys():
+    # Iterate through channels
+    for channel_id in aims_xml_info:
         try:
             process_monthly_channel(channel_id, aims_xml_info, level_qc)
-        except Exception as err:
-            logger.error('QC{qc_level} - Channel {channel_id}: Failed, unknown reason - manual debug required'.format(
-                channel_id=str(channel_id),
-                qc_level=str(level_qc)))
-            logger.error(traceback.print_exc())
+        except Exception:
+            # logger.exception automatically logs the error AND the traceback
+            logger.exception(
+                f"QC{level_qc} - Channel {channel_id}: Failed, unknown reason - manual debug required"
+            )
 
 
 class AimsDataValidationTest(data_validation_test.TestCase):
-
     def setUp(self):
-        """ Check that a the AIMS system or this script hasn't been modified.
+        """Check that the AIMS system or this script hasn't been modified.
         This function checks that a downloaded file still has the same md5.
""" - channel_id = '84329' - from_date = '2016-01-01T00:00:00Z' - thru_date = '2016-01-02T00:00:00Z' - level_qc = 1 - aims_rss_val = 300 - xml_url = 'https://data.aims.gov.au/gbroosdata/services/rss/netcdf/level%s/%s' % (str(level_qc), str(aims_rss_val)) - - logger.info('Data validation unittests...') - aims_xml_info = parse_aims_xml(xml_url) + channel_id = "84329" + from_date = "2016-01-01T00:00:00Z" + thru_date = "2016-01-02T00:00:00Z" + level_qc = 1 + aims_rss_val = 300 + xml_url = ( + "https://data.aims.gov.au/gbroosdata/services/rss/netcdf/level%s/%s" + % (str(level_qc), str(aims_rss_val)) + ) + + logger.info("Data validation unittests...") + aims_xml_info = parse_aims_xml(xml_url) channel_id_info = aims_xml_info[channel_id] - self.netcdf_tmp_file_path = download_channel(channel_id, from_date, thru_date, level_qc) - modify_anmn_nrs_netcdf(self.netcdf_tmp_file_path, channel_id_info) + self.nc_path = Path( + download_channel(channel_id, from_date, thru_date, level_qc) + ) + modify_anmn_nrs_netcdf(str(self.nc_path), channel_id_info) # force values of attributes which change all the time - netcdf_file_obj = Dataset(self.netcdf_tmp_file_path, 'a', format='NETCDF4') - netcdf_file_obj.date_created = "1970-01-01T00:00:00Z" # epoch - netcdf_file_obj.history = 'data validation test only' - netcdf_file_obj.NCO = 'NCO_VERSION' - - netcdf_file_obj.close() + with Dataset(self.nc_path, "a") as nc: + nc.date_created = "1970-01-01T00:00:00Z" + nc.history = "data validation test only" + # Check if NCO attribute exists before forcing it + if hasattr(nc, "NCO"): + nc.NCO = "NCO_VERSION" def tearDown(self): - shutil.copy(self.netcdf_tmp_file_path, os.path.join(os.environ['data_wip_path'], 'nc_unittest_%s.nc' % self.md5_netcdf_value)) - shutil.rmtree(os.path.dirname(self.netcdf_tmp_file_path)) + wip_dir = Path(os.environ.get("data_wip_path", ".")) + + # Preserve the file for debugging before cleanup + # self.md5_netcdf_value needs to be calculated in the test method itself + if hasattr(self, "md5_netcdf_value"): + debug_name = f"nc_unittest_{self.md5_netcdf_value}.nc" + shutil.copy(self.nc_path, wip_dir / debug_name) + + # Cleanup: Remove the parent directory of the temp file + if self.nc_path.parent.exists(): + shutil.rmtree(self.nc_path.parent) def test_aims_validation(self): if sys.version_info[0] < 3: - self.md5_expected_value = '76c9a595264a8173545b6dc0c518a280' + self.md5_expected_value = "76c9a595264a8173545b6dc0c518a280" else: - self.md5_expected_value = '78c6386529faf9dc2272e9bed5ed7fa2' - - self.md5_netcdf_value = md5(self.netcdf_tmp_file_path) + self.md5_expected_value = MD5_EXPECTED_VALUE + self.md5_netcdf_value = md5(str(self.nc_path)) self.assertEqual(self.md5_netcdf_value, self.md5_expected_value) @@ -315,19 +423,24 @@ def args(): :return: vargs """ parser = argparse.ArgumentParser() - parser.add_argument("-t", "--testing", - action='store_true', - help="testing only - downloads the first month of each channel") + parser.add_argument( + "-t", + "--testing", + action="store_true", + help="testing only - downloads the first month of each channel", + ) return parser.parse_args() -if __name__ == '__main__': +if __name__ == "__main__": vargs = args() me = singleton.SingleInstance() - os.environ['data_wip_path'] = os.path.join(os.environ.get('WIP_DIR'), - 'ANMN', - 'NRS_AIMS_Darwin_Yongala_data_rss_download_temporary') + os.environ["data_wip_path"] = os.path.join( + os.environ.get("WIP_DIR"), + "ANMN", + "NRS_AIMS_Darwin_Yongala_data_rss_download_temporary", + ) global TMP_MANIFEST_DIR global 
TESTING @@ -340,11 +453,13 @@ def args(): # data validation test runner = data_validation_test.TextTestRunner() - itersuite = data_validation_test.TestLoader().loadTestsFromTestCase(AimsDataValidationTest) + itersuite = data_validation_test.TestLoader().loadTestsFromTestCase( + AimsDataValidationTest + ) res = runner.run(itersuite) if not DATA_WIP_PATH: - logger.critical('environment variable data_wip_path is not defined.') + logger.critical("environment variable data_wip_path is not defined.") exit(1) # script optional argument for testing only. used in process_monthly_channel @@ -353,18 +468,19 @@ def args(): rm_tmp_dir(DATA_WIP_PATH) if len(os.listdir(ANMN_NRS_INCOMING_DIR)) >= 2: - logger.critical('Operation aborted, too many files in INCOMING_DIR') + logger.critical("Operation aborted, too many files in INCOMING_DIR") exit(1) if len(os.listdir(ANMN_NRS_ERROR_DIR)) >= 2: - logger.critical('Operation aborted, too many files in ERROR_DIR') + logger.critical("Operation aborted, too many files in ERROR_DIR") exit(1) if not res.failures: for level in [0, 1]: - date_str_now = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - TMP_MANIFEST_DIR = os.path.join(DATA_WIP_PATH, 'manifest_dir_tmp_{date}'.format( - date=date_str_now)) + date_str_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + TMP_MANIFEST_DIR = os.path.join( + DATA_WIP_PATH, "manifest_dir_tmp_{date}".format(date=date_str_now) + ) os.makedirs(TMP_MANIFEST_DIR) process_qc_level(level) @@ -372,17 +488,26 @@ def args(): lines_per_file = 2**12 file_list = list_recursively_files_abs_path(TMP_MANIFEST_DIR) if len(file_list) > 0: - for file_number, lines in groupby(enumerate(file_list), key=lambda x: x[0] // lines_per_file): - incoming_file = os.path.join(DATA_WIP_PATH, 'anmn_nrs_aims_FV0{level}_{date}_{file_number}.manifest'.format( - level=str(level), - date=date_str_now, - file_number=file_number)) - with open(incoming_file, 'w') as outfile: + for file_number, lines in groupby( + enumerate(file_list), key=lambda x: x[0] // lines_per_file + ): + incoming_file = os.path.join( + DATA_WIP_PATH, + "anmn_nrs_aims_FV0{level}_{date}_{file_number}.manifest".format( + level=str(level), date=date_str_now, file_number=file_number + ), + ) + with open(incoming_file, "w") as outfile: for item in lines: outfile.write("%s\n" % item[1]) os.chmod(incoming_file, 0o0664) # change to 664 for pipeline v2 - shutil.move(incoming_file, os.path.join(ANMN_NRS_INCOMING_DIR, os.path.basename(incoming_file))) + shutil.move( + incoming_file, + os.path.join( + ANMN_NRS_INCOMING_DIR, os.path.basename(incoming_file) + ), + ) else: - logger.error('Data validation unittests failed') + logger.error("Data validation unittests failed") diff --git a/lib/python/aims_realtime_util.py b/lib/python/aims_realtime_util.py index 4daad1b7..e22d358f 100755 --- a/lib/python/aims_realtime_util.py +++ b/lib/python/aims_realtime_util.py @@ -1,4 +1,4 @@ -""" set of tools to +"""set of tools to - parse AIMS RSS feed web pages - create a list of monthly timestamps to download - generate URL to download (with regards to what has already been downloaded @@ -10,25 +10,30 @@ author Laurent Besnard, laurent.besnard@utas.edu.au """ -import datetime + import glob +import hashlib import json import logging import os import pickle import re import shutil -import subprocess import sys import tempfile import time import xml.etree.ElementTree as ET import zipfile +from datetime import datetime, timedelta +from logging.handlers import TimedRotatingFileHandler +from pathlib import Path from time 
import gmtime, strftime import dotenv import numpy import requests +from dateutil import rrule +from dateutil.relativedelta import relativedelta from six.moves.urllib.request import urlopen from six.moves.urllib_error import URLError @@ -36,55 +41,83 @@ from functools import lru_cache except ImportError: from functools32 import lru_cache -from netCDF4 import Dataset, date2num, num2date - -from retrying import retry from logging.handlers import TimedRotatingFileHandler +from netCDF4 import Dataset, date2num, num2date +from retrying import retry ##################### # Logging Functions # ##################### +class AimsColorFormatter(logging.Formatter): + """Custom formatter to add colors to console output only.""" + + # ANSI Codes + GREY = "\x1b[38;20m" + CYAN = "\x1b[36;20m" + YELLOW = "\x1b[33;20m" + RED = "\x1b[31;20m" + BOLD_RED = "\x1b[31;1m" + RESET = "\x1b[0m" + + log_format = ( + "%(asctime)s — %(name)s — %(levelname)s — %(funcName)s:%(lineno)d — %(message)s" + ) + + LEVEL_COLORS = { + logging.DEBUG: GREY, + logging.INFO: CYAN, + logging.WARNING: YELLOW, + logging.ERROR: RED, + logging.CRITICAL: BOLD_RED, + } + + def format(self, record): + color = self.LEVEL_COLORS.get(record.levelno, self.RESET) + formatter = logging.Formatter(f"{color}{self.log_format}{self.RESET}") + return formatter.format(record) + + def logging_aims(): - """ start logging using logging python library - output: - logger - similar to a file handler - """ - wip_path = os.environ.get('data_wip_path') - # this is used for unit testing as data_wip_path env would not be set - if wip_path is None: - wip_path = tempfile.mkdtemp() + """Starts logging with colored console and plain-text file output.""" + + wip_path_env = os.environ.get("data_wip_path") + wip_path = Path(wip_path_env) if wip_path_env else Path(tempfile.mkdtemp()) + log_path = wip_path / "aims.log" - logging_format = "%(asctime)s — %(name)s — %(levelname)s — %(funcName)s:%(lineno)d — %(message)s" + # Standard plain formatter for the file + file_format = ( + "%(asctime)s — %(name)s — %(levelname)s — %(funcName)s:%(lineno)d — %(message)s" + ) + file_formatter = logging.Formatter(file_format) - # set up logging to file - tmp_filename = tempfile.mkstemp('.log', 'aims_data_download_')[1] - log_path = os.path.join(wip_path, 'aims.log') - logging.basicConfig(level=logging.INFO, - format=logging_format, - filename=tmp_filename, - filemode='a+') + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) - # rotate logs every Day, and keep only the last 5 log files - logHandler = TimedRotatingFileHandler(log_path, - when="D", - interval=1, - backupCount=5, # backupCount files will be kept - ) - logHandler.setFormatter(logging.Formatter(logging_format)) - logHandler.setLevel(logging.DEBUG) - logging.getLogger('').addHandler(logHandler) + if root_logger.hasHandlers(): + root_logger.handlers.clear() - # define a Handler which writes DEBUG messages to the sys.stderr - logFormatter = logging.Formatter(logging_format) - consoleHandler = logging.StreamHandler() - consoleHandler.setLevel(logging.INFO) - consoleHandler.setFormatter(logFormatter) + # 1. File Handler (Plain text) + file_handler = TimedRotatingFileHandler( + filename=log_path, when="D", interval=1, backupCount=5, encoding="utf-8" + ) + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(file_formatter) + root_logger.addHandler(file_handler) - # add the console handler to the root logger - logging.getLogger('').addHandler(consoleHandler) + # 2. 
Console Handler (Colored) + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + + console_handler.setFormatter(AimsColorFormatter()) + root_logger.addHandler(console_handler) + + root_logger.debug("Logging initialized successfully.") + root_logger.info(f"Log file location: {log_path}") + + return root_logger #################### @@ -93,33 +126,33 @@ def logging_aims(): def _pickle_filename(level_qc): - """ returns the pickle filepath according to the QC level being processed + """returns the pickle filepath according to the QC level being processed input: level_qc(int) : 0 or 1 output: picleQc_file(str) : pickle file path """ - wip_path = os.environ.get('data_wip_path') + wip_path = os.environ.get("data_wip_path") if wip_path is None: - raise ValueError('data_wip_path enviromnent variable is not set') + raise ValueError("data_wip_path enviromnent variable is not set") if level_qc == 0: - pickle_qc_file = os.path.join(wip_path, 'aims_qc0.pickle') + pickle_qc_file = os.path.join(wip_path, "aims_qc0.pickle") elif level_qc == 1: - pickle_qc_file = os.path.join(wip_path, 'aims_qc1.pickle') + pickle_qc_file = os.path.join(wip_path, "aims_qc1.pickle") return pickle_qc_file def delete_channel_id_from_pickle(level_qc, channel_id): pickle_file = _pickle_filename(level_qc) - with open(pickle_file, 'rb') as p_read: + with open(pickle_file, "rb") as p_read: aims_xml_info = pickle.load(p_read) if channel_id in aims_xml_info.keys(): - del(aims_xml_info[channel_id]) + del aims_xml_info[channel_id] - with open(pickle_file, 'wb') as p_write: + with open(pickle_file, "wb") as p_write: pickle.dump(aims_xml_info, p_write) @@ -139,120 +172,181 @@ def delete_platform_entries_from_pickle(level_qc, platform): In [2]: delete_platform_entries_from_pickle(2, 'Beagle') """ pickle_file = _pickle_filename(level_qc) - with open(pickle_file, 'rb') as p_read: + with open(pickle_file, "rb") as p_read: aims_xml_info = pickle.load(p_read) def delete_over_list_platform(aims_xml_info, platform): for index_platform, value in enumerate(aims_xml_info): if platform in value: for index_field in range(0, len(aims_xml_info)): - del(aims_xml_info[index_field][platform_name]) + del aims_xml_info[index_field][platform_name] aims_xml_info = delete_over_list_platform(aims_xml_info, platform) return aims_xml_info aims_xml_info_clean = delete_over_list_platform(aims_xml_info, platform) - with open(pickle_file, 'wb') as p_write: + with open(pickle_file, "wb") as p_write: pickle.dump(aims_xml_info_clean, p_write) @retry(URLError, tries=10, delay=3, backoff=2) def urlopen_with_retry(url): - """ it will retry a maximum of 10 times, with an exponential backoff delay + """it will retry a maximum of 10 times, with an exponential backoff delay doubling each time, e.g. 
3 seconds, 6 seconds, 12 seconds """ return urlopen(url) -def save_channel_info(channel_id, aims_xml_info, level_qc, *last_downloaded_date_channel): +def save_channel_info( + channel_id, aims_xml_info, level_qc, *last_downloaded_date_channel +): """ - if channel_id has been successfuly processed, we write about it in a pickle file - we write the last downloaded data date for each channel - input: - channel_id(str) : channel_id to save information - aims_xml_info(dict) : generated by parser_aims_xml - level_qc(int) : 0 or 1 - last_downloaded_date_channel is a variable argument, not used by soop trv + if channel_id has been successfuly processed, we write about it in a pickle file + we write the last downloaded data date for each channel + input: + channel_id(str) : channel_id to save information + aims_xml_info(dict) : generated by parser_aims_xml + level_qc(int) : 0 or 1 + last_downloaded_date_channel is a variable argument, not used by soop trv """ - pickle_file = _pickle_filename(level_qc) - last_downloaded_date = dict() - # condition in case the pickle file already exists or not. In the first case, - # aims_xml_info comes from the pickle, file, otherwise comes from the function arg - if os.path.isfile(pickle_file): - with open(pickle_file, 'rb') as p_read: - aims_xml_info_file = pickle.load(p_read) - last_downloaded_date = aims_xml_info_file - - if not last_downloaded_date_channel: - # soop trv specific, vararg - last_downloaded_date[channel_id] = aims_xml_info[channel_id]['thru_date'] - else: - last_downloaded_date[channel_id] = last_downloaded_date_channel[0] + logger = logging.getLogger(__name__) + pickle_file = Path(_pickle_filename(level_qc)) + last_downloaded_data = {} + # Load existing data if file exists + if pickle_file.exists(): + try: + with pickle_file.open("rb") as p_read: + last_downloaded_data = pickle.load(p_read) + logger.debug(f"Loaded existing metadata from {pickle_file}") + except (EOFError, pickle.UnpicklingError): + logger.warning(f"Pickle file {pickle_file} was corrupt. Starting fresh.") + + # Determine the date (DRY - Don't Repeat Yourself) + if last_downloaded_date_channel: + new_date = last_downloaded_date_channel[0] + logger.debug(f"Using provided vararg date for {channel_id}: {new_date}") else: - if not last_downloaded_date_channel: - # soop trv specific, vararg - last_downloaded_date[channel_id] = aims_xml_info[channel_id]['thru_date'] - else: - last_downloaded_date[channel_id] = last_downloaded_date_channel[0] + new_date = aims_xml_info[channel_id]["thru_date"] + logger.debug(f"Extracted date from XML info for {channel_id}: {new_date}") - with open(pickle_file, 'wb') as p_write: - pickle.dump(last_downloaded_date, p_write) + # Update and Save + last_downloaded_data[channel_id] = new_date + + with pickle_file.open("wb") as p_write: + pickle.dump(last_downloaded_data, p_write) + + logger.info(f"Successfully saved channel info for {channel_id} to {pickle_file}") def get_last_downloaded_date_channel(channel_id, level_qc, from_date): - """ Retrieve the last date sucessfully downloaded for a channel """ - pickle_file = _pickle_filename(level_qc) # different pickle per QC - if os.path.isfile(pickle_file): - with open(pickle_file, 'rb') as p_read: - last_downloaded_date = pickle.load(p_read) + """ + Retrieve the last date successfully downloaded for a channel. + Falls back to from_date if no record is found or the file is missing/corrupt. 
+ """ - if channel_id in last_downloaded_date.keys(): # check the channel is in the pickle file - if last_downloaded_date[channel_id] is not None: - return last_downloaded_date[channel_id] + logger = logging.getLogger(__name__) + pickle_path = Path(_pickle_filename(level_qc)) - return from_date + if not pickle_path.is_file(): + return from_date + try: + with pickle_path.open("rb") as p_read: + last_downloaded_map = pickle.load(p_read) -def has_channel_already_been_downloaded(channel_id, level_qc): - pickle_file = _pickle_filename(level_qc) # different pickle per QC - if os.path.isfile(pickle_file): - with open(pickle_file, 'rb') as p_read: - last_downloaded_date = pickle.load(p_read) + recorded_date = last_downloaded_map.get(channel_id) + return recorded_date if recorded_date is not None else from_date - if channel_id in last_downloaded_date.keys(): # check the channel is in the pickle file - if last_downloaded_date[channel_id] is not None: # check the last downloaded_date field - return True - else: - return False - else: - return False + except (EOFError, pickle.UnpicklingError, Exception) as e: + # If the pickle is corrupt, we don't want to kill the pipeline. + # Log it and fall back to the provided from_date. + logger.warning( + f"Failed to read tracking file {pickle_path}: {e}. Falling back to {from_date}" + ) + return from_date - else: + +def has_channel_already_been_downloaded(channel_id, level_qc): + """ + Checks if a channel exists in the tracking pickle and has a valid date. + """ + + logger = logging.getLogger(__name__) + pickle_path = Path(_pickle_filename(level_qc)) + # + # Early exit if file doesn't exist + if not pickle_path.is_file(): + logger.debug(f"No tracking file found at {pickle_path}") return False + try: + with pickle_path.open("rb") as p_read: + last_downloaded_date = pickle.load(p_read) + except (EOFError, pickle.UnpicklingError): + logger.error(f"Failed to read pickle file: {pickle_path}") + return False -def create_list_of_dates_to_download(channel_id, level_qc, from_date, thru_date): - """ generate a list of monthly start dates and end dates to download FAIMMS and NRS data """ + # Dictionary .get() returns None if key is missing + download_date = last_downloaded_date.get(channel_id) + exists = download_date is not None - from dateutil import rrule - from datetime import datetime - from dateutil.relativedelta import relativedelta + logger.debug( + f"Channel {channel_id} download status: {exists} (Date: {download_date})" + ) - last_downloaded_date = get_last_downloaded_date_channel(channel_id, level_qc, from_date) - start_dates = [] - end_dates = [] + return exists - from_date = datetime.strptime(from_date, "%Y-%m-%dT%H:%M:%SZ") - thru_date = datetime.strptime(thru_date, "%Y-%m-%dT%H:%M:%SZ") - last_downloaded_date = datetime.strptime(last_downloaded_date, "%Y-%m-%dT%H:%M:%SZ") - if last_downloaded_date < thru_date: - for dt in rrule.rrule(rrule.MONTHLY, dtstart=datetime(last_downloaded_date.year, last_downloaded_date.month, 1), until=thru_date): - start_dates.append(dt) - end_dates.append(datetime(dt.year, dt.month, 1) + relativedelta(months=1)) +def create_list_of_dates_to_download( + channel_id, level_qc, from_date_str, thru_date_str +): + """ + Generates lists of monthly start and end dates for data downloads. + Logic: Starts from the 1st of the month of the last download. 
+ """ + logger = logging.getLogger(__name__) + # date format + iso_format = "%Y-%m-%dT%H:%M:%SZ" + + # Retrieve last download date + last_dl_str = get_last_downloaded_date_channel(channel_id, level_qc, from_date_str) + + # Convert strings to datetime objects + thru_date = datetime.strptime(thru_date_str, iso_format) + last_dl_date = datetime.strptime(last_dl_str, iso_format) + + start_dates = [] + end_dates = [] + + # Only process if there is new data to get + if last_dl_date >= thru_date: + logger.info( + f"Channel {channel_id}: No new dates to download. " + f"Last download ({last_dl_date}) is >= thru_date ({thru_date})" + ) + return start_dates, end_dates + + # Generate Monthly Ranges + # We start at the beginning (1st) of the month of the last download + month_start = datetime(last_dl_date.year, last_dl_date.month, 1) + + logger.debug( + f"Generating monthly ranges for {channel_id} starting from {month_start}" + ) + + for dt in rrule.rrule(rrule.MONTHLY, dtstart=month_start, until=thru_date): + start_dates.append(dt) + # End date is exactly one month after the start of the current iteration + end_dates.append(dt + relativedelta(months=1)) + + # Ensure the very last end date doesn't overshoot the requested thru_date + if end_dates: + original_end = end_dates[-1] end_dates[-1] = thru_date + logger.debug(f"Snapped final end date from {original_end} to {thru_date}") + logger.info(f"Generated {len(start_dates)} monthly intervals for {channel_id}") return start_dates, end_dates @@ -263,60 +357,72 @@ def list_recursively_files_abs_path(path): :return: """ filelist = [] - for filename in glob.glob('{path}/**'.format(path=path), recursive=True): + for filename in glob.glob("{path}/**".format(path=path), recursive=True): if os.path.isfile(filename): filelist.append(os.path.abspath(filename)) return filelist def md5(fname): - """ return a md5 checksum of a file """ - import hashlib - - hash = hashlib.md5() + """Return an md5 checksum of a file.""" with open(fname, "rb") as f: + if hasattr(hashlib, "file_digest"): + return hashlib.file_digest(f, "md5").hexdigest() + + hash_obj = hashlib.md5() for chunk in iter(lambda: f.read(4096), b""): - hash.update(chunk) - return hash.hexdigest() + hash_obj.update(chunk) + return hash_obj.hexdigest() def get_main_netcdf_var(netcdf_file_path): - with Dataset(netcdf_file_path, mode='r') as netcdf_file_obj: - variables = netcdf_file_obj.variables - - variables.pop('TIME') - variables.pop('LATITUDE') - variables.pop('LONGITUDE') - - if 'NOMINAL_DEPTH' in variables: - variables.pop('NOMINAL_DEPTH') + """ + Identifies the primary data variable in a NetCDF file by excluding + known coordinate and QC variables. + """ + with Dataset(netcdf_file_path, mode="r") as nc: + # Define the set of variables to ignore + excluded_vars = {"TIME", "LATITUDE", "LONGITUDE", "NOMINAL_DEPTH"} - qc_var = [s for s in variables if '_quality_control' in s] - if qc_var != []: - variables.pop(qc_var[0]) + # Get all variable names as a list to avoid modifying the 'variables' object + var_names = list(nc.variables.keys()) - return [item for item in variables.keys()][0] + # 1. Filter out the static coordinate names + # 2. 
Filter out any variable containing '_quality_control' + remaining_vars = [ + v + for v in var_names + if v not in excluded_vars and "_quality_control" not in v + ] - return variables[0] + # Return the first remaining variable if one exists, else None + return remaining_vars[0] if remaining_vars else None def is_above_file_limit(json_watchd_name): - """ check if the number of files in INCOMING DIR as set in watch.d/[JSON_WATCHD_NAME.json is above threshold - SOMETHING quite annoying re the pipeline structure : - * the watchd JSON filename maches the ERROR directory - * BUT doesn't match the INCOMING_DIR. the 'path' in the watch.d json file matches the ERROR_DIR""" - - json_fp = os.path.join(os.environ['DATA_SERVICES_DIR'], 'watch.d', '%s.json' % json_watchd_name) + """check if the number of files in INCOMING DIR as set in watch.d/[JSON_WATCHD_NAME.json is above threshold + SOMETHING quite annoying re the pipeline structure : + * the watchd JSON filename maches the ERROR directory + * BUT doesn't match the INCOMING_DIR. the 'path' in the watch.d json file matches the ERROR_DIR""" + + json_fp = os.path.join( + os.environ["DATA_SERVICES_DIR"], "watch.d", "%s.json" % json_watchd_name + ) with open(json_fp) as j_data: parsed_json = json.load(j_data) - if len(os.listdir(os.path.join(os.environ['INCOMING_DIR'], parsed_json['path'][0]))) >= int(parsed_json['files_crit']): + if len( + os.listdir(os.path.join(os.environ["INCOMING_DIR"], parsed_json["path"][0])) + ) >= int(parsed_json["files_crit"]): return True - elif len(os.listdir(os.path.join(os.environ['ERROR_DIR'], json_watchd_name))) >= int(parsed_json['files_crit']): + elif len( + os.listdir(os.path.join(os.environ["ERROR_DIR"], json_watchd_name)) + ) >= int(parsed_json["files_crit"]): return True else: return False + ###################### # XML Info Functions # ###################### @@ -324,74 +430,51 @@ def is_above_file_limit(json_watchd_name): @lru_cache(maxsize=100) def parse_aims_xml(xml_url): - """ Download and parse the AIMS XML rss feed """ + """Download and parse the AIMS XML rss feed using a single-pass loop.""" logger = logging.getLogger(__name__) - logger.info('PARSE AIMS xml RSS feed : %s' % (xml_url)) - response = urlopen(xml_url) - html = response.read() - root = ET.fromstring(html) - - n_item_start = 3 # start number for AIMS xml file - - title = [] - link = [] - metadata_uuid = [] - uom = [] - from_date = [] - thru_date = [] - platform_name = [] - site_name = [] - channel_id = [] - parameter = [] - parameter_type = [] - trip_id = [] # soop trv only - - for n_item in range(n_item_start, len(root[0])): - title .append(root[0][n_item][0].text) - link .append(root[0][n_item][1].text) - metadata_uuid .append(root[0][n_item][6].text) - uom .append(root[0][n_item][7].text) - from_date .append(root[0][n_item][8].text) - thru_date .append(root[0][n_item][9].text) - platform_name .append(root[0][n_item][10].text) - site_name .append(root[0][n_item][11].text) - channel_id .append(root[0][n_item][12].text) - parameter .append(root[0][n_item][13].text) - parameter_type.append(root[0][n_item][14].text) - - # in case there is no trip id defined by AIMS, we create a fake one, used by SOOP TRV only + logger.info(f"PARSE AIMS xml RSS feed : {xml_url}") + + with urlopen(xml_url) as response: + root = ET.fromstring(response.read()) + + new_dict = {} + items = root[0] + n_item_start = 3 + + for i in range(n_item_start, len(items)): + node = items[i] + + # Extract channel_id first as it's our primary key + c_id = node[12].text + + # Handle the 
trip_id logic for SOOP TRV only try: - trip_id.append(root[0][n_item][15].text) + t_id = node[15].text except IndexError: - dateObject = time.strptime(root[0][n_item][8].text, "%Y-%m-%dT%H:%M:%SZ") - trip_id_fake = str(dateObject.tm_year) + str(dateObject.tm_mon).zfill(2) + str(dateObject.tm_mday).zfill(2) - trip_id.append(trip_id_fake) - - response.close() - d = [{c: {'title': ttl, - 'channel_id': c, - 'link': lk, - 'metadata_uuid': muuid, - 'uom': uo, - 'from_date': fro, - 'thru_date': thr, - 'platform_name': pltname, - 'site_name': stname, - 'parameter': para, - 'parameter_type': paratype, - 'trip_id': trid - }} for c, ttl, lk, muuid, uo, fro, thr, pltname, stname, para, paratype, trid in - zip(channel_id, title, link, metadata_uuid, uom, from_date, - thru_date, platform_name, site_name, parameter, parameter_type, trip_id)] - - # re-writting the dict to have the channel key as a key value - new_dict = {} - for item in d: - for name in item.keys(): - new_dict[name] = item[name] + # Create fake trip_id from from_date (node[8]) + date_str = node[8].text + date_obj = time.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ") + t_id = time.strftime("%Y%m%d", date_obj) + + # Build the entry directly into the final dictionary + new_dict[c_id] = { + "title": node[0].text, + "channel_id": c_id, + "link": node[1].text, + "metadata_uuid": node[6].text, + "uom": node[7].text, + "from_date": node[8].text, + "thru_date": node[9].text, + "platform_name": node[10].text, + "site_name": node[11].text, + "parameter": node[13].text, + "parameter_type": node[14].text, + "trip_id": t_id, + } return new_dict + ########################################## # Channel Process/Download/Mod Functions # ########################################## @@ -402,9 +485,11 @@ def retry_if_result_none(result): return result is None -@retry(retry_on_result=retry_if_result_none, stop_max_attempt_number=10, wait_fixed=2000) +@retry( + retry_on_result=retry_if_result_none, stop_max_attempt_number=10, wait_fixed=2000 +) def download_channel(channel_id, from_date, thru_date, level_qc): - """ generated the data link to download, and extract the zip file into a temp file + """generated the data link to download, and extract the zip file into a temp file input: channel_id(str) : channel_id to download from_date(str) : str containing the first time to start the download from written in this format 2009-04-21_t10:43:54Z @@ -412,28 +497,38 @@ def download_channel(channel_id, from_date, thru_date, level_qc): level_qc(int) : 0 or 1 """ logger = logging.getLogger(__name__) - tmp_zip_file = tempfile.mkstemp() - netcdf_tmp_path = tempfile.mkdtemp() - url_data_download = 'http://data.aims.gov.au/gbroosdata/services/data/rtds/%s/level%s/raw/raw/%s/%s/netcdf/2' % \ - (channel_id, str(level_qc), from_date, thru_date) + tmp_zip_file = tempfile.mkstemp() + netcdf_tmp_path = tempfile.mkdtemp() + url_data_download = ( + "https://data.aims.gov.au/gbroosdata/services/data/rtds/%s/level%s/raw/raw/%s/%s/netcdf/2" + % (channel_id, str(level_qc), from_date, thru_date) + ) # set the timeout for no data to 120 seconds and enable streaming responses so we don't have to keep the file in memory - headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'} + headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" + } request = requests.get(url_data_download, timeout=120, stream=True, 
headers=headers) if request.status_code == 403: - logger.error('Error 403: access to the requested resource is forbidden - {url}'.format(url=url_data_download)) + logger.error( + "Error 403: access to the requested resource is forbidden - {url}".format( + url=url_data_download + ) + ) return - with open(tmp_zip_file[1], 'wb') as fh: + with open(tmp_zip_file[1], "wb") as fh: # Walk through the request response in chunks of 1024 * 1024 bytes, so 1MiB for chunk in request.iter_content(1024 * 1024): # Write the chunk to the file fh.write(chunk) if not zipfile.is_zipfile(tmp_zip_file[1]): - logger.error('%s is not a valid zip file' % url_data_download) + logger.error("%s is not a valid zip file" % url_data_download) os.close(tmp_zip_file[0]) - os.remove(tmp_zip_file[1]) # file object needs to be closed or can end up with too many open files + os.remove( + tmp_zip_file[1] + ) # file object needs to be closed or can end up with too many open files shutil.rmtree(netcdf_tmp_path) return @@ -445,11 +540,14 @@ def download_channel(channel_id, from_date, thru_date, level_qc): zip.close() os.close(tmp_zip_file[0]) - os.remove(tmp_zip_file[1]) # file object needs to be closed or can end up with too many open files + os.remove( + tmp_zip_file[1] + ) # file object needs to be closed or can end up with too many open files - logger.info('%s download: SUCCESS' % url_data_download) + logger.info("%s download: SUCCESS" % url_data_download) return netcdf_file_path + #################################### # Functions to modify NetCDF files # # AIMS NetCDF file specific only # @@ -457,27 +555,27 @@ def download_channel(channel_id, from_date, thru_date, level_qc): def is_no_data_found(netcdf_file_path): - """ Check if the unzipped file is a 'NO_DATA_FOUND' file instead of a netCDF file + """Check if the unzipped file is a 'NO_DATA_FOUND' file instead of a netCDF file this behaviour is correct for FAIMMS and NRS, as it means no data for the selected time period. 
However it doesn't make sense for SOOP TRV """ - return os.path.basename(netcdf_file_path) == 'NO_DATA_FOUND' + return os.path.basename(netcdf_file_path) == "NO_DATA_FOUND" def rename_netcdf_attribute(object_, old_attribute_name, new_attribute_name): - """ Rename global attribute from netcdf4 dataset object - object = Dataset(netcdf_file, 'a', format='NETCDF4') - old_attribute_name = current gatt name to modify - new_attribute_name = new gatt name + """Rename global attribute from netcdf4 dataset object + object = Dataset(netcdf_file, 'a', format='NETCDF4') + old_attribute_name = current gatt name to modify + new_attribute_name = new gatt name """ setattr(object_, new_attribute_name, getattr(object_, old_attribute_name)) delattr(object_, old_attribute_name) def is_time_var_empty(netcdf_file_path): - """ check if the yet unmodified file (time instead of TIME) has values in its time variable """ - netcdf_file_obj = Dataset(netcdf_file_path, 'r', format='NETCDF4') - var_obj = netcdf_file_obj.variables['time'] + """check if the yet unmodified file (time instead of TIME) has values in its time variable""" + netcdf_file_obj = Dataset(netcdf_file_path, "r", format="NETCDF4") + var_obj = netcdf_file_obj.variables["time"] if var_obj.shape[0] == 0: return True @@ -489,15 +587,19 @@ def is_time_var_empty(netcdf_file_path): def convert_time_cf_to_imos(netcdf_file_path): - """ convert a CF time into an IMOS one forced to be 'days since 1950-01-01 00:00:00' + """convert a CF time into an IMOS one forced to be 'days since 1950-01-01 00:00:00' the variable HAS to be 'TIME' """ try: - netcdf_file_obj = Dataset(netcdf_file_path, 'a', format='NETCDF4') - time = netcdf_file_obj.variables['TIME'] - dtime = num2date(time[:], time.units, time.calendar) # this gives an array of datetime objects - time.units = 'days since 1950-01-01 00:00:00 UTC' - time[:] = date2num(dtime, time.units, time.calendar) # conversion to IMOS recommended time + netcdf_file_obj = Dataset(netcdf_file_path, "a", format="NETCDF4") + time = netcdf_file_obj.variables["TIME"] + dtime = num2date( + time[:], time.units, time.calendar + ) # this gives an array of datetime objects + time.units = "days since 1950-01-01 00:00:00 UTC" + time[:] = date2num( + dtime, time.units, time.calendar + ) # conversion to IMOS recommended time netcdf_file_obj.close() return True except: @@ -507,13 +609,13 @@ def convert_time_cf_to_imos(netcdf_file_path): def strictly_increasing(list): - """ check monotocity of list of values""" + """check monotocity of list of values""" return all(x < y for x, y in zip(list, list[1:])) def is_time_monotonic(netcdf_file_path): - netcdf_file_obj = Dataset(netcdf_file_path, 'r', format='NETCDF4') - time = netcdf_file_obj.variables['TIME'][:] + netcdf_file_obj = Dataset(netcdf_file_path, "r", format="NETCDF4") + time = netcdf_file_obj.variables["TIME"][:] netcdf_file_obj.close() if not strictly_increasing(time): return False @@ -521,77 +623,87 @@ def is_time_monotonic(netcdf_file_path): def modify_aims_netcdf(netcdf_file_path, channel_id_info): - """ Modify the downloaded netCDF file so it passes both CF and IMOS checker + """Modify the downloaded netCDF file so it passes both CF and IMOS checker input: netcdf_file_path(str) : path of netcdf file to modify channel_id_index(dict) : information from xml for the channel """ - imos_env_path = os.path.join(os.environ.get('DATA_SERVICES_DIR'), 'lib', 'netcdf', 'imos_env') + imos_env_path = os.path.join( + os.environ.get("DATA_SERVICES_DIR"), "lib", "netcdf", "imos_env" + ) if not 
os.path.isfile(imos_env_path): logger = logging.getLogger(__name__) - logger.error('%s is not accessible' % imos_env_path) + logger.error("%s is not accessible" % imos_env_path) sys.exit(1) dotenv.load_dotenv(imos_env_path) - netcdf_file_obj = Dataset(netcdf_file_path, 'a', format='NETCDF4') - netcdf_file_obj.naming_authority = 'IMOS' + netcdf_file_obj = Dataset(netcdf_file_path, "a", format="NETCDF4") + netcdf_file_obj.naming_authority = "IMOS" # add gatts to NetCDF - netcdf_file_obj.aims_channel_id = int(channel_id_info['channel_id']) + netcdf_file_obj.aims_channel_id = int(channel_id_info["channel_id"]) - if not (channel_id_info['metadata_uuid'] == 'Not Available'): - netcdf_file_obj.metadata_uuid = channel_id_info['metadata_uuid'] + if not (channel_id_info["metadata_uuid"] == "Not Available"): + netcdf_file_obj.metadata_uuid = channel_id_info["metadata_uuid"] if not netcdf_file_obj.instrument_serial_number: - del(netcdf_file_obj.instrument_serial_number) + del netcdf_file_obj.instrument_serial_number # add CF gatts, values stored in lib/netcdf/imos_env - netcdf_file_obj.Conventions = os.environ.get('CONVENTIONS') - netcdf_file_obj.data_centre_email = os.environ.get('DATA_CENTRE_EMAIL') - netcdf_file_obj.data_centre = os.environ.get('DATA_CENTRE') - netcdf_file_obj.project = os.environ.get('PROJECT') - netcdf_file_obj.acknowledgement = os.environ.get('ACKNOWLEDGEMENT') - netcdf_file_obj.distribution_statement = os.environ.get('DISTRIBUTION_STATEMENT') - - netcdf_file_obj.date_created = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) - netcdf_file_obj.quality_control_set = 1 - imos_qc_convention = 'IMOS standard set using the IODE flags' - netcdf_file_obj.author = 'laurent besnard' - netcdf_file_obj.author_email = 'laurent.besnard@utas.edu.au' - - rename_netcdf_attribute(netcdf_file_obj, 'geospatial_LAT_max', 'geospatial_lat_max') - rename_netcdf_attribute(netcdf_file_obj, 'geospatial_LAT_min', 'geospatial_lat_min') - rename_netcdf_attribute(netcdf_file_obj, 'geospatial_LON_max', 'geospatial_lon_max') - rename_netcdf_attribute(netcdf_file_obj, 'geospatial_LON_min', 'geospatial_lon_min') + netcdf_file_obj.Conventions = os.environ.get("CONVENTIONS") + netcdf_file_obj.data_centre_email = os.environ.get("DATA_CENTRE_EMAIL") + netcdf_file_obj.data_centre = os.environ.get("DATA_CENTRE") + netcdf_file_obj.project = os.environ.get("PROJECT") + netcdf_file_obj.acknowledgement = os.environ.get("ACKNOWLEDGEMENT") + netcdf_file_obj.distribution_statement = os.environ.get("DISTRIBUTION_STATEMENT") + + netcdf_file_obj.date_created = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) + netcdf_file_obj.quality_control_set = 1 + imos_qc_convention = "IMOS standard set using the IODE flags" + netcdf_file_obj.author = "laurent besnard" + netcdf_file_obj.author_email = "laurent.besnard@utas.edu.au" + + rename_netcdf_attribute(netcdf_file_obj, "geospatial_LAT_max", "geospatial_lat_max") + rename_netcdf_attribute(netcdf_file_obj, "geospatial_LAT_min", "geospatial_lat_min") + rename_netcdf_attribute(netcdf_file_obj, "geospatial_LON_max", "geospatial_lon_max") + rename_netcdf_attribute(netcdf_file_obj, "geospatial_LON_min", "geospatial_lon_min") # variables modifications - time = netcdf_file_obj.variables['time'] - time.calendar = 'gregorian' - time.axis = 'T' + time = netcdf_file_obj.variables["time"] + time.calendar = "gregorian" + time.axis = "T" time.valid_min = 0.0 time.valid_max = 9999999999.0 - netcdf_file_obj.renameDimension('time', 'TIME') - netcdf_file_obj.renameVariable('time', 'TIME') - - 
netcdf_file_obj.time_coverage_start = num2date(time[:], time.units, time.calendar).min().strftime('%Y-%m-%dT%H:%M:%SZ') - netcdf_file_obj.time_coverage_end = num2date(time[:], time.units, time.calendar).max().strftime('%Y-%m-%dT%H:%M:%SZ') + netcdf_file_obj.renameDimension("time", "TIME") + netcdf_file_obj.renameVariable("time", "TIME") + + netcdf_file_obj.time_coverage_start = ( + num2date(time[:], time.units, time.calendar) + .min() + .strftime("%Y-%m-%dT%H:%M:%SZ") + ) + netcdf_file_obj.time_coverage_end = ( + num2date(time[:], time.units, time.calendar) + .max() + .strftime("%Y-%m-%dT%H:%M:%SZ") + ) # latitude longitude - latitude = netcdf_file_obj.variables['LATITUDE'] - latitude.axis = 'Y' - latitude.valid_min = -90.0 - latitude.valid_max = 90.0 - latitude.reference_datum = 'geographical coordinates, WGS84 projection' - latitude.standard_name = 'latitude' - latitude.long_name = 'latitude' - - longitude = netcdf_file_obj.variables['LONGITUDE'] - longitude.axis = 'X' - longitude.valid_min = -180.0 - longitude.valid_max = 180.0 - longitude.reference_datum = 'geographical coordinates, WGS84 projection' - longitude.standard_name = 'longitude' - longitude.long_name = 'longitude' + latitude = netcdf_file_obj.variables["LATITUDE"] + latitude.axis = "Y" + latitude.valid_min = -90.0 + latitude.valid_max = 90.0 + latitude.reference_datum = "geographical coordinates, WGS84 projection" + latitude.standard_name = "latitude" + latitude.long_name = "latitude" + + longitude = netcdf_file_obj.variables["LONGITUDE"] + longitude.axis = "X" + longitude.valid_min = -180.0 + longitude.valid_max = 180.0 + longitude.reference_datum = "geographical coordinates, WGS84 projection" + longitude.standard_name = "longitude" + longitude.long_name = "longitude" # handle masked arrays lon_array = longitude[:] @@ -612,293 +724,373 @@ def modify_aims_netcdf(netcdf_file_path, channel_id_info): netcdf_file_obj.geospatial_lat_max = numpy.ma.MaskedArray.max(lat_array) # Change variable name, standard name, longname, untis .... 
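# Illustrative sketch, not part of this change set: the geospatial_lon/lat
# bounds above depend on numpy.ma reductions skipping masked (fill-value)
# positions. The coordinate values below are invented for the example.
import numpy.ma as ma

lon_example = ma.masked_array([130.8, 130.9, 1.0e35], mask=[False, False, True])
assert ma.MaskedArray.min(lon_example) == 130.8  # masked 1.0e35 is ignored
assert ma.MaskedArray.max(lon_example) == 130.9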
- if 'Seawater_Intake_Temperature' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['Seawater_Intake_Temperature'] - var.units = 'Celsius' - netcdf_file_obj.renameVariable('Seawater_Intake_Temperature', 'TEMP') - netcdf_file_obj.renameVariable('Seawater_Intake_Temperature_quality_control', 'TEMP_quality_control') - var.ancillary_variables = 'TEMP_quality_control' - - if 'PSAL' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.variables['PSAL'].units = '1e-3' - - if 'TURB' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['TURB'] - var.units = '1' - var.standard_name = 'sea_water_turbidity' - netcdf_file_obj.variables['TURB_quality_control'].standard_name = 'sea_water_turbidity status_flag' - - if 'DOWN_PHOTOSYNTH_FLUX' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['DOWN_PHOTOSYNTH_FLUX'] - var.units = 'W m-2' - - if 'PEAK_WAVE_DIR' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['PEAK_WAVE_DIR'] - var.units = 'degree' - - if 'CDIR' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['CDIR'] - var.units = 'degree' - var.long_name = 'current_direction' - - if 'CSPD' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['CSPD'] - var.long_name = 'current_magnitude' - - if 'ALBD' in netcdf_file_obj.variables.keys(): - var = netcdf_file_obj.variables['ALBD'] - var.units = '1' + if "Seawater_Intake_Temperature" in netcdf_file_obj.variables.keys(): + var = netcdf_file_obj.variables["Seawater_Intake_Temperature"] + var.units = "Celsius" + netcdf_file_obj.renameVariable("Seawater_Intake_Temperature", "TEMP") + netcdf_file_obj.renameVariable( + "Seawater_Intake_Temperature_quality_control", "TEMP_quality_control" + ) + var.ancillary_variables = "TEMP_quality_control" + + if "PSAL" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.variables["PSAL"].units = "1e-3" + + if "TURB" in netcdf_file_obj.variables.keys(): + var = netcdf_file_obj.variables["TURB"] + var.units = "1" + var.standard_name = "sea_water_turbidity" + netcdf_file_obj.variables[ + "TURB_quality_control" + ].standard_name = "sea_water_turbidity status_flag" + + if "DOWN_PHOTOSYNTH_FLUX" in netcdf_file_obj.variables.keys(): + var = netcdf_file_obj.variables["DOWN_PHOTOSYNTH_FLUX"] + var.units = "W m-2" + + if "PEAK_WAVE_DIR" in netcdf_file_obj.variables.keys(): + var = netcdf_file_obj.variables["PEAK_WAVE_DIR"] + var.units = "degree" + + if "CDIR" in netcdf_file_obj.variables.keys(): + var = netcdf_file_obj.variables["CDIR"] + var.units = "degree" + var.long_name = "current_direction" + + if "CSPD" in netcdf_file_obj.variables.keys(): + var = netcdf_file_obj.variables["CSPD"] + var.long_name = "current_magnitude" + + if "ALBD" in netcdf_file_obj.variables.keys(): + var = netcdf_file_obj.variables["ALBD"] + var.units = "1" + + if "DOXY" in netcdf_file_obj.variables.keys(): + var = netcdf_file_obj.variables["DOXY"] + var.units = "kg m-3" # unit was milliliter/Liter which was not CF but equivalent anyway; Example channel 84900 def clean_no_cf_variables(var, netcdf_file_obj): """ remove standard name of main variable and of its ancillary qc var if exists """ if var in netcdf_file_obj.variables.keys(): - if hasattr(netcdf_file_obj.variables[var], 'standard_name'): - del(netcdf_file_obj.variables[var].standard_name) - var_qc = '%s_quality_control' % var + if hasattr(netcdf_file_obj.variables[var], "standard_name"): + del netcdf_file_obj.variables[var].standard_name + var_qc = "%s_quality_control" % var if 
var_qc in netcdf_file_obj.variables.keys(): - if hasattr(netcdf_file_obj.variables[var_qc], 'standard_name'): - del(netcdf_file_obj.variables[var_qc].standard_name) - if hasattr(netcdf_file_obj.variables[var], 'ancillary_variables'): + if hasattr(netcdf_file_obj.variables[var_qc], "standard_name"): + del netcdf_file_obj.variables[var_qc].standard_name + if hasattr(netcdf_file_obj.variables[var], "ancillary_variables"): netcdf_file_obj.variables[var].ancillary_variables = var_qc - if 'Dissolved_Oxygen_Percent' in netcdf_file_obj.variables.keys(): - clean_no_cf_variables('Dissolved_Oxygen_Percent', netcdf_file_obj) - - if 'ErrorVelocity' in netcdf_file_obj.variables.keys(): - clean_no_cf_variables('ErrorVelocity', netcdf_file_obj) - netcdf_file_obj.variables['ErrorVelocity'].long_name = 'error_velocity' - - if 'Average_Compass_Heading' in netcdf_file_obj.variables.keys(): - clean_no_cf_variables('Average_Compass_Heading', netcdf_file_obj) - var = netcdf_file_obj.variables['Average_Compass_Heading'] - var.units = 'degree' - - if 'Upwelling_longwave_radiation' in netcdf_file_obj.variables.keys(): - var_str = 'Upwelling_longwave_radiation' - var_qc_str = '%s_quality_control' % var_str - var = netcdf_file_obj.variables[var_str] - var_qc = netcdf_file_obj.variables[var_qc_str] - var.units = 'W m-2' - var.standard_name = 'upwelling_longwave_flux_in_air' - var_qc.standard_name = 'upwelling_longwave_flux_in_air status_flag' - - if 'Downwelling_longwave_radiation' in netcdf_file_obj.variables.keys(): - var_str = 'Downwelling_longwave_radiation' - var_qc_str = '%s_quality_control' % var_str - var = netcdf_file_obj.variables[var_str] - var_qc = netcdf_file_obj.variables[var_qc_str] - var.units = 'W m-2' - var.standard_name = 'downwelling_longwave_flux_in_air' - var_qc.standard_name = 'downwelling_longwave_flux_in_air status_flag' - - if 'UP_TOT_RADIATION' in netcdf_file_obj.variables.keys(): - var_str = 'UP_TOT_RADIATION' - var_qc_str = '%s_quality_control' % var_str - var = netcdf_file_obj.variables[var_str] - var_qc = netcdf_file_obj.variables[var_qc_str] - var.units = 'W m-2' - var.standard_name = 'upwelling_longwave_flux_in_air' - var_qc.standard_name = 'upwelling_longwave_flux_in_air status_flag' - - if 'DOWN_TOT_RADIATION' in netcdf_file_obj.variables.keys(): - var_str = 'DOWN_TOT_RADIATION' - var_qc_str = '%s_quality_control' % var_str - var = netcdf_file_obj.variables[var_str] - var_qc = netcdf_file_obj.variables[var_qc_str] - var.units = 'W m-2' - var.standard_name = 'downwelling_longwave_flux_in_air' - var_qc.standard_name = 'downwelling_longwave_flux_in_air status_flag' - - if 'RADIATION_DOWN_NET' in netcdf_file_obj.variables.keys(): - clean_no_cf_variables('RADIATION_DOWN_NET', netcdf_file_obj) - - if 'fluorescence' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.renameVariable('fluorescence', 'CPHL') - netcdf_file_obj.variables['CPHL'].long_name = 'mass_concentration_of_inferred_chlorophyll_from_relative_fluorescence_units_in_sea_water_concentration_of_chlorophyll_in_sea_water' - if 'fluorescence_quality_control' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.renameVariable('fluorescence_quality_control', 'CPHL_quality_control') - netcdf_file_obj.variables['CPHL_quality_control'].long_name = 'mass_concentration_of_inferred_chlorophyll_from_relative_fluorescence_units_in_sea_waterconcentration_of_chlorophyll_in_sea_water status_flag' - clean_no_cf_variables('CPHL', netcdf_file_obj) - - if 'WDIR_10min' in netcdf_file_obj.variables.keys(): - 
netcdf_file_obj.variables['WDIR_10min'].units = 'degree' - - if 'WDIR_30min' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.variables['WDIR_30min'].units = 'degree' - - if 'R_sigma_30min' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.variables['R_sigma_30min'].units = 'degree' - clean_no_cf_variables('R_sigma_30min', netcdf_file_obj) - - if 'WDIR_sigma_10min' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.variables['WDIR_sigma_10min'].units = 'degree' - clean_no_cf_variables('WDIR_sigma_10min', netcdf_file_obj) - - if 'WDIR_sigma_30min' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.variables['WDIR_sigma_30min'].units = 'degree' - clean_no_cf_variables('WDIR_sigma_30min', netcdf_file_obj) - - if 'ATMP' in netcdf_file_obj.variables.keys(): - netcdf_file_obj.variables['ATMP'].units = 'hPa' - - if 'RAIN_DURATION' in netcdf_file_obj.variables.keys(): - clean_no_cf_variables('RAIN_DURATION', netcdf_file_obj) - - if 'HAIL_DURATION' in netcdf_file_obj.variables.keys(): - clean_no_cf_variables('HAIL_DURATION', netcdf_file_obj) - - if 'HAIL_HIT' in netcdf_file_obj.variables.keys(): - clean_no_cf_variables('HAIL_HIT', netcdf_file_obj) - netcdf_file_obj.variables['HAIL_HIT'].comment = netcdf_file_obj.variables['HAIL_HIT'].units - netcdf_file_obj.variables['HAIL_HIT'].units = '1' - - if 'HAIL_INTENSITY_10min' in netcdf_file_obj.variables.keys(): - clean_no_cf_variables('HAIL_INTENSITY_10min', netcdf_file_obj) - netcdf_file_obj.variables['HAIL_INTENSITY_10min'].comment = netcdf_file_obj.variables['HAIL_INTENSITY_10min'].units - netcdf_file_obj.variables['HAIL_INTENSITY_10min'].units = '1' + if "Dissolved_Oxygen_Percent" in netcdf_file_obj.variables.keys(): + clean_no_cf_variables("Dissolved_Oxygen_Percent", netcdf_file_obj) + + if "ErrorVelocity" in netcdf_file_obj.variables.keys(): + clean_no_cf_variables("ErrorVelocity", netcdf_file_obj) + netcdf_file_obj.variables["ErrorVelocity"].long_name = "error_velocity" + + if "Average_Compass_Heading" in netcdf_file_obj.variables.keys(): + clean_no_cf_variables("Average_Compass_Heading", netcdf_file_obj) + var = netcdf_file_obj.variables["Average_Compass_Heading"] + var.units = "degree" + + if "Upwelling_longwave_radiation" in netcdf_file_obj.variables.keys(): + var_str = "Upwelling_longwave_radiation" + var_qc_str = "%s_quality_control" % var_str + var = netcdf_file_obj.variables[var_str] + var_qc = netcdf_file_obj.variables[var_qc_str] + var.units = "W m-2" + var.standard_name = "upwelling_longwave_flux_in_air" + var_qc.standard_name = "upwelling_longwave_flux_in_air status_flag" + + if "Downwelling_longwave_radiation" in netcdf_file_obj.variables.keys(): + var_str = "Downwelling_longwave_radiation" + var_qc_str = "%s_quality_control" % var_str + var = netcdf_file_obj.variables[var_str] + var_qc = netcdf_file_obj.variables[var_qc_str] + var.units = "W m-2" + var.standard_name = "downwelling_longwave_flux_in_air" + var_qc.standard_name = "downwelling_longwave_flux_in_air status_flag" + + if "UP_TOT_RADIATION" in netcdf_file_obj.variables.keys(): + var_str = "UP_TOT_RADIATION" + var_qc_str = "%s_quality_control" % var_str + var = netcdf_file_obj.variables[var_str] + var_qc = netcdf_file_obj.variables[var_qc_str] + var.units = "W m-2" + var.standard_name = "upwelling_longwave_flux_in_air" + var_qc.standard_name = "upwelling_longwave_flux_in_air status_flag" + + if "DOWN_TOT_RADIATION" in netcdf_file_obj.variables.keys(): + var_str = "DOWN_TOT_RADIATION" + var_qc_str = "%s_quality_control" % var_str + var = 
netcdf_file_obj.variables[var_str] + var_qc = netcdf_file_obj.variables[var_qc_str] + var.units = "W m-2" + var.standard_name = "downwelling_longwave_flux_in_air" + var_qc.standard_name = "downwelling_longwave_flux_in_air status_flag" + + if "RADIATION_DOWN_NET" in netcdf_file_obj.variables.keys(): + clean_no_cf_variables("RADIATION_DOWN_NET", netcdf_file_obj) + + if "fluorescence" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.renameVariable("fluorescence", "CPHL") + netcdf_file_obj.variables[ + "CPHL" + ].long_name = "mass_concentration_of_inferred_chlorophyll_from_relative_fluorescence_units_in_sea_water_concentration_of_chlorophyll_in_sea_water" + if "fluorescence_quality_control" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.renameVariable( + "fluorescence_quality_control", "CPHL_quality_control" + ) + netcdf_file_obj.variables[ + "CPHL_quality_control" + ].long_name = "mass_concentration_of_inferred_chlorophyll_from_relative_fluorescence_units_in_sea_waterconcentration_of_chlorophyll_in_sea_water status_flag" + clean_no_cf_variables("CPHL", netcdf_file_obj) + + if "WDIR_10min" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.variables["WDIR_10min"].units = "degree" + + if "WDIR_30min" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.variables["WDIR_30min"].units = "degree" + + if "R_sigma_30min" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.variables["R_sigma_30min"].units = "degree" + clean_no_cf_variables("R_sigma_30min", netcdf_file_obj) + + if "WDIR_sigma_10min" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.variables["WDIR_sigma_10min"].units = "degree" + clean_no_cf_variables("WDIR_sigma_10min", netcdf_file_obj) + + if "WDIR_sigma_30min" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.variables["WDIR_sigma_30min"].units = "degree" + clean_no_cf_variables("WDIR_sigma_30min", netcdf_file_obj) + + if "ATMP" in netcdf_file_obj.variables.keys(): + netcdf_file_obj.variables["ATMP"].units = "hPa" + + if "RAIN_DURATION" in netcdf_file_obj.variables.keys(): + clean_no_cf_variables("RAIN_DURATION", netcdf_file_obj) + + if "HAIL_DURATION" in netcdf_file_obj.variables.keys(): + clean_no_cf_variables("HAIL_DURATION", netcdf_file_obj) + + if "HAIL_HIT" in netcdf_file_obj.variables.keys(): + clean_no_cf_variables("HAIL_HIT", netcdf_file_obj) + netcdf_file_obj.variables["HAIL_HIT"].comment = netcdf_file_obj.variables[ + "HAIL_HIT" + ].units + netcdf_file_obj.variables["HAIL_HIT"].units = "1" + + if "HAIL_INTENSITY_10min" in netcdf_file_obj.variables.keys(): + clean_no_cf_variables("HAIL_INTENSITY_10min", netcdf_file_obj) + netcdf_file_obj.variables[ + "HAIL_INTENSITY_10min" + ].comment = netcdf_file_obj.variables["HAIL_INTENSITY_10min"].units + netcdf_file_obj.variables["HAIL_INTENSITY_10min"].units = "1" # add qc conventions to qc vars variables = netcdf_file_obj.variables.keys() - qc_vars = [s for s in variables if '_quality_control' in s] + qc_vars = [s for s in variables if "_quality_control" in s] if qc_vars != []: for var in qc_vars: - netcdf_file_obj.variables[var].quality_control_conventions = imos_qc_convention + netcdf_file_obj.variables[ + var + ].quality_control_conventions = imos_qc_convention # clean longnames, force lower case, remove space, remove double underscore for var in variables: - if hasattr(netcdf_file_obj.variables[var], 'long_name'): - netcdf_file_obj.variables[var].long_name = netcdf_file_obj.variables[var].long_name.replace('__', '_') - netcdf_file_obj.variables[var].long_name = 
netcdf_file_obj.variables[var].long_name.replace(' _', '_') - netcdf_file_obj.variables[var].long_name = netcdf_file_obj.variables[var].long_name.lower() + if hasattr(netcdf_file_obj.variables[var], "long_name"): + netcdf_file_obj.variables[var].long_name = netcdf_file_obj.variables[ + var + ].long_name.replace("__", "_") + netcdf_file_obj.variables[var].long_name = netcdf_file_obj.variables[ + var + ].long_name.replace(" _", "_") + netcdf_file_obj.variables[var].long_name = netcdf_file_obj.variables[ + var + ].long_name.lower() netcdf_file_obj.close() def fix_provider_code_from_filename(netcdf_file_path, imos_facility_code): - new_filename = re.sub('AIMS_', ('%s_' % imos_facility_code), netcdf_file_path) + new_filename = re.sub("AIMS_", ("%s_" % imos_facility_code), netcdf_file_path) shutil.move(netcdf_file_path, new_filename) return new_filename def fix_data_code_from_filename(netcdf_file_path): - """ Some filename are badly written. + """Some filename are badly written. this function has to run after modifying the file to make it CF and IMOS compliant It physically renames the filename if needed """ - netcdf_file_obj = Dataset(netcdf_file_path, 'r', format='NETCDF4') - if 'CDIR' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_CDIR_', '_V_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, new_filename) - return new_filename - - if 'CSPD' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_CSPD_', '_V_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, new_filename) - return new_filename - - if 'DOX1' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_Dissolved_O2_\(mole\)_', '_K_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, new_filename) - return new_filename + logger = logging.getLogger(__name__) - if 'DEPTH' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_DEPTH_', '_Z_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, new_filename) - return new_filename + # Mapping of {Variable_Internal_Name: (Regex_Pattern, Replacement_Code)} + FILENAME_MAPPING = { + "CDIR": ("_CDIR_", "_V_"), + "CSPD": ("_CSPD_", "_V_"), + "DOX1": (r"_Dissolved_O2_\(mole\)_", "_K_"), + "DEPTH": ("_DEPTH_", "_Z_"), + "Dissolved_Oxygen_Percent": ("_DO_%_", "_O_"), + "ErrorVelocity": ("_ErrorVelocity_", "_V_"), + "Average_Compass_Heading": ("_Average_Compass_Heading_", "_E_"), + "Upwelling_longwave_radiation": ("_Upwelling_longwave_radiation_", "_F_"), + "Downwelling_longwave_radiation": ("_Downwelling_longwave_radiation_", "_F_"), + } - if 'Dissolved_Oxygen_Percent' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_DO_%_', '_O_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, new_filename) - return new_filename + with Dataset(netcdf_file_path, "r", format="NETCDF4") as nc: + found_var = next((var for var in FILENAME_MAPPING if var in nc.variables), None) - if 'ErrorVelocity' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_ErrorVelocity_', '_V_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, new_filename) - return new_filename + if found_var: + pattern, replacement = FILENAME_MAPPING[found_var] + new_filename = re.sub(pattern, replacement, str(netcdf_file_path)) - if 'Average_Compass_Heading' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_Average_Compass_Heading_', '_E_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, 
new_filename) - return new_filename + logger.debug(f"Renaming file based on variable '{found_var}': {new_filename}") - if 'Upwelling_longwave_radiation' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_Upwelling_longwave_radiation_', '_F_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, new_filename) - return new_filename + old_path = Path(netcdf_file_path) + new_path = old_path.with_name(Path(new_filename).name) - if 'Downwelling_longwave_radiation' in netcdf_file_obj.variables.keys(): - new_filename = re.sub('_Downwelling_longwave_radiation_', '_F_', netcdf_file_path) - netcdf_file_obj.close() - shutil.move(netcdf_file_path, new_filename) - return new_filename + shutil.move(str(old_path), str(new_path)) + return str(new_path) - netcdf_file_obj.close() return netcdf_file_path def has_var_only_fill_value(netcdf_file_path, var): - """ some channels have only _Fillvalues in their main variable. This is not correct and need + """some channels have only _Fillvalues in their main variable. This is not correct and need to be tested var is a string of the variable to test """ - netcdf_file_obj = Dataset(netcdf_file_path, 'r', format='NETCDF4') - var_obj = netcdf_file_obj.variables[var] - var_values = var_obj[:] + netcdf_file_obj = Dataset(netcdf_file_path, "r", format="NETCDF4") + var_obj = netcdf_file_obj.variables[var] + var_values = var_obj[:] netcdf_file_obj.close() # if no fill value in variable, no mask attribute - if hasattr(var_values, 'mask'): + if hasattr(var_values, "mask"): return var_values.mask.all() else: return False +# +# def remove_dimension_from_netcdf(netcdf_file_path): +# """DIRTY, calling bash. need to write in Python, or part of the NetCDF4 module +# need to remove the 'single' dimension name from DEPTH or other dim. Unfortunately can't seem to find a way to do it easily with netCDF4 module +# """ +# fd, tmp_file = tempfile.mkstemp() +# os.close(fd) +# import subprocess +# +# subprocess.check_call(["ncwa", "-O", "-a", "single", netcdf_file_path, tmp_file]) +# subprocess.check_call( +# ["ncatted", "-O", "-a", "cell_methods,,d,,", tmp_file, tmp_file] +# ) +# shutil.move(tmp_file, netcdf_file_path) +# +# def remove_dimension_from_netcdf(netcdf_file_path): - """ DIRTY, calling bash. need to write in Python, or part of the NetCDF4 module - need to remove the 'single' dimension name from DEPTH or other dim. Unfortunately can't seem to find a way to do it easily with netCDF4 module + """ + Python replacement for NCO ncwa/ncatted. + Fixes the _FillValue AttributeError by passing it during variable creation. """ fd, tmp_file = tempfile.mkstemp() os.close(fd) - subprocess.check_call(['ncwa', '-O', '-a', 'single', netcdf_file_path, tmp_file]) - subprocess.check_call(['ncatted', '-O', '-a', 'cell_methods,,d,,', tmp_file, tmp_file]) + with Dataset(netcdf_file_path, "r") as src, Dataset(tmp_file, "w") as dst: + # 1. Copy global attributes + dst.setncatts(src.__dict__) + + hist_msg = "NetCDF file modified by remove_dimension_from_netcdf function" + if hasattr(dst, "history"): + # Append to existing history with a newline for readability + dst.history = f"{hist_msg}\n{dst.history}" + else: + # Create it if it doesn't exist + dst.history = hist_msg + + # 2. Copy dimensions EXCEPT 'single' + for name, dimension in src.dimensions.items(): + if name != "single": + dst.createDimension( + name, (len(dimension) if not dimension.isunlimited() else None) + ) + + # 3. 
Copy variables
+    for name, variable in src.variables.items():
+        new_dims = tuple(d for d in variable.dimensions if d != "single")
+
+        # --- THE FIX ---
+        # Check if source has a fill value.
+        # We use getattr because _FillValue is a reserved attribute name.
+        fill_val = getattr(variable, "_FillValue", None)
+
+        # Create the variable with the fill_value already set
+        dst_var = dst.createVariable(
+            name, variable.datatype, new_dims, fill_value=fill_val
+        )
+        # ----------------
+
+        # 4. Copy remaining Attributes (Replaces ncatted logic)
+        # We skip 'cell_methods' AND '_FillValue' (since we just set it)
+        for attr_name in variable.ncattrs():
+            if attr_name not in ["cell_methods", "_FillValue"]:
+                dst_var.setncattr(attr_name, variable.getncattr(attr_name))
+
+        # 5. Copy Data
+        dst_var[:] = variable[:]
+
     shutil.move(tmp_file, netcdf_file_path)


 def remove_end_date_from_filename(netcdf_filename):
-    """ remove the _END-* part of the file, as we download monthly file. This helps
+    """remove the _END-* part of the file, as we download monthly files. This helps
     to overwrite file with new data for the same month
     """
-    return re.sub('_END-.*$', '.nc', netcdf_filename)
+    return re.sub("_END-.*$", ".nc", netcdf_filename)


 def rm_tmp_dir(data_wip_path):
-    """ remove temporary directories older than 15 days from data_wip path"""
-    for dir_path in os.listdir(data_wip_path):
-        if dir_path.startswith('manifest_dir_tmp_'):
-            file_date = datetime.datetime.strptime(dir_path.split('_')[-1], '%Y%m%d%H%M%S')
-            if (datetime.datetime.now() - file_date).days > 15:
-                logger = logging.getLogger(__name__)
-                logger.info('DELETE old temporary folder {path}'.format(path=os.path.join(data_wip_path, dir_path)))
-                shutil.rmtree(os.path.join(data_wip_path, dir_path))
+    """
+    Remove temporary directories older than 15 days from data_wip path.
+    Expected folder format: manifest_dir_tmp_YYYYMMDDHHMMSS
+    """
+
+    logger = logging.getLogger(__name__)
+    base_path = Path(data_wip_path)
+    if not base_path.is_dir():
+        logger.warning(f"Cleanup skipped: {data_wip_path} is not a valid directory.")
+        return
+
+    # Set threshold to 15 days ago (the module-level `import datetime` is kept,
+    # so the class and timedelta are reached through the module)
+    expiry_limit = datetime.datetime.now() - datetime.timedelta(days=15)
+
+    for folder in base_path.glob("manifest_dir_tmp_*"):
+        try:
+            # Extract date string from the end of the folder name
+            date_str = folder.name.split("_")[-1]
+            folder_date = datetime.datetime.strptime(date_str, "%Y%m%d%H%M%S")
+
+            if folder_date < expiry_limit:
+                logger.info(f"Deleting old temporary folder: {folder}")
+                shutil.rmtree(folder)
+
+        except ValueError:
+            # This handles cases where the folder name matches the prefix
+            # but the suffix isn't a valid date
+            logger.debug(f"Skipping folder with invalid date format: {folder.name}")
+        except Exception as e:
+            logger.error(f"Failed to delete {folder}: {e}")


 def set_up():
     """ set up wip facility directories """
-    wip_path = os.environ.get('data_wip_path')
+    wip_path = os.environ.get("data_wip_path")

     # this is used for unit testing as data_wip_path env would not be set
     if wip_path is None:
@@ -906,11 +1098,11 @@ def set_up():

     if not wip_path:
         logger = logging.getLogger(__name__)
-        logger.error('env data_wip_path not defined')
+        logger.error("env data_wip_path not defined")
         exit(1)

     if not os.path.exists(wip_path):
         os.makedirs(wip_path)

-    if not os.path.exists(os.path.join(wip_path, 'errors')):
-        os.makedirs(os.path.join(wip_path, 'errors'))
+    if not os.path.exists(os.path.join(wip_path, "errors")):
+        os.makedirs(os.path.join(wip_path, "errors"))
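# Minimal sketch of how a FILENAME_MAPPING entry in fix_data_code_from_filename
# rewrites the data-code token; the filename below is a made-up example, and the
# escaped parentheses show why the DOX1 pattern is kept as a raw regex string.
import re

pattern, replacement = r"_Dissolved_O2_\(mole\)_", "_K_"
example = "IMOS_ANMN_Dissolved_O2_(mole)_20180101T000000Z_NRSDAR_FV01.nc"
print(re.sub(pattern, replacement, example))
# IMOS_ANMN_K_20180101T000000Z_NRSDAR_FV01.nc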
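# Minimal sketch of the masked-array behaviour has_var_only_fill_value relies on:
# a variable holding nothing but _FillValue comes back fully masked, whereas one
# with at least a single real reading does not. The values are invented.
import numpy.ma as ma

all_fill = ma.masked_array([999.0, 999.0], mask=[True, True])
mixed = ma.masked_array([999.0, 12.3], mask=[True, False])
print(all_fill.mask.all())  # True  -> only fill values in the variable
print(mixed.mask.all())     # False -> at least one real value present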