@@ -22,7 +22,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;

/**
* Simple utility methods for file and stream copying.
@@ -34,19 +33,23 @@
*/
public class Streams {

private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[8 * 1024]);

private Streams() {

}

/**
* Copy the contents of the given InputStream to the given OutputStream.
* Closes both streams when done.
* Copy the contents of the given InputStream to the given OutputStream. Optionally closes both streams when done.
*
* @param in the stream to copy from
* @param out the stream to copy to
* @param in the stream to copy from
* @param out the stream to copy to
* @param buffer buffer to use for copying
* @param close whether to close both streams after copying
* @return the number of bytes copied
* @throws IOException in case of I/O errors
*/
public static long copy(final InputStream in, final OutputStream out) throws IOException {
Objects.requireNonNull(in, "No InputStream specified");
Objects.requireNonNull(out, "No OutputStream specified");
final byte[] buffer = new byte[8192];
public static long copy(final InputStream in, final OutputStream out, byte[] buffer, boolean close) throws IOException {
Exception err = null;
try {
long byteCount = 0;
@@ -61,7 +64,30 @@ public static long copy(final InputStream in, final OutputStream out) throws IOE
err = e;
throw e;
} finally {
IOUtils.close(err, in, out);
if (close) {
IOUtils.close(err, in, out);
}
}
}

/**
* @see #copy(InputStream, OutputStream, byte[], boolean)
*/
public static long copy(final InputStream in, final OutputStream out, boolean close) throws IOException {
return copy(in, out, buffer.get(), close);
}

/**
* @see #copy(InputStream, OutputStream, byte[], boolean)
*/
public static long copy(final InputStream in, final OutputStream out, byte[] buffer) throws IOException {
return copy(in, out, buffer, true);
}

/**
* @see #copy(InputStream, OutputStream, byte[], boolean)
*/
public static long copy(final InputStream in, final OutputStream out) throws IOException {
return copy(in, out, buffer.get(), true);
}
}
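For context, a minimal caller-side sketch of the reworked API (a hypothetical example class, not part of this change; it assumes only the overloads shown in the diff above):

import org.elasticsearch.core.internal.io.Streams;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class StreamsCopyUsageSketch {
    public static void main(String[] args) throws IOException {
        byte[] data = "payload".getBytes(StandardCharsets.UTF_8);

        // Two-argument form: uses the shared thread-local buffer and closes both streams,
        // matching the behaviour of the previous copy(InputStream, OutputStream) method.
        long copied = Streams.copy(new ByteArrayInputStream(data), new ByteArrayOutputStream());

        // Four-argument form: the caller supplies the buffer and keeps both streams open,
        // which is what callers that continue writing to the output afterwards rely on.
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long alsoCopied = Streams.copy(in, out, new byte[1024], false);
        out.write('!'); // still usable because close == false
        System.out.println(copied + " / " + alsoCopied);
    }
}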
@@ -36,7 +36,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPathBasedFilter;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;

import java.io.BufferedInputStream;
import java.io.IOException;
@@ -349,7 +349,7 @@ public void writeRawField(String name, InputStream content, XContentType content
} else {
writeStartRaw(name);
flush();
copyStream(content, os);
Streams.copy(content, os);
writeEndRaw();
}
}
@@ -364,24 +364,11 @@ public void writeRawValue(InputStream stream, XContentType xContentType) throws
generator.writeRaw(':');
}
flush();
transfer(stream, os);
Streams.copy(stream, os, false);
writeEndRaw();
}
}

// A basic copy of Java 9's InputStream#transferTo
private static long transfer(InputStream in, OutputStream out) throws IOException {
Objects.requireNonNull(out, "out");
long transferred = 0;
byte[] buffer = new byte[8192];
int read;
while ((read = in.read(buffer, 0, 8192)) >= 0) {
out.write(buffer, 0, read);
transferred += read;
}
return transferred;
}

private boolean mayWriteRawData(XContentType contentType) {
// When the current generator is filtered (ie filter != null)
// or the content is in a different format than the current generator,
@@ -480,37 +467,4 @@ public void close() throws IOException {
public boolean isClosed() {
return generator.isClosed();
}

/**
* Copy the contents of the given InputStream to the given OutputStream.
* Closes both streams when done.
*
* @param in the stream to copy from
* @param out the stream to copy to
* @return the number of bytes copied
* @throws IOException in case of I/O errors
*/
private static long copyStream(InputStream in, OutputStream out) throws IOException {
Objects.requireNonNull(in, "No InputStream specified");
Objects.requireNonNull(out, "No OutputStream specified");
final byte[] buffer = new byte[8192];
boolean success = false;
try {
long byteCount = 0;
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
byteCount += bytesRead;
}
out.flush();
success = true;
return byteCount;
} finally {
if (success) {
IOUtils.close(in, out);
} else {
IOUtils.closeWhileHandlingException(in, out);
}
}
}
}
@@ -292,7 +292,7 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long
* It is not enough to wrap the call to Streams#copy; we have to wrap the privileged calls too, because Streams#copy
* is in the stacktrace and is not granted the permissions needed to close and write the channel.
*/
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
org.elasticsearch.core.internal.io.Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {

@SuppressForbidden(reason = "channel is based on a socket")
@Override
@@ -350,7 +350,7 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long
throws IOException {
assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
final byte[] buffer = new byte[Math.toIntExact(blobSize)];
org.elasticsearch.common.io.Streams.readFully(inputStream, buffer);
Streams.readFully(inputStream, buffer);
try {
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
@@ -165,7 +165,7 @@ public InputStream readBlob(String blobName, long position, long length) throws
channel.position(position);
}
assert channel.position() == position;
return org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length);
return Streams.limitStream(Channels.newInputStream(channel), length);
}

@Override
@@ -212,7 +212,8 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
final int bufferSize = blobStore.bufferSizeInBytes();
Streams.copy(inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
org.elasticsearch.core.internal.io.Streams.copy(
inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
}
IOUtils.fsync(tempBlobPath, false);
}
49 changes: 4 additions & 45 deletions server/src/main/java/org/elasticsearch/common/io/Streams.java
@@ -65,45 +65,6 @@ public void write(byte[] b, int off, int len) {
}
};

//---------------------------------------------------------------------
// Copy methods for java.io.InputStream / java.io.OutputStream
//---------------------------------------------------------------------


public static long copy(InputStream in, OutputStream out) throws IOException {
return copy(in, out, new byte[BUFFER_SIZE]);
}

/**
* Copy the contents of the given InputStream to the given OutputStream.
* Closes both streams when done.
*
* @param in the stream to copy from
* @param out the stream to copy to
* @return the number of bytes copied
* @throws IOException in case of I/O errors
*/
public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
Objects.requireNonNull(in, "No InputStream specified");
Objects.requireNonNull(out, "No OutputStream specified");
// Leverage try-with-resources to close in and out so that exceptions in close() are either propagated or added as suppressed
// exceptions to the main exception
try (InputStream in2 = in; OutputStream out2 = out) {
return doCopy(in2, out2, buffer);
}
}

private static long doCopy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
long byteCount = 0;
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
byteCount += bytesRead;
}
out.flush();
return byteCount;
}

/**
* Copy the contents of the given byte array to the given OutputStream.
* Closes the stream when done.
@@ -222,7 +183,7 @@ public static int readFully(InputStream reader, byte[] dest, int offset, int len
* Fully consumes the input stream, throwing the bytes away. Returns the number of bytes consumed.
*/
public static long consumeFully(InputStream inputStream) throws IOException {
return copy(inputStream, NULL_OUTPUT_STREAM);
return org.elasticsearch.core.internal.io.Streams.copy(inputStream, NULL_OUTPUT_STREAM);
}

public static List<String> readAllLines(InputStream input) throws IOException {
@@ -267,11 +228,9 @@ public static BytesStream flushOnCloseStream(BytesStream os) {
* Reads all bytes from the given {@link InputStream} and closes it afterwards.
*/
public static BytesReference readFully(InputStream in) throws IOException {
try (InputStream inputStream = in) {
BytesStreamOutput out = new BytesStreamOutput();
copy(inputStream, out);
return out.bytes();
}
BytesStreamOutput out = new BytesStreamOutput();
org.elasticsearch.core.internal.io.Streams.copy(in, out);
return out.bytes();
}

/**
@@ -91,7 +91,7 @@ public void testLimitInputStream() throws IOException {
final int limit = randomIntBetween(0, bytes.length);
final BytesArray stuffArray = new BytesArray(bytes);
final ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length);
final long count = Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out);
final long count = org.elasticsearch.core.internal.io.Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out);
assertEquals(limit, count);
assertThat(Arrays.equals(out.toByteArray(), Arrays.copyOf(bytes, limit)), equalTo(true));
}
@@ -28,13 +28,13 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
@@ -29,13 +29,13 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@@ -10,12 +10,12 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -53,7 +53,6 @@
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
@@ -70,6 +69,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;