Replies: 2 comments · 12 replies
-
@thaoula can you please provide the code snippet you used to repro this behavior?
-
Hi @vkarpov15, Thanks for replying. Please find the code below, where I am using a MongoDB change stream obtained by calling .getClient(). I have been running the change stream for the last 3 to 4 days and it seems to still be active, and I am able to restart the app and resume where I left off. The Mongoose-based change stream never raised any events and is missing RESUME_TOKEN_CHANGED, which I think is very important when tracking changes on collections / databases where changes are infrequent: the token received in the last change event may have been removed from the oplog, so you cannot resume from it.
FYI we are using MongoDB 6 and Mongoose 7.3.0. In production we are using an Atlas cluster. Regards,
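For readers following along, here is a minimal sketch of the token-tracking idea described in this comment. It is not the poster's code: `trackResumeToken` and `saveToken` are hypothetical names, and the stream is assumed to behave like the native driver's ChangeStream, meaning an EventEmitter that emits `change` (with the token in `_id`) and `resumeTokenChanged` (with the token itself). A plain EventEmitter stands in for the stream so the sketch runs without a database.

```javascript
'use strict';
const { EventEmitter } = require('node:events');

// Hypothetical helper: remember the newest resume token seen on a stream.
// `stream` is assumed to emit 'change' (token in `_id`) and
// 'resumeTokenChanged' (the token itself), like the driver's ChangeStream.
function trackResumeToken(stream, saveToken) {
  stream.on('change', change => saveToken(change._id));
  stream.on('resumeTokenChanged', token => saveToken(token));
  return stream;
}

// Stand-in stream so the sketch runs without a database.
const fakeStream = new EventEmitter();
const savedTokens = [];
trackResumeToken(fakeStream, token => savedTokens.push(token));

fakeStream.emit('change', { _id: { _data: 'token-from-change' }, operationType: 'insert' });
fakeStream.emit('resumeTokenChanged', { _data: 'token-from-heartbeat' });

// The most recently saved token is the one to persist for the next restart.
console.log(savedTokens[savedTokens.length - 1]);
```

With a real connection, the stream would presumably come from something like `mongoose.connection.getClient().watch([], { resumeAfter })`, and `saveToken` would write to durable storage instead of an array.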
-
Copying my response from #13607:
'use strict';

const mongoose = require('mongoose');

void async function main() {
  const uri = 'mongodb://127.0.0.1:27017,127.0.0.1:27018/mongoose_test';
  await mongoose.connect(uri);

  const Person = mongoose.model('Person', mongoose.Schema({ name: String }));
  await Person.createCollection();

  // Create a change stream. The 'change' event gets emitted when there's a
  // change in the database
  const changeStream = await Person.watch();
  changeStream.on('change', data => console.log(new Date(), data));
  changeStream.on('resumeTokenChanged', data => console.log(new Date(), 'resumeTokenChanged', data));
  changeStream.on('error', data => console.log(new Date(), 'error', data));
  changeStream.on('close', data => console.log(new Date(), 'close', data));

  await new Promise(resolve => setTimeout(resolve, 1000));

  // Insert a doc, will trigger the change stream handler above
  console.log(new Date(), 'Inserting doc');
  await Person.create({ name: 'Axl Rose' });
  console.log(new Date(), 'Inserted doc');
}();

Prints the following output if I kill the entire replica set after a few seconds:
-
I am facing the same problem. There seems to be no complete solution to this, at least in Mongoose. I have to try the native Node.js driver.
-
@anantanandgupta it is a bit strange that the
let resumeAfter = null;
let changeStream = await Person.watch();
addChangeStreamListeners(changeStream);

function addChangeStreamListeners(stream) {
  stream.on('change', data => console.log(new Date(), data));
  // Remember the latest resume token so the next stream can start from it
  stream.on('resumeTokenChanged', data => {
    console.log(new Date(), 'resumeTokenChanged', data);
    resumeAfter = data;
  });
  stream.on('error', data => console.log(new Date(), 'error', data));
  // When the stream closes, open a new one from the last known token
  stream.on('close', data => {
    console.log(new Date(), 'close', data);
    changeStream = Person.watch([], resumeAfter ? { resumeAfter } : {});
    addChangeStreamListeners(changeStream);
  });
}
-
Thanks @vkarpov15. We were doing the same by exiting the process and restarting it (it runs in Docker and we rely on Docker's ability to restart on error). As for the multiple close events, I have no idea, since the above code will create new change handlers in that case. I tried this approach earlier, but then the change event fired multiple times and the count kept growing (in one instance in testing, each change in the database sent almost 1024 emails). Recently I observed another issue: even though the change stream is closed, the event listeners are somehow still active and still processing events, even after we set the changestream variable to a new watch in the close callback. As for your question on the version number (I am sure we are using the latest, but will check again) and our current working code, I will post another reply in some time once I am at my desk. (just woke up 🥱🥱🥱)
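One way to avoid the growing number of handlers described here is to make sure each stream reopens at most once and that a replaced stream can no longer deliver events. The following is only a sketch under assumptions: `watchWithReopen` and `openStream` are hypothetical names, `openStream(resumeAfter)` is assumed to return a fresh change stream, and plain EventEmitters stand in for real streams so the example runs without a database.

```javascript
'use strict';
const { EventEmitter } = require('node:events');

// Hypothetical helper. `openStream(resumeAfter)` is assumed to return a new
// change stream (any EventEmitter works here); `onChange` is the handler.
function watchWithReopen(openStream, onChange) {
  let resumeAfter = null;
  let current = null;

  function open() {
    const stream = openStream(resumeAfter);
    current = stream;
    stream.on('change', change => {
      if (stream !== current) return; // ignore events from a replaced stream
      resumeAfter = change._id;
      onChange(change);
    });
    // `once` so a single stream can never schedule more than one reopen
    stream.once('close', () => {
      stream.removeAllListeners('change');
      if (stream === current) open();
    });
  }

  open();
  return { getCurrent: () => current };
}

// Stand-in streams so the sketch runs without a database.
const opened = [];
const received = [];
const watcher = watchWithReopen(resumeAfter => {
  const stream = new EventEmitter();
  opened.push({ stream, resumeAfter });
  return stream;
}, change => received.push(change));

opened[0].stream.emit('change', { _id: 'token-1' });
opened[0].stream.emit('close');                    // triggers exactly one reopen
opened[0].stream.emit('change', { _id: 'stale' }); // old stream: no handler left
opened[1].stream.emit('change', { _id: 'token-2' });

console.log(opened.length, received.map(c => c._id));
```

The design choice is that the `close` handler is registered with `once` and tied to one specific stream, so each reopen attaches exactly one set of listeners instead of stacking them on a shared variable.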
-
Here is my final piece of code.
How to read this code:
This code only fails when the resume token is not valid, as no error is thrown in that case. There should be a mechanism for the library to tell whether the connection was closed because the provided resume token is invalid. (You can test the use case by opening a change stream with an invalid / expired resume token.)
import { Constants, Types } from '@sap/common';
import * as database from '@sap/database';
import {
  onEmailVerificationQueue,
  onForgotPasswordQueue,
  onSchoolInvitationQueue,
  onSchoolRegistrationQueue,
  onStudentInvitationQueue,
} from './handlers';
export * from './handlers';
let forgotPasswordChangeStream: undefined | Awaited<ReturnType<typeof createChangeStream>> = undefined;
let schoolInvitationChangeStream: undefined | Awaited<ReturnType<typeof createChangeStream>> = undefined;
let studentInvitationChangeStream: undefined | Awaited<ReturnType<typeof createChangeStream>> = undefined;
let schoolRegistrationQueueChangeStream: undefined | Awaited<ReturnType<typeof createChangeStream>> = undefined;
let emailVerificationChangeStream: undefined | Awaited<ReturnType<typeof createChangeStream>> = undefined;
const createChangeStream = async (resumeTokenPurpose: Types.ResumeTokenPurpose, queueHandler?: (change: any) => void, operationType: string = 'insert') => {
  try {
    let resumeToken = (await database.Data.ResumeTokenData.readResumeToken(resumeTokenPurpose))?.toObject();
    const pipeline = [
      {
        $match: {
          $and: [
            { 'fullDocument.purpose': resumeTokenPurpose },
            { 'operationType': operationType },
          ],
        },
      },
    ];
    const changeStream = database.Data.VerificationCodeData.watch(pipeline, { resumeAfter: resumeToken ? { _data: resumeToken?._data } : undefined });
    changeStream.on('change', async (change: any) => {
      if (resumeToken?._data !== change?._id?._data) {
        resumeToken = await database.Data.ResumeTokenData.saveResumeToken(change._id, resumeTokenPurpose);
      }
      queueHandler?.(change.fullDocument);
    });
    changeStream.on('error', (...args) => {
      console.error(`[Change Stream] Error in ${resumeTokenPurpose} change stream:`, args);
      // we have to do this so that the docker container is destroyed and recreated, otherwise the change stream listener is never recovered.
      process.exit(1);
    });
    changeStream.on('close', (...args) => {
      console.error(`[Change Stream] Closed ${resumeTokenPurpose} change stream:`, args);
    });
    return changeStream;
  } catch (error) {
    console.error(`[Change Stream] Error connecting to the database for ${resumeTokenPurpose}:`, error);
    // we have to do this so that the docker container is destroyed and recreated, otherwise the change stream listener is never recovered.
    process.exit(1);
  }
};
const closeStream = async (stream: undefined | Awaited<ReturnType<typeof createChangeStream>>) => {
  if (stream && stream.listenerCount('change') > 0) {
    console.log('[Change Stream] Removing Listeners');
    stream?.removeAllListeners('change');
    return stream.close();
  } else {
    return;
  }
};
// initiate streams
database.Connection.getConnection(undefined, async () => {
  await closeStream(forgotPasswordChangeStream)
    .then(async () => {
      console.log('[Change Stream] Creating for Forgot Password');
      forgotPasswordChangeStream = await createChangeStream(Constants.ResumeTokenPurpose.forgot_password, onForgotPasswordQueue, 'insert');
    });
  await closeStream(schoolInvitationChangeStream)
    .then(async () => {
      console.log('[Change Stream] Creating for School Invitation');
      schoolInvitationChangeStream = await createChangeStream(Constants.ResumeTokenPurpose.school_invitation, onSchoolInvitationQueue, 'insert');
    });
  await closeStream(studentInvitationChangeStream)
    .then(async () => {
      console.log('[Change Stream] Creating for Student Invitation');
      studentInvitationChangeStream = await createChangeStream(Constants.ResumeTokenPurpose.student_invitation, onStudentInvitationQueue, 'insert');
    });
  await closeStream(schoolRegistrationQueueChangeStream)
    .then(async () => {
      console.log('[Change Stream] Creating for School Registration');
      schoolRegistrationQueueChangeStream = await createChangeStream(Constants.ResumeTokenPurpose.school_registration, onSchoolRegistrationQueue, 'insert');
    });
  await closeStream(emailVerificationChangeStream)
    .then(async () => {
      console.log('[Change Stream] Creating for Email Verification');
      emailVerificationChangeStream = await createChangeStream(Constants.ResumeTokenPurpose.email_verification, onEmailVerificationQueue, 'insert');
    });
});
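On the invalid-token case raised in this comment: if an error does reach the `error` listener (the thread suggests it may not with the Mongoose wrapper), it can be classified before deciding whether to keep the stored token. The sketch below rests on an assumption, not on anything confirmed in this thread: that the server reports a resume point no longer present in the oplog as error code 286 with codeName `ChangeStreamHistoryLost`. `shouldDiscardResumeToken` and `buildWatchOptions` are hypothetical helper names.

```javascript
'use strict';

// Assumption: the server reports a resume point that is no longer in the
// oplog as error code 286 (codeName "ChangeStreamHistoryLost").
const CHANGE_STREAM_HISTORY_LOST = 286;

// Hypothetical helper: decide whether a stored resume token should be discarded.
function shouldDiscardResumeToken(error) {
  if (!error) return false;
  return error.code === CHANGE_STREAM_HISTORY_LOST ||
    error.codeName === 'ChangeStreamHistoryLost';
}

// Hypothetical helper: build watch() options, omitting resumeAfter entirely
// when there is no usable token.
function buildWatchOptions(resumeToken) {
  return resumeToken ? { resumeAfter: resumeToken } : {};
}

console.log(shouldDiscardResumeToken({ code: 286 }));
console.log(shouldDiscardResumeToken(new Error('socket closed')));
console.log(buildWatchOptions(null));
```

In an `error` handler, a positive result could be used to delete the persisted token before exiting, so that the restarted container starts a fresh stream instead of retrying a token that can no longer be resumed.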
-
also for the mongoose version, it is 7.6.3
-
updated to version 7.6.9
-
Hi All,
We are trying to utilise ChangeStreams using Mongoose and have encountered some challenges with reliability.
We have the following issues:
For some unknown reason, our DB-level ChangeStream, which is connected to an Atlas cluster, stops receiving changes. We do not get any errors even though we are using a ChangeStream.on('error') listener. Nothing is logged.
When this happens, restarting the container while keeping the resume token (i.e. setting resumeAfter) does not work. We only get changes after deleting the resume token and restarting the container.
Before logging some GitHub issues I wanted to see if anyone has experienced problems like this and whether you have some tips.
What Works
For some unknown reason (maybe low activity over a weekend or overnight, or primary changes), the change stream stops receiving change events. I am not sure how to debug this issue, as no event listeners are called on the change stream, i.e. the error, resumeTokenChanged, and close events are never triggered.
By chance, after reviewing the Mongoose code and noticing that the returned change stream was a wrapped object, I decided to just use connection.getClient().watch, which returns a native MongoDB ChangeStream, and I am able to successfully listen to resumeTokenChanged, close, change and error events as I would have expected when using connection.watch().
When using this approach, I surprisingly found that the resumeTokenChanged event is called almost once per second and the token is changing. Also, the token returned by this event is much shorter than the resume token that is provided in the change event.
I also noticed that if I stop the database (to test failure), the change stream is closed (it calls the close event listener) and it is able to resume when I restart the database server.
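Since the native stream reportedly emits resumeTokenChanged about once per second, that event could double as a heartbeat for detecting a stream that has silently gone quiet. This is a sketch only: `createStallDetector` is a hypothetical helper, the timeout value is arbitrary, and the clock is injectable so the example can be exercised without real timers or a database.

```javascript
'use strict';
const { EventEmitter } = require('node:events');

// Hypothetical helper: treat 'change' and 'resumeTokenChanged' as heartbeats.
// `now` is injectable so the sketch can run against a fake clock.
function createStallDetector(stream, { timeoutMs, now = Date.now }) {
  let lastSeen = now();
  const beat = () => { lastSeen = now(); };
  stream.on('change', beat);
  stream.on('resumeTokenChanged', beat);
  return {
    // True when no heartbeat has arrived within the timeout window
    isStalled: () => now() - lastSeen > timeoutMs,
    // Detach the heartbeat listeners, e.g. before replacing the stream
    stop: () => {
      stream.off('change', beat);
      stream.off('resumeTokenChanged', beat);
    },
  };
}

// Stand-in stream and clock so the sketch runs without a database.
let fakeTime = 0;
const heartbeatStream = new EventEmitter();
const detector = createStallDetector(heartbeatStream, { timeoutMs: 5000, now: () => fakeTime });

fakeTime = 4000;
heartbeatStream.emit('resumeTokenChanged', { _data: 'abc' });
fakeTime = 8000;
const stalledAt8s = detector.isStalled();  // 4000 ms since the last heartbeat
fakeTime = 10000;
const stalledAt10s = detector.isStalled(); // 6000 ms since the last heartbeat

console.log(stalledAt8s, stalledAt10s);
```

In an application, `isStalled()` could be polled on an interval, and a stalled result could trigger closing and reopening the stream or exiting so the container restarts.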
My questions are -
Sorry for all the questions.
Regards,
Tarek