From 50598a34fc1ad14a8174138748bfd4592d093492 Mon Sep 17 00:00:00 2001 From: Rishi Panthee Date: Mon, 3 Apr 2023 23:12:50 -0500 Subject: [PATCH 1/2] Swap console.log to node-color-log --- client.ts | 22 ++++---- package-lock.json | 11 ++++ package.json | 1 + src/api/gateway.controller.ts | 13 ++--- src/api/graphql/resolvers.ts | 3 +- src/common/utils.ts | 7 +-- src/index.ts | 10 ++-- src/modules/core/encoder.service.ts | 47 ++++++++--------- src/modules/core/gateway.service.ts | 33 ++++++------ src/modules/core/gatewayClient.service.ts | 57 +++++++++++---------- src/modules/core/identity.service.ts | 3 +- src/modules/core/misc/discordbot.service.ts | 5 +- src/modules/libp2p/libp2p.service.ts | 25 ++++----- 13 files changed, 127 insertions(+), 110 deletions(-) diff --git a/client.ts b/client.ts index 4d52236..317454a 100644 --- a/client.ts +++ b/client.ts @@ -17,6 +17,7 @@ import { MESSAGE_TYPES } from './src/modules/libp2p/messages.model' import { TileDocument } from '@ceramicnetwork/stream-tile' import { EncodeStatus } from './src/modules/encoder.model' import logUpdate from 'log-update'; +import logger from 'node-color-log' import cli from 'cli-ux' @@ -29,8 +30,7 @@ const ClientKey = void (async () => { - - console.log('P2P interface starting up') + logger.info('P2P interface starting up') const ceramic = new CeramicClient('https://ceramic-clay.3boxlabs.com') //Using the public node for now. const idListener = await PeerId.createFromJSON(ClientKey) @@ -80,21 +80,18 @@ void (async () => { // start libp2p await libp2p.start() - console.log(libp2p.peerId.toJSON()) - setInterval(() => { - //console.log(libp2p.connections.size) - }, 5000) + logger.info('LibP2P Peer ID', libp2p.peerId.toJSON()) const handler = async ({ connection, stream, protocol }) => { // use stream or connection according to the needs - console.log(connection, stream, protocol) + logger.info(connection, stream, protocol) for await(let item of stream.source) { - console.log(item) + logger.info(item) } } libp2p.handle('/spk-video-encoder/1.0.0', handler) const output = await libp2p.dialProtocol("/ip4/10.0.1.188/tcp/14445/p2p/QmctF7GPunpcLu3Fn77Ypo657TA9GpTiipSDUUMoD2k5Kq", '/spk-video-encoder/1.0.0') - console.log(output) + logger.info(output) cli.action.start('Encoding Video') @@ -107,7 +104,7 @@ void (async () => { void (async () => { for await(let item of output.stream.source) { const decodedMessage = decode(item._bufs[0]) - console.log(decodedMessage) + logger.info(decodedMessage) if(decodedMessage.type === MESSAGE_TYPES.RESPONE_ENCODE_JOB) { encodeId = decodedMessage.streamId pushable.push(encode({ @@ -119,7 +116,7 @@ void (async () => { await encodeDoc.sync() const contentData = encodeDoc.content as any; if(contentData.status === EncodeStatus.COMPLETE) { - console.log(`Job complete, IPFS Hash is ${contentData.outCid}`) + logger.info(`Job complete, IPFS Hash is ${contentData.outCid}`) cli.action.stop() process.exit(0) } @@ -130,10 +127,9 @@ void (async () => { const pushable = Pushable() pipe(pushable, output.stream) pushable.push(encode({ - type: MESSAGE_TYPES.REQUEST_ENCODE_JOB, ipfsHash: 'Qma9ZjjtH7fdLWSrMU43tFihvSN59aQdes7f5KW6vGGk6e' })) //pushable.end() - //console.log(output.stream.close()) + //logger.info(output.stream.close()) })() diff --git a/package-lock.json b/package-lock.json index 95ffeb7..fe85a18 100644 --- a/package-lock.json +++ b/package-lock.json @@ -52,6 +52,7 @@ "libp2p-tcp": "^0.17.2", "moment": "^2.29.3", "mongodb": "^4.2.2", + "node-color-log": "^10.0.2", "node-graceful-shutdown": "^1.1.0", "node-schedule": "^2.1.0", "nodejs-file-downloader": "^4.9.3", @@ -10849,6 +10850,11 @@ "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-2.0.2.tgz", "integrity": "sha512-Ntyt4AIXyaLIuMHF6IOoTakB3K+RWxwtsHNRxllEoA6vPwP9o4866g6YWDLUdnucilZhmkxiHwHr11gAENw+QA==" }, + "node_modules/node-color-log": { + "version": "10.0.2", + "resolved": "https://registry.npmjs.org/node-color-log/-/node-color-log-10.0.2.tgz", + "integrity": "sha512-mSIv0cguSaEo0kbHkcPs3bohnFPq0uTBqMjwFmlF6YGRPXUPmExorLNaUEinijLV3cokZNKWcX2ieZiKoHftSw==" + }, "node_modules/node-fetch": { "version": "2.6.1", "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.1.tgz", @@ -22612,6 +22618,11 @@ "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-2.0.2.tgz", "integrity": "sha512-Ntyt4AIXyaLIuMHF6IOoTakB3K+RWxwtsHNRxllEoA6vPwP9o4866g6YWDLUdnucilZhmkxiHwHr11gAENw+QA==" }, + "node-color-log": { + "version": "10.0.2", + "resolved": "https://registry.npmjs.org/node-color-log/-/node-color-log-10.0.2.tgz", + "integrity": "sha512-mSIv0cguSaEo0kbHkcPs3bohnFPq0uTBqMjwFmlF6YGRPXUPmExorLNaUEinijLV3cokZNKWcX2ieZiKoHftSw==" + }, "node-fetch": { "version": "2.6.1", "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.1.tgz", diff --git a/package.json b/package.json index 5061fe3..4cf0a16 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "libp2p-tcp": "^0.17.2", "moment": "^2.29.3", "mongodb": "^4.2.2", + "node-color-log": "^10.0.2", "node-graceful-shutdown": "^1.1.0", "node-schedule": "^2.1.0", "nodejs-file-downloader": "^4.9.3", diff --git a/src/api/gateway.controller.ts b/src/api/gateway.controller.ts index 542d1e0..38ef65d 100644 --- a/src/api/gateway.controller.ts +++ b/src/api/gateway.controller.ts @@ -4,6 +4,7 @@ */ import { BadRequestException, Body, HttpCode, HttpStatus, Post, Put, Query } from '@nestjs/common' import { Controller, Get, Param } from '@nestjs/common' +import logger from 'node-color-log' import { EncoderService } from '../modules/core/encoder.service' import { JobStatus } from '../modules/encoder.model' import { encoderContainer } from './index' @@ -17,9 +18,9 @@ export class GatewayApiController { @Post('/updateNode') async updateNode(@Body() body) { - console.log(body) + logger.info(body) const { payload, did } = await unwrapJWS(body.jws) - console.log(payload, did) + logger.info(payload, did) encoderContainer.self.gateway.updateNode(did, payload.node_info) } @@ -46,7 +47,7 @@ export class GatewayApiController { @Get('/nodeJobs/:nodeId') async nodeJobs(@Param('nodeId') nodeId) { - console.log(nodeId) + logger.info(nodeId) } /** @@ -69,7 +70,7 @@ export class GatewayApiController { @Post('/acceptJob') async acceptJob(@Body() body) { const { kid, payload, did } = await unwrapJWS(body.jws) - console.log(kid, payload, did) + logger.info(kid, payload, did) await encoderContainer.self.gateway.acceptJob(payload.job_id, did) @@ -88,7 +89,7 @@ export class GatewayApiController { @Post('/failJob') async failJob(@Body() body) { - console.log(body) + logger.info(body) const {payload, did} = await unwrapJWS(body.jws) await encoderContainer.self.gateway.rejectJob(payload.job_id, did) @@ -148,7 +149,7 @@ export class GatewayApiController { @Get('/sync') async syncFile(@Param('job_id') job_id) { - console.log(job_id) + logger.info(job_id) } @Get('jobstatus/:job_id') diff --git a/src/api/graphql/resolvers.ts b/src/api/graphql/resolvers.ts index 966ad42..856dd1e 100644 --- a/src/api/graphql/resolvers.ts +++ b/src/api/graphql/resolvers.ts @@ -2,6 +2,7 @@ import moment from "moment"; import { encoderContainer } from ".." import { peerList } from "../../common/utils"; import {JobReason, JobStatus} from '../../modules/encoder.model' +import logger from 'node-color-log' class JobInfo { data: any; @@ -102,7 +103,7 @@ export const Resolvers = { } } - //console.log('preferred_nodes', preferred_nodes, scoreMap) + //logger.info('preferred_nodes', preferred_nodes, scoreMap) const nodeInfo = await encoderContainer.self.gateway.clusterNodes.findOne({ node_id: node_id diff --git a/src/common/utils.ts b/src/common/utils.ts index 90a26f1..64e61c6 100644 --- a/src/common/utils.ts +++ b/src/common/utils.ts @@ -1,4 +1,5 @@ import Axios from 'axios' +import logger from 'node-color-log' /** * @template {Object} T @@ -99,15 +100,15 @@ const ndjsonParse = async function* (stream) { const stream = response.data; // stream.on('data', data => { -// console.log(data); +// logger.info(data); // }); // stream.on('end', () => { -// console.log("stream done"); +// logger.info("stream done"); // }); let infos = [] for await (const d of ndjsonParse(stream)) { - // console.log(JSON.parse(d.toString())) + // logger.info(JSON.parse(d.toString())) infos.push(toClusterInfo(d)) } return infos diff --git a/src/index.ts b/src/index.ts index aa3459d..87437bb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,6 @@ import {CeramicClient} from '@ceramicnetwork/http-client' import { onShutdown } from "node-graceful-shutdown"; +import logger from 'node-color-log' import { ConfigService } from './config.service' // import IPFSHTTP from 'ipfs-http-client' import { CoreService } from './modules/core/core.service' @@ -12,14 +13,13 @@ async function startup(): Promise { // init ceramic const ceramic = new CeramicClient(ConfigService.getConfig().ceramicHost) //Using the public node for now. - instance = new CoreService(ceramic) await instance.start() const api = new EncoderApiModule(4005, instance) await api.listen() } catch (ex) { - console.log(ex.message) + logger.error(ex.message) await instance.stop() process.exit(0) } @@ -30,13 +30,13 @@ void startup() process.on('unhandledRejection', (error: Error) => { - console.log('unhandledRejection', error) + logger.error('unhandledRejection', error) }) onShutdown(async () => { - console.log('Video encoder stopping... ') + logger.info('Video encoder stopping... ') await instance.stop() - console.log('Exit'); + logger.info('Exit'); }); diff --git a/src/modules/core/encoder.service.ts b/src/modules/core/encoder.service.ts index 2a011dd..0a91916 100644 --- a/src/modules/core/encoder.service.ts +++ b/src/modules/core/encoder.service.ts @@ -16,6 +16,7 @@ import URL from 'url' import Downloader from 'nodejs-file-downloader' import execa from 'execa' import moment from 'moment' +import logger from 'node-color-log' PouchDB.plugin(PouchdbFind) PouchDB.plugin(PouchdbUpsert) @@ -110,7 +111,7 @@ const tutils = { res = res[0] res = res.split('x') } else { - console.log('RES IS NULL , ', res) + logger.info('RES IS NULL , ', res) return null } let width = parseInt(res[0]) @@ -140,10 +141,10 @@ export class EncoderService { try { // const tileDoc = await TileDocument.load(this.self.ceramic, streamId) // const content = tileDoc.content - console.log('updateJob - function', updateData) + logger.info('updateJob - function', updateData) await this.self.gatewayClient.dbQueue.add(async () => { await this.pouch.upsert(job_id, (doc) => { - console.log('updateJob - crdt', updateData) + logger.info('updateJob - crdt', updateData) for (let [key, value] of Object.entries(updateData)) { doc[key] = value } @@ -163,7 +164,7 @@ export class EncoderService { streamId: docNew.streamId, }) } catch (ex) { - console.log(ex) + logger.error(ex) } } async executeJobRaw(jobInfo, streamId) { @@ -197,12 +198,12 @@ export class EncoderService { // downloadProgress.stderr.pipe(process.stdout) for await(let chunk of downloadProcess.stderr) { const outArray = chunk.toString().split(' ') - // console.log(outArray) + // logger.info(outArray) const percentage = outArray.find(e => e.includes('%')); if(percentage) { const pctArray = percentage.split('%') if(Number(pctArray[0]) !== 0) { - // console.log(pctArray[0]) + // logger.info(pctArray[0]) download_pct = Number(pctArray[0]) } } @@ -210,7 +211,7 @@ export class EncoderService { } await downloadProcess clearInterval(slowUpdate) - // console.log(stdout) + // logger.info(stdout) // const downloader = new Downloader({ // url: sourceUrl, // directory: downloadFolder, @@ -218,7 +219,7 @@ export class EncoderService { // maxAttempts: 6, //Default is 1. // onError: function (error) { // //You can also hook into each failed attempt. - // console.log("Error from attempt ", error); + // logger.info("Error from attempt ", error); // }, // onProgress: (inputPercentage, chunk, remainingSize) => { // //Gets called with each chunk. @@ -232,12 +233,12 @@ export class EncoderService { // clearInterval(slowUpdate) // } catch (error) { // //If all attempts fail, the last error is thrown. - // console.log("Final fail", error); + // logger.info("Final fail", error); // fs.rmdirSync(downloadFolder) // clearInterval(slowUpdate) // throw error; // } - console.log(`Downloaded to `, srcVideo, `in ${new Date().getTime() - startTime.getTime()}ms`) + logger.info(`Downloaded to `, srcVideo, `in ${new Date().getTime() - startTime.getTime()}ms`) var command = ffmpeg(srcVideo) @@ -251,7 +252,7 @@ export class EncoderService { } }*/ if(e) { - console.log(e) + logger.error(e) } if(enc) { for (var key of Object.keys(enc)) { @@ -263,7 +264,7 @@ export class EncoderService { return resolve('libx264') }), ) - console.log(`Info: using ${codec}`) + logger.info(`Info: using ${codec}`) if(codec === "h264_qsv") { command.addOption('-preset', 'slow') command.addOption('-look_ahead', '1') @@ -293,7 +294,7 @@ export class EncoderService { status: EncodeStatus.RUNNING, }) - console.log('Started at', `in ${new Date().getTime() - startTime.getTime()}ms`) + logger.info('Started at', `in ${new Date().getTime() - startTime.getTime()}ms`) let progressPct1 let progress1; @@ -322,7 +323,7 @@ export class EncoderService { ret.on( 'progress', ((progress) => { - //console.log(jobInfo) + //logger.info(jobInfo) for (let key in progress) { progress[key] = progress[key] || 0 } @@ -356,7 +357,7 @@ export class EncoderService { ret .on('end', (stdout, stderr) => { - console.log(stderr) + logger.error(stderr) if(stderr.includes('Invalid data found when processing input')) { return reject('Invalid data found when processing input') } @@ -377,7 +378,7 @@ export class EncoderService { await fs.mkdir(Path.join(workfolder, `${String(profile.size.split('x')[1])}p`)) //ret.save(path.join(workfolder, `${String(size.split('x')[1])}p`, 'index.m3u8')) - console.log(Path.join(workfolder, `${String(profile.size.split('x')[1])}p`, 'index.m3u8')) + logger.info(Path.join(workfolder, `${String(profile.size.split('x')[1])}p`, 'index.m3u8')) ret.addOption(`-segment_format`, 'mpegts') ret.addOption( @@ -400,7 +401,7 @@ export class EncoderService { var manifest = this._generateManifest(codecData, sizes) await fs.writeFile(Path.join(workfolder, 'manifest.m3u8'), manifest) - console.log('Adding IPFS content for job: ', jobInfo.id) + logger.info('Adding IPFS content for job: ', jobInfo.id) try { await this.updateJob(jobInfo.id, { status: EncodeStatus.UPLOADING, @@ -415,7 +416,7 @@ export class EncoderService { await fs.rm(workfolder, {recursive: true, force: true}) await fs.rm(downloadFolder, {recursive: true, force: true}) - console.log('Removing local temp content', workfolder, ipfsHash.cid.toString()) + logger.info('Removing local temp content', workfolder, ipfsHash.cid.toString()) await this.updateJob(jobInfo.id, { status: EncodeStatus.COMPLETE, @@ -425,7 +426,7 @@ export class EncoderService { return ipfsHash.cid.toString() } catch (ex) { clearInterval(progressInterval) - console.log(ex) + logger.error(ex) this.updateJob(jobInfo.id, { status: EncodeStatus.FAILED, }) @@ -444,7 +445,7 @@ export class EncoderService { //jobInfo = (await TileDocument.load(this.self.ceramic, streamId)).content jobInfo = await this.pouch.get(jobInfoOrId) } catch (ex) { - console.log(ex) + logger.error(ex) throw new Error('Error not streamId') } } else if (typeof jobInfoOrId === 'object') { @@ -457,7 +458,7 @@ export class EncoderService { // }) const out = await this.executeJobRaw(jobInfo, streamId) - console.log(out) + logger.info(out) } /** * generate the master manifest for the transcoded video. @@ -475,7 +476,7 @@ export class EncoderService { )},NAME=${tutils.getHeight(size)}\n` } let result = master - console.log('availableSizes: ', sizes) + logger.info('availableSizes: ', sizes) sizes.forEach((size) => { // log(`format: ${JSON.stringify(formats[size])} , size: ${size}`) result += resolutionLine(size) @@ -549,7 +550,7 @@ export class EncoderService { } async start() { /*const data = await this.createJob('Qma9ZjjtH7fdLWSrMU43tFihvSN59aQdes7f5KW6vGGk6e', 'QmctF7GPunpcLu3Fn77Ypo657TA9GpTiipSDUUMoD2k5Kq', 'did:3:kjzl6cwe1jw14aijwpxwaa1ybg708bp9n5jqt8q89j6yrdqvt8tfxdxw1q5dpxh') - console.log(data) + logger.info(data) this.executeJob(data.streamId)*/ } } diff --git a/src/modules/core/gateway.service.ts b/src/modules/core/gateway.service.ts index fb1f388..17d74a2 100644 --- a/src/modules/core/gateway.service.ts +++ b/src/modules/core/gateway.service.ts @@ -22,6 +22,7 @@ import { ActivityService } from './oplog.service' import { ScoringService } from './gateway/scoring' import Axios from 'axios' import moment from 'moment' +import logger from 'node-color-log' export class GatewayService { @@ -160,7 +161,7 @@ export class GatewayService { async cancelJob(job_id) {} async rejectJob(job_id, node_id) { - console.log('job rejecting', job_id, node_id) + logger.info('job rejecting', job_id, node_id) const jobInfo = await this.jobs.findOne({ id: job_id }) @@ -249,12 +250,12 @@ export class GatewayService { const jobInfo = await this.jobs.findOne({ id: payload.job_id, }) - console.log(payload) - console.log('received finish job from ' + did) + logger.info(payload) + logger.info('received finish job from ' + did) if (jobInfo.assigned_to === did) { if (payload.output) { if (payload.output.cid) { - console.log('accepted finish job from ' + did) + logger.info('accepted finish job from ' + did) await this.activity.changeState({ job_id: payload.job_id, new_status: JobStatus.UPLOADING, @@ -274,7 +275,7 @@ export class GatewayService { replicationFactorMin: 2, replicationFactorMax: 3, }) - console.log(out) + logger.info(out) } else { throw new Error('Output CID not provided') } @@ -354,7 +355,7 @@ export class GatewayService { async createJob(url: string, metadata, storageMetadata) { const { headers } = await Axios.head(url) - console.log(headers['content-length']) + logger.info(headers['content-length']) const obj = { id: uuid(), created_at: new Date(), @@ -402,9 +403,9 @@ export class GatewayService { }, ], }) - console.log(`${await expiredJobs.count()} number of jobs has expired`) + logger.info(`${await expiredJobs.count()} number of jobs has expired`) for await (let job of expiredJobs) { - console.log(`Re-assigning ${job.id} from ${job.assigned_to}`) + logger.info(`Re-assigning ${job.id} from ${job.assigned_to}`) await this.activity.changeState({ job_id: job.id, new_status: JobStatus.QUEUED, @@ -443,12 +444,12 @@ export class GatewayService { status: JobStatus.UPLOADING, }) .toArray() - console.log(jobs) + logger.info(jobs) for (let job of jobs) { const cid = (job.result as any).cid - console.log(cid) + logger.info(cid) const pinStatus = await this.ipfsCluster.status(cid) - console.log(pinStatus) + logger.info(pinStatus) let uploaded = false let pinning = false let pinQueued = false; @@ -496,7 +497,7 @@ export class GatewayService { }) } - console.log(`${job.id}: ${uploaded}`) + logger.info(`${job.id}: ${uploaded}`) } } @@ -515,7 +516,7 @@ export class GatewayService { NodeSchedule.scheduleJob('45 * * * * *', this.runUploadingCheck) this.scoring = new ScoringService(this) - console.log( + logger.info( JSON.stringify( await this.self.identityService.identity.createJWS({ hello: 'world', @@ -546,9 +547,9 @@ void (async () => { }) const a = await ipfsCluster.status('QmaCRG6bam6XJiXfVSSPkXAY388GUgv22bvhqkHNHeqL8h') const b = await ipfsCluster.status('QmeeZ8sDCG6krbLQ7h5Su4YXjKA6qVjGW6FeRCc7u5HiCP') - console.log(a) - console.log(b) - console.log(await ipfsCluster.pin.ls()) + logger.info(a) + logger.info(b) + logger.info(await ipfsCluster.pin.ls()) await ipfsCluster.pin.rm('QmaSL1VwhRERhHPnddb19o6K2BhRazVTtDZq1TVuqQA5dd') } catch {} })() diff --git a/src/modules/core/gatewayClient.service.ts b/src/modules/core/gatewayClient.service.ts index 9bd7a5d..61f19fc 100644 --- a/src/modules/core/gatewayClient.service.ts +++ b/src/modules/core/gatewayClient.service.ts @@ -5,6 +5,7 @@ import PQueue from 'p-queue' import { TileDocument } from '@ceramicnetwork/stream-tile' import { EncodeStatus, JobStatus } from '../encoder.model' import GitCommitInfo from 'git-commit-info' +import logger from 'node-color-log' const queue_concurrency = 1 export class GatewayClient { @@ -39,7 +40,7 @@ export class GatewayClient { job_id: remoteJob.id, }), }) - console.log(data) + logger.info(data) } catch { //If job was already stolen. return; @@ -50,7 +51,7 @@ export class GatewayClient { this.activeJobs[job_id] = remoteJob; const job = await this.self.encoder.createJob(remoteJob.input.uri); //Creates an internal job - console.log(job) + logger.info(job) let pid; //Adds job to the queue. @@ -68,15 +69,15 @@ export class GatewayClient { }, 5000) const eventListenr = async (jobUpdate) => { - console.log(jobUpdate) + logger.info(jobUpdate) - // console.log(jobUpdate.content.status, JobStatus.COMPLETE, jobUpdate.streamId.toString(), job.streamId) + // logger.info(jobUpdate.content.status, JobStatus.COMPLETE, jobUpdate.streamId.toString(), job.streamId) //Make sure the event is not destine for another job if (jobUpdate.streamId.toString() === job.streamId) { - console.log(`Current Status: ${job.streamId} ${jobUpdate.content.status}`) + logger.info(`Current Status: ${job.streamId} ${jobUpdate.content.status}`) //Ensure the job is complete and not something else if (jobUpdate.content.status === JobStatus.COMPLETE) { - console.log(`Encode Complete ${job_id} submitting`) + logger.info(`Encode Complete ${job_id} submitting`) //Submitting the job await Axios.post(`${this.apiUrl}/api/v0/gateway/finishJob`, { jws: await this.self.identityService.identity.createJWS({ @@ -96,7 +97,7 @@ export class GatewayClient { }) }) this.ipfsBootstrap().catch((e) => { - console.log(e) + logger.error(e) }) delete this.activeJobs[job_id]; clearInterval(pid) @@ -113,25 +114,25 @@ export class GatewayClient { //TODO: Probably should redo the whole remote and local job ID thing await this.self.encoder.executeJob(job.id) } catch(ex) { - console.log('failing job ' + job_id) + logger.error('failing job ' + job_id) await this.failJob(job_id) clearInterval(pid) delete this.activeJobs[job_id]; - console.log(ex) + logger.error(ex) } }) } catch (ex) { - console.log(ex) + logger.error(ex) } //this.self.encoder.createJob(jobInfo.input.url) } async getNewJobs() { - console.log(this.apiUrl) + logger.info(this.apiUrl) /*const { data } = await Axios.get(`${this.apiUrl}/api/v0/gateway/getJob`) - console.log(data)*/ + logger.info(data)*/ const { data } = await Axios.post(`${this.apiUrl}/v1/graphql`, { query: ` query Query($node_id: String) { @@ -166,12 +167,12 @@ export class GatewayClient { } }) - console.log('jobInfo', JSON.stringify(data), { + logger.info('jobInfo', JSON.stringify(data), { node_id: this.self.identityService.identity.id }) if(data.data.queueJob.job) { - console.log(this.jobQueue.size === 0, this.jobQueue.pending, (queue_concurrency - 1)) + logger.info(this.jobQueue.size === 0, this.jobQueue.pending, (queue_concurrency - 1)) if (this.jobQueue.size === 0 && this.jobQueue.pending === (queue_concurrency - 1)) { this.queueJob(data.data.queueJob.job) } @@ -214,10 +215,10 @@ export class GatewayClient { } async encoderUnpinCheck() { - console.log(`[GC] Running unpin cycle`) + logger.info(`[GC] Running unpin cycle`) try { const doc:Record = await this.self.encoder.pouch.get('pin-allocation') - console.log(doc) + logger.info(doc) for(let [job_id, docData] of Object.entries(doc)) { const { data } = await Axios.post(`${this.apiUrl}/v1/graphql`, { query: ` @@ -235,9 +236,9 @@ export class GatewayClient { const jobInfo = data.data.jobInfo - console.log(jobInfo) + logger.info(jobInfo) if(jobInfo.status === "complete") { - console.log(`[GC] Unpinning ${docData.cid} from ${job_id}`) + logger.info(`[GC] Unpinning ${docData.cid} from ${job_id}`) try { await this.self.ipfs.pin.rm(docData.cid) } catch { @@ -252,22 +253,22 @@ export class GatewayClient { } } } catch(ex) { - // console.log(ex) + // logger.error(ex) } - console.log('[GC] Running IPFS GC') + logger.info('[GC] Running IPFS GC') for await(let gcResult of this.self.ipfs.repo.gc()) { - // console.log(gcResult) + // logger.info(gcResult) //Don't log here unless you want spam } - console.log('[GC] IPFS GC complete') + logger.info('[GC] IPFS GC complete') } async start() { if (this.self.config.get('remote_gateway.enabled')) { this.apiUrl = this.self.config.get('remote_gateway.api') || 'http://127.0.0.1:4005' - console.log(`${Math.round(Math.random() * (60 + 1))} * * * * *`) + logger.info(`${Math.round(Math.random() * (60 + 1))} * * * * *`) - console.log('Startup: Checking if IPFS is running') + logger.info('Startup: Checking if IPFS is running') try { await (await this.self.ipfs.id()).id } catch { @@ -295,7 +296,7 @@ export class GatewayClient { }), }) } catch (ex) { - console.log(ex) + logger.error(ex) process.exit(0) } }, 1000) @@ -303,13 +304,13 @@ export class GatewayClient { } async stop() { NodeSchedule.gracefulShutdown(); - console.log(Object.keys(this.activeJobs)) + logger.info(Object.keys(this.activeJobs)) for (let job_id of Object.keys(this.activeJobs)) { - console.log('Cancelling all jobs') + logger.info('Cancelling all jobs') try { await this.rejectJob(job_id) } catch (ex) { - console.log(ex) + logger.error(ex) } } } diff --git a/src/modules/core/identity.service.ts b/src/modules/core/identity.service.ts index 6f52519..21255e7 100644 --- a/src/modules/core/identity.service.ts +++ b/src/modules/core/identity.service.ts @@ -2,6 +2,7 @@ import { Ed25519Provider } from "key-did-provider-ed25519"; import Crypto from 'crypto' import KeyResolver from 'key-did-resolver' import { DID } from 'dids' +import logger from 'node-color-log' import { CoreService } from "./core.service"; @@ -35,6 +36,6 @@ export class IdentityService { this.self.config.set('node.publicKey', this.identity.id); } - console.info(`Logged in with ${did.id}`) + logger.info(`Logged in with ${did.id}`) } } \ No newline at end of file diff --git a/src/modules/core/misc/discordbot.service.ts b/src/modules/core/misc/discordbot.service.ts index e6bb470..b5a0c4c 100644 --- a/src/modules/core/misc/discordbot.service.ts +++ b/src/modules/core/misc/discordbot.service.ts @@ -7,6 +7,7 @@ import { Partials, APIEmbedField } from 'discord.js' +import logger from 'node-color-log' import { CoreService } from '../core.service' export class DiscordBot { @@ -25,7 +26,7 @@ export class DiscordBot { id: did, }) const embedFields:Array = [] - console.log(nodeScores) + logger.info(nodeScores) for(let field in nodeScores) { embedFields.push({ name: field, @@ -73,7 +74,7 @@ export class DiscordBot { const nodeOwner = await this.self.gateway.clusterNodes.findOne({ id: did, }) - console.log(nodeScores) + logger.info(nodeScores) for(let field in nodeScores) { embedFields.push({ name: field, diff --git a/src/modules/libp2p/libp2p.service.ts b/src/modules/libp2p/libp2p.service.ts index b43f369..2ff4aad 100644 --- a/src/modules/libp2p/libp2p.service.ts +++ b/src/modules/libp2p/libp2p.service.ts @@ -13,6 +13,7 @@ import { decode, encode } from './frame-codec.util' import { MESSAGE_TYPES } from './messages.model' import pipe from 'it-pipe' import Pushable from 'it-pushable' +import logger from 'node-color-log' const PEERINFO = { id: 'QmctF7GPunpcLu3Fn77Ypo657TA9GpTiipSDUUMoD2k5Kq', @@ -36,18 +37,18 @@ export class Lib2pService { } async connectionHandler({ connection, stream, protocol }) { - console.log(connection) + logger.info(connection) const pushable = Pushable() pipe(pushable, stream.sink) let listener; for await (const item of stream.source) { const decodedMessage = decode(item._bufs[0]) - console.log(decodedMessage) + logger.info(decodedMessage) if(decodedMessage.type === MESSAGE_TYPES.SUBSCRIBE_UPDATE) { listener = this.self.encoder.events.on('job.progress', (streamId, statusUpdate) => { if(streamId === decodedMessage.streamId) { - console.log(statusUpdate) + logger.info(statusUpdate) pushable.push(encode({ type: MESSAGE_TYPES.STATUS_UPDATE, statusUpdate @@ -59,8 +60,8 @@ export class Lib2pService { } if(decodedMessage.type === MESSAGE_TYPES.REQUEST_ENCODE_JOB) { - console.log(stream) - console.log(connection) + logger.info(stream) + logger.info(connection) const data = await this.self.encoder.createJob(decodedMessage.ipfsHash, connection.remotePeer.toString()) @@ -74,7 +75,7 @@ export class Lib2pService { } //this.self.encoder.events.off //clear event listeners - console.log('stream is ending') + logger.info('stream is ending') } async start() { @@ -82,7 +83,7 @@ export class Lib2pService { return; } const idListener = await PeerId.createFromJSON(PEERINFO) - console.log('P2P interface starting up') + logger.info('P2P interface starting up') this.libp2p = await Libp2p.create({ peerId: idListener, modules: { @@ -129,16 +130,16 @@ export class Lib2pService { // start libp2p await this.libp2p.start() - console.log(this.libp2p.addresses) + logger.info(this.libp2p.addresses) setInterval(() => { - //console.log(this.libp2p.connections.size) + //logger.info(this.libp2p.connections.size) }, 5000) const handler = async ({ connection, stream, protocol }) => { // use stream or connection according to the needs - console.log(connection, stream, protocol) + logger.info(connection, stream, protocol) for await (const item of stream.source) { - console.log(item) - console.log(decode(item._bufs[0])) + logger.info(item) + logger.info(decode(item._bufs[0])) } } From cac9a3bb3b3303df337cfeab770141c0d84a8611 Mon Sep 17 00:00:00 2001 From: Rishi Panthee Date: Mon, 3 Apr 2023 23:25:26 -0500 Subject: [PATCH 2/2] Add more logging info --- client.ts | 2 -- src/modules/core/gatewayClient.service.ts | 5 +++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/client.ts b/client.ts index 317454a..e73fd5e 100644 --- a/client.ts +++ b/client.ts @@ -29,9 +29,7 @@ const ClientKey = } void (async () => { - logger.info('P2P interface starting up') - const ceramic = new CeramicClient('https://ceramic-clay.3boxlabs.com') //Using the public node for now. const idListener = await PeerId.createFromJSON(ClientKey) const libp2p = await Libp2p.create({ diff --git a/src/modules/core/gatewayClient.service.ts b/src/modules/core/gatewayClient.service.ts index 61f19fc..f9b3ce6 100644 --- a/src/modules/core/gatewayClient.service.ts +++ b/src/modules/core/gatewayClient.service.ts @@ -30,6 +30,7 @@ export class GatewayClient { } async queueJob(remoteJob) { + logger.info('Attempting to queue remote job', remoteJob) try { this.jobQueue.add(async () => { //Asks gateway to accept the job @@ -51,7 +52,7 @@ export class GatewayClient { this.activeJobs[job_id] = remoteJob; const job = await this.self.encoder.createJob(remoteJob.input.uri); //Creates an internal job - logger.info(job) + logger.info('Internal job created', job) let pid; //Adds job to the queue. @@ -130,7 +131,7 @@ export class GatewayClient { //this.self.encoder.createJob(jobInfo.input.url) } async getNewJobs() { - logger.info(this.apiUrl) + logger.info('Getting new jobs from', this.apiUrl) /*const { data } = await Axios.get(`${this.apiUrl}/api/v0/gateway/getJob`) logger.info(data)*/ const { data } = await Axios.post(`${this.apiUrl}/v1/graphql`, {