1010import org .elasticsearch .action .support .IndicesOptions ;
1111import org .elasticsearch .common .io .stream .StreamInput ;
1212import org .elasticsearch .common .io .stream .StreamOutput ;
13+ import org .elasticsearch .core .Nullable ;
1314import org .elasticsearch .index .query .QueryBuilder ;
1415import org .elasticsearch .index .shard .ShardId ;
1516import org .elasticsearch .transport .TransportRequest ;
2021import java .util .Set ;
2122
2223/**
23- * Internal terms enum request executed directly against a specific node, querying potentially many
24+ * Internal terms enum request executed directly against a specific node, querying potentially many
2425 * shards in one request
2526 */
2627public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest {
@@ -36,12 +37,27 @@ public class NodeTermsEnumRequest extends TransportRequest implements IndicesReq
3637 private final QueryBuilder indexFilter ;
3738 private Set <ShardId > shardIds ;
3839 private String nodeId ;
39-
40+
41+ public NodeTermsEnumRequest (final String nodeId ,
42+ final Set <ShardId > shardIds ,
43+ TermsEnumRequest request ,
44+ long taskStartTimeMillis ) {
45+ this .field = request .field ();
46+ this .string = request .string ();
47+ this .searchAfter = request .searchAfter ();
48+ this .caseInsensitive = request .caseInsensitive ();
49+ this .size = request .size ();
50+ this .timeout = request .timeout ().getMillis ();
51+ this .taskStartedTimeMillis = taskStartTimeMillis ;
52+ this .indexFilter = request .indexFilter ();
53+ this .nodeId = nodeId ;
54+ this .shardIds = shardIds ;
55+ }
4056
4157 public NodeTermsEnumRequest (StreamInput in ) throws IOException {
4258 super (in );
4359 field = in .readString ();
44- string = in .readString ();
60+ string = in .readOptionalString ();
4561 searchAfter = in .readOptionalString ();
4662 caseInsensitive = in .readBoolean ();
4763 size = in .readVInt ();
@@ -56,36 +72,47 @@ public NodeTermsEnumRequest(StreamInput in) throws IOException {
5672 }
5773 }
5874
59- public NodeTermsEnumRequest (final String nodeId , final Set <ShardId > shardIds , TermsEnumRequest request ) {
60- this .field = request .field ();
61- this .string = request .string ();
62- this .searchAfter = request .searchAfter ();
63- this .caseInsensitive = request .caseInsensitive ();
64- this .size = request .size ();
65- this .timeout = request .timeout ().getMillis ();
66- this .taskStartedTimeMillis = request .taskStartTimeMillis ;
67- this .indexFilter = request .indexFilter ();
68- this .nodeId = nodeId ;
69- this .shardIds = shardIds ;
75+ @ Override
76+ public void writeTo (StreamOutput out ) throws IOException {
77+ super .writeTo (out );
78+ out .writeString (field );
79+ out .writeOptionalString (string );
80+ out .writeOptionalString (searchAfter );
81+ out .writeBoolean (caseInsensitive );
82+ out .writeVInt (size );
83+ // Adjust the amount of permitted time the shard has remaining to gather terms.
84+ long timeSpentSoFarInCoordinatingNode = System .currentTimeMillis () - taskStartedTimeMillis ;
85+ long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode );
86+ // TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0?
87+ out .writeVLong (remainingTimeForShardToUse );
88+ out .writeVLong (taskStartedTimeMillis );
89+ out .writeOptionalNamedWriteable (indexFilter );
90+ out .writeString (nodeId );
91+ out .writeVInt (shardIds .size ());
92+ for (ShardId shardId : shardIds ) {
93+ shardId .writeTo (out );
94+ }
7095 }
7196
7297 public String field () {
7398 return field ;
7499 }
75100
101+ @ Nullable
76102 public String string () {
77103 return string ;
78104 }
79105
106+ @ Nullable
80107 public String searchAfter () {
81108 return searchAfter ;
82109 }
83110
84111 public long taskStartedTimeMillis () {
85112 return this .taskStartedTimeMillis ;
86113 }
87-
88- /**
114+
115+ /**
89116 * The time this request was materialized on a node
90117 */
91118 long nodeStartedTimeMillis () {
@@ -94,12 +121,12 @@ long nodeStartedTimeMillis() {
94121 nodeStartedTimeMillis = System .currentTimeMillis ();
95122 }
96123 return this .nodeStartedTimeMillis ;
97- }
98-
124+ }
125+
99126 public void startTimerOnDataNode () {
100127 nodeStartedTimeMillis = System .currentTimeMillis ();
101128 }
102-
129+
103130 public Set <ShardId > shardIds () {
104131 return Collections .unmodifiableSet (shardIds );
105132 }
@@ -119,28 +146,6 @@ public String nodeId() {
119146 return nodeId ;
120147 }
121148
122- @ Override
123- public void writeTo (StreamOutput out ) throws IOException {
124- super .writeTo (out );
125- out .writeString (field );
126- out .writeString (string );
127- out .writeOptionalString (searchAfter );
128- out .writeBoolean (caseInsensitive );
129- out .writeVInt (size );
130- // Adjust the amount of permitted time the shard has remaining to gather terms.
131- long timeSpentSoFarInCoordinatingNode = System .currentTimeMillis () - taskStartedTimeMillis ;
132- long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode );
133- // TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0?
134- out .writeVLong (remainingTimeForShardToUse );
135- out .writeVLong (taskStartedTimeMillis );
136- out .writeOptionalNamedWriteable (indexFilter );
137- out .writeString (nodeId );
138- out .writeVInt (shardIds .size ());
139- for (ShardId shardId : shardIds ) {
140- shardId .writeTo (out );
141- }
142- }
143-
144149 public QueryBuilder indexFilter () {
145150 return indexFilter ;
146151 }
@@ -162,5 +167,4 @@ public IndicesOptions indicesOptions() {
162167 public boolean remove (ShardId shardId ) {
163168 return shardIds .remove (shardId );
164169 }
165-
166170}
0 commit comments