Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 116 additions & 4 deletions handler.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env node
// Executor of 'jobs' using the Redis task status notification mechanism

const { Connection } = require('@hflow-monitoring/hyperflow-metrics-listeners').Client;
const { spawn } = require('child_process');
const redis = require('redis');
const fs = require('fs');
Expand Down Expand Up @@ -79,7 +79,8 @@ async function handleJob(taskId, rcl) {

var pids = {} // pids of the entire pid tree (in case the main process starts child processes)
var jm; // parsed job message

const metricsListenerConnection = new Connection(process.env.METRICS_LISTENER_URL);
var metricBase; // metric object with all common fields set
// logging basic process info from the procfs
logProcInfo = function (pid) {
// log process command line
Expand All @@ -98,6 +99,14 @@ async function handleJob(taskId, rcl) {
let ioInfo = procfs.processIo(pid);
ioInfo.pid = pid;
ioInfo.name = jm["name"];
let io = {
...metricBase,
time: new Date(),
pid: pid + '',
parameter: 'io',
value: ioInfo
}
metricsListenerConnection.send('metric', io);
logger.info("IO:", JSON.stringify(ioInfo));
setTimeout(() => logProcIO(pid), probeInterval);
} catch (error) {
Expand All @@ -111,8 +120,16 @@ async function handleJob(taskId, rcl) {
logProcNetDev = function (pid) {
try {
let netDevInfo = procfs.processNetDev(pid);
// Network metric sample for `pid`, built on the shared `metricBase` fields.
// Timestamped with the current time, like the IO metric: no `stats` binding is
// visible in this function, so reading `stats.timestamp` here would throw
// inside the enclosing try and the metric would never be sent.
let network = {
    ...metricBase,
    time: new Date(),
    pid: pid + '',
    parameter: 'network',
    value: JSON.stringify(netDevInfo)
};
//netDevInfo.pid = pid;
//netDevInfo.name = jm["name"];
metricsListenerConnection.send('metric', network);
logger.info("NetDev: pid:", pid, JSON.stringify(netDevInfo));
setTimeout(() => logProcNetDev(pid), probeInterval);
} catch (error) {
Expand All @@ -139,6 +156,30 @@ async function handleJob(taskId, rcl) {
// elapsed: 6650000, // ms since the start of the process
// timestamp: 864000000 // ms since epoch
// }
let cpu = {
...metricBase,
time: new Date(stats.timestamp),
pid: pid + '',
parameter: 'cpu',
value: stats.cpu
};
let memory = {
...metricBase,
time: new Date(stats.timestamp),
pid: pid + '',
parameter: 'memory',
value: stats.memory
}
let ctime = {
...metricBase,
time: new Date(stats.timestamp),
pid: pid + '',
parameter: 'ctime',
value: stats.ctime
}
metricsListenerConnection.send('metric', cpu);
metricsListenerConnection.send('metric', memory);
metricsListenerConnection.send('metric', ctime);
logger.info("Procusage: pid:", pid, JSON.stringify(stats));
setTimeout(() => logPidUsage(pid), probeInterval);
});
Expand Down Expand Up @@ -168,6 +209,14 @@ async function handleJob(taskId, rcl) {
cmd.stderr.pipe(stderrLog);

logProcInfo(targetPid);
// send jobStart event
const job_start_event = {
...metricBase,
time: new Date(),
parameter: 'event',
value: 'jobStart'
}
metricsListenerConnection.send('metric', job_start_event);
logger.info('job started:', jm["name"]);

var sysinfo = {};
Expand All @@ -181,7 +230,15 @@ async function handleJob(taskId, rcl) {
then(data => {
sysinfo.mem = data;
}).
then(data => logger.info("Sysinfo:", JSON.stringify(sysinfo))).
then(data =>{
let sysInfo = {
jobId: taskId.split(':').slice(0, 3).join('-'),
cpu: sysinfo.cpu,
mem: sysinfo.mem
}
metricsListenerConnection.send('sys_info', sysInfo);
logger.info("Sysinfo:", JSON.stringify(sysinfo))
}).
catch(err => console.err(error));

//console.log(Date.now(), 'job started');
Expand Down Expand Up @@ -233,6 +290,15 @@ async function handleJob(taskId, rcl) {
} else {
logger.info('job successful (try ' + attempt + '):', jm["name"]);
}

// send jobEnd event
const job_end_event = {
...metricBase,
time: new Date(),
parameter: 'event',
value: 'jobEnd'
}
metricsListenerConnection.send('metric', job_end_event);
logger.info('job exit code:', code);

// retry the job
Expand Down Expand Up @@ -346,6 +412,7 @@ async function handleJob(taskId, rcl) {

//var rcl = redis.createClient(redisUrl);

const handlerStartTime = new Date();
logger.info('handler started, (ID: ' + handlerId + ')');

// 0. Detect multiple task acquisitions
Expand Down Expand Up @@ -379,6 +446,42 @@ async function handleJob(taskId, rcl) {
console.log("Received job message:", jobMessage);
jm = JSON.parse(jobMessage);

const jobDescription = {
workflowName: process.env.HF_WORKFLOW_NAME || 'unknown',
size: 1, // TODO
version: '1.0.0',
hyperflowId: taskId.split(':')[0],
jobId: taskId.split(':').slice(0, 3).join('-'),
env: {
podIp: process.env.HF_LOG_POD_IP || "unknown",
nodeName: process.env.HF_LOG_NODE_NAME || "unknown",
podName: process.env.HF_LOG_POD_NAME || "unknown",
podServiceAccount: process.env.HF_LOG_POD_SERVICE_ACCOUNT || "default",
podNameSpace: process.env.HF_LOG_POD_NAMESPACE || "default"
},
executable: jm['executable'],
args: jm['args'],
nodeName: process.env.HF_LOG_NODE_NAME || 'unknown',
inputs: jm['inputs'],
outputs: jm['outputs'],
name: jm['name'],
command: jm['executable'] + ' ' + jm["args"].join(' ')
}
metricBase = {
workflowId: taskId.split(':').slice(0, 2).join('-'),
jobId: taskId.split(':').slice(0, 3).join('-'),
name: jm['name'],
}
// send handler_start event (has to be done after connection has been established)
const handler_start_event = {
...metricBase,
time: handlerStartTime,
parameter: 'event',
value: 'handlerStart'
}
metricsListenerConnection.send('metric', handler_start_event);
metricsListenerConnection.send('job_description', jobDescription);

// 3. Check/wait for input files
if (process.env.HF_VAR_WAIT_FOR_INPUT_FILES=="1" && jm.inputs && jm.inputs.length) {
var files = jm.inputs.map(input => input.name).slice();
Expand Down Expand Up @@ -437,6 +540,15 @@ async function handleJob(taskId, rcl) {
// **Experimental**: remove job info from Redis "hf_all_jobs" set
rcl.srem("hf_all_jobs", allJobsMember, function (err, ret) { if (err) console.log(err); });

// send handler_finished event
const handler_finished_event = {
...metricBase,
time: new Date(),
parameter: "event",
value: "handlerEnd"
};
metricsListenerConnection.send('metric', handler_finished_event);

logger.info('handler finished, code=', jobExitCode);

// 6. Perform cleanup operations
Expand All @@ -450,7 +562,7 @@ async function handleJob(taskId, rcl) {
}
});
pidusage.clear();

metricsListenerConnection.close()
return jobExitCode;
}

Expand Down
Empty file modified jobexec.js
100644 → 100755
Empty file.
Loading