diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/dto/Task.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/dto/Task.java index ca48e9f..9d324a3 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/dto/Task.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/dto/Task.java @@ -45,7 +45,7 @@ public String toString() { /** * Defines a Task Id. */ - private static class TaskId { + public static class TaskId { private String connector; private int task; diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorPlugins.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorPlugins.java index 78d708b..33d8e25 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorPlugins.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorPlugins.java @@ -21,6 +21,7 @@ import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; /** @@ -40,6 +41,6 @@ public Object getRequestBody() { @Override public Collection parseResponse(final String responseStr) throws IOException { - return JacksonFactory.newInstance().readValue(responseStr, Collection.class); + return Arrays.asList(JacksonFactory.newInstance().readValue(responseStr, ConnectorPlugin[].class)); } } diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorTasks.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorTasks.java index ca6de30..1f78b55 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorTasks.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectorTasks.java @@ -21,6 +21,7 @@ import org.sourcelab.kafka.connect.apiclient.request.dto.Task; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Objects; @@ -54,6 +55,6 @@ public Object getRequestBody() { @Override public Collection parseResponse(final String responseStr) throws IOException { - return JacksonFactory.newInstance().readValue(responseStr, Collection.class); + return Arrays.asList(JacksonFactory.newInstance().readValue(responseStr, Task[].class)); } } diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectors.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectors.java index ea3f619..eaf255a 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectors.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/get/GetConnectors.java @@ -20,6 +20,7 @@ import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; /** @@ -38,6 +39,6 @@ public Object getRequestBody() { @Override public Collection parseResponse(final String responseStr) throws IOException { - return JacksonFactory.newInstance().readValue(responseStr, Collection.class); + return Arrays.asList(JacksonFactory.newInstance().readValue(responseStr, String[].class)); } } diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/request/get/connector/GetConnectorPluginsTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/request/get/connector/GetConnectorPluginsTest.java new file mode 100644 index 0000000..65869ac --- /dev/null +++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/request/get/connector/GetConnectorPluginsTest.java @@ -0,0 +1,57 @@ +/** + * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.sourcelab.kafka.connect.apiclient.request.get.connector; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sourcelab.kafka.connect.apiclient.request.AbstractRequestTest; +import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin; +import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorPlugins; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.*; + +public class GetConnectorPluginsTest extends AbstractRequestTest { + private static final Logger logger = LoggerFactory.getLogger(GetConnectorPluginsTest.class); + + /** + * Test Parsing GET /connectors response. + */ + @Test + public void testParseResponse() throws IOException { + final String mockResponse = readFile("getConnectorPlugins.json"); + final List result = new ArrayList<>(new GetConnectorPlugins().parseResponse(mockResponse)); + + // Validate + assertNotNull("Should not be null", result); + assertEquals("Should have two entries", 2, result.size()); + + assertEquals("Should have connector", result.get(0).getClassName(), "org.apache.kafka.connect.file.FileStreamSinkConnector"); + assertEquals("Should have type", result.get(0).getType(), "sink"); + assertEquals("Should have version", result.get(0).getVersion(), "1.0.0-cp1"); + + assertEquals("Should have connector", result.get(1).getClassName(), "org.apache.kafka.connect.file.FileStreamSourceConnector"); + assertEquals("Should have type", result.get(1).getType(), "source"); + assertEquals("Should have version", result.get(1).getVersion(), "1.0.0-cp1"); + } +} \ No newline at end of file diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/request/get/connector/GetConnectorTasksTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/request/get/connector/GetConnectorTasksTest.java new file mode 100644 index 0000000..cf0f6b2 --- /dev/null +++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/request/get/connector/GetConnectorTasksTest.java @@ -0,0 +1,66 @@ +/** + * Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.sourcelab.kafka.connect.apiclient.request.get.connector; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sourcelab.kafka.connect.apiclient.request.AbstractRequestTest; +import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin; +import org.sourcelab.kafka.connect.apiclient.request.dto.Task; +import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorPlugins; +import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTasks; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class GetConnectorTasksTest extends AbstractRequestTest { + private static final Logger logger = LoggerFactory.getLogger(GetConnectorTasksTest.class); + + /** + * Test Parsing GET /connectors response. + */ + @Test + public void testParseResponse() throws IOException { + final String mockResponse = readFile("getConnectorTasks.json"); + final List result = new ArrayList<>(new GetConnectorTasks("MyTestConnector").parseResponse(mockResponse)); + + // Validate + assertNotNull("Should not be null", result); + assertEquals("Should have one entry", 1, result.size()); + + assertEquals("Should have connector", result.get(0).getId().getConnector(), "MyTestConnector"); + assertEquals("Should have task id", result.get(0).getId().getTask(), 0); + + assertEquals("Should have configs", result.get(0).getConfig(), ImmutableMap.builder() + .put("connector.class", "org.apache.kafka.connect.tools.VerifiableSourceConnector") + .put("task.class", "org.apache.kafka.connect.tools.VerifiableSourceTask") + .put("tasks.max", "1") + .put("topics", "test-topic") + .put("name", "MyTestConnector") + .put("id", "0") + .build() + ); + } +} \ No newline at end of file