-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from rveshovda/genstage
GenStage
- Loading branch information
Showing
19 changed files
with
212 additions
and
145 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
alias Experimental.{GenStage} | ||
|
||
defmodule XGPS.Broadcaster do | ||
@moduledoc """ | ||
Heavily inspired (almost a copy) from the GenEvent-replacement example from GenStage-repo at: | ||
https://github.com/elixir-lang/gen_stage | ||
""" | ||
use GenStage | ||
|
||
@doc """ | ||
Starts the broadcaster. | ||
""" | ||
def start_link() do | ||
GenStage.start_link(__MODULE__, :ok, name: __MODULE__) | ||
end | ||
|
||
@doc """ | ||
Sends an event async. | ||
""" | ||
def async_notify(event) do | ||
GenStage.cast(__MODULE__, {:notify, event}) | ||
end | ||
|
||
## Callbacks | ||
|
||
def init(:ok) do | ||
{:producer, {:queue.new, 0, 0}, dispatcher: GenStage.BroadcastDispatcher} | ||
end | ||
|
||
def handle_cancel(_, _, {queue, demand, number_of_subscribers}) do | ||
{:noreply, [], {queue, demand, number_of_subscribers - 1}} | ||
end | ||
|
||
def handle_subscribe(_, _ ,_ ,{queue, demand, number_of_subscribers}) do | ||
{:automatic, {queue, demand, number_of_subscribers + 1}} | ||
end | ||
|
||
def handle_cast({:notify, _event}, {_queue, _demand, 0}) do | ||
{:noreply, [], {:queue.new, 0, 0}} | ||
end | ||
|
||
def handle_cast({:notify, event}, {queue, demand, number_of_subscribers}) do | ||
dispatch_events(:queue.in(event, queue), demand, [], number_of_subscribers) | ||
end | ||
|
||
def handle_demand(incoming_demand, {queue, demand, number_of_subscribers}) do | ||
dispatch_events(queue, incoming_demand + demand, [], number_of_subscribers) | ||
end | ||
|
||
# TODO: Make sure the queue does not grow too big | ||
defp dispatch_events(queue, demand, events, number_of_subscribers) do | ||
with d when d > 0 <- demand, | ||
{{:value, event}, queue} <- :queue.out(queue) do | ||
dispatch_events(queue, demand - 1, [event | events], number_of_subscribers) | ||
else | ||
_ -> {:noreply, Enum.reverse(events), {queue, demand, number_of_subscribers}} | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
alias Experimental.{GenStage} | ||
defmodule XGPS.Example.Consumer do | ||
@moduledoc """ | ||
The GenEvent handler implementation is a simple consumer. | ||
""" | ||
use GenStage | ||
|
||
def start_link() do | ||
GenStage.start_link(__MODULE__, :ok) | ||
end | ||
|
||
# Callbacks | ||
|
||
def init(:ok) do | ||
# Starts a permanent subscription to the broadcaster | ||
# which will automatically start requesting items. | ||
{:consumer, :ok, subscribe_to: [XGPS.Broadcaster]} | ||
end | ||
|
||
@doc """ | ||
This function will be called once for each report from the GPS. | ||
""" | ||
def handle_events(events, _from, state) do | ||
for event <- events do | ||
IO.inspect {self(), event} | ||
end | ||
{:noreply, [], state} | ||
end | ||
end |
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.