From 21152253cedbbf9db04c9a179d46e9d5ddfaa858 Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Tue, 23 Aug 2022 21:24:08 +0530 Subject: [PATCH 1/3] Stabilize Test kafka connect upgrade --- .github/workflows/ci_build_test.yaml | 86 +++++++++++++++++++++ test/config.sh | 18 +++++ test/lib/connector_upgrade.py | 14 ++-- test/lib/eventproducer_connector_upgrade.py | 52 +++++++++++++ 4 files changed, 163 insertions(+), 7 deletions(-) create mode 100644 test/config.sh create mode 100644 test/lib/eventproducer_connector_upgrade.py diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml index 52c5930c..fbc8883e 100644 --- a/.github/workflows/ci_build_test.yaml +++ b/.github/workflows/ci_build_test.yaml @@ -218,7 +218,93 @@ jobs: pip install -r test/requirements.txt export PYTHONWARNINGS="ignore:Unverified HTTPS request" echo "Test kafka connect upgrade ..." + source $GITHUB_WORKSPACE/test/config.sh + test -f $connector_path/$old_connector_name && echo $connector_path /$old_connector_name + sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 & + sleep 20 + curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ + "name": "kafka_connect", + "config": { + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", + "tasks.max": "1", + "splunk.indexes": "'"$splunk_index"'", + "topics": "kafka_connect_upgrade", + "splunk.hec.ack.enabled": "false", + "splunk.hec.uri": "'"$splunk_hec_url"'", + "splunk.hec.ssl.validate.certs": "false", + "splunk.hec.token": "'"$splunk_token"'" , + "splunk.flush.window": "1", + "splunk.sources": "kafka_connect" + } + }' + sleep 10 + curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ + "name": "kafka_connect_ack", + "config": { + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", + "tasks.max": "1", + "splunk.indexes": "'"$splunk_index"'", + "topics": "kafka_connect_upgrade", + "splunk.hec.ack.enabled": "true", + "splunk.hec.uri": "'"$splunk_hec_url"'", + "splunk.hec.ssl.validate.certs": "false", + "splunk.hec.token": "'"$splunk_token_ack"'" , + "splunk.flush.window": "1", + "splunk.sources": "kafka_connect_ack" + } + }' + sleep 5 + python test/lib/eventproducer_connector_upgrade.py --log-level=INFO + curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect" + curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect_ack" + sudo kill $(sudo lsof -t -i:8083) && sleep 2 + sudo rm $connector_path/$old_connector_name && sleep 2 + sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2 + sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 & + sleep 10 + curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ + "name": "kafka_connect", + "config": { + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", + "tasks.max": "1", + "splunk.indexes": "'"$splunk_index"'", + "topics": "kafka_connect_upgrade", + "splunk.hec.ack.enabled": "false", + "splunk.hec.uri": "'"$splunk_hec_url"'", + "splunk.hec.ssl.validate.certs": "false", + "splunk.hec.token": "'"$splunk_token"'" , + "splunk.sources": "kafka_connect", + "splunk.hec.json.event.formatted": "true", + "splunk.flush.window": "1", + "splunk.hec.raw": "false" + } + }' + sleep 10 + curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ + "name": 
"kafka_connect_ack", + "config": { + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", + "tasks.max": "1", + "splunk.indexes": "'"$splunk_index"'", + "topics": "kafka_connect_upgrade", + "splunk.hec.ack.enabled": "true", + "splunk.hec.uri": "'"$splunk_hec_url"'", + "splunk.hec.ssl.validate.certs": "false", + "splunk.hec.token": "'"$splunk_token_ack"'" , + "splunk.sources": "kafka_connect_ack", + "splunk.hec.json.event.formatted": "true", + "splunk.flush.window": "1", + "splunk.hec.raw": "false" + } + }' + sleep 5 + python test/lib/eventproducer_connector_upgrade.py --log-level=INFO python test/lib/connector_upgrade.py --log-level=INFO + - uses: actions/upload-artifact@v3 + if: always() + with: + name: kafka-connect-logs + path: output.log - name: Install kafka connect run: | diff --git a/test/config.sh b/test/config.sh new file mode 100644 index 00000000..dc52c68a --- /dev/null +++ b/test/config.sh @@ -0,0 +1,18 @@ +export splunkd_url=https://127.0.0.1:8089 +export splunk_hec_url=https://127.0.0.1:8088 +export splunk_user=admin +export splunk_password=helloworld +export splunk_index=main +export splunk_token=a6b5e77f-d5f6-415a-bd43-930cecb12959 +export splunk_token_ack=a6b5e77f-d5f6-415a-bd43-930cecb12950 +export kafka_broker_url=127.0.0.1:9092 +export kafka_connect_url=http://127.0.0.1:8083 +export kafka_topic=test-datagen +export kafka_topic_2=kafka_topic_2 +export kafka_header_topic=kafka_header_topic +export kafka_header_index=kafka +export connector_path=/usr/local/share/kafka/plugins +export connector_build_target=/usr/local/share/kafka-connector +export kafka_home=/usr/local/kafka +export kafka_connect_home=/home/circleci/repo +export old_connector_name=splunk-kafka-connect-v2.0.1.jar \ No newline at end of file diff --git a/test/lib/connector_upgrade.py b/test/lib/connector_upgrade.py index 564ec88e..fdc19463 100644 --- a/test/lib/connector_upgrade.py +++ b/test/lib/connector_upgrade.py @@ -158,21 +158,21 @@ def update_kafka_connectors(): thread_upgrade = threading.Thread(target=upgrade_connector_plugin, daemon=True) thread_upgrade.start() time.sleep(100) - search_query_1 = f"index={config['splunk_index']} | search timestamp=\"{_time_stamp}\" source::{_connector}" + search_query_1 = f"index={config['splunk_index']} | search source::{_connector}" logger.debug(search_query_1) - events_1 = check_events_from_splunk(start_time="-15m@m", + events_1 = check_events_from_splunk(start_time="-24h@h", url=config["splunkd_url"], user=config["splunk_user"], query=[f"search {search_query_1}"], password=config["splunk_password"]) - logger.info("Splunk received %s events in the last 15m", len(events_1)) + logger.info("Splunk received %s events in the last 24h", len(events_1)) assert len(events_1) == 2000 - search_query_2 = f"index={config['splunk_index']} | search timestamp=\"{_time_stamp}\" source::{_connector_ack}" + search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack}" logger.debug(search_query_2) - events_2 = check_events_from_splunk(start_time="-15m@m", + events_2 = check_events_from_splunk(start_time="-24h@h", url=config["splunkd_url"], user=config["splunk_user"], query=[f"search {search_query_2}"], password=config["splunk_password"]) - logger.info("Splunk received %s events in the last 15m", len(events_2)) - assert len(events_2) == 2000 + logger.info("Splunk received %s events in the last 24h", len(events_2)) + assert len(events_2) == 2000 \ No newline at end of file diff --git a/test/lib/eventproducer_connector_upgrade.py 
b/test/lib/eventproducer_connector_upgrade.py
new file mode 100644
index 00000000..f66f9e0f
--- /dev/null
+++ b/test/lib/eventproducer_connector_upgrade.py
@@ -0,0 +1,52 @@
+from kafka.producer import KafkaProducer
+import sys
+import os
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
+
+from lib.commonsplunk import check_events_from_splunk
+from lib.commonkafka import *
+from lib.helper import *
+from datetime import datetime
+import threading
+import logging.config
+import yaml
+import subprocess
+import logging
+import time
+
+logging.config.fileConfig(os.path.join(get_test_folder(), "logging.conf"))
+logger = logging.getLogger('connector_upgrade')
+
+_config_path = os.path.join(get_test_folder(), 'config.yaml')
+with open(_config_path, 'r') as yaml_file:
+    config = yaml.load(yaml_file)
+now = datetime.now()
+_time_stamp = str(datetime.timestamp(now))
+_topic = 'kafka_connect_upgrade'
+
+
+def generate_kafka_events(num):
+    topics = [_topic]
+    client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test')
+    broker_topics = client.list_topics()
+    logger.info(broker_topics)
+    if _topic not in broker_topics:
+        create_kafka_topics(config, topics)
+    producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
+                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+
+    for _ in range(num):
+        msg = {"timestamp": _time_stamp}
+        producer.send(_topic, msg)
+        time.sleep(0.05)
+    producer.flush()
+
+
+
+if __name__ == '__main__':
+
+    time.sleep(20)
+    logger.info("Generate Kafka events ...")
+    thread_gen = threading.Thread(target=generate_kafka_events, args=(1000,), daemon=True)
+    thread_gen.start()
+    time.sleep(100)
\ No newline at end of file

From 61ce47da2da96b6bc873297025224a260a781721 Mon Sep 17 00:00:00 2001
From: foram-splunk
Date: Mon, 29 Aug 2022 11:01:09 +0530
Subject: [PATCH 2/3] Updated tasks and stabilizing crud test

---
 .github/workflows/ci_build_test.yaml        |  62 +++++----
 test/conftest.py                            |  15 ++
 test/lib/commonkafka.py                     |   5 +-
 test/lib/connector_upgrade.py               | 144 +-------------------
 test/lib/eventproducer_connector_upgrade.py |  27 +++-
 test/testcases/test_crud.py                 |   2 +-
 6 files changed, 83 insertions(+), 172 deletions(-)

diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml
index fbc8883e..ff347f71 100644
--- a/.github/workflows/ci_build_test.yaml
+++ b/.github/workflows/ci_build_test.yaml
@@ -205,6 +205,11 @@ jobs:
       - name: Test kafka connect upgrade
         run: |
          echo "Download kafka connect "$CI_OLD_CONNECTOR_VERSION
+          # Summary of the test:
+          # 1) Deploy the old Kafka connector and create two tasks for it, one with HEC acks enabled and one without
+          # 2) Remove the old connector, deploy the newly built connector, and update both tasks
+          # 3) The updated tasks enable the new settings ("splunk.hec.json.event.formatted" and "splunk.hec.raw") to verify the connector upgrades cleanly
+          # 4) Finally, check that Splunk received 2000 events for each of the two tasks
          sudo mkdir -p /usr/local/share/kafka/plugins/
          wget https://github.com/splunk/kafka-connect-splunk/releases/download/$CI_OLD_CONNECTOR_VERSION/splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar
          sudo cp splunk-kafka-connect-$CI_OLD_CONNECTOR_VERSION.jar /usr/local/share/kafka/plugins/
@@ -218,8 +225,10 @@ jobs:
          pip install -r test/requirements.txt
          export PYTHONWARNINGS="ignore:Unverified HTTPS request"
          echo "Test kafka connect upgrade ..."
source $GITHUB_WORKSPACE/test/config.sh test -f $connector_path/$old_connector_name && echo $connector_path /$old_connector_name + # Starting Old Connector sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 & sleep 20 + # Creating the two tasks (with ack and without ack) curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ "name": "kafka_connect", "config": { @@ -233,11 +240,12 @@ jobs: "splunk.hec.uri": "'"$splunk_hec_url"'", "splunk.hec.ssl.validate.certs": "false", "splunk.hec.token": "'"$splunk_token"'" , - "splunk.flush.window": "1", - "splunk.sources": "kafka_connect" + "splunk.sources": "kafka_connect", + "splunk.hec.raw": "true", + "splunk.sourcetypes":"upgraded_test" } }' - sleep 10 + sleep 5 curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ "name": "kafka_connect_ack", "config": { @@ -248,23 +256,23 @@ jobs: "splunk.hec.ack.enabled": "true", "splunk.hec.uri": "'"$splunk_hec_url"'", "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": "'"$splunk_token_ack"'" , - "splunk.flush.window": "1", - "splunk.sources": "kafka_connect_ack" + "splunk.hec.token": "'"$splunk_token_ack"'" , + "splunk.sources": "kafka_connect_ack", + "splunk.hec.raw": "true", + "splunk.sourcetypes":"upgraded_test" } }' sleep 5 - python test/lib/eventproducer_connector_upgrade.py --log-level=INFO - curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect" - curl -s -XDELETE "${kafka_connect_url}/connectors/kafka_connect_ack" + # Generating 1000 events + python test/lib/eventproducer_connector_upgrade.py 1000 --log-level=INFO sudo kill $(sudo lsof -t -i:8083) && sleep 2 sudo rm $connector_path/$old_connector_name && sleep 2 sudo cp $connector_build_target/splunk-kafka-connect*.jar $connector_path && sleep 2 + # Starting New Connector sudo $kafka_home/bin/connect-distributed.sh $GITHUB_WORKSPACE/config/connect-distributed-quickstart.properties > output.log 2>&1 & + # Updating the two tasks (with ack and without ack) sleep 10 - curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ - "name": "kafka_connect", - "config": { + curl ${kafka_connect_url}/connectors/kafka_connect/config -X PUT -H "Content-Type: application/json" -d '{ "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", "tasks.max": "1", "splunk.indexes": "'"$splunk_index"'", @@ -274,15 +282,12 @@ jobs: "splunk.hec.ssl.validate.certs": "false", "splunk.hec.token": "'"$splunk_token"'" , "splunk.sources": "kafka_connect", + "splunk.hec.raw": "false", "splunk.hec.json.event.formatted": "true", - "splunk.flush.window": "1", - "splunk.hec.raw": "false" - } + "splunk.sourcetypes":"upgraded_test" }' - sleep 10 - curl ${kafka_connect_url}/connectors -X POST -H "Content-Type: application/json" -d '{ - "name": "kafka_connect_ack", - "config": { + sleep 5 + curl ${kafka_connect_url}/connectors/kafka_connect_ack/config -X PUT -H "Content-Type: application/json" -d '{ "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", "tasks.max": "1", "splunk.indexes": "'"$splunk_index"'", @@ -292,18 +297,19 @@ jobs: "splunk.hec.ssl.validate.certs": "false", "splunk.hec.token": "'"$splunk_token_ack"'" , "splunk.sources": "kafka_connect_ack", + "splunk.hec.raw": "false", "splunk.hec.json.event.formatted": "true", - "splunk.flush.window": "1", - "splunk.hec.raw": "false" - } + "splunk.sourcetypes":"upgraded_test" }' sleep 5 - python 
test/lib/eventproducer_connector_upgrade.py --log-level=INFO
+          # Generate 1000 more events (the topic should now hold 2000 in total)
+          python test/lib/eventproducer_connector_upgrade.py 2000 --log-level=INFO
+          # Check in Splunk that 2000 events were received for both the ack and the no-ack tasks
           python test/lib/connector_upgrade.py --log-level=INFO
       - uses: actions/upload-artifact@v3
-        if: always()
+        if: failure()
         with:
-          name: kafka-connect-logs
+          name: kafka-connect-logs-${{ matrix.kafka_version }}
           path: output.log

      - name: Install kafka connect
        run: |
@@ -320,3 +326,9 @@ jobs:
          export PYTHONWARNINGS="ignore:Unverified HTTPS request"
          echo "Running functional tests....."
          python -m pytest --log-level=INFO
+
+      - uses: actions/upload-artifact@v3
+        if: failure()
+        with:
+          name: splunk-events-${{ matrix.kafka_version }}
+          path: events.txt
\ No newline at end of file
diff --git a/test/conftest.py b/test/conftest.py
index ad3d0ad1..a043e28e 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -17,6 +17,7 @@
 from lib.connect_params import *
 from kafka.producer import KafkaProducer

+from lib.commonsplunk import check_events_from_splunk
 from lib.helper import get_test_folder
 from lib.data_gen import generate_connector_content
 import pytest
@@ -89,3 +90,17 @@ def pytest_unconfigure():
     # Delete launched connectors
     for param in connect_params:
         delete_kafka_connector(config, param)
+
+def pytest_sessionfinish(session, exitstatus):
+    if exitstatus != 0:
+        search_query = f"index={setup['splunk_index']}"
+        logger.info(search_query)
+        events = check_events_from_splunk(start_time="-24h@h",
+                                          url=setup["splunkd_url"],
+                                          user=setup["splunk_user"],
+                                          query=[f"search {search_query}"],
+                                          password=setup["splunk_password"])
+        myfile = open('events.txt', 'w+')
+        for i in events:
+            myfile.write("%s\n" % i)
+        myfile.close()
\ No newline at end of file
diff --git a/test/lib/commonkafka.py b/test/lib/commonkafka.py
index dc101b2e..d55f82f5 100644
--- a/test/lib/commonkafka.py
+++ b/test/lib/commonkafka.py
@@ -81,11 +81,11 @@ def delete_kafka_connector(setup, connector):
     return False


-def get_kafka_connector_tasks(setup, params):
+def get_kafka_connector_tasks(setup, params, sleepDuration=0):
     '''
     Get kafka connect connector tasks using kafka connect REST API
     '''
-
+    time.sleep(sleepDuration)
     t_end = time.time() + 10
     while time.time() < t_end:
         response = requests.get(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/tasks",
@@ -96,7 +96,6 @@ def get_kafka_connector_tasks(setup, params):
         return 0


-
 def get_kafka_connector_status(setup, params, action, state):
     '''
     Get kafka connect connector tasks using kafka connect REST API
diff --git a/test/lib/connector_upgrade.py b/test/lib/connector_upgrade.py
index fdc19463..90cce3e6 100644
--- a/test/lib/connector_upgrade.py
+++ b/test/lib/connector_upgrade.py
@@ -21,158 +21,28 @@
 with open(_config_path, 'r') as yaml_file:
     config = yaml.load(yaml_file)
 now = datetime.now()
-_time_stamp = str(datetime.timestamp(now))
-_topic = 'kafka_connect_upgrade'
 _connector = 'kafka_connect'
 _connector_ack = 'kafka_connect_ack'


-def start_old_connector():
-    cmds = [f"test -f {config['connector_path']}/{config['old_connector_name']} && echo {config['connector_path']}/{config['old_connector_name']}",
-            f"cd {config['kafka_home']}",
-            f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"]
-
-    cmd = "\n".join(cmds)
-    try:
-        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
-                                stderr=subprocess.STDOUT)
-        proc.wait()
-    except 
OSError as e: - logger.error(e) - - -def generate_kafka_events(num): - # Generate message data - topics = [_topic] - connector_content = { - "name": _connector, - "config": { - "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", - "tasks.max": "1", - "splunk.indexes": config["splunk_index"], - "topics": _topic, - "splunk.hec.ack.enabled": "false", - "splunk.hec.uri": config["splunk_hec_url"], - "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": config["splunk_token"], - "splunk.sources": _connector - } - } - create_kafka_connector(config, connector_content) - connector_content_ack = { - "name": _connector_ack, - "config": { - "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", - "tasks.max": "1", - "splunk.indexes": config["splunk_index"], - "topics": _topic, - "splunk.hec.ack.enabled": "true", - "splunk.hec.uri": config["splunk_hec_url"], - "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": config["splunk_token_ack"], - "splunk.sources": _connector_ack - } - } - create_kafka_connector(config, connector_content_ack) - client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test') - broker_topics = client.list_topics() - logger.info(broker_topics) - if _topic not in broker_topics: - create_kafka_topics(config, topics) - producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"], - value_serializer=lambda v: json.dumps(v).encode('utf-8')) - - for _ in range(num): - msg = {"timestamp": _time_stamp} - producer.send(_topic, msg) - time.sleep(0.05) - producer.flush() - - -def upgrade_connector_plugin(): - cmds = ["sudo kill $(sudo lsof -t -i:8083) && sleep 2", - f"sudo rm {config['connector_path']}/{config['old_connector_name']} && sleep 2", - f"sudo cp {config['connector_build_target']}/splunk-kafka-connect*.jar {config['connector_path']} && sleep 2", - f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"] - - cmd = "\n".join(cmds) - try: - proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - output, error = proc.communicate() - logger.debug(output) - time.sleep(2) - update_kafka_connectors() - except OSError as e: - logger.error(e) - - -def update_kafka_connectors(): - logger.info("Update kafka connectors ...") - connector_content = { - "name": _connector, - "config": { - "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", - "tasks.max": "1", - "splunk.indexes": config["splunk_index"], - "topics": _topic, - "splunk.hec.ack.enabled": "false", - "splunk.hec.uri": config["splunk_hec_url"], - "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": config["splunk_token"], - "splunk.sources": _connector, - "splunk.hec.json.event.formatted": "true", - "splunk.hec.raw": True - } - } - create_kafka_connector(config, connector_content) - connector_content_ack = { - "name": _connector_ack, - "config": { - "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", - "tasks.max": "1", - "splunk.indexes": config["splunk_index"], - "topics": _topic, - "splunk.hec.ack.enabled": "true", - "splunk.hec.uri": config["splunk_hec_url"], - "splunk.hec.ssl.validate.certs": "false", - "splunk.hec.token": config["splunk_token_ack"], - "splunk.sources": _connector_ack, - "splunk.hec.json.event.formatted": "true", - "splunk.hec.raw": True - } - } - create_kafka_connector(config, connector_content_ack) - if __name__ == '__main__': - logger.info("Start old Kafka 
connector ...") - thread_old_connect = threading.Thread(target=start_old_connector, daemon=True) - thread_old_connect.start() - time.sleep(10) - logger.info("Generate Kafka events ...") - thread_gen = threading.Thread(target=generate_kafka_events, args=(2000,), daemon=True) - thread_gen.start() - time.sleep(50) - logger.info("Upgrade Kafka connector ...") - thread_upgrade = threading.Thread(target=upgrade_connector_plugin, daemon=True) - thread_upgrade.start() time.sleep(100) - search_query_1 = f"index={config['splunk_index']} | search source::{_connector}" + search_query_1 = f"index={config['splunk_index']} | search source::{_connector} sourcetype::upgraded_test" logger.debug(search_query_1) - events_1 = check_events_from_splunk(start_time="-24h@h", + events_1 = check_events_from_splunk(start_time="-48h@h", url=config["splunkd_url"], user=config["splunk_user"], query=[f"search {search_query_1}"], password=config["splunk_password"]) - logger.info("Splunk received %s events in the last 24h", len(events_1)) + logger.info("Splunk received %s events", len(events_1)) assert len(events_1) == 2000 - search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack}" + search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack} sourcetype::upgraded_test" logger.debug(search_query_2) - events_2 = check_events_from_splunk(start_time="-24h@h", + events_2 = check_events_from_splunk(start_time="-48h@m", url=config["splunkd_url"], user=config["splunk_user"], query=[f"search {search_query_2}"], password=config["splunk_password"]) - logger.info("Splunk received %s events in the last 24h", len(events_2)) - assert len(events_2) == 2000 \ No newline at end of file + logger.info("Splunk received %s events ", len(events_2)) + assert len(events_2) == 2000 \ No newline at end of file diff --git a/test/lib/eventproducer_connector_upgrade.py b/test/lib/eventproducer_connector_upgrade.py index f66f9e0f..5eede3ba 100644 --- a/test/lib/eventproducer_connector_upgrade.py +++ b/test/lib/eventproducer_connector_upgrade.py @@ -15,7 +15,7 @@ import time logging.config.fileConfig(os.path.join(get_test_folder(), "logging.conf")) -logger = logging.getLogger('connector_upgrade') +logger = logging.getLogger('eventproducer_connector_upgrade') _config_path = os.path.join(get_test_folder(), 'config.yaml') with open(_config_path, 'r') as yaml_file: @@ -25,7 +25,23 @@ _topic = 'kafka_connect_upgrade' +def check_events_from_topic(target): + + t_end = time.time() + 100 + time.sleep(5) + while time.time() < t_end: + output1 = subprocess.getoutput(" echo $(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 'localhost:9092' --topic kafka_connect_upgrade --time -1 | grep -e ':[[:digit:]]*:' | awk -F ':' '{sum += $3} END {print sum}')") + output2 = subprocess.getoutput("echo $(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 'localhost:9092' --topic kafka_connect_upgrade --time -2 | grep -e ':[[:digit:]]*:' | awk -F ':' '{sum += $3} END {print sum}')") + time.sleep(5) + if (int(output1)-int(output2))==target: + logger.info("Events in the topic :" + str(int(output1)-int(output2))) + break + elif (int(output1)-int(output2))>2000: + logger.info("Events in the topic :" + str(int(output1)-int(output2))) + logger.info("Events in the topic :" + str(int(output1)-int(output2))) + def generate_kafka_events(num): + # Generate message data topics = [_topic] client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test') 
broker_topics = client.list_topics() @@ -35,18 +51,17 @@ def generate_kafka_events(num): producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"], value_serializer=lambda v: json.dumps(v).encode('utf-8')) - for _ in range(num): - msg = {"timestamp": _time_stamp} + for i in range(num): + msg = f'timestamp={_time_stamp} count={i+1}\n' producer.send(_topic, msg) time.sleep(0.05) producer.flush() - - if __name__ == '__main__': time.sleep(20) logger.info("Generate Kafka events ...") thread_gen = threading.Thread(target=generate_kafka_events, args=(1000,), daemon=True) thread_gen.start() - time.sleep(100) \ No newline at end of file + check_events_from_topic(int(sys.argv[1])) + time.sleep(50) \ No newline at end of file diff --git a/test/testcases/test_crud.py b/test/testcases/test_crud.py index 0ff5e771..59798137 100644 --- a/test/testcases/test_crud.py +++ b/test/testcases/test_crud.py @@ -68,7 +68,7 @@ def test_valid_crud_tasks(self, setup, test_input, expected): assert update_kafka_connector(setup, connector_definition) == expected # Validate get tasks - tasks = get_kafka_connector_tasks(setup, connector_definition) + tasks = get_kafka_connector_tasks(setup, connector_definition,10) assert tasks == int(connector_definition["config"]["tasks.max"]) # Validate pause task From 25436267470dee683668e44e5368044af01d4e6e Mon Sep 17 00:00:00 2001 From: foram-splunk Date: Mon, 29 Aug 2022 14:03:41 +0530 Subject: [PATCH 3/3] Modify search query --- test/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/conftest.py b/test/conftest.py index a043e28e..68061e21 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -93,7 +93,7 @@ def pytest_unconfigure(): def pytest_sessionfinish(session, exitstatus): if exitstatus != 0: - search_query = f"index={setup['splunk_index']}" + search_query = "index=*" logger.info(search_query) events = check_events_from_splunk(start_time="-24h@h", url=setup["splunkd_url"],