Skip to content

Commit

Permalink
Pagination on esearch request (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
arcones authored Nov 18, 2023
1 parent 0a9bd0a commit d1d2624
Show file tree
Hide file tree
Showing 14 changed files with 1,202 additions and 417 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
steps:
- run: curl -sSL https://install.python-poetry.org | python3 -
- name: Check out repository code
uses: actions/checkout@v3
uses: actions/checkout@v4
- run: poetry env use 3.10.12 && poetry env info
- run: poetry install
- run: cd tests && poetry run pytest
46 changes: 6 additions & 40 deletions kilombo/service/external/ncbi/ncbi.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,34 @@
import asyncio
import json
import logging
import time

import aiohttp
import requests

from kilombo.service.external.ncbi.ncbiextractor import NCBIExtractor
from kilombo.service.external.ncbi.ncbi_extractor import NCBIExtractor
from kilombo.service.external.ncbi.ncbi_request import NCBIRequest


class NCBI:
def __init__(self, study_hierarchy):
self.NCBI_API_KEYS = ["ed06bd0f3c27a605d87e51e94eecab115908", "b81884ffa1519f17cae15f6bd21ac8070108"]
self.NCBI_EUTILS_BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
self.NCBI_ESEARCH_GDS_URL = f"{self.NCBI_EUTILS_BASE_URL}/esearch.fcgi?db=gds&retmode=json&&retmax=10000"
self.NCBI_ESUMMARY_GDS_URL = f"{self.NCBI_EUTILS_BASE_URL}/esummary.fcgi?db=gds&retmode=json&"
self.NCBI_RETRY_MAX = 100
self.NCBI_STUDY_ID_MIN = 200000000
self.NCBI_STUDY_ID_MAX = 299999999
self.study_hierarchy = study_hierarchy
self.ncbi_request = NCBIRequest()

def get_study_list(self, search_keyword: str):
logging.info(f"Get study list for keyword {search_keyword}...")
ncbi_study_list_http_response = json.loads(self._fetch_study_list(search_keyword).text)["esearchresult"]
idlist = self.ncbi_request.esearch_study_list(search_keyword)
logging.info(f"Done get study list for keyword {search_keyword}")
items = ncbi_study_list_http_response["idlist"]
for item in items:
for item in idlist:
if self._is_study(int(item)):
self.study_hierarchy.add_pending_study(item)

def _is_study(self, item: int) -> bool:
return self.NCBI_STUDY_ID_MIN <= item <= self.NCBI_STUDY_ID_MAX

def _fetch_study_list(self, keyword: str):
url = f"{self.NCBI_ESEARCH_GDS_URL}&term={keyword}"
logging.debug(f"HTTP GET started ==> {url}")
response = requests.get(url)
logging.debug(f"HTTP GET done ==> {url}")
return response

async def get_study_summaries(self):
init = time.perf_counter()

for index, study_id in enumerate(self.study_hierarchy.pending):
self.study_hierarchy.pending[study_id] = asyncio.create_task(self._fetch_study_summaries(study_id))
self.study_hierarchy.pending[study_id] = asyncio.create_task(self.ncbi_request.esummary_study(study_id))

await asyncio.wait(self.study_hierarchy.pending.values())

Expand All @@ -54,25 +39,6 @@ async def get_study_summaries(self):

logging.info(f"Fetched details of {len(self.study_hierarchy.pending)} studies in {round(end - init, 2)} seconds")

async def _fetch_study_summaries(self, study_id: int):
logging.debug(f"Started get summary for study ==> {study_id}")
unauthenticated_url = f"{self.NCBI_ESUMMARY_GDS_URL}&id={study_id}"
retries_count = 1
while retries_count < self.NCBI_RETRY_MAX:
api_key = self.NCBI_API_KEYS[0] if retries_count % 2 == 0 else self.NCBI_API_KEYS[1]
url = unauthenticated_url + f"&api_key={api_key}"
async with aiohttp.ClientSession() as session:
logging.debug(f"HTTP GET started ==> {url}")
async with session.get(url) as response:
logging.debug(f"HTTP GET Done ==> {url}")
if response.status == 200:
logging.debug(f"Done get summary in retry #{retries_count} ==> {study_id}")
return json.loads(await response.text())
else:
retries_count += 1
logging.debug(f"Get a {response.status} from {url}, retries count incremented to {retries_count}")
raise Exception(f"Unable to fetch {study_id} in {self.NCBI_RETRY_MAX} attempts")

def link_study_and_accessions(self):
for study_id in self.study_hierarchy.pending:
ncbi_extractor = NCBIExtractor(study_id, self.study_hierarchy)
Expand Down
53 changes: 53 additions & 0 deletions kilombo/service/external/ncbi/ncbi_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import json
import logging

import aiohttp
import requests


class NCBIRequest:
    """Thin HTTP client for the NCBI E-utilities GDS endpoints (esearch / esummary)."""

    def __init__(self):
        # SECURITY NOTE(review): API keys are hard-coded and committed to source
        # control; they should be injected via configuration or environment
        # variables and the committed keys rotated.
        self.NCBI_API_KEYS = ["ed06bd0f3c27a605d87e51e94eecab115908", "b81884ffa1519f17cae15f6bd21ac8070108"]
        self.NCBI_EUTILS_BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
        self.NCBI_ESEARCH_GDS_URL = f"{self.NCBI_EUTILS_BASE_URL}/esearch.fcgi?db=gds&retmode=json"
        self.NCBI_ESUMMARY_GDS_URL = f"{self.NCBI_EUTILS_BASE_URL}/esummary.fcgi?db=gds&retmode=json"
        # Maximum number of esummary attempts before giving up on a study.
        self.NCBI_RETRY_MAX = 100
        # Page size for paginated esearch requests.
        self.BATCH_SIZE = 500

    def esearch_study_list(self, keyword):
        """Return the complete list of GDS ids matching *keyword*, fetching all pages."""
        url = f"{self.NCBI_ESEARCH_GDS_URL}&term={keyword}"
        logging.debug(f"HTTP GET started ==> {url}")
        response = self._paginated_esearch(url)
        logging.debug(f"HTTP GET done ==> {url}")
        return response

    def _paginated_esearch(self, url):
        """Fetch every page of an esearch result, BATCH_SIZE ids at a time.

        Termination uses the server-reported total (``count``) rather than the
        ``retmax`` echo: the original check (`retmax < BATCH_SIZE`) issued one
        guaranteed-empty extra request whenever the total hit count was an
        exact multiple of BATCH_SIZE.
        """
        retstart = 0
        paginated_url = url + f"&retmax={self.BATCH_SIZE}&usehistory=y"
        idlist = []
        while True:
            response = json.loads(requests.get(f"{paginated_url}&retstart={retstart}").text)
            result = response["esearchresult"]
            idlist += result["idlist"]
            retstart += self.BATCH_SIZE
            if retstart >= int(result["count"]):
                return idlist

    async def esummary_study(self, study_id: int):
        """Fetch the esummary JSON document for *study_id*.

        Retries up to ``NCBI_RETRY_MAX`` times on any non-200 status,
        alternating between the two API keys to spread rate-limit pressure.

        Raises:
            Exception: if every attempt returns a non-200 status.
        """
        logging.debug(f"Started get summary for study ==> {study_id}")
        unauthenticated_url = f"{self.NCBI_ESUMMARY_GDS_URL}&id={study_id}"
        # One session reused across all retries (the original opened a new
        # ClientSession — and thus a new connection pool — per attempt).
        async with aiohttp.ClientSession() as session:
            retries_count = 1
            # `<=` so the number of attempts actually matches NCBI_RETRY_MAX;
            # the original `<` performed only NCBI_RETRY_MAX - 1 attempts while
            # the error message below advertised NCBI_RETRY_MAX.
            while retries_count <= self.NCBI_RETRY_MAX:
                api_key = self.NCBI_API_KEYS[0] if retries_count % 2 == 0 else self.NCBI_API_KEYS[1]
                url = unauthenticated_url + f"&api_key={api_key}"
                logging.debug(f"HTTP GET started ==> {url}")
                async with session.get(url) as response:
                    logging.debug(f"HTTP GET Done ==> {url}")
                    if response.status == 200:
                        logging.debug(f"Done get summary in retry #{retries_count} ==> {study_id}")
                        return json.loads(await response.text())
                    retries_count += 1
                    logging.debug(f"Get a {response.status} from {url}, retries count incremented to {retries_count}")
        raise Exception(f"Unable to fetch {study_id} in {self.NCBI_RETRY_MAX} attempts")
Loading

0 comments on commit d1d2624

Please sign in to comment.