Skip to content

Commit

Permalink
Simplify policy options and tidy api.
Browse files Browse the repository at this point in the history
  • Loading branch information
pp-mo committed Oct 15, 2024
1 parent 5fa2b46 commit f7ddb65
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 89 deletions.
281 changes: 202 additions & 79 deletions lib/iris/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def callback(cube, field, filename):
import itertools
import os.path
import threading
from typing import Callable, Literal
from typing import Callable, Literal, Mapping

import iris._constraints
import iris.config
Expand Down Expand Up @@ -313,116 +313,239 @@ def _load_collection(uris, constraints=None, callback=None):


class LoadPolicy(threading.local):
"""Object defining a general loading strategy."""
"""A container for loading strategy options.
Controls merge/concatenate usage, and the handling of cases where multiple reference
fields merge to define an additional dimension (e.g. a time-varying orography).
Options can be set directly, or via :meth:`~iris.LoadPolicy.set`, or changed for
the scope of a code block with :meth:`~iris.LoadPolicy.context`.
Example uses :
>>> LOAD_POLICY.set("legacy")
>>> print(LOAD_POLICY)
LoadPolicy(support_multiple_references=False, merge_concat_sequence='m', repeat_until_unchanged=False)
>>> LOAD_POLICY.support_multiple_references = True
>>> print(LOAD_POLICY)
LoadPolicy(support_multiple_references=True, merge_concat_sequence='m', repeat_until_unchanged=False)
>>> LOAD_POLICY.set(merge_concat_sequence="cm")
>>> print(LOAD_POLICY)
LoadPolicy(support_multiple_references=True, merge_concat_sequence='cm', repeat_until_unchanged=False)
>>> with LOAD_POLICY.context("comprehensive"):
... print(LOAD_POLICY)
... print(LOAD_POLICY)
LoadPolicy(support_multiple_references=True, merge_concat_sequence='mc', repeat_until_unchanged=True)
LoadPolicy(support_multiple_references=True, merge_concat_sequence='cm', repeat_until_unchanged=False)
_allkeys = (
"""

# Useful constants
OPTION_KEYS = (
"support_multiple_references",
"multiref_triggers_concatenate",
"use_concatenate",
"use_merge",
"cat_before_merge",
"repeat_until_done",
"merge_concat_sequence",
"repeat_until_unchanged",
)
_OPTIONS_ALLOWED_VALUES = {
"support_multiple_references": (False, True),
"merge_concat_sequence": ("", "m", "c", "mc", "cm"),
"repeat_until_unchanged": (False, True),
}
SETTINGS = {
"legacy": dict(
support_multiple_references=False,
merge_concat_sequence="m",
repeat_until_unchanged=False,
),
"default": dict(
support_multiple_references=True,
merge_concat_sequence="m",
repeat_until_unchanged=False,
),
"recommended": dict(
support_multiple_references=True,
merge_concat_sequence="mc",
repeat_until_unchanged=False,
),
"comprehensive": dict(
support_multiple_references=True,
merge_concat_sequence="mc",
repeat_until_unchanged=True,
),
}

def __init__(self, options: str | dict | None = None, **kwargs):
"""Create loading strategy control object."""
self.set("default")
self.set(options, **kwargs)

def __setattr__(self, key, value):
if key not in self.OPTION_KEYS:
raise KeyError(f"LoadPolicy object has no property '{key}'.")

allowed_values = self._OPTIONS_ALLOWED_VALUES[key]
if value not in allowed_values:
msg = (
f"{value!r} is not a valid setting for LoadPolicy.{key} : "
f"must be one of '{allowed_values}'."
)
raise ValueError(msg)

self.__dict__[key] = value

def set(self, options: str | dict | None = None, **kwargs):
"""Set new options.
def __init__(
self,
support_multiple_references: bool = False,
multiref_triggers_concatenate: bool = False,
use_concatenate: bool = False,
use_merge: bool = True,
cat_before_merge: bool = False,
repeat_until_done: bool = False,
):
"""Container for loading controls."""
self.support_multiple_references = support_multiple_references
self.multiref_triggers_concatenate = multiref_triggers_concatenate
self.use_concatenate = use_concatenate
self.use_merge = use_merge
self.cat_before_merge = cat_before_merge
self.repeat_until_done = repeat_until_done
Parameters
----------
* options : str or dict, optional
A dictionary of options values, or the name of one of the
:data:`~iris.LoadPolicy.SETTINGS` standard option sets,
e.g. "legacy" or "comprehensive".
* kwargs : dict
Individual options options, from :data:`~iris.LoadPolicy.OPTION_KEYS`.
Note
----
Keyword arguments are applied after the 'options' arg, and
so will take precedence.
"""
if options is None:
options = {}
elif isinstance(options, str) and options in self.SETTINGS:
options = self.SETTINGS[options]
elif not isinstance(options, Mapping):
msg = (
f"Invalid arg options='{options!r}' : "
f"must be a dict, or one of {self.SETTINGS.keys()}"
)
raise ValueError(msg)

# Override any options with keywords
options.update(**kwargs)
bad_keys = [key for key in options if key not in self.OPTION_KEYS]
if bad_keys:
msg = f"Unknown options {bad_keys} : valid options are {self.OPTION_KEYS}."
raise ValueError(msg)

# Implement all options by changing own content.
for key, value in options.items():
setattr(self, key, value)

def settings(self):
"""Return a options dict containing the current state."""
return {key: getattr(self, key) for key in self.OPTION_KEYS}

def __repr__(self):
msg = (
"LoadPolicy("
f"support_multiple_references={self.support_multiple_references}, "
f"multiref_triggers_concatenate={self.multiref_triggers_concatenate}, "
f"use_concatenate={self.use_concatenate}, "
f"use_merge={self.use_merge}, "
f"cat_before_merge={self.cat_before_merge}, "
f"repeat_until_done={self.repeat_until_done}"
")"
)
msg = f"{self.__class__.__name__}("
msg += ", ".join(f"{key}={getattr(self, key)!r}" for key in self.OPTION_KEYS)
msg += ")"
return msg

def copy(self):
return LoadPolicy(**{key: getattr(self, key) for key in self._allkeys})

@contextlib.contextmanager
def context(self, policy=None, **kwargs):
"""Return context manager for temporary options.
def context(self, settings=None, **kwargs):
"""Return a context manager applying given options.
Modifies the given parameters within a context, for the active thread.
"""
# Save the current statr
current_state = self.__dict__.copy()
Parameters
----------
settings : str or dict
Options dictionary or name, as for :meth:`~LoadPolicy.set`.
kwargs : dict
Option values, as for :meth:`~LoadPolicy.set`.
Examples
--------
>>> LOAD_POLICY.set("default")
>>> print(LOAD_POLICY.repeat_until_unchanged)
False
>>> with LOAD_POLICY.context("comprehensive"):
... print(LOAD_POLICY.repeat_until_unchanged)
...
True
>>> print(LOAD_POLICY.repeat_until_unchanged)
False
# Update the state from given policy object and/or method keywords
for name in self._allkeys:
value = getattr(self, name)
if policy and hasattr(policy, name):
value = getattr(policy, name)
if name in kwargs:
value = kwargs[name]
setattr(self, name, value)
"""
# Save the current state
saved_settings = self.settings()

# Apply the new options and execute the context
try:
# Execute the context
self.set(settings, **kwargs)
yield
finally:
# Return the state
self.__dict__.clear()
self.__dict__.update(current_state)
# Re-establish the former state
self.set(saved_settings)


LOAD_POLICY = LoadPolicy()
LOAD_POLICY_LEGACY = LoadPolicy()
LOAD_POLICY_RECOMMENDED = LoadPolicy(
support_multiple_references=True, multiref_triggers_concatenate=True
)
LOAD_POLICY_COMPREHENSIVE = LoadPolicy(
support_multiple_references=True, use_concatenate=True, repeat_until_done=True
)
# The unique (singleton) policy object
# N.B. FOR NOW, our starting point is "legacy" rather than "default"
# TODO: resolve tests as needed, to pass with "default".
LOAD_POLICY = LoadPolicy("legacy")


def _current_effective_policy():
policy = LOAD_POLICY
if not policy.use_concatenate and policy.multiref_triggers_concatenate:
from iris.fileformats.rules import _MULTIREF_DETECTION
def combine_cubes(cubes, options=None, merge_require_unique=False):
"""Combine cubes as for load, according to a "loading policy".
if _MULTIREF_DETECTION.found_multiple_refs:
policy = policy.copy()
policy.use_concatenate = True
return policy
Applies :meth:`~iris.cube.CubeList.merge`/:meth:`~iris.cube.CubeList.concatenate`
steps to the given cubes, as determined by the 'settings'.
Parameters
----------
cubes : list of :class:`~iris.cube.Cube`
A list of cubes to combine.
options : dict or str
Settings, as described for :meth:`iris.LOAD_POLICY.set`.
Defaults to current :meth:`iris.LOAD_POLICY.settings`.
merge_require_unique : bool
Value for the 'unique' keyword in any merge operations.
def _combine_with_loading_policy(cubes, policy=None, merge_require_unique=False):
if not policy:
policy = _current_effective_policy()
Returns
-------
list of :class:`~iris.cube.Cube`
"""
if not options:
options = LOAD_POLICY.settings()
while True:
n_original_cubes = len(cubes)
if policy.use_concatenate and policy.cat_before_merge:
sequence = options["merge_concat_sequence"]

if sequence[0] == "c":
# concat if it comes first
cubes = cubes.concatenate()
if policy.use_merge:
if "m" in sequence:
# merge if requested
cubes = cubes.merge(unique=merge_require_unique)
if policy.use_concatenate and not policy.cat_before_merge:
if sequence[-1] == "c":
# concat if it comes last
cubes = cubes.concatenate()
n_new_cubes = len(cubes)
if not policy.repeat_until_done or n_new_cubes >= n_original_cubes:

# Repeat if requested, and this step reduced the number of cubes
if not options["repeat_until_unchanged"] or len(cubes) < n_original_cubes:
break

return cubes


def _combine_load_cubes(cubes, merge_require_unique=False):
# A special version to call combine_cubes while also implementing the
# _MULTIREF_DETECTION behaviour
options = LOAD_POLICY.settings()
if (
options["support_multiple_references"]
and "c" not in options["merge_concat_sequence"]
):
# Add a concatenate to implement the "multiref triggers concatenate" mechanism
from iris.fileformats.rules import _MULTIREF_DETECTION

if _MULTIREF_DETECTION.found_multiple_refs:
options["merge_concat_sequence"] += "c"

return combine_cubes(cubes, options)


def load(uris, constraints=None, callback=None, policy=None):
"""Load any number of Cubes for each constraint.
Expand Down
6 changes: 3 additions & 3 deletions lib/iris/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def add(self, cube):
self.cubes.append(sub_cube)

def combined(self, unique=False):
"""Return a new :class:`_CubeFilter` by merging the list of cubes.
"""Return a new :class:`_CubeFilter` by combining the list of cubes.
Parameters
----------
Expand All @@ -90,11 +90,11 @@ def combined(self, unique=False):
duplicate cubes are detected.
"""
from iris import _combine_with_loading_policy
from iris import _combine_load_cubes

return _CubeFilter(
self.constraint,
_combine_with_loading_policy(self.cubes, merge_require_unique=unique),
_combine_load_cubes(self.cubes, merge_require_unique=unique),
)


Expand Down
16 changes: 9 additions & 7 deletions lib/iris/tests/testroundtrip_hybrid_factory_H_or_P.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,14 @@ def check_create():

def test_roundtrip():
print("Check with Iris from : ", iris.__file__)
from iris import (
LOAD_POLICY,
LOAD_POLICY_RECOMMENDED,
# LOAD_POLICY_LEGACY,
# LOAD_POLICY_COMPREHENSIVE
)
from iris import LOAD_POLICY

with LOAD_POLICY.context(LOAD_POLICY_RECOMMENDED):
# print(LOAD_POLICY)
# LOAD_POLICY.repeat_until_unchanged = 4
with LOAD_POLICY.context("default"):
# print(LOAD_POLICY)
# print("merge/concat = ", LOAD_POLICY.merge_concat_sequence)
check_create()

# print(LOAD_POLICY)
# print("now legacy mode ? ", LOAD_POLICY.settings() == LOAD_POLICY.SETTINGS["legacy"])

0 comments on commit f7ddb65

Please sign in to comment.