@@ -75,20 +75,37 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) {
7575 } ) ;
7676 } ) ;
7777
78+ var latestSubscriptions = null ;
79+
7880 const check = function ( ) {
79- logger . debug ( ' Checking Health...' ) ;
81+ logger . debug ( " Checking health" ) ;
8082 if ( ! consumer . client . initialBrokers && ! consumer . client . initialBrokers . length ) {
8183 logger . debug ( 'Found unhealthy Kafka Brokers...' ) ;
8284 return false ;
8385 }
8486 let connected = true ;
87+ let currentSubscriptions = consumer . subscriptions ;
88+ for ( var sIdx in currentSubscriptions ) {
89+ // current subscription
90+ let sub = currentSubscriptions [ sIdx ] ;
91+ // previous subscription
92+ let prevSub = latestSubscriptions ? latestSubscriptions [ sIdx ] : null ;
93+ // levarage the `paused` field (https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L66) to
94+ // determine if there was a possibility of an unhandled exception. If we find paused status for the same
95+ // topic in two consecutive health checks, we assume it was stuck because of unhandled error
96+ if ( prevSub && prevSub . paused && sub . paused ) {
97+ logger . error ( `Found subscription for ${ sIdx } in paused state for consecutive health checks` ) ;
98+ return false ;
99+ }
100+ }
101+ // stores the latest subscription status in global variable
102+ latestSubscriptions = consumer . subscriptions ;
85103 consumer . client . initialBrokers . forEach ( conn => {
86- logger . debug ( `url ${ conn . server ( ) } - connected=${ conn . connected } ` ) ;
87- connected = conn . connected & connected ;
104+ logger . debug ( `url ${ conn . server ( ) } - connected=${ conn . connected } ` )
105+ connected = conn . connected & connected
88106 } ) ;
89- logger . debug ( 'Found all Kafka Brokers healthy...' ) ;
90- return connected ;
91- } ;
107+ return connected
108+ }
92109
93110 consumer
94111 . init ( )
0 commit comments