diff --git a/handler.js b/handler.js index d9f4178..e21b5bb 100644 --- a/handler.js +++ b/handler.js @@ -1,212 +1,213 @@ -'use strict'; - -const childProcess = require("child_process"); -const fs = require("fs"); -const async = require("async"); -const aws = require("aws-sdk"); -const s3 = new aws.S3(); - -function handleRequest(request) { - - const metrics = { - "fargateStart": Date.now(), - "fargateEnd": "", - "downloadStart": "", - "downloadEnd": "", - "executionStart": "", - "executionEnd": "", - "uploadStart": "", - "uploadEnd": "", - }; - - const executable = request.executable; - const args = request.args; - const bucket_name = request.options.bucket; - const prefix = request.options.prefix; - const inputs = request.inputs.map(input => input.name); - const outputs = request.outputs.map(output => output.name); - const files = inputs.slice(); - const logName = request.logName; - files.push(executable); - - console.log("Executable: " + executable); - console.log("Arguments: " + args); - console.log("Inputs: " + inputs); - console.log("Outputs: " + outputs); - console.log("Bucket: " + bucket_name); - console.log("Prefix: " + prefix); - console.log("Stdout: " + request.stdout); - - async.waterfall([ - download, - execute, - upload - ], async function (err) { - if (err) { - console.error("Error: " + err); - process.exit(1) - } else { - console.log("Success"); - metrics.fargateEnd = Date.now(); - const metricsString = "fargate start: " + metrics.fargateStart + " fargate end: " + metrics.fargateEnd + - " download start: " + metrics.downloadStart + " download end: " + metrics.downloadEnd + - " execution start: " + metrics.executionStart + " execution end: " + metrics.executionEnd + - " upload start: " + metrics.uploadStart + " upload end: " + metrics.uploadEnd; - if (logName !== undefined) { - await s3.putObject({ - Bucket: bucket_name, - Key: "logs/" + logName, - ContentType: 'text/plain', - Body: metricsString - }).promise(); - } else { - console.log(metricsString); - } - } - }); - - function download(callback) { - metrics.downloadStart = Date.now(); - async.each(files, function (file, callback) { - - console.log("Downloading " + bucket_name + "/" + prefix + "/" + file); - - const params = { - Bucket: bucket_name, - Key: prefix + "/" + file - }; - s3.getObject(params, function (err, data) { - if (err) { - console.log("Error downloading file " + JSON.stringify(params)); - process.exit(1) - } else { - const path = "/tmp/" + file; - fs.writeFile(path, data.Body, function (err) { - if (err) { - console.log("Unable to save file " + path); - process.exit(1) - } - console.log("Downloaded " + path); - console.log("Downloaded and saved file " + path); - callback(); - }); - } - }); - }, function (err) { - metrics.downloadEnd = Date.now(); - if (err) { - console.error("Failed to download file:" + err); - process.exit(1) - } else { - console.log("All files have been downloaded successfully"); - callback() - } - }); - } - - function execute(callback) { - metrics.executionStart = Date.now(); - const proc_name = /tmp/ + "/" + executable; - fs.chmodSync(proc_name, "777"); - - let proc; - console.log("Running executable" + proc_name); - - if (proc_name.endsWith(".js")) { - proc = childProcess.fork(proc_name, args, {cwd: "/tmp"}); - } else if (proc_name.endsWith(".jar")) { - let java_args = ['-jar', proc_name]; - const program_args = java_args.concat(args); - proc = childProcess.spawn('java', program_args, {cwd: "/tmp"}); - } else { - proc = childProcess.spawn(proc_name, args, {cwd: "/tmp"}); - - proc.stdout.on("data", function (exedata) { - console.log("Stdout: " + executable + exedata); - }); - - proc.stderr.on("data", function (exedata) { - console.log("Stderr: " + executable + exedata); - }); - } - - if (request.stdout) { - let stdoutStream = fs.createWriteStream("/tmp" + "/" + request.stdout, {flags: 'w'}); - proc.stdout.pipe(stdoutStream); - } - - proc.on("error", function (code) { - console.error("Error!!" + executable + JSON.stringify(code)); - }); - proc.on("exit", function () { - console.log("My exe exit " + executable); - }); - - proc.on("close", function () { - console.log("My exe close " + executable); - metrics.executionEnd = Date.now(); - callback() - }); - } - - function upload(callback) { - metrics.uploadStart = Date.now(); - async.each(outputs, function (file, callback) { - - console.log("Uploading " + bucket_name + "/" + prefix + "/" + file); - const path = "/tmp/" + file; - - fs.readFile(path, function (err, data) { - if (err) { - console.log("Error reading file " + path); - process.exit(1) - } - - const params = { - Bucket: bucket_name, - Key: prefix + "/" + file, - Body: data - }; - - s3.putObject(params, function (err) { - if (err) { - console.log("Error uploading file " + file); - process.exit(1) - } - console.log("Uploaded file " + file); - callback(); - }); - }); - - }, function (err) { - metrics.uploadEnd = Date.now(); - if (err) { - console.log("Error uploading file " + err); - process.exit(1) - } else { - console.log("All files have been uploaded successfully"); - callback() - } - }); - } -} - -let arg = process.argv[2]; - -if (!arg) { - console.log("Received empty request, exiting..."); - process.exit(1); -} - -if (arg.startsWith("S3")) { - const params = JSON.parse(arg.split("=")[1]); - console.log("Getting executable config from S3: " + JSON.stringify(params)); - s3.getObject(params, function (err, data) { - if (err) - return err; - arg = data.Body.toString(); - handleRequest(JSON.parse(arg)); - }); -} else { - handleRequest(JSON.parse(arg)); -} \ No newline at end of file +'use strict'; + +const childProcess = require('child_process'); +const fs = require('fs'); +const async = require('async'); +const aws = require('aws-sdk'); +const s3 = new aws.S3(); + +const init = require('./monitoring').init; + +function handleRequest(request) { + if (request.s3) { + const params = { + Bucket: request.options.bucket, + Key: `${request.options.prefix}/${request.s3}` + }; + s3.getObject(params, (err, data) => { + if (err) { + console.error(`Error downloading job message file from S3 - bucket: ${params.Bucket}, key: ${params.Key}`); + process.exit(1); + } + handle(JSON.parse(data.Body.toString())); + }) + } else { + handle(request); + } +} + +function handle(request) { + const start = Date.now(); + + const executable = request.executable; + const args = request.args; + const bucket_name = request.options.bucket; + const prefix = request.options.prefix; + const inputs = request.inputs.map(input => input.name); + const outputs = request.outputs.map(output => output.name); + const files = inputs.slice(); + files.push(executable); + + console.log(`Executable: ${executable}`); + console.log(`Arguments: ${args}`); + console.log(`Inputs: ${inputs}`); + console.log(`Outputs: ${outputs}`); + console.log(`Bucket: ${bucket_name}`); + console.log(`Prefix: ${prefix}`); + console.log(`Stdout: ${request.stdout}`); + console.log(`PATH: ${process.env.PATH}`); + + async.waterfall([ + init, + download, + execute, + upload, + ], function (err, reporter, previousData) { + if (err) { + console.error(`Error in waterfall: ${err}`); + process.exit(1); + } else { + const end = Date.now(); + + reporter.report({start: start, end: end, ...previousData}, function (err) { + if (err) { + console.error(`Error on reporting exec time: ${err}`); + } + + console.log(`AWS Fargate exit: duration ${end - start} ms, executable: ${executable}, args: ${args}`); + process.exit(0); + }); + } + }); + + function download(reporter, callback) { + const downloadStart = Date.now(); + + async.each(files, function (file, callback) { + + console.log(`Downloading ${bucket_name}/${prefix}/${file}`); + + const params = { + Bucket: bucket_name, + Key: `${prefix}/${file}` + }; + + s3.getObject(params, function (err, data) { + if (err) { + console.log(`Error downloading file ${JSON.stringify(params)}`); + process.exit(1); + } else { + const path = "/tmp/" + file; + + fs.writeFile(path, data.Body, function (err) { + if (err) { + console.log(`Unable to save file ${path}`); + process.exit(1) + } + + console.log(`Downloaded and saved file ${path}`); + + fs.chmod(path, '777', callback); + }); + } + }); + }, function (err) { + if (err) { + console.error(`Failed to download file: ${err}`); + process.exit(1) + } else { + console.log(`All files have been downloaded successfully`); + + callback(null, reporter, { + download_start: downloadStart, + download_end: Date.now() + }); + } + }); + } + + function execute(reporter, previousData, callback) { + const executeStart = Date.now(); + + const proc_name = `/tmp/${executable}`; + + let proc; + console.log("Running executable" + proc_name); + + if (proc_name.endsWith('.js')) { + proc = childProcess.fork(proc_name, args, {cwd: '/tmp'}); + } else if (proc_name.endsWith('.jar')) { + let java_args = ['-jar', proc_name]; + const program_args = java_args.concat(args); + proc = childProcess.spawn('java', program_args, {cwd: '/tmp'}); + } else { + proc = childProcess.spawn(proc_name, args, {cwd: '/tmp', env: {PATH: process.env.PATH}}); + + proc.stdout.on('data', function (exedata) { + console.log(`Stdout: ${executable} / ${exedata}`); + }); + + proc.stderr.on('data', function (exedata) { + console.log(`Stderr: ${executable} / ${exedata}`); + }); + } + + if (request.stdout) { + let stdoutStream = fs.createWriteStream(`/tmp/${request.stdout}`, {flags: 'w'}); + proc.stdout.pipe(stdoutStream); + } + + proc.on('error', function (code) { + console.error(`Error: ${JSON.stringify(code)}`); + callback(code); + }); + proc.on('exit', function () { + console.log(`My exe exit ${executable}`); + }); + + proc.on('close', function () { + console.log(`My exe close ${executable}`); + + callback(null, reporter, {execute_start: executeStart, execute_end: Date.now(), ...previousData}); + }); + } + + function upload(reporter, previousData, callback) { + const uploadStart = Date.now(); + + async.each(outputs, function (file, callback) { + + console.log(`Uploading ${bucket_name}/${prefix}/${file}`); + const path = `/tmp/${file}`; + + fs.readFile(path, function (err, data) { + if (err) { + console.log(`Error reading file ${path}, err: ${err}`); + process.exit(1); + } + + const params = { + Bucket: bucket_name, + Key: `${prefix}/${file}`, + Body: data + }; + + s3.putObject(params, function (err) { + if (err) { + console.log(`Error uploading file ${file}`); + process.exit(1) + } + console.log(`Uploaded file ${file}`); + callback(); + }); + }); + + }, function (err) { + if (err) { + console.log(`Error uploading file ${err}`); + process.exit(1); + } else { + console.log('All files have been uploaded successfully'); + + callback(null, reporter, {upload_start: uploadStart, upload_end: Date.now(), ...previousData}) + } + }); + } +} + +if (!process.argv[2]) { + console.log('Received empty request, exiting...'); + process.exit(1); +} + +handleRequest(JSON.parse(process.argv[2])); diff --git a/handler_dockerfiles/default/Dockerfile b/handler_dockerfiles/default/Dockerfile index 58f0d8d..efcc630 100644 --- a/handler_dockerfiles/default/Dockerfile +++ b/handler_dockerfiles/default/Dockerfile @@ -1,6 +1,6 @@ -FROM node:8 -WORKDIR /usr/src/app -COPY package*.json ./ -RUN npm install -COPY . . +FROM node:8-alpine +WORKDIR /usr/src/app +COPY package*.json ./ +RUN npm install +COPY . . CMD [ "npm", "start" ] \ No newline at end of file diff --git a/metadata.js b/metadata.js new file mode 100644 index 0000000..8c54b50 --- /dev/null +++ b/metadata.js @@ -0,0 +1,17 @@ +'use strict'; + +const METADATA_URL = process.env.ECS_CONTAINER_METADATA_URI; + +let _metadata = null; + +function fetch(callback) { + if (_metadata) { + return callback(null, _metadata) + } + + _metadata = METADATA_URL.substring(METADATA_URL.lastIndexOf('/'), METADATA_URL.length); + + return callback(null, _metadata); +} + +exports.fetch = fetch; diff --git a/monitoring.js b/monitoring.js new file mode 100644 index 0000000..5974aa7 --- /dev/null +++ b/monitoring.js @@ -0,0 +1,142 @@ +'use strict'; + +const INTERFACE = process.env.INTERFACE || null; +const DISK_OPTS = { + device: process.env.DISK_DEVICE || 'xvda1', + units: 'KiB', +}; +const TASK_ID = process.env.TASK_ID || 'undefinedTaskId'; +const START = process.env.START || 0; +const LABELS = process.env.LABELS || ''; +const INFLUXDB_HOST = process.env.INFLUXDB_HOST || 'influxdb'; +const INFLUXDB_DB_NAME = process.env.INFLUXDB_NAME || 'hyperflow-database'; +const COLLECT_INTERVAL = 1000; + +const si = require('systeminformation'); +const os_utils = require('os-utils'); +const diskStat = require('disk-stat'); +const async = require('async'); +const Influx = require('influx'); + +const fetchMetaData = require('./metadata').fetch; + +const MetricDispatcher = function (err, metadata) { + this.tags = { + containerID: err ? 'undefinedContainerId' : metadata, + taskID: TASK_ID, + ...this._parseLabels(LABELS) + }; + + this._collectUsage = this._collectUsage.bind(this); + + // initialize influx, create database and start collecting usage + this._initInflux(INFLUXDB_HOST, INFLUXDB_DB_NAME, Object.keys(this.tags)) + .then(() => setInterval(this._collectUsage, COLLECT_INTERVAL)); +}; + +// splits labels in form of a string: 'key1=val1,key2=val2', to object: {key1: val1, key2: val2} +MetricDispatcher.prototype._parseLabels = function (labelsString) { + return labelsString ? labelsString.split(',') + .map(s => s.split('=')) + .reduce((acc, curr) => { + acc[curr[0]] = curr[1]; + return acc; + }, {}) : + {}; +}; + +MetricDispatcher.prototype._initInflux = function (url, dbName, tags) { + this.influx = new Influx.InfluxDB({ + host: url, + database: dbName, + schema: [ + { + measurement: 'performance', + fields: { + cpu_usage: Influx.FieldType.FLOAT, + mem_usage: Influx.FieldType.INTEGER, + conn_recv: Influx.FieldType.INTEGER, + conn_transferred: Influx.FieldType.INTEGER, + disk_read: Influx.FieldType.INTEGER, + disk_write: Influx.FieldType.INTEGER + }, + tags: tags + }, + { + measurement: 'hflow_task', + fields: { + start: Influx.FieldType.FLOAT, + end: Influx.FieldType.FLOAT, + download_start: Influx.FieldType.FLOAT, + download_end: Influx.FieldType.FLOAT, + execute_start: Influx.FieldType.FLOAT, + execute_end: Influx.FieldType.FLOAT, + upload_start: Influx.FieldType.FLOAT, + upload_end: Influx.FieldType.FLOAT + }, + tags: tags + }, + ] + }); + + return this.influx.createDatabase(dbName) +}; + +MetricDispatcher.prototype._write = function (measurement, fields, callback) { + this.influx.writeMeasurement(measurement, [{ + tags: this.tags, + fields: fields + }]).then(callback).catch(callback); +}; + +MetricDispatcher.prototype._collectUsage = function () { + async.waterfall([ + callback => { + os_utils.cpuUsage(value => this._write('hflow_performance', {cpu_usage: value}, callback)); + }, + callback => { + si.mem(data => this._write('hflow_performance', {mem_usage: data.used / 1024}, callback)); + }, + callback => { + si.networkStats(INTERFACE, data => this._write('hflow_performance', { + conn_recv: data[0].rx_sec, + conn_transferred: data[0].tx_sec + }, callback)); + }, + callback => { + diskStat.usageRead(DISK_OPTS, value => this._write('hflow_performance', {disk_read: value}, callback)); + }, + callback => { + diskStat.usageWrite(DISK_OPTS, value => this._write('hflow_performance', {disk_write: value}, callback)); + } + + ], function (err) { + if (err) { + console.warn(`Error while pushing metrics to tsdb: ${err.message}`); + } else { + console.log('Successfully pushed metrics to tsdb'); + } + }); +}; + +MetricDispatcher.prototype.reportExecTime = function (start, end, callback) { + this._write('hflow_task', {start: start, end: end}, err => callback(err, end - start)); +}; + +MetricDispatcher.prototype.report = function (fields, callback) { + this._write('hflow_task', fields, callback); +}; + +function _init(callback) { + fetchMetaData((err, metadata) => { + const reporter = new MetricDispatcher(err, metadata); + callback(null, reporter); + }); + +} + +if (START) { + _init(() => console.log('Initialized monitoring service')); +} else { + exports.init = _init; +} \ No newline at end of file diff --git a/package.json b/package.json index b11ebd2..2322300 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,17 @@ -{ - "name": "aws-fargate-executor", - "version": "1.0.0", - "main": "handler.js", - "scripts": { - "start": "node handler.js" - }, - "dependencies": { - "async": "^2.6.2", - "aws-sdk": "^2.429.0" - } -} +{ + "name": "aws-fargate-executor", + "version": "1.0.0", + "main": "handler.js", + "scripts": { + "start": "node handler.js", + "dev": "node monitoring.js" + }, + "dependencies": { + "async": "^2.6.2", + "aws-sdk": "^2.429.0", + "disk-stat": "^1.0.4", + "influx": "^5.0.7", + "os-utils": "0.0.14", + "systeminformation": "^4.1.4" + } +}