Lithops - Apache Airflow Plugin

This repository contains an Apache Airflow plugin that implements new operators to easily deploy serverless function tasks using Lithops.

Lithops is a Python multicloud library for running serverless jobs. Lithops transparently runs local sequential code over thousands of serverless functions. This plugin lets Airflow benefit from serverless functions to achieve higher performance on highly parallelizable tasks, such as big data analysis workflows, without consuming all the resources of the cluster where Airflow runs and without having to provision Airflow workers through the Celery executor.
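
For reference, this is roughly what plain Lithops does outside of Airflow (a minimal sketch; it assumes a working Lithops configuration for your chosen cloud backend):

    import lithops

    def double(x):
        return x * 2

    # Run one serverless function per element and gather the results
    fexec = lithops.FunctionExecutor()
    fexec.map(double, [1, 2, 3, 4])
    print(fexec.get_result())  # [2, 4, 6, 8]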

Contents

  1. Installation
  2. Usage
  3. Examples

Usage

Operators

This plugin provides three new operators.


Important note: Due to the way Airflow manages DAGs, the callables passed to the Lithops operators cannot be declared in the DAG definition script. Instead, they must be declared inside a separate file or module. To access the functions from the DAG file, import them as regular modules.
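
A minimal sketch of one possible layout (the file names are illustrative):

    dags/
    ├── my_dag.py          # DAG definition; imports the callables and builds the operators
    └── my_functions.py    # callables passed to the Lithops operators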


  • LithopsCallAsyncOperator

    It invokes a single function.

    | Parameter      | Description                                                              | Default   |
    |----------------|--------------------------------------------------------------------------|-----------|
    | func           | Python callable                                                          | mandatory |
    | data           | Keyword arguments passed to the function                                 | {}        |
    | data_from_task | Gets the output of another task as an input parameter for this function | None      |

    Example:

    # my_functions.py
    def add(x, y):
        return x + y

    # DAG file
    from my_functions import add

    my_task = LithopsCallAsyncOperator(
        task_id='add_task',
        func=add,
        data={'x': 1, 'y': 3},
        dag=dag,
    )
    # Returns: 4

    basic_task = LithopsCallAsyncOperator(
        task_id='add_task_2',
        func=add,
        data={'x': 4},
        data_from_task={'y': 'add_task'},  # 'y' is taken from the output of 'add_task'
        dag=dag,
    )
    # Returns: 8
  • LithopsMapOperator

    It invokes multiple parallel tasks, one for every element in map_iterdata. It applies the function map_function to every element of map_iterdata:

    | Parameter           | Description                                                                                  | Default   | Type     |
    |---------------------|----------------------------------------------------------------------------------------------|-----------|----------|
    | map_function        | Python callable                                                                              | mandatory | callable |
    | map_iterdata        | Iterable. The function is invoked once for every element in it                               | mandatory | iterable |
    | iterdata_from_task  | Gets the input iterdata from another task's output (see the chaining sketch after this list) | None      | iterable |
    | extra_params        | Adds extra keyword arguments to the map function's signature                                 | None      | dict     |
    | chunk_size          | Splits the object in chunks of this many bytes (one invocation per chunk)                    | None      | int      |
    | chunk_n             | Splits the object in N chunks (one invocation per chunk)                                     | None      | int      |
    | remote_invocation   | Activates Lithops' remote invocation functionality                                           | False     | bool     |
    | invoke_pool_threads | Number of threads used to perform the invocations                                            | 500       | int      |

    Example:

    # my_functions.py
    def add(x, y):
        return x + y

    # DAG file
    from my_functions import add

    map_task = LithopsMapOperator(
        task_id='map_task',
        map_function=add,
        map_iterdata=[1, 2, 3],
        extra_params={'y': 1},
        dag=dag,
    )
    # Returns: [2, 3, 4]
  • LithopsMapReduceOperator

    It invokes multiple parallel tasks, one for every element in map_iterdata. It applies the function map_function to every element of map_iterdata. Finally, a single reduce_function is invoked to gather all the map results.

    | Parameter              | Description                                                                                                          | Default            | Type     |
    |------------------------|----------------------------------------------------------------------------------------------------------------------|--------------------|----------|
    | map_function           | Python callable                                                                                                      | mandatory          | callable |
    | map_iterdata           | Iterable. The function is invoked once for every element in it                                                       | mandatory          | iterable |
    | reduce_function        | Python callable                                                                                                      | mandatory          | callable |
    | iterdata_from_task     | Gets the input iterdata from another task's output                                                                   | None               | iterable |
    | extra_params           | Adds extra keyword arguments to the map function's signature                                                         | None               | dict     |
    | map_runtime_memory     | Memory to use to run the map functions                                                                               | Loaded from config | int      |
    | reduce_runtime_memory  | Memory to use to run the reduce function                                                                             | Loaded from config | int      |
    | chunk_size             | Splits the object in chunks of this many bytes (one invocation per chunk). None processes the whole file in one function activation | None | int |
    | chunk_n                | Splits the object in N chunks (one invocation per chunk). None processes the whole file in one function activation   | None               | int      |
    | remote_invocation      | Activates Lithops' remote invocation functionality                                                                   | False              | bool     |
    | invoke_pool_threads    | Number of threads used to perform the invocations                                                                    | 500                | int      |
    | reducer_one_per_object | Sets one reducer per object after running the partitioner                                                            | False              | bool     |
    | reducer_wait_local     | Waits for the results locally                                                                                        | False              | bool     |

    Example:

    # my_functions.py
    def add(x, y):
        return x + y

    def mult_array(results):
        result = 1
        for n in results:
            result *= n
        return result

    # DAG file
    from my_functions import add, mult_array

    mapreduce_task = LithopsMapReduceOperator(
        task_id='mapreduce_task',
        map_function=add,
        reduce_function=mult_array,
        map_iterdata=[1, 2, 3],
        extra_params={'y': 1},
        dag=dag,
    )
    # Returns: 24 (the map produces [2, 3, 4]; the reduce multiplies them)
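
As referenced in the LithopsMapOperator table above, tasks can be chained by taking the iterdata from another task's output. A sketch (illustrative only; generate_list is a hypothetical callable, defined in a separate module, that returns a list):

    from my_functions import generate_list, add

    gen_task = LithopsCallAsyncOperator(
        task_id='gen_task',
        func=generate_list,             # e.g. returns [1, 2, 3]
        dag=dag,
    )

    chained_map = LithopsMapOperator(
        task_id='chained_map',
        map_function=add,
        iterdata_from_task='gen_task',  # iterdata taken from gen_task's output
        extra_params={'y': 1},
        dag=dag,
    )

    gen_task >> chained_map             # standard Airflow dependency wiring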

Inherited parameters

All operators inherit from a common Lithops operator that accepts the following parameters:

| Parameter       | Description                                                                   | Default             | Type |
|-----------------|-------------------------------------------------------------------------------|---------------------|------|
| lithops_config  | Lithops configuration, as a dictionary                                       | {}                  | dict |
| async_invoke    | Invokes the functions asynchronously; does not wait for them to complete     | False               | bool |
| get_result      | Downloads the results upon completion                                        | True                | bool |
| clean_data      | Deletes Lithops metadata from the object storage                             | False               | bool |
| extra_env       | Adds environment variables to the function's runtime                         | None                | dict |
| runtime_memory  | Runtime memory, in MB                                                        | 256                 | int  |
| timeout         | Time the functions have to complete their execution before raising a timeout | Default from config | int  |
| include_modules | Explicitly pickle these dependencies                                         | []                  | list |
| exclude_modules | Explicitly exclude these modules from the pickled dependencies               | []                  | list |
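
For instance, these parameters can be combined with any of the operators above (a sketch; the configuration values are placeholders, not working settings for a real deployment):

    from my_functions import add

    config = {'lithops': {'backend': 'aws_lambda', 'storage': 'aws_s3'}}

    tuned_task = LithopsMapOperator(
        task_id='tuned_map',
        map_function=add,
        map_iterdata=[1, 2, 3],
        extra_params={'y': 1},
        lithops_config=config,        # overrides the cluster-wide Lithops config
        runtime_memory=512,           # MB per function invocation
        extra_env={'MY_VAR': 'foo'},  # injected into the function's runtime
        dag=dag,
    )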

License

Apache 2 license