diff --git a/example/main.ts b/example/main.ts index 381f3e3..bdd0255 100644 --- a/example/main.ts +++ b/example/main.ts @@ -80,7 +80,7 @@ n.on('ready', () => { lobbies.forEach(lobby => { const li = document.createElement('li') li.id = lobby.code - li.innerHTML = `${lobby.code} - ${lobby.customData?.map as string ?? 'unknown map'} - ${lobby.playerCount} players` + li.innerHTML = `${lobby.code} - ${lobby.customData?.map as string ?? 'unknown map'} - ${lobby.playerCount} players (${lobby.latency ?? ''}ms)` el.appendChild(li) if (n.currentLobby === undefined) { li.querySelector('a.code')?.addEventListener('click', () => { diff --git a/features/latency.feature b/features/latency.feature new file mode 100644 index 0000000..0601159 --- /dev/null +++ b/features/latency.feature @@ -0,0 +1,174 @@ +Feature: Latency + + Background: + Given the "signaling" backend is running + And the "testproxy" backend is running + + + Scenario: Lobby listings include the latency to the peer + Given the next peer's latency vector is set to: + """ + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10 + """ + And "green" is connected as "1u8fw4aph5ypt" and ready for game "b6f7fc97-8545-4ffd-b714-7cf339048556" + And "green" creates a lobby with these settings: + """json + { + "public": true + } + """ + And "green" receives the network event "lobby" with the argument "h5yzwyizlwao" + And the next peer's latency vector is set to: + """ + 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20 + """ + And "blue" is connected as "19yrzmetd2bn7" and ready for game "b6f7fc97-8545-4ffd-b714-7cf339048556" + + When "blue" requests lobbies with: + """json + {} + """ + + Then "blue" should have received only these lobbies: + | code | latency | + | h5yzwyizlwao | 24 | + + + Scenario: Lobby with multiple peers + Given the next peer's latency vector is set to: + """ + 99, 99, 99, 99, 10, 10, 10, 10, 10, 10, 10 + """ + And "blue" is connected as "1u8fw4aph5ypt" and ready for game "4307bd86-e1df-41b8-b9df-e22afcf084bd" + And the next peer's latency vector is set to: + """ + 10, 10, 10, 99, 99, 99, 99, 10, 10, 10, 10 + """ + And "yellow" is connected as "h5yzwyizlwao" and ready for game "4307bd86-e1df-41b8-b9df-e22afcf084bd" + And "blue,yellow" are joined in a public lobby + And the next peer's latency vector is set to: + """ + 10, 10, 10, 10, 10, 10, 99, 99, 99, 99, 10 + """ + And "green" is connected as "3t3cfgcqup9e" and ready for game "4307bd86-e1df-41b8-b9df-e22afcf084bd" + + When "green" requests lobbies with: + """json + {} + """ + + Then "green" should have received only these lobbies: + | code | latency | + | 19yrzmetd2bn7 | 89 | + + + Scenario: Sort lobbies by latency + Given the next peer's latency vector is set to: + """ + 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30 + """ + And "blue" is connected as "1u8fw4aph5ypt" and ready for game "4307bd86-e1df-41b8-b9df-e22afcf084bd" + And "blue" creates a lobby with these settings: + """json + { + "public": true, + "customData": { + "map": "de_nuke" + } + } + """ + And the next peer's latency vector is set to: + """ + 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99 + """ + And "yellow" is connected as "19yrzmetd2bn7" and ready for game "4307bd86-e1df-41b8-b9df-e22afcf084bd" + And "yellow" creates a lobby with these settings: + """json + { + "public": true, + "customData": { + "map": "de_dust" + } + } + """ + And the next peer's latency vector is set to: + """ + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10 + """ + And "green" is connected as "prb67ouj837u" and ready for game "4307bd86-e1df-41b8-b9df-e22afcf084bd" + + When "green" requests lobbies with: + | filter | {} | + | sort | { "latency": 1 } | + | limit | 1 | + + Then "green" should have received only these lobbies: + | code | latency | customData | + | h5yzwyizlwao | 34 | {"map":"de_nuke"} | + + + Scenario: Latency to your own lobby + Given the next peer's latency vector is set to: + """ + 325, 523, 64, 21, 76, 23, 54, 235, 76, 23, 142 + """ + And "green" is connected as "1u8fw4aph5ypt" and ready for game "b6f7fc97-8545-4ffd-b714-7cf339048556" + And "green" creates a lobby with these settings: + """json + { + "public": true + } + """ + And "green" receives the network event "lobby" with the argument "h5yzwyizlwao" + + When "green" requests lobbies with: + """json + {} + """ + + Then "green" should have received only these lobbies: + | code | latency | + | h5yzwyizlwao | 0 | + + + Scenario: Peers without latency vectors are not included in the estimate + Given "green" is connected as "1u8fw4aph5ypt" and ready for game "f666036d-d9e1-4d70-b0c3-4a68b24a9884" + And these lobbies exist: + | code | game | peers | public | + | 1u8fw4aph5ypt | f666036d-d9e1-4d70-b0c3-4a68b24a9884 | {"peer1"} | true | + | 0qva9vyurwbbl | f666036d-d9e1-4d70-b0c3-4a68b24a9884 | {"peer2", "peer3"} | true | + And these peers exist: + | peer | game | latency_vector | + | peer1 | f666036d-d9e1-4d70-b0c3-4a68b24a9884 | null | + | peer2 | f666036d-d9e1-4d70-b0c3-4a68b24a9884 | null | + | peer3 | f666036d-d9e1-4d70-b0c3-4a68b24a9884 | {10,10,10,10,10,10,10,10,10,10,10} | + + When "green" requests lobbies with: + | filter | {} | + | sort | { "latency": 1 } | + Then "green" should have received only these lobbies: + | code | latency | + | 0qva9vyurwbbl | 10 | + | 1u8fw4aph5ypt | undefined | + + + Scenario: Client without latency vectors gives null latency estimates + Given the next peer's latency vector is set to: + """ + null + """ + Given "green" is connected as "1u8fw4aph5ypt" and ready for game "f666036d-d9e1-4d70-b0c3-4a68b24a9884" + And these lobbies exist: + | code | game | peers | public | + | 0qva9vyurwbbl | f666036d-d9e1-4d70-b0c3-4a68b24a9884 | {"peer1", "peer2"} | true | + And these peers exist: + | peer | game | latency_vector | + | peer1 | f666036d-d9e1-4d70-b0c3-4a68b24a9884 | {10,10,10,10,10,10,10,10,10,10,10} | + + When "green" requests lobbies with: + """json + {} + """ + Then "green" should have received only these lobbies: + | code | latency | + | 0qva9vyurwbbl | undefined | diff --git a/features/support/steps/network.ts b/features/support/steps/network.ts index 287fe7d..b109410 100644 --- a/features/support/steps/network.ts +++ b/features/support/steps/network.ts @@ -19,7 +19,7 @@ Given('{string} is connected as {string} and ready for game {string}', async fun } }) -async function areJoinedInALobby (this: World, playerNamesRaw: string): Promise { +async function areJoinedInALobby (this: World, playerNamesRaw: string, publc: boolean): Promise { const playerNames = playerNamesRaw.split(',').map(s => s.trim()) if (playerNames.length < 2) { throw new Error('need at least 2 players to join a lobby') @@ -29,7 +29,9 @@ async function areJoinedInALobby (this: World, playerNamesRaw: string): Promise< throw new Error(`player ${playerNames[0]} not found`) } - void first.network.create() + void first.network.create({ + public: publc + }) const lobbyEvent = await first.waitForEvent('lobby') const lobbyCode = lobbyEvent.eventPayload[0] as string @@ -55,7 +57,13 @@ async function areJoinedInALobby (this: World, playerNamesRaw: string): Promise< } } -Given('{string} are joined in a lobby', areJoinedInALobby) +Given('{string} are joined in a lobby', async function (this: World, playerNamesRaw: string) { + await areJoinedInALobby.call(this, playerNamesRaw, false) +}) + +Given('{string} are joined in a public lobby', async function (this: World, playerNamesRaw: string) { + await areJoinedInALobby.call(this, playerNamesRaw, true) +}) Given('{string} are joined in a lobby for game {string}', async function (this: World, playerNamesRaw: string, gameID: string) { const playerNames = playerNamesRaw.split(',').map(s => s.trim()) @@ -72,7 +80,7 @@ Given('{string} are joined in a lobby for game {string}', async function (this: } } - await areJoinedInALobby.call(this, playerNamesRaw) + await areJoinedInALobby.call(this, playerNamesRaw, false) }) Given('these lobbies exist:', async function (this: World, lobbies: DataTable) { @@ -123,6 +131,41 @@ Given('these lobbies exist:', async function (this: World, lobbies: DataTable) { }) }) +Given('these peers exist:', async function (this: World, peers: DataTable) { + if (this.testproxyURL === undefined) { + throw new Error('testproxy not active') + } + + const columns: string[] = [] + const values: string[] = [] + + peers.hashes().forEach(row => { + const v: string[] = [] + + Object.keys(row).forEach(key => { + const value = row[key] + if (!columns.includes(key)) { + columns.push(key) + } + + if (value === 'null') { + v.push('NULL') + } else if (key === 'latency_vector') { + v.push(`ARRAY[${value.substring(1, value.length - 1)}]::vector(11)`) + } else { + v.push(`'${value}'`) + } + }) + + values.push(`(${v.join(', ')})`) + }) + + await fetch(`${this.testproxyURL}/sql`, { + method: 'POST', + body: 'INSERT INTO peers (' + columns.join(', ') + ') VALUES ' + values.join(', ') + }) +}) + When('{string} creates a network for game {string}', async function (this: World, playerName: string, gameID: string) { await this.createPlayer(playerName, gameID) }) @@ -307,7 +350,7 @@ Then('{string} should have received only these lobbies:', function (this: World, expectedLobbies.hashes().forEach(row => { const correctCodeLobby = player.lastReceivedLobbies.filter(lobby => lobby.code === row.code) if (correctCodeLobby.length !== 1) { - throw new Error(`expected to find one lobby with code ${row.code} but found ${correctCodeLobby.length}`) + throw new Error(`expected to find one lobby with code ${row.code} but found ${correctCodeLobby.length} in [${player.lastReceivedLobbies.map(l => l.code).join(', ')}]`) } const lobby = correctCodeLobby[0] as any Object.keys(lobby).forEach(key => { @@ -318,8 +361,14 @@ Then('{string} should have received only these lobbies:', function (this: World, }) const want = row as any Object.keys(row).forEach(key => { - if (`${lobby[key] as string}` !== `${want[key] as string}`) { - throw new Error(`expected ${key} to be ${want[key] as string} but got ${lobby[key] as string}`) + if (typeof lobby[key] === 'object') { + if (JSON.stringify(lobby[key]) !== `${want[key] as string}`) { + throw new Error(`expected ${key} to be ${want[key] as string} but got ${JSON.stringify(lobby[key])}`) + } + } else { + if (`${lobby[key] as string}` !== `${want[key] as string}`) { + throw new Error(`expected ${key} to be ${want[key] as string} but got ${lobby[key] as string}`) + } } }) }) @@ -435,3 +484,16 @@ Then('{string} failed to join the lobby', function (playerName: string) { throw new Error(`player is in lobby ${player.network.currentLobby as string}`) } }) + +When('the next peer\'s latency vector is set to:', function (latencies: string) { + if (latencies === 'null') { + this.latencyVector = null + return + } + + const lv = latencies.split(',').map(s => parseInt(s.trim(), 10)) + if (lv.length !== 11) { + throw new Error('latency vector must have 11 elements') + } + this.latencyVector = lv +}) diff --git a/features/support/steps/util.ts b/features/support/steps/util.ts deleted file mode 100644 index b39cc52..0000000 --- a/features/support/steps/util.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { When } from '@cucumber/cucumber' -import { World } from '../world' - -When('I sleep for {int} second', async function (this: World, seconds: number) { - await new Promise(resolve => setTimeout(resolve, seconds * 1000)) -}) - -let time: number = 0 -When(/I (start|stop) measuring time/, function (this: World, act: string) { - if (act === 'start') { - time = performance.now() - } else if (act === 'stop') { - const now = performance.now() - console.log('took', now - time) - } -}) diff --git a/features/support/world.ts b/features/support/world.ts index 89fb18d..37332fb 100644 --- a/features/support/world.ts +++ b/features/support/world.ts @@ -7,6 +7,7 @@ import ws from 'ws' import wrtc from '@roamhq/wrtc' import { Player } from './types' +import { PeerConfiguration } from '../../lib/types' import { Network } from '../../lib' @@ -35,6 +36,7 @@ export class World extends CucumberWorld { public testproxyURL?: string public useTestProxy: boolean = false public databaseURL?: string + public latencyVector: number[] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] public players: Map = new Map() public lastError: Map = new Map() @@ -49,7 +51,13 @@ export class World extends CucumberWorld { public async createPlayer (playerName: string, gameID: string): Promise { return await new Promise((resolve) => { - const config = this.useTestProxy ? { testproxyURL: this.testproxyURL } : undefined + const config: PeerConfiguration = {} + if (this.useTestProxy) { + config.testproxyURL = this.testproxyURL + } + config.testLatency = { + vector: this.latencyVector + } const network = new Network(gameID, config, this.signalingURL) const player = new Player(playerName, network) this.players.set(playerName, player) @@ -100,6 +108,7 @@ AfterAll(function () { Before(function (this: World) { this.scenarioRunning = true + this.latencyVector = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] }) After(function (this: World, { result }) { this.scenarioRunning = false diff --git a/go.mod b/go.mod index 0e95bf5..584e22a 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/jackc/pgx/v5 v5.7.6 github.com/koenbollen/logging v0.0.0-20230520102501-e01d64214504 github.com/ory/dockertest/v3 v3.12.0 + github.com/pgvector/pgvector-go v0.3.0 github.com/poki/mongodb-filter-to-postgres v1.0.7 github.com/rs/cors v1.11.1 github.com/rs/xid v1.6.0 @@ -44,6 +45,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/x448/float16 v0.8.4 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect diff --git a/go.sum b/go.sum index c1baa8d..f856a77 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +entgo.io/ent v0.14.3 h1:wokAV/kIlH9TeklJWGGS7AYJdVckr0DloWjIcO9iIIQ= +entgo.io/ent v0.14.3/go.mod h1:aDPE/OziPEu8+OWbzy4UlvWmD2/kbRuWfK2A40hcxJM= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= @@ -41,6 +43,10 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-pg/pg/v10 v10.11.0 h1:CMKJqLgTrfpE/aOVeLdybezR2om071Vh38OLZjsyMI0= +github.com/go-pg/pg/v10 v10.11.0/go.mod h1:4BpHRoxE61y4Onpof3x1a2SQvi9c+q1dJnrNdMjsroA= +github.com/go-pg/zerochecker v0.2.0 h1:pp7f72c3DobMWOb2ErtZsnrPaSvHd2W4o9//8HtF4mU= +github.com/go-pg/zerochecker v0.2.0/go.mod h1:NJZ4wKL0NmTtz0GKCoJ8kym6Xn/EQzXRl2OnAe7MmDo= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= @@ -49,10 +55,12 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-migrate/migrate/v4 v4.19.0 h1:RcjOnCGz3Or6HQYEJ/EEVLfWnmw9KnoigPSjzhCuaSE= github.com/golang-migrate/migrate/v4 v4.19.0/go.mod h1:9dyEcu+hO+G9hPSw8AIg50yg622pXJsoHItQnDGZkI0= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -68,6 +76,12 @@ github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= +github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/koenbollen/logging v0.0.0-20230520102501-e01d64214504 h1:4XwVIPnDZkE3EMNd5DAMedHVH+t7Ge9Lig50+EzwsD4= @@ -94,6 +108,8 @@ github.com/opencontainers/runc v1.2.3 h1:fxE7amCzfZflJO2lHXf4y/y8M1BoAqp+FVmG19o github.com/opencontainers/runc v1.2.3/go.mod h1:nSxcWUydXrsBZVYNSkTjoQ/N6rcyTtn+1SD5D4+kRIM= github.com/ory/dockertest/v3 v3.12.0 h1:3oV9d0sDzlSQfHtIaB5k6ghUCVMVLpAY8hwrqoCyRCw= github.com/ory/dockertest/v3 v3.12.0/go.mod h1:aKNDTva3cp8dwOWwb9cWuX84aH5akkxXRvO7KCwWVjE= +github.com/pgvector/pgvector-go v0.3.0 h1:Ij+Yt78R//uYqs3Zk35evZFvr+G0blW0OUN+Q2D1RWc= +github.com/pgvector/pgvector-go v0.3.0/go.mod h1:duFy+PXWfW7QQd5ibqutBO4GxLsUZ9RVXhFZGIBsWSA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -113,6 +129,24 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo= +github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs= +github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ= +github.com/uptrace/bun v1.1.12/go.mod h1:NPG6JGULBeQ9IU6yHp7YGELRa5Agmd7ATZdz4tGZ6z0= +github.com/uptrace/bun/dialect/pgdialect v1.1.12 h1:m/CM1UfOkoBTglGO5CUTKnIKKOApOYxkcP2qn0F9tJk= +github.com/uptrace/bun/dialect/pgdialect v1.1.12/go.mod h1:Ij6WIxQILxLlL2frUBxUBOZJtLElD2QQNDcu/PWDHTc= +github.com/uptrace/bun/driver/pgdriver v1.1.12 h1:3rRWB1GK0psTJrHwxzNfEij2MLibggiLdTqjTtfHc1w= +github.com/uptrace/bun/driver/pgdriver v1.1.12/go.mod h1:ssYUP+qwSEgeDDS1xm2XBip9el1y9Mi5mTAvLoiADLM= +github.com/vmihailenco/bufpool v0.1.11 h1:gOq2WmBrq0i2yW5QJ16ykccQ4wH9UyEsgLm6czKAd94= +github.com/vmihailenco/bufpool v0.1.11/go.mod h1:AFf/MOy3l2CFTKbxwt0mp2MwnqjNEs5H/UxrkA5jxTQ= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vbd1qPqc= +github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -181,5 +215,11 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo= +gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0= +gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls= +gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= +mellium.im/sasl v0.3.1 h1:wE0LW6g7U83vhvxjC1IY8DnXM+EU095yeo8XClvCdfo= +mellium.im/sasl v0.3.1/go.mod h1:xm59PUYpZHhgQ9ZqoJ5QaCqzWMi8IeS49dhp6plPCzw= diff --git a/internal/signaling/peer.go b/internal/signaling/peer.go index f7a1e31..115a976 100644 --- a/internal/signaling/peer.go +++ b/internal/signaling/peer.go @@ -27,10 +27,11 @@ type Peer struct { retrievedIDCallback func(context.Context, string, string, string) (bool, []string, error) - ID string - Secret string - Game string - Lobby string + ID string + Secret string + Game string + Lobby string + LatencyVector []float32 } func (p *Peer) Send(ctx context.Context, packet any) error { @@ -226,6 +227,14 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error } } + if len(packet.LatencyVector) == 11 { + p.LatencyVector = packet.LatencyVector + + if err := p.store.UpdatePeerLatency(ctx, p.ID, p.LatencyVector); err != nil { + logger.Warn("failed to persist peer latency", zap.Error(err)) + } + } + err := p.Send(ctx, WelcomePacket{ Type: "welcome", ID: p.ID, @@ -352,7 +361,7 @@ func (p *Peer) HandleListPacket(ctx context.Context, packet ListPacket) error { if p.ID == "" { return fmt.Errorf("peer not connected") } - lobbies, err := p.store.ListLobbies(ctx, p.Game, packet.Filter, packet.Sort, packet.Limit) + lobbies, err := p.store.ListLobbies(ctx, p.Game, p.LatencyVector, packet.Filter, packet.Sort, packet.Limit) if err != nil { return err } diff --git a/internal/signaling/stores/postgres.go b/internal/signaling/stores/postgres.go index c108bd3..ea84cf2 100644 --- a/internal/signaling/stores/postgres.go +++ b/internal/signaling/stores/postgres.go @@ -17,6 +17,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/koenbollen/logging" + "github.com/pgvector/pgvector-go" "github.com/poki/mongodb-filter-to-postgres/filter" "github.com/poki/netlib/internal/util" "go.uber.org/zap" @@ -38,7 +39,7 @@ type PostgresStore struct { func NewPostgresStore(ctx context.Context, db *pgxpool.Pool) (*PostgresStore, error) { filterConverter, err := filter.NewConverter( - filter.WithNestedJSONB("custom_data", "code", "playerCount", "createdAt", "updatedAt"), + filter.WithNestedJSONB("custom_data", "code", "playerCount", "createdAt", "updatedAt", "latency"), filter.WithEmptyCondition("TRUE"), // No filter returns all lobbies. ) if err != nil { @@ -312,13 +313,24 @@ func (s *PostgresStore) GetLobby(ctx context.Context, game, lobbyCode string) (L return lobby, nil } -func (s *PostgresStore) ListLobbies(ctx context.Context, game, filter, sort string, limit int) ([]Lobby, error) { +func (s *PostgresStore) ListLobbies(ctx context.Context, game string, latency []float32, filter, sort string, limit int) ([]Lobby, error) { // TODO: Remove this. if filter == "" { filter = "{}" } - where, values, err := s.filterConverter.Convert([]byte(filter), 3) + if limit <= 0 { + limit = 50 + } + + var latencyVector any + if len(latency) == 11 { + latencyVector = pgvector.NewVector(latency) + } + + preValues := []any{game, latencyVector, limit} + + where, values, err := s.filterConverter.Convert([]byte(filter), len(preValues)+1) if err != nil { logger := logging.GetLogger(ctx) logger.Warn("failed to convert filter", zap.String("filter", filter), zap.Error(err)) @@ -340,10 +352,6 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game, filter, sort stri order += `, "createdAt" DESC, "code" ASC` } - if limit <= 0 { - limit = 50 - } - var lobbies []Lobby rows, err := s.DB.Query(ctx, ` WITH lobbies AS ( @@ -359,17 +367,26 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game, filter, sort stri can_update_by, creator, password IS NOT NULL, - max_players + max_players, + lobby_latency_estimate( + $2, + ARRAY( + SELECT p.latency_vector + FROM peers p + WHERE p.peer = ANY (lobbies.peers) + AND p.latency_vector IS NOT NULL + ) + ) AS latency FROM lobbies WHERE game = $1 - AND public = true + AND public = true ) SELECT * FROM lobbies WHERE `+where+` ORDER BY `+order+` - LIMIT $2 - `, append([]any{game, limit}, values...)...) + LIMIT $3 + `, append(preValues, values...)...) if err != nil { return nil, err } @@ -377,7 +394,7 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game, filter, sort stri for rows.Next() { var lobby Lobby - err = rows.Scan(&lobby.Code, &lobby.PlayerCount, &lobby.Public, &lobby.CustomData, &lobby.CreatedAt, &lobby.UpdatedAt, &lobby.Leader, &lobby.Term, &lobby.CanUpdateBy, &lobby.Creator, &lobby.HasPassword, &lobby.MaxPlayers) + err = rows.Scan(&lobby.Code, &lobby.PlayerCount, &lobby.Public, &lobby.CustomData, &lobby.CreatedAt, &lobby.UpdatedAt, &lobby.Leader, &lobby.Term, &lobby.CanUpdateBy, &lobby.Creator, &lobby.HasPassword, &lobby.MaxPlayers, &lobby.Latency) if err != nil { return nil, err } @@ -408,6 +425,36 @@ func (s *PostgresStore) CreatePeer(ctx context.Context, peerID, secret, gameID s return nil } +func (s *PostgresStore) UpdatePeerLatency(ctx context.Context, peerID string, latency []float32) error { + now := util.NowUTC(ctx) + + if len(latency) == 0 { + _, err := s.DB.Exec(ctx, ` + UPDATE peers + SET + latency_vector = NULL, + updated_at = $1 + WHERE peer = $2 + `, now, peerID) + if err != nil { + return err + } + } else { + _, err := s.DB.Exec(ctx, ` + UPDATE peers + SET + latency_vector = $1, + updated_at = $2 + WHERE peer = $3 + `, pgvector.NewVector(latency), now, peerID) + if err != nil { + return err + } + } + + return nil +} + func (s *PostgresStore) MarkPeerAsActive(ctx context.Context, peerID string) error { now := util.NowUTC(ctx) diff --git a/internal/signaling/stores/setup.go b/internal/signaling/stores/setup.go index f4c22a4..cf82438 100644 --- a/internal/signaling/stores/setup.go +++ b/internal/signaling/stores/setup.go @@ -7,21 +7,52 @@ import ( "net" "os" "strings" + "sync" "time" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/koenbollen/logging" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" "github.com/poki/netlib/migrations" "go.uber.org/zap" + + pgxvec "github.com/pgvector/pgvector-go/pgx" ) +func getConfig(url string) (*pgxpool.Config, error) { + cfg, err := pgxpool.ParseConfig(url) + if err != nil { + return nil, fmt.Errorf("failed to parse database URL: %w", err) + } + + // Ensure the vector extension is created only once, not on every new connection. + // It needs to be create before we call pgxvec.RegisterTypes. + var createExtensionOnce sync.Once + + cfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + createExtensionOnce.Do(func() { + if _, err := conn.Exec(ctx, "CREATE EXTENSION IF NOT EXISTS vector"); err != nil { + panic(err) + } + }) + + return pgxvec.RegisterTypes(ctx, conn) + } + + return cfg, nil +} + func FromEnv(ctx context.Context) (Store, chan struct{}, error) { logger := logging.GetLogger(ctx) if url, ok := os.LookupEnv("DATABASE_URL"); ok { - db, err := pgxpool.New(ctx, url) + cfg, err := getConfig(url) + if err != nil { + return nil, nil, err + } + db, err := pgxpool.NewWithConfig(ctx, cfg) if err != nil { return nil, nil, fmt.Errorf("failed to connect: %w", err) } @@ -40,8 +71,8 @@ func FromEnv(ctx context.Context) (Store, chan struct{}, error) { return nil, nil, err } resource, err := pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "postgres", - Tag: "15-alpine", + Repository: "pgvector/pgvector", + Tag: "pg15", Env: []string{ "POSTGRES_PASSWORD=test", "POSTGRES_USER=test", @@ -84,12 +115,17 @@ func FromEnv(ctx context.Context) (Store, chan struct{}, error) { // This log message is used by the test suite to pass the database URL to the testproxy. logger.Info("using database", zap.String("url", databaseUrl)) + cfg, err := getConfig(databaseUrl) + if err != nil { + return nil, nil, err + } + var db *pgxpool.Pool pool.MaxWait = 120 * time.Second if err = pool.Retry(func() error { ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second) defer cancel() - db, err = pgxpool.New(ctx, databaseUrl) + db, err = pgxpool.NewWithConfig(ctx, cfg) if err != nil { return err } diff --git a/internal/signaling/stores/shared.go b/internal/signaling/stores/shared.go index bf102ed..5790568 100644 --- a/internal/signaling/stores/shared.go +++ b/internal/signaling/stores/shared.go @@ -29,12 +29,13 @@ type Store interface { JoinLobby(ctx context.Context, game, lobby, id, password string) error LeaveLobby(ctx context.Context, game, lobby, id string) error GetLobby(ctx context.Context, game, lobby string) (Lobby, error) - ListLobbies(ctx context.Context, game, filter, sort string, limit int) ([]Lobby, error) + ListLobbies(ctx context.Context, game string, latency []float32, filter, sort string, limit int) ([]Lobby, error) Subscribe(ctx context.Context, callback SubscriptionCallback, game, lobby, peerID string) Publish(ctx context.Context, topic string, data []byte) error CreatePeer(ctx context.Context, peerID, secret, gameID string) error + UpdatePeerLatency(ctx context.Context, peerID string, latency []float32) error MarkPeerAsActive(ctx context.Context, peerID string) error MarkPeerAsDisconnected(ctx context.Context, peerID string) error MarkPeerAsReconnected(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) @@ -72,6 +73,8 @@ type Lobby struct { Leader string `json:"leader,omitempty"` Term int `json:"term"` + Latency *float32 `json:"latency,omitempty"` + CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` } diff --git a/internal/signaling/types.go b/internal/signaling/types.go index 1fbcb45..ed15dad 100644 --- a/internal/signaling/types.go +++ b/internal/signaling/types.go @@ -15,10 +15,11 @@ type PingPacket struct { type HelloPacket struct { Type string `json:"type"` - Game string `json:"game"` - ID string `json:"id"` - Secret string `json:"secret"` - Version string `json:"version"` + Game string `json:"game"` + ID string `json:"id"` + Secret string `json:"secret"` + Version string `json:"version"` + LatencyVector []float32 `json:"latencyVector"` } type WelcomePacket struct { diff --git a/lib/network-latency.ts b/lib/network-latency.ts new file mode 100644 index 0000000..0a46845 --- /dev/null +++ b/lib/network-latency.ts @@ -0,0 +1,36 @@ +const latencyHosts = [ + 'netlib-ping-africa.poki.io', + 'netlib-ping-asia-northeast.poki.io', + 'netlib-ping-asia-south.poki.io', + 'netlib-ping-asia-southeast.poki.io', + 'netlib-ping-australia.poki.io', + 'netlib-ping-eu-north.poki.io', + 'netlib-ping-eu-west.poki.io', + 'netlib-ping-me-central.poki.io', + 'netlib-ping-south-america.poki.io', + 'netlib-ping-us-central.poki.io', + 'netlib-ping-us-east.poki.io' +] + +export async function getLatencyVector (max: number, pings: number): Promise { + const measurements = await Promise.all(latencyHosts.map(async (host) => { + let latency = 0 + + for (let i = 0; i < pings; i++) { + const start = performance.now() + try { + await fetch(`https://${host}/`, { + method: 'HEAD', + cache: 'no-store', + mode: 'no-cors', + signal: AbortSignal.timeout(max) + }) + } catch {} + + latency += Math.round((performance.now() - start) / 2) // Divide by 2 to estimate one-way latency. + } + + return Math.round(latency / pings) // Average of two measurements. + })) + return measurements +} diff --git a/lib/network.ts b/lib/network.ts index 27d6ca9..c3ac2c3 100644 --- a/lib/network.ts +++ b/lib/network.ts @@ -36,10 +36,10 @@ export default class Network extends EventEmitter { private readonly unloadListener: () => void - constructor (public readonly gameID: string, private readonly rtcConfig: PeerConfiguration = DefaultRTCConfiguration, signalingURL: string = DefaultSignalingURL) { + constructor (public readonly gameID: string, private readonly peerConfig: PeerConfiguration = DefaultRTCConfiguration, signalingURL: string = DefaultSignalingURL) { super() this.peers = new Map() - this.signaling = new Signaling(this, this.peers, signalingURL) + this.signaling = new Signaling(this, this.peers, signalingURL, peerConfig.testLatency) this.credentials = new Credentials(this.signaling) this.unloadListener = () => this.close() @@ -157,7 +157,7 @@ export default class Network extends EventEmitter { * @internal */ async _addPeer (id: string, polite: boolean): Promise { - const config = await this.credentials.fillCredentials(this.rtcConfig) + const config = await this.credentials.fillCredentials(this.peerConfig) config.iceServers = config.iceServers?.filter(server => !(server.urls.includes('turn:') && server.username === undefined)) @@ -176,7 +176,7 @@ export default class Network extends EventEmitter { * @internal */ _prefetchTURNCredentials (): void { - this.credentials.fillCredentials(this.rtcConfig).catch(() => {}) + this.credentials.fillCredentials(this.peerConfig).catch(() => {}) } /** diff --git a/lib/signaling.ts b/lib/signaling.ts index 92a698c..db6e01a 100644 --- a/lib/signaling.ts +++ b/lib/signaling.ts @@ -1,8 +1,9 @@ import { EventEmitter } from 'eventemitter3' import Network from './network' import Peer from './peer' -import { LobbyListEntry, SignalingPacketTypes } from './types' +import { LobbyListEntry, SignalingPacketTypes, LatencyConfiguration } from './types' import { version } from '../package.json' +import { getLatencyVector } from './network-latency' interface SignalingListeners { credentials: (data: SignalingPacketTypes) => void | Promise @@ -28,13 +29,17 @@ export default class Signaling extends EventEmitter { private pingInterval?: ReturnType - constructor (private readonly network: Network, peers: Map, url: string) { + private readonly latencyVectorPromise: Promise + + constructor (private readonly network: Network, peers: Map, url: string, testLatency?: LatencyConfiguration) { super() this.url = url this.connections = peers this.replayQueue = new Map() + this.latencyVectorPromise = testLatency?.vector !== undefined ? Promise.resolve(testLatency.vector) : getLatencyVector(testLatency?.max ?? 1000, testLatency?.pings ?? 3) + this.ws = this.connect() // Send a ping every 5 seconds to keep the connection alive, @@ -47,14 +52,17 @@ export default class Signaling extends EventEmitter { private connect (): WebSocket { const ws = new WebSocket(this.url) const onOpen = (): void => { - this.reconnectAttempt = 0 - this.reconnecting = false - this.send({ - type: 'hello', - game: this.network.gameID, - id: this.receivedID, - secret: this.receivedSecret, - version + void this.latencyVectorPromise.then(latencyVector => { + this.reconnectAttempt = 0 + this.reconnecting = false + this.send({ + type: 'hello', + game: this.network.gameID, + id: this.receivedID, + secret: this.receivedSecret, + version, + latencyVector + }) }) } const onError = (e: Event): void => { diff --git a/lib/types.ts b/lib/types.ts index 17a0232..983df49 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -4,6 +4,25 @@ export interface PeerConfiguration extends RTCConfiguration { * @internal */ testproxyURL?: string + + /** + * @internal + */ + testLatency?: LatencyConfiguration +} + +/** + * @internal + * + * This configuration is used to for our feature tests and exposes + * `max` and `pings` so we can start experimenting with these in + * a production environment. + */ +export interface LatencyConfiguration { + vector?: number[] + + max?: number + pings?: number } export interface LobbySettings { @@ -27,6 +46,7 @@ export interface LobbyListEntry { term: number createdAt: string updatedAt: string + latency?: number } interface Base { @@ -76,6 +96,7 @@ export interface HelloPacket extends Base { id?: string secret?: string version?: string + latencyVector?: number[] } export interface WelcomePacket extends Base { diff --git a/migrations/1758799200_latency_vector.down.sql b/migrations/1758799200_latency_vector.down.sql new file mode 100644 index 0000000..1c2ae55 --- /dev/null +++ b/migrations/1758799200_latency_vector.down.sql @@ -0,0 +1,10 @@ +BEGIN; + +ALTER TABLE "peers" + DROP COLUMN IF EXISTS "latency_vector"; + +DROP INDEX IF EXISTS "peers_peer_with_latency_idx"; + +DROP FUNCTION IF EXISTS lobby_latency_estimate(vector(11), vector(11)[], int, float8); + +COMMIT; diff --git a/migrations/1758799200_latency_vector.up.sql b/migrations/1758799200_latency_vector.up.sql new file mode 100644 index 0000000..d2932d7 --- /dev/null +++ b/migrations/1758799200_latency_vector.up.sql @@ -0,0 +1,77 @@ +BEGIN; + +ALTER TABLE "peers" + ADD COLUMN IF NOT EXISTS "latency_vector" vector(11); + + +-- Index to speed up lobby queries that now join on peers to get latency_vector. +CREATE INDEX "peers_peer_with_latency_idx" + ON "peers" ("peer") INCLUDE ("latency_vector") + WHERE "latency_vector" IS NOT NULL; + + +-- Estimates lobby latency for a new peer by averaging network triangulation with existing peers. +-- +-- See: https://www.wisdom.weizmann.ac.il/~robi/papers/K-triangulation-SPAA07.pdf +-- +-- for each peer, compute a Chebyshev distance lower bound (max |peer[i]-peers[i]|) +-- and a robust upper bound as the mean of the k smallest (peer[i] + peers[i]) and blend them by w (clamped), +-- (to prevent triangle inequality violations from producing estimates below the lower bound) +-- then return the average of those per-peer estimates. +-- +-- k = number of smallest sums. +-- w (0..1) biases toward the upper bound (more w = more bias toward upper bound). +-- eps is used to model measurement noise (higher eps = more noise = more bias toward lower bound). +CREATE OR REPLACE FUNCTION lobby_latency_estimate( + peer_vector vector(11), + peers_vector vector(11)[], + k int DEFAULT 3, + w float8 DEFAULT 0.7, + eps float8 DEFAULT 5 +) RETURNS float8 +LANGUAGE sql +IMMUTABLE +AS $$ +WITH peer AS ( + SELECT (peer_vector)::real[] AS pv +), +peers AS ( + SELECT p::real[] AS psv + FROM unnest(peers_vector) AS t(p) +), +per_peer AS ( + SELECT + -- lb = max_i |peer[i] - peers[i]| (Chebyshev distance) + ( + SELECT MAX(ABS(a - b)) + FROM peer + JOIN unnest(pv) WITH ORDINALITY AS pve(a, i) ON TRUE + JOIN unnest(psv) WITH ORDINALITY AS psve(b, i) USING (i) + ) AS lb, + -- ub = average of K smallest (peer[i] + peers[i]) + ( + SELECT AVG(s) + FROM ( + SELECT (a + b) AS s + FROM peer + JOIN unnest(pv) WITH ORDINALITY AS yu(a,i) ON TRUE + JOIN unnest(psv) WITH ORDINALITY AS bu(b,i) USING (i) + ORDER BY s ASC + LIMIT k + ) kbest + ) AS ub + FROM peers +), +per_peer_estimate AS ( + SELECT + GREATEST( + lb, + LEAST((w * LEAST(1.0, lb/eps)) * ub + (1.0 - (w * LEAST(1.0, lb/eps))) * lb, ub) + ) AS estimate + FROM per_peer +) +SELECT ROUND(AVG(estimate)) +FROM per_peer_estimate; +$$; + +COMMIT; diff --git a/migrations/latest.lock b/migrations/latest.lock index fb428da..ffdf7af 100644 --- a/migrations/latest.lock +++ b/migrations/latest.lock @@ -1 +1 @@ -1742158930_lobbies_pkey +1758799200_latency_vector