Source code for S1_NRB.config

import os
import configparser
from datetime import datetime
from osgeo import gdal


[docs]def get_config(config_file, proc_section='PROCESSING'): """Returns the content of a `config.ini` file as a dictionary. Parameters ---------- config_file: str Full path to the config file that should be parsed to a dictionary. proc_section: str Section of the config file that processing parameters should be parsed from. Default is 'PROCESSING'. Returns ------- out_dict: dict Dictionary of the parsed config parameters. """ if not os.path.isfile(config_file): raise FileNotFoundError("Config file {} does not exist.".format(config_file)) parser = configparser.ConfigParser(allow_no_value=True, converters={'_annotation': _parse_annotation, '_datetime': _parse_datetime, '_tile_list': _parse_tile_list}) parser.read(config_file) out_dict = {} # PROCESSING section allowed_keys = ['mode', 'aoi_tiles', 'aoi_geometry', 'mindate', 'maxdate', 'acq_mode', 'work_dir', 'scene_dir', 'rtc_dir', 'tmp_dir', 'wbm_dir', 'measurement', 'db_file', 'kml_file', 'dem_type', 'gdal_threads', 'log_dir', 'nrb_dir', 'etad', 'etad_dir', 'product', 'annotation'] try: proc_sec = parser[proc_section] except KeyError: raise KeyError("Section '{}' does not exist in config file {}".format(proc_section, config_file)) for k, v in proc_sec.items(): v = _keyval_check(key=k, val=v, allowed_keys=allowed_keys) if k == 'mode': allowed = ['nrb', 'rtc', 'all'] assert v in allowed, "Parameter '{}': expected to be one of {}; got '{}' instead".format(k, allowed, v) v = v.lower() if k == 'aoi_tiles': if v is not None: v = proc_sec.get_tile_list(k) if k == 'aoi_geometry': if v is not None: assert os.path.isfile(v), "Parameter '{}': File {} could not be found".format(k, v) if k.endswith('date'): v = proc_sec.get_datetime(k) if k == 'acq_mode': assert v in ['IW', 'EW', 'SM'] if k == 'work_dir': assert os.path.isdir(v), "Parameter '{}': '{}' must be an existing directory".format(k, v) dir_ignore = ['work_dir'] if proc_sec['etad'] == 'False': dir_ignore.append('etad_dir') if k.endswith('_dir') and k not in dir_ignore: if any(x in v for x in ['/', '\\']): assert os.path.isdir(v), "Parameter '{}': {} is a full path to a non-existing directory".format(k, v) else: v = os.path.join(proc_sec['work_dir'], v) if k.endswith('_file') and not k.startswith('db'): if any(x in v for x in ['/', '\\']): assert os.path.isfile(v), "Parameter '{}': File {} could not be found".format(k, v) else: v = os.path.join(proc_sec['work_dir'], v) assert os.path.isfile(v), "Parameter '{}': File {} could not be found".format(k, v) if k == 'db_file': if not any(x in v for x in ['/', '\\']): v = os.path.join(proc_sec['work_dir'], v) if k == 'gdal_threads': v = int(v) if k == 'dem_type': allowed = ['Copernicus 10m EEA DEM', 'Copernicus 30m Global DEM II', 'Copernicus 30m Global DEM', 'GETASSE30'] assert v in allowed, "Parameter '{}': expected to be one of {}; got '{}' instead".format(k, allowed, v) if k == 'etad': if v.lower() == 'true': v = True elif v.lower() == 'false': v = False else: allowed = ['True', 'true', 'False', 'false'] raise ValueError("Parameter '{}': expected to be one of {}; got '{}' instead".format(k, allowed, v)) if k == 'product': allowed = ['GRD', 'SLC'] assert v in allowed, "Parameter '{}': expected to be one of {}; got '{}' instead".format(k, allowed, v) if k == 'measurement': allowed = ['gamma', 'sigma'] assert v in allowed, "Parameter '{}': expected to be one of {}; got '{}' instead".format(k, allowed, v) if k == 'annotation': v = proc_sec.get_annotation(k) out_dict[k] = v # use previous defaults for measurement and annotation if they have not been defined if 'measurement' not in out_dict.keys(): out_dict['measurement'] = 'gamma' if 'annotation' not in out_dict.keys(): if out_dict['measurement'] == 'gamma': out_dict['annotation'] = ['dm', 'ei', 'id', 'lc', 'li', 'np', 'gs'] else: out_dict['annotation'] = ['dm', 'ei', 'id', 'lc', 'li', 'np', 'sg'] assert any([out_dict[k] is not None for k in ['aoi_tiles', 'aoi_geometry']]) # METADATA section meta_keys = ['access_url', 'licence', 'doi', 'processing_center'] try: meta_sec = parser['METADATA'] out_dict['meta'] = {} for k, v in meta_sec.items(): v = _keyval_check(key=k, val=v, allowed_keys=meta_keys) # No need to check values. Only requirement is that they're strings, which is configparser's default. out_dict['meta'][k] = v except KeyError: # Use None for all relevant fields if the metadata section doesn't exist. out_dict['meta'] = dict([(k, None) for k in meta_keys]) return out_dict
def _parse_annotation(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" if s in ['', 'None']: return None annotation_list = s.replace(' ', '').split(',') allowed = ['dm', 'ei', 'em', 'id', 'lc', 'li', 'np', 'gs', 'sg'] for annotation in annotation_list: if annotation not in allowed: msg = "Parameter 'annotation': Error while parsing to list; " \ "annotation '{}' is not supported. Allowed keys:\n{}" raise ValueError(msg.format(annotation, allowed)) return annotation_list def _parse_datetime(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" if 'T' in s: try: return datetime.strptime(s, '%Y-%m-%dT%H:%M:%S') except ValueError as e: raise Exception("Parameters 'mindate/maxdate': Could not parse '{}' with datetime format " "'%Y-%m-%dT%H:%M:%S'".format(s)) from e else: try: return datetime.strptime(s, '%Y-%m-%d') except ValueError as e: raise Exception("Parameters 'mindate/maxdate': Could not parse '{}' with datetime format " "'%Y-%m-%d'".format(s)) from e def _parse_tile_list(s): """Custom converter for configparser: https://docs.python.org/3/library/configparser.html#customizing-parser-behaviour""" tile_list = s.replace(' ', '').split(',') for tile in tile_list: if len(tile) != 5: raise ValueError("Parameter 'aoi_tiles': Error while parsing MGRS tile IDs to list; tile '{}' is not 5 " "digits long.".format(tile)) else: continue return tile_list def _keyval_check(key, val, allowed_keys): """Helper function to check and clean up key,value pairs while parsing a config file.""" if key not in allowed_keys: raise ValueError("Parameter '{}' is not allowed; should be one of {}".format(key, allowed_keys)) val = val.replace('"', '').replace("'", "") if val in ['None', 'none', '']: val = None return val
[docs]def snap_conf(config): """ Returns a dictionary of additional parameters for :func:`S1_NRB.snap.process` based on processing configurations provided by the config file. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. Returns ------- dict Dictionary of parameters that can be passed to :func:`S1_NRB.snap.process` """ return {'spacing': {'IW': 10, 'SM': 10, 'EW': 20}[config['acq_mode']], 'allow_res_osv': True, 'dem_resampling_method': 'BILINEAR_INTERPOLATION', 'img_resampling_method': 'BILINEAR_INTERPOLATION', 'slc_clean_edges': True, 'slc_clean_edges_pixels': 4, 'cleanup': True }
[docs]def gdal_conf(config): """ Stores GDAL configuration options for the current process. Parameters ---------- config: dict Dictionary of the parsed config parameters for the current process. Returns ------- dict Dictionary containing GDAL configuration options for the current process. """ threads = config['gdal_threads'] threads_before = gdal.GetConfigOption('GDAL_NUM_THREADS') if not isinstance(threads, int): raise TypeError("'threads' must be of type int") if threads == 1: multithread = False elif threads > 1: multithread = True gdal.SetConfigOption('GDAL_NUM_THREADS', str(threads)) else: raise ValueError("'threads' must be >= 1") return {'threads': threads, 'threads_before': threads_before, 'multithread': multithread}