Commit 41038d1

add connection to metrics listener
1 parent 5d02936 commit 41038d1

4 files changed: +1229 -5 lines changed


handler.js

Lines changed: 116 additions & 4 deletions
@@ -1,6 +1,6 @@
#!/usr/bin/env node
// Executor of 'jobs' using the Redis task status notification mechanism
-
+const { Connection } = require('@hflow-monitoring/hyperflow-metrics-listeners').Client;
const { spawn } = require('child_process');
const redis = require('redis');
const fs = require('fs');
@@ -79,7 +79,8 @@ async function handleJob(taskId, rcl) {

var pids = {} // pids of the entire pid tree (in case the main process starts child processes)
var jm; // parsed job message
-
+const metricsListenerConnection = new Connection(process.env.METRICS_LISTENER_URL);
+var metricBase; // metric object with all common fields set
// logging basic process info from the procfs
logProcInfo = function (pid) {
// log process command line
@@ -98,6 +99,14 @@ async function handleJob(taskId, rcl) {
let ioInfo = procfs.processIo(pid);
ioInfo.pid = pid;
ioInfo.name = jm["name"];
+let io = {
+  ...metricBase,
+  time: new Date(),
+  pid: pid + '',
+  parameter: 'io',
+  value: ioInfo
+}
+metricsListenerConnection.send('metric', io);
logger.info("IO:", JSON.stringify(ioInfo));
setTimeout(() => logProcIO(pid), probeInterval);
} catch (error) {
@@ -111,8 +120,16 @@ async function handleJob(taskId, rcl) {
logProcNetDev = function (pid) {
try {
let netDevInfo = procfs.processNetDev(pid);
+let network = {
+  ...metricBase,
+  time: new Date(),
+  pid: pid + '',
+  parameter: 'network',
+  value: JSON.stringify(netDevInfo)
+}
//netDevInfo.pid = pid;
//netDevInfo.name = jm["name"];
+metricsListenerConnection.send('metric', network);
logger.info("NetDev: pid:", pid, JSON.stringify(netDevInfo));
setTimeout(() => logProcNetDev(pid), probeInterval);
} catch (error) {
@@ -139,6 +156,30 @@ async function handleJob(taskId, rcl) {
//   elapsed: 6650000,    // ms since the start of the process
//   timestamp: 864000000 // ms since epoch
// }
+let cpu = {
+  ...metricBase,
+  time: new Date(stats.timestamp),
+  pid: pid + '',
+  parameter: 'cpu',
+  value: stats.cpu
+};
+let memory = {
+  ...metricBase,
+  time: new Date(stats.timestamp),
+  pid: pid + '',
+  parameter: 'memory',
+  value: stats.memory
+}
+let ctime = {
+  ...metricBase,
+  time: new Date(stats.timestamp),
+  pid: pid + '',
+  parameter: 'ctime',
+  value: stats.ctime
+}
+metricsListenerConnection.send('metric', cpu);
+metricsListenerConnection.send('metric', memory);
+metricsListenerConnection.send('metric', ctime);
logger.info("Procusage: pid:", pid, JSON.stringify(stats));
setTimeout(() => logPidUsage(pid), probeInterval);
});
@@ -168,6 +209,14 @@ async function handleJob(taskId, rcl) {
cmd.stderr.pipe(stderrLog);

logProcInfo(targetPid);
+// send jobStart event
+const job_start_event = {
+  ...metricBase,
+  time: new Date(),
+  parameter: 'event',
+  value: 'jobStart'
+}
+metricsListenerConnection.send('metric', job_start_event);
logger.info('job started:', jm["name"]);

var sysinfo = {};
@@ -181,7 +230,15 @@ async function handleJob(taskId, rcl) {
then(data => {
sysinfo.mem = data;
}).
-then(data => logger.info("Sysinfo:", JSON.stringify(sysinfo))).
+then(data => {
+  let sysInfo = {
+    jobId: taskId.split(':').slice(0, 3).join('-'),
+    cpu: sysinfo.cpu,
+    mem: sysinfo.mem
+  }
+  metricsListenerConnection.send('sys_info', sysInfo);
+  logger.info("Sysinfo:", JSON.stringify(sysinfo))
+}).
catch(err => console.err(error));

//console.log(Date.now(), 'job started');
@@ -233,6 +290,15 @@ async function handleJob(taskId, rcl) {
} else {
logger.info('job successful (try ' + attempt + '):', jm["name"]);
}
+
+// send jobEnd event
+const job_end_event = {
+  ...metricBase,
+  time: new Date(),
+  parameter: 'event',
+  value: 'jobEnd'
+}
+metricsListenerConnection.send('metric', job_end_event);
logger.info('job exit code:', code);

// retry the job
@@ -346,6 +412,7 @@ async function handleJob(taskId, rcl) {

//var rcl = redis.createClient(redisUrl);

+const handlerStartTime = new Date();
logger.info('handler started, (ID: ' + handlerId + ')');

// 0. Detect multiple task acquisitions
@@ -379,6 +446,42 @@ async function handleJob(taskId, rcl) {
console.log("Received job message:", jobMessage);
jm = JSON.parse(jobMessage);

+const jobDescription = {
+  workflowName: process.env.HF_WORKFLOW_NAME || 'unknown',
+  size: 1, // TODO
+  version: '1.0.0',
+  hyperflowId: taskId.split(':')[0],
+  jobId: taskId.split(':').slice(0, 3).join('-'),
+  env: {
+    podIp: process.env.HF_LOG_POD_IP || "unknown",
+    nodeName: process.env.HF_LOG_NODE_NAME || "unknown",
+    podName: process.env.HF_LOG_POD_NAME || "unknown",
+    podServiceAccount: process.env.HF_LOG_POD_SERVICE_ACCOUNT || "default",
+    podNameSpace: process.env.HF_LOG_POD_NAMESPACE || "default"
+  },
+  executable: jm['executable'],
+  args: jm['args'],
+  nodeName: process.env.HF_LOG_NODE_NAME || 'unknown',
+  inputs: jm['inputs'],
+  outputs: jm['outputs'],
+  name: jm['name'],
+  command: jm['executable'] + ' ' + jm["args"].join(' ')
+}
+metricBase = {
+  workflowId: taskId.split(':').slice(0, 2).join('-'),
+  jobId: taskId.split(':').slice(0, 3).join('-'),
+  name: jm['name'],
+}
+// send handler_start event (has to be done after connection has been established)
+const handler_start_event = {
+  ...metricBase,
+  time: handlerStartTime,
+  parameter: 'event',
+  value: 'handlerStart'
+}
+metricsListenerConnection.send('metric', handler_start_event);
+metricsListenerConnection.send('job_description', jobDescription);
+
// 3. Check/wait for input files
if (process.env.HF_VAR_WAIT_FOR_INPUT_FILES=="1" && jm.inputs && jm.inputs.length) {
var files = jm.inputs.map(input => input.name).slice();
@@ -437,6 +540,15 @@ async function handleJob(taskId, rcl) {
// **Experimental**: remove job info from Redis "hf_all_jobs" set
rcl.srem("hf_all_jobs", allJobsMember, function (err, ret) { if (err) console.log(err); });

+// send handler_finished event
+const handler_finished_event = {
+  ...metricBase,
+  time: new Date(),
+  parameter: "event",
+  value: "handlerEnd"
+};
+metricsListenerConnection.send('metric', handler_finished_event);
+
logger.info('handler finished, code=', jobExitCode);

// 6. Perform cleanup operations
@@ -450,7 +562,7 @@ async function handleJob(taskId, rcl) {
}
});
pidusage.clear();
-
+metricsListenerConnection.close()
return jobExitCode;
}
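
For orientation, the sketch below is editorial and not part of the commit: it illustrates the message flow the new handler.js code relies on. Only names that appear in the diff are used (Connection from @hflow-monitoring/hyperflow-metrics-listeners, the METRICS_LISTENER_URL environment variable, the metricBase fields, and the 'metric' channel); the placeholder values and the fire-and-forget send() assumption are illustrative only.

// Editorial sketch of the traffic emitted by handler.js after this commit.
// Names come from the diff above; concrete values and send() semantics are assumptions.
const { Connection } = require('@hflow-monitoring/hyperflow-metrics-listeners').Client;

const connection = new Connection(process.env.METRICS_LISTENER_URL);

// Fields shared by every metric of one job (cf. metricBase in the diff).
const metricBase = {
  workflowId: 'wf-1',   // taskId.split(':').slice(0, 2).join('-')
  jobId: 'wf-1-42',     // taskId.split(':').slice(0, 3).join('-')
  name: 'example-task'  // hypothetical job name
};

// One probe reading; 'parameter' is one of 'cpu', 'memory', 'ctime',
// 'io', 'network' or 'event' in the instrumented handler.
connection.send('metric', {
  ...metricBase,
  time: new Date(),
  pid: process.pid + '',
  parameter: 'cpu',
  value: 12.5
});

// Closed when the handler finishes, mirroring metricsListenerConnection.close().
connection.close();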

jobexec.js

File mode changed: 100644 → 100755
