Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
!tests
!utils
!wflib
!tracing.js
61 changes: 0 additions & 61 deletions docker-compose.yaml

This file was deleted.

7 changes: 7 additions & 0 deletions examples/RemoteJobs/functions.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ async function submitRemoteJob(ins, outs, context, cb) {
// "submit" job (start the handler process)
var proc = spawn(cmd, ['../../../hyperflow-job-executor/jobexec.js', context.taskId, context.redis_url, parent_id, trace_id], {shell: true});

if (span.isRecording()) {
span.setAttributes({
'hfId': context.hfId, 'appId': context.appId, 'input_dir': input_dir,
'work_dir': work_dir, 'output_dir': output_dir
})
}

proc.stderr.on('data', function (data) {
logger.debug(data.toString());
console.log(data.toString());
Expand Down
4 changes: 2 additions & 2 deletions functions/k8sCommand.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const k8s = require('@kubernetes/client-node');
const yaml = require('js-yaml');
var fs = require('fs');

async function k8sCommand(ins, outs, context, cb) {
async function k8sCommand(ins, outs, context, cb, parentId, traceId) {

let handlerStart = Date.now();
console.log("[DEBUG] K8sInvoke called.");
Expand All @@ -29,7 +29,7 @@ async function k8sCommand(ins, outs, context, cb) {

const k8sApi = kc.makeApiClient(k8s.BatchV1Api);
try {
var command = 'hflow-job-execute ' + context.taskId + ' ' + context.redis_url;
var command = 'hflow-job-execute ' + context.taskId + ' ' + context.redis_url + ' ' + parentId + ' ' + traceId;
var containerName = process.env.HF_VAR_WORKER_CONTAINER;
var volumePath = '/work_dir';
var jobName = Math.random().toString(36).substring(7) + '-' + context.name.replace(/_/g, '-') + "-" + context.procId;
Expand Down
36 changes: 22 additions & 14 deletions functions/kubernetes/bojK8sCommand.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// bojK8sCommand.js
// Runs bags-of-jobs on a Kubernetes cluster

const tracer = require("../../tracing.js")("hyperflow-kubernetes");
const k8s = require('@kubernetes/client-node');
var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob;
var fs = require('fs');
Expand Down Expand Up @@ -30,21 +31,28 @@ async function bojK8sCommand(ins, outs, context, cb) {
let jobsFileName = ins["jobSetsFile"].data[0];
let jobs = JSON.parse(fs.readFileSync(jobsFileName));

let results, errors;

// run job sets in parallel with concurrency limit
const PromisePool = require('@supercharge/promise-pool');
const { results, errors } = await PromisePool
.for(jobs)
.withConcurrency(5)
.process(async jobs => {
jobPromises = jobs.map(job => {
//let taskId = job.name + "-" + jsetIdx + "-" + jIdx;
let taskId = job.name;
let customParams = {};
return submitK8sJob(kubeconfig, job, taskId, context, customParams);
});
jobExitCodes = await Promise.all(jobPromises);
return jobExitCodes;
});
tracer.startActiveSpan('bojK8s', async span => {
const PromisePool = require('@supercharge/promise-pool');
const {results, errors} = await PromisePool
.for(jobs)
.withConcurrency(5)
.process(async jobs => {
jobPromises = jobs.map(job => {
//let taskId = job.name + "-" + jsetIdx + "-" + jIdx;
let taskId = job.name;
let customParams = {};
var traceId = span.spanContext().traceId
var parentId = span.spanContext().spanId
return submitK8sJob(kubeconfig, job, taskId, context, customParams, parentId, traceId);
});
jobExitCodes = await Promise.all(jobPromises);
return jobExitCodes;
});
span.end();
});

console.log(results, errors);

Expand Down
9 changes: 8 additions & 1 deletion functions/kubernetes/k8sCommand.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Runs a job as a Pod (Kubernetes Job) in a Kubernetes cluster

const tracer = require("../../tracing.js")("hyperflow-kubernetes");
const k8s = require('@kubernetes/client-node');
var BufferManager = require('./buffer_manager.js').BufferManager;
var RestartCounter = require('./restart_counter.js').RestartCounter;
Expand Down Expand Up @@ -125,17 +126,23 @@ async function k8sCommandGroup(bufferItems) {
kubeconfig.loadFromDefault(); // loadFromString(JSON.stringify(kconfig))

let jobExitCodes = [];
tracer.startActiveSpan('k8sCommands', async span => {
try {
if (getExecutorType(context) === "WORKER_POOL") {
await amqpEnqueueJobs(jobArr, taskIdArr, contextArr, customParams)
} else {
await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams)
var traceId = span.spanContext().traceId
var parentId = span.spanContext().spanId
await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, parentId, traceId)

}
jobExitCodes = await synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn);
} catch (err) {
console.log("Error when submitting job:", err);
throw err;
}
span.end();
});

let endTime = Date.now();
console.log("Ending k8sCommandGroup function, time:", endTime, "exit codes:", jobExitCodes);
Expand Down
8 changes: 4 additions & 4 deletions functions/kubernetes/k8sJobSubmit.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ function createK8sJobMessage(job, taskId, context) {
//
// Returns:
// - jobYaml: string with job YAML to create the k8s job
var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) => {
var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams, parentId, traceId) => {
let quotedTaskIds = taskIds.map(x => '"' + x + '"');
var command = 'hflow-job-execute ' + context.redis_url + ' -a -- ' + quotedTaskIds.join(' ');
var command = 'hflow-job-execute ' + context.redis_url + ' ' + parentId + ' ' + traceId + ' -a -- ' + quotedTaskIds.join(' ');
var containerName = job.image || process.env.HF_VAR_WORKER_CONTAINER;
var volumePath = '/work_dir';
var jobName = Math.random().toString(36).substring(7) + '-' +
Expand Down Expand Up @@ -105,7 +105,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
//
//
// Returns: job exit code
var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams) => {
var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams, parentId, traceId) => {

// Load definition of the the worker job pod
// File 'job-template.yaml' should be provided externally during deployment
Expand All @@ -116,7 +116,7 @@ var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams
let context = contextArr[0];

// CAUTION: When creating job YAML first job details (requests, container) are used.
var jobYaml = createK8sJobYaml(jobArr[0], taskIdArr, contextArr[0], jobYamlTemplate, customParams);
var jobYaml = createK8sJobYaml(jobArr[0], taskIdArr, contextArr[0], jobYamlTemplate, customParams, parentId, traceId);
let jobMessages = [];
for (var i=0; i<jobArr.length; i++) {
let job = jobArr[i];
Expand Down
Loading