updated requirements and pbar width decrease
drelsabrouty committed Apr 17, 2024
1 parent 7cc1187 commit 1fa4c71
Showing 2 changed files with 19 additions and 11 deletions.
src/requirements.txt: 10 changes (9 additions, 1 deletion)
@@ -2,4 +2,12 @@ beautifulsoup4==4.12.2
 numpy==1.26.2
 pandas==2.1.4
 tqdm==4.66.1
-aiohttp==3.9.3
+aiohttp==3.9.3
+pyspark==3.5.0
+DBUtils==3.0.3
+databricks-cli==0.17.7
+databricks-connect==13.2.0
+databricks-sdk==0.4.0
+datetime-distance==0.1.3
+urllib3==1.26.18
+Unidecode==1.3.7
src/utils.py: 20 changes (10 additions, 10 deletions)
@@ -38,7 +38,7 @@ async def bound_fetch(self, sem, url, session, function, pbar):
 pbar.update(1)
 return result
 
-async def send_async_task(self, links, function):
+async def send_async_task(self, links, function, type):
 tasks = []
 # create instance of Semaphore
 sem = asyncio.Semaphore(1000)
@@ -47,7 +47,7 @@ async def send_async_task(self, links, function):
 # per each request.
 async with aiohttp.ClientSession() as session:
 # start timer
-pbar = tqdm(total=len(links), desc='Scraping')
+pbar = tqdm(total=len(links), desc='Scraping {}'.format(type), bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}')
 for link in links:
 # pass Semaphore and session to every GET request
 task = asyncio.ensure_future(self.bound_fetch(sem, link, session, function, pbar))
@@ -137,7 +137,7 @@ async def extract_subsector_page_urls(self, url, session):
 subsector_page_urls = [url.rsplit("/", 1)[0] + "/pg-{}/".format(i) + url.rsplit("/", 1)[1] for i in range(1, subsector_page_size+1)]
 except:
 subsector_page_urls = [url]
-return await self.send_async_task(subsector_page_urls, self.extract_company_urls)
+return await self.send_async_task(subsector_page_urls, self.extract_company_urls, "subsectors")
 
 async def extract_company_information(self, url, session):
 async with session.get(url,timeout=5000) as response:
@@ -250,7 +250,7 @@ async def scrape_and_export(self, type="product_updater", country=None, group=No
 complete_ids = list(set([item for sublist in complete_ids for item in sublist]))
 companies = [company.upper() for company in complete_ids]
 links = np.array([self.base_url.format(*company.split("_")) for company in companies])
-results = await self.send_async_task(links, self.extract_products_and_services)
+results = await self.send_async_task(links, self.extract_products_and_services, "products and services")
 
 elif type == "company_scraper":
 input = "../output_data/ep_categorization.csv"
@@ -264,10 +264,10 @@ async def scrape_and_export(self, type="product_updater", country=None, group=No
 
 # first scrape on sector level
 country_filtered_sector_url = sector_url.rsplit("/",1)[0]+ "/{}/".format(country) + sector_url.rsplit("/",1)[1]
-company_urls = (await self.send_async_task([country_filtered_sector_url], self.extract_subsector_page_urls))[0]
+company_urls = (await self.send_async_task([country_filtered_sector_url], self.extract_subsector_page_urls, "company urls"))[0]
 # flatten company_urls
 company_urls = [item for sublist in company_urls for item in sublist]
-company_info = await self.send_async_task(company_urls, self.extract_company_information)
+company_info = await self.send_async_task(company_urls, self.extract_company_information, "company information")
 for company in company_info:
 company["group"] = group
 company["sector"] = sector
@@ -279,10 +279,10 @@ async def scrape_and_export(self, type="product_updater", country=None, group=No
 country_filtered_subsector_urls = [subsector_url.rsplit("/",1)[0]+ "/{}/".format(country) + subsector_url.rsplit("/",1)[1] for subsector_url in subsector_urls]
 for i in range(len(subsector_urls)):
 subsector = categorization[categorization["subsector_url"] == subsector_urls[i]]["subsector"].values[0]
-company_urls = (await self.send_async_task([country_filtered_subsector_urls[i]], self.extract_subsector_page_urls))[0]
+company_urls = (await self.send_async_task([country_filtered_subsector_urls[i]], self.extract_subsector_page_urls, "company urls"))[0]
 # flatten company_urls
 company_urls = [item for sublist in company_urls for item in sublist]
-company_info = await self.send_async_task(company_urls, self.extract_company_information)
+company_info = await self.send_async_task(company_urls, self.extract_company_information, "company information")
 # for every company_info line, set the group, sector and subsector
 for company in company_info:
 company["group"] = group
@@ -319,10 +319,10 @@ async def scrape_and_export(self, type="product_updater", country=None, group=No
 
 elif type == "categories_scraper":
 out = f"../output_data/ep_categorization.csv"
-cats_and_sectors = (await self.send_async_task(["https://www.europages.co.uk/bs"], self.extract_ep_categories_and_sectors))[0]
+cats_and_sectors = (await self.send_async_task(["https://www.europages.co.uk/bs"], self.extract_ep_categories_and_sectors, "groups and sectors"))[0]
 cats_and_sectors_links = ['https://www.europages.co.uk/bs/{0}/{1}'.format(re.sub('[^0-9a-zA-Z]+', "-", cats_and_sectors[i][0].lower()), re.sub('[^0-9a-zA-Z]+', "-", cats_and_sectors[i][1].lower()))
 for i in range(len(cats_and_sectors))]
-cats_sectors_and_subsectors = (await self.send_async_task(cats_and_sectors_links, self.extract_ep_subsectors))
+cats_sectors_and_subsectors = (await self.send_async_task(cats_and_sectors_links, self.extract_ep_subsectors, "groups, sectors and subsectors"))
 # flatten cats_sectors_and_subsectors
 results = [item for sublist in cats_sectors_and_subsectors for item in sublist]
 
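For reference, a minimal standalone sketch (not taken from the repo) of what the progress-bar change above does: the bar_format string caps the bar segment at 10 columns via '{bar:10}' and pads the remainder with '{bar:-10b}' so the trailing stats stay aligned, while desc now carries the type label that callers pass into send_async_task. The demo_pbar helper, its default label, and the time.sleep stand-in for the awaited fetches are illustrative only; the parameter name type mirrors the repo's naming.

import time
from tqdm import tqdm

def demo_pbar(type="subsectors", n=50):
    # Illustrative helper (not in the repo): same desc and bar_format as the commit,
    # i.e. a bar capped at 10 columns, padded so the right-hand stats line up.
    pbar = tqdm(total=n, desc='Scraping {}'.format(type),
                bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}')
    for _ in range(n):
        time.sleep(0.01)  # stand-in for the awaited fetch done by bound_fetch
        pbar.update(1)
    pbar.close()

demo_pbar()

Compared with the previous desc='Scraping', the label now names what is being scraped (for example "Scraping subsectors"), and the fixed 10-column bar is what the commit title describes as the pbar width decrease.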
