Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b76039b
WIP
Tim-Brooks Oct 18, 2017
5c4f9c2
Merge remote-tracking branch 'upstream/master' into reuse_buffers
Tim-Brooks Oct 19, 2017
198cd51
Transition to network bytes for writes
Tim-Brooks Oct 19, 2017
6294630
Use channel buffer on incoming channels
Tim-Brooks Oct 19, 2017
9496186
Use actual big arrays
Tim-Brooks Oct 19, 2017
358983e
Close read context
Tim-Brooks Oct 19, 2017
0fdf0be
A few cleanups
Tim-Brooks Oct 19, 2017
f447893
Do not circuit break and only assert pages released after class
Tim-Brooks Oct 20, 2017
64ce813
Merge remote-tracking branch 'upstream/master' into reuse_buffers
Tim-Brooks Oct 20, 2017
f197df7
Improve tests
Tim-Brooks Oct 22, 2017
636d504
Merge remote-tracking branch 'upstream/master' into reuse_buffers
Tim-Brooks Oct 23, 2017
d28cc9d
Fix edge case with slicing while using channel buffer
Tim-Brooks Oct 23, 2017
fadaf73
Make sure to close removed buffer
Tim-Brooks Oct 23, 2017
4d7dd1b
Implement vectorized reads
Tim-Brooks Oct 26, 2017
962e493
WIP
Tim-Brooks Oct 26, 2017
7844f5e
Merge remote-tracking branch 'upstream/master' into reuse_buffers
Tim-Brooks Oct 26, 2017
4c542cc
Merge branch 'reuse_buffers' into simplify_for_simon
Tim-Brooks Oct 26, 2017
fb6c81e
WIP
Tim-Brooks Oct 27, 2017
95f2ff1
WIP
Tim-Brooks Oct 27, 2017
498e8e0
Fix tests
Tim-Brooks Oct 27, 2017
caea6d9
A few cleanups
Tim-Brooks Oct 27, 2017
1168d9b
Small fix related to channel buffer
Tim-Brooks Oct 27, 2017
1161c83
Changes based on review
Tim-Brooks Oct 27, 2017
af21323
Changes based on review
Tim-Brooks Oct 27, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions core/src/main/java/org/elasticsearch/common/bytes/BytesArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
package org.elasticsearch.common.bytes;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;

public final class BytesArray extends BytesReference {
import java.util.concurrent.atomic.AtomicBoolean;

public class BytesArray extends BytesReference implements Releasable {

public static final BytesArray EMPTY = new BytesArray(BytesRef.EMPTY_BYTES, 0, 0);
private final byte[] bytes;
private final int offset;
private final int length;
private final Releasable releasable;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

public BytesArray(String bytes) {
this(new BytesRef(bytes));
Expand All @@ -43,16 +49,26 @@ public BytesArray(BytesRef bytesRef, boolean deepCopy) {
bytes = bytesRef.bytes;
offset = bytesRef.offset;
length = bytesRef.length;
releasable = null;
}

public BytesArray(byte[] bytes) {
this(bytes, 0, bytes.length);
}

public BytesArray(byte[] bytes, Releasable releasable) {
this(bytes, 0, bytes.length, releasable);
}

public BytesArray(byte[] bytes, int offset, int length) {
this(bytes, offset, length, null);
}

public BytesArray(byte[] bytes, int offset, int length, Releasable releasable) {
this.bytes = bytes;
this.offset = offset;
this.length = length;
this.releasable = releasable;
}

@Override
Expand All @@ -68,11 +84,25 @@ public int length() {
@Override
public BytesReference slice(int from, int length) {
if (from < 0 || (from + length) > this.length) {
throw new IllegalArgumentException("can't slice a buffer with length [" + this.length + "], with slice parameters from [" + from + "], length [" + length + "]");
throw new IllegalArgumentException("can't slice a buffer with length [" + this.length +
"], with slice parameters from [" + from + "], length [" + length + "]");
}
return new BytesArray(bytes, offset + from, length);
}

/**
* This provides the same facilities as {@link #slice(int, int)}, but retains a reference to
* the underlying releasable (if one exists for this array). Closing the "slice" will also
* close this array.
*/
public BytesArray sliceAndRetainReleasable(int from, int length) {
if (from < 0 || (from + length) > this.length()) {
throw new IllegalArgumentException("can't slice a buffer with length [" + this.length +
"], with slice parameters from [" + from + "], length [" + length + "]");
}
return new BytesArray(bytes, offset + from, length, releasable);
}

public byte[] array() {
return bytes;
}
Expand All @@ -91,4 +121,11 @@ public long ramBytesUsed() {
return bytes.length;
}

@Override
public void close() {
// TODO: Do we want to throw an exception to expose misuse?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the closeable contract says it's closeable multple times so we are good I guess.

if (isClosed.compareAndSet(false, true)) {
Releasables.close(releasable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
* generic stream access to {@link BytesReference} instances without materializing the
* underlying bytes reference.
*/
final class BytesReferenceStreamInput extends StreamInput {
public final class BytesReferenceStreamInput extends StreamInput {
private final BytesRefIterator iterator;
private int sliceIndex;
private BytesRef slice;
private final int length; // the total size of the stream
private int offset; // the current position of the stream

BytesReferenceStreamInput(BytesRefIterator iterator, final int length) throws IOException {
public BytesReferenceStreamInput(BytesRefIterator iterator, final int length) throws IOException {
this.iterator = iterator;
this.slice = iterator.next();
this.length = length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,15 @@ public class PagedBytesReference extends BytesReference {

private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;

private final BigArrays bigarrays;
protected final ByteArray byteArray;
private final int offset;
private final int length;

public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
this(bigarrays, byteArray, 0, length);
public PagedBytesReference(ByteArray byteArray, int length) {
this(byteArray, 0, length);
}

public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int from, int length) {
this.bigarrays = bigarrays;
public PagedBytesReference(ByteArray byteArray, int from, int length) {
this.byteArray = byteArray;
this.offset = from;
this.length = length;
Expand All @@ -65,7 +63,7 @@ public BytesReference slice(int from, int length) {
if (from < 0 || (from + length) > length()) {
throw new IllegalArgumentException("can't slice a buffer with length [" + length() + "], with slice parameters from [" + from + "], length [" + length + "]");
}
return new PagedBytesReference(bigarrays, byteArray, offset + from, length);
return new PagedBytesReference(byteArray, offset + from, length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ public final class ReleasablePagedBytesReference extends PagedBytesReference imp

private final Releasable releasable;

public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
Releasable releasable) {
super(bigarrays, byteArray, length);
public ReleasablePagedBytesReference(ByteArray byteArray, int length, Releasable releasable) {
super(byteArray, length);
this.releasable = releasable;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.collect;

import com.carrotsearch.hppc.BufferAllocationException;
import com.carrotsearch.hppc.ObjectArrayDeque;

/**
* This is an {@link ObjectArrayDeque} that allows indexed O(1) access to its contents.
* Additionally, it overrides the catching on {@link OutOfMemoryError} on resize.
*
* @param <KType> the types contained in the deque
*/
public class IndexedArrayDeque<KType> extends ObjectArrayDeque<KType> {

public IndexedArrayDeque(int initialSize) {
super(initialSize);
}

public KType get(int i) {
Object[] rawBuffer = buffer;
int actualIndex = (head + i) % rawBuffer.length;
@SuppressWarnings("unchecked")
KType value = (KType) rawBuffer[actualIndex];
return value;
}

@Override
protected void ensureBufferSpace(int expectedAdditions) {
try {
super.ensureBufferSpace(expectedAdditions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh man that is terrible. We should contribute this back to hppc. Can you make sure we do this? I don't think it should ever catch OOM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh and, good catch!

} catch (BufferAllocationException e) {
Throwable cause = e.getCause();
if (cause instanceof OutOfMemoryError) {
throw (OutOfMemoryError) cause;
}
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public int size() {

@Override
public BytesReference bytes() {
return new PagedBytesReference(bigArrays, bytes, count);
return new PagedBytesReference(bytes, count);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
*/
@Override
public ReleasablePagedBytesReference bytes() {
return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable);
return new ReleasablePagedBytesReference(bytes, count, releasable);
}

@Override
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/elasticsearch/common/util/BigArrays.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.recycler.Recycler;
Expand Down Expand Up @@ -463,6 +464,18 @@ private <T extends BigArray> T validate(T array) {
return array;
}

/**
* Allocate a new {@link BytesArray}.
*/
public BytesArray newBytePage() {
if (recycler != null) {
final Recycler.V<byte[]> page = recycler.bytePage(false);
return new BytesArray(page.v(), page);
} else {
return new BytesArray(new byte[BYTE_PAGE_SIZE], null);
}
}

/**
* Allocate a new {@link ByteArray}.
* @param size the initial length of the array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public class BytesArrayTests extends AbstractBytesReferenceTestCase {

@Override
protected BytesReference newBytesReference(int length) throws IOException {
return newBytesReference(length, randomInt(length));
return newBytesArrayReference(length, randomInt(length));
}

@Override
protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException {
return newBytesReference(length, 0);
return newBytesArrayReference(length, 0);
}

private BytesReference newBytesReference(int length, int offset) throws IOException {
private static BytesReference newBytesArrayReference(int length, int offset) throws IOException {
// we know bytes stream output always creates a paged bytes reference, we use it to create randomized content
final BytesStreamOutput out = new BytesStreamOutput(length + offset);
for (int i = 0; i < length + offset; i++) {
Expand All @@ -61,6 +61,7 @@ public void testArray() throws IOException {
}
}

@Override
public void testArrayOffset() throws IOException {
int length = randomInt(PAGE_SIZE * randomIntBetween(2, 5));
BytesArray pbr = (BytesArray) newBytesReferenceWithOffsetOfZero(length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ public void testEquals() {
}

// get refs & compare
BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length);
BytesReference pbr2 = new PagedBytesReference(bigarrays, ba2, length);
BytesReference pbr = new PagedBytesReference(ba1, length);
BytesReference pbr2 = new PagedBytesReference(ba2, length);
assertEquals(pbr, pbr2);
int offsetToFlip = randomIntBetween(0, length - 1);
int value = ~Byte.toUnsignedInt(ba1.get(offsetToFlip));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ private static class TestResponse extends RestResponse {
}
final ByteArray bigArray = bigArrays.newByteArray(bytes.length);
bigArray.set(0, bytes, 0, bytes.length);
reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length, Releasables.releaseOnce(bigArray));
reference = new ReleasablePagedBytesReference(bigArray, bytes.length, Releasables.releaseOnce(bigArray));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public void testEquals() throws IOException {
public void testSliceEquals() {
int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2, 5));
ByteArray ba1 = bigarrays.newByteArray(length, false);
BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length);
BytesReference pbr = new PagedBytesReference(ba1, length);

// test equality of slices
int sliceFrom = randomIntBetween(0, pbr.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2074,7 +2074,7 @@ public static void afterClass() throws Exception {
try {
INSTANCE.printTestMessage("cleaning up after");
INSTANCE.afterInternal(true);
checkStaticState();
checkStaticState(true);
} finally {
INSTANCE = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ protected boolean enableWarningsCheck() {

@After
public final void after() throws Exception {
checkStaticState();
checkStaticState(false);
// We check threadContext != null rather than enableWarningsCheck()
// because after methods are still called in the event that before
// methods failed, in which case threadContext might not have been
Expand Down Expand Up @@ -393,8 +393,10 @@ public void log(StatusData data) {
}

// separate method so that this can be checked again after suite scoped cluster is shut down
protected static void checkStaticState() throws Exception {
MockPageCacheRecycler.ensureAllPagesAreReleased();
protected static void checkStaticState(boolean afterClass) throws Exception {
if (afterClass) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this in general or can we maybe only do this in specific test cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I view this as a long term fix. I just wanted to full CI run without failures caused by holding bytes in between tests. We maybe should talk about the best way to deal with this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok understood. maybe mark it with //nocommit

MockPageCacheRecycler.ensureAllPagesAreReleased();
}
MockBigArrays.ensureAllArraysAreReleased();

// ensure no one changed the status logger level on us
Expand Down
Loading