
Commit 2115225

Stabilize Test kafka connect upgrade
1 parent 9751f13 commit 2115225

4 files changed: +163 -7 lines changed

.github/workflows/ci_build_test.yaml

Lines changed: 86 additions & 0 deletions
@@ -218,7 +218,93 @@ jobs:
           pip install -r test/requirements.txt
           export PYTHONWARNINGS="ignore:Unverified HTTPS request"
           echo "Test kafka connect upgrade ..."
+          source $GITHUB_WORKSPACE/test/config.sh
+          test -f $connector_path/$old_connector_name && echo $connector_path/$old_connector_name
+          sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
+          sleep 20
+          curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
+            "name": "kafka_connect",
+            "config": {
+              "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
+              "tasks.max": "1",
+              "splunk.indexes": "'"$splunk_index"'",
+              "topics": "kafka_connect_upgrade",
+              "splunk.hec.ack.enabled": "false",
+              "splunk.hec.uri": "'"$splunk_hec_url"'",
+              "splunk.hec.ssl.validate.certs": "false",
+              "splunk.hec.token": "'"$splunk_token"'",
+              "splunk.flush.window": "1",
+              "splunk.sources": "kafka_connect"
+            }
+          }'
+          sleep 10
+          curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
+            "name": "kafka_connect_ack",
+            "config": {
+              "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
+              "tasks.max": "1",
+              "splunk.indexes": "'"$splunk_index"'",
+              "topics": "kafka_connect_upgrade",
+              "splunk.hec.ack.enabled": "true",
+              "splunk.hec.uri": "'"$splunk_hec_url"'",
+              "splunk.hec.ssl.validate.certs": "false",
+              "splunk.hec.token": "'"$splunk_token_ack"'",
+              "splunk.flush.window": "1",
+              "splunk.sources": "kafka_connect_ack"
+            }
+          }'
+          sleep 5
+          python test/lib/eventproducer_connector_upgrade.py --log-level=INFO
+          curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect"
+          curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect_ack"
+          sudo kill $(sudo lsof -t -i:8083) && sleep 2
+          sudo rm $connector_path/$old_connector_name && sleep 2
+          sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2
+          sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
+          sleep 10
+          curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
+            "name": "kafka_connect",
+            "config": {
+              "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
+              "tasks.max": "1",
+              "splunk.indexes": "'"$splunk_index"'",
+              "topics": "kafka_connect_upgrade",
+              "splunk.hec.ack.enabled": "false",
+              "splunk.hec.uri": "'"$splunk_hec_url"'",
+              "splunk.hec.ssl.validate.certs": "false",
+              "splunk.hec.token": "'"$splunk_token"'",
+              "splunk.sources": "kafka_connect",
+              "splunk.hec.json.event.formatted": "true",
+              "splunk.flush.window": "1",
+              "splunk.hec.raw": "false"
+            }
+          }'
+          sleep 10
+          curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
+            "name": "kafka_connect_ack",
+            "config": {
+              "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
+              "tasks.max": "1",
+              "splunk.indexes": "'"$splunk_index"'",
+              "topics": "kafka_connect_upgrade",
+              "splunk.hec.ack.enabled": "true",
+              "splunk.hec.uri": "'"$splunk_hec_url"'",
+              "splunk.hec.ssl.validate.certs": "false",
+              "splunk.hec.token": "'"$splunk_token_ack"'",
+              "splunk.sources": "kafka_connect_ack",
+              "splunk.hec.json.event.formatted": "true",
+              "splunk.flush.window": "1",
+              "splunk.hec.raw": "false"
+            }
+          }'
+          sleep 5
+          python test/lib/eventproducer_connector_upgrade.py --log-level=INFO
           python test/lib/connector_upgrade.py --log-level=INFO
+      - uses: actions/upload-artifact@v3
+        if: always()
+        with:
+          name: kafka-connect-logs
+          path: output.log
 
       - name: Install kafka connect
         run: |
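
The step above relies on fixed sleep intervals (20s after starting the worker, 10s after each connector POST) before driving traffic. A minimal sketch of a sturdier wait, assuming the requests package is available in the test environment (it is not shown in this commit), would poll the standard Kafka Connect REST API status endpoint until the connector reports RUNNING:

    import time
    import requests  # assumption: not listed in this commit's requirements

    def wait_until_running(connect_url, connector, timeout=60):
        """Poll GET {connect_url}/connectors/{connector}/status until RUNNING."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                status = requests.get(f"{connect_url}/connectors/{connector}/status").json()
                if status.get("connector", {}).get("state") == "RUNNING":
                    return True
            except (requests.RequestException, ValueError):
                pass  # worker may still be coming up or returning non-JSON
            time.sleep(2)
        return False

    # e.g. wait_until_running("http://127.0.0.1:8083", "kafka_connect")

The /connectors/<name>/status endpoint and the connector "state" field are part of the documented Kafka Connect REST API; only the wrapper function itself is hypothetical.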

test/config.sh

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+export splunkd_url=https://127.0.0.1:8089
+export splunk_hec_url=https://127.0.0.1:8088
+export splunk_user=admin
+export splunk_password=helloworld
+export splunk_index=main
+export splunk_token=a6b5e77f-d5f6-415a-bd43-930cecb12959
+export splunk_token_ack=a6b5e77f-d5f6-415a-bd43-930cecb12950
+export kafka_broker_url=127.0.0.1:9092
+export kafka_connect_url=http://127.0.0.1:8083
+export kafka_topic=test-datagen
+export kafka_topic_2=kafka_topic_2
+export kafka_header_topic=kafka_header_topic
+export kafka_header_index=kafka
+export connector_path=/usr/local/share/kafka/plugins
+export connector_build_target=/usr/local/share/kafka-connector
+export kafka_home=/usr/local/kafka
+export kafka_connect_home=/home/circleci/repo
+export old_connector_name=splunk-kafka-connect-v2.0.1.jar
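
The workflow sources this file before any of the curl calls above, so each variable is available to the shell commands that follow. For illustration only, a hypothetical helper (load_test_config is not part of this commit; the committed Python tests read test/config.yaml instead, as the producer script below shows) could pick the same names up from the environment:

    import os

    # Hypothetical: mirror the shell variables exported by test/config.sh.
    def load_test_config():
        keys = ["splunkd_url", "splunk_hec_url", "splunk_user", "splunk_password",
                "splunk_index", "splunk_token", "splunk_token_ack",
                "kafka_broker_url", "kafka_connect_url", "connector_path",
                "connector_build_target", "kafka_home", "old_connector_name"]
        return {k: os.environ[k] for k in keys if k in os.environ}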

test/lib/connector_upgrade.py

Lines changed: 7 additions & 7 deletions
@@ -158,21 +158,21 @@ def update_kafka_connectors():
     thread_upgrade = threading.Thread(target=upgrade_connector_plugin, daemon=True)
     thread_upgrade.start()
     time.sleep(100)
-    search_query_1 = f"index={config['splunk_index']} | search timestamp=\"{_time_stamp}\" source::{_connector}"
+    search_query_1 = f"index={config['splunk_index']} | search source::{_connector}"
     logger.debug(search_query_1)
-    events_1 = check_events_from_splunk(start_time="-15m@m",
+    events_1 = check_events_from_splunk(start_time="-24h@h",
                                         url=config["splunkd_url"],
                                         user=config["splunk_user"],
                                         query=[f"search {search_query_1}"],
                                         password=config["splunk_password"])
-    logger.info("Splunk received %s events in the last 15m", len(events_1))
+    logger.info("Splunk received %s events in the last 24h", len(events_1))
     assert len(events_1) == 2000
-    search_query_2 = f"index={config['splunk_index']} | search timestamp=\"{_time_stamp}\" source::{_connector_ack}"
+    search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack}"
     logger.debug(search_query_2)
-    events_2 = check_events_from_splunk(start_time="-15m@m",
+    events_2 = check_events_from_splunk(start_time="-24h@h",
                                         url=config["splunkd_url"],
                                         user=config["splunk_user"],
                                         query=[f"search {search_query_2}"],
                                         password=config["splunk_password"])
-    logger.info("Splunk received %s events in the last 15m", len(events_2))
-    assert len(events_2) == 2000
+    logger.info("Splunk received %s events in the last 24h", len(events_2))
+    assert len(events_2) == 2000
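
Dropping the timestamp filter and widening the search window from 15 minutes to 24 hours makes the assertions tolerant of the two producer runs that now happen on either side of the connector jar swap. A hedged sketch of a further stabilization, replacing the fixed time.sleep(100) with a retry loop (the wait_for_events wrapper is hypothetical; check_events_from_splunk and its keyword arguments are taken verbatim from the diff above, and config is the module-level dict already loaded there):

    import time

    # Hypothetical retry wrapper around the existing Splunk search helper.
    def wait_for_events(search_query, expected, attempts=10, delay=30):
        events = []
        for _ in range(attempts):
            events = check_events_from_splunk(start_time="-24h@h",
                                              url=config["splunkd_url"],
                                              user=config["splunk_user"],
                                              query=[f"search {search_query}"],
                                              password=config["splunk_password"])
            if len(events) >= expected:
                break
            time.sleep(delay)  # give the HEC pipeline time to index
        return events

    # e.g. assert len(wait_for_events(search_query_1, 2000)) == 2000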
test/lib/eventproducer_connector_upgrade.py

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+from kafka.producer import KafkaProducer
+import sys
+import os
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
+
+from lib.commonsplunk import check_events_from_splunk
+from lib.commonkafka import *
+from lib.helper import *
+from datetime import datetime
+import threading
+import logging.config
+import yaml
+import subprocess
+import logging
+import time
+
+logging.config.fileConfig(os.path.join(get_test_folder(), "logging.conf"))
+logger = logging.getLogger('connector_upgrade')
+
+_config_path = os.path.join(get_test_folder(), 'config.yaml')
+with open(_config_path, 'r') as yaml_file:
+    config = yaml.load(yaml_file)
+now = datetime.now()
+_time_stamp = str(datetime.timestamp(now))
+_topic = 'kafka_connect_upgrade'
+
+
+def generate_kafka_events(num):
+    topics = [_topic]
+    client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test')
+    broker_topics = client.list_topics()
+    logger.info(broker_topics)
+    if _topic not in broker_topics:
+        create_kafka_topics(config, topics)
+    producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
+                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+
+    for _ in range(num):
+        msg = {"timestamp": _time_stamp}
+        producer.send(_topic, msg)
+        time.sleep(0.05)
+    producer.flush()
+
+
+
+if __name__ == '__main__':
+
+    time.sleep(20)
+    logger.info("Generate Kafka events ...")
+    thread_gen = threading.Thread(target=generate_kafka_events, args=(1000,), daemon=True)
+    thread_gen.start()
+    time.sleep(100)
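
Each invocation of this script publishes 1,000 events to the kafka_connect_upgrade topic, and the workflow step runs it twice, once before and once after the connector jar is replaced. Both sink connectors consume the same topic, which is why connector_upgrade.py asserts exactly 2000 events for each of the kafka_connect and kafka_connect_ack sources.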
