Skip to content

Workflow engine based on Coloured Petri Net

License

Notifications You must be signed in to change notification settings

Byzanteam/coloured_flow

Repository files navigation

ColouredFlow

Warning

The document is WIP. Check examples at examples folder.

ColouredFlow is a workflow engine based on Coloured Petri Nets (CPN). It provides a flexible and powerful way to model and execute workflows with complex logic and concurrency. ColouredFlow allows you to define places, transitions, arcs, and tokens with different data types (colours) and expressions.

Features

  • Define workflows using Coloured Petri Nets
  • Support for various data types (colours) and expressions
  • Execute workflows with concurrency and complex logic
  • Integration with Ecto for persistence

Installation

If available in Hex, the package can be installed by adding coloured_flow to your list of dependencies in mix.exs:

def deps do
  [
    {:coloured_flow, "~> 0.1.0"}
  ]
end

Usage

Here is an example of how to use the coloured_flow package to model and execute a traffic light system using a Coloured Petri Net.

Example: Traffic Light

Mix.install(
  [
    {:coloured_flow, github: "Byzanteam/coloured_flow"},
    {:kino, "~> 0.14.1"}
  ],
  config: [
    coloured_flow: [
      {
        ColouredFlow.Runner.Storage,
        [
          repo: TrafficLight.Repo,
          storage: ColouredFlow.Runner.Storage.Default
        ]
      }
    ],
    traffic_light: [
      database_url: "ecto://postgres:postgres@localhost/coloured_flow"
    ]
  ]
)

Preparation

We assume you have a PostgreSQL database running, and the coloured_flow database exists. If not, please create the coloured_flow database before proceeding.

The default database URL is ecto://postgres:postgres@localhost/coloured_flow, you can change it in the above setup block if your database URL is different.

Coloured Petri Net graph

Open to view the graph
flowchart TB
  %% colset unit() :: {}

  subgraph EW
    %% places
    %% ~b[{}]
    r_ew((red_ew))
    g_ew((green_ew))
    y_ew((yellow_ew))

    %% transitions
    tr_ew[turn_red_ew]
    tg_ew[turn_green_ew]
    ty_ew[turn_yellow_ew]

    r_ew --{1,u}--> tg_ew
    tg_ew --{1,u}--> g_ew
    g_ew --{1,u}--> ty_ew
    ty_ew --{1,u}--> y_ew
    y_ew --{1,u}--> tr_ew
    tr_ew --{1,u}--> r_ew
  end

  subgraph NS
    %% places
    %% ~b[{}]
    r_ns((red_ns))
    g_ns((green_ns))
    y_ns((yellow_ns))

    %% transitions
    tr_ns[turn_red_ns]
    tg_ns[turn_green_ns]
    ty_ns[turn_yellow_ns]

    r_ns --{1,u}--> tg_ns
    tg_ns --{1,u}--> g_ns
    g_ns --{1,u}--> ty_ns
    ty_ns --{1,u}--> y_ns
    y_ns --{1,u}--> tr_ns
    tr_ns --{1,u}--> r_ns
  end

  %% safe
  %% ~b[{}]
  s_ew((safe_ew))
  s_ns((safe_ns))

  tr_ns --{1,u}--> s_ew
  s_ew --{1,u}--> tg_ew
  tr_ew --{1,u}--> s_ns
  s_ns --{1,u}--> tg_ns
Loading

Setup

defmodule TrafficLight.Repo do
  use Ecto.Repo,
    otp_app: :coloured_flow,
    adapter: Ecto.Adapters.Postgres
end

repo_pid =
  case TrafficLight.Repo.start_link(url: Application.fetch_env!(:traffic_light, :database_url)) do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
    {:error, reason} -> raise inspect(reason)
  end

Ecto.Migrator.run(
  TrafficLight.Repo,
  [{0, ColouredFlow.Runner.Migrations.V0}],
  :up,
  all: true
)

IO.inspect("Repo started: #{inspect(repo_pid)}")

supervisor_pid =
  case ColouredFlow.Runner.Supervisor.start_link() do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
    {:error, reason} -> raise inspect(reason)
  end

IO.inspect("Runner supervisor started: #{inspect(supervisor_pid)}")

"Database setup"

Flow modules

defmodule TrafficLight do
  alias ColouredFlow.Runner.Storage.Schemas

  def flow do
    alias ColouredFlow.Definition.ColouredPetriNet
    alias ColouredFlow.Definition.Place
    alias ColouredFlow.Definition.Variable

    import ColouredFlow.Builder.DefinitionHelper
    import ColouredFlow.Notation.Colset

    %ColouredPetriNet{
      colour_sets: [
        colset(unit() :: {})
      ],
      places:
        Enum.map(
          ~w[red_ew green_ew yellow_ew red_ns green_ns yellow_ns safe_ew safe_ns],
          fn name ->
            %Place{name: name, colour_set: :unit}
          end
        ),
      transitions:
        Enum.map(
          ~w[turn_red_ew turn_green_ew turn_yellow_ew turn_red_ns turn_green_ns turn_yellow_ns],
          &build_transition!(name: &1)
        ),
      arcs:
        [
          arc(turn_green_ew <~ red_ew :: "bind {1, u}"),
          arc(turn_green_ew ~> green_ew :: "{1, u}"),
          arc(turn_yellow_ew <~ green_ew :: "bind {1, u}"),
          arc(turn_yellow_ew ~> yellow_ew :: "{1, u}"),
          arc(turn_red_ew <~ yellow_ew :: "bind {1, u}"),
          arc(turn_red_ew ~> red_ew :: "{1, u}")
        ] ++
          [
            arc(turn_green_ns <~ red_ns :: "bind {1, u}"),
            arc(turn_green_ns ~> green_ns :: "{1, u}"),
            arc(turn_yellow_ns <~ green_ns :: "bind {1, u}"),
            arc(turn_yellow_ns ~> yellow_ns :: "{1, u}"),
            arc(turn_red_ns <~ yellow_ns :: "bind {1, u}"),
            arc(turn_red_ns ~> red_ns :: "{1, u}")
          ] ++
          [
            arc(turn_red_ns ~> safe_ew :: "{1, u}"),
            arc(turn_green_ew <~ safe_ew :: "bind {1, u}"),
            arc(turn_red_ew ~> safe_ns :: "{1, u}"),
            arc(turn_green_ns <~ safe_ns :: "bind {1, u}")
          ],
      variables: [
        %Variable{name: :u, colour_set: :unit}
      ]
    }
  end

  def setup_flow do
    %Schemas.Flow{}
    |> Ecto.Changeset.cast(
      %{
        name: "TrafficLight",
        definition: flow()
      },
      [:name, :definition]
    )
    |> TrafficLight.Repo.insert!()
  end

  def setup_enactment(flow, initial_markings) do
    %Schemas.Enactment{}
    |> Ecto.Changeset.cast(%{initial_markings: initial_markings}, [:initial_markings])
    |> Ecto.Changeset.put_assoc(:flow, flow)
    |> TrafficLight.Repo.insert!()
  end

  def start_enactment(flow) do
    import ColouredFlow.MultiSet, only: :sigils

    enactment =
      TrafficLight.setup_enactment(
        flow,
        [
          %{place: "red_ew", tokens: ~MS[{}]},
          %{place: "red_ns", tokens: ~MS[{}]},
          %{place: "safe_ew", tokens: ~MS[{}]}
        ]
      )

    {:ok, enactment_pid} = ColouredFlow.Runner.Enactment.Supervisor.start_enactment(enactment.id)

    {enactment, enactment_pid}
  end

  def to_kino do
    lights =
      for color <- [:red, :yellow, :green], dir <- [:ew, :ns] do
        {"#{color}_#{dir}", Kino.Frame.new(placeholder: false)}
      end

    grid = Kino.Layout.grid([EW, NS] ++ Keyword.values(lights), columns: 2)

    {grid, lights}
  end
end

defmodule TrafficLight.WorkitemPubSub do
  use GenServer

  alias ColouredFlow.Runner.Storage.Schemas
  alias ColouredFlow.Runner.Worklist.WorkitemStream

  def start_link(init_arg) do
    GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl GenServer
  def init(init_arg) do
    enactment_id = Keyword.fetch!(init_arg, :enactment_id)

    {:ok, %{enactment_id: enactment_id}, {:continue, :subscribe}}
  end

  @impl GenServer
  def handle_continue(:subscribe, state) do
    stream_task = start_stream(enactment_id: state.enactment_id)

    {:noreply, Map.put(state, :stream_task, stream_task)}
  end

  @impl GenServer
  def terminate(_reason, state) do
    Task.shutdown(state.stream_task)
  end

  @delay 1000
  defp start_stream(options) when is_list(options) do
    import Ecto.Query

    delay = Keyword.get(options, :delay, @delay)
    enactment_id = Keyword.fetch!(options, :enactment_id)

    Task.async(fn ->
      stream =
        Stream.resource(
          fn -> nil end,
          fn cursor ->
            [after_cursor: cursor]
            |> WorkitemStream.live_query()
            |> where(enactment_id: ^enactment_id)
            |> WorkitemStream.list_live()
            |> case do
              :end_of_stream ->
                Process.sleep(delay)

                {[], cursor}

              {workitems, cursor} ->
                {workitems, cursor}
            end
          end,
          fn _cursor -> :ok end
        )

      stream
      |> Stream.each(fn %Schemas.Workitem{} = workitem ->
        send(get_light(workitem), {:workitem, workitem})
      end)
      |> Stream.run()
    end)
  end

  defp get_light(%Schemas.Workitem{} = workitem) do
    dir =
      case workitem.binding_element.transition do
        "turn_red_" <> dir -> dir
        "turn_yellow_" <> dir -> dir
        "turn_green_" <> dir -> dir
      end

    case dir do
      "ew" -> TrafficLight.EWLight
      "ns" -> TrafficLight.NSLight
    end
  end
end

defmodule TrafficLight.DirectionalLight do
  use GenServer

  alias ColouredFlow.Runner.Enactment.WorkitemTransition
  alias ColouredFlow.Runner.Enactment.Workitem
  alias ColouredFlow.Runner.Storage.Schemas

  def start_link(init_arg) when is_list(init_arg) do
    {name, init_arg} = Keyword.pop!(init_arg, :name)

    GenServer.start_link(__MODULE__, init_arg, name: name)
  end

  @impl GenServer
  def init(init_arg) do
    enactment_id = Keyword.fetch!(init_arg, :enactment_id)
    enactment_pid = Keyword.fetch!(init_arg, :enactment_pid)
    direction = Keyword.fetch!(init_arg, :direction)
    lights = Keyword.fetch!(init_arg, :lights)

    {
      :ok,
      %{
        enactment_id: enactment_id,
        enactment_pid: enactment_pid,
        direction: direction,
        lights: lights
      },
      {:continue, :tick}
    }
  end

  @impl GenServer
  def handle_info({:workitem, %Schemas.Workitem{state: :enabled} = workitem}, state) do
    started_workitem = start_workitem(state.enactment_id, workitem.id)

    schedule_turn(started_workitem, workitem.inserted_at)

    {:noreply, state, {:continue, :tick}}
  end

  def handle_info({:workitem, %Schemas.Workitem{state: workitem_state}}, state)
      when workitem_state in unquote(Workitem.__in_progress_states__()) do
    # ignore in progress workitems

    {:noreply, state, {:continue, :tick}}
  end

  def handle_info({:turn, _color, %Workitem{} = workitem}, state) do
    {:ok, _completed_workitem} =
      WorkitemTransition.complete_workitem(state.enactment_id, {workitem.id, []})

    {:noreply, state, {:continue, :tick}}
  end

  @impl GenServer
  def handle_continue(:tick, state) do
    marking_places = get_marking_places(state.enactment_pid, state.direction)
    control(marking_places, state.lights)

    {:noreply, state}
  end

  defp start_workitem(enactment_id, workitem_id) do
    {:ok, started_workitem} =
      WorkitemTransition.start_workitem(enactment_id, workitem_id)

    started_workitem
  end

  @color_delay %{
    red: 3_000,
    yellow: 10_000,
    green: 0
  }

  defp schedule_turn(%Workitem{} = workitem, started_at) do
    color = get_color(workitem)
    delay = Map.fetch!(@color_delay, color)

    deplay =
      NaiveDateTime.utc_now()
      |> NaiveDateTime.diff(started_at, :millisecond)
      |> then(&Kernel.-(delay, &1))
      |> Kernel.max(0)

    Process.send_after(self(), {:turn, color, workitem}, deplay)
  end

  defp get_color(%Workitem{} = workitem) do
    case workitem.binding_element.transition do
      "turn_red" <> _dir -> :red
      "turn_yellow" <> _dir -> :yellow
      "turn_green" <> _dir -> :green
    end
  end

  defp get_marking_places(enactment_pid, direction) do
    direction = Atom.to_string(direction)

    enactment_pid
    |> :sys.get_state()
    |> Map.get(:markings)
    |> Map.keys()
    |> Enum.filter(&String.ends_with?(&1, direction))
  end

  defp control(marking_places, lights) do
    Enum.each(lights, fn {place_name, frame} ->
      light_symbol = light_symbol(place_name, place_name in marking_places)

      Kino.Frame.render(frame, light_symbol)
    end)
  end

  defp light_symbol(color, on?)

  defp light_symbol(color, true) do
    emoji =
      case color do
        "red" <> _dir -> "🔴"
        "yellow" <> _dir -> "🟡"
        "green" <> _dir -> "🟢"
      end

    Kino.Text.new(emoji, terminal: true)
  end

  defp light_symbol(_color, false) do
    Kino.Text.new("⚫️", terminal: true)
  end
end

defmodule TrafficLight.Supervisor do
  use Supervisor

  alias TrafficLight.DirectionalLight
  alias TrafficLight.WorkitemPubSub

  @spec start_link(Keyword.t()) :: Supervisor.on_start()
  def start_link(init_arg \\ []) when is_list(init_arg) do
    Process.whereis(__MODULE__) && Supervisor.stop(__MODULE__)
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl Supervisor
  def init(init_arg) do
    enactment_id = Keyword.fetch!(init_arg, :enactment_id)
    enactment_pid = Keyword.fetch!(init_arg, :enactment_pid)
    lights = Keyword.fetch!(init_arg, :lights)

    ew_lights = Enum.filter(lights, fn {name, _frame} -> String.ends_with?(name, "ew") end)
    ns_lights = Enum.filter(lights, fn {name, _frame} -> String.ends_with?(name, "ns") end)

    light_options = [enactment_id: enactment_id, enactment_pid: enactment_pid]

    children = [
      {WorkitemPubSub, [enactment_id: enactment_id]},
      Supervisor.child_spec(
        {DirectionalLight,
         [name: TrafficLight.EWLight, direction: :ew, lights: ew_lights] ++ light_options},
        id: TrafficLight.EWLight
      ),
      Supervisor.child_spec(
        {DirectionalLight,
         [name: TrafficLight.NSLight, direction: :ns, lights: ns_lights] ++ light_options},
        id: TrafficLight.NSLight
      )
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

Run

Logger.configure(level: :info)
flow = TrafficLight.setup_flow()
{enactment, enactment_pid} = TrafficLight.start_enactment(flow)

{grid, lights} = TrafficLight.to_kino()

{:ok, pid} =
  TrafficLight.Supervisor.start_link(
    enactment_id: enactment.id,
    enactment_pid: enactment_pid,
    lights: lights
  )

grid

About

Workflow engine based on Coloured Petri Net

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Languages