Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ workflows:
branches:
only:
- develop
- feature/shapeup4-cqrs-update

# Production builds are exectuted only on tagged commits to the
# master branch.
Expand Down
2 changes: 2 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ module.exports = {
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',

// topics
UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'ubahn.action.error',

UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
Expand Down
4 changes: 2 additions & 2 deletions docker-pgsql-es/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ services:
image: "postgres:12.4"
volumes:
- database-data:/var/lib/postgresql/data/
ports:
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_USER: ${DB_USERNAME}
POSTGRES_DB: ${DB_NAME}
esearch:
image: elasticsearch:7.7.1
image: elasticsearch:7.13.4
container_name: ubahn-data-processor-es_es
ports:
- "9200:9200"
Expand Down
16 changes: 10 additions & 6 deletions src/common/db-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,27 @@ async function get (model, pk, params) {
* @param model the sequelize model object
* @param entity entity to create
* @param auth the user auth object
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function create (model, entity, auth) {
async function create (model, entity, auth, transaction) {
if (auth) {
entity.createdBy = helper.getAuthUser(auth)
}
return model.create(entity)
return model.create(entity, { transaction })
}

/**
* delete object by pk
* @param model the sequelize model object
* @param pk the primary key
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function remove (model, pk, params) {
async function remove (model, pk, params, transaction) {
const instance = await get(model, pk, params)
return instance.destroy()
const result = await instance.destroy({ transaction })
return result
}

/**
Expand All @@ -132,13 +135,14 @@ async function remove (model, pk, params) {
* @param entity entity to create
* @param auth the auth object
* @param auth the path params
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function update (model, pk, entity, auth, params) {
async function update (model, pk, entity, auth, params, transaction) {
// insure that object exists
const instance = await get(model, pk, params)
entity.updatedBy = helper.getAuthUser(auth)
return instance.update(entity)
return instance.update(entity, { transaction })
}

/**
Expand Down
36 changes: 36 additions & 0 deletions src/common/es-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const config = require('config')
const _ = require('lodash')
const querystring = require('querystring')
const logger = require('../common/logger')
const helper = require('../common/helper')
const appConst = require('../consts')
const esClient = require('./es-client').getESClient()

Expand Down Expand Up @@ -282,6 +283,38 @@ function escapeRegex (str) {
/* eslint-enable no-useless-escape */
}

/**
* Process create entity
* @param {String} resource resource name
* @param {Object} entity entity object
*/
async function processCreate (resource, entity) {
helper.validProperties(entity, ['id'])
await esClient.index({
index: DOCUMENTS[resource].index,
type: DOCUMENTS[resource].type,
id: entity.id,
body: entity,
refresh: 'wait_for'
})
logger.info(`Insert in Elasticsearch resource ${resource} entity, , ${JSON.stringify(entity, null, 2)}`)
}

/**
* Process delete entity
* @param {String} resource resource name
* @param {Object} entity entity object
*/
async function processDelete (resource, entity) {
helper.validProperties(entity, ['id'])
await esClient.delete({
index: DOCUMENTS[resource].index,
type: DOCUMENTS[resource].type,
id: entity.id,
refresh: 'wait_for'
})
}

async function getOrganizationId (handle) {
const dBHelper = require('../common/db-helper')
const sequelize = require('../models/index')
Expand Down Expand Up @@ -1453,6 +1486,9 @@ async function searchAchievementValues ({ organizationId, keyword }) {
}

module.exports = {
processCreate,
processUpdate: processCreate,
processDelete,
searchElasticSearch,
getFromElasticSearch,
searchUsers,
Expand Down
38 changes: 37 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const config = require('config')
const Joi = require('@hapi/joi')
const querystring = require('querystring')
const errors = require('./errors')
const appConst = require('../consts')
Expand All @@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper')
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))

/**
* Function to valid require keys
* @param {Object} payload validated object
* @param {Array} keys required keys
* @throws {Error} if required key absent
*/
function validProperties (payload, keys) {
const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true)
const error = schema.validate(payload).error
if (error) {
throw error
}
}

/**
* get auth user handle or id
* @param authUser the user
Expand Down Expand Up @@ -145,12 +160,33 @@ async function postEvent (topic, payload) {
await busApiClient.postEvent(message)
}

/**
* Send error event to Kafka
* @params {String} topic the topic name
* @params {Object} payload the payload
* @params {String} action for which operation error occurred
*/
async function publishError (topic, payload, action) {
_.set(payload, 'apiAction', action)
const message = {
topic,
originator: config.KAFKA_MESSAGE_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': 'application/json',
payload
}
logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`)
await busApiClient.postEvent(message)
}

module.exports = {
validProperties,
getAuthUser,
permissionCheck,
checkIfExists,
injectSearchMeta,
getControllerMethods,
getSubControllerMethods,
postEvent
postEvent,
publishError
}
56 changes: 46 additions & 10 deletions src/common/service-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,49 @@ const MODEL_TO_RESOURCE = {
* Create record in es
* @param resource the resource to create
* @param result the resource fields
* @param toEs is to es directly
*/
async function createRecordInEs (resource, entity) {
async function createRecordInEs (resource, entity, toEs) {
try {
await publishMessage('create', resource, entity)
if (toEs) {
await esHelper.processCreate(resource, entity)
}
} catch (err) {
logger.logFullError(err)
throw err
}

if (!toEs) {
try {
await publishMessage("create", resource, entity);
} catch (err) {
logger.logFullError(err);
}
}

}

/**
* Patch record in es
* @param resource the resource to create
* @param result the resource fields
* @param toEs is to es directly
*/
async function patchRecordInEs (resource, entity) {
async function patchRecordInEs (resource, entity, toEs) {
try {
await publishMessage('patch', resource, entity)
if (toEs) {
await esHelper.processUpdate(resource, entity)
}
} catch (err) {
logger.logFullError(err)
throw err
}
if (!toEs) {
try {
await publishMessage("patch", resource, entity);
} catch (err) {
logger.logFullError(err);
}
}
}

Expand All @@ -65,8 +89,9 @@ async function patchRecordInEs (resource, entity) {
* @param id the id of record
* @param params the params of record (like nested ids)
* @param resource the resource to delete
* @param toEs is to es directly
*/
async function deleteRecordFromEs (id, params, resource) {
async function deleteRecordFromEs (id, params, resource, toEs) {
let payload
if (SUB_USER_DOCUMENTS[resource] || SUB_ORG_DOCUMENTS[resource]) {
payload = _.assign({}, params)
Expand All @@ -76,9 +101,19 @@ async function deleteRecordFromEs (id, params, resource) {
}
}
try {
await publishMessage('remove', resource, payload)
if (toEs) {
await esHelper.processDelete(resource, payload)
}
} catch (err) {
logger.logFullError(err)
throw err
}
if (!toEs) {
try {
await publishMessage("remove", resource, payload);
} catch (err) {
logger.logFullError(err);
}
}
}

Expand Down Expand Up @@ -174,13 +209,14 @@ function sleep (ms) {
}

/**
* delete child of record with delay between each item deleted
* delete child of record with delay between each item deleted and with transaction
* @param model the child model to delete
* @param id the user id to delete
* @param params the params for child
* @param resourceName the es recource name
* @param transaction the transaction object
*/
async function deleteChild (model, id, params, resourceName) {
async function deleteChild (model, id, params, resourceName, transaction) {
const query = {}
query[params[0]] = id
const result = await dbHelper.find(model, query)
Expand All @@ -194,8 +230,8 @@ async function deleteChild (model, id, params, resourceName) {
params.forEach(attr => { esParams[attr] = record[attr] })

// remove from db
dbHelper.remove(model, record.id)
deleteRecordFromEs(record.id, esParams, resourceName)
await dbHelper.remove(model, record.id, transaction)
await deleteRecordFromEs(record.id, esParams, resourceName, !!transaction)

// sleep for configured time
await sleep(config.CASCADE_PAUSE_MS)
Expand Down
Loading