|
23 | 23 | import org.elasticsearch.rest.RestRequest; |
24 | 24 | import org.elasticsearch.rest.RestResponse; |
25 | 25 | import org.elasticsearch.rest.action.RestActionListener; |
| 26 | +import org.elasticsearch.rest.action.RestCancellableNodeClient; |
26 | 27 | import org.elasticsearch.rest.action.RestResponseListener; |
| 28 | +import org.elasticsearch.tasks.TaskCancelledException; |
27 | 29 |
|
28 | 30 | import java.util.List; |
29 | 31 | import java.util.Map; |
@@ -58,14 +60,19 @@ protected RestChannelConsumer doCatRequest(final RestRequest request, final Node |
58 | 60 | clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); |
59 | 61 | clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices); |
60 | 62 |
|
61 | | - return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) { |
| 63 | + final RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); |
| 64 | + |
| 65 | + return channel -> cancelClient.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) { |
62 | 66 | @Override |
63 | 67 | public void processResponse(final ClusterStateResponse clusterStateResponse) { |
64 | 68 | final IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest(); |
65 | 69 | indicesSegmentsRequest.indices(indices); |
66 | | - client.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) { |
| 70 | + cancelClient.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) { |
67 | 71 | @Override |
68 | 72 | public RestResponse buildResponse(final IndicesSegmentResponse indicesSegmentResponse) throws Exception { |
| 73 | + if (request.getHttpChannel().isOpen() == false) { |
| 74 | + throw new TaskCancelledException("response channel [" + request.getHttpChannel() + "] closed"); |
| 75 | + } |
69 | 76 | final Map<String, IndexSegments> indicesSegments = indicesSegmentResponse.getIndices(); |
70 | 77 | Table tab = buildTable(request, clusterStateResponse, indicesSegments); |
71 | 78 | return RestTable.buildResponse(tab, channel); |
|
0 commit comments