From f07d471f85f5ab78cd73c7ce2b015fa33e76ee12 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Sat, 26 Mar 2016 22:40:56 -0700 Subject: [PATCH 1/4] created a reactive socket interface, pointed rxjava2 to the specific version of rxjava2 to get the tests to work, updated Agrona to 0.4.13 --- .../io/reactivesocket/ReactiveSocket.java | 524 +---------------- .../io/reactivesocket/ReactiveSocketImpl.java | 527 ++++++++++++++++++ 2 files changed, 529 insertions(+), 522 deletions(-) create mode 100644 src/main/java/io/reactivesocket/ReactiveSocketImpl.java diff --git a/src/main/java/io/reactivesocket/ReactiveSocket.java b/src/main/java/io/reactivesocket/ReactiveSocket.java index 5d9ca68b3..9efa69723 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -1,527 +1,7 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package io.reactivesocket; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import io.reactivesocket.internal.Requester; -import io.reactivesocket.internal.Responder; -import io.reactivesocket.internal.rx.CompositeCompletable; -import io.reactivesocket.internal.rx.CompositeDisposable; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Disposable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import uk.co.real_logic.agrona.BitUtil; - -import static io.reactivesocket.LeaseGovernor.NULL_LEASE_GOVERNOR; - /** - * Interface for a connection that supports sending requests and receiving responses - * - * Created by servers for connections Created on demand for clients + * Created by rroeser on 3/25/16. */ -public class ReactiveSocket implements AutoCloseable { - private static final RequestHandler EMPTY_HANDLER = new RequestHandler.Builder().build(); - - private static final Consumer DEFAULT_ERROR_STREAM = t -> { - // TODO should we use SLF4j, use System.err, or swallow by default? - System.err.println("ReactiveSocket ERROR => " + t.getMessage() - + " [Provide errorStream handler to replace this default]"); - }; - - private final DuplexConnection connection; - private final boolean isServer; - private final Consumer errorStream; - private Requester requester; - private Responder responder; - private final ConnectionSetupPayload requestorSetupPayload; - private final RequestHandler clientRequestHandler; - private final ConnectionSetupHandler responderConnectionHandler; - private final LeaseGovernor leaseGovernor; - - private ReactiveSocket( - DuplexConnection connection, - boolean isServer, - ConnectionSetupPayload serverRequestorSetupPayload, - RequestHandler clientRequestHandler, - ConnectionSetupHandler responderConnectionHandler, - LeaseGovernor leaseGovernor, - Consumer errorStream - ) { - this.connection = connection; - this.isServer = isServer; - this.requestorSetupPayload = serverRequestorSetupPayload; - this.clientRequestHandler = clientRequestHandler; - this.responderConnectionHandler = responderConnectionHandler; - this.leaseGovernor = leaseGovernor; - this.errorStream = errorStream; - } - - /** - * Create a ReactiveSocket from a client-side {@link DuplexConnection}. - *

- * A client-side connection is one that initiated the connection with a - * server and will define the ReactiveSocket behaviors via the - * {@link ConnectionSetupPayload} that define mime-types, leasing - * behavior and other connection-level details. - * - * @param connection - * DuplexConnection of client-side initiated connection for - * the ReactiveSocket protocol to use. - * @param setup - * ConnectionSetupPayload that defines mime-types and other - * connection behavior details. - * @param handler - * (Optional) RequestHandler for responding to requests from - * the server. If 'null' requests will be responded to with - * "Not Found" errors. - * @param errorStream - * (Optional) Callback for errors while processing streams - * over connection. If 'null' then error messages will be - * output to System.err. - * @return ReactiveSocket for start, shutdown and sending requests. - */ - public static ReactiveSocket fromClientConnection( - DuplexConnection connection, - ConnectionSetupPayload setup, - RequestHandler handler, - Consumer errorStream - ) { - if (connection == null) { - throw new IllegalArgumentException("DuplexConnection can not be null"); - } - if (setup == null) { - throw new IllegalArgumentException("ConnectionSetupPayload can not be null"); - } - final RequestHandler h = handler != null ? handler : EMPTY_HANDLER; - Consumer es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM; - return new ReactiveSocket(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es); - } - - /** - * Create a ReactiveSocket from a client-side {@link DuplexConnection}. - *

- * A client-side connection is one that initiated the connection with a - * server and will define the ReactiveSocket behaviors via the - * {@link ConnectionSetupPayload} that define mime-types, leasing - * behavior and other connection-level details. - *

- * If this ReactiveSocket receives requests from the server it will respond - * with "Not Found" errors. - * - * @param connection - * DuplexConnection of client-side initiated connection for the - * ReactiveSocket protocol to use. - * @param setup - * ConnectionSetupPayload that defines mime-types and other - * connection behavior details. - * @param errorStream - * (Optional) Callback for errors while processing streams over - * connection. If 'null' then error messages will be output to - * System.err. - * @return ReactiveSocket for start, shutdown and sending requests. - */ - public static ReactiveSocket fromClientConnection( - DuplexConnection connection, - ConnectionSetupPayload setup, - Consumer errorStream - ) { - return fromClientConnection(connection, setup, EMPTY_HANDLER, errorStream); - } - - public static ReactiveSocket fromClientConnection( - DuplexConnection connection, - ConnectionSetupPayload setup - ) { - return fromClientConnection(connection, setup, EMPTY_HANDLER, DEFAULT_ERROR_STREAM); - } - - /** - * Create a ReactiveSocket from a server-side {@link DuplexConnection}. - *

- * A server-side connection is one that accepted the connection from a - * client and will define the ReactiveSocket behaviors via the - * {@link ConnectionSetupPayload} that define mime-types, leasing behavior - * and other connection-level details. - * - * @param connection - * @param connectionHandler - * @param errorConsumer - * @return - */ - public static ReactiveSocket fromServerConnection( - DuplexConnection connection, - ConnectionSetupHandler connectionHandler, - LeaseGovernor leaseGovernor, - Consumer errorConsumer - ) { - return new ReactiveSocket(connection, true, null, null, connectionHandler, - leaseGovernor, errorConsumer); - } - - public static ReactiveSocket fromServerConnection( - DuplexConnection connection, - ConnectionSetupHandler connectionHandler - ) { - return fromServerConnection(connection, connectionHandler, NULL_LEASE_GOVERNOR, t -> {}); - } - - /** - * Initiate a request response exchange - */ - public Publisher requestResponse(final Payload payload) { - assertRequester(); - return requester.requestResponse(payload); - } - - public Publisher fireAndForget(final Payload payload) { - assertRequester(); - return requester.fireAndForget(payload); - } - - public Publisher requestStream(final Payload payload) { - assertRequester(); - return requester.requestStream(payload); - } - - public Publisher requestSubscription(final Payload payload) { - assertRequester(); - return requester.requestSubscription(payload); - } - - public Publisher requestChannel(final Publisher payloads) { - assertRequester(); - return requester.requestChannel(payloads); - } - - public Publisher metadataPush(final Payload payload) { - assertRequester(); - return requester.metadataPush(payload); - } - - private void assertRequester() { - if (requester == null) { - if (isServer) { - if (responder == null) { - throw new IllegalStateException("Connection not initialized. " + - "Please 'start()' before submitting requests"); - } else { - throw new IllegalStateException("Setup not yet received from client. " + - "Please wait until Setup is completed, then retry."); - } - } else { - throw new IllegalStateException("Connection not initialized. " + - "Please 'start()' before submitting requests"); - } - } - } - - /** - * Client check for availability to send request based on lease - * - * @return 0.0 to 1.0 indicating availability of sending requests - */ - public double availability() { - // TODO: can happen in either direction - assertRequester(); - return requester.availability(); - } - - /** - * Server granting new lease information to client - * - * Initial lease semantics are that server waits for periodic granting of leases by server side. - * - * @param ttl - * @param numberOfRequests - */ - public void sendLease(int ttl, int numberOfRequests) { - // TODO: can happen in either direction - responder.sendLease(ttl, numberOfRequests); - } - - /** - * Start protocol processing on the given DuplexConnection. - */ - public final void start(Completable c) { - if (isServer) { - responder = Responder.createServerResponder( - new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_CLIENT_EVEN), - responderConnectionHandler, - leaseGovernor, - errorStream, - c, - setupPayload -> { - Completable two = new Completable() { - // wait for 2 success, or 1 error to pass on - AtomicInteger count = new AtomicInteger(); - - @Override - public void success() { - if (count.incrementAndGet() == 2) { - requesterReady.success(); - } - } - - @Override - public void error(Throwable e) { - requesterReady.error(e); - } - }; - requester = Requester.createServerRequester( - new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_SERVER_ODD), - setupPayload, - errorStream, - two - ); - two.success(); // now that the reference is assigned in case of synchronous setup - }); - } else { - Completable both = new Completable() { - // wait for 2 success, or 1 error to pass on - AtomicInteger count = new AtomicInteger(); - - @Override - public void success() { - if (count.incrementAndGet() == 2) { - c.success(); - } - } - - @Override - public void error(Throwable e) { - c.error(e); - } - }; - requester = Requester.createClientRequester( - new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_CLIENT_EVEN), - requestorSetupPayload, - errorStream, - new Completable() { - @Override - public void success() { - requesterReady.success(); - both.success(); - } - - @Override - public void error(Throwable e) { - requesterReady.error(e); - both.error(e); - } - }); - responder = Responder.createClientResponder( - new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_SERVER_ODD), - clientRequestHandler, - leaseGovernor, - errorStream, - both - ); - } - } - - private final CompositeCompletable requesterReady = new CompositeCompletable(); - - /** - * Invoked when Requester is ready with success or fail. - * - * @param c - */ - public final void onRequestReady(Completable c) { - requesterReady.add(c); - } - - /** - * Invoked when Requester is ready. Non-null exception if error. Null if success. - * - * @param c - */ - public final void onRequestReady(Consumer c) { - requesterReady.add(new Completable() { - @Override - public void success() { - c.accept(null); - } - - @Override - public void error(Throwable e) { - c.accept(e); - } - }); - } - - private static class ConnectionFilter implements DuplexConnection { - private enum STREAMS { - FROM_CLIENT_EVEN, FROM_SERVER_ODD; - } - - private final DuplexConnection connection; - private final STREAMS s; - - private ConnectionFilter(DuplexConnection connection, STREAMS s) { - this.connection = connection; - this.s = s; - } - - @Override - public void close() throws IOException { - connection.close(); // forward - } - - @Override - public Observable getInput() { - return new Observable() { - @Override - public void subscribe(Observer o) { - CompositeDisposable cd = new CompositeDisposable(); - o.onSubscribe(cd); - connection.getInput().subscribe(new Observer() { - - @Override - public void onNext(Frame t) { - int streamId = t.getStreamId(); - FrameType type = t.getType(); - if (streamId == 0) { - if (FrameType.SETUP.equals(type) && s == STREAMS.FROM_CLIENT_EVEN) { - o.onNext(t); - } else if (FrameType.LEASE.equals(type)) { - o.onNext(t); - } else if (FrameType.ERROR.equals(type)) { - // o.onNext(t); // TODO this doesn't work - } else if (FrameType.KEEPALIVE.equals(type)) { - o.onNext(t); // TODO need tests - } else if (FrameType.METADATA_PUSH.equals(type)) { - o.onNext(t); - } - } else if (BitUtil.isEven(streamId)) { - if (s == STREAMS.FROM_CLIENT_EVEN) { - o.onNext(t); - } - } else { - if (s == STREAMS.FROM_SERVER_ODD) { - o.onNext(t); - } - } - } - - @Override - public void onError(Throwable e) { - o.onError(e); - } - - @Override - public void onComplete() { - o.onComplete(); - } - - @Override - public void onSubscribe(Disposable d) { - cd.add(d); - } - }); - } - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - connection.addOutput(o, callback); - } - - @Override - public void addOutput(Frame f, Completable callback) { - connection.addOutput(f, callback); - } - - }; - - /** - * Start and block the current thread until startup is finished. - * - * @throws RuntimeException - * of InterruptedException - */ - public final void startAndWait() { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference err = new AtomicReference<>(); - start(new Completable() { - @Override - public void success() { - latch.countDown(); - } - - @Override - public void error(Throwable e) { - latch.countDown(); - } - }); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (err.get() != null) { - throw new RuntimeException(err.get()); - } - } - - @Override - public void close() throws Exception { - connection.close(); - leaseGovernor.unregister(responder); - if (requester != null) { - requester.shutdown(); - } - if (responder != null) { - responder.shutdown(); - } - } - - public void shutdown() { - try { - close(); - } catch (Exception e) { - throw new RuntimeException("Failed Shutdown", e); - } - } - - private static Publisher error(Throwable e) { - return (Subscriber s) -> { - s.onSubscribe(new Subscription() { - @Override - public void request(long n) { - // should probably worry about n==0 - s.onError(e); - } - - @Override - public void cancel() { - // ignoring just because - } - }); - }; - } +public interface ReactiveSocket { } diff --git a/src/main/java/io/reactivesocket/ReactiveSocketImpl.java b/src/main/java/io/reactivesocket/ReactiveSocketImpl.java new file mode 100644 index 000000000..5d9ca68b3 --- /dev/null +++ b/src/main/java/io/reactivesocket/ReactiveSocketImpl.java @@ -0,0 +1,527 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.reactivesocket.internal.Requester; +import io.reactivesocket.internal.Responder; +import io.reactivesocket.internal.rx.CompositeCompletable; +import io.reactivesocket.internal.rx.CompositeDisposable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Disposable; +import io.reactivesocket.rx.Observable; +import io.reactivesocket.rx.Observer; +import uk.co.real_logic.agrona.BitUtil; + +import static io.reactivesocket.LeaseGovernor.NULL_LEASE_GOVERNOR; + +/** + * Interface for a connection that supports sending requests and receiving responses + * + * Created by servers for connections Created on demand for clients + */ +public class ReactiveSocket implements AutoCloseable { + private static final RequestHandler EMPTY_HANDLER = new RequestHandler.Builder().build(); + + private static final Consumer DEFAULT_ERROR_STREAM = t -> { + // TODO should we use SLF4j, use System.err, or swallow by default? + System.err.println("ReactiveSocket ERROR => " + t.getMessage() + + " [Provide errorStream handler to replace this default]"); + }; + + private final DuplexConnection connection; + private final boolean isServer; + private final Consumer errorStream; + private Requester requester; + private Responder responder; + private final ConnectionSetupPayload requestorSetupPayload; + private final RequestHandler clientRequestHandler; + private final ConnectionSetupHandler responderConnectionHandler; + private final LeaseGovernor leaseGovernor; + + private ReactiveSocket( + DuplexConnection connection, + boolean isServer, + ConnectionSetupPayload serverRequestorSetupPayload, + RequestHandler clientRequestHandler, + ConnectionSetupHandler responderConnectionHandler, + LeaseGovernor leaseGovernor, + Consumer errorStream + ) { + this.connection = connection; + this.isServer = isServer; + this.requestorSetupPayload = serverRequestorSetupPayload; + this.clientRequestHandler = clientRequestHandler; + this.responderConnectionHandler = responderConnectionHandler; + this.leaseGovernor = leaseGovernor; + this.errorStream = errorStream; + } + + /** + * Create a ReactiveSocket from a client-side {@link DuplexConnection}. + *

+ * A client-side connection is one that initiated the connection with a + * server and will define the ReactiveSocket behaviors via the + * {@link ConnectionSetupPayload} that define mime-types, leasing + * behavior and other connection-level details. + * + * @param connection + * DuplexConnection of client-side initiated connection for + * the ReactiveSocket protocol to use. + * @param setup + * ConnectionSetupPayload that defines mime-types and other + * connection behavior details. + * @param handler + * (Optional) RequestHandler for responding to requests from + * the server. If 'null' requests will be responded to with + * "Not Found" errors. + * @param errorStream + * (Optional) Callback for errors while processing streams + * over connection. If 'null' then error messages will be + * output to System.err. + * @return ReactiveSocket for start, shutdown and sending requests. + */ + public static ReactiveSocket fromClientConnection( + DuplexConnection connection, + ConnectionSetupPayload setup, + RequestHandler handler, + Consumer errorStream + ) { + if (connection == null) { + throw new IllegalArgumentException("DuplexConnection can not be null"); + } + if (setup == null) { + throw new IllegalArgumentException("ConnectionSetupPayload can not be null"); + } + final RequestHandler h = handler != null ? handler : EMPTY_HANDLER; + Consumer es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM; + return new ReactiveSocket(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es); + } + + /** + * Create a ReactiveSocket from a client-side {@link DuplexConnection}. + *

+ * A client-side connection is one that initiated the connection with a + * server and will define the ReactiveSocket behaviors via the + * {@link ConnectionSetupPayload} that define mime-types, leasing + * behavior and other connection-level details. + *

+ * If this ReactiveSocket receives requests from the server it will respond + * with "Not Found" errors. + * + * @param connection + * DuplexConnection of client-side initiated connection for the + * ReactiveSocket protocol to use. + * @param setup + * ConnectionSetupPayload that defines mime-types and other + * connection behavior details. + * @param errorStream + * (Optional) Callback for errors while processing streams over + * connection. If 'null' then error messages will be output to + * System.err. + * @return ReactiveSocket for start, shutdown and sending requests. + */ + public static ReactiveSocket fromClientConnection( + DuplexConnection connection, + ConnectionSetupPayload setup, + Consumer errorStream + ) { + return fromClientConnection(connection, setup, EMPTY_HANDLER, errorStream); + } + + public static ReactiveSocket fromClientConnection( + DuplexConnection connection, + ConnectionSetupPayload setup + ) { + return fromClientConnection(connection, setup, EMPTY_HANDLER, DEFAULT_ERROR_STREAM); + } + + /** + * Create a ReactiveSocket from a server-side {@link DuplexConnection}. + *

+ * A server-side connection is one that accepted the connection from a + * client and will define the ReactiveSocket behaviors via the + * {@link ConnectionSetupPayload} that define mime-types, leasing behavior + * and other connection-level details. + * + * @param connection + * @param connectionHandler + * @param errorConsumer + * @return + */ + public static ReactiveSocket fromServerConnection( + DuplexConnection connection, + ConnectionSetupHandler connectionHandler, + LeaseGovernor leaseGovernor, + Consumer errorConsumer + ) { + return new ReactiveSocket(connection, true, null, null, connectionHandler, + leaseGovernor, errorConsumer); + } + + public static ReactiveSocket fromServerConnection( + DuplexConnection connection, + ConnectionSetupHandler connectionHandler + ) { + return fromServerConnection(connection, connectionHandler, NULL_LEASE_GOVERNOR, t -> {}); + } + + /** + * Initiate a request response exchange + */ + public Publisher requestResponse(final Payload payload) { + assertRequester(); + return requester.requestResponse(payload); + } + + public Publisher fireAndForget(final Payload payload) { + assertRequester(); + return requester.fireAndForget(payload); + } + + public Publisher requestStream(final Payload payload) { + assertRequester(); + return requester.requestStream(payload); + } + + public Publisher requestSubscription(final Payload payload) { + assertRequester(); + return requester.requestSubscription(payload); + } + + public Publisher requestChannel(final Publisher payloads) { + assertRequester(); + return requester.requestChannel(payloads); + } + + public Publisher metadataPush(final Payload payload) { + assertRequester(); + return requester.metadataPush(payload); + } + + private void assertRequester() { + if (requester == null) { + if (isServer) { + if (responder == null) { + throw new IllegalStateException("Connection not initialized. " + + "Please 'start()' before submitting requests"); + } else { + throw new IllegalStateException("Setup not yet received from client. " + + "Please wait until Setup is completed, then retry."); + } + } else { + throw new IllegalStateException("Connection not initialized. " + + "Please 'start()' before submitting requests"); + } + } + } + + /** + * Client check for availability to send request based on lease + * + * @return 0.0 to 1.0 indicating availability of sending requests + */ + public double availability() { + // TODO: can happen in either direction + assertRequester(); + return requester.availability(); + } + + /** + * Server granting new lease information to client + * + * Initial lease semantics are that server waits for periodic granting of leases by server side. + * + * @param ttl + * @param numberOfRequests + */ + public void sendLease(int ttl, int numberOfRequests) { + // TODO: can happen in either direction + responder.sendLease(ttl, numberOfRequests); + } + + /** + * Start protocol processing on the given DuplexConnection. + */ + public final void start(Completable c) { + if (isServer) { + responder = Responder.createServerResponder( + new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_CLIENT_EVEN), + responderConnectionHandler, + leaseGovernor, + errorStream, + c, + setupPayload -> { + Completable two = new Completable() { + // wait for 2 success, or 1 error to pass on + AtomicInteger count = new AtomicInteger(); + + @Override + public void success() { + if (count.incrementAndGet() == 2) { + requesterReady.success(); + } + } + + @Override + public void error(Throwable e) { + requesterReady.error(e); + } + }; + requester = Requester.createServerRequester( + new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_SERVER_ODD), + setupPayload, + errorStream, + two + ); + two.success(); // now that the reference is assigned in case of synchronous setup + }); + } else { + Completable both = new Completable() { + // wait for 2 success, or 1 error to pass on + AtomicInteger count = new AtomicInteger(); + + @Override + public void success() { + if (count.incrementAndGet() == 2) { + c.success(); + } + } + + @Override + public void error(Throwable e) { + c.error(e); + } + }; + requester = Requester.createClientRequester( + new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_CLIENT_EVEN), + requestorSetupPayload, + errorStream, + new Completable() { + @Override + public void success() { + requesterReady.success(); + both.success(); + } + + @Override + public void error(Throwable e) { + requesterReady.error(e); + both.error(e); + } + }); + responder = Responder.createClientResponder( + new ConnectionFilter(connection, ConnectionFilter.STREAMS.FROM_SERVER_ODD), + clientRequestHandler, + leaseGovernor, + errorStream, + both + ); + } + } + + private final CompositeCompletable requesterReady = new CompositeCompletable(); + + /** + * Invoked when Requester is ready with success or fail. + * + * @param c + */ + public final void onRequestReady(Completable c) { + requesterReady.add(c); + } + + /** + * Invoked when Requester is ready. Non-null exception if error. Null if success. + * + * @param c + */ + public final void onRequestReady(Consumer c) { + requesterReady.add(new Completable() { + @Override + public void success() { + c.accept(null); + } + + @Override + public void error(Throwable e) { + c.accept(e); + } + }); + } + + private static class ConnectionFilter implements DuplexConnection { + private enum STREAMS { + FROM_CLIENT_EVEN, FROM_SERVER_ODD; + } + + private final DuplexConnection connection; + private final STREAMS s; + + private ConnectionFilter(DuplexConnection connection, STREAMS s) { + this.connection = connection; + this.s = s; + } + + @Override + public void close() throws IOException { + connection.close(); // forward + } + + @Override + public Observable getInput() { + return new Observable() { + @Override + public void subscribe(Observer o) { + CompositeDisposable cd = new CompositeDisposable(); + o.onSubscribe(cd); + connection.getInput().subscribe(new Observer() { + + @Override + public void onNext(Frame t) { + int streamId = t.getStreamId(); + FrameType type = t.getType(); + if (streamId == 0) { + if (FrameType.SETUP.equals(type) && s == STREAMS.FROM_CLIENT_EVEN) { + o.onNext(t); + } else if (FrameType.LEASE.equals(type)) { + o.onNext(t); + } else if (FrameType.ERROR.equals(type)) { + // o.onNext(t); // TODO this doesn't work + } else if (FrameType.KEEPALIVE.equals(type)) { + o.onNext(t); // TODO need tests + } else if (FrameType.METADATA_PUSH.equals(type)) { + o.onNext(t); + } + } else if (BitUtil.isEven(streamId)) { + if (s == STREAMS.FROM_CLIENT_EVEN) { + o.onNext(t); + } + } else { + if (s == STREAMS.FROM_SERVER_ODD) { + o.onNext(t); + } + } + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onComplete() { + o.onComplete(); + } + + @Override + public void onSubscribe(Disposable d) { + cd.add(d); + } + }); + } + }; + } + + @Override + public void addOutput(Publisher o, Completable callback) { + connection.addOutput(o, callback); + } + + @Override + public void addOutput(Frame f, Completable callback) { + connection.addOutput(f, callback); + } + + }; + + /** + * Start and block the current thread until startup is finished. + * + * @throws RuntimeException + * of InterruptedException + */ + public final void startAndWait() { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference err = new AtomicReference<>(); + start(new Completable() { + @Override + public void success() { + latch.countDown(); + } + + @Override + public void error(Throwable e) { + latch.countDown(); + } + }); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (err.get() != null) { + throw new RuntimeException(err.get()); + } + } + + @Override + public void close() throws Exception { + connection.close(); + leaseGovernor.unregister(responder); + if (requester != null) { + requester.shutdown(); + } + if (responder != null) { + responder.shutdown(); + } + } + + public void shutdown() { + try { + close(); + } catch (Exception e) { + throw new RuntimeException("Failed Shutdown", e); + } + } + + private static Publisher error(Throwable e) { + return (Subscriber s) -> { + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + // should probably worry about n==0 + s.onError(e); + } + + @Override + public void cancel() { + // ignoring just because + } + }); + }; + } +} From ac7748548183850dd248048e03cd2069d59b1622 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Sat, 26 Mar 2016 22:42:28 -0700 Subject: [PATCH 2/4] created a reactive socket interface, pointed rxjava2 to the specific version of rxjava2 to get the tests to work, updated Agrona to 0.4.13 --- build.gradle | 4 +- src/main/java/io/reactivesocket/Frame.java | 4 +- .../io/reactivesocket/ReactiveSocket.java | 105 +++++++++++++++++- .../reactivesocket/ReactiveSocketFactory.java | 15 +++ .../io/reactivesocket/ReactiveSocketImpl.java | 100 +++++------------ .../io/reactivesocket/internal/Requester.java | 2 +- .../io/reactivesocket/internal/Responder.java | 28 +++-- .../internal/frame/ErrorFrameFlyweight.java | 15 ++- .../internal/frame/FrameHeaderFlyweight.java | 10 +- .../internal/frame/FramePool.java | 2 +- .../frame/KeepaliveFrameFlyweight.java | 4 +- .../internal/frame/LeaseFrameFlyweight.java | 6 +- .../internal/frame/PayloadBuilder.java | 6 +- .../internal/frame/PayloadReassembler.java | 4 +- .../internal/frame/RequestFrameFlyweight.java | 6 +- .../frame/RequestNFrameFlyweight.java | 7 +- .../internal/frame/SetupFrameFlyweight.java | 6 +- .../internal/frame/ThreadLocalFramePool.java | 6 +- .../internal/frame/ThreadSafeFramePool.java | 6 +- .../internal/frame/UnpooledFrame.java | 4 +- .../io/reactivesocket/ReactiveSocketPerf.java | 17 ++- .../java/io/reactivesocket/FrameTest.java | 2 +- .../java/io/reactivesocket/LeaseTest.java | 4 +- .../io/reactivesocket/ReactiveSocketTest.java | 6 +- .../TestFlowControlRequestN.java | 32 +++--- .../reactivesocket/TestTransportRequestN.java | 27 +++-- src/test/java/io/reactivesocket/TestUtil.java | 2 +- 27 files changed, 258 insertions(+), 172 deletions(-) diff --git a/build.gradle b/build.gradle index 3a9c2511c..0302683a4 100644 --- a/build.gradle +++ b/build.gradle @@ -18,9 +18,9 @@ repositories { dependencies { compile 'org.reactivestreams:reactive-streams:1.0.0.final' - compile 'uk.co.real-logic:Agrona:0.4.2' + compile 'org.agrona:Agrona:0.4.13' - testCompile 'io.reactivex:rxjava:2.0.0-DP0-SNAPSHOT' + testCompile 'io.reactivex:rxjava:2.0.0-DP0-20151003.214425-143' testCompile 'junit:junit-dep:4.10' testCompile 'org.mockito:mockito-core:1.8.5' } diff --git a/src/main/java/io/reactivesocket/Frame.java b/src/main/java/io/reactivesocket/Frame.java index dd29c8e35..4de7d4df1 100644 --- a/src/main/java/io/reactivesocket/Frame.java +++ b/src/main/java/io/reactivesocket/Frame.java @@ -24,8 +24,8 @@ import io.reactivesocket.internal.frame.RequestNFrameFlyweight; import io.reactivesocket.internal.frame.SetupFrameFlyweight; import io.reactivesocket.internal.frame.UnpooledFrame; -import uk.co.real_logic.agrona.DirectBuffer; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; import java.nio.charset.Charset; diff --git a/src/main/java/io/reactivesocket/ReactiveSocket.java b/src/main/java/io/reactivesocket/ReactiveSocket.java index 9efa69723..8f8c2a10c 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -1,7 +1,108 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.reactivesocket; +import io.reactivesocket.rx.Completable; +import org.reactivestreams.Publisher; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + /** - * Created by rroeser on 3/25/16. + * Interface for a connection that supports sending requests and receiving responses */ -public interface ReactiveSocket { +public interface ReactiveSocket extends AutoCloseable { + Publisher requestResponse(final Payload payload); + + Publisher fireAndForget(final Payload payload); + + Publisher requestStream(final Payload payload); + + Publisher requestSubscription(final Payload payload); + + Publisher requestChannel(final Publisher payloads); + + Publisher metadataPush(final Payload payload); + + /** + * Client check for availability to send request based on lease + * + * @return 0.0 to 1.0 indicating availability of sending requests + */ + double availability(); + + /** + * Start protocol processing on the given DuplexConnection. + */ + void start(Completable c); + + /** + * Start and block the current thread until startup is finished. + * + * @throws RuntimeException + * of InterruptedException + */ + default void startAndWait() { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference err = new AtomicReference<>(); + start(new Completable() { + @Override + public void success() { + latch.countDown(); + } + + @Override + public void error(Throwable e) { + latch.countDown(); + } + }); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (err.get() != null) { + throw new RuntimeException(err.get()); + } + } + + /** + * Invoked when Requester is ready. Non-null exception if error. Null if success. + * + * @param c + */ + void onRequestReady(Consumer c); + + /** + * Invoked when Requester is ready with success or fail. + * + * @param c + */ + void onRequestReady(Completable c); + + /** + * Server granting new lease information to client + * + * Initial lease semantics are that server waits for periodic granting of leases by server side. + * + * @param ttl + * @param numberOfRequests + */ + void sendLease(int ttl, int numberOfRequests); + + void shutdown(); } diff --git a/src/main/java/io/reactivesocket/ReactiveSocketFactory.java b/src/main/java/io/reactivesocket/ReactiveSocketFactory.java index 6c5268a84..6aab5bfb0 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocketFactory.java +++ b/src/main/java/io/reactivesocket/ReactiveSocketFactory.java @@ -1,3 +1,18 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.reactivesocket; import org.reactivestreams.Publisher; diff --git a/src/main/java/io/reactivesocket/ReactiveSocketImpl.java b/src/main/java/io/reactivesocket/ReactiveSocketImpl.java index 5d9ca68b3..82fbc23b8 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocketImpl.java +++ b/src/main/java/io/reactivesocket/ReactiveSocketImpl.java @@ -15,17 +15,6 @@ */ package io.reactivesocket; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import io.reactivesocket.internal.Requester; import io.reactivesocket.internal.Responder; import io.reactivesocket.internal.rx.CompositeCompletable; @@ -34,16 +23,21 @@ import io.reactivesocket.rx.Disposable; import io.reactivesocket.rx.Observable; import io.reactivesocket.rx.Observer; -import uk.co.real_logic.agrona.BitUtil; +import org.agrona.BitUtil; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static io.reactivesocket.LeaseGovernor.NULL_LEASE_GOVERNOR; /** - * Interface for a connection that supports sending requests and receiving responses - * - * Created by servers for connections Created on demand for clients + * An implementation of {@link ReactiveSocket} */ -public class ReactiveSocket implements AutoCloseable { +public class ReactiveSocketImpl implements ReactiveSocket { private static final RequestHandler EMPTY_HANDLER = new RequestHandler.Builder().build(); private static final Consumer DEFAULT_ERROR_STREAM = t -> { @@ -62,7 +56,7 @@ public class ReactiveSocket implements AutoCloseable { private final ConnectionSetupHandler responderConnectionHandler; private final LeaseGovernor leaseGovernor; - private ReactiveSocket( + private ReactiveSocketImpl( DuplexConnection connection, boolean isServer, ConnectionSetupPayload serverRequestorSetupPayload, @@ -118,7 +112,7 @@ public static ReactiveSocket fromClientConnection( } final RequestHandler h = handler != null ? handler : EMPTY_HANDLER; Consumer es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM; - return new ReactiveSocket(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es); + return new ReactiveSocketImpl(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es); } /** @@ -178,7 +172,7 @@ public static ReactiveSocket fromServerConnection( LeaseGovernor leaseGovernor, Consumer errorConsumer ) { - return new ReactiveSocket(connection, true, null, null, connectionHandler, + return new ReactiveSocketImpl(connection, true, null, null, connectionHandler, leaseGovernor, errorConsumer); } @@ -192,31 +186,37 @@ public static ReactiveSocket fromServerConnection( /** * Initiate a request response exchange */ + @Override public Publisher requestResponse(final Payload payload) { assertRequester(); return requester.requestResponse(payload); } + @Override public Publisher fireAndForget(final Payload payload) { assertRequester(); return requester.fireAndForget(payload); } + @Override public Publisher requestStream(final Payload payload) { assertRequester(); return requester.requestStream(payload); } + @Override public Publisher requestSubscription(final Payload payload) { assertRequester(); return requester.requestSubscription(payload); } + @Override public Publisher requestChannel(final Publisher payloads) { assertRequester(); return requester.requestChannel(payloads); } + @Override public Publisher metadataPush(final Payload payload) { assertRequester(); return requester.metadataPush(payload); @@ -239,33 +239,20 @@ private void assertRequester() { } } - /** - * Client check for availability to send request based on lease - * - * @return 0.0 to 1.0 indicating availability of sending requests - */ + @Override public double availability() { // TODO: can happen in either direction assertRequester(); return requester.availability(); } - /** - * Server granting new lease information to client - * - * Initial lease semantics are that server waits for periodic granting of leases by server side. - * - * @param ttl - * @param numberOfRequests - */ + @Override public void sendLease(int ttl, int numberOfRequests) { // TODO: can happen in either direction responder.sendLease(ttl, numberOfRequests); } - /** - * Start protocol processing on the given DuplexConnection. - */ + @Override public final void start(Completable c) { if (isServer) { responder = Responder.createServerResponder( @@ -345,20 +332,12 @@ public void error(Throwable e) { private final CompositeCompletable requesterReady = new CompositeCompletable(); - /** - * Invoked when Requester is ready with success or fail. - * - * @param c - */ + @Override public final void onRequestReady(Completable c) { requesterReady.add(c); } - /** - * Invoked when Requester is ready. Non-null exception if error. Null if success. - * - * @param c - */ + @Override public final void onRequestReady(Consumer c) { requesterReady.add(new Completable() { @Override @@ -458,36 +437,6 @@ public void addOutput(Frame f, Completable callback) { }; - /** - * Start and block the current thread until startup is finished. - * - * @throws RuntimeException - * of InterruptedException - */ - public final void startAndWait() { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference err = new AtomicReference<>(); - start(new Completable() { - @Override - public void success() { - latch.countDown(); - } - - @Override - public void error(Throwable e) { - latch.countDown(); - } - }); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (err.get() != null) { - throw new RuntimeException(err.get()); - } - } - @Override public void close() throws Exception { connection.close(); @@ -500,6 +449,7 @@ public void close() throws Exception { } } + @Override public void shutdown() { try { close(); diff --git a/src/main/java/io/reactivesocket/internal/Requester.java b/src/main/java/io/reactivesocket/internal/Requester.java index 94cbdfc28..924ffbae8 100644 --- a/src/main/java/io/reactivesocket/internal/Requester.java +++ b/src/main/java/io/reactivesocket/internal/Requester.java @@ -44,7 +44,7 @@ import io.reactivesocket.rx.Completable; import io.reactivesocket.rx.Disposable; import io.reactivesocket.rx.Observer; -import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.Int2ObjectHashMap; /** * Protocol implementation abstracted over a {@link DuplexConnection}. diff --git a/src/main/java/io/reactivesocket/internal/Responder.java b/src/main/java/io/reactivesocket/internal/Responder.java index dfead22ac..640f2f06f 100644 --- a/src/main/java/io/reactivesocket/internal/Responder.java +++ b/src/main/java/io/reactivesocket/internal/Responder.java @@ -15,18 +15,16 @@ */ package io.reactivesocket.internal; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.function.Consumer; - -import io.reactivesocket.*; +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.DuplexConnection; +import io.reactivesocket.Frame; +import io.reactivesocket.FrameType; +import io.reactivesocket.LeaseGovernor; +import io.reactivesocket.Payload; +import io.reactivesocket.RequestHandler; import io.reactivesocket.exceptions.InvalidSetupException; import io.reactivesocket.exceptions.RejectedException; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - import io.reactivesocket.exceptions.SetupException; import io.reactivesocket.internal.frame.SetupFrameFlyweight; import io.reactivesocket.internal.rx.EmptyDisposable; @@ -34,7 +32,15 @@ import io.reactivesocket.rx.Completable; import io.reactivesocket.rx.Disposable; import io.reactivesocket.rx.Observer; -import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.Int2ObjectHashMap; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Consumer; /** * Protocol implementation abstracted over a {@link DuplexConnection}. diff --git a/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java index 40dbcc115..e5df2ff11 100644 --- a/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java @@ -16,10 +16,17 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; -import io.reactivesocket.exceptions.*; -import uk.co.real_logic.agrona.BitUtil; -import uk.co.real_logic.agrona.DirectBuffer; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import io.reactivesocket.exceptions.ApplicationException; +import io.reactivesocket.exceptions.CancelException; +import io.reactivesocket.exceptions.ConnectionException; +import io.reactivesocket.exceptions.InvalidRequestException; +import io.reactivesocket.exceptions.InvalidSetupException; +import io.reactivesocket.exceptions.RejectedException; +import io.reactivesocket.exceptions.RejectedSetupException; +import io.reactivesocket.exceptions.UnsupportedSetupException; +import org.agrona.BitUtil; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java index be6c61a76..696d01b10 100644 --- a/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java @@ -16,15 +16,15 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; -import uk.co.real_logic.agrona.BitUtil; -import uk.co.real_logic.agrona.DirectBuffer; -import uk.co.real_logic.agrona.MutableDirectBuffer; - -import static io.reactivesocket.internal.frame.ByteBufferUtil.*; +import org.agrona.BitUtil; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import static io.reactivesocket.internal.frame.ByteBufferUtil.preservingSlice; + /** * Per connection frame flyweight. * diff --git a/src/main/java/io/reactivesocket/internal/frame/FramePool.java b/src/main/java/io/reactivesocket/internal/frame/FramePool.java index 4290891cb..9b37a643d 100644 --- a/src/main/java/io/reactivesocket/internal/frame/FramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/FramePool.java @@ -16,7 +16,7 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; diff --git a/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java index 8e03b8904..1c90b935b 100644 --- a/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java @@ -16,8 +16,8 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; -import uk.co.real_logic.agrona.DirectBuffer; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; diff --git a/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java index 8a663fca5..1b9b4d0a7 100644 --- a/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java @@ -16,9 +16,9 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; -import uk.co.real_logic.agrona.BitUtil; -import uk.co.real_logic.agrona.DirectBuffer; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import org.agrona.BitUtil; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java b/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java index a068ab96e..4db09e867 100644 --- a/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java +++ b/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java @@ -17,9 +17,9 @@ import io.reactivesocket.Frame; import io.reactivesocket.Payload; -import uk.co.real_logic.agrona.BitUtil; -import uk.co.real_logic.agrona.MutableDirectBuffer; -import uk.co.real_logic.agrona.concurrent.UnsafeBuffer; +import org.agrona.BitUtil; +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.UnsafeBuffer; import java.nio.ByteBuffer; import java.util.Arrays; diff --git a/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java b/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java index 3f5f5ea7b..6d5212028 100644 --- a/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java +++ b/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java @@ -17,10 +17,10 @@ import io.reactivesocket.Frame; import io.reactivesocket.Payload; - +import org.agrona.collections.Int2ObjectHashMap; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import uk.co.real_logic.agrona.collections.Int2ObjectHashMap; + public class PayloadReassembler implements Subscriber { diff --git a/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java index 88e67db1c..fdbcdd735 100644 --- a/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java @@ -16,9 +16,9 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; -import uk.co.real_logic.agrona.BitUtil; -import uk.co.real_logic.agrona.DirectBuffer; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import org.agrona.BitUtil; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java index 7e0f4dea1..bf7f311d7 100644 --- a/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java @@ -16,11 +16,10 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; -import uk.co.real_logic.agrona.BitUtil; -import uk.co.real_logic.agrona.DirectBuffer; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import org.agrona.BitUtil; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; -import java.nio.ByteBuffer; import java.nio.ByteOrder; public class RequestNFrameFlyweight diff --git a/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java b/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java index bebe4b34c..2e3d3ae17 100644 --- a/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java +++ b/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java @@ -16,9 +16,9 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.FrameType; -import uk.co.real_logic.agrona.BitUtil; -import uk.co.real_logic.agrona.DirectBuffer; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import org.agrona.BitUtil; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java b/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java index 6a4420246..b7b6e1368 100644 --- a/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java @@ -16,9 +16,9 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; -import uk.co.real_logic.agrona.MutableDirectBuffer; -import uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue; -import uk.co.real_logic.agrona.concurrent.UnsafeBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.OneToOneConcurrentArrayQueue; +import org.agrona.concurrent.UnsafeBuffer; import java.nio.ByteBuffer; diff --git a/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java b/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java index 8842a38d7..bd0e00cbf 100644 --- a/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java +++ b/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java @@ -16,9 +16,9 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; -import uk.co.real_logic.agrona.MutableDirectBuffer; -import uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue; -import uk.co.real_logic.agrona.concurrent.UnsafeBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.OneToOneConcurrentArrayQueue; +import org.agrona.concurrent.UnsafeBuffer; import java.nio.ByteBuffer; diff --git a/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java b/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java index c5de2775e..09f04288f 100644 --- a/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java +++ b/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java @@ -16,8 +16,8 @@ package io.reactivesocket.internal.frame; import io.reactivesocket.Frame; -import uk.co.real_logic.agrona.MutableDirectBuffer; -import uk.co.real_logic.agrona.concurrent.UnsafeBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.UnsafeBuffer; import java.nio.ByteBuffer; diff --git a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java index 1ea8836c2..7196da335 100644 --- a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java +++ b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java @@ -1,9 +1,8 @@ package io.reactivesocket; -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - +import io.reactivesocket.internal.PublisherUtils; +import io.reactivesocket.perfutil.PerfTestConnection; +import io.reactivesocket.rx.Completable; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Mode; @@ -16,9 +15,9 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.reactivesocket.internal.PublisherUtils; -import io.reactivesocket.perfutil.PerfTestConnection; -import io.reactivesocket.rx.Completable; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @@ -133,10 +132,10 @@ public Publisher handleMetadataPush(Payload payload) } }; - final static ReactiveSocket serverSocket = ReactiveSocket.fromServerConnection(serverConnection, setupFrame -> handler); + final static ReactiveSocket serverSocket = ReactiveSocketImpl.fromServerConnection(serverConnection, setupFrame -> handler); final static ReactiveSocket client = - ReactiveSocket.fromClientConnection( + ReactiveSocketImpl.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS), t -> {}); static { diff --git a/src/test/java/io/reactivesocket/FrameTest.java b/src/test/java/io/reactivesocket/FrameTest.java index 9cd28eac3..03efb86eb 100644 --- a/src/test/java/io/reactivesocket/FrameTest.java +++ b/src/test/java/io/reactivesocket/FrameTest.java @@ -28,7 +28,7 @@ import org.junit.experimental.theories.Theories; import org.junit.experimental.theories.Theory; import org.junit.runner.RunWith; -import uk.co.real_logic.agrona.concurrent.UnsafeBuffer; +import org.agrona.concurrent.UnsafeBuffer; import static io.reactivesocket.internal.frame.ErrorFrameFlyweight.*; import static java.nio.charset.StandardCharsets.UTF_8; diff --git a/src/test/java/io/reactivesocket/LeaseTest.java b/src/test/java/io/reactivesocket/LeaseTest.java index 40a945186..e512a46f9 100644 --- a/src/test/java/io/reactivesocket/LeaseTest.java +++ b/src/test/java/io/reactivesocket/LeaseTest.java @@ -80,7 +80,7 @@ public void setup() throws InterruptedException { clientConnection.connectToServerConnection(serverConnection); leaseGovernor = new TestingLeaseGovernor(); - socketServer = ReactiveSocket.fromServerConnection( + socketServer = ReactiveSocketImpl.fromServerConnection( serverConnection, setup -> new RequestHandler() { @Override @@ -127,7 +127,7 @@ public Publisher handleMetadataPush(Payload payload) { } }, leaseGovernor, t -> {}); - socketClient = ReactiveSocket.fromClientConnection( + socketClient = ReactiveSocketImpl.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", HONOR_LEASE) ); diff --git a/src/test/java/io/reactivesocket/ReactiveSocketTest.java b/src/test/java/io/reactivesocket/ReactiveSocketTest.java index 3d216b294..d1b27f5f8 100644 --- a/src/test/java/io/reactivesocket/ReactiveSocketTest.java +++ b/src/test/java/io/reactivesocket/ReactiveSocketTest.java @@ -64,7 +64,7 @@ public void setup() { fireAndForgetOrMetadataPush = new CountDownLatch(1); lastServerErrorCountDown = new CountDownLatch(1); - socketServer = ReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() { + socketServer = ReactiveSocketImpl.fromServerConnection(serverConnection, setup -> new RequestHandler() { @Override public Publisher handleRequestResponse(Payload payload) { @@ -205,7 +205,7 @@ private void startSockets(int setupFlag, RequestHandler handler) throws Interrup } else if (setupFlag == HONOR_LEASE) { System.out.println("Reactivesocket configured with: HONOR_LEASE"); } - socketClient = ReactiveSocket.fromClientConnection( + socketClient = ReactiveSocketImpl.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", setupFlag), handler, @@ -264,7 +264,7 @@ public void testRequestResponse(int setupFlag) throws InterruptedException { @Test(timeout=2000, expected=IllegalStateException.class) public void testRequestResponsePremature() throws InterruptedException { - socketClient = ReactiveSocket.fromClientConnection( + socketClient = ReactiveSocketImpl.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), err -> err.printStackTrace() diff --git a/src/test/java/io/reactivesocket/TestFlowControlRequestN.java b/src/test/java/io/reactivesocket/TestFlowControlRequestN.java index 5958005ba..6c8821309 100644 --- a/src/test/java/io/reactivesocket/TestFlowControlRequestN.java +++ b/src/test/java/io/reactivesocket/TestFlowControlRequestN.java @@ -15,17 +15,6 @@ */ package io.reactivesocket; -import static io.reactivesocket.ConnectionSetupPayload.*; -import static io.reactivesocket.TestUtil.*; -import static io.reactivex.Observable.*; -import static org.junit.Assert.*; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -34,6 +23,23 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static io.reactivesocket.ConnectionSetupPayload.NO_FLAGS; +import static io.reactivesocket.TestUtil.byteToString; +import static io.reactivesocket.TestUtil.utf8EncodedPayload; +import static io.reactivex.Observable.error; +import static io.reactivex.Observable.fromPublisher; +import static io.reactivex.Observable.range; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestFlowControlRequestN { @Test(timeout=2000) @@ -309,7 +315,7 @@ public static void setup() throws InterruptedException { clientConnection.connectToServerConnection(serverConnection, false); - socketServer = ReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() { + socketServer = ReactiveSocketImpl.fromServerConnection(serverConnection, setup -> new RequestHandler() { @Override public Publisher handleRequestStream(Payload payload) { @@ -434,7 +440,7 @@ public Publisher handleMetadataPush(Payload payload) } }, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR, Throwable::printStackTrace); - socketClient = ReactiveSocket.fromClientConnection( + socketClient = ReactiveSocketImpl.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), Throwable::printStackTrace diff --git a/src/test/java/io/reactivesocket/TestTransportRequestN.java b/src/test/java/io/reactivesocket/TestTransportRequestN.java index dd19762da..feb24c9d6 100644 --- a/src/test/java/io/reactivesocket/TestTransportRequestN.java +++ b/src/test/java/io/reactivesocket/TestTransportRequestN.java @@ -15,9 +15,12 @@ */ package io.reactivesocket; -import static io.reactivesocket.TestUtil.*; -import static io.reactivex.Observable.*; -import static org.junit.Assert.*; +import io.reactivesocket.lease.FairLeaseGovernor; +import io.reactivex.subscribers.TestSubscriber; +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; +import org.reactivestreams.Publisher; import java.io.IOException; import java.util.concurrent.CountDownLatch; @@ -25,13 +28,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.junit.After; -import org.junit.Ignore; -import org.junit.Test; -import org.reactivestreams.Publisher; - -import io.reactivesocket.lease.FairLeaseGovernor; -import io.reactivex.subscribers.TestSubscriber; +import static io.reactivesocket.TestUtil.utf8EncodedPayload; +import static io.reactivex.Observable.error; +import static io.reactivex.Observable.fromPublisher; +import static io.reactivex.Observable.interval; +import static io.reactivex.Observable.just; +import static io.reactivex.Observable.range; +import static org.junit.Assert.fail; /** * Ensure that request(n) from DuplexConnection "transport" layer is respected. @@ -169,7 +172,7 @@ public void setup(TestConnectionWithControlledRequestN clientConnection, TestCon clientConnection.connectToServerConnection(serverConnection, false); lastServerErrorCountDown = new CountDownLatch(1); - socketServer = ReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() { + socketServer = ReactiveSocketImpl.fromServerConnection(serverConnection, setup -> new RequestHandler() { @Override public Publisher handleRequestResponse(Payload payload) { @@ -215,7 +218,7 @@ public Publisher handleMetadataPush(Payload payload) { lastServerErrorCountDown.countDown(); }); - socketClient = ReactiveSocket.fromClientConnection( + socketClient = ReactiveSocketImpl.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS), err -> err.printStackTrace()); diff --git a/src/test/java/io/reactivesocket/TestUtil.java b/src/test/java/io/reactivesocket/TestUtil.java index 7cebede8a..4245f24c5 100644 --- a/src/test/java/io/reactivesocket/TestUtil.java +++ b/src/test/java/io/reactivesocket/TestUtil.java @@ -15,7 +15,7 @@ */ package io.reactivesocket; -import uk.co.real_logic.agrona.MutableDirectBuffer; +import org.agrona.MutableDirectBuffer; import java.nio.ByteBuffer; import java.nio.charset.Charset; From 9e2471e646c23efebcb4f8f286af689f31e37537 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Mon, 28 Mar 2016 13:53:05 -0700 Subject: [PATCH 3/4] updated ReactiveSocketFactory --- .../reactivesocket/ReactiveSocketFactory.java | 105 +++++++++++++++++- .../ReactiveSocketSocketAddressFactory.java | 8 ++ 2 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 src/main/java/io/reactivesocket/ReactiveSocketSocketAddressFactory.java diff --git a/src/main/java/io/reactivesocket/ReactiveSocketFactory.java b/src/main/java/io/reactivesocket/ReactiveSocketFactory.java index 6aab5bfb0..08a8445a7 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocketFactory.java +++ b/src/main/java/io/reactivesocket/ReactiveSocketFactory.java @@ -15,12 +15,111 @@ */ package io.reactivesocket; +import io.reactivesocket.internal.rx.EmptySubscription; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; -import java.net.SocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; @FunctionalInterface -public interface ReactiveSocketFactory { - Publisher call(SocketAddress address, long timeout, TimeUnit timeUnit); +public interface ReactiveSocketFactory { + + Publisher call(T t); + + /** + * Gets a socket in a blocking manner + * @param t configuration to create the reactive socket + * @return blocks on create the socket + */ + default R callAndWait(T t) { + AtomicReference reference = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + call(t) + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(R reactiveSocket) { + reference.set(reactiveSocket); + } + + @Override + public void onError(Throwable t) { + error.set(t); + latch.countDown(); + } + + @Override + public void onComplete() { + latch.countDown(); + } + }); + + if (error.get() != null) { + throw new RuntimeException(error.get()); + } else { + return reference.get(); + } + } + + /** + * + * @param t the configuration used to create the reactive socket + * @param timeout timeout + * @param timeUnit timeout units + * @param executorService ScheduledExecutorService to schedule the timeout on + * @return + */ + default Publisher call(T t, long timeout, TimeUnit timeUnit, ScheduledExecutorService executorService) { + Publisher reactiveSocketPublisher = subscriber -> { + AtomicBoolean complete = new AtomicBoolean(); + subscriber.onSubscribe(EmptySubscription.INSTANCE); + call(t) + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(R reactiveSocket) { + subscriber.onNext(reactiveSocket); + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + if (!complete.get()) { + complete.set(true); + subscriber.onComplete(); + } + } + }); + + executorService.schedule(() -> { + if (!complete.get()) { + complete.set(true); + subscriber.onError(new TimeoutException()); + } + }, timeout, timeUnit); + }; + + return reactiveSocketPublisher; + } + } diff --git a/src/main/java/io/reactivesocket/ReactiveSocketSocketAddressFactory.java b/src/main/java/io/reactivesocket/ReactiveSocketSocketAddressFactory.java new file mode 100644 index 000000000..2b4f8bd15 --- /dev/null +++ b/src/main/java/io/reactivesocket/ReactiveSocketSocketAddressFactory.java @@ -0,0 +1,8 @@ +package io.reactivesocket; + +import java.net.SocketAddress; + +@FunctionalInterface +public interface ReactiveSocketSocketAddressFactory extends ReactiveSocketFactory { + +} From ebcee116daeecedbb789e07e1c4e52968882f1bc Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Mon, 28 Mar 2016 14:06:16 -0700 Subject: [PATCH 4/4] refactored ReactiveSocketImpl to DefaultReactiveSocket --- ...ReactiveSocketImpl.java => DefaultReactiveSocket.java} | 8 ++++---- src/perf/java/io/reactivesocket/ReactiveSocketPerf.java | 4 ++-- src/test/java/io/reactivesocket/LeaseTest.java | 4 ++-- src/test/java/io/reactivesocket/ReactiveSocketTest.java | 6 +++--- .../java/io/reactivesocket/TestFlowControlRequestN.java | 4 ++-- .../java/io/reactivesocket/TestTransportRequestN.java | 4 ++-- 6 files changed, 15 insertions(+), 15 deletions(-) rename src/main/java/io/reactivesocket/{ReactiveSocketImpl.java => DefaultReactiveSocket.java} (98%) diff --git a/src/main/java/io/reactivesocket/ReactiveSocketImpl.java b/src/main/java/io/reactivesocket/DefaultReactiveSocket.java similarity index 98% rename from src/main/java/io/reactivesocket/ReactiveSocketImpl.java rename to src/main/java/io/reactivesocket/DefaultReactiveSocket.java index 82fbc23b8..67700fb70 100644 --- a/src/main/java/io/reactivesocket/ReactiveSocketImpl.java +++ b/src/main/java/io/reactivesocket/DefaultReactiveSocket.java @@ -37,7 +37,7 @@ /** * An implementation of {@link ReactiveSocket} */ -public class ReactiveSocketImpl implements ReactiveSocket { +public class DefaultReactiveSocket implements ReactiveSocket { private static final RequestHandler EMPTY_HANDLER = new RequestHandler.Builder().build(); private static final Consumer DEFAULT_ERROR_STREAM = t -> { @@ -56,7 +56,7 @@ public class ReactiveSocketImpl implements ReactiveSocket { private final ConnectionSetupHandler responderConnectionHandler; private final LeaseGovernor leaseGovernor; - private ReactiveSocketImpl( + private DefaultReactiveSocket( DuplexConnection connection, boolean isServer, ConnectionSetupPayload serverRequestorSetupPayload, @@ -112,7 +112,7 @@ public static ReactiveSocket fromClientConnection( } final RequestHandler h = handler != null ? handler : EMPTY_HANDLER; Consumer es = errorStream != null ? errorStream : DEFAULT_ERROR_STREAM; - return new ReactiveSocketImpl(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es); + return new DefaultReactiveSocket(connection, false, setup, h, null, NULL_LEASE_GOVERNOR, es); } /** @@ -172,7 +172,7 @@ public static ReactiveSocket fromServerConnection( LeaseGovernor leaseGovernor, Consumer errorConsumer ) { - return new ReactiveSocketImpl(connection, true, null, null, connectionHandler, + return new DefaultReactiveSocket(connection, true, null, null, connectionHandler, leaseGovernor, errorConsumer); } diff --git a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java index 7196da335..0e92b8732 100644 --- a/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java +++ b/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java @@ -132,10 +132,10 @@ public Publisher handleMetadataPush(Payload payload) } }; - final static ReactiveSocket serverSocket = ReactiveSocketImpl.fromServerConnection(serverConnection, setupFrame -> handler); + final static ReactiveSocket serverSocket = DefaultReactiveSocket.fromServerConnection(serverConnection, setupFrame -> handler); final static ReactiveSocket client = - ReactiveSocketImpl.fromClientConnection( + DefaultReactiveSocket.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS), t -> {}); static { diff --git a/src/test/java/io/reactivesocket/LeaseTest.java b/src/test/java/io/reactivesocket/LeaseTest.java index e512a46f9..e5a8ca272 100644 --- a/src/test/java/io/reactivesocket/LeaseTest.java +++ b/src/test/java/io/reactivesocket/LeaseTest.java @@ -80,7 +80,7 @@ public void setup() throws InterruptedException { clientConnection.connectToServerConnection(serverConnection); leaseGovernor = new TestingLeaseGovernor(); - socketServer = ReactiveSocketImpl.fromServerConnection( + socketServer = DefaultReactiveSocket.fromServerConnection( serverConnection, setup -> new RequestHandler() { @Override @@ -127,7 +127,7 @@ public Publisher handleMetadataPush(Payload payload) { } }, leaseGovernor, t -> {}); - socketClient = ReactiveSocketImpl.fromClientConnection( + socketClient = DefaultReactiveSocket.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", HONOR_LEASE) ); diff --git a/src/test/java/io/reactivesocket/ReactiveSocketTest.java b/src/test/java/io/reactivesocket/ReactiveSocketTest.java index d1b27f5f8..1b680b9fd 100644 --- a/src/test/java/io/reactivesocket/ReactiveSocketTest.java +++ b/src/test/java/io/reactivesocket/ReactiveSocketTest.java @@ -64,7 +64,7 @@ public void setup() { fireAndForgetOrMetadataPush = new CountDownLatch(1); lastServerErrorCountDown = new CountDownLatch(1); - socketServer = ReactiveSocketImpl.fromServerConnection(serverConnection, setup -> new RequestHandler() { + socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() { @Override public Publisher handleRequestResponse(Payload payload) { @@ -205,7 +205,7 @@ private void startSockets(int setupFlag, RequestHandler handler) throws Interrup } else if (setupFlag == HONOR_LEASE) { System.out.println("Reactivesocket configured with: HONOR_LEASE"); } - socketClient = ReactiveSocketImpl.fromClientConnection( + socketClient = DefaultReactiveSocket.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", setupFlag), handler, @@ -264,7 +264,7 @@ public void testRequestResponse(int setupFlag) throws InterruptedException { @Test(timeout=2000, expected=IllegalStateException.class) public void testRequestResponsePremature() throws InterruptedException { - socketClient = ReactiveSocketImpl.fromClientConnection( + socketClient = DefaultReactiveSocket.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), err -> err.printStackTrace() diff --git a/src/test/java/io/reactivesocket/TestFlowControlRequestN.java b/src/test/java/io/reactivesocket/TestFlowControlRequestN.java index 6c8821309..46bc3c7c1 100644 --- a/src/test/java/io/reactivesocket/TestFlowControlRequestN.java +++ b/src/test/java/io/reactivesocket/TestFlowControlRequestN.java @@ -315,7 +315,7 @@ public static void setup() throws InterruptedException { clientConnection.connectToServerConnection(serverConnection, false); - socketServer = ReactiveSocketImpl.fromServerConnection(serverConnection, setup -> new RequestHandler() { + socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() { @Override public Publisher handleRequestStream(Payload payload) { @@ -440,7 +440,7 @@ public Publisher handleMetadataPush(Payload payload) } }, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR, Throwable::printStackTrace); - socketClient = ReactiveSocketImpl.fromClientConnection( + socketClient = DefaultReactiveSocket.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), Throwable::printStackTrace diff --git a/src/test/java/io/reactivesocket/TestTransportRequestN.java b/src/test/java/io/reactivesocket/TestTransportRequestN.java index feb24c9d6..d0cda2c52 100644 --- a/src/test/java/io/reactivesocket/TestTransportRequestN.java +++ b/src/test/java/io/reactivesocket/TestTransportRequestN.java @@ -172,7 +172,7 @@ public void setup(TestConnectionWithControlledRequestN clientConnection, TestCon clientConnection.connectToServerConnection(serverConnection, false); lastServerErrorCountDown = new CountDownLatch(1); - socketServer = ReactiveSocketImpl.fromServerConnection(serverConnection, setup -> new RequestHandler() { + socketServer = DefaultReactiveSocket.fromServerConnection(serverConnection, setup -> new RequestHandler() { @Override public Publisher handleRequestResponse(Payload payload) { @@ -218,7 +218,7 @@ public Publisher handleMetadataPush(Payload payload) { lastServerErrorCountDown.countDown(); }); - socketClient = ReactiveSocketImpl.fromClientConnection( + socketClient = DefaultReactiveSocket.fromClientConnection( clientConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS), err -> err.printStackTrace());