Skip to content

Commit

Permalink
feat: handle incoming blob filters
Browse files Browse the repository at this point in the history
This is the bulk of the work required in [#905]. It takes incoming blob
filters and converts them to want bitfields.

It starts in `MapeoProject` and gets all the way down to `PeerState`.

[#905]: #905
  • Loading branch information
EvanHahn committed Nov 7, 2024
1 parent a2dfa79 commit 70f65b4
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 12 deletions.
8 changes: 7 additions & 1 deletion src/blob-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,13 @@ export class BlobStore extends TypedEmitter {
const entriesStream = createEntriesStream(this.#driveIndex, { live })
if (!filter) return entriesStream
const filterStream = new FilterEntriesStream(filter)
return pipeline(entriesStream, filterStream, noop)
const result = pipeline(entriesStream, filterStream, noop)
// Destroying the pipeline should destroy the *first* stream, not the
// whole pipeline.
Object.defineProperty(result, 'destroy', {
value: entriesStream.destroy.bind(entriesStream),
})
return result
}

/**
Expand Down
23 changes: 23 additions & 0 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import { omit } from './lib/omit.js'
import { MemberApi } from './member-api.js'
import {
SyncApi,
kAddBlobWantRange,
kClearBlobWantRanges,
kHandleDiscoveryKey,
kSetBlobDownloadFilter,
kWaitForInitialSyncWithPeer,
Expand Down Expand Up @@ -430,6 +432,27 @@ export class MapeoProject extends TypedEmitter {
getReplicationStream,
})

/** @type {Map<string, BlobStoreEntriesStream>} */
const entriesReadStreams = new Map()
this.#coreManager.on('peer-download-intent', async (filter, peerId) => {
entriesReadStreams.get(peerId)?.destroy()

const entriesReadStream = this.#blobStore.createEntriesReadStream({
live: true,
filter,
})
entriesReadStreams.set(peerId, entriesReadStream)

this.#syncApi[kClearBlobWantRanges](peerId)

for await (const entry of entriesReadStream) {
if (entriesReadStream.destroyed) break
const { blockOffset, blockLength } = entry.value.blob
this.#syncApi[kAddBlobWantRange](peerId, blockOffset, blockLength)
if (entriesReadStream.destroyed) break
}
})

this.#translationApi = new TranslationApi({
dataType: this.#dataTypes.translation,
})
Expand Down
20 changes: 15 additions & 5 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,23 @@ export class CoreSyncState {
* blocks/ranges that are added here
*
* @param {PeerId} peerId
* @param {Array<{ start: number, length: number }>} ranges
* @param {number} start
* @param {number} length
* @returns {void}
*/
setPeerWants(peerId, ranges) {
addWantRange(peerId, start, length) {
const peerState = this.#getOrCreatePeerState(peerId)
for (const { start, length } of ranges) {
peerState.setWantRange({ start, length })
}
peerState.addWantRange(start, length)
this.#update()
}

/**
* @param {PeerId} peerId
* @returns {void}
*/
clearWantRanges(peerId) {
const peerState = this.#getOrCreatePeerState(peerId)
peerState.clearWantRanges()
this.#update()
}

Expand Down
22 changes: 22 additions & 0 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,28 @@ export class NamespaceSyncState {
this.#getCoreState(coreDiscoveryId).insertPreHaves(peerId, start, bitfield)
}

/**
* @param {string} peerId
* @param {number} start
* @param {number} length
* @returns {void}
*/
addWantRange(peerId, start, length) {
for (const coreState of this.#coreStates.values()) {
coreState.addWantRange(peerId, start, length)
}
}

/**
* @param {string} peerId
* @returns {void}
*/
clearWantRanges(peerId) {
for (const coreState of this.#coreStates.values()) {
coreState.clearWantRanges(peerId)
}
}

/**
* @param {string} discoveryId
*/
Expand Down
24 changes: 24 additions & 0 deletions src/sync/sync-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export const kWaitForInitialSyncWithPeer = Symbol(
'wait for initial sync with peer'
)
export const kSetBlobDownloadFilter = Symbol('set isArchiveDevice')
export const kAddBlobWantRange = Symbol('add blob want range')
export const kClearBlobWantRanges = Symbol('clear blob want ranges')

/**
* @typedef {'initial' | 'full'} SyncType
Expand Down Expand Up @@ -163,6 +165,28 @@ export class SyncApi extends TypedEmitter {
}
}

/**
* Add some blob blocks this peer wants.
*
* @param {string} peerId
* @param {number} start
* @param {number} length
* @returns {void}
*/
[kAddBlobWantRange](peerId, start, length) {
this[kSyncState].addBlobWantRange(peerId, start, length)
}

/**
* Clear the blob blocks this peer wants.
*
* @param {string} peerId
* @returns {void}
*/
[kClearBlobWantRanges](peerId) {
this[kSyncState].clearBlobWantRanges(peerId)
}

/** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */
[kHandleDiscoveryKey](discoveryKey, protomux) {
const peerSyncController = this.#peerSyncControllers.get(protomux)
Expand Down
18 changes: 18 additions & 0 deletions src/sync/sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ export class SyncState extends TypedEmitter {
])
}

/**
* @param {string} peerId
* @param {number} start
* @param {number} length
* @returns {void}
*/
addBlobWantRange(peerId, start, length) {
this.#syncStates.blob.addWantRange(peerId, start, length)
}

/**
* @param {string} peerId
* @returns {void}
*/
clearBlobWantRanges(peerId) {
this.#syncStates.blob.clearWantRanges(peerId)
}

#handleUpdate = () => {
this.emit('state', this.getState())
}
Expand Down
54 changes: 48 additions & 6 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ test('non-archive devices only sync a subset of blobs', async (t) => {
const fastifyController = new FastifyController({ fastify })
t.after(() => fastifyController.stop())
const invitee = createManager('invitee', t, { fastify })

invitee.setIsArchiveDevice(false)

const managers = [invitee, invitor]
Expand All @@ -209,6 +210,9 @@ test('non-archive devices only sync a subset of blobs', async (t) => {
const [invitorProject, inviteeProject] = projects

const fixturesPath = new URL('../test/fixtures/', import.meta.url)

// Test that only previews and thumbnails sync to non-archive devices

const imagesFixturesPath = new URL('images/', fixturesPath)
const photoFixturePaths = {
original: new URL('02-digidem-logo.jpg', imagesFixturesPath).pathname,
Expand All @@ -233,12 +237,10 @@ test('non-archive devices only sync a subset of blobs', async (t) => {
invitorProject.$sync.start()
inviteeProject.$sync.start()

// TODO: We should replace this with `await waitForSync(projects, 'full')` once
// the following issues are merged:
//
// - <https://github.com/digidem/comapeo-core/issues/682>
// - <https://github.com/digidem/comapeo-core/issues/905>
await delay(2000)
await waitForSync(projects, 'full')

inviteeProject.$sync.stop()
inviteeProject.$sync.stop()

/**
* @param {BlobId} blobId
Expand Down Expand Up @@ -278,6 +280,46 @@ test('non-archive devices only sync a subset of blobs', async (t) => {
photoFixturePaths.thumbnail
),
])

// Devices can become archives again and get all the data

invitee.setIsArchiveDevice(true)

invitorProject.$sync.start()
inviteeProject.$sync.start()

await waitForSync(projects, 'full')

await Promise.all([
assertLoads(
{ ...photoBlob, variant: 'original' },
photoFixturePaths.original
),
assertLoads({ ...audioBlob, variant: 'original' }, audioFixturePath),
])

// Devices can toggle whether they're an archive device while sync is running

invitee.setIsArchiveDevice(false)

const photoBlob2 = await invitorProject.$blobs.create(
photoFixturePaths,
blobMetadata({ mimeType: 'image/jpeg' })
)

await waitForSync(projects, 'full')

await Promise.all([
assert404({ ...photoBlob2, variant: 'original' }),
assertLoads(
{ ...photoBlob2, type: 'photo', variant: 'preview' },
photoFixturePaths.preview
),
assertLoads(
{ ...photoBlob2, type: 'photo', variant: 'thumbnail' },
photoFixturePaths.thumbnail
),
])
})

test('start and stop sync', async function (t) {
Expand Down

0 comments on commit 70f65b4

Please sign in to comment.