import os
import re
import time
import shutil
import tempfile
from datetime import datetime, timezone
import numpy as np
from lxml import etree
from time import gmtime, strftime
from copy import deepcopy
from scipy.interpolate import griddata
from osgeo import gdal
from spatialist import Raster, Vector, vectorize, boundary, bbox, intersect, rasterize
from spatialist.auxil import gdalwarp, gdalbuildvrt
from spatialist.ancillary import finder
from pyroSAR import identify, identify_many
from pyroSAR.ancillary import find_datasets
import S1_NRB
from S1_NRB.metadata import extract, xml, stac
from S1_NRB.metadata.mapping import ITEM_MAP
from S1_NRB.ancillary import generate_unique_id
from S1_NRB.metadata.extract import etree_from_sid, find_in_annotation
[docs]def get_datasets(scenes, datadir, tile, extent, epsg):
"""
Identifies all source SLC scenes, finds matching output files processed with :func:`pyroSAR.snap.util.geocode` in
`datadir` and filters both lists depending on the actual overlap of each SLC footprint with the current MGRS tile
geometry.
Parameters
----------
scenes: list[str]
List of scenes to process. Either an individual scene or multiple, matching scenes (consecutive acquisitions).
datadir: str
The directory containing the datasets processed from the source scenes using pyroSAR.
tile: str
ID of an MGRS tile.
extent: dict
Spatial extent of the MGRS tile, derived from a :class:`~spatialist.vector.Vector` object.
epsg: int
The coordinate reference system as an EPSG code.
Returns
-------
ids: list[:class:`pyroSAR.drivers.ID`]
List of :class:`~pyroSAR.drivers.ID` objects of all source SLC scenes that overlap with the current MGRS tile.
datasets: list[str] or list[list[str]]
List of output files processed with :func:`pyroSAR.snap.util.geocode` that match each
:class:`~pyroSAR.drivers.ID` object of `ids`. The format is a list of strings if only a single object is stored
in `ids`, else it is a list of lists.
datamasks: list[str]
List of raster datamask files covering the footprint of each source SLC scene that overlaps with the current
MGRS tile.
"""
ids = identify_many(scenes)
datasets = []
for _id in ids:
scene_base = os.path.splitext(os.path.basename(_id.scene))[0]
scene_dir = os.path.join(datadir, scene_base, str(epsg))
datasets.append(find_datasets(directory=scene_dir))
if len(datasets) == 0:
raise RuntimeError("No pyroSAR datasets were found in the directory '{}'".format(datadir))
pattern = '[VH]{2}_gamma0-rtc'
i = 0
datamasks = []
while i < len(datasets):
pols = [x for x in datasets[i] if re.search(pattern, os.path.basename(x))]
snap_dm_ras = re.sub(pattern, 'datamask', pols[0])
snap_dm_vec = snap_dm_ras.replace('.tif', '.gpkg')
if not all([os.path.isfile(x) for x in [snap_dm_ras, snap_dm_vec]]):
with Raster(pols[0]) as ras:
arr = ras.array()
mask = ~np.isnan(arr)
with vectorize(target=mask, reference=ras) as vec:
with boundary(vec, expression="value=1") as bounds:
if not os.path.isfile(snap_dm_ras):
print('creating raster mask', i)
rasterize(vectorobject=bounds, reference=ras, outname=snap_dm_ras)
if not os.path.isfile(snap_dm_vec):
print('creating vector mask', i)
bounds.write(outfile=snap_dm_vec)
with Vector(snap_dm_vec) as bounds:
with bbox(extent, epsg) as tile_geom:
inter = intersect(bounds, tile_geom)
if inter is None:
print('removing dataset', i)
del ids[i]
del datasets[i]
else:
# Add snap_dm_ras to list if it overlaps with the current tile
datamasks.append(snap_dm_ras)
i += 1
inter.close()
if len(ids) == 0:
raise RuntimeError('None of the scenes overlap with the current tile {tile_id}: '
'\n{scenes}'.format(tile_id=tile, scenes=scenes))
if len(datasets) > 1:
datasets = list(zip(*datasets))
else:
datasets = datasets[0]
return ids, datasets, datamasks
[docs]def create_vrt(src, dst, fun, relpaths=False, scale=None, offset=None,
options=None, overviews=None, overview_resampling=None):
"""
Creates a VRT file for the specified source dataset(s) and adds a pixel function that should be applied on the fly
when opening the VRT file.
Parameters
----------
src: str or list[str]
The input dataset(s).
dst: str
The output dataset.
fun: str
A PixelFunctionType that should be applied on the fly when opening the VRT file. The function is applied to a
band that derives its pixel information from the source bands. A list of possible options can be found here:
https://gdal.org/drivers/raster/vrt.html#default-pixel-functions
Furthermore, the option 'decibel' can be specified, which will implement a custom pixel function that uses
Python code for decibel conversion (10*log10).
relpaths: bool, optional
Should all `SourceFilename` XML elements with attribute `@relativeToVRT="0"` be updated to be paths relative to
the output VRT file? Default is False.
scale: int, optional
The scale that should be applied when computing “real” pixel values from scaled pixel values on a raster band.
Will be ignored if `fun='decibel'`.
offset: float, optional
The offset that should be applied when computing “real” pixel values from scaled pixel values on a raster band.
Will be ignored if `fun='decibel'`.
options: dict, optional
Additional parameters passed to `gdal.BuildVRT`.
overviews: list[int], optional
Internal overview levels to be created for each raster file.
overview_resampling: str, optional
Resampling method for overview levels.
"""
gdalbuildvrt(src=src, dst=dst, options=options)
tree = etree.parse(dst)
root = tree.getroot()
band = tree.find('VRTRasterBand')
band.attrib['subClass'] = 'VRTDerivedRasterBand'
if fun == 'decibel':
pxfun_language = etree.SubElement(band, 'PixelFunctionLanguage')
pxfun_language.text = 'Python'
pxfun_type = etree.SubElement(band, 'PixelFunctionType')
pxfun_type.text = fun
pxfun_code = etree.SubElement(band, 'PixelFunctionCode')
pxfun_code.text = etree.CDATA("""
import numpy as np
def decibel(in_ar, out_ar, xoff, yoff, xsize, ysize, raster_xsize, raster_ysize, buf_radius, gt, **kwargs):
np.multiply(np.log10(in_ar[0], where=in_ar[0]>0.0, out=out_ar, dtype='float32'), 10.0, out=out_ar, dtype='float32')
""")
else:
pixfun_type = etree.SubElement(band, 'PixelFunctionType')
pixfun_type.text = fun
if scale is not None:
sc = etree.SubElement(band, 'Scale')
sc.text = str(scale)
if offset is not None:
off = etree.SubElement(band, 'Offset')
off.text = str(offset)
if any([overviews, overview_resampling]) is not None:
ovr = tree.find('OverviewList')
if ovr is None:
ovr = etree.SubElement(root, 'OverviewList')
if overview_resampling is not None:
ovr.attrib['resampling'] = overview_resampling.lower()
if overviews is not None:
ov = str(overviews)
for x in ['[', ']', ',']:
ov = ov.replace(x, '')
ovr.text = ov
if relpaths:
srcfiles = tree.xpath('//SourceFilename[@relativeToVRT="0"]')
for srcfile in srcfiles:
repl = os.path.relpath(srcfile.text, start=os.path.dirname(dst))
srcfile.text = repl
srcfile.attrib['relativeToVRT'] = '1'
etree.indent(root)
tree.write(dst, pretty_print=True, xml_declaration=False, encoding='utf-8')
[docs]def create_rgb_vrt(outname, infiles, overviews, overview_resampling):
"""
Creation of the color composite VRT file.
Parameters
----------
outname: str
Full path to the output VRT file.
infiles: list[str]
A list of paths pointing to the linear scaled measurement backscatter files.
overviews: list[int]
Internal overview levels to be defined for the created VRT file.
overview_resampling: str
Resampling method applied to overview pyramids.
"""
print(outname)
# make sure order is right and co-polarization (VV or HH) is first
pols = [re.search('[hv]{2}', os.path.basename(f)).group() for f in infiles]
if pols[1] in ['vv', 'hh']:
infiles.reverse()
pols.reverse()
# format overview levels
ov = str(overviews)
for x in ['[', ']', ',']:
ov = ov.replace(x, '')
# create VRT file and change its content
gdalbuildvrt(src=infiles, dst=outname, options={'separate': True})
tree = etree.parse(outname)
root = tree.getroot()
srs = tree.find('SRS').text
geotrans = tree.find('GeoTransform').text
bands = tree.findall('VRTRasterBand')
new_band = etree.SubElement(root, 'VRTRasterBand',
attrib={'dataType': 'Float32', 'band': '3', 'subClass': 'VRTDerivedRasterBand'})
new_band.append(deepcopy(bands[0].find('NoDataValue')))
pxfun_type = etree.SubElement(new_band, 'PixelFunctionType')
pxfun_type.text = 'mul'
new_band.append(deepcopy(bands[0].find('ComplexSource')))
new_band.append(deepcopy(bands[1].find('ComplexSource')))
src = new_band.findall('ComplexSource')[1]
fname = src.find('SourceFilename')
fname_old = fname.text
nodata = src.find('NODATA').text
src_attr = src.find('SourceProperties').attrib
fname.text = etree.CDATA("""
<VRTDataset rasterXSize="{rasterxsize}" rasterYSize="{rasterysize}">
<SRS dataAxisToSRSAxisMapping="1,2">{srs}</SRS>
<GeoTransform>{geotrans}</GeoTransform>
<VRTRasterBand dataType="{dtype}" band="1" subClass="VRTDerivedRasterBand">
<PixelFunctionType>{px_fun}</PixelFunctionType>
<ComplexSource>
<SourceFilename relativeToVRT="1">{fname}</SourceFilename>
<SourceBand>1</SourceBand>
<SourceProperties RasterXSize="{rasterxsize}" RasterYSize="{rasterysize}" DataType="{dtype}" BlockXSize="{blockxsize}" BlockYSize="{blockysize}"/>
<SrcRect xOff="0" yOff="0" xSize="{rasterxsize}" ySize="{rasterysize}"/>
<DstRect xOff="0" yOff="0" xSize="{rasterxsize}" ySize="{rasterysize}"/>
<NODATA>{nodata}</NODATA>
</ComplexSource>
</VRTRasterBand>
<OverviewList resampling="{ov_resampling}">{ov}</OverviewList>
</VRTDataset>
""".format(rasterxsize=src_attr['RasterXSize'], rasterysize=src_attr['RasterYSize'], srs=srs, geotrans=geotrans,
dtype=src_attr['DataType'], px_fun='inv', fname=fname_old,
blockxsize=src_attr['BlockXSize'], blockysize=src_attr['BlockYSize'],
nodata=nodata, ov_resampling=overview_resampling.lower(), ov=ov))
bands = tree.findall('VRTRasterBand')
for band, col in zip(bands, ['Red', 'Green', 'Blue']):
color = etree.Element('ColorInterp')
color.text = col
band.insert(0, color)
for i, band in enumerate(bands):
if i in [0, 1]:
band.remove(band.find('NoDataValue'))
ovr = etree.SubElement(root, 'OverviewList', attrib={'resampling': overview_resampling.lower()})
ovr.text = ov
etree.indent(root)
tree.write(outname, pretty_print=True, xml_declaration=False, encoding='utf-8')
[docs]def calc_product_start_stop(src_ids, extent, epsg):
"""
Calculates the start and stop times of the NRB product.
The geolocation grid points including their azimuth time information are extracted first from the metadata of each
source SLC. These grid points are then used to interpolate the azimuth time for the lower right and upper left
(ascending) or upper right and lower left (descending) corners of the MGRS tile extent.
Parameters
----------
src_ids: list[pyroSAR.drivers.ID]
List of :class:`~pyroSAR.drivers.ID` objects of all source SLC scenes that overlap with the current MGRS tile.
extent: dict
Spatial extent of the MGRS tile, derived from a :class:`~spatialist.vector.Vector` object.
epsg: int
The coordinate reference system as an EPSG code.
Returns
-------
start: str
Start time of the NRB product formatted as `%Y%m%dT%H%M%S` in UTC.
stop: str
Stop time of the NRB product formatted as `%Y%m%dT%H%M%S` in UTC.
"""
with bbox(extent, epsg) as tile_geom:
tile_geom.reproject(4326)
ext = tile_geom.extent
ul = (ext['xmin'], ext['ymax'])
ur = (ext['xmax'], ext['ymax'])
lr = (ext['xmax'], ext['ymin'])
ll = (ext['xmin'], ext['ymin'])
tile_geom = None
slc_dict = {}
for i, sid in enumerate(src_ids):
uid = os.path.basename(sid.scene).split('.')[0][-4:]
slc_dict[uid] = etree_from_sid(sid)
slc_dict[uid]['sid'] = sid
uids = list(slc_dict.keys())
for uid in uids:
t = find_in_annotation(annotation_dict=slc_dict[uid]['annotation'],
pattern='.//geolocationGridPoint/azimuthTime')
swaths = t.keys()
y = find_in_annotation(annotation_dict=slc_dict[uid]['annotation'], pattern='.//geolocationGridPoint/latitude')
x = find_in_annotation(annotation_dict=slc_dict[uid]['annotation'], pattern='.//geolocationGridPoint/longitude')
t_flat = np.asarray([datetime.fromisoformat(item).timestamp() for sublist in [t[swath] for swath in swaths]
for item in sublist])
y_flat = np.asarray([float(item) for sublist in [y[swath] for swath in swaths] for item in sublist])
x_flat = np.asarray([float(item) for sublist in [x[swath] for swath in swaths] for item in sublist])
g = np.asarray([(x, y) for x, y in zip(x_flat, y_flat)])
slc_dict[uid]['az_time'] = t_flat
slc_dict[uid]['gridpts'] = g
if len(uids) == 2:
starts = [datetime.strptime(slc_dict[key]['sid'].start, '%Y%m%dT%H%M%S') for key in slc_dict.keys()]
if starts[0] > starts[1]:
az_time = np.concatenate([slc_dict[uids[1]]['az_time'], slc_dict[uids[0]]['az_time']])
gridpts = np.concatenate([slc_dict[uids[1]]['gridpts'], slc_dict[uids[0]]['gridpts']])
else:
az_time = np.concatenate([slc_dict[key]['az_time'] for key in slc_dict.keys()])
gridpts = np.concatenate([slc_dict[key]['gridpts'] for key in slc_dict.keys()])
else:
az_time = slc_dict[uids[0]]['az_time']
gridpts = slc_dict[uids[0]]['gridpts']
if slc_dict[uids[0]]['sid'].orbit == 'A':
coord1 = lr
coord2 = ul
else:
coord1 = ur
coord2 = ll
method = 'linear'
res = [griddata(gridpts, az_time, coord1, method=method),
griddata(gridpts, az_time, coord2, method=method)]
min_start = min([datetime.strptime(slc_dict[uid]['sid'].start, '%Y%m%dT%H%M%S') for uid in uids])
max_stop = max([datetime.strptime(slc_dict[uid]['sid'].stop, '%Y%m%dT%H%M%S') for uid in uids])
res_t = []
for i, r in enumerate(res):
if np.isnan(r):
if i == 0:
res_t.append(min_start)
else:
res_t.append(max_stop)
else:
res_t.append(datetime.fromtimestamp(float(r)))
start = datetime.strftime(res_t[0], '%Y%m%dT%H%M%S')
stop = datetime.strftime(res_t[1], '%Y%m%dT%H%M%S')
return start, stop
[docs]def create_data_mask(outname, snap_datamasks, snap_datasets, extent, epsg, driver,
creation_opt, overviews, overview_resampling, dst_nodata, wbm=None):
"""
Creation of the Data Mask image.
Parameters
----------
outname: str
Full path to the output data mask file.
snap_datamasks: list[str]
List of raster datamask files covering the footprint of each source SLC scene that overlaps with the current
MGRS tile.
snap_datasets: list[str]
List of output files processed with :func:`pyroSAR.snap.util.geocode` that match the source SLC scenes and
overlap with the current MGRS tile.
extent: dict
Spatial extent of the MGRS tile, derived from a :class:`~spatialist.vector.Vector` object.
epsg: int
The coordinate reference system as an EPSG code.
driver: str
GDAL driver to use for raster file creation.
creation_opt: list[str]
GDAL creation options to use for raster file creation. Should match specified GDAL driver.
overviews: list[int]
Internal overview levels to be created for each raster file.
overview_resampling: str
Resampling method for overview levels.
dst_nodata: int or str
Nodata value to write to the output raster.
wbm: str, optional
Path to a water body mask file with the dimensions of an MGRS tile.
"""
print(outname)
pols = [pol for pol in set([re.search('[VH]{2}', os.path.basename(x)).group() for x in snap_datasets if
re.search('[VH]{2}', os.path.basename(x)) is not None])]
pattern = pols[0] + '_gamma0-rtc'
snap_gamma0 = [x for x in snap_datasets if re.search(pattern, os.path.basename(x))]
snap_ls_mask = [x for x in snap_datasets if re.search('layoverShadowMask', os.path.basename(x))]
dm_bands = {1: {'arr_val': 0,
'name': 'not layover, nor shadow'},
2: {'arr_val': 1,
'name': 'layover'},
3: {'arr_val': 2,
'name': 'shadow'},
4: {'arr_val': 4,
'name': 'ocean water'}}
tile_bounds = [extent['xmin'], extent['ymin'], extent['xmax'], extent['ymax']]
vrt_snap_ls = '/vsimem/' + os.path.dirname(outname) + 'snap_ls.vrt'
vrt_snap_valid = '/vsimem/' + os.path.dirname(outname) + 'snap_valid.vrt'
vrt_snap_gamma0 = '/vsimem/' + os.path.dirname(outname) + 'snap_gamma0.vrt'
gdalbuildvrt(snap_ls_mask, vrt_snap_ls, options={'outputBounds': tile_bounds}, void=False)
gdalbuildvrt(snap_datamasks, vrt_snap_valid, options={'outputBounds': tile_bounds}, void=False)
gdalbuildvrt(snap_gamma0, vrt_snap_gamma0, options={'outputBounds': tile_bounds}, void=False)
with Raster(vrt_snap_ls) as ras_snap_ls:
with bbox(extent, crs=epsg) as tile_vec:
rows = ras_snap_ls.rows
cols = ras_snap_ls.cols
geotrans = ras_snap_ls.raster.GetGeoTransform()
proj = ras_snap_ls.raster.GetProjection()
arr_snap_dm = ras_snap_ls.array()
# Add Water Body Mask
if wbm is not None:
with Raster(wbm) as ras_wbm:
arr_wbm = ras_wbm.array()
out_arr = np.where((arr_wbm == 1), 4, arr_snap_dm)
del arr_wbm
else:
out_arr = arr_snap_dm
dm_bands.pop(4)
del arr_snap_dm
# Extend the shadow class of the data mask with nodata values from backscatter data and create final array
with Raster(vrt_snap_valid)[tile_vec] as ras_snap_valid:
with Raster(vrt_snap_gamma0)[tile_vec] as ras_snap_gamma0:
arr_snap_valid = ras_snap_valid.array()
arr_snap_gamma0 = ras_snap_gamma0.array()
out_arr = np.nan_to_num(out_arr)
out_arr = np.where(((arr_snap_valid == 1) & (np.isnan(arr_snap_gamma0)) & (out_arr != 4)), 2,
out_arr)
out_arr[np.isnan(arr_snap_valid)] = dst_nodata
del arr_snap_gamma0
del arr_snap_valid
outname_tmp = '/vsimem/' + os.path.basename(outname) + '.vrt'
gdriver = gdal.GetDriverByName('GTiff')
ds_tmp = gdriver.Create(outname_tmp, rows, cols, len(dm_bands.keys()), gdal.GDT_Byte,
options=['ALPHA=UNSPECIFIED', 'PHOTOMETRIC=MINISWHITE'])
gdriver = None
ds_tmp.SetGeoTransform(geotrans)
ds_tmp.SetProjection(proj)
for k, v in dm_bands.items():
band = ds_tmp.GetRasterBand(k)
arr_val = v['arr_val']
b_name = v['name']
arr = np.full((rows, cols), 0)
arr[out_arr == dst_nodata] = dst_nodata
if arr_val == 0:
arr[out_arr == 0] = 1
elif arr_val in [1, 2]:
arr[(out_arr == arr_val) | (out_arr == 3)] = 1
elif arr_val == 4:
arr[out_arr == 4] = 1
arr = arr.astype('uint8')
band.WriteArray(arr)
band.SetNoDataValue(dst_nodata)
band.SetDescription(b_name)
band.FlushCache()
band = None
del arr
ds_tmp.SetMetadataItem('TIFFTAG_DATETIME', strftime('%Y:%m:%d %H:%M:%S', gmtime()))
ds_tmp.BuildOverviews(overview_resampling, overviews)
outDataset_cog = gdal.GetDriverByName(driver).CreateCopy(outname, ds_tmp, strict=1, options=creation_opt)
outDataset_cog = None
ds_tmp = None
tile_vec = None
[docs]def create_acq_id_image(outname, ref_tif, snap_datamasks, src_ids, extent,
epsg, driver, creation_opt, overviews, dst_nodata):
"""
Creation of the Acquisition ID image.
Parameters
----------
outname: str
Full path to the output data mask file.
ref_tif: str
Full path to any GeoTIFF file of the NRB product.
snap_datamasks: list[str]
List of raster datamask files covering the footprint of each source SLC scene that overlaps with the current
MGRS tile.
src_ids: list[pyroSAR.drivers.ID]
List of :class:`~pyroSAR.drivers.ID` objects of all source SLC scenes that overlap with the current MGRS tile.
extent: dict
Spatial extent of the MGRS tile, derived from a :class:`~spatialist.vector.Vector` object.
epsg: int
The CRS used for the NRB product; provided as an EPSG code.
driver: str
GDAL driver to use for raster file creation.
creation_opt: list[str]
GDAL creation options to use for raster file creation. Should match specified GDAL driver.
overviews: list[int]
Internal overview levels to be created for each raster file.
dst_nodata: int or str
Nodata value to write to the output raster.
"""
print(outname)
src_scenes = [sid.scene for sid in src_ids]
# If there are two source scenes, make sure that the order of acquisitions in all lists is correct!
if len(src_scenes) > 1:
if not len(src_scenes) == 2 and len(snap_datamasks) == 2:
raise RuntimeError('expected lists `src_scenes` and `valid_mask_list` to be of length 2; length is '
'{} and {} respectively'.format(len(src_scenes), len(snap_datamasks)))
starts_src = [datetime.strptime(identify(f).start, '%Y%m%dT%H%M%S') for f in src_scenes]
start_valid = [datetime.strptime(re.search('[0-9]{8}T[0-9]{6}', os.path.basename(f)).group(), '%Y%m%dT%H%M%S')
for f in snap_datamasks]
if starts_src[0] > starts_src[1]:
src_scenes.reverse()
starts_src.reverse()
if start_valid[0] != starts_src[0]:
snap_datamasks.reverse()
if start_valid[0] != starts_src[0]:
raise RuntimeError('failed to match order of lists `src_scenes` and `valid_mask_list`')
tile_bounds = [extent['xmin'], extent['ymin'], extent['xmax'], extent['ymax']]
arr_list = []
for dm in snap_datamasks:
vrt_snap_valid = '/vsimem/' + os.path.dirname(outname) + 'mosaic.vrt'
gdalbuildvrt(dm, vrt_snap_valid, options={'outputBounds': tile_bounds}, void=False)
with bbox(extent, crs=epsg) as tile_vec:
with Raster(vrt_snap_valid)[tile_vec] as vrt_ras:
vrt_arr = vrt_ras.array()
arr_list.append(vrt_arr)
del vrt_arr
tile_vec = None
src_scenes_clean = [os.path.basename(src).replace('.zip', '').replace('.SAFE', '') for src in src_scenes]
tag = '{{"{src1}": 1}}'.format(src1=src_scenes_clean[0])
out_arr = np.full(arr_list[0].shape, dst_nodata)
out_arr[arr_list[0] == 1] = 1
if len(arr_list) == 2:
out_arr[arr_list[1] == 1] = 2
tag = '{{"{src1}": 1, "{src2}": 2}}'.format(src1=src_scenes_clean[0], src2=src_scenes_clean[1])
creation_opt.append('TIFFTAG_IMAGEDESCRIPTION={}'.format(tag))
with Raster(ref_tif) as ref_ras:
ref_ras.write(outname, format=driver, array=out_arr.astype('uint8'), nodata=dst_nodata, overwrite=True,
overviews=overviews, options=creation_opt)