1616package com .splunk .kafka .connect ;
1717
1818import com .splunk .hecclient .*;
19+ import com .sun .tools .corba .se .idl .StringGen ;
1920import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
2021import org .apache .kafka .common .TopicPartition ;
2122import org .apache .kafka .connect .errors .RetriableException ;
@@ -143,7 +144,7 @@ private void preventTooManyOutstandingEvents() {
143144
144145 private void handleRaw (final Collection <SinkRecord > records ) {
145146 if (connectorConfig .headerSupport ) {
146- handleRecordsWithHeader (records );
147+ if ( records != null ) { handleRecordsWithHeader (records ); }
147148 }
148149
149150 else if (connectorConfig .hasMetaDataConfigured ()) {
@@ -160,27 +161,67 @@ else if (connectorConfig.hasMetaDataConfigured()) {
160161 }
161162
162163 private void handleRecordsWithHeader (final Collection <SinkRecord > records ) {
163- HashMap <SplunkSinkRecord , ArrayList <SinkRecord >> recordsWithSameHeaders = new HashMap <>();
164+ log .info ("Inside handle records" );
165+
166+ HashMap <String , ArrayList <SinkRecord >> recordsWithSameHeaders = new HashMap <>();
164167 SplunkSinkRecord splunkSinkRecord ;
165168 for (SinkRecord record : records ) {
166- splunkSinkRecord = new SplunkSinkRecord ( record , connectorConfig );
169+ log . info ( "Inside loop" );
167170
168- if (!recordsWithSameHeaders .containsKey (splunkSinkRecord )) {
169- recordsWithSameHeaders .put (splunkSinkRecord , new ArrayList <>());
171+ // splunkSinkRecord = new SplunkSinkRecord(record, connectorConfig);
172+ String key = headerId (record );
173+ if (!recordsWithSameHeaders .containsKey (key )) {
174+ recordsWithSameHeaders .put (key , new ArrayList <>());
170175 }
171176 ArrayList <SinkRecord > recordList = recordsWithSameHeaders .get (record );
172177 recordList .add (record );
173- recordsWithSameHeaders .put (splunkSinkRecord , recordList );
178+ recordsWithSameHeaders .put (key , recordList );
174179 }
175180
181+ int index = 0 ;
176182 Iterator itr = recordsWithSameHeaders .entrySet ().iterator ();
177183 while (itr .hasNext ()) {
184+ log .info ("Sending Log {}" , index );
178185 Map .Entry set = (Map .Entry )itr .next ();
179186 SplunkSinkRecord splunkSinkRecordKey = (SplunkSinkRecord )set .getKey ();
180187 ArrayList <SinkRecord > recordArrayList = (ArrayList )set .getValue ();
181- EventBatch batch = createRawHeaderEventBatch (1 );
188+ EventBatch batch = createRawHeaderEventBatch (splunkSinkRecordKey );
182189 sendEvents (recordArrayList , batch );
190+ index ++;
191+ }
192+ }
193+
194+ public String headerId (SinkRecord sinkRecord ) {
195+ Headers headers = sinkRecord .headers ();
196+ String headerId = "" ;
197+
198+ if (headers .lastWithName (connectorConfig .headerIndex ).value () != null ) {
199+ headerId += headers .lastWithName (connectorConfig .headerIndex ).value ().toString ();
183200 }
201+
202+ insertheaderToken (headerId );
203+
204+ if (headers .lastWithName (connectorConfig .headerHost ).value () != null ) {
205+ headerId += headers .lastWithName (connectorConfig .headerHost ).value ().toString ();
206+ }
207+
208+ insertheaderToken (headerId );
209+
210+ if (headers .lastWithName (connectorConfig .headerSource ).value () != null ) {
211+ headerId += headers .lastWithName (connectorConfig .headerSource ).value ().toString ();
212+ }
213+
214+ insertheaderToken (headerId );
215+
216+ if (headers .lastWithName (connectorConfig .headerSourcetype ).value () != null ) {
217+ headerId += headers .lastWithName (connectorConfig .headerSourcetype ).value ().toString ();
218+ }
219+
220+ return headerId ;
221+ }
222+
223+ public String insertheaderToken (String id ) {
224+ return id += "$$$" ;
184225 }
185226
186227 private void handleEvent (final Collection <SinkRecord > records ) {
0 commit comments