Add option to force analysis even if resource has not changed #205

Merged: 7 commits, Nov 6, 2024
71 changes: 71 additions & 0 deletions tests/test_analysis/test_analysis_csv.py
@@ -4,11 +4,16 @@
from tempfile import NamedTemporaryFile

import pytest
from aiohttp import ClientSession
from asyncpg.exceptions import UndefinedTableError
from yarl import URL

from tests.conftest import RESOURCE_ID, RESOURCE_URL
from udata_hydra.analysis.csv import analyse_csv, csv_to_db
from udata_hydra.crawl.check_resources import (
RESOURCE_RESPONSE_STATUSES,
check_resource,
)
from udata_hydra.db.resource import Resource

pytestmark = pytest.mark.asyncio
@@ -306,3 +311,69 @@ async def test_analyse_csv_send_udata_webhook(
assert webhook.get("analysis:parsing:started_at")
assert webhook.get("analysis:parsing:finished_at")
assert webhook.get("analysis:parsing:error") is None


@pytest.mark.parametrize(
"forced_analysis",
(
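# each tuple is (force_analysis, expected table_exists)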
(True, True),
(False, False),
),
)
async def test_forced_analysis(
setup_catalog,
rmock,
catalog_content,
db,
fake_check,
forced_analysis,
udata_url,
):
force_analysis, table_exists = forced_analysis
check = await fake_check(
headers={
"content-type": "application/csv",
"content-length": "100",
}
)
url = check["url"]
rid = check["resource_id"]
rmock.head(
url,
status=200,
headers={
"content-type": "application/csv",
"content-length": "100",
},
)
rmock.get(
url,
status=200,
headers={
"content-type": "application/csv",
"content-length": "100",
},
body="a,b,c\n1,2,3".encode("utf-8"),
repeat=True,
)
rmock.put(udata_url, status=200, repeat=True)
async with ClientSession() as session:
await check_resource(
url=url, resource_id=rid, session=session, force_analysis=force_analysis
)

# check that csv was indeed pushed to db
table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
tables = await db.fetch(
"SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = 'public';"
)
assert (table_name in [r["table_name"] for r in tables]) == table_exists

# check whether udata was pinged
if force_analysis:
webhook = rmock.requests[("PUT", URL(udata_url))][0].kwargs["json"]
assert webhook.get("analysis:parsing:started_at")
assert webhook.get("analysis:parsing:finished_at")
assert webhook.get("analysis:parsing:error") is None
else:
assert ("PUT", URL(udata_url)) not in rmock.requests.keys()
8 changes: 5 additions & 3 deletions udata_hydra/analysis/resource.py
@@ -31,7 +31,9 @@ class Change(Enum):
log = logging.getLogger("udata-hydra")


async def analyse_resource(check_id: int, is_first_check: bool) -> None:
async def analyse_resource(
check_id: int, is_first_check: bool, force_analysis: bool = False
) -> None:
"""
Perform analysis on the resource designated by check_id:
- change analysis
@@ -70,7 +72,7 @@ async def analyse_resource(check_id: int, is_first_check: bool) -> None:
# if the change status is NO_GUESS or HAS_CHANGED (or analysis is forced), download the file to get more info
dl_analysis = {}
tmp_file = None
if change_status != Change.HAS_NOT_CHANGED:
if change_status != Change.HAS_NOT_CHANGED or force_analysis:
try:
tmp_file = await download_resource(url, headers, max_size_allowed)
except IOError:
@@ -107,7 +109,7 @@ async def analyse_resource(check_id: int, is_first_check: bool) -> None:

analysis_results = {**dl_analysis, **(change_payload or {})}

if change_status == Change.HAS_CHANGED or is_first_check:
if change_status == Change.HAS_CHANGED or is_first_check or force_analysis:
if is_tabular and tmp_file:
# Change status to TO_ANALYSE_CSV
await Resource.update(resource_id, data={"status": "TO_ANALYSE_CSV"})
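For readers skimming the diff, here is a minimal sketch of the gating logic this file now implements; it is a simplification for illustration, not code from the PR (the Change members mirror those used in udata_hydra.analysis.resource, but their values here are arbitrary):

from enum import Enum

class Change(Enum):
    HAS_CHANGED = 1
    HAS_NOT_CHANGED = 2
    NO_GUESS = 3

def should_download(change_status: Change, force_analysis: bool) -> bool:
    # download the file unless change detection says "unchanged",
    # or always when the analysis is forced
    return change_status != Change.HAS_NOT_CHANGED or force_analysis

def should_run_csv_analysis(change_status: Change, is_first_check: bool, force_analysis: bool) -> bool:
    # enqueue the heavier (CSV) analysis on change, on the first check,
    # or when the analysis is forced
    return change_status == Change.HAS_CHANGED or is_first_check or force_analysis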
3 changes: 2 additions & 1 deletion udata_hydra/cli.py
@@ -137,7 +137,7 @@ async def crawl_url(url: str, method: str = "get"):


@cli
async def check_resource(resource_id: str, method: str = "get"):
async def check_resource(resource_id: str, method: str = "get", force_analysis: bool = True):
"""Trigger a complete check for a given resource_id"""
resource: asyncpg.Record | None = await Resource.get(resource_id)
if not resource:
Expand All @@ -149,6 +149,7 @@ async def check_resource(resource_id: str, method: str = "get"):
resource_id=resource_id,
session=session,
method=method,
force_analysis=force_analysis,
worker_priority="high",
)

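The CLI entry point defaults force_analysis to True, so a manual check always re-runs the analysis. A caller that wants the previous behaviour (skip the analysis when the resource has not changed) can pass False explicitly; a hedged sketch using the crawler API shown in the next file (URL and resource id are placeholders, and the surrounding config/database setup is assumed):

from aiohttp import ClientSession
from udata_hydra.crawl.check_resources import check_resource

async def recheck(url: str, resource_id: str) -> str:
    # force_analysis=False keeps the change-detection shortcut;
    # the CLI above passes True by default
    async with ClientSession() as session:
        return await check_resource(
            url=url,
            resource_id=resource_id,
            session=session,
            force_analysis=False,
        )

# e.g. asyncio.run(recheck("https://example.com/data.csv", "<resource-uuid>"))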
16 changes: 14 additions & 2 deletions udata_hydra/crawl/check_resources.py
@@ -61,6 +61,7 @@ async def check_resource(
sleep: float = 0,
method: str = "head",
worker_priority: str = "default",
force_analysis: bool = False,
) -> str:
log.debug(f"check {url}, sleep {sleep}, method {method}")

@@ -101,7 +102,12 @@ async def check_resource(
end = time.time()
if method != "get" and not has_nice_head(resp):
return await check_resource(
url, resource_id, session, method="get", worker_priority=worker_priority
url,
resource_id,
session,
force_analysis=force_analysis,
method="get",
worker_priority=worker_priority,
)
resp.raise_for_status()

@@ -122,7 +128,13 @@ async def check_resource(
await Resource.update(resource_id, data={"status": "TO_ANALYSE_RESOURCE"})

# Enqueue the resource for analysis
queue.enqueue(analyse_resource, check["id"], is_first_check, _priority=worker_priority)
queue.enqueue(
analyse_resource,
check["id"],
is_first_check,
force_analysis,
_priority=worker_priority,
)

return RESOURCE_RESPONSE_STATUSES["OK"]

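A reading aid for the enqueue call above (annotation only, not code from the PR): the positional arguments map onto the new analyse_resource signature as follows.

# queue.enqueue(analyse_resource, check["id"], is_first_check, force_analysis, _priority=worker_priority)
# is picked up by the worker as:
#   analyse_resource(check_id=check["id"],
#                    is_first_check=is_first_check,
#                    force_analysis=force_analysis)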
7 changes: 6 additions & 1 deletion udata_hydra/routes/checks.py
@@ -64,6 +64,7 @@ async def create_check(request: web.Request) -> web.Response:
try:
payload: dict = await request.json()
resource_id: str = payload["resource_id"]
force_analysis: bool = payload.get("force_analysis", True)
except Exception as err:
raise web.HTTPBadRequest(text=json.dumps({"error": str(err)}))

@@ -80,7 +81,11 @@ async def create_check(request: web.Request) -> web.Response:
timeout=None, headers={"user-agent": config.USER_AGENT}
) as session:
status: str = await check_resource(
url=url, resource_id=resource_id, session=session, worker_priority="high"
url=url,
resource_id=resource_id,
force_analysis=force_analysis,
session=session,
worker_priority="high",
)
context.monitor().refresh(status)

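End to end, the new payload field can be exercised against the check-creation endpoint; a hedged sketch (the /api/checks/ path and base URL are assumptions and not taken from this PR; only the force_analysis key comes from the diff above, and it defaults to True server-side):

import aiohttp

async def trigger_check(api_base: str, resource_id: str) -> None:
    # request a check; pass force_analysis=False to keep the
    # "skip analysis if unchanged" behaviour
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{api_base}/api/checks/",  # route path assumed
            json={"resource_id": resource_id, "force_analysis": False},
        ) as resp:
            resp.raise_for_status()
            print(resp.status, await resp.text())

# e.g. asyncio.run(trigger_check("http://localhost:8000", "<resource-uuid>"))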