Conversation

@nik9000 (Member) commented Aug 26, 2022

This adds `writeBigIntArray` and `readBigIntArray` to our serialization.
The interesting bit here is that reading slices the reference to the
underlying buffer rather than copying. That reference can be retained as
long as it's needed, holding the underlying buffer open until the
`IntArray` is `close`d.

This should allow aggregations to send dense representations between
nodes with one fewer copy operation.

@elasticsearchmachine elasticsearchmachine added needs:triage Requires assignment of a team area label v8.5.0 labels Aug 26, 2022
@nik9000 (Member, Author) left a comment

I picked IntArray here because BytesReference has a getInt method. It looks like that method does some bit shifting. We could probably get away with avoiding the bit shifting most of the time, but that seems like a problem for another day.
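For context, the "bit shifting" in question is the usual way to assemble a big-endian int from individual bytes. This is a minimal illustrative sketch, not the actual `BytesReference#getInt` implementation, which may differ:

```java
public class GetIntSketch {
    // Hypothetical shift-based big-endian read of the kind the comment refers to
    static int getIntBE(byte[] b, int i) {
        // Assemble four bytes, most significant first (big endian)
        return ((b[i] & 0xFF) << 24)
            | ((b[i + 1] & 0xFF) << 16)
            | ((b[i + 2] & 0xFF) << 8)
            | (b[i + 3] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] b = {0x00, 0x00, 0x01, 0x02};
        System.out.println(getIntBE(b, 0)); // 258
    }
}
```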

@not-napoleon not-napoleon added :Analytics/Aggregations Aggregations :Distributed Indexing/Distributed A catch all label for anything in the Distributed Indexing Area. Please avoid if you can. >tech debt and removed needs:triage Requires assignment of a team area label labels Aug 26, 2022
@elasticsearchmachine (Collaborator) commented:

Hi @nik9000, I've created a changelog YAML for you.

@elasticsearchmachine (Collaborator) commented:

Pinging @elastic/es-analytics-geo (Team:Analytics)

@elasticsearchmachine elasticsearchmachine added Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) labels Aug 26, 2022
@elasticsearchmachine (Collaborator) commented:

Pinging @elastic/es-distributed (Team:Distributed)

@not-napoleon (Member) left a comment

For context, this is a step in the larger aggregations memory story we're working on currently. We are migrating away from large collections of small InternalAggregation objects, which each managed a single bucket, in favor of fewer, larger objects which each manage an entire node's result set for a given aggregation. These objects will be backed by BigArrays, which will be built on the data nodes and deserialized on the coordinating nodes. Something along the lines of this PR will let us avoid copying huge blocks of data when doing that deserialization.

Member Author left a comment:

We need to think about size here - the arrays are often oversized but we will have a precise size when we're ready to write. If we write the oversized array it could be twice as big.
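The point here is that the backing array's capacity can exceed the number of elements actually used, so serialization should write the precise used size, not the capacity. A hedged sketch of that idea (`writeUsedPrefix` is a hypothetical name, not from the PR):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SizedWrite {
    // Serialize only the used prefix of an oversized backing array
    static byte[] writeUsedPrefix(int[] oversized, int usedSize) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(usedSize);              // write the precise element count, not the capacity
            for (int i = 0; i < usedSize; i++) { // only the used prefix goes on the wire
                out.writeInt(oversized[i]);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);   // cannot happen for in-memory streams
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        int[] backing = new int[16]; // oversized capacity
        backing[0] = 7;
        backing[1] = 9;
        System.out.println(writeUsedPrefix(backing, 2).length); // 12 bytes: 4 for length + 2 * 4 for ints
    }
}
```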

@original-brownbear (Contributor) left a comment

Thanks Nik, this is a really cool idea; added a couple of comments :)

Contributor left a comment:

There's no particular reason really, and I agree that this might not be optimal. We could make it increment the ref (and decrement on close), and I bet we could use that to clean up quite a bit of code. But all the existing code is written under the assumption that the whole lifecycle lives with the bytes reference, and changing that without introducing leaks is a bigger task, I think.

Member Author left a comment:

Yeah - I don't want to change this as part of this PR, but it sure confused the hell out of me.

Contributor left a comment:

This seems needlessly expensive when we already have the byte representation for the implementation above. Should we maybe add a `writeTo` to `IntArray` (i.e. make it writable) instead, to be able to leverage those bytes?

Member Author left a comment:

Yeah!

@nik9000 (Member, Author) left a comment

So I've had a bit of a think about this and wonder if we're not better off making a new interface, one that just reads, with two implementations of that interface: one that wraps our `IntArray` from `BigArrays` and one that wraps the netty buffer. But, like, that's around the edges. It sounds like folks think this is generally a good idea. I'll try to iterate a bit more.

Member Author left a comment:

Yeah!

@nik9000 (Member, Author) commented Sep 2, 2022

@original-brownbear could you have another look at this? One thing that's really interesting is endianness. `BigArrays` always runs in platform-native endianness, which is usually little endian. `BytesReference#getInt` is big endian. I'd like to keep both of those things true. I was thinking we could lock the wire format to little endian; that way, in the normal case, we get to send the data in native order. We could add something like `MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN)` in `BytesArray`. That should let us read those ints quite quickly while reducing, most of the time.

I don't think we test on any big endian platforms though.
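A minimal sketch of the `byteArrayViewVarHandle` approach described above — the class and method names here are illustrative, not the actual `BytesArray` code:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteOrder;

public class LittleEndianRead {
    // VarHandle that views a byte[] as little-endian ints; index is a byte offset
    private static final VarHandle INT_LE =
        MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN);

    static int getIntLE(byte[] bytes, int offset) {
        // Plain get/set on a byte-array view works even for misaligned offsets
        return (int) INT_LE.get(bytes, offset);
    }

    public static void main(String[] args) {
        byte[] b = {0x01, 0x00, 0x00, 0x00, (byte) 0xFF, 0x00, 0x00, 0x00};
        System.out.println(getIntLE(b, 0)); // 1
        System.out.println(getIntLE(b, 4)); // 255
    }
}
```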

@original-brownbear (Contributor) left a comment

Thanks Nik, I added a few comments but this looks really nice. I commented on endianness inline. The size limitation to 2G is a non-issue at the moment since we don't allow transport messages larger than 2G anyway, IMO.

Quoted code under review:

```java
}
int end = intSize % INT_PAGE_SIZE;
out.write(pages[pages.length - 1], 0, end * Integer.BYTES);
// NOCOMMIT endian-ness
```
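As an aside, the remainder computation in the snippet above has a subtle edge case: when `intSize` is an exact multiple of the page size, `intSize % INT_PAGE_SIZE` is zero and the full final page would be skipped. A follow-up commit to this PR fixed exactly this kind of bug. A hedged sketch of the corrected remainder handling (names are hypothetical):

```java
public class FinalPage {
    // How many ints of the final page to write; assumes intSize > 0
    static int finalPageInts(int intSize, int intPageSize) {
        int end = intSize % intPageSize;
        // If the final page is completely used, the remainder is 0,
        // but we still need to write a full page rather than nothing
        return end == 0 ? intPageSize : end;
    }

    public static void main(String[] args) {
        System.out.println(finalPageInts(10, 4)); // 2: partial final page
        System.out.println(finalPageInts(8, 4));  // 4: fully used final page
    }
}
```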
Contributor left a comment:

++ to your suggestion of adding a fallback for when the system byte order isn't LE if we have to.

But you do make a good point here, we aren't testing or supporting any BE platforms right now as far as I understand. Couldn't we just add a check to node startup that we're not on a BE platform and deal with the issue that way, saving some complexity?

Member Author left a comment:

I've asked around and we don't support any big endian platforms at all. I think it's enough to fail to start if we are on such a system and to comment about it.

Quoted code under review:

```java
@Override
public int get(long index) {
    if (index > Integer.MAX_VALUE / 4) {
        throw new UnsupportedOperationException(); // NOCOMMIT Oh god, what do we do here?
```
Contributor left a comment:

Here in this particular line of code it's probably a non-issue: we only read this from the wire, so an index that large will always be out of bounds anyway. Maybe just throw an out of bounds exception here?

Quoted code under review:

```java
}

@Override
public int getIntLE(int index) {
```
Contributor left a comment:

We should add an implementation of this (using var handles, as we did for other stuff in `Numbers`) to `BytesArray`; otherwise this might be quite slow if it's not getting inlined properly (which it probably won't be).

Member Author left a comment:

Yeah. We have stuff in `ByteUtils` for this sort of thing, I think. I figured I'd do it in a followup and add a microbenchmark. I could do it now if you'd prefer, too.

Contributor left a comment:

Follow-up seems fine by me, especially if it gets a benchmark, thanks! :)

@nik9000 (Member, Author) commented Sep 15, 2022

I've pulled this locally and fixed some things up. Is it ready to get in so we can build on it?

@original-brownbear (Contributor) left a comment

LGTM, thanks Nik!

@nik9000 nik9000 merged commit b11f738 into elastic:main Sep 19, 2022
nik9000 added a commit to nik9000/elasticsearch that referenced this pull request Sep 19, 2022
This fixed a bug with the `BigIntArray` serialization that I wrote in elastic#89668
where it'd skip the entire final block if we used all of it.
elasticsearchmachine pushed a commit that referenced this pull request Sep 21, 2022
This fixed a bug with the `BigIntArray` serialization that I wrote in
#89668 where it'd skip the entire final block if we used all of it.
martijnvg added a commit to martijnvg/elasticsearch that referenced this pull request Oct 10, 2022
Based on elastic#89668 but for doubles. This should allow aggregations down the road to read double values directly from the netty buffer rather than copying them out of it.

Relates to elastic#89437
elasticsearchmachine pushed a commit that referenced this pull request Oct 14, 2022
Based on #89668 but for doubles. This should allow aggregations down the road to read double values directly from the netty buffer rather than copying them out of it.

Relates to #89437