Skip to content

Commit f261200

Browse files
authored
netty:Fix Netty composite buffer merging to be compatible with Netty 4.1.111 (#11294) (#11303)
* Use addComponent instead of addFlattenedComponent and do not append to components that are composites.
1 parent 2fd95b8 commit f261200

File tree

2 files changed

+66
-47
lines changed

2 files changed

+66
-47
lines changed

netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,18 @@
2323
import io.netty.buffer.CompositeByteBuf;
2424
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;
2525

26+
27+
/**
28+
* "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
29+
* compose strategies.
30+
* <br><br>
31+
*
32+
* <p><b><font color="red">Avoid using</font></b>
33+
* {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead
34+
* to corruption, where the components' readable area are not equal to the Composite's capacity
35+
* (see https://github.com/netty/netty/issues/12844).
36+
*/
37+
2638
class NettyAdaptiveCumulator implements Cumulator {
2739
private final int composeMinSize;
2840

@@ -83,8 +95,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
8395
composite.capacity(composite.writerIndex());
8496
}
8597
} else {
86-
composite = alloc.compositeBuffer(Integer.MAX_VALUE)
87-
.addFlattenedComponents(true, cumulation);
98+
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addComponent(true, cumulation);
8899
}
89100
addInput(alloc, composite, in);
90101
in = null;
@@ -104,7 +115,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
104115
@VisibleForTesting
105116
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
106117
if (shouldCompose(composite, in, composeMinSize)) {
107-
composite.addFlattenedComponents(true, in);
118+
composite.addComponent(true, in);
108119
} else {
109120
// The total size of the new data and the last component are below the threshold. Merge them.
110121
mergeWithCompositeTail(alloc, composite, in);
@@ -150,31 +161,13 @@ static void mergeWithCompositeTail(
150161
ByteBuf tail = composite.component(tailComponentIndex);
151162
ByteBuf newTail = null;
152163
try {
153-
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
164+
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()
165+
&& !isCompositeOrWrappedComposite(tail)) {
154166
// Ideal case: the tail isn't shared, and can be expanded to the required capacity.
167+
155168
// Take ownership of the tail.
156169
newTail = tail.retain();
157170

158-
// TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
159-
// the issue fixed.
160-
// In certain cases, removing the CompositeByteBuf component, and then adding it back
161-
// isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
162-
// This happens because the buffer returned by composite.component() has out-of-sync
163-
// indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
164-
// buffer, but doesn't set the indexes.
165-
//
166-
// To get the right indexes we use the fact that composite.internalComponent() returns
167-
// the slice() into the readable portion of the underlying buffer.
168-
// We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
169-
// and combine it with the fact that SlicedByteBuf duplicates have their indexes
170-
// adjusted so they correspond to the to the readable portion of the slice.
171-
//
172-
// Hence composite.internalComponent().duplicate() returns a buffer with the
173-
// indexes that should've been on the composite.component() in the first place.
174-
// Until the issue is fixed, we manually adjust the indexes of the removed component.
175-
ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
176-
newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());
177-
178171
/*
179172
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
180173
*
@@ -188,20 +181,26 @@ static void mergeWithCompositeTail(
188181
* as pronounced because the capacity is doubled with each reallocation.
189182
*/
190183
newTail.writeBytes(in);
184+
191185
} else {
192-
// The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
186+
// The tail satisfies one or more criteria:
187+
// - Shared
188+
// - Not expandable
189+
// - Composite
190+
// - Wrapped Composite
193191
newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
194192
newTail.setBytes(0, composite, tailStart, tailSize)
195193
.setBytes(tailSize, in, in.readerIndex(), inputSize)
196194
.writerIndex(newTailSize);
197195
in.readerIndex(in.writerIndex());
198196
}
197+
199198
// Store readerIndex to avoid out of bounds writerIndex during component replacement.
200199
int prevReader = composite.readerIndex();
201200
// Remove the old tail, reset writer index.
202201
composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
203202
// Add back the new tail.
204-
composite.addFlattenedComponents(true, newTail);
203+
composite.addComponent(true, newTail);
205204
// New tail's ownership transferred to the composite buf.
206205
newTail = null;
207206
composite.readerIndex(prevReader);
@@ -216,4 +215,12 @@ static void mergeWithCompositeTail(
216215
}
217216
}
218217
}
218+
219+
private static boolean isCompositeOrWrappedComposite(ByteBuf tail) {
220+
ByteBuf cur = tail;
221+
while (cur.unwrap() != null) {
222+
cur = cur.unwrap();
223+
}
224+
return cur instanceof CompositeByteBuf;
225+
}
219226
}

netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void setUp() {
8181
@Override
8282
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
8383
// To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose
84-
composite.addFlattenedComponents(true, in);
84+
composite.addComponent(true, in);
8585
}
8686
};
8787

@@ -208,8 +208,8 @@ public void setUp() {
208208
in = ByteBufUtil.writeAscii(alloc, inData);
209209
tail = ByteBufUtil.writeAscii(alloc, tailData);
210210
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
211-
// Note that addFlattenedComponents() will not add a new component when tail is not readable.
212-
composite.addFlattenedComponents(true, tail);
211+
// Note that addComponent() will not add a new component when tail is not readable.
212+
composite.addComponent(true, tail);
213213
}
214214

215215
@After
@@ -345,7 +345,7 @@ public void mergeWithCompositeTail_tailExpandable_write() {
345345
assertThat(in.readableBytes()).isAtMost(tail.writableBytes());
346346

347347
// All fits, so tail capacity must stay the same.
348-
composite.addFlattenedComponents(true, tail);
348+
composite.addComponent(true, tail);
349349
assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity);
350350
}
351351

@@ -362,7 +362,7 @@ public void mergeWithCompositeTail_tailExpandable_fastWrite() {
362362
alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE);
363363

364364
// Tail capacity is extended to its fast capacity.
365-
composite.addFlattenedComponents(true, tail);
365+
composite.addComponent(true, tail);
366366
assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity);
367367
}
368368

@@ -372,7 +372,7 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() {
372372
@SuppressWarnings("InlineMeInliner") // Requires Java 11
373373
String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1);
374374
int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length();
375-
composite.addFlattenedComponents(true, tail);
375+
composite.addComponent(true, tail);
376376

377377
// Make input larger than tailFastCapacity
378378
in.writeCharSequence(inSuffixOverFastBytes, US_ASCII);
@@ -435,21 +435,21 @@ public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() {
435435
@SuppressWarnings("InlineMeInliner") // Requires Java 11
436436
String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes());
437437
tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII);
438-
composite.addFlattenedComponents(true, tail);
438+
composite.addComponent(true, tail);
439439
assertTailReplaced();
440440
}
441441

442442
@Test
443443
public void mergeWithCompositeTail_tailNotExpandable_shared() {
444444
tail.retain();
445-
composite.addFlattenedComponents(true, tail);
445+
composite.addComponent(true, tail);
446446
assertTailReplaced();
447447
tail.release();
448448
}
449449

450450
@Test
451451
public void mergeWithCompositeTail_tailNotExpandable_readOnly() {
452-
composite.addFlattenedComponents(true, tail.asReadOnly());
452+
composite.addComponent(true, tail.asReadOnly());
453453
assertTailReplaced();
454454
}
455455

@@ -527,8 +527,7 @@ public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() {
527527
CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
528528
tail) {
529529
@Override
530-
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
531-
ByteBuf buffer) {
530+
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
532531
throw expectedError;
533532
}
534533
};
@@ -561,8 +560,7 @@ public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() {
561560
CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
562561
tail.asReadOnly()) {
563562
@Override
564-
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
565-
ByteBuf buffer) {
563+
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
566564
throw expectedError;
567565
}
568566
};
@@ -616,14 +614,14 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
616614
ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII));
617615

618616
// Start with a regular cumulation and add the buf as the only component.
619-
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf);
617+
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addComponent(true, buf);
620618
// Read composite1 buf to the beginning of the numbers.
621619
assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---");
622620

623621
// Wrap composite1 into another cumulation. This is similar to
624622
// what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1.
625623
CompositeByteBuf composite2 =
626-
alloc.compositeBuffer(8).addFlattenedComponents(true, composite1);
624+
alloc.compositeBuffer(8).addComponent(true, composite1);
627625
assertThat(composite2.toString(US_ASCII)).isEqualTo("01234");
628626

629627
// The previous operation does not adjust the read indexes of the underlying buffers,
@@ -639,13 +637,27 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
639637
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2,
640638
ByteBufUtil.writeAscii(alloc, "56789"));
641639
assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789");
640+
}
641+
642+
@Test
643+
public void mergeWithNonCompositeTail() {
644+
NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024);
645+
ByteBufAllocator alloc = new PooledByteBufAllocator();
646+
ByteBuf buf = alloc.buffer().writeBytes("tail".getBytes(US_ASCII));
647+
ByteBuf in = alloc.buffer().writeBytes("-012345".getBytes(US_ASCII));
648+
649+
CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, buf);
642650

643-
// Correctness check: we still have a single component, and this component is still the
644-
// original underlying buffer.
645-
assertThat(cumulation.numComponents()).isEqualTo(1);
646-
// Replace '2' with '*', and '8' with '$'.
647-
buf.setByte(5, '*').setByte(11, '$');
648-
assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9");
651+
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite, in);
652+
653+
assertEquals("tail-012345", cumulation.toString(US_ASCII));
654+
assertEquals(0, in.refCnt());
655+
assertEquals(1, cumulation.numComponents());
656+
657+
buf.setByte(2, '*').setByte(7, '$');
658+
assertEquals("ta*l-01$345", cumulation.toString(US_ASCII));
659+
660+
composite.release();
649661
}
650662
}
651663
}

0 commit comments

Comments
 (0)