diff --git a/.gitignore b/.gitignore index 17e895c7..7d259617 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ build # Jest coverage report /coverage + +# NBS recordings +/recordings diff --git a/jest.config.js b/jest.config.js index ccb47896..b4334611 100644 --- a/jest.config.js +++ b/jest.config.js @@ -1,43 +1,2 @@ -module.exports = { - coverageDirectory: 'coverage', - coveragePathIgnorePatterns: ['/node_modules/', 'src/global.d.ts'], - collectCoverageFrom: [ - '**/*.{ts,tsx}', - '!src/shared/proto/**', - '!**/node_modules/**', - '!**/tests/**', - ], - globals: { - 'ts-jest': { - 'tsConfigFile': './tsconfig.test.json', - }, - }, - mapCoverage: true, - moduleDirectories: [ - 'node_modules', - '/src', - ], - moduleFileExtensions: [ - 'js', - 'ts', - 'tsx', - ], - moduleNameMapper: { - '\\.(css)$': 'identity-obj-proxy', - '\\.(vert)$': '/__mocks__/mock.vert', - '\\.(frag)$': '/__mocks__/mock.frag', - }, - roots: [ - '/src', - ], - modulePaths: [ - '/src', - ], - testMatch: [ - '**/tests/**/*.tests.{ts,tsx}', - ], - transform: { - '.(ts|tsx)': '/node_modules/ts-jest/preprocessor.js', - }, -} +module.exports = require('./jestconfig.json') diff --git a/jestconfig.json b/jestconfig.json new file mode 100644 index 00000000..ef8252e0 --- /dev/null +++ b/jestconfig.json @@ -0,0 +1,45 @@ +{ + "coverageDirectory": "coverage", + "coveragePathIgnorePatterns": [ + "/node_modules/", + "src/global.d.ts" + ], + "collectCoverageFrom": [ + "**/*.{ts,tsx}", + "!src/shared/proto/**", + "!**/node_modules/**", + "!**/tests/**" + ], + "globals": { + "ts-jest": { + "tsConfigFile": "./tsconfig.test.json" + } + }, + "mapCoverage": true, + "moduleDirectories": [ + "node_modules", + "/src" + ], + "moduleFileExtensions": [ + "js", + "ts", + "tsx" + ], + "moduleNameMapper": { + "\\.(css)$": "identity-obj-proxy", + "\\.(vert)$": "/__mocks__/mock.vert", + "\\.(frag)$": "/__mocks__/mock.frag" + }, + "roots": [ + "/src" + ], + "modulePaths": [ + "/src" + ], + "testMatch": [ + "**/tests/**/*.tests.{ts,tsx}" + ], + "transform": { + ".(ts|tsx)": "/node_modules/ts-jest/preprocessor.js" + } +} diff --git a/package.json b/package.json index c45e0df7..4ca790a3 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "start": "nodemon ./src/server/dev.ts", "start:sim": "nodemon ./src/server/dev.ts --with-simulators", "simulate": "nodemon ./src/simulators/simulate.ts", + "validate": "ts-node ./src/validator/validate.ts", "prod": "ts-node -F ./src/server/prod.ts", "prod:sim": "ts-node -F ./src/server/prod.ts --with-simulators", "build": "yarn clean:build && webpack -p --progress --colors", @@ -29,6 +30,7 @@ }, "license": "MIT", "devDependencies": { + "@types/buffers": "^0.1.30", "@types/classnames": "^2.2.0", "@types/compression": "^0.0.33", "@types/copy-webpack-plugin": "^4.0.0", @@ -88,6 +90,7 @@ "webpack-hot-middleware": "^2.18.2" }, "dependencies": { + "buffers": "^0.1.1", "classnames": "^2.2.5", "compression": "^1.7.0", "connect-history-api-fallback": "^1.3.0", @@ -95,7 +98,7 @@ "minimist": "^1.2.0", "mobx": "^3.2.0", "mobx-react": "^4.2.2", - "nuclearnet.js": "^1.2.0", + "nuclearnet.js": "^1.4.2", "protobufjs": "^6.7.3", "react": "^15.6.1", "react-dom": "^15.6.1", diff --git a/src/client/base/memoize.ts b/src/client/base/memoize.ts index ba156745..b4e54508 100644 --- a/src/client/base/memoize.ts +++ b/src/client/base/memoize.ts @@ -1,10 +1,10 @@ /** * Given a function that takes an object A and returns a B, create a new function which memoizes that A -> B transform. * - * i.e. The first time the memoized function called with an A, it calculates B using fn(A) and stores B in its internal - * map. The second time it is called with the same A, it will not call fn(A) and instead just return the B that was - * created the previous time. Internally the function uses a WeakMap, so B will be automatically garbage collected when - * its corresponding A no longer exists in memory. + * i.e. The first time the memoized function is called with an A, it calculates B using fn(A) and stores B in its + * internal map. The second time it is called with the same A, it will not call fn(A) and instead just return the B that + * was created the previous time. Internally the function uses a WeakMap, so B will be automatically garbage collected + * when its corresponding A no longer exists in memory. * * e.g. * const a = { name: 'Foo' } diff --git a/src/client/components/navigation/icons/record.svg b/src/client/components/navigation/icons/record.svg new file mode 100644 index 00000000..008df2f3 --- /dev/null +++ b/src/client/components/navigation/icons/record.svg @@ -0,0 +1,7 @@ + + + + + + + diff --git a/src/client/components/navigation/view.tsx b/src/client/components/navigation/view.tsx index b237c33e..2d4c29ad 100644 --- a/src/client/components/navigation/view.tsx +++ b/src/client/components/navigation/view.tsx @@ -7,6 +7,7 @@ import EyeIcon from './icons/eye.svg' import MapIcon from './icons/map.svg' import NUClearIcon from './icons/nuclear.svg' import OrderingIcon from './icons/ordering.svg' +import RecordIcon from './icons/record.svg' import ScatterIcon from './icons/scatter.svg' import SpeedometerIcon from './icons/speedometer.svg' import * as style from './style.css' @@ -40,6 +41,7 @@ export const NavigationView = () => ( Classifier Subsumption GameState + Record ) diff --git a/src/client/components/record/controller.ts b/src/client/components/record/controller.ts new file mode 100644 index 00000000..8a4ce1c2 --- /dev/null +++ b/src/client/components/record/controller.ts @@ -0,0 +1,27 @@ +import { action } from 'mobx' +import { NUsightNetwork } from '../../network/nusight_network' +import { RecordRobotModel } from './model' + +export class RecordController { + public constructor(private nusightNetwork: NUsightNetwork) { + } + + public static of(nusightNetwork: NUsightNetwork): RecordController { + return new RecordController(nusightNetwork) + } + + @action + public onStartRecordingClick(robot: RecordRobotModel) { + const peer = { name: robot.name, address: robot.address, port: robot.port } + robot.stopRecording = this.nusightNetwork.record(peer) + robot.recording = true + } + + @action + public onStopRecordingClick(robot: RecordRobotModel) { + if (robot.stopRecording) { + robot.stopRecording() + } + robot.recording = false + } +} diff --git a/src/client/components/record/model.ts b/src/client/components/record/model.ts new file mode 100644 index 00000000..bc3b6043 --- /dev/null +++ b/src/client/components/record/model.ts @@ -0,0 +1,58 @@ +import { observable } from 'mobx' +import { computed } from 'mobx' +import { memoize } from '../../base/memoize' +import { AppModel } from '../app/model' +import { RobotModel } from '../robot/model' + +export class RecordModel { + @observable private appModel: AppModel + + public constructor(appModel: AppModel) { + this.appModel = appModel + } + + public static of(appModel: AppModel) { + return new RecordModel(appModel) + } + + @computed + public get robots(): RecordRobotModel[] { + return this.appModel.robots.map(robot => RecordRobotModel.of(robot)) + } +} + +type RecordRobotModelOpts = { + recording: boolean +} + +export class RecordRobotModel { + @observable private robotModel: RobotModel + @observable public recording: boolean + public stopRecording?: () => void + + public constructor(robotModel: RobotModel, opts: RecordRobotModelOpts) { + this.robotModel = robotModel + this.recording = opts.recording + } + + public static of = memoize((robot: RobotModel): RecordRobotModel => { + return new RecordRobotModel(robot, { + recording: false // TODO (Annable): get from server? + }) + }) + + @computed + public get name(): string { + return this.robotModel.name + } + + @computed + public get address(): string { + return this.robotModel.address + } + + @computed + public get port(): number { + return this.robotModel.port + } +} diff --git a/src/client/components/record/styles.css b/src/client/components/record/styles.css new file mode 100644 index 00000000..a4375cc5 --- /dev/null +++ b/src/client/components/record/styles.css @@ -0,0 +1,6 @@ +.record { + flex-grow: 1; +} +.recordMenuBar { + flex: 1; +} diff --git a/src/client/components/record/view.tsx b/src/client/components/record/view.tsx new file mode 100644 index 00000000..57a1d605 --- /dev/null +++ b/src/client/components/record/view.tsx @@ -0,0 +1,54 @@ +import { observer } from 'mobx-react' +import * as React from 'react' +import { ComponentType } from 'react' +import { Component } from 'react' +import { NUsightNetwork } from '../../network/nusight_network' +import { RecordController } from './controller' +import { RecordModel } from './model' +import * as styles from './styles.css' + +type Props = { + menu: ComponentType<{}> + controller: RecordController + model: RecordModel +} + +@observer +export class RecordView extends Component { + public static of(menu: ComponentType<{}>, nusightNetwork: NUsightNetwork, model: RecordModel) { + const controller = RecordController.of(nusightNetwork) + return + } + + public render() { + const { menu, controller, model } = this.props + const { robots } = model + return ( +
+ +
+ {robots.map(robot => ( +
+
Name: {robot.name}
+
Record: {robot.recording + ? + : } +
+
+ ))} +
+
+ ) + } +} + +type RecordMenuBarProps = { + menu: ComponentType<{}> +} + +const RecordMenuBar = observer((props: RecordMenuBarProps) => { + const { menu: Menu } = props + return +}) diff --git a/src/client/index.tsx b/src/client/index.tsx index 60fb6e44..6e2deedd 100644 --- a/src/client/index.tsx +++ b/src/client/index.tsx @@ -18,10 +18,12 @@ import { LocalisationNetwork } from './components/localisation/network' import { LocalisationView } from './components/localisation/view' import { withRobotSelectorMenuBar } from './components/menu_bar/view' import { NUClear } from './components/nuclear/view' +import { RecordView } from './components/record/view' import { Scatter } from './components/scatter_plot/view' import { Subsumption } from './components/subsumption/view' import { Vision } from './components/vision/view' import { NUsightNetwork } from './network/nusight_network' +import { RecordModel } from './components/record/model' // enable MobX strict mode useStrict(true) @@ -36,6 +38,9 @@ const appController = AppController.of() AppNetwork.of(nusightNetwork, appModel) const menu = withRobotSelectorMenuBar(appModel.robots, appController.toggleRobotEnabled) + +const recordModel = RecordModel.of(appModel) + ReactDOM.render( @@ -54,6 +59,7 @@ ReactDOM.render( + RecordView.of(menu, nusightNetwork, recordModel)}/> , diff --git a/src/client/network/nusight_network.ts b/src/client/network/nusight_network.ts index e44ffe81..5073b57b 100644 --- a/src/client/network/nusight_network.ts +++ b/src/client/network/nusight_network.ts @@ -6,6 +6,7 @@ import { WebSocketProxyNUClearNetClient } from '../nuclearnet/web_socket_proxy_n import { MessageTypePath } from './message_type_names' import { RobotModel } from '../components/robot/model' import { AppModel } from '../components/app/model' +import { WebSocketClient } from '../nuclearnet/web_socket_client' const HEADER_SIZE = 9 @@ -15,15 +16,24 @@ const HEADER_SIZE = 9 * instead create their own ComponentNetwork class which uses the Network helper class. */ export class NUsightNetwork { + private nextRequestTokenId: number + public constructor(private nuclearnetClient: NUClearNetClient, + private socket: WebSocketClient, private appModel: AppModel, private messageTypePath: MessageTypePath) { + this.nextRequestTokenId = 0 } public static of(appModel: AppModel) { const messageTypePath = MessageTypePath.of() const nuclearnetClient: NUClearNetClient = WebSocketProxyNUClearNetClient.of() - return new NUsightNetwork(nuclearnetClient, appModel, messageTypePath) + const uri = `${document.location.origin}/nusight` + const socket = WebSocketClient.of(uri, { + upgrade: false, + transports: ['websocket'], + }) + return new NUsightNetwork(nuclearnetClient, socket, appModel, messageTypePath) } public connect(opts: NUClearNetOptions): () => void { @@ -53,6 +63,19 @@ export class NUsightNetwork { public onNUClearLeave(cb: (peer: NUClearNetPeer) => void) { this.nuclearnetClient.onLeave(cb) } + + public record(peer: NUClearNetPeer): () => void { + const token = this.getNextRequestToken() + this.socket.send('record', peer, token) + return () => { + console.log(`unrecording for ${peer}`) + this.socket.send('unrecord', token) + } + } + + private getNextRequestToken() { + return String(this.nextRequestTokenId++) + } } export interface MessageType { diff --git a/src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts b/src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts index 51cd41ac..2e2a3492 100644 --- a/src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts +++ b/src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts @@ -1,11 +1,10 @@ import { NUClearNetOptions } from 'nuclearnet.js' import { NUClearNetSend } from 'nuclearnet.js' +import { NUClearNetPacket } from 'nuclearnet.js' import { NUClearPacketListener } from '../../shared/nuclearnet/nuclearnet_client' import { NUClearEventListener } from '../../shared/nuclearnet/nuclearnet_client' import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client' import { WebSocketClient } from './web_socket_client' -import SocketIOSocket = SocketIOClient.Socket -import { NUClearNetPacket } from 'nuclearnet.js' type PacketListener = (packet: NUClearNetPacket, ack?: () => void) => void @@ -111,6 +110,10 @@ export class WebSocketProxyNUClearNetClient implements NUClearNetClient { } } + public onPacket(cb: NUClearPacketListener): () => void { + return this.on('nuclear_packet', cb) + } + public send(options: NUClearNetSend): void { if (typeof options.type === 'string') { this.socket.send(options.type, options) diff --git a/src/server/dev.ts b/src/server/dev.ts index 98ceb703..741e7a3e 100644 --- a/src/server/dev.ts +++ b/src/server/dev.ts @@ -1,6 +1,7 @@ import * as compression from 'compression' import * as history from 'connect-history-api-fallback' import * as express from 'express' +import * as fs from 'fs' import * as http from 'http' import * as minimist from 'minimist' import * as favicon from 'serve-favicon' @@ -9,10 +10,14 @@ import * as webpack from 'webpack' import * as webpackDevMiddleware from 'webpack-dev-middleware' import * as webpackHotMiddleware from 'webpack-hot-middleware' import webpackConfig from '../../webpack.config' -import { VirtualRobots } from '../simulators/virtual_robots' import { SensorDataSimulator } from '../simulators/sensor_data_simulator' +import { VirtualRobots } from '../simulators/virtual_robots' +import { NbsNUClearPlayback } from './nbs/nbs_nuclear_playback' +import { DirectNUClearNetClient } from './nuclearnet/direct_nuclearnet_client' +import { FakeNUClearNetClient } from './nuclearnet/fake_nuclearnet_client' import { WebSocketProxyNUClearNetServer } from './nuclearnet/web_socket_proxy_nuclearnet_server' import { WebSocketServer } from './nuclearnet/web_socket_server' +import { NUsightServer } from './nusight_server' const compiler = webpack(webpackConfig) @@ -47,17 +52,34 @@ server.listen(port, () => { console.log(`NUsight server started at http://localhost:${port}`) }) -if (withSimulators) { - const virtualRobots = VirtualRobots.of({ - fakeNetworking: true, - numRobots: 3, - simulators: [ - SensorDataSimulator.of(), - ], - }) - virtualRobots.simulateWithFrequency(60) -} - -WebSocketProxyNUClearNetServer.of(WebSocketServer.of(sioNetwork.of('/nuclearnet')), { - fakeNetworking: withSimulators, +devMiddleware.waitUntilValid(() => { + if (withSimulators) { + const virtualRobots = VirtualRobots.of({ + fakeNetworking: true, + numRobots: 3, + simulators: [ + SensorDataSimulator.of(), + ], + }) + virtualRobots.simulateWithFrequency(60) + } + + const nuclearnetClient = withSimulators ? FakeNUClearNetClient.of() : DirectNUClearNetClient.of() + + WebSocketProxyNUClearNetServer.of(WebSocketServer.of(sioNetwork.of('/nuclearnet')), nuclearnetClient) + + NUsightServer.of(WebSocketServer.of(sioNetwork.of('/nusight')), nuclearnetClient) + + async function playback() { + const fake = withSimulators ? FakeNUClearNetClient.of() : DirectNUClearNetClient.of() + fake.connect({ name: 'Fake Stream' }) + while (true) { + const file = fs.createReadStream('/Users/brendan/Lab/NUsight2/recordings/darwin3_WalkAround.nbs') + // const file = fs.createReadStream('/Users/brendan/Lab/NUsight2/recordings/darwin3_FollowBall.nbs') + const out = NbsNUClearPlayback.fromRawStream(file, fake) + await new Promise(res => out.on('finish', res)) + } + } + + playback() }) diff --git a/src/server/nbs/nbs_frame_chunker.ts b/src/server/nbs/nbs_frame_chunker.ts new file mode 100644 index 00000000..50a728a9 --- /dev/null +++ b/src/server/nbs/nbs_frame_chunker.ts @@ -0,0 +1,55 @@ +import * as Buffers from 'buffers' +import * as stream from 'stream' +import { NBS_HEADER } from './nbs_frame_codecs' + +export class NbsFrameChunker extends stream.Transform { + private buffers: Buffers + private foundHeader: boolean + private foundPacketSize: boolean + + constructor() { + super({ + objectMode: true, + }) + + this.buffers = new Buffers() + this.foundHeader = false + this.foundPacketSize = false + } + + public static of(): NbsFrameChunker { + return new NbsFrameChunker() + } + + public _transform(chunk: any, encoding: string, done: (err?: any, data?: any) => void) { + this.buffers.push(chunk) + + let frame + while ((frame = this.getNextFrame(this.buffers)) !== undefined) { + this.push(frame.buffer) + this.buffers.splice(0, frame.offset + frame.buffer.byteLength) + } + + done() + } + + private getNextFrame(buffer: Buffers): { offset: number, buffer: Buffer } | undefined { + const headerIndex = buffer.indexOf(NBS_HEADER) + const headerSize = NBS_HEADER.byteLength + const packetLengthSize = 4 + const headerAndPacketLengthSize = headerSize + packetLengthSize + if (headerIndex >= 0) { + const frame = buffer.slice(headerIndex) + if (frame.length >= headerAndPacketLengthSize) { + const packetSize = frame.slice(headerSize, headerSize + headerAndPacketLengthSize).readUInt32LE(0) + if (frame.length >= headerAndPacketLengthSize + packetSize) { + return { + offset: headerIndex, + buffer: frame.slice(0, headerAndPacketLengthSize + packetSize), + } + } + } + } + return undefined + } +} diff --git a/src/server/nbs/nbs_frame_codecs.ts b/src/server/nbs/nbs_frame_codecs.ts new file mode 100644 index 00000000..5211dfb4 --- /dev/null +++ b/src/server/nbs/nbs_frame_codecs.ts @@ -0,0 +1,49 @@ +import * as Long from 'long' +import { NUClearNetPacket } from 'nuclearnet.js' + +export const NBS_HEADER = Buffer.from([0xE2, 0x98, 0xA2]) // NUClear radiation symbol. + +export type NbsFrame = { + // Omitted redundant header information. + // header: Buffer, + // size: number, + timestampInMicroseconds: number, + hash: Buffer, + payload: Buffer, +} + +// NBS frame format: +// 3 Bytes - NUClear radiation symbol header, useful for synchronisation when attaching to an existing stream. +// 4 Bytes - The remaining packet length i.e. 16 bytes + N payload bytes +// 8 Bytes - 64bit timestamp in microseconds. Note: this is not necessarily a unix timestamp. +// 8 Bytes - 64bit bit hash of the message type. +// N bytes - The binary packet payload. + +export function encodeFrame(frame: NbsFrame): Buffer { + const size = 16 + frame.payload.byteLength + const buffer = new Buffer(7 + size) + NBS_HEADER.copy(buffer, 0, 0, 3) + buffer.writeUInt32LE(size, 3) + const timeLong = Long.fromNumber(frame.timestampInMicroseconds) + buffer.writeUInt32LE(timeLong.low, 7) + buffer.writeUInt32LE(timeLong.high, 11) + frame.hash.copy(buffer, 15, 0, 8) + frame.payload.copy(buffer, 23) + return buffer +} + +export function decodeFrame(buffer: Buffer): NbsFrame { + const size = buffer.readUInt32LE(3) + const timestampInMicroseconds = Long.fromBits(buffer.readUInt32LE(7), buffer.readUInt32LE(11)).toNumber() + const hash = buffer.slice(15, 23) + const payload = buffer.slice(23, 23 + size) + return { timestampInMicroseconds, hash, payload } +} + +export function packetToFrame(packet: NUClearNetPacket, timestampInMicroseconds: number): NbsFrame { + return { + timestampInMicroseconds, + hash: packet.hash, + payload: packet.payload, + } +} diff --git a/src/server/nbs/nbs_frame_streams.ts b/src/server/nbs/nbs_frame_streams.ts new file mode 100644 index 00000000..caeff7ab --- /dev/null +++ b/src/server/nbs/nbs_frame_streams.ts @@ -0,0 +1,46 @@ +import * as stream from 'stream' +import { Clock } from '../time/clock' +import { NodeSystemClock } from '../time/node_clock' +import { NbsFrame } from './nbs_frame_codecs' +import { encodeFrame } from './nbs_frame_codecs' +import { NUClearNetPacket } from 'nuclearnet.js' +import { packetToFrame } from './nbs_frame_codecs' +import { decodeFrame } from './nbs_frame_codecs' + +export class NbsFrameEncoder extends stream.Transform { + public constructor(private clock: Clock) { + super({ + objectMode: true, + }) + } + + public static of() { + return new NbsFrameEncoder(NodeSystemClock) + } + + public _transform(frame: NbsFrame, encoding: string, done: (err?: any, data?: any) => void) { + this.push(encodeFrame(frame)) + done() + } + + public writePacket(packet: NUClearNetPacket) { + this.write(packetToFrame(packet, this.clock.performanceNow() * 1e6)) + } +} + +export class NbsFrameDecoder extends stream.Transform { + public constructor() { + super({ + objectMode: true, + }) + } + + public static of() { + return new NbsFrameDecoder() + } + + public _transform(buffer: Buffer, encoding: string, done: (err?: any, data?: any) => void) { + this.push(decodeFrame(buffer)) + done() + } +} diff --git a/src/server/nbs/nbs_nuclear_playback.ts b/src/server/nbs/nbs_nuclear_playback.ts new file mode 100644 index 00000000..57357e45 --- /dev/null +++ b/src/server/nbs/nbs_nuclear_playback.ts @@ -0,0 +1,49 @@ +import { ReadStream } from 'fs' +import * as stream from 'stream' +import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client' +import { Clock } from '../time/clock' +import { NodeSystemClock } from '../time/node_clock' +import { NbsFrameChunker } from './nbs_frame_chunker' +import { NbsFrame } from './nbs_frame_codecs' +import { NbsFrameDecoder } from './nbs_frame_streams' + +export class NbsNUClearPlayback extends stream.Writable { + private firstFrameTimestamp?: number + private firstLocalTimestamp?: number + + public constructor(private nuclearnetClient: NUClearNetClient, + private clock: Clock) { + super({ + objectMode: true, + }) + } + + public static of(nuclearnetClient: NUClearNetClient) { + return new NbsNUClearPlayback(nuclearnetClient, NodeSystemClock) + } + + public static fromRawStream(rawStream: ReadStream, nuclearnetClient: NUClearNetClient) { + const playback = NbsNUClearPlayback.of(nuclearnetClient) + rawStream.pipe(new NbsFrameChunker()).pipe(new NbsFrameDecoder()).pipe(playback) + return playback + } + + public _write(frame: NbsFrame, encoding: string, done: Function) { + const now = this.clock.performanceNow() + if (this.firstFrameTimestamp === undefined || this.firstLocalTimestamp === undefined) { + this.firstFrameTimestamp = frame.timestampInMicroseconds + this.firstLocalTimestamp = now + } + + const timeOffset = (frame.timestampInMicroseconds - this.firstFrameTimestamp) * 1e-6 + const timeout = Math.max(0, this.firstLocalTimestamp + timeOffset - now) + + this.clock.setTimeout(() => { + this.nuclearnetClient.send({ + type: frame.hash, + payload: frame.payload, + }) + done() + }, timeout) + } +} diff --git a/src/server/nbs/nbs_playback_controller.ts b/src/server/nbs/nbs_playback_controller.ts new file mode 100644 index 00000000..b2db09c3 --- /dev/null +++ b/src/server/nbs/nbs_playback_controller.ts @@ -0,0 +1,58 @@ +import * as fs from 'fs' +import { ReadStream } from 'fs' +import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client' +import { NbsNUClearPlayback } from './nbs_nuclear_playback' +import WritableStream = NodeJS.WritableStream + +export class NbsPlaybackController { + private state: PlaybackState + private inputStream?: ReadStream + private outputStream: WritableStream + private currentIndex: number + + public constructor(private nuclearnetClient: NUClearNetClient) { + this.state = PlaybackState.Idle + this.currentIndex = 0 + } + + public static of(nuclearnetClient: NUClearNetClient) { + return new NbsPlaybackController(nuclearnetClient) + } + + public play(filename: string): () => void { + this.state = PlaybackState.Playing + + this.inputStream = fs.createReadStream(filename, { encoding: 'binary' }) + this.outputStream = NbsNUClearPlayback.fromRawStream(this.inputStream, this.nuclearnetClient) + .on('finish', () => { + this.state = PlaybackState.Idle + }) + + return () => { + if (this.inputStream) { + this.state = PlaybackState.Idle + this.inputStream.close() + this.inputStream = undefined + } + } + } + + public pause(): () => void { + if (this.inputStream) { + this.state = PlaybackState.Paused + this.inputStream.pause() + } + return () => { + if (this.inputStream && this.state === PlaybackState.Paused) { + this.state = PlaybackState.Playing + this.inputStream.resume() + } + } + } +} + +enum PlaybackState { + Idle = 1, + Paused, + Playing, +} diff --git a/src/server/nbs/nbs_recorder_controller.ts b/src/server/nbs/nbs_recorder_controller.ts new file mode 100644 index 00000000..edf59caa --- /dev/null +++ b/src/server/nbs/nbs_recorder_controller.ts @@ -0,0 +1,46 @@ +import { WriteStream } from 'fs' +import * as fs from 'fs' +import { NUClearNetPacket } from 'nuclearnet.js' +import { NUClearNetPeer } from 'nuclearnet.js' +import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client' +import { NbsFrameEncoder } from './nbs_frame_streams' + +export class NbsRecorderController { + private frameEncoder: NbsFrameEncoder + private recording: boolean + private file: WriteStream + + public constructor(private peer: NUClearNetPeer, + private nuclearnetClient: NUClearNetClient) { + this.recording = false + } + + public static of(peer: NUClearNetPeer, nuclearnetClient: NUClearNetClient): NbsRecorderController { + return new NbsRecorderController(peer, nuclearnetClient) + } + + public record(filename: string): () => void { + this.frameEncoder = NbsFrameEncoder.of() + this.file = fs.createWriteStream(filename, { defaultEncoding: 'binary' }) + const stopListening = this.nuclearnetClient.onPacket(this.onPacket) + this.frameEncoder.pipe(this.file) + this.recording = true + return () => { + stopListening() + this.recording = false + this.file.end() + } + } + + private onPacket = (packet: NUClearNetPacket) => { + if (!this.arePeersEqual(this.peer, packet.peer)) { + return + } + + this.frameEncoder.writePacket(packet) + } + + private arePeersEqual(peerA: NUClearNetPeer, peerB: NUClearNetPeer) { + return peerA.name === peerB.name && peerA.address === peerB.address && peerA.port === peerB.port + } +} diff --git a/src/server/nbs/tests/nbs_frame_codecs.tests.ts b/src/server/nbs/tests/nbs_frame_codecs.tests.ts new file mode 100644 index 00000000..8c9715ca --- /dev/null +++ b/src/server/nbs/tests/nbs_frame_codecs.tests.ts @@ -0,0 +1,43 @@ +import { hashType } from '../../nuclearnet/fake_nuclearnet_server' +import { encodeFrame } from '../nbs_frame_codecs' +import { decodeFrame } from '../nbs_frame_codecs' + +describe('NbsFrameCodecs', () => { + describe('encoding', () => { + it('encodes frames', () => { + const hash = hashType('message.input.sensors') + const timestamp = 1500379664696000 + const payload = new Buffer(8).fill(0x12) + const buffer = encodeFrame({ timestampInMicroseconds: timestamp, hash, payload }) + expect(buffer.toString('hex')).toEqual('e298a218000000c042f15c9654050010abef8b5398f0d41212121212121212') + }) + }) + + describe('decoding', () => { + it('decodes frames', () => { + const buffer = Buffer.from('e298a218000000c042f15c9654050010abef8b5398f0d41212121212121212', 'hex') + const frame = decodeFrame(buffer) + expect(frame).toEqual({ + timestampInMicroseconds: 1500379664696000, + hash: hashType('message.input.sensors'), + payload: new Buffer(8).fill(0x12) + }) + }) + }) + + describe('roundtrip', () => { + it('decode than encode should equal original', () => { + const buffer = Buffer.from('e298a218000000c042f15c9654050010abef8b5398f0d41212121212121212', 'hex') + expect(encodeFrame(decodeFrame(buffer))).toEqual(buffer) + }) + + it('encode than decode should equal original', () => { + const frame = { + hash: hashType('message.input.sensors'), + timestampInMicroseconds: 1500379664696000, + payload: new Buffer(8).fill(0x12), + } + expect(decodeFrame(encodeFrame(frame))).toEqual(frame) + }) + }) +}) diff --git a/src/server/nuclearnet/direct_nuclearnet_client.ts b/src/server/nuclearnet/direct_nuclearnet_client.ts index 8575584b..14a95726 100644 --- a/src/server/nuclearnet/direct_nuclearnet_client.ts +++ b/src/server/nuclearnet/direct_nuclearnet_client.ts @@ -38,6 +38,10 @@ export class DirectNUClearNetClient implements NUClearNetClient { return () => this.nuclearNetwork.removeListener(event, cb) } + public onPacket(cb: NUClearPacketListener): () => void { + return this.on('nuclear_packet', cb) + } + public send(options: NUClearNetSend): void { this.nuclearNetwork.send(options) } diff --git a/src/server/nuclearnet/fake_nuclearnet_client.ts b/src/server/nuclearnet/fake_nuclearnet_client.ts index 38dcd395..58d3af6a 100644 --- a/src/server/nuclearnet/fake_nuclearnet_client.ts +++ b/src/server/nuclearnet/fake_nuclearnet_client.ts @@ -7,6 +7,7 @@ import { NUClearEventListener } from '../../shared/nuclearnet/nuclearnet_client' import { NUClearPacketListener } from '../../shared/nuclearnet/nuclearnet_client' import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client' import { FakeNUClearNetServer } from './fake_nuclearnet_server' +import { hashType } from './fake_nuclearnet_server' /** * A fake NUClearNetClient, which collaborates with FakeNUClearNetServer. Designed to allow completely offline @@ -70,15 +71,26 @@ export class FakeNUClearNetClient implements NUClearNetClient { } public on(event: string, cb: NUClearPacketListener): () => void { + const hash = hashType(event).toString('hex') const listener = (packet: NUClearNetPacket) => { if (this.connected) { cb(packet) } } - this.events.on(event, listener) + this.events.on(hash, listener) return () => this.events.removeListener(event, listener) } + public onPacket(cb: NUClearPacketListener): () => void { + const listener = (packet: NUClearNetPacket) => { + if (this.connected) { + cb(packet) + } + } + this.events.on('nuclear_packet', listener) + return () => this.events.removeListener('nuclear_packet', listener) + } + public send(options: NUClearNetSend): void { this.server.send(this, options) } @@ -92,7 +104,8 @@ export class FakeNUClearNetClient implements NUClearNetClient { this.events.emit('nuclear_leave', peer) } - public fakePacket(event: string, packet: NUClearNetPacket) { - this.events.emit(event, packet) + public fakePacket(hash: string, packet: NUClearNetPacket) { + this.events.emit(hash, packet) + this.events.emit('nuclear_packet', packet) } } diff --git a/src/server/nuclearnet/fake_nuclearnet_server.ts b/src/server/nuclearnet/fake_nuclearnet_server.ts index 0fe39ce4..6a15a9a5 100644 --- a/src/server/nuclearnet/fake_nuclearnet_server.ts +++ b/src/server/nuclearnet/fake_nuclearnet_server.ts @@ -1,14 +1,14 @@ import * as EventEmitter from 'events' import { NUClearNetSend } from 'nuclearnet.js' +import * as XXH from 'xxhashjs' import { createSingletonFactory } from '../../shared/base/create_singleton_factory' import { FakeNUClearNetClient } from './fake_nuclearnet_client' -import * as XXH from 'xxhashjs' /** * A fake in-memory NUClearNet 'server' which routes messages between each FakeNUClearNetClient. * * All messages are 'reliable' in that nothing is intentially dropped. - * Targetted messages are supported. + * Targeted messages are supported. */ export class FakeNUClearNetServer { private events: EventEmitter @@ -57,33 +57,33 @@ export class FakeNUClearNetServer { } public send(client: FakeNUClearNetClient, opts: NUClearNetSend) { - if (typeof opts.type === 'string') { - const packet = { - peer: client.peer, - type: opts.type, - hash: this.hash(opts.type), - payload: opts.payload, - reliable: !!opts.reliable, - } + const hash: Buffer = typeof opts.type === 'string' ? hashType(opts.type) : opts.type + const packet = { + peer: client.peer, + type: typeof opts.type === 'string' ? opts.type : undefined, + hash, + payload: opts.payload, + reliable: !!opts.reliable, + } - /* - * This list intentially includes the sender unless explicitly targeting another peer. This matches the real - * NUClearNet behaviour. - */ - const targetClients = opts.target === undefined - ? this.clients - : this.clients.filter(otherClient => otherClient.peer.name === opts.target) + /* + * This list intentionally includes the sender unless explicitly targeting another peer. This matches the real + * NUClearNet behaviour. + */ + const targetClients = opts.target === undefined + ? this.clients + : this.clients.filter(otherClient => otherClient.peer.name === opts.target) - for (const client of targetClients) { - client.fakePacket(opts.type, packet) - } + const hashString = hash.toString('hex') + for (const client of targetClients) { + client.fakePacket(hashString, packet) } } +} - private hash(input: string): Buffer { - // Matches hashing implementation from NUClearNet - // See https://goo.gl/6NDPo2 - const hashString: string = XXH.h64(input, 0x4e55436c).toString(16) - return Buffer.from((hashString.match(/../g) as string[]).reverse().join(''), 'hex') - } +export function hashType(type: string): Buffer { + // Matches hashing implementation from NUClearNet + // See https://goo.gl/6NDPo2 + const hashString: string = XXH.h64(type, 0x4e55436c).toString(16) + return Buffer.from((hashString.match(/../g) as string[]).reverse().join(''), 'hex') } diff --git a/src/server/nuclearnet/tests/fake_nuclearnet_client.tests.ts b/src/server/nuclearnet/tests/fake_nuclearnet_client.tests.ts index a4d3202f..edfc2e2b 100644 --- a/src/server/nuclearnet/tests/fake_nuclearnet_client.tests.ts +++ b/src/server/nuclearnet/tests/fake_nuclearnet_client.tests.ts @@ -178,6 +178,42 @@ describe('FakeNUClearNetClient', () => { })) }) + it('receives messages sent from other clients', () => { + bob.connect({ name: 'bob' }) + alice.connect({ name: 'alice' }) + eve.connect({ name: 'eve' }) + + const bobOnPacket = jest.fn() + bob.onPacket(bobOnPacket) + + const aliceOnPacket = jest.fn() + alice.onPacket(aliceOnPacket) + + const eveOnSensors = jest.fn() + eve.on('sensors', eveOnSensors) + + const payload = new Buffer(8) + eve.send({ type: 'sensors', payload }) + + expect(bobOnPacket).toHaveBeenCalledTimes(1) + expect(bobOnPacket).toHaveBeenLastCalledWith(expect.objectContaining({ + payload, + peer: expect.objectContaining({ name: 'eve' }), + })) + + expect(aliceOnPacket).toHaveBeenCalledTimes(1) + expect(aliceOnPacket).toHaveBeenLastCalledWith(expect.objectContaining({ + payload, + peer: expect.objectContaining({ name: 'eve' }), + })) + + expect(eveOnSensors).toHaveBeenCalledTimes(1) + expect(eveOnSensors).toHaveBeenLastCalledWith(expect.objectContaining({ + payload, + peer: expect.objectContaining({ name: 'eve' }), + })) + }) + it('only receives targetted messages if they are the target', () => { bob.connect({ name: 'bob' }) alice.connect({ name: 'alice' }) diff --git a/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts b/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts index a4540913..71979026 100644 --- a/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts +++ b/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts @@ -9,10 +9,6 @@ import { WebSocket } from './web_socket_server' import { NodeSystemClock } from '../time/node_clock' import { Clock } from '../time/clock' -type Opts = { - fakeNetworking: boolean -} - /** * The server component of a NUClearNet proxy running over web sockets. Acts as a gateway to the NUClear network. * All clients currently share a single NUClearNet connection, mostly for performance reasons. Could potentially be @@ -23,8 +19,7 @@ export class WebSocketProxyNUClearNetServer { server.onConnection(this.onClientConnection) } - public static of(server: WebSocketServer, { fakeNetworking }: Opts): WebSocketProxyNUClearNetServer { - const nuclearnetClient: NUClearNetClient = fakeNetworking ? FakeNUClearNetClient.of() : DirectNUClearNetClient.of() + public static of(server: WebSocketServer, nuclearnetClient: NUClearNetClient): WebSocketProxyNUClearNetServer { return new WebSocketProxyNUClearNetServer(server, nuclearnetClient) } @@ -127,7 +122,7 @@ class PacketProcessor { // The maximum number of packets of a unique type to send before receiving acknowledgements. private limit: number - // The number of milliseconds before giving up on an acknowledge + // The number of seconds before giving up on an acknowledge private timeout: number constructor(private socket: WebSocket, @@ -139,7 +134,7 @@ class PacketProcessor { } public static of(socket: WebSocket) { - return new PacketProcessor(socket, NodeSystemClock, { limit: 1, timeout: 5000 }) + return new PacketProcessor(socket, NodeSystemClock, { limit: 1, timeout: 5 }) } public onPacket(event: string, packet: NUClearNetPacket) { diff --git a/src/server/nusight_server.ts b/src/server/nusight_server.ts new file mode 100644 index 00000000..9c44bc74 --- /dev/null +++ b/src/server/nusight_server.ts @@ -0,0 +1,81 @@ +import { NUClearNetPeer } from 'nuclearnet.js' +import { NUClearNetClient } from '../shared/nuclearnet/nuclearnet_client' +import { NbsPlaybackController } from './nbs/nbs_playback_controller' +import { NbsRecorderController } from './nbs/nbs_recorder_controller' +import { WebSocketServer } from './nuclearnet/web_socket_server' +import { WebSocket } from './nuclearnet/web_socket_server' +import { Clock } from './time/clock' +import { NodeSystemClock } from './time/node_clock' + +export class NUsightServer { + public constructor(private server: WebSocketServer, private nuclearnetClient: NUClearNetClient) { + server.onConnection(this.onClientConnection) + } + + public static of(server: WebSocketServer, nuclearnetClient: NUClearNetClient): NUsightServer { + return new NUsightServer(server, nuclearnetClient) + } + + private onClientConnection = (socket: WebSocket) => { + NUsightServerClient.of(socket, this.nuclearnetClient) + } +} + +class NUsightServerClient { + private stopRecordingMap: Map void> + + public constructor(private socket: WebSocket, private clock: Clock, private nuclearnetClient: NUClearNetClient) { + this.stopRecordingMap = new Map() + + this.socket.on('record', this.onRecord) + this.socket.on('unrecord', this.onUnrecord) + + this.socket.on('play', this.onPlay) + this.socket.on('pause', this.onPause) + this.socket.on('resume', this.onResume) + this.socket.on('stop', this.onStop) + } + + public static of(socket: WebSocket, nuclearnetClient: NUClearNetClient): NUsightServerClient { + return new NUsightServerClient(socket, NodeSystemClock, nuclearnetClient) + } + + private onRecord = (peer: NUClearNetPeer, requestToken: string) => { + const recorder = NbsRecorderController.of(peer, this.nuclearnetClient) + const filename = `${peer.name.replace(/[^A-Za-z0-9]/g, '_')}_${this.clock.now()}.nbs` + console.log('recording', peer, requestToken) + const stopRecording = recorder.record(`recordings/${filename}`) + this.stopRecordingMap.set(requestToken, stopRecording) + } + + private onUnrecord = (requestToken: string) => { + const stopRecording = this.stopRecordingMap.get(requestToken) + if (stopRecording) { + console.log('stop recording', requestToken) + stopRecording() + } + } + + private onPlay = (filename: string, requestToken: string) => { + const player = NbsPlaybackController.of(this.nuclearnetClient) + console.log('playing', filename, requestToken) + const stopPlaying = player.play(`recordings/${filename}`) + this.stopRecordingMap.set(requestToken, stopPlaying) + } + + private onPause = () => { + + } + + private onResume = () => { + + } + + private onStop = (requestToken: string) => { + const stopRecording = this.stopRecordingMap.get(requestToken) + if (stopRecording) { + console.log('stop recording', requestToken) + stopRecording() + } + } +} diff --git a/src/server/prod.ts b/src/server/prod.ts index 53b448f3..c14faf2e 100644 --- a/src/server/prod.ts +++ b/src/server/prod.ts @@ -5,8 +5,10 @@ import * as http from 'http' import * as minimist from 'minimist' import * as favicon from 'serve-favicon' import * as sio from 'socket.io' -import { VirtualRobots } from '../simulators/virtual_robots' import { SensorDataSimulator } from '../simulators/sensor_data_simulator' +import { VirtualRobots } from '../simulators/virtual_robots' +import { DirectNUClearNetClient } from './nuclearnet/direct_nuclearnet_client' +import { FakeNUClearNetClient } from './nuclearnet/fake_nuclearnet_client' import { WebSocketProxyNUClearNetServer } from './nuclearnet/web_socket_proxy_nuclearnet_server' import { WebSocketServer } from './nuclearnet/web_socket_server' @@ -40,6 +42,6 @@ if (withSimulators) { virtualRobots.simulateWithFrequency(60) } -WebSocketProxyNUClearNetServer.of(WebSocketServer.of(sioNetwork.of('/nuclearnet')), { - fakeNetworking: withSimulators, -}) +const nuclearnetClient = withSimulators ? FakeNUClearNetClient.of() : DirectNUClearNetClient.of() + +WebSocketProxyNUClearNetServer.of(WebSocketServer.of(sioNetwork.of('/nuclearnet')), nuclearnetClient) diff --git a/src/server/tests/fake_nbs_binary_stream.ts b/src/server/tests/fake_nbs_binary_stream.ts new file mode 100644 index 00000000..14891b91 --- /dev/null +++ b/src/server/tests/fake_nbs_binary_stream.ts @@ -0,0 +1,19 @@ +import * as stream from 'stream' + +export class FakeNbsStream extends stream.PassThrough { + public generate(numFrames: number) { + for (let i = 0; i < numFrames; i++) { + const buffer = Buffer.from('e298a218000000c042f15c9654050010abef8b5398f0d41212121212121212', 'hex') + this.write(buffer) + } + this.end() + } + + public generatewithGarbage(numFrames: number) { + for (let i = 0; i < numFrames; i++) { + const buffer = Buffer.from('c96540e298a218000000c042f15c9654050010abef8b5398f0d41212121212121212f350ab21', 'hex') + this.write(buffer) + } + this.end() + } +} diff --git a/src/server/tests/nbs_nuclear_writeable_stream.tests.ts b/src/server/tests/nbs_nuclear_writeable_stream.tests.ts new file mode 100644 index 00000000..5775e980 --- /dev/null +++ b/src/server/tests/nbs_nuclear_writeable_stream.tests.ts @@ -0,0 +1,99 @@ +import * as fs from 'fs' +import * as stream from 'stream' +import { Stream } from 'stream' +import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client' +import { NbsFrameChunker } from '../nbs/nbs_frame_chunker' +import { NbsFrameDecoder } from '../nbs/nbs_frame_streams' +import { NbsNUClearPlayback } from '../nbs/nbs_nuclear_playback' +import { FakeNUClearNetClient } from '../nuclearnet/fake_nuclearnet_client' +import { FakeNUClearNetServer } from '../nuclearnet/fake_nuclearnet_server' +import { FakeNodeClock } from '../time/fake_node_clock' +import { FakeNbsStream } from './fake_nbs_binary_stream' + +describe('NbsFrameChunker', () => { + let transform: stream.Transform + + beforeEach(() => { + transform = new NbsFrameChunker() + }) + + it.skip('finds 6988 frames within binary stream', done => { + const file = fs.createReadStream('/Users/brendan/Lab/NUsight2/recordings/darwin3_WalkAround.nbs') + const spy = jest.fn() + file.pipe(transform).on('data', spy).on('finish', () => { + expect(spy).toHaveBeenCalledTimes(6988) + done() + }) + }) +}) + +describe('NbsNUClearPlayback', () => { + let stream: Stream + let nuclearnetClient: NUClearNetClient + + beforeEach(() => { + const nuclearnetServer = new FakeNUClearNetServer() + nuclearnetClient = new FakeNUClearNetClient(nuclearnetServer) + + stream = fs.createReadStream('/Users/brendan/Lab/NUsight2/recordings/darwin3_WalkAround.nbs') + .pipe(new NbsFrameChunker()) + .pipe(new NbsFrameDecoder()) + + }) + + it.skip('sends 6988 messages to NUClearNet', done => { + const fakeClock = FakeNodeClock.of() + jest.spyOn(nuclearnetClient, 'send') + stream + .on('data', () => { + // Ensure that all timers instantly run after each chunk is received. + // Run on the next tick to allow NbsNUClearPlayback to schedule the timers first. + process.nextTick(() => fakeClock.runAllTimers()) + }) + .pipe(new NbsNUClearPlayback(nuclearnetClient, fakeClock)) + .on('finish', () => { + expect(nuclearnetClient.send).toHaveBeenCalledTimes(6988) + done() + }) + }) + + it('sends all generated messages to NUClearNet', done => { + const fakeClock = FakeNodeClock.of() + jest.spyOn(nuclearnetClient, 'send') + const stream = new FakeNbsStream() + stream + .pipe(new NbsFrameChunker()) + .pipe(new NbsFrameDecoder()) + .on('data', () => { + // Ensure that all timers instantly run after each chunk is received. + // Run on the next tick to allow NbsNUClearPlayback to schedule the timers first. + process.nextTick(() => fakeClock.runAllTimers()) + }) + .pipe(new NbsNUClearPlayback(nuclearnetClient, fakeClock)) + .on('finish', () => { + expect(nuclearnetClient.send).toHaveBeenCalledTimes(1000) + done() + }) + stream.generate(1000) + }) + + it('can handle garbage', done => { + const fakeClock = FakeNodeClock.of() + jest.spyOn(nuclearnetClient, 'send') + const stream = new FakeNbsStream() + stream + .pipe(new NbsFrameChunker()) + .pipe(new NbsFrameDecoder()) + .on('data', () => { + // Ensure that all timers instantly run after each chunk is received. + // Run on the next tick to allow NbsNUClearPlayback to schedule the timers first. + process.nextTick(() => fakeClock.runAllTimers()) + }) + .pipe(new NbsNUClearPlayback(nuclearnetClient, fakeClock)) + .on('finish', () => { + expect(nuclearnetClient.send).toHaveBeenCalledTimes(1000) + done() + }) + stream.generatewithGarbage(1000) + }) +}) diff --git a/src/server/time/clock.ts b/src/server/time/clock.ts index 04507fd7..db9ee7e1 100644 --- a/src/server/time/clock.ts +++ b/src/server/time/clock.ts @@ -1,6 +1,7 @@ export interface Clock { now(): number - setTimeout(cb: (...args: any[]) => void, ms: number): () => void - setInterval(cb: (...args: any[]) => void, ms: number): () => void + performanceNow(): number + setTimeout(cb: (...args: any[]) => void, seconds: number): () => void + setInterval(cb: (...args: any[]) => void, seconds: number): () => void setImmediate(cb: (...args: any[]) => void): () => void } diff --git a/src/server/time/fake_node_clock.ts b/src/server/time/fake_node_clock.ts new file mode 100644 index 00000000..40e096e3 --- /dev/null +++ b/src/server/time/fake_node_clock.ts @@ -0,0 +1,124 @@ +import { Clock } from './clock' + +type Task = { + id: number, + nextTime: number, + period?: number, + fn(): void, +} + +export class FakeNodeClock implements Clock { + private nextId: number + private time: number + private tasks: Task[] + + constructor() { + this.nextId = 0 + this.time = 0 + this.tasks = [] + } + + public static of() { + return new FakeNodeClock() + } + + public now(): number { + return this.time + } + + public performanceNow(): number { + return this.time + } + + public setTimeout(fn: () => void, seconds: number): () => void { + const id = this.nextId++ + this.addTask({ id, nextTime: this.now() + seconds, fn }) + return () => this.removeTask(id) + } + + public setInterval(fn: () => void, seconds: number): () => void { + const id = this.nextId++ + this.addTask({ id, nextTime: this.now() + seconds, period: seconds, fn }) + return () => this.removeTask(id) + } + + public setImmediate(fn: () => void): () => void { + const id = this.nextId++ + this.addTask({ id, nextTime: this.now() + 1, fn }) + return () => this.removeTask(id) + } + + public tick(delta: number = 1) { + const newTime = this.now() + delta + + while (this.tasks.length > 0 && this.tasks[0].nextTime <= newTime) { + const task = this.tasks[0] + this.consumeTask(task) + } + + this.time = newTime + } + + public runAllTimers() { + const limit = 1000 + let i = 0 + + while (this.tasks.length > 0) { + if (i > limit) { + throw new Error(`Exceeded clock task limit of ${limit}, possibly caused by a infinite task loop?`) + } + const task = this.tasks[0] + this.consumeTask(task) + i++ + } + } + + public runOnlyPendingTimers() { + const limit = 1000 + let i = 0 + + while (this.tasks.length > 0 && this.now() <= this.tasks[this.tasks.length - 1].nextTime) { + if (i > limit) { + throw new Error(`Exceeded clock task limit of ${limit}, possibly caused by a infinite task loop?`) + } + const task = this.tasks[0] + this.consumeTask(task) + i++ + } + } + + public runTimersToTime() { + throw new Error('Not implemented') + } + + private addTask(task: Task) { + this.tasks.push(task) + this.sortTasks() + } + + private sortTasks() { + this.tasks.sort((t1, t2) => { + return t1.nextTime - t2.nextTime + }) + } + + private consumeTask(task: Task) { + this.time = task.nextTime + if (task.period != null) { + task.nextTime += task.period + this.sortTasks() + } else { + this.tasks.shift() + } + task.fn() + } + + private removeTask(taskId: number) { + for (let i = 0; i < this.tasks.length; i++) { + if (this.tasks[i].id == taskId) { + this.tasks.splice(i, 1) + break + } + } + } +} diff --git a/src/server/time/node_clock.ts b/src/server/time/node_clock.ts index 64238432..cca3cc9e 100644 --- a/src/server/time/node_clock.ts +++ b/src/server/time/node_clock.ts @@ -2,13 +2,13 @@ import { Clock } from './clock' export type CancelTimer = () => void -function setTimeout(cb: (...args: any[]) => void, ms: number): CancelTimer { - const handle = global.setTimeout(cb, ms) +function setTimeout(cb: (...args: any[]) => void, seconds: number): CancelTimer { + const handle = global.setTimeout(cb, seconds * 1e3) return global.clearTimeout.bind(null, handle) } -function setInterval(cb: (...args: any[]) => void, ms: number): CancelTimer { - const handle = global.setInterval(cb, ms) +function setInterval(cb: (...args: any[]) => void, seconds: number): CancelTimer { + const handle = global.setInterval(cb, seconds * 1e3) return global.clearInterval.bind(null, handle) } @@ -17,8 +17,15 @@ function setImmediate(cb: (...args: any[]) => void): CancelTimer { return global.clearImmediate.bind(null, handle) } +function performanceNow() { + const t = process.hrtime() + return t[0] + t[1] * 1e-9 +} + + export const NodeSystemClock: Clock = { - now: () => Date.now(), + now: () => Date.now() * 1e-3, + performanceNow, setTimeout, setInterval, setImmediate, diff --git a/src/shared/nuclearnet/nuclearnet_client.ts b/src/shared/nuclearnet/nuclearnet_client.ts index 033c3e4e..eb18ca6a 100644 --- a/src/shared/nuclearnet/nuclearnet_client.ts +++ b/src/shared/nuclearnet/nuclearnet_client.ts @@ -12,5 +12,6 @@ export interface NUClearNetClient { onJoin(cb: NUClearEventListener): () => void onLeave(cb: NUClearEventListener): () => void on(event: string, cb: NUClearPacketListener): () => void + onPacket(cb: NUClearPacketListener): () => void send(options: NUClearNetSend): void } diff --git a/src/simulators/sensor_data_simulator.ts b/src/simulators/sensor_data_simulator.ts index 6bd363f4..28946002 100644 --- a/src/simulators/sensor_data_simulator.ts +++ b/src/simulators/sensor_data_simulator.ts @@ -16,10 +16,10 @@ export class SensorDataSimulator implements Simulator { const messageType = 'message.input.Sensors' // Simulate a walk - const t = time * 5E-3 + index + const t = time + index - const angle = index * (2 * Math.PI) / numRobots + time / 4E4 - const distance = Math.cos(time / 1E3 + 4 * index) * 0.3 + 1 + const angle = index * (2 * Math.PI) / numRobots + time / 4E1 + const distance = Math.cos(time + 4 * index) * 0.3 + 1 const x = distance * Math.cos(angle) const y = distance * Math.sin(angle) const heading = -angle - Math.PI / 2 diff --git a/src/simulators/virtual_robot.ts b/src/simulators/virtual_robot.ts index e6a4d1b3..feb9cbf1 100644 --- a/src/simulators/virtual_robot.ts +++ b/src/simulators/virtual_robot.ts @@ -32,7 +32,7 @@ export class VirtualRobot { public simulateWithFrequency(frequency: number, index: number, numRobots: number) { const disconnect = this.connect() - const period = 1000 / frequency + const period = 1 / frequency const cancelLoop = this.clock.setInterval(() => this.simulate(index, numRobots), period) return () => { diff --git a/src/validator/validate.ts b/src/validator/validate.ts new file mode 100644 index 00000000..08c9cce4 --- /dev/null +++ b/src/validator/validate.ts @@ -0,0 +1,75 @@ +import * as fs from 'fs' +import * as minimist from 'minimist' + +const HEADER = Buffer.from([0xE2, 0x98, 0xA2]) +const HEADER_SIZE = HEADER.byteLength +const REMAINING_LENGTH_SIZE = 4 +const TIMESTAMP_SIZE = 8 +const HASH_SIZE = 8 + +function main() { + const args = minimist(process.argv.slice(2)) + const filename = args.filename + const buffer = fs.readFileSync(filename) + + const packets = readPackets(buffer) + const types = packets.reduce((map: Map, packet) => { + map.set(packet.hash, (map.get(packet.hash) || 0) + 1) + return map + }, new Map()) + + console.log(`Num packets: ${packets.length}`) + console.log('Types:') + console.log(Array.from(types.entries()).map(([hash, occurances]) => `${hash} = ${occurances}`).join('\n')) +} + +function readPackets(buffer: Buffer) { + const packets = [] + let index = 0 + do { + const packet = readPacket(buffer, index) + packets.push(packet) + index = packet.lastIndex + } while (index < buffer.byteLength) + return packets +} + +function readPacket(buffer: Buffer, offset: number) { + let index = findNextHeader(buffer, offset) + // console.log(`Header found at index ${index}`) + index += HEADER_SIZE + + const remainingByteLength = buffer.readUInt32LE(index) + // console.log(`Remaining byte length: ${remainingByteLength}`) + index += REMAINING_LENGTH_SIZE + + const timeHighByte = buffer.readUInt32LE(index) + const timeLowByte = buffer.readUInt32LE(index + 4) + const timestamp = timeHighByte + timeLowByte // TODO (Annable): actually convert + // console.log(`Timestamp: ${timeHighByte} ${timeLowByte} ${timestamp}`) + index += TIMESTAMP_SIZE + + const hash = buffer.slice(index, index + HASH_SIZE).toString('hex') + // console.log(`Hash: ${hash}`) + index += HASH_SIZE + + const payloadByteLength = remainingByteLength - TIMESTAMP_SIZE - HASH_SIZE + // console.log(`Data exists: ${payloadByteLength} ${buffer.byteLength >= index + payloadByteLength}`) + const payload = buffer.slice(index, payloadByteLength) + index += payloadByteLength + + return { remainingByteLength, timestamp, hash, payloadByteLength, payload, lastIndex: index } +} + +function findNextHeader(buffer: Buffer, offset: number) { + for (var i = offset; i < buffer.byteLength; i++) { + if (buffer[i] === HEADER[0] && buffer[i + 1] === HEADER[1] && buffer[i + 2] === HEADER[2]) { + return i + } + } + return -1 +} + +if (require.main === module) { + main() +} diff --git a/tsconfig.json b/tsconfig.json index 5755b861..3b49aa99 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -26,7 +26,8 @@ ], "exclude": [ "build", - "node_modules" + "node_modules", + "**/*.tests.ts" ], "awesomeTypescriptLoaderOptions": { "useWebpackText": true, diff --git a/webpack.config.ts b/webpack.config.ts index a2a1411e..f4d5c6d2 100644 --- a/webpack.config.ts +++ b/webpack.config.ts @@ -47,11 +47,11 @@ export default { { test: /\.tsx?$/, use: isProduction - ? 'awesome-typescript-loader?module=es6' - : [ - 'react-hot-loader', - 'awesome-typescript-loader', - ], + ? 'awesome-typescript-loader?module=es6' + : [ + 'react-hot-loader', + 'awesome-typescript-loader', + ], }, // local css { @@ -78,15 +78,15 @@ export default { }), }, /* - External libraries generally do not support css modules so the selector mangling will break external components. - This separate simplified loader is used for anything within the node_modules folder instead. - */ + External libraries generally do not support css modules so the selector mangling will break external components. + This separate simplified loader is used for anything within the node_modules folder instead. + */ { test: /\.css$/, include: [ path.resolve(__dirname, 'node_modules'), ], - use: [ 'style-loader', 'css-loader' ], + use: ['style-loader', 'css-loader'], }, { test: /\.svg$/, diff --git a/yarn.lock b/yarn.lock index 3f3f7d6b..bc28dcec 100644 --- a/yarn.lock +++ b/yarn.lock @@ -76,6 +76,12 @@ dependencies: "@types/babel-types" "*" +"@types/buffers@^0.1.30": + version "0.1.30" + resolved "https://registry.yarnpkg.com/@types/buffers/-/buffers-0.1.30.tgz#8764e8425ecc52fe131e7b531b5fc100c93bfa10" + dependencies: + "@types/node" "*" + "@types/classnames@^2.2.0": version "2.2.0" resolved "https://registry.yarnpkg.com/@types/classnames/-/classnames-2.2.0.tgz#f2312039e780bdf89d7d4102a26ec11de5ec58aa" @@ -1369,6 +1375,10 @@ buffer@^4.3.0: ieee754 "^1.1.4" isarray "^1.0.0" +buffers@^0.1.1: + version "0.1.1" + resolved "https://registry.yarnpkg.com/buffers/-/buffers-0.1.1.tgz#b24579c3bed4d6d396aeee6d9a8ae7f5482ab7bb" + builtin-modules@^1.0.0: version "1.1.1" resolved "https://registry.yarnpkg.com/builtin-modules/-/builtin-modules-1.1.1.tgz#270f076c5a72c02f5b65a47df94c5fe3a278892f" @@ -4475,9 +4485,9 @@ nth-check@~1.0.1: dependencies: boolbase "~1.0.0" -nuclearnet.js@^1.2.0: - version "1.2.0" - resolved "https://registry.yarnpkg.com/nuclearnet.js/-/nuclearnet.js-1.2.0.tgz#a26c83d7495974c87dff1817a4eade8d8e012abe" +nuclearnet.js@^1.4.2: + version "1.4.2" + resolved "https://registry.yarnpkg.com/nuclearnet.js/-/nuclearnet.js-1.4.2.tgz#9a25ea04ac84d866e92bf1851b1754229c6e173c" dependencies: bindings "^1.2.1" nan "^2.0.0"