Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ class DefaultS3OutputStream extends S3OutputStream {

@Override
public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException {
SocketAccess.doPrivilegedIOException(() -> {
flushPrivileged(bytes, off, len, closing);
return null;
});
}

private void flushPrivileged(byte[] bytes, int off, int len, boolean closing) throws IOException {
if (len > MULTIPART_MAX_SIZE.getBytes()) {
throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
try (OutputStream stream = createOutput(blobName)) {
SocketAccess.doPrivilegedIOException(() -> Streams.copy(inputStream, stream));
Streams.copy(inputStream, stream);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,49 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.Socket;
import java.security.DigestInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.Assert.assertTrue;

class MockAmazonS3 extends AbstractAmazonS3 {

private final int mockSocketPort;

private Map<String, InputStream> blobs = new ConcurrentHashMap<>();

// in ESBlobStoreContainerTestCase.java, the maximum
// length of the input data is 100 bytes
private byte[] byteCounter = new byte[100];


MockAmazonS3(int mockSocketPort) {
this.mockSocketPort = mockSocketPort;
}

// Simulate a socket connection to check that SocketAccess.doPrivileged() is used correctly.
// Any method of AmazonS3 might potentially open a socket to the S3 service. Firstly, a call
// to any method of AmazonS3 has to be wrapped by SocketAccess.doPrivileged().
// Secondly, each method on the stack from doPrivileged to opening the socket has to be
// located in a jar that is provided by the plugin.
// Thirdly, a SocketPermission has to be configured in plugin-security.policy.
// By opening a socket in each method of MockAmazonS3 it is ensured that in production AmazonS3
// is able to to open a socket to the S3 Service without causing a SecurityException
private void simulateS3SocketConnection() {
try (Socket socket = new Socket(InetAddress.getByName("127.0.0.1"), mockSocketPort)) {
assertTrue(socket.isConnected()); // NOOP to keep static analysis happy
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}


@Override
public boolean doesBucketExist(String bucket) {
return true;
Expand All @@ -63,6 +92,7 @@ public boolean doesBucketExist(String bucket) {
public ObjectMetadata getObjectMetadata(
GetObjectMetadataRequest getObjectMetadataRequest)
throws AmazonClientException, AmazonServiceException {
simulateS3SocketConnection();
String blobName = getObjectMetadataRequest.getKey();

if (!blobs.containsKey(blobName)) {
Expand All @@ -75,6 +105,7 @@ public ObjectMetadata getObjectMetadata(
@Override
public PutObjectResult putObject(PutObjectRequest putObjectRequest)
throws AmazonClientException, AmazonServiceException {
simulateS3SocketConnection();
String blobName = putObjectRequest.getKey();
DigestInputStream stream = (DigestInputStream) putObjectRequest.getInputStream();

Expand All @@ -95,6 +126,7 @@ public PutObjectResult putObject(PutObjectRequest putObjectRequest)
@Override
public S3Object getObject(GetObjectRequest getObjectRequest)
throws AmazonClientException, AmazonServiceException {
simulateS3SocketConnection();
// in ESBlobStoreContainerTestCase.java, the prefix is empty,
// so the key and blobName are equivalent to each other
String blobName = getObjectRequest.getKey();
Expand All @@ -114,6 +146,7 @@ public S3Object getObject(GetObjectRequest getObjectRequest)
@Override
public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
throws AmazonClientException, AmazonServiceException {
simulateS3SocketConnection();
MockObjectListing list = new MockObjectListing();
list.setTruncated(false);

Expand Down Expand Up @@ -147,6 +180,7 @@ public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
@Override
public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest)
throws AmazonClientException, AmazonServiceException {
simulateS3SocketConnection();
String sourceBlobName = copyObjectRequest.getSourceKey();
String targetBlobName = copyObjectRequest.getDestinationKey();

Expand All @@ -167,6 +201,7 @@ public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest)
@Override
public void deleteObject(DeleteObjectRequest deleteObjectRequest)
throws AmazonClientException, AmazonServiceException {
simulateS3SocketConnection();
String blobName = deleteObjectRequest.getKey();

if (!blobs.containsKey(blobName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,61 @@

package org.elasticsearch.repositories.s3;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Locale;

public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase {

private static final Logger logger = Loggers.getLogger(S3BlobStoreContainerTests.class);

private static ServerSocket mockS3ServerSocket;

private static Thread mockS3AcceptorThread;

// Opens a MockSocket to simulate connections to S3 checking that SocketPermissions are set up correctly.
// See MockAmazonS3.simulateS3SocketConnection.
@BeforeClass
public static void openMockSocket() throws IOException {
mockS3ServerSocket = new MockServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
mockS3AcceptorThread = new Thread(() -> {
while (!mockS3ServerSocket.isClosed()) {
try {
// Accept connections from MockAmazonS3.
mockS3ServerSocket.accept();
} catch (IOException e) {
}
}
});
mockS3AcceptorThread.start();
}

protected BlobStore newBlobStore() throws IOException {
MockAmazonS3 client = new MockAmazonS3();
MockAmazonS3 client = new MockAmazonS3(mockS3ServerSocket.getLocalPort());
String bucket = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);

return new S3BlobStore(Settings.EMPTY, client, bucket, false,
new ByteSizeValue(10, ByteSizeUnit.MB), "public-read-write", "standard");
}

@AfterClass
public static void closeMockSocket() throws IOException, InterruptedException {
mockS3ServerSocket.close();
mockS3AcceptorThread.join();
mockS3AcceptorThread = null;
mockS3ServerSocket = null;
}
}