Skip to content

Commit 889b683

Browse files
authored
[actor] receiveLoop with state + scaladocs + tests (#1126)
A few improvements in the `kyo-actor` module. Please see comments for more details.
1 parent 6aa02b6 commit 889b683

File tree

2 files changed

+410
-5
lines changed

2 files changed

+410
-5
lines changed

kyo-actor/shared/src/main/scala/kyo/Actor.scala

Lines changed: 149 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ sealed abstract class Actor[+E, A, B](_subject: Subject[A], _fiber: Fiber[Closed
8585
* - Does not interrupt the processing of the current message if one is being handled
8686
*
8787
* @return
88-
* A Maybe containing a sequence of any messages that were in the mailbox when it was closed
88+
* A Maybe containing a sequence of any messages that were in the mailbox if the close is successful
8989
*/
9090
def close(using Frame): Maybe[Seq[A]] < IO
9191

@@ -170,20 +170,20 @@ object Actor:
170170
/** Receives and processes a single message from the actor's mailbox.
171171
*
172172
* This method polls for the next available message and applies the provided processing function. Message processing is done
173-
* sequentially, ensuring only one message is handled at a time.
173+
* sequentially, ensuring only one message is handled at a time. The result of the processing function is discarded.
174174
*
175175
* @param f
176176
* The function to process each received message
177177
* @tparam A
178178
* The type of messages being received
179179
*/
180-
def receiveAll[A](using Tag[A])[B, S](f: A => B < S)(using Frame): Unit < (Context[A] & S) =
180+
def receiveAll[A: Tag](using Frame)[S](f: A => Any < S): Unit < (Context[A] & S) =
181181
Poll.values[A](f)
182182

183183
/** Receives and processes up to n messages from the actor's mailbox.
184184
*
185185
* This method polls for messages and applies the provided processing function to each one, up to the specified limit. Message
186-
* processing is done sequentially.
186+
* processing is done sequentially. The result of the processing function is discarded.
187187
*
188188
* @param max
189189
* The maximum number of messages to process
@@ -214,14 +214,158 @@ object Actor:
214214
* @return
215215
* An effect representing the message processing loop
216216
*/
217-
def receiveLoop[A](using Tag[A])[S](f: A => Loop.Outcome[Unit, Unit] < S)(using Frame): Unit < (Context[A] & S) =
217+
def receiveLoop[A: Tag](using Frame)[S](f: A => Loop.Outcome[Unit, Unit] < S): Unit < (Context[A] & S) =
218218
Loop(()) { _ =>
219219
Poll.one[A].map {
220220
case Absent => Loop.done
221221
case Present(v) => f(v)
222222
}
223223
}
224224

225+
/** Receives and processes messages from the actor's mailbox in a loop with a single state value.
226+
*
227+
* This method continuously polls for messages and applies the provided processing function to each one, maintaining a state value
228+
* between iterations. The function returns a Loop.Outcome that determines whether to continue processing more messages with an updated
229+
* state or stop with a final result.
230+
*
231+
* To control the loop:
232+
* - Return `Loop.continue(newState)` to continue processing with an updated state
233+
* - Return `Loop.done(finalState)` to stop processing and return the final state
234+
*
235+
* When the loop completes, the final state value is returned as the result.
236+
*
237+
* @param initialState
238+
* The initial state value to use for the first message
239+
* @param f
240+
* A function that processes each received message with the current state and returns a Loop.Outcome
241+
* @tparam A
242+
* The type of messages being received
243+
* @return
244+
* The final state value after the loop completes
245+
*/
246+
def receiveLoop[A: Tag](
247+
using Frame
248+
)[State, S](state: State)(
249+
f: (A, State) => Loop.Outcome[State, State] < S
250+
): State < (Context[A] & S) =
251+
Loop(state) { state =>
252+
Poll.one[A].map {
253+
case Absent => Loop.done(state)
254+
case Present(v) => f(v, state)
255+
}
256+
}
257+
258+
/** Receives and processes messages from the actor's mailbox in a loop with two state values.
259+
*
260+
* This method continuously polls for messages and applies the provided processing function to each one, maintaining two state values
261+
* between iterations. The function returns a Loop.Outcome that determines whether to continue processing more messages with updated
262+
* states or stop with a final result.
263+
*
264+
* To control the loop:
265+
* - Return `Loop.continue(newState1, newState2)` to continue processing with updated states
266+
* - Return `Loop.done((finalState1, finalState2))` to stop processing and return the final states
267+
*
268+
* When the loop completes, the final state values are returned as a tuple.
269+
*
270+
* @param state1
271+
* The first initial state value
272+
* @param state2
273+
* The second initial state value
274+
* @param f
275+
* A function that processes each received message with the current states and returns a Loop.Outcome
276+
* @tparam A
277+
* The type of messages being received
278+
* @return
279+
* A tuple containing the final state values after the loop completes
280+
*/
281+
def receiveLoop[A: Tag](
282+
using Frame
283+
)[State1, State2, S](state1: State1, state2: State2)(
284+
f: (A, State1, State2) => Loop.Outcome2[State1, State2, (State1, State2)] < S
285+
): (State1, State2) < (Context[A] & S) =
286+
Loop(state1, state2) { (s1, s2) =>
287+
Poll.one[A].map {
288+
case Absent => Loop.done((s1, s2))
289+
case Present(v) => f(v, s1, s2)
290+
}
291+
}
292+
293+
/** Receives and processes messages from the actor's mailbox in a loop with three state values.
294+
*
295+
* This method continuously polls for messages and applies the provided processing function to each one, maintaining three state values
296+
* between iterations. The function returns a Loop.Outcome that determines whether to continue processing more messages with updated
297+
* states or stop with a final result.
298+
*
299+
* To control the loop:
300+
* - Return `Loop.continue(newState1, newState2, newState3)` to continue processing with updated states
301+
* - Return `Loop.done((finalState1, finalState2, finalState3))` to stop processing and return the final states
302+
*
303+
* When the loop completes, the final state values are returned as a tuple.
304+
*
305+
* @param state1
306+
* The first initial state value
307+
* @param state2
308+
* The second initial state value
309+
* @param state3
310+
* The third initial state value
311+
* @param f
312+
* A function that processes each received message with the current states and returns a Loop.Outcome
313+
* @tparam A
314+
* The type of messages being received
315+
* @return
316+
* A tuple containing the final state values after the loop completes
317+
*/
318+
def receiveLoop[A: Tag](
319+
using Frame
320+
)[State1, State2, State3, S](state1: State1, state2: State2, state3: State3)(
321+
f: (A, State1, State2, State3) => Loop.Outcome3[State1, State2, State3, (State1, State2, State3)] < S
322+
): (State1, State2, State3) < (Context[A] & S) =
323+
Loop(state1, state2, state3) { (s1, s2, s3) =>
324+
Poll.one[A].map {
325+
case Absent => Loop.done((s1, s2, s3))
326+
case Present(v) => f(v, s1, s2, s3)
327+
}
328+
}
329+
330+
/** Receives and processes messages from the actor's mailbox in a loop with four state values.
331+
*
332+
* This method continuously polls for messages and applies the provided processing function to each one, maintaining four state values
333+
* between iterations. The function returns a Loop.Outcome that determines whether to continue processing more messages with updated
334+
* states or stop with a final result.
335+
*
336+
* To control the loop:
337+
* - Return `Loop.continue(newState1, newState2, newState3, newState4)` to continue processing with updated states
338+
* - Return `Loop.done((finalState1, finalState2, finalState3, finalState4))` to stop processing and return the final states
339+
*
340+
* When the loop completes, the final state values are returned as a tuple.
341+
*
342+
* @param state1
343+
* The first initial state value
344+
* @param state2
345+
* The second initial state value
346+
* @param state3
347+
* The third initial state value
348+
* @param state4
349+
* The fourth initial state value
350+
* @param f
351+
* A function that processes each received message with the current states and returns a Loop.Outcome
352+
* @tparam A
353+
* The type of messages being received
354+
* @return
355+
* A tuple containing the final state values after the loop completes
356+
*/
357+
def receiveLoop[A: Tag](
358+
using Frame
359+
)[State1, State2, State3, State4, S](state1: State1, state2: State2, state3: State3, state4: State4)(
360+
f: (A, State1, State2, State3, State4) => Loop.Outcome4[State1, State2, State3, State4, (State1, State2, State3, State4)] < S
361+
): (State1, State2, State3, State4) < (Context[A] & S) =
362+
Loop(state1, state2, state3, state4) { (s1, s2, s3, s4) =>
363+
Poll.one[A].map {
364+
case Absent => Loop.done((s1, s2, s3, s4))
365+
case Present(v) => f(v, s1, s2, s3, s4)
366+
}
367+
}
368+
225369
/** Creates and starts a new actor with default capacity from a message processing behavior.
226370
*
227371
* This is a convenience method that calls `run(defaultCapacity)(behavior)`. It creates an actor with the default mailbox capacity as

0 commit comments

Comments
 (0)