@@ -14,13 +14,44 @@
import java.lang.annotation.Target;

/**
* <p>Annotation for Kafka output bindings</p>
* <p>
* Place this on a parameter whose value would be published to Kafka. The parameter type should be
* OutputBinding&lt;T&gt;, where T could be one of:
* </p>
*
* <ul>
* <li>Any native Java types such as int, String, byte[]</li>
* <li>Any POJO type</li>
* </ul>
*
* <p>
* The following example shows a Java function that produces a message to a Kafka cluster, using an
* event provided in the body of an HTTP POST request.
* </p>
*
* <pre>
* {@literal @}FunctionName("KafkaInput-Java")
*
* public String input(
* {@literal @}HttpTrigger(name = "request", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
* final String message,
* {@literal @}KafkaOutput(name = "event", topic = "users", brokerList="broker:29092") OutputBinding&lt;String&gt; output,
* final ExecutionContext context) {
* context.getLogger().info("Message:" + message);
* output.setValue(message);
* return "{ \"id\": \"" + System.currentTimeMillis() + "\", \"description\": \"" + message + "\" }";
* }
* </pre>
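*
* <p>
* A POJO payload can be published in the same way. The following is a minimal sketch that assumes
* a hypothetical user-defined {@code KafkaEntity} class (not part of this library) with a single
* String-argument constructor:
* </p>
*
* <pre>
* {@literal @}FunctionName("KafkaOutputPojo-Java")
* public void pojoOutput(
*     {@literal @}HttpTrigger(name = "request", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
*     final String message,
*     {@literal @}KafkaOutput(name = "event", topic = "users", brokerList = "broker:29092") OutputBinding&lt;KafkaEntity&gt; output,
*     final ExecutionContext context) {
*     // KafkaEntity is the hypothetical POJO; here it simply wraps the request body.
*     output.setValue(new KafkaEntity(message));
* }
* </pre>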
*
* @since 1.4.0
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaOutput {

/**
* The variable name used in function.json.
*
* @return The variable name used in function.json.
*/
String name();
@@ -29,64 +60,83 @@
* <p>Defines how Functions runtime should treat the parameter value. Possible values are:</p>
* <ul>
* <li>"" or string: treat it as a string whose value is serialized from the parameter</li>
* <li>binary: treat it as binary data whose value comes from, for example, OutputBinding&lt;byte[]&gt;</li>
* </ul>
* @return The dataType which will be used by the Functions runtime.
*/
String dataType() default "";

/**
* Defines the Topic.
*
* @return The topic name.
*/
String topic();

/**
* Defines the BrokerList.
*
* @return The brokerList name string.
*/
String brokerList();

/**
* Defines the maximum transmit message size. Default: 1MB.
*
* @return The maximum transmit message size.
*/
int maxMessageBytes() default 1000012; // Follow the kafka spec https://kafka.apache.org/documentation/

/**
* Defines the maximum number of messages batched in one MessageSet. Default: 10000.
*
* @return The maximum number of messages batched in one MessageSet.
*/
int batchSize() default 10000;

/**
* When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false
*
* @return whether idempotence is enabled.
*/
boolean enableIdempotence() default false;

/**
* Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000
*
* @return The local message timeout.
*/
int messageTimeoutMs() default 300000;

/**
* The ack timeout of the producer request in milliseconds. default: 5000
*
* @return The ack timeout of the producer request in milliseconds.
*/
int requestTimeoutMs() default 5000;

/**
* How many times to retry sending a failing message. Default: 2.
* Note: retrying may cause reordering unless enableIdempotence is set to true.
* @see #enableIdempotence()
*
* @return The maximum number of retries.
*/
int maxRetries() default 2;

/**
* SASL mechanism to use for authentication.
* Default: PLAIN
*
* @return The SASL mechanism.
*/
BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET;

/**
* SASL username with the PLAIN and SASL-SCRAM-.. mechanisms
* Default: ""
*
* @return The SASL username.
*/
String username() default "";

@@ -95,37 +145,49 @@
* Default is plaintext
*
* security.protocol in librdkafka
*
* @return The SASL password.
*/
String password() default "";

/**
* Defines the security protocol used to communicate with brokers.
* Default is PLAINTEXT.
*
* @return The protocol.
*/
BrokerProtocol protocol() default BrokerProtocol.NOTSET;

/**
* Path to client's private key (PEM) used for authentication.
* Default ""
* ssl.key.location in librdkafka
*
* @return The ssl key location.
*/
String sslKeyLocation() default "";

/**
* Path to CA certificate file for verifying the broker's certificate.
* ssl.ca.location in librdkafka
*
* @return The ssl ca location.
*/
String sslCaLocation() default "";

/**
* Path to client's certificate.
* ssl.certificate.location in librdkafka
*
* @return The ssl certificate location.
*/
String sslCertificateLocation() default "";

/**
* Password for client's certificate.
* ssl.key.password in librdkafka
*
* @return The ssl key password.
*/
String sslKeyPassword() default "";
}
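The delivery-related attributes above (enableIdempotence, maxRetries, batchSize, messageTimeoutMs) can be combined on a single binding. The following is a minimal sketch in the same method-level style as the Javadoc examples, reusing the placeholder topic and broker names; the function name and attribute values are illustrative assumptions, not part of this library:

@FunctionName("KafkaOutputTuned-Java")
public void tunedOutput(
        @HttpTrigger(name = "request", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
        final String message,
        @KafkaOutput(
                name = "event",
                topic = "users",
                brokerList = "broker:29092",
                batchSize = 100,            // at most 100 messages per MessageSet
                enableIdempotence = true,   // preserve ordering even when retries occur
                maxRetries = 5,             // retry failed produce requests up to five times
                messageTimeoutMs = 120000)  // give up on delivery after two minutes
        OutputBinding<String> output,
        final ExecutionContext context) {
    output.setValue(message);
}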
@@ -16,11 +16,42 @@


/**
* <p>Annotation for KafkaTrigger bindings</p>
* <p>
* Place this on a parameter whose value comes from Kafka, causing the method to run when a
* Kafka event is consumed. The parameter type can be one of the following:
* </p>
*
* <ul>
* <li>Any native Java types such as int, String, byte[]</li>
* <li>Nullable values using Optional&lt;T&gt;</li>
* <li>Any POJO type, currently supported only for Cardinality.One</li>
* </ul>
*
* <p>
* The following example shows a Java function that is invoked when messages are consumed from
* the specified topic on a Kafka cluster, using the given brokerList and consumerGroup.
* </p>
*
* <pre>
* {@literal @}FunctionName("KafkaTrigger-Java")
* public void run(
* {@literal @}KafkaTrigger(name = "kafkaTrigger",
Contributor: Should we define datatype?
Collaborator (Author): No. dataType has default value. :)
* topic = "users",
* brokerList="broker:29092",
* consumerGroup="functions")
* List&lt;Map&lt;String, String&gt;&gt; kafkaEventData,
* final ExecutionContext context
* ) {
* context.getLogger().info(kafkaEventData.toString());
* }
* </pre>
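*
* <p>
* When each invocation should receive a batch of messages, set cardinality to Cardinality.MANY
* and choose a dataType. The following is a minimal sketch assuming each record value is a
* plain string; the function name and placeholder values are illustrative only:
* </p>
*
* <pre>
* {@literal @}FunctionName("KafkaTriggerMany-Java")
* public void runMany(
*     {@literal @}KafkaTrigger(name = "kafkaTrigger",
*                   topic = "users",
*                   brokerList = "broker:29092",
*                   consumerGroup = "functions",
*                   cardinality = Cardinality.MANY,
*                   dataType = "string")
*     List&lt;String&gt; kafkaEvents,
*     final ExecutionContext context) {
*     // Each element is one consumed Kafka record value.
*     for (String event : kafkaEvents) {
*         context.getLogger().info(event);
*     }
* }
* </pre>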
*
* @since 1.4.0
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaTrigger {

/**
* The variable name used in function code for the consumed Kafka event or events.
*
@@ -29,49 +60,65 @@
String name();

/**
* Defines the Topic.
*
* @return The topic.
*/
String topic();

/**
* Defines the BrokerList.
*
* @return The brokerList.
*/
String brokerList();

/**
* Defines the EventHub connection string when using the KafkaOutput protocol header feature of Azure EventHubs.
*
* @return The EventHub connection string.
*/
String eventHubConnectionString() default "";
/**
* Cardinality of the trigger input.
* Choose 'One' if the input is a single message or 'Many' if the input is an array of messages.
* If you choose 'Many', please set a dataType.
* Default: 'One'
*
* @return The cardinality.
*/
Cardinality cardinality() default Cardinality.ONE;

/**
* <p>Defines how Functions runtime should treat the parameter value. Possible values are:</p>
* <ul>
* <li>"": get the value as a string, and try to deserialize to the actual parameter type such as a POJO</li>
* <li>string: always get the value as a string</li>
* <li>binary: get the value as binary data, and try to deserialize to the actual parameter type byte[]</li>
* </ul>
* <p>If the cardinality is Cardinality.MANY, the Azure Functions host deserializes the Kafka events as an array of this type.</p>
* @return The dataType which will be used by the Functions runtime.
*/
String dataType() default "";

/**
* Defines the consumer group.
*
* @return The consumer group.
*/
String consumerGroup();

/**
* SASL mechanism to use for authentication.
* Allowed values: Gssapi, Plain, ScramSha256, ScramSha512
* Default: PLAIN
*
* @return The broker authentication mode.
*/
BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET;

/**
* SASL username with the PLAIN and SASL-SCRAM-.. mechanisms
* Default: ""
*
* @return The SASL username.
*/
String username() default "";

@@ -80,37 +127,49 @@
* Default: ""
*
* security.protocol in librdkafka
*
* @return The SASL password.
*/
String password() default "";

/**
* Defines the security protocol used to communicate with brokers.
* Default is PLAINTEXT.
*
* @return The security protocol.
*/
BrokerProtocol protocol() default BrokerProtocol.NOTSET;

/**
* Path to client's private key (PEM) used for authentication.
* Default ""
* ssl.key.location in librdkafka
*
* @return The ssl key location.
*/
String sslKeyLocation() default "";

/**
* Path to CA certificate file for verifying the broker's certificate.
* ssl.ca.location in librdkafka
*
* @return The path to CA certificate file.
*/
String sslCaLocation() default "";

/**
* Path to client's certificate.
* ssl.certificate.location in librdkafka
*
* @return The ssl certificate location.
*/
String sslCertificateLocation() default "";

/**
* Password for client's certificate.
* ssl.key.password in librdkafka
*
* @return The ssl key password.
*/
String sslKeyPassword() default "";
}
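Since POJO payloads are currently supported only for Cardinality.One, a single-message trigger can bind directly to a user-defined type. The following is a minimal sketch in the same style as the Javadoc examples; UserRecord is a hypothetical POJO (not part of this library) whose fields are assumed to match the JSON value of each Kafka record:

@FunctionName("KafkaTriggerPojo-Java")
public void runPojo(
        @KafkaTrigger(name = "kafkaTrigger",
                      topic = "users",
                      brokerList = "broker:29092",
                      consumerGroup = "functions")
        UserRecord user,                   // hypothetical POJO deserialized from the record value
        final ExecutionContext context) {
    context.getLogger().info("Received: " + user);
}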