import os
import re
import time
from lxml import etree
from pathlib import Path
from datetime import datetime
from pystac_client import Client
import pystac_client.exceptions
from spatialist import Vector
import asf_search as asf
[docs]class STACArchive(object):
"""
Search for scenes in a SpatioTemporal Asset Catalog.
Scenes are expected to be unpacked with a folder suffix .SAFE.
The interface is kept consistent with :class:`pyroSAR.drivers.Archive`.
Parameters
----------
url: str
the catalog URL
collections: str or list[str]
the catalog collection(s) to be searched
"""
def __init__(self, url, collections):
self.url = url
self.max_tries = 300
self._open_catalog()
if isinstance(collections, str):
self.collections = [collections]
elif isinstance(collections, list):
self.collections = collections
else:
raise TypeError("'collections' must be of type str or list")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@staticmethod
def _get_proc_time(scene):
with open(os.path.join(scene, 'manifest.safe'), 'rb') as f:
tree = etree.fromstring(f.read())
proc = tree.find(path='.//xmlData/safe:processing',
namespaces=tree.nsmap)
start = proc.attrib['start']
del tree, proc
return datetime.strptime(start, '%Y-%m-%dT%H:%M:%S.%f')
def _filter_duplicates(self, scenes):
tmp = sorted(scenes)
pattern = '([0-9A-Z_]{16})_([0-9T]{15})_([0-9T]{15})'
keep = []
i = 0
while i < len(tmp):
group = [tmp[i]]
match1 = re.search(pattern, os.path.basename(tmp[i])).groups()
j = i + 1
while j < len(tmp):
match2 = re.search(pattern, os.path.basename(tmp[j])).groups()
if match1 == match2:
group.append(tmp[j])
j += 1
else:
break
if len(group) > 1:
tproc = [self._get_proc_time(x) for x in group]
keep.append(group[tproc.index(max(tproc))])
else:
keep.append(group[0])
i = j
return keep
def _open_catalog(self):
i = 1
while True:
try:
self.catalog = Client.open(self.url)
# print('catalog opened successfully')
break
except pystac_client.exceptions.APIError:
# print(f'failed opening the catalog at try {i:03d}/{self.max_tries}')
if i < self.max_tries:
i += 1
time.sleep(1)
else:
raise
[docs] def close(self):
del self.catalog
[docs] def select(self, sensor=None, product=None, acquisition_mode=None,
mindate=None, maxdate=None, frameNumber=None,
vectorobject=None, date_strict=True, check_exist=True):
"""
Select scenes from the catalog. Used STAC keys:
- platform
- start_datetime
- end_datetime
- sar:instrument_mode
- sar:product_type
- s1:datatake (custom)
Parameters
----------
sensor: str or list[str] or None
S1A or S1B
product: str or list[str] or None
GRD or SLC
acquisition_mode: str or list[str] or None
IW, EW or SM
mindate: str or datetime.datetime or None
the minimum acquisition date
maxdate: str or datetime.datetime or None
the maximum acquisition date
frameNumber: int or list[int] or None
the data take ID in decimal representation.
Requires custom STAC key `s1:datatake`.
vectorobject: spatialist.vector.Vector or None
a geometry with which the scenes need to overlap
date_strict: bool
treat dates as strict limits or also allow flexible limits to incorporate scenes
whose acquisition period overlaps with the defined limit?
- strict: start >= mindate & stop <= maxdate
- not strict: stop >= mindate & start <= maxdate
check_exist: bool
check whether found files exist locally?
Returns
-------
list[str]
the locations of the scene directories with suffix .SAFE
See Also
--------
pystac_client.Client.search
"""
pars = locals()
del pars['date_strict']
del pars['check_exist']
del pars['self']
lookup = {'product': 'sar:product_type',
'acquisition_mode': 'sar:instrument_mode',
'mindate': 'start_datetime',
'maxdate': 'end_datetime',
'sensor': 'platform',
'frameNumber': 's1:datatake'}
lookup_platform = {'S1A': 'sentinel-1a',
'S1B': 'sentinel-1b'}
flt = {'op': 'and', 'args': []}
for key in pars.keys():
val = pars[key]
if val is None:
continue
if key == 'mindate':
if isinstance(val, datetime):
val = datetime.strftime(val, '%Y%m%dT%H%M%S')
if date_strict:
arg = {'op': '>=', 'args': [{'property': 'start_datetime'}, val]}
else:
arg = {'op': '>=', 'args': [{'property': 'end_datetime'}, val]}
elif key == 'maxdate':
if isinstance(val, datetime):
val = datetime.strftime(val, '%Y%m%dT%H%M%S')
if date_strict:
arg = {'op': '<=', 'args': [{'property': 'end_datetime'}, val]}
else:
arg = {'op': '<=', 'args': [{'property': 'start_datetime'}, val]}
elif key == 'vectorobject':
if isinstance(val, Vector):
with val.clone() as vec:
vec.reproject(4326)
ext = vec.extent
arg = {'op': 's_intersects',
'args': [{'property': 'geometry'},
{'type': 'Polygon',
'coordinates': [[[ext['xmin'], ext['ymin']],
[ext['xmin'], ext['ymax']],
[ext['xmax'], ext['ymax']],
[ext['xmax'], ext['ymin']],
[ext['xmin'], ext['ymin']]]]}],
}
else:
raise TypeError('argument vectorobject must be of type spatialist.vector.Vector')
else:
args = []
if isinstance(val, (str, int)):
val = [val]
for v in val:
if key == 'sensor':
value = lookup_platform[v]
elif key == 'frameNumber':
value = '{:06X}'.format(v) # convert to hexadecimal
else:
value = v
a = {'op': '=', 'args': [{'property': lookup[key]}, value]}
args.append(a)
if len(args) == 1:
arg = args[0]
else:
arg = {'op': 'or', 'args': args}
flt['args'].append(arg)
t = 1
while True:
try:
result = self.catalog.search(collections=self.collections,
filter=flt, max_items=None)
result = list(result.items())
# print('catalog search successful')
break
except pystac_client.exceptions.APIError:
# print(f'failed searching the catalog at try {t:03d}/{self.max_tries}')
if t < self.max_tries:
t += 1
time.sleep(1)
else:
raise
out = []
for item in result:
assets = item.assets
ref = assets[list(assets.keys())[0]]
href = ref.href
path = href[:re.search(r'\.SAFE', href).end()]
path = re.sub('^file://', '', path)
if Path(path).exists():
path = os.path.realpath(path)
else:
if check_exist:
raise RuntimeError('scene does not exist locally:', path)
out.append(path)
out = self._filter_duplicates(out)
return out
[docs]def asf_select(sensor, product, acquisition_mode, mindate, maxdate):
"""
Search scenes in the Alaska Satellite Facility (ASF) data catalog using the
`asf_search <https://github.com/asfadmin/Discovery-asf_search>`_ package.
This simplified function is solely intended for cross-checking an online catalog in
:func:`S1_NRB.ancillary.check_acquisition_completeness`.
Parameters
----------
sensor: str
S1A or S1B
product: str
GRD or SLC
acquisition_mode: str
IW, EW or SM
mindate: str
the minimum acquisition date
maxdate: str
the maximum acquisition date
Returns
-------
list[str]
the IDs of the found scenes
"""
if product == 'GRD':
processing_level = ['GRD_HD', 'GRD_MD', 'GRD_MS', 'GRD_HS', 'GRD_FD']
else:
processing_level = product
if acquisition_mode == 'SM':
beam_mode = ['S1', 'S2', 'S3', 'S4', 'S5', 'S6']
else:
beam_mode = acquisition_mode
start = datetime.strptime(mindate, '%Y%m%dT%H%M%S').strftime('%Y-%m-%dT%H:%M:%SZ')
end = datetime.strptime(maxdate, '%Y%m%dT%H%M%S').strftime('%Y-%m-%dT%H:%M:%SZ')
result = asf.search(platform=sensor.replace('S1', 'Sentinel-1'),
processingLevel=processing_level,
beamMode=beam_mode,
start=start,
end=end).geojson()
scenes = sorted([x['properties']['sceneName'] for x in result['features']])
return scenes