Skip to content

Commit 3e4c03b

Browse files
committed
Fixed non-standard cancellation behavior
Cancellation shouldn't call completion. More here: https://twitter.com/millenomi/status/1137382877870510080
1 parent 0602dd3 commit 3e4c03b

File tree

16 files changed

+130
-100
lines changed

16 files changed

+130
-100
lines changed

Assets/EntwineTest/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ func testMap() {
187187
(300, .input("A")), // received uppercased input @ 100 + subscription time
188188
(400, .input("B")), // received uppercased input @ 200 + subscription time
189189
(500, .input("C")), // received uppercased input @ 300 + subscription time
190-
(900, .completion(.finished)), // subscription cancelled
191190
])
192191
}
193192
```

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ func testMap() {
8787
(300, .input("A")), // received uppercased input @ 100 + subscription time
8888
(400, .input("B")), // received uppercased input @ 200 + subscription time
8989
(500, .input("C")), // received uppercased input @ 300 + subscription time
90-
(900, .completion(.finished)), // subscription cancelled
9190
])
9291
}
9392
```

Sources/Common/Utilities/SinkQueue.swift

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,29 +43,28 @@ class SinkQueue<Sink: Subscriber> {
4343
self.sink = sink
4444
}
4545

46-
deinit {
47-
expediteCompletion(.finished)
48-
}
49-
5046
func requestDemand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
5147
demandRequested += demand
5248
return processDemand()
5349
}
5450

5551
func enqueue(_ input: Sink.Input) -> Subscribers.Demand {
56-
guard isActive else { return .none }
52+
assertPreCompletion()
5753
buffer.enqueue(input)
5854
return processDemand()
5955
}
6056

6157
func enqueue(completion: Subscribers.Completion<Sink.Failure>) -> Subscribers.Demand {
62-
guard isActive else { return .none }
58+
assertPreCompletion()
6359
self.completion = completion
6460
return processDemand()
6561
}
6662

6763
func expediteCompletion(_ completion: Subscribers.Completion<Sink.Failure>) {
68-
guard let sink = sink else { return }
64+
guard let sink = sink else {
65+
assertionFailure("Out of sequence. A completion signal has already been sent.")
66+
return
67+
}
6968
self.sink = nil
7069
self.buffer = .empty
7170
sink.receive(completion: completion)
@@ -74,9 +73,10 @@ class SinkQueue<Sink: Subscriber> {
7473
// Processes as much demand as requested, returns spare capacity that
7574
// can be forwarded to upstream subscriber/s
7675
func processDemand() -> Subscribers.Demand {
76+
guard let sink = sink else { return .none }
7777
while demandProcessed < demandRequested, let next = buffer.next() {
7878
demandProcessed += 1
79-
demandRequested += sink?.receive(next) ?? .none
79+
demandRequested += sink.receive(next)
8080
}
8181
if let completion = completion, demandQueued < 1 {
8282
expediteCompletion(completion)
@@ -86,4 +86,8 @@ class SinkQueue<Sink: Subscriber> {
8686
demandForwarded += spareDemand
8787
return spareDemand
8888
}
89+
90+
func assertPreCompletion() {
91+
assert(completion == nil && sink != nil, "Out of sequence. A completion signal is queued or has already been sent.")
92+
}
8993
}

Sources/Entwine/Operators/Dematerialize.swift

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ extension Publishers {
9797
// for a few more elements to arrive after this is
9898
// called.
9999
func cancel() {
100+
self.sink?.cancelUpstreamSubscription()
100101
self.sink = nil
101102
}
102103
}
@@ -107,24 +108,20 @@ extension Publishers {
107108
Downstream.Input == AnyPublisher<Upstream.Output.Input, DematerializationError<Upstream.Output.Failure>>,
108109
Downstream.Failure == DematerializationError<Upstream.Output.Failure>
109110
{
110-
111111
typealias Input = Upstream.Output
112112
typealias Failure = Upstream.Failure
113113

114+
enum Status { case pending, active(PassthroughSubject<Input.Input, Downstream.Failure>), complete }
115+
114116
var queue: SinkQueue<Downstream>
115117
var upstreamSubscription: Subscription?
116-
var currentMaterializationSubject: PassthroughSubject<Input.Input, Downstream.Failure>?
118+
var status = Status.pending
117119

118120
init(upstream: Upstream, downstream: Downstream) {
119121
self.queue = SinkQueue(sink: downstream)
120122
upstream.subscribe(self)
121123
}
122124

123-
deinit {
124-
queue.expediteCompletion(.finished)
125-
cancelUpstreamSubscription()
126-
}
127-
128125
// Called by the upstream publisher (or its agent) to signal
129126
// that the subscription has begun
130127
func receive(subscription: Subscription) {
@@ -141,18 +138,17 @@ extension Publishers {
141138

142139
switch input.signal {
143140
case .subscription:
144-
guard currentMaterializationSubject == nil else {
141+
guard case .pending = status else {
145142
queue.expediteCompletion(.failure(.outOfSequence))
146-
cancelUpstreamSubscription()
147143
return .none
148144
}
149-
currentMaterializationSubject = .init()
150-
return queue.enqueue(currentMaterializationSubject!.eraseToAnyPublisher())
145+
let subject = PassthroughSubject<Input.Input, Downstream.Failure>()
146+
status = .active(subject)
147+
return queue.enqueue(subject.eraseToAnyPublisher())
151148

152149
case .input(let dematerializedInput):
153-
guard let subject = currentMaterializationSubject else {
150+
guard case .active(let subject) = status else {
154151
queue.expediteCompletion(.failure(.outOfSequence))
155-
cancelUpstreamSubscription()
156152
return .none
157153
}
158154
subject.send(dematerializedInput)
@@ -162,16 +158,12 @@ extension Publishers {
162158
return .max(1)
163159

164160
case .completion(let dematerializedCompletion):
165-
guard let subject = currentMaterializationSubject else {
161+
guard case .active(let subject) = status else {
166162
queue.expediteCompletion(.failure(.outOfSequence))
167-
cancelUpstreamSubscription()
168163
return .none
169164
}
170-
currentMaterializationSubject = nil
165+
status = .complete
171166
subject.send(completion: wrapSourceCompletion(dematerializedCompletion))
172-
// re-imburse the sender as we're not queueing an
173-
// additional element on the outer stream, only
174-
// sending an element on the inner-stream
175167
return .none
176168
}
177169
}
@@ -185,7 +177,6 @@ extension Publishers {
185177
// that the sequence has terminated
186178
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
187179
_ = queue.enqueue(completion: .finished)
188-
cancelUpstreamSubscription()
189180
}
190181

191182
// Indirectly called by the downstream subscriber via its subscription
@@ -227,8 +218,8 @@ extension Publisher where Output: SignalConvertible, Failure == Never {
227218
///
228219
/// - Returns: A publisher that materializes an upstream publisher of `Signal`s into the represented
229220
/// sequence.
230-
public func dematerialize() -> Publishers.FlatMap<AnyPublisher<Self.Output.Input, DematerializationError<Self.Output.Failure>>, Publishers.First<Publishers.Dematerialize<Self>>> {
231-
return dematerializedValuesPublisherSequence().first().flatMap { $0 }
221+
public func dematerialize() -> Publishers.FlatMap<AnyPublisher<Self.Output.Input, DematerializationError<Self.Output.Failure>>, Publishers.Dematerialize<Self>> {
222+
Publishers.Dematerialize(upstream: self).flatMap { $0 }
232223
}
233224
}
234225

Sources/Entwine/Operators/Materialize.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ extension Publishers {
5151
var sink: MaterializeSink<Upstream, Downstream>?
5252

5353
init(upstream: Upstream, downstream: Downstream) {
54-
let sink = MaterializeSink(upstream: upstream, downstream: downstream)
55-
self.sink = sink
54+
self.sink = MaterializeSink(upstream: upstream, downstream: downstream)
5655
}
5756

5857
// Subscription Methods
@@ -89,7 +88,6 @@ extension Publishers {
8988
}
9089

9190
deinit {
92-
queue.expediteCompletion(.finished)
9391
cancelUpstreamSubscription()
9492
}
9593

Sources/Entwine/Operators/ReplaySubject.swift

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,19 +126,13 @@ fileprivate final class ReplaySubjectSubscription<Sink: Subscriber>: Subscriptio
126126

127127
func forwardCompletionToSink(_ completion: Subscribers.Completion<Sink.Failure>) {
128128
queue.expediteCompletion(completion)
129-
cleanup()
130129
}
131130

132131
func request(_ demand: Subscribers.Demand) {
133132
_ = queue.requestDemand(demand)
134133
}
135134

136135
func cancel() {
137-
queue.expediteCompletion(.finished)
138-
cleanup()
139-
}
140-
141-
func cleanup() {
142136
cleanupHandler?()
143137
cleanupHandler = nil
144138
}

Sources/Entwine/Operators/WithLatestFrom.swift

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,34 +42,59 @@ extension Publishers {
4242
}
4343

4444
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
45-
upstream.subscribe(
46-
WithLatestFromSink<Upstream, Other, S>(
47-
downstream: subscriber,
48-
otherSink: WithLatestFromOtherSink(publisher: other),
49-
transform: transform
50-
)
51-
)
45+
let otherSink = WithLatestFromOtherSink(publisher: other)
46+
let upstreamSink = WithLatestFromSink(upstream: upstream, downstream: subscriber, otherSink: otherSink, transform: transform)
47+
subscriber.receive(subscription: WithLatestFromSubscription(sink: upstreamSink))
5248
}
5349
}
5450

51+
// MARK: - Subscription
52+
53+
fileprivate class WithLatestFromSubscription<Upstream: Publisher, Other: Publisher, Downstream: Subscriber>: Subscription
54+
where Upstream.Failure == Other.Failure, Upstream.Failure == Downstream.Failure
55+
{
56+
57+
var sink: WithLatestFromSink<Upstream, Other, Downstream>?
58+
59+
init(sink: WithLatestFromSink<Upstream, Other, Downstream>) {
60+
self.sink = sink
61+
}
62+
63+
func request(_ demand: Subscribers.Demand) {
64+
sink?.signalDemand(demand)
65+
}
66+
67+
func cancel() {
68+
self.sink?.terminateSubscription()
69+
self.sink = nil
70+
}
71+
}
72+
73+
// MARK: - Main Sink
74+
5575
fileprivate class WithLatestFromSink<Upstream: Publisher, Other: Publisher, Downstream: Subscriber>: Subscriber
56-
where Upstream.Failure == Other.Failure, Upstream.Failure == Downstream.Failure {
76+
where Upstream.Failure == Other.Failure, Upstream.Failure == Downstream.Failure
77+
{
5778

5879
typealias Input = Upstream.Output
5980
typealias Failure = Upstream.Failure
6081

61-
let downstream: Downstream
82+
var queue: SinkQueue<Downstream>
83+
var upstreamSubscription: Subscription?
84+
6285
let otherSink: WithLatestFromOtherSink<Other>
6386
let transform: (Upstream.Output, Other.Output) -> Downstream.Input
6487

65-
init(downstream: Downstream, otherSink: WithLatestFromOtherSink<Other>, transform: @escaping (Input, Other.Output) -> Downstream.Input) {
66-
self.downstream = downstream
88+
init(upstream: Upstream, downstream: Downstream, otherSink: WithLatestFromOtherSink<Other>, transform: @escaping (Input, Other.Output) -> Downstream.Input) {
89+
self.queue = SinkQueue(sink: downstream)
6790
self.otherSink = otherSink
6891
self.transform = transform
92+
93+
upstream.subscribe(self)
6994
}
7095

7196
func receive(subscription: Subscription) {
72-
downstream.receive(subscription: subscription)
97+
self.upstreamSubscription = subscription
7398
otherSink.subscribe()
7499
}
75100

@@ -83,15 +108,27 @@ extension Publishers {
83108
// the dropped item by returning a Subscribers.Demand of 1.
84109
return .max(1)
85110
}
86-
return downstream.receive(transform(input, otherInput))
111+
return queue.enqueue(transform(input, otherInput))
87112
}
88113

89114
func receive(completion: Subscribers.Completion<Downstream.Failure>) {
115+
_ = queue.enqueue(completion: completion)
116+
}
117+
118+
func signalDemand(_ demand: Subscribers.Demand) {
119+
let spareDemand = queue.requestDemand(demand)
120+
guard spareDemand > .none else { return }
121+
upstreamSubscription?.request(spareDemand)
122+
}
123+
124+
func terminateSubscription() {
90125
otherSink.terminateSubscription()
91-
downstream.receive(completion: completion)
126+
upstreamSubscription?.cancel()
92127
}
93128
}
94129

130+
// MARK: - Other Sink
131+
95132
fileprivate class WithLatestFromOtherSink<P: Publisher>: Subscriber {
96133

97134
typealias Input = P.Output
@@ -101,7 +138,6 @@ extension Publishers {
101138
private (set) var lastInput: Input?
102139
private var subscription: Subscription?
103140

104-
105141
init(publisher: P) {
106142
self.publisher = publisher.eraseToAnyPublisher()
107143
}
@@ -120,9 +156,7 @@ extension Publishers {
120156
return .none
121157
}
122158

123-
func receive(completion: Subscribers.Completion<Failure>) {
124-
subscription = nil
125-
}
159+
func receive(completion: Subscribers.Completion<Failure>) { }
126160

127161
func terminateSubscription() {
128162
subscription?.cancel()

Sources/Entwine/Publishers/Factory.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,29 +112,29 @@ public class Dispatcher<Input, Failure: Error> {
112112

113113
/// Queues an element to be delivered to the subscriber
114114
///
115-
/// If the subscriber has cancelled the subscription, or either the `forward(completion:)`
116-
/// or the `forwardImmediately(completion:)`method of the dispatcher has already
117-
/// been called, this will be a no-op.
115+
/// - Warning: If either the `forward(completion:)` or the
116+
/// `forwardImmediately(completion:)`method of the dispatcher has already
117+
/// been called this will raise an assertion failure.
118118
///
119119
/// - Parameter input: a value to be delivered to a downstream subscriber
120120
public func forward(_ input: Input) {
121121
fatalError("Abstract class. Override in subclass.")
122122
}
123123

124124
/// Completes the sequence once any queued elements are delivered to the subscriber
125-
/// - Parameter completion: a completion value to be delivered to the subscriber once
126125
///
127-
/// If the subscriber has cancelled the subscription, or either the `forward(completion:)`
128-
/// or the `forwardImmediately(completion:)`method of the dispatcher has already
129-
/// been called, this will be a no-op.
126+
/// - Warning: If either the `forward(completion:)` or the
127+
/// `forwardImmediately(completion:)`method of the dispatcher has already
128+
/// been called this will raise an assertion failure.
130129
///
130+
/// - Parameter completion: a completion value to be delivered to the subscriber once
131131
/// the remaining items in the queue have been delivered
132132
public func forward(completion: Subscribers.Completion<Failure>) {
133133
fatalError("Abstract class. Override in subclass.")
134134
}
135135

136-
/// Completes the sequence immediately regardless of any elements that are waiting to be delivered,
137-
/// subsequent calls to the dispatcher will be a no-op
136+
/// Completes the sequence immediately regardless of any elements that are waiting to be delivered
137+
/// - Warning: subsequent calls to the dispatcher will raise an assertion failure
138138
/// - Parameter completion: a completion value to be delivered immediately to the subscriber
139139
public func forwardImmediately(completion: Subscribers.Completion<Failure>) {
140140
fatalError("Abstract class. Override in subclass.")

Sources/Entwine/Utilities/CancellationBag.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import Combine
2727
/// A container for cancellables that will be cancelled when the bag is deallocated or cancelled itself
2828
public final class CancellationBag: Cancellable {
2929

30+
public init() {}
31+
3032
private var cancellables = [AnyCancellable]()
3133

3234
/// Adds a cancellable to the bag which will have its `.cancel()` method invoked

0 commit comments

Comments
 (0)