Skip to content

Commit

Permalink
(#16)
Browse files Browse the repository at this point in the history
* Create `db_utils` in `server_utils`
* Create `validation_utils` in `server_utils`
* Continue implementing the logic scraper
  • Loading branch information
kggold4 committed Apr 29, 2023
1 parent 3162818 commit bc24011
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 70 deletions.
2 changes: 1 addition & 1 deletion db_driver/db_objects/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Task:
domain: str
status: str
type: str
status_timestamp: List[Timestamp] = field(default_factory=lambda: [])
status_timestamp: List[Timestamp | dict] = field(default_factory=lambda: [])
creation_time: datetime.datetime = None
collecting_time: datetime.datetime = None # todo: check if needed

Expand Down
3 changes: 1 addition & 2 deletions db_driver/db_objects/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
@dataclass
class Timestamp:
status: str
start_time: datetime.datetime
end_time: datetime.datetime
time_changed: datetime.datetime

def __repr__(self) -> str:
string = ''
Expand Down
4 changes: 2 additions & 2 deletions db_driver/mongodb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from db_driver.utils.exceptions import InsertDataDBException, DataNotFoundDBException, DeleteDataDBException, \
UpdateDataDBException
from logger import get_current_logger, log_function
from server_utils.db_utils import get_mongodb_connection_string
from server_utils.db_utils.validation_utils import get_mongodb_connection_string


class MongoDBDriver(DBDriverInterface):
Expand Down Expand Up @@ -116,7 +116,7 @@ def delete_many(self, table_name: str, data_filter: dict) -> bool:
@log_function
def update_one(self, table_name: str, data_filter: dict, new_data: dict) -> ObjectId:
try:
self.logger.debug(f"Trying to delete one data from table: '{table_name}', db: '{self.DB_NAME}'")
self.logger.debug(f"Trying to update one data from table: '{table_name}', db: '{self.DB_NAME}'")
res = self.__db[table_name].update_one(data_filter, {"$set": new_data})
if res:
object_id = res.raw_result.get('_id')
Expand Down
4 changes: 4 additions & 0 deletions scrapers/integration/prepare_for_integration.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
# Prepare an integration staging area: recreate the `scapres` directory and
# copy the scraper sources plus the shared `server_utils` package into it.
# NOTE(review): "scapres" looks like a typo of "scrapers" — kept as-is because
# every line below references the same name consistently.

# `rf` is not a command; `rm -rf` removes any previous staging dir (no error if absent).
rm -rf scapres
mkdir scapres
# -r so subdirectories of the scraper sources are copied too.
cp -r ../* scapres
# server_utils is a directory — cp requires -r to copy it.
cp -r ../../server_utils scapres
100 changes: 36 additions & 64 deletions scrapers/logic_scraper.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from datetime import datetime
from time import sleep
from typing import List
from uuid import uuid4

from pymongo.errors import ConnectionFailure

from db_driver import get_current_db_driver
from db_driver.db_objects.db_objects_utils import get_db_object_from_dict
from db_driver.db_objects.task import Task
from db_driver.utils.consts import DBConsts
from db_driver.utils.exceptions import DataNotFoundDBException, UpdateDataDBException, InsertDataDBException
from logger import get_current_logger
from logger import get_current_logger, log_function
from scrapers import websites_scrapers_factory
from scrapers.websites_scrapers.utils.consts import MainConsts
from server_utils.db_utils.task_utils import TaskUtils


class LogicScaper:
Expand All @@ -21,82 +18,57 @@ class LogicScaper:
def __init__(self):
self.logger = get_current_logger()
self._db = get_current_db_driver()
self.task_utils = TaskUtils()

def _get_task_by_status(self, status: str):
try:
task: dict = self._db.get_one(table_name=DBConsts.TASKS_TABLE_NAME, data_filter={"status": status})
task_object: Task = get_db_object_from_dict(task, Task)
return task_object
except DataNotFoundDBException:
return None

def _get_new_task(self) -> Task:
for status in ["pending", "failed"]:
task = self._get_task_by_status(status=status)
if task:
return task

def _update_task_status(self, task_id: str, status: str):
try:
data_filter = {"task_id": task_id}
new_data = {"status": status}
self._db.update_one(table_name=DBConsts.TASKS_TABLE_NAME, data_filter=data_filter, new_data=new_data)
except UpdateDataDBException as e:
desc = f"Error updating task as `running`"
self.logger.error(desc)
raise e

@log_function
def _filter_only_not_exits_articles(self, urls: List[str]) -> List[str]:
"""Return only the URLs from `urls` that are not already stored as articles in the DB."""
# One bulk query for all candidate URLs via Mongo's $in operator.
data_filter = {"url": {"$in": urls}}
exists_articles = self._db.get_many(table_name=DBConsts.ARTICLE_TABLE_NAME, data_filter=data_filter)
# URLs already stored; .get() tolerates documents that lack a "url" key.
exists_articles_urls = {exists_article.get("url") for exists_article in exists_articles}
# Set difference keeps only unseen URLs; note: result order is unspecified.
new_articles = list(set(urls).difference(exists_articles_urls))
return new_articles

def _create_new_task(self, url: str, domain: str):
for trie in range(MainConsts.TIMES_TRY_CREATE_TASK):
@log_function
def _create_collecting_article_tasks(self, urls: List[str], domain: str):
for url in urls:
try:
task_data = {
"task_id": str(uuid4()),
"url": url,
"domain": domain,
"status": "pending",
"type": MainConsts.COLLECT_ARTICLE,
"creation_time": datetime.now()
}
new_task: dict = Task(**task_data).convert_to_dict()
inserted_id = self._db.insert_one(table_name=DBConsts.TASKS_TABLE_NAME, data=new_task)
self.logger.info(f"Created new task inserted_id: {inserted_id}")
return
self.task_utils.create_new_task(url=url, domain=domain, task_type=MainConsts.COLLECT_ARTICLE)
except Exception as e:
self.logger.warning(f"Error create new task NO. {trie}/{MainConsts.TIMES_TRY_CREATE_TASK} - {str(e)}")
continue
desc = f"Error creating new task into db after {MainConsts.TIMES_TRY_CREATE_TASK} tries"
raise InsertDataDBException(desc)
desc = f"Error creating new task with type: {MainConsts.COLLECT_ARTICLE} - {str(e)}"
self.logger.error(desc)

def _handle_task(self, task: Task):
if task.type == MainConsts.COLLECT_URLS:
website_scraper = websites_scrapers_factory(scraper_name=task.domain)
urls = website_scraper.get_new_article_urls_from_home_page()
urls = self._filter_only_not_exits_articles(urls=urls)
for url in urls:
try:
self._create_new_task(url=url, domain=task.domain)
except Exception as e:
desc = f"Error creating new task with type: {MainConsts.COLLECT_ARTICLE} - {str(e)}"
self.logger.error(desc)
elif task.type == MainConsts.COLLECT_ARTICLE:
pass
@log_function
def _handle_task(self, task: Task) -> bool:
"""Dispatch `task` by its type; return True on success, False on any failure."""
try:
if task.type == MainConsts.COLLECT_URLS:
# Scrape the site's home page, drop URLs already stored,
# then queue one collect-article task per remaining URL.
website_scraper = websites_scrapers_factory(scraper_name=task.domain)
urls = website_scraper.get_new_article_urls_from_home_page()
urls = self._filter_only_not_exits_articles(urls=urls)
self._create_collecting_article_tasks(urls=urls, domain=task.domain)
self.logger.info(f"Done handle task of type: `{task.type}`")
elif task.type == MainConsts.COLLECT_ARTICLE:
# Article collection is not implemented yet.
pass
except Exception as e:
# Broad catch on purpose: a single task failure must not kill the caller's
# run loop — failure is reported through the boolean return value.
desc = f"Failed run task task_id: `{task.task_id}`, type: `{task.type}` - {str(e)}"
self.logger.error(desc)
return False
return True

@log_function
def run(self):
while True:
try:
task = self._get_new_task()
task = self.task_utils.get_new_task()
if task:
self._update_task_status(task_id=task.task_id, status="running")
self._handle_task(task=task)
self.logger = get_current_logger(task_id=task.task_id, task_type=task.type)
self.task_utils.update_task_status(task=task, status="running")
is_task_succeeded = self._handle_task(task=task)
if is_task_succeeded:
self.task_utils.update_task_status(task=task, status="succeeded")
else:
self.task_utils.update_task_status(task=task, status="failed")
else:
self.logger.debug(f"Couldn't find task, sleeping for {self.SLEEPING_TIME / 60} minutes")
self.logger.warning(f"Couldn't find task, sleeping for {self.SLEEPING_TIME / 60} minutes")
sleep(self.SLEEPING_TIME)
except ConnectionFailure as e:
self.logger.warning(f"Error connecting to db, initialize the db again - {str(e)}")
Expand Down
1 change: 0 additions & 1 deletion scrapers/websites_scrapers/utils/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ class ScraperConsts:
class MainConsts:
COLLECT_URLS = "collect_urls"
COLLECT_ARTICLE = "collect_article"
TIMES_TRY_CREATE_TASK = int(os.getenv(key="TIMES_TRY_CREATE_TASK", default=3))
TIMES_TRY_GET_HOMEPAGE = int(os.getenv(key="TIMES_TO_TRY_GET_HOMEPAGE", default=3))
Empty file.
14 changes: 14 additions & 0 deletions server_utils/db_utils/db_utils_consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import os


class TaskConsts:
    """Constants governing task persistence and the task-creation retry policy."""

    # How many insert attempts to make before giving up on creating a task;
    # overridable through the TIMES_TRY_CREATE_TASK environment variable.
    TIMES_TRY_CREATE_TASK = int(os.environ.get("TIMES_TRY_CREATE_TASK", 3))

    # Name of the MongoDB collection that stores task documents.
    TASKS_TABLE_NAME = "tasks"


class ArticleConsts:
    # Placeholder for article-related constants (none defined yet).
    pass


class ClusterConsts:
    # Placeholder for cluster-related constants (none defined yet).
    pass
68 changes: 68 additions & 0 deletions server_utils/db_utils/task_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from datetime import datetime
from uuid import uuid4

from db_driver import get_current_db_driver
from db_driver.db_objects.db_objects_utils import get_db_object_from_dict
from db_driver.db_objects.task import Task
from db_driver.db_objects.timestamp import Timestamp
from db_driver.utils.exceptions import InsertDataDBException, UpdateDataDBException, DataNotFoundDBException
from logger import get_current_logger, log_function
from server_utils.db_utils.db_utils_consts import TaskConsts


class TaskUtils:
    """Helpers for creating, updating, and fetching scraping tasks in the DB."""

    def __init__(self):
        self.logger = get_current_logger()
        self._db = get_current_db_driver()

    @log_function
    def create_new_task(self, url: str, domain: str, task_type: str):
        """Insert a new `pending` task document, retrying on failure.

        Retries up to TaskConsts.TIMES_TRY_CREATE_TASK times.

        Raises:
            InsertDataDBException: if every insert attempt fails.
        """
        for attempt in range(TaskConsts.TIMES_TRY_CREATE_TASK):
            try:
                task_data = {
                    "task_id": str(uuid4()),
                    "url": url,
                    "domain": domain,
                    "status": "pending",
                    "type": task_type,
                    # Seed the status history with the initial "pending" transition.
                    "status_timestamp": [Timestamp(status="pending", time_changed=datetime.now())],
                    "creation_time": datetime.now()
                }
                new_task: dict = Task(**task_data).convert_to_dict()
                inserted_id = self._db.insert_one(table_name=TaskConsts.TASKS_TABLE_NAME, data=new_task)
                self.logger.info(f"Created new task inserted_id: {inserted_id}")
                return
            except Exception as e:
                # Log a 1-based attempt number so the message reads "1/3", not "0/3".
                self.logger.warning(
                    f"Error create new task NO. {attempt + 1}/{TaskConsts.TIMES_TRY_CREATE_TASK} - {str(e)}")
                continue
        desc = f"Error creating new task into db after {TaskConsts.TIMES_TRY_CREATE_TASK} tries"
        raise InsertDataDBException(desc)

    @log_function
    def update_task_status(self, task: Task, status: str):
        """Set `task`'s status and append the transition to its status history.

        Raises:
            UpdateDataDBException: if the DB update fails.
        """
        try:
            data_filter = {"task_id": task.task_id}
            new_timestamp = Timestamp(status=status, time_changed=datetime.now())
            # Stored as a plain dict so the document stays BSON-serializable.
            task.status_timestamp.append(new_timestamp.convert_to_dict())
            new_data = {"status": status, "status_timestamp": task.status_timestamp}
            self._db.update_one(table_name=TaskConsts.TASKS_TABLE_NAME, data_filter=data_filter, new_data=new_data)
        except UpdateDataDBException as e:
            desc = f"Error updating task task_id: `{task.task_id}` as status: `{status}`"
            self.logger.error(desc)
            raise e

    @log_function
    def _get_task_by_status(self, status: str) -> Task | None:
        """Return one task with the given status, or None when none exists."""
        try:
            task: dict = self._db.get_one(table_name=TaskConsts.TASKS_TABLE_NAME, data_filter={"status": status})
            task_object: Task = get_db_object_from_dict(task, Task)
            return task_object
        except DataNotFoundDBException:
            return None

    @log_function
    def get_new_task(self) -> Task | None:
        """Return the next task to run — `pending` first, then `failed` — or None."""
        for status in ["pending", "failed"]:
            task = self._get_task_by_status(status=status)
            if task:
                return task
        return None
File renamed without changes.

0 comments on commit bc24011

Please sign in to comment.