Skip to content

Commit da976d2

Browse files
authored
Improve robustness of Query Result serializations (#54692) (#55028)
Makes query result serialization more robust by propagating possible IOExceptions that can occur during shard level result serialization to the caller instead of throwing AssertionError that is not intercepted. Fixes #54665
1 parent 17101d8 commit da976d2

File tree

5 files changed

+21
-15
lines changed

5 files changed

+21
-15
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ public void writeGenericValue(@Nullable Object value) throws IOException {
808808
if (writer != null) {
809809
writer.write(this, value);
810810
} else {
811-
throw new IOException("can not write type [" + type + "]");
811+
throw new IllegalArgumentException("can not write type [" + type + "]");
812812
}
813813
}
814814

server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.lucene.index.IndexReader;
2929
import org.apache.lucene.util.Accountable;
3030
import org.apache.lucene.util.RamUsageEstimator;
31+
import org.elasticsearch.common.CheckedSupplier;
3132
import org.elasticsearch.common.bytes.BytesReference;
3233
import org.elasticsearch.common.cache.Cache;
3334
import org.elasticsearch.common.cache.CacheBuilder;
@@ -43,6 +44,7 @@
4344
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4445

4546
import java.io.Closeable;
47+
import java.io.IOException;
4648
import java.util.Collection;
4749
import java.util.Collections;
4850
import java.util.Iterator;
@@ -115,8 +117,8 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
115117
// NORELEASE The cacheKeyRenderer has been added in order to debug
116118
// https://github.com/elastic/elasticsearch/issues/32827, it should be
117119
// removed when this issue is solved
118-
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
119-
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
120+
BytesReference getOrCompute(CacheEntity cacheEntity, CheckedSupplier<BytesReference, IOException> loader,
121+
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
120122
assert reader.getReaderCacheHelper() != null;
121123
final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey);
122124
Loader cacheLoader = new Loader(cacheEntity, loader);
@@ -157,10 +159,10 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference
157159
private static class Loader implements CacheLoader<Key, BytesReference> {
158160

159161
private final CacheEntity entity;
160-
private final Supplier<BytesReference> loader;
162+
private final CheckedSupplier<BytesReference, IOException> loader;
161163
private boolean loaded;
162164

163-
Loader(CacheEntity entity, Supplier<BytesReference> loader) {
165+
Loader(CacheEntity entity, CheckedSupplier<BytesReference, IOException> loader) {
164166
this.entity = entity;
165167
this.loader = loader;
166168
}

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@
4545
import org.elasticsearch.cluster.routing.RecoverySource;
4646
import org.elasticsearch.cluster.routing.ShardRouting;
4747
import org.elasticsearch.cluster.service.ClusterService;
48+
import org.elasticsearch.common.CheckedConsumer;
4849
import org.elasticsearch.common.CheckedFunction;
50+
import org.elasticsearch.common.CheckedSupplier;
4951
import org.elasticsearch.common.Nullable;
5052
import org.elasticsearch.common.breaker.CircuitBreaker;
5153
import org.elasticsearch.common.bytes.BytesReference;
@@ -1376,12 +1378,7 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q
13761378
() -> "Shard: " + request.shardId() + "\nSource:\n" + request.source(),
13771379
out -> {
13781380
queryPhase.execute(context);
1379-
try {
1380-
context.queryResult().writeToNoId(out);
1381-
1382-
} catch (IOException e) {
1383-
throw new AssertionError("Could not serialize response", e);
1384-
}
1381+
context.queryResult().writeToNoId(out);
13851382
loadedFromCache[0] = false;
13861383
});
13871384

@@ -1420,9 +1417,9 @@ public ByteSizeValue getTotalIndexingBufferBytes() {
14201417
* @return the contents of the cache or the result of calling the loader
14211418
*/
14221419
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey,
1423-
Supplier<String> cacheKeyRenderer, Consumer<StreamOutput> loader) throws Exception {
1420+
Supplier<String> cacheKeyRenderer, CheckedConsumer<StreamOutput, IOException> loader) throws Exception {
14241421
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard);
1425-
Supplier<BytesReference> supplier = () -> {
1422+
CheckedSupplier<BytesReference, IOException> supplier = () -> {
14261423
/* BytesStreamOutput allows to pass the expected size but by default uses
14271424
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
14281425
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful

server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,13 @@ public void testSimpleStreams() throws Exception {
349349
assertThat(jdt.getZonedDateTime().toInstant().toEpochMilli(), equalTo(123456L));
350350
assertThat(jdt.getZonedDateTime().getZone(), equalTo(ZoneId.of("America/Los_Angeles")));
351351
assertEquals(0, in.available());
352+
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> out.writeGenericValue(new Object() {
353+
@Override
354+
public String toString() {
355+
return "This object cannot be serialized by writeGeneric method";
356+
}
357+
}));
358+
assertThat(ex.getMessage(), containsString("can not write type"));
352359
in.close();
353360
out.close();
354361
}

server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.lucene.search.TopDocs;
3333
import org.apache.lucene.store.Directory;
3434
import org.apache.lucene.util.BytesRef;
35+
import org.elasticsearch.common.CheckedSupplier;
3536
import org.elasticsearch.common.bytes.AbstractBytesReference;
3637
import org.elasticsearch.common.bytes.BytesReference;
3738
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -49,7 +50,6 @@
4950
import java.io.IOException;
5051
import java.util.Arrays;
5152
import java.util.concurrent.atomic.AtomicBoolean;
52-
import java.util.function.Supplier;
5353

5454
public class IndicesRequestCacheTests extends ESTestCase {
5555

@@ -331,7 +331,7 @@ public Iterable<Field> newDoc(int id, String value) {
331331
StringField.TYPE_STORED));
332332
}
333333

334-
private static class Loader implements Supplier<BytesReference> {
334+
private static class Loader implements CheckedSupplier<BytesReference, IOException> {
335335

336336
private final DirectoryReader reader;
337337
private final int id;

0 commit comments

Comments
 (0)