Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions lib/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ import { check } from 'meteor/check';
import { Cache } from './cache';
import { config } from './config';
import { actions } from './mongo';
import { getCursor, formatId, convertFilter, buildProjection, removeValue, overrideLowLevelPublishAPI, mergeDocIntoFetchResult } from './utils/server';
import {
getCursor,
formatId,
convertFilter,
convertObjectId,
buildProjection,
removeValue,
overrideLowLevelPublishAPI,
mergeDocIntoFetchResult,
} from './utils/server';
import { isEmpty, createKey } from './utils/shared';

let onces = [];
Expand Down Expand Up @@ -127,7 +136,7 @@ function publishOnce(name, handler) {
// change streams
const formatDoc = streamDoc => {
const _id = formatId(streamDoc.documentKey._id);
const doc = streamDoc.fullDocument || {};
const doc = convertObjectId(streamDoc.fullDocument) || {};
doc._id = _id;

if (streamDoc.operationType !== 'update') {
Expand Down
4 changes: 3 additions & 1 deletion lib/utils/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ export const convertFilter = filter => {
if (key === '$or' || key === '$and' || key === '$nor') {
result[key] = convertFilter(filter[key]);
} else {
result[`fullDocument.${key}`] = filter[key];
result[`fullDocument.${key}`] = filter[key]._str
? new ObjectId(filter[key]._str)
: filter[key];
}
}
return result;
Expand Down
59 changes: 57 additions & 2 deletions tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const Notes = new Mongo.Collection('notes');
const Items = new Mongo.Collection('items');
const Books = new Mongo.Collection('books');
const Dogs = new Mongo.Collection('dogs');
const Cats = new Mongo.Collection('cats', {
idGeneration: 'MONGO' // Mongo.ObjectID
});
const Markers = new Mongo.Collection('markers', {
idGeneration: 'MONGO' // Mongo.ObjectID
});
Expand Down Expand Up @@ -73,6 +76,17 @@ const resetDogs = async () => {
return;
}

const resetCats = async () => {
await Cats.removeAsync({});
await Cats.insertAsync({ name: 'fluffy', something: new Mongo.ObjectID('123456789012345678901234') });
await Cats.insertAsync({ name: 'Phantom', something: new Mongo.ObjectID('123456789012345678901234') });
await Cats.insertAsync({
name: 'Mittens',
something: new Mongo.ObjectID('012341234567890123456789'),
});
return;
}

const insertThing = async ({ text, num }) => {
return Things.insertAsync({ text, num });
}
Expand Down Expand Up @@ -149,6 +163,10 @@ const insertDog = async ({ text }) => {
return Dogs.insertAsync({ text, ...(Meteor.isServer && { something: 1 }) });
}

const insertCat = async ({ name, id }) => {
return Cats.insertAsync({ name, something: id });
}

const updateDog = async ({ _id, text }) => {
return Dogs.updateAsync({ _id }, { $set: { text, ...(Meteor.isServer && { something: 2 }) }});
}
Expand All @@ -169,6 +187,7 @@ if (Meteor.isServer) {
await resetBooks();
await resetMarkers();
await resetDogs();
await resetCats();
})

Meteor.publish('notes.all', function() {
Expand Down Expand Up @@ -219,11 +238,15 @@ if (Meteor.isServer) {
return Dogs.find({}, { fields: { text: 1 }});
});

Meteor.methods({ reset, resetNotes, resetItems, resetBooks, resetMarkers, resetDogs, updateThing, updateThingWithUnset, updateThings, updateThingsWithUnset, updateThingUpsert, updateThingUpsertMulti, upsertThing, replaceThing, removeThing, fetchThings, updateItem, fetchItems })
Meteor.publish.stream('cats.stream.something', function(filterObjectId) {
return Cats.find({something: filterObjectId}, { fields: { name: 1, something: 1 }});
});

Meteor.methods({ reset, resetNotes, resetItems, resetBooks, resetMarkers, resetDogs, resetCats, updateThing, updateThingWithUnset, updateThings, updateThingsWithUnset, updateThingUpsert, updateThingUpsertMulti, upsertThing, replaceThing, removeThing, fetchThings, updateItem, fetchItems })
}

// isomorphic methods
Meteor.methods({ insertThing, insertItem, insertBook, insertMarker, updateMarker, updateMarkers, insertDog, updateDog, replaceDog, removeDog });
Meteor.methods({ insertThing, insertItem, insertBook, insertMarker, updateMarker, updateMarkers, insertDog, updateDog, insertCat, replaceDog, removeDog });


function createConnection() {
Expand Down Expand Up @@ -1070,6 +1093,38 @@ if (Meteor.isClient) {
computation.stop();
});

Tinytest.addAsync(
'subscribe - .stream - successful with Mongo.ObjectID as a filter',
async (test) => {
await Meteor.callAsync('resetCats');

let sub;
Tracker.autorun(() => {
sub = Meteor.subscribe('cats.stream.something', new Mongo.ObjectID('123456789012345678901234'), { cacheDuration: 0.1 });
});

let cats;
const computation = Tracker.autorun(() => {
if (sub.ready()) {
cats = Cats.find().fetch();
sub.stop();
}
});



await wait(101);
console.log('cats', cats);
test.equal(cats.length, 2);

await Meteor.callAsync('insertCat', { name: 'sup', id: new Mongo.ObjectID('123456789012345678901234') });
await wait(100);
test.equal(cats.length, 3);

computation.stop();
}
);


Tinytest.addAsync('cache - regular pubsub - successful', async (test) => {
let sub;
Expand Down