From 71264b5075647d59cb608d8a03e5540a787287ce Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 16 Jan 2025 14:38:15 -0300 Subject: [PATCH 1/3] fix pipeline cartao pagamento --- pipelines/utils/crawler_cgu/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index bec196b99..33d6a2a0f 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -42,7 +42,7 @@ def partition_data(table_id: str, dataset_id : str) -> str: log("---------------------------- Read data ----------------------------") df = read_csv(dataset_id = dataset_id, table_id = table_id) log(df.head()) - if dataset_id == "br_cgu_cartao_pagamento:": + if dataset_id == "br_cgu_cartao_pagamento": log(" ---------------------------- Partiting data -----------------------") to_partitions( data = df, @@ -77,6 +77,7 @@ def partition_data(table_id: str, dataset_id : str) -> str: log("---------------------------- Data partitioned ----------------------") return constants.TABELA_SERVIDORES.value[table_id]['OUTPUT'] + @task def get_current_date_and_download_file(table_id : str, dataset_id : str, From 6af302e81cccab9225e66ba8acc86f1f25bb2952 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 16 Jan 2025 15:00:17 -0300 Subject: [PATCH 2/3] fix pipeline cartao pagamento register now --- pipelines/utils/crawler_cgu/constants.py | 1 + pipelines/utils/crawler_cgu/flows.py | 2 ++ pipelines/utils/crawler_cgu/tasks.py | 2 +- pipelines/utils/crawler_cgu/utils.py | 1 + 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index ab32534c9..0f31a39fd 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -175,3 +175,4 @@ class constants(Enum): # pylint: disable=c0103 "READ": "_TermoAditivo.csv", }, } + diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index d5a09c638..74372f3ec 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -317,3 +317,5 @@ ) flow_cgu_licitacao_contrato.storage = GCS(constants.GCS_FLOWS_BUCKET.value) flow_cgu_licitacao_contrato.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) + + diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 33d6a2a0f..99b4a47ee 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -43,7 +43,7 @@ def partition_data(table_id: str, dataset_id : str) -> str: df = read_csv(dataset_id = dataset_id, table_id = table_id) log(df.head()) if dataset_id == "br_cgu_cartao_pagamento": - log(" ---------------------------- Partiting data -----------------------") + log("---------------------------- Partiting data -----------------------") to_partitions( data = df, partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index 1746b0022..ac243adbf 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -2,6 +2,7 @@ """ General purpose functions for the br_cgu_cartao_pagamento project """ + import datetime from arrow import get from dateutil.relativedelta import relativedelta From b7a966726966d88d1d86ddd28a8c83a88e988755 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 16 Jan 2025 17:00:45 -0300 Subject: [PATCH 3/3] register flow definitive --- pipelines/datasets/br_cgu_cartao_pagamento/schedules.py | 1 + pipelines/utils/crawler_cgu/flows.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py index c9bd50a4c..99ec99fa3 100644 --- a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py +++ b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py @@ -5,6 +5,7 @@ from pipelines.constants import constants from pipelines.utils.crawler_cgu.constants import constants as constants_cgu + every_day_microdados_governo_federal = Schedule( clocks=[ CronClock( diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 74372f3ec..99d28f291 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -32,7 +32,7 @@ ) as flow_cgu_cartao_pagamento: dataset_id = Parameter("dataset_id", default='br_cgu_cartao_pagamento', required=True) - table_id = Parameter("table_id", default ="microdados_governo_federal", required=True) + table_id = Parameter("table_id", required=True) #### # Relative_month = 1 means that the data will be downloaded for the current month ####