Skip to content

Commit 35bfc4e

Browse files
committed
Support unsubscribing
1 parent 9ddded9 commit 35bfc4e

File tree

3 files changed

+79
-1
lines changed

3 files changed

+79
-1
lines changed

crates/core/src/sync/storage_adapter.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,11 @@ impl StorageAdapter {
266266
self.delete_outdated_subscriptions()?;
267267

268268
let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();
269+
// We have an explicit subscription iff ttl is not null. Checking is_default is not enough,
270+
// because a stream can both be a default stream and have an explicit subscription.
269271
let stmt = self
270272
.db
271-
.prepare_v2("SELECT * FROM ps_stream_subscriptions WHERE NOT is_default;")?;
273+
.prepare_v2("SELECT * FROM ps_stream_subscriptions WHERE ttl IS NOT NULL;")?;
272274

273275
while let ResultCode::ROW = stmt.step()? {
274276
let subscription = Self::read_stream_subscription(&stmt)?;

crates/core/src/sync/subscriptions.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ impl LocallyTrackedSubscription {
5353
pub enum SubscriptionChangeRequest {
5454
#[serde(rename = "subscribe")]
5555
Subscribe(SubscribeToStream),
56+
#[serde(rename = "unsubscribe")]
57+
Unsubscribe(UnsubscribeFromStream),
5658
}
5759

5860
#[serde_as]
@@ -104,6 +106,21 @@ pub fn apply_subscriptions(
104106
}?;
105107
stmt.exec()?;
106108
}
109+
SubscriptionChangeRequest::Unsubscribe(subscription) => {
110+
let stmt = db
111+
.prepare_v2("UPDATE ps_stream_subscriptions SET ttl = NULL WHERE stream_name = ? AND local_params = ?")
112+
.into_db_result(db)?;
113+
stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?;
114+
stmt.bind_text(
115+
2,
116+
match &subscription.params {
117+
Some(params) => params.get(),
118+
None => "null",
119+
},
120+
sqlite::Destructor::STATIC,
121+
)?;
122+
stmt.exec()?;
123+
}
107124
}
108125

109126
Ok(())

dart/test/sync_stream_test.dart

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,65 @@ void main() {
175175
expect(stored, containsPair('active', 1));
176176
expect(stored, containsPair('is_default', 0));
177177
});
178+
});
179+
180+
group('explicit subscriptions', () {
181+
syncTest('unsubscribe', (_) {
182+
db.execute(
183+
'INSERT INTO ps_stream_subscriptions (stream_name, ttl) VALUES (?, ?);',
184+
['my_stream', 3600]);
185+
186+
var startInstructions = control('start', null);
187+
expect(
188+
startInstructions,
189+
contains(
190+
containsPair(
191+
'EstablishSyncStream',
192+
containsPair(
193+
'request',
194+
containsPair(
195+
'streams',
196+
{
197+
'include_defaults': true,
198+
'subscriptions': isNotEmpty,
199+
},
200+
),
201+
),
202+
),
203+
),
204+
);
205+
control('stop', null);
206+
207+
control(
208+
'subscriptions',
209+
json.encode({
210+
'unsubscribe': {
211+
'stream': 'my_stream',
212+
'params': null,
213+
'immediate': false,
214+
}
215+
}),
216+
);
217+
startInstructions = control('start', null);
218+
expect(
219+
startInstructions,
220+
contains(
221+
containsPair(
222+
'EstablishSyncStream',
223+
containsPair(
224+
'request',
225+
containsPair(
226+
'streams',
227+
{
228+
'include_defaults': true,
229+
'subscriptions': isEmpty,
230+
},
231+
),
232+
),
233+
),
234+
),
235+
);
236+
});
178237

179238
syncTest('ttl', (controller) {
180239
db.execute(

0 commit comments

Comments
 (0)