Skip to content

Commit

Permalink
only allow one state collector at a time (#308)
Browse files Browse the repository at this point in the history
* only allow one state collector at a time

* Update flowredux/src/commonTest/kotlin/com/freeletics/flowredux/dsl/FlowReduxStateMachineTest.kt

Co-authored-by: Hannes Dorfmann <[email protected]>

Co-authored-by: Hannes Dorfmann <[email protected]>
  • Loading branch information
gabrielittner and sockeqwe authored Jun 4, 2022
1 parent 85061f0 commit 1387be5
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 41 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package com.freeletics.flowredux.dsl

import com.freeletics.flowredux.dsl.internal.Action
import com.freeletics.flowredux.dsl.internal.ExternalWrappedAction
import com.freeletics.flowredux.dsl.internal.InitialStateAction
import com.freeletics.flowredux.dsl.internal.reducer
import com.freeletics.flowredux.dsl.util.AtomicCounter
import com.freeletics.flowredux.reduxStore
import com.freeletics.mad.statemachine.StateMachine
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow

Expand All @@ -32,11 +38,24 @@ public abstract class FlowReduxStateMachine<S : Any, A : Any>(
)
}

val sideEffects = FlowReduxStoreBuilder<S, A>().apply(specBlock).generateSideEffects()

outputState = inputActions
.receiveAsFlow()
.reduxStore(initialStateSupplier, specBlock)
.map<A, Action<S, A>> { ExternalWrappedAction(it) }
.onStart {
emit(InitialStateAction())
}
.reduxStore(initialStateSupplier, sideEffects, ::reducer)
.distinctUntilChanged { old, new -> old === new } // distinct until not the same object reference.
.onStart {
activeFlowCounter.incrementAndGet()
if (activeFlowCounter.incrementAndGet() > 1) {
throw IllegalStateException(
"Can not collect state more than once at the same time. Make sure the" +
"previous collection is cancelled before starting a new one. " +
"Collecting state in parallel would lead to subtle bugs."
)
}
}
.onCompletion {
activeFlowCounter.decrementAndGet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.fail
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
class FlowReduxStateMachineTest {
Expand Down Expand Up @@ -88,4 +90,43 @@ class FlowReduxStateMachineTest {

assertEquals(expectedMsg, exception.message)
}

@Test
fun `observing state multiple times in parallel throws exception`() = suspendTest {
val sm = StateMachine {}

var collectionStarted = false
val job = launch {
sm.state.collect {
collectionStarted = true
}
}

while (!collectionStarted) {
delay(1)
}

val exception = assertFailsWith<IllegalStateException> {
sm.state.collect { }
}


val expectedMsg =
"Can not collect state more than once at the same time. Make sure the" +
"previous collection is cancelled before starting a new one. " +
"Collecting state in parallel would lead to subtle bugs."

assertEquals(expectedMsg, exception.message)

job.cancel()
}

@Test
fun `observing state multiple times in sequence`() = suspendTest {
val sm = StateMachine {}

// each call will collect the first item and then stop collecting
sm.state.first()
sm.state.first()
}
}

0 comments on commit 1387be5

Please sign in to comment.