Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions test/replicate.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
const test = require('brittle')
const b4a = require('b4a')
const compact = require('compact-encoding')
const DebuggingStream = require('debugging-stream')
const NoiseSecretStream = require('@hyperswarm/secret-stream')
const Protomux = require('protomux')
const {
create,
createStored,
Expand Down Expand Up @@ -757,6 +760,61 @@ test('multiplexing with createProtocolStream (ondiscoverykey is called)', async
s2.destroy()
})

test('custom bulk channel does not delay hypercore discovery', async function (t) {
const advertised = await create(t)

const n1 = new NoiseSecretStream(true)
const n2 = new NoiseSecretStream(false)

const slowIncoming = new DebuggingStream(n2.rawStream, {
writeSpeed: 512 * 1024,
readSpeed: Infinity
})

t.teardown(function () {
n1.destroy()
n2.destroy()
slowIncoming.destroy()
})

n1.rawStream.pipe(slowIncoming).pipe(n1.rawStream)

const mux1 = Protomux.from(n1)
const mux2 = Protomux.from(n2)

const app1 = mux1.createChannel({ protocol: 'app/bulk' })
const app2 = mux2.createChannel({ protocol: 'app/bulk' })
const bulk = app1.addMessage({ encoding: compact.buffer })
app2.addMessage({ encoding: compact.buffer })

app1.open()
app2.open()

await Promise.all([app1.fullyOpened(), app2.fullyOpened()])

// Model a user-defined high-throughput app channel sharing the same
// Hyperswarm connection as Hypercore. It writes 64 * 64 KiB = 4 MiB at
// 512 KiB/s before Hypercore opens its replication channel, so the
// receiver can take ~8s to discover the core behind those app bytes.
bulk.send(Buffer.alloc(64 * 64 * 1024))

const discovery = new Promise((resolve) => {
Hypercore.createProtocolStream(n2, {
ondiscoverykey(discoveryKey) {
if (b4a.equals(discoveryKey, advertised.discoveryKey)) resolve('discovered')
}
})
})

const stream = Hypercore.createProtocolStream(n1)
const replication = advertised.replicate(stream, { keepAlive: false })

t.teardown(() => replication.destroy())

const result = await Promise.race([discovery, sleep(5000).then(() => 'timeout')])
t.is(result, 'discovered', 'hypercore discovery should not wait behind app bulk data')
})

test('seeking while replicating', async function (t) {
const a = await create(t)
const b = await create(t, a.key)
Expand Down Expand Up @@ -2958,4 +3016,8 @@ async function waitForRequestBlock(core) {
}
}

function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}

function noop() {}
Loading