Source code for S1_NRB.ancillary

import os
import sys
import logging
import binascii
from lxml import etree
from datetime import datetime, timedelta
from osgeo import gdal
import spatialist
from spatialist.vector import bbox, intersect
import pyroSAR
from pyroSAR import examine, identify_many
import S1_NRB


[docs] def check_scene_consistency(scenes): """ Check the consistency of a scene selection. The following pyroSAR object attributes must be the same: - sensor - acquisition_mode - product - frameNumber (data take ID) Parameters ---------- scenes: list[str or pyroSAR.drivers.ID] Returns ------- Raises ------ RuntimeError """ scenes = identify_many(scenes) for attr in ['sensor', 'acquisition_mode', 'product', 'frameNumber']: values = set([getattr(x, attr) for x in scenes]) if not len(values) == 1: msg = f"scene selection differs in attribute '{attr}': {values}" raise RuntimeError(msg)
[docs] def check_spacing(spacing): """ Check whether the spacing fits into the MGRS tile boundaries. Parameters ---------- spacing: int or float the target pixel spacing in meters Returns ------- """ if 109800 % spacing != 0: raise RuntimeError(f'target spacing of {spacing} m does not align with MGRS tile size of 109800 m.')
[docs] def generate_unique_id(encoded_str): """ Returns a unique product identifier as a hexadecimal string. The CRC-16 algorithm used to compute the unique identifier is CRC-CCITT (0xFFFF). Parameters ---------- encoded_str: bytes A string that should be used to generate a unique id from. The string needs to be encoded; e.g.: ``'abc'.encode()`` Returns ------- p_id: str The unique product identifier. """ crc = binascii.crc_hqx(encoded_str, 0xffff) p_id = '{:04X}'.format(crc & 0xffff) return p_id
[docs] def get_max_ext(geometries, buffer=None): """ Gets the maximum extent from a list of geometries. Parameters ---------- geometries: list[spatialist.vector.Vector] List of :class:`~spatialist.vector.Vector` geometries. buffer: float or None The buffer in units of the geometries' CRS to add to the extent. Returns ------- max_ext: dict The maximum extent of the selected :class:`~spatialist.vector.Vector` geometries including the chosen buffer. """ max_ext = {} for geo in geometries: if len(max_ext.keys()) == 0: max_ext = geo.extent else: ext = geo.extent for key in ['xmin', 'ymin']: if ext[key] < max_ext[key]: max_ext[key] = ext[key] for key in ['xmax', 'ymax']: if ext[key] > max_ext[key]: max_ext[key] = ext[key] max_ext = dict(max_ext) if buffer is not None: max_ext['xmin'] -= buffer max_ext['xmax'] += buffer max_ext['ymin'] -= buffer max_ext['ymax'] += buffer return max_ext
[docs] def set_logging(config, debug=False): """ Set logging for the current process. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. debug: bool Set pyroSAR logging level to DEBUG? Returns ------- log_local: logging.Logger The log handler for the current process. """ # pyroSAR logging as sys.stdout log_pyro = logging.getLogger('pyroSAR') if debug: log_pyro.setLevel(logging.DEBUG) else: log_pyro.setLevel(logging.INFO) sh = logging.StreamHandler(sys.stdout) log_pyro.addHandler(sh) # S1_NRB logging in logfile now = datetime.now().strftime('%Y%m%dT%H%M') log_local = logging.getLogger(__name__) log_local.setLevel(logging.DEBUG) log_file = os.path.join(config['log_dir'], f"{now}_process.log") os.makedirs(os.path.dirname(log_file), exist_ok=True) fh = logging.FileHandler(filename=log_file, mode='a') log_local.addHandler(fh) # Add header first with simple formatting form_simple = logging.Formatter("%(message)s") fh.setFormatter(form_simple) _log_process_config(logger=log_local, config=config) # Use normal formatting from here on out form = logging.Formatter("[%(asctime)s] [%(levelname)8s] %(message)s") fh.setFormatter(form) return log_local
[docs] def group_by_time(scenes, time=3): """ Group scenes by their acquisition time difference. Parameters ---------- scenes:list[pyroSAR.drivers.ID or str] a list of image names time: int or float a time difference in seconds by which to group the scenes. The default of 3 seconds incorporates the overlap between SLCs. Returns ------- list[list[pyroSAR.drivers.ID]] a list of sub-lists containing the file names of the grouped scenes """ # sort images by time stamp scenes = identify_many(scenes, sortkey='start') if len(scenes) < 2: return [scenes] groups = [[scenes[0]]] group = groups[0] for i in range(1, len(scenes)): start = datetime.strptime(scenes[i].start, '%Y%m%dT%H%M%S') stop_pred = datetime.strptime(scenes[i - 1].stop, '%Y%m%dT%H%M%S') diff = abs((stop_pred - start).total_seconds()) if diff <= time: group.append(scenes[i]) else: groups.append([scenes[i]]) group = groups[-1] return groups
def _log_process_config(logger, config): """ Adds a header to the logfile, which includes information about the current processing configuration. Parameters ---------- logger: logging.Logger The logger to which the header is added to. config: dict Dictionary of the parsed config parameters for the current process. """ try: core = examine.ExamineSnap().get_version('core') s1tbx = examine.ExamineSnap().get_version('s1tbx') snap_core = f"{core['version']} | {core['date']}" snap_s1tbx = f"{s1tbx['version']} | {s1tbx['date']}" except RuntimeError: snap_core = 'unknown' snap_s1tbx = 'unknown' header = f""" ==================================================================================================================== PROCESSING CONFIGURATION mode {config['mode']} aoi_tiles {config['aoi_tiles']} aoi_geometry {config['aoi_geometry']} scene {config['scene']} mindate {config['mindate'].isoformat()} maxdate {config['maxdate'].isoformat()} date_strict {config['date_strict']} sensor {config['sensor']} acq_mode {config['acq_mode']} product {config['product']} datatake {config['datatake']} measurement {config.get('measurement')} annotation {config.get('annotation')} dem_type {config.get('dem_type')} etad {config.get('etad')} work_dir {config['work_dir']} sar_dir {config['sar_dir']} tmp_dir {config['tmp_dir']} ard_dir {config['ard_dir']} wbm_dir {config['wbm_dir']} log_dir {config['log_dir']} etad_dir {config['etad_dir']} scene_dir {config['scene_dir']} db_file {config['db_file']} stac_catalog {config['stac_catalog']} stac_collections {config['stac_collections']} kml_file {config['kml_file']} gdal_threads {config.get('gdal_threads')} snap_gpt_args {config['snap_gpt_args']} ==================================================================================================================== SOFTWARE S1_NRB {S1_NRB.__version__} snap-core {snap_core} snap-s1tbx {snap_s1tbx} python {sys.version} python-pyroSAR {pyroSAR.__version__} python-spatialist {spatialist.__version__} python-GDAL {gdal.__version__} ==================================================================================================================== """ print(header) logger.info(header)
[docs] def log(handler, mode, proc_step, scenes, msg): """ Format and handle log messages during processing. Parameters ---------- handler: logging.Logger The log handler for the current process. mode: {'info', 'warning', 'exception'} Calls the respective logging helper function. E.g., ``handler.info()``. proc_step: str The processing step for which the message is logged. scenes: str or list[str] Scenes that are currently being processed. msg: Any The message that should be logged. """ proc_step = proc_step.zfill(7).replace('0', ' ') message = '[{proc_step}] -- {scenes} -- {msg}' message = message.format(proc_step=proc_step, scenes=scenes, msg=msg) if mode == 'info': handler.info(message) elif mode == 'warning': handler.warning(message) elif mode == 'exception': handler.exception(message) else: raise RuntimeError('log mode {} is not supported'.format(mode))
[docs] def vrt_add_overviews(vrt, overviews, resampling='AVERAGE'): """ Add overviews to an existing VRT file. Existing overviews will be overwritten. Parameters ---------- vrt: str the VRT file overviews: list[int] the overview levels resampling: str the overview resampling method Returns ------- """ tree = etree.parse(vrt) root = tree.getroot() ovr = root.find('OverviewList') if ovr is None: ovr = etree.SubElement(root, 'OverviewList') ovr.text = ' '.join([str(x) for x in overviews]) ovr.attrib['resampling'] = resampling.lower() etree.indent(root) tree.write(vrt, pretty_print=True, xml_declaration=False, encoding='utf-8')
[docs] def buffer_min_overlap(geom1, geom2, percent=1): """ Buffer a geometry to a minimum overlap with a second geometry. The geometry is iteratively buffered until the minimum overlap is reached. If the overlap of the input geometries is already larger than the defined threshold, a copy of the original geometry is returned. Parameters ---------- geom1: spatialist.vector.Vector the geometry to be buffered geom2: spatialist.vector.Vector the reference geometry to intersect with percent: int or float the minimum overlap in percent of `geom1` Returns ------- """ geom2_area = geom2.getArea() ext = geom1.extent ext2 = ext.copy() xdist = ext['xmax'] - ext['xmin'] ydist = ext['ymax'] - ext['ymin'] buffer = 0 overlap = 0 while overlap < percent: xbuf = xdist * buffer / 100 / 2 ybuf = ydist * buffer / 100 / 2 ext2['xmin'] = ext['xmin'] - xbuf ext2['xmax'] = ext['xmax'] + xbuf ext2['ymin'] = ext['ymin'] - ybuf ext2['ymax'] = ext['ymax'] + ybuf with bbox(ext2, 4326) as geom3: ext3 = geom3.extent with intersect(geom2, geom3) as inter: inter_area = inter.getArea() overlap = inter_area / geom2_area * 100 buffer += 1 return bbox(ext3, 4326)
[docs] def buffer_time(start, stop, **kwargs): """ Time range buffering Parameters ---------- start: str the start time in format '%Y%m%dT%H%M%S' stop: str the stop time in format '%Y%m%dT%H%M%S' kwargs time arguments passed to :func:`datetime.timedelta` Returns ------- """ f = '%Y%m%dT%H%M%S' td = timedelta(**kwargs) start = datetime.strptime(start, f) - td start = datetime.strftime(start, f) stop = datetime.strptime(stop, f) + td stop = datetime.strftime(stop, f) return start, stop