-
Notifications
You must be signed in to change notification settings - Fork 71
Description
{{ matches( $metadata.name, ^a_.*) }}
When i use Appendfilter i am getting following error.
[2020-08-09 21:22:04,642] INFO [coverage.basecoverage-file-source-deva|task-0] Opening new iterator for source : [name='NEXUSCOVBATCH_MEMB_PLAN_COV_4.json', path='/mnt/nexus-kafka-connect/member-coverage/MEMBPLANCOV/deva', size=43516, lastModified=1597022524000, inode=25, hash=880644864] (io.streamthoughts.kafka.connect.filepulse.source.FileInputIterable:65)
[2020-08-09 21:22:04,668] ERROR [coverage.basecoverage-file-source-deva|task-0] Error occurred while executing filter 'AppendFilter' on record='[message=[value=[B@f9f9858, schema =[type=BYTES]]]' (io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline:180)
[2020-08-09 21:22:04,668] ERROR [coverage.basecoverage-file-source-deva|task-0] Error while processing source file '[name='NEXUSCOVBATCH_MEMB_PLAN_COV_4.json', path='/mnt/nexus-kafka-connect/member-coverage/MEMBPLANCOV/deva', size=43516, lastModified=1597022524000, inode=25, hash=880644864]' (io.streamthoughts.kafka.connect.filepulse.source.KafkaFileStateReporter:119)
java.lang.NullPointerException
[2020-08-09 21:22:04,669] INFO [coverage.basecoverage-file-source-deva|task-0] WorkerSourceTask{id=coverage.basecoverage-file-source-deva-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-08-09 21:22:04,669] INFO [coverage.basecoverage-file-source-deva|task-0] WorkerSourceTask{id=coverage.basecoverage-file-source-deva-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-08-09 21:22:04,669] ERROR [coverage.basecoverage-file-source-deva|task-0] WorkerSourceTask{id=coverage.basecoverage-file-source-deva-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException: java.lang.NullPointerException
at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:188)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:126)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
[2020-08-09 21:22:04,669] ERROR [coverage.basecoverage-file-source-deva|task-0] WorkerSourceTask{id=coverage.basecoverage-file-source-deva-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
My configuration is below.
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"filters":"SetSUBGRPTopic",
"filters.SetSUBGRPTopic.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetSUBGRPTopic.if":"{{ matches($metadata.name,^NEXUSCOVBATCH_MEMB_PLAN_COV_.*) }}",
"filters.SetSUBGRPTopic.field":"{{ $topic }}",
"filters.SetSUBGRPTopic.value":"member.coverage.domain.deva.basecoverage"
Originally posted by @mohai-kafka in #47 (comment)