Streamz.collapse/3 #13

Open
hamiltop opened this issue Aug 11, 2014 · 0 comments

Spec:

@spec collapse(Enumerable.t, (Enumerable.t -> [Enumerable.t]), (Enumerable.t -> any)) :: Enumerable.t
def collapse(stream, grouper, reducer)

Example:

uniq_values = Streamz.collapse(values, &Stream.chunk(&1, 100), &Stream.uniq/1) |> Enum.uniq

Explanation:
By breaking values up into chunks of length 100, we can call uniq on each chunk in parallel and then call uniq once more on the combined result. This is a building-block function. For example, it makes a parallel reduce easy to write:

def parallel_reduce(stream, reducer) do
  Streamz.collapse(stream, &Stream.chunk(&1, 100), reducer) |> reducer.()
end

Implementation:

This can be done serially right now with chunk, map and reduce. To do it in parallel, we can use the same building blocks as farm. group_by breaks the input out into a stream of streams. That stream of streams can then be farmed out with Stream.farm, and in the map step we call Enum.reduce on each one with the passed-in reducer.
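The serial version mentioned above can be sketched roughly as follows. This is only an illustration: `SerialCollapse` is a hypothetical module name, and the sketch assumes collapse emits one reduced result per group, which the spec leaves open. It uses `Stream.chunk_every/2`, the current name for chunking in Elixir; unlike the older `Stream.chunk/2`, it also emits a trailing partial chunk.

```elixir
defmodule SerialCollapse do
  # Hypothetical serial stand-in for the proposed Streamz.collapse/3.
  # Assumption: one reduced result is emitted per group.
  def collapse(stream, grouper, reducer) do
    stream
    |> grouper.()          # split the input into groups, e.g. chunks
    |> Stream.map(reducer) # lazily reduce each group, one after another
  end
end

# Partial sums over chunks of 4, followed by the final reduce.
partial_sums =
  1..10
  |> SerialCollapse.collapse(&Stream.chunk_every(&1, 4), &Enum.sum/1)
  |> Enum.to_list()

total = Enum.sum(partial_sums)
```

Under this assumption, a reducer that returns a collection (such as uniq) yields a stream of collections, so the results would need flattening, for example with `Stream.concat/1`, before a final `Enum.uniq/1`.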

Why build this one? I see parallel reduce as having two components: the lazy/parallel component, which must be stream based, and the final Enum.reduce component, which moves from a vector to a scalar.
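To make the two components concrete, here is a minimal sketch that uses the standard library's `Task.async_stream/3` as a stand-in for farm. It is not the Streamz implementation: `ParallelCollapse` is a hypothetical module name, the chunk size of 100 is taken from the example above, and the sketch assumes the reducer is associative and returns one value per group.

```elixir
defmodule ParallelCollapse do
  # Stream-based component: reduce each group in its own task.
  # Task.async_stream/3 stands in for farm here (an assumption).
  def collapse(stream, grouper, reducer) do
    stream
    |> grouper.()
    |> Task.async_stream(reducer)
    # Each successful task result arrives as {:ok, value}, in input order.
    |> Stream.map(fn {:ok, result} -> result end)
  end

  # Scalar component: one final reduce over the per-group results.
  def parallel_reduce(stream, reducer) do
    stream
    |> collapse(&Stream.chunk_every(&1, 100), reducer)
    |> reducer.()
  end
end

sum = ParallelCollapse.parallel_reduce(1..1000, &Enum.sum/1)
```

Keeping collapse separate means the per-group results remain a stream, so a caller can apply further stream operations before the final reduce.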

They could all be built together, but I think collapse may have other use cases where maintaining the stream is a good idea.

One issue here is that the resulting stream will not emit any values until the source stream has terminated, so maybe it doesn't belong in Stream. We'll see what happens here.
