Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ public Void block(Duration m) {
return block();
}

/**
* This method is deliberately non-blocking regardless it is named as `.block`. The main intent to
* keep this method along with the {@link #subscribe()} is to eliminate redundancy which comes
* with a default block method implementation.
*/
@Override
@Nullable
public Void block() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public Void block(Duration m) {
return block();
}

/**
* This method is deliberately non-blocking regardless it is named as `.block`. The main intent to
* keep this method along with the {@link #subscribe()} is to eliminate redundancy which comes
* with a default block method implementation.
*/
@Override
@Nullable
public Void block() {
Expand All @@ -133,15 +138,16 @@ public Void block() {
try {
final boolean hasMetadata = p.hasMetadata();
metadata = p.metadata();
if (hasMetadata) {
if (!hasMetadata) {
lazyTerminate(STATE, this);
p.release();
throw new IllegalArgumentException("Metadata push does not support metadata field");
throw new IllegalArgumentException("Metadata push should have metadata field present");
}
if (!isValidMetadata(this.maxFrameLength, metadata)) {
lazyTerminate(STATE, this);
p.release();
throw new IllegalArgumentException("Too Big Payload size");
throw new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static io.rsocket.frame.FrameType.REQUEST_RESPONSE;
import static io.rsocket.frame.FrameType.REQUEST_STREAM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

Expand Down Expand Up @@ -81,7 +82,6 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -169,7 +169,7 @@ protected void hookOnSubscribe(Subscription subscription) {
public void testHandleSetupException() {
rule.connection.addToReceivedBuffer(
ErrorFrameCodec.encode(rule.alloc(), 0, new RejectedSetupException("boom")));
Assertions.assertThatThrownBy(() -> rule.socket.onClose().block())
assertThatThrownBy(() -> rule.socket.onClose().block())
.isInstanceOf(RejectedSetupException.class);
rule.assertHasNoLeaks();
}
Expand Down Expand Up @@ -373,6 +373,65 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
});
}

@ParameterizedTest
@ValueSource(ints = {128, 256, FRAME_LENGTH_MASK})
public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmentation1(
int maxFrameLength) {
rule.setMaxFrameLength(maxFrameLength);
prepareCalls()
.forEach(
generator -> {
byte[] metadata = new byte[maxFrameLength];
byte[] data = new byte[maxFrameLength];
ThreadLocalRandom.current().nextBytes(metadata);
ThreadLocalRandom.current().nextBytes(data);

assertThatThrownBy(
() -> {
final Publisher<?> source =
generator.apply(rule.socket, DefaultPayload.create(data, metadata));

if (source instanceof Mono) {
((Mono<?>) source).block();
} else {
((Flux) source).blockLast();
}
})
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(String.format(INVALID_PAYLOAD_ERROR_MESSAGE, maxFrameLength));

rule.assertHasNoLeaks();
});
}

@Test
public void shouldRejectCallOfNoMetadataPayload() {
final ByteBuf data = rule.allocator.buffer(10);
final Payload payload = ByteBufPayload.create(data);
StepVerifier.create(rule.socket.metadataPush(payload))
.expectSubscription()
.expectErrorSatisfies(
t ->
assertThat(t)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Metadata push should have metadata field present"))
.verify();
PayloadAssert.assertThat(payload).isReleased();
rule.assertHasNoLeaks();
}

@Test
public void shouldRejectCallOfNoMetadataPayloadBlocking() {
final ByteBuf data = rule.allocator.buffer(10);
final Payload payload = ByteBufPayload.create(data);

assertThatThrownBy(() -> rule.socket.metadataPush(payload).block())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Metadata push should have metadata field present");
PayloadAssert.assertThat(payload).isReleased();
rule.assertHasNoLeaks();
}

static Stream<BiFunction<RSocket, Payload, Publisher<?>>> prepareCalls() {
return Stream.of(
RSocket::fireAndForget,
Expand Down