|
147 | 147 | resumed_change.each { |key| expect(resumed_change[key]).to eq(next_next_change[key]) } |
148 | 148 | end |
149 | 149 | end |
| 150 | + |
| 151 | + context 'example 4 - using a pipeline to filter changes' do |
| 152 | + |
| 153 | + it 'returns the filtered changes' do |
| 154 | + |
| 155 | + ops_thread = Thread.new do |
| 156 | + sleep 2 |
| 157 | + inventory.insert_one(username: 'wallace') |
| 158 | + inventory.insert_one(username: 'alice') |
| 159 | + inventory.delete_one(username: 'wallace') |
| 160 | + end |
| 161 | + |
| 162 | + stream_thread = Thread.new do |
| 163 | + |
| 164 | + # Start Changestream Example 4 |
| 165 | + |
| 166 | + pipeline = [ {'$match' => { '$or' => [{ 'fullDocument.username' => 'alice' }, |
| 167 | + { 'operationType' => 'delete' }] } }] |
| 168 | + cursor = inventory.watch(pipeline).to_enum |
| 169 | + cursor.next |
| 170 | + |
| 171 | + # End Changestream Example 4 |
| 172 | + end |
| 173 | + |
| 174 | + ops_thread.value |
| 175 | + change = stream_thread.value |
| 176 | + |
| 177 | + expect(change['_id']).not_to be_nil |
| 178 | + expect(change['_id']['_data']).not_to be_nil |
| 179 | + expect(change['operationType']).to eq('insert') |
| 180 | + expect(change['fullDocument']).not_to be_nil |
| 181 | + expect(change['fullDocument']['_id']).not_to be_nil |
| 182 | + expect(change['fullDocument']['username']).to eq('alice') |
| 183 | + expect(change['ns']).not_to be_nil |
| 184 | + expect(change['ns']['db']).to eq(TEST_DB) |
| 185 | + expect(change['ns']['coll']).to eq(inventory.name) |
| 186 | + expect(change['documentKey']).not_to be_nil |
| 187 | + expect(change['documentKey']['_id']).to eq(change['fullDocument']['_id']) |
| 188 | + end |
| 189 | + end |
150 | 190 | end |
0 commit comments