diff --git a/asar_xarray/asar.py b/asar_xarray/asar.py index 0bc1bb5..fab08f3 100644 --- a/asar_xarray/asar.py +++ b/asar_xarray/asar.py @@ -1,7 +1,7 @@ """ASAR Xarray Dataset Reader.""" import os -from typing import Dict, Any +from typing import Dict, Any, List import pandas as pd import xarray as xr @@ -19,6 +19,43 @@ from asar_xarray.records_metadata import process_records_metadata +def create_srgr_dataset(grsr_poly_coeffs: List[Dict[str, Any]], gr_arr: NDArray[Any]) -> xr.Dataset: + """ + Build SRGR polynomials from GRSR polynomials of the Envisat file format to mimic sarsen Sentinel1 GRD handling. + + :param grsr_poly_coeffs: grsr polynomial metadata (List of dicts) + :param gr_arr: ground range array of the source product + + :return: xarray Dataset for srgr polynomial interpolation + """ + coords: dict[str, Any] = {} + + # TODO may need improvement, see https://github.com/SAR-ARD/asar-xarray/issues/58 + + degree = 10 + coords["degree"] = degree + + srgr_coeffs: List[NDArray[Any]] = [] + + azimuth_time_raw: List[Any] = [] + + for el in grsr_poly_coeffs: + grsr_poly = np.array(list(reversed(el["grsr_poly_coeffs"]))) + azimuth_time_raw.append(el["azimuth_time"]) + slant_range = np.polyval(grsr_poly, gr_arr) + + srgr_poly = np.polyfit(slant_range[::150], gr_arr[::150], deg=degree) + + srgr_coeffs.append(np.array(list(reversed(srgr_poly)))) + + coords["azimuth_time"] = [np.datetime64(dt, "ns") for dt in azimuth_time_raw] + coords["degree"] = list(range(degree + 1)) + + data_vars: dict[str, Any] = {"srgrCoefficients": (("azimuth_time", "degree"), srgr_coeffs)} + + return xr.Dataset(data_vars=data_vars, coords=coords) + + def get_metadata(gdal_dataset: gdal.Dataset) -> Dict[str, Any]: """ Build xarray attributes from gdal dataset to be used in xarray. @@ -66,7 +103,8 @@ def open_asar_dataset(filename_or_obj: str | os.PathLike[Any] | ReadBuffer[ if product_str == "Image Mode SLC Image" or product_str == "AP Mode SLC Image": metadata["product_type"] = "SLC" - elif product_str == "Image Mode Precision Image" or product_str == "AP Mode Precision Image": + elif (product_str == "Image Mode Precision Image" or product_str == "AP Mode Precision Image" + or product_str == "Wide Swath Mode Image"): metadata["product_type"] = "GRD" else: raise RuntimeError( @@ -132,7 +170,8 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: if product_str == "Image Mode SLC Image" or product_str == "AP Mode SLC Image": product_type = "SLC" - elif product_str == "Image Mode Precision Image" or product_str == "AP Mode Precision Image": + elif (product_str == "Image Mode Precision Image" or product_str == "AP Mode Precision Image" + or product_str == "Wide Swath Mode Image"): product_type = "GRD" else: raise RuntimeError( @@ -201,16 +240,25 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: number_of_samples, ) - # numpy polyval expects the polynomial top be highest ranked first - coeffs = list(reversed(metadata["direct_parse"]["grsr_coeffs"])) + grsr_arr = metadata["direct_parse"]["grsr_coeffs"] + if len(grsr_arr) == 1: + + # handle 1 GRSR Poly vs N GRSR Poly cases differently, + # numpy polyval expects the polynomial top be highest ranked first - slant_ranges = np.polyval(coeffs, ground_range) - slant_ranges *= 2 + coeffs = list(reversed(grsr_arr[0]["grsr_poly_coeffs"])) - c = 299792458 - slant_range_times = slant_ranges / c + slant_ranges = np.polyval(coeffs, ground_range) + slant_ranges *= 2 - coords["slant_range_time"] = ("pixel", slant_range_times) + c = 299792458 + slant_range_times = slant_ranges / c + + coords["slant_range_time"] = ("pixel", slant_range_times) + else: + swap_dims["pixel"] = "ground_range" + attrs["srgr_conversion"] = create_srgr_dataset(metadata["direct_parse"]["grsr_coeffs"], ground_range) + coords["ground_range"] = ("pixel", ground_range) data = xr.open_dataarray(filepath, engine='rasterio') diff --git a/asar_xarray/envisat_direct.py b/asar_xarray/envisat_direct.py index 69ef382..4d6fb0e 100644 --- a/asar_xarray/envisat_direct.py +++ b/asar_xarray/envisat_direct.py @@ -5,6 +5,7 @@ import math import pathlib import numpy as np +from asar_xarray import utils from numpy.typing import NDArray @@ -286,6 +287,10 @@ def __process_cal_ads(ads: EnvisatADS, gdal_metadata: dict[str, Any], metadata: Tuple containing antenna gains. """ antenna_gains = () + + if gdal_metadata["records"]["main_processing_params"]["ant_elev_corr_flag"]: + return antenna_gains + if ads.name == "EXTERNAL CALIBRATION": aux_folder = pathlib.Path(os.path.abspath(__file__)).parent @@ -342,11 +347,25 @@ def process_sr_gr_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, A None """ if ads.name == "SR GR ADS" and ads.size > 0: - srgr_buf = file_buffer[ads.offset:ads.offset + ads.size] - - r = struct.unpack(">ff5f", srgr_buf[13:41]) # Envisat file specification says it is SR/GR Conversion ADSR, # however the polynomial is for GR to SR... - srgr_coeffs = list(r[2:]) - metadata["grsr_coeffs"] = srgr_coeffs + srgr_buf = file_buffer[ads.offset:ads.offset + ads.size] + grsr_coeffs = [] + for i in range(ads.num): + one_srgr = srgr_buf[i * 55:i * 55 + 41] + + r = struct.unpack(">iiicff5f", one_srgr) + + srgr_el: dict[str, Any] = {} + + mjd_arr = [str(k) for k in r[0:3]] + mjd_str = ",".join(mjd_arr) + + srgr_el["azimuth_time"] = utils.get_envisat_time(mjd_str) + srgr_el["slr0"] = r[4] + srgr_el["gr0"] = r[5] + srgr_el["grsr_poly_coeffs"] = r[6:] + grsr_coeffs.append(srgr_el) + + metadata["grsr_coeffs"] = grsr_coeffs diff --git a/asar_xarray/records_metadata.py b/asar_xarray/records_metadata.py index db26d8f..5e7bece 100644 --- a/asar_xarray/records_metadata.py +++ b/asar_xarray/records_metadata.py @@ -206,8 +206,12 @@ def process_calibration_factors(metadata: dict[str, str]) -> list[dict[str, Any] calib_dict: dict[int, dict[str, Any]] = {} for key, value in metadata.items(): - if not key.startswith('MAIN_PROCESSING_PARAMS_ADS_CALIBRATION_FACTORS'): + + if "CALIBRATION_FACTORS" not in key: continue + # if not key.startswith('MAIN_PROCESSING_PARAMS_ADS_CALIBRATION_FACTORS') + # or not key.startswith('MAIN_PROCESSING_PARAMS_ADS_0_CALIBRATION_FACTORS'): + # continue # Split key into parts to get index and parameter name parts = key.split('.') @@ -451,6 +455,25 @@ def process_general_main_processing_params(metadata: dict[str, str]) -> dict[str ) processed[new_key] = parsed if parsed is not None else value + # Reuse 0_ tagged metadata for WSM mode + sum_num_output_lines = 0 + keys = processed.keys() + for k in keys: + if "_num_output_lines" in k: + sum_num_output_lines += processed[k] + if "0_range_samp_rate" in keys: + processed["range_samp_rate"] = processed["0_range_samp_rate"] + if "0_range_spacing" in keys: + processed["range_spacing"] = processed["0_range_spacing"] + if "0_radar_freq" in keys: + processed["radar_freq"] = processed["0_radar_freq"] + if "0_azimuth_spacing" in keys: + processed["azimuth_spacing"] = processed["0_azimuth_spacing"] + if "0_ant_elev_corr_flag" in keys: + processed["ant_elev_corr_flag"] = processed["0_ant_elev_corr_flag"] + if sum_num_output_lines > 0: + processed["num_output_lines"] = sum_num_output_lines + return processed @@ -505,13 +528,13 @@ def process_dop_centroid_coeffs(metadata: dict[str, str]) -> dict[str, Any]: param = key[24:].lower() # Process different parameter types - if param == 'zero_doppler_time': + if 'zero_doppler_time' in param: dop_dict[param] = utils.get_envisat_time(value) - elif param == 'attach_flag': + elif 'attach_flag' in param: dop_dict[param] = bool(int(value)) - elif param == 'dop_conf_below_thresh_flag': + elif 'dop_conf_below_thresh_flag' in param: dop_dict[param] = bool(int(value)) - elif param in ('dop_coef', 'delta_dopp_coeff'): + elif "dop_coef" in param or "delta_dopp_coeff" in param: # Keep all values including zeros for coefficients dop_dict[param] = [float(x) for x in value.strip().split()] else: