Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDART-973: Add support for the new progress notifications #1546

Merged
merged 22 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/dart-desktop-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
shell: bash

- id: runner_os_lowercase
# there is no such thing as ${{ tolower(runner.os) }}, hence this abomination ¯\_(ツ)_/¯
# there is no such thing as ${{ tolower(runner.os) }}, hence this abomination ¯\_(ツ)_/¯
# use with steps.runner_os_lowercase.outputs.os
run: echo ${{ runner.os }} | awk '{print "os=" tolower($0)}' >> $GITHUB_OUTPUT
shell: bash
Expand All @@ -59,15 +59,15 @@ jobs:
ulimit -n 10240
if: ${{ contains(runner.os, 'macos') }}

- name: Run tests ${{ runner }} ${{ runner.arch }}
- name: Run tests ${{ runner.os }} ${{ runner.arch }}
run: melos test:unit

# TODO: Publish all reports
- name: Publish Test Report
uses: dorny/[email protected]
if: success() || failure()
with:
name: Test Results Dart ${{ runner }} ${{ runner.arch }}
name: Test Results Dart ${{ runner.os }} ${{ runner.arch }}
path: test-results.json
reporter: dart-json
only-summary: true
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## vNext (TBD)

### Enhancements
* None
* The download progress estimate reported by `Session.getProgressStream` will now return meaningful estimated values, while previously it always returned 1. (Issue [#1564](https://github.com/realm/realm-dart/issues/1564))

### Fixed
* [sane_uuid](https://pub.dev/packages/sane_uuid) 1.0.0 was released, which has a few minor breaking change as compared to to 1.0.0-rc.5 that impact realm:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,5 @@ void _onConnectionStateChange(Object userdata, int oldState, int newState) {
void syncProgressCallback(Object userdata, int transferred, int transferable, double estimate) {
final controller = userdata as ProgressNotificationsController;

controller.onProgress(transferred, transferable);
controller.onProgress(estimate);
}
4 changes: 2 additions & 2 deletions packages/realm_dart/lib/src/realm_class.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1016,8 +1016,8 @@ class RealmAsyncOpenProgressNotificationsController implements ProgressNotificat
}

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SessionInternal.createSyncProgress(transferredBytes, transferableBytes));
void onProgress(double progressEstimate) {
_streamController.add(SessionInternal.createSyncProgress(progressEstimate));
}

void _start() {
Expand Down
21 changes: 6 additions & 15 deletions packages/realm_dart/lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,7 @@ class SyncProgress {
/// value may either increase or decrease as new data needs to be transferred.
final double progressEstimate;

const SyncProgress._({required this.progressEstimate});

static double _calculateProgress({required int transferred, required int transferable}) {
if (transferable == 0 || transferred > transferable) {
return 1;
}

return transferred / transferable;
}
const SyncProgress({required this.progressEstimate});
}

/// A type containing information about the transition of a connection state from one value to another.
Expand All @@ -108,12 +100,11 @@ extension SessionInternal on Session {

void raiseError(int errorCode, bool isFatal) => handle.raiseError(errorCode, isFatal);

static SyncProgress createSyncProgress(int transferredBytes, int transferableBytes) =>
SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes));
static SyncProgress createSyncProgress(double progressEstimate) => SyncProgress(progressEstimate: progressEstimate);
}

abstract interface class ProgressNotificationsController {
void onProgress(int transferredBytes, int transferableBytes);
void onProgress(double progressEstimate);
}

/// @nodoc
Expand All @@ -133,10 +124,10 @@ class SessionProgressNotificationsController implements ProgressNotificationsCon
}

@override
void onProgress(int transferredBytes, int transferableBytes) {
_streamController.add(SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes)));
void onProgress(double progressEstimate) {
_streamController.add(SyncProgress(progressEstimate: progressEstimate));

if (transferredBytes >= transferableBytes && _mode == ProgressMode.forCurrentlyOutstandingWork) {
if (progressEstimate >= 1.0 && _mode == ProgressMode.forCurrentlyOutstandingWork) {
_streamController.close();
}
}
Expand Down
25 changes: 14 additions & 11 deletions packages/realm_dart/test/realm_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1337,18 +1337,21 @@ void main() {
final user = await app.logIn(credentials);
final configuration = Configuration.flexibleSync(user, getSyncSchema());

int count = 0;
double progress = -1;

double progressEstimate = -1;
bool progressReported = false;
var syncedRealm = await getRealmAsync(configuration, onProgressCallback: (syncProgress) {
count++;
progress = syncProgress.progressEstimate;
progressEstimate = syncProgress.progressEstimate;
progressReported = true;
});

await Future<void>.delayed(Duration(milliseconds: 500));

expect(syncedRealm.isClosed, false);
// Semantics of onProgressCallback changed with https://github.com/realm/realm-core/issues/7452
expect(count, 0);
expect(progress, -1);

// For FLX realms with no subscriptions, the server won't report any progress before it resolves the
// Realm.open future.
expect(progressEstimate, -1);
expect(progressReported, false);
});

baasTest('Realm.open (flexibleSync) - download a populated realm', (appConfiguration) async {
Expand All @@ -1367,16 +1370,16 @@ void main() {
final config = await _subscribeForAtlasAddedData(app);

int printCount = 0;
double progress = 0;
double progressEstimate = 0;

final syncedRealm = await getRealmAsync(config, onProgressCallback: (syncProgress) {
printCount++;
progress = syncProgress.progressEstimate;
progressEstimate = syncProgress.progressEstimate;
});

expect(syncedRealm.isClosed, false);
expect(printCount, isNot(0));
expect(progress, 1.0);
expect(progressEstimate, 1.0);
});

baasTest('Realm.open (flexibleSync) - listen and cancel download progress of a populated realm', (appConfiguration) async {
Expand Down
93 changes: 82 additions & 11 deletions packages/realm_dart/test/session_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ void main() {
StreamProgressData subscribeToProgress(Realm realm, ProgressDirection direction, ProgressMode mode) {
final data = StreamProgressData();
final stream = realm.syncSession.getProgressStream(direction, mode);

data.subscription = stream.listen((event) {
if (mode == ProgressMode.forCurrentlyOutstandingWork) {
expect(event.progressEstimate, greaterThanOrEqualTo(data.progressEstimate));
Expand Down Expand Up @@ -191,26 +192,97 @@ void main() {

baasTest('SyncSession.getProgressStream forCurrentlyOutstandingWork', (configuration) async {
final differentiator = ObjectId();
final realmA = await getIntegrationRealm(differentiator: differentiator);
final realmB = await getIntegrationRealm(differentiator: differentiator);
final uploadRealm = await getIntegrationRealm(differentiator: differentiator);

for (var i = 0; i < 10; i++) {
realmA.write(() {
realmA.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});
}

final uploadData = subscribeToProgress(realmA, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);
final downloadData = subscribeToProgress(realmB, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork);
final uploadData = subscribeToProgress(uploadRealm, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);
await uploadRealm.syncSession.waitForUpload();
await validateData(uploadData, expectDone: true);

await realmA.syncSession.waitForUpload();
// Subscribe immediately after the upload to ensure we get the entire upload message as progress notifications
final downloadRealm = await getIntegrationRealm(differentiator: differentiator, waitForSync: false);
final downloadData = subscribeToProgress(downloadRealm, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork);

await validateData(uploadData, expectDone: true);
await downloadRealm.subscriptions.waitForSynchronization();

await realmB.syncSession.waitForDownload();
await downloadRealm.syncSession.waitForDownload();

await validateData(downloadData, expectDone: true);

// We should not see more updates in either direction
final uploadCallbacks = uploadData.callbacksInvoked;
final downloadCallbacks = downloadData.callbacksInvoked;

uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});

await uploadRealm.syncSession.waitForUpload();
await downloadRealm.syncSession.waitForDownload();

expect(uploadRealm.all<NullableTypes>().length, downloadRealm.all<NullableTypes>().length);
expect(uploadData.callbacksInvoked, uploadCallbacks);
expect(downloadData.callbacksInvoked, downloadCallbacks);
Comment on lines +217 to +230
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate the explanation of this part a bit? Why will we not see more upload callbacks here, when we add and wait for upload?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the subscription we setup is for ProgressMode.forCurrentlyOutstandingWork, which ends when you've reached 100% and no further callbacks will be invoked as more data comes in/goes out.


await uploadData.subscription.cancel();
await downloadData.subscription.cancel();
});

baasTest('SyncSession.getProgressStream after reconnecting', (configuration) async {
final differentiator = ObjectId();
final uploadRealm = await getIntegrationRealm(differentiator: differentiator);

// Make sure we've caught up, then close the Realm. We'll reopen it later and verify that progress notifications
// are delivered. This is different from "SyncSession.getProgressStream forCurrentlyOutstandingWork" where we're
// testing notifications after change of query.
final user = await getIntegrationUser(appConfig: configuration);
final config = getIntegrationConfig(user);
var downloadRealm = getRealm(config);
downloadRealm.subscriptions.update((mutableSubscriptions) {
mutableSubscriptions.add(downloadRealm.query<NullableTypes>(r'differentiator = $0', [differentiator]));
});

await downloadRealm.subscriptions.waitForSynchronization();
downloadRealm.close();

for (var i = 0; i < 10; i++) {
uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});
}

final uploadData = subscribeToProgress(uploadRealm, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork);
await uploadRealm.syncSession.waitForUpload();
await validateData(uploadData, expectDone: true);

// Reopen the download realm and subscribe for notifications - those should still be delivered as normal.
downloadRealm = getRealm(getIntegrationConfig(user));
final downloadData = subscribeToProgress(downloadRealm, ProgressDirection.download, ProgressMode.reportIndefinitely);

await downloadRealm.syncSession.waitForDownload();

await validateData(downloadData, expectDone: false);

// We should not see more updates in upload direction, but should see a callback invoked for download
final uploadCallbacks = uploadData.callbacksInvoked;
final downloadCallbacks = downloadData.callbacksInvoked;

uploadRealm.write(() {
uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50)));
});

await uploadRealm.syncSession.waitForUpload();
await downloadRealm.syncSession.waitForDownload();

expect(uploadRealm.all<NullableTypes>().length, downloadRealm.all<NullableTypes>().length);
expect(uploadData.callbacksInvoked, uploadCallbacks);
expect(downloadData.callbacksInvoked, greaterThan(downloadCallbacks));

await uploadData.subscription.cancel();
await downloadData.subscription.cancel();
});
Expand Down Expand Up @@ -254,7 +326,6 @@ void main() {
expect(downloadData.progressEstimate, 1.0);

expect(uploadData.callbacksInvoked, greaterThan(uploadSnapshot.callbacksInvoked));

expect(downloadData.callbacksInvoked, greaterThan(downloadSnapshot.callbacksInvoked));

await uploadData.subscription.cancel();
Expand Down Expand Up @@ -319,7 +390,7 @@ class StreamProgressData {
bool doneInvoked;
late StreamSubscription<SyncProgress> subscription;

StreamProgressData({this.progressEstimate = 0, this.callbacksInvoked = 0, this.doneInvoked = false});
StreamProgressData({this.progressEstimate = -1, this.callbacksInvoked = 0, this.doneInvoked = false});

StreamProgressData.snapshot(StreamProgressData other)
: this(callbacksInvoked: other.callbacksInvoked, doneInvoked: other.doneInvoked, progressEstimate: other.progressEstimate);
Expand Down
15 changes: 11 additions & 4 deletions packages/realm_dart/test/test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ void setupTests() {

Realm.logger.setLogLevel(LogLevel.detail);
Realm.logger.onRecord.listen((record) {
printOnFailure('${record.category} ${record.level.name}: ${record.message}');
printOnFailure('${DateTime.now().toUtc()} ${record.category} ${record.level.name}: ${record.message}');
});

if (Platform.isIOS) {
Expand Down Expand Up @@ -618,21 +618,28 @@ Future<User> getAnonymousUser(App app) {
return app.logIn(Credentials.anonymous(reuseCredentials: false));
}

FlexibleSyncConfiguration getIntegrationConfig(User user) {
return Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately;
}

/// Returns a synced realm after logging in a user.
///
/// A subscription for querying all [NullableTypes] objects containing
/// the `differentiator` will be added if a `differentiator` is provided.
Future<Realm> getIntegrationRealm({App? app, ObjectId? differentiator, AppConfiguration? appConfig}) async {
Future<Realm> getIntegrationRealm({App? app, ObjectId? differentiator, AppConfiguration? appConfig, bool waitForSync = true}) async {
app ??= App(appConfig ?? await baasHelper!.getAppConfig());
final user = await getIntegrationUser(app: app, appConfig: appConfig);

final config = Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately;
final config = getIntegrationConfig(user);
final realm = getRealm(config);
if (differentiator != null) {
realm.subscriptions.update((mutableSubscriptions) {
mutableSubscriptions.add(realm.query<NullableTypes>(r'differentiator = $0', [differentiator]));
});

await realm.subscriptions.waitForSynchronization();
if (waitForSync) {
await realm.subscriptions.waitForSynchronization();
}
}

return realm;
Expand Down
Loading