From e2c2f87d6bddbad34b13a959c81815d2558b376d Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Sat, 14 Nov 2020 21:11:02 +0700 Subject: [PATCH] Initial --- .gitignore | 13 + .idea/.gitignore | 3 + .idea/libraries/Dart_Packages.xml | 412 ++++++++++++++++++ .idea/libraries/Dart_SDK.xml | 29 ++ .idea/misc.xml | 6 + .idea/modules.xml | 8 + CHANGELOG.md | 3 + README.md | 22 + analysis_options.yaml | 6 + example/rx_storage_example.dart | 0 lib/rx_storage.dart | 11 + lib/src/impl/real_storage.dart | 319 ++++++++++++++ lib/src/interface/rx_storage.dart | 54 +++ lib/src/interface/storage.dart | 71 +++ lib/src/logger/default_logger.dart | 29 ++ lib/src/logger/logger.dart | 20 + lib/src/logger/logger_adapter.dart | 23 + lib/src/model/key_and_value.dart | 14 + .../map_not_null_stream_transformer.dart | 77 ++++ .../single_subscription.dart | 35 ++ lib/src/synchronous_future.dart | 66 +++ pubspec.yaml | 14 + rx_storage.iml | 14 + test/fake_storage.dart | 91 ++++ test/logger/default_logger_test.dart | 103 +++++ test/logger/logger_adapter_test.dart | 26 ++ test/model/key_and_value_test.dart | 43 ++ test/perf.dart | 51 +++ test/rx_storage_test.dart | 25 ++ test/storage/storage_test.dart | 128 ++++++ test/storage/streams_test.dart | 321 ++++++++++++++ test/stream_extensions/map_not_null_test.dart | 109 +++++ .../to_single_subscription_stream_test.dart | 34 ++ 33 files changed, 2180 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/libraries/Dart_Packages.xml create mode 100644 .idea/libraries/Dart_SDK.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 CHANGELOG.md create mode 100644 README.md create mode 100644 analysis_options.yaml create mode 100644 example/rx_storage_example.dart create mode 100644 lib/rx_storage.dart create mode 100644 lib/src/impl/real_storage.dart create mode 100644 lib/src/interface/rx_storage.dart create mode 100644 lib/src/interface/storage.dart create mode 100644 lib/src/logger/default_logger.dart create mode 100644 lib/src/logger/logger.dart create mode 100644 lib/src/logger/logger_adapter.dart create mode 100644 lib/src/model/key_and_value.dart create mode 100644 lib/src/stream_extensions/map_not_null_stream_transformer.dart create mode 100644 lib/src/stream_extensions/single_subscription.dart create mode 100644 lib/src/synchronous_future.dart create mode 100644 pubspec.yaml create mode 100644 rx_storage.iml create mode 100644 test/fake_storage.dart create mode 100644 test/logger/default_logger_test.dart create mode 100644 test/logger/logger_adapter_test.dart create mode 100644 test/model/key_and_value_test.dart create mode 100644 test/perf.dart create mode 100644 test/rx_storage_test.dart create mode 100644 test/storage/storage_test.dart create mode 100644 test/storage/streams_test.dart create mode 100644 test/stream_extensions/map_not_null_test.dart create mode 100644 test/stream_extensions/to_single_subscription_stream_test.dart diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0c44ab0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +# Files and directories created by pub +.dart_tool/ +.packages + +# Omit commiting pubspec.lock for library packages: +# https://dart.dev/guides/libraries/private-files#pubspeclock +pubspec.lock + +# Conventional directory for build outputs +build/ + +# Directory created by dartdoc +doc/api/ diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/libraries/Dart_Packages.xml b/.idea/libraries/Dart_Packages.xml new file mode 100644 index 0000000..e1316c3 --- /dev/null +++ b/.idea/libraries/Dart_Packages.xml @@ -0,0 +1,412 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/libraries/Dart_SDK.xml b/.idea/libraries/Dart_SDK.xml new file mode 100644 index 0000000..7195834 --- /dev/null +++ b/.idea/libraries/Dart_SDK.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..639900d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..eb4e0cf --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..687440b --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,3 @@ +## 1.0.0 + +- Initial version, created by Stagehand diff --git a/README.md b/README.md new file mode 100644 index 0000000..1ca6829 --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +A library for Dart developers. + +Created from templates made available by Stagehand under a BSD-style +[license](https://github.com/dart-lang/stagehand/blob/master/LICENSE). + +## Usage + +A simple usage example: + +```dart +import 'package:rx_storage/rx_storage.dart'; + +main() { + var awesome = new Awesome(); +} +``` + +## Features and bugs + +Please file feature requests and bugs at the [issue tracker][tracker]. + +[tracker]: http://example.com/issues/replaceme diff --git a/analysis_options.yaml b/analysis_options.yaml new file mode 100644 index 0000000..ea7951c --- /dev/null +++ b/analysis_options.yaml @@ -0,0 +1,6 @@ +include: package:pedantic/analysis_options.1.9.0.yaml +linter: + rules: + - public_member_api_docs + - prefer_final_locals + - prefer_relative_imports \ No newline at end of file diff --git a/example/rx_storage_example.dart b/example/rx_storage_example.dart new file mode 100644 index 0000000..e69de29 diff --git a/lib/rx_storage.dart b/lib/rx_storage.dart new file mode 100644 index 0000000..95fff58 --- /dev/null +++ b/lib/rx_storage.dart @@ -0,0 +1,11 @@ +/// Support for doing something awesome. +/// +/// More dartdocs go here. +library rx_storage; + +export 'src/interface/rx_storage.dart'; +export 'src/interface/storage.dart'; +export 'src/logger/default_logger.dart'; +export 'src/logger/logger.dart'; +export 'src/logger/logger_adapter.dart'; +export 'src/model/key_and_value.dart'; diff --git a/lib/src/impl/real_storage.dart b/lib/src/impl/real_storage.dart new file mode 100644 index 0000000..f481ac5 --- /dev/null +++ b/lib/src/impl/real_storage.dart @@ -0,0 +1,319 @@ +import 'dart:async'; +import 'dart:collection'; + +import 'package:rxdart/rxdart.dart'; + +import '../interface/rx_storage.dart'; +import '../interface/storage.dart'; +import '../logger/logger.dart'; +import '../model/key_and_value.dart'; +import '../stream_extensions/map_not_null_stream_transformer.dart'; +import '../stream_extensions/single_subscription.dart'; +import '../synchronous_future.dart'; + +/// Default [RxStorage] implementation +class RealRxStorage implements RxStorage { + /// Trigger subject + final _keyValuesSubject = PublishSubject>(); + + /// Logger subscription. Nullable + StreamSubscription> _subscription; + + /// Nullable. + Storage _storage; + + /// Nullable. + Future _storageFuture; + + /// Nullable + final Logger _logger; + + /// Nullable + final void Function() _onDispose; + + /// Construct a [RealRxStorage]. + RealRxStorage( + FutureOr storageOrFuture, [ + this._logger, + this._onDispose, + ]) : assert(storageOrFuture != null) { + if (storageOrFuture is Future) { + _storageFuture = storageOrFuture.then((value) => _storage = value); + } else { + _storageFuture = null; + _storage = storageOrFuture; + } + + if (_logger == null) { + return; + } + + _subscription = _keyValuesSubject.listen((map) { + final pairs = [ + for (final entry in map.entries) + KeyAndValue( + entry.key, + entry.value, + ), + ]; + _logger.keysChanged(UnmodifiableListView(pairs)); + }); + } + + // + // Internal + // + + /// Workaround to capture generics + static Type _typeOf() => T; + + /// Read value from persistent [storage] by [key]. + static Future _readFromStorage(Storage storage, String key) { + assert(key != null); + + if (T == dynamic) { + return storage.get(key); + } + if (T == double) { + return storage.getDouble(key); + } + if (T == int) { + return storage.getInt(key); + } + if (T == bool) { + return storage.getBool(key); + } + if (T == String) { + return storage.getString(key); + } + if (T == _typeOf>()) { + return storage.getStringList(key); + } + throw StateError('Unhandled type $T'); + } + + /// Write [value] to [storage] associated with [key] + static Future _writeToStorage( + Storage storage, + String key, + T value, + ) { + if (T == dynamic) { + assert(value == null); + return storage.remove(key); + } + + assert(key != null); + + // TODO(40014): Remove cast when type promotion works. + // This would normally be `as T` but we use `as dynamic` to make the + // unneeded check be implicit to match dart2js unsound optimizations in the + // user code. + final dynamicVal = value as dynamic; + + if (T == double) { + return storage.setDouble(key, dynamicVal); + } + if (T == int) { + return storage.setInt(key, dynamicVal); + } + if (T == bool) { + return storage.setBool(key, dynamicVal); + } + if (T == String) { + return storage.setString(key, dynamicVal); + } + if (T == _typeOf>()) { + return storage.setStringList(key, dynamicVal); + } + + throw StateError('Unhandled type $T'); + } + + /// Get [Stream] from the persistent storage + Stream _getStream(String key) { + final stream = _keyValuesSubject + .toSingleSubscriptionStream() + .mapNotNull( + (map) => map.containsKey(key) ? KeyAndValue(key, map[key]) : null) + .startWith(null) // Dummy value to trigger initial load. + .asyncMap((entry) => entry == null + ? _getValue(key) + : SynchronousFuture(entry.value as T)); + + if (_logger == null) { + return stream; + } + + return stream + .doOnData((value) => _logger.doOnDataStream(KeyAndValue(key, value))) + .doOnError((e, StackTrace s) => _logger.doOnErrorStream(e, s)); + } + + /// Get value from the persistent [Storage] by [key]. + Future _getValue(String key) async { + final storage = _storage ?? await _storageFuture; + final T value = await _readFromStorage(storage, key); + + _logger?.readValue(T, key, value); + return value; + } + + /// Set [value] associated with [key]. + Future _setValue(String key, T value) async { + final storage = _storage ?? await _storageFuture; + final result = await _writeToStorage(storage, key, value); + + _logger?.writeValue(T, key, value, result); + if (result ?? false) { + _sendKeyValueChanged({key: value}); + } + + return result; + } + + /// Add pairs to subject to trigger. + /// Do nothing if subject already closed. + void _sendKeyValueChanged(Map map) { + try { + _keyValuesSubject.add(map); + } catch (e) { + print(e); + // Do nothing + } + } + + // Get and set methods (implements [Storage]) + + @override + Future containsKey(String key) async { + assert(key != null); + final storage = _storage ?? await _storageFuture; + return storage.containsKey(key); + } + + @override + Future get(String key) => _getValue(key); + + @override + Future getBool(String key) => _getValue(key); + + @override + Future getDouble(String key) => _getValue(key); + + @override + Future getInt(String key) => _getValue(key); + + @override + Future> getKeys() async { + final storage = _storage ?? await _storageFuture; + return storage.getKeys(); + } + + @override + Future getString(String key) => _getValue(key); + + @override + Future> getStringList(String key) => + _getValue>(key); + + @override + Future clear() async { + final storage = _storage ?? await _storageFuture; + final keys = await storage.getKeys(); + final result = await storage.clear(); + + // All values are set to null + if (_logger != null) { + for (final key in keys) { + _logger.writeValue(dynamic, key, null, result); + } + } + if (result ?? false) { + final map = {for (final k in keys) k: null}; + _sendKeyValueChanged(map); + } + + return result; + } + + @override + Future reload() async { + final storage = _storage ?? await _storageFuture; + await storage.reload(); + + final keys = await storage.getKeys(); + + // Read new values from storage. + final map = {for (final k in keys) k: await storage.get(k)}; + if (_logger != null) { + for (final key in keys) { + _logger.readValue(dynamic, key, map[key]); + } + } + _sendKeyValueChanged(map); + } + + @override + Future remove(String key) => _setValue(key, null); + + @override + Future setBool(String key, bool value) => _setValue(key, value); + + @override + Future setDouble(String key, double value) => + _setValue(key, value); + + @override + Future setInt(String key, int value) => _setValue(key, value); + + @override + Future setString(String key, String value) => + _setValue(key, value); + + @override + Future setStringList(String key, List value) => + _setValue>(key, value); + + // Get streams (implements [RxStorage]) + + @override + Stream getStream(String key) => _getStream(key); + + @override + Stream getBoolStream(String key) => _getStream(key); + + @override + Stream getDoubleStream(String key) => _getStream(key); + + @override + Stream getIntStream(String key) => _getStream(key); + + @override + Stream getStringStream(String key) => _getStream(key); + + @override + Stream> getStringListStream(String key) => + _getStream>(key); + + @override + Stream> getKeysStream() => _keyValuesSubject + .toSingleSubscriptionStream() + .startWith(null) + .asyncMap((_) => getKeys()); + + @override + Future dispose() async { + final futures = [_keyValuesSubject.close(), _subscription?.cancel()] + .where((future) => future != null) + .toList(growable: false); + + if (futures.length == 1) { + await futures[0]; + } else { + await Future.wait(futures); + } + + _onDispose?.call(); + } +} diff --git a/lib/src/interface/rx_storage.dart b/lib/src/interface/rx_storage.dart new file mode 100644 index 0000000..8247bd2 --- /dev/null +++ b/lib/src/interface/rx_storage.dart @@ -0,0 +1,54 @@ +import 'dart:async'; + +import '../impl/real_storage.dart'; +import '../logger/logger.dart'; +import 'storage.dart'; + +/// Get [Stream]s by key from persistent storage. +abstract class RxStorage implements Storage { + /// TODO + factory RxStorage( + FutureOr storageOrFuture, [ + Logger logger, + void Function() onDispose, + ]) => + RealRxStorage(storageOrFuture, logger, onDispose); + + /// Return [Stream] that will emit value read from persistent storage. + /// It will automatic emit value when value associated with key was changed. + Stream getStream(String key); + + /// Return [Stream] that will emit value read from persistent storage. + /// It will automatic emit value when value associated with [key] was changed. + /// This stream will emit an error if it's not a bool. + Stream getBoolStream(String key); + + /// Return [Stream] that will emit value read from persistent storage. + /// It will automatic emit value when value associated with [key] was changed. + /// This stream will emit an error if it's not a double. + Stream getDoubleStream(String key); + + /// Return [Stream] that will emit value read from persistent storage. + /// It will automatic emit value when value associated with [key] was changed. + /// This stream will emit an error if it's not a int. + Stream getIntStream(String key); + + /// Return [Stream] that will emit value read from persistent storage. + /// It will automatic emit value when value associated with [key] was changed. + /// This stream will emit an error if it's not a String. + Stream getStringStream(String key); + + /// Return [Stream] that will emit value read from persistent storage. + /// It will automatic emit value when value associated with [key] was changed. + /// This stream will emit an error if it's not a string set. + Stream> getStringListStream(String key); + + /// Return [Stream] that will emit all keys read from persistent storage. + /// It will automatic emit all keys when any value was changed. + Stream> getKeysStream(); + + /// Clean up resources - Closes the streams. + /// This method should be called when a [RxStorage] is no longer needed. + /// Once `dispose` is called, all streams will `not` emit changed value when value changed. + Future dispose(); +} diff --git a/lib/src/interface/storage.dart b/lib/src/interface/storage.dart new file mode 100644 index 0000000..fe3b368 --- /dev/null +++ b/lib/src/interface/storage.dart @@ -0,0 +1,71 @@ +/// A persistent store for simple data. Data is persisted to disk asynchronously. +abstract class Storage { + /// Returns a future complete with value true if the persistent storage + /// contains the given [key]. + Future containsKey(String key); + + /// Reads a value of any type from persistent storage. + Future get(String key); + + /// Reads a value from persistent storage, return a future that completes + /// with an error if it's not a bool. + Future getBool(String key); + + /// Reads a value from persistent storage, return a future that completes + /// with an error if it's not a double. + Future getDouble(String key); + + /// Reads a value from persistent storage, return a future that completes + /// with an error if it's not a int. + Future getInt(String key); + + /// Returns all keys in the persistent storage. + Future> getKeys(); + + /// Reads a value from persistent storage, return a future that completes + /// with an error if it's not a String. + Future getString(String key); + + /// Reads a value from persistent storage, return a future that completes + /// with an error if it's not a string set. + Future> getStringList(String key); + + /// Completes with true once the storage for the app has been cleared. + Future clear(); + + /// Fetches the latest values from the host platform. + /// + /// Use this method to observe modifications that were made in native code + /// (without using the plugin) while the app is running. + Future reload(); + + /// Removes an entry from persistent storage. + Future remove(String key); + + /// Saves a boolean [value] to persistent storage in the background. + /// + /// If [value] is null, this is equivalent to calling [remove()] on the [key]. + Future setBool(String key, bool value); + + /// Saves a double [value] to persistent storage in the background. + /// + /// Android doesn't support storing doubles, so it will be stored as a float. + /// + /// If [value] is null, this is equivalent to calling [remove()] on the [key]. + Future setDouble(String key, double value); + + /// Saves an integer [value] to persistent storage in the background. + /// + /// If [value] is null, this is equivalent to calling [remove()] on the [key]. + Future setInt(String key, int value); + + /// Saves a string [value] to persistent storage in the background. + /// + /// If [value] is null, this is equivalent to calling [remove()] on the [key]. + Future setString(String key, String value); + + /// Saves a list of strings [value] to persistent storage in the background. + /// + /// If [value] is null, this is equivalent to calling [remove()] on the [key]. + Future setStringList(String key, List value); +} diff --git a/lib/src/logger/default_logger.dart b/lib/src/logger/default_logger.dart new file mode 100644 index 0000000..9f7d454 --- /dev/null +++ b/lib/src/logger/default_logger.dart @@ -0,0 +1,29 @@ +import '../model/key_and_value.dart'; +import 'logger.dart'; + +/// Default Logger's implementation, simply print to the console. +class DefaultLogger implements Logger { + /// Construct a [DefaultLogger]. + const DefaultLogger(); + + @override + void keysChanged(Iterable pairs) { + print(' ↓ Key changes'); + print(pairs.map((p) => ' → $p').join('\n')); + } + + @override + void doOnDataStream(KeyAndValue pair) => print(' → Stream emits data: $pair'); + + @override + void doOnErrorStream(dynamic error, StackTrace stackTrace) => + print(' → Stream emits error: $error, $stackTrace'); + + @override + void readValue(Type type, String key, dynamic value) => + print(" → Read value: type $type, key '$key' → $value"); + + @override + void writeValue(Type type, String key, dynamic value, bool writeResult) => print( + " → Write value: type $type, key '$key', value $value → result $writeResult"); +} diff --git a/lib/src/logger/logger.dart b/lib/src/logger/logger.dart new file mode 100644 index 0000000..03def0a --- /dev/null +++ b/lib/src/logger/logger.dart @@ -0,0 +1,20 @@ +import '../interface/storage.dart'; +import '../model/key_and_value.dart'; + +/// Log messages about operations (such as read, write, value change) and stream events. +abstract class Logger { + /// Called when values have changed. + void keysChanged(Iterable> pairs); + + /// Called when the stream emits an item. + void doOnDataStream(KeyAndValue pair); + + /// Called when the stream emits an error. + void doOnErrorStream(dynamic error, StackTrace stackTrace); + + /// Called when reading value from [Storage]. + void readValue(Type type, String key, dynamic value); + + /// Called when writing value to [Storage]. + void writeValue(Type type, String key, dynamic value, bool writeResult); +} diff --git a/lib/src/logger/logger_adapter.dart b/lib/src/logger/logger_adapter.dart new file mode 100644 index 0000000..6c8aa9e --- /dev/null +++ b/lib/src/logger/logger_adapter.dart @@ -0,0 +1,23 @@ +import '../model/key_and_value.dart'; +import 'logger.dart'; + +/// Logger's implementation with empty methods. +class LoggerAdapter implements Logger { + /// Constructs a [LoggerAdapter]. + const LoggerAdapter(); + + @override + void doOnDataStream(KeyAndValue pair) {} + + @override + void doOnErrorStream(dynamic error, StackTrace stackTrace) {} + + @override + void keysChanged(Iterable pairs) {} + + @override + void readValue(Type type, String key, dynamic value) {} + + @override + void writeValue(Type type, String key, dynamic value, bool writeResult) {} +} diff --git a/lib/src/model/key_and_value.dart b/lib/src/model/key_and_value.dart new file mode 100644 index 0000000..8b39466 --- /dev/null +++ b/lib/src/model/key_and_value.dart @@ -0,0 +1,14 @@ +/// Pair of [key] and [value]. +class KeyAndValue { + /// The key of the [KeyAndValue]. + final String key; + + /// The value associated to [key]. + final T value; + + /// Construct a [KeyAndValue] with [key] and [key]. + const KeyAndValue(this.key, this.value); + + @override + String toString() => "{ '$key': $value }"; +} diff --git a/lib/src/stream_extensions/map_not_null_stream_transformer.dart b/lib/src/stream_extensions/map_not_null_stream_transformer.dart new file mode 100644 index 0000000..d5dd965 --- /dev/null +++ b/lib/src/stream_extensions/map_not_null_stream_transformer.dart @@ -0,0 +1,77 @@ +import 'dart:async'; + +/// Map stream and reject null +class MapNotNullStreamTransformer extends StreamTransformerBase { + final R Function(T) _mapper; + + /// Construct a [MapNotNullStreamTransformer] with [mapper] + MapNotNullStreamTransformer(R Function(T) mapper) + : assert(mapper != null), + _mapper = mapper; + + @override + Stream bind(Stream stream) { + StreamController controller; + StreamSubscription subscription; + + void onListen() { + subscription = stream.listen( + (data) { + R mappedValue; + + try { + mappedValue = _mapper(data); + } catch (e, s) { + controller.addError(e, s); + return; + } + + if (mappedValue != null) { + controller.add(mappedValue); + } + }, + onError: controller.addError, + onDone: controller.close, + ); + } + + Future onCancel() => subscription.cancel(); + + if (stream.isBroadcast) { + controller = StreamController.broadcast( + sync: true, + onListen: onListen, + onCancel: onCancel, + ); + } else { + controller = StreamController( + sync: true, + onListen: onListen, + onPause: () => subscription.pause(), + onResume: () => subscription.resume(), + onCancel: onCancel, + ); + } + + return controller.stream; + } +} + +/// Map stream and reject null extension +/// ### Example +/// +/// Stream.fromIterable([1, 'two', 3, 'four']) +/// .mapNotNull((i) => i is int ? i : null) +/// .listen(print); // prints 1, 3 +/// +/// #### as opposed to: +/// +/// Stream.fromIterable([1, 'two', 3, 'four']) +/// .map((i) => i is int ? i : null) +/// .where((i) => i != null) +/// .listen(print); // prints 1, 3 +extension MapNotNullStreamExtension on Stream { + /// Map stream and reject null + Stream mapNotNull(R Function(T) mapper) => + transform(MapNotNullStreamTransformer(mapper)); +} diff --git a/lib/src/stream_extensions/single_subscription.dart b/lib/src/stream_extensions/single_subscription.dart new file mode 100644 index 0000000..c0d9794 --- /dev/null +++ b/lib/src/stream_extensions/single_subscription.dart @@ -0,0 +1,35 @@ +import 'dart:async'; + +/// A transformer that converts a broadcast stream into a single-subscription +/// stream. +class SingleSubscriptionTransformer extends StreamTransformerBase { + /// + const SingleSubscriptionTransformer(); + + @override + Stream bind(Stream stream) { + StreamSubscription subscription; + StreamController controller; + + controller = StreamController( + sync: true, + onListen: () { + subscription = stream.listen( + controller.add, + onError: controller.addError, + onDone: controller.close, + ); + }, + onCancel: () => subscription.cancel(), + ); + + return controller.stream; + } +} + +/// +extension ToSingleSubscriptionStreamExtension on Stream { + /// + Stream toSingleSubscriptionStream() => + transform(SingleSubscriptionTransformer()); +} diff --git a/lib/src/synchronous_future.dart b/lib/src/synchronous_future.dart new file mode 100644 index 0000000..c915186 --- /dev/null +++ b/lib/src/synchronous_future.dart @@ -0,0 +1,66 @@ +// Copyright 2014 The Flutter Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +import 'dart:async'; + +/// A [Future] whose [then] implementation calls the callback immediately. +/// +/// This is similar to [new Future.value], except that the value is available in +/// the same event-loop iteration. +/// +/// ⚠ This class is useful in cases where you want to expose a single API, where +/// you normally want to have everything execute synchronously, but where on +/// rare occasions you want the ability to switch to an asynchronous model. **In +/// general use of this class should be avoided as it is very difficult to debug +/// such bimodal behavior.** +class SynchronousFuture implements Future { + /// Creates a synchronous future. + /// + /// See also: + /// + /// * [new Future.value] for information about creating a regular + /// [Future] that completes with a value. + SynchronousFuture(this._value); + + final T _value; + + @override + Stream asStream() { + final controller = StreamController(); + controller.add(_value); + controller.close(); + return controller.stream; + } + + @override + Future catchError(Function onError, {bool Function(Object error) test}) => + Completer().future; + + @override + Future then(FutureOr Function(T value) onValue, {Function onError}) { + final dynamic result = onValue(_value); + if (result is Future) { + return result; + } + return SynchronousFuture(result as R); + } + + @override + Future timeout(Duration timeLimit, {FutureOr Function() onTimeout}) { + return Future.value(_value).timeout(timeLimit, onTimeout: onTimeout); + } + + @override + Future whenComplete(FutureOr Function() action) { + try { + final result = action(); + if (result is Future) { + return result.then((dynamic value) => _value); + } + return this; + } catch (e, stack) { + return Future.error(e, stack); + } + } +} diff --git a/pubspec.yaml b/pubspec.yaml new file mode 100644 index 0000000..5061aec --- /dev/null +++ b/pubspec.yaml @@ -0,0 +1,14 @@ +name: rx_storage +description: A starting point for Dart libraries or applications. +# version: 1.0.0 +# homepage: https://www.example.com + +environment: + sdk: '>=2.7.0 <3.0.0' + +dependencies: + rxdart: '>=0.24.1 <0.26.0' + +dev_dependencies: + pedantic: ^1.9.0 + test: ^1.15.3 diff --git a/rx_storage.iml b/rx_storage.iml new file mode 100644 index 0000000..75734c9 --- /dev/null +++ b/rx_storage.iml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/fake_storage.dart b/test/fake_storage.dart new file mode 100644 index 0000000..27016d9 --- /dev/null +++ b/test/fake_storage.dart @@ -0,0 +1,91 @@ +import 'package:rx_storage/src/interface/storage.dart'; +import 'package:rx_storage/src/synchronous_future.dart'; + +Future _wrap(T value) => SynchronousFuture(value); + +class FakeStorage implements Storage { + Map _map; + Map _pendingMap; + + FakeStorage(Map map) : _map = Map.of(map); + + set map(Map map) => _pendingMap = Map.of(map); + + Future _setValue(String key, dynamic value) { + if (value is List) { + _map[key] = value?.toList(); + } else { + _map[key] = value; + } + return _wrap(true); + } + + Future _getValue(String key) { + final value = _map[key] as T; + return value is List + ? _wrap(value.toList() as dynamic) + : _wrap(value); + } + + // + // + // + + @override + Future clear() { + _map.clear(); + return _wrap(true); + } + + @override + Future containsKey(String key) => _wrap(_map.containsKey(key)); + + @override + Future get(String key) => _getValue(key); + + @override + Future getBool(String key) => _getValue(key); + + @override + Future getDouble(String key) => _getValue(key); + + @override + Future getInt(String key) => _getValue(key); + + @override + Future> getKeys() => _wrap(_map.keys.toSet()); + + @override + Future getString(String key) => _getValue(key); + + @override + Future> getStringList(String key) => _getValue(key); + + @override + Future reload() { + if (_pendingMap != null) { + _map = _pendingMap; + _pendingMap = null; + } + return _wrap(null); + } + + @override + Future remove(String key) => _setValue(key, null); + + @override + Future setBool(String key, bool value) => _setValue(key, value); + + @override + Future setDouble(String key, double value) => _setValue(key, value); + + @override + Future setInt(String key, int value) => _setValue(key, value); + + @override + Future setString(String key, String value) => _setValue(key, value); + + @override + Future setStringList(String key, List value) => + _setValue(key, value); +} diff --git a/test/logger/default_logger_test.dart b/test/logger/default_logger_test.dart new file mode 100644 index 0000000..774c622 --- /dev/null +++ b/test/logger/default_logger_test.dart @@ -0,0 +1,103 @@ +import 'dart:async'; + +import 'package:rx_storage/src/logger/default_logger.dart'; +import 'package:rx_storage/src/model/key_and_value.dart'; +import 'package:test/test.dart'; + +void main() { + final logs = []; + + dynamic Function() overridePrint(dynamic Function() testFn) { + return () { + final spec = ZoneSpecification(print: (_, __, ___, String line) { + // Add to log instead of printing to stdout + logs.add(line); + }); + return Zone.current.fork(specification: spec).run(testFn); + }; + } + + group('DefaultLogger', () { + final logger = DefaultLogger(); + + setUp(() => logs.clear()); + + test( + 'keysChanged', + overridePrint(() { + const pairs = [ + KeyAndValue('key1', 'value1'), + KeyAndValue('key2', 2), + ]; + logger.keysChanged(pairs); + expect( + logs, + [ + ' ↓ Key changes', + " → { 'key1': value1 }" '\n' " → { 'key2': 2 }", + ], + ); + }), + ); + + test( + 'doOnDataStream', + overridePrint(() { + const keyAndValue = KeyAndValue('key1', 'value1'); + logger.doOnDataStream(keyAndValue); + + expect( + logs, + [" → Stream emits data: { 'key1': value1 }"], + ); + }), + ); + + test( + 'doOnErrorStream', + overridePrint(() { + final stackTrace = StackTrace.current; + final exception = Exception(); + logger.doOnErrorStream(exception, stackTrace); + + expect( + logs, + [' → Stream emits error: $exception, $stackTrace'], + ); + }), + ); + + test( + 'readValue', + overridePrint(() { + const type = String; + const key = 'key'; + const value = 'value'; + logger.readValue(type, key, value); + + expect( + logs, + [" → Read value: type String, key 'key' → value"], + ); + }), + ); + + test( + 'writeValue', + overridePrint(() { + const type = String; + const key = 'key'; + const value = 'value'; + const writeResult = true; + logger.writeValue(type, key, value, writeResult); + + expect( + logs, + [ + " → Write value: type String, key 'key', value value → result true" + ], + ); + }), + ); + }); +} diff --git a/test/logger/logger_adapter_test.dart b/test/logger/logger_adapter_test.dart new file mode 100644 index 0000000..bb9d30e --- /dev/null +++ b/test/logger/logger_adapter_test.dart @@ -0,0 +1,26 @@ +import 'package:rx_storage/src/logger/logger_adapter.dart'; +import 'package:rx_storage/src/model/key_and_value.dart'; +import 'package:test/test.dart'; + +void main() { + group('LoggerAdapter', () { + test('Works', () { + final logger = LoggerAdapter(); + const keyAndValue = KeyAndValue('key', 'value'); + logger.keysChanged([keyAndValue]); + logger.doOnDataStream(keyAndValue); + logger.doOnErrorStream(Exception(), StackTrace.current); + logger.writeValue( + keyAndValue.value.runtimeType, + keyAndValue.key, + keyAndValue.value, + true, + ); + logger.readValue( + keyAndValue.value.runtimeType, + keyAndValue.key, + keyAndValue.value, + ); + }); + }); +} diff --git a/test/model/key_and_value_test.dart b/test/model/key_and_value_test.dart new file mode 100644 index 0000000..e994634 --- /dev/null +++ b/test/model/key_and_value_test.dart @@ -0,0 +1,43 @@ +import 'package:rx_storage/src/model/key_and_value.dart'; +import 'package:test/test.dart'; + +void main() { + group('$KeyAndValue tests', () { + test('Construct a KeyAndValue', () { + KeyAndValue('key1', 'value'); + KeyAndValue('key2', 2); + KeyAndValue('key3', 2.5); + KeyAndValue('key4', true); + KeyAndValue('key5', null); + KeyAndValue('key6', ['v1', 'v2', 'v3']); + expect(true, isTrue); + }); + + test('KeyAndValue.toString', () { + expect( + KeyAndValue('key1', 'value').toString(), + "{ 'key1': value }", + ); + expect( + KeyAndValue('key2', 2).toString(), + "{ 'key2': 2 }", + ); + expect( + KeyAndValue('key3', 2.5).toString(), + "{ 'key3': 2.5 }", + ); + expect( + KeyAndValue('key4', true).toString(), + "{ 'key4': true }", + ); + expect( + KeyAndValue('key5', null).toString(), + "{ 'key5': null }", + ); + expect( + KeyAndValue('key6', ['v1', 'v2', 'v3']).toString(), + "{ 'key6': [v1, v2, v3] }", + ); + }); + }); +} diff --git a/test/perf.dart b/test/perf.dart new file mode 100644 index 0000000..5b515b7 --- /dev/null +++ b/test/perf.dart @@ -0,0 +1,51 @@ +import 'package:rx_storage/rx_storage.dart'; + +import 'fake_storage.dart'; + +void main() async { + const kTestValues = { + 'String': 'hello world', + 'bool': true, + 'int': 42, + 'double': 3.14159, + 'List': ['foo', 'bar'], + }; + + // + // const kTestValues2 = { + // 'String': 'goodbye world', + // 'bool': false, + // 'int': 1337, + // 'double': 2.71828, + // 'List': ['baz', 'quox'], + // }; + + final storage = FakeStorage(kTestValues); + final rxStorage = RxStorage( + Future.delayed(const Duration(milliseconds: 100), () => storage)); + + final stopwatch = Stopwatch(); + final list = kTestValues.keys.toList(); + + stopwatch + ..reset() + ..start(); + print('Start...'); + for (var i = 0; i < 10000; i++) { + await rxStorage.get(list[i % list.length]); + } + print('End...'); + + stopwatch.stop(); + print(stopwatch.elapsedMilliseconds); + + /*for (var i = 0; i < 10000; i++) { + await Future.value(1) + .then((value) => SynchronousFuture(value + 1)) + .then((value) {}) + .then((value) {}) + .then((value) => SynchronousFuture('22')); + print(i); + } + print('done');*/ +} diff --git a/test/rx_storage_test.dart b/test/rx_storage_test.dart new file mode 100644 index 0000000..a063a4b --- /dev/null +++ b/test/rx_storage_test.dart @@ -0,0 +1,25 @@ +import 'logger/default_logger_test.dart' as default_logger_test; +import 'logger/logger_adapter_test.dart' as logger_adapter_test; +import 'model/key_and_value_test.dart' as key_and_value_test; +import 'storage/storage_test.dart' as storage_test; +import 'stream_extensions/map_not_null_test.dart' as map_not_null_test; +import 'stream_extensions/to_single_subscription_stream_test.dart' + as to_single_subscription_stream_test; +import 'storage/streams_test.dart' as streams_test; + +void main() { + // logger tests + default_logger_test.main(); + logger_adapter_test.main(); + + // model test + key_and_value_test.main(); + + // storage tests + storage_test.main(); + streams_test.main(); + + // stream extensions tests + map_not_null_test.main(); + to_single_subscription_stream_test.main(); +} diff --git a/test/storage/storage_test.dart b/test/storage/storage_test.dart new file mode 100644 index 0000000..f2563ce --- /dev/null +++ b/test/storage/storage_test.dart @@ -0,0 +1,128 @@ +import 'package:collection/collection.dart'; +import 'package:rx_storage/rx_storage.dart'; +import 'package:test/test.dart'; + +import '../fake_storage.dart'; + +void main() { + group('Storage', () { + const kTestValues = { + 'String': 'hello world', + 'bool': true, + 'int': 42, + 'double': 3.14159, + 'List': ['foo', 'bar'], + }; + + const kTestValues2 = { + 'String': 'goodbye world', + 'bool': false, + 'int': 1337, + 'double': 2.71828, + 'List': ['baz', 'quox'], + }; + + FakeStorage storage; + RxStorage rxStorage; + + setUp(() { + storage = FakeStorage(kTestValues); + rxStorage = RxStorage(storage, const DefaultLogger()); + }); + + tearDown(() async { + await rxStorage.clear(); + }); + + test('reading', () async { + expect(await rxStorage.get('String'), kTestValues['String']); + expect(await rxStorage.get('bool'), kTestValues['bool']); + expect(await rxStorage.get('int'), kTestValues['int']); + expect(await rxStorage.get('double'), kTestValues['double']); + expect(await rxStorage.get('List'), kTestValues['List']); + expect(await rxStorage.getString('String'), kTestValues['String']); + expect(await rxStorage.getBool('bool'), kTestValues['bool']); + expect(await rxStorage.getInt('int'), kTestValues['int']); + expect(await rxStorage.getDouble('double'), kTestValues['double']); + expect(await rxStorage.getStringList('List'), kTestValues['List']); + }); + + test('writing', () async { + await Future.wait(>[ + rxStorage.setString('String', kTestValues2['String']), + rxStorage.setBool('bool', kTestValues2['bool']), + rxStorage.setInt('int', kTestValues2['int']), + rxStorage.setDouble('double', kTestValues2['double']), + rxStorage.setStringList('List', kTestValues2['List']) + ]); + + expect(await rxStorage.getString('String'), kTestValues2['String']); + expect(await rxStorage.getBool('bool'), kTestValues2['bool']); + expect(await rxStorage.getInt('int'), kTestValues2['int']); + expect(await rxStorage.getDouble('double'), kTestValues2['double']); + expect(await rxStorage.getStringList('List'), kTestValues2['List']); + }); + + test('removing', () async { + const key = 'testKey'; + await rxStorage.setString(key, null); + await rxStorage.setBool(key, null); + await rxStorage.setInt(key, null); + await rxStorage.setDouble(key, null); + await rxStorage.setStringList(key, null); + await rxStorage.remove(key); + }); + + test('containsKey', () async { + const key = 'testKey'; + + expect(await rxStorage.containsKey(key), false); + + await rxStorage.setString(key, 'test'); + expect(await rxStorage.containsKey(key), true); + }); + + test('clearing', () async { + await rxStorage.clear(); + expect(await rxStorage.getString('String'), null); + expect(await rxStorage.getBool('bool'), null); + expect(await rxStorage.getInt('int'), null); + expect(await rxStorage.getDouble('double'), null); + expect(await rxStorage.getStringList('List'), null); + }); + + test('reloading', () async { + await rxStorage.setString('String', kTestValues['String']); + expect(await rxStorage.getString('String'), kTestValues['String']); + + storage.map = kTestValues2; + expect(await rxStorage.getString('String'), kTestValues['String']); + + await rxStorage.reload(); + expect(await rxStorage.getString('String'), kTestValues2['String']); + }); + + test('writing copy of strings list', () async { + final myList = []; + await rxStorage.setStringList('myList', myList); + myList.add('foobar'); + + final cachedList = await rxStorage.getStringList('myList'); + expect(cachedList, []); + + cachedList.add('foobar2'); + + expect(await rxStorage.getStringList('myList'), []); + }); + + test('getKeys', () async { + final keys = await rxStorage.getKeys(); + final expected = kTestValues.keys.toSet(); + + expect( + SetEquality().equals(keys, expected), + isTrue, + ); + }); + }); +} diff --git a/test/storage/streams_test.dart b/test/storage/streams_test.dart new file mode 100644 index 0000000..fbd45ea --- /dev/null +++ b/test/storage/streams_test.dart @@ -0,0 +1,321 @@ +import 'package:rx_storage/rx_storage.dart'; +import 'package:test/test.dart'; + +import '../fake_storage.dart'; + +void main() { + group('Test Stream', () { + const kTestValues = { + 'String': 'hello world', + 'bool': true, + 'int': 42, + 'double': 3.14159, + 'List': ['foo', 'bar'], + }; + + FakeStorage fakeStorage; + RxStorage rxStorage; + + setUp(() { + fakeStorage = FakeStorage(kTestValues); + + rxStorage = RxStorage( + fakeStorage, + // const DefaultLogger(), + ); + }); + + tearDown(() async => await rxStorage.dispose()); + + test( + 'Stream will emit error when read value is not valid type, or emit null when value is not set', + () async { + final intStream = + rxStorage.getIntStream('bool'); // Actual: Stream + await expectLater( + intStream, + emitsAnyOf([ + isNull, + emitsError(isA()), + ]), + ); + + final listStringStream = + rxStorage.getStringListStream('String'); // Actual: Stream + await expectLater( + listStringStream, + emitsAnyOf([ + isNull, + emitsError(isA()), + ]), + ); + + final noSuchStream = + rxStorage.getIntStream('###String'); // Actual: Stream + + await expectLater( + noSuchStream, + emits(isNull), + ); + }, + ); + + test( + 'Stream will emit value as soon as possible after listen', + () async { + await Future.wait([ + expectLater( + rxStorage.getIntStream('int'), + emits(anything), + ), + expectLater( + rxStorage.getBoolStream('bool'), + emits(anything), + ), + expectLater( + rxStorage.getDoubleStream('double'), + emits(anything), + ), + expectLater( + rxStorage.getStringStream('String'), + emits(anything), + ), + expectLater( + rxStorage.getStringListStream('List'), + emits(anything), + ), + expectLater( + rxStorage.getStream('No such key'), + emits(isNull), + ), + ]); + }, + ); + + test( + 'Stream will emit value as soon as possible after listen,' + ' and will emit value when value associated with key change', + () async { + /// + /// Bool + /// + final streamBool = rxStorage.getBoolStream('bool'); + final expectStreamBoolFuture = expectLater( + streamBool, + emitsInOrder([anything, false, true, false, true, false]), + ); + await rxStorage.setBool('bool', false); + await rxStorage.setBool('bool', true); + await rxStorage.setBool('bool', false); + await rxStorage.setBool('bool', true); + await rxStorage.setBool('bool', false); + + /// + /// Double + /// + final streamDouble = rxStorage.getDoubleStream('double'); + final expectStreamDoubleFuture = expectLater( + streamDouble, + emitsInOrder([anything, 0.3333, 1, 2, isNull, 3, isNull, 4]), + ); + await rxStorage.setDouble('double', 0.3333); + await rxStorage.setDouble('double', 1); + await rxStorage.setDouble('double', 2); + await rxStorage.setDouble('double', null); + await rxStorage.setDouble('double', 3); + await rxStorage.remove('double'); + await rxStorage.setDouble('double', 4); + + /// + /// Int + /// + final streamInt = rxStorage.getIntStream('int'); + final expectStreamIntFuture = expectLater( + streamInt, + emitsInOrder([anything, 1, isNull, 2, 3, isNull, 3, 2, 1]), + ); + await rxStorage.setInt('int', 1); + await rxStorage.setInt('int', null); + await rxStorage.setInt('int', 2); + await rxStorage.setInt('int', 3); + await rxStorage.remove('int'); + await rxStorage.setInt('int', 3); + await rxStorage.setInt('int', 2); + await rxStorage.setInt('int', 1); + + /// + /// String + /// + final streamString = rxStorage.getStringStream('String'); + final expectStreamStringFuture = expectLater( + streamString, + emitsInOrder([anything, 'h', 'e', 'l', 'l', 'o', isNull]), + ); + await rxStorage.setString('String', 'h'); + await rxStorage.setString('String', 'e'); + await rxStorage.setString('String', 'l'); + await rxStorage.setString('String', 'l'); + await rxStorage.setString('String', 'o'); + await rxStorage.setString('String', null); + + /// + /// List + /// + final streamListString = rxStorage.getStringListStream('List'); + final expectStreamListStringFuture = expectLater( + streamListString, + emitsInOrder([ + anything, + ['1', '2', '3'], + ['1', '2', '3', '4'], + ['1', '2', '3', '4', '5'], + ['1', '2', '3', '4'], + ['1', '2', '3'], + ['1', '2'], + ['1'], + [], + isNull, + ['done'], + ]), + ); + await rxStorage.setStringList('List', ['1', '2', '3']); + await rxStorage.setStringList('List', ['1', '2', '3', '4']); + await rxStorage.setStringList('List', ['1', '2', '3', '4', '5']); + await rxStorage.setStringList('List', ['1', '2', '3', '4']); + await rxStorage.setStringList('List', ['1', '2', '3']); + await rxStorage.setStringList('List', ['1', '2']); + await rxStorage.setStringList('List', ['1']); + await rxStorage.setStringList('List', []); + await rxStorage.remove('List'); + await rxStorage.setStringList('List', ['done']); + + await Future.wait([ + expectStreamBoolFuture, + expectStreamDoubleFuture, + expectStreamIntFuture, + expectStreamStringFuture, + expectStreamListStringFuture, + ]); + }, + ); + + test('Does not emit anything after disposed', () async { + final stream = rxStorage.getStringListStream('List'); + + const expected = [ + anything, + ['before', 'dispose', '1'], + ['before', 'dispose', '2'], + ]; + var index = 0; + final result = []; + stream.listen( + (data) => result.add(index == 0 ? true : data == expected[index++]), + ); + + for (final v in expected.skip(1)) { + await rxStorage.setStringList( + 'List', + v, + ); + await Future.delayed(Duration.zero); + } + + // delay + await Future.delayed(const Duration(microseconds: 500)); + await rxStorage.dispose(); + await Future.delayed(Duration.zero); + + // not emit but persisted + await rxStorage.setStringList( + 'List', + ['after', 'dispose'], + ); + // working fine + expect( + await rxStorage.getStringList('List'), + ['after', 'dispose'], + ); + + // timeout is 2 seconds + await Future.delayed(const Duration(seconds: 2)); + expect(result.length, expected.length); + expect(result.every((element) => element), isTrue); + }); + + test('Emit null when clearing', () async { + final stream = rxStorage.getStringListStream('List'); + + final later = expectLater( + stream, + emitsInOrder( + [ + anything, + isNull, + ], + ), + ); + + await rxStorage.clear(); + + await later; + }); + + test('Emit value when reloading', () async { + final stream = rxStorage.getStringListStream('List'); + + final later = expectLater( + stream, + emitsInOrder( + [ + anything, + ['AFTER RELOAD'], + ['WORKING 1'], + ['WORKING 2'], + ], + ), + ); + + fakeStorage.map = { + 'List': ['AFTER RELOAD'] + }; + await rxStorage.reload(); // emits ['AFTER RELOAD'] + + await rxStorage.setStringList('List', ['WORKING 1']); // emits ['WORKING'] + + fakeStorage.map = { + 'List': ['WORKING 2'], + }; + await rxStorage.reload(); // emits ['WORKING'] + + await later; + }); + + test('Emit keys', () async { + final keysStream = rxStorage.getKeysStream(); + + final future = expectLater( + keysStream, + emitsInOrder([ + anything, + anything, + anything, + anything, + ]), + ); + + await rxStorage.setInt('int', 0); + await rxStorage.setDouble('double', 0); + await rxStorage.setString('String', ''); + + await future; + }); + + test('Stream is single-subscription stream', () { + final stream = rxStorage.getStringListStream('List'); + expect(stream.isBroadcast, isFalse); + stream.listen(null); + expect(() => stream.listen(null), throwsStateError); + }); + }); +} diff --git a/test/stream_extensions/map_not_null_test.dart b/test/stream_extensions/map_not_null_test.dart new file mode 100644 index 0000000..240f7f5 --- /dev/null +++ b/test/stream_extensions/map_not_null_test.dart @@ -0,0 +1,109 @@ +import 'dart:async'; + +import 'package:rx_storage/src/stream_extensions/map_not_null_stream_transformer.dart'; +import 'package:test/test.dart'; + +void main() { + group('Rx.mapNotNull', () { + test('Rx.mapNotNull', () async { + // 0-----1-----2-----3-----...-----8-----9-----| + // 1-----null--3-----null--...-----9-----null--| + // 1--3--5--7--9--| + final stream = + Stream.periodic(const Duration(milliseconds: 100), (i) => i) + .take(10) + .mapNotNull((i) => i.isOdd ? null : i + 1); + await expectLater( + stream, + emitsInOrder([1, 3, 5, 7, 9, emitsDone]), + ); + }); + + test('Rx.mapNotNull.shouldThrowA', () { + expect( + () => Stream.value(42).mapNotNull(null), + throwsA(const TypeMatcher()), + ); + }); + + test('Rx.mapNotNull.shouldThrowB', () async { + final stream = Stream.error(Exception()).mapNotNull((_) => true); + await expectLater( + stream, + emitsError(isA()), + ); + }); + + test('Rx.mapNotNull.shouldThrowC', () async { + final stream = Stream.fromIterable([1, 2, 3, 4]).mapNotNull((i) { + if (i == 3) { + throw Exception(); + } else { + return i; + } + }); + expect( + stream, + emitsInOrder([ + 1, + 2, + emitsError(isA()), + 4, + emitsDone, + ]), + ); + }); + + test('Rx.mapNotNull.asBroadcastStream', () { + final stream = Stream.fromIterable([2, 3, 4, 5, 6]) + .mapNotNull((i) => null) + .asBroadcastStream(); + + // listen twice on same stream + stream.listen(null); + stream.listen(null); + + // code should reach here + expect(true, true); + }); + + test('Rx.mapNotNull.pause.resume', () async { + StreamSubscription subscription; + + subscription = + Stream.fromIterable([2, 3, 4, 5, 6]).mapNotNull((i) => i).listen( + expectAsync1( + (data) { + expect(data, 2); + subscription.cancel(); + }, + ), + ); + + subscription.pause(); + subscription.resume(); + }); + + test('Rx.mapNotNull.broadcast', () { + final streamController = StreamController.broadcast(); + final stream = streamController.stream.mapNotNull((i) => i); + + expect(stream.isBroadcast, isTrue); + stream.listen(null); + stream.listen(null); + + expect(true, true); + }); + + test('Rx.mapNotNull.not.broadcast', () { + final streamController = StreamController(); + final stream = streamController.stream.mapNotNull((i) => i); + + expect(stream.isBroadcast, isFalse); + stream.listen(null); + expect(() => stream.listen(null), throwsStateError); + + streamController.add(1); + }); + }); +} diff --git a/test/stream_extensions/to_single_subscription_stream_test.dart b/test/stream_extensions/to_single_subscription_stream_test.dart new file mode 100644 index 0000000..b8d071b --- /dev/null +++ b/test/stream_extensions/to_single_subscription_stream_test.dart @@ -0,0 +1,34 @@ +import 'dart:async'; + +import 'package:rx_storage/src/stream_extensions/single_subscription.dart'; +import 'package:test/test.dart'; + +void main() { + group('Stream.toSingleSubscriptionStream', () { + test('Stream.toSingleSubscriptionStream', () { + final streamController = StreamController.broadcast(); + final singleSubscriptionStream = + streamController.stream.toSingleSubscriptionStream(); + + expect(singleSubscriptionStream.isBroadcast, isFalse); + singleSubscriptionStream.listen(null); + expect(() => singleSubscriptionStream.listen(null), throwsStateError); + }); + + test('Emitting values since listening', () { + final streamController = StreamController.broadcast(); + final singleSubscriptionStream = + streamController.stream.toSingleSubscriptionStream(); + + expect( + singleSubscriptionStream, + emitsInOrder([1, 2, 3, emitsDone]), + ); + + streamController.add(1); + streamController.add(2); + streamController.add(3); + streamController.close(); + }); + }); +}