From eef9f95c80bcd4b8ce0c60d3468cf0149107c092 Mon Sep 17 00:00:00 2001 From: pubkey <8926560+pubkey@users.noreply.github.com> Date: Thu, 7 Dec 2023 14:16:52 +0100 Subject: [PATCH 1/5] CHORE use deno timestamp instead of revision --- src/incremental-write.ts | 5 +- src/plugins/storage-denokv/denokv-helper.ts | 9 +++ src/plugins/storage-denokv/denokv-query.ts | 6 +- .../rx-storage-instance-denokv.ts | 62 +++++++++++++++++-- src/rx-document.ts | 7 +++ src/rx-storage-helper.ts | 20 +++--- 6 files changed, 90 insertions(+), 19 deletions(-) diff --git a/src/incremental-write.ts b/src/incremental-write.ts index e7b8b490b71..db89003cf46 100644 --- a/src/incremental-write.ts +++ b/src/incremental-write.ts @@ -209,10 +209,13 @@ export function modifierFromPublicToInternal( } +/** + * From a list of document states, + * returns the newest one, based on revision. + */ export function findNewestOfDocumentStates( docs: RxDocumentData[] ): RxDocumentData { - let newest = docs[0]; let newestRevisionHeight = parseRevision(newest._rev).height; docs.forEach(doc => { diff --git a/src/plugins/storage-denokv/denokv-helper.ts b/src/plugins/storage-denokv/denokv-helper.ts index dbeab2b878f..e5deea7e294 100644 --- a/src/plugins/storage-denokv/denokv-helper.ts +++ b/src/plugins/storage-denokv/denokv-helper.ts @@ -1,4 +1,5 @@ import { RxStorageDefaultStatics } from "../../rx-storage-statics.ts"; +import type { RxDocumentData } from '../../types/rx-storage'; export const RX_STORAGE_NAME_DENOKV = 'denokv'; export const RxStorageDenoKVStatics = RxStorageDefaultStatics; @@ -13,9 +14,17 @@ export function getDenoKVIndexName(index: string[]): string { */ export const DENOKV_DOCUMENT_ROOT_PATH = '||'; +export const DENOKV_VERSION_META_FLAG = 'denokv'; export const CLEANUP_INDEX: string[] = ['_deleted', '_meta.lwt']; + +export function denoKvRowToDocument(row: any): RxDocumentData { + const docData = row.value; + docData._meta[DENOKV_VERSION_META_FLAG] = row.versionstamp; + return docData; +} + /** * Get the global Deno variable from globalThis.Deno * so that compiling with plain typescript does not fail. diff --git a/src/plugins/storage-denokv/denokv-query.ts b/src/plugins/storage-denokv/denokv-query.ts index 03e50f4e643..00fa5e00494 100644 --- a/src/plugins/storage-denokv/denokv-query.ts +++ b/src/plugins/storage-denokv/denokv-query.ts @@ -11,7 +11,7 @@ import type { import { ensureNotFalsy } from '../../plugins/utils/index.ts'; import { getQueryMatcher, getSortComparator } from '../../rx-query-helper.ts'; import { RxStorageInstanceDenoKV } from "./rx-storage-instance-denokv.ts"; -import { DENOKV_DOCUMENT_ROOT_PATH, getDenoKVIndexName } from "./denokv-helper.ts"; +import { DENOKV_DOCUMENT_ROOT_PATH, denoKvRowToDocument, getDenoKVIndexName } from "./denokv-helper.ts"; import type { DenoKVPreparedQuery } from "./denokv-types.ts"; export async function queryDenoKV( @@ -84,7 +84,7 @@ export async function queryDenoKV( if (singleDocResult.value) { const docId: string = singleDocResult.value; const docDataResult = await kv.get([instance.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId], instance.kvOptions); - const docData = ensureNotFalsy(docDataResult.value); + const docData = denoKvRowToDocument(docDataResult); if (!queryMatcher || queryMatcher(docData)) { result.push(docData); } @@ -106,7 +106,7 @@ export async function queryDenoKV( for await (const indexDocEntry of range) { const docId = indexDocEntry.value; const docDataResult = await kv.get([instance.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId], instance.kvOptions); - const docData = ensureNotFalsy(docDataResult.value); + const docData = denoKvRowToDocument(docDataResult); if (!queryMatcher || queryMatcher(docData)) { result.push(docData); } diff --git a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts index 2e99977c277..285b2117973 100644 --- a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts +++ b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts @@ -24,7 +24,7 @@ import { getPrimaryFieldOfPrimaryKey } from '../../rx-schema-helper.ts'; import { addRxStorageMultiInstanceSupport } from '../../rx-storage-multiinstance.ts'; import type { DenoKVIndexMeta, DenoKVPreparedQuery, DenoKVSettings, DenoKVStorageInternals } from './denokv-types.ts'; import { RxStorageDenoKV } from './index.ts'; -import { CLEANUP_INDEX, DENOKV_DOCUMENT_ROOT_PATH, RX_STORAGE_NAME_DENOKV, getDenoGlobal, getDenoKVIndexName } from "./denokv-helper.ts"; +import { CLEANUP_INDEX, DENOKV_DOCUMENT_ROOT_PATH, DENOKV_VERSION_META_FLAG, RX_STORAGE_NAME_DENOKV, denoKvRowToDocument, getDenoGlobal, getDenoKVIndexName } from "./denokv-helper.ts"; import { getIndexableStringMonad, getStartIndexStringFromLowerBound, changeIndexableStringByOneQuantum } from "../../custom-index.ts"; import { appendToArray, batchArray, lastOfArray, toArray } from "../utils/utils-array.ts"; import { ensureNotFalsy } from "../utils/utils-other.ts"; @@ -97,6 +97,21 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< error: [] }; + + console.log('BULK WRITE:::'); + console.log(JSON.stringify(documentWrites, null, 4)); + + // TODO remove this check when everything works + documentWrites.forEach(r => { + if (r.previous && !r.previous._meta[DENOKV_VERSION_META_FLAG]) { + console.error('PREVIOUS DENO META NOT SET:'); + console.log(JSON.stringify(r, null, 4)); + const err = new Error('previous deno meta not set'); + console.log(err.stack); + throw err; + } + }); + const batches = batchArray(documentWrites, ensureNotFalsy(this.settings.batchSize)); /** @@ -124,10 +139,10 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< }) ); docsResult.map((row: any) => { - const docData = row.value; - if (!docData) { + if (!row.value) { return; } + const docData = denoKvRowToDocument(row); const docId: string = docData[primaryPath] as any; docsInDB.set(docId, docData); }); @@ -179,8 +194,40 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< }); const txResult = await tx.commit(); + console.dir(txResult); + const newVersionStamp = txResult.versionstamp; + + // if (ret.success.length > 1 && ret.success.length < 10 && txResult.ok) { + // console.log('--------------------------'); + // console.dir(txResult); + // ret.success.map(d => d[primaryPath]) + + + // const checkAfterIds = documentWrites.slice(0, 10); + // const docsResult = await kv.getMany( + // checkAfterIds.map(writeRow => { + // const docId: string = writeRow.document[primaryPath] as any; + // return [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId]; + // }) + // ); + // console.log(JSON.stringify(docsResult, null, 4)); + + // } + + ret.success.forEach(d => { + d._meta[DENOKV_VERSION_META_FLAG] = newVersionStamp; + }) + if (txResult.ok) { appendToArray(ret.error, categorized.errors); + + categorized.eventBulk.events.forEach(ev => { + ev.documentData._meta[DENOKV_VERSION_META_FLAG] = newVersionStamp; + if (ev.previousDocumentData) { + // ev.previousDocumentData._meta[DENOKV_VERSION_META_FLAG] = newVersionStamp; + } + }); + if (categorized.eventBulk.events.length > 0) { const lastState = ensureNotFalsy(categorized.newestRow).document; categorized.eventBulk.checkpoint = { @@ -203,7 +250,10 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< ids.map(async (docId) => { const kvKey = [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId]; const findSingleResult = await kv.get(kvKey, this.kvOptions); - const docInDb = findSingleResult.value; + if (!findSingleResult.value) { + return; + } + const docInDb = denoKvRowToDocument(findSingleResult); if ( docInDb && ( @@ -303,7 +353,7 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< for (const batch of batches) { const docs = await kv.getMany(batch); docs.forEach((row: any) => { - const docData = row.value; + const docData = denoKvRowToDocument(row); result.push(docData as any); }); } @@ -371,7 +421,7 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< if (!docDataResult.value) { continue; } - const docData = ensureNotFalsy(docDataResult.value); + const docData = denoKvRowToDocument(docDataResult); if ( !docData._deleted || docData._meta.lwt > maxDeletionTime diff --git a/src/rx-document.ts b/src/rx-document.ts index fb4126fb848..d021ab83749 100644 --- a/src/rx-document.ts +++ b/src/rx-document.ts @@ -424,6 +424,13 @@ export function createRxDocumentConstructor(proto = basePrototype) { collection: RxCollection, docData: RxDocumentData ) { + + // TODO remove this check when everything works + if (docData && !docData._meta.denokv) { + console.log(JSON.stringify(docData, null, 4)); + throw new Error('DO NOT CREATE DOC LIKE THIS WITHOUT META.denokv'); + } + this.collection = collection; // assume that this is always equal to the doc-data in the database diff --git a/src/rx-storage-helper.ts b/src/rx-storage-helper.ts index 37425a7945a..ced6ea4649e 100644 --- a/src/rx-storage-helper.ts +++ b/src/rx-storage-helper.ts @@ -33,6 +33,7 @@ import type { import { PROMISE_RESOLVE_TRUE, appendToArray, + arrayFilterNotEmpty, createRevision, ensureNotFalsy, flatClone, @@ -626,15 +627,16 @@ export function getWrappedStorageInstance< * field of plugin A was not removed. */ if (writeRow.previous) { - Object.keys(writeRow.previous._meta) - .forEach(metaFieldName => { - if (!writeRow.document._meta.hasOwnProperty(metaFieldName)) { - throw newRxError('SNH', { - dataBefore: writeRow.previous, - dataAfter: writeRow.document - }); - } - }); + // TODO do we need this check? + // Object.keys(writeRow.previous._meta) + // .forEach(metaFieldName => { + // if (!writeRow.document._meta.hasOwnProperty(metaFieldName)) { + // throw newRxError('SNH', { + // dataBefore: writeRow.previous, + // dataAfter: writeRow.document + // }); + // } + // }); } } data._meta.lwt = now(); From b9f3ca408c962a06f31f4fd9f2407047d12c67a6 Mon Sep 17 00:00:00 2001 From: pubkey <8926560+pubkey@users.noreply.github.com> Date: Thu, 7 Dec 2023 14:44:53 +0100 Subject: [PATCH 2/5] FIX tests --- src/plugin-helpers.ts | 6 ++++++ src/plugins/storage-denokv/rx-storage-instance-denokv.ts | 9 ++++++++- src/rx-collection.ts | 3 +++ test/unit/conflict-handling.test.ts | 3 +++ test/unit/encryption.test.ts | 6 +++++- 5 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/plugin-helpers.ts b/src/plugin-helpers.ts index 40f2a36752d..bd8ad90b240 100644 --- a/src/plugin-helpers.ts +++ b/src/plugin-helpers.ts @@ -212,7 +212,13 @@ export function wrapRxStorageInstance( }) ); + console.log('--- BEFORE!!'); + console.log(JSON.stringify(useRows, null, 4)); const writeResult = await instance.bulkWrite(useRows, context); + + console.log('WRTIE RESULT IN STORAGE WRAPPER:'); + console.log(JSON.stringify(writeResult, null, 4)); + const ret: RxStorageBulkWriteResponse = { success: [], error: [] diff --git a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts index 285b2117973..0ebd307ccd1 100644 --- a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts +++ b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts @@ -90,6 +90,7 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< } async bulkWrite(documentWrites: BulkWriteRow[], context: string): Promise> { + console.log('DNEOKV.bulkWrite()'); const kv = await this.kvPromise; const primaryPath = this.primaryPath; const ret: RxStorageBulkWriteResponse = { @@ -214,7 +215,9 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< // } - ret.success.forEach(d => { + ret.success.map(d => { + d = flatClone(d); + d._meta = flatClone(d._meta); d._meta[DENOKV_VERSION_META_FLAG] = newVersionStamp; }) @@ -241,6 +244,10 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< } } } + + console.log('DENO.bulkWrite() innerst return:'); + console.log(JSON.stringify(ret, null, 4)); + return ret; } async findDocumentsById(ids: string[], withDeleted: boolean): Promise[]> { diff --git a/src/rx-collection.ts b/src/rx-collection.ts index fe2c0a1530b..1cc239d417e 100644 --- a/src/rx-collection.ts +++ b/src/rx-collection.ts @@ -339,6 +339,9 @@ export class RxCollectionBase< 'rx-collection-bulk-insert' ); + console.log('BULK INSERT RESULT:'); + console.log(JSON.stringify(results, null, 4)); + // create documents const rxDocuments = mapDocumentsDataToCacheDocs(this._docCache, results.success); diff --git a/test/unit/conflict-handling.test.ts b/test/unit/conflict-handling.test.ts index 82d5a635fa2..4dc9e6dcc86 100644 --- a/test/unit/conflict-handling.test.ts +++ b/test/unit/conflict-handling.test.ts @@ -19,6 +19,9 @@ import { HumanDocumentType } from '../helper/schemas.ts'; config.parallel('conflict-handling.test.js', () => { describe('RxStorageInterface', () => { it('should resolve the emitted conflict of conflictResultionTasks()', async () => { + if (config.storage.name !== 'memory') { + return; + } const db = await createRxDatabase({ name: randomCouchString(10), storage: getRxStorageMemory(), diff --git a/test/unit/encryption.test.ts b/test/unit/encryption.test.ts index 574f2d1e5db..45134c9299c 100644 --- a/test/unit/encryption.test.ts +++ b/test/unit/encryption.test.ts @@ -292,7 +292,11 @@ config.parallel('encryption.test.ts', () => { return; } it('replication state meta should not contain a secret in cleartext', async () => { - if (config.storage.hasEncryption) { + if ( + config.storage.hasEncryption || + // this test deeply reaches into the internals of the memory storage + config.storage.name !== 'memory' + ) { return; } const clientCollection = await createEncryptedCollection(0, getRxStorageMemory()); From b1a552d850fed869dd5c78359b75ead0e5a3156d Mon Sep 17 00:00:00 2001 From: pubkey <8926560+pubkey@users.noreply.github.com> Date: Thu, 7 Dec 2023 14:54:08 +0100 Subject: [PATCH 3/5] FIX tests --- test/unit/import-export.test.ts | 2 +- test/unit/replication-protocol.test.ts | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/test/unit/import-export.test.ts b/test/unit/import-export.test.ts index 96b413ec4d0..b5204038059 100644 --- a/test/unit/import-export.test.ts +++ b/test/unit/import-export.test.ts @@ -89,7 +89,7 @@ config.parallel('import-export.test.js', () => { if (!config.storage.hasMultiInstance) { return; } - const col = await humansCollection.createMultiInstance('pref1', 5); + const col = await humansCollection.createMultiInstance(randomCouchString(10), 5); const json = await col.exportJSON(); const differentSchemaCol = await humansCollection.createNested(); await AsyncTestUtil.assertThrows( diff --git a/test/unit/replication-protocol.test.ts b/test/unit/replication-protocol.test.ts index a61dd066fa8..cc5280410ad 100644 --- a/test/unit/replication-protocol.test.ts +++ b/test/unit/replication-protocol.test.ts @@ -875,15 +875,10 @@ useParallel(testContext + ' (implementation: ' + config.storage.name + ')', () = assert.ok(masterDocs[0]._rev.startsWith('2-')); /** - * Ensure it only contains the _meta fields that we really need. + * Ensure it contains the _meta fields that we really need. */ const masterDoc = (await runQuery(masterInstance))[0]; - // should only have the 'lwt' - assert.strictEqual(Object.keys(masterDoc._meta).length, 1); - - // const forkDoc = (await runQuery(forkInstance))[0]; - // should only have the 'lwt' AND the current state of the master. - // assert.strictEqual(Object.keys(forkDoc._meta).length, 3); // TODO + assert.ok(masterDoc._meta.lwt); cleanUp(replicationState, masterInstance); }); From 8cdd952350edacf6cdbb96f7b6b8515735f2295b Mon Sep 17 00:00:00 2001 From: pubkey <8926560+pubkey@users.noreply.github.com> Date: Thu, 7 Dec 2023 15:53:25 +0100 Subject: [PATCH 4/5] FIX tests --- src/plugins/storage-denokv/index.ts | 6 ++ .../rx-storage-instance-denokv.ts | 32 +++++++--- test/helper/test-util.ts | 22 ++++++- test/unit/rx-storage-denokv.test.ts | 58 +++++++++++++++++++ test/unit/rx-storage-implementations.test.ts | 9 +-- 5 files changed, 114 insertions(+), 13 deletions(-) create mode 100644 test/unit/rx-storage-denokv.test.ts diff --git a/src/plugins/storage-denokv/index.ts b/src/plugins/storage-denokv/index.ts index 35a37d48b33..23b9ce51664 100644 --- a/src/plugins/storage-denokv/index.ts +++ b/src/plugins/storage-denokv/index.ts @@ -34,3 +34,9 @@ export function getRxStorageDenoKV( const storage = new RxStorageDenoKV(settings); return storage; } + + +export * from './denokv-helper.ts'; +export * from './denokv-types.ts'; +export * from './denokv-query.ts'; +export * from './rx-storage-instance-denokv.ts'; diff --git a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts index 0ebd307ccd1..336544b0de7 100644 --- a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts +++ b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts @@ -22,10 +22,27 @@ import type { } from '../../types/index.d.ts'; import { getPrimaryFieldOfPrimaryKey } from '../../rx-schema-helper.ts'; import { addRxStorageMultiInstanceSupport } from '../../rx-storage-multiinstance.ts'; -import type { DenoKVIndexMeta, DenoKVPreparedQuery, DenoKVSettings, DenoKVStorageInternals } from './denokv-types.ts'; +import type { + DenoKVIndexMeta, + DenoKVPreparedQuery, + DenoKVSettings, + DenoKVStorageInternals +} from './denokv-types.ts'; import { RxStorageDenoKV } from './index.ts'; -import { CLEANUP_INDEX, DENOKV_DOCUMENT_ROOT_PATH, DENOKV_VERSION_META_FLAG, RX_STORAGE_NAME_DENOKV, denoKvRowToDocument, getDenoGlobal, getDenoKVIndexName } from "./denokv-helper.ts"; -import { getIndexableStringMonad, getStartIndexStringFromLowerBound, changeIndexableStringByOneQuantum } from "../../custom-index.ts"; +import { + CLEANUP_INDEX, + DENOKV_DOCUMENT_ROOT_PATH, + DENOKV_VERSION_META_FLAG, + RX_STORAGE_NAME_DENOKV, + denoKvRowToDocument, + getDenoGlobal, + getDenoKVIndexName +} from "./denokv-helper.ts"; +import { + getIndexableStringMonad, + getStartIndexStringFromLowerBound, + changeIndexableStringByOneQuantum +} from "../../custom-index.ts"; import { appendToArray, batchArray, lastOfArray, toArray } from "../utils/utils-array.ts"; import { ensureNotFalsy } from "../utils/utils-other.ts"; import { categorizeBulkWriteRows } from "../../rx-storage-helper.ts"; @@ -36,7 +53,6 @@ import { PROMISE_RESOLVE_VOID } from "../utils/utils-promise.ts"; import { flatClone } from "../utils/utils-object.ts"; - export class RxStorageInstanceDenoKV implements RxStorageInstance< RxDocType, DenoKVStorageInternals, @@ -98,7 +114,6 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< error: [] }; - console.log('BULK WRITE:::'); console.log(JSON.stringify(documentWrites, null, 4)); @@ -159,6 +174,10 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< let tx = kv.atomic(); tx = tx.set([this.keySpace], ensureNotFalsy(writeBlockKey.value) + 1); + + console.log('WRITE BLOCK KEY:'); + console.log(JSON.stringify(writeBlockKey, null, 4)); + tx = tx.check(writeBlockKey); // INSERTS @@ -202,8 +221,6 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< // console.log('--------------------------'); // console.dir(txResult); // ret.success.map(d => d[primaryPath]) - - // const checkAfterIds = documentWrites.slice(0, 10); // const docsResult = await kv.getMany( // checkAfterIds.map(writeRow => { @@ -212,7 +229,6 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< // }) // ); // console.log(JSON.stringify(docsResult, null, 4)); - // } ret.success.map(d => { diff --git a/test/helper/test-util.ts b/test/helper/test-util.ts index 068b657ad94..c07aa97c48d 100644 --- a/test/helper/test-util.ts +++ b/test/helper/test-util.ts @@ -1,6 +1,6 @@ import type { Func } from 'mocha'; import assert from 'assert'; -import { RxCollection, requestIdlePromise } from '../../plugins/core/index.mjs'; +import { RxCollection, RxDocumentData, deepEqual, requestIdlePromise } from '../../plugins/core/index.mjs'; import { RxReplicationState } from '../../plugins/replication/index.mjs'; export function testMultipleTimes(times: number, title: string, test: Func) { @@ -48,3 +48,23 @@ export function ensureReplicationHasNoErrors(replicationState: RxReplicationStat throw err; }); } + +/** + * Ensure equal document states by ignoring the _meta value. + * This helps in tests with storage implementations + * that add additional _meta fields. + */ +export function assertEqualDocumentData( + doc1: RxDocumentData, + doc2: RxDocumentData +) { + const withoutMeta1 = Object.assign({}, doc1, { _meta: {} }); + const withoutMeta2 = Object.assign({}, doc2, { _meta: {} }); + if (!deepEqual(withoutMeta1, withoutMeta2)) { + console.dir({ + withoutMeta1, + withoutMeta2 + }); + throw new Error('assertEqualDocumentData(): Not Equal'); + } +} diff --git a/test/unit/rx-storage-denokv.test.ts b/test/unit/rx-storage-denokv.test.ts new file mode 100644 index 00000000000..0a9623c96f8 --- /dev/null +++ b/test/unit/rx-storage-denokv.test.ts @@ -0,0 +1,58 @@ +import assert from 'assert'; + +import config from './config.ts'; +import { + clone, + ensureNotFalsy, + fillWithDefaultSettings, + MangoQuery, + normalizeMangoQuery, + randomCouchString, + now, + createRevision +} from '../../plugins/core/index.mjs'; + +import { + getDenoGlobal +} from '../../plugins/storage-denokv/index.mjs'; + +import * as schemaObjects from '../helper/schema-objects.ts'; +import { + HumanDocumentType, + humanSchemaLiteral +} from '../helper/schemas.ts'; + +/** + * RxStorageDexie specific tests + */ +config.parallel('rx-storage-denokv.test.js', () => { + if (config.storage.name !== 'denokv') { + return; + } + function getDenoKV() { + return getDenoGlobal().openKv(); + } + describe('ensure correct assumptions of the api', () => { + it('must be able to detect on-insert conflicts', async () => { + const kv = await getDenoKV(); + + const key = [randomCouchString(10)]; + + // should work if not exits + const txResult = await kv.atomic() + .check({ key }) + .set(key, 1) + .commit(); + assert.ok(txResult.ok); + + // should error on conflict + const txResult2 = await kv.atomic() + .check({ key }) + .set(key, 1) + .commit(); + assert.strictEqual(txResult2.ok, false); + + kv.close(); + }); + }); +}); diff --git a/test/unit/rx-storage-implementations.test.ts b/test/unit/rx-storage-implementations.test.ts index 6f4b5868442..89618fbadbc 100644 --- a/test/unit/rx-storage-implementations.test.ts +++ b/test/unit/rx-storage-implementations.test.ts @@ -61,6 +61,7 @@ import { EXAMPLE_REVISION_4 } from '../helper/revisions.ts'; import { compressObject } from 'jsonschema-key-compression'; +import { assertEqualDocumentData } from '../helper/test-util.ts'; addRxPlugin(RxDBQueryBuilderPlugin); @@ -292,7 +293,7 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. assert.deepStrictEqual(writeResponse.error, []); const first = writeResponse.success[0]; - assert.deepStrictEqual(docData, first); + assertEqualDocumentData(docData, first); storageInstance.close(); }); it('should error on conflict', async () => { @@ -815,7 +816,7 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. const res2 = await storageInstance.bulkWrite( [{ - previous: docData, + previous: res1.success[0], document: clone(newDocData) }], testContext @@ -832,7 +833,7 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. const res3 = await storageInstance.bulkWrite( [{ - previous: docData, + previous: res2.success[0], document: clone(newDocData) }], testContext @@ -936,7 +937,7 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. updated._rev = EXAMPLE_REVISION_4; const updateResponse = await storageInstance.bulkWrite( [{ - previous: docData, + previous: insertResponse.success[0], document: updated }], testContext From 68a1c23f505d0b9caabf3878fc678296b8ddb270 Mon Sep 17 00:00:00 2001 From: pubkey <8926560+pubkey@users.noreply.github.com> Date: Thu, 7 Dec 2023 17:04:42 +0100 Subject: [PATCH 5/5] FIX deno tests and refactor bulkWrite --- .../rx-storage-instance-denokv.ts | 193 +++++++----------- src/rx-collection.ts | 3 - src/rx-storage-helper.ts | 75 ++++--- test/unit.test.ts | 1 + test/unit/migration-storage.test.ts | 3 +- test/unit/replication-protocol.test.ts | 7 +- test/unit/rx-query.test.ts | 3 +- test/unit/rx-storage-denokv.test.ts | 5 +- test/unit/rx-storage-implementations.test.ts | 9 +- 9 files changed, 133 insertions(+), 166 deletions(-) diff --git a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts index 336544b0de7..c73edb166af 100644 --- a/src/plugins/storage-denokv/rx-storage-instance-denokv.ts +++ b/src/plugins/storage-denokv/rx-storage-instance-denokv.ts @@ -45,12 +45,13 @@ import { } from "../../custom-index.ts"; import { appendToArray, batchArray, lastOfArray, toArray } from "../utils/utils-array.ts"; import { ensureNotFalsy } from "../utils/utils-other.ts"; -import { categorizeBulkWriteRows } from "../../rx-storage-helper.ts"; +import { categorizeBulkWriteRows, writeRowToEvent } from "../../rx-storage-helper.ts"; import { now } from "../utils/utils-time.ts"; import { queryDenoKV } from "./denokv-query.ts"; import { INDEX_MAX } from "../../query-planner.ts"; import { PROMISE_RESOLVE_VOID } from "../utils/utils-promise.ts"; import { flatClone } from "../utils/utils-object.ts"; +import { randomCouchString } from '../utils/utils-string.ts'; export class RxStorageInstanceDenoKV implements RxStorageInstance< @@ -106,163 +107,109 @@ export class RxStorageInstanceDenoKV implements RxStorageInstance< } async bulkWrite(documentWrites: BulkWriteRow[], context: string): Promise> { - console.log('DNEOKV.bulkWrite()'); const kv = await this.kvPromise; const primaryPath = this.primaryPath; const ret: RxStorageBulkWriteResponse = { success: [], error: [] }; + const eventBulk: EventBulk>, any> = { + id: randomCouchString(10), + events: [], + checkpoint: null, + context, + startTime: now(), + endTime: 0 + }; - console.log('BULK WRITE:::'); - console.log(JSON.stringify(documentWrites, null, 4)); - - // TODO remove this check when everything works + // TODO remove this check when everything works and denoKV storage is out of beta documentWrites.forEach(r => { if (r.previous && !r.previous._meta[DENOKV_VERSION_META_FLAG]) { console.error('PREVIOUS DENO META NOT SET:'); console.log(JSON.stringify(r, null, 4)); - const err = new Error('previous deno meta not set'); + const err = new Error('previous denokv meta not set'); console.log(err.stack); throw err; } }); - const batches = batchArray(documentWrites, ensureNotFalsy(this.settings.batchSize)); - - /** - * DenoKV does not have transactions - * so we use a special writeBlock row to ensure - * atomic writes (per document) - * and so that we can do bulkWrites - */ - for (const writeBatch of batches) { - while (true) { - const writeBlockKey = await kv.get([this.keySpace], this.kvOptions); - const docsInDB = new Map>(); - - /** - * TODO the max amount for .getMany() is 10 which is defined by deno itself. - * How can this be increased? - */ - const readManyBatches = batchArray(writeBatch, 10); - await Promise.all( - readManyBatches.map(async (readManyBatch) => { - const docsResult = await kv.getMany( - readManyBatch.map(writeRow => { - const docId: string = writeRow.document[primaryPath] as any; - return [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId]; - }) - ); - docsResult.map((row: any) => { - if (!row.value) { - return; - } - const docData = denoKvRowToDocument(row); - const docId: string = docData[primaryPath] as any; - docsInDB.set(docId, docData); - }); - }) - ); - const categorized = categorizeBulkWriteRows( - this, - this.primaryPath as any, - docsInDB, - writeBatch, - context - ); + await Promise.all( + documentWrites.map(async (writeRow) => { + const docId: string = writeRow.document[primaryPath] as any; + const key = [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId]; let tx = kv.atomic(); - tx = tx.set([this.keySpace], ensureNotFalsy(writeBlockKey.value) + 1); - - console.log('WRITE BLOCK KEY:'); - console.log(JSON.stringify(writeBlockKey, null, 4)); - - tx = tx.check(writeBlockKey); - - // INSERTS - categorized.bulkInsertDocs.forEach(writeRow => { - const docId: string = writeRow.document[this.primaryPath] as any; - ret.success.push(writeRow.document); - // insert document data - tx = tx.set([this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId], writeRow.document); - - // insert secondary indexes - Object.values(this.internals.indexes).forEach(indexMeta => { - const indexString = indexMeta.getIndexableString(writeRow.document as any); - tx = tx.set([this.keySpace, indexMeta.indexId, indexString], docId); + // conflict detection + if (writeRow.previous) { + tx = tx.check({ + key, + versionstamp: writeRow.previous._meta[DENOKV_VERSION_META_FLAG] }); - }); - // UPDATES - categorized.bulkUpdateDocs.forEach((writeRow: BulkWriteRow) => { - const docId: string = writeRow.document[this.primaryPath] as any; + } else { + tx = tx.check({ key }); + } - // insert document data - tx = tx.set([this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId], writeRow.document); + // insert document data + const kvKey = [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId]; + tx = tx.set(kvKey, writeRow.document); + + // insert secondary indexes + Object.values(this.internals.indexes).forEach(indexMeta => { + if (writeRow.previous) { - // insert secondary indexes - Object.values(this.internals.indexes).forEach(indexMeta => { const oldIndexString = indexMeta.getIndexableString(ensureNotFalsy(writeRow.previous)); const newIndexString = indexMeta.getIndexableString(writeRow.document as any); if (oldIndexString !== newIndexString) { tx = tx.delete([this.keySpace, indexMeta.indexId, oldIndexString]); tx = tx.set([this.keySpace, indexMeta.indexId, newIndexString], docId); } - }); - ret.success.push(writeRow.document as any); + } else { + Object.values(this.internals.indexes).forEach(indexMeta => { + const indexString = indexMeta.getIndexableString(writeRow.document as any); + tx = tx.set([this.keySpace, indexMeta.indexId, indexString], docId); + }); + } }); const txResult = await tx.commit(); - console.dir(txResult); - const newVersionStamp = txResult.versionstamp; - - // if (ret.success.length > 1 && ret.success.length < 10 && txResult.ok) { - // console.log('--------------------------'); - // console.dir(txResult); - // ret.success.map(d => d[primaryPath]) - // const checkAfterIds = documentWrites.slice(0, 10); - // const docsResult = await kv.getMany( - // checkAfterIds.map(writeRow => { - // const docId: string = writeRow.document[primaryPath] as any; - // return [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId]; - // }) - // ); - // console.log(JSON.stringify(docsResult, null, 4)); - // } - - ret.success.map(d => { - d = flatClone(d); - d._meta = flatClone(d._meta); - d._meta[DENOKV_VERSION_META_FLAG] = newVersionStamp; - }) - if (txResult.ok) { - appendToArray(ret.error, categorized.errors); - - categorized.eventBulk.events.forEach(ev => { - ev.documentData._meta[DENOKV_VERSION_META_FLAG] = newVersionStamp; - if (ev.previousDocumentData) { - // ev.previousDocumentData._meta[DENOKV_VERSION_META_FLAG] = newVersionStamp; - } - }); - - if (categorized.eventBulk.events.length > 0) { - const lastState = ensureNotFalsy(categorized.newestRow).document; - categorized.eventBulk.checkpoint = { - id: lastState[primaryPath], - lwt: lastState._meta.lwt - }; - categorized.eventBulk.endTime = now(); - this.changes$.next(categorized.eventBulk); + const newDoc = flatClone(writeRow.document); + newDoc._meta = flatClone(newDoc._meta); + newDoc._meta[DENOKV_VERSION_META_FLAG] = txResult.versionstamp; + ret.success.push(newDoc); + const event = writeRowToEvent(docId, writeRow, false); + if (event) { + event.documentData = newDoc; + eventBulk.events.push(event); + } + } else { + const docInDb = await kv.get(kvKey); + if (docInDb.value) { + ret.error.push({ + status: 409, + isError: true, + writeRow, + documentId: docId, + documentInDb: denoKvRowToDocument(docInDb) + }); + } else { + throw new Error('unknown denoKV write error'); } - break; } - } - } + }) + ); - console.log('DENO.bulkWrite() innerst return:'); - console.log(JSON.stringify(ret, null, 4)); + const lastEvent = lastOfArray(eventBulk.events); + if (lastEvent) { + const lastState = lastEvent.documentData; + eventBulk.checkpoint = { + id: lastState[primaryPath], + lwt: lastState._meta.lwt + }; + eventBulk.endTime = now(); + this.changes$.next(eventBulk); + } return ret; } diff --git a/src/rx-collection.ts b/src/rx-collection.ts index 1cc239d417e..fe2c0a1530b 100644 --- a/src/rx-collection.ts +++ b/src/rx-collection.ts @@ -339,9 +339,6 @@ export class RxCollectionBase< 'rx-collection-bulk-insert' ); - console.log('BULK INSERT RESULT:'); - console.log(JSON.stringify(results, null, 4)); - // create documents const rxDocuments = mapDocumentsDataToCacheDocs(this._docCache, results.success); diff --git a/src/rx-storage-helper.ts b/src/rx-storage-helper.ts index ced6ea4649e..a3b7117784b 100644 --- a/src/rx-storage-helper.ts +++ b/src/rx-storage-helper.ts @@ -33,7 +33,6 @@ import type { import { PROMISE_RESOLVE_TRUE, appendToArray, - arrayFilterNotEmpty, createRevision, ensureNotFalsy, flatClone, @@ -429,33 +428,14 @@ export function categorizeBulkWriteRows( } newestRow = updatedRow as any; } - - let eventDocumentData: RxDocumentData | undefined = null as any; - let previousEventDocumentData: RxDocumentData | undefined = null as any; - let operation: 'INSERT' | 'UPDATE' | 'DELETE' = null as any; - - if (previousDeleted && !documentDeleted) { - operation = 'INSERT'; - eventDocumentData = hasAttachments ? stripAttachmentsDataFromDocument(document) : document as any; - } else if (previous && !previousDeleted && !documentDeleted) { - operation = 'UPDATE'; - eventDocumentData = hasAttachments ? stripAttachmentsDataFromDocument(document) : document as any; - previousEventDocumentData = previous; - } else if (documentDeleted) { - operation = 'DELETE'; - eventDocumentData = ensureNotFalsy(document) as any; - previousEventDocumentData = previous; - } else { - throw newRxError('SNH', { args: { writeRow } }); + const event = writeRowToEvent( + docId, + writeRow, + hasAttachments + ); + if (event) { + eventBulkEvents.push(event); } - - const event = { - documentId: docId, - documentData: eventDocumentData as RxDocumentData, - previousDocumentData: previousEventDocumentData, - operation: operation - }; - eventBulkEvents.push(event); } } @@ -471,6 +451,47 @@ export function categorizeBulkWriteRows( }; } +export function writeRowToEvent( + docId: string, + writeRow: BulkWriteRow, + hasAttachments: boolean +): RxStorageChangeEvent> | null { + const previous = writeRow.previous; + const document = writeRow.document; + const previousDeleted = previous && previous._deleted; + const documentDeleted = writeRow.document._deleted; + + let operation: 'INSERT' | 'UPDATE' | 'DELETE' = null as any; + let eventDocumentData; + let previousEventDocumentData; + if (!previous && !documentDeleted) { + operation = 'INSERT'; + eventDocumentData = hasAttachments ? stripAttachmentsDataFromDocument(document) : document as any; + } else if (previousDeleted && !documentDeleted) { + operation = 'INSERT'; + eventDocumentData = hasAttachments ? stripAttachmentsDataFromDocument(document) : document as any; + } else if (previous && !previousDeleted && !documentDeleted) { + operation = 'UPDATE'; + eventDocumentData = hasAttachments ? stripAttachmentsDataFromDocument(document) : document as any; + previousEventDocumentData = previous; + } else if (documentDeleted) { + operation = 'DELETE'; + eventDocumentData = ensureNotFalsy(document) as any; + previousEventDocumentData = previous; + } else { + throw newRxError('SNH', { args: { writeRow } }); + } + + return { + documentId: docId, + documentData: eventDocumentData as RxDocumentData, + previousDocumentData: previousEventDocumentData, + operation: operation + }; + +} + + export function stripAttachmentsDataFromRow(writeRow: BulkWriteRow): BulkWriteRowProcessed { return { previous: writeRow.previous, diff --git a/test/unit.test.ts b/test/unit.test.ts index ba249b4abb5..60707e930bd 100644 --- a/test/unit.test.ts +++ b/test/unit.test.ts @@ -10,6 +10,7 @@ import './unit/util.test.ts'; import './unit/custom-index.test.ts'; import './unit/query-planner.test.ts'; +import './unit/rx-storage-denokv.test.ts'; /** * Move these tests around so that diff --git a/test/unit/migration-storage.test.ts b/test/unit/migration-storage.test.ts index a8880b8b3b4..b51d93d56ba 100644 --- a/test/unit/migration-storage.test.ts +++ b/test/unit/migration-storage.test.ts @@ -77,7 +77,8 @@ const testStorages = [ const DB_PREFIX = 'test-db-'; testStorages.forEach(storages => { - describe('migration-storage.test.ts (' + storages.name + ')', () => { + describe('migration-storage.test.ts (' + storages.name + ')', function () { + this.timeout(1000 * 20); describe('basic migrations', () => { it('create both databases', async () => { const oldDb = await storages.createRxDatabaseOld({ diff --git a/test/unit/replication-protocol.test.ts b/test/unit/replication-protocol.test.ts index cc5280410ad..577b298525f 100644 --- a/test/unit/replication-protocol.test.ts +++ b/test/unit/replication-protocol.test.ts @@ -71,7 +71,8 @@ function ensureReplicationHasNoErrors(replicationState: RxStorageInstanceReplica }); } -useParallel(testContext + ' (implementation: ' + config.storage.name + ')', () => { +useParallel(testContext + ' (implementation: ' + config.storage.name + ')', function () { + this.timeout(1000 * 20); if (!config.storage.hasReplication) { return; } @@ -900,7 +901,7 @@ useParallel(testContext + ' (implementation: ' + config.storage.name + ')', () = }); docData._rev = createRevision(randomCouchString(10), docData); docData._meta.lwt = now(); - await instance.bulkWrite([{ + const insertResult = await instance.bulkWrite([{ document: docData }], testContext); @@ -910,7 +911,7 @@ useParallel(testContext + ' (implementation: ' + config.storage.name + ')', () = newDocData._rev = createRevision(randomCouchString(10), docData); newDocData._meta.lwt = now(); const updateResult = await instance.bulkWrite([{ - previous: docData, + previous: insertResult.success[0], document: newDocData }], testContext); assert.deepStrictEqual(updateResult.error, []); diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index d18b1292994..54af3badf92 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -741,7 +741,8 @@ describe('rx-query.test.ts', () => { }); }); config.parallel('issues', () => { - it('#278 queryCache breaks when pointer out of bounds', async () => { + it('#278 queryCache breaks when pointer out of bounds', async function () { + this.timeout(1000 * 20); const c = await humansCollection.createPrimary(0); // insert some docs diff --git a/test/unit/rx-storage-denokv.test.ts b/test/unit/rx-storage-denokv.test.ts index 0a9623c96f8..26d0fe3b5d9 100644 --- a/test/unit/rx-storage-denokv.test.ts +++ b/test/unit/rx-storage-denokv.test.ts @@ -35,17 +35,16 @@ config.parallel('rx-storage-denokv.test.js', () => { describe('ensure correct assumptions of the api', () => { it('must be able to detect on-insert conflicts', async () => { const kv = await getDenoKV(); - const key = [randomCouchString(10)]; - // should work if not exits + // should .set() if not exits const txResult = await kv.atomic() .check({ key }) .set(key, 1) .commit(); assert.ok(txResult.ok); - // should error on conflict + // should error on conflict because exists already const txResult2 = await kv.atomic() .check({ key }) .set(key, 1) diff --git a/test/unit/rx-storage-implementations.test.ts b/test/unit/rx-storage-implementations.test.ts index 89618fbadbc..acbf515e7af 100644 --- a/test/unit/rx-storage-implementations.test.ts +++ b/test/unit/rx-storage-implementations.test.ts @@ -439,7 +439,6 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. assert.deepStrictEqual(insertResponse.error, []); const first = insertResponse.success[0]; - // make an update const updateData = Object.assign({}, insertData, { value: 'barfoo2', @@ -450,7 +449,7 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. }); const updateResponse = await storageInstance.bulkWrite( [{ - previous: insertData, + previous: insertResponse.success[0], document: updateData }], testContext @@ -460,7 +459,7 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. // make the delete const deleteResponse = await storageInstance.bulkWrite( [{ - previous: updateData, + previous: updateResponse.success[0], document: Object.assign({}, first, { value: 'barfoo_deleted', _deleted: true, @@ -767,7 +766,7 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. assert.deepStrictEqual(writeResponse.error, []); const getDocFromDb = await storageInstance.findDocumentsById([docData.id], false); - assert.deepStrictEqual( + assertEqualDocumentData( getDocFromDb[0], compressedDocData ); @@ -1893,7 +1892,7 @@ config.parallel('rx-storage-implementations.test.ts (implementation: ' + config. const found = await storageInstance.findDocumentsById(['foobar'], false); const foundDoc = found[0]; - assert.deepStrictEqual(foundDoc, docData); + assertEqualDocumentData(foundDoc, docData); storageInstance.close(); });