diff --git a/db_driver/db_objects/task.py b/db_driver/db_objects/task.py index 84ac8a3..d560ad6 100644 --- a/db_driver/db_objects/task.py +++ b/db_driver/db_objects/task.py @@ -12,7 +12,7 @@ class Task: domain: str status: str type: str - status_timestamp: List[Timestamp] = field(default_factory=lambda: []) + status_timestamp: List[Timestamp | dict] = field(default_factory=lambda: []) creation_time: datetime.datetime = None collecting_time: datetime.datetime = None # todo: check if needed diff --git a/db_driver/db_objects/timestamp.py b/db_driver/db_objects/timestamp.py index 6b93c59..6b4c3cb 100644 --- a/db_driver/db_objects/timestamp.py +++ b/db_driver/db_objects/timestamp.py @@ -6,8 +6,7 @@ @dataclass class Timestamp: status: str - start_time: datetime.datetime - end_time: datetime.datetime + time_changed: datetime.datetime def __repr__(self) -> str: string = '' diff --git a/db_driver/mongodb_driver.py b/db_driver/mongodb_driver.py index b0d1deb..eec219a 100644 --- a/db_driver/mongodb_driver.py +++ b/db_driver/mongodb_driver.py @@ -8,7 +8,7 @@ from db_driver.utils.exceptions import InsertDataDBException, DataNotFoundDBException, DeleteDataDBException, \ UpdateDataDBException from logger import get_current_logger, log_function -from server_utils.db_utils import get_mongodb_connection_string +from server_utils.db_utils.validation_utils import get_mongodb_connection_string class MongoDBDriver(DBDriverInterface): @@ -116,7 +116,7 @@ def delete_many(self, table_name: str, data_filter: dict) -> bool: @log_function def update_one(self, table_name: str, data_filter: dict, new_data: dict) -> ObjectId: try: - self.logger.debug(f"Trying to delete one data from table: '{table_name}', db: '{self.DB_NAME}'") + self.logger.debug(f"Trying to update one data from table: '{table_name}', db: '{self.DB_NAME}'") res = self.__db[table_name].update_one(data_filter, {"$set": new_data}) if res: object_id = res.raw_result.get('_id') diff --git 
a/scrapers/integration/prepare_for_integration.sh b/scrapers/integration/prepare_for_integration.sh new file mode 100644 index 0000000..b9e9cb1 --- /dev/null +++ b/scrapers/integration/prepare_for_integration.sh @@ -0,0 +1,4 @@ +rm -rf scapres +mkdir scapres +cp ../* scapres +cp -r ../../server_utils scapres \ No newline at end of file diff --git a/scrapers/logic_scraper.py b/scrapers/logic_scraper.py index 106b4ea..e391eb9 100644 --- a/scrapers/logic_scraper.py +++ b/scrapers/logic_scraper.py @@ -1,18 +1,15 @@ -from datetime import datetime from time import sleep from typing import List -from uuid import uuid4 from pymongo.errors import ConnectionFailure from db_driver import get_current_db_driver -from db_driver.db_objects.db_objects_utils import get_db_object_from_dict from db_driver.db_objects.task import Task from db_driver.utils.consts import DBConsts -from db_driver.utils.exceptions import DataNotFoundDBException, UpdateDataDBException, InsertDataDBException -from logger import get_current_logger +from logger import get_current_logger, log_function from scrapers import websites_scrapers_factory from scrapers.websites_scrapers.utils.consts import MainConsts +from server_utils.db_utils.task_utils import TaskUtils class LogicScaper: @@ -21,31 +18,9 @@ class LogicScaper: def __init__(self): self.logger = get_current_logger() self._db = get_current_db_driver() + self.task_utils = TaskUtils() - def _get_task_by_status(self, status: str): - try: - task: dict = self._db.get_one(table_name=DBConsts.TASKS_TABLE_NAME, data_filter={"status": status}) - task_object: Task = get_db_object_from_dict(task, Task) - return task_object - except DataNotFoundDBException: - return None - - def _get_new_task(self) -> Task: - for status in ["pending", "failed"]: - task = self._get_task_by_status(status=status) - if task: - return task - - def _update_task_status(self, task_id: str, status: str): - try: - data_filter = {"task_id": task_id} - new_data = {"status": status} - 
self._db.update_one(table_name=DBConsts.TASKS_TABLE_NAME, data_filter=data_filter, new_data=new_data) - except UpdateDataDBException as e: - desc = f"Error updating task as `running`" - self.logger.error(desc) - raise e - + @log_function def _filter_only_not_exits_articles(self, urls: List[str]) -> List[str]: data_filter = {"url": {"$in": urls}} exists_articles = self._db.get_many(table_name=DBConsts.ARTICLE_TABLE_NAME, data_filter=data_filter) @@ -53,50 +28,47 @@ def _filter_only_not_exits_articles(self, urls: List[str]) -> List[str]: new_articles = list(set(urls).difference(exists_articles_urls)) return new_articles - def _create_new_task(self, url: str, domain: str): - for trie in range(MainConsts.TIMES_TRY_CREATE_TASK): + @log_function + def _create_collecting_article_tasks(self, urls: List[str], domain: str): + for url in urls: try: - task_data = { - "task_id": str(uuid4()), - "url": url, - "domain": domain, - "status": "pending", - "type": MainConsts.COLLECT_ARTICLE, - "creation_time": datetime.now() - } - new_task: dict = Task(**task_data).convert_to_dict() - inserted_id = self._db.insert_one(table_name=DBConsts.TASKS_TABLE_NAME, data=new_task) - self.logger.info(f"Created new task inserted_id: {inserted_id}") - return + self.task_utils.create_new_task(url=url, domain=domain, task_type=MainConsts.COLLECT_ARTICLE) except Exception as e: - self.logger.warning(f"Error create new task NO. 
{trie}/{MainConsts.TIMES_TRY_CREATE_TASK} - {str(e)}") - continue - desc = f"Error creating new task into db after {MainConsts.TIMES_TRY_CREATE_TASK} tries" - raise InsertDataDBException(desc) + desc = f"Error creating new task with type: {MainConsts.COLLECT_ARTICLE} - {str(e)}" + self.logger.error(desc) - def _handle_task(self, task: Task): - if task.type == MainConsts.COLLECT_URLS: - website_scraper = websites_scrapers_factory(scraper_name=task.domain) - urls = website_scraper.get_new_article_urls_from_home_page() - urls = self._filter_only_not_exits_articles(urls=urls) - for url in urls: - try: - self._create_new_task(url=url, domain=task.domain) - except Exception as e: - desc = f"Error creating new task with type: {MainConsts.COLLECT_ARTICLE} - {str(e)}" - self.logger.error(desc) - elif task.type == MainConsts.COLLECT_ARTICLE: - pass + @log_function + def _handle_task(self, task: Task) -> bool: + try: + if task.type == MainConsts.COLLECT_URLS: + website_scraper = websites_scrapers_factory(scraper_name=task.domain) + urls = website_scraper.get_new_article_urls_from_home_page() + urls = self._filter_only_not_exits_articles(urls=urls) + self._create_collecting_article_tasks(urls=urls, domain=task.domain) + self.logger.info(f"Done handle task of type: `{task.type}`") + elif task.type == MainConsts.COLLECT_ARTICLE: + pass + except Exception as e: + desc = f"Failed run task task_id: `{task.task_id}`, type: `{task.type}` - {str(e)}" + self.logger.error(desc) + return False + return True + @log_function def run(self): while True: try: - task = self._get_new_task() + task = self.task_utils.get_new_task() if task: - self._update_task_status(task_id=task.task_id, status="running") - self._handle_task(task=task) + self.logger = get_current_logger(task_id=task.task_id, task_type=task.type) + self.task_utils.update_task_status(task=task, status="running") + is_task_succeeded = self._handle_task(task=task) + if is_task_succeeded: + 
self.task_utils.update_task_status(task=task, status="succeeded") + else: + self.task_utils.update_task_status(task=task, status="failed") else: - self.logger.debug(f"Couldn't find task, sleeping for {self.SLEEPING_TIME / 60} minutes") + self.logger.warning(f"Couldn't find task, sleeping for {self.SLEEPING_TIME / 60} minutes") sleep(self.SLEEPING_TIME) except ConnectionFailure as e: self.logger.warning(f"Error connecting to db, initialize the db again - {str(e)}") diff --git a/scrapers/websites_scrapers/utils/consts.py b/scrapers/websites_scrapers/utils/consts.py index aaacfe8..4f91375 100644 --- a/scrapers/websites_scrapers/utils/consts.py +++ b/scrapers/websites_scrapers/utils/consts.py @@ -8,5 +8,4 @@ class ScraperConsts: class MainConsts: COLLECT_URLS = "collect_urls" COLLECT_ARTICLE = "collect_article" - TIMES_TRY_CREATE_TASK = int(os.getenv(key="TIMES_TRY_CREATE_TASK", default=3)) TIMES_TRY_GET_HOMEPAGE = int(os.getenv(key="TIMES_TO_TRY_GET_HOMEPAGE", default=3)) diff --git a/server_utils/db_utils/__init__.py b/server_utils/db_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server_utils/db_utils/db_utils_consts.py b/server_utils/db_utils/db_utils_consts.py new file mode 100644 index 0000000..8f74d2b --- /dev/null +++ b/server_utils/db_utils/db_utils_consts.py @@ -0,0 +1,14 @@ +import os + + +class TaskConsts: + TIMES_TRY_CREATE_TASK = int(os.getenv(key="TIMES_TRY_CREATE_TASK", default=3)) + TASKS_TABLE_NAME = "tasks" + + +class ArticleConsts: + pass + + +class ClusterConsts: + pass diff --git a/server_utils/db_utils/task_utils.py b/server_utils/db_utils/task_utils.py new file mode 100644 index 0000000..15dc6c3 --- /dev/null +++ b/server_utils/db_utils/task_utils.py @@ -0,0 +1,68 @@ +from datetime import datetime +from uuid import uuid4 + +from db_driver import get_current_db_driver +from db_driver.db_objects.db_objects_utils import get_db_object_from_dict +from db_driver.db_objects.task import Task +from 
db_driver.db_objects.timestamp import Timestamp +from db_driver.utils.exceptions import InsertDataDBException, UpdateDataDBException, DataNotFoundDBException +from logger import get_current_logger, log_function +from server_utils.db_utils.db_utils_consts import TaskConsts + + +class TaskUtils: + def __init__(self): + self.logger = get_current_logger() + self._db = get_current_db_driver() + + @log_function + def create_new_task(self, url: str, domain: str, task_type: str): + for trie in range(TaskConsts.TIMES_TRY_CREATE_TASK): + try: + task_data = { + "task_id": str(uuid4()), + "url": url, + "domain": domain, + "status": "pending", + "type": task_type, + "status_timestamp": [Timestamp(status="pending", time_changed=datetime.now())], + "creation_time": datetime.now() + } + new_task: dict = Task(**task_data).convert_to_dict() + inserted_id = self._db.insert_one(table_name=TaskConsts.TASKS_TABLE_NAME, data=new_task) + self.logger.info(f"Created new task inserted_id: {inserted_id}") + return + except Exception as e: + self.logger.warning(f"Error create new task NO. 
{trie}/{TaskConsts.TIMES_TRY_CREATE_TASK} - {str(e)}") + continue + desc = f"Error creating new task into db after {TaskConsts.TIMES_TRY_CREATE_TASK} tries" + raise InsertDataDBException(desc) + + @log_function + def update_task_status(self, task: Task, status: str): + try: + data_filter = {"task_id": task.task_id} + new_timestamp = Timestamp(status=status, time_changed=datetime.now()) + task.status_timestamp.append(new_timestamp.convert_to_dict()) + new_data = {"status": status, "status_timestamp": task.status_timestamp} + self._db.update_one(table_name=TaskConsts.TASKS_TABLE_NAME, data_filter=data_filter, new_data=new_data) + except UpdateDataDBException as e: + desc = f"Error updating task task_id: `{task.task_id}` as status: `{status}`" + self.logger.error(desc) + raise e + + @log_function + def _get_task_by_status(self, status: str): + try: + task: dict = self._db.get_one(table_name=TaskConsts.TASKS_TABLE_NAME, data_filter={"status": status}) + task_object: Task = get_db_object_from_dict(task, Task) + return task_object + except DataNotFoundDBException: + return None + + @log_function + def get_new_task(self) -> Task: + for status in ["pending", "failed"]: + task = self._get_task_by_status(status=status) + if task: + return task diff --git a/server_utils/db_utils.py b/server_utils/db_utils/validation_utils.py similarity index 100% rename from server_utils/db_utils.py rename to server_utils/db_utils/validation_utils.py