Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.core;

import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
import static io.rsocket.core.PayloadValidationUtils.isValid;
import static io.rsocket.core.SendUtils.sendReleasingPayload;
import static io.rsocket.core.StateUtils.*;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.rsocket.Payload;
import io.rsocket.frame.FrameType;
import io.rsocket.internal.UnboundedProcessor;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

final class FireAndForgetRequesterMono extends Mono<Void> implements Subscription, Scannable {

volatile long state;

static final AtomicLongFieldUpdater<FireAndForgetRequesterMono> STATE =
AtomicLongFieldUpdater.newUpdater(FireAndForgetRequesterMono.class, "state");

final Payload payload;

final ByteBufAllocator allocator;
final int mtu;
final int maxFrameLength;
final RequesterResponderSupport requesterResponderSupport;
final UnboundedProcessor<ByteBuf> sendProcessor;

FireAndForgetRequesterMono(Payload payload, RequesterResponderSupport requesterResponderSupport) {
this.allocator = requesterResponderSupport.getAllocator();
this.payload = payload;
this.mtu = requesterResponderSupport.getMtu();
this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
this.requesterResponderSupport = requesterResponderSupport;
this.sendProcessor = requesterResponderSupport.getSendProcessor();
}

@Override
public void subscribe(CoreSubscriber<? super Void> actual) {
long previousState = markSubscribed(STATE, this);
if (isSubscribedOrTerminated(previousState)) {
Operators.error(
actual, new IllegalStateException("FireAndForgetMono allows only a single Subscriber"));
return;
}

actual.onSubscribe(this);

final Payload p = this.payload;
int mtu = this.mtu;
try {
if (!isValid(mtu, this.maxFrameLength, p, false)) {
lazyTerminate(STATE, this);
p.release();
actual.onError(
new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength)));
return;
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);
actual.onError(e);
return;
}

final int streamId;
try {
streamId = this.requesterResponderSupport.getNextStreamId();
} catch (Throwable t) {
lazyTerminate(STATE, this);
p.release();
actual.onError(Exceptions.unwrap(t));
return;
}

try {
if (isTerminated(this.state)) {
p.release();
return;
}

sendReleasingPayload(
streamId, FrameType.REQUEST_FNF, mtu, p, this.sendProcessor, this.allocator, true);
} catch (Throwable e) {
lazyTerminate(STATE, this);
actual.onError(e);
return;
}

lazyTerminate(STATE, this);
actual.onComplete();
}

@Override
public void request(long n) {
// no ops
}

@Override
public void cancel() {
markTerminated(STATE, this);
}

@Override
@Nullable
public Void block(Duration m) {
return block();
}

@Override
@Nullable
public Void block() {
long previousState = markSubscribed(STATE, this);
if (isSubscribedOrTerminated(previousState)) {
throw new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
}

final Payload p = this.payload;
try {
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
lazyTerminate(STATE, this);
p.release();
throw new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);
throw Exceptions.propagate(e);
}

final int streamId;
try {
streamId = this.requesterResponderSupport.getNextStreamId();
} catch (Throwable t) {
lazyTerminate(STATE, this);
p.release();
throw Exceptions.propagate(t);
}

try {
sendReleasingPayload(
streamId,
FrameType.REQUEST_FNF,
this.mtu,
this.payload,
this.sendProcessor,
this.allocator,
true);
} catch (Throwable e) {
lazyTerminate(STATE, this);
throw Exceptions.propagate(e);
}

lazyTerminate(STATE, this);
return null;
}

@Override
public Object scanUnsafe(Scannable.Attr key) {
return null; // no particular key to be represented, still useful in hooks
}

@Override
@NonNull
public String stepName() {
return "source(FireAndForgetMono)";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.decoder.PayloadDecoder;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;

final class FireAndForgetResponderSubscriber
implements CoreSubscriber<Void>, ResponderFrameHandler {

static final Logger logger = LoggerFactory.getLogger(FireAndForgetResponderSubscriber.class);

static final FireAndForgetResponderSubscriber INSTANCE = new FireAndForgetResponderSubscriber();

final int streamId;
final ByteBufAllocator allocator;
final PayloadDecoder payloadDecoder;
final RequesterResponderSupport requesterResponderSupport;
final RSocket handler;
final int maxInboundPayloadSize;

CompositeByteBuf frames;

private FireAndForgetResponderSubscriber() {
this.streamId = 0;
this.allocator = null;
this.payloadDecoder = null;
this.maxInboundPayloadSize = 0;
this.requesterResponderSupport = null;
this.handler = null;
this.frames = null;
}

FireAndForgetResponderSubscriber(
int streamId,
ByteBuf firstFrame,
RequesterResponderSupport requesterResponderSupport,
RSocket handler) {
this.streamId = streamId;
this.allocator = requesterResponderSupport.getAllocator();
this.payloadDecoder = requesterResponderSupport.getPayloadDecoder();
this.maxInboundPayloadSize = requesterResponderSupport.getMaxInboundPayloadSize();
this.requesterResponderSupport = requesterResponderSupport;
this.handler = handler;

this.frames =
ReassemblyUtils.addFollowingFrame(
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
}

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Void voidVal) {}

@Override
public void onError(Throwable t) {
logger.debug("Dropped Outbound error", t);
}

@Override
public void onComplete() {}

@Override
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
final CompositeByteBuf frames =
ReassemblyUtils.addFollowingFrame(this.frames, followingFrame, this.maxInboundPayloadSize);

if (!hasFollows) {
this.requesterResponderSupport.remove(this.streamId, this);
this.frames = null;

Payload payload;
try {
payload = this.payloadDecoder.apply(frames);
frames.release();
} catch (Throwable t) {
ReferenceCountUtil.safeRelease(frames);
logger.debug("Reassembly has failed", t);
return;
}

Mono<Void> source = this.handler.fireAndForget(payload);
source.subscribe(this);
}
}

@Override
public final void handleCancel() {
final CompositeByteBuf frames = this.frames;
if (frames != null) {
this.frames = null;
this.requesterResponderSupport.remove(this.streamId, this);
frames.release();
}
}
}
Loading