@@ -81,8 +81,8 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object,
81
81
return NULL ;
82
82
}
83
83
} else {
84
- // Do nothing - Connection:: NodeConfigureCallbacks will handle this for each
85
- // of the three client types.
84
+ // Do nothing - NodeConfigureCallbacks will handle this for each
85
+ // of the three client types, called from within JavaScript .
86
86
}
87
87
}
88
88
@@ -118,6 +118,23 @@ void Conf::ConfigureCallback(const std::string &string_key, const v8::Local<v8::
118
118
offset_commit->dispatcher .RemoveCallback (cb);
119
119
}
120
120
}
121
+ } else if (string_key.compare (" oauthbearer_token_refresh_cb" ) == 0 ) {
122
+ NodeKafka::Callbacks::OAuthBearerTokenRefresh *oauthbearer_token_refresh =
123
+ oauthbearer_token_refresh_cb ();
124
+ if (add) {
125
+ if (oauthbearer_token_refresh == NULL ) {
126
+ oauthbearer_token_refresh =
127
+ new NodeKafka::Callbacks::OAuthBearerTokenRefresh ();
128
+ this ->set (string_key, oauthbearer_token_refresh, errstr);
129
+ }
130
+ oauthbearer_token_refresh->dispatcher .AddCallback (cb);
131
+ } else {
132
+ if (oauthbearer_token_refresh != NULL ) {
133
+ oauthbearer_token_refresh->dispatcher .RemoveCallback (cb);
134
+ }
135
+ }
136
+ } else {
137
+ errstr = " Invalid callback type" ;
121
138
}
122
139
}
123
140
@@ -131,6 +148,12 @@ void Conf::listen() {
131
148
if (offset_commit) {
132
149
offset_commit->dispatcher .Activate ();
133
150
}
151
+
152
+ NodeKafka::Callbacks::OAuthBearerTokenRefresh *oauthbearer_token_refresh =
153
+ oauthbearer_token_refresh_cb ();
154
+ if (oauthbearer_token_refresh) {
155
+ oauthbearer_token_refresh->dispatcher .Activate ();
156
+ }
134
157
}
135
158
136
159
void Conf::stop () {
@@ -143,6 +166,12 @@ void Conf::stop() {
143
166
if (offset_commit) {
144
167
offset_commit->dispatcher .Deactivate ();
145
168
}
169
+
170
+ NodeKafka::Callbacks::OAuthBearerTokenRefresh *oauthbearer_token_refresh =
171
+ oauthbearer_token_refresh_cb ();
172
+ if (oauthbearer_token_refresh) {
173
+ oauthbearer_token_refresh->dispatcher .Deactivate ();
174
+ }
146
175
}
147
176
148
177
Conf::~Conf () {
@@ -167,4 +196,21 @@ NodeKafka::Callbacks::OffsetCommit* Conf::offset_commit_cb() const {
167
196
return static_cast <NodeKafka::Callbacks::OffsetCommit*>(cb);
168
197
}
169
198
199
+ NodeKafka::Callbacks::OAuthBearerTokenRefresh *
200
+ Conf::oauthbearer_token_refresh_cb () const {
201
+ RdKafka::OAuthBearerTokenRefreshCb *cb = NULL ;
202
+ if (this ->get (cb) != RdKafka::Conf::CONF_OK) {
203
+ return NULL ;
204
+ }
205
+ return static_cast <NodeKafka::Callbacks::OAuthBearerTokenRefresh *>(cb);
206
+ }
207
+
208
+ bool Conf::is_sasl_oauthbearer () const {
209
+ std::string sasl_mechanism;
210
+ if (this ->get (" sasl.mechanisms" , sasl_mechanism) != RdKafka::Conf::CONF_OK) {
211
+ return false ;
212
+ }
213
+ return sasl_mechanism.compare (" OAUTHBEARER" ) == 0 ;
214
+ }
215
+
170
216
} // namespace NodeKafka
0 commit comments