Skip to content
Merged
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ The following parameters can be set in config files or in env variables:
- `topics.TAAS_WORK_PERIOD_DELETE_TOPIC`: the delete work period entity Kafka message topic
- `topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC`: the create work period payment entity Kafka message topic
- `topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC`: the update work period payment entity Kafka message topic
- `topics.TAAS_INTERVIEW_REQUEST_TOPIC`: the request interview entity Kafka message topic
- `topics.TAAS_INTERVIEW_UPDATE_TOPIC`: the update interview entity Kafka message topic
- `topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC`: the bulk update interview entity Kafka message topic
- `esConfig.HOST`: Elasticsearch host
- `esConfig.AWS_REGION`: The Amazon region to use when using AWS Elasticsearch service
- `esConfig.ELASTICCLOUD.id`: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this
Expand Down
8 changes: 6 additions & 2 deletions VERIFICATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

## Create documents in ES

- Run the following commands to create `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES.
- Run the following commands to create `Job`, `JobCandidate`, `Interview`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES.

``` bash
# for Job
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.job.create < test/messages/taas.job.create.event.json
# for JobCandidate
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.jobcandidate.create < test/messages/taas.jobcandidate.create.event.json
# for Interview
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.interview.requested < test/messages/taas.interview.requested.event.json
# for ResourceBooking
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.resourcebooking.create < test/messages/taas.resourcebooking.create.event.json
# for WorkPeriod
Expand All @@ -20,13 +22,15 @@
- Run `npm run view-data <model-name-here>` to see if documents were created.

## Update documents in ES
- Run the following commands to update `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES.
- Run the following commands to update `Job`, `JobCandidate`, `Interview`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES.

``` bash
# for Job
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.job.update < test/messages/taas.job.update.event.json
# for JobCandidate
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.jobcandidate.update < test/messages/taas.jobcandidate.update.event.json
# for Interview
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.interview.update < test/messages/taas.interview.update.event.json
# for ResourceBooking
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.resourcebooking.update < test/messages/taas.resourcebooking.update.event.json
# for WorkPeriod
Expand Down
6 changes: 5 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ module.exports = {
TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete',
// topics for work period payment service
TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC || 'taas.workperiodpayment.create',
TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC || 'taas.workperiodpayment.update'
TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC || 'taas.workperiodpayment.update',
// topics for interview service
TAAS_INTERVIEW_REQUEST_TOPIC: process.env.TAAS_INTERVIEW_REQUEST_TOPIC || 'taas.interview.requested',
TAAS_INTERVIEW_UPDATE_TOPIC: process.env.TAAS_INTERVIEW_UPDATE_TOPIC || 'taas.interview.update',
TAAS_INTERVIEW_BULK_UPDATE_TOPIC: process.env.TAAS_INTERVIEW_BULK_UPDATE_TOPIC || 'taas.interview.bulkUpdate'
},

esConfig: {
Expand Down
2 changes: 1 addition & 1 deletion local/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.workperiod.create:1:1,taas.workperiodpayment.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.workperiod.update:1:1,taas.workperiodpayment.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1,taas.workperiod.delete:1:1"
KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.interview.requested:1:1,taas.resourcebooking.create:1:1,taas.workperiod.create:1:1,taas.workperiodpayment.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.interview.update:1:1,taas.interview.bulkUpdate:1:1,taas.resourcebooking.update:1:1,taas.workperiod.update:1:1,taas.workperiodpayment.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1,taas.workperiod.delete:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
esearch:
image: elasticsearch:7.7.1
Expand Down
7 changes: 6 additions & 1 deletion src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const JobProcessorService = require('./services/JobProcessorService')
const JobCandidateProcessorService = require('./services/JobCandidateProcessorService')
const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService')
const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService')
const InterviewProcessorService = require('./services/InterviewProcessorService')
const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService')
const Mutex = require('async-mutex').Mutex
const events = require('events')
Expand Down Expand Up @@ -47,7 +48,11 @@ const topicServiceMapping = {
[config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete,
// work period payment
[config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate,
[config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate
[config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate,
// interview
[config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview,
[config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview,
[config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews
}

// Start kafka consumer
Expand Down
11 changes: 9 additions & 2 deletions src/bootstrap.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
const Joi = require('@hapi/joi')
const config = require('config')
const _ = require('lodash')
const { Interview } = require('../src/common/constants')
const constants = require('./common/constants')

const allowedXAITemplates = _.values(Interview.XaiTemplate)
const allowedInterviewStatuses = _.values(Interview.Status)

global.Promise = require('bluebird')

Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly')
Joi.jobStatus = () => Joi.string().valid('sourcing', 'in-review', 'assigned', 'closed', 'cancelled')
Joi.resourceBookingStatus = () => Joi.string().valid('assigned', 'closed', 'cancelled')
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected', 'cancelled', 'interview', 'topcoder-rejected')
Joi.resourceBookingStatus = () => Joi.string().valid('placed', 'closed', 'cancelled')
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'placed', 'selected', 'client rejected - screening', 'client rejected - interview', 'rejected - other', 'cancelled', 'interview', 'topcoder-rejected')
Joi.workload = () => Joi.string().valid('full-time', 'fractional')
Joi.title = () => Joi.string().max(128)
Joi.paymentStatus = () => Joi.string().valid('pending', 'partially-completed', 'completed', 'cancelled')
Joi.xaiTemplate = () => Joi.string().valid(...allowedXAITemplates)
Joi.interviewStatus = () => Joi.string().valid(...allowedInterviewStatuses)
Joi.workPeriodPaymentStatus = () => Joi.string().valid('completed', 'cancelled')
// Empty string is not allowed by Joi by default and must be enabled with allow('').
// See https://joi.dev/api/?v=17.3.0#string for details why it's like this.
Expand Down
14 changes: 14 additions & 0 deletions src/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,19 @@ module.exports = {
JobCandidateCreate: 'jobcandidate:create',
JobCandidateUpdate: 'jobcandidate:update'
}
},
Interview: {
Status: {
Scheduling: 'Scheduling',
Scheduled: 'Scheduled',
RequestedForReschedule: 'Requested for reschedule',
Rescheduled: 'Rescheduled',
Completed: 'Completed',
Cancelled: 'Cancelled'
},
XaiTemplate: {
'30MinInterview': 'interview-30',
'60MinInterview': 'interview-60'
}
}
}
18 changes: 18 additions & 0 deletions src/scripts/createIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ async function createIndex () {
status: { type: 'keyword' },
externalId: { type: 'keyword' },
resume: { type: 'text' },
interviews: {
type: 'nested',
properties: {
id: { type: 'keyword' },
jobCandidateId: { type: 'keyword' },
googleCalendarId: { type: 'keyword' },
customMessage: { type: 'text' },
xaiTemplate: { type: 'keyword' },
round: { type: 'integer' },
startTimestamp: { type: 'date' },
attendeesList: [],
status: { type: 'keyword' },
createdAt: { type: 'date' },
createdBy: { type: 'keyword' },
updatedAt: { type: 'date' },
updatedBy: { type: 'keyword' }
}
},
createdAt: { type: 'date' },
createdBy: { type: 'keyword' },
updatedAt: { type: 'date' },
Expand Down
189 changes: 189 additions & 0 deletions src/services/InterviewProcessorService.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/**
 * Interview Processor Service
 *
 * Consumes Interview Kafka messages and mirrors the changes into the nested
 * `interviews` collection of JobCandidate documents in Elasticsearch.
 */

const Joi = require('@hapi/joi')
const _ = require('lodash')
const logger = require('../common/logger')
const helper = require('../common/helper')
const constants = require('../common/constants')
const config = require('config')

// shared Elasticsearch client used by all processors in this module
const esClient = helper.getESClient()

/**
 * Applies a painless script update to a single job candidate document
 * in the job candidate ES index.
 *
 * @param {String} jobCandidateId id of the job candidate document to update
 * @param {String} script painless script definition ({ source, params })
 * @param {String} transactionId transaction id used for tracing
 */
async function updateJobCandidateViaScript (jobCandidateId, script, transactionId) {
  const request = {
    index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
    id: jobCandidateId,
    transactionId,
    body: { script },
    refresh: constants.esRefreshOption
  }
  await esClient.updateExtra(request)
}

/**
 * Process request interview entity message.
 * Appends the interview to the `interviews` collection of its parent
 * job candidate document, creating the collection if it doesn't exist yet.
 *
 * @param {Object} message the kafka message
 * @param {String} transactionId transaction id used for tracing
 */
async function processRequestInterview (message, transactionId) {
  const interview = message.payload
  // add to the existing "interviews" collection when present,
  // otherwise initialize the collection with this single interview
  const source = `
      ctx._source.containsKey("interviews")
        ? ctx._source.interviews.add(params.interview)
        : ctx._source.interviews = [params.interview]
    `
  await updateJobCandidateViaScript(
    interview.jobCandidateId,
    { source, params: { interview } },
    transactionId
  )
}

// Joi schema of a full interview payload, shared by request/update messages
const interviewPayloadSchema = Joi.object().keys({
  id: Joi.string().uuid().required(),
  jobCandidateId: Joi.string().uuid().required(),
  googleCalendarId: Joi.string().allow(null),
  customMessage: Joi.string().allow(null),
  xaiTemplate: Joi.xaiTemplate().required(),
  round: Joi.number().integer().positive().required(),
  startTimestamp: Joi.date().allow(null),
  attendeesList: Joi.array().items(Joi.string().email()).allow(null),
  status: Joi.interviewStatus().required(),
  createdAt: Joi.date().required(),
  createdBy: Joi.string().uuid().required(),
  updatedAt: Joi.date().allow(null),
  updatedBy: Joi.string().uuid().allow(null)
}).required()

processRequestInterview.schema = {
  message: Joi.object().keys({
    topic: Joi.string().required(),
    originator: Joi.string().required(),
    timestamp: Joi.date().required(),
    'mime-type': Joi.string().required(),
    payload: interviewPayloadSchema
  }).required(),
  transactionId: Joi.string().required()
}

/**
 * Process update interview entity message.
 * Merges the payload fields into the matching interview entry of its
 * parent job candidate document; a no-op when the interview isn't found.
 *
 * @param {Object} message the kafka message
 * @param {String} transactionId transaction id used for tracing
 */
async function processUpdateInterview (message, transactionId) {
  const interview = message.payload
  // locate the interview by id inside the candidate's collection and
  // overwrite every property present in the payload
  const source = `
      if (ctx._source.containsKey("interviews")) {
        def target = ctx._source.interviews.find(i -> i.id == params.interview.id);
        if (target != null) {
          for (prop in params.interview.entrySet()) {
            target[prop.getKey()] = prop.getValue()
          }
        }
      }
    `
  await updateJobCandidateViaScript(
    interview.jobCandidateId,
    { source, params: { interview } },
    transactionId
  )
}

// update messages carry the same shape as request messages
processUpdateInterview.schema = processRequestInterview.schema

/**
 * Process bulk (partially) update interviews entity message.
 * Currently supports status, updatedAt and updatedBy fields.
 * Update Joi schema to allow more fields.
 * (implementation should already handle new fields - just updating Joi schema should be enough)
 *
 * payload format:
 * {
 *   "jobCandidateId": {
 *     "interviewId": { ...fields },
 *     "interviewId2": { ...fields },
 *     ...
 *   },
 *   "jobCandidateId2": { // like above... },
 *   ...
 * }
 *
 * @param {Object} message the kafka message
 * @param {String} transactionId transaction id used for tracing
 */
async function processBulkUpdateInterviews (message, transactionId) {
  const jobCandidates = message.payload
  // For each matched job candidate document, look up its interviews by id
  // and overwrite the affected fields. Guard on containsKey("interviews")
  // (consistent with processUpdateInterview) so a matched document that has
  // no interviews collection yet doesn't raise a null-pointer error and
  // fail the whole update-by-query.
  const script = {
    source: `
      if (ctx._source.containsKey("interviews")) {
        def affectedInterviews = params.jobCandidates[ctx._id];
        for (interview in affectedInterviews.entrySet()) {
          def interviewId = interview.getKey();
          def affectedFields = interview.getValue();
          def target = ctx._source.interviews.find(i -> i.id == interviewId);
          if (target != null) {
            for (field in affectedFields.entrySet()) {
              target[field.getKey()] = field.getValue();
            }
          }
        }
      }
    `,
    params: { jobCandidates }
  }
  // update all affected job candidate documents in a single query,
  // selecting them by the payload's jobCandidateId keys
  await esClient.updateByQuery({
    index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
    transactionId,
    body: {
      script,
      query: {
        ids: {
          values: _.keys(jobCandidates)
        }
      }
    },
    refresh: true
  })
}

// Joi schema of the fields a bulk update may change on one interview
const bulkAffectedFieldsSchema = Joi.object().keys({
  status: Joi.interviewStatus(),
  updatedAt: Joi.date(),
  updatedBy: Joi.string().uuid()
})

// payload maps jobCandidateId -> { interviewId -> affected fields }
processBulkUpdateInterviews.schema = {
  message: Joi.object().keys({
    topic: Joi.string().required(),
    originator: Joi.string().required(),
    timestamp: Joi.date().required(),
    'mime-type': Joi.string().required(),
    payload: Joi.object()
      .pattern(
        Joi.string().uuid(), // key - jobCandidateId
        Joi.object().pattern(
          Joi.string().uuid(), // inner key - interviewId
          bulkAffectedFieldsSchema // inner value - affected fields of interview
        ) // value - object containing interviews
      )
      .min(1) // at least one key - i.e. don't allow empty object
  }).required(),
  transactionId: Joi.string().required()
}

module.exports = {
  processRequestInterview,
  processUpdateInterview,
  processBulkUpdateInterviews
}

// wraps every exported function with schema validation and logging
logger.buildService(module.exports, 'InterviewProcessorService')
3 changes: 2 additions & 1 deletion src/services/JobCandidateProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ async function updateCandidateStatus ({ type, payload, previousData }) {
localLogger.debug({ context: 'updateCandidateStatus', message: `jobCandidate is already in status: ${payload.status}` })
return
}
if (!['rejected', 'shortlist'].includes(payload.status)) {
//if (!['rejected', 'shortlist',].includes(payload.status)) {
if (!['client rejected - screening', 'client rejected - interview','interview','selected'].includes(payload.status)) {
localLogger.debug({ context: 'updateCandidateStatus', message: `not interested status: ${payload.status}` })
return
}
Expand Down