@@ -84,6 +84,7 @@ public CompletionStage<Void> getBody() {
8484
8585 @ Override
8686 public void onSubscribe (Flow .Subscription subscription ) {
87+ Objects .requireNonNull (subscription );
8788 if (!subscribed .compareAndSet (false , true )) {
8889 subscription .cancel ();
8990 } else {
@@ -94,6 +95,7 @@ public void onSubscribe(Flow.Subscription subscription) {
9495
9596 @ Override
9697 public void onNext (List <ByteBuffer > items ) {
98+ Objects .requireNonNull (items );
9799 for (ByteBuffer item : items ) {
98100 byte [] buf = new byte [item .remaining ()];
99101 item .get (buf );
@@ -104,6 +106,7 @@ public void onNext(List<ByteBuffer> items) {
104106
105107 @ Override
106108 public void onError (Throwable throwable ) {
109+ Objects .requireNonNull (throwable );
107110 result .completeExceptionally (throwable );
108111 }
109112
@@ -131,6 +134,7 @@ public static class PathSubscriber implements BodySubscriber<Path> {
131134 private final FilePermission [] filePermissions ;
132135 private final CompletableFuture <Path > result = new MinimalFuture <>();
133136
137+ private final AtomicBoolean subscribed = new AtomicBoolean ();
134138 private volatile Flow .Subscription subscription ;
135139 private volatile FileChannel out ;
136140
@@ -170,6 +174,12 @@ public static PathSubscriber create(Path file,
170174
171175 @ Override
172176 public void onSubscribe (Flow .Subscription subscription ) {
177+ Objects .requireNonNull (subscription );
178+ if (!subscribed .compareAndSet (false , true )) {
179+ subscription .cancel ();
180+ return ;
181+ }
182+
173183 this .subscription = subscription ;
174184 if (System .getSecurityManager () == null ) {
175185 try {
@@ -411,6 +421,7 @@ public int read() throws IOException {
411421
412422 @ Override
413423 public void onSubscribe (Flow .Subscription s ) {
424+ Objects .requireNonNull (s );
414425 try {
415426 if (!subscribed .compareAndSet (false , true )) {
416427 s .cancel ();
@@ -539,6 +550,7 @@ public NullSubscriber(Optional<T> result) {
539550
540551 @ Override
541552 public void onSubscribe (Flow .Subscription subscription ) {
553+ Objects .requireNonNull (subscription );
542554 if (!subscribed .compareAndSet (false , true )) {
543555 subscription .cancel ();
544556 } else {
@@ -553,6 +565,7 @@ public void onNext(List<ByteBuffer> items) {
553565
554566 @ Override
555567 public void onError (Throwable throwable ) {
568+ Objects .requireNonNull (throwable );
556569 cf .completeExceptionally (throwable );
557570 }
558571
@@ -819,13 +832,21 @@ private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
819832 }
820833 }
821834
835+ private final AtomicBoolean subscribed = new AtomicBoolean ();
836+
822837 @ Override
823838 public void onSubscribe (Flow .Subscription subscription ) {
824- subscriptionCF .complete (subscription );
839+ Objects .requireNonNull (subscription );
840+ if (!subscribed .compareAndSet (false , true )) {
841+ subscription .cancel ();
842+ } else {
843+ subscriptionCF .complete (subscription );
844+ }
825845 }
826846
827847 @ Override
828848 public void onNext (List <ByteBuffer > item ) {
849+ Objects .requireNonNull (item );
829850 try {
830851 // cannot be called before onSubscribe()
831852 assert subscriptionCF .isDone ();
@@ -853,6 +874,7 @@ assert suppress(subscriptionCF.isDone(),
853874 // onError can be called before request(1), and therefore can
854875 // be called before subscriberRef is set.
855876 signalError (throwable );
877+ Objects .requireNonNull (throwable );
856878 }
857879
858880 @ Override
0 commit comments