edanalytics/edu_edfi_airflow

Overview

edu_edfi_airflow provides Airflow hooks and operators for transforming and posting data into Ed-Fi ODS using Earthmover and Lightbeam, and for transferring data from an Ed-Fi ODS to a data warehouse.

This package is part of Enable Data Union (EDU). Please visit the EDU docs site for more information.

EdFiResourceDAG

EdFiResourceDAG is an Airflow DAG that pulls a specified selection of Ed-Fi endpoints to disk, then copies them into the database. The EdFiToObjectStorageOperator pulls JSON rows for a specified Ed-Fi resource and writes them locally before copying the files to the object storage backend. The ObjectStorageToDatabaseOperator then copies the transferred files into the data warehouse.

This implementation takes advantage of Ed-Fi3 change-version logic, allowing daily ingestion of ODS deltas and incorporation of resource deletes. The DAG implementation is designed for Ed-Fi3, but it also works for Ed-Fi2 ODS instances, without incremental ingestion.

When initializing the DAG, pass an {endpoint: metadata} dictionary into resource_configs or descriptor_configs. The following metadata are customizable per endpoint:

  • enabled (default True)
  • fetch_deletes (default True)
  • namespace (default 'ed-fi')
  • page_size (default 500)
  • num_retries (default 5)
  • change_version_step_size (default 50000)
  • query_parameters (default {})

Note: Historically, the add_resource(), add_resource_deletes(), and add_descriptor() methods were used to populate endpoint metadata. These methods will be deprecated in a future release.
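
As a rough illustration of the {endpoint: metadata} shape, the sketch below fills in the documented defaults for any keys an endpoint omits. The helper with_endpoint_defaults and the sample endpoint names are hypothetical; the package applies its own defaults internally, so this only shows the intended structure.

```python
import copy

# Documented per-endpoint defaults, copied from the list above.
ENDPOINT_DEFAULTS = {
    "enabled": True,
    "fetch_deletes": True,
    "namespace": "ed-fi",
    "page_size": 500,
    "num_retries": 5,
    "change_version_step_size": 50000,
    "query_parameters": {},
}

def with_endpoint_defaults(configs):
    """Hypothetical helper: overlay each endpoint's metadata on the defaults."""
    return {
        endpoint: {**copy.deepcopy(ENDPOINT_DEFAULTS), **(metadata or {})}
        for endpoint, metadata in configs.items()
    }

# Sample endpoints (illustrative only): override just what differs.
resource_configs = with_endpoint_defaults({
    "students": {},
    "studentSchoolAssociations": {"page_size": 250},
    "studentAssessments": {"enabled": False, "fetch_deletes": False},
})
```

In practice only the overrides need to be written out; the fully expanded form is shown here to make the defaults visible.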

To trigger a subset-ingestion of endpoints, populate the endpoints DAG-level config. The names of the endpoints can be in any casing and will be forced to snake_case before being processed.

Set full_refresh to True in DAG-level configs to reset the change-version table for the specified (tenant, year). This forces a full drop-replace of a given endpoint's data in the database. This functionality can be paired with the endpoints DAG-level config to run a full-refresh on a subset of endpoints. Alternatively, use argument schedule_interval_full_refresh to set an automatic refresh cadence.
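
A minimal sketch of a DAG-level config that pairs full_refresh with a subset of endpoints, plus an approximation of the snake_case normalization described above. The to_snake_case function is a hypothetical stand-in and may differ from the package's own conversion; the endpoint names are examples.

```python
import json
import re

def to_snake_case(name):
    """Hypothetical normalizer: camelCase, PascalCase, or kebab-case to snake_case."""
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    return name.replace("-", "_").lower()

# DAG-level config for a full refresh of two endpoints, as it could be
# supplied when triggering the DAG with a config.
dag_run_conf = {
    "full_refresh": True,
    "endpoints": ["studentSchoolAssociations", "Student_Assessments"],
}

normalized = [to_snake_case(endpoint) for endpoint in dag_run_conf["endpoints"]]
print(json.dumps(dag_run_conf))
print(normalized)
```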

Class Arguments:
Argument Description
tenant_code ODS-tenant representation to be saved in the database tables
api_year ODS API-year to be saved in the database tables
edfi_conn_id Airflow connection with Ed-Fi ODS credentials and metadata defined for a specific tenant
object_storage_conn_id Airflow connection with object storage credentials
database_conn_id Airflow connection with database credentials, database, and schema defined
object_storage_type Object storage connection type (used in task naming and connection disambiguation)
database_type Database connection type (used in task naming and connection disambiguation)
pool Airflow pool to assign EdFi-to-ObjectStorage pulls for this DAG (designed to prevent the ODS from being overwhelmed)
tmp_dir Path to the temporary directory on the EC2 server where ODS data is written before their transfer to S3
multiyear Boolean flag for whether the ODS has multiple years of data within one API year (defaults to False; dispreferred implementation)
schedule_interval_full_refresh CRON schedule that automatically triggers a full-refresh, instead of the default delta run (default None)
use_change_version Boolean flag for using change versions to complete delta ingests (default True; turned off for Ed-Fi2)
get_key_changes Boolean flag for whether to build a /keyChanges task-group (only applicable in newer ODSes; default False)
get_deletes_cv_with_deltas Boolean flag for whether to use Total-Count for reverse paging deletes (default True; change only if API version does not have Total-Count)
pull_total_counts Boolean flag for whether to pull total record counts from each resource (default False; used for detecting sync drift)
run_type Specifies the run-type for the Ed-Fi task groups in the DAG (default 'default')
resource_configs An {endpoint: metadata} dictionary to populate the DAG (replaces deprecated add_resource() and add_resource_deletes() methods; default None)
descriptor_configs An {endpoint: metadata} dictionary to populate the DAG (replaces deprecated add_descriptor() method; default None)
change_version_table Name of the table to record resource change versions in the database (defaults to '_meta_change_versions')
deletes_table Name of the table to record resource deletes in the database (defaults to '_deletes')
key_changes_table Name of the table to record resource keyChanges in the database (defaults to '_key_changes')
descriptors_table Name of the table to record descriptors in the database (defaults to '_descriptors')
total_counts_table Name of the table to record resource total counts in the database (defaults to '_meta_total_counts'; only needed if pull_total_counts = True)
dbt_incrementer_var Optional Airflow variable to increment upon a finished run

Additional EACustomDAG parameters (e.g. slack_conn_id, schedule_interval, default_args, etc.) can be passed as kwargs.


Run-Type Overview:

The run_type argument determines the task-logic used when ingesting data from Ed-Fi to the object storage backend.

  • default: one task is created for each endpoint (default behavior)
  • bulk: one task is created, and each endpoint is looped-over within this task
  • dynamic: one dynamically-mapped task is created for each endpoint with new data to ingest

No matter the approach, copies from object storage to the database always occur in bulk to reduce database connection-time. Dynamic runs can only be used when use_change_version is true.

Dynamic and bulk task groups trade visibility for performance: both drastically reduce the number of tasks run each day, but more digging in the Airflow UI is required to identify failures. Here are some recommendations for when to use each run-type:

run_type | DAG count | data volume
default  | low       | low
bulk     | high      | low
dynamic  | high      | high
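
The recommendations above can be sketched as a small lookup. The helper recommend_run_type is hypothetical, and falling back to 'default' for combinations the table does not cover is an assumption, not documented behavior.

```python
# Recommendation table from above, keyed by (DAG count, data volume).
RECOMMENDED_RUN_TYPE = {
    ("low", "low"): "default",
    ("high", "low"): "bulk",
    ("high", "high"): "dynamic",
}

def recommend_run_type(dag_count, data_volume, use_change_version=True):
    """Hypothetical helper: look up a run_type and check the dynamic constraint."""
    # Assumption: combinations the table does not cover fall back to 'default'.
    run_type = RECOMMENDED_RUN_TYPE.get((dag_count, data_volume), "default")
    # Dynamic runs can only be used when use_change_version is true.
    if run_type == "dynamic" and not use_change_version:
        raise ValueError("dynamic runs require use_change_version=True")
    return run_type

print(recommend_run_type("high", "low"))
```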

Example DAG-factory YAML instantiation:

The default instantiation of these DAGs in our edu_project_template uses the following YAML structure:

edfi_resource_dags__default_args: &default_dag_args
  default_args: *default_task_args

  schedule_interval: null
  schedule_interval_resources: null    # Optional to provide differing schedule logic between resources and descriptors.
  schedule_interval_descriptors: null  # If either is unpopulated, `schedule_interval` will be used by default.

  # Airflow Connection IDs
  edfi_conn_id: ~
  object_storage_conn_id: 'data_lake'
  object_storage_type: 's3'
  database_conn_id: 'snowflake'
  database_type: 'snowflake'

  # Variables for pulling from EdFi
  tmp_dir: '/efs/tmp_storage'
  pool: ~

  # Variables for interacting with the database
  change_version_table: '_meta_change_versions'


edfi_resource_dags:
  # note that `YEAR` must match the `schoolYear` of the ODS, which will be a 4 digit integer representing the spring year, e.g. for '2022-2023' it would be 2023.
  TENANT1:
    YEAR1:
      pool: default_pool
      edfi_conn_id: 'edfi_TENANT1_YEAR1'
      schedule_interval: null
      <<: *edfi_resource_dags__default_args
  TENANT2:
    YEAR1:
      pool: default_pool
      edfi_conn_id: 'edfi_TENANT2_YEAR1'
      schedule_interval: null
      <<: *edfi_resource_dags__default_args
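
To make the merge behavior in the YAML above concrete, the following Python sketch mimics it with plain dictionaries: keys written explicitly under a tenant-year take precedence over keys merged in through `<<`. The dictionaries are abbreviated stand-ins for the YAML, and resolve is a hypothetical helper, not part of the package.

```python
# Stand-in for the `&default_dag_args` anchor (abbreviated).
default_dag_args = {
    "schedule_interval": None,
    "edfi_conn_id": None,
    "object_storage_conn_id": "data_lake",
    "database_conn_id": "snowflake",
    "pool": None,
}

# Keys written explicitly under each tenant-year, before the `<<` merge.
edfi_resource_dags = {
    "TENANT1": {"YEAR1": {"pool": "default_pool", "edfi_conn_id": "edfi_TENANT1_YEAR1"}},
    "TENANT2": {"YEAR1": {"pool": "default_pool", "edfi_conn_id": "edfi_TENANT2_YEAR1"}},
}

def resolve(defaults, configs):
    """Mimic YAML merge-key semantics: explicit keys win over merged defaults."""
    return {
        (tenant, year): {**defaults, **explicit}
        for tenant, years in configs.items()
        for year, explicit in years.items()
    }

resolved = resolve(default_dag_args, edfi_resource_dags)
for grain, kwargs in resolved.items():
    print(grain, kwargs["edfi_conn_id"], kwargs["pool"])
```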

Connections

Three types of connections must be defined in Airflow to complete a full run. Each connection is outlined below with required fields that must be populated for a successful run. An optional Slack connection for logging run failures has also been outlined.

Ed-Fi Connection

Each Ed-Fi connection references one API year in one ODS (unless the ODS is multiyear). Each connection is passed through an EdFiHook to an EdFiApiClient.

Arguments:
Argument Description
Connection Id Name of connection to reference in config files and across operators
Connection Type HTTP
Host Base URL for the specific Ed-Fi ODS instantiation (extra pathing must be removed)
Login Client Key for the ODS
Password Client Secret for the ODS
Extra JSON structure with api_mode (required), api_version (default 3), and instance_code (optional) defined

If api_version or api_mode is undefined in Extra, it will be inferred from the ODS (Ed-Fi3 only).
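
One way to define this connection outside the Airflow UI is a JSON-serialized environment variable, which Airflow 2.3 and later can read. The sketch below only builds the variable name and value. The helper name, host, and credentials are placeholders, and the api_mode value is deliberately left as a placeholder because the accepted values depend on the Ed-Fi client.

```python
import json

def edfi_connection_env(conn_id, host, client_key, client_secret, extra):
    """Return the (variable name, JSON value) pair Airflow reads for a connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({
        "conn_type": "http",
        "host": host,
        "login": client_key,
        "password": client_secret,
        "extra": extra,
    })
    return name, value

name, value = edfi_connection_env(
    conn_id="edfi_TENANT1_YEAR1",
    host="https://ods.example.org",  # placeholder base URL, no extra pathing
    client_key="<client key>",
    client_secret="<client secret>",
    # Placeholder: use the api_mode your ODS is actually deployed in.
    extra={"api_mode": "<api mode>", "api_version": 3},
)
print(name)
print(value)
```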


AWS S3 Connection

This connection outlines the S3 datalake bucket to which Ed-Fi data is staged before transferring to the database.

Arguments:
Argument Description
Connection Id Name of connection to reference in config files and across operators
Connection Type http
Schema S3 bucket name used to store data transferred from the Ed-Fi ODS to the database
Login [Empty]; Must be defined if EC2 IAM role is not scoped
Password [Empty]; Must be defined if EC2 IAM role is not scoped

It is recommended to extend the EC2 server's IAM role to include S3 permissions on the datalake bucket specified in schema. If done correctly, login and password can be left blank and inferred automatically.


Snowflake Connection

This connection outlines the Snowflake account to which the object storage stage and subsequent raw tables are defined.

Arguments:
Argument Description
Connection Id Name of connection to reference in config files and across operators
Connection Type Snowflake
Host Host URL for the Snowflake instance
Schema Snowflake schema destination for raw Ed-Fi data
Login Snowflake Airflow loader role
Password Snowflake loader password
Extra JSON structure with Snowflake-specific fields (also defined below)
Account Snowflake account associated with instance (extra__snowflake__account)
AWS Access Key Access key to AWS account associated with object storage bucket (extra__snowflake__aws_access_key_id)
AWS Secret Key Secret key to AWS account associated with object storage bucket (extra__snowflake__aws_secret_access_key)
Database Snowflake database destination for raw Ed-Fi data (extra__snowflake__database)
Region (Optional) AWS region associated with object storage bucket (extra__snowflake__region)
Role Snowflake loader role (extra__snowflake__role)
Warehouse Snowflake warehouse destination for raw Ed-Fi data (extra__snowflake__warehouse)

Snowflake-specific fields can be defined either as a JSON object in Extra, or in the extra fields underneath. When editing these values, it is recommended to edit the JSON object directly. The values in the fields will update after saving the connection.
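
When editing the JSON object directly, each Snowflake-specific field uses the prefixed key shown in parentheses in the table above. The sketch below builds such an object from plain field names; snowflake_extra is a hypothetical helper and every value is a placeholder.

```python
import json

def snowflake_extra(**fields):
    """Prefix plain field names with `extra__snowflake__`, as listed in the table."""
    return {f"extra__snowflake__{key}": value for key, value in fields.items()}

extra = snowflake_extra(
    account="<account>",
    aws_access_key_id="<access key>",
    aws_secret_access_key="<secret key>",
    database="<database>",
    region="<region>",
    role="<loader role>",
    warehouse="<warehouse>",
)
print(json.dumps(extra, indent=2))
```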


Slack Connection

This connection is used for sending Airflow task successes and failures to a dedicated Slack channel. Channel configurations must be specified on webhook-creation.

Arguments:
Argument Description
Connection Id Name of connection to reference in config files and across operators
Connection Type Slack Webhook
Host https://hooks.slack.com/services
Password The trailing ID path of the webhook URL, including the initial "/"
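
The split between Host and Password can be sketched as follows. The function split_slack_webhook is a hypothetical helper and the webhook URL is an obviously fake placeholder.

```python
SLACK_HOST = "https://hooks.slack.com/services"

def split_slack_webhook(webhook_url):
    """Split a webhook URL into the Host and Password fields of the connection."""
    if not webhook_url.startswith(SLACK_HOST + "/"):
        raise ValueError("not a Slack incoming-webhook URL")
    # The password keeps the leading "/" of the trailing ID path.
    return SLACK_HOST, webhook_url[len(SLACK_HOST):]

host, password = split_slack_webhook(
    "https://hooks.slack.com/services/T000/B000/XXXX"
)
print(host)
print(password)
```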


Hooks and Operators

EdFiHook

This Airflow Hook connects to the Ed-Fi ODS using an EdFiClient.

Arguments:
Argument Description
edfi_conn_id Name of the Airflow connection where Ed-Fi ODS connection metadata has been defined

EdFiToObjectStorageOperator

Transfers a specific resource (or resource deletes) from the Ed-Fi ODS to an object storage backend.

Arguments:
Argument Description
edfi_conn_id Name of the Airflow connection where Ed-Fi ODS connection metadata has been defined
resource Name of Ed-Fi resource/descriptor to pull from the ODS
tmp_dir Path to the temporary directory on the EC2 server where ODS data is written before their transfer to object storage
object_storage_conn_id Name of the Airflow connection where object storage connection metadata has been defined
object_storage_type Object storage connection type (used in connection disambiguation)
destination_key Destination key where Ed-Fi resource data should be written in object storage
query_parameters Custom parameters to apply to the pull (default None)
min_change_version Minimum change version to pull for the resource (default None)
max_change_version Maximum change version to pull for the resource (default None)
change_version_step_size Window size to apply during change-version stepping (default 50000)
api_namespace Namespace under which the resource is assigned (default "ed-fi")
api_get_deletes Boolean flag for whether to retrieve the resource's associated deletes (default False)
api_retries Number of attempts the pull should make before giving up (default 5)
page_size Number of rows to pull at each GET (default 100)

page_size should be tuned per ODS and resource. Contact your ODS administrator to determine the maximum allowable page size and the recommended size to prevent overwhelming the ODS.

If either min_change_version or max_change_version is undefined, change-version stepping does not occur.
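
To illustrate change-version stepping, the sketch below splits a (min, max) range into windows of change_version_step_size. The function name and the inclusive, non-overlapping window boundaries are assumptions; the operator's actual stepping logic may differ.

```python
def change_version_windows(min_change_version, max_change_version, step_size=50000):
    """Hypothetical sketch: yield inclusive (start, end) change-version windows."""
    if min_change_version is None or max_change_version is None:
        # No stepping without both bounds: a single unwindowed pull.
        yield (min_change_version, max_change_version)
        return
    start = min_change_version
    while start <= max_change_version:
        end = min(start + step_size - 1, max_change_version)
        yield (start, end)
        start = end + 1

windows = list(change_version_windows(0, 120000, step_size=50000))
print(windows)
```

Each window would then be paged through separately using page_size, which keeps any single request range bounded.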


ObjectStorageToDatabaseOperator

Copies a specific file in object storage to the specified database object. First completes a DELETE FROM statement if full_refresh is set to True in the DAG configs.

Arguments:
Argument Description
tenant_code ODS-tenant representation to be saved in the database table
api_year ODS API-year to be saved in the database table
resource Static name of Ed-Fi resource, placed in the name column of the destination table
table_name Name of the raw database table to copy into
destination_key Source key where JSON data is saved in the object storage backend
database_conn_id Name of the Airflow connection where database connection metadata has been defined
database_type Database connection type (used in connection disambiguation)
edfi_conn_id Name of the Airflow connection where Ed-Fi ODS connection metadata has been defined
ods_version Optional Ed-Fi ODS version to save as metadata if edfi_conn_id is undefined
data_model_version Optional Ed-Fi data model version to save as metadata if edfi_conn_id is undefined
full_refresh Boolean flag to run a full truncate-replace of the warehouse data for the given grain


Callables

get_newest_edfi_change_version

Retrieves the most recent change version in the ODS using EdFiClient.get_newest_change_version(). Returns None for an Ed-Fi2 ODS, as change versions are unimplemented.

Arguments:
Argument Description
edfi_conn_id Name of the Airflow connection where Ed-Fi ODS connection metadata has been defined

get_previous_change_versions

Pushes an XCom of the most recent change version for each resource of a given (tenant, year) grain saved in the database change_version_table. Pushes None if Ed-Fi2 or if no records for this resource are found (signifying full-refresh).

Combining get_newest_edfi_change_version with get_previous_change_versions allows a min_change_version to max_change_version window to be defined for the pull. This allows incremental ingests of resource deltas since the last pull, drastically improving runtimes when compared to full-refreshes. Because Ed-Fi2 lacks change versions, all Ed-Fi2 pulls are full-refreshes.
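
The windowing described above can be sketched in plain Python. The function plan_pulls and the sample values are hypothetical, and starting each window one past the previously recorded version is an assumption about the boundary handling.

```python
def plan_pulls(previous_versions, newest_version):
    """Hypothetical planner: map each resource to a (min, max) change-version window."""
    plan = {}
    for resource, previous in previous_versions.items():
        if newest_version is None or previous is None:
            # Ed-Fi2, or no recorded version: full refresh with no window.
            plan[resource] = (None, None)
        elif previous < newest_version:
            # Assumption: resume one past the last ingested change version.
            plan[resource] = (previous + 1, newest_version)
        # Otherwise nothing has changed since the last pull; skip the resource.
    return plan

plan = plan_pulls({"students": 100, "schools": None, "grades": 250}, newest_version=250)
print(plan)
```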

Arguments:
Argument Description
tenant_code ODS-tenant representation to be saved in the database tables
api_year ODS API-year to be saved in the database tables
database_conn_id Name of the Airflow connection where database connection metadata has been defined
database_type Database connection type (used in connection disambiguation)
change_version_table Name of the table to record resource change versions in the database

update_change_versions

Updates the change version table in the database with the most recent change version for all endpoints in which data was ingested, as specified by the (tenant, year) grain.

Arguments:
Argument Description
tenant_code ODS-tenant representation to be saved in the database tables
api_year ODS API-year to be saved in the database tables
database_conn_id Name of the Airflow connection where database connection metadata has been defined
database_type Database connection type (used in connection disambiguation)
change_version_table Name of the table to record resource change versions in the database
edfi_change_version The most recent change version present in the ODS (as retrieved from get_newest_edfi_change_version)

reset_change_versions

Marks all rows in the database change_version_table for the specified (tenant, year) grain as inactive.

Arguments:
Argument Description
tenant_code ODS-tenant representation to be saved in the database tables
api_year ODS API-year to be saved in the database tables
database_conn_id Name of the Airflow connection where database connection metadata has been defined
database_type Database connection type (used in connection disambiguation)
change_version_table Name of the table to record resource change versions in the database
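
The lifecycle of the change-version table (record, read back, reset) can be sketched with an in-memory SQLite database. The column names, the is_active flag, and the function names below are all hypothetical; the real table lives in the warehouse and its schema may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema: the real table's columns may differ.
conn.execute("""
    CREATE TABLE _meta_change_versions (
        tenant_code TEXT, api_year INTEGER, name TEXT,
        max_version INTEGER, is_active INTEGER DEFAULT 1
    )
""")

def record_versions(tenant, year, endpoints, edfi_change_version):
    """Record the newest ODS change version for every ingested endpoint."""
    conn.executemany(
        "INSERT INTO _meta_change_versions (tenant_code, api_year, name, max_version) "
        "VALUES (?, ?, ?, ?)",
        [(tenant, year, name, edfi_change_version) for name in endpoints],
    )

def mark_inactive(tenant, year):
    """Mark all rows for the (tenant, year) grain as inactive."""
    conn.execute(
        "UPDATE _meta_change_versions SET is_active = 0 "
        "WHERE tenant_code = ? AND api_year = ?",
        (tenant, year),
    )

def latest_active_versions(tenant, year):
    """Return {endpoint: most recent active change version} for the grain."""
    rows = conn.execute(
        "SELECT name, MAX(max_version) FROM _meta_change_versions "
        "WHERE tenant_code = ? AND api_year = ? AND is_active = 1 GROUP BY name",
        (tenant, year),
    )
    return dict(rows.fetchall())

record_versions("tenant1", 2023, ["students", "schools"], 1200)
before_reset = latest_active_versions("tenant1", 2023)
mark_inactive("tenant1", 2023)
after_reset = latest_active_versions("tenant1", 2023)
print(before_reset, after_reset)
```

With no active rows left after the reset, the next run finds no previous change version and falls back to a full refresh.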


EarthbeamDAG

EarthbeamDAG is a comprehensive Airflow DAG designed around transforming a raw dataset into the Ed-Fi data model. The transformed files are sent into an Ed-Fi ODS using Lightbeam, or they are copied directly into the raw tables in Snowflake. Two new operators, EarthmoverOperator and LightbeamOperator, provide the means to interface with Earthmover and Lightbeam through Airflow.

The EarthbeamDAG provides multiple optional utility functions, including:

  • Saving raw and Earthmover-transformed data to S3
  • Logging Earthmover and Lightbeam results to a specified logging table in Snowflake
  • Dynamic preprocessing of data using Bash or Python at the DAG- and grain-level

The grain of EarthbeamDAG differs from that of EdFiResourceDAG. Earthmover-Lightbeam runs are defined at the run-type level (i.e., the type of Earthmover process being completed). Additionally, optional Bash and Python operations can be chained prior to task-groups via build_bash_preprocessing_operator and build_python_preprocessing_operator respectively.

Set force to True in DAG-level configs or under a task-group's lightbeam_kwargs to force Lightbeam to resend payloads.

DAG-level Arguments:
Argument Description
run_type Representation of the type of DAG being run (e.g., NWEA MAP assessment)
earthmover_path Path to installed Earthmover package
lightbeam_path Path to installed Lightbeam package
pool Airflow pool against which all operations are applied
earthmover_pool Optional Airflow pool against which all Earthmover operations are applied (default pool)
lightbeam_pool Optional Airflow pool against which all Lightbeam operations are applied (default pool)
fast_cleanup Boolean flag for whether to remove local files immediately upon failure, or only after success (default False)

Additional EACustomDAG parameters (e.g. slack_conn_id, schedule_interval, default_args, etc.) can be passed as kwargs.


EarthbeamDAG builds task-groups for each (tenant, year), with an optional grain_update to distinguish runs by subject. Task-groups are fully independent of one another, and different operations can be applied in each. Task-groups can also apply an optional Python preprocessing callable to the raw data before running Earthmover.

Taskgroup-level Arguments:
Argument Description
tenant_code ODS-tenant representation to be saved in Snowflake tables
api_year ODS API-year to be saved in Snowflake tables
raw_dir Path to the directory on the EC2 server where pre-Earthmover data is found
grain_update Optional string to extend the grain of the task-group beyond (tenant, year)
group_id Optional static string-representation of the task-group (a generated name is used if undefined)
prefix_group_id Optional boolean flag for whether to prefix the name of task-group operators with the group_id (default False)
earthmover_kwargs Kwarg command-line arguments to be passed into EarthmoverOperator
edfi_conn_id Airflow connection with Ed-Fi ODS credentials and metadata defined for a specific tenant
lightbeam_kwargs Kwarg command-line arguments to be passed into LightbeamOperator
s3_conn_id Airflow connection with S3 bucket defined under schema
s3_filepath S3 root key to copy staging data to before and after running Earthmover
python_callable Optional Python callable to run at the start of the task-group prior to Earthmover
python_kwargs Optional kwargs to pass into python_callable
python_postprocess_callable Optional Python callable to run at the end of the task-group
python_postprocess_kwargs Optional kwargs to pass into python_postprocess_callable. Kwargs em_data_dir and em_s3_filepath will also be passed
snowflake_conn_id Optional Airflow connection with Snowflake credentials, database, and schema defined, used for loading to raw
logging_table Optional name of a table to record Earthmover and Lightbeam results to in Snowflake
ods_version Optional Ed-Fi ODS version to save as metadata if copying data directly into the ODS
data_model_version Optional Ed-Fi data model version to save as metadata if copying data directly into the ODS
endpoints Optional list of resource/descriptor endpoints to copy data directly into the ODS (these should align with Earthmover outputs)
full_refresh Boolean flag to run a full truncate-replace of the warehouse data for the given grain if copying data directly into the ODS
assessment_bundle Optional (required for student ID xwalking) name of the assessment bundle being run
student_id_match_rates_table Optional (required for student ID xwalking) Snowflake table set up for storing student ID match rates (db.schema.table)
snowflake_read_conn_id Optional (required for student ID xwalking) Airflow connection with Snowflake credentials for reading from the analytics db
required_id_match_rate Optional float value for minimum student ID match rate, otherwise EM will fail

XComs can be passed between tasks in a tenant-year taskgroup (e.g., referencing return values from the Python preprocessing task to earthmover_kwargs). When passing XComs in this manner, make sure to include the name of the task-group ID in the referenced task ID.

For example, to reference the optional Python preprocessing task at the beginning of a task-group, use one of the following (depending on whether the optional argument grain_update is defined):

"{{ ti.xcom_pull(task_ids='{group_id}.{tenant_code}_{api_year}__python_preprocess', key='return_value') }}"
"{{ ti.xcom_pull(task_ids='{group_id}.{tenant_code}_{api_year}_{grain_update}__python_preprocess', key='return_value') }}"

Task-group IDs are built dynamically, depending on the type of processing being completed. If you are using XComs in this manner, it is recommended to use the optional group_id argument when initializing your task-group to ensure that its value is static and easily-referenced in your DAG-initialization code.
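
With a static group_id, the template strings shown above can be assembled programmatically. The helper preprocess_xcom_template is hypothetical; it only reproduces the two documented patterns.

```python
def preprocess_xcom_template(group_id, tenant_code, api_year, grain_update=None):
    """Build the Jinja template that pulls the Python preprocessing task's return value."""
    task_id = f"{group_id}.{tenant_code}_{api_year}"
    if grain_update:
        task_id += f"_{grain_update}"
    task_id += "__python_preprocess"
    return "{{ ti.xcom_pull(task_ids='" + task_id + "', key='return_value') }}"

# Hypothetical group_id, tenant, and year values.
template = preprocess_xcom_template("map_group", "tenant1", 2023)
template_with_grain = preprocess_xcom_template("map_group", "tenant1", 2023, "math")
print(template)
print(template_with_grain)
```

The resulting string can then be placed wherever a templated value is accepted, for example inside earthmover_kwargs.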


Operators

EarthmoverOperator

Extends BashOperator to run Earthmover with optional CLI arguments.

Arguments:
Argument Description
earthmover_path Path to installed Earthmover package
output_dir Directory to output files created by Earthmover (also definable within the config file)
state_file Path to file where Earthmover saves state between runs (also definable within the config file)
config_file Path to the Earthmover config file to run
selector Optional selector string to filter Earthmover run
parameters Optional params JSON payload to pass into Earthmover
results_file Optional path to save results payload after run
force Boolean flag to force an Earthmover run, even if nothing has changed since last run (default False)
skip_hashing Boolean flag to force Earthmover to skip hashing before run (default False)
show_graph Boolean flag to generate graph .png and .svg files (default False)
show_stacktrace Boolean flag to display full stacktrace in the case of an error (default False)

LightbeamOperator

Extends BashOperator to run Lightbeam with optional CLI arguments.

Arguments:
Argument Description
lightbeam_path Path to installed Lightbeam package
command Lightbeam run command (i.e., send, send+validate, validate, delete) (default send)
data_dir Directory of files for Lightbeam to use (also definable within the config file)
state_dir Path to directory where Lightbeam saves state between runs (also definable within the config file)
edfi_conn_id Optional Airflow connection with Ed-Fi ODS credentials (if present, fills config environment variables prefixed with EDFI_API_)
config_file Path to the Lightbeam config file to run
selector Optional selector string to filter Lightbeam run
parameters Optional params JSON payload to pass into Lightbeam
results_file Optional path to save results payload after run
wipe Boolean flag to force a cache-reset and re-fetch API metadata (default False)
force Boolean flag to force a Lightbeam run, even if payloads have already been sent (default False)
older_than Optional timestamp string to filter payloads against when re-running Lightbeam
newer_than Optional timestamp string to filter payloads against when re-running Lightbeam
resend_status_codes Optional list of status-codes to filter payloads against when re-running Lightbeam


Callables

local_filepath_to_s3

Generic callable to upload files within a local filepath to a specified S3 bucket and key. This callable works on either a single file or a directory of files.

Arguments:
Argument Description
local_filepath A filepath or directory path to upload to S3
s3_destination_key An S3 key or root where the local file or directory is uploaded, respectively
s3_conn_id Name of the S3 connection in Airflow where the bucket has been defined
remove_local_filepath Boolean flag for whether to delete the contents of local_filepath at run completion (default False)
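
The difference between uploading a single file and a directory can be sketched without touching S3. The function plan_s3_uploads is a hypothetical planner that lists which key each local file would map to; it is not the callable's implementation and performs no upload.

```python
import os
import tempfile

def plan_s3_uploads(local_filepath, s3_destination_key):
    """Hypothetical planner: list (local file, S3 key) pairs without uploading."""
    if os.path.isfile(local_filepath):
        # A single file is written to exactly the destination key.
        return [(local_filepath, s3_destination_key)]
    pairs = []
    for root, _dirs, files in os.walk(local_filepath):
        for filename in sorted(files):
            full_path = os.path.join(root, filename)
            relative = os.path.relpath(full_path, local_filepath)
            # For a directory, the destination key acts as a root prefix.
            key = "/".join([s3_destination_key.rstrip("/"), *relative.split(os.sep)])
            pairs.append((full_path, key))
    return pairs

# Build a small scratch directory to demonstrate both cases.
scratch = tempfile.mkdtemp()
os.makedirs(os.path.join(scratch, "sub"))
for relative_path in ("a.jsonl", os.path.join("sub", "b.jsonl")):
    open(os.path.join(scratch, relative_path), "w").close()

directory_keys = sorted(key for _path, key in plan_s3_uploads(scratch, "raw/"))
file_keys = [key for _path, key in plan_s3_uploads(os.path.join(scratch, "a.jsonl"), "raw/a.jsonl")]
print(directory_keys, file_keys)
```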

remove_filepaths

Generic callable to delete a list of files or directories on local disk. This method works on both filepaths and directory paths.

Arguments:
Argument Description
filepaths A single filepath or a list of filepaths to delete
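
A minimal sketch of the behavior described above, accepting either one path or a list and handling both files and directories. The function remove_paths is a hypothetical stand-in, and silently ignoring missing paths is an assumption.

```python
import os
import shutil
import tempfile

def remove_paths(filepaths):
    """Hypothetical stand-in: delete files or directories, ignoring missing paths."""
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    for path in filepaths:
        if os.path.isdir(path):
            shutil.rmtree(path)
        elif os.path.exists(path):
            os.remove(path)

scratch = tempfile.mkdtemp()
single_file = os.path.join(scratch, "students.jsonl")
open(single_file, "w").close()

remove_paths(single_file)  # a single filepath
remove_paths([scratch])    # a list containing a directory
```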

Plug-and-play preprocessing functions

Static methods provided by EarthbeamDAG that can be passed as callables to Python preprocessing operators.

partition_on_tenant_and_year

Shards data to parquet on disk. This is useful when a single input file contains multiple years and/or tenants.

Arguments:
Argument Description
csv_paths one or more complete file paths pointing to input data
output_dir root directory of the parquet
tenant_col (optional) name of the column to use as tenant code
tenant_map (optional) map values from the contents of tenant_col to valid tenant codes
year_col (optional) name of the column to use as API year
year_map (optional) map values from the contents of year_col to valid API years
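
The grouping step behind this kind of partitioning can be sketched with the standard library alone. The function group_rows, the column names, and the sample data are hypothetical; the sketch only buckets rows by (tenant, year) and does not write parquet. Passing unmapped values through unchanged is an assumption.

```python
import csv
import io
from collections import defaultdict

def group_rows(csv_file, tenant_col, year_col, tenant_map=None, year_map=None):
    """Hypothetical sketch: bucket CSV rows by (tenant code, API year)."""
    tenant_map = tenant_map or {}
    year_map = year_map or {}
    groups = defaultdict(list)
    for row in csv.DictReader(csv_file):
        # Values missing from a map pass through unchanged (an assumption).
        tenant = tenant_map.get(row[tenant_col], row[tenant_col])
        year = year_map.get(row[year_col], row[year_col])
        groups[(tenant, year)].append(row)
    return dict(groups)

sample = io.StringIO(
    "district,term,student_id\n"
    "District A,2022-2023,1\n"
    "District B,2022-2023,2\n"
    "District A,2023-2024,3\n"
)
groups = group_rows(
    sample,
    tenant_col="district",
    year_col="term",
    tenant_map={"District A": "tenant_a", "District B": "tenant_b"},
    year_map={"2022-2023": 2023, "2023-2024": 2024},
)
print(sorted(groups))
```

Each bucket would then be written under output_dir as its own partition, so that downstream task-groups read only the rows for their (tenant, year).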

Runway

To use Runway objects, you must first install runway_python_client.

The RunwayHook creates and wraps RunwayClient:

from edu_edfi_airflow.providers.runway.hooks import RunwayHook

hook = RunwayHook(runway_conn_id="runway")
client = hook.get_runway_client()

Your "runway" Connection will look like this:

  • conn_type: http
  • host: https://api.<env>.runwayloader.org/api/v1
  • login: <client key>
  • password: <client secret>
  • extra
    • extra__runway__auth_base_url: <EA's Auth0 URL for authenticating requests>
    • extra__runway__partner_code: <Partner code>

The Partner code is set in the Connection, because it's expected that one Airflow instance is operating with one Partner. Runway requests are authenticated at the Partner level, so token caching is simpler if the Partner remains the same across job requests.

See runway_python_client for documentation on using RunwayClient, which is a slim wrapper over Runway's Job Request API. One callable is provided that executes all steps in the workflow, send_to_runway:

from airflow.decorators import dag, task

from edu_edfi_airflow.callables.runway import send_to_runway

@dag
def test_runway():
    runway_task = task(send_to_runway)

    runway_task(
        runway_conn_id="runway",
        tenant_code="ea",
        bundle_name="assessments/PSAT_SAT",
        input_files={"INPUT_FILE": "/tmp/sat.csv"},
        bundle_params={"TEST_TYPE": "SAT"},
        school_year="2026",
    )

test_runway()

Here's an example of using Dynamic Task Mapping to run multiple input files against the same year and bundle params:

from airflow.decorators import dag, task

from edu_edfi_airflow.callables.runway import send_to_runway

@dag
def test_runway():
    runway_task = task(send_to_runway)

    runway_task.partial(
        runway_conn_id="runway",
        tenant_code="ea",
        bundle_name="assessments/PSAT_SAT",
        bundle_params={"TEST_TYPE": "SAT"},
        school_year="2026",
    ).expand(
        input_files=[
            {"INPUT_FILE": "/tmp/sat_1.csv"},
            {"INPUT_FILE": "/tmp/sat_2.csv"},
            {"INPUT_FILE": "/tmp/sat_3.csv"},
        ],
    )

test_runway()

You are welcome to write your own methods or tasks to suit your own Runway workflows. If they would be useful across projects, consider creating an Issue or submitting a PR.

About

Manages extract-load of Ed-Fi data in Airflow
