initial implementation of --follow-refs flag #64

Draft · wants to merge 1 commit into main
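This draft adds a `--follow-refs` option to fetch: after an endpoint's records are fetched, any `*Reference` payloads pointing at endpoints not already being fetched are themselves fetched, recursively, up to a configurable depth. A minimal sketch of the depth handling (the `fetch.follow_refs` config key is read in fetch.py below; the example values here are hypothetical):

# Hypothetical sketch of how fetch.py (below) derives its recursion depth.
# fetch.follow_refs caps how many levels of references are followed; when
# the key is absent, the 999999 default effectively means "no limit".
config = {"fetch": {"page_size": 100, "follow_refs": 2}}  # example values only

depth = int(config.get("fetch", {}).get("follow_refs", 999999))
print(depth)  # 2 -> follow references two levels deep, then stop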
164 changes: 118 additions & 46 deletions lightbeam/fetch.py
@@ -1,4 +1,5 @@
import os
import re
import math
import json
import asyncio
@@ -10,6 +11,11 @@ class Fetcher:
def __init__(self, lightbeam=None):
self.lightbeam = lightbeam
self.logger = self.lightbeam.logger
if self.lightbeam.query and isinstance(self.lightbeam.query, str):
try:
self.lightbeam.query = json.loads(self.lightbeam.query)
except Exception as e:
self.logger.error(f"A query was provided, but could not be parsed ({e}). Please give a JSON object as a string.")

def fetch(self):
self.lightbeam.results = []
@@ -23,7 +29,7 @@ async def get_records(self, do_write=True, log_status_counts=True):
tasks = []
counter = 0
limit = self.lightbeam.config["fetch"]["page_size"]
params = json.loads(self.lightbeam.query)
params = self.lightbeam.query
for endpoint in self.lightbeam.endpoints:
# figure out how many (paginated) requests we must make
tasks.append(asyncio.create_task(self.lightbeam.counter.get_record_count(endpoint, params)))
@@ -45,64 +51,130 @@ async def get_records(self, do_write=True, log_status_counts=True):
file_handle = open(os.path.join(self.lightbeam.config["data_dir"], endpoint + ".jsonl"), "w")
for p in range(0, num_pages):
counter += 1
tasks.append(asyncio.create_task(self.get_endpoint_records(endpoint, limit, p*limit, file_handle)))
tasks.append(asyncio.create_task(self.get_endpoint_records(endpoint, limit, p*limit, file_handle, depth=int(self.lightbeam.config.get("fetch",{}).get("follow_refs", 999999)))))

if len(tasks)>0:
await self.lightbeam.do_tasks(tasks, counter, log_status_counts=log_status_counts)

# Fetches records for a specific endpoint
async def get_endpoint_records(self, endpoint, limit, offset, file_handle=None):
async def get_endpoint_records(self, endpoint, limit, offset, file_handle=None, depth=999999):
curr_token_version = int(str(self.lightbeam.token_version))
while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
refs = {}
stop = False
while not stop:
try:
# construct the URL query params:
params = json.loads(self.lightbeam.query)
params.update({"limit": str(limit), "offset": str(offset)})

# send GET request
async with self.lightbeam.api.client.get(
util.url_join(self.lightbeam.api.config["data_url"], self.lightbeam.config["namespace"], endpoint),
params=urlencode(params),
ssl=self.lightbeam.config["connection"]["verify_ssl"],
headers=self.lightbeam.api.headers
) as response:
body = await response.text()
status = str(response.status)
if status=='401':
# this could be broken out to a separate function call,
# but not doing so should help keep the critical section small
if self.lightbeam.token_version == curr_token_version:
self.lightbeam.lock.acquire()
self.lightbeam.api.update_oauth()
self.lightbeam.lock.release()
else:
await asyncio.sleep(1)
curr_token_version = int(str(self.lightbeam.token_version))
elif status not in ['200', '201']:
self.logger.warn(f"Unable to load records for {endpoint}... {status} API response.")
else:
if response.content_type == "application/json":
values = json.loads(body)
if type(values) != list:
self.logger.warn(f"Unable to load records for {endpoint}... API JSON response was not a list of records.")
# this section deals with the fact that the query might be
# singular (if provided via CLI) or a list (from `--follow-refs`)
params = self.lightbeam.query
if type(params) == dict:
params_list = [params]
else: params_list = params

for params in params_list:
# construct the URL query params; copy first so concurrent tasks don't clobber each other's paging:
params = {**params, "limit": str(limit), "offset": str(offset)}

# send GET request
async with self.lightbeam.api.client.get(
util.url_join(self.lightbeam.api.config["data_url"], self.lightbeam.config["namespace"], endpoint),
params=params,
ssl=self.lightbeam.config["connection"]["verify_ssl"],
headers=self.lightbeam.api.headers
) as response:
body = await response.text()
status = str(response.status)
if status=='401':
# this could be broken out to a separate function call,
# but not doing so should help keep the critical section small
if self.lightbeam.token_version == curr_token_version:
self.lightbeam.lock.acquire()
self.lightbeam.api.update_oauth()
self.lightbeam.lock.release()
else:
payload_keys = list(values[0].keys())
final_keys = util.apply_selections(payload_keys, self.lightbeam.keep_keys, self.lightbeam.drop_keys)
do_key_filtering = len(payload_keys) != len(final_keys)

for v in values:
if do_key_filtering: row = {k: v.get(k, None) for k in final_keys} #v.get() to account for missing keys
else: row = v
if file_handle: file_handle.write(json.dumps(row)+"\n")
else: self.lightbeam.results.append(row)
self.lightbeam.increment_status_counts(status)
break
await asyncio.sleep(1)
curr_token_version = int(str(self.lightbeam.token_version))
elif status not in ['200', '201']:
self.logger.warn(f"Unable to load records for {endpoint}... {status} API response.")
else:
self.logger.warn(f"Unable to load records for {endpoint}... API response was not JSON.")
if response.content_type == "application/json":
values = json.loads(body)
if type(values) != list:
self.logger.warn(f"Unable to load records for {endpoint}... API JSON response was not a list of records.")
else:
if len(list(values))>0:
payload_keys = list(values[0].keys())
final_keys = util.apply_selections(payload_keys, self.lightbeam.keep_keys, self.lightbeam.drop_keys)
do_key_filtering = len(payload_keys) != len(final_keys)

# follow-refs: set up the data structure where we store the refs to fetch next
ref_keys = [k for k in payload_keys if k.endswith("Reference") and util.pluralize_endpoint(k.replace("Reference","")) not in self.lightbeam.endpoints]
refs = {k: [] for k in ref_keys}

for v in values:
if do_key_filtering: row = {k: v.get(k, None) for k in final_keys} #v.get() to account for missing keys
else: row = v
# follow-refs:
for k in ref_keys:
if v.get(k, False):
q = v[k]
if "link" in q.keys(): del q["link"] # remove "link" element
if q not in refs[k]: # add ref payload to data structure, if not already present
refs[k].append(q)
# back to row processing: write to JSONL file
if file_handle: file_handle.write(json.dumps(row)+"\n")
else: self.lightbeam.results.append(row)
self.lightbeam.increment_status_counts(status)
stop = True
else:
self.logger.warn(f"Unable to load records for {endpoint}... API response was not JSON.")

except RuntimeError as e:
await asyncio.sleep(1)
except Exception as e:
self.logger.critical(f"Unable to load records for {endpoint} from API ({e})... terminating. Check API connectivity.")

# follow-refs:
save_query = self.lightbeam.query
for k in refs.keys():
if depth>0 and len(refs[k])>0:
ref_endpoint = util.pluralize_endpoint(k.replace("Reference",""))
# this deals with the fact that an educationOrganizationReference may be to a school, LEA, etc.:
endpoints_to_check = self.lightbeam.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(ref_endpoint, [ref_endpoint])
for ref_endpoint in endpoints_to_check:
# this renames (for example) course.educationOrganizationReference: { educationOrganizationId: 9999 }
# to { localEducationAgencyId: 9999 }, { stateEducationAgencyId: 9999 }, etc.
for ref_k in self.lightbeam.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING.keys():
new_key = self.lightbeam.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING[ref_k].get(ref_endpoint, ref_k)
refs[k] = [{(new_key if key == ref_k else key): val for key, val in row.items()} for row in refs[k]]
# some refs have a descriptive label prepended to the endpoint name, for example:
# * `studentSchoolAssociations.graduationSchoolYearTypeReference` is a `schoolYearType` reference
# * `assessments.contentStandard.mandatingEducationOrganizationReference` is a `educationOrganization` reference
# * `courseTranscripts.responsibleTeacherStaffReference` is a `staffs` reference
# (etc.) This while-loop repeatedly removes the front word from a camel-cased endpoint name
# and checks whether the result is a valid endpoint name. If none is found, a warning is printed.
pieces = re.split('(?<=[a-z])(?=[A-Z])', ref_endpoint)
while len(pieces)>0 and ref_endpoint not in self.lightbeam.all_endpoints:
ref_endpoint = "".join(pieces[1:])
if len(ref_endpoint)>0: ref_endpoint = ref_endpoint[0].lower() + ref_endpoint[1:]
else:
pieces = []
break
pieces = re.split('(?<=[a-z])(?=[A-Z])', ref_endpoint)
if len(pieces)==0:
self.logger.warn(f"Could not find an endpoint corresponding to {k}.")
continue
# set up the fetch for this ref endpoint, and all its payloads
self.lightbeam.query = refs[k]
self.lightbeam.results = []
await self.get_endpoint_records(ref_endpoint, limit=limit, offset=0, file_handle=None, depth=depth-1)
# if there were results, write them to a JSONL file
if len(self.lightbeam.results)>0:
with open(os.path.join(self.lightbeam.config["data_dir"], ref_endpoint + ".jsonl"), "w") as ref_file_handle:
for result in self.lightbeam.results:
ref_file_handle.write(json.dumps(result)+"\n")
break # no need to process other `endpoints_to_check`

self.lightbeam.query = save_query

self.lightbeam.num_finished += 1
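Two pieces of the hunk above are easier to follow in isolation. First, the refs structure that accumulates reference payloads to fetch next; a simplified, hypothetical sketch (record values are made up, and the check that skips endpoints already being fetched is omitted):

# Hypothetical sketch of the refs accumulation in get_endpoint_records:
# for each "*Reference" key, collect the distinct reference payloads seen,
# dropping the "link" element before deduplicating.
values = [
    {"courseCode": "ALG-1", "schoolReference": {"schoolId": 9999, "link": {"href": "/schools/abc"}}},
    {"courseCode": "ALG-2", "schoolReference": {"schoolId": 9999, "link": {"href": "/schools/abc"}}},
]
ref_keys = [k for k in values[0].keys() if k.endswith("Reference")]
refs = {k: [] for k in ref_keys}
for v in values:
    for k in ref_keys:
        if v.get(k, False):
            q = v[k]
            if "link" in q.keys(): del q["link"]  # remove "link" element
            if q not in refs[k]:  # add payload only if not already present
                refs[k].append(q)
print(refs)  # {'schoolReference': [{'schoolId': 9999}]}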
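Second, the camelCase-stripping loop that resolves a reference name like graduationSchoolYearTypeReference to a real endpoint; a standalone sketch, with ALL_ENDPOINTS as an assumed stand-in for lightbeam.all_endpoints:

import re

# Standalone sketch of the endpoint-resolution loop in get_endpoint_records:
# strip leading camelCase words until a known endpoint name remains.
ALL_ENDPOINTS = {"schoolYearTypes", "staffs", "educationOrganizations"}  # assumed subset

def resolve(ref_endpoint, all_endpoints):
    pieces = re.split('(?<=[a-z])(?=[A-Z])', ref_endpoint)
    while len(pieces) > 0 and ref_endpoint not in all_endpoints:
        ref_endpoint = "".join(pieces[1:])  # drop the leading word
        if len(ref_endpoint) > 0:
            ref_endpoint = ref_endpoint[0].lower() + ref_endpoint[1:]
        else:
            break  # ran out of words without finding a match
        pieces = re.split('(?<=[a-z])(?=[A-Z])', ref_endpoint)
    return ref_endpoint if ref_endpoint in all_endpoints else None

print(resolve("graduationSchoolYearTypes", ALL_ENDPOINTS))  # schoolYearTypes
print(resolve("responsibleTeacherStaffs", ALL_ENDPOINTS))   # staffs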
11 changes: 11 additions & 0 deletions lightbeam/lightbeam.py
@@ -54,6 +54,17 @@ class Lightbeam:
MAX_TASK_QUEUE_SIZE = 2000
MAX_STATUS_REASONS_TO_DISPLAY = 10
DATA_FILE_EXTENSIONS = ['json', 'jsonl', 'ndjson']

EDFI_GENERICS_TO_RESOURCES_MAPPING = {
"educationOrganizations": ["schools", "localEducationAgencies", "stateEducationAgencies"],
}
EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING = {
"educationOrganizationId": {
"localEducationAgencies": "localEducationAgencyId",
"stateEducationAgencies": "stateEducationAgencyId",
"schools": "schoolId",
},
}

def __init__(self, config_file, logger=None, selector="*", exclude="", keep_keys="*", drop_keys="", query="{}", params="", wipe=False, force=False, older_than="", newer_than="", resend_status_codes="", results_file=""):
self.config_file = config_file
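These two mappings, moved here from validate.py so fetch.py can share them, drive the generic-reference handling in both modules. A hypothetical illustration of the rename that EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING encodes (the 9999 id is the example value used in the fetch.py comments):

# Hypothetical illustration: a generic educationOrganizationReference is
# retried against each concrete resource, with its id key renamed to that
# resource's property name.
mapping = {
    "educationOrganizationId": {
        "localEducationAgencies": "localEducationAgencyId",
        "schools": "schoolId",
    },
}
ref = {"educationOrganizationId": 9999}  # example payload

for endpoint in ("schools", "localEducationAgencies"):
    new_key = mapping["educationOrganizationId"].get(endpoint, "educationOrganizationId")
    print(endpoint, {(new_key if k == "educationOrganizationId" else k): v for k, v in ref.items()})
# schools {'schoolId': 9999}
# localEducationAgencies {'localEducationAgencyId': 9999}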
19 changes: 4 additions & 15 deletions lightbeam/validate.py
@@ -15,17 +15,6 @@ class Validator:
MAX_VALIDATE_TASK_QUEUE_SIZE = 100
DEFAULT_VALIDATION_METHODS = ["schema", "descriptors", "uniqueness"]

EDFI_GENERICS_TO_RESOURCES_MAPPING = {
"educationOrganizations": ["localEducationAgencies", "stateEducationAgencies", "schools"],
}
EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING = {
"educationOrganizationId": {
"localEducationAgencies": "localEducationAgencyId",
"stateEducationAgencies": "stateEducationAgencyId",
"schools": "schoolId",
},
}

def __init__(self, lightbeam=None):
self.lightbeam = lightbeam
self.logger = self.lightbeam.logger
@@ -80,7 +69,7 @@ def build_local_reference_cache(self, endpoint):
references_structure = self.rebalance_local_references_structure(references_structure)
# more memory-efficient to load local data and populate cache for one endpoint at a time:
for original_endpoint in references_structure.keys():
endpoints_to_check = self.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])
endpoints_to_check = self.lightbeam.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])
for endpoint in endpoints_to_check:
if endpoint in self.local_reference_cache.keys():
# already loaded (when validating another endpoint); no need to reload
@@ -126,7 +115,7 @@ def load_references_data(self, endpoint, references_structure):
self.logger.warning(f"... (ignoring invalid JSON payload at {line_counter} of {file_name})")
ref_payload = {}
for key in references_structure[endpoint]:
key = self.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING.get(key, {}).get(endpoint, key)
key = self.lightbeam.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING.get(key, {}).get(endpoint, key)
tmpdata = payload
for subkey in key.split("."):
tmpdata = tmpdata[subkey]
@@ -148,7 +137,7 @@ def load_references_structure(self, swagger, definition):
original_endpoint = util.pluralize_endpoint(k.replace("Reference", ""))

# this deals with the fact that an educationOrganizationReference may be to a school, LEA, etc.:
endpoints_to_check = self.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])
endpoints_to_check = self.lightbeam.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])

for endpoint in endpoints_to_check:
ref_definition = schema["properties"][k]["$ref"]
@@ -374,7 +363,7 @@ def has_invalid_references(self, payload, path=""):
original_endpoint = util.pluralize_endpoint(k.replace("Reference",""))

# this deals with the fact that an educationOrganizationReference may be to a school, LEA, etc.:
endpoints_to_check = self.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])
endpoints_to_check = self.lightbeam.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])
for endpoint in endpoints_to_check:
# check if it's a local reference:
if endpoint not in self.local_reference_cache.keys(): break