Skip to content

Commit

Permalink
fix: fixed parser datetime
Browse files Browse the repository at this point in the history
  • Loading branch information
jlsneto committed Aug 15, 2024
1 parent 756828e commit f5379e3
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 4 deletions.
130 changes: 129 additions & 1 deletion cereja/concurrently/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import os
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

from ..utils import decorators

from .. import Progress, console

__all__ = ["MultiProcess"]
__all__ = ["MultiProcess", "Processor"]


class _IMultiProcess(abc.ABC):
Expand Down Expand Up @@ -218,3 +220,129 @@ def __exit__(self, exc_type, exc_val, exc_tb):
)
self._q.join()
self._with_context = False


class Processor:
def __init__(self, num_workers=None, max_in_progress=100, interval_seconds=None, use_progress=True,
on_result=None):
self._num_workers = num_workers if num_workers is not None else 10
self._on_result = on_result
self._total_success = 0
self._max_in_progress = max_in_progress
self._interval_seconds = 0 if interval_seconds is None else interval_seconds
self._process_result_service = None
self._future_to_data = set()
self._failure_data = []
self._stopped = False
self._executor = None
self._started_at = 0
self._progress = Progress(name="Processor",
max_value=100,
states=("value", "percent", "time"),
custom_state_func=self.get_status,
custom_state_name="Tx") if use_progress else None

@property
def in_progress_count(self):
return len(self._future_to_data)

@property
def total_processed(self):
return len(self._failure_data) + self._total_success

@property
def interval_seconds(self):
return self._interval_seconds

@property
def total_active_threads(self):
return threading.active_count()

def _create_process_result_service(self):
if self._process_result_service is not None:
self._process_result_service.join() # Espera terminar se estiver em execução
self._process_result_service = threading.Thread(target=self._process_result, daemon=False)
return self._process_result_service

def get_failure_data(self):
return self._failure_data

def _process_result(self):
# Roda enquanto tiver dados aguardando retorno do processo de validação e atualização do banco
while not self.stopped or self.in_progress_count > 0:
# list() é necessário call para criar cópia do objeto que está sendo manipulado em tempo de execução
for future in as_completed(list(self._future_to_data)):
result = future.result()
self._future_to_data.remove(future)

if self._on_result is not None:
self._on_result(result)
if self._progress is not None:
self._progress.show_progress(self.total_processed)

def _process(self, func, item, *args, **kwargs):
try:
result = func(item, *args, **kwargs)
self._total_success += 1
return result
except Exception as exc:
print(
f"Falha ao processa dado, mas será armazenado para conferência.\n"
f"Error: {exc}")
self._failure_data.append(item)

def get_status(self):
return f"{round(self.total_processed / (time.time() - self._started_at), 2)} cpf/s " \
f"- processing: {self.in_progress_count} " \
f"- success: {self._total_success} " \
f"- fail: {len(self._failure_data)} "

def process(self, func, data, *args, **kwargs):
"""
Função principal, responsável por controlar o tempo de envio dos dados para processar.
"""

self._stopped = False
# inicia thread para atualizar o banco com o resultado da validação.
self._create_process_result_service().start()
self._started_at = time.time()

if self._progress is not None:
self._progress.update_max_value(len(data))
self._progress.start()

with ThreadPoolExecutor(max_workers=self._num_workers,
thread_name_prefix="CPF_PROCESS_WORKER") as self._executor:
for item in data:
start_time = time.time()

future = self._executor.submit(self._process, func, item, *args, **kwargs)
self._future_to_data.add(future)

elapsed_time = time.time() - start_time
# Verifica quanto tempo passou após enviar um dado, caso o tempo for menor que o intervalo
# configurado espera a diferença antes de enviar o próximo lote
if elapsed_time < self.interval_seconds:
time.sleep(self.interval_seconds - elapsed_time)
if self.in_progress_count >= self._max_in_progress:
print(f"O Total de dados sendo processado {self.in_progress_count} é maior que o predefinido {self._max_in_progress}")
time.sleep(10)

self.stop_process()

@property
def stopped(self):
return self._stopped

def stop_process(self):
self._stopped = True
self._process_result_service.join()
self._progress.stop()

def restart_process(self):
self.stop_process() # espera terminar execução do processo anterior
self._stopped = False
self._started_at = time.time()
self._failure_data = []
self._total_success = 0
self._create_process_result_service().start()
3 changes: 3 additions & 0 deletions cereja/date/_datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class DateTime(datetime):
r'\d{2}-\d{2}-\d{4}': '%d-%m-%Y', # Format (DD-MM-YYYY)
r'\d{1,2}/\d{1,2}/\d{2,4}': '%m/%d/%Y', # Format (MM/DD/YYYY)
r'\d{1,2}-\d{1,2}-\d{2,4}': '%m-%d-%Y', # Format (MM-DD-YYYY)
r'\d{8}': '%Y%m%d', # Format (YYYYMMDD)
r'\d{2}\d{2}\d{4}': '%d%m%Y', # Format (DDMMYYYY)
r'\d{4}\d{2}\d{2}': '%Y%m%d', # Format (YYYYMMDD) without separators
# Other formats can be added here
}

Expand Down
96 changes: 96 additions & 0 deletions cereja/scraping/_financial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from ..utils import camel_to_snake

__all__ = ["FinancialData"]


def _parse_keys_to_snake(obj: dict):
if isinstance(obj, dict):
return {camel_to_snake(k): v for k, v in obj.items()}
return obj


class FinancialResult:
def __init__(self, describle, value, value2='-'):
self.describle = describle
self.value = value
self.value2 = value2

def __repr__(self):
return f"FinancialResult(describle={self.describle}, value={self.value}, value2={self.value2})"


class FinancialResultShareholders:
def __init__(self, describle, on, pn, total):
self.describle = describle
self.on = on
self.pn = pn
self.total = total

def __repr__(self):
return f"FinancialResultShareholders(describle={self.describle}, on={self.on}, pn={self.pn}, total={self.total})"


class FinancialStatement:
def __init__(self, title, date_inicial, date_final, results):
self.title = title
self.date_inicial = date_inicial
self.date_final = date_final
self.results = [FinancialResult(**result) for result in results]

def __repr__(self):
return f"FinancialStatement(title={self.title}, date_inicial={self.date_inicial}, date_final={self.date_final}, results={self.results})"


class FreeFloatResult:
def __init__(self, title, describle, quantity, perc, results):
self.title = title
self.describle = describle
self.quantity = quantity
self.perc = perc
self.results = [FinancialResult(**result) for result in results]

def __repr__(self):
return f"FreeFloatResult(title={self.title}, describle={self.describle}, quantity={self.quantity}, perc={self.perc}, results={self.results})"


class ShareholderPosition:
def __init__(self, information_received, name, on, pn, total, results):
self.information_received = information_received
self.name = name
self.on = on
self.pn = pn
self.total = total
self.results = [FinancialResultShareholders(**result) for result in results]

def __repr__(self):
return f"ShareholderPosition(information_received={self.information_received}, name={self.name}, on={self.on}, pn={self.pn}, total={self.total}, results={self.results})"


class CapitalStockComposition:
def __init__(self, title, results):
self.title = title
self.results = [FinancialResult(**result) for result in results]

def __repr__(self):
return f"CapitalStockComposition(title={self.title}, results={self.results})"


class FinancialData:
def __init__(self, share: "Share", title_initial, consolidated, unconsolidated, free_float_result,
position_shareholders,
outstanding_shares, capital_stock_composition):
self._share = share
self.title_initial = title_initial
self.consolidated = [FinancialStatement(**_parse_keys_to_snake(statement)) for statement in consolidated]
self.unconsolidated = [FinancialStatement(**_parse_keys_to_snake(statement)) for statement in unconsolidated]
self.free_float_result = FreeFloatResult(**_parse_keys_to_snake(free_float_result))
self.position_shareholders = ShareholderPosition(**_parse_keys_to_snake(position_shareholders))
self.outstanding_shares = _parse_keys_to_snake(outstanding_shares)
self.capital_stock_composition = CapitalStockComposition(**_parse_keys_to_snake(capital_stock_composition))

@property
def share(self):
return self._share

def __repr__(self):
return f"FinancialData({self._share.name})"
13 changes: 10 additions & 3 deletions cereja/utils/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import ctypes
import gc
import math
import re
import string
import time
from collections import OrderedDict, defaultdict
from importlib import import_module
import importlib
import sys
import types
import random
from typing import Any, Union, List, Tuple, Sequence, Iterable, Dict, MappingView, Optional, Callable
from typing import Any, Union, List, Tuple, Sequence, Iterable, Dict, MappingView, Optional, Callable, AnyStr
import logging
import itertools
from copy import copy
Expand Down Expand Up @@ -89,7 +91,8 @@
"has_length",
"combinations",
"combinations_sizes",
"value_from_memory"
"value_from_memory",
"str_gen"
]

logger = logging.getLogger(__name__)
Expand All @@ -112,7 +115,6 @@ def _format(self, object, *args):
super()._format(object, *args)



def is_indexable(v):
return hasattr(v, "__getitem__")

Expand Down Expand Up @@ -1469,3 +1471,8 @@ def prune_values(values: Sequence, factor=2):
if len(res) == 0:
return values[k]
return res


def str_gen(pattern: AnyStr) -> Sequence[AnyStr]:
regex = re.compile(pattern)
return regex.findall(string.printable)

0 comments on commit f5379e3

Please sign in to comment.