Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8aee1c6
Update YAML Rest tests to check for product header on all responses.
jbaiera Jan 28, 2022
f05df00
Preserve thread context when waiting for snapshot completion
jbaiera Jan 28, 2022
8cab8c0
Update docs/changelog/83290.yaml
jbaiera Jan 28, 2022
fc4a875
Preserve context on restore snapshot calls
jbaiera Jan 31, 2022
c96b8f4
Preserve thread context when completing remote reindex operations
jbaiera Jan 31, 2022
3ae6947
Preserve thread context when putting followers
jbaiera Jan 31, 2022
6b2b152
Preserve thread context when iterating over xpack usage results
jbaiera Jan 31, 2022
5a33c69
Fix code formatting
jbaiera Jan 31, 2022
3cac42c
Restore thread context on unfollow action listener completion
jbaiera Jan 31, 2022
45271b2
Fix DoSection test
jbaiera Feb 1, 2022
d6a43c2
Merge branch 'master' into fix-require-all-response-with-product-header
elasticmachine Feb 1, 2022
2680ddf
disable bwc tests until fixes are backported
jbaiera Feb 2, 2022
0a6b864
Disable some more BWC tests
jbaiera Feb 2, 2022
65e88fb
Rework the transport xpack usage action listener logic to not need co…
jbaiera Feb 4, 2022
4dd834e
Move the context preservation for cluster applier listeners down to t…
jbaiera Feb 4, 2022
e1634a9
more bwc tests
jbaiera Feb 4, 2022
38c7253
appease style checker
jbaiera Feb 4, 2022
8f275d0
Update listener name, fix removeListener method
jbaiera Feb 7, 2022
7266f9a
Merge branch 'master' into fix-require-all-response-with-product-header
jbaiera Feb 7, 2022
67f7591
checkstyle
jbaiera Feb 7, 2022
74908bd
assert unique listeners, fix precommit again
jbaiera Feb 7, 2022
c5e4729
Revert cluster state listener context change. Add javadoc note
jbaiera Feb 8, 2022
3282b87
Fix typo
jbaiera Feb 8, 2022
071944d
Fix the watcher usage action stashing context
jbaiera Feb 8, 2022
763bcd4
Simplify bwc mute
jbaiera Feb 8, 2022
9593279
Correctly range the bwc versions
jbaiera Feb 9, 2022
073dbe6
Mute tsdb test until backport
jbaiera Feb 10, 2022
deb966a
Merge branch 'master' into fix-require-all-response-with-product-header
elasticmachine Feb 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/83290.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83290
summary: Update YAML Rest tests to check for product header on all responses
area: Infra/REST API
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ protected void finishHim(Exception failure) {
*/
protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
logger.debug("[{}]: finishing without any catastrophic failures", task.getId());
scrollSource.close(() -> {
scrollSource.close(threadPool.getThreadContext().preserveContext(() -> {
if (failure == null) {
BulkByScrollResponse response = buildResponse(
timeValueNanos(System.nanoTime() - startTime.get()),
Expand All @@ -605,7 +605,7 @@ protected void finishHim(Exception failure, List<Failure> indexingFailures, List
} else {
listener.onFailure(failure);
}
});
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
$/
---
"Test cat snapshots output":
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
repository: test_cat_snapshots_1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down Expand Up @@ -61,6 +64,7 @@ setup:

---
"Get snapshot info when verbose is false":

- do:
indices.create:
index: test_index
Expand Down Expand Up @@ -198,7 +202,6 @@ setup:
- skip:
version: " - 7.12.99"
reason: "Introduced in 7.13.0"

- do:
indices.create:
index: test_index
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"
---
"Get repository returns UUID":
- skip:
version: " - 7.12.99"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
repository: test_repo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;

import java.util.function.Supplier;

import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;

public class RestoreClusterStateListener implements ClusterStateListener {
Expand All @@ -29,43 +32,48 @@ public class RestoreClusterStateListener implements ClusterStateListener {
private final ClusterService clusterService;
private final String uuid;
private final ActionListener<RestoreSnapshotResponse> listener;
private final Supplier<ThreadContext.StoredContext> contextSupplier;

private RestoreClusterStateListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener
ActionListener<RestoreSnapshotResponse> listener,
Supplier<ThreadContext.StoredContext> contextSupplier
) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.listener = listener;
this.contextSupplier = contextSupplier;
}

@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards)
);
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
try (ThreadContext.StoredContext stored = contextSupplier.get()) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards)
);
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}
}

Expand All @@ -76,8 +84,11 @@ public void clusterChanged(ClusterChangedEvent changedEvent) {
public static void createAndRegisterListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener
ActionListener<RestoreSnapshotResponse> listener,
ThreadContext threadContext
) {
clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
clusterService.addListener(
new RestoreClusterStateListener(clusterService, response, listener, threadContext.newRestorableContext(true))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ protected void masterOperation(
) {
restoreService.restoreSnapshot(request, listener.delegateFailure((delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
threadPool.getThreadContext()
);
} else {
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void removeApplier(ClusterStateApplier applier) {
}

/**
* Add a listener for updated cluster states
* Add a listener for updated cluster states. Listeners are executed in the system thread context.
*/
public void addListener(ClusterStateListener listener) {
clusterStateListeners.add(listener);
Expand All @@ -222,7 +222,7 @@ public void addListener(ClusterStateListener listener) {
/**
* Removes a listener for updated cluster states.
*/
public void removeListener(ClusterStateListener listener) {
public void removeListener(final ClusterStateListener listener) {
clusterStateListeners.remove(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -2957,7 +2958,8 @@ static Map<String, DataStreamAlias> filterDataStreamAliases(
* @param listener listener
*/
private void addListener(Snapshot snapshot, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener);
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>())
.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,20 @@ public String getReasonPhrase() {
* Get a list of all of the values of all warning headers returned in the response.
*/
public List<String> getWarningHeaders() {
List<String> warningHeaders = new ArrayList<>();
return getHeaders("Warning");
}

/**
* Get a list of all the values of a given header returned in the response.
*/
public List<String> getHeaders(String name) {
List<String> headers = new ArrayList<>();
for (Header header : response.getHeaders()) {
if (header.getName().equals("Warning")) {
warningHeaders.add(header.getValue());
if (header.getName().equalsIgnoreCase(name)) {
headers.add(header.getValue());
}
}
return warningHeaders;
return headers;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ public void execute(ClientYamlTestExecutionContext executionContext) throws IOEx
final String testPath = executionContext.getClientYamlTestCandidate() != null
? executionContext.getClientYamlTestCandidate().getTestPath()
: null;
checkElasticProductHeader(response.getHeaders("X-elastic-product"));
checkWarningHeaders(response.getWarningHeaders(), testPath);
} catch (ClientYamlTestResponseException e) {
ClientYamlTestResponse restTestResponse = e.getRestTestResponse();
Expand All @@ -392,6 +393,31 @@ public void execute(ClientYamlTestExecutionContext executionContext) throws IOEx
}
}

void checkElasticProductHeader(final List<String> productHeaders) {
if (productHeaders.isEmpty()) {
fail("Response is missing required X-Elastic-Product response header");
}
boolean headerPresent = false;
final List<String> unexpected = new ArrayList<>();
for (String header : productHeaders) {
if (header.equals("Elasticsearch")) {
headerPresent = true;
break;
} else {
unexpected.add(header);
}
}
if (headerPresent == false) {
StringBuilder failureMessage = new StringBuilder();
appendBadHeaders(
failureMessage,
unexpected,
"did not get expected product header [Elasticsearch], found header" + (unexpected.size() > 1 ? "s" : "")
);
fail(failureMessage.toString());
}
}

void checkWarningHeaders(final List<String> warningHeaders) {
checkWarningHeaders(warningHeaders, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ public void testNodeSelectorByVersion() throws IOException {
doSection.getApiCallSection().getNodeSelector()
)
).thenReturn(mockResponse);
when(mockResponse.getHeaders("X-elastic-product")).thenReturn(List.of("Elasticsearch"));
doSection.execute(context);
verify(context).callApi(
"indices.get_field_mapping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public void onFailure(Exception e) {
assert restoreInfo.failedShards() > 0 : "Should have failed shards";
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
}
})
}),
threadPool.getThreadContext()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
Expand Down Expand Up @@ -178,10 +179,16 @@ private void removeRetentionLeaseForShard(
) {
logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId);
final ThreadContext threadContext = threadPool.getThreadContext();
// We're about to stash the thread context for this retention lease removal. The listener will be completed while the
// context is stashed. The context needs to be restored in the listener when it is completing or else it is simply wiped.
final ActionListener<ActionResponse.Empty> preservedListener = new ContextPreservingActionListener<>(
threadContext.newRestorableContext(true),
listener
);
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
// we have to execute under the system context so that if security is enabled the removal is authorized
threadContext.markAsSystemContext();
CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener);
CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, preservedListener);
}
}

Expand Down
Loading