Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poetry 2 + Logs #23

Merged
merged 17 commits into from
Mar 18, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updated all code, removed active storage dependencies, attempt poetry 2 upgrade
dwest77a committed Mar 18, 2025
commit 4ea3997fddd21b87f5db6b9f50f57679f6464a5f
32 changes: 10 additions & 22 deletions cfapyx/backend.py
Original file line number Diff line number Diff line change
@@ -22,11 +22,13 @@ def open_cfa_dataset(
decode_coords=None,
use_cftime=None,
decode_timedelta=None,
cfa_options: dict=None,
cfa_options: dict = None,
group=None,
):
"""
Top-level function which opens a CFA dataset using Xarray. Creates a CFA Datastore
Top-level function which opens a CFA dataset using Xarray.

Creates a CFA Datastore
from the ``filename_or_obj`` provided, then passes this to a CFA StoreBackendEntrypoint
to create an Xarray Dataset. Most parameters are not handled by CFA, so only the
CFA-relevant ones are described here.
@@ -52,11 +54,7 @@ def open_cfa_dataset(
store = CFADataStore.open(filename_or_obj, group=group)

# Expands cfa_options into individual kwargs for the store.
store.cfa_options = cfa_options

use_active = False
if hasattr(store, 'use_active'):
use_active = store.use_active
store.cfa_options = cfa_options

# Xarray makes use of StoreBackendEntrypoints to provide the Dataset 'ds'
store_entrypoint = CFAStoreBackendEntrypoint()
@@ -68,13 +66,13 @@ def open_cfa_dataset(
decode_coords=decode_coords,
drop_variables=drop_variables,
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
use_active=use_active
decode_timedelta=decode_timedelta
)

return ds

class CFANetCDFBackendEntrypoint(BackendEntrypoint):
"""Open CFA-netCDF files (.nca) using 'cfapyx' in Xarray"""

description = 'Open CFA-netCDF files (.nca) using "cfapyx" in Xarray'
url = "https://cedadev.github.io/CFAPyX/"
@@ -115,6 +113,8 @@ def open_dataset(
group=group)

class CFAStoreBackendEntrypoint(StoreBackendEntrypoint):
"""Open CFA-based Abstract Data Store"""

description = "Open CFA-based Abstract Data Store"
url = "https://cedadev.github.io/CFAPyX/"

@@ -128,7 +128,6 @@ def open_dataset(
drop_variables=None,
use_cftime=None,
decode_timedelta=None,
use_active=False,
) -> Dataset:
"""
Takes cfa_xarray_store of type AbstractDataStore and creates an xarray.Dataset object.
@@ -162,18 +161,7 @@ def open_dataset(
)

# Create the xarray.Dataset object here.
if use_active:
try:
from XarrayActive import ActiveDataset

ds = ActiveDataset(vars, attrs=attrs)
except ImportError:
raise ImportError(
'"ActiveDataset" from XarrayActive failed to import - please '
'ensure you have the XarrayActive package installed.'
)
else:
ds = Dataset(vars, attrs=attrs)
ds = Dataset(vars, attrs=attrs)

ds = ds.set_coords(coord_names.intersection(vars))
ds.set_close(cfa_xarray_store.close)
13 changes: 5 additions & 8 deletions cfapyx/datastore.py
Original file line number Diff line number Diff line change
@@ -34,6 +34,8 @@ class CFADataStore(NetCDF4DataStore):
that may be un-set at time of use.
"""

wrapper = FragmentArrayWrapper

@property
def chunks(self):
if hasattr(self,'_cfa_chunks'):
@@ -55,8 +57,7 @@ def cfa_options(self):
'substitutions': self._substitutions,
'decode_cfa': self._decode_cfa,
'chunks': self.chunks,
'chunk_limits': self._chunk_limits,
'use_active': self.use_active
'chunk_limits': self._chunk_limits
}

@cfa_options.setter
@@ -69,7 +70,6 @@ def _set_cfa_options(
decode_cfa=True,
chunks={},
chunk_limits=True,
use_active=False,
):
"""
Method to set cfa options.
@@ -80,8 +80,6 @@ def _set_cfa_options(
:param decode_cfa: (bool) Optional setting to disable CFA decoding
in some cases, default is True.

:param use_active: (bool) Enable for use with XarrayActive.

:param chunks: (dict) Not implemented in 2024.9.0

:param chunk_limits: (dict) Not implemented in 2024.9.0
@@ -91,7 +89,6 @@ def _set_cfa_options(
self._substitutions = substitutions
self._decode_cfa = decode_cfa
self._chunk_limits = chunk_limits
self.use_active = use_active

def _acquire(self, needs_lock=True):
"""
@@ -360,7 +357,7 @@ def open_variable(self, name: str, var):
:returns: The variable object opened as either a standard store variable
or CFA aggregated variable.
"""
if type(var) == tuple:
if isinstance(var, tuple):
if var[1] and self._decode_cfa:
variable = self.open_cfa_variable(name, var[0])
else:
@@ -412,7 +409,7 @@ def open_cfa_variable(self, name: str, var):

## Array-like object
data = indexing.LazilyIndexedArray(
FragmentArrayWrapper(
self.wrapper(
fragment_info,
fragment_space,
shape=array_shape,
338 changes: 156 additions & 182 deletions cfapyx/wrappers.py
Original file line number Diff line number Diff line change
@@ -2,12 +2,14 @@
__contact__ = "daniel.westwood@stfc.ac.uk"
__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"

import logging
import math
from itertools import product

import dask.array as da
import numpy as np
from arraypartition import (ArrayLike, ArrayPartition, combine_slices,
from arraypartition import ArrayLike, ArrayPartition
from arraypartition.partition import (combine_slices,
get_chunk_extent, get_chunk_positions,
get_chunk_shape, get_chunk_space, get_dask_chunks,
normalize_partition_chunks)
@@ -16,65 +18,99 @@
from dask.base import tokenize
from dask.utils import SerializableLock, is_arraylike

try:
from XarrayActive import ActiveOptionsContainer
except:
class ActiveOptionsContainer:
pass

import logging

logger = logging.getLogger(__name__)

class CFAOptionsMixin:

class CFAPartition(ArrayPartition):
"""
Simple container for CFA options properties.
Wrapper object for a CFA Partition, extends the basic ArrayPartition with CFA-specific
methods.
"""

description = 'Wrapper object for a CFA Partition (Fragment or Chunk)'

__slots__ = (
'chunks',
'_chunk_limits',
'_substitutions',
'_decode_cfa'
)

@property
def cfa_options(self):
"""
Relates private option variables to the ``cfa_options`` parameter of the backend.
def __init__(self,
filename,
address,
aggregated_units=None,
aggregated_calendar=None,
global_extent=None,
**kwargs
):

"""
Wrapper object for the 'array' section of a fragment. Contains some metadata
to ensure the correct fragment is selected, but generally just serves the
fragment array to dask when required.
return {
'substitutions': self._substitutions,
'decode_cfa': self._decode_cfa,
'chunks': self.chunks,
'chunk_limits':self._chunk_limits
}
:param filename: (str) The path to a Fragment file from which this
partition object will access data from. The partition may represent
all or a subset of the data from the Fragment file.
@cfa_options.setter
def cfa_options(self, value):
self._set_cfa_options(**value)
:param address: (str) The address of the data variable within the
Fragment file, may include a group hierarchy structure.
def _set_cfa_options(
self,
substitutions=None,
decode_cfa=None,
chunks={},
chunk_limits=None,
use_active=False,
**kwargs):
:param aggregated_units: (obj) The expected units for the received data.
If the units of the received data are not equal to the ``aggregated_units``
then the data is 'post-processed' using the cfunits ``conform`` function.
:param aggregated_calendar: None
"""
Sets the private variables referred by the ``cfa_options`` parameter to the backend.
Ignores additional kwargs.

super().__init__(filename, address, units=aggregated_units, **kwargs)
self.aggregated_units = aggregated_units
self.aggregated_calendar = aggregated_calendar
self.global_extent = global_extent

def copy(self, extent=None):
"""
Create a new instance of this class from its own methods and attributes, and
apply a new extent to the copy if required.
"""

kwargs = self.get_kwargs()

self._substitutions = substitutions
self._decode_cfa = decode_cfa
self._chunk_limits = chunk_limits
self.chunks = chunks
self.use_active = use_active
if 'units' in kwargs:
if not kwargs['aggregated_units']:
kwargs['aggregated_units'] = kwargs['units']
kwargs.pop('units')

if extent:
kwargs['extent'] = combine_slices(self.shape, list(self.get_extent()), extent)
kwargs['global_extent'] = combine_slices(self.shape, list(self.global_extent), extent)

new = CFAPartition(
self.filename,
self.address,
**kwargs
)
return new

def _post_process_data(self, data):
"""Correct units/data conversions - if necessary at this stage"""

if self.units != self.aggregated_units:
try:
from cfunits import Units
except FileNotFoundError:
raise ValueError(
'Encountered issue when trying to import the "cfunits" library:'
"cfunits requires UNIDATA UDUNITS-2. Can't find the 'udunits2' library."
' - Consider setting up a conda environment, and installing '
'`conda install -c conda-forge udunits2`'
)

data = Units.conform(data, self.units, self.aggregated_units)
return data

class FragmentArrayWrapper(ArrayLike, CFAOptionsMixin, ActiveOptionsContainer):
def get_kwargs(self):
return {
'aggregated_units': self.aggregated_units,
'aggregated_calendar': self.aggregated_calendar
} | super().get_kwargs()

class FragmentArrayWrapper(ArrayLike):
"""
FragmentArrayWrapper behaves like an Array that can be indexed or referenced to
return a Dask-like array object. This class is essentially a constructor for the
@@ -83,6 +119,8 @@ class FragmentArrayWrapper(ArrayLike, CFAOptionsMixin, ActiveOptionsContainer):

description = 'Wrapper-class for the array of fragment objects'

partition = CFAPartition

def __init__(
self,
fragment_info,
@@ -153,17 +191,78 @@ def __array__(self):

array_name = (f"{self.__class__.__name__}-{tokenize(self)}",)

# Fragment info dict at this point
fragment_info = self.fragment_info

# dict of array-like objects to pass to the dask Array constructor.
fragments = self._get_fragments()

if not self.chunks:
dsk = self._assemble_dsk_dict(fragments, array_name)

global_extent = {
k: fragment_info[k]["global_extent"] for k in fragment_info.keys()
}

dask_chunks = get_dask_chunks(
self.shape,
self.fragment_space,
extent=global_extent,
dtype=self.dtype,
explicit_shapes=None
)

else:
dask_chunks, partitions = self._create_partitions(fragments)

dsk = self._assemble_dsk_dict(partitions, array_name)

darr = self._assemble_array(dsk, array_name[0], dask_chunks)
return darr

@property
def cfa_options(self):
"""
Relates private option variables to the ``cfa_options`` parameter of the backend.
"""

return {
'substitutions': self._substitutions,
'decode_cfa': self._decode_cfa,
'chunks': self.chunks,
'chunk_limits':self._chunk_limits
}

@cfa_options.setter
def cfa_options(self, value):
self._set_cfa_options(**value)

def _set_cfa_options(
self,
substitutions=None,
decode_cfa=None,
chunks={},
chunk_limits=None,
**kwargs):
"""
Sets the private variables referred by the ``cfa_options`` parameter to the backend.
Ignores additional kwargs.
"""

self._substitutions = substitutions
self._decode_cfa = decode_cfa
self._chunk_limits = chunk_limits
self.chunks = chunks

def _get_fragments(self) -> dict:
"""
Get the set of fragment objects to pass to dask."""

dtype = self.dtype
units = self.units

calendar = None # Fix later

# Fragment info dict at this point
fragment_info = self.fragment_info

# For now expect to deal only with NetCDF Files

# dict of array-like objects to pass to the dask Array constructor.
fragments = {}

for pos in self.fragment_info.keys():
@@ -186,7 +285,7 @@ def __array__(self):
# Wrong extent type for both scenarios but keep as a different label for
# dask chunking.

fragment = CFAPartition(
fragment = self.partition(
filename,
address,
dtype=dtype,
@@ -202,28 +301,7 @@ def __array__(self):

fragments[pos] = fragment

if not self.chunks:
dsk = self._assemble_dsk_dict(fragments, array_name)

global_extent = {
k: fragment_info[k]["global_extent"] for k in fragment_info.keys()
}

dask_chunks = get_dask_chunks(
self.shape,
self.fragment_space,
extent=global_extent,
dtype=self.dtype,
explicit_shapes=None
)

else:
dask_chunks, partitions = self._create_partitions(fragments)

dsk = self._assemble_dsk_dict(partitions, array_name)

darr = self._assemble_array(dsk, array_name[0], dask_chunks)
return darr
return fragments

def _optimise_chunks(self):
"""
@@ -380,7 +458,7 @@ def _apply_substitutions(self):
if not self._substitutions:
return

if type(self._substitutions) != list:
if not isinstance(self._substitutions, list):
self._substitutions = [self._substitutions]

for s in self._substitutions:
@@ -402,109 +480,5 @@ def _assemble_array(self, dsk, array_name, dask_chunks):
"""

meta = da.empty(self.shape, dtype=self.dtype)
if not hasattr(self, 'use_active'):
darr = da.Array(dsk, array_name, chunks=dask_chunks, dtype=self.dtype, meta=meta)
return darr

if not self.use_active:
darr = da.Array(dsk, array_name, chunks=dask_chunks, dtype=self.dtype, meta=meta)
return darr
try:
from XarrayActive import DaskActiveArray

darr = DaskActiveArray(dsk, array_name, chunks=dask_chunks, dtype=self.dtype, meta=meta)
except ImportError:
raise ImportError(
'"DaskActiveArray" from XarrayActive failed to import - please ensure '
'you have the XarrayActive package installed.'
)
return darr

class CFAPartition(ArrayPartition):
"""
Wrapper object for a CFA Partition, extends the basic ArrayPartition with CFA-specific
methods.
"""

description = 'Wrapper object for a CFA Partition (Fragment or Chunk)'


def __init__(self,
filename,
address,
aggregated_units=None,
aggregated_calendar=None,
global_extent=None,
**kwargs
):

"""
Wrapper object for the 'array' section of a fragment. Contains some metadata
to ensure the correct fragment is selected, but generally just serves the
fragment array to dask when required.
:param filename: (str) The path to a Fragment file from which this
partition object will access data from. The partition may represent
all or a subset of the data from the Fragment file.
:param address: (str) The address of the data variable within the
Fragment file, may include a group hierarchy structure.
:param aggregated_units: (obj) The expected units for the received data.
If the units of the received data are not equal to the ``aggregated_units``
then the data is 'post-processed' using the cfunits ``conform`` function.
:param aggregated_calendar: None
"""

super().__init__(filename, address, units=aggregated_units, **kwargs)
self.aggregated_units = aggregated_units
self.aggregated_calendar = aggregated_calendar
self.global_extent = global_extent

def copy(self, extent=None):
"""
Create a new instance of this class from its own methods and attributes, and
apply a new extent to the copy if required.
"""

kwargs = self.get_kwargs()

if 'units' in kwargs:
if not kwargs['aggregated_units']:
kwargs['aggregated_units'] = kwargs['units']
kwargs.pop('units')

if extent:
kwargs['extent'] = combine_slices(self.shape, list(self.get_extent()), extent)
kwargs['global_extent'] = combine_slices(self.shape, list(self.global_extent), extent)

new = CFAPartition(
self.filename,
self.address,
**kwargs
)
return new

def _post_process_data(self, data):
"""Correct units/data conversions - if necessary at this stage"""

if self.units != self.aggregated_units:
try:
from cfunits import Units
except FileNotFoundError:
raise ValueError(
'Encountered issue when trying to import the "cfunits" library:'
"cfunits requires UNIDATA UDUNITS-2. Can't find the 'udunits2' library."
' - Consider setting up a conda environment, and installing '
'`conda install -c conda-forge udunits2`'
)

data = Units.conform(data, self.units, self.aggregated_units)
return data

def get_kwargs(self):
return {
'aggregated_units': self.aggregated_units,
'aggregated_calendar': self.aggregated_calendar
} | super().get_kwargs()
darr = da.Array(dsk, array_name, chunks=dask_chunks, dtype=self.dtype, meta=meta)
return darr
1,184 changes: 585 additions & 599 deletions poetry.lock

Large diffs are not rendered by default.

46 changes: 22 additions & 24 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
[tool.poetry]
[project]
name = "cfapyx"
version = "2025.2.27"
version = "2025.3.18"
description = "The pure-Python implementation of the CF Aggregation conventions, including the Xarray engine to enable reading CFA-netCDF files."
authors = ["dwest77 <daniel.westwood@stfc.ac.uk>"]
license = "BSD 3"
authors = [
{name = "Daniel Westwood",email = "daniel.westwood@stfc.ac.uk"}
]
license = {text = "BSD 3"}
readme = "README.md"
keywords = ["cf", "cf-conventions","xarray","cloud","ArrayPartition"]
requires-python = ">=3.9,<4.0"
dependencies = [
"xarray (>=2024,<2025)",
"netcdf4 (>=1.7.2,<2.0.0)",
"numpy (<=2.0.2)",
"dask (>=2024,<2025)",
"arraypartition (>=1.1,<2.0)",
]

[tool.poetry.dependencies]
python = ">=3.9,<4.0"
poetry = "1.8.5"
scipy = "^1.12"
xarray = "^2024"
cfdm = "^1.11.1.0"
netCDF4 = "^1.6.5"
h5py = "^3.11.0"
dask = "^2024.7.0"
cftime = "^1.6.4"
cfunits = "^3.3.7"
pytest = "^8"
ArrayPartition = "^1.0.1"
sphinx = "^7.1.2"
sphinx-rtd-theme = "^2.0.0"
isort = "^6.0.0"
[tool.poetry]

[build-system]
requires = ["poetry-core", "wheel"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.group.dev.dependencies]
pytest = "^7"
poetry = "^2"

[tool.poetry.plugins."xarray.backends"]
CFA = "cfapyx.backend:CFANetCDFBackendEntrypoint"

[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"