From 3a41f851ad395c5f4a1870c9ddf5da776164fd6a Mon Sep 17 00:00:00 2001
From: rashmi73 <41687423+rashmi73@users.noreply.github.com>
Date: Tue, 21 Jan 2020 16:13:25 +0530
Subject: [PATCH 01/15] Update project-team.html
---
emails/src/partials/project-team.html | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/emails/src/partials/project-team.html b/emails/src/partials/project-team.html
index 5c3671d..f02ddf3 100644
--- a/emails/src/partials/project-team.html
+++ b/emails/src/partials/project-team.html
@@ -61,7 +61,7 @@
{{#if [isSSO]}}
Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please use the link below to sign in and join the project.
{{else}}
- Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please click on the button ("View project on Connect") below to join.
+ Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please click on the button ("Join Project") below to join.
{{/if}}
{{/if}}
{{#if [connect.notification.project.member.invite.requested]}}
From d03dbaa56e913ff8744ce8a4c9e97ed385cdea65 Mon Sep 17 00:00:00 2001
From: yoution
Date: Wed, 22 Jan 2020 11:45:01 +0800
Subject: [PATCH 02/15] #3455 In app notifications are sent though they are
turned off #3455
---
src/services/NotificationService.js | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js
index cb92781..daea260 100644
--- a/src/services/NotificationService.js
+++ b/src/services/NotificationService.js
@@ -205,8 +205,8 @@ function* listNotifications(query, userId) {
if (_.keys(notificationSettings).length > 0) {
// only filter out notifications types which were explicitly set to 'no' - so we return notification by default
const notifications = _.keys(notificationSettings).filter((notificationType) =>
- !notificationSettings[notificationType] &&
- !notificationSettings[notificationType].web &&
+ notificationSettings[notificationType] &&
+ notificationSettings[notificationType].web &&
notificationSettings[notificationType].web.enabled === 'no'
);
filter.where.type = Object.assign(filter.where.type || {}, { $notIn: notifications });
From c3b96a59e582063eddf9672fc3363e47b54f902c Mon Sep 17 00:00:00 2001
From: Maksym Mykhailenko
Date: Wed, 29 Jan 2020 10:46:14 +0800
Subject: [PATCH 03/15] fix: join project text for SSO login
---
emails/src/partials/project-team.html | 4 ----
1 file changed, 4 deletions(-)
diff --git a/emails/src/partials/project-team.html b/emails/src/partials/project-team.html
index f02ddf3..e8802ae 100644
--- a/emails/src/partials/project-team.html
+++ b/emails/src/partials/project-team.html
@@ -58,11 +58,7 @@
{{userFullName}} joined the project
{{/if}}
{{#if [connect.notification.project.member.invite.created]}}
- {{#if [isSSO]}}
- Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please use the link below to sign in and join the project.
- {{else}}
Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please click on the button ("Join Project") below to join.
- {{/if}}
{{/if}}
{{#if [connect.notification.project.member.invite.requested]}}
You are requested to add {{userFullName}} as a copilot
From 71bca54d47849775e454c2f4f7ec7d086fdb45d8 Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Mon, 17 Feb 2020 15:28:26 +0530
Subject: [PATCH 04/15] initial bulk message developemnt.
---
config/default.js | 7 +-
src/hooks/hookBulkMessage.js | 137 ++++++++++++++++++++++++++++
src/hooks/index.js | 17 ++++
src/models/BulkMessageUserRefs.js | 34 +++++++
src/models/BulkMessages.js | 21 +++++
src/models/index.js | 6 ++
src/services/NotificationService.js | 6 ++
test/checkHooks.js | 3 +
8 files changed, 228 insertions(+), 3 deletions(-)
create mode 100644 src/hooks/hookBulkMessage.js
create mode 100644 src/hooks/index.js
create mode 100644 src/models/BulkMessageUserRefs.js
create mode 100644 src/models/BulkMessages.js
create mode 100644 test/checkHooks.js
diff --git a/config/default.js b/config/default.js
index bbcc8c5..c1252b5 100644
--- a/config/default.js
+++ b/config/default.js
@@ -58,7 +58,7 @@ module.exports = {
{
id: 0, /** challengeid or projectid */
name: '', /** challenge name */
- group: 'Challenge',
+ group: 'challenge',
title: 'Challenge specification is modified.',
},
},
@@ -75,7 +75,7 @@ module.exports = {
{
id: 0, /** challengeid or projectid */
name: '', /** challenge name */
- group: 'Challenge',
+ group: 'challenge',
title: 'Challenge checkpoint review.',
},
},
@@ -92,7 +92,7 @@ module.exports = {
{
id: 0, /** challengeid or projectid */
name: '', /** challenge name */
- group: 'Submission',
+ group: 'submission',
title: 'A new submission is uploaded.',
},
},
@@ -108,4 +108,5 @@ module.exports = {
ENABLE_DEV_MODE: process.env.ENABLE_DEV_MODE ? Boolean(process.env.ENABLE_DEV_MODE) : true,
DEV_MODE_EMAIL: process.env.DEV_MODE_EMAIL,
DEFAULT_REPLY_EMAIL: process.env.DEFAULT_REPLY_EMAIL,
+ ENABLE_HOOK_BULK_NOTIFICATION : process.env.ENABLE_HOOK_BULK_NOTIFICATION || false,
};
diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js
new file mode 100644
index 0000000..e7278f3
--- /dev/null
+++ b/src/hooks/hookBulkMessage.js
@@ -0,0 +1,137 @@
+/**
+ * Hook to insert broadcast notification into database for a user.
+ */
+
+'use strict'
+
+const _ = require('lodash')
+//const Joi = require('joi')
+//const errors = require('../common/errors')
+const logger = require('../common/logger')
+const models = require('../models')
+const logPrefix = "BulkNotificationHook: "
+
+models.BulkMessages.sync()
+models.BulkMessageUserRefs.sync()
+
+/**
+ * Main function
+ * @param {Integer} userId
+ */
+function checkBulkMessageForUser(userId) {
+ models.BulkMessages.count().then(function (tBulkMessages) {
+ if (tBulkMessages > 0) {
+ // the condition can help to optimize the execution
+ models.BulkMessageUserRefs.count({
+ where: {
+ user_id: userId
+ }
+ }).then(function (tUserRefs) {
+ if (tUserRefs < tBulkMessages) {
+ logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`)
+ syncBulkMessageForUser(userId)
+ }
+ }).catch((e) => {
+ logger.error(`${logPrefix} Failed to check total userRefs condition. Error: `, e)
+ })
+ }
+ }).catch((e) => {
+ logger.error(`${logPrefix} Failed to check total broadcast message condition. Error: `, e)
+ })
+}
+
+/**
+ * Helper function
+ * @param {Integer} userId
+ */
+function syncBulkMessageForUser(userId) {
+
+ /**
+ * Check if all bulk mesaages processed for current user or not
+ */
+ let q = "SELECT a.* FROM bulk_messages AS a " +
+ " LEFT OUTER JOIN (SELECT id as refid, bulk_message_id " +
+ " FROM bulk_message_user_refs AS bmur WHERE bmur.user_id=$1)" +
+ " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL"
+ models.sequelize.query(q, { bind: [userId] })
+ .then(function (res) {
+ _.map(res[0], async (r) => {
+ logger.info(`${logPrefix} need to process for bulk message id: `, r.id)
+ // call function to check if current user in reciepent group
+ // insert row in userRef table
+ if (isBroadCastMessageForUser(userId, r)) {
+ // current user in reciepent group
+ createNotificationForUser(userId, r)
+ } else {
+ /**
+ * Insert row in userRef with notification-id null value
+ * It means - broadcast message in not for current user
+ */
+ insertUserRefs(userId, r.id, null)
+ }
+ })
+ }).catch((e) => {
+ logger.error(`${logPrefix} Failed to check bulk message condition: `, err)
+ })
+}
+
+/**
+ * Helper function
+ * Check if current user in broadcast recipent group
+ * @param {Integer} userId
+ * @param {Object} bulkMessage
+ */
+function isBroadCastMessageForUser(userId, bulkMessage) {
+ // TODO
+ return true;
+}
+
+/**
+ * Helper function
+ * @param {Integer} userId
+ * @param {Integer} bulkMessageId
+ * @param {Integer} notificationId
+ */
+function insertUserRefs(userId, bulkMessageId, notificationId) {
+ models.BulkMessageUserRefs.create({
+ bulk_message_id: bulkMessageId,
+ user_id: userId,
+ notification_id: notificationId,
+ }).then((b) => {
+ logger.info(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`)
+ }).catch((e) => {
+ logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: `, e)
+ })
+}
+
+/**
+ * Helper function
+ * @param {Integer} userId
+ * @param {Object} bulkMessage
+ */
+function createNotificationForUser(userId, bulkMessage) {
+ models.Notification.create({
+ userId: userId,
+ type: bulkMessage.type,
+ contents: {
+ id: bulkMessage.id, /** broadcast message id */
+ name: bulkMessage.contents, /** broadcast message */
+ group: 'broadcast',
+ title: 'Broadcast Message',
+ },
+ read: false,
+ seen: false,
+ version: null,
+ }).then((n) => {
+ logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
+ insertUserRefs(userId, bulkMessage.id, n.id)
+ }).catch((err) => {
+ logger.error(`${logPrefix} Error in inserting broadcast message `, err)
+ })
+}
+
+
+// Exports
+module.exports = {
+ checkBulkMessageForUser,
+};
\ No newline at end of file
diff --git a/src/hooks/index.js b/src/hooks/index.js
new file mode 100644
index 0000000..5dcc1a6
--- /dev/null
+++ b/src/hooks/index.js
@@ -0,0 +1,17 @@
+/**
+ * Copyright (C) 2020 TopCoder Inc., All Rights Reserved.
+ */
+
+/**
+ * Hook implementation
+ *
+ * @author TCSCODER
+ * @version 1.0
+ */
+
+const hookBulkMessage = require("./hookBulkMessage")
+
+
+module.exports = {
+ hookBulkMessage,
+};
diff --git a/src/models/BulkMessageUserRefs.js b/src/models/BulkMessageUserRefs.js
new file mode 100644
index 0000000..7f3baf3
--- /dev/null
+++ b/src/models/BulkMessageUserRefs.js
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2020 TopCoder Inc., All Rights Reserved.
+ */
+
+/**
+ * The Bulk Message User Reference schema
+ *
+ * @author TCSCODER
+ * @version 1.0
+ */
+
+
+module.exports = (sequelize, DataTypes) => sequelize.define('bulk_message_user_refs', {
+ id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
+ bulk_message_id: {
+ type: DataTypes.BIGINT,
+ allowNull: false,
+ references: {
+ model: 'bulk_messages',
+ key: 'id'
+ }
+ },
+ notification_id: {
+ type: DataTypes.BIGINT,
+ allowNull: true,
+ references: {
+ model: 'Notifications',
+ key: 'id'
+ }
+ },
+ user_id: { type: DataTypes.BIGINT, allowNull: false }
+}, {});
+
+ // sequelize will generate and manage createdAt, updatedAt fields
diff --git a/src/models/BulkMessages.js b/src/models/BulkMessages.js
new file mode 100644
index 0000000..1479b2c
--- /dev/null
+++ b/src/models/BulkMessages.js
@@ -0,0 +1,21 @@
+/**
+ * Copyright (C) 2020 TopCoder Inc., All Rights Reserved.
+ */
+
+/**
+ * The Bulk Message Store schema
+ *
+ * @author TCSCODER
+ * @version 1.0
+ */
+
+
+module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', {
+ id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
+ type: { type: DataTypes.STRING, allowNull: false },
+ contents: { type: DataTypes.JSONB, allowNull: false },
+ recipient_group: { type: DataTypes.STRING, allowNull: false }
+ }, {});
+
+ // sequelize will generate and manage createdAt, updatedAt fields
+
\ No newline at end of file
diff --git a/src/models/index.js b/src/models/index.js
index e6ef09e..d18f68a 100644
--- a/src/models/index.js
+++ b/src/models/index.js
@@ -16,11 +16,17 @@ const Notification = require('./Notification')(sequelize, DataTypes);
const NotificationSetting = require('./NotificationSetting')(sequelize, DataTypes);
const ServiceSettings = require('./ServiceSettings')(sequelize, DataTypes);
const ScheduledEvents = require('./ScheduledEvents')(sequelize, DataTypes);
+const BulkMessages = require('./BulkMessages')(sequelize, DataTypes);
+const BulkMessageUserRefs = require('./BulkMessageUserRefs')(sequelize, DataTypes);
+
module.exports = {
Notification,
NotificationSetting,
ServiceSettings,
ScheduledEvents,
+ BulkMessages,
+ BulkMessageUserRefs,
+ sequelize,
init: () => sequelize.sync(),
};
diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js
index cb92781..cf8c9cb 100644
--- a/src/services/NotificationService.js
+++ b/src/services/NotificationService.js
@@ -9,6 +9,8 @@ const Joi = require('joi');
const errors = require('../common/errors');
const logger = require('../common/logger');
const models = require('../models');
+const config = require('config');
+const hooks = require('../hooks');
const DEFAULT_LIMIT = 10;
@@ -202,6 +204,10 @@ function* listNotifications(query, userId) {
break;
}
+ if (config.ENABLE_HOOK_BULK_NOTIFICATION){
+ hooks.hookBulkMessage.checkBulkMessageForUser(userId)
+ }
+
if (_.keys(notificationSettings).length > 0) {
// only filter out notifications types which were explicitly set to 'no' - so we return notification by default
const notifications = _.keys(notificationSettings).filter((notificationType) =>
diff --git a/test/checkHooks.js b/test/checkHooks.js
new file mode 100644
index 0000000..6371283
--- /dev/null
+++ b/test/checkHooks.js
@@ -0,0 +1,3 @@
+const bulkhook = require("../src/hooks/hookBulkMessage")
+
+bulkhook.checkBulkMessageForUser(123)
\ No newline at end of file
From 6e80a00beba6315aec9259df52bfd64036d785d5 Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Mon, 17 Feb 2020 18:29:27 +0530
Subject: [PATCH 05/15] adding broadcast processor
---
config/default.js | 4 +++
src/hooks/hookBulkMessage.js | 10 ++++--
src/models/BulkMessages.js | 5 +--
.../broadcast/bulkNotificationHandler.js | 35 +++++++++++++++++++
src/processors/index.js | 2 ++
5 files changed, 51 insertions(+), 5 deletions(-)
create mode 100644 src/processors/broadcast/bulkNotificationHandler.js
diff --git a/config/default.js b/config/default.js
index c1252b5..904b068 100644
--- a/config/default.js
+++ b/config/default.js
@@ -98,6 +98,10 @@ module.exports = {
},
},
],
+ 'admin.notification.broadcast' : [{
+ handleBulkNotification: {}
+ }
+ ]
//'notifications.community.challenge.created': ['handleChallengeCreated'],
//'notifications.community.challenge.phasewarning': ['handleChallengePhaseWarning'],
},
diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js
index e7278f3..16d8c41 100644
--- a/src/hooks/hookBulkMessage.js
+++ b/src/hooks/hookBulkMessage.js
@@ -11,8 +11,12 @@ const logger = require('../common/logger')
const models = require('../models')
const logPrefix = "BulkNotificationHook: "
-models.BulkMessages.sync()
-models.BulkMessageUserRefs.sync()
+/**
+ * CREATE NEW TABLES IF NOT EXISTS
+ */
+models.BulkMessages.sync().then((t)=> {
+ models.BulkMessageUserRefs.sync()
+})
/**
* Main function
@@ -115,7 +119,7 @@ function createNotificationForUser(userId, bulkMessage) {
type: bulkMessage.type,
contents: {
id: bulkMessage.id, /** broadcast message id */
- name: bulkMessage.contents, /** broadcast message */
+ message: bulkMessage.message, /** broadcast message */
group: 'broadcast',
title: 'Broadcast Message',
},
diff --git a/src/models/BulkMessages.js b/src/models/BulkMessages.js
index 1479b2c..5026f6e 100644
--- a/src/models/BulkMessages.js
+++ b/src/models/BulkMessages.js
@@ -13,8 +13,9 @@
module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', {
id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
type: { type: DataTypes.STRING, allowNull: false },
- contents: { type: DataTypes.JSONB, allowNull: false },
- recipient_group: { type: DataTypes.STRING, allowNull: false }
+ message: { type: DataTypes.TEXT, allowNull: false },
+ recipients: { type: DataTypes.JSONB, allowNull: false },
+ rules: {type: DataTypes.JSONB, allowNull: true}
}, {});
// sequelize will generate and manage createdAt, updatedAt fields
diff --git a/src/processors/broadcast/bulkNotificationHandler.js b/src/processors/broadcast/bulkNotificationHandler.js
new file mode 100644
index 0000000..11aa557
--- /dev/null
+++ b/src/processors/broadcast/bulkNotificationHandler.js
@@ -0,0 +1,35 @@
+/**
+ * Bulk notification handler.
+ */
+const co = require('co');
+const models = require('../../models');
+const logger = require('../../common/logger')
+
+/**
+ * Handle Kafka JSON message of broadcast.
+ *
+ * @param {Object} message the Kafka JSON message
+ * @param {Object} ruleSets
+ *
+ * @return {Promise} promise resolved to notifications
+ */
+const handle = (message, ruleSets) => co(function* () {
+ return new Promise(function(resolve, reject){
+ models.BulkMessages.create({
+ type: message.topic,
+ message: message.payload.message,
+ recipients: message.payload.recipients,
+ rules: message.payload.rules || null,
+ }).then((bm) => {
+ logger.info("Broadcast message recieved and inserted in db with id:", bm.id)
+ resolve([]) // no notification need to insert at this point
+ }).catch((e) => {
+ logger.error("Broadcast processor failed in db operation. Error: ", e)
+ reject(e)
+ })
+ })
+});
+
+module.exports = {
+ handle,
+};
\ No newline at end of file
diff --git a/src/processors/index.js b/src/processors/index.js
index 70d8c6a..a0243be 100644
--- a/src/processors/index.js
+++ b/src/processors/index.js
@@ -8,6 +8,7 @@ const ChallengePhaseWarningHandler = require('./challenge/ChallengePhaseWarningH
const ChallengeHandler = require('./challenge/ChallengeHandler');
const AutoPilotHandler = require('./challenge/AutoPilotHandler');
const SubmissionHandler = require('./challenge/SubmissionHandler');
+const BulkNotificationHandler = require('./broadcast/bulkNotificationHandler');
// Exports
module.exports = {
@@ -16,4 +17,5 @@ module.exports = {
handleChallenge: ChallengeHandler.handle,
handleAutoPilot: AutoPilotHandler.handle,
handleSubmission: SubmissionHandler.handle,
+ handleBulkNotification: BulkNotificationHandler.handle,
};
From 9d3ed96fcda4a268ed0976ce52a444833176e6e2 Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Tue, 18 Feb 2020 16:53:54 +0530
Subject: [PATCH 06/15] adding functions to decide recipient condition.
---
src/common/broadcastAPIHelper.js | 86 +++++++++++++++++++
src/hooks/hookBulkMessage.js | 123 +++++++++++++++-------------
src/services/NotificationService.js | 6 +-
3 files changed, 157 insertions(+), 58 deletions(-)
create mode 100644 src/common/broadcastAPIHelper.js
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
new file mode 100644
index 0000000..167c34c
--- /dev/null
+++ b/src/common/broadcastAPIHelper.js
@@ -0,0 +1,86 @@
+/**
+ *
+ */
+
+const _ = require('lodash')
+const config = require('config')
+const request = require('superagent')
+const logger = require('./logger')
+const m2mAuth = require('tc-core-library-js').auth.m2m;
+const m2m = m2mAuth(config);
+
+const logPrefix = "BroadcastAPI: "
+
+async function getM2MToken() {
+ return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
+}
+
+async function getMemberInfo(userId) {
+ const url = config.TC_API_V3_BASE_URL +
+ `/members/_search/?fields=userId%2Cskills&query=userId%3A${userId}&limit=1`
+ return new Promise(function (resolve, reject) {
+ let memberInfo = []
+ logger.info(`calling member api ${url} `)
+ request
+ .get(url).then((res) => {
+ if (!_.get(res, 'body.result.success')) {
+ reject(new Error(`Failed to get member api detail for user id ${userId}`))
+ }
+ memberInfo = _.get(res, 'body.result.content')
+ logger.info(`Feteched ${memberInfo.length} record(s) from member api`)
+ resolve(memberInfo)
+ })
+ .catch((err) => {
+ reject(new Error(`Failed to get member api detail for user id ${userId}, ${err}`))
+ })
+
+ })
+ // Need clean-up
+ /*const m2m = await getM2MToken().catch((err) => {
+ logger.error(`${logPrefix} Failed to get m2m token`)
+ return new Promise(function(res, rej) {
+ rej(err)
+ })
+ })
+ logger.info(`${logPrefix} Fetched m2m token sucessfully. Token length is: `, m2m.length)
+ */
+ //return request.get(url)
+}
+
+async function checkBroadcastMessageForUser(userId, bulkMessage) {
+ return new Promise(function (resolve, reject) {
+ const skills = _.get(bulkMessage, 'recipients.skills')
+ if (skills && skills.length > 0) {
+ try {
+ getMemberInfo(userId).then((m) => {
+ let flag = false
+ logger.info(`${logPrefix} got member info.`)
+ const ms = _.get(m[0], "skills")
+ const memberSkills = []
+ _.map(ms, (o) => {
+ memberSkills.push(_.get(o, 'name').toLowerCase())
+ })
+ logger.info(`${logPrefix} user id have following skills`, memberSkills)
+ _.map(skills, (s) => {
+ if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) {
+ flag = true;
+ logger.info(`${logPrefix} '${s}' skill matached for user id ${userId}`)
+ }
+ })
+ resolve(flag)
+ }).catch((err) => {
+ reject(err)
+ })
+ } catch (err) {
+ reject(new Error(`${logPrefix} issue at skill condition check, ${err.message}`))
+ }
+ } else {
+ resolve(true) // no condition on recipient, so for all
+ }
+ }) // promise end
+
+}
+
+module.exports = {
+ checkBroadcastMessageForUser,
+}
\ No newline at end of file
diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js
index 16d8c41..067b400 100644
--- a/src/hooks/hookBulkMessage.js
+++ b/src/hooks/hookBulkMessage.js
@@ -9,12 +9,14 @@ const _ = require('lodash')
//const errors = require('../common/errors')
const logger = require('../common/logger')
const models = require('../models')
+const api = require('../common/broadcastAPIHelper')
+
const logPrefix = "BulkNotificationHook: "
/**
* CREATE NEW TABLES IF NOT EXISTS
*/
-models.BulkMessages.sync().then((t)=> {
+models.BulkMessages.sync().then((t) => {
models.BulkMessageUserRefs.sync()
})
@@ -22,25 +24,34 @@ models.BulkMessages.sync().then((t)=> {
* Main function
* @param {Integer} userId
*/
-function checkBulkMessageForUser(userId) {
- models.BulkMessages.count().then(function (tBulkMessages) {
- if (tBulkMessages > 0) {
- // the condition can help to optimize the execution
- models.BulkMessageUserRefs.count({
- where: {
- user_id: userId
- }
- }).then(function (tUserRefs) {
- if (tUserRefs < tBulkMessages) {
- logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`)
- syncBulkMessageForUser(userId)
- }
- }).catch((e) => {
- logger.error(`${logPrefix} Failed to check total userRefs condition. Error: `, e)
- })
- }
- }).catch((e) => {
- logger.error(`${logPrefix} Failed to check total broadcast message condition. Error: `, e)
+async function checkBulkMessageForUser(userId) {
+ return new Promise(function (resolve, reject) {
+ models.BulkMessages.count().then(function (tBulkMessages) {
+ if (tBulkMessages > 0) {
+ // the condition can help to optimize the execution
+ models.BulkMessageUserRefs.count({
+ where: {
+ user_id: userId
+ }
+ }).then(async function (tUserRefs) {
+ if (tUserRefs < tBulkMessages) {
+ logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`)
+ syncBulkMessageForUser(userId).catch((e) => {
+ reject(e)
+ })
+ }
+ resolve(true) // resolve here
+ }).catch((e) => {
+ logger.error(`${logPrefix} Failed to check total userRefs condition. Error: `, e)
+ reject(e)
+ })
+ } else {
+ resolve(true)
+ }
+ }).catch((e) => {
+ logger.error(`${logPrefix} Failed to check total broadcast message condition. Error: `, e)
+ reject(e)
+ })
})
}
@@ -48,35 +59,36 @@ function checkBulkMessageForUser(userId) {
* Helper function
* @param {Integer} userId
*/
-function syncBulkMessageForUser(userId) {
+async function syncBulkMessageForUser(userId) {
- /**
- * Check if all bulk mesaages processed for current user or not
- */
- let q = "SELECT a.* FROM bulk_messages AS a " +
- " LEFT OUTER JOIN (SELECT id as refid, bulk_message_id " +
- " FROM bulk_message_user_refs AS bmur WHERE bmur.user_id=$1)" +
- " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL"
- models.sequelize.query(q, { bind: [userId] })
- .then(function (res) {
- _.map(res[0], async (r) => {
- logger.info(`${logPrefix} need to process for bulk message id: `, r.id)
- // call function to check if current user in reciepent group
- // insert row in userRef table
- if (isBroadCastMessageForUser(userId, r)) {
- // current user in reciepent group
- createNotificationForUser(userId, r)
- } else {
- /**
- * Insert row in userRef with notification-id null value
- * It means - broadcast message in not for current user
- */
- insertUserRefs(userId, r.id, null)
- }
+ return new Promise(function (resolve, reject) {
+ /**
+ * Check if all bulk mesaages processed for current user or not
+ */
+ let q = "SELECT a.* FROM bulk_messages AS a " +
+ " LEFT OUTER JOIN (SELECT id as refid, bulk_message_id " +
+ " FROM bulk_message_user_refs AS bmur WHERE bmur.user_id=$1)" +
+ " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL"
+ models.sequelize.query(q, { bind: [userId] })
+ .then(function (res) {
+ _.map(res[0], (r) => {
+ logger.info(`${logPrefix} need to process for bulk message id: `, r.id)
+ isBroadCastMessageForUser(userId, r).then((result) => {
+ if (result) {
+ createNotificationForUser(userId, r)
+ } else {
+ insertUserRefs(userId, r.id, null)
+ }
+ }).catch((err) => {
+ logger.error("failed in checking recipient group condition, Error:", err)
+ })
+ })
+ resolve(true)
+ }).catch((e) => {
+ logger.error(`${logPrefix} Failed to check bulk message condition: `, e)
+ reject(e)
})
- }).catch((e) => {
- logger.error(`${logPrefix} Failed to check bulk message condition: `, err)
- })
+ })
}
/**
@@ -85,9 +97,8 @@ function syncBulkMessageForUser(userId) {
* @param {Integer} userId
* @param {Object} bulkMessage
*/
-function isBroadCastMessageForUser(userId, bulkMessage) {
- // TODO
- return true;
+async function isBroadCastMessageForUser(userId, bulkMessage) {
+ return api.checkBroadcastMessageForUser(userId, bulkMessage)
}
/**
@@ -96,8 +107,8 @@ function isBroadCastMessageForUser(userId, bulkMessage) {
* @param {Integer} bulkMessageId
* @param {Integer} notificationId
*/
-function insertUserRefs(userId, bulkMessageId, notificationId) {
- models.BulkMessageUserRefs.create({
+async function insertUserRefs(userId, bulkMessageId, notificationId) {
+ await models.BulkMessageUserRefs.create({
bulk_message_id: bulkMessageId,
user_id: userId,
notification_id: notificationId,
@@ -113,8 +124,8 @@ function insertUserRefs(userId, bulkMessageId, notificationId) {
* @param {Integer} userId
* @param {Object} bulkMessage
*/
-function createNotificationForUser(userId, bulkMessage) {
- models.Notification.create({
+async function createNotificationForUser(userId, bulkMessage) {
+ await models.Notification.create({
userId: userId,
type: bulkMessage.type,
contents: {
@@ -126,9 +137,9 @@ function createNotificationForUser(userId, bulkMessage) {
read: false,
seen: false,
version: null,
- }).then((n) => {
+ }).then(async (n) => {
logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
- insertUserRefs(userId, bulkMessage.id, n.id)
+ await insertUserRefs(userId, bulkMessage.id, n.id)
}).catch((err) => {
logger.error(`${logPrefix} Error in inserting broadcast message `, err)
})
diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js
index cf8c9cb..906bb3d 100644
--- a/src/services/NotificationService.js
+++ b/src/services/NotificationService.js
@@ -204,8 +204,10 @@ function* listNotifications(query, userId) {
break;
}
- if (config.ENABLE_HOOK_BULK_NOTIFICATION){
- hooks.hookBulkMessage.checkBulkMessageForUser(userId)
+ if (config.ENABLE_HOOK_BULK_NOTIFICATION) {
+ yield hooks.hookBulkMessage.checkBulkMessageForUser(userId).catch((e) => {
+ logger.info(`Issue in calling bulk notification hook.`, e)
+ })
}
if (_.keys(notificationSettings).length > 0) {
From d30fbefaeae4684d5e263d008cd8dfda4edf373b Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Tue, 18 Feb 2020 19:11:26 +0530
Subject: [PATCH 07/15] sync calls for testing
---
src/common/broadcastAPIHelper.js | 6 +-
src/hooks/hookBulkMessage.js | 95 +++++++++++++++--------------
src/services/NotificationService.js | 2 +-
3 files changed, 56 insertions(+), 47 deletions(-)
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
index 167c34c..8debbca 100644
--- a/src/common/broadcastAPIHelper.js
+++ b/src/common/broadcastAPIHelper.js
@@ -50,6 +50,7 @@ async function getMemberInfo(userId) {
async function checkBroadcastMessageForUser(userId, bulkMessage) {
return new Promise(function (resolve, reject) {
const skills = _.get(bulkMessage, 'recipients.skills')
+ logger.info(`Got skills in DB...`, skills)
if (skills && skills.length > 0) {
try {
getMemberInfo(userId).then((m) => {
@@ -67,7 +68,10 @@ async function checkBroadcastMessageForUser(userId, bulkMessage) {
logger.info(`${logPrefix} '${s}' skill matached for user id ${userId}`)
}
})
- resolve(flag)
+ resolve({
+ record: bulkMessage,
+ result: flag
+ })
}).catch((err) => {
reject(err)
})
diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js
index 067b400..9eb59f3 100644
--- a/src/hooks/hookBulkMessage.js
+++ b/src/hooks/hookBulkMessage.js
@@ -34,16 +34,14 @@ async function checkBulkMessageForUser(userId) {
user_id: userId
}
}).then(async function (tUserRefs) {
+ let result = true
if (tUserRefs < tBulkMessages) {
logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`)
- syncBulkMessageForUser(userId).catch((e) => {
- reject(e)
- })
+ result = await syncBulkMessageForUser(userId)
}
- resolve(true) // resolve here
+ resolve(result) // resolve here
}).catch((e) => {
- logger.error(`${logPrefix} Failed to check total userRefs condition. Error: `, e)
- reject(e)
+ reject(`${logPrefix} Failed to check total userRefs condition. Error: ${e}`)
})
} else {
resolve(true)
@@ -71,22 +69,24 @@ async function syncBulkMessageForUser(userId) {
" AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL"
models.sequelize.query(q, { bind: [userId] })
.then(function (res) {
- _.map(res[0], (r) => {
- logger.info(`${logPrefix} need to process for bulk message id: `, r.id)
- isBroadCastMessageForUser(userId, r).then((result) => {
- if (result) {
- createNotificationForUser(userId, r)
- } else {
- insertUserRefs(userId, r.id, null)
- }
- }).catch((err) => {
- logger.error("failed in checking recipient group condition, Error:", err)
+ Promise.all(res[0].map((r) => isBroadCastMessageForUser(userId, r)))
+ .then((results) => {
+ Promise.all(results.map((o) => {
+ if (o.result) {
+ return createNotificationForUser(userId, o.record)
+ } else {
+ return insertUserRefs(userId, o.record.id, null)
+ }
+ })).then((results) => {
+ resolve(results)
+ }).catch((e) => {
+ reject(e)
+ })
+ }).catch((e) => {
+ reject(e)
})
- })
- resolve(true)
}).catch((e) => {
- logger.error(`${logPrefix} Failed to check bulk message condition: `, e)
- reject(e)
+ reject(`${logPrefix} Failed to check bulk message condition: error - ${e}`)
})
})
}
@@ -108,14 +108,16 @@ async function isBroadCastMessageForUser(userId, bulkMessage) {
* @param {Integer} notificationId
*/
async function insertUserRefs(userId, bulkMessageId, notificationId) {
- await models.BulkMessageUserRefs.create({
- bulk_message_id: bulkMessageId,
- user_id: userId,
- notification_id: notificationId,
- }).then((b) => {
- logger.info(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`)
- }).catch((e) => {
- logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: `, e)
+ return new Promise(function (resolve, reject) {
+ models.BulkMessageUserRefs.create({
+ bulk_message_id: bulkMessageId,
+ user_id: userId,
+ notification_id: notificationId,
+ }).then((b) => {
+ resolve(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`)
+ }).catch((e) => {
+ reject(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`)
+ })
})
}
@@ -125,23 +127,26 @@ async function insertUserRefs(userId, bulkMessageId, notificationId) {
* @param {Object} bulkMessage
*/
async function createNotificationForUser(userId, bulkMessage) {
- await models.Notification.create({
- userId: userId,
- type: bulkMessage.type,
- contents: {
- id: bulkMessage.id, /** broadcast message id */
- message: bulkMessage.message, /** broadcast message */
- group: 'broadcast',
- title: 'Broadcast Message',
- },
- read: false,
- seen: false,
- version: null,
- }).then(async (n) => {
- logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
- await insertUserRefs(userId, bulkMessage.id, n.id)
- }).catch((err) => {
- logger.error(`${logPrefix} Error in inserting broadcast message `, err)
+ return new Promise(function (resolve, reject) {
+ models.Notification.create({
+ userId: userId,
+ type: bulkMessage.type,
+ contents: {
+ id: bulkMessage.id, /** broadcast message id */
+ message: bulkMessage.message, /** broadcast message */
+ group: 'broadcast',
+ title: 'Broadcast Message',
+ },
+ read: false,
+ seen: false,
+ version: null,
+ }).then(async (n) => {
+ logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
+ const result = await insertUserRefs(userId, bulkMessage.id, n.id)
+ resolve(result)
+ }).catch((err) => {
+ reject(`${logPrefix} Error in inserting broadcast message: ${err} `)
+ })
})
}
diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js
index 906bb3d..4456c3a 100644
--- a/src/services/NotificationService.js
+++ b/src/services/NotificationService.js
@@ -206,7 +206,7 @@ function* listNotifications(query, userId) {
if (config.ENABLE_HOOK_BULK_NOTIFICATION) {
yield hooks.hookBulkMessage.checkBulkMessageForUser(userId).catch((e) => {
- logger.info(`Issue in calling bulk notification hook.`, e)
+ logger.error(`Issue in calling bulk notification hook.`, e)
})
}
From 918f970e5bb1ae2402b39c22b8b6f52ab1c62d9a Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Wed, 19 Feb 2020 20:12:39 +0530
Subject: [PATCH 08/15] adding logic for user group checking.
---
src/common/broadcastAPIHelper.js | 168 +++++++++++++++++++---------
src/services/NotificationService.js | 8 +-
2 files changed, 120 insertions(+), 56 deletions(-)
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
index 8debbca..d8c11e4 100644
--- a/src/common/broadcastAPIHelper.js
+++ b/src/common/broadcastAPIHelper.js
@@ -15,74 +15,136 @@ async function getM2MToken() {
return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
}
+/**
+ * Helper Function - get member profile
+ * @param {Integer} userId
+ */
async function getMemberInfo(userId) {
const url = config.TC_API_V3_BASE_URL +
- `/members/_search/?fields=userId%2Cskills&query=userId%3A${userId}&limit=1`
- return new Promise(function (resolve, reject) {
+ "/members/_search/?" +
+ "fields=userId%2Cskills" +
+ `&query=userId%3A${userId}` +
+ `&limit=1`
+ return new Promise(async function (resolve, reject) {
let memberInfo = []
logger.info(`calling member api ${url} `)
- request
- .get(url).then((res) => {
- if (!_.get(res, 'body.result.success')) {
- reject(new Error(`Failed to get member api detail for user id ${userId}`))
- }
- memberInfo = _.get(res, 'body.result.content')
- logger.info(`Feteched ${memberInfo.length} record(s) from member api`)
- resolve(memberInfo)
- })
- .catch((err) => {
- reject(new Error(`Failed to get member api detail for user id ${userId}, ${err}`))
- })
+ try {
+ const res = await request.get(url)
+ if (!_.get(res, 'body.result.success')) {
+ reject(new Error(`BCA Memeber API: Failed to get member detail for user id ${userId}`))
+ }
+ memberInfo = _.get(res, 'body.result.content')
+ logger.info(`BCA Memeber API: Feteched ${memberInfo.length} record(s) from member api`)
+ resolve(memberInfo)
+ } catch (err) {
+ reject(new Error(`BCA Memeber API: Failed to get member api detail for user id ${userId}, ${err}`))
+ }
})
- // Need clean-up
- /*const m2m = await getM2MToken().catch((err) => {
- logger.error(`${logPrefix} Failed to get m2m token`)
- return new Promise(function(res, rej) {
- rej(err)
- })
+}
+
+/**
+ * Helper Function - get user group
+ * @param {Integer} userId
+ */
+async function getUserGroup(userId) {
+ //TODO need to take care of pagination
+ const url = config.TC_API_V5_BASE_URL +
+ `/groups/?memberId=${userId}` +
+ "&membershipType=user&page=1"
+ let groupInfo = []
+ return new Promise(async (resolve, reject) => {
+ try {
+ const machineToken = await getM2MToken()
+ //logger.info(`BCA Group API: got m2m token of length ${machineToken.length}`)
+ const res = await request.get(url).set('Authorization', `Bearer ${machineToken}`);
+ if (_.get(res, 'res.statusCode') != 200) {
+ reject(new Error(`BCA Group API: Failed to get group detail for user id ${userId}`))
+ }
+ groupInfo = _.get(res, 'body')
+ logger.info(`BCA Group API: Feteched ${groupInfo.length} record(s) from group api`)
+ resolve(groupInfo)
+ } catch (e) {
+ reject(`Calling group api ${e}`)
+ }
})
- logger.info(`${logPrefix} Fetched m2m token sucessfully. Token length is: `, m2m.length)
- */
- //return request.get(url)
}
async function checkBroadcastMessageForUser(userId, bulkMessage) {
return new Promise(function (resolve, reject) {
- const skills = _.get(bulkMessage, 'recipients.skills')
- logger.info(`Got skills in DB...`, skills)
- if (skills && skills.length > 0) {
- try {
- getMemberInfo(userId).then((m) => {
- let flag = false
- logger.info(`${logPrefix} got member info.`)
- const ms = _.get(m[0], "skills")
- const memberSkills = []
- _.map(ms, (o) => {
- memberSkills.push(_.get(o, 'name').toLowerCase())
- })
- logger.info(`${logPrefix} user id have following skills`, memberSkills)
- _.map(skills, (s) => {
- if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) {
- flag = true;
- logger.info(`${logPrefix} '${s}' skill matached for user id ${userId}`)
- }
- })
- resolve({
- record: bulkMessage,
- result: flag
- })
- }).catch((err) => {
- reject(err)
+ Promise.all([
+ checkUserSkill(userId, bulkMessage),
+ checkUserGroup(userId, bulkMessage),
+ ]).then((results) => {
+ let flag = true // TODO need to be sure about default value
+ _.map(results, (r) => {
+ flag = !r ? false : flag // TODO recheck condition
+ })
+ logger.info(`Final condition result is: ${flag}`)
+ resolve({
+ record: bulkMessage,
+ result: flag
+ })
+ }).catch((err) => {
+ reject(`${logPrefix} got issue in checking recipient condition. ${err}`)
+ })
+ }) // promise end
+}
+
+/**
+ * Helper function - check Skill condition
+ */
+async function checkUserSkill(userId, bulkMessage) {
+ return new Promise(async function (resolve, reject) {
+ try {
+ const skills = _.get(bulkMessage, 'recipients.skills')
+ let flag = true // allow for all
+ if (skills && skills.length > 0) {
+ const m = await getMemberInfo(userId)
+ const ms = _.get(m[0], "skills") // get member skills
+ const memberSkills = []
+ flag = false
+ _.map(ms, (o) => {
+ memberSkills.push(_.get(o, 'name').toLowerCase())
+ })
+ _.map(skills, (s) => {
+ if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) {
+ flag = true
+ logger.info(`BroadcastMessageId: ${bulkMessage.id}, '${s}' skill matached for user id ${userId}`)
+ }
})
- } catch (err) {
- reject(new Error(`${logPrefix} issue at skill condition check, ${err.message}`))
}
- } else {
- resolve(true) // no condition on recipient, so for all
+ resolve(flag)
+ } catch (e) {
+ reject(e)
}
- }) // promise end
+ }) // promise end
+}
+/**
+ * Helper function - check group condition
+ */
+async function checkUserGroup(userId, bulkMessage) {
+ return new Promise(async function (resolve, reject) {
+ try {
+ const groups = _.get(bulkMessage, 'recipients.groups')
+ let flag = true // TODO
+ if (groups.length > 0) {
+ flag = false
+ const groupInfo = await getUserGroup(userId)
+ _.map(groupInfo, (o) => {
+ if (_.indexOf(groups, "public") >= 0) {
+ flag = (_.get(o, "privateGroup")) ? false : flag
+ } else {
+ flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag
+ }
+ })
+ }
+ resolve(flag)
+ } catch (e) {
+ reject(e)
+ }
+ })
}
module.exports = {
diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js
index 4456c3a..fcae5a2 100644
--- a/src/services/NotificationService.js
+++ b/src/services/NotificationService.js
@@ -205,9 +205,11 @@ function* listNotifications(query, userId) {
}
if (config.ENABLE_HOOK_BULK_NOTIFICATION) {
- yield hooks.hookBulkMessage.checkBulkMessageForUser(userId).catch((e) => {
- logger.error(`Issue in calling bulk notification hook.`, e)
- })
+ try {
+ yield hooks.hookBulkMessage.checkBulkMessageForUser(userId)
+ } catch (e) {
+ logger.error(`Error in calling bulk notification hook: ${e}`)
+ }
}
if (_.keys(notificationSettings).length > 0) {
From 5c6eaad50f1a6e5f40955b5b9d9366718c5980cf Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Thu, 20 Feb 2020 15:33:10 +0530
Subject: [PATCH 09/15] deploying on dev
---
.circleci/config.yml | 2 +-
src/common/broadcastAPIHelper.js | 51 +++++++++++--------
src/hooks/hookBulkMessage.js | 39 +++++++-------
src/models/BulkMessages.js | 14 +++--
.../broadcast/bulkNotificationHandler.js | 51 ++++++++++++-------
5 files changed, 92 insertions(+), 65 deletions(-)
diff --git a/.circleci/config.yml b/.circleci/config.yml
index f87288b..7bc897d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -102,7 +102,7 @@ workflows:
context : org-global
filters:
branches:
- only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/platform-filtering']
+ only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/bulk-notification']
- "build-prod":
context : org-global
filters:
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
index d8c11e4..3cda1e2 100644
--- a/src/common/broadcastAPIHelper.js
+++ b/src/common/broadcastAPIHelper.js
@@ -11,6 +11,9 @@ const m2m = m2mAuth(config);
const logPrefix = "BroadcastAPI: "
+/**
+ * Helper Function - get m2m token
+ */
async function getM2MToken() {
return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
}
@@ -70,27 +73,6 @@ async function getUserGroup(userId) {
})
}
-async function checkBroadcastMessageForUser(userId, bulkMessage) {
- return new Promise(function (resolve, reject) {
- Promise.all([
- checkUserSkill(userId, bulkMessage),
- checkUserGroup(userId, bulkMessage),
- ]).then((results) => {
- let flag = true // TODO need to be sure about default value
- _.map(results, (r) => {
- flag = !r ? false : flag // TODO recheck condition
- })
- logger.info(`Final condition result is: ${flag}`)
- resolve({
- record: bulkMessage,
- result: flag
- })
- }).catch((err) => {
- reject(`${logPrefix} got issue in checking recipient condition. ${err}`)
- })
- }) // promise end
-}
-
/**
* Helper function - check Skill condition
*/
@@ -147,6 +129,33 @@ async function checkUserGroup(userId, bulkMessage) {
})
}
+/**
+ * Main Function - check if broadcast message is for current user or not
+ *
+ * @param {Integer} userId
+ * @param {Object} bulkMessage
+ */
+async function checkBroadcastMessageForUser(userId, bulkMessage) {
+ return new Promise(function (resolve, reject) {
+ Promise.all([
+ checkUserSkill(userId, bulkMessage),
+ checkUserGroup(userId, bulkMessage),
+ ]).then((results) => {
+ let flag = true // TODO need to be sure about default value
+ _.map(results, (r) => {
+ flag = !r ? false : flag // TODO recheck condition
+ })
+ logger.info(`Final condition result is: ${flag}`)
+ resolve({
+ record: bulkMessage,
+ result: flag
+ })
+ }).catch((err) => {
+ reject(`${logPrefix} got issue in checking recipient condition. ${err}`)
+ })
+ }) // promise end
+}
+
module.exports = {
checkBroadcastMessageForUser,
}
\ No newline at end of file
diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js
index 9eb59f3..1d0c266 100644
--- a/src/hooks/hookBulkMessage.js
+++ b/src/hooks/hookBulkMessage.js
@@ -37,7 +37,7 @@ async function checkBulkMessageForUser(userId) {
let result = true
if (tUserRefs < tBulkMessages) {
logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`)
- result = await syncBulkMessageForUser(userId)
+ result = await syncBulkMessageForUser(userId)
}
resolve(result) // resolve here
}).catch((e) => {
@@ -102,23 +102,25 @@ async function isBroadCastMessageForUser(userId, bulkMessage) {
}
/**
- * Helper function
+ * Helper function - Insert in bulkMessage user reference table
+ *
* @param {Integer} userId
* @param {Integer} bulkMessageId
* @param {Integer} notificationId
*/
async function insertUserRefs(userId, bulkMessageId, notificationId) {
- return new Promise(function (resolve, reject) {
- models.BulkMessageUserRefs.create({
+ try {
+ const r = await models.BulkMessageUserRefs.create({
bulk_message_id: bulkMessageId,
user_id: userId,
notification_id: notificationId,
- }).then((b) => {
- resolve(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`)
- }).catch((e) => {
- reject(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`)
})
- })
+ logger.info(`${logPrefix} Inserted userRef record for bulk message id ${r.id} for current user ${userId}`)
+ return r
+ } catch (e) {
+ logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`)
+ return e
+ }
}
/**
@@ -127,8 +129,8 @@ async function insertUserRefs(userId, bulkMessageId, notificationId) {
* @param {Object} bulkMessage
*/
async function createNotificationForUser(userId, bulkMessage) {
- return new Promise(function (resolve, reject) {
- models.Notification.create({
+ try {
+ const n = await models.Notification.create({
userId: userId,
type: bulkMessage.type,
contents: {
@@ -140,14 +142,15 @@ async function createNotificationForUser(userId, bulkMessage) {
read: false,
seen: false,
version: null,
- }).then(async (n) => {
- logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
- const result = await insertUserRefs(userId, bulkMessage.id, n.id)
- resolve(result)
- }).catch((err) => {
- reject(`${logPrefix} Error in inserting broadcast message: ${err} `)
})
- })
+ logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
+ // TODO need to be in transaction so that rollback will be possible
+ const result = await insertUserRefs(userId, bulkMessage.id, n.id)
+ return result
+ } catch (e) {
+ logger.error(`${logPrefix} Error in inserting broadcast message: ${err} `)
+ return e
+ }
}
diff --git a/src/models/BulkMessages.js b/src/models/BulkMessages.js
index 5026f6e..2bdbc6b 100644
--- a/src/models/BulkMessages.js
+++ b/src/models/BulkMessages.js
@@ -11,12 +11,10 @@
module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', {
- id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
- type: { type: DataTypes.STRING, allowNull: false },
- message: { type: DataTypes.TEXT, allowNull: false },
- recipients: { type: DataTypes.JSONB, allowNull: false },
- rules: {type: DataTypes.JSONB, allowNull: true}
- }, {});
-
+ id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
+ type: { type: DataTypes.STRING, allowNull: false },
+ message: { type: DataTypes.TEXT, allowNull: false },
+ recipients: { type: DataTypes.JSONB, allowNull: false },
+}, {});
+
// sequelize will generate and manage createdAt, updatedAt fields
-
\ No newline at end of file
diff --git a/src/processors/broadcast/bulkNotificationHandler.js b/src/processors/broadcast/bulkNotificationHandler.js
index 11aa557..93d1e04 100644
--- a/src/processors/broadcast/bulkNotificationHandler.js
+++ b/src/processors/broadcast/bulkNotificationHandler.js
@@ -1,8 +1,9 @@
/**
* Bulk notification handler.
*/
-const co = require('co');
-const models = require('../../models');
+const joi = require('joi')
+const co = require('co')
+const models = require('../../models')
const logger = require('../../common/logger')
/**
@@ -14,22 +15,38 @@ const logger = require('../../common/logger')
* @return {Promise} promise resolved to notifications
*/
const handle = (message, ruleSets) => co(function* () {
- return new Promise(function(resolve, reject){
- models.BulkMessages.create({
- type: message.topic,
- message: message.payload.message,
- recipients: message.payload.recipients,
- rules: message.payload.rules || null,
- }).then((bm) => {
- logger.info("Broadcast message recieved and inserted in db with id:", bm.id)
- resolve([]) // no notification need to insert at this point
- }).catch((e) => {
- logger.error("Broadcast processor failed in db operation. Error: ", e)
- reject(e)
- })
- })
+ try {
+ const bm = yield models.BulkMessages.create({
+ type: message.topic,
+ message: message.payload.message,
+ recipients: message.payload.recipients,
+ })
+ logger.info("Broadcast message recieved and inserted in db with id:", bm.id)
+ } catch (e) {
+ logger.error(`Broadcast processor failed in db operation. Error: ${e}`)
+ }
+ return [] // this point of time, send empty notification object
});
+/**
+ * validate kafka payload
+ */
+handle.schema = {
+ message: joi.object().keys({
+ topic: joi.string().required(),
+ originator: joi.string().required(),
+ timestamp: joi.date().required(),
+ 'mime-type': joi.string().required(),
+ payload: joi.object().keys({
+ message: joi.string().required(),
+ recipients: joi.object().required(),
+ }),
+ }).required(),
+ ruleSets: joi.object(),
+};
+
module.exports = {
handle,
-};
\ No newline at end of file
+};
+
+logger.buildService(module.exports);
\ No newline at end of file
From 457a97ee30b8eaa3e4b627427726a117dd5203da Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Thu, 20 Feb 2020 17:01:37 +0530
Subject: [PATCH 10/15] correcting logic for public group
---
src/common/broadcastAPIHelper.js | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
index 3cda1e2..9669921 100644
--- a/src/common/broadcastAPIHelper.js
+++ b/src/common/broadcastAPIHelper.js
@@ -110,16 +110,18 @@ async function checkUserGroup(userId, bulkMessage) {
return new Promise(async function (resolve, reject) {
try {
const groups = _.get(bulkMessage, 'recipients.groups')
- let flag = true // TODO
+ let flag = false // default
+ const userGroupInfo = await getUserGroup(userId)
if (groups.length > 0) {
- flag = false
- const groupInfo = await getUserGroup(userId)
- _.map(groupInfo, (o) => {
- if (_.indexOf(groups, "public") >= 0) {
- flag = (_.get(o, "privateGroup")) ? false : flag
- } else {
- flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag
- }
+ _.map(userGroupInfo, (o) => {
+ // particular group only condition
+ flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag
+ })
+ } else { // no group condition means its for `public` no private group
+ flag = true // default allow for all
+ _.map(userGroupInfo, (o) => {
+ // not allow if user is part of any private group
+ flag = (_.get(o, "privateGroup")) ? false : flag
})
}
resolve(flag)
From a036f9fde1e37c90d491413cf38800a0c1494b77 Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Thu, 20 Feb 2020 17:06:13 +0530
Subject: [PATCH 11/15] prining log
---
src/common/broadcastAPIHelper.js | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
index 9669921..1f20b58 100644
--- a/src/common/broadcastAPIHelper.js
+++ b/src/common/broadcastAPIHelper.js
@@ -147,7 +147,7 @@ async function checkBroadcastMessageForUser(userId, bulkMessage) {
_.map(results, (r) => {
flag = !r ? false : flag // TODO recheck condition
})
- logger.info(`Final condition result is: ${flag}`)
+ logger.info(`BCA: Final recepient condition result is: ${flag} for userId ${userId}`)
resolve({
record: bulkMessage,
result: flag
From 62cf1a1f30764439b8f5290e5c37b7093bd845a8 Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Fri, 21 Feb 2020 17:59:20 +0530
Subject: [PATCH 12/15] changes in group api calling..
---
src/common/broadcastAPIHelper.js | 78 ++++++++++++++++++++++----------
1 file changed, 55 insertions(+), 23 deletions(-)
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
index 1f20b58..89e7f80 100644
--- a/src/common/broadcastAPIHelper.js
+++ b/src/common/broadcastAPIHelper.js
@@ -1,5 +1,5 @@
/**
- *
+ * Broadcast: API calling helper
*/
const _ = require('lodash')
@@ -40,7 +40,8 @@ async function getMemberInfo(userId) {
logger.info(`BCA Memeber API: Feteched ${memberInfo.length} record(s) from member api`)
resolve(memberInfo)
} catch (err) {
- reject(new Error(`BCA Memeber API: Failed to get member api detail for user id ${userId}, ${err}`))
+ reject(new Error(`BCA Memeber API: Failed to get member ` +
+ `api detail for user id ${userId}, ${err}`))
}
})
@@ -51,26 +52,53 @@ async function getMemberInfo(userId) {
* @param {Integer} userId
*/
async function getUserGroup(userId) {
- //TODO need to take care of pagination
- const url = config.TC_API_V5_BASE_URL +
- `/groups/?memberId=${userId}` +
- "&membershipType=user&page=1"
- let groupInfo = []
- return new Promise(async (resolve, reject) => {
- try {
- const machineToken = await getM2MToken()
- //logger.info(`BCA Group API: got m2m token of length ${machineToken.length}`)
- const res = await request.get(url).set('Authorization', `Bearer ${machineToken}`);
- if (_.get(res, 'res.statusCode') != 200) {
- reject(new Error(`BCA Group API: Failed to get group detail for user id ${userId}`))
- }
- groupInfo = _.get(res, 'body')
- logger.info(`BCA Group API: Feteched ${groupInfo.length} record(s) from group api`)
- resolve(groupInfo)
- } catch (e) {
- reject(`Calling group api ${e}`)
+ try {
+ const machineToken = await getM2MToken()
+ if (machineToken.length <= 0) {
+ return (new Error(`BCA Group API: fecthing m2m token failed for ${userId}`))
}
- })
+ let nextPage
+ let res
+ let url
+ let page = 1
+ let groupInfo = []
+ const perPage = 100
+ do {
+ url = config.TC_API_V5_BASE_URL +
+ `/groups/?memberId=${userId}&membershipType=user` +
+ `&page=${page}&perPage=${perPage}`
+ res = await callApi(url, machineToken)
+ let resStatus = _.get(res, 'res.statusCode')
+ if (resStatus != 200) {
+ throw new Error(`BCA Group API: Failed for user id ${userId},` +
+ ` response status ${resStatus}`)
+ }
+ let data = _.get(res, 'body')
+ groupInfo = groupInfo.concat(data)
+ nextPage = _.get(res, 'header.x-next-page')
+ page = nextPage
+ } while (nextPage)
+ logger.info(`BCA Group API: Feteched ${groupInfo.length} record(s) from group api`)
+ return groupInfo
+ } catch (e) {
+ logger.error(`BCA: Error calling group api : ${e}`)
+ throw new Error(`getUserGroup() : ${e}`)
+ }
+}
+
+/**
+ *
+ * @param {String} url
+ * @param {String} machineToken
+ */
+async function callApi(url, machineToken) {
+ try {
+ logger.info(`calling api url ${url}`)
+ return request.get(url).set('Authorization', `Bearer ${machineToken}`)
+ } catch (e) {
+ logger.error(`Error in calling URL ${url}, ${e}`)
+ throw new Error(`callApi() : ${e}`)
+ }
}
/**
@@ -92,7 +120,8 @@ async function checkUserSkill(userId, bulkMessage) {
_.map(skills, (s) => {
if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) {
flag = true
- logger.info(`BroadcastMessageId: ${bulkMessage.id}, '${s}' skill matached for user id ${userId}`)
+ logger.info(`BroadcastMessageId: ${bulkMessage.id},` +
+ ` '${s}' skill matached for user id ${userId}`)
}
})
}
@@ -123,6 +152,8 @@ async function checkUserGroup(userId, bulkMessage) {
// not allow if user is part of any private group
flag = (_.get(o, "privateGroup")) ? false : flag
})
+ logger.info(`public group condition for userId ${userId}` +
+ ` and BC messageId ${bulkMessage.id}, the result is: ${flag}`)
}
resolve(flag)
} catch (e) {
@@ -147,7 +178,8 @@ async function checkBroadcastMessageForUser(userId, bulkMessage) {
_.map(results, (r) => {
flag = !r ? false : flag // TODO recheck condition
})
- logger.info(`BCA: Final recepient condition result is: ${flag} for userId ${userId}`)
+ logger.info(`BCA: messageId: ${bulkMessage.id} Final recipient` +
+ ` condition result is: ${flag} for userId ${userId}`)
resolve({
record: bulkMessage,
result: flag
From 83490b3b82aa54c4c66ece04de2abfd14f4abfa0 Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Mon, 24 Feb 2020 19:09:23 +0530
Subject: [PATCH 13/15] adding track conditions
---
src/common/broadcastAPIHelper.js | 125 ++++++++++++++++++-------------
1 file changed, 73 insertions(+), 52 deletions(-)
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
index 89e7f80..18d759b 100644
--- a/src/common/broadcastAPIHelper.js
+++ b/src/common/broadcastAPIHelper.js
@@ -25,8 +25,7 @@ async function getM2MToken() {
async function getMemberInfo(userId) {
const url = config.TC_API_V3_BASE_URL +
"/members/_search/?" +
- "fields=userId%2Cskills" +
- `&query=userId%3A${userId}` +
+ `query=userId%3A${userId}` +
`&limit=1`
return new Promise(async function (resolve, reject) {
let memberInfo = []
@@ -102,64 +101,86 @@ async function callApi(url, machineToken) {
}
/**
- * Helper function - check Skill condition
+ * Helper function - check Skills and Tracks condition
*/
-async function checkUserSkill(userId, bulkMessage) {
- return new Promise(async function (resolve, reject) {
- try {
- const skills = _.get(bulkMessage, 'recipients.skills')
- let flag = true // allow for all
- if (skills && skills.length > 0) {
- const m = await getMemberInfo(userId)
- const ms = _.get(m[0], "skills") // get member skills
- const memberSkills = []
- flag = false
- _.map(ms, (o) => {
- memberSkills.push(_.get(o, 'name').toLowerCase())
- })
- _.map(skills, (s) => {
- if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) {
- flag = true
- logger.info(`BroadcastMessageId: ${bulkMessage.id},` +
- ` '${s}' skill matached for user id ${userId}`)
- }
- })
- }
- resolve(flag)
- } catch (e) {
- reject(e)
+async function checkUserSkillsAndTracks(userId, bulkMessage) {
+ try {
+ const skills = _.get(bulkMessage, 'recipients.skills')
+ const tracks = _.get(bulkMessage, 'recipients.tracks')
+ const m = await getMemberInfo(userId)
+ let skillMatch, trackMatch = false // default
+ if (skills && skills.length > 0) {
+ const ms = _.get(m[0], "skills") // get member skills
+ const memberSkills = []
+ skillMatch = false
+ _.map(ms, (o) => {
+ memberSkills.push(_.get(o, 'name').toLowerCase())
+ })
+ _.map(skills, (s) => {
+ if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) {
+ skillMatch = true
+ logger.info(`BroadcastMessageId: ${bulkMessage.id},` +
+ ` '${s}' skill matached for user id ${userId}`)
+ }
+ })
+ } else {
+ skillMatch = true // no condition, means allow for all
}
- }) // promise end
+
+ //
+ if (tracks.length > 0) {
+ trackMatch = false
+ const uDevChallenges = _.get(m[0], "stats[0].DEVELOP.challenges1")
+ const uDesignChallenges = _.get(m[0], "stats[0].DEVELOP.challenges")
+ const uDSChallenges = _.get(m[0], "stats[0].DEVELOP.challenges")
+ _.map(tracks, (t) => {
+ /**
+ * checking if user participated in specific challenges
+ */
+ if (t.equalsIgnoreCase("DEVELOP")) {
+ trackMatch = uDevChallenges > 0 ? true : trackMatch
+ } else if (t.equalsIgnoreCase("DESIGN")) {
+ trackMatch = uDesignChallenges > 0 ? true : trackMatch
+ } else if (t.equalsIgnoreCase("DATA_SCIENCE")) {
+ trackMatch = uDSChallenges > 0 ? true : trackMatch
+ }
+ })
+ } else {
+ trackMatch = true // no condition, means allow for all
+ }
+ const flag = (skillMatch && trackMatch) ? true : false
+ return flag
+ } catch (e) {
+ throw new Error(`checkUserSkillsAndTracks() : ${e}`)
+ }
}
/**
* Helper function - check group condition
*/
async function checkUserGroup(userId, bulkMessage) {
- return new Promise(async function (resolve, reject) {
- try {
- const groups = _.get(bulkMessage, 'recipients.groups')
- let flag = false // default
- const userGroupInfo = await getUserGroup(userId)
- if (groups.length > 0) {
- _.map(userGroupInfo, (o) => {
- // particular group only condition
- flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag
- })
- } else { // no group condition means its for `public` no private group
- flag = true // default allow for all
- _.map(userGroupInfo, (o) => {
- // not allow if user is part of any private group
- flag = (_.get(o, "privateGroup")) ? false : flag
- })
- logger.info(`public group condition for userId ${userId}` +
- ` and BC messageId ${bulkMessage.id}, the result is: ${flag}`)
- }
- resolve(flag)
- } catch (e) {
- reject(e)
+ try {
+ const groups = _.get(bulkMessage, 'recipients.groups')
+ let flag = false // default
+ const userGroupInfo = await getUserGroup(userId)
+ if (groups.length > 0) {
+ _.map(userGroupInfo, (o) => {
+ // particular group only condition
+ flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag
+ })
+ } else { // no group condition means its for `public` no private group
+ flag = true // default allow for all
+ _.map(userGroupInfo, (o) => {
+ // not allow if user is part of any private group
+ flag = (_.get(o, "privateGroup")) ? false : flag
+ })
+ logger.info(`public group condition for userId ${userId}` +
+ ` and BC messageId ${bulkMessage.id}, the result is: ${flag}`)
}
- })
+ return flag
+ } catch (e) {
+ throw new Error(`checkUserGroup(): ${e}`)
+ }
}
/**
@@ -171,7 +192,7 @@ async function checkUserGroup(userId, bulkMessage) {
async function checkBroadcastMessageForUser(userId, bulkMessage) {
return new Promise(function (resolve, reject) {
Promise.all([
- checkUserSkill(userId, bulkMessage),
+ checkUserSkillsAndTracks(userId, bulkMessage),
checkUserGroup(userId, bulkMessage),
]).then((results) => {
let flag = true // TODO need to be sure about default value
From 7810afce454c8227f8e8cd39d4de127caf3424fc Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Wed, 26 Feb 2020 12:27:53 +0530
Subject: [PATCH 14/15] fixing typo
---
src/common/broadcastAPIHelper.js | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
index 18d759b..b196cf4 100644
--- a/src/common/broadcastAPIHelper.js
+++ b/src/common/broadcastAPIHelper.js
@@ -130,18 +130,19 @@ async function checkUserSkillsAndTracks(userId, bulkMessage) {
//
if (tracks.length > 0) {
trackMatch = false
- const uDevChallenges = _.get(m[0], "stats[0].DEVELOP.challenges1")
- const uDesignChallenges = _.get(m[0], "stats[0].DEVELOP.challenges")
- const uDSChallenges = _.get(m[0], "stats[0].DEVELOP.challenges")
+ const uDevChallenges = _.get(m[0], "stats[0].DEVELOP.challenges")
+ const uDesignChallenges = _.get(m[0], "stats[0].DESIGN.challenges")
+ const uDSChallenges = _.get(m[0], "stats[0].DATA_SCIENCE.challenges")
_.map(tracks, (t) => {
/**
* checking if user participated in specific challenges
*/
- if (t.equalsIgnoreCase("DEVELOP")) {
+ let key = t.toLowerCase()
+ if (key === "develop") {
trackMatch = uDevChallenges > 0 ? true : trackMatch
- } else if (t.equalsIgnoreCase("DESIGN")) {
+ } else if (key === "design") {
trackMatch = uDesignChallenges > 0 ? true : trackMatch
- } else if (t.equalsIgnoreCase("DATA_SCIENCE")) {
+ } else if (key === "data_science") {
trackMatch = uDSChallenges > 0 ? true : trackMatch
}
})
From e957bed7d08ee9e8c80828db58286e8a2e5ee67a Mon Sep 17 00:00:00 2001
From: Sachin Maheshwari
Date: Thu, 27 Feb 2020 12:58:48 +0530
Subject: [PATCH 15/15] code clean-up
---
src/hooks/hookBulkMessage.js | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js
index 1d0c266..cacbe44 100644
--- a/src/hooks/hookBulkMessage.js
+++ b/src/hooks/hookBulkMessage.js
@@ -5,8 +5,6 @@
'use strict'
const _ = require('lodash')
-//const Joi = require('joi')
-//const errors = require('../common/errors')
const logger = require('../common/logger')
const models = require('../models')
const api = require('../common/broadcastAPIHelper')
@@ -119,7 +117,7 @@ async function insertUserRefs(userId, bulkMessageId, notificationId) {
return r
} catch (e) {
logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`)
- return e
+ throw new Error(`insertUserRefs() : ${e}`)
}
}
@@ -148,8 +146,8 @@ async function createNotificationForUser(userId, bulkMessage) {
const result = await insertUserRefs(userId, bulkMessage.id, n.id)
return result
} catch (e) {
- logger.error(`${logPrefix} Error in inserting broadcast message: ${err} `)
- return e
+ logger.error(`${logPrefix} insert broadcast notification error: ${e} `)
+ throw new Error(`createNotificationForUser() : ${e}`)
}
}