1414import org .elasticsearch .client .Client ;
1515import org .elasticsearch .client .FilterClient ;
1616import org .elasticsearch .common .util .concurrent .ThreadContext ;
17+ import org .elasticsearch .common .util .set .Sets ;
18+ import org .elasticsearch .xpack .core .security .authc .AuthenticationField ;
19+ import org .elasticsearch .xpack .core .security .authc .AuthenticationServiceField ;
1720
21+ import java .util .Map ;
22+ import java .util .Set ;
1823import java .util .function .BiConsumer ;
1924import java .util .function .Supplier ;
25+ import java .util .stream .Collectors ;
2026
2127/**
2228 * Utility class to help with the execution of requests made using a {@link Client} such that they
2329 * have the origin as a transient and listeners have the appropriate context upon invocation
2430 */
2531public final class ClientHelper {
2632
33+ /**
34+ * List of headers that are related to security
35+ */
36+ public static final Set <String > SECURITY_HEADER_FILTERS = Sets .newHashSet (AuthenticationServiceField .RUN_AS_USER_HEADER ,
37+ AuthenticationField .AUTHENTICATION_KEY );
38+
2739 public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin" ;
2840 public static final String SECURITY_ORIGIN = "security" ;
2941 public static final String WATCHER_ORIGIN = "watcher" ;
@@ -78,6 +90,82 @@ RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
7890 }
7991 }
8092
93+ /**
94+ * Execute a client operation and return the response, try to run an action
95+ * with least privileges, when headers exist
96+ *
97+ * @param headers
98+ * Request headers, ideally including security headers
99+ * @param origin
100+ * The origin to fall back to if there are no security headers
101+ * @param client
102+ * The client used to query
103+ * @param supplier
104+ * The action to run
105+ * @return An instance of the response class
106+ */
107+ public static <T extends ActionResponse > T executeWithHeaders (Map <String , String > headers , String origin , Client client ,
108+ Supplier <T > supplier ) {
109+ Map <String , String > filteredHeaders = headers .entrySet ().stream ().filter (e -> SECURITY_HEADER_FILTERS .contains (e .getKey ()))
110+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
111+
112+ // no security headers, we will have to use the xpack internal user for
113+ // our execution by specifying the origin
114+ if (filteredHeaders .isEmpty ()) {
115+ try (ThreadContext .StoredContext ignore = stashWithOrigin (client .threadPool ().getThreadContext (), origin )) {
116+ return supplier .get ();
117+ }
118+ } else {
119+ try (ThreadContext .StoredContext ignore = client .threadPool ().getThreadContext ().stashContext ()) {
120+ client .threadPool ().getThreadContext ().copyHeaders (filteredHeaders .entrySet ());
121+ return supplier .get ();
122+ }
123+ }
124+ }
125+
126+ /**
127+ * Execute a client operation asynchronously, try to run an action with
128+ * least privileges, when headers exist
129+ *
130+ * @param headers
131+ * Request headers, ideally including security headers
132+ * @param origin
133+ * The origin to fall back to if there are no security headers
134+ * @param action
135+ * The action to execute
136+ * @param request
137+ * The request object for the action
138+ * @param listener
139+ * The listener to call when the action is complete
140+ */
141+ public static <Request extends ActionRequest , Response extends ActionResponse ,
142+ RequestBuilder extends ActionRequestBuilder <Request , Response , RequestBuilder >> void executeWithHeadersAsync (
143+ Map <String , String > headers , String origin , Client client , Action <Request , Response , RequestBuilder > action , Request request ,
144+ ActionListener <Response > listener ) {
145+
146+ Map <String , String > filteredHeaders = headers .entrySet ().stream ().filter (e -> SECURITY_HEADER_FILTERS .contains (e .getKey ()))
147+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
148+
149+ final ThreadContext threadContext = client .threadPool ().getThreadContext ();
150+
151+ // No headers (e.g. security not installed/in use) so execute as origin
152+ if (filteredHeaders .isEmpty ()) {
153+ ClientHelper .executeAsyncWithOrigin (client , origin , action , request , listener );
154+ } else {
155+ // Otherwise stash the context and copy in the saved headers before executing
156+ final Supplier <ThreadContext .StoredContext > supplier = threadContext .newRestorableContext (false );
157+ try (ThreadContext .StoredContext ignore = stashWithHeaders (threadContext , filteredHeaders )) {
158+ client .execute (action , request , new ContextPreservingActionListener <>(supplier , listener ));
159+ }
160+ }
161+ }
162+
163+ private static ThreadContext .StoredContext stashWithHeaders (ThreadContext threadContext , Map <String , String > headers ) {
164+ final ThreadContext .StoredContext storedContext = threadContext .stashContext ();
165+ threadContext .copyHeaders (headers .entrySet ());
166+ return storedContext ;
167+ }
168+
81169 private static final class ClientWithOrigin extends FilterClient {
82170
83171 private final String origin ;
@@ -98,5 +186,4 @@ RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
98186 }
99187 }
100188 }
101-
102189}
0 commit comments