Freak

Freak is a data flow engine.

About

Freak is composed of two main components, Engine and Flow, each of which is divided into smaller components. The Engine encapsulates the logic for reading flows and executing their steps in the defined order. It does this using three components:

  • Butler
  • Executor
  • Inspector

Before explaining these components, it helps to understand what a flow is in Freak and how it is implemented. A flow is a set of steps intended to achieve an objective, which can be anything identifiable as business logic. A flow is implemented by defining the following components:

  • Flow decorator: identifies the steps of the flow and enforces policies on how they are used or executed.

  • Locator: uses the flow decorator to locate the steps defined in a module (here, a module means a single Python file). Every flow must define its own locator so that Butler can use it to identify and execute the flow.
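
To make the decorator and locator roles concrete, here is a minimal, self-contained sketch. It is not Freak's actual implementation: `flow_step`, `locate_steps`, and the `_flow_meta` attribute are hypothetical names chosen for illustration, and the sketch only assumes that a decorator tags functions with metadata and that a locator scans a module for tagged functions.

```python
import types
from typing import Callable, Dict, List


def flow_step(name: str, order: int) -> Callable:
    """Hypothetical flow decorator: tags a function as a step of a flow."""

    def wrapper(func: Callable) -> Callable:
        # Attach metadata so that a locator can find the step later.
        func._flow_meta = {"name": name, "order": order}
        return func

    return wrapper


def locate_steps(module: types.ModuleType) -> List[Callable]:
    """Hypothetical locator: collect tagged functions in a module, sorted by order."""
    steps = [
        obj
        for obj in vars(module).values()
        if callable(obj) and hasattr(obj, "_flow_meta")
    ]
    return sorted(steps, key=lambda f: f._flow_meta["order"])


@flow_step(name="second", order=2)
def second(data: Dict[str, int]) -> Dict[str, int]:
    return {"a": data["a"] * 2}


@flow_step(name="first", order=1)
def first(data: Dict[str, int]) -> Dict[str, int]:
    return {"a": data["a"] + 1}


# Stand-in for a single Python file that defines the steps of a flow.
demo_module = types.ModuleType("demo_flow")
demo_module.second = second
demo_module.first = first
demo_module.helper = len  # Not decorated, so the locator ignores it.

step_names = [f._flow_meta["name"] for f in locate_steps(demo_module)]
```

In this sketch the steps come back sorted by their declared order, regardless of the order in which they were defined in the module.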

Now that we have a basic understanding of what a flow in Freak is, let's move on to the engine components.

  • Butler is responsible for reading Python modules, locating steps, and organizing them using the locator specified by the flow.

  • Executor is the core component of the engine. It is responsible for executing steps. Currently, it supports only linear and choice-based flows.

  • Inspector returns the input schema for every step defined by the flow. It is intended to be part of the view logic of the engine.
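
The division of labour between execution and inspection can be sketched in plain Python. This is an illustration under stated assumptions, not the engine's real code: `Step`, `run_linear`, and `inspect_steps` are hypothetical names, a list of required keys stands in for a real input schema, and every step receives the same input data, which is what the outputs in the sample test further down suggest.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical step representation: (name, required input keys, function).
Step = Tuple[str, List[str], Callable[[Dict[str, Any]], Dict[str, Any]]]


def run_linear(steps: List[Step], data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run steps in order and stop at the first step whose input is incomplete."""
    results: List[Dict[str, Any]] = []
    for name, required, func in steps:
        missing = [key for key in required if key not in data]
        if missing:
            # Record the failure and halt; later steps are not executed.
            results.append({"step": name, "success": False, "missing": missing})
            break
        results.append({"step": name, "success": True, "output": func(data)})
    return results


def inspect_steps(steps: List[Step]) -> Dict[str, List[str]]:
    """Return the required input keys of every step without running anything."""
    return {name: required for name, required, _ in steps}


steps: List[Step] = [
    ("add_one", ["a"], lambda d: {"a": d["a"] + 1}),
    ("add_two", ["a"], lambda d: {"a": d["a"] + 2}),
    ("needs_c", ["a", "c"], lambda d: {"a": d["a"], "c": d["c"] + 1}),
]

results = run_linear(steps, {"a": 1})
schema = inspect_steps(steps)
```

Here the first two steps succeed and the third is reported as failed because `c` is missing, which mirrors how the sample flow halts at its last step until the missing field is supplied.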

Supported flows

  • Linear Flows
  • Choice Flows

Sample Code

Freak is currently under active development. The following code should give you an idea of how it implements data flows.

This is how you define a flow using the base flow.

from freak.flows.base_flow import base_flow
from freak.models.input import InputModel, InputModelB
from freak.models.request import RequestContext
from freak.models.response import Response, SuccessResponseContext
from freak.types import Flow


@base_flow(
    name="func_one",
    order=1,
    input_model=InputModel,
    uid="func_one",
    parent_uid=None,
)
def func_one(ctx: RequestContext) -> SuccessResponseContext:
    a = ctx.input["a"]
    b = ctx.input["b"]
    return SuccessResponseContext(
        input=ctx.input, output={"a": a + 1, "b": b + 2}
    )


@base_flow(
    name="func_two",
    order=2,
    input_model=InputModel,
    uid="func_two",
    parent_uid="func_one",
)
def func_two(ctx: RequestContext) -> SuccessResponseContext:
    a = ctx.input["a"]
    b = ctx.input["b"]
    return SuccessResponseContext(
        input=ctx.input,
        output={"a": a + 2, "b": b + 3},
    )


@base_flow(
    name="func_three",
    order=3,
    input_model=InputModel,
    uid="func_three",
    parent_uid="func_two",
)
def func_three(ctx: RequestContext) -> SuccessResponseContext:
    a = ctx.input["a"]
    b = ctx.input["b"]
    return SuccessResponseContext(
        input=ctx.input,
        output={"a": a + 3, "b": b + 4},
    )


@base_flow(
    name="func_four",
    order=4,
    input_model=InputModelB,
    uid="func_four",
    parent_uid="func_three",
)
def func_four(ctx: RequestContext) -> Response:
    a = ctx.input["a"]
    b = ctx.input["b"]
    c = ctx.input["c"]

    return SuccessResponseContext(
        input=ctx.input,
        output={"a": a + 4, "b": b + 5, "c": c + 6},
    )
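
The `uid` and `parent_uid` arguments above link the steps into a chain. As a rough sketch of how such links could be turned into a map from each step to the steps that follow it, consider the snippet below. `build_successors` is a hypothetical helper written for this illustration and is not part of Freak; the resulting shape resembles the `traversed` mapping asserted in the test case.

```python
from typing import Dict, List, Optional, Tuple


def build_successors(
    links: List[Tuple[str, Optional[str]]]
) -> Dict[str, List[str]]:
    """Map every step uid to the uids of the steps that name it as parent."""
    successors: Dict[str, List[str]] = {uid: [] for uid, _ in links}
    for uid, parent_uid in links:
        if parent_uid is not None:
            # setdefault tolerates a parent that was not declared as a step.
            successors.setdefault(parent_uid, []).append(uid)
    return successors


# (uid, parent_uid) pairs taken from the decorators in the flow definition.
links = [
    ("func_one", None),
    ("func_two", "func_one"),
    ("func_three", "func_two"),
    ("func_four", "func_three"),
]

successors = build_successors(links)
```

In a linear flow every step has at most one successor. A step with several successors would be a branching point, which is one way a choice-based flow could be represented.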

The following test case uses the definition above to execute the flow.

from freak.provider import EngineProvider

def test_base_flow_prosecutioner():
    engine = EngineProvider(flow_name="base_flow").engine
    executioner = engine(module_name=__name__, decorator_name="base_flow")

    response = executioner.execute(data={"a": 4, "b": 7}, from_step="func_one")

    output, path_traversed = response

    assert path_traversed == {
        "last_step": "func_three",
        "traversed": {
            "func_one": ["func_two"],
            "func_two": ["func_three"],
            "func_three": ["func_four"],
        },
    }

    responses = output.responses

    assert output.from_step == 1
    assert output.to_step == 4

    assert output.last_successful_step == 3

    assert responses[0].output == {"a": 5, "b": 9}
    assert responses[1].output == {"a": 6, "b": 10}
    assert responses[2].output == {"a": 7, "b": 11}

    assert responses[3].success is False
    assert responses[3].output == {}
    assert (
        responses[3].messages[0]
        == "Variable: c | Type: value_error.missing | Message: field required"
    )

    response = executioner.execute(
        data={"a": 4, "b": 7, "c": 5},
        from_step="func_four",
        executed_steps=path_traversed,
    )

    output, path_traversed = response

    responses = output.responses
    assert len(responses) == 1
    assert responses[0].output == {"a": 8, "b": 12, "c": 11}

    assert output.last_successful_step == 4
    assert output.from_step == 4
    assert output.to_step == 4

    assert path_traversed == {
        "last_step": "func_four",
        "traversed": {
            "func_one": ["func_two"],
            "func_two": ["func_three"],
            "func_three": ["func_four"],
            "func_four": [],
        },
    }

With the code above, it is also possible to generate the input schema for every step. The following test case demonstrates this behaviour.

from freak.provider import EngineProvider

def test_base_flow_fetch_schema():
    engine = EngineProvider(flow_name="base_flow").engine
    executioner = engine(module_name=__name__, decorator_name="base_flow")
    responses = executioner.inspect()

    input_model_b_schema = {
        "title": "InputModelB",
        "description": "Class for defining structure of request data.",
        "type": "object",
        "properties": {
            "a": {"title": "A", "type": "integer"},
            "b": {"title": "B", "type": "integer"},
            "c": {"title": "C", "type": "integer"},
        },
        "required": ["a", "b", "c"],
    }
    input_model_schema = {
        "title": "InputModel",
        "description": "Class for defining structure of request data.",
        "type": "object",
        "properties": {
            "a": {"title": "A", "type": "integer"},
            "b": {"title": "B", "type": "integer"},
        },
        "required": ["a", "b"],
    }

    assert input_model_schema == InputModel.schema()
    assert input_model_b_schema == InputModelB.schema()

    assert executioner.flow.predecessor == responses["graph"]

    schema_info = responses["schema"]

    assert schema_info["func_one"]["schema"] == input_model_schema
    assert schema_info["func_two"]["schema"] == input_model_schema
    assert schema_info["func_three"]["schema"] == input_model_schema
    assert schema_info["func_four"]["schema"] == input_model_b_schema

🛡 License

This project is licensed under the terms of the MIT license. See LICENSE for more details.

📃 Citation

@misc{freak,
  author = {transhapHigsn},
  title = {Freak is a data flow engine.},
  year = {2020},
  publisher = {GitHub},
  journal = {GitHub repository},
  howpublished = {\url{https://github.com/transhaphigsn/freak}}
}

Credits

This project was generated with python-package-template.