Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d996d4e
WIP
Tim-Brooks Jan 2, 2019
8471837
WIP
Tim-Brooks Jan 3, 2019
ee37b25
Work on restore process
Tim-Brooks Jan 3, 2019
cce5c99
WIP
Tim-Brooks Jan 3, 2019
11ce19c
WIP
Tim-Brooks Jan 3, 2019
6af7ec0
Fix checkstyle
Tim-Brooks Jan 4, 2019
f774c28
Change
Tim-Brooks Jan 4, 2019
359ad5b
Fix logger check
Tim-Brooks Jan 4, 2019
2031fd2
Merge remote-tracking branch 'upstream/master' into ccr_file_recovery
Tim-Brooks Jan 4, 2019
e18df0a
Changes
Tim-Brooks Jan 4, 2019
da5b2e7
Add basic ref counting
Tim-Brooks Jan 4, 2019
2209eac
Extract context
Tim-Brooks Jan 5, 2019
79a53e0
Use in ccr
Tim-Brooks Jan 5, 2019
142800d
Add file reader
Tim-Brooks Jan 6, 2019
6ddaa00
Merge remote-tracking branch 'upstream/master' into ccr_file_recovery
Tim-Brooks Jan 7, 2019
478e25c
WIP
Tim-Brooks Jan 7, 2019
0eba9d7
Cache file
Tim-Brooks Jan 7, 2019
0e56995
Work in progress
Tim-Brooks Jan 7, 2019
8083076
Checkstyle
Tim-Brooks Jan 7, 2019
6c011bc
Fix blob store repository
Tim-Brooks Jan 7, 2019
be5f0f9
Merge remote-tracking branch 'upstream/master' into ccr_file_recovery
Tim-Brooks Jan 7, 2019
47d7576
Add descriptor
Tim-Brooks Jan 7, 2019
bcef903
Changes
Tim-Brooks Jan 8, 2019
483f5c7
Changes
Tim-Brooks Jan 8, 2019
d09454c
Merge remote-tracking branch 'upstream/master' into ccr_file_recovery
Tim-Brooks Jan 9, 2019
83a1c65
Checkstyle
Tim-Brooks Jan 9, 2019
7835006
Changes
Tim-Brooks Jan 9, 2019
c40f242
Merge remote-tracking branch 'upstream/master' into ccr_file_recovery
Tim-Brooks Jan 9, 2019
a201e93
Small changes
Tim-Brooks Jan 11, 2019
083b9ae
Changes
Tim-Brooks Jan 11, 2019
cf35fcd
Changes
Tim-Brooks Jan 11, 2019
fa59f49
no recursive dependencies
ywelsch Jan 11, 2019
d872334
Changes
Tim-Brooks Jan 12, 2019
92435b0
Don't swallow
Tim-Brooks Jan 12, 2019
2bfa3e1
Changes
Tim-Brooks Jan 14, 2019
9005245
Merge remote-tracking branch 'upstream/master' into ccr_file_recovery
Tim-Brooks Jan 14, 2019
cb1d455
Merge remote-tracking branch 'upstream/master' into ccr_file_recovery
Tim-Brooks Jan 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.PageCacheRecycler;

Expand All @@ -35,17 +34,15 @@ public class PagedBytesReference extends BytesReference {

private static final int PAGE_SIZE = PageCacheRecycler.BYTE_PAGE_SIZE;

private final BigArrays bigarrays;
protected final ByteArray byteArray;
private final ByteArray byteArray;
private final int offset;
private final int length;

public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
this(bigarrays, byteArray, 0, length);
public PagedBytesReference(ByteArray byteArray, int length) {
this(byteArray, 0, length);
}

public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int from, int length) {
this.bigarrays = bigarrays;
private PagedBytesReference(ByteArray byteArray, int from, int length) {
this.byteArray = byteArray;
this.offset = from;
this.length = length;
Expand All @@ -67,7 +64,7 @@ public BytesReference slice(int from, int length) {
throw new IllegalArgumentException("can't slice a buffer with length [" + length() +
"], with slice parameters from [" + from + "], length [" + length + "]");
}
return new PagedBytesReference(bigarrays, byteArray, offset + from, length);
return new PagedBytesReference(byteArray, offset + from, length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

/**
Expand All @@ -32,9 +31,8 @@ public final class ReleasablePagedBytesReference extends PagedBytesReference imp

private final Releasable releasable;

public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
Releasable releasable) {
super(bigarrays, byteArray, length);
public ReleasablePagedBytesReference(ByteArray byteArray, int length, Releasable releasable) {
super(byteArray, length);
this.releasable = releasable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public int size() {

@Override
public BytesReference bytes() {
return new PagedBytesReference(bigArrays, bytes, count);
return new PagedBytesReference(bytes, count);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
*/
@Override
public ReleasablePagedBytesReference bytes() {
return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable);
return new ReleasablePagedBytesReference(bytes, count, releasable);
}

@Override
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ public void testEquals() {
}

// get refs & compare
BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length);
BytesReference pbr2 = new PagedBytesReference(bigarrays, ba2, length);
BytesReference pbr = new PagedBytesReference(ba1, length);
BytesReference pbr2 = new PagedBytesReference(ba2, length);
assertEquals(pbr, pbr2);
int offsetToFlip = randomIntBetween(0, length - 1);
int value = ~Byte.toUnsignedInt(ba1.get(offsetToFlip));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ public void testEquals() throws IOException {
public void testSliceEquals() {
int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2, 5));
ByteArray ba1 = bigarrays.newByteArray(length, false);
BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length);
BytesReference pbr = new PagedBytesReference(ba1, length);

// test equality of slices
int sliceFrom = randomIntBetween(0, pbr.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,16 @@ protected IndexShard newStartedShard(final boolean primary) throws IOException {
return newStartedShard(primary, Settings.EMPTY, new InternalEngineFactory());
}

/**
 * Creates a new empty shard and starts it. The shard uses the default
 * {@link InternalEngineFactory}; this delegates to the three-argument overload.
 *
 * @param primary controls whether the shard will be a primary or a replica.
 * @param settings the settings to use for this shard
 * @return the started shard
 * @throws IOException if creating or starting the shard fails
 */
protected IndexShard newStartedShard(final boolean primary, Settings settings) throws IOException {
    return newStartedShard(primary, settings, new InternalEngineFactory());
}

/**
* Creates a new empty shard with the specified settings and engine factory and starts it.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
Expand Down Expand Up @@ -193,6 +194,8 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,
ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE,
GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class),
// stats action
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;

public class GetCcrRestoreFileChunkAction extends Action<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {

public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction();
public static final String NAME = "internal:admin/ccr/restore/file_chunk/get";

private GetCcrRestoreFileChunkAction() {
super(NAME);
}

@Override
public GetCcrRestoreFileChunkResponse newResponse() {
throw new UnsupportedOperationException();
}

@Override
public Writeable.Reader<GetCcrRestoreFileChunkResponse> getResponseReader() {
return GetCcrRestoreFileChunkResponse::new;
}


public static class TransportGetCcrRestoreFileChunkAction
extends HandledTransportAction<GetCcrRestoreFileChunkRequest, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {

private final CcrRestoreSourceService restoreSourceService;
private final ThreadPool threadPool;
private final BigArrays bigArrays;

@Inject
public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters,
CcrRestoreSourceService restoreSourceService) {
super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new);
TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new);
this.threadPool = transportService.getThreadPool();
this.restoreSourceService = restoreSourceService;
this.bigArrays = bigArrays;
}

@Override
protected void doExecute(Task task, GetCcrRestoreFileChunkRequest request,
ActionListener<GetCcrRestoreFileChunkResponse> listener) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
int bytesRequested = request.getSize();
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
String fileName = request.getFileName();
String sessionUUID = request.getSessionUUID();
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
// structure on the same thread. So the bytes will be copied before the reference is released.
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
long offsetBeforeRead = offsetAfterRead - reference.length();
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
}
}
}
});
}
}

public static class GetCcrRestoreFileChunkResponse extends ActionResponse {

private final long offset;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

offset is typically the beginning of the range that is sent. Having it different here is a potential source for confusion in the future. I see two options: 1) Name it differently or 2) make it the offset that represents the beginning of the range that is sent. I'm strongly favoring 2), which is a small change to make, and provides the same validation on the receiver.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with number 2.

private final BytesReference chunk;

GetCcrRestoreFileChunkResponse(StreamInput streamInput) throws IOException {
super(streamInput);
offset = streamInput.readVLong();
chunk = streamInput.readBytesReference();
}

GetCcrRestoreFileChunkResponse(long offset, BytesReference chunk) {
this.offset = offset;
this.chunk = chunk;
}

public long getOffset() {
return offset;
}

public BytesReference getChunk() {
return chunk;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(offset);
out.writeBytesReference(chunk);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.RemoteClusterAwareRequest;

import java.io.IOException;

public class GetCcrRestoreFileChunkRequest extends ActionRequest implements RemoteClusterAwareRequest {

private final DiscoveryNode node;
private final String sessionUUID;
private final String fileName;
private final int size;

@Override
public ActionRequestValidationException validate() {
return null;
}

public GetCcrRestoreFileChunkRequest(DiscoveryNode node, String sessionUUID, String fileName, int size) {
this.node = node;
this.sessionUUID = sessionUUID;
this.fileName = fileName;
this.size = size;
assert size > -1 : "The file chunk request size must be positive. Found: [" + size + "].";
}

GetCcrRestoreFileChunkRequest(StreamInput in) throws IOException {
super(in);
node = null;
sessionUUID = in.readString();
fileName = in.readString();
size = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
out.writeString(fileName);
out.writeVInt(size);
}

@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}

String getSessionUUID() {
return sessionUUID;
}

String getFileName() {
return fileName;
}

int getSize() {
return size;
}

@Override
public DiscoveryNode getPreferredTargetNode() {
assert node != null : "Target node is null";
return node;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe assert here that node is not-null?

}
}
Loading