# Load the test configuration and derive per-run identifiers.
with open(_config_path, 'r') as yaml_file:
    # safe_load: plain-data YAML only. yaml.load() without an explicit
    # Loader is deprecated and can construct arbitrary Python objects.
    config = yaml.safe_load(yaml_file)

now = datetime.now()
# Unique marker embedded in every generated event so Splunk searches can
# isolate this run's data.
_time_stamp = str(datetime.timestamp(now))
_topic = 'kafka_connect_upgrade'
_connector = 'kafka_connect'
_connector_ack = 'kafka_connect_ack'
2927
def start_old_connector():
    """Launch Kafka Connect (distributed mode) with the old connector jar.

    Checks that the old connector jar is present, then starts
    ``connect-distributed.sh`` in the background via a shell script.
    Errors from spawning the shell are logged, not raised.
    """
    cmds = [f"test -f {config['connector_path']}/{config['old_connector_name']} && echo {config['connector_path']}/{config['old_connector_name']}",
            f"cd {config['kafka_home']}",
            f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"]

    cmd = "\n".join(cmds)
    try:
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        # communicate() drains stdout while waiting; wait() with a PIPE can
        # deadlock once the OS pipe buffer fills up.
        output, _ = proc.communicate()
        logger.debug(output)
    except OSError as e:
        logger.error(e)
43-
def generate_kafka_events(num):
    """Register both sink connectors, ensure the topic exists, and publish
    *num* JSON events stamped with this run's ``_time_stamp``.

    Args:
        num: number of messages to send to the Kafka topic.
    """
    topics = [_topic]

    def _sink_config(name, ack_enabled, token_key):
        # Both connectors share the same shape; only the name, the ack flag
        # and the HEC token differ, so build the payload in one place.
        return {
            "name": name,
            "config": {
                "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
                "tasks.max": "1",
                "splunk.indexes": config["splunk_index"],
                "topics": _topic,
                "splunk.hec.ack.enabled": ack_enabled,
                "splunk.hec.uri": config["splunk_hec_url"],
                "splunk.hec.ssl.validate.certs": "false",
                "splunk.hec.token": config[token_key],
                "splunk.sources": name
            }
        }

    create_kafka_connector(config, _sink_config(_connector, "false", "splunk_token"))
    create_kafka_connector(config, _sink_config(_connector_ack, "true", "splunk_token_ack"))

    client = KafkaAdminClient(bootstrap_servers=config["kafka_broker_url"], client_id='test')
    broker_topics = client.list_topics()
    logger.info(broker_topics)
    if _topic not in broker_topics:
        create_kafka_topics(config, topics)

    producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    for _ in range(num):
        msg = {"timestamp": _time_stamp}
        producer.send(_topic, msg)
        # Small pause keeps the broker from being flooded in one burst.
        time.sleep(0.05)
    producer.flush()
91-
def upgrade_connector_plugin():
    """Replace the old connector jar with the freshly built one and restart
    Kafka Connect, then re-register the connectors.

    Kills the process listening on the Connect REST port (8083), swaps the
    jar, relaunches ``connect-distributed.sh`` in the background, and calls
    :func:`update_kafka_connectors`. Spawn errors are logged, not raised.
    """
    cmds = ["sudo kill $(sudo lsof -t -i:8083) && sleep 2",
            f"sudo rm {config['connector_path']}/{config['old_connector_name']} && sleep 2",
            f"sudo cp {config['connector_build_target']}/splunk-kafka-connect*.jar {config['connector_path']} && sleep 2",
            f"sudo {config['kafka_home']}/bin/connect-distributed.sh {os.environ.get('GITHUB_WORKSPACE')}/config/connect-distributed-quickstart.properties &"]

    cmd = "\n".join(cmds)
    try:
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        # stderr is merged into stdout, so communicate()'s second value is
        # always None — discard it instead of binding an unused name.
        output, _ = proc.communicate()
        logger.debug(output)
        time.sleep(2)
        update_kafka_connectors()
    except OSError as e:
        logger.error(e)
109-
def update_kafka_connectors():
    """Re-create both Splunk sink connectors after the plugin upgrade.

    Same connector pair as :func:`generate_kafka_events`, plus the
    ``splunk.hec.json.event.formatted`` and ``splunk.hec.raw`` settings
    that exercise the upgraded plugin.
    """
    logger.info("Update kafka connectors ...")

    def _sink_config(name, ack_enabled, token_key):
        # The two connector payloads differ only in name, ack flag and
        # token key — build them from one template.
        return {
            "name": name,
            "config": {
                "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
                "tasks.max": "1",
                "splunk.indexes": config["splunk_index"],
                "topics": _topic,
                "splunk.hec.ack.enabled": ack_enabled,
                "splunk.hec.uri": config["splunk_hec_url"],
                "splunk.hec.ssl.validate.certs": "false",
                "splunk.hec.token": config[token_key],
                "splunk.sources": name,
                "splunk.hec.json.event.formatted": "true",
                # NOTE(review): boolean True while every other flag is a
                # string — preserved as-is; confirm the Connect REST API
                # coerces it the same way.
                "splunk.hec.raw": True
            }
        }

    create_kafka_connector(config, _sink_config(_connector, "false", "splunk_token"))
    create_kafka_connector(config, _sink_config(_connector_ack, "true", "splunk_token_ack"))
14728
if __name__ == '__main__':
    # End-to-end flow: start old connector -> generate events -> upgrade
    # plugin -> verify all events landed in Splunk through both connectors.
    logger.info("Start old Kafka connector ...")
    thread_old_connect = threading.Thread(target=start_old_connector, daemon=True)
    thread_old_connect.start()
    time.sleep(10)

    logger.info("Generate Kafka events ...")
    thread_gen = threading.Thread(target=generate_kafka_events, args=(2000,), daemon=True)
    thread_gen.start()
    time.sleep(50)

    logger.info("Upgrade Kafka connector ...")
    thread_upgrade = threading.Thread(target=upgrade_connector_plugin, daemon=True)
    thread_upgrade.start()
    # Give the upgraded connector time to re-deliver everything.
    time.sleep(100)

    search_query_1 = f"index={config['splunk_index']} | search source::{_connector} sourcetype::upgraded_test"
    logger.debug(search_query_1)
    events_1 = check_events_from_splunk(start_time="-48h@h",
                                        url=config["splunkd_url"],
                                        user=config["splunk_user"],
                                        query=[f"search {search_query_1}"],
                                        password=config["splunk_password"])
    logger.info("Splunk received %s events", len(events_1))
    assert len(events_1) == 2000

    search_query_2 = f"index={config['splunk_index']} | search source::{_connector_ack} sourcetype::upgraded_test"
    logger.debug(search_query_2)
    # Fix: "-48h @m" (embedded space) is a malformed Splunk time modifier;
    # use the same "-48h@h" snap syntax as the first search.
    events_2 = check_events_from_splunk(start_time="-48h@h",
                                        url=config["splunkd_url"],
                                        user=config["splunk_user"],
                                        query=[f"search {search_query_2}"],
                                        password=config["splunk_password"])
    logger.info("Splunk received %s events", len(events_2))
    assert len(events_2) == 2000