Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/country_workspace/admin/office.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class OfficeAdmin(SyncAdminMixin, BaseModelAdmin):
sync_config = SyncAdminConfig(
targets=[
TargetConfig(target=Target.OFFICES),
TargetConfig(target=Target.BENEFICIARY_GROUPS),
TargetConfig(target=Target.PROGRAMS),
],
)
Expand Down
14 changes: 9 additions & 5 deletions src/country_workspace/contrib/hope/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,21 @@ def get(self, path: str, params: dict[str, Any] | None = None) -> "Generator[Fla
signature = hashlib.sha256(f"{url}{params}{time.perf_counter_ns()}".encode()).hexdigest()
pages = 0
hope_request_start.send(self.__class__, url=url, params=params, signature=signature)
while True:
if not url:
break
while url:
try:
ret = requests.get(url, params=params, headers={"Authorization": f"Token {self.token}"}, timeout=10) # nosec
ret = requests.get(
url,
params=(params if pages == 0 else None),
headers={"Authorization": f"Token {self.token}"},
timeout=10,
) # nosec
if ret.status_code != 200:
raise RemoteError(f"Error {ret.status_code} fetching {url}")
pages += 1
except RequestException:
raise RemoteError(f"Remote Error fetching {url}")

pages += 1

try:
data = ret.json()
except JSONDecodeError:
Expand Down
10 changes: 9 additions & 1 deletion src/country_workspace/contrib/hope/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
class HopePushError(Exception):
    """Raised when an error occurs during the push process."""


class HopeSyncError(Exception):
    """Raised when an error occurs during the synchronization process."""


class SkipRecordError(Exception):
    """Raised to signal that a record should be skipped during the synchronization process."""
53 changes: 26 additions & 27 deletions src/country_workspace/contrib/hope/sync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
from contextlib import contextmanager
from enum import Enum
from io import TextIOBase
from typing import Any, Final, TypedDict, NotRequired, TextIO, Literal
from typing import Any, Final, TypedDict, NotRequired, Literal

from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.db import DatabaseError
from django.db.models import Model

from country_workspace.exceptions import RemoteError
from country_workspace.models import SyncLog
from ..client import HopeClient
from ....models import SyncLog
from ..exceptions import HopeSyncError, SkipRecordError

logging.basicConfig()

Expand All @@ -22,15 +23,11 @@
"RECORD_SKIPPED": "Skipped record '{reference_id_val}': {error}",
"RECORD_SYNC_FAILURE": "Failed to sync DB record '{reference_id_val}': {error}",
"REMOTE_API_FAILURE": "API Error fetching '{path}': {error}",
"SYNC_COMPLETE": "Sync complete for '{entity}' with result {result} with '{errors_count}' erors.",
"SYNC_COMPLETE": "Sync complete for '{entity}' with result {result} with '{errors_count}' errors.",
"SYNC_START": "Start fetching '{entity}' data from HOPE core...",
}


class SkipRecordError(Exception):
"""Exception raised when a record should be skipped during synchronization."""


class ParamDateName(Enum):
"""Parameter names for date filtering in API requests."""

Expand All @@ -48,12 +45,12 @@ class SyncConfig[T: Model](TypedDict):

Attributes:
model: The Django model class to synchronize.
delta_sync: If True, only new records will be processed; otherwise, existing records will be updated.
delta_sync: If True, the API request is constrained by a date param (see build_endpoint); otherwise full fetch.
endpoint: The API endpoint configuration with path and optional query parameters.
reference_id: The field name used as the system reference ID for the model.
prepare_defaults: Function to prepare default values for the model.
should_process: Optional function to filter records before processing.
post_process: Optional function to process the model instance after creation/update.
prepare_defaults: Function to map an API record into model defaults (required).
should_process: Optional record-level filter predicate (truthy -> process).
post_process: Optional hook after create/update; may raise to fail the sync.

"""

Expand All @@ -79,6 +76,7 @@ def log_to(
level: Literal[0, 10, 20, 30, 40, 50, 60] = logging.INFO,
log_format: str = "%(levelname)s %(message)s",
) -> Iterator[None]:
"""Temporarily redirect logs of `logger_name` to a stream."""
logger = logging.getLogger(logger_name)

handlers_backup = tuple(logger.handlers)
Expand Down Expand Up @@ -108,13 +106,14 @@ def add_error(stats: Stats, error: str) -> None:


def safe_get(client: HopeClient, endpoint: EndpointConfig, stats: Stats) -> Generator[dict[str, Any], None, None]:
    """Yield records from the remote API, translating remote failures into sync errors.

    This is a generator: the request is only issued once iteration starts.

    Args:
        client: Client whose ``get`` is called with the endpoint unpacked as keyword arguments.
        endpoint: Endpoint configuration; its ``path`` entry is used in the error message.
        stats: Running stats; the formatted error message is recorded via ``add_error``.

    Yields:
        Each record produced by ``client.get``.

    Raises:
        HopeSyncError: If ``client.get`` raises ``RemoteError``. The original
            exception is chained as the cause.

    """
    try:
        yield from client.get(**endpoint)
    except RemoteError as exc:
        # Record and log the failure before surfacing it to the caller.
        message = format_msg("REMOTE_API_FAILURE", path=endpoint.get("path"), error=str(exc))
        add_error(stats, message)
        logging.error(message)
        raise HopeSyncError(message) from exc


def format_msg(key: str, **kwargs: Any) -> str:
Expand All @@ -127,26 +126,27 @@ def format_msg(key: str, **kwargs: Any) -> str:
raise KeyError(f"Log key '{key}' not found in MESSAGES configuration.")


def validated_reference_id(record: dict[str, Any], out: TextIO) -> str | None:
"""Validate and retrieve the system reference ID from the record."""
def validated_reference_id(record: dict[str, Any]) -> str | None:
"""Return record['id'] if present; warn and return None otherwise."""
reference_id_val = record.get("id")
if not reference_id_val:
logging.warning(format_msg("RECORD_MISSING_REFERENCE_ID", record=record))
return reference_id_val


def sync_entity[T: Model](config: SyncConfig[T], client: HopeClient | None = None, stats: Stats | None = None) -> Stats:
"""Synchronize an entity with the remote API.
"""Synchronize a single model using the provided `SyncConfig`.

Args:
config (SyncConfig): Configuration for the entity synchronization.
out (TextIOBase): Output file to write to.
client (HopeClient): HopeClient to use for synchronization.
stats (dict[str, Any]): Synchronization results.
config: Sync configuration (model, endpoint, mappers/hooks).
client: Optional `HopeClient`, created by default.
stats: Optional running stats; a new one is created if not provided.

Notes:
- Fetches records from the API, processes them, and updates/creates model instances.
- Logs synchronization start, errors, and completion.
Returns:
Stats: counts of added/updated records; empty `errors` on success.

Raises:
HopeSyncError: if remote API fails or any record-level errors were collected.

"""
should_process = config.get("should_process")
Expand All @@ -161,7 +161,7 @@ def sync_entity[T: Model](config: SyncConfig[T], client: HopeClient | None = Non
with cache.lock(f"sync-{model_name}"):
logging.info(format_msg("SYNC_START", entity=model_name))
for record in safe_get(client, config["endpoint"], stats):
if not (reference_id_val := validated_reference_id(record, stats)):
if not (reference_id_val := validated_reference_id(record)):
continue
if should_process and not should_process(record):
continue
Expand All @@ -181,21 +181,20 @@ def sync_entity[T: Model](config: SyncConfig[T], client: HopeClient | None = Non
error = format_msg("RECORD_SYNC_FAILURE", reference_id_val=reference_id_val, error=str(e))
add_error(stats, error)
logging.error(error)
if stats["errors"]:
raise HopeSyncError(stats["errors"])
SyncLog.objects.register_sync(model)
logging.info(format_msg("SYNC_COMPLETE", entity=model_name, result=stats, errors_count=len(stats["errors"])))

logging.info(format_msg("SYNC_COMPLETE", entity=model_name, result=stats, errors_count=0))
return stats


def _get_last_updated_date(model: type[Model]) -> str | None:
    """Return the date of the most recent sync log entry for ``model``.

    Args:
        model: The model class whose content type is used to filter ``SyncLog`` rows.

    Returns:
        The date part of the newest ``last_update_date`` as an ISO-format
        string, or ``None`` when no ``SyncLog`` row exists for the model.

    """
    content_type = ContentType.objects.get_for_model(model)
    # Newest entry first; ``first()`` yields None when nothing has been logged.
    newest_log = SyncLog.objects.filter(content_type=content_type).order_by("-last_update_date").first()
    if not newest_log:
        return None
    return newest_log.last_update_date.date().isoformat()


def build_endpoint(path: str, model: type[Model], param_date_name: ParamDateName, delta_sync: bool) -> EndpointConfig:
"""Build the endpoint configuration for the API request."""
params = {"format": "json"}
if delta_sync and (last_date := _get_last_updated_date(model)):
return EndpointConfig(path=path, params={param_date_name.value: last_date, **params})
Expand Down
17 changes: 1 addition & 16 deletions src/country_workspace/contrib/hope/sync/context_geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@


def sync_countries(delta_sync: bool = False) -> Stats:
"""Fetch and process Country records from the remote API."""
return sync_entity(
SyncConfig(
model=Country,
Expand All @@ -33,13 +32,6 @@ def sync_countries(delta_sync: bool = False) -> Stats:


def sync_area_types(delta_sync: bool = False) -> Stats:
"""Fetch and process AreaType records from the remote API.

Notes:
Calls sync_countries first to ensure dependencies are synchronized.

"""

def _prepare_defaults(rec: dict[str, Any]) -> dict[str, Any] | None:
try:
country = Country.objects.get(hope_id=rec["country"])
Expand Down Expand Up @@ -73,13 +65,6 @@ def _prepare_defaults(rec: dict[str, Any]) -> dict[str, Any] | None:


def sync_areas(delta_sync: bool = False) -> Stats:
"""Fetch and process Area records from the remote API.

Notes:
Calls sync_area_types first to ensure dependencies are synchronized.

"""

def _prepare_defaults(rec: dict[str, Any]) -> dict[str, Any] | None:
try:
area_type = AreaType.objects.get(hope_id=rec["area_type"])
Expand Down Expand Up @@ -113,7 +98,7 @@ def _prepare_defaults(rec: dict[str, Any]) -> dict[str, Any] | None:


def _assign_parents(model: type[Model], parent_mapping: dict[str, str]) -> None:
"""Assign parent relationships for the given model based on the parent mapping."""
"""Bulk-assign parents from mapping."""
updates = []
for child_id, parent_id in parent_mapping.items():
try:
Expand Down
8 changes: 0 additions & 8 deletions src/country_workspace/contrib/hope/sync/context_programs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def should_process_office(record: dict[str, Any]) -> bool:


def sync_offices(delta_sync: bool = False) -> Stats:
"""Fetch and process Office records from the remote API, deactivating those not present in the source."""
return sync_entity(
SyncConfig[Office](
model=Office,
Expand All @@ -77,7 +76,6 @@ def sync_offices(delta_sync: bool = False) -> Stats:


def sync_beneficiary_groups(delta_sync: bool = False) -> Stats:
"""Fetch and process BeneficiaryGroup records from the remote API."""
return sync_entity(
SyncConfig[BeneficiaryGroup](
model=BeneficiaryGroup,
Expand Down Expand Up @@ -131,12 +129,6 @@ def post_process_program(program: Program, created: bool) -> None:


def sync_programs(delta_sync: bool = False, programs_limit_to_office: Office | None = None) -> Stats:
"""Synchronize and process Program records from the remote API, applying filters and post-processing.

Notes:
Calls sync_beneficiary_groups to ensure dependencies are synchronized.

"""
return sync_entity(
SyncConfig[Program](
model=Program,
Expand Down
9 changes: 9 additions & 0 deletions tests/contrib/hope/test_hope_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ def test_get_success(
assert start_mock.call_count == end_mock.call_count == 1
assert end_mock.call_args[1]["pages"] == pages

urls = [c.request.url for c in mocked_responses.calls]
assert len(urls) == pages
assert "p=v" in urls[0]
assert all("p=" not in u for u in urls[1:])


@pytest.mark.parametrize(
("case", "status", "body", "error_pattern"),
Expand Down Expand Up @@ -265,3 +270,7 @@ def test_break_with_empty_results(
assert results == []
assert start_mock.call_count == end_mock.call_count == 1
assert end_mock.call_args[1]["pages"] == 1

urls = [c.request.url for c in mocked_responses.calls]
assert len(urls) == 1
assert "p=v" in urls[0]
25 changes: 17 additions & 8 deletions tests/contrib/hope/test_sync_base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import Callable
from collections.abc import Callable
from unittest.mock import Mock
import pytest
from django.db import DatabaseError
from pytest_mock import MockerFixture

from country_workspace.contrib.hope.client import HopeClient
from country_workspace.contrib.hope.exceptions import HopeSyncError, SkipRecordError
from country_workspace.contrib.hope.sync.base import (
SkipRecordError,
SyncConfig,
EndpointConfig,
Stats,
Expand Down Expand Up @@ -39,8 +39,9 @@ def test_safe_get_errors(hope_client: HopeClient, exception: Exception, expected
hope_client.get.side_effect = exception
stats = Stats(add=0, upd=0, errors=[])
with log_to(out := Mock()):
results = list(safe_get(hope_client, EndpointConfig(path="dummy_path"), stats))
assert results == []
with pytest.raises(HopeSyncError) as exc:
list(safe_get(hope_client, EndpointConfig(path="dummy_path"), stats))
assert expected_error in str(exc.value)
assert_stdout_contains(out, expected_error)
assert any(expected_error in e for e in stats["errors"])

Expand All @@ -54,10 +55,12 @@ def test_safe_get_errors(hope_client: HopeClient, exception: Exception, expected
ids=["valid_id", "missing_id"],
)
def test_validated_reference_id(record: dict, expected_id: str | None, stdout_contains: str | None) -> None:
with log_to(out := Mock()):
assert validated_reference_id(record, out) == expected_id
if stdout_contains:
with log_to(out := Mock()):
assert validated_reference_id(record) == expected_id
assert_stdout_contains(out, stdout_contains)
else:
assert validated_reference_id(record) == expected_id


def test_sync_entity_success(
Expand Down Expand Up @@ -121,8 +124,14 @@ def test_sync_entity_errors(
expected_errors: list[str] | None,
) -> None:
mock_model.objects.update_or_create.side_effect = exception
stats = sync_entity_context([records[0]], success_config)
assert stats == {"add": 0, "upd": 0, "errors": expected_errors}
if expected_errors:
with pytest.raises(HopeSyncError) as exc:
sync_entity_context([records[0]], success_config)
for e in expected_errors:
assert e in str(exc.value)
else:
stats = sync_entity_context([records[0]], success_config)
assert stats == {"add": 0, "upd": 0, "errors": expected_errors}
assert_stdout_contains(out, expected_log)


Expand Down