Skip to content

Commit 8545083

Browse files
SourceAdapters should convert back and forth without allocation (#3196)
Motivation: `SourceAdapters.toSource(...)` currently returns a source without implementing the original type. In result, operation like `fromSource(toSource(notSource))` returns double wrapped object instead of wrapping only once. Modifications: - `PublisherToPublisherSource` extends `Publisher`; - `SingleToSingleSource` extends `Single`; - `CompletableToCompletableSource` extends `Completable`; - Enhance `SourceAdaptersTest` to cover all transformations (via cast and via wrapping). Result: Less wrapping layers for back-and-forth conversions and better test coverage for `SourceAdapters`.
1 parent 0eb0818 commit 8545083

11 files changed

+373
-103
lines changed

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractNoHandleSubscribeCompletable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@ abstract class AbstractNoHandleSubscribeCompletable extends SubscribableCompleta
2727

2828
@Override
2929
protected final void handleSubscribe(final Subscriber subscriber) {
30-
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
31-
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
30+
deliverErrorFromSource(subscriber, newUnsupportedOperationException(getClass()));
31+
}
32+
33+
static UnsupportedOperationException newUnsupportedOperationException(final Class<?> clazz) {
34+
return new UnsupportedOperationException(
35+
"Subscribe with no " + CapturedContext.class.getSimpleName() + " is not supported for " + clazz);
3236
}
3337
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractNoHandleSubscribePublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.servicetalk.concurrent.api.SubscribableSources.SubscribablePublisher;
1919

20+
import static io.servicetalk.concurrent.api.AbstractNoHandleSubscribeCompletable.newUnsupportedOperationException;
2021
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
2122

2223
/**
@@ -29,7 +30,6 @@ abstract class AbstractNoHandleSubscribePublisher<T> extends SubscribablePublish
2930

3031
@Override
3132
protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
32-
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
33-
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
33+
deliverErrorFromSource(subscriber, newUnsupportedOperationException(getClass()));
3434
}
3535
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractNoHandleSubscribeSingle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableSingle;
1919

20+
import static io.servicetalk.concurrent.api.AbstractNoHandleSubscribeCompletable.newUnsupportedOperationException;
2021
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
2122

2223
/**
@@ -29,7 +30,6 @@ abstract class AbstractNoHandleSubscribeSingle<T> extends SubscribableSingle<T>
2930

3031
@Override
3132
protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
32-
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
33-
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
33+
deliverErrorFromSource(subscriber, newUnsupportedOperationException(getClass()));
3434
}
3535
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableSetContextOnSubscribe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@ void handleSubscribe(final Subscriber subscriber,
3939
// This operator currently only targets the subscribe method. Given this limitation if we try to change the
4040
// ContextMap now it is possible that operators downstream in the subscribe call stack may have modified
4141
// the ContextMap and we don't want to discard those changes by using a different ContextMap.
42-
original.handleSubscribe(subscriber, capturedContext, contextProvider);
42+
original.delegateSubscribe(subscriber, capturedContext, contextProvider);
4343
}
4444
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableShareContextOnSubscribe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ void handleSubscribe(final Subscriber subscriber,
3333
// This operator currently only targets the subscribe method. Given this limitation if we try to change the
3434
// ContextMap now it is possible that operators downstream in the subscribe call stack may have modified
3535
// the ContextMap and we don't want to discard those changes by using a different ContextMap.
36-
original.handleSubscribe(subscriber, capturedContext, contextProvider);
36+
original.delegateSubscribe(subscriber, capturedContext, contextProvider);
3737
}
3838
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherSetContextOnSubscribe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@ void handleSubscribe(final Subscriber<? super T> singleSubscriber,
3939
// This operator currently only targets the subscribe method. Given this limitation if we try to change the
4040
// ContextMap now it is possible that operators downstream in the subscribe call stack may have modified
4141
// the ContextMap and we don't want to discard those changes by using a different ContextMap.
42-
original.handleSubscribe(singleSubscriber, capturedContext, contextProvider);
42+
original.delegateSubscribe(singleSubscriber, capturedContext, contextProvider);
4343
}
4444
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherShareContextOnSubscribe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ void handleSubscribe(final Subscriber<? super T> singleSubscriber,
3333
// This operator currently only targets the subscribe method. Given this limitation if we try to change the
3434
// ContextMap now it is possible that operators downstream in the subscribe call stack may have modified
3535
// the ContextMap and we don't want to discard those changes by using a different ContextMap.
36-
original.handleSubscribe(singleSubscriber, capturedContext, contextProvider);
36+
original.delegateSubscribe(singleSubscriber, capturedContext, contextProvider);
3737
}
3838
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleSetContextOnSubscribe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@ void handleSubscribe(final Subscriber<? super T> singleSubscriber,
3939
// This operator currently only targets the subscribe method. Given this limitation if we try to change the
4040
// ContextMap now it is possible that operators downstream in the subscribe call stack may have modified
4141
// the ContextMap and we don't want to discard those changes by using a different ContextMap.
42-
original.handleSubscribe(singleSubscriber, capturedContext, contextProvider);
42+
original.delegateSubscribe(singleSubscriber, capturedContext, contextProvider);
4343
}
4444
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleShareContextOnSubscribe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ void handleSubscribe(final Subscriber<? super T> singleSubscriber,
3333
// This operator currently only targets the subscribe method. Given this limitation if we try to change the
3434
// ContextMap now it is possible that operators downstream in the subscribe call stack may have modified
3535
// the ContextMap and we don't want to discard those changes by using a different ContextMap.
36-
original.handleSubscribe(singleSubscriber, capturedContext, contextProvider);
36+
original.delegateSubscribe(singleSubscriber, capturedContext, contextProvider);
3737
}
3838
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SourceAdapters.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.servicetalk.concurrent.PublisherSource;
2020
import io.servicetalk.concurrent.SingleSource;
2121

22+
import static io.servicetalk.concurrent.api.AbstractNoHandleSubscribeCompletable.newUnsupportedOperationException;
23+
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
2224
import static java.util.Objects.requireNonNull;
2325

2426
/**
@@ -133,39 +135,75 @@ private static <T> SingleSource<T> uncheckedCast(final Single<T> single) {
133135
return (SingleSource<T>) single;
134136
}
135137

136-
private static final class PublisherToPublisherSource<T> implements PublisherSource<T> {
138+
// Visible for testing
139+
static final class PublisherToPublisherSource<T> extends Publisher<T> implements PublisherSource<T> {
137140
private final Publisher<T> publisher;
138141

139142
PublisherToPublisherSource(final Publisher<T> publisher) {
140143
this.publisher = requireNonNull(publisher);
141144
}
142145

146+
@Override
147+
protected void handleSubscribe(final Subscriber<? super T> subscriber) {
148+
deliverErrorFromSource(subscriber, newUnsupportedOperationException(getClass()));
149+
}
150+
151+
@Override
152+
void handleSubscribe(final Subscriber<? super T> subscriber,
153+
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
154+
publisher.delegateSubscribe(subscriber, capturedContext, contextProvider);
155+
}
156+
143157
@Override
144158
public void subscribe(final Subscriber<? super T> subscriber) {
145159
publisher.subscribeInternal(subscriber);
146160
}
147161
}
148162

149-
private static final class SingleToSingleSource<T> implements SingleSource<T> {
163+
// Visible for testing
164+
static final class SingleToSingleSource<T> extends Single<T> implements SingleSource<T> {
150165
private final Single<T> single;
151166

152167
SingleToSingleSource(final Single<T> single) {
153168
this.single = requireNonNull(single);
154169
}
155170

171+
@Override
172+
protected void handleSubscribe(final Subscriber<? super T> subscriber) {
173+
deliverErrorFromSource(subscriber, newUnsupportedOperationException(getClass()));
174+
}
175+
176+
@Override
177+
void handleSubscribe(final Subscriber<? super T> subscriber,
178+
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
179+
single.delegateSubscribe(subscriber, capturedContext, contextProvider);
180+
}
181+
156182
@Override
157183
public void subscribe(final Subscriber<? super T> subscriber) {
158184
single.subscribeInternal(subscriber);
159185
}
160186
}
161187

162-
private static final class CompletableToCompletableSource implements CompletableSource {
188+
// Visible for testing
189+
static final class CompletableToCompletableSource extends Completable implements CompletableSource {
163190
private final Completable completable;
164191

165192
CompletableToCompletableSource(final Completable completable) {
166193
this.completable = requireNonNull(completable);
167194
}
168195

196+
@Override
197+
protected void handleSubscribe(final Subscriber subscriber) {
198+
deliverErrorFromSource(subscriber, newUnsupportedOperationException(getClass()));
199+
}
200+
201+
@Override
202+
void handleSubscribe(final Subscriber subscriber,
203+
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
204+
completable.delegateSubscribe(subscriber, capturedContext, contextProvider);
205+
}
206+
169207
@Override
170208
public void subscribe(final Subscriber subscriber) {
171209
completable.subscribeInternal(subscriber);
@@ -180,7 +218,7 @@ private static final class PublisherSourceToPublisher<T> extends Publisher<T> im
180218
}
181219

182220
@Override
183-
protected void handleSubscribe(final PublisherSource.Subscriber<? super T> subscriber) {
221+
protected void handleSubscribe(final Subscriber<? super T> subscriber) {
184222
source.subscribe(subscriber);
185223
}
186224

@@ -198,7 +236,7 @@ private static final class SingleSourceToSingle<T> extends Single<T> implements
198236
}
199237

200238
@Override
201-
protected void handleSubscribe(final SingleSource.Subscriber<? super T> subscriber) {
239+
protected void handleSubscribe(final Subscriber<? super T> subscriber) {
202240
source.subscribe(subscriber);
203241
}
204242

@@ -216,7 +254,7 @@ private static final class CompletableSourceToCompletable extends Completable im
216254
}
217255

218256
@Override
219-
protected void handleSubscribe(final CompletableSource.Subscriber subscriber) {
257+
protected void handleSubscribe(final Subscriber subscriber) {
220258
source.subscribe(subscriber);
221259
}
222260

0 commit comments

Comments
 (0)