1212import org .elasticsearch .action .search .SearchResponse ;
1313import org .elasticsearch .action .support .ActionFilters ;
1414import org .elasticsearch .action .support .HandledTransportAction ;
15+ import org .elasticsearch .client .internal .Client ;
16+ import org .elasticsearch .client .internal .ParentTaskAssigningClient ;
1517import org .elasticsearch .client .internal .node .NodeClient ;
1618import org .elasticsearch .common .inject .Inject ;
1719import org .elasticsearch .common .util .Maps ;
2224import org .elasticsearch .search .aggregations .metrics .SumAggregationBuilder ;
2325import org .elasticsearch .tasks .Task ;
2426import org .elasticsearch .transport .TransportService ;
25- import org .elasticsearch .xpack . profiler . utils . MapExtractor ;
27+ import org .elasticsearch .xcontent . ObjectPath ;
2628
2729import java .util .Arrays ;
2830import java .util .HashMap ;
3436public class TransportGetProfilingAction extends HandledTransportAction <GetProfilingRequest , GetProfilingResponse > {
3537 private final NodeClient nodeClient ;
3638 private final ThreadContext threadContext ;
39+ private final TransportService transportService ;
3740
3841 @ Inject
3942 public TransportGetProfilingAction (TransportService transportService , ActionFilters actionFilters , NodeClient nodeClient ) {
4043 super (GetProfilingAction .NAME , transportService , actionFilters , GetProfilingRequest ::new );
4144 this .nodeClient = nodeClient ;
45+ this .transportService = transportService ;
4246 this .threadContext = transportService .getThreadPool ().getThreadContext ();
4347 }
4448
4549 @ Override
4650 protected void doExecute (Task submitTask , GetProfilingRequest request , ActionListener <GetProfilingResponse > submitListener ) {
4751 try (var ignored = threadContext .newTraceContext ()) {
52+ Client client = new ParentTaskAssigningClient (this .nodeClient , transportService .getLocalNode (), submitTask );
4853 EventsIndex mediumDownsampled = EventsIndex .MEDIUM_DOWNSAMPLED ;
49- this . nodeClient .prepareSearch (mediumDownsampled .getName ())
54+ client .prepareSearch (mediumDownsampled .getName ())
5055 .setSize (0 )
5156 .setQuery (request .getQuery ())
5257 .setTrackTotalHits (true )
@@ -55,7 +60,7 @@ protected void doExecute(Task submitTask, GetProfilingRequest request, ActionLis
5560 public void onResponse (SearchResponse searchResponse ) {
5661 long sampleCount = searchResponse .getHits ().getTotalHits ().value ;
5762 EventsIndex resampledIndex = mediumDownsampled .getResampledIndex (request .getSampleSize (), sampleCount );
58- searchEventGroupByStackTrace (request , resampledIndex , submitListener );
63+ searchEventGroupByStackTrace (client , request , resampledIndex , submitListener );
5964 }
6065
6166 @ Override
@@ -67,12 +72,13 @@ public void onFailure(Exception e) {
6772 }
6873
6974 private void searchEventGroupByStackTrace (
75+ Client client ,
7076 GetProfilingRequest request ,
7177 EventsIndex eventsIndex ,
7278 ActionListener <GetProfilingResponse > submitListener
7379 ) {
7480 GetProfilingResponseBuilder responseBuilder = new GetProfilingResponseBuilder ();
75- this . nodeClient .prepareSearch (eventsIndex .getName ())
81+ client .prepareSearch (eventsIndex .getName ())
7682 .setTrackTotalHits (false )
7783 .setQuery (request .getQuery ())
7884 .addAggregation (
@@ -86,7 +92,6 @@ private void searchEventGroupByStackTrace(
8692 .subAggregation (new SumAggregationBuilder ("count" ).field ("Stacktrace.count" ))
8793 )
8894 .addAggregation (new SumAggregationBuilder ("total_count" ).field ("Stacktrace.count" ))
89- .setPreFilterShardSize (1 )
9095 .execute (new ActionListener <>() {
9196 @ Override
9297 public void onResponse (SearchResponse searchResponse ) {
@@ -104,7 +109,7 @@ public void onResponse(SearchResponse searchResponse) {
104109 }
105110 if (stackTraceEvents .isEmpty () == false ) {
106111 responseBuilder .setStackTraceEvents (stackTraceEvents );
107- retrieveStackTraces (responseBuilder , submitListener );
112+ retrieveStackTraces (client , responseBuilder , submitListener );
108113 } else {
109114 submitListener .onResponse (responseBuilder .build ());
110115 }
@@ -117,8 +122,12 @@ public void onFailure(Exception e) {
117122 });
118123 }
119124
120- private void retrieveStackTraces (GetProfilingResponseBuilder responseBuilder , ActionListener <GetProfilingResponse > submitListener ) {
121- this .nodeClient .prepareMultiGet ()
125+ private void retrieveStackTraces (
126+ Client client ,
127+ GetProfilingResponseBuilder responseBuilder ,
128+ ActionListener <GetProfilingResponse > submitListener
129+ ) {
130+ client .prepareMultiGet ()
122131 .addIds ("profiling-stacktraces" , responseBuilder .getStackTraceEvents ().keySet ())
123132 .setRealtime (true )
124133 .execute (new ActionListener <>() {
@@ -140,7 +149,7 @@ public void onResponse(MultiGetResponse multiGetItemResponses) {
140149 }
141150 responseBuilder .setStackTraces (stackTracePerId );
142151 responseBuilder .setTotalFrames (totalFrames );
143- retrieveStackTraceDetails (responseBuilder , stackFrameIds , executableIds , submitListener );
152+ retrieveStackTraceDetails (client , responseBuilder , stackFrameIds , executableIds , submitListener );
144153 }
145154
146155 @ Override
@@ -151,6 +160,7 @@ public void onFailure(Exception e) {
151160 }
152161
153162 private void retrieveStackTraceDetails (
163+ Client client ,
154164 GetProfilingResponseBuilder responseBuilder ,
155165 Set <String > stackFrameIds ,
156166 Set <String > executableIds ,
@@ -162,40 +172,33 @@ private void retrieveStackTraceDetails(
162172 if (stackFrameIds .isEmpty ()) {
163173 handler .onStackFramesResponse (new MultiGetResponse (new MultiGetItemResponse [0 ]));
164174 } else {
175+ client .prepareMultiGet ().addIds ("profiling-stackframes" , stackFrameIds ).setRealtime (true ).execute (new ActionListener <>() {
176+ @ Override
177+ public void onResponse (MultiGetResponse multiGetItemResponses ) {
178+ handler .onStackFramesResponse (multiGetItemResponses );
179+ }
165180
166- this .nodeClient .prepareMultiGet ()
167- .addIds ("profiling-stackframes" , stackFrameIds )
168- .setRealtime (true )
169- .execute (new ActionListener <>() {
170- @ Override
171- public void onResponse (MultiGetResponse multiGetItemResponses ) {
172- handler .onStackFramesResponse (multiGetItemResponses );
173- }
174-
175- @ Override
176- public void onFailure (Exception e ) {
177- submitListener .onFailure (e );
178- }
179- });
181+ @ Override
182+ public void onFailure (Exception e ) {
183+ submitListener .onFailure (e );
184+ }
185+ });
180186 }
181187 // no data dependency - we can do this concurrently
182188 if (executableIds .isEmpty ()) {
183189 handler .onExecutableDetailsResponse (new MultiGetResponse (new MultiGetItemResponse [0 ]));
184190 } else {
185- this .nodeClient .prepareMultiGet ()
186- .addIds ("profiling-executables" , executableIds )
187- .setRealtime (true )
188- .execute (new ActionListener <>() {
189- @ Override
190- public void onResponse (MultiGetResponse multiGetItemResponses ) {
191- handler .onExecutableDetailsResponse (multiGetItemResponses );
192- }
191+ client .prepareMultiGet ().addIds ("profiling-executables" , executableIds ).setRealtime (true ).execute (new ActionListener <>() {
192+ @ Override
193+ public void onResponse (MultiGetResponse multiGetItemResponses ) {
194+ handler .onExecutableDetailsResponse (multiGetItemResponses );
195+ }
193196
194- @ Override
195- public void onFailure (Exception e ) {
196- submitListener .onFailure (e );
197- }
198- });
197+ @ Override
198+ public void onFailure (Exception e ) {
199+ submitListener .onFailure (e );
200+ }
201+ });
199202 }
200203 }
201204
@@ -276,10 +279,7 @@ public void onExecutableDetailsResponse(MultiGetResponse multiGetItemResponses)
276279 Map <String , String > executables = new HashMap <>();
277280 for (MultiGetItemResponse executable : multiGetItemResponses ) {
278281 if (executable .isFailed () == false && executable .getResponse ().isExists ()) {
279- executables .put (
280- executable .getId (),
281- MapExtractor .read (executable .getResponse ().getSource (), "Executable" , "file" , "name" )
282- );
282+ executables .put (executable .getId (), ObjectPath .eval ("Executable.file.name" , executable .getResponse ().getSource ()));
283283 }
284284 }
285285 // publish to object state only when completely done, otherwise mayFinish() could run twice
0 commit comments