Commit
Add option to force analysis even if resource has not changed (#205)
We're missing this feature: forcing analysis on a resource,
independently of whether its content has changed since last check.
Pierlou authored Nov 6, 2024
1 parent 500b921 · commit 44660b7
Showing 5 changed files with 98 additions and 7 deletions.
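
Note: the new flag is threaded from every entry point (HTTP route, CLI, crawler) down to analyse_resource. A minimal usage sketch against the updated check_resource signature shown in the diff below; the url and resource id are placeholders and a configured udata-hydra environment (database, settings) is assumed:

import asyncio

from aiohttp import ClientSession

from udata_hydra.crawl.check_resources import check_resource


async def force_recheck(url: str, resource_id: str) -> str:
    # force_analysis=True re-downloads and re-analyses the resource even if
    # change detection concludes that its content has not changed
    async with ClientSession() as session:
        return await check_resource(
            url=url,
            resource_id=resource_id,
            session=session,
            force_analysis=True,
        )


# asyncio.run(force_recheck("https://example.com/data.csv", "<resource-id>"))

check_resource itself only performs the remote check; the forced download and parsing are enqueued on the worker queue and handled by analyse_resource.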
71 changes: 71 additions & 0 deletions tests/test_analysis/test_analysis_csv.py
@@ -4,11 +4,16 @@
from tempfile import NamedTemporaryFile

import pytest
from aiohttp import ClientSession
from asyncpg.exceptions import UndefinedTableError
from yarl import URL

from tests.conftest import RESOURCE_ID, RESOURCE_URL
from udata_hydra.analysis.csv import analyse_csv, csv_to_db
from udata_hydra.crawl.check_resources import (
RESOURCE_RESPONSE_STATUSES,
check_resource,
)
from udata_hydra.db.resource import Resource

pytestmark = pytest.mark.asyncio
@@ -306,3 +311,69 @@ async def test_analyse_csv_send_udata_webhook(
assert webhook.get("analysis:parsing:started_at")
assert webhook.get("analysis:parsing:finished_at")
assert webhook.get("analysis:parsing:error") is None


@pytest.mark.parametrize(
"forced_analysis",
(
(True, True),
(False, False),
),
)
async def test_forced_analysis(
setup_catalog,
rmock,
catalog_content,
db,
fake_check,
forced_analysis,
udata_url,
):
force_analysis, table_exists = forced_analysis
check = await fake_check(
headers={
"content-type": "application/csv",
"content-length": "100",
}
)
url = check["url"]
rid = check["resource_id"]
rmock.head(
url,
status=200,
headers={
"content-type": "application/csv",
"content-length": "100",
},
)
rmock.get(
url,
status=200,
headers={
"content-type": "application/csv",
"content-length": "100",
},
body="a,b,c\n1,2,3".encode("utf-8"),
repeat=True,
)
rmock.put(udata_url, status=200, repeat=True)
async with ClientSession() as session:
await check_resource(
url=url, resource_id=rid, session=session, force_analysis=force_analysis
)

# check that csv was indeed pushed to db
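    # (csv_to_db names the destination table after the md5 hash of the resource url)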
table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
tables = await db.fetch(
"SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = 'public';"
)
assert (table_name in [r["table_name"] for r in tables]) == table_exists

# check whether udata was pinged
if force_analysis:
webhook = rmock.requests[("PUT", URL(udata_url))][0].kwargs["json"]
assert webhook.get("analysis:parsing:started_at")
assert webhook.get("analysis:parsing:finished_at")
assert webhook.get("analysis:parsing:error") is None
else:
assert ("PUT", URL(udata_url)) not in rmock.requests.keys()
8 changes: 5 additions & 3 deletions udata_hydra/analysis/resource.py
@@ -31,7 +31,9 @@ class Change(Enum):
log = logging.getLogger("udata-hydra")


async def analyse_resource(check_id: int, is_first_check: bool) -> None:
async def analyse_resource(
check_id: int, is_first_check: bool, force_analysis: bool = False
) -> None:
"""
Perform analysis on the resource designated by check_id:
- change analysis
@@ -70,7 +72,7 @@ async def analyse_resource(check_id: int, is_first_check: bool) -> None:
# if the change status is NO_GUESS or HAS_CHANGED, let's download the file to get more infos
dl_analysis = {}
tmp_file = None
if change_status != Change.HAS_NOT_CHANGED:
if change_status != Change.HAS_NOT_CHANGED or force_analysis:
try:
tmp_file = await download_resource(url, headers, max_size_allowed)
except IOError:
@@ -107,7 +109,7 @@ async def analyse_resource(check_id: int, is_first_check: bool) -> None:

analysis_results = {**dl_analysis, **(change_payload or {})}

if change_status == Change.HAS_CHANGED or is_first_check:
if change_status == Change.HAS_CHANGED or is_first_check or force_analysis:
if is_tabular and tmp_file:
# Change status to TO_ANALYSE_CSV
await Resource.update(resource_id, data={"status": "TO_ANALYSE_CSV"})
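
Note: the two modified conditions above carry the whole behavioural change. Restated as standalone predicates for clarity (a sketch; the helper names are illustrative, not part of the codebase):

from enum import Enum, auto


class Change(Enum):
    HAS_CHANGED = auto()
    HAS_NOT_CHANGED = auto()
    NO_GUESS = auto()


def should_download(change_status: Change, force_analysis: bool) -> bool:
    # first modified condition: download unless nothing changed, or always when forced
    return change_status != Change.HAS_NOT_CHANGED or force_analysis


def should_analyse_csv(change_status: Change, is_first_check: bool, force_analysis: bool) -> bool:
    # second modified condition: analyse on change, on first check, or when forced
    return change_status == Change.HAS_CHANGED or is_first_check or force_analysis


# a forced check on an unchanged resource now goes through both steps
assert should_download(Change.HAS_NOT_CHANGED, force_analysis=True)
assert should_analyse_csv(Change.HAS_NOT_CHANGED, is_first_check=False, force_analysis=True)
assert not should_download(Change.HAS_NOT_CHANGED, force_analysis=False)

With force_analysis=True, an unchanged resource is still downloaded and, if tabular, still pushed through the csv analysis.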
3 changes: 2 additions & 1 deletion udata_hydra/cli.py
Expand Up @@ -137,7 +137,7 @@ async def crawl_url(url: str, method: str = "get"):


@cli
async def check_resource(resource_id: str, method: str = "get"):
async def check_resource(resource_id: str, method: str = "get", force_analysis: bool = True):
"""Trigger a complete check for a given resource_id"""
resource: asyncpg.Record | None = await Resource.get(resource_id)
if not resource:
@@ -149,6 +149,7 @@ async def check_resource(resource_id: str, method: str = "get"):
resource_id=resource_id,
session=session,
method=method,
force_analysis=force_analysis,
worker_priority="high",
)

16 changes: 14 additions & 2 deletions udata_hydra/crawl/check_resources.py
Expand Up @@ -61,6 +61,7 @@ async def check_resource(
sleep: float = 0,
method: str = "head",
worker_priority: str = "default",
force_analysis: bool = False,
) -> str:
log.debug(f"check {url}, sleep {sleep}, method {method}")

@@ -101,7 +102,12 @@ async def check_resource(
end = time.time()
if method != "get" and not has_nice_head(resp):
return await check_resource(
url, resource_id, session, method="get", worker_priority=worker_priority
url,
resource_id,
session,
force_analysis=force_analysis,
method="get",
worker_priority=worker_priority,
)
resp.raise_for_status()

@@ -122,7 +128,13 @@
await Resource.update(resource_id, data={"status": "TO_ANALYSE_RESOURCE"})

# Enqueue the resource for analysis
queue.enqueue(analyse_resource, check["id"], is_first_check, _priority=worker_priority)
queue.enqueue(
analyse_resource,
check["id"],
is_first_check,
force_analysis,
_priority=worker_priority,
)

return RESOURCE_RESPONSE_STATUSES["OK"]

7 changes: 6 additions & 1 deletion udata_hydra/routes/checks.py
Expand Up @@ -64,6 +64,7 @@ async def create_check(request: web.Request) -> web.Response:
try:
payload: dict = await request.json()
resource_id: str = payload["resource_id"]
force_analysis: bool = payload.get("force_analysis", True)
except Exception as err:
raise web.HTTPBadRequest(text=json.dumps({"error": str(err)}))

@@ -80,7 +81,11 @@
timeout=None, headers={"user-agent": config.USER_AGENT}
) as session:
status: str = await check_resource(
url=url, resource_id=resource_id, session=session, worker_priority="high"
url=url,
resource_id=resource_id,
force_analysis=force_analysis,
session=session,
worker_priority="high",
)
context.monitor().refresh(status)

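
Note: the HTTP entry point reads the flag from the JSON payload and defaults it to True, matching the CLI default, while the crawler-side check_resource defaults to False. A client-side sketch; the endpoint path below is an assumption, it is not visible in this diff:

import asyncio

from aiohttp import ClientSession


async def request_check(base_url: str, resource_id: str, force_analysis: bool = True) -> int:
    # force_analysis defaults to True on the server side; send False to let
    # change detection decide whether a full analysis is worth running
    payload = {"resource_id": resource_id, "force_analysis": force_analysis}
    async with ClientSession() as session:
        # "/api/checks/" is an assumed mount point for the create_check handler
        async with session.post(f"{base_url}/api/checks/", json=payload) as resp:
            return resp.status


# asyncio.run(request_check("http://localhost:8000", "<resource-id>"))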
