Skip to content

Commit c4f56e3

Browse files
committed
simplifies validation utils. puts error message to constant field
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 0f113a3 commit c4f56e3

File tree

7 files changed

+45
-49
lines changed

7 files changed

+45
-49
lines changed

rsocket-core/src/main/java/io/rsocket/core/DefaultClientRSocketFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ public Mono<RSocket> start() {
331331
payloadDecoder,
332332
errorConsumer,
333333
StreamIdSupplier.clientSupplier(),
334+
mtu,
334335
keepAliveTickPeriod(),
335336
keepAliveTimeout(),
336337
keepAliveHandler,
@@ -379,7 +380,8 @@ public Mono<RSocket> start() {
379380
wrappedRSocketHandler,
380381
payloadDecoder,
381382
errorConsumer,
382-
responderLeaseHandler);
383+
responderLeaseHandler,
384+
mtu);
383385

384386
return wrappedConnection
385387
.sendOne(setupFrame)

rsocket-core/src/main/java/io/rsocket/core/DefaultServerRSocketFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ private Mono<Void> acceptSetup(
281281
payloadDecoder,
282282
errorConsumer,
283283
StreamIdSupplier.serverSupplier(),
284+
mtu,
284285
setupPayload.keepAliveInterval(),
285286
setupPayload.keepAliveMaxLifetime(),
286287
keepAliveHandler,
@@ -317,7 +318,8 @@ private Mono<Void> acceptSetup(
317318
wrappedRSocketHandler,
318319
payloadDecoder,
319320
errorConsumer,
320-
responderLeaseHandler);
321+
responderLeaseHandler,
322+
mtu);
321323
})
322324
.doFinally(signalType -> setupPayload.release())
323325
.then();
Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,32 @@
11
package io.rsocket.core;
22

3-
import io.netty.buffer.ByteBuf;
43
import io.rsocket.Payload;
54
import io.rsocket.frame.FrameHeaderFlyweight;
65
import io.rsocket.frame.FrameLengthFlyweight;
76

87
final class PayloadValidationUtils {
9-
public static boolean isValid(int mtu, Payload payload) {
10-
return payload.hasMetadata()
11-
? isValid(mtu, payload.data(), payload.metadata())
12-
: isValid(mtu, payload.data());
13-
}
8+
static final String INVALID_PAYLOAD_ERROR_MESSAGE =
9+
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.";
1410

15-
public static boolean isValid(int mtu, ByteBuf data) {
16-
return mtu > 0
17-
|| (((FrameHeaderFlyweight.size()
18-
+ data.readableBytes()
19-
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE)
20-
& ~FrameLengthFlyweight.FRAME_LENGTH_MASK)
21-
== 0);
22-
}
11+
static boolean isValid(int mtu, Payload payload) {
12+
if (mtu > 0) {
13+
return true;
14+
}
2315

24-
public static boolean isValid(int mtu, ByteBuf data, ByteBuf metadata) {
25-
return mtu > 0
26-
|| (((FrameHeaderFlyweight.size()
27-
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE
28-
+ FrameHeaderFlyweight.size()
29-
+ data.readableBytes()
30-
+ metadata.readableBytes())
31-
& ~FrameLengthFlyweight.FRAME_LENGTH_MASK)
32-
== 0);
16+
if (payload.hasMetadata()) {
17+
return (((FrameHeaderFlyweight.size()
18+
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE
19+
+ FrameHeaderFlyweight.size()
20+
+ payload.data().readableBytes()
21+
+ payload.metadata().readableBytes())
22+
& ~FrameLengthFlyweight.FRAME_LENGTH_MASK)
23+
== 0);
24+
} else {
25+
return (((FrameHeaderFlyweight.size()
26+
+ payload.data().readableBytes()
27+
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE)
28+
& ~FrameLengthFlyweight.FRAME_LENGTH_MASK)
29+
== 0);
30+
}
3331
}
3432
}

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.core;
1818

19+
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
1920
import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport;
2021
import static io.rsocket.keepalive.KeepAliveSupport.KeepAlive;
2122

@@ -191,9 +192,7 @@ private Mono<Void> handleFireAndForget(Payload payload) {
191192

192193
if (!PayloadValidationUtils.isValid(this.mtu, payload)) {
193194
payload.release();
194-
return Mono.error(
195-
new IllegalArgumentException(
196-
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."));
195+
return Mono.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
197196
}
198197

199198
final int streamId = streamIdSupplier.nextStreamId(receivers);
@@ -222,9 +221,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
222221

223222
if (!PayloadValidationUtils.isValid(this.mtu, payload)) {
224223
payload.release();
225-
return Mono.error(
226-
new IllegalArgumentException(
227-
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."));
224+
return Mono.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
228225
}
229226

230227
int streamId = streamIdSupplier.nextStreamId(receivers);
@@ -274,9 +271,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
274271

275272
if (!PayloadValidationUtils.isValid(this.mtu, payload)) {
276273
payload.release();
277-
return Flux.error(
278-
new IllegalArgumentException(
279-
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."));
274+
return Flux.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
280275
}
281276

282277
int streamId = streamIdSupplier.nextStreamId(receivers);
@@ -344,8 +339,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
344339
if (!PayloadValidationUtils.isValid(mtu, payload)) {
345340
payload.release();
346341
final IllegalArgumentException t =
347-
new IllegalArgumentException(
348-
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.");
342+
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
349343
errorConsumer.accept(t);
350344
return Mono.error(t);
351345
}
@@ -384,8 +378,7 @@ protected void hookOnNext(Payload payload) {
384378
payload.release();
385379
cancel();
386380
final IllegalArgumentException t =
387-
new IllegalArgumentException(
388-
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.");
381+
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
389382
errorConsumer.accept(t);
390383
// no need to send any errors.
391384
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
@@ -480,9 +473,7 @@ private Mono<Void> handleMetadataPush(Payload payload) {
480473

481474
if (!PayloadValidationUtils.isValid(this.mtu, payload)) {
482475
payload.release();
483-
return Mono.error(
484-
new IllegalArgumentException(
485-
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."));
476+
return Mono.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
486477
}
487478

488479
return UnicastMonoEmpty.newInstance(

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.rsocket.core;
1818

19+
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
20+
1921
import io.netty.buffer.ByteBuf;
2022
import io.netty.buffer.ByteBufAllocator;
2123
import io.netty.util.ReferenceCountUtil;
@@ -379,8 +381,7 @@ protected void hookOnNext(Payload payload) {
379381
payload.release();
380382
cancel();
381383
final IllegalArgumentException t =
382-
new IllegalArgumentException(
383-
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.");
384+
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
384385
handleError(streamId, t);
385386
return;
386387
}
@@ -435,8 +436,7 @@ protected void hookOnNext(Payload payload) {
435436
payload.release();
436437
cancel();
437438
final IllegalArgumentException t =
438-
new IllegalArgumentException(
439-
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.");
439+
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
440440
handleError(streamId, t);
441441
return;
442442
}

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.core;
1818

19+
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
1920
import static io.rsocket.frame.FrameHeaderFlyweight.frameType;
2021
import static io.rsocket.frame.FrameType.CANCEL;
2122
import static io.rsocket.frame.FrameType.KEEPALIVE;
@@ -37,6 +38,7 @@
3738
import io.netty.buffer.ByteBufAllocator;
3839
import io.netty.util.CharsetUtil;
3940
import io.rsocket.Payload;
41+
import io.rsocket.RSocket;
4042
import io.rsocket.exceptions.ApplicationErrorException;
4143
import io.rsocket.exceptions.RejectedSetupException;
4244
import io.rsocket.frame.CancelFrameFlyweight;
@@ -300,7 +302,7 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
300302
t ->
301303
Assertions.assertThat(t)
302304
.isInstanceOf(IllegalArgumentException.class)
303-
.hasMessage("The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."))
305+
.hasMessage(INVALID_PAYLOAD_ERROR_MESSAGE))
304306
.verify();
305307
});
306308
}
@@ -327,7 +329,7 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
327329
t ->
328330
Assertions.assertThat(t)
329331
.isInstanceOf(IllegalArgumentException.class)
330-
.hasMessage("The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."))
332+
.hasMessage(INVALID_PAYLOAD_ERROR_MESSAGE))
331333
.verify();
332334
}
333335

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.core;
1818

19+
import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
1920
import static io.rsocket.frame.FrameHeaderFlyweight.frameType;
2021
import static org.hamcrest.MatcherAssert.assertThat;
2122
import static org.hamcrest.Matchers.*;
@@ -153,12 +154,12 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
153154
Assertions.assertThat(rule.errors)
154155
.first()
155156
.isInstanceOf(IllegalArgumentException.class)
156-
.hasToString("java.lang.IllegalArgumentException: The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.");
157+
.hasToString("java.lang.IllegalArgumentException: " + INVALID_PAYLOAD_ERROR_MESSAGE);
157158
Assertions.assertThat(rule.connection.getSent())
158159
.hasSize(1)
159160
.first()
160161
.matches(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
161-
.matches(bb -> ErrorFrameFlyweight.dataUtf8(bb).contains("The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."));
162+
.matches(bb -> ErrorFrameFlyweight.dataUtf8(bb).contains(INVALID_PAYLOAD_ERROR_MESSAGE));
162163

163164
assertThat("Subscription not cancelled.", cancelled.get(), is(true));
164165
rule.init();

0 commit comments

Comments
 (0)