File tree Expand file tree Collapse file tree 1 file changed +2
-18
lines changed Expand file tree Collapse file tree 1 file changed +2
-18
lines changed Original file line number Diff line number Diff line change 6
6
import pytest
7
7
from kafka .vendor import six
8
8
9
- from kafka import SimpleClient
10
9
from kafka .conn import ConnectionStates
11
10
from kafka .consumer .group import KafkaConsumer
12
11
from kafka .coordinator .base import MemberState , Generation
@@ -20,25 +19,10 @@ def get_connect_str(kafka_broker):
20
19
return kafka_broker .host + ':' + str (kafka_broker .port )
21
20
22
21
23
- @pytest .fixture
24
- def simple_client (kafka_broker ):
25
- return SimpleClient (get_connect_str (kafka_broker ))
26
-
27
-
28
- @pytest .fixture
29
- def topic (simple_client ):
30
- topic = random_string (5 )
31
- simple_client .ensure_topic_exists (topic )
32
- return topic
33
-
34
-
35
22
@pytest .mark .skipif (not version (), reason = "No KAFKA_VERSION set" )
36
- def test_consumer (kafka_broker , version ):
37
-
23
+ def test_consumer (kafka_broker , topic , version ):
24
+ # The `topic` fixture is included because
38
25
# 0.8.2 brokers need a topic to function well
39
- if version >= (0 , 8 , 2 ) and version < (0 , 9 ):
40
- topic (simple_client (kafka_broker ))
41
-
42
26
consumer = KafkaConsumer (bootstrap_servers = get_connect_str (kafka_broker ))
43
27
consumer .poll (500 )
44
28
assert len (consumer ._client ._conns ) > 0
You can’t perform that action at this time.
0 commit comments