-
Notifications
You must be signed in to change notification settings - Fork 5
/
write-server.js
115 lines (97 loc) · 3.63 KB
/
write-server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
RPS.write = function (collection, method, options) {
options = options || {};
options.selector = options.selector ? Mongo.Collection._rewriteSelector(options.selector) : options.doc || {};
const _id = options.selector._id;
const _idIsId = !!_id && typeof _id === 'string';
const collectionName = collection._name;
const config = RPS.config[collectionName] || {};
const channels = !options.noPublish && (options.channels || config.channels || collectionName);
let idMap = [];
let docs = [];
function publish (doc, id) {
let channelsForDoc;
if (_.isFunction(channels)) {
channelsForDoc = channels(doc, options.selector, options.fields);
} else {
channelsForDoc = channels;
}
if (!channelsForDoc) return;
const message = {
_serverId: RPS._serverId,
doc: method !== 'remove' && doc,
method: method,
selector: options.selector,
modifier: options.redModifier || options.modifier,
withoutMongo: options.withoutMongo,
id: id || (doc && doc._id),
ts: Date.now()
};
const messageString = JSON.stringify(message);
_.each(_.isArray(channelsForDoc) ? channelsForDoc : [channelsForDoc], function (channel) {
if (!channel) return;
RPS._messenger.onMessage(channel, message);
RPS._pub(channel, messageString);
});
}
function afterWrite (res) {
if (!channels) return res;
if (options.withoutMongo) {
const id = _idIsId ? _id : (method === 'insert' || method === 'upsert') && Random.id();
publish(null, id);
} else if (method === 'remove') {
docs.forEach(function (doc) {
publish(doc);
});
} else {
if (idMap.length) {
docs = collection.find({_id: {$in: idMap}});
} else if (method === 'upsert' && res.insertedId) {
docs = collection.find({_id: res.insertedId});
} else if (method === 'insert') {
const doc = options.selector;
docs = [doc];
idMap = [doc._id = doc._id || res];
}
docs && docs.forEach(function (doc) {
publish(doc);
});
}
return res;
}
if (options.noWrite) {
publish(options.doc);
} else {
if (channels && method !== 'insert' && !options.withoutMongo) {
const findOptions = {};
if (method !== 'remove') {
if (_idIsId) {
idMap.push(_id);
} else {
findOptions.fields = {_id: 1};
if (!options.options || !options.options.multi) {
findOptions.limit = 1;
}
}
}
if (!idMap.length) {
collection.find(options.selector, findOptions).forEach(function (doc) {
idMap.push(doc._id);
docs.push(doc);
});
}
}
const callback = _.last(_.toArray(arguments));
const async = _.isFunction(callback);
if (async && !options.withoutMongo) {
return RPS._write(collection, method, options, function (err, res) {
if (!err) {
afterWrite(res);
}
callback(err, res);
});
} else {
const res = !options.withoutMongo && RPS._write(collection, method, options);
return afterWrite(res);
}
}
};