Add onErrorClose to onErrorResume extensions #369
Comments
listen(cancelOnError: true) |
Yep, @hoc081098 is exactly right: When you listen to the Stream, you can set |
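For readers following along, here is a minimal sketch of what the listener-side option discussed above does, using only dart:async. The helper name collectWithCancelOnError and the logging strings are made up for illustration; only listen and its cancelOnError parameter come from the thread.

```dart
import 'dart:async';

/// Feeds 1, an error, then 2 into a stream and records what a listener
/// using `cancelOnError: true` actually receives.
/// (Hypothetical helper, not part of rxdart.)
Future<List<String>> collectWithCancelOnError() async {
  final controller = StreamController<int>();
  final log = <String>[];

  controller.stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error'),
    onDone: () => log.add('done'),
    cancelOnError: true, // cancel the subscription on the first error
  );

  controller
    ..add(1)
    ..addError(Exception('boom'))
    ..add(2);

  // Give the queued events a chance to be delivered.
  await Future<void>.delayed(Duration.zero);
  await controller.close();
  return log;
}
```

With this setup the listener should see the 1 and the error, but neither the 2 nor a done event, because the subscription is cancelled rather than closed. Note that the error still reaches the listener, which is the part the original question wants to avoid.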
No guys, that is not the behavior I want, because it requires the user to manually pass cancelOnError. I am developing an API and it must be concise; I don't want to depend on the user for it. |
@shinayser Ah, gotcha -- just as a heads up, that would go against the normal behavior for Dart Streams, which usually require the listener to define that behavior. If you're cool living with that, you could create a custom transformer and apply it to the Streams that need to exhibit this behavior.
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

extension OnErrorCancel<T> on Stream<T> {
  /// Closes the resulting Stream on the first error instead of forwarding it.
  Stream<T> onErrorCancel() {
    return transform(
      StreamTransformer.fromHandlers(
        handleError: (err, stacktrace, sink) => sink.close(),
      ),
    );
  }
}

void main() {
  test('onErrorCancel cancels a Stream when it emits an error', () {
    // Emits 1, then an error, then 2.
    final stream = ConcatStream<int>([
      Stream.fromIterable([1]),
      Stream.error(Exception()),
      Stream.fromIterable([2]),
    ]);

    // The error is turned into a done event, so 2 is never emitted.
    expect(
      stream.onErrorCancel(),
      emitsInOrder(<dynamic>[1, emitsDone]),
    );
  });
}
|
Actually, I solved it using the operators we already have, like this:
timeout(ms(timeoutMillis))
    .onErrorResumeNext(Observable.empty())
But it is an ugly workaround. I was thinking of an official, API-supported way of doing it. Perhaps even going further and passing a predicate as a parameter that checks whether the stream needs to be closed or not. |
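To make the predicate idea concrete, here is a rough sketch of how such an operator could look when built on StreamTransformer.fromHandlers from dart:async. The name onErrorClose comes from this issue, but the signature, the optional shouldClose parameter, and the helper oneErrorTwo are assumptions for illustration, not an existing rxdart API.

```dart
import 'dart:async';

extension OnErrorClose<T> on Stream<T> {
  /// Hypothetical operator: turns an error into a done event.
  ///
  /// If [shouldClose] is given, only errors it accepts close the stream;
  /// all other errors are forwarded unchanged.
  Stream<T> onErrorClose([bool Function(Object error)? shouldClose]) {
    return transform(
      StreamTransformer<T, T>.fromHandlers(
        handleError: (error, stackTrace, sink) {
          if (shouldClose?.call(error) ?? true) {
            sink.close(); // end the stream instead of emitting the error
          } else {
            sink.addError(error, stackTrace);
          }
        },
      ),
    );
  }
}

/// Illustrative source: emits 1, then a TimeoutException, then 2.
Stream<int> oneErrorTwo() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(TimeoutException('too slow'))
    ..add(2)
    ..close();
  return controller.stream;
}
```

Under these assumptions, oneErrorTwo().onErrorClose((e) => e is TimeoutException) would complete after emitting 1, while a predicate that rejects the error would let the TimeoutException through to the listener.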
Yep, that's another way to go about it. If you were designing the api, what would it look like?
|
Yes, exactly like that!
return _maxpbPackagesStream
    .where((it) => getMessageType(it) == Mts.MAXPB_CMD_POSITION)
    .map((it) => MultipleReportData.fromBuffer(removeHeader(it)))
    .doOnData((it) => sendAckForCommandId(it.packetID))
    .timeout(ms(timeoutMillis))
    .onErrorResume(
      (error) {
        if (error is TimeoutException) { // <--- Here I check if the error is just a timeout; if it is, I "close" the stream.
          return Observable.empty();
        } else {
          throw error;
        }
      },
    );
As you can see, it is pretty similar to what you proposed. Going beyond that, we could also have some more operators that close the stream based on the data received. What do you think? Something like this:
stream.closeAfterEmitting(bool Function(T) shouldClose) // Checks whether to close and, if so, emits the latest value and then closes.
stream.closeBeforeEmitting(bool Function(T) shouldClose) // Checks whether to close and, if so, closes without emitting the latest value.
(The word "close" is more expressive than "cancel", don't you think?) |
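The two data-driven operators proposed above could be sketched roughly as follows with dart:async alone. The operator names are taken from the comment; the extension name CloseOnData and the implementation choices are assumptions. closeBeforeEmitting is expressed through the existing Stream.takeWhile with the predicate negated, and closeAfterEmitting uses a handler-based transformer.

```dart
import 'dart:async';

/// Hypothetical extension sketching the two proposed operators.
extension CloseOnData<T> on Stream<T> {
  /// Closes as soon as [shouldClose] matches, without emitting that value.
  Stream<T> closeBeforeEmitting(bool Function(T value) shouldClose) {
    return takeWhile((value) => !shouldClose(value));
  }

  /// Emits the matching value first, then closes.
  Stream<T> closeAfterEmitting(bool Function(T value) shouldClose) {
    return transform(
      StreamTransformer<T, T>.fromHandlers(
        handleData: (value, sink) {
          sink.add(value);
          if (shouldClose(value)) sink.close();
        },
      ),
    );
  }
}
```

For a source emitting 1, 2, 3, 4 and the predicate (v) => v == 3, the first operator would yield 1, 2 and the second 1, 2, 3.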
(Oops, I closed by mistake, sorry!) |
Naming is hard :) I don't feel strongly one way or the other. I changed the issue to reflect the work that needs to be done to address the initial topic.
If I understand you correctly,
I'd say that'd be better to discuss in another issue, though, if you find it important so we can keep this issue focused on the original topic! |
Great brianegan! I will open another issue to discuss the takeWhileInclusive then! Thanks! |
I am trying to figure out how I can convert an "error" event into a "done" event, but can't find a way.
Example: I have a timeout operator on my stream and want it to close the stream instead of transmitting the error. Is there a way of doing it?
Perhaps an onErrorClose event could be an interesting addition? What do you think?
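For the specific timeout case described in this question, one option that needs no new operator is the onTimeout callback of Stream.timeout in dart:async, which receives a sink that can be closed. A minimal sketch, where the extension and method name closeOnTimeout are invented for illustration:

```dart
import 'dart:async';

/// Hypothetical helper: ends the stream quietly when no event arrives
/// within [limit], instead of emitting a TimeoutException.
extension CloseOnTimeout<T> on Stream<T> {
  Stream<T> closeOnTimeout(Duration limit) {
    return timeout(
      limit,
      // Closing the sink closes the returned stream; no error is emitted.
      onTimeout: (sink) => sink.close(),
    );
  }
}
```

This only covers timeouts; errors coming from other sources would still be forwarded to the listener.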