Skip to content

Commit 61ce47d

Browse files
committed
Updated tasks and stabilizing crud test
1 parent 2115225 commit 61ce47d

File tree

6 files changed

+83
-172
lines changed

6 files changed

+83
-172
lines changed

.github/workflows/ci_build_test.yaml

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ jobs:
205205
- name: Test kafka connect upgrade
206206
run: |
207207
echo "Download kafka connect "$CI_OLD_CONNECTOR_VERSION
208+
# Summary for the test
209+
#1) We will deploy the old Kafka connector and create 2 tasks for it, to check functionality with and without ack
210+
#2) Then we will remove the old Kafka connector and deploy the new Kafka connector, updating the two tasks
211+
#3) During the update we will check the new functionality ("splunk.hec.json.event.formatted" and "splunk.hec.raw") to verify that the connector can be upgraded successfully
212+
#4) Finally we will check that we have received 2000 events for both tasks
208213
sudo mkdir -p /usr/local/share/kafka/plugins/
209214
wget https://github.com/splunk/kafka-connect-splunk/releases/download/$CI_OLD_CONNECTOR_VERSION/splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar
210215
sudo cp splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar /usr/local/share/kafka/plugins/
@@ -220,8 +225,10 @@ jobs:
220225
echo "Test kafka connect upgrade ..."
221226
source $GITHUB_WORKSPACE/test/config.sh
222227
test -f $connector_path/$old_connector_name && echo $connector_path /$old_connector_name
228+
# Starting Old Connector
223229
sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
224230
sleep 20
231+
# Creating the two tasks (with ack and without ack)
225232
curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
226233
"name": "kafka_connect",
227234
"config": {
@@ -233,11 +240,12 @@ jobs:
233240
"splunk.hec.uri": "'"$splunk_hec_url"'",
234241
"splunk.hec.ssl.validate.certs": "false",
235242
"splunk.hec.token": "'"$splunk_token"'" ,
236-
"splunk.flush.window": "1",
237-
"splunk.sources": "kafka_connect"
243+
"splunk.sources": "kafka_connect",
244+
"splunk.hec.raw": "true",
245+
"splunk.sourcetypes":"upgraded_test"
238246
}
239247
}'
240-
sleep 10
248+
sleep 5
241249
curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
242250
"name": "kafka_connect_ack",
243251
"config": {
@@ -248,23 +256,23 @@ jobs:
248256
"splunk.hec.ack.enabled": "true",
249257
"splunk.hec.uri": "'"$splunk_hec_url"'",
250258
"splunk.hec.ssl.validate.certs": "false",
251-
"splunk.hec.token": "'"$splunk_token_ack"'" ,
252-
"splunk.flush.window": "1",
253-
"splunk.sources": "kafka_connect_ack"
259+
"splunk.hec.token": "'"$splunk_token_ack"'" ,
260+
"splunk.sources": "kafka_connect_ack",
261+
"splunk.hec.raw": "true",
262+
"splunk.sourcetypes":"upgraded_test"
254263
}
255264
}'
256265
sleep 5
257-
python test/lib/eventproducer_connector_upgrade.py --log-level=INFO
258-
curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect"
259-
curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect_ack"
266+
# Generating 1000 events
267+
python test/lib/eventproducer_connector_upgrade.py 1000 --log-level=INFO
260268
sudo kill $(sudo lsof -t -i:8083) && sleep 2
261269
sudo rm $connector_path/$old_connector_name && sleep 2
262270
sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2
271+
# Starting New Connector
263272
sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
273+
# Updating the two tasks (with ack and without ack)
264274
sleep 10
265-
curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
266-
"name": "kafka_connect",
267-
"config": {
275+
curl ${kafka_connect_url}/connectors/kafka_connect/config -X PUT -H "Content-Type: application/json" -d '{
268276
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
269277
"tasks.max": "1",
270278
"splunk.indexes": "'"$splunk_index"'",
@@ -274,15 +282,12 @@ jobs:
274282
"splunk.hec.ssl.validate.certs": "false",
275283
"splunk.hec.token": "'"$splunk_token"'" ,
276284
"splunk.sources": "kafka_connect",
285+
"splunk.hec.raw": "false",
277286
"splunk.hec.json.event.formatted": "true",
278-
"splunk.flush.window": "1",
279-
"splunk.hec.raw": "false"
280-
}
287+
"splunk.sourcetypes":"upgraded_test"
281288
}'
282-
sleep 10
283-
curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
284-
"name": "kafka_connect_ack",
285-
"config": {
289+
sleep 5
290+
curl ${kafka_connect_url}/connectors/kafka_connect_ack/config -X PUT -H "Content-Type: application/json" -d '{
286291
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
287292
"tasks.max": "1",
288293
"splunk.indexes": "'"$splunk_index"'",
@@ -292,18 +297,19 @@ jobs:
292297
"splunk.hec.ssl.validate.certs": "false",
293298
"splunk.hec.token": "'"$splunk_token_ack"'" ,
294299
"splunk.sources": "kafka_connect_ack",
300+
"splunk.hec.raw": "false",
295301
"splunk.hec.json.event.formatted": "true",
296-
"splunk.flush.window": "1",
297-
"splunk.hec.raw": "false"
298-
}
302+
"splunk.sourcetypes":"upgraded_test"
299303
}'
300304
sleep 5
301-
python test/lib/eventproducer_connector_upgrade.py --log-level=INFO
305+
# Generating 1000 events
306+
python test/lib/eventproducer_connector_upgrade.py 2000 --log-level=INFO
307+
# Check in Splunk that we have received 2000 events for both the ack and non-ack tasks
302308
python test/lib/connector_upgrade.py --log-level=INFO
303309
- uses: actions/upload-artifact@v3
304-
if: always()
310+
if: failure()
305311
with:
306-
name: kafka-connect-logs
312+
name: kafka-connect-logs-${{ matrix.kafka_version }}
307313
path: output.log
308314

309315
- name: Install kafka connect
@@ -320,3 +326,9 @@ jobs:
320326
export PYTHONWARNINGS="ignore:Unverified HTTPS request"
321327
echo "Running functional tests....."
322328
python -m pytest --log-level=INFO
329+
330+
- uses: actions/upload-artifact@v3
331+
if: failure()
332+
with:
333+
name: splunk-events-${{ matrix.kafka_version }}
334+
path: events.txt

test/conftest.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from lib.connect_params import *
1818

1919
from kafka.producer import KafkaProducer
20+
from lib.commonsplunk import check_events_from_splunk
2021
from lib.helper import get_test_folder
2122
from lib.data_gen import generate_connector_content
2223
import pytest
@@ -89,3 +90,17 @@ def pytest_unconfigure():
8990
# Delete launched connectors
9091
for param in connect_params:
9192
delete_kafka_connector(config, param)
93+
94+
def pytest_sessionfinish(session, exitstatus):
95+
if exitstatus != 0:
96+
search_query = f"index={setup['splunk_index']}"
97+
logger.info(search_query)
98+
events = check_events_from_splunk(start_time="-24h@h",
99+
url=setup["splunkd_url"],
100+
user=setup["splunk_user"],
101+
query=[f"search {search_query}"],
102+
password=setup["splunk_password"])
103+
myfile = open('events.txt', 'w+')
104+
for i in events:
105+
myfile.write("%s\n" % i)
106+
myfile.close()

test/lib/commonkafka.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,11 @@ def delete_kafka_connector(setup, connector):
8181
return False
8282

8383

84-
def get_kafka_connector_tasks(setup, params):
84+
def get_kafka_connector_tasks(setup, params, sleepDuration=0):
8585
'''
8686
Get kafka connect connector tasks using kafka connect REST API
8787
'''
88-
88+
time.sleep(sleepDuration)
8989
t_end = time.time() + 10
9090
while time.time() < t_end:
9191
response = requests.get(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/tasks",
@@ -96,7 +96,6 @@ def get_kafka_connector_tasks(setup, params):
9696

9797
return 0
9898

99-
10099
def get_kafka_connector_status(setup, params, action, state):
101100
'''
102101
Get kafka connect connector tasks using kafka connect REST API

test/lib/connector_upgrade.py

Lines changed: 7 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -21,158 +21,28 @@
2121
with open(_config_path, 'r') as yaml_file:
2222
config = yaml.load(yaml_file)
2323
now = datetime.now()
24-
_time_stamp = str(datetime.timestamp(now))
25-
_topic = 'kafka_connect_upgrade'
2624
_connector = 'kafka_connect'
2725
_connector_ack = 'kafka_connect_ack'
2826

2927

30-
def start_old_connector():
31-
cmds = [f"test -f {config['connector_path']}/{config['old_connector_name']} && echo {config['connector_path']}/{config['old_connector_name']}",
32-
f"cd {config['kafka_home']}",
33-
f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"]
34-
35-
cmd = "\n".join(cmds)
36-
try:
37-
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
38-
stderr=subprocess.STDOUT)
39-
proc.wait()
40-
except OSError as e:
41-
logger.error(e)
42-
43-
44-
def generate_kafka_events(num):
45-
# Generate message data
46-
topics = [_topic]
47-
connector_content = {
48-
"name": _connector,
49-
"config": {
50-
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
51-
"tasks.max": "1",
52-
"splunk.indexes": config["splunk_index"],
53-
"topics": _topic,
54-
"splunk.hec.ack.enabled": "false",
55-
"splunk.hec.uri": config["splunk_hec_url"],
56-
"splunk.hec.ssl.validate.certs": "false",
57-
"splunk.hec.token": config["splunk_token"],
58-
"splunk.sources": _connector
59-
}
60-
}
61-
create_kafka_connector(config, connector_content)
62-
connector_content_ack = {
63-
"name": _connector_ack,
64-
"config": {
65-
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
66-
"tasks.max": "1",
67-
"splunk.indexes": config["splunk_index"],
68-
"topics": _topic,
69-
"splunk.hec.ack.enabled": "true",
70-
"splunk.hec.uri": config["splunk_hec_url"],
71-
"splunk.hec.ssl.validate.certs": "false",
72-
"splunk.hec.token": config["splunk_token_ack"],
73-
"splunk.sources": _connector_ack
74-
}
75-
}
76-
create_kafka_connector(config, connector_content_ack)
77-
client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test')
78-
broker_topics = client.list_topics()
79-
logger.info(broker_topics)
80-
if _topic not in broker_topics:
81-
create_kafka_topics(config, topics)
82-
producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
83-
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
84-
85-
for _ in range(num):
86-
msg = {"timestamp": _time_stamp}
87-
producer.send(_topic, msg)
88-
time.sleep(0.05)
89-
producer.flush()
90-
91-
92-
def upgrade_connector_plugin():
93-
cmds = ["sudo kill $(sudo lsof -t -i:8083) && sleep 2",
94-
f"sudo rm {config['connector_path']}/{config['old_connector_name']} && sleep 2",
95-
f"sudo cp {config['connector_build_target']}/splunk-kafka-connect*.jar {config['connector_path']} && sleep 2",
96-
f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"]
97-
98-
cmd = "\n".join(cmds)
99-
try:
100-
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
101-
stderr=subprocess.STDOUT)
102-
output, error = proc.communicate()
103-
logger.debug(output)
104-
time.sleep(2)
105-
update_kafka_connectors()
106-
except OSError as e:
107-
logger.error(e)
108-
109-
110-
def update_kafka_connectors():
111-
logger.info("Update kafka connectors ...")
112-
connector_content = {
113-
"name": _connector,
114-
"config": {
115-
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
116-
"tasks.max": "1",
117-
"splunk.indexes": config["splunk_index"],
118-
"topics": _topic,
119-
"splunk.hec.ack.enabled": "false",
120-
"splunk.hec.uri": config["splunk_hec_url"],
121-
"splunk.hec.ssl.validate.certs": "false",
122-
"splunk.hec.token": config["splunk_token"],
123-
"splunk.sources": _connector,
124-
"splunk.hec.json.event.formatted": "true",
125-
"splunk.hec.raw": True
126-
}
127-
}
128-
create_kafka_connector(config, connector_content)
129-
connector_content_ack = {
130-
"name": _connector_ack,
131-
"config": {
132-
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
133-
"tasks.max": "1",
134-
"splunk.indexes": config["splunk_index"],
135-
"topics": _topic,
136-
"splunk.hec.ack.enabled": "true",
137-
"splunk.hec.uri": config["splunk_hec_url"],
138-
"splunk.hec.ssl.validate.certs": "false",
139-
"splunk.hec.token": config["splunk_token_ack"],
140-
"splunk.sources": _connector_ack,
141-
"splunk.hec.json.event.formatted": "true",
142-
"splunk.hec.raw": True
143-
}
144-
}
145-
create_kafka_connector(config, connector_content_ack)
146-
14728

14829
if __name__ == '__main__':
149-
logger.info("Start old Kafka connector ...")
150-
thread_old_connect = threading.Thread(target=start_old_connector, daemon=True)
151-
thread_old_connect.start()
152-
time.sleep(10)
153-
logger.info("Generate Kafka events ...")
154-
thread_gen = threading.Thread(target=generate_kafka_events, args=(2000,), daemon=True)
155-
thread_gen.start()
156-
time.sleep(50)
157-
logger.info("Upgrade Kafka connector ...")
158-
thread_upgrade = threading.Thread(target=upgrade_connector_plugin, daemon=True)
159-
thread_upgrade.start()
16030
time.sleep(100)
161-
search_query_1 = f"index={config['splunk_index']} | search source::{_connector}"
31+
search_query_1 = f"index={config['splunk_index']} | search source::{_connector} sourcetype::upgraded_test"
16232
logger.debug(search_query_1)
163-
events_1 = check_events_from_splunk(start_time="-24h@h",
33+
events_1 = check_events_from_splunk(start_time="-48h@h",
16434
url=config["splunkd_url"],
16535
user=config["splunk_user"],
16636
query=[f"search {search_query_1}"],
16737
password=config["splunk_password"])
168-
logger.info("Splunk received %s events in the last 24h", len(events_1))
38+
logger.info("Splunk received %s events", len(events_1))
16939
assert len(events_1) == 2000
170-
search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack}"
40+
search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack} sourcetype::upgraded_test"
17141
logger.debug(search_query_2)
172-
events_2 = check_events_from_splunk(start_time="-24h@h",
42+
events_2 = check_events_from_splunk(start_time="-48h@m",
17343
url=config["splunkd_url"],
17444
user=config["splunk_user"],
17545
query=[f"search {search_query_2}"],
17646
password=config["splunk_password"])
177-
logger.info("Splunk received %s events in the last 24h", len(events_2))
178-
assert len(events_2) == 2000
47+
logger.info("Splunk received %s events ", len(events_2))
48+
assert len(events_2) == 2000

0 commit comments

Comments
 (0)