55
66import config from 'config' ;
77import _ from 'lodash' ;
8+ import Promise from 'bluebird' ;
89import util from '../../util' ;
910
1011const ES_PROJECT_INDEX = config . get ( 'elasticsearchConfig.indexName' ) ;
@@ -19,37 +20,22 @@ const eClient = util.getElasticSearchClient();
1920 * @param {Object } channel channel to ack, nack
2021 * @returns {undefined }
2122 */
22- const projectAttachmentAddedHandler = ( logger , msg , channel ) => {
23- const data = JSON . parse ( msg . content . toString ( ) ) ;
24-
25- eClient . get ( {
26- index : ES_PROJECT_INDEX ,
27- type : ES_PROJECT_TYPE ,
28- id : data . projectId ,
29- } ) . then ( ( doc ) => {
23+ const projectAttachmentAddedHandler = Promise . coroutine ( function * ( logger , msg , channel ) { // eslint-disable-line func-names
24+ try {
25+ const data = JSON . parse ( msg . content . toString ( ) ) ;
26+ const doc = yield eClient . get ( { index : ES_PROJECT_INDEX , type : ES_PROJECT_TYPE , id : data . projectId } ) ;
3027 const attachments = _ . isArray ( doc . _source . attachments ) ? doc . _source . attachments : [ ] ; // eslint-disable-line no-underscore-dangle
3128 attachments . push ( data ) ;
3229 const merged = _ . merge ( doc . _source , { attachments } ) ; // eslint-disable-line no-underscore-dangle
33- eClient . update ( {
34- index : ES_PROJECT_INDEX ,
35- type : ES_PROJECT_TYPE ,
36- id : data . projectId ,
37- body : {
38- doc : merged ,
39- } ,
40- } ) . then ( ( ) => {
41- logger . debug ( 'project attachment added to project document successfully' ) ;
42- channel . ack ( msg ) ;
43- } ) . catch ( ( error ) => {
44- logger . error ( 'failed to add project attachment to project document' , error ) ;
45- channel . nack ( msg , false , ! msg . fields . redelivered ) ;
46- } ) ;
47- } ) . catch ( ( error ) => {
48- logger . error ( 'Error fetching project document from elasticsearch' , error ) ;
30+ yield eClient . update ( { index : ES_PROJECT_INDEX , type : ES_PROJECT_TYPE , id : data . projectId , body : { doc : merged } } ) ;
31+ logger . debug ( 'project attachment added to project document successfully' ) ;
32+ channel . ack ( msg ) ;
33+ } catch ( error ) {
34+ logger . error ( 'Error handling project.attachment.added event' , error ) ;
4935 // if the message has been redelivered dont attempt to reprocess it
5036 channel . nack ( msg , false , ! msg . fields . redelivered ) ;
51- } ) ;
52- } ;
37+ }
38+ } ) ;
5339
5440/**
5541 * Handler for project attachment updated event
@@ -58,41 +44,33 @@ const projectAttachmentAddedHandler = (logger, msg, channel) => {
5844 * @param {Object } channel channel to ack, nack
5945 * @returns {undefined }
6046 */
61- const projectAttachmentUpdatedHandler = ( logger , msg , channel ) => {
62- const data = JSON . parse ( msg . content . toString ( ) ) ;
63-
64- eClient . get ( {
65- index : ES_PROJECT_INDEX ,
66- type : ES_PROJECT_TYPE ,
67- id : data . original . projectId ,
68- } ) . then ( ( doc ) => {
47+ const projectAttachmentUpdatedHandler = Promise . coroutine ( function * ( logger , msg , channel ) { // eslint-disable-line func-names
48+ try {
49+ const data = JSON . parse ( msg . content . toString ( ) ) ;
50+ const doc = yield eClient . get ( { index : ES_PROJECT_INDEX , type : ES_PROJECT_TYPE , id : data . original . projectId } ) ;
6951 const attachments = _ . map ( doc . _source . attachments , ( single ) => { // eslint-disable-line no-underscore-dangle
7052 if ( single . id === data . original . id ) {
7153 return _ . merge ( single , data . updated ) ;
7254 }
7355 return single ;
7456 } ) ;
7557 const merged = _ . merge ( doc . _source , { attachments } ) ; // eslint-disable-line no-underscore-dangle
76- eClient . update ( {
58+ yield eClient . update ( {
7759 index : ES_PROJECT_INDEX ,
7860 type : ES_PROJECT_TYPE ,
7961 id : data . original . projectId ,
8062 body : {
8163 doc : merged ,
8264 } ,
83- } ) . then ( ( ) => {
84- logger . debug ( 'elasticsearch index updated, project attachment updated successfully' ) ;
85- channel . ack ( msg ) ;
86- } ) . catch ( ( error ) => {
87- logger . error ( 'failed to update project attachment for project document' , error ) ;
88- channel . nack ( msg , false , ! msg . fields . redelivered ) ;
8965 } ) ;
90- } ) . catch ( ( error ) => {
91- logger . error ( 'Error fetching project document from elasticsearch' , error ) ;
66+ logger . debug ( 'elasticsearch index updated, project attachment updated successfully' ) ;
67+ channel . ack ( msg ) ;
68+ } catch ( error ) {
69+ logger . error ( 'Error handling project.attachment.updated event' , error ) ;
9270 // if the message has been redelivered dont attempt to reprocess it
9371 channel . nack ( msg , false , ! msg . fields . redelivered ) ;
94- } ) ;
95- } ;
72+ }
73+ } ) ;
9674
9775/**
9876 * Handler for project attachment deleted event
@@ -101,35 +79,28 @@ const projectAttachmentUpdatedHandler = (logger, msg, channel) => {
10179 * @param {Object } channel channel to ack, nack
10280 * @returns {undefined }
10381 */
104- const projectAttachmentRemovedHandler = ( logger , msg , channel ) => {
105- const data = JSON . parse ( msg . content . toString ( ) ) ;
106- eClient . get ( {
107- index : ES_PROJECT_INDEX ,
108- type : ES_PROJECT_TYPE ,
109- id : data . projectId ,
110- } ) . then ( ( doc ) => {
82+ const projectAttachmentRemovedHandler = Promise . coroutine ( function * ( logger , msg , channel ) { // eslint-disable-line func-names
83+ try {
84+ const data = JSON . parse ( msg . content . toString ( ) ) ;
85+ const doc = yield eClient . get ( { index : ES_PROJECT_INDEX , type : ES_PROJECT_TYPE , id : data . projectId } ) ;
11186 const attachments = _ . filter ( doc . _source . attachments , single => single . id !== data . id ) ; // eslint-disable-line no-underscore-dangle
11287 const merged = _ . merge ( doc . _source , { attachments } ) ; // eslint-disable-line no-underscore-dangle
113- eClient . update ( {
88+ yield eClient . update ( {
11489 index : ES_PROJECT_INDEX ,
11590 type : ES_PROJECT_TYPE ,
11691 id : data . projectId ,
11792 body : {
11893 doc : merged ,
11994 } ,
120- } ) . then ( ( ) => {
121- logger . debug ( 'project attachment removed from project document successfully' ) ;
122- channel . ack ( msg ) ;
123- } ) . catch ( ( error ) => {
124- logger . error ( 'failed to remove project attachment from project document' , error ) ;
125- channel . nack ( msg , false , ! msg . fields . redelivered ) ;
12695 } ) ;
127- } ) . catch ( ( error ) => {
96+ logger . debug ( 'project attachment removed from project document successfully' ) ;
97+ channel . ack ( msg ) ;
98+ } catch ( error ) {
12899 logger . error ( 'Error fetching project document from elasticsearch' , error ) ;
129100 // if the message has been redelivered dont attempt to reprocess it
130101 channel . nack ( msg , false , ! msg . fields . redelivered ) ;
131- } ) ;
132- } ;
102+ }
103+ } ) ;
133104
134105
135106module . exports = {
0 commit comments