Skip to content

Commit b12f2d4

Browse files
Adding Kafka Trigger (#116)
* Adding Kafka Trigger * Add methods for windows support * remove CustomBindings
1 parent 3c4e141 commit b12f2d4

File tree

5 files changed

+272
-1
lines changed

5 files changed

+272
-1
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,7 @@ hs_err_pid*
3535
/azure-functions-java-worker/
3636

3737
#IDE
38-
*.iml
38+
*.iml
39+
40+
#OSX
41+
.DS_Store
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
* Licensed under the MIT License. See License.txt in the project root for
4+
* license information.
5+
*/
6+
package com.microsoft.azure.functions;
7+
8+
/**
9+
* Defines the broker authentication modes
10+
*/
11+
public enum BrokerAuthenticationMode {
12+
NOTSET(-1),
13+
GSSAPI(0),
14+
PLAIN(1),
15+
SCRAMSHA256(2),
16+
SCRAMSHA512(3);
17+
18+
private final int value;
19+
20+
BrokerAuthenticationMode(final int value) {
21+
this.value = value;
22+
}
23+
24+
public int getValue() { return value; }
25+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/**
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
* Licensed under the MIT License. See License.txt in the project root for
4+
* license information.
5+
*/
6+
package com.microsoft.azure.functions;
7+
8+
public enum BrokerProtocol {
9+
NOTSET(-1),
10+
PLAINTEXT(0),
11+
SSL(1),
12+
SASLPLAINTEXT(2),
13+
SASLSSL(3);
14+
15+
private int value;
16+
BrokerProtocol(final int value) {
17+
this.value = value;
18+
}
19+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
* Licensed under the MIT License. See License.txt in the project root for
4+
* license information.
5+
*/
6+
package com.microsoft.azure.functions.annotation;
7+
8+
import com.microsoft.azure.functions.BrokerAuthenticationMode;
9+
import com.microsoft.azure.functions.BrokerProtocol;
10+
11+
import java.lang.annotation.ElementType;
12+
import java.lang.annotation.Retention;
13+
import java.lang.annotation.RetentionPolicy;
14+
import java.lang.annotation.Target;
15+
16+
/**
17+
* <p>Annotation for Kafka output bindings</p>
18+
*/
19+
@Target(ElementType.PARAMETER)
20+
@Retention(RetentionPolicy.RUNTIME)
21+
public @interface KafkaOutput {
22+
/**
23+
* Gets the Topic.
24+
* @return
25+
*/
26+
String topic();
27+
28+
/**
29+
* Gets or sets the BrokerList.
30+
*/
31+
String brokerList();
32+
33+
/**
34+
* Gets or sets the Maximum transmit message size. Default: 1MB
35+
*/
36+
int maxMessageBytes() default 1000012; // Follow the kafka spec https://kafka.apache.org/documentation/
37+
38+
/**
39+
* Maximum number of messages batched in one MessageSet. default: 10000
40+
*/
41+
int batchSize() default 10000;
42+
43+
/**
44+
* When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false
45+
*/
46+
boolean enableIdempotence() default false;
47+
48+
/**
49+
* Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000
50+
*/
51+
int messageTimeoutMs() default 300000;
52+
53+
/**
54+
* The ack timeout of the producer request in milliseconds. default: 5000
55+
*/
56+
int requestTimeoutMs() default 5000;
57+
58+
/**
59+
* How many times to retry sending a failing Message. **Note:** default: 2
60+
* Retrying may cause reordering unless EnableIdempotence is set to true.
61+
* @see #enableIdempotence()
62+
*/
63+
int maxRetries() default 2;
64+
65+
/**
66+
* SASL mechanism to use for authentication.
67+
* Default: PLAIN
68+
*/
69+
BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET; // TODO double check if it is OK
70+
71+
/**
72+
* SASL username with the PLAIN and SASL-SCRAM-.. mechanisms
73+
* Default: ""
74+
*/
75+
String username() default "";
76+
77+
/**
78+
* SASL password with the PLAIN and SASL-SCRAM-.. mechanisms
79+
* Default is plaintext
80+
*
81+
* security.protocol in librdkafka
82+
*/
83+
String password() default "";
84+
85+
/**
86+
* Gets or sets the security protocol used to communicate with brokers
87+
* default is PLAINTEXT
88+
*/
89+
BrokerProtocol protocol() default BrokerProtocol.NOTSET;
90+
91+
/**
92+
* Path to client's private key (PEM) used for authentication.
93+
* Default ""
94+
* ssl.key.location in librdkafka
95+
*/
96+
String sslKeyLocation() default "";
97+
98+
/**
99+
* Path to CA certificate file for verifying the broker's certificate.
100+
* ssl.ca.location in librdkafka
101+
*/
102+
String sslCaLocation() default "";
103+
104+
/**
105+
* Path to client's certificate.
106+
* ssl.certificate.location in librdkafka
107+
*/
108+
String sslCertificateLocation() default "";
109+
110+
/**
111+
* Password for client's certificate.
112+
* ssl.key.password in librdkafka
113+
*/
114+
String sslKeyPassword() default "";
115+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
* Licensed under the MIT License. See License.txt in the project root for
4+
* license information.
5+
*/
6+
package com.microsoft.azure.functions.annotation;
7+
8+
import com.microsoft.azure.functions.BrokerAuthenticationMode;
9+
import com.microsoft.azure.functions.BrokerProtocol;
10+
11+
12+
import java.lang.annotation.Retention;
13+
import java.lang.annotation.Target;
14+
import java.lang.annotation.RetentionPolicy;
15+
import java.lang.annotation.ElementType;
16+
17+
18+
/**
19+
* <p>Annotation for KafkaTrigger bindings</p>
20+
*/
21+
@Target(ElementType.PARAMETER)
22+
@Retention(RetentionPolicy.RUNTIME)
23+
public @interface KafkaTrigger {
24+
/**
25+
* Gets the Topic.
26+
*/
27+
String topic();
28+
29+
/**
30+
* Gets or sets the BrokerList.
31+
*/
32+
String brokerList();
33+
34+
/**
35+
* Gets or sets the EventHub connection string when using KafkaOutput protocol header feature of Azure EventHubs.
36+
*/
37+
String eventHubConnectionString() default "";
38+
/**
39+
* Cardinality of the trigger input.
40+
* Choose 'One' if the input is a single message or 'Many' if the input is an array of messages.
41+
* If you choose 'Many', please set a dataType.
42+
* Default: 'One'
43+
*/
44+
Cardinality cardinality() default Cardinality.ONE;
45+
/**
46+
* DataType for the Cardinality settings. If you set the cardinality as Cardinality.MANY, Azure Functions Host will deserialize
47+
* the kafka events as an array of this type.
48+
* Allowed values: string, binary, stream
49+
* Default: ""
50+
*/
51+
String dataType() default "";
52+
53+
/**
54+
* Gets or sets the consumer group.
55+
*/
56+
String consumerGroup();
57+
58+
/**
59+
* SASL mechanism to use for authentication.
60+
* Allowed values: Gssapi, Plain, ScramSha256, ScramSha512
61+
* Default: PLAIN
62+
*/
63+
BrokerAuthenticationMode authenticationMode() default BrokerAuthenticationMode.NOTSET;
64+
65+
/**
66+
* SASL username with the PLAIN and SASL-SCRAM-.. mechanisms
67+
* Default: ""
68+
*/
69+
String username() default "";
70+
71+
/**
72+
* SASL password with the PLAIN and SASL-SCRAM-.. mechanisms
73+
* Default: ""
74+
*
75+
* security.protocol in librdkafka
76+
*/
77+
String password() default "";
78+
79+
/**
80+
* Gets or sets the security protocol used to communicate with brokers
81+
* default is PLAINTEXT
82+
*/
83+
BrokerProtocol protocol() default BrokerProtocol.NOTSET;
84+
85+
/**
86+
* Path to client's private key (PEM) used for authentication.
87+
* Default ""
88+
* ssl.key.location in librdkafka
89+
*/
90+
String sslKeyLocation() default "";
91+
92+
/**
93+
* Path to CA certificate file for verifying the broker's certificate.
94+
* ssl.ca.location in librdkafka
95+
*/
96+
String sslCaLocation() default "";
97+
98+
/**
99+
* Path to client's certificate.
100+
* ssl.certificate.location in librdkafka
101+
*/
102+
String sslCertificateLocation() default "";
103+
104+
/**
105+
* Password for client's certificate.
106+
* ssl.key.password in librdkafka
107+
*/
108+
String sslKeyPassword() default "";
109+
}

0 commit comments

Comments
 (0)