
Commit 84680c2

add connection to metrics listener
1 parent 5d02936 commit 84680c2

File tree

4 files changed: +1234 -5 lines changed

handler.js

Lines changed: 121 additions & 4 deletions
@@ -1,6 +1,6 @@
 #!/usr/bin/env node
 // Executor of 'jobs' using the Redis task status notification mechanism
-
+const { Connection } = require('@hflow-monitoring/hyperflow-metrics-listeners').Client;
 const { spawn } = require('child_process');
 const redis = require('redis');
 const fs = require('fs');
@@ -79,7 +79,8 @@ async function handleJob(taskId, rcl) {

     var pids = {} // pids of the entire pid tree (in case the main process starts child processes)
     var jm; // parsed job message
-
+    var metricsListenerConnection; // metrics listener client instance (instantiated after receiving the job message)
+    var metricBase; // metric object with all common fields set
     // logging basic process info from the procfs
     logProcInfo = function (pid) {
         // log process command line
@@ -98,6 +99,16 @@ async function handleJob(taskId, rcl) {
             let ioInfo = procfs.processIo(pid);
             ioInfo.pid = pid;
             ioInfo.name = jm["name"];
+
+            let io = {
+                ...metricBase,
+                time: new Date(),
+                pid: pid + '',
+                parameter: 'io',
+                value: ioInfo
+            }
+            metricsListenerConnection.send('metric', io);
+
             logger.info("IO:", JSON.stringify(ioInfo));
             setTimeout(() => logProcIO(pid), probeInterval);
         } catch (error) {
@@ -111,8 +122,16 @@ async function handleJob(taskId, rcl) {
     logProcNetDev = function (pid) {
         try {
             let netDevInfo = procfs.processNetDev(pid);
+            let network = {
+                ...metricBase,
+                time: new Date(),
+                pid: pid + '',
+                parameter: 'network',
+                value: JSON.stringify(netDevInfo)
+            }
             //netDevInfo.pid = pid;
             //netDevInfo.name = jm["name"];
+            metricsListenerConnection.send('metric', network);
             logger.info("NetDev: pid:", pid, JSON.stringify(netDevInfo));
             setTimeout(() => logProcNetDev(pid), probeInterval);
         } catch (error) {
@@ -139,6 +158,30 @@ async function handleJob(taskId, rcl) {
            // elapsed: 6650000, // ms since the start of the process
            // timestamp: 864000000 // ms since epoch
            // }
+            let cpu = {
+                ...metricBase,
+                time: new Date(stats.timestamp),
+                pid: pid + '',
+                parameter: 'cpu',
+                value: stats.cpu
+            };
+            let memory = {
+                ...metricBase,
+                time: new Date(stats.timestamp),
+                pid: pid + '',
+                parameter: 'memory',
+                value: stats.memory
+            }
+            let ctime = {
+                ...metricBase,
+                time: new Date(stats.timestamp),
+                pid: pid + '',
+                parameter: 'ctime',
+                value: stats.ctime
+            }
+            metricsListenerConnection.send('metric', cpu);
+            metricsListenerConnection.send('metric', memory);
+            metricsListenerConnection.send('metric', ctime);
             logger.info("Procusage: pid:", pid, JSON.stringify(stats));
             setTimeout(() => logPidUsage(pid), probeInterval);
         });
@@ -168,6 +211,15 @@ async function handleJob(taskId, rcl) {
     cmd.stderr.pipe(stderrLog);

     logProcInfo(targetPid);
+
+    // send jobStart event
+    const job_start_event = {
+        ...metricBase,
+        time: new Date(),
+        parameter: 'event',
+        value: 'jobStart'
+    }
+    metricsListenerConnection.send('metric', job_start_event);
     logger.info('job started:', jm["name"]);

     var sysinfo = {};
@@ -181,7 +233,15 @@ async function handleJob(taskId, rcl) {
         then(data => {
             sysinfo.mem = data;
         }).
-        then(data => logger.info("Sysinfo:", JSON.stringify(sysinfo))).
+        then(data => {
+            let sysInfo = {
+                jobId: taskId.split(':').slice(0, 3).join('-'),
+                cpu: sysinfo.cpu,
+                mem: sysinfo.mem
+            }
+            metricsListenerConnection.send('sys_info', sysInfo);
+            logger.info("Sysinfo:", JSON.stringify(sysinfo))
+        }).
         catch(err => console.err(error));

     //console.log(Date.now(), 'job started');
@@ -233,6 +293,16 @@ async function handleJob(taskId, rcl) {
         } else {
             logger.info('job successful (try ' + attempt + '):', jm["name"]);
         }
+
+        // send jobEnd event
+        const job_end_event = {
+            ...metricBase,
+            time: new Date(),
+            parameter: 'event',
+            value: 'jobEnd'
+        }
+        metricsListenerConnection.send('metric', job_end_event);
+
         logger.info('job exit code:', code);

         // retry the job
@@ -346,6 +416,7 @@ async function handleJob(taskId, rcl) {

     //var rcl = redis.createClient(redisUrl);

+    const handlerStartTime = new Date();
     logger.info('handler started, (ID: ' + handlerId + ')');

     // 0. Detect multiple task acquisitions
@@ -378,6 +449,43 @@ async function handleJob(taskId, rcl) {
     logger.info('jobMessage: ', jobMessage)
     console.log("Received job message:", jobMessage);
     jm = JSON.parse(jobMessage);
+    metricsListenerConnection = new Connection(process.env.METRICS_LISTENER_URL);
+
+    const jobDescription = {
+        workflowName: process.env.HF_WORKFLOW_NAME || 'unknown',
+        size: 1, // TODO
+        version: '1.0.0',
+        hyperflowId: taskId.split(':')[0],
+        jobId: taskId.split(':').slice(0, 3).join('-'),
+        env: {
+            podIp: process.env.HF_LOG_POD_IP || "unknown",
+            nodeName: process.env.HF_LOG_NODE_NAME || "unknown",
+            podName: process.env.HF_LOG_POD_NAME || "unknown",
+            podServiceAccount: process.env.HF_LOG_POD_SERVICE_ACCOUNT || "default",
+            podNameSpace: process.env.HF_LOG_POD_NAMESPACE || "default"
+        },
+        executable: jm['executable'],
+        args: jm['args'],
+        nodeName: process.env.HF_LOG_NODE_NAME || 'unknown',
+        inputs: jm['inputs'],
+        outputs: jm['outputs'],
+        name: jm['name'],
+        command: jm['executable'] + ' ' + jm["args"].join(' ')
+    }
+    metricBase = {
+        workflowId: taskId.split(':').slice(0, 2).join('-'),
+        jobId: taskId.split(':').slice(0, 3).join('-'),
+        name: jm['name'],
+    }
+    // send handler_start event (has to be done after connection has been established)
+    const handler_start_event = {
+        ...metricBase,
+        time: handlerStartTime,
+        parameter: 'event',
+        value: 'handlerStart'
+    }
+    metricsListenerConnection.send('metric', handler_start_event);
+    metricsListenerConnection.send('job_description', jobDescription);

     // 3. Check/wait for input files
     if (process.env.HF_VAR_WAIT_FOR_INPUT_FILES=="1" && jm.inputs && jm.inputs.length) {
@@ -437,6 +545,15 @@ async function handleJob(taskId, rcl) {
     // **Experimental**: remove job info from Redis "hf_all_jobs" set
     rcl.srem("hf_all_jobs", allJobsMember, function (err, ret) { if (err) console.log(err); });

+    // send handler_finished event
+    const handler_finished_event = {
+        ...metricBase,
+        time: new Date(),
+        parameter: "event",
+        value: "handlerEnd"
+    };
+    metricsListenerConnection.send('metric', handler_finished_event);
+
     logger.info('handler finished, code=', jobExitCode);

     // 6. Perform cleanup operations
@@ -450,7 +567,7 @@ async function handleJob(taskId, rcl) {
        }
    });
    pidusage.clear();
-
+    metricsListenerConnection.close()
    return jobExitCode;
}

jobexec.js

File mode changed: 100644 → 100755
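
For reference, the sketch below (not part of the commit) shows the metrics-listener traffic the new handler.js code generates, using only the client calls visible in the diff: a URL-based Connection constructor, send(type, payload), and close(). The IDs and payload fields are illustrative placeholders; the exact schema expected by hyperflow-metrics-listeners is assumed rather than documented here.

// Minimal sketch of the reporting flow added by this commit. Assumptions:
// the Connection client exposes exactly the constructor/send/close calls used
// in handler.js, and METRICS_LISTENER_URL points at a running metrics listener.
const { Connection } = require('@hflow-monitoring/hyperflow-metrics-listeners').Client;

const conn = new Connection(process.env.METRICS_LISTENER_URL);

// Common fields attached to every metric (mirrors 'metricBase' in handler.js);
// the IDs below are placeholders, the handler derives them from the Redis taskId.
const metricBase = {
    workflowId: 'wf-1',
    jobId: 'wf-1-42',
    name: 'example-job'
};

// Lifecycle events and periodic probe readings are both sent as 'metric' messages.
conn.send('metric', { ...metricBase, time: new Date(), parameter: 'event', value: 'jobStart' });
conn.send('metric', { ...metricBase, time: new Date(), pid: '1234', parameter: 'cpu', value: 12.5 });

// One-off messages describing the job and the host system (field subset shown for illustration).
conn.send('job_description', { jobId: metricBase.jobId, name: metricBase.name, executable: '/bin/echo', args: ['hello'] });
conn.send('sys_info', { jobId: metricBase.jobId, cpu: {}, mem: {} });

conn.close();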
