Update Stream.merge to use the same message format as GenEvent.Stream #20
I believe this is done? :D Anyway, I just want to let you know that we will no longer change the GenEvent message, at least not for 1.0. We will continue sending … I have only one issue in mind, but I believe it can be solved later on. Today, if you use …
Thoughts?
What is the cost of monitoring? In what circumstances would we not want to monitor? That's still an area that's a bit foreign to me (I don't come from an Erlang background). Maybe Connectable should require monitoring. Linking would still be handled by the caller.
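To make the monitoring-versus-linking question concrete, here is a minimal sketch using only plain processes (MonitorDemo is a hypothetical module name, not part of Streamz or GenEvent). A monitor is one-directional: the watcher only receives a :DOWN message and keeps running. A link is bidirectional: an abnormal exit propagates to the linked process unless that process traps exits, in which case it arrives as an :EXIT message instead.

```elixir
defmodule MonitorDemo do
  # Watches a crashing process through a monitor. The caller is not
  # affected by the crash; it only receives a :DOWN message.
  def observe_with_monitor(reason) do
    # spawn_monitor/1 spawns and monitors atomically, so the :DOWN
    # message carries the real exit reason rather than :noproc.
    {pid, ref} = spawn_monitor(fn -> exit(reason) end)

    receive do
      {:DOWN, ^ref, :process, ^pid, down_reason} -> down_reason
    after
      1_000 -> :timeout
    end
  end

  # Watches a crashing process through a link. Without trapping exits,
  # an abnormal exit would also kill the linked process, so the link is
  # made from a throwaway process that traps exits and reports back.
  def observe_with_link(reason) do
    parent = self()

    spawn(fn ->
      Process.flag(:trap_exit, true)
      pid = spawn_link(fn -> exit(reason) end)

      receive do
        {:EXIT, ^pid, exit_reason} -> send(parent, {:linked_exit, exit_reason})
      end
    end)

    receive do
      {:linked_exit, exit_reason} -> exit_reason
    after
      1_000 -> :timeout
    end
  end
end
```

For example, MonitorDemo.observe_with_monitor(:boom) returns :boom and the caller keeps running. This matches the suggestion that Connectable could require monitoring while linking stays an opt-in decision for the caller.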
For … For …
OK, after this discussion, I have made some final changes to how GenEvent works. I believe these are the relevant commits: … I think it is actually nice that we no longer need to match on …
Cool, I'll clean up my branch here and see how things look. I still want to … Curious, what is the post-1.0.0 roadmap for Elixir? Right now it's easy to … I don't think merge is ripe yet (error handling is still not well defined).
The goal is to build all new ideas as separate projects and then assess their importance to the community. This will also help us get things right, because we won't have much opportunity for code churn once they are merged into Elixir. So, to be more explicit, we should move Streamz from the experimental stage to usable, i.e. tidy up the abstractions, docs, documented APIs, etc. Then we promote it, see how involved the community gets, and then merge it in. For example, once we have built enough abstractions around Connectable as a protocol, we can move it to Elixir with things like Stream.merge while we continue evaluating other aspects. The next stage would be to bring in the parallel stuff, and so on. Does that make sense?
Yep, just the clarification I was looking for. On Fri, Sep 5, 2014 at 8:30 AM, José Valim wrote:
I approached the protocol from a new angle. I ignored … It's all under https://github.com/hamiltop/streamz/tree/event_source_protocol
Some notable design decisions:
These were just convenient solutions to problems I hadn't thought about before. There are probably other ways to go about it. There are some simple tests to make sure it works with … I've also put together https://github.com/hamiltop/systemex, which uses …
The idea with … In … Also, I'm using rx.js on the JavaScript side, just to keep things consistent. @josevalim Let me know if you think I'm heading down the wrong path. I've ignored gen_event_EXIT thus far, as these are all infinite streams, but otherwise I feel like my design is the result of some real-world usage.
I think this looks great. Returning some sort of struct is likely going to be very helpful in the long term too. Quick question, though: why are we returning a list on add? Is there a chance that zero or more than one stream gets added? I just thought of a scenario where this could happen: if you have a Stream.merge/1 and you call the event source on it, we can skip the merge process altogether and send the messages directly to the process given to the event source. Is this what you have in mind?
Yep. I did this in my first iteration: https://github.com/hamiltop/streamz/blob/connectable_protocol/lib/streamz/connectable.ex#L55
I haven't gotten to Merge yet with the new code, but it will be similar. That's the "many" case. I'm not sure there is an empty-list scenario.
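The flattening idea can be sketched with a hypothetical protocol (HypotheticalSource and HypotheticalMerge are illustrative names, not the Streamz EventSource API). Adding a listener to a merge forwards the registration to every child and concatenates the results, so nested merges flatten without any intermediate merge process, and add naturally returns a list with one entry per leaf source. In this sketch an empty merge yields [], which is one possible answer to the empty-list question.

```elixir
defprotocol HypotheticalSource do
  @doc "Registers pid as a listener; returns one registration per leaf source."
  def add(source, pid)
end

defmodule HypotheticalMerge do
  # A merge is just a collection of sources (which may be merges themselves).
  defstruct sources: []
end

defimpl HypotheticalSource, for: HypotheticalMerge do
  # Instead of spawning a merge process, forward the registration to every
  # child and concatenate the results. Nested merges flatten naturally.
  def add(%HypotheticalMerge{sources: sources}, pid) do
    Enum.flat_map(sources, &HypotheticalSource.add(&1, pid))
  end
end

defimpl HypotheticalSource, for: List do
  # A leaf source: spawn a producer that sends every element to pid,
  # tagged with a fresh ref so the listener can tell sources apart.
  def add(list, pid) do
    ref = make_ref()
    producer = spawn(fn -> Enum.each(list, &send(pid, {ref, &1})) end)
    [{:ok, producer, ref}]
  end
end
```

For instance, adding self() to a merge of [1, 2] and a nested merge of [3] and [4, 5] returns three registrations, and all five elements end up in the caller's mailbox.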
I am writing a document that is meant to consider all the use cases we have discussed so far. For example, we want to have event pools but avoid copying the data from sender to pool master to pool slave. We want to have pipelines but still be able to track events through the pipeline. By the end of the document, I had made so many extensions to the "EventSource message protocol" that I basically had to reimplement GenEvent in the implementation of … I am wondering if it is indeed easier to just say: EventSource is a protocol that starts a GenEvent (if one is not running yet) with custom behaviour. For example, if we support sticky handlers, which are handlers that cause the GenEvent to terminate once they are removed, we can implement merge with a GenEvent as follows:
```elixir
def merge(left, right) do
  fn acc, fun ->
    {:ok, manager} = GenEvent.start_link()

    # Ask the GenEvent to send events to the
    # current process. If the current process
    # exits for any reason (including normal),
    # the GenEvent aborts due to sticky.
    GenEvent.add_process_handler(manager, self(), sticky: true)

    # Register a merger handler that will call
    # EventSource.add/2. See the module below.
    GenEvent.add_handler(manager, GenEvent.MergeHandler,
                         [left, right], sticky: true)

    # Consume events as in GenEvent.Stream with acc, fun
  end
end

defmodule GenEvent.MergeHandler do
  use GenEvent

  def init(sources) do
    state =
      Enum.map(sources, fn source ->
        {:ok, pid, ref} = EventSource.add(source, self())
        # A crash in any source brings the whole thing down
        Process.link(pid)
        {pid, ref}
      end)

    {:ok, state}
  end

  def handle_info({:gen_event_EXIT, id, _reason}, state) do
    # Remove id from state until state is empty.
    # When the state is empty, return :remove_handler
    # which will cause the whole GenEvent to crash
    # as this handler is sticky.
    # Assumption: id is the ref returned by EventSource.add/2.
    case List.keydelete(state, id, 1) do
      [] -> :remove_handler
      state -> {:ok, state}
    end
  end
end
```
I think it should be possible to implement EventSource.Any with a GenEvent too, with a few extensions to GenEvent. This feels like it will make our lives much, much, much easier. Thoughts?
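Since sticky handlers are only proposed here, this is a minimal plain-process approximation of the intended semantics rather than GenEvent code (StickyDemo and the {:notify, event} / {:event, event} messages are made up for illustration). The "manager" forwards events to a single subscriber and terminates as soon as that subscriber exits for any reason, including a normal exit.

```elixir
defmodule StickyDemo do
  # Starts a toy manager that forwards events to one "sticky" subscriber.
  def start(subscriber) do
    spawn(fn ->
      # Monitor (not link) so a normal subscriber exit is observed too.
      ref = Process.monitor(subscriber)
      loop(subscriber, ref)
    end)
  end

  defp loop(subscriber, ref) do
    receive do
      {:DOWN, ^ref, :process, _pid, _reason} ->
        # The sticky subscriber is gone, so the manager stops as well
        # (returning here ends the process with reason :normal).
        :ok

      {:notify, event} ->
        send(subscriber, {:event, event})
        loop(subscriber, ref)
    end
  end
end
```

A monitor is used because a link would not propagate the subscriber's normal exit, while the proposal says the GenEvent aborts when the subscriber exits "for any reason (including normal)".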
So that implementation won't flatten nested Merges. It probably could be …
As is, it can't really be extended to work with an infinite stream of …
As a side note, I found that in practical usage I dislike calling …
I don't see any other major gaps. I think it will work.
On Mon, Sep 15, 2014 at 10:59 AM, José Valim …
Right!
Oh wow, I hadn't even thought about this case before. Honestly, I would be OK with saying that a list is required. Do you have a use case for an infinite stream of sources in merge?
Yes, I am in the same boat. The issue, though, is that EventSource.add works on data structures, while add_process_handler works on a pid/atom. So we would need to wrap it in a stream for this particular use case (which is fine). I thought about implementing EventSource for PID/atom/tuple, but that is probably more trouble than it is worth (we should in all likelihood keep the data structures that implement EventSource a subset of Enumerable). Thank you for all these conversations! ❤️
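Wrapping a process in a stream could look roughly like this sketch. It assumes a hypothetical convention in which events arrive in the consumer's mailbox as {ref, {:event, x}} and the producer signals the end with {ref, :done}; Stream.resource/3 then turns those messages into a lazy stream, so the pid side can be consumed like any other data structure.

```elixir
defmodule MailboxStream do
  # Hypothetical convention: each event arrives as {ref, {:event, x}} and
  # the producer signals completion with {ref, :done}.
  def new(ref) do
    Stream.resource(
      # Nothing to set up: the messages are already headed to this process.
      fn -> ref end,
      fn ref ->
        # Blocks until the next tagged message; a producer that never
        # sends {ref, :done} would leave the consumer waiting forever.
        receive do
          {^ref, {:event, x}} -> {[x], ref}
          {^ref, :done} -> {:halt, ref}
        end
      end,
      # Nothing to clean up in this sketch.
      fn _ref -> :ok end
    )
  end
end
```

Tagging events as {:event, x} keeps an event whose value happens to be :done distinguishable from the end marker.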
On Mon, Sep 15, 2014 at 11:41 AM, José Valim …
A TCPServer that produces streamable TCPSockets is another case (An … RxJava has a ton of stream-of-streams stuff. I think it's worth supporting.
I guess that raises the question of usage. If merge is the only thing …
If not, function handlers on gen_event could mitigate the issues. Just …
Though that smells a bit like monadic, callback-based Scala, which is a …
I'll think about this a bit more.
You are right, it is going to be N+1, but there is no reason. So this example is pretty much moot. The trouble is in defining sources; a sink is much easier to hand-roll. Let me try to write something that shows an Enumerable as a source. Do you have other sources you would like me to try as an example?
I think Enumerable is really the only Any supported. Everything else would …
Right, that is the question! Is there anything else (besides Enumerable) that would require a custom implementation that sets up its own process (as an Enumerable does)?
I think the implied requirement is that one or more elements are emitted. Anything …
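OK, that said, here is the implementation of an Enumerable source:
```elixir
defmodule GenEvent.StreamHandler do
  use GenEvent

  def init({enumerable, ref}) do
    # Suspend after every element so elements are pulled one at a time.
    cont = &Enumerable.reduce(enumerable, &1, fn x, _ -> {:suspend, x} end)
    # Note: an empty enumerable makes this return :remove_handler, which is
    # not a valid init/1 result; the sketch assumes at least one element.
    send_next({cont, ref})
  end

  # Every event the manager dispatches (including the one this handler
  # just sent to it) triggers emitting the next element.
  def handle_event(_event, state) do
    send_next(state)
  end

  defp send_next({cont, ref}) do
    case cont.({:cont, :ok}) do
      {:suspended, x, cont} ->
        # The handler runs inside the manager, so self() is the manager.
        # This message shape depends on GenEvent internals at the time.
        send(self(), {self(), {self(), ref}, {:ack_notify, x}})
        {:ok, {cont, ref}}

      {:halted, _} ->
        :remove_handler

      {:done, _} ->
        :remove_handler
    end
  end
end

defimpl EventSource, for: Any do
  def add(enumerable, pid) do
    {:ok, manager} = GenEvent.start_link()
    GenEvent.add_process_handler(manager, pid, sticky: true)
    ref = make_ref()
    GenEvent.add_handler(manager, {GenEvent.StreamHandler, ref},
                         {enumerable, ref}, sticky: true)
    # Return the shape GenEvent.MergeHandler expects from EventSource.add/2.
    {:ok, manager, ref}
  end
end
```
Is it straightforward enough?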
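The core trick in GenEvent.StreamHandler, a reducer that returns {:suspend, x} so an enumerable can be pulled one element at a time, can be tried on its own without any GenEvent machinery. StepDemo below is a hypothetical helper module; it relies only on the Enumerable.reduce/3 contract ({:suspended, acc, continuation}, {:halted, acc}, {:done, acc}).

```elixir
defmodule StepDemo do
  # Returns a continuation that, given {:cont, acc}, produces the next element.
  def start(enumerable) do
    fn acc ->
      # Suspend after every element, handing the element back as the acc.
      Enumerable.reduce(enumerable, acc, fn x, _acc -> {:suspend, x} end)
    end
  end

  # Pulls one element: {:ok, element, next_continuation} or :done.
  def next(cont) do
    case cont.({:cont, nil}) do
      {:suspended, x, next_cont} -> {:ok, x, next_cont}
      {:halted, _} -> :done
      {:done, _} -> :done
    end
  end

  # Drains the continuation step by step, mirroring how the handler emits
  # one element per dispatched event.
  def take_all(cont, acc \\ []) do
    case next(cont) do
      {:ok, x, cont} -> take_all(cont, [x | acc])
      :done -> Enum.reverse(acc)
    end
  end
end
```

For example, StepDemo.take_all(StepDemo.start([1, 2, 3])) returns [1, 2, 3], and an empty list yields [] immediately, which is exactly the case the handler's init/1 cannot express.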
It is not going to work on master for three reasons:
But I have a spike of those things in a branch and I have confirmed the code works. :)
First impression: without fully grokking the code, I would feel comfortable …
Second impression: once you understand what's going on, it feels "clever".
On the positive side: I understand better why GenEvent is a useful …
The negative side: I think emulating this for other types will be a little …
Overall, I like it a lot. Just trying to capture my first impressions …
On Mon, Sep 15, 2014 at 1:35 PM, José Valim …
We have defined a message format for GenEvent.Stream. https://github.com/elixir-lang/elixir/blob/ab9450d92ff1582ad9ce11e6ec4baf450a07e00e/lib/elixir/lib/gen_event/stream.ex#L107
If merge were to conform to that format, then Stream.merge could easily accept messages directly from GenEvent.Stream.
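A minimal sketch of the idea: a lazy merge whose consumer only depends on the shape of incoming messages, so any producer that emits the same shape could feed it directly. The tagged format used here ({ref, {:event, x}} per event and {ref, :done} at the end) is an assumption for illustration, not the actual GenEvent.Stream message format linked above, and TaggedMerge is a hypothetical module. The is_map_key/2 guard requires Elixir 1.10 or later.

```elixir
defmodule TaggedMerge do
  # Hypothetical shared message format (NOT the real GenEvent.Stream one):
  #   {ref, {:event, x}} for every event, {ref, :done} when a producer ends.
  def merge(enumerables) do
    Stream.resource(
      fn -> start_producers(enumerables) end,
      &next/1,
      # Producers are not stopped on early halt; a fuller version would.
      fn _pending -> :ok end
    )
  end

  # Runs in the consuming process, so self() is where events are sent.
  defp start_producers(enumerables) do
    consumer = self()

    Map.new(enumerables, fn enumerable ->
      ref = make_ref()

      spawn(fn ->
        Enum.each(enumerable, &send(consumer, {ref, {:event, &1}}))
        send(consumer, {ref, :done})
      end)

      {ref, true}
    end)
  end

  # pending maps the ref of every still-running producer to true.
  defp next(pending) when map_size(pending) == 0, do: {:halt, pending}

  defp next(pending) do
    receive do
      {ref, {:event, x}} when is_map_key(pending, ref) -> {[x], pending}
      {ref, :done} when is_map_key(pending, ref) -> {[], Map.delete(pending, ref)}
    end
  end
end
```

Events from a single producer keep their relative order, since messages between two processes are delivered in order, while events from different producers interleave arbitrarily.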