Skip to content

Commit 00a5651

Browse files
authored
Merge branch 'segmentio:main' into feature/supporting_static_membership
2 parents 41ad7f7 + ba6f442 commit 00a5651

File tree

4 files changed

+32
-11
lines changed

4 files changed

+32
-11
lines changed

conn.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,11 +1022,17 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
10221022
}
10231023

10241024
func makeBrokers(brokers map[int32]Broker, ids ...int32) []Broker {
1025-
b := make([]Broker, 0, len(ids))
1026-
for _, id := range ids {
1027-
if br, ok := brokers[id]; ok {
1028-
b = append(b, br)
1025+
b := make([]Broker, len(ids))
1026+
for i, id := range ids {
1027+
br, ok := brokers[id]
1028+
if !ok {
1029+
// When the broker id isn't found in the current list of known
1030+
// brokers, use a placeholder to report that the cluster has
1031+
// logical knowledge of the broker but no information about the
1032+
// physical host where it is running.
1033+
br.ID = int(id)
10291034
}
1035+
b[i] = br
10301036
}
10311037
return b
10321038
}

conn_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,16 +1358,19 @@ func TestMakeBrokersAllPresent(t *testing.T) {
13581358
func TestMakeBrokersOneMissing(t *testing.T) {
13591359
brokers := make(map[int32]Broker)
13601360
brokers[1] = Broker{ID: 1, Host: "203.0.113.101", Port: 9092}
1361-
brokers[3] = Broker{ID: 1, Host: "203.0.113.103", Port: 9092}
1361+
brokers[3] = Broker{ID: 3, Host: "203.0.113.103", Port: 9092}
13621362

13631363
b := makeBrokers(brokers, 1, 2, 3)
1364-
if len(b) != 2 {
1365-
t.Errorf("Expected 2 brokers, got %d", len(b))
1364+
if len(b) != 3 {
1365+
t.Errorf("Expected 3 brokers, got %d", len(b))
13661366
}
13671367
if b[0] != brokers[1] {
13681368
t.Errorf("Expected broker 1 at index 0, got %d", b[0].ID)
13691369
}
1370-
if b[1] != brokers[3] {
1371-
t.Errorf("Expected broker 3 at index 1, got %d", b[1].ID)
1370+
if b[1] != (Broker{ID: 2}) {
1371+
t.Errorf("Expected broker 2 at index 1, got %d", b[1].ID)
1372+
}
1373+
if b[2] != brokers[3] {
1374+
t.Errorf("Expected broker 3 at index 1, got %d", b[2].ID)
13721375
}
13731376
}

error.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,6 @@ func makeError(code int16, message string) error {
684684
// // handle other errors
685685
// ...
686686
// }
687-
//
688687
type WriteErrors []error
689688

690689
// Count counts the number of non-nil errors in err.
@@ -701,5 +700,13 @@ func (err WriteErrors) Count() int {
701700
}
702701

703702
func (err WriteErrors) Error() string {
704-
return fmt.Sprintf("kafka write errors (%d/%d)", err.Count(), len(err))
703+
errCount := err.Count()
704+
errors := make([]string, 0, errCount)
705+
for _, writeError := range err {
706+
if writeError == nil {
707+
continue
708+
}
709+
errors = append(errors, writeError.Error())
710+
}
711+
return fmt.Sprintf("Kafka write errors (%d/%d), errors: %v", errCount, len(err), errors)
705712
}

kafka.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ type Partition struct {
3838
ID int
3939

4040
// Leader, replicas, and ISR for the partition.
41+
//
42+
// When no physical host is known to be running a broker, the Host and Port
43+
// fields will be set to the zero values. The logical broker ID is always
44+
// set to the value known to the kafka cluster, even if the broker is not
45+
// currently backed by a physical host.
4146
Leader Broker
4247
Replicas []Broker
4348
Isr []Broker

0 commit comments

Comments
 (0)