Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ const logger = require('./common/logger')
const helper = require('./common/helper')
const JobProcessorService = require('./services/JobProcessorService')
const JobCandidateProcessorService = require('./services/JobCandidateProcessorService')
const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService')
const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService')
const InterviewProcessorService = require('./services/InterviewProcessorService')
const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService')
const RoleProcessorService = require('./services/RoleProcessorService')
// const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService')
// const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService')
// const InterviewProcessorService = require('./services/InterviewProcessorService')
// const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService')
// const RoleProcessorService = require('./services/RoleProcessorService')
const ActionProcessorService = require('./services/ActionProcessorService')
const Mutex = require('async-mutex').Mutex
const events = require('events')
Expand All @@ -34,30 +34,30 @@ const topicServiceMapping = {
// job
[config.topics.TAAS_JOB_CREATE_TOPIC]: JobProcessorService.processCreate,
[config.topics.TAAS_JOB_UPDATE_TOPIC]: JobProcessorService.processUpdate,
[config.topics.TAAS_JOB_DELETE_TOPIC]: JobProcessorService.processDelete,
// [config.topics.TAAS_JOB_DELETE_TOPIC]: JobProcessorService.processDelete,
// job candidate
[config.topics.TAAS_JOB_CANDIDATE_CREATE_TOPIC]: JobCandidateProcessorService.processCreate,
// [config.topics.TAAS_JOB_CANDIDATE_CREATE_TOPIC]: JobCandidateProcessorService.processCreate,
[config.topics.TAAS_JOB_CANDIDATE_UPDATE_TOPIC]: JobCandidateProcessorService.processUpdate,
[config.topics.TAAS_JOB_CANDIDATE_DELETE_TOPIC]: JobCandidateProcessorService.processDelete,
// [config.topics.TAAS_JOB_CANDIDATE_DELETE_TOPIC]: JobCandidateProcessorService.processDelete,
// resource booking
[config.topics.TAAS_RESOURCE_BOOKING_CREATE_TOPIC]: ResourceBookingProcessorService.processCreate,
[config.topics.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingProcessorService.processUpdate,
[config.topics.TAAS_RESOURCE_BOOKING_DELETE_TOPIC]: ResourceBookingProcessorService.processDelete,
// [config.topics.TAAS_RESOURCE_BOOKING_CREATE_TOPIC]: ResourceBookingProcessorService.processCreate,
// [config.topics.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingProcessorService.processUpdate,
// [config.topics.TAAS_RESOURCE_BOOKING_DELETE_TOPIC]: ResourceBookingProcessorService.processDelete,
// work period
[config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate,
[config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate,
[config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete,
// [config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate,
// [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate,
// [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete,
// work period payment
[config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate,
[config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate,
// [config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate,
// [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate,
// interview
[config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview,
[config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview,
[config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews,
// [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview,
// [config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview,
// [config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews,
// role
[config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate,
[config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate,
[config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete,
// [config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate,
// [config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate,
// [config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete,
// action
[config.topics.TAAS_ACTION_RETRY_TOPIC]: ActionProcessorService.processRetry
}
Expand Down Expand Up @@ -117,12 +117,10 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
}
const transactionId = _.uniqueId('transaction_')
try {
if (!topicServiceMapping[topic]) {
throw new Error(`Unknown topic: ${topic}`) // normally it never reaches this line
if (topicServiceMapping[topic]) {
await topicServiceMapping[topic](messageJSON, transactionId)
localLogger.debug(`Successfully processed message with count ${messageCount}`)
}
await topicServiceMapping[topic](messageJSON, transactionId)

localLogger.debug(`Successfully processed message with count ${messageCount}`)
} catch (err) {
logger.logFullError(err, { component: 'app' })
} finally {
Expand Down
24 changes: 12 additions & 12 deletions src/services/JobCandidateProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ async function processUpdate (message, transactionId) {
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: data.id
})
await esClient.updateExtra({
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: data.id,
transactionId,
body: {
doc: data
},
refresh: constants.esRefreshOption
})
// await esClient.updateExtra({
// index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
// id: data.id,
// transactionId,
// body: {
// doc: data
// },
// refresh: constants.esRefreshOption
// })
await postMessageToZapier({
type: constants.Zapier.MessageType.JobCandidateUpdate,
payload: data,
Expand Down Expand Up @@ -174,9 +174,9 @@ processDelete.schema = {
}

module.exports = {
processCreate,
processUpdate,
processDelete
// processCreate,
processUpdate
// processDelete
}

logger.buildService(module.exports, 'JobCandidateProcessorService')
32 changes: 16 additions & 16 deletions src/services/JobProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ async function postMessageToZapier ({ type, payload }) {
*/
async function processCreate (message, transactionId) {
const job = message.payload
await esClient.createExtra({
index: config.get('esConfig.ES_INDEX_JOB'),
id: job.id,
transactionId,
body: job,
refresh: constants.esRefreshOption
})
// await esClient.createExtra({
// index: config.get('esConfig.ES_INDEX_JOB'),
// id: job.id,
// transactionId,
// body: job,
// refresh: constants.esRefreshOption
// })
await postMessageToZapier({
type: constants.Zapier.MessageType.JobCreate,
payload: job
Expand Down Expand Up @@ -110,15 +110,15 @@ processCreate.schema = {
*/
async function processUpdate (message, transactionId) {
const data = message.payload
await esClient.updateExtra({
index: config.get('esConfig.ES_INDEX_JOB'),
id: data.id,
transactionId,
body: {
doc: data
},
refresh: constants.esRefreshOption
})
// await esClient.updateExtra({
// index: config.get('esConfig.ES_INDEX_JOB'),
// id: data.id,
// transactionId,
// body: {
// doc: data
// },
// refresh: constants.esRefreshOption
// })
await postMessageToZapier({
type: constants.Zapier.MessageType.JobUpdate,
payload: data
Expand Down