Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/deno improvements #5367

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/incremental-write.ts
Expand Up @@ -209,10 +209,13 @@ export function modifierFromPublicToInternal<RxDocType>(
}


/**
* From a list of document states,
* returns the newest one, based on revision.
*/
export function findNewestOfDocumentStates<RxDocType>(
docs: RxDocumentData<RxDocType>[]
): RxDocumentData<RxDocType> {

let newest = docs[0];
let newestRevisionHeight = parseRevision(newest._rev).height;
docs.forEach(doc => {
Expand Down
6 changes: 6 additions & 0 deletions src/plugin-helpers.ts
Expand Up @@ -212,7 +212,13 @@ export function wrapRxStorageInstance<RxDocType>(
})
);

console.log('--- BEFORE!!');
console.log(JSON.stringify(useRows, null, 4));
const writeResult = await instance.bulkWrite(useRows, context);

console.log('WRTIE RESULT IN STORAGE WRAPPER:');
console.log(JSON.stringify(writeResult, null, 4));

const ret: RxStorageBulkWriteResponse<RxDocType> = {
success: [],
error: []
Expand Down
9 changes: 9 additions & 0 deletions src/plugins/storage-denokv/denokv-helper.ts
@@ -1,4 +1,5 @@
import { RxStorageDefaultStatics } from "../../rx-storage-statics.ts";
import type { RxDocumentData } from '../../types/rx-storage';

export const RX_STORAGE_NAME_DENOKV = 'denokv';
export const RxStorageDenoKVStatics = RxStorageDefaultStatics;
Expand All @@ -13,9 +14,17 @@ export function getDenoKVIndexName(index: string[]): string {
*/
export const DENOKV_DOCUMENT_ROOT_PATH = '||';

export const DENOKV_VERSION_META_FLAG = 'denokv';
export const CLEANUP_INDEX: string[] = ['_deleted', '_meta.lwt'];



export function denoKvRowToDocument<RxDocType>(row: any): RxDocumentData<RxDocType> {
const docData = row.value;
docData._meta[DENOKV_VERSION_META_FLAG] = row.versionstamp;
return docData;
}

/**
* Get the global Deno variable from globalThis.Deno
* so that compiling with plain typescript does not fail.
Expand Down
6 changes: 3 additions & 3 deletions src/plugins/storage-denokv/denokv-query.ts
Expand Up @@ -11,7 +11,7 @@ import type {
import { ensureNotFalsy } from '../../plugins/utils/index.ts';
import { getQueryMatcher, getSortComparator } from '../../rx-query-helper.ts';
import { RxStorageInstanceDenoKV } from "./rx-storage-instance-denokv.ts";
import { DENOKV_DOCUMENT_ROOT_PATH, getDenoKVIndexName } from "./denokv-helper.ts";
import { DENOKV_DOCUMENT_ROOT_PATH, denoKvRowToDocument, getDenoKVIndexName } from "./denokv-helper.ts";
import type { DenoKVPreparedQuery } from "./denokv-types.ts";

export async function queryDenoKV<RxDocType>(
Expand Down Expand Up @@ -84,7 +84,7 @@ export async function queryDenoKV<RxDocType>(
if (singleDocResult.value) {
const docId: string = singleDocResult.value;
const docDataResult = await kv.get([instance.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId], instance.kvOptions);
const docData = ensureNotFalsy(docDataResult.value);
const docData = denoKvRowToDocument<RxDocType>(docDataResult);
if (!queryMatcher || queryMatcher(docData)) {
result.push(docData);
}
Expand All @@ -106,7 +106,7 @@ export async function queryDenoKV<RxDocType>(
for await (const indexDocEntry of range) {
const docId = indexDocEntry.value;
const docDataResult = await kv.get([instance.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId], instance.kvOptions);
const docData = ensureNotFalsy(docDataResult.value);
const docData = denoKvRowToDocument<RxDocType>(docDataResult);
if (!queryMatcher || queryMatcher(docData)) {
result.push(docData);
}
Expand Down
6 changes: 6 additions & 0 deletions src/plugins/storage-denokv/index.ts
Expand Up @@ -34,3 +34,9 @@ export function getRxStorageDenoKV(
const storage = new RxStorageDenoKV(settings);
return storage;
}


export * from './denokv-helper.ts';
export * from './denokv-types.ts';
export * from './denokv-query.ts';
export * from './rx-storage-instance-denokv.ts';
190 changes: 105 additions & 85 deletions src/plugins/storage-denokv/rx-storage-instance-denokv.ts
Expand Up @@ -22,19 +22,36 @@ import type {
} from '../../types/index.d.ts';
import { getPrimaryFieldOfPrimaryKey } from '../../rx-schema-helper.ts';
import { addRxStorageMultiInstanceSupport } from '../../rx-storage-multiinstance.ts';
import type { DenoKVIndexMeta, DenoKVPreparedQuery, DenoKVSettings, DenoKVStorageInternals } from './denokv-types.ts';
import type {
DenoKVIndexMeta,
DenoKVPreparedQuery,
DenoKVSettings,
DenoKVStorageInternals
} from './denokv-types.ts';
import { RxStorageDenoKV } from './index.ts';
import { CLEANUP_INDEX, DENOKV_DOCUMENT_ROOT_PATH, RX_STORAGE_NAME_DENOKV, getDenoGlobal, getDenoKVIndexName } from "./denokv-helper.ts";
import { getIndexableStringMonad, getStartIndexStringFromLowerBound, changeIndexableStringByOneQuantum } from "../../custom-index.ts";
import {
CLEANUP_INDEX,
DENOKV_DOCUMENT_ROOT_PATH,
DENOKV_VERSION_META_FLAG,
RX_STORAGE_NAME_DENOKV,
denoKvRowToDocument,
getDenoGlobal,
getDenoKVIndexName
} from "./denokv-helper.ts";
import {
getIndexableStringMonad,
getStartIndexStringFromLowerBound,
changeIndexableStringByOneQuantum
} from "../../custom-index.ts";
import { appendToArray, batchArray, lastOfArray, toArray } from "../utils/utils-array.ts";
import { ensureNotFalsy } from "../utils/utils-other.ts";
import { categorizeBulkWriteRows } from "../../rx-storage-helper.ts";
import { categorizeBulkWriteRows, writeRowToEvent } from "../../rx-storage-helper.ts";
import { now } from "../utils/utils-time.ts";
import { queryDenoKV } from "./denokv-query.ts";
import { INDEX_MAX } from "../../query-planner.ts";
import { PROMISE_RESOLVE_VOID } from "../utils/utils-promise.ts";
import { flatClone } from "../utils/utils-object.ts";

import { randomCouchString } from '../utils/utils-string.ts';


export class RxStorageInstanceDenoKV<RxDocType> implements RxStorageInstance<
Expand Down Expand Up @@ -96,104 +113,104 @@ export class RxStorageInstanceDenoKV<RxDocType> implements RxStorageInstance<
success: [],
error: []
};
const eventBulk: EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, any> = {
id: randomCouchString(10),
events: [],
checkpoint: null,
context,
startTime: now(),
endTime: 0
};

const batches = batchArray(documentWrites, ensureNotFalsy(this.settings.batchSize));

/**
* DenoKV does not have transactions
* so we use a special writeBlock row to ensure
* atomic writes (per document)
* and so that we can do bulkWrites
*/
for (const writeBatch of batches) {
while (true) {
const writeBlockKey = await kv.get([this.keySpace], this.kvOptions);
const docsInDB = new Map<string, RxDocumentData<RxDocType>>();
// TODO remove this check when everything works and denoKV storage is out of beta
documentWrites.forEach(r => {
if (r.previous && !r.previous._meta[DENOKV_VERSION_META_FLAG]) {
console.error('PREVIOUS DENO META NOT SET:');
console.log(JSON.stringify(r, null, 4));
const err = new Error('previous denokv meta not set');
console.log(err.stack);
throw err;
}
});

/**
* TODO the max amount for .getMany() is 10 which is defined by deno itself.
* How can this be increased?
*/
const readManyBatches = batchArray(writeBatch, 10);
await Promise.all(
readManyBatches.map(async (readManyBatch) => {
const docsResult = await kv.getMany(
readManyBatch.map(writeRow => {
const docId: string = writeRow.document[primaryPath] as any;
return [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId];
})
);
docsResult.map((row: any) => {
const docData = row.value;
if (!docData) {
return;
}
const docId: string = docData[primaryPath] as any;
docsInDB.set(docId, docData);
});
})
);
const categorized = categorizeBulkWriteRows<RxDocType>(
this,
this.primaryPath as any,
docsInDB,
writeBatch,
context
);

await Promise.all(
documentWrites.map(async (writeRow) => {
const docId: string = writeRow.document[primaryPath] as any;
const key = [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId];
let tx = kv.atomic();
tx = tx.set([this.keySpace], ensureNotFalsy(writeBlockKey.value) + 1);
tx = tx.check(writeBlockKey);

// INSERTS
categorized.bulkInsertDocs.forEach(writeRow => {
const docId: string = writeRow.document[this.primaryPath] as any;
ret.success.push(writeRow.document);

// insert document data
tx = tx.set([this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId], writeRow.document);

// insert secondary indexes
Object.values(this.internals.indexes).forEach(indexMeta => {
const indexString = indexMeta.getIndexableString(writeRow.document as any);
tx = tx.set([this.keySpace, indexMeta.indexId, indexString], docId);
// conflict detection
if (writeRow.previous) {
tx = tx.check({
key,
versionstamp: writeRow.previous._meta[DENOKV_VERSION_META_FLAG]
});
});
// UPDATES
categorized.bulkUpdateDocs.forEach((writeRow: BulkWriteRow<RxDocType>) => {
const docId: string = writeRow.document[this.primaryPath] as any;
} else {
tx = tx.check({ key });
}

// insert document data
const kvKey = [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId];
tx = tx.set(kvKey, writeRow.document);

// insert document data
tx = tx.set([this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId], writeRow.document);
// insert secondary indexes
Object.values(this.internals.indexes).forEach(indexMeta => {
if (writeRow.previous) {

// insert secondary indexes
Object.values(this.internals.indexes).forEach(indexMeta => {
const oldIndexString = indexMeta.getIndexableString(ensureNotFalsy(writeRow.previous));
const newIndexString = indexMeta.getIndexableString(writeRow.document as any);
if (oldIndexString !== newIndexString) {
tx = tx.delete([this.keySpace, indexMeta.indexId, oldIndexString]);
tx = tx.set([this.keySpace, indexMeta.indexId, newIndexString], docId);
}
});
ret.success.push(writeRow.document as any);
} else {
Object.values(this.internals.indexes).forEach(indexMeta => {
const indexString = indexMeta.getIndexableString(writeRow.document as any);
tx = tx.set([this.keySpace, indexMeta.indexId, indexString], docId);
});
}
});

const txResult = await tx.commit();
if (txResult.ok) {
appendToArray(ret.error, categorized.errors);
if (categorized.eventBulk.events.length > 0) {
const lastState = ensureNotFalsy(categorized.newestRow).document;
categorized.eventBulk.checkpoint = {
id: lastState[primaryPath],
lwt: lastState._meta.lwt
};
categorized.eventBulk.endTime = now();
this.changes$.next(categorized.eventBulk);
const newDoc = flatClone(writeRow.document);
newDoc._meta = flatClone(newDoc._meta);
newDoc._meta[DENOKV_VERSION_META_FLAG] = txResult.versionstamp;
ret.success.push(newDoc);
const event = writeRowToEvent(docId, writeRow, false);
if (event) {
event.documentData = newDoc;
eventBulk.events.push(event);
}
} else {
const docInDb = await kv.get(kvKey);
if (docInDb.value) {
ret.error.push({
status: 409,
isError: true,
writeRow,
documentId: docId,
documentInDb: denoKvRowToDocument(docInDb)
});
} else {
throw new Error('unknown denoKV write error');
}
break;
}
}
})
);

const lastEvent = lastOfArray(eventBulk.events);
if (lastEvent) {
const lastState = lastEvent.documentData;
eventBulk.checkpoint = {
id: lastState[primaryPath],
lwt: lastState._meta.lwt
};
eventBulk.endTime = now();
this.changes$.next(eventBulk);
}

return ret;
}
async findDocumentsById(ids: string[], withDeleted: boolean): Promise<RxDocumentData<RxDocType>[]> {
Expand All @@ -203,7 +220,10 @@ export class RxStorageInstanceDenoKV<RxDocType> implements RxStorageInstance<
ids.map(async (docId) => {
const kvKey = [this.keySpace, DENOKV_DOCUMENT_ROOT_PATH, docId];
const findSingleResult = await kv.get(kvKey, this.kvOptions);
const docInDb = findSingleResult.value;
if (!findSingleResult.value) {
return;
}
const docInDb = denoKvRowToDocument<RxDocType>(findSingleResult);
if (
docInDb &&
(
Expand Down Expand Up @@ -303,7 +323,7 @@ export class RxStorageInstanceDenoKV<RxDocType> implements RxStorageInstance<
for (const batch of batches) {
const docs = await kv.getMany(batch);
docs.forEach((row: any) => {
const docData = row.value;
const docData = denoKvRowToDocument<RxDocType>(row);
result.push(docData as any);
});
}
Expand Down Expand Up @@ -371,7 +391,7 @@ export class RxStorageInstanceDenoKV<RxDocType> implements RxStorageInstance<
if (!docDataResult.value) {
continue;
}
const docData = ensureNotFalsy(docDataResult.value);
const docData = denoKvRowToDocument<RxDocType>(docDataResult);
if (
!docData._deleted ||
docData._meta.lwt > maxDeletionTime
Expand Down
7 changes: 7 additions & 0 deletions src/rx-document.ts
Expand Up @@ -424,6 +424,13 @@ export function createRxDocumentConstructor(proto = basePrototype) {
collection: RxCollection,
docData: RxDocumentData<any>
) {

// TODO remove this check when everything works
if (docData && !docData._meta.denokv) {
console.log(JSON.stringify(docData, null, 4));
throw new Error('DO NOT CREATE DOC LIKE THIS WITHOUT META.denokv');
}

this.collection = collection;

// assume that this is always equal to the doc-data in the database
Expand Down