Skip to content

Latest commit



329 lines (284 loc) · 7.4 KB

File metadata and controls

329 lines (284 loc) · 7.4 KB

Welcome to microflow 👋

Version Documentation Maintenance License: MIT

Finite state machine based HTTP microservice orchestration


Microflow helps you build and run complex workflows which are composed of HTTP microservices and manual (human moderator) stages, all by definiing a JSON workflow blueprint. It is built on robust concepts of finite state machine, and allows you to control input/output data as template variables (think jsonpath, handlebars). A workflow consists of manual states and task states (aka HTTP workers which could be sync/async).

License: MIT



npm i --save microflow@alpha


The Microflow class provides various methods to author/execute/infer workflow and workflow instances

import { Microflow } from "microflow";

const flow = new Microflow({
  jwt: {
    secretOrPublicKey: 'dummySecretKey',
    sign: {
      expiresIn: '1h'

const {
  // task interface
  // workflow interface
  // execution interface
} = flow;


import { Microflow } from "microflow";

const flow = new Microflow({
  // bring your own persistence here 
  // (implements MicroflowStorage)
  storage: undefined,
  jwt: {
    secretOrPublicKey: 'dummySecretKey',
    sign: {
      expiresIn: '1h'

// Authoring task and workflow

// Register a task
const task = await flow.task.create({
  // Recognisiable identified for the task
  id: 'airflow',
  // type of task (only 'http' supported right now)
  type: 'http',
  //  <AxiosRequestConfig> supported (
  config: {
    url: 'http://localhost:1000/api/experimental/dags/{{dagId}}/dag_runs',
    headers: {
      'Cache-Control': 'no-cache',
      'Content-Type': 'application/json'
    data: {
      conf: {
        // $ is a reference to 'parameters' object of a task in the workflow
        actualData: '$.data',
        token: '$.token',
        envKey: '$.envKey',
        executionId: '$.executionId'
    method: 'post'

const { id: taskId } = await;

// Create a workflow
const workflow = await flow.workflow.create({
  id: 'sample',
  config: {
    initial: 'auto_test_1',
    states: {
      auto_test_1: {
        type: 'task',
        parameters: {
          //example of constant
          dagId: 'dag1',
          // $ = input event data to the task state
          data: '$',
          // $$ = Execution context object 
          token: '$$.task.token',
          // $$$ = process.env aka environment variables
          envKey: '$$$.myKey1',
          executionId: '$$.executionId'
        onDone: {
          target: 'ready_for_approval',
          resultSelector: {
            a: 'a',
            b: 'b',
            out: '$'
          resultPath: '$.pipeline1.success'
        onError: {
          target: 'failed',
          resultSelector: {
            c: 'c',
            d: 'd',
            out: '$'
          resultPath: '$.pipeline1.error'
      ready_for_approval: {
        type: 'atomic',
        on: {
          reject: {
            target: 'failed',
            resultPath: '$'
          approve: {
            target: 'auto_test_2',
            resultPath: '$'
      auto_test_2: {
        type: 'task',
        parameters: {
          dagId: 'dag2',
          data: '$',
          token: '$$.task.token',
          envKey: '$$$.myKey1',
          executionId: '$$.executionId'
        onDone: {
          target: 'done',
          resultSelector: {
            e: 'e',
            out: '$'
          resultPath: '$.pipeline2.success'
        onError: {
          target: 'failed',
          resultSelector: {
            f: 'f',
            out: '$'
          resultPath: '$.pipeline2.error'
      done: {
        type: 'final'
      failed: {
        type: 'final'

// start an execution with initial data
const execution = await workflow.start({
  input1: 'val1',
  input2: 'val2'

// Sending events to an execution
await execution.send({
  type: 'success-auto_test_1',
  data: {
    test_a_result: true,
    test_b_result: false

await execution.send({
  type: 'approve',
  data: {
    message: 'The acceptance test was fine'

await execution.send({
  type: 'success-auto_test_2',
  data: {
    test_c_result: true

const { completed, output, state } = await;

  "input1": "val1",
  "input2": "val2",
  "pipeline1": {
    "success": {
      "a": "a",
      "b": "b",
      "out": {
        "test_a_result": true,
        "test_b_result": false
  "approval": {
    "data": {
      "message": "The acceptance test was fine"
  "pipeline2": {
    "success": {
      "e": "e",
      "out": {
        "test_c_result": true


import { Microflow } from "microflow";
import { MicroflowStorage } from "microflow/types";

// define your own storage from MicroflowStorage abstract class
class MyStorage implements IMicroflowStorage {
  workflow: ICrudable<IWorkflow>;
  task: ICrudable<ITask>;
  execution: ICrudable<IExecution>;
  // define CRUD functions
    this.workflow = {
      //define CRUD implementation here

    this.task = {
      //define CRUD implementation here

    this.execution = {
      //define CRUD implementation here

const store = new MyStorage();

// create an instance of microflow with custom store injected
const flow = new Microflow({
  storage: store,
  jwt: {
    secretOrPublicKey: 'dummySecretKey',
    sign: {
      expiresIn: '1h'


Navigate to examples/basic to run a sample express project with ephemeral storage.

Run tests

npm run test


👤 Karan Chhabra

🤝 Contributing

Contributions, issues and feature requests are welcome!
Feel free to check issues page. You can also take a look at the contributing guide.

Show your support

Give a ⭐️ if this project helped you!

📝 License

Copyright © 2020 Karan Chhabra.
This project is MIT licensed.