Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,12 @@ private void getUserTokenFromId(String userTokenId, Version tokenVersion, Action
logger.warn("failed to get access token [{}] because index [{}] is not available", userTokenId, tokensIndex.aliasName());
listener.onResponse(null);
} else {
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME,
getTokenDocumentId(userTokenId)).request();
final Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("decode token", userTokenId, ex));
tokensIndex.checkIndexVersionThenExecute(
ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() +"]", userTokenId, ex)),
() -> {
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME,
getTokenDocumentId(userTokenId)).request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("decode token", userTokenId, ex));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Map<String, Object> accessTokenSource =
Expand Down Expand Up @@ -384,8 +383,8 @@ private void getUserTokenFromId(String userTokenId, Version tokenVersion, Action
logger.error(new ParameterizedMessage("failed to get access token [{}]", userTokenId), e);
listener.onFailure(e);
}
}), client::get);
});
}), client::get)
);
}
}

Expand Down Expand Up @@ -862,7 +861,9 @@ private void innerRefresh(String tokenDocId, Map<String, Object> source, long se
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setIfSeqNo(seqNo)
.setIfPrimaryTerm(primaryTerm);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(),
refreshedTokenIndex.prepareIndexIfNeededThenExecute(
ex -> listener.onFailure(traceLog("prepare index [" + refreshedTokenIndex.aliasName() + "]", ex)),
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(),
ActionListener.<UpdateResponse>wrap(updateResponse -> {
if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
logger.debug(() -> new ParameterizedMessage("updated the original token document to {}",
Expand Down Expand Up @@ -931,7 +932,7 @@ public void onFailure(Exception e) {
} else {
onFailure.accept(e);
}
}), client::update);
}), client::update));
}
}

Expand Down Expand Up @@ -1005,7 +1006,9 @@ private void getSupersedingTokenDocAsync(RefreshTokenStatus refreshTokenStatus,

private void getTokenDocAsync(String tokenDocId, SecurityIndexManager tokensIndex, ActionListener<GetResponse> listener) {
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, tokenDocId).request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, listener, client::get);
tokensIndex.checkIndexVersionThenExecute(
ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() + "]", tokenDocId, ex)),
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, listener, client::get));
}

private Version getTokenVersionCompatibility() {
Expand Down Expand Up @@ -1392,10 +1395,10 @@ private void checkIfTokenIsValid(UserToken userToken, ActionListener<UserToken>
logger.warn("failed to validate access token because the index [" + tokensIndex.aliasName() + "] doesn't exist");
listener.onResponse(null);
} else {
final GetRequest getRequest = client
.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, getTokenDocumentId(userToken)).request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("check token state", userToken.getId(), ex));
tokensIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
final GetRequest getRequest = client
.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, getTokenDocumentId(userToken)).request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("check token state", userToken.getId(), ex));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Expand Down