diff --git a/package-lock.json b/package-lock.json index 69ea9528f..e630521d1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { - "name": "yjs", - "version": "13.5.41", + "name": "@datacamp/yjs", + "version": "13.5.41-datacamp.0", "lockfileVersion": 2, "requires": true, "packages": { "": { - "name": "yjs", - "version": "13.5.41", + "name": "@datacamp/yjs", + "version": "13.5.41-datacamp.0", "license": "MIT", "dependencies": { "lib0": "^0.2.49" diff --git a/package.json b/package.json index 83a3db73f..e4284878e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { - "name": "yjs", - "version": "13.5.41", + "name": "@datacamp/yjs", + "version": "13.5.41-datacamp.0", "description": "Shared Editing Library", "main": "./dist/yjs.cjs", "module": "./dist/yjs.mjs", diff --git a/src/index.js b/src/index.js index 6624f8cb8..b9f5874f0 100644 --- a/src/index.js +++ b/src/index.js @@ -57,6 +57,7 @@ export { readUpdate, readUpdateV2, encodeStateAsUpdate, + encodeStateAsStreamOfUpdates, encodeStateAsUpdateV2, encodeStateVector, UndoManager, diff --git a/src/utils/encoding.js b/src/utils/encoding.js index 29af8bc63..70b32c8eb 100644 --- a/src/utils/encoding.js +++ b/src/utils/encoding.js @@ -46,39 +46,64 @@ import * as binary from 'lib0/binary' import * as map from 'lib0/map' import * as math from 'lib0/math' +/** + * @param {Array} structs All structs by `client` + * @param {number} minClock write structs starting with `ID(client,minClock)` + * @param {number | null} maxClock write structs with clock < maxClock for client + * + * @function + */ +const getStructsToWrite = (structs, minClock, maxClock = null) => { + minClock = math.max(minClock, structs[0].id.clock) // make sure the first id exists + const startNewStructs = findIndexSS(structs, minClock) + const lastStruct = structs[structs.length - 1] + if (maxClock == null || maxClock > lastStruct.id.clock) { + return structs.slice(startNewStructs) + } + let endNewStructs = findIndexSS(structs, maxClock) + if (maxClock > structs[endNewStructs].id.clock) { + // We write the last fully, so we don't split it until maxClock + // Therefore we need to also include the last one + endNewStructs += 1 + } + return structs.slice(startNewStructs, endNewStructs) +} + /** * @param {UpdateEncoderV1 | UpdateEncoderV2} encoder * @param {Array} structs All structs by `client` * @param {number} client * @param {number} clock write structs starting with `ID(client,clock)` + * @param {number | null} maxClock write structs with clock < maxClock `ID(client,clock)` + * @returns {number} the last clock written * * @function */ -const writeStructs = (encoder, structs, client, clock) => { - // write first id +const writeStructs = (encoder, structs, client, clock, maxClock = null) => { clock = math.max(clock, structs[0].id.clock) // make sure the first id exists - const startNewStructs = findIndexSS(structs, clock) + const newStructs = getStructsToWrite(structs, clock, maxClock) // write # encoded structs - encoding.writeVarUint(encoder.restEncoder, structs.length - startNewStructs) + encoding.writeVarUint(encoder.restEncoder, newStructs.length) encoder.writeClient(client) encoding.writeVarUint(encoder.restEncoder, clock) - const firstStruct = structs[startNewStructs] + const firstStruct = newStructs[0] // write first struct with an offset firstStruct.write(encoder, clock - firstStruct.id.clock) - for (let i = startNewStructs + 1; i < structs.length; i++) { - structs[i].write(encoder, 0) + for (let i = 1; i < newStructs.length; i++) { + newStructs[i].write(encoder, 0) } + const lastStruct = newStructs[newStructs.length - 1] + return lastStruct == null ? clock : lastStruct.id.clock + lastStruct.length } /** - * @param {UpdateEncoderV1 | UpdateEncoderV2} encoder * @param {StructStore} store * @param {Map} _sm * * @private * @function */ -export const writeClientsStructs = (encoder, store, _sm) => { +export const getStatesToWrite = (store, _sm) => { // we filter all valid _sm entries into sm const sm = new Map() _sm.forEach((clock, client) => { @@ -92,6 +117,19 @@ export const writeClientsStructs = (encoder, store, _sm) => { sm.set(client, 0) } }) + return sm +} + +/** + * @param {UpdateEncoderV1 | UpdateEncoderV2} encoder + * @param {StructStore} store + * @param {Map} _sm + * + * @private + * @function + */ +export const writeClientsStructs = (encoder, store, _sm) => { + const sm = getStatesToWrite(store, _sm) // write # states that were updated encoding.writeVarUint(encoder.restEncoder, sm.size) // Write items with higher client ids first @@ -508,7 +546,84 @@ export const writeStateAsUpdate = (encoder, doc, targetStateVector = new Map()) } /** - * Write all the document as a single update message that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will + * @param {Array<[number, number]>} clientClocks + * @return {Array<[number, number]>} + * + * @function + */ +const sortClientsLargestToSmallest = (clientClocks) => { + return clientClocks.sort((a, z) => z[0] - a[0]) +} + +/** + * Write the whole document as a stream of update messages. If you specify the state of the remote client (`targetStateVector`) it will only write the operations that are missing. + * + * @param {() => UpdateEncoderV1 | UpdateEncoderV2} getEncoder + * @param {Doc} doc + * @param {object} options + * @param {(client: number, clock: number, maxClock: number) => Iterable | Generator} [options.clockSplits] For this client, where would you like the updates to be splitted. If a clockSplit is in the middle of an item, it might return the full item and so this function doesn't split exactly on the given clockSplits. Use the injected clock to know where the last split happened + * @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients] How to sort the clients. In general, it's better to sort with higher client ids first as this heavily improves the conflict algorithm. This is also the default implementation. + * @param {Map} [targetStateVector] The state of the target that receives the update. Leave empty to write all known structs + * @return {Iterable} + * + * @generator + */ +export const writeStateAsStreamOfUpdates = function * (getEncoder, doc, options, targetStateVector = new Map()) { + const deleteEncoder = getEncoder() + // no updates / structs to write + encoding.writeVarUint(deleteEncoder.restEncoder, 0) + writeDeleteSet(deleteEncoder, createDeleteSetFromStructStore(doc.store)) + yield deleteEncoder + + const sm = getStatesToWrite(doc.store, targetStateVector) + const sortClients = options.sortClients ?? sortClientsLargestToSmallest + for (let [client, clock] of sortClients(Array.from(sm.entries()))) { + const lastClockClient = getState(doc.store, client) + /** @type {Array | undefined} */ + const structs = doc.store.clients.get(client) + if (structs == null) { + continue + } + + if (options.clockSplits != null) { + const iterator = options.clockSplits(client, clock, lastClockClient)[Symbol.iterator]() + while (true) { + // @ts-expect-error clock is number and iterator expects no argument... + const clockSplit = iterator.next(clock) + if (clockSplit.done || clockSplit.value >= lastClockClient) { + break + } + if (clockSplit.value <= clock) { + continue + } + + const encoder = getEncoder() + // 1 client has structs to write + encoding.writeVarUint(encoder.restEncoder, 1) + clock = writeStructs(encoder, structs, client, clock, clockSplit.value) + + // no deletes to write + encoding.writeVarUint(encoder.restEncoder, 0) + + yield encoder + } + } + if (clock < lastClockClient) { + const encoder = getEncoder() + // 1 client has structs to write + encoding.writeVarUint(encoder.restEncoder, 1) + clock = writeStructs(encoder, structs, client, clock) + + // no deletes to write + encoding.writeVarUint(encoder.restEncoder, 0) + + yield encoder + } + } +} + +/** + * Write the document as a single update message that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will * only write the operations that are missing. * * Use `writeStateAsUpdate` instead if you are working with lib0/encoding.js#Encoder @@ -555,6 +670,61 @@ export const encodeStateAsUpdateV2 = (doc, encodedTargetStateVector = new Uint8A */ export const encodeStateAsUpdate = (doc, encodedTargetStateVector) => encodeStateAsUpdateV2(doc, encodedTargetStateVector, new UpdateEncoderV1()) +/** + * Write the whole document as a stream of update messages that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will only write the operations that are missing. + * + * Use `writeStateAsStreamOfUpdates` instead if you are working with lib0/encoding.js#Encoder + * + * @param {Doc} doc + * @param {object} options + * @param {(client: number, clock: number, maxClock: number) => Iterable | Generator} [options.clockSplits] + * @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients] + * @param {Uint8Array} [encodedTargetStateVector] The state of the target that receives the update. Leave empty to write all known structs + * @param {() => UpdateEncoderV1 | UpdateEncoderV2} [getEncoder] + * @return {Iterable} + * + * @generator + */ +export const encodeStateAsStreamOfUpdatesV2 = function * (doc, options, encodedTargetStateVector = new Uint8Array([0]), getEncoder = () => new UpdateEncoderV2()) { + const targetStateVector = decodeStateVector(encodedTargetStateVector) + for (const encoder of writeStateAsStreamOfUpdates(getEncoder, doc, options, targetStateVector)) { + yield encoder.toUint8Array() + } + + const updates = [] + // also add the pending updates (if there are any) + if (doc.store.pendingDs) { + updates.push(doc.store.pendingDs) + } + if (doc.store.pendingStructs) { + updates.push(diffUpdateV2(doc.store.pendingStructs.update, encodedTargetStateVector)) + } + if (updates.length > 0) { + const encoder = getEncoder() + if (encoder.constructor === UpdateEncoderV1) { + yield mergeUpdates(updates.map((update, i) => i === 0 ? update : convertUpdateFormatV2ToV1(update))) + } else if (encoder.constructor === UpdateEncoderV2) { + yield mergeUpdatesV2(updates) + } + } +} + +/** + * Write the whole document as a stream of update messages that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will only write the operations that are missing. + * + * Use `writeStateAsStreamOfUpdates` instead if you are working with lib0/encoding.js#Encoder + * + * @param {Doc} doc + * @param {object} options + * @param {(client: number, clock: number, maxClock: number) => Iterable | Generator} [options.clockSplits] + * @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients] + * @param {Uint8Array} [encodedTargetStateVector] The state of the target that receives the update. Leave empty to write all known structs + * @return {Iterable} + * + * @function + */ +export const encodeStateAsStreamOfUpdates = (doc, options, encodedTargetStateVector) => encodeStateAsStreamOfUpdatesV2(doc, options, encodedTargetStateVector, () => new UpdateEncoderV1()) + /** * Read state vector from Decoder and return as Map * diff --git a/tests/encoding.tests.js b/tests/encoding.tests.js index 0f6db1ade..86bae8205 100644 --- a/tests/encoding.tests.js +++ b/tests/encoding.tests.js @@ -106,3 +106,362 @@ export const testDiffStateVectorOfUpdateIgnoresSkips = tc => { t.assert(state.get(ydoc.clientID) === 1) t.assert(state.size === 1) } + +/** @function + * @param {number} x + */ +const splitClocksBy = (x) => { + /** + * @param {number} _client + * @param {number} clock + * @param {number} maxClock + */ + return function * (_client, clock, maxClock) { + while (clock < maxClock) { + clock = Math.min(clock + x, maxClock) + clock = yield clock + } + } +} + +/** + * @param {t.TestCase} tc + */ +export const testEncodeStateAsUpdatesWithOneClient = tc => { + const yDoc = new Y.Doc() + const yText = yDoc.getText('textBlock') + yText.applyDelta([{ insert: 'r' }]) + yText.applyDelta([{ insert: 'o' }]) + yText.applyDelta([{ insert: 'n' }]) + yText.applyDelta([{ insert: 'e' }]) + yText.applyDelta([{ insert: 'n' }]) + + const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, { clockSplits: splitClocksBy(1) })) + + const yDocToAssert = new Y.Doc() + updates.forEach((update) => { + Y.applyUpdate(yDocToAssert, update) + }) + t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nenor') + + // yDoc did 5 updates + // 1 (empty) delete set + t.compare(6, updates.length) +} + +/** + * @param {t.TestCase} tc + */ +export const testEncodeStateAsUpdatesWithTwoClients = tc => { + // Arrange + const yDoc = new Y.Doc() + const yText = yDoc.getText('textBlock') + yText.applyDelta([{ insert: 'r' }]) + yText.applyDelta([{ insert: 'o' }]) + yText.applyDelta([{ insert: 'n' }]) + + const remoteDoc = new Y.Doc() + Y.applyUpdate(remoteDoc, Y.encodeStateAsUpdate(yDoc)) + + remoteDoc.getText('textBlock').applyDelta([{ insert: 'e' }]) + + Y.applyUpdate(yDoc, Y.encodeStateAsUpdate(remoteDoc)) + yText.applyDelta([{ insert: 'n' }]) + + // Act + const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, { clockSplits: splitClocksBy(1) })) + + // Assert + const yDocToAssert = new Y.Doc() + updates.forEach((update) => { + Y.applyUpdate(yDocToAssert, update) + }) + t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nenor') + + // yDoc did 3+1=4 updates + // remoteDoc did 1 update + // 1 (empty) delete set + t.compare(6, updates.length) +} + +/** + * @param {t.TestCase} tc + */ +export const testEncodeStateAsUpdatesWithItemsOfLength2 = tc => { + // Arrange + const yDoc = new Y.Doc() + const yText = yDoc.getText('textBlock') + yText.applyDelta([{ insert: 'or' }]) + yText.applyDelta([{ insert: 'n' }]) + yText.applyDelta([{ insert: 'ne' }]) + + // Act + const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, { clockSplits: splitClocksBy(1) })) + + // Assert + // yDoc did 3 updates (ne will keep together, even if we use clockSplit of 1) + // 1 (empty) delete set + t.compare(3 + 1, updates.length) + + const yDocToAssert = new Y.Doc() + + Y.applyUpdate(yDocToAssert, updates[0]) // delete set + t.compareStrings(yDocToAssert.getText('textBlock').toString(), '') + t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), 0) + + Y.applyUpdate(yDocToAssert, updates[1]) // delete set + t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'or') + t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), 2) + + Y.applyUpdate(yDocToAssert, updates[2]) // delete set + t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nor') + t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), 3) + + Y.applyUpdate(yDocToAssert, updates[3]) // delete set + t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nenor') + t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), 5) +} + +/** + * @param {t.TestCase} tc + */ +export const testEncodeStateAsUpdatesWithBadClockSplits = tc => { + const yDoc = new Y.Doc() + const yText = yDoc.getText('textBlock') + yText.applyDelta([{ insert: 'r' }]) + yText.applyDelta([{ insert: 'o' }]) + yText.applyDelta([{ insert: 'n' }]) + yText.applyDelta([{ insert: 'e' }]) + yText.applyDelta([{ insert: 'n' }]) + + const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, { + clockSplits: function * (_client, clock, maxClock) { + clock = yield clock - 1 + clock = yield clock + 1 // first message + clock = yield clock + clock = yield clock + clock = yield clock + clock = yield clock + 1 // second message + clock = yield maxClock + 100 // last message + } + })) + + const yDocToAssert = new Y.Doc() + + // Delete set message + Y.applyUpdate(yDocToAssert, updates[0]) + t.compareStrings(yDocToAssert.getText('textBlock').toString(), '') + + // first message + Y.applyUpdate(yDocToAssert, updates[1]) + t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'r') + + // second message + Y.applyUpdate(yDocToAssert, updates[2]) + t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'or') + + Y.applyUpdate(yDocToAssert, updates[3]) + t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nenor') + + t.compare(4, updates.length) +} + +/** + * @param {t.TestCase} tc + */ +export const testEncodeStateAsUpdatesShouldRespectClockSplits = tc => { + // Arrange + const yDoc = new Y.Doc() + /** + * @type {Array} + */ + const clockSplits = [] + /** + * @type {Array} + */ + const expectedUpdates = [] + yDoc.on('update', (update) => { + clockSplits.push(Y.getState(yDoc.store, yDoc.clientID)) + expectedUpdates.push(update) + }) + const cells = yDoc.getArray('cells') + + const cell0 = new Y.Map() + cell0.set('id', new Y.Text('zero')) + cell0.set('source', new Y.Text('# Hello World')) + cells.push([cell0]) + + const cell1 = new Y.Map() + cell1.set('id', new Y.Text('one')) + cell1.set('source', new Y.Text('import pandas as pd')) + cells.push([cell1]) + + yDoc.transact(() => { + yDoc.getMap('meta').set('language', 'python') + yDoc.getMap('state').set('version', 3) + }) + + // Act + const streamOfUpdates = Y.encodeStateAsStreamOfUpdates(yDoc, { + clockSplits: () => clockSplits + }) + + // Assert + const yDocToAssert = new Y.Doc() + let i = -1 + for (const update of streamOfUpdates) { + Y.applyUpdate(yDocToAssert, update) + if (i >= 0) { // i == -1 is the delete set message + t.compare(update, expectedUpdates[i], 'updates match') + t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), clockSplits[i], 'correct clock afterwards') + } + i++ + } + t.compare(yDocToAssert.getArray('cells').toJSON(), [ + { id: 'zero', source: '# Hello World' }, + { id: 'one', source: 'import pandas as pd' } + + ]) + t.compare(yDocToAssert.getMap('meta').toJSON(), { language: 'python' }) + t.compare(yDocToAssert.getMap('state').toJSON(), { version: 3 }) +} + +/** + * @param {t.TestCase} tc + */ +export const testEncodeStateAsUpdatesWithMaps = tc => { + // Arrange + const yDoc = new Y.Doc() + const yMap = yDoc.getMap('myMap') + yMap.set('foo', 'foo1') + yMap.set('bar', 'bar1') + yMap.set('quux', 'quux1') + + yMap.set('bar', 'bar2') + + const expectedMap = { + foo: 'foo1', + bar: 'bar2', + quux: 'quux1' + } + + // Act + const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, { + clockSplits: splitClocksBy(2) + })) + + // Assert + t.compare(3, updates.length) + + const yDocToAssert = new Y.Doc() + + // Delete set message + Y.applyUpdate(yDocToAssert, updates[0]) + t.compare(10, updates[0].length) // There is a delete set! + t.compare(0, updates[0][0]) // No updates by clients + t.compareObjects(yDocToAssert.getMap('myMap').toJSON(), {}, 'after update 1') + + // First 2 updates + Y.applyUpdate(yDocToAssert, updates[1]) + // bar is not here because the item is in the delete set + t.compareObjects(yDocToAssert.getMap('myMap').toJSON(), { foo: 'foo1' }, 'after update 2') + + // Last 2 updates + Y.applyUpdate(yDocToAssert, updates[2]) + t.compareObjects(yDocToAssert.getMap('myMap').toJSON(), { foo: 'foo1', bar: 'bar2', quux: 'quux1' }, 'after update 3') + + t.compareObjects(yDocToAssert.getMap('myMap').toJSON(), expectedMap) +} + +/** + * @param {t.TestCase} tc + */ +export const testEncodeStateAsUpdatesWithDifferentSortingAndEditsByClients = tc => { + // Arrange + const yNotebook = new Y.Doc() + + /** + * @type {Array} + */ + const clockSplits = [] + yNotebook.on('update', (update) => { + clockSplits.push(Y.getState(yNotebook.store, yNotebook.clientID)) + }) + const cells = yNotebook.getArray('cells') + + const cell0 = new Y.Map() + cell0.set('id', new Y.Text('zero')) + cell0.set('source', new Y.Text('# Hello World')) + cells.push([cell0]) + + const cell1 = new Y.Map() + cell1.set('id', new Y.Text('one')) + cell1.set('source', new Y.Text('import pandas as pd')) + cells.push([cell1]) + + const cell2 = new Y.Map() + cell2.set('id', new Y.Text('two')) + cell2.set('source', new Y.Text('# Conclusion')) + cells.push([cell2]) + + yNotebook.transact(() => { + yNotebook.getMap('meta').set('language', 'python') + yNotebook.getMap('state').set('version', 3) + }) + + const clientDoc = new Y.Doc() + Y.applyUpdate(clientDoc, Y.encodeStateAsUpdate(yNotebook)) + const source = clientDoc.getArray('cells').get(1).get('source') + source.insert(source.length, '\nimport random') + t.compare(source.toString(), 'import pandas as pd\nimport random', 'clientDoc should have right code') + + Y.applyUpdate(yNotebook, Y.encodeStateAsUpdate(clientDoc)) + + console.log('clockSplits', clockSplits, yNotebook.clientID) + const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yNotebook, { + clockSplits: (client) => { + if (client === yNotebook.clientID) { + return clockSplits + } + return [] + }, + sortClients: clientClocks => { + return [ + ...clientClocks.filter(([client]) => client === clientDoc.clientID), + ...clientClocks.filter(([client]) => client === yNotebook.clientID) + ] + } + })) + + const ydoc = new Y.Doc() + Y.applyUpdate(ydoc, updates[0]) // delete set + t.compare(ydoc.getArray('cells').toJSON(), []) + + Y.applyUpdate(ydoc, updates[1]) // clientDoc updates + t.compare(ydoc.getArray('cells').toJSON(), []) + + Y.applyUpdate(ydoc, updates[2]) // cell 0 initialized + t.compare(ydoc.getArray('cells').toJSON(), [{ id: 'zero', source: '# Hello World' }]) + + Y.applyUpdate(ydoc, updates[3]) // cell 1 initialized, immediately applies edits by clients + t.compare(ydoc.getArray('cells').toJSON(), [ + { id: 'zero', source: '# Hello World' }, + { id: 'one', source: 'import pandas as pd\nimport random' } + ]) + + Y.applyUpdate(ydoc, updates[4]) // cell 2 initialized + t.compare(ydoc.getArray('cells').toJSON(), [ + { id: 'zero', source: '# Hello World' }, + { id: 'one', source: 'import pandas as pd\nimport random' }, + { id: 'two', source: '# Conclusion' } + ]) + + Y.applyUpdate(ydoc, updates[5]) // metadata + t.compare(ydoc.getArray('cells').toJSON(), [ + { id: 'zero', source: '# Hello World' }, + { id: 'one', source: 'import pandas as pd\nimport random' }, + { id: 'two', source: '# Conclusion' } + ]) + + t.compare(6, updates.length) +}