diff --git a/test/replicate.js b/test/replicate.js index b2aaa077..d52f2042 100644 --- a/test/replicate.js +++ b/test/replicate.js @@ -1,6 +1,9 @@ const test = require('brittle') const b4a = require('b4a') +const compact = require('compact-encoding') +const DebuggingStream = require('debugging-stream') const NoiseSecretStream = require('@hyperswarm/secret-stream') +const Protomux = require('protomux') const { create, createStored, @@ -757,6 +760,61 @@ test('multiplexing with createProtocolStream (ondiscoverykey is called)', async s2.destroy() }) +test('custom bulk channel does not delay hypercore discovery', async function (t) { + const advertised = await create(t) + + const n1 = new NoiseSecretStream(true) + const n2 = new NoiseSecretStream(false) + + const slowIncoming = new DebuggingStream(n2.rawStream, { + writeSpeed: 512 * 1024, + readSpeed: Infinity + }) + + t.teardown(function () { + n1.destroy() + n2.destroy() + slowIncoming.destroy() + }) + + n1.rawStream.pipe(slowIncoming).pipe(n1.rawStream) + + const mux1 = Protomux.from(n1) + const mux2 = Protomux.from(n2) + + const app1 = mux1.createChannel({ protocol: 'app/bulk' }) + const app2 = mux2.createChannel({ protocol: 'app/bulk' }) + const bulk = app1.addMessage({ encoding: compact.buffer }) + app2.addMessage({ encoding: compact.buffer }) + + app1.open() + app2.open() + + await Promise.all([app1.fullyOpened(), app2.fullyOpened()]) + + // Model a user-defined high-throughput app channel sharing the same + // Hyperswarm connection as Hypercore. It writes 64 * 64 KiB = 4 MiB at + // 512 KiB/s before Hypercore opens its replication channel, so the + // receiver can take ~8s to discover the core behind those app bytes. + bulk.send(Buffer.alloc(64 * 64 * 1024)) + + const discovery = new Promise((resolve) => { + Hypercore.createProtocolStream(n2, { + ondiscoverykey(discoveryKey) { + if (b4a.equals(discoveryKey, advertised.discoveryKey)) resolve('discovered') + } + }) + }) + + const stream = Hypercore.createProtocolStream(n1) + const replication = advertised.replicate(stream, { keepAlive: false }) + + t.teardown(() => replication.destroy()) + + const result = await Promise.race([discovery, sleep(5000).then(() => 'timeout')]) + t.is(result, 'discovered', 'hypercore discovery should not wait behind app bulk data') +}) + test('seeking while replicating', async function (t) { const a = await create(t) const b = await create(t, a.key) @@ -2958,4 +3016,8 @@ async function waitForRequestBlock(core) { } } +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + function noop() {}