GroupRowFilter buffer not forwarded when the CSV file ends with empty line(s) #667

@Belbli

Description

Hello!

I'm using the FilePulseSourceConnector with a CSVFilter:

    "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
    "filters.ParseCSVLine.columns": "col1:STRING;col2:STRING",
    "filters.ParseCSVLine.separator": ";",

and GroupRowFilter:

    "filters.GroupByFirstColumn.type": "io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter",
    "filters.GroupByFirstColumn.fields": "col1",
    "filters.GroupByFirstColumn.max.buffered.records": "5",
    "filters.GroupByFirstColumn.target": "batch",

It looks like the GroupRowFilter's last buffer is not forwarded when the processed CSV file ends with an empty line. Example:

val1;val1
val1;val2
val1;val3
val1;val4
val1;val5
val1;val6
val1;val7
<empty line>

So when I submit this file to a connector with max.buffered.records=5 for the GroupRowFilter, I see only one record in the topic instead of two: val1;val6 and val1;val7 are not sent to Kafka.

Steps To Reproduce

  1. Create a FilePulse connector with the filters described above
  2. Submit a CSV file whose number of non-empty rows is not a multiple of filters.GroupByFirstColumn.max.buffered.records (in this case 5) and whose last line is empty (a snippet that generates such a file follows this list)
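
For reference, here is one way to generate such a columns.csv (the file name matches the one copied by the script below; the contents are just the example rows from above plus a trailing empty line):

    #!/bin/bash
    # Write 7 data rows (not a multiple of max.buffered.records=5),
    # then the trailing empty line that triggers the issue.
    {
      for i in 1 2 3 4 5 6 7; do
        echo "val1;val$i"
      done
      echo ""
    } > columns.csv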

Bash script to reproduce it with docker-compose-debug.yml from the project:

#!/bin/bash

./debug.sh

curl -X PUT localhost:8083/connectors/group-filter-connector/config --header "Content-Type: application/json" \
  -d '{
        "schema.registry.url": "http://schema-registry:8085",
        "fs.listing.directory.path": "/tmp/kafka-connect/",
        "tasks.file.status.storage.bootstrap.servers": "kafka:29092",
        "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
        "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy ",
        "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path": "/tmp/kafka-connect/",
        "fs.scan.interval.ms": "10000",
        "fs.scan.filters": "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern": ".*\\.csv$",
        "filters": "ParseCSVLine,GroupByFirstColumn",

        "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
        "filters.ParseCSVLine.columns": "col1:STRING;col2:STRING",
        "filters.ParseCSVLine.separator": ";",

        "filters.GroupByFirstColumn.type": "io.streamthoughts.kafka.connect.filepulse.filter.GroupRowFilter",
        "filters.GroupByFirstColumn.fields": "col1",
        "filters.GroupByFirstColumn.max.buffered.records": "5",
        "filters.GroupByFirstColumn.target": "batch",

        "value.connect.schema":"{\"name\":\"com.example.Data\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"batch\":{\"type\":\"ARRAY\",\"isOptional\":false,\"valueSchema\":{\"name\":\"com.example.Batch\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"col1\":{\"type\":\"STRING\",\"isOptional\":false},\"col2\":{\"type\":\"STRING\",\"isOptional\":false}}}}},\"version\":1}",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "topic": "grouped-data",
        "internal.kafka.reporter.bootstrap.servers": "kafka:29092",
        "internal.kafka.reporter.topic": "connect-file-pulse-status"
      }'

echo -e "\n\nCopying test file to kafka-connect-file-pulse-master-connect-1..."

docker cp ./columns.csv kafka-connect-file-pulse-master-connect-1:/tmp/kafka-connect/columns.csv

echo -e "\n\ncheck for messages here (need to wait a bit): http://localhost:8087/ui/docker-kafka-server/topic/grouped-data/data?sort=Oldest&partition=All"

exit
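
If the UI is not reachable, the topic can also be checked from the command line with the Avro console consumer. The container name below is only a guess based on the compose project name used above, so adjust it to your setup (the broker and Schema Registry addresses are taken from the connector config):

    # Hypothetical container name; "docker compose ps" shows the actual one.
    docker exec -it kafka-connect-file-pulse-master-schema-registry-1 \
      kafka-avro-console-consumer \
        --bootstrap-server kafka:29092 \
        --topic grouped-data \
        --from-beginning \
        --property schema.registry.url=http://schema-registry:8081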

Expected behavior
Two records published to Kafka. The first record contains 5 entries in the batch array and the second record contains 2 entries.
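
Roughly, with target=batch and the value.connect.schema above, the two expected record values would look like this (rendered as JSON for illustration only):

    {"batch": [{"col1": "val1", "col2": "val1"}, {"col1": "val1", "col2": "val2"}, {"col1": "val1", "col2": "val3"}, {"col1": "val1", "col2": "val4"}, {"col1": "val1", "col2": "val5"}]}
    {"batch": [{"col1": "val1", "col2": "val6"}, {"col1": "val1", "col2": "val7"}]}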

Actual behavior
Only one record, containing 5 entries, is sent to Kafka.

Can you please check it out?
Thank you

Labels: released (Issue has been released)