98 changes: 98 additions & 0 deletions .github/workflows/ci_build_test.yaml
@@ -205,6 +205,11 @@ jobs:
- name: Test kafka connect upgrade
run: |
echo "Download kafka connect "$CI_OLD_CONNECTOR_VERSION
# Summary of the test:
# 1) Deploy the old Kafka connector and create two tasks for it, one with HEC acknowledgements and one without.
# 2) Remove the old connector jar and deploy the new connector, updating both tasks in place.
# 3) The in-place update exercises the new options ("splunk.hec.json.event.formatted" and "splunk.hec.raw") to confirm the connector upgrades cleanly.
# 4) Finally, verify that Splunk received 2000 events for each task.
sudo mkdir -p /usr/local/share/kafka/plugins/
wget https://github.com/splunk/kafka-connect-splunk/releases/download/$CI_OLD_CONNECTOR_VERSION/splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar
sudo cp splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar /usr/local/share/kafka/plugins/
@@ -218,7 +223,94 @@ jobs:
pip install -r test/requirements.txt
export PYTHONWARNINGS="ignore:Unverified HTTPS request"
echo "Test kafka connect upgrade ..."
source $GITHUB_WORKSPACE/test/config.sh
test -f $connector_path/$old_connector_name && echo $connector_path/$old_connector_name
# Starting Old Connector
sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
sleep 20
# Creating the two tasks (with ack and without ack)
curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
"name": "kafka_connect",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": "'"$splunk_index"'",
"topics": "kafka_connect_upgrade",
"splunk.hec.ack.enabled": "false",
"splunk.hec.uri": "'"$splunk_hec_url"'",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": "'"$splunk_token"'" ,
"splunk.sources": "kafka_connect",
"splunk.hec.raw": "true",
"splunk.sourcetypes":"upgraded_test"
}
}'
sleep 5
curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{
"name": "kafka_connect_ack",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": "'"$splunk_index"'",
"topics": "kafka_connect_upgrade",
"splunk.hec.ack.enabled": "true",
"splunk.hec.uri": "'"$splunk_hec_url"'",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": "'"$splunk_token_ack"'" ,
"splunk.sources": "kafka_connect_ack",
"splunk.hec.raw": "true",
"splunk.sourcetypes":"upgraded_test"
}
}'
sleep 5
# Generating 1000 events
python test/lib/eventproducer_connector_upgrade.py 1000 --log-level=INFO
sudo kill $(sudo lsof -t -i:8083) && sleep 2
sudo rm $connector_path/$old_connector_name && sleep 2
sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2
# Starting New Connector
sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 &
# Updating the two tasks (with ack and without ack)
sleep 10
curl ${kafka_connect_url}/connectors/kafka_connect/config -X PUT -H "Content-Type: application/json" -d '{
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": "'"$splunk_index"'",
"topics": "kafka_connect_upgrade",
"splunk.hec.ack.enabled": "false",
"splunk.hec.uri": "'"$splunk_hec_url"'",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": "'"$splunk_token"'" ,
"splunk.sources": "kafka_connect",
"splunk.hec.raw": "false",
"splunk.hec.json.event.formatted": "true",
"splunk.sourcetypes":"upgraded_test"
}'
sleep 5
curl ${kafka_connect_url}/connectors/kafka_connect_ack/config -X PUT -H "Content-Type: application/json" -d '{
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": "'"$splunk_index"'",
"topics": "kafka_connect_upgrade",
"splunk.hec.ack.enabled": "true",
"splunk.hec.uri": "'"$splunk_hec_url"'",
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": "'"$splunk_token_ack"'" ,
"splunk.sources": "kafka_connect_ack",
"splunk.hec.raw": "false",
"splunk.hec.json.event.formatted": "true",
"splunk.sourcetypes":"upgraded_test"
}'
sleep 5
# Generating 1000 more events (cumulative total: 2000)
python test/lib/eventproducer_connector_upgrade.py 2000 --log-level=INFO
# Check in Splunk that we have received 2000 events for each of the ack and no-ack tasks
python test/lib/connector_upgrade.py --log-level=INFO
- uses: actions/upload-artifact@v3
if: failure()
with:
name: kafka-connect-logs-${{ matrix.kafka_version }}
path: output.log

- name: Install kafka connect
run: |
@@ -234,3 +326,9 @@ jobs:
export PYTHONWARNINGS="ignore:Unverified HTTPS request"
echo "Running functional tests....."
python -m pytest --log-level=INFO

- uses: actions/upload-artifact@v3
if: failure()
with:
name: splunk-events-${{ matrix.kafka_version }}
path: events.txt
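Note: the curl commands in the "Test kafka connect upgrade" step above drive the Kafka Connect REST API directly — POST /connectors to create each connector under the old plugin, then PUT /connectors/<name>/config to reconfigure the same connectors in place once the new jar is swapped in. A minimal Python sketch of that flow follows, assuming the endpoints and tokens from test/config.sh; the hec_config helper and the "<token>" placeholder are inventions for illustration, not part of the PR.

import requests

CONNECT_URL = "http://127.0.0.1:8083"  # kafka_connect_url in test/config.sh

def hec_config(token, ack_enabled, raw, json_event_formatted=None):
    # Mirrors the JSON payloads sent by the workflow's curl commands.
    cfg = {
        "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
        "tasks.max": "1",
        "splunk.indexes": "main",
        "topics": "kafka_connect_upgrade",
        "splunk.hec.uri": "https://127.0.0.1:8088",
        "splunk.hec.token": token,
        "splunk.hec.ssl.validate.certs": "false",
        "splunk.hec.ack.enabled": str(ack_enabled).lower(),
        "splunk.hec.raw": str(raw).lower(),
        "splunk.sourcetypes": "upgraded_test",
    }
    if json_event_formatted is not None:
        cfg["splunk.hec.json.event.formatted"] = str(json_event_formatted).lower()
    return cfg

# Old plugin: create the no-ack connector (POST /connectors).
requests.post(f"{CONNECT_URL}/connectors",
              json={"name": "kafka_connect",
                    "config": hec_config("<token>", ack_enabled=False, raw=True)})

# New plugin: update the same connector in place (PUT /connectors/<name>/config),
# turning splunk.hec.raw off and enabling splunk.hec.json.event.formatted.
requests.put(f"{CONNECT_URL}/connectors/kafka_connect/config",
             json=hec_config("<token>", ack_enabled=False, raw=False,
                             json_event_formatted=True))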
18 changes: 18 additions & 0 deletions test/config.sh
@@ -0,0 +1,18 @@
export splunkd_url=https://127.0.0.1:8089
export splunk_hec_url=https://127.0.0.1:8088
export splunk_user=admin
export splunk_password=helloworld
export splunk_index=main
export splunk_token=a6b5e77f-d5f6-415a-bd43-930cecb12959
export splunk_token_ack=a6b5e77f-d5f6-415a-bd43-930cecb12950
export kafka_broker_url=127.0.0.1:9092
export kafka_connect_url=http://127.0.0.1:8083
export kafka_topic=test-datagen
export kafka_topic_2=kafka_topic_2
export kafka_header_topic=kafka_header_topic
export kafka_header_index=kafka
export connector_path=/usr/local/share/kafka/plugins
export connector_build_target=/usr/local/share/kafka-connector
export kafka_home=/usr/local/kafka
export kafka_connect_home=/home/circleci/repo
export old_connector_name=splunk-kafka-connect-v2.0.1.jar
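Note: the workflow consumes these settings with `source $GITHUB_WORKSPACE/test/config.sh`, so the exported values reach child processes as environment variables. As a sketch (not part of the PR), a Python script launched from that shell could pick the same values up via os.environ:

import os

# Hypothetical helper: collect the sourced exports into a dict.
KEYS = ("splunkd_url", "splunk_hec_url", "splunk_user", "splunk_password",
        "splunk_index", "kafka_connect_url", "kafka_broker_url")
config = {key: os.environ[key] for key in KEYS}
assert config["kafka_connect_url"] == "http://127.0.0.1:8083"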
15 changes: 15 additions & 0 deletions test/conftest.py
@@ -17,6 +17,7 @@
from lib.connect_params import *

from kafka.producer import KafkaProducer
from lib.commonsplunk import check_events_from_splunk
from lib.helper import get_test_folder
from lib.data_gen import generate_connector_content
import pytest
@@ -89,3 +90,17 @@ def pytest_unconfigure():
# Delete launched connectors
for param in connect_params:
delete_kafka_connector(config, param)

def pytest_sessionfinish(session, exitstatus):
if exitstatus != 0:
search_query = "index=*"
logger.info(search_query)
events = check_events_from_splunk(start_time="-24h@h",
url=setup["splunkd_url"],
user=setup["splunk_user"],
query=[f"search {search_query}"],
password=setup["splunk_password"])
with open('events.txt', 'w') as events_file:
    for event in events:
        events_file.write(f"{event}\n")
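Note: events.txt written by this pytest_sessionfinish hook is the same file the new upload-artifact step in ci_build_test.yaml publishes as splunk-events-<kafka_version>, so a failed CI run ships the last 24 hours of indexed Splunk events alongside the Kafka Connect output.log.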
5 changes: 2 additions & 3 deletions test/lib/commonkafka.py
@@ -81,11 +81,11 @@ def delete_kafka_connector(setup, connector):
return False


def get_kafka_connector_tasks(setup, params):
def get_kafka_connector_tasks(setup, params, sleepDuration=0):
'''
Get kafka connect connector tasks using kafka connect REST API
'''

time.sleep(sleepDuration)
t_end = time.time() + 10
while time.time() < t_end:
response = requests.get(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/tasks",
@@ -96,7 +96,6 @@ def get_kafka_connector_tasks(setup, params):

return 0


def get_kafka_connector_status(setup, params, action, state):
'''
Get kafka connect connector status using kafka connect REST API
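Note: the new sleepDuration parameter lets a caller delay the first poll of the tasks endpoint, for example to let Kafka Connect finish rebalancing after a plugin swap. A sketch of a call site follows; the connector name is illustrative, and it assumes the helper returns the task count on success, with the `return 0` above as the timeout fallback:

# Wait 5 s, then poll GET /connectors/<name>/tasks for up to 10 s.
tasks = get_kafka_connector_tasks(setup, {"name": "kafka_connect_ack"}, sleepDuration=5)
assert tasks == 1  # tasks.max is "1" for both upgrade-test connectors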
144 changes: 7 additions & 137 deletions test/lib/connector_upgrade.py
@@ -21,158 +21,28 @@
with open(_config_path, 'r') as yaml_file:
config = yaml.load(yaml_file, Loader=yaml.SafeLoader)
now = datetime.now()
_time_stamp = str(datetime.timestamp(now))
_topic = 'kafka_connect_upgrade'
_connector = 'kafka_connect'
_connector_ack = 'kafka_connect_ack'


def start_old_connector():
cmds = [f"test -f {config['connector_path']}/{config['old_connector_name']} && echo {config['connector_path']}/{config['old_connector_name']}",
f"cd {config['kafka_home']}",
f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"]

cmd = "\n".join(cmds)
try:
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
proc.wait()
except OSError as e:
logger.error(e)


def generate_kafka_events(num):
# Generate message data
topics = [_topic]
connector_content = {
"name": _connector,
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": config["splunk_index"],
"topics": _topic,
"splunk.hec.ack.enabled": "false",
"splunk.hec.uri": config["splunk_hec_url"],
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": config["splunk_token"],
"splunk.sources": _connector
}
}
create_kafka_connector(config, connector_content)
connector_content_ack = {
"name": _connector_ack,
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": config["splunk_index"],
"topics": _topic,
"splunk.hec.ack.enabled": "true",
"splunk.hec.uri": config["splunk_hec_url"],
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": config["splunk_token_ack"],
"splunk.sources": _connector_ack
}
}
create_kafka_connector(config, connector_content_ack)
client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test')
broker_topics = client.list_topics()
logger.info(broker_topics)
if _topic not in broker_topics:
create_kafka_topics(config, topics)
producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for _ in range(num):
msg = {"timestamp": _time_stamp}
producer.send(_topic, msg)
time.sleep(0.05)
producer.flush()


def upgrade_connector_plugin():
cmds = ["sudo kill $(sudo lsof -t -i:8083) && sleep 2",
f"sudo rm {config['connector_path']}/{config['old_connector_name']} && sleep 2",
f"sudo cp {config['connector_build_target']}/splunk-kafka-connect*.jar {config['connector_path']} && sleep 2",
f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"]

cmd = "\n".join(cmds)
try:
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output, error = proc.communicate()
logger.debug(output)
time.sleep(2)
update_kafka_connectors()
except OSError as e:
logger.error(e)


def update_kafka_connectors():
logger.info("Update kafka connectors ...")
connector_content = {
"name": _connector,
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": config["splunk_index"],
"topics": _topic,
"splunk.hec.ack.enabled": "false",
"splunk.hec.uri": config["splunk_hec_url"],
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": config["splunk_token"],
"splunk.sources": _connector,
"splunk.hec.json.event.formatted": "true",
"splunk.hec.raw": True
}
}
create_kafka_connector(config, connector_content)
connector_content_ack = {
"name": _connector_ack,
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"splunk.indexes": config["splunk_index"],
"topics": _topic,
"splunk.hec.ack.enabled": "true",
"splunk.hec.uri": config["splunk_hec_url"],
"splunk.hec.ssl.validate.certs": "false",
"splunk.hec.token": config["splunk_token_ack"],
"splunk.sources": _connector_ack,
"splunk.hec.json.event.formatted": "true",
"splunk.hec.raw": True
}
}
create_kafka_connector(config, connector_content_ack)


if __name__ == '__main__':
logger.info("Start old Kafka connector ...")
thread_old_connect = threading.Thread(target=start_old_connector, daemon=True)
thread_old_connect.start()
time.sleep(10)
logger.info("Generate Kafka events ...")
thread_gen = threading.Thread(target=generate_kafka_events, args=(2000,), daemon=True)
thread_gen.start()
time.sleep(50)
logger.info("Upgrade Kafka connector ...")
thread_upgrade = threading.Thread(target=upgrade_connector_plugin, daemon=True)
thread_upgrade.start()
time.sleep(100)
search_query_1 = f"index={config['splunk_index']} | search timestamp=\"{_time_stamp}\" source::{_connector}"
search_query_1 = f"index={config['splunk_index']} | search source::{_connector} sourcetype::upgraded_test"
logger.debug(search_query_1)
events_1 = check_events_from_splunk(start_time="-15m@m",
events_1 = check_events_from_splunk(start_time="-48h@h",
url=config["splunkd_url"],
user=config["splunk_user"],
query=[f"search {search_query_1}"],
password=config["splunk_password"])
logger.info("Splunk received %s events in the last 15m", len(events_1))
logger.info("Splunk received %s events", len(events_1))
assert len(events_1) == 2000
search_query_2 = f"index={config['splunk_index']} | search timestamp=\"{_time_stamp}\" source::{_connector_ack}"
search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack} sourcetype::upgraded_test"
logger.debug(search_query_2)
events_2 = check_events_from_splunk(start_time="-15m@m",
events_2 = check_events_from_splunk(start_time="-48h@m",
url=config["splunkd_url"],
user=config["splunk_user"],
query=[f"search {search_query_2}"],
password=config["splunk_password"])
logger.info("Splunk received %s events in the last 15m", len(events_2))
assert len(events_2) == 2000
logger.info("Splunk received %s events ", len(events_2))
assert len(events_2) == 2000
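Note: the reworked assertions no longer pin events to a single run timestamp within the last 15 minutes; instead they match sourcetype::upgraded_test (set in both the pre- and post-upgrade connector configs) over a 48-hour window, so events produced before and after the plugin swap both count toward the expected 2000 per connector.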