Skip to content

Commit

Permalink
Revert "Add option to force analysis even if resource has not changed (
Browse files Browse the repository at this point in the history
…#205)"

This reverts commit 44660b7.
  • Loading branch information
bolinocroustibat authored Nov 6, 2024
1 parent 44660b7 commit 3e2a6b0
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 98 deletions.
71 changes: 0 additions & 71 deletions tests/test_analysis/test_analysis_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@
from tempfile import NamedTemporaryFile

import pytest
from aiohttp import ClientSession
from asyncpg.exceptions import UndefinedTableError
from yarl import URL

from tests.conftest import RESOURCE_ID, RESOURCE_URL
from udata_hydra.analysis.csv import analyse_csv, csv_to_db
from udata_hydra.crawl.check_resources import (
RESOURCE_RESPONSE_STATUSES,
check_resource,
)
from udata_hydra.db.resource import Resource

pytestmark = pytest.mark.asyncio
Expand Down Expand Up @@ -311,69 +306,3 @@ async def test_analyse_csv_send_udata_webhook(
assert webhook.get("analysis:parsing:started_at")
assert webhook.get("analysis:parsing:finished_at")
assert webhook.get("analysis:parsing:error") is None


@pytest.mark.parametrize(
"forced_analysis",
(
(True, True),
(False, False),
),
)
async def test_forced_analysis(
setup_catalog,
rmock,
catalog_content,
db,
fake_check,
forced_analysis,
udata_url,
):
force_analysis, table_exists = forced_analysis
check = await fake_check(
headers={
"content-type": "application/csv",
"content-length": "100",
}
)
url = check["url"]
rid = check["resource_id"]
rmock.head(
url,
status=200,
headers={
"content-type": "application/csv",
"content-length": "100",
},
)
rmock.get(
url,
status=200,
headers={
"content-type": "application/csv",
"content-length": "100",
},
body="a,b,c\n1,2,3".encode("utf-8"),
repeat=True,
)
rmock.put(udata_url, status=200, repeat=True)
async with ClientSession() as session:
await check_resource(
url=url, resource_id=rid, session=session, force_analysis=force_analysis
)

# check that csv was indeed pushed to db
table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
tables = await db.fetch(
"SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = 'public';"
)
assert (table_name in [r["table_name"] for r in tables]) == table_exists

# check whether udata was pinged
if force_analysis:
webhook = rmock.requests[("PUT", URL(udata_url))][0].kwargs["json"]
assert webhook.get("analysis:parsing:started_at")
assert webhook.get("analysis:parsing:finished_at")
assert webhook.get("analysis:parsing:error") is None
else:
assert ("PUT", URL(udata_url)) not in rmock.requests.keys()
8 changes: 3 additions & 5 deletions udata_hydra/analysis/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ class Change(Enum):
log = logging.getLogger("udata-hydra")


async def analyse_resource(
check_id: int, is_first_check: bool, force_analysis: bool = False
) -> None:
async def analyse_resource(check_id: int, is_first_check: bool) -> None:
"""
Perform analysis on the resource designated by check_id:
- change analysis
Expand Down Expand Up @@ -72,7 +70,7 @@ async def analyse_resource(
# if the change status is NO_GUESS or HAS_CHANGED, let's download the file to get more infos
dl_analysis = {}
tmp_file = None
if change_status != Change.HAS_NOT_CHANGED or force_analysis:
if change_status != Change.HAS_NOT_CHANGED:
try:
tmp_file = await download_resource(url, headers, max_size_allowed)
except IOError:
Expand Down Expand Up @@ -109,7 +107,7 @@ async def analyse_resource(

analysis_results = {**dl_analysis, **(change_payload or {})}

if change_status == Change.HAS_CHANGED or is_first_check or force_analysis:
if change_status == Change.HAS_CHANGED or is_first_check:
if is_tabular and tmp_file:
# Change status to TO_ANALYSE_CSV
await Resource.update(resource_id, data={"status": "TO_ANALYSE_CSV"})
Expand Down
3 changes: 1 addition & 2 deletions udata_hydra/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def crawl_url(url: str, method: str = "get"):


@cli
async def check_resource(resource_id: str, method: str = "get", force_analysis: bool = True):
async def check_resource(resource_id: str, method: str = "get"):
"""Trigger a complete check for a given resource_id"""
resource: asyncpg.Record | None = await Resource.get(resource_id)
if not resource:
Expand All @@ -149,7 +149,6 @@ async def check_resource(resource_id: str, method: str = "get", force_analysis:
resource_id=resource_id,
session=session,
method=method,
force_analysis=force_analysis,
worker_priority="high",
)

Expand Down
16 changes: 2 additions & 14 deletions udata_hydra/crawl/check_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ async def check_resource(
sleep: float = 0,
method: str = "head",
worker_priority: str = "default",
force_analysis: bool = False,
) -> str:
log.debug(f"check {url}, sleep {sleep}, method {method}")

Expand Down Expand Up @@ -102,12 +101,7 @@ async def check_resource(
end = time.time()
if method != "get" and not has_nice_head(resp):
return await check_resource(
url,
resource_id,
session,
force_analysis=force_analysis,
method="get",
worker_priority=worker_priority,
url, resource_id, session, method="get", worker_priority=worker_priority
)
resp.raise_for_status()

Expand All @@ -128,13 +122,7 @@ async def check_resource(
await Resource.update(resource_id, data={"status": "TO_ANALYSE_RESOURCE"})

# Enqueue the resource for analysis
queue.enqueue(
analyse_resource,
check["id"],
is_first_check,
force_analysis,
_priority=worker_priority,
)
queue.enqueue(analyse_resource, check["id"], is_first_check, _priority=worker_priority)

return RESOURCE_RESPONSE_STATUSES["OK"]

Expand Down
7 changes: 1 addition & 6 deletions udata_hydra/routes/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ async def create_check(request: web.Request) -> web.Response:
try:
payload: dict = await request.json()
resource_id: str = payload["resource_id"]
force_analysis: bool = payload.get("force_analysis", True)
except Exception as err:
raise web.HTTPBadRequest(text=json.dumps({"error": str(err)}))

Expand All @@ -81,11 +80,7 @@ async def create_check(request: web.Request) -> web.Response:
timeout=None, headers={"user-agent": config.USER_AGENT}
) as session:
status: str = await check_resource(
url=url,
resource_id=resource_id,
force_analysis=force_analysis,
session=session,
worker_priority="high",
url=url, resource_id=resource_id, session=session, worker_priority="high"
)
context.monitor().refresh(status)

Expand Down

0 comments on commit 3e2a6b0

Please sign in to comment.