Merge branch 'b-1.0'
raphaelrpl committed Dec 16, 2024
2 parents c9783eb + 5228dc3 commit 4d95984
Showing 20 changed files with 220 additions and 65 deletions.
31 changes: 31 additions & 0 deletions CHANGES.rst
@@ -10,6 +10,37 @@
Changes
=======

Version 1.0.1 (2024-12-16)
--------------------------

- Fix Dependabot alert on the Flask dependency (<2.3) due to a Werkzeug vulnerability in versions below 3.x.


Version 1.0.0 (2024-08-19)
--------------------------

- Add driver for Copernicus DataSpace EcoSystem.
- Review module dependencies.
- Improve stability of the Copernicus DataSpace EcoSystem driver.
- Improve driver search in the NASA MODIS provider.
- Improve docs for command line and downloading.
- Fix MODIS API search using geometry.
- Fix search in Landsat API using day/night indicator.


Version 1.0.0.dev3 (2024-03-04)
-------------------------------

- Fix MODIS API search using geometry.


Version 1.0.0.dev2 (2024-01-04)
-------------------------------

- Fix search in Landsat API using day/night indicator.
- Improve stability of the Copernicus DataSpace EcoSystem driver.
- Improve driver search in the NASA MODIS provider.


Version 1.0.0.dev1 (2023-10-03)
-------------------------------
8 changes: 4 additions & 4 deletions bdc_collectors/base.py
@@ -78,7 +78,7 @@ def __init__(self, scene_id: str, collection=None):
self.parser = self.parser_class(scene_id)
self.collection = collection

def get_files(self, collection, path=None, prefix=None) -> Dict[str, Path]:
def get_files(self, collection, path=None, prefix=None, **kwargs) -> Dict[str, Path]:
"""List all files in the collection.
Returns:
@@ -92,7 +92,7 @@ def get_files(self, collection, path=None, prefix=None) -> Dict[str, Path]:

return {i: entry for i, entry in enumerate(entries)}

def get_assets(self, collection, path=None, prefix=None) -> Dict[str, str]:
def get_assets(self, collection, path=None, prefix=None, **kwargs) -> Dict[str, str]:
"""Get a list of extra assets contained in collection path.
Args:
@@ -106,7 +106,7 @@ def get_assets(self, collection, path=None, prefix=None) -> Dict[str, str]:
"""
return dict()

def path(self, collection, prefix=None, path_include_month=False) -> Path:
def path(self, collection, prefix=None, path_include_month=False, **kwargs) -> Path:
"""Retrieve the relative path to the Collection on Brazil Data Cube cluster.
Note:
@@ -129,7 +129,7 @@ def path(self, collection, prefix=None, path_include_month=False) -> Path:

return scene_path

def compressed_file(self, collection, prefix=None, path_include_month=False) -> Path:
def compressed_file(self, collection, prefix=None, path_include_month=False, **kwargs) -> Path:
"""Retrieve the path to the compressed file L1.
.. deprecated:: 0.6.2
1 change: 1 addition & 0 deletions bdc_collectors/cli.py
@@ -33,6 +33,7 @@ def get_version(ctx, param, value):
return

import platform

from . import __version__

click.echo(
62 changes: 51 additions & 11 deletions bdc_collectors/dataspace/__init__.py
@@ -34,7 +34,7 @@
from ..base import BaseProvider, BulkDownloadResult, SceneResult, SceneResults
from ..exceptions import DataOfflineError, DownloadError
from ..scihub.sentinel2 import Sentinel1, Sentinel2, Sentinel3
from ..utils import download_stream, import_entry
from ..utils import download_stream, import_entry, to_bool
from ._token import TokenManager
from .odata import ODATAStrategy

@@ -52,7 +52,7 @@ def init_provider():

class DataspaceProvider(BaseProvider):
"""Represent the Driver for Copernicus Dataspace program.
This module supports the following API providers using strategies:
- ODATA
- STAC
@@ -61,7 +61,7 @@ class DataspaceProvider(BaseProvider):
For Authorization and Token Authentication, as defined in
`Access Token <https://documentation.dataspace.copernicus.eu/APIs/Token.html>`_,
an ``access_token`` is required to download data. By default, this module stores these tokens in
:class:`bdc_collectors.dataspace._token.TokenManager`. Whenever a download is initiated by
:meth:`bdc_collectors.dataspace.DataspaceProvider.download`, bdc-collectors creates two (2) access tokens
in memory and then uses them to download as many scenes as possible. When a token expires, it is
automatically refreshed.
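As an illustration of that flow, here is a minimal usage sketch; the credentials are placeholders, and the collection name passed to search is hypothetical since the accepted query format depends on the active strategy:

    from bdc_collectors.dataspace import DataspaceProvider

    # Hypothetical credentials for the Copernicus Dataspace program.
    provider = DataspaceProvider(username="user@provider.com", password="s3cr3t")

    # Search with the default ODATA strategy; the two in-memory access
    # tokens described above are only created once a download starts.
    scenes = provider.search("SENTINEL-2")
    for scene in scenes:
        provider.download(scene, output="/tmp/scenes")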
@@ -112,7 +112,23 @@ def __init__(self, username: str, password: str, strategy: t.Optional[BaseProvid

def search(self, query, *args, **kwargs) -> SceneResults:
"""Search for data products in Copernicus Dataspace program."""
entries = self.strategy.search(query, *args, **kwargs)
options = kwargs.copy()

# Compatibility with other BDC providers
scenes = []
if options.get("scene"):
scenes.append(self._item_id(options["scene"]))
if options.get("scenes"):
scenes.extend([self._item_id(scene) for scene in options["scenes"]])

if options.get("filename"):
scenes.append(self._item_id(options["filename"].replace("*", "")))

if scenes:
options.setdefault("ids", [])
options["ids"].extend(scenes)

entries = self.strategy.search(query, *args, **options)
return entries
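The compatibility block above folds BDC-style keyword arguments into the strategy's "ids" filter. Hypothetical calls, reusing the provider from the sketch above:

    # Each form below is normalized by _item_id and forwarded as "ids"
    # (the scene identifiers are made up).
    provider.search("SENTINEL-2", scene="S2A_MSIL2A_20241201T131241_N0511_R138_T23KPQ_20241201T160000")
    provider.search("SENTINEL-2", scenes=["S2B_MSIL2A_20241203T131239_N0511_R138_T23KPQ_20241203T160000"])
    provider.search("SENTINEL-2", filename="S2A_MSIL2A_20241201*")  # the trailing '*' is stripped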

def download(self, query: t.Union[SceneResult, str], output: str, *args, **kwargs) -> str:
@@ -125,9 +141,7 @@ def download(self, query: t.Union[SceneResult, str], output: str, *args, **kwarg
if kwargs.get("sceneid") or kwargs.get("scene_id"):
scene: str = kwargs.get("sceneid", kwargs.get("scene_id"))

# Helper to set up SAFE files for Sentinel-1 and Sentinel-2
if not scene.endswith(".SAFE") and scene[:2] in ("S1", "S2"):
scene = f"{scene}.SAFE"
scene = self._item_id(scene)

item_ids.append(scene)

@@ -200,7 +214,6 @@ def download_all(self, scenes: t.List[SceneResult], output: str, **kwargs) -> Bu

return (success, [], failed,)


def _download(self, entry: SceneResult, output: str, **kwargs):
try:
downloaded_file = self.download(entry, output=output, **kwargs)
@@ -214,14 +227,21 @@ def _download(self, entry: SceneResult, output: str, **kwargs):

def _check_integrity(self, scene: SceneResult, filepath: str):
"""Check for scene file integrity if exists.
Note:
Ensure that the file is writable.
It removes the file when it is invalid.
"""
if not os.path.exists(filepath):
return False

skip_checksum = to_bool(os.getenv("SKIP_CHECKSUM", "0"))
if skip_checksum:
res = is_valid_zip(filepath)

logging.info(f"Testing zip (unzip -t) {filepath} {res}")
return res

if scene.get("Checksum"):
checksums = scene["Checksum"]
if not _is_valid_checksum(filepath, checksums):
@@ -232,9 +252,16 @@ def _check_integrity(self, scene: SceneResult, filepath: str):
# TODO: Consider scene.get("ContentLength")??
return True
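A note on the toggle above: SKIP_CHECKSUM is read from the environment, so the cheaper archive test can replace checksum comparison without code changes, which can help when a provider response carries no Checksum entries. A hypothetical toggle, assuming to_bool accepts the usual truthy spellings:

    import os

    # Hypothetical: validate downloads with `unzip -t` (see is_valid_zip below)
    # instead of comparing checksums.
    os.environ["SKIP_CHECKSUM"] = "1"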

def _item_id(self, scene: str) -> str:
if not scene.endswith(".SAFE") and scene[:2] in ("S1", "S2"):
return f"{scene}.SAFE"
elif not scene.endswith(".SEN3") and scene[:2] in ("S3",):
return f"{scene}.SEN3"
return scene
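The helper above maps bare scene identifiers to the product names used by the Dataspace catalog. For illustration, with made-up identifiers:

    provider._item_id("S2B_MSIL1C_20241201T131239_N0511_R138_T23KPQ")  # -> "...T23KPQ.SAFE"
    provider._item_id("S3A_OL_1_EFR____20241201T130000")               # -> "....SEN3"
    provider._item_id("LC08_L1TP_220069_20241201")                     # -> returned unchanged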


def _is_valid_checksum(filepath: str, checksums: t.List[t.Dict[str, t.Any]]) -> bool:
"""Assert checksum validity of data"""
"""Assert checksum validity of data."""
for context in checksums:
algorithm_name = context["Algorithm"]
algorithm_cls = getattr(hashlib, algorithm_name.lower(), None)
@@ -247,6 +274,8 @@ def _is_valid_checksum(filepath: str, checksums: t.List[t.Dict[str, t.Any]]) ->
if checksum == context["Value"]:
return True

logging.warning(f"Checksum error {context['Value']}, got {checksum}")

return False
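The entries iterated above mirror the Checksum attribute of the Copernicus OData product model, a list of algorithm/value pairs. A sketch with made-up values:

    # Shape of the metadata consumed by _is_valid_checksum (values made up).
    checksums = [
        {"Algorithm": "MD5", "Value": "9e107d9d372bb6826bd81d3542a419d6"},
    ]
    if _is_valid_checksum("/tmp/S2A_scene.zip", checksums):
        print("file matches the advertised checksum")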


@@ -264,6 +293,7 @@ def _check_sum(file_path: t.Union[str, t.Any], algorithm: t.Any, chunk_size=1638
Returns:
The hex digest.
"""

def _read(stream):
for chunk in iter(lambda: stream.read(chunk_size), b""):
algorithm.update(chunk)
@@ -274,4 +304,14 @@ def _read(stream):
else:
_read(file_path)

return algorithm.hexdigest()


def is_valid_zip(filepath: str) -> bool:
"""Check the consistency of Zip file."""
import subprocess

proc = subprocess.Popen(["unzip", "-t", filepath], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc.wait()

return proc.returncode == 0
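The check above shells out to the unzip binary. Where that binary may be absent, the standard library offers a similar test; a possible pure-Python alternative, not what this commit ships:

    import zipfile

    def is_valid_zip_stdlib(filepath: str) -> bool:
        """Pure-Python variant: testzip() returns the first bad member, or None."""
        try:
            with zipfile.ZipFile(filepath) as archive:
                return archive.testzip() is None
        except zipfile.BadZipFile:
            return False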
2 changes: 1 addition & 1 deletion bdc_collectors/dataspace/odata.py
@@ -136,7 +136,7 @@ def _serialize_product(self, product: t.Dict[str, t.Any]) -> SceneResult:
attribute["Name"]: attribute["Value"] for attribute in product["Attributes"]
}
product.pop("Attributes")
return SceneResult(product["Name"].replace(".SAFE", ""),
return SceneResult(product["Name"].replace(".SAFE", "").replace(".SEN3", ""),
attribute_dict.get("cloudCover"),
link=f"{PRODUCTS_URL.format(url=self.api_url)}({product['Id']})/$value",
**product,
8 changes: 4 additions & 4 deletions bdc_collectors/dgi/collections.py
@@ -35,7 +35,7 @@ class DGICollection(BaseCollection):

parser_class = DGICommonScene

def get_files(self, collection, path=None, prefix=None) -> Dict[str, Path]:
def get_files(self, collection, path=None, prefix=None, **kwargs) -> Dict[str, Path]:
"""List all files in the collection."""
if path is None:
path = self.path(collection, prefix=prefix)
@@ -63,11 +63,11 @@ def get_files(self, collection, path=None, prefix=None) -> Dict[str, Path]:

return output

def get_assets(self, collection, path=None, prefix=None) -> Dict[str, str]:
def get_assets(self, collection, path=None, prefix=None, **kwargs) -> Dict[str, str]:
"""Get a list of extra assets contained in collection path."""
return dict()

def path(self, collection, prefix=None) -> Path:
def path(self, collection, prefix=None, **kwargs) -> Path:
"""Retrieve the relative path to the Collection on Brazil Data Cube cluster."""
if prefix is None:
prefix = current_app.config.get('DATA_DIR')
@@ -82,6 +82,6 @@ def path(self, collection, prefix=None) -> Path:

return scene_path

def compressed_file(self, collection, prefix=None) -> Path:
def compressed_file(self, collection, prefix=None, **kwargs) -> Path:
"""Retrieve the path to the compressed file L1."""
return None
4 changes: 2 additions & 2 deletions bdc_collectors/ext.py
@@ -22,7 +22,7 @@
from threading import Lock
from typing import Dict, List, Type

import importlib.metadata
import pkg_resources
from flask import Flask

from .base import BaseProvider
@@ -119,7 +119,7 @@ def init_app(self, app: Flask, **kwargs):
def init_providers(self, entry_point: str = 'bdc_collectors.providers', **kwargs):
"""Load the supported providers from setup.py entry_point."""
if entry_point:
for base_entry in importlib.metadata.entry_points(group=entry_point):
for base_entry in pkg_resources.iter_entry_points(entry_point):
provider = base_entry.load()

if hasattr(provider, 'init_provider') and callable(provider.init_provider):
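This hunk moves entry-point discovery back to pkg_resources, presumably because importlib.metadata.entry_points(group=...) is unavailable on older Python versions. The discovery pattern in isolation, with a print added for illustration:

    import pkg_resources

    # Load every provider registered under the entry point group used above.
    for base_entry in pkg_resources.iter_entry_points('bdc_collectors.providers'):
        provider = base_entry.load()
        print(base_entry.name, provider)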
20 changes: 18 additions & 2 deletions bdc_collectors/modis/api.py
@@ -30,6 +30,7 @@

from ..base import BaseCollection, BaseProvider, SceneResult, SceneResults
from ..exceptions import DataOfflineError
from ..utils import to_geom
from .collection import ModisCollection
from .parser import ModisScene

@@ -68,6 +69,9 @@ def search(self, query, *args, **kwargs) -> SceneResults:
options = dict(
product=query
)
client_opts = self._kwargs.get("client_options", {})
options.update(client_opts)

path = kwargs.get('path')
if path is None:
path = self._guess_path(query)
@@ -93,9 +97,14 @@ def search(self, query, *args, **kwargs) -> SceneResults:
# The end date should be the same as today.
options['enddate'] = options['today']

# TODO: Implement way to deal with minimum bounding region
api = self._get_client(**options)

if kwargs.get("bbox"):
options["geom"] = shapely.geometry.box(*kwargs["bbox"])

if kwargs.get("geom"):
options["geom"] = to_geom(kwargs["geom"])

dates = api.getListDays()

scenes = []
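With the branches above, the MODIS search accepts either a bbox tuple or a geom argument. Hypothetical calls, given a MODIS provider instance (construction omitted) and assuming to_geom understands WKT strings:

    # Hypothetical product and area of interest; bbox is (west, south, east, north).
    provider.search("MOD13Q1.061", bbox=(-54.0, -12.0, -53.0, -11.0))
    provider.search("MOD13Q1.061", geom="POLYGON ((-54 -12, -53 -12, -53 -11, -54 -11, -54 -12))")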
@@ -208,6 +217,7 @@ def _search(self, date_reference, api, **kwargs):
files = api.getFilesList(date_reference)

scenes = []
geom = kwargs.get("geom")

for file in files:
if file.endswith('.hdf'):
@@ -218,10 +228,16 @@

downloaded_meta_file = f'{api.writeFilePath}/{file_xml}'
meta = self._read_meta(downloaded_meta_file)

if geom is not None and meta.get("geometry"):
g = shapely.geometry.shape(meta["geometry"])
if not g.intersects(geom):
continue

link = f'{api.url}/{api.path}/{date_reference}/{file}'

scenes.append(
SceneResult(scene, cloud_cover=float(meta['QAPercentCloudCover']), link=link, **meta)
SceneResult(scene, cloud_cover=float(meta['QAPercentCloudCover']) if meta['QAPercentCloudCover'] else None, link=link, **meta)
)
return scenes
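The filter above keeps only scenes whose footprint intersects the query geometry. The shapely logic in isolation, with a made-up footprint standing in for meta["geometry"]:

    import shapely.geometry

    query_geom = shapely.geometry.box(-54.0, -12.0, -53.0, -11.0)

    # Made-up GeoJSON-like footprint, mirroring meta["geometry"].
    footprint = {
        "type": "Polygon",
        "coordinates": [[[-55, -13], [-52, -13], [-52, -10], [-55, -10], [-55, -13]]],
    }

    scene_geom = shapely.geometry.shape(footprint)
    print(scene_geom.intersects(query_geom))  # True: the footprints overlap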

11 changes: 5 additions & 6 deletions bdc_collectors/modis/collection.py
@@ -24,7 +24,6 @@
from flask import current_app

from ..base import BaseCollection
from ..utils import entry_version
from .parser import ModisScene


@@ -33,7 +32,7 @@ class ModisCollection(BaseCollection):

parser_class = ModisScene

def get_assets(self, collection, path=None, prefix=None) -> Dict[str, str]:
def get_assets(self, collection, path=None, prefix=None, **kwargs) -> Dict[str, str]:
"""Get a list of extra assets contained in collection path.
Args:
@@ -47,14 +46,14 @@ def get_assets(self, collection, path=None, prefix=None) -> Dict[str, str]:
"""
return dict()

def path(self, collection, prefix=None, cube_prefix=None) -> Path:
def path(self, collection, prefix=None, cube_prefix=None, **kwargs) -> Path:
"""Retrieve the relative path to the Collection on Brazil Data Cube cluster."""
if prefix is None:
prefix = current_app.config.get('DATA_DIR')

year = str(self.parser.sensing_date().year)
tile = self.parser.tile_id()
version = entry_version(collection.version)
version = f"v{collection.version}"
scene_id = self.parser.scene_id

relative = Path(collection.name) / version / tile[:3] / tile[3:] / year / scene_id
@@ -63,8 +62,8 @@ def path(self, collection, prefix=None, cube_prefix=None) -> Path:

return scene_path

def compressed_file(self, collection, prefix=None) -> Path:
def compressed_file(self, collection, prefix=None, **kwargs) -> Path:
"""Show the path to the MODIS HDF file."""
path = self.path(collection=collection, prefix=prefix, cube_prefix='Archive')

return path / f'{self.parser.scene_id}.hdf'
return path.parent / f'{self.parser.scene_id}.hdf'
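With the v-prefixed version formatting and the path.parent fix above, the archive layout for a MODIS scene works out as below; a sketch with made-up values, assuming the parser splits the usual MODIS identifier fields:

    from pathlib import Path

    # Made-up values mirroring ModisCollection.path().
    name, version, tile, year = "MOD13Q1", "v061", "h13v10", "2024"
    scene_id = "MOD13Q1.A2024001.h13v10.061.2024018000000"

    relative = Path(name) / version / tile[:3] / tile[3:] / year / scene_id
    print(relative)                              # MOD13Q1/v061/h13/v10/2024/<scene_id>
    print(relative.parent / f"{scene_id}.hdf")   # where compressed_file() now points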
2 changes: 1 addition & 1 deletion bdc_collectors/modis/parser.py
@@ -65,6 +65,6 @@ def level(self) -> str:
"""Retrieve the collection level."""
return ''

def version(self) -> '':
def version(self):
"""Retrieve the Collection Version."""
return self.fragments[3]
