-
Notifications
You must be signed in to change notification settings - Fork 71
Description
I followed the official Getting Started Guide using a local Confluent Hub with v2.8 of kafka-connect-file-pulse. Everything works. But now I am trying to fetch only metadata of files in a folder, the path+filename, size and timestamp. As no further instruction are provided I tried to use the LocalMetadataFileInputReader. I was hoping that it let me access the mentioned values by default. Silly as I am, I used the "connect-file-pulse-quickstart-raw.json" example and changed the "tasks.reader.class" to "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalMetadataFileInputReader". With everything running I copied the file "quickstart-musics-dataset.csv" into "/tmp/kafka-connect/examples". This gives me the following output:
$ kafka-console-consumer --topic quickstart-meta --from-beginning --bootstrap-server localhost:9092 [2023-02-28 15:21:31,595] WARN [Consumer clientId=console-consumer, groupId=console-consumer-19722] Error while fetching metadata with correlation id 2 : {quickstart-meta=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Status topic gives me a failure:
kafka-console-consumer --topic file-pulse-status-meta --from-beginning --bootstrap-server localhost:9092 {"metadata":{"uri":"file:/tmp/kafka-connect/examples/musics-dataset.csv","name":"musics-dataset.csv","contentLength":6588,"lastModified":1677594128325,"contentDigest":{"digest":"1466679696","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":1973042,"system.hostname":"LenovoW530"}},"offset":{"position":-1,"rows":0,"timestamp":1677594130119},"status":"SCHEDULED"} {"metadata":{"uri":"file:/tmp/kafka-connect/examples/musics-dataset.csv","name":"musics-dataset.csv","contentLength":6588,"lastModified":1677594128325,"contentDigest":{"digest":"1466679696","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":1973042,"system.hostname":"LenovoW530"}},"offset":{"position":-1,"rows":0,"timestamp":1677594130183},"status":"STARTED"} {"metadata":{"uri":"file:/tmp/kafka-connect/examples/musics-dataset.csv","name":"musics-dataset.csv","contentLength":6588,"lastModified":1677594128325,"contentDigest":{"digest":"1466679696","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":1973042,"system.hostname":"LenovoW530"}},"offset":{"position":-1,"rows":0,"timestamp":1677594130183},"status":"FAILED"} {"metadata":{"uri":"file:/tmp/kafka-connect/examples/musics-dataset.csv","name":"musics-dataset.csv","contentLength":6588,"lastModified":1677594128325,"contentDigest":{"digest":"1466679696","algorithm":"CRC32"},"userDefinedMetadata":{"system.inode":1973042,"system.hostname":"LenovoW530"}},"offset":{"position":-1,"rows":0,"timestamp":1677594130183},"status":"CLEANED"}
What is the right way to use the LocalMetadataFileInputReader? Or is this error kafka related?