Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement loops as Streams #573

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open

Implement loops as Streams #573

wants to merge 1 commit into from

Conversation

seanmor5
Copy link
Contributor

This takes the regular loop run and builds it based off of this stream which will return a loop state per epoch. Basically this should be really flexible in that I can now control the running of the loop with regular Elixir Stream and Enum functions. You can see this in how much simpler the older run is now.

This also kills output_transform in the loop. I may add it to the Loop.run as an option and that case we can add some output transforms that will not break some older training code

Resolves #362

Will potentially resolve #455 depending on how my experiments go

debug? = Keyword.get(jit_opts, :debug, false)

sample_data =
case Enum.take(data, 1) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tricky...

If the stream is pointing to an external resource, then you are opening it and traversing it. If the stream has side-effects, this first element is effectively lost. The correct way is to do this when streaming starts. Something like this:

Stream.unfold({:init, arg1, arg2}, fn
  {:init, arg1, arg2}, data ->
    state = init_loop_state(...)
    {emit_next(state), {:state, state}
  {:state, state}, data ->
    ....
  :halt, _data ->
    ...
end)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you would have to move everything to init, including fire_event(:started, ...).

loop
|> stream(data, init_state, opts)
|> Stream.take(max_epochs)
|> Stream.with_index()
Copy link
Contributor

@josevalim josevalim May 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need with_index if you already have state.epoch?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants