Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 58 additions & 10 deletions asar_xarray/asar.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""ASAR Xarray Dataset Reader."""

import os
from typing import Dict, Any
from typing import Dict, Any, List

import pandas as pd
import xarray as xr
Expand All @@ -19,6 +19,43 @@
from asar_xarray.records_metadata import process_records_metadata


def create_srgr_dataset(grsr_poly_coeffs: List[Dict[str, Any]], gr_arr: NDArray[Any]) -> xr.Dataset:
    """
    Build SRGR polynomials from GRSR polynomials of the Envisat file format to mimic sarsen Sentinel1 GRD handling.

    For each GRSR record, the ground-range -> slant-range polynomial is evaluated
    on the product's ground-range axis, and the inverse (slant-range -> ground-range)
    polynomial is fitted to a subsampled set of those points.

    :param grsr_poly_coeffs: grsr polynomial metadata (List of dicts; each dict is
        expected to carry "grsr_poly_coeffs" and "azimuth_time" entries)
    :param gr_arr: ground range array of the source product

    :return: xarray Dataset with an "srgrCoefficients" variable dimensioned
        ("azimuth_time", "degree") for srgr polynomial interpolation
    """
    # TODO may need improvement, see https://github.com/SAR-ARD/asar-xarray/issues/58

    degree = 10
    subsample = 150  # fit on every 150th ground-range sample to keep polyfit cheap

    srgr_coeffs: List[NDArray[Any]] = []
    azimuth_time_raw: List[Any] = []

    for el in grsr_poly_coeffs:
        # numpy polyval/polyfit order coefficients highest degree first,
        # while the metadata stores them lowest degree first — hence the reversal.
        grsr_poly = np.array(list(reversed(el["grsr_poly_coeffs"])))
        azimuth_time_raw.append(el["azimuth_time"])
        slant_range = np.polyval(grsr_poly, gr_arr)

        srgr_poly = np.polyfit(slant_range[::subsample], gr_arr[::subsample], deg=degree)

        # store lowest degree first again, matching the source convention
        srgr_coeffs.append(np.array(list(reversed(srgr_poly))))

    # NOTE: build coords once at the end; a prior revision first assigned
    # coords["degree"] = degree and then immediately overwrote it (dead store).
    coords: dict[str, Any] = {
        "azimuth_time": [np.datetime64(dt, "ns") for dt in azimuth_time_raw],
        "degree": list(range(degree + 1)),
    }

    data_vars: dict[str, Any] = {"srgrCoefficients": (("azimuth_time", "degree"), srgr_coeffs)}

    return xr.Dataset(data_vars=data_vars, coords=coords)


def get_metadata(gdal_dataset: gdal.Dataset) -> Dict[str, Any]:
"""
Build xarray attributes from gdal dataset to be used in xarray.
Expand Down Expand Up @@ -66,7 +103,8 @@ def open_asar_dataset(filename_or_obj: str | os.PathLike[Any] | ReadBuffer[

if product_str == "Image Mode SLC Image" or product_str == "AP Mode SLC Image":
metadata["product_type"] = "SLC"
elif product_str == "Image Mode Precision Image" or product_str == "AP Mode Precision Image":
elif (product_str == "Image Mode Precision Image" or product_str == "AP Mode Precision Image"
or product_str == "Wide Swath Mode Image"):
metadata["product_type"] = "GRD"
else:
raise RuntimeError(
Expand Down Expand Up @@ -132,7 +170,8 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset:

if product_str == "Image Mode SLC Image" or product_str == "AP Mode SLC Image":
product_type = "SLC"
elif product_str == "Image Mode Precision Image" or product_str == "AP Mode Precision Image":
elif (product_str == "Image Mode Precision Image" or product_str == "AP Mode Precision Image"
or product_str == "Wide Swath Mode Image"):
product_type = "GRD"
else:
raise RuntimeError(
Expand Down Expand Up @@ -201,16 +240,25 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset:
number_of_samples,
)

# numpy polyval expects the polynomial to be highest ranked first
coeffs = list(reversed(metadata["direct_parse"]["grsr_coeffs"]))
grsr_arr = metadata["direct_parse"]["grsr_coeffs"]
if len(grsr_arr) == 1:

# handle 1 GRSR Poly vs N GRSR Poly cases differently,
# numpy polyval expects the polynomial to be highest ranked first

slant_ranges = np.polyval(coeffs, ground_range)
slant_ranges *= 2
coeffs = list(reversed(grsr_arr[0]["grsr_poly_coeffs"]))

c = 299792458
slant_range_times = slant_ranges / c
slant_ranges = np.polyval(coeffs, ground_range)
slant_ranges *= 2

coords["slant_range_time"] = ("pixel", slant_range_times)
c = 299792458
slant_range_times = slant_ranges / c

coords["slant_range_time"] = ("pixel", slant_range_times)
else:
swap_dims["pixel"] = "ground_range"
attrs["srgr_conversion"] = create_srgr_dataset(metadata["direct_parse"]["grsr_coeffs"], ground_range)
coords["ground_range"] = ("pixel", ground_range)

data = xr.open_dataarray(filepath, engine='rasterio')

Expand Down
29 changes: 24 additions & 5 deletions asar_xarray/envisat_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import math
import pathlib
import numpy as np
from asar_xarray import utils
from numpy.typing import NDArray


Expand Down Expand Up @@ -286,6 +287,10 @@ def __process_cal_ads(ads: EnvisatADS, gdal_metadata: dict[str, Any], metadata:
Tuple containing antenna gains.
"""
antenna_gains = ()

if gdal_metadata["records"]["main_processing_params"]["ant_elev_corr_flag"]:
return antenna_gains

if ads.name == "EXTERNAL CALIBRATION":

aux_folder = pathlib.Path(os.path.abspath(__file__)).parent
Expand Down Expand Up @@ -342,11 +347,25 @@ def process_sr_gr_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, A
None
"""
if ads.name == "SR GR ADS" and ads.size > 0:
srgr_buf = file_buffer[ads.offset:ads.offset + ads.size]

r = struct.unpack(">ff5f", srgr_buf[13:41])

# Envisat file specification says it is SR/GR Conversion ADSR,
# however the polynomial is for GR to SR...
srgr_coeffs = list(r[2:])
metadata["grsr_coeffs"] = srgr_coeffs
srgr_buf = file_buffer[ads.offset:ads.offset + ads.size]
grsr_coeffs = []
for i in range(ads.num):
one_srgr = srgr_buf[i * 55:i * 55 + 41]

r = struct.unpack(">iiicff5f", one_srgr)

srgr_el: dict[str, Any] = {}

mjd_arr = [str(k) for k in r[0:3]]
mjd_str = ",".join(mjd_arr)

srgr_el["azimuth_time"] = utils.get_envisat_time(mjd_str)
srgr_el["slr0"] = r[4]
srgr_el["gr0"] = r[5]
srgr_el["grsr_poly_coeffs"] = r[6:]
grsr_coeffs.append(srgr_el)

metadata["grsr_coeffs"] = grsr_coeffs
33 changes: 28 additions & 5 deletions asar_xarray/records_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,12 @@ def process_calibration_factors(metadata: dict[str, str]) -> list[dict[str, Any]
calib_dict: dict[int, dict[str, Any]] = {}

for key, value in metadata.items():
if not key.startswith('MAIN_PROCESSING_PARAMS_ADS_CALIBRATION_FACTORS'):

if "CALIBRATION_FACTORS" not in key:
continue
# if not key.startswith('MAIN_PROCESSING_PARAMS_ADS_CALIBRATION_FACTORS')
# or not key.startswith('MAIN_PROCESSING_PARAMS_ADS_0_CALIBRATION_FACTORS'):
# continue

# Split key into parts to get index and parameter name
parts = key.split('.')
Expand Down Expand Up @@ -451,6 +455,25 @@ def process_general_main_processing_params(metadata: dict[str, str]) -> dict[str
)
processed[new_key] = parsed if parsed is not None else value

# Reuse 0_ tagged metadata for WSM mode
sum_num_output_lines = 0
keys = processed.keys()
for k in keys:
if "_num_output_lines" in k:
sum_num_output_lines += processed[k]
if "0_range_samp_rate" in keys:
processed["range_samp_rate"] = processed["0_range_samp_rate"]
if "0_range_spacing" in keys:
processed["range_spacing"] = processed["0_range_spacing"]
if "0_radar_freq" in keys:
processed["radar_freq"] = processed["0_radar_freq"]
if "0_azimuth_spacing" in keys:
processed["azimuth_spacing"] = processed["0_azimuth_spacing"]
if "0_ant_elev_corr_flag" in keys:
processed["ant_elev_corr_flag"] = processed["0_ant_elev_corr_flag"]
if sum_num_output_lines > 0:
processed["num_output_lines"] = sum_num_output_lines

return processed


Expand Down Expand Up @@ -505,13 +528,13 @@ def process_dop_centroid_coeffs(metadata: dict[str, str]) -> dict[str, Any]:
param = key[24:].lower()

# Process different parameter types
if param == 'zero_doppler_time':
if 'zero_doppler_time' in param:
dop_dict[param] = utils.get_envisat_time(value)
elif param == 'attach_flag':
elif 'attach_flag' in param:
dop_dict[param] = bool(int(value))
elif param == 'dop_conf_below_thresh_flag':
elif 'dop_conf_below_thresh_flag' in param:
dop_dict[param] = bool(int(value))
elif param in ('dop_coef', 'delta_dopp_coeff'):
elif "dop_coef" in param or "delta_dopp_coeff" in param:
# Keep all values including zeros for coefficients
dop_dict[param] = [float(x) for x in value.strip().split()]
else:
Expand Down
Loading