Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ module.exports = class Hypercore extends EventEmitter {

this.replicator = new Replicator(this.core, this.key, {
eagerUpgrade: true,
defaultUploading: opts.defaultUploading,
notDownloadingLinger: opts.notDownloadingLinger,
allowFork: opts.allowFork !== false,
inflightRange: opts.inflightRange,
Expand Down
25 changes: 24 additions & 1 deletion lib/replicator.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,17 @@ class Peer {
this.lastExtensionSent = ''
this.lastExtensionRecv = ''

this.uploading = this.replicator.defaultUploading

replicator._ifAvailable++
}

setUploading (uploading) {
if (uploading === this.uploading) return
this.uploading = uploading
this.sendSync()
}

get remoteContiguousLength () {
return this.remoteBitfield.findFirst(false, this._remoteContiguousLength)
}
Expand Down Expand Up @@ -418,7 +426,7 @@ class Peer {
length: this.core.tree.length,
remoteLength: this.core.tree.fork === this.remoteFork ? this.remoteLength : 0,
canUpgrade: this.canUpgrade,
uploading: true,
uploading: this.uploading,
downloading: this.replicator.isDownloading(),
hasManifest: !!this.core.header.manifest && this.core.compat === false
})
Expand Down Expand Up @@ -646,6 +654,12 @@ class Peer {
return
}

// Respond to manifest requests even if not uploading
if (!this.uploading && proof.block) {
this.wireNoData.send({ request: msg.id })
return
}

if (proof.block !== null) {
this.replicator.onupload(proof.block.index, proof.block.value, this)
}
Expand Down Expand Up @@ -772,6 +786,7 @@ class Peer {
}

onwant ({ start, length }) {
if (!this.uploading) return
this.replicator._onwant(this, start, length)
}

Expand Down Expand Up @@ -1042,6 +1057,9 @@ class Peer {
return false
}

// Send maybeWant before checking this?
if (!this.remoteUploading) return false

if (!this._hasTreeParent(b.index)) {
return false
}
Expand All @@ -1058,6 +1076,7 @@ class Peer {
}

_requestRangeBlock (index, length) {
if (!this.remoteUploading) return false
if (this.core.bitfield.get(index) === true || !this._hasTreeParent(index)) return false

const b = this.replicator._blocks.add(index, PRIORITY.NORMAL)
Expand Down Expand Up @@ -1093,6 +1112,7 @@ class Peer {
}

_requestRange (r) {
if (!this.remoteUploading) return false
const { length, fork } = this.core.tree

if (r.blocks) {
Expand Down Expand Up @@ -1152,6 +1172,7 @@ class Peer {
}

_requestForkRange (f) {
if (!this.remoteUploading) return false
if (f.fork !== this.remoteFork || f.batch.want === null) return false

const end = Math.min(f.batch.want.end, this.remoteLength)
Expand Down Expand Up @@ -1243,6 +1264,7 @@ module.exports = class Replicator {

constructor (core, key, {
notDownloadingLinger = NOT_DOWNLOADING_SLACK,
defaultUploading = true,
eagerUpgrade = true,
allowFork = true,
inflightRange = null,
Expand All @@ -1264,6 +1286,7 @@ module.exports = class Replicator {
this.findingPeers = 0 // updateable from the outside
this.destroyed = false
this.downloading = false
this.defaultUploading = defaultUploading // should peers default to uploading
this.activeSessions = 0

this.inflightRange = inflightRange || DEFAULT_MAX_INFLIGHT
Expand Down
67 changes: 67 additions & 0 deletions test/replicate.js
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,73 @@ test.skip('can disable downloading from a peer', async function (t) {
t.is(cUploads, a.length)
})

test('can disable uploading to a peer', async function (t) {
const a = await create({ defaultUploading: false })

await a.append(['a', 'b', 'c', 'd', 'e'])

const b = await create(a.key, { valueEncoding: 'utf-8' })
const c = await create(a.key, { valueEncoding: 'utf-8' })

const [bStream] = replicate(a, b, t)
const [cStream] = replicate(a, c, t)
replicate(b, c, t)

await new Promise(resolve => a.on('peer-add', function onAdd () {
if (a.peers.length < 2) return
a.off('peer-add', onAdd)
resolve()
}))

let aUploadsToB = 0
let aUploadsToC = 0

a.on('upload', function (index, block, peer) {
if (peer.remotePublicKey.equals(bPeer.remotePublicKey)) {
aUploadsToB++
} else {
aUploadsToC++
}
})

await Promise.all([
b.update({ wait: true }),
c.update({ wait: true })
])

t.is(a.length, b.length)
t.is(a.length, c.length)

const bPeer = a.peers[0].stream.rawStream === bStream
? a.peers[0]
: a.peers[1]
const cPeer = a.peers[0].stream.rawStream === cStream
? a.peers[0]
: a.peers[1]

const cRange = c.download({ start: 0, end: a.length })
const bRange = b.download({ start: 0, end: a.length })

await new Promise(resolve => setTimeout(resolve, 200))

t.is(b.contiguousLength, 0)
t.is(c.contiguousLength, 0)
t.is(aUploadsToB, 0)
t.is(aUploadsToC, 0)

cPeer.setUploading(true)

await Promise.all([
bRange.done(),
cRange.done()
])

t.is(b.contiguousLength, 5)
t.is(c.contiguousLength, 5)
t.is(aUploadsToB, 0)
t.is(aUploadsToC, 5)
})

test('contiguous length', async function (t) {
const a = await create()

Expand Down