Source code for S1_NRB.config

import os
import re
from datetime import timedelta
import configparser
import dateutil.parser
from osgeo import gdal

[docs] def get_keys(section): """ get all allowed configuration keys Parameters ---------- section: {'processing', 'metadata'} the configuration section to get the allowed keys for. Returns ------- list[str] a list of keys """ if section == 'processing': return ['mode', 'aoi_tiles', 'aoi_geometry', 'mindate', 'maxdate', 'acq_mode', 'datatake', 'work_dir', 'scene_dir', 'sar_dir', 'tmp_dir', 'wbm_dir', 'measurement', 'db_file', 'kml_file', 'dem_type', 'gdal_threads', 'log_dir', 'ard_dir', 'etad', 'etad_dir', 'product', 'annotation', 'stac_catalog', 'stac_collections', 'sensor', 'date_strict', 'snap_gpt_args', 'scene'] elif section == 'metadata': return ['format', 'copy_original', 'access_url', 'licence', 'doi', 'processing_center'] else: raise RuntimeError(f"unknown section: {section}. Options: 'processing', 'metadata'.")
[docs] def get_config(config_file, proc_section='PROCESSING', **kwargs): """Returns the content of a `config.ini` file as a dictionary. Parameters ---------- config_file: str Full path to the config file that should be parsed to a dictionary. proc_section: str Section of the config file that processing parameters should be parsed from. Default is 'PROCESSING'. Returns ------- out_dict: dict Dictionary of the parsed config parameters. """ parser = configparser.ConfigParser(allow_no_value=True, converters={'_annotation': _parse_annotation, '_datetime': _parse_datetime, '_modes': _parse_modes, '_stac_collections': _parse_list, '_tile_list': _parse_tile_list, '_list': _parse_list}) if isinstance(config_file, str): if not os.path.isfile(config_file): raise FileNotFoundError("Config file {} does not exist.".format(config_file)) elif config_file is None: parser.add_section(proc_section) parser.add_section('METADATA') else: raise TypeError(f"'config_file' must be of type str or None, was {type(config_file)}") out_dict = {} # PROCESSING section allowed_keys = get_keys(section='processing') try: proc_sec = parser[proc_section] except KeyError: raise KeyError("Section '{}' does not exist in config file {}".format(proc_section, config_file)) # override config file parameters for k, v in kwargs.items(): proc_sec[k] = v.strip() # set some defaults if 'etad' not in proc_sec.keys(): proc_sec['etad'] = 'False' proc_sec['etad_dir'] = 'None' for item in ['sar_dir', 'tmp_dir', 'ard_dir', 'wbm_dir', 'log_dir']: if item not in proc_sec.keys(): proc_sec[item] = item[:3].upper() if 'gdal_threads' not in proc_sec.keys(): proc_sec['gdal_threads'] = '4' if 'dem_type' not in proc_sec.keys(): proc_sec['dem_type'] = 'Copernicus 30m Global DEM' if 'date_strict' not in proc_sec.keys(): proc_sec['date_strict'] = 'True' if 'snap_gpt_args' not in proc_sec.keys(): proc_sec['snap_gpt_args'] = 'None' if 'datatake' not in proc_sec.keys(): proc_sec['datatake'] = 'None' # use previous defaults for measurement and annotation if they have not been defined if 'measurement' not in proc_sec.keys(): proc_sec['measurement'] = 'gamma' if 'annotation' not in proc_sec.keys(): proc_sec['annotation'] = 'dm,ei,id,lc,li,np,ratio' # check completeness of configuration parameters missing = [] exclude = ['aoi_tiles', 'aoi_geometry'] for key in get_keys(section='processing'): if key not in proc_sec.keys() and key not in exclude: missing.append(key) if len(missing) > 0: missing_str = '\n - ' + '\n - '.join(missing) raise RuntimeError(f"missing the following parameters:{missing_str}") for k, v in proc_sec.items(): v = _keyval_check(key=k, val=v, allowed_keys=allowed_keys) if k == 'mode': v = proc_sec.get_modes(k) if k == 'aoi_tiles': if v is not None: v = proc_sec.get_tile_list(k) if k == 'aoi_geometry': if v is not None: assert os.path.isfile(v), "Parameter '{}': File {} could not be found".format(k, v) if k == 'mindate': v = proc_sec.get_datetime(k) if k == 'maxdate': date_short ='^[0-9-]{10}$', v) is not None v = proc_sec.get_datetime(k) if date_short: v += timedelta(days=1, microseconds=-1) if k == 'sensor': assert v in ['S1A', 'S1B'] if k == 'acq_mode': assert v in ['IW', 'EW', 'SM'] if k == 'work_dir': assert os.path.isdir(v), "Parameter '{}': '{}' must be an existing directory".format(k, v) dir_ignore = ['work_dir'] if proc_sec['etad'] == 'False': dir_ignore.append('etad_dir') if k == 'scene_dir' and v is None: dir_ignore.append(k) if k.endswith('_dir') and k not in dir_ignore: if any(x in v for x in ['/', '\\']): assert os.path.isdir(v), "Parameter '{}': {} is a full path to a non-existing directory".format(k, v) else: v = os.path.join(proc_sec['work_dir'], v) if k.endswith('_file') and not k.startswith('db'): if any(x in v for x in ['/', '\\']): assert os.path.isfile(v), "Parameter '{}': File {} could not be found".format(k, v) else: v = os.path.join(proc_sec['work_dir'], v) assert os.path.isfile(v), "Parameter '{}': File {} could not be found".format(k, v) if k == 'db_file' and v is not None: if not any(x in v for x in ['/', '\\']): v = os.path.join(proc_sec['work_dir'], v) if k == 'stac_collections': v = proc_sec.get_stac_collections(k) if k == 'gdal_threads': v = int(v) if k == 'dem_type': allowed = ['Copernicus 10m EEA DEM', 'Copernicus 30m Global DEM II', 'Copernicus 30m Global DEM', 'GETASSE30'] assert v in allowed, "Parameter '{}': expected to be one of {}; got '{}' instead".format(k, allowed, v) if k in ['etad', 'date_strict']: v = proc_sec.getboolean(k) if k == 'product': allowed = ['GRD', 'SLC'] assert v in allowed, "Parameter '{}': expected to be one of {}; got '{}' instead".format(k, allowed, v) if k == 'measurement': allowed = ['gamma', 'sigma'] assert v in allowed, "Parameter '{}': expected to be one of {}; got '{}' instead".format(k, allowed, v) if k == 'annotation': v = proc_sec.get_annotation(k) if k == 'snap_gpt_args': v = proc_sec.get_list(k) if k == 'datatake': v = proc_sec.get_list(k) out_dict[k] = v if out_dict['db_file'] is None and out_dict['stac_catalog'] is None: raise RuntimeError("Either 'db_file' or 'stac_catalog' has to be defined.") if out_dict['db_file'] is not None and out_dict['stac_catalog'] is not None: raise RuntimeError("both 'db_file' and 'stac_catalog' have been defined. Please choose only one.") if out_dict['stac_catalog'] is not None: if out_dict['stac_collections'] is None: raise RuntimeError("'stac_collections' must be defined if data is to be searched in a STAC.") if out_dict['db_file'] is not None: if out_dict['scene_dir'] is None: raise RuntimeError("'scene_dir' must be defined if data is to be searched via an SQLite database.") # METADATA section allowed_keys = get_keys(section='metadata') if 'METADATA' not in parser.keys(): parser.add_section('METADATA') meta_sec = parser['METADATA'] out_dict['meta'] = {} # set defaults if 'format' not in meta_sec.keys(): meta_sec['format'] = 'OGC, STAC' if 'copy_original' not in meta_sec.keys(): meta_sec['copy_original'] = 'True' for k, v in meta_sec.items(): v = _keyval_check(key=k, val=v, allowed_keys=allowed_keys) if k == 'format': v = meta_sec.get_list(k) if k == 'copy_original': v = meta_sec.getboolean(k) out_dict['meta'][k] = v for key in allowed_keys: if key not in out_dict['meta'].keys(): out_dict['meta'][key] = None return out_dict
def _parse_annotation(s): """Custom converter for configparser:""" annotation_list = _parse_list(s) if annotation_list is not None: allowed = ['dm', 'ei', 'em', 'id', 'lc', 'ld', 'li', 'np', 'ratio', 'wm'] for layer in annotation_list: if layer not in allowed: msg = "Parameter 'annotation': Error while parsing to list; " \ "layer '{}' is not supported. Allowed keys:\n{}" raise ValueError(msg.format(layer, allowed)) return annotation_list def _parse_datetime(s): """Custom converter for configparser:""" return dateutil.parser.parse(s) def _parse_modes(s): """Custom converter for configparser:""" mode_list = _parse_list(s) allowed = ['sar', 'nrb', 'orb'] for mode in mode_list: if mode not in allowed: msg = "Parameter 'annotation': Error while parsing to list; " \ "mode '{}' is not supported. Allowed keys:\n{}" raise ValueError(msg.format(mode, allowed)) return mode_list def _parse_tile_list(s): """Custom converter for configparser:""" tile_list = _parse_list(s) if tile_list is not None: for tile in tile_list: if len(tile) != 5: raise ValueError("Parameter 'aoi_tiles': Error while parsing " "MGRS tile IDs to list; tile '{}' is not 5 " "digits long.".format(tile)) else: continue return tile_list def _parse_list(s): """Custom converter for configparser:""" if s in ['', 'None']: return None else: return [x.strip() for x in s.split(',')] def _keyval_check(key, val, allowed_keys): """Helper function to check and clean up key,value pairs while parsing a config file.""" if key not in allowed_keys: raise ValueError("Parameter '{}' is not allowed; should be one of {}".format(key, allowed_keys)) val = val.replace('"', '').replace("'", "") if val in ['None', 'none', '']: val = None return val
[docs] def snap_conf(config): """ Returns a dictionary of additional parameters for :func:`S1_NRB.snap.process` based on processing configurations provided by the config file. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. Returns ------- dict Dictionary of parameters that can be passed to :func:`S1_NRB.snap.process` """ return {'spacing': {'IW': 10, 'SM': 10, 'EW': 40}[config['acq_mode']], 'allow_res_osv': True, 'dem_resampling_method': 'BILINEAR_INTERPOLATION', 'img_resampling_method': 'BILINEAR_INTERPOLATION', 'clean_edges': True, 'clean_edges_pixels': 4, 'cleanup': True }
[docs] def gdal_conf(config): """ Stores GDAL configuration options for the current process. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. Returns ------- dict Dictionary containing GDAL configuration options for the current process. """ threads = config['gdal_threads'] threads_before = gdal.GetConfigOption('GDAL_NUM_THREADS') if not isinstance(threads, int): raise TypeError("'threads' must be of type int") if threads == 1: multithread = False elif threads > 1: multithread = True gdal.SetConfigOption('GDAL_NUM_THREADS', str(threads)) else: raise ValueError("'threads' must be >= 1") return {'threads': threads, 'threads_before': threads_before, 'multithread': multithread}