From 8cb0597f76262bd05b8d72e0ba63fae266e50285 Mon Sep 17 00:00:00 2001 From: Pierlou Date: Mon, 28 Oct 2024 16:55:03 +0100 Subject: [PATCH 1/7] feat: add option to force analysis even if resource has not changed --- udata_hydra/analysis/resource.py | 4 ++-- udata_hydra/cli.py | 3 ++- udata_hydra/crawl/check_resources.py | 4 +++- udata_hydra/routes/checks.py | 3 ++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/udata_hydra/analysis/resource.py b/udata_hydra/analysis/resource.py index 914ed91d..782a6cb9 100644 --- a/udata_hydra/analysis/resource.py +++ b/udata_hydra/analysis/resource.py @@ -31,7 +31,7 @@ class Change(Enum): log = logging.getLogger("udata-hydra") -async def analyse_resource(check_id: int, is_first_check: bool) -> None: +async def analyse_resource(check_id: int, is_first_check: bool, force_analysis: bool = False) -> None: """ Perform analysis on the resource designated by check_id: - change analysis @@ -107,7 +107,7 @@ async def analyse_resource(check_id: int, is_first_check: bool) -> None: analysis_results = {**dl_analysis, **(change_payload or {})} - if change_status == Change.HAS_CHANGED or is_first_check: + if change_status == Change.HAS_CHANGED or is_first_check or force_analysis: if is_tabular and tmp_file: # Change status to TO_ANALYSE_CSV await Resource.update(resource_id, data={"status": "TO_ANALYSE_CSV"}) diff --git a/udata_hydra/cli.py b/udata_hydra/cli.py index 106f2be5..b2cbe82d 100644 --- a/udata_hydra/cli.py +++ b/udata_hydra/cli.py @@ -137,7 +137,7 @@ async def crawl_url(url: str, method: str = "get"): @cli -async def check_resource(resource_id: str, method: str = "get"): +async def check_resource(resource_id: str, method: str = "get", force_analysis: bool = False): """Trigger a complete check for a given resource_id""" resource: asyncpg.Record | None = await Resource.get(resource_id) if not resource: @@ -149,6 +149,7 @@ async def check_resource(resource_id: str, method: str = "get"): resource_id=resource_id, session=session, method=method, + force_analysis=force_analysis, worker_priority="high", ) diff --git a/udata_hydra/crawl/check_resources.py b/udata_hydra/crawl/check_resources.py index ad67092a..1ed232f2 100644 --- a/udata_hydra/crawl/check_resources.py +++ b/udata_hydra/crawl/check_resources.py @@ -58,6 +58,7 @@ async def check_resource( url: str, resource_id: str, session, + force_analysis: bool = False, sleep: float = 0, method: str = "head", worker_priority: str = "default", @@ -101,7 +102,8 @@ async def check_resource( end = time.time() if method != "get" and not has_nice_head(resp): return await check_resource( - url, resource_id, session, method="get", worker_priority=worker_priority + url, resource_id, session, + force_analysis=force_analysis, method="get", worker_priority=worker_priority ) resp.raise_for_status() diff --git a/udata_hydra/routes/checks.py b/udata_hydra/routes/checks.py index 38de7cee..b7116902 100644 --- a/udata_hydra/routes/checks.py +++ b/udata_hydra/routes/checks.py @@ -64,6 +64,7 @@ async def create_check(request: web.Request) -> web.Response: try: payload: dict = await request.json() resource_id: str = payload["resource_id"] + force_analysis: bool = payload["force_analysis"] except Exception as err: raise web.HTTPBadRequest(text=json.dumps({"error": str(err)})) @@ -80,7 +81,7 @@ async def create_check(request: web.Request) -> web.Response: timeout=None, headers={"user-agent": config.USER_AGENT} ) as session: status: str = await check_resource( - url=url, resource_id=resource_id, session=session, worker_priority="high" + url=url, resource_id=resource_id, force_analysis=force_analysis, session=session, worker_priority="high" ) context.monitor().refresh(status) From 61bf0df3c266a79eec0ac09ca79c1e950e402860 Mon Sep 17 00:00:00 2001 From: Pierlou Date: Mon, 28 Oct 2024 16:57:23 +0100 Subject: [PATCH 2/7] fix: allow to not specify optional force_analysis arg --- udata_hydra/routes/checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/udata_hydra/routes/checks.py b/udata_hydra/routes/checks.py index b7116902..bd99f188 100644 --- a/udata_hydra/routes/checks.py +++ b/udata_hydra/routes/checks.py @@ -64,7 +64,7 @@ async def create_check(request: web.Request) -> web.Response: try: payload: dict = await request.json() resource_id: str = payload["resource_id"] - force_analysis: bool = payload["force_analysis"] + force_analysis: bool = payload.get("force_analysis", False) except Exception as err: raise web.HTTPBadRequest(text=json.dumps({"error": str(err)})) From 88bd8d65a76f75a5d10132d33b4c501fd4c6a015 Mon Sep 17 00:00:00 2001 From: Pierlou Date: Mon, 28 Oct 2024 17:01:16 +0100 Subject: [PATCH 3/7] fix: lint --- udata_hydra/analysis/resource.py | 4 +++- udata_hydra/crawl/check_resources.py | 8 ++++++-- udata_hydra/routes/checks.py | 6 +++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/udata_hydra/analysis/resource.py b/udata_hydra/analysis/resource.py index 782a6cb9..d3099e11 100644 --- a/udata_hydra/analysis/resource.py +++ b/udata_hydra/analysis/resource.py @@ -31,7 +31,9 @@ class Change(Enum): log = logging.getLogger("udata-hydra") -async def analyse_resource(check_id: int, is_first_check: bool, force_analysis: bool = False) -> None: +async def analyse_resource( + check_id: int, is_first_check: bool, force_analysis: bool = False +) -> None: """ Perform analysis on the resource designated by check_id: - change analysis diff --git a/udata_hydra/crawl/check_resources.py b/udata_hydra/crawl/check_resources.py index 1ed232f2..ccceaf8b 100644 --- a/udata_hydra/crawl/check_resources.py +++ b/udata_hydra/crawl/check_resources.py @@ -102,8 +102,12 @@ async def check_resource( end = time.time() if method != "get" and not has_nice_head(resp): return await check_resource( - url, resource_id, session, - force_analysis=force_analysis, method="get", worker_priority=worker_priority + url, + resource_id, + session, + force_analysis=force_analysis, + method="get", + worker_priority=worker_priority, ) resp.raise_for_status() diff --git a/udata_hydra/routes/checks.py b/udata_hydra/routes/checks.py index bd99f188..7f5b01b1 100644 --- a/udata_hydra/routes/checks.py +++ b/udata_hydra/routes/checks.py @@ -81,7 +81,11 @@ async def create_check(request: web.Request) -> web.Response: timeout=None, headers={"user-agent": config.USER_AGENT} ) as session: status: str = await check_resource( - url=url, resource_id=resource_id, force_analysis=force_analysis, session=session, worker_priority="high" + url=url, + resource_id=resource_id, + force_analysis=force_analysis, + session=session, + worker_priority="high", ) context.monitor().refresh(status) From d5538dd962eaa3b4e70a87dd29bba861e365f763 Mon Sep 17 00:00:00 2001 From: Pierlou Date: Tue, 29 Oct 2024 11:45:53 +0100 Subject: [PATCH 4/7] refactor: force analysis by default for CLI and API call --- udata_hydra/cli.py | 2 +- udata_hydra/crawl/check_resources.py | 4 ++-- udata_hydra/routes/checks.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/udata_hydra/cli.py b/udata_hydra/cli.py index b2cbe82d..29fa3719 100644 --- a/udata_hydra/cli.py +++ b/udata_hydra/cli.py @@ -137,7 +137,7 @@ async def crawl_url(url: str, method: str = "get"): @cli -async def check_resource(resource_id: str, method: str = "get", force_analysis: bool = False): +async def check_resource(resource_id: str, method: str = "get", force_analysis: bool = True): """Trigger a complete check for a given resource_id""" resource: asyncpg.Record | None = await Resource.get(resource_id) if not resource: diff --git a/udata_hydra/crawl/check_resources.py b/udata_hydra/crawl/check_resources.py index ccceaf8b..6681f6e1 100644 --- a/udata_hydra/crawl/check_resources.py +++ b/udata_hydra/crawl/check_resources.py @@ -58,10 +58,10 @@ async def check_resource( url: str, resource_id: str, session, - force_analysis: bool = False, sleep: float = 0, method: str = "head", worker_priority: str = "default", + force_analysis: bool = False, ) -> str: log.debug(f"check {url}, sleep {sleep}, method {method}") @@ -128,7 +128,7 @@ async def check_resource( await Resource.update(resource_id, data={"status": "TO_ANALYSE_RESOURCE"}) # Enqueue the resource for analysis - queue.enqueue(analyse_resource, check["id"], is_first_check, _priority=worker_priority) + queue.enqueue(analyse_resource, check["id"], is_first_check, force_analysis, _priority=worker_priority) return RESOURCE_RESPONSE_STATUSES["OK"] diff --git a/udata_hydra/routes/checks.py b/udata_hydra/routes/checks.py index 7f5b01b1..d3d6e3f5 100644 --- a/udata_hydra/routes/checks.py +++ b/udata_hydra/routes/checks.py @@ -64,7 +64,7 @@ async def create_check(request: web.Request) -> web.Response: try: payload: dict = await request.json() resource_id: str = payload["resource_id"] - force_analysis: bool = payload.get("force_analysis", False) + force_analysis: bool = payload.get("force_analysis", True) except Exception as err: raise web.HTTPBadRequest(text=json.dumps({"error": str(err)})) From 1886071b6af98d665625617b9f9fafd365e0704a Mon Sep 17 00:00:00 2001 From: Pierlou Date: Tue, 29 Oct 2024 12:29:50 +0100 Subject: [PATCH 5/7] fix: lint --- udata_hydra/crawl/check_resources.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/udata_hydra/crawl/check_resources.py b/udata_hydra/crawl/check_resources.py index 6681f6e1..bfdf9e2f 100644 --- a/udata_hydra/crawl/check_resources.py +++ b/udata_hydra/crawl/check_resources.py @@ -128,7 +128,13 @@ async def check_resource( await Resource.update(resource_id, data={"status": "TO_ANALYSE_RESOURCE"}) # Enqueue the resource for analysis - queue.enqueue(analyse_resource, check["id"], is_first_check, force_analysis, _priority=worker_priority) + queue.enqueue( + analyse_resource, + check["id"], + is_first_check, + force_analysis, + _priority=worker_priority, + ) return RESOURCE_RESPONSE_STATUSES["OK"] From 3c2db3e62f7a0df0d04b63ba22df7e2a67e6ed0c Mon Sep 17 00:00:00 2001 From: Pierlou Date: Mon, 4 Nov 2024 16:46:31 +0100 Subject: [PATCH 6/7] fix: download resource if force_analysis --- tests/test_analysis/test_analysis_csv.py | 63 ++++++++++++++++++++++++ udata_hydra/analysis/resource.py | 2 +- 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index b4320356..6f4be00e 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -4,11 +4,16 @@ from tempfile import NamedTemporaryFile import pytest +from aiohttp import ClientSession from asyncpg.exceptions import UndefinedTableError from yarl import URL from tests.conftest import RESOURCE_ID, RESOURCE_URL from udata_hydra.analysis.csv import analyse_csv, csv_to_db +from udata_hydra.crawl.check_resources import ( + RESOURCE_RESPONSE_STATUSES, + check_resource, +) from udata_hydra.db.resource import Resource pytestmark = pytest.mark.asyncio @@ -306,3 +311,61 @@ async def test_analyse_csv_send_udata_webhook( assert webhook.get("analysis:parsing:started_at") assert webhook.get("analysis:parsing:finished_at") assert webhook.get("analysis:parsing:error") is None + + +@pytest.mark.parametrize( + "forced_analysis", + ( + (True, True), + (False, False), + ), +) +async def test_forced_analysis( + setup_catalog, + rmock, + catalog_content, + db, + fake_check, + forced_analysis, + udata_url, +): + force_analysis, table_exists = forced_analysis + check = await fake_check(headers={ + "content-type": "application/csv", + "content-length": "100", + }) + url = check["url"] + rid = check["resource_id"] + rmock.head(url, status=200, headers={ + "content-type": "application/csv", + "content-length": "100", + }) + rmock.get( + url, + status=200, + headers={ + "content-type": "application/csv", + "content-length": "100", + }, + body="a,b,c\n1,2,3".encode("utf-8"), + repeat=True, + ) + rmock.put(udata_url, status=200, repeat=True) + async with ClientSession() as session: + await check_resource(url=url, resource_id=rid, session=session, force_analysis=force_analysis) + + # check that csv was indeed pushed to db + table_name = hashlib.md5(url.encode("utf-8")).hexdigest() + tables = await db.fetch( + "SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = 'public';" + ) + assert (table_name in [r['table_name'] for r in tables]) == table_exists + + # check whether udata was pinged + if force_analysis: + webhook = rmock.requests[("PUT", URL(udata_url))][0].kwargs["json"] + assert webhook.get("analysis:parsing:started_at") + assert webhook.get("analysis:parsing:finished_at") + assert webhook.get("analysis:parsing:error") is None + else: + assert ("PUT", URL(udata_url)) not in rmock.requests.keys() diff --git a/udata_hydra/analysis/resource.py b/udata_hydra/analysis/resource.py index d3099e11..72756076 100644 --- a/udata_hydra/analysis/resource.py +++ b/udata_hydra/analysis/resource.py @@ -72,7 +72,7 @@ async def analyse_resource( # if the change status is NO_GUESS or HAS_CHANGED, let's download the file to get more infos dl_analysis = {} tmp_file = None - if change_status != Change.HAS_NOT_CHANGED: + if change_status != Change.HAS_NOT_CHANGED or force_analysis: try: tmp_file = await download_resource(url, headers, max_size_allowed) except IOError: From 7edebc5f8d58395fa3732d1957285fbe0600abaf Mon Sep 17 00:00:00 2001 From: Pierlou Date: Mon, 4 Nov 2024 16:50:13 +0100 Subject: [PATCH 7/7] fix: lint added test --- tests/test_analysis/test_analysis_csv.py | 28 +++++++++++++++--------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/tests/test_analysis/test_analysis_csv.py b/tests/test_analysis/test_analysis_csv.py index 6f4be00e..cd1abad3 100644 --- a/tests/test_analysis/test_analysis_csv.py +++ b/tests/test_analysis/test_analysis_csv.py @@ -330,16 +330,22 @@ async def test_forced_analysis( udata_url, ): force_analysis, table_exists = forced_analysis - check = await fake_check(headers={ - "content-type": "application/csv", - "content-length": "100", - }) + check = await fake_check( + headers={ + "content-type": "application/csv", + "content-length": "100", + } + ) url = check["url"] rid = check["resource_id"] - rmock.head(url, status=200, headers={ - "content-type": "application/csv", - "content-length": "100", - }) + rmock.head( + url, + status=200, + headers={ + "content-type": "application/csv", + "content-length": "100", + }, + ) rmock.get( url, status=200, @@ -352,14 +358,16 @@ async def test_forced_analysis( ) rmock.put(udata_url, status=200, repeat=True) async with ClientSession() as session: - await check_resource(url=url, resource_id=rid, session=session, force_analysis=force_analysis) + await check_resource( + url=url, resource_id=rid, session=session, force_analysis=force_analysis + ) # check that csv was indeed pushed to db table_name = hashlib.md5(url.encode("utf-8")).hexdigest() tables = await db.fetch( "SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = 'public';" ) - assert (table_name in [r['table_name'] for r in tables]) == table_exists + assert (table_name in [r["table_name"] for r in tables]) == table_exists # check whether udata was pinged if force_analysis: