From 2aab522c05569aec5c04e54c53e952005647ebb3 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Mon, 13 Jul 2020 17:11:05 -0700 Subject: [PATCH 1/9] Add comment for the release --- .../functions/annotation/KafkaOutput.java | 77 +++++++++++++++++-- .../functions/annotation/KafkaTrigger.java | 70 +++++++++++++++-- 2 files changed, 132 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index b3a539a..6e21118 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -14,13 +14,43 @@ import java.lang.annotation.Target; /** - *

Annotation for Kafka output bindings

+ *

+ * Place this on a parameter whose value would be written to Kafka. The parameter type should be + * OutputBinding<T>, where T could be one of: + *

+ * + * + * + *

+ * The following example shows a Java function that produce a message to the Kafka cluster, using event + * provided in the body of an HTTP Post request. + *

+ * + *
+ * {@literal @}FunctionName("kafkaInupt-Java")
+ *
+ * public String input(
+ *    {@literal @}HttpTrigger(name = "request", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
+ *     final String message,
+ *    {@literal @}KafkaOutput(name = "database", topic = "users", brokerList="broker:29092") OutputBinding output,
+ *    final ExecutionContext context) {
+ *     context.getLogger().info("Message:" + message);
+ *     output.setValue(message);
+ *     return "{ \"id\": \"" + System.currentTimeMillis() + "\", \"description\": \"" + message + "\" }";
+ * }
+ * 
+ * + * @since 1.0.0 */ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) public @interface KafkaOutput { /** * The variable name used in function.json. + * * @return The variable name used in function.json. */ String name(); @@ -36,38 +66,51 @@ String dataType() default ""; /** - * Gets the Topic. - * @return + * Defines the Topic. + * + * @return The topic name. */ String topic(); /** - * Gets or sets the BrokerList. + * Defines the BrokerList. + * + * @return The brokerList name string. */ String brokerList(); /** - * Gets or sets the Maximum transmit message size. Default: 1MB + * Defines the maximum transmit message size. Default: 1MB + * + * @return The maximum trnasmit message size. */ int maxMessageBytes() default 1000012; // Follow the kafka spec https://kafka.apache.org/documentation/ /** - * Maximum number of messages batched in one MessageSet. default: 10000 + * Defines the maximum number of messages batched in one MessageSet. default: 10000 + * + * @return The maximum number of messages batched in one MessageSet. */ int batchSize() default 10000; /** * When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false + * + * @return whether idempotence is enabled. */ boolean enableIdempotence() default false; /** * Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000 + * + * @return The local message timeout. */ int messageTimeoutMs() default 300000; /** * The ack timeout of the producer request in milliseconds. default: 5000 + * + * @return The ack timeout of the producer request in milliseconds. */ int requestTimeoutMs() default 5000; @@ -75,18 +118,24 @@ * How many times to retry sending a failing Message. **Note:** default: 2 * Retrying may cause reordering unless EnableIdempotence is set to true. * @see #enableIdempotence() + * + * @return The number of the max retries. */ int maxRetries() default 2; /** * SASL mechanism to use for authentication. * Default: PLAIN + * + * @return The SASL mechanism. */ - BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET; // TODO double check if it is OK + BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET; /** * SASL username with the PLAIN and SASL-SCRAM-.. mechanisms * Default: "" + * + * @return The SASL username. */ String username() default ""; @@ -95,12 +144,16 @@ * Default is plaintext * * security.protocol in librdkafka + * + * @return The SASL password. */ String password() default ""; /** * Gets or sets the security protocol used to communicate with brokers * default is PLAINTEXT + * + * @return The protocol. */ BrokerProtocol protocol() default BrokerProtocol.NOTSET; @@ -108,24 +161,32 @@ * Path to client's private key (PEM) used for authentication. * Default "" * ssl.key.location in librdkafka + * + * @return The ssl.key.location. */ String sslKeyLocation() default ""; - /** + /** * Path to CA certificate file for verifying the broker's certificate. * ssl.ca.location in librdkafka + * + * @return The ssl ca location. */ String sslCaLocation() default ""; /** * Path to client's certificate. * ssl.certificate.location in librdkafka + * + * @return The client certificate. */ String sslCertificateLocation() default ""; /** * Password for client's certificate. * ssl.key.password in librdkafka + * + * @return The password of the client certificate. */ String sslKeyPassword() default ""; } diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index c00819d..7cb3335 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -16,7 +16,37 @@ /** - *

Annotation for KafkaTrigger bindings

+ *

+ * Place this on a parameter whose value would come from Kafka, and causing the method to run + * when Kafka event is consumed. The parameter type can be one of the following: + *

+ * + * + * + *

+ * The following example shows a Java function that is invoked when messages are consumed with + * the specified topic, brokerList, and consumerGroup on a Kafka cluster. + *

+ * + *
+ * {@literal @}FunctionName("KafkaTrigger-Java")
+ * public void run(
+ *    {@literal @}KafkaTrigger(name = "kafkaTrigger",
+ *                      topic = "users", 
+ *                      brokerList="broker:29092",
+ *                      consumerGroup="functions")
+ *                      List<Map<String, String>> kafkaEventData,
+ *     final ExecutionContext context
+ * ) {
+ *     context.getLogger().info(kafkaEventData);
+ * }
+ * 
+ * + * @since 1.0.0 */ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @@ -29,17 +59,23 @@ String name(); /** - * Gets the Topic. + * Defines the Topic. + * + * @return The topic. */ String topic(); /** - * Gets or sets the BrokerList. + * Defines the BrokerList. + * + * @return The brokerList. */ String brokerList(); /** - * Gets or sets the EventHub connection string when using KafkaOutput protocol header feature of Azure EventHubs. + * Defines the EventHub connection string when using KafkaOutput protocol header feature of Azure EventHubs. + * + * @return The EventHub connection string. */ String eventHubConnectionString() default ""; /** @@ -47,6 +83,8 @@ * Choose 'One' if the input is a single message or 'Many' if the input is an array of messages. * If you choose 'Many', please set a dataType. * Default: 'One' + * + * @return The cardinality. */ Cardinality cardinality() default Cardinality.ONE; /** @@ -54,11 +92,13 @@ * the kafka events as an array of this type. * Allowed values: string, binary, stream * Default: "" + * + * @return The dataType. */ String dataType() default ""; /** - * Gets or sets the consumer group. + * Defines the consumer group. */ String consumerGroup(); @@ -66,12 +106,16 @@ * SASL mechanism to use for authentication. * Allowed values: Gssapi, Plain, ScramSha256, ScramSha512 * Default: PLAIN + * + * @return The broker authentication mode. */ BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET; /** * SASL username with the PLAIN and SASL-SCRAM-.. mechanisms * Default: "" + * + * @return The SASL username. */ String username() default ""; @@ -80,12 +124,16 @@ * Default: "" * * security.protocol in librdkafka + * + * @return The SASL password. */ String password() default ""; /** - * Gets or sets the security protocol used to communicate with brokers + * Defines the security protocol used to communicate with brokers * default is PLAINTEXT + * + * @return The security protocol. */ BrokerProtocol protocol() default BrokerProtocol.NOTSET; @@ -93,24 +141,32 @@ * Path to client's private key (PEM) used for authentication. * Default "" * ssl.key.location in librdkafka + * + * @return The ssl key location. */ String sslKeyLocation() default ""; - /** + /** * Path to CA certificate file for verifying the broker's certificate. * ssl.ca.location in librdkafka + * + * @return The path to CA certificate file. */ String sslCaLocation() default ""; /** * Path to client's certificate. * ssl.certificate.location in librdkafka + * + * @return The ssl certificate location. */ String sslCertificateLocation() default ""; /** * Password for client's certificate. * ssl.key.password in librdkafka + * + * @return The ssl key password. */ String sslKeyPassword() default ""; } From ba3f3f13403ddfbed215523c4d32b5556cf0b539 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Mon, 13 Jul 2020 17:17:15 -0700 Subject: [PATCH 2/9] Fit the context to the Kfaka --- .../microsoft/azure/functions/annotation/KafkaOutput.java | 6 +++--- .../microsoft/azure/functions/annotation/KafkaTrigger.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 6e21118..0abb1c9 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -20,8 +20,8 @@ *

* * * *

@@ -35,7 +35,7 @@ * public String input( * {@literal @}HttpTrigger(name = "request", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) * final String message, - * {@literal @}KafkaOutput(name = "database", topic = "users", brokerList="broker:29092") OutputBinding output, + * {@literal @}KafkaOutput(name = "event", topic = "users", brokerList="broker:29092") OutputBinding output, * final ExecutionContext context) { * context.getLogger().info("Message:" + message); * output.setValue(message); diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index 7cb3335..e6aea73 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -22,9 +22,9 @@ *

* * * *

From efd27fb93c5286bfb542ecb71087b598ff4fadf6 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Mon, 13 Jul 2020 17:23:04 -0700 Subject: [PATCH 3/9] fix ci fails --- .../com/microsoft/azure/functions/annotation/KafkaOutput.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 0abb1c9..9068edd 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -35,7 +35,7 @@ * public String input( * {@literal @}HttpTrigger(name = "request", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) * final String message, - * {@literal @}KafkaOutput(name = "event", topic = "users", brokerList="broker:29092") OutputBinding output, + * {@literal @}KafkaOutput(name = "event", topic = "users", brokerList="broker:29092") OutputBinding<String< output, * final ExecutionContext context) { * context.getLogger().info("Message:" + message); * output.setValue(message); From 3a41a840449b83c531a19bc64cfe4e1e371e2231 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Mon, 13 Jul 2020 17:49:43 -0700 Subject: [PATCH 4/9] update version and fix the grammar --- .../com/microsoft/azure/functions/annotation/KafkaOutput.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 9068edd..d8a7fb1 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -15,7 +15,7 @@ /** *

- * Place this on a parameter whose value would be written to Kafka. The parameter type should be + * Place this on a parameter whose value would be published to Kafka. The parameter type should be * OutputBinding<T>, where T could be one of: *

* @@ -43,7 +43,7 @@ * } * * - * @since 1.0.0 + * @since 1.4.0 */ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) From d1d41fdfcd4ffe4e668edc64c31e60f681a92b9c Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Mon, 13 Jul 2020 17:53:54 -0700 Subject: [PATCH 5/9] apply the same change to the KafkaTrigger --- .../com/microsoft/azure/functions/annotation/KafkaOutput.java | 4 ++-- .../microsoft/azure/functions/annotation/KafkaTrigger.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index d8a7fb1..2247d90 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -162,7 +162,7 @@ * Default "" * ssl.key.location in librdkafka * - * @return The ssl.key.location. + * @return The ssl key location. */ String sslKeyLocation() default ""; @@ -178,7 +178,7 @@ * Path to client's certificate. * ssl.certificate.location in librdkafka * - * @return The client certificate. + * @return The ssl client certification. */ String sslCertificateLocation() default ""; diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index e6aea73..fabe4d8 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -46,7 +46,7 @@ * } * * - * @since 1.0.0 + * @since 1.4.0 */ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @@ -158,7 +158,7 @@ * Path to client's certificate. * ssl.certificate.location in librdkafka * - * @return The ssl certificate location. + * @return The ssl client certification. */ String sslCertificateLocation() default ""; From d033b2741aceda1a65e8567a8adc147d03ea7a90 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Mon, 13 Jul 2020 18:24:19 -0700 Subject: [PATCH 6/9] add new line for commenting --- .../azure/functions/annotation/KafkaOutput.java | 3 ++- .../azure/functions/annotation/KafkaTrigger.java | 15 +++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 2247d90..8b85252 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -48,6 +48,7 @@ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) public @interface KafkaOutput { + /** * The variable name used in function.json. * @@ -59,7 +60,7 @@ *

Defines how Functions runtime should treat the parameter value. Possible values are:

*
    *
  • "" or string: treat it as a string whose value is serialized from the parameter
  • - *
  • binary: treat it as a binary data whose value comes from for example OutputBinding<byte[]>
  • + *
  • binary: treat it as a binary data whose value comes from for example OutputBinding<byte[]<
  • *
* @return The dataType which will be used by the Functions runtime. */ diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index fabe4d8..d998911 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -51,6 +51,7 @@ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) public @interface KafkaTrigger { + /** * The variable name used in function code for the request or request body. * @@ -87,13 +88,15 @@ * @return The cardinality. */ Cardinality cardinality() default Cardinality.ONE; + /** - * DataType for the Cardinality settings. If you set the cardinality as Cardinality.MANY, Azure Functions Host will deserialize - * the kafka events as an array of this type. - * Allowed values: string, binary, stream - * Default: "" - * - * @return The dataType. + *

Defines how Functions runtime should treat the parameter value. Possible values are:

+ *
    + *
  • "": get the value as a string, and try to deserialize to actual parameter type like POJO
  • + *
  • string: always get the value as a string
  • + *
  • binary: get the value as a binary data, and try to deserialize to actual parameter type byte[]
  • + *
+ * @return The dataType which will be used by the Functions runtime. */ String dataType() default ""; From b098c745a87b20b5ee45bfa62d77916789821bb7 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Mon, 13 Jul 2020 18:34:11 -0700 Subject: [PATCH 7/9] nit --- .../microsoft/azure/functions/annotation/KafkaOutput.java | 6 +++--- .../microsoft/azure/functions/annotation/KafkaTrigger.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 8b85252..2010abd 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -48,7 +48,7 @@ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) public @interface KafkaOutput { - + /** * The variable name used in function.json. * @@ -179,7 +179,7 @@ * Path to client's certificate. * ssl.certificate.location in librdkafka * - * @return The ssl client certification. + * @return The ssl certificate location. */ String sslCertificateLocation() default ""; @@ -187,7 +187,7 @@ * Password for client's certificate. * ssl.key.password in librdkafka * - * @return The password of the client certificate. + * @return The ssl key password. */ String sslKeyPassword() default ""; } diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index d998911..0f8554c 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -51,7 +51,7 @@ @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) public @interface KafkaTrigger { - + /** * The variable name used in function code for the request or request body. * @@ -161,7 +161,7 @@ * Path to client's certificate. * ssl.certificate.location in librdkafka * - * @return The ssl client certification. + * @return The ssl certificate location. */ String sslCertificateLocation() default ""; From 5761d30b2a97bd6dc8145d6e63220bc0b3d4fad7 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Tue, 14 Jul 2020 10:02:19 -0700 Subject: [PATCH 8/9] update comment for POJO. Pojo is available only for cardinality.one not for cardinality.many --- .../com/microsoft/azure/functions/annotation/KafkaTrigger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index 0f8554c..ddc292f 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -24,7 +24,7 @@ *
    *
  • Any native Java types such as int, String, byte[]
  • *
  • Nullable values using Optional<T>
  • - *
  • Any POJO type
  • + *
  • Any POJO type for Cardinality.One
  • *
* *

From 3648b89bbc764c0a18991484b6aa1bcde6a0f38a Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Tue, 14 Jul 2020 10:10:02 -0700 Subject: [PATCH 9/9] update the sentense to understand more --- .../com/microsoft/azure/functions/annotation/KafkaTrigger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index ddc292f..301f87d 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -24,7 +24,7 @@ *

    *
  • Any native Java types such as int, String, byte[]
  • *
  • Nullable values using Optional<T>
  • - *
  • Any POJO type for Cardinality.One
  • + *
  • Any POJO type, currently supported only for Cardinality.One
  • *
* *