
Conversation

@liutang123
Contributor

@liutang123 liutang123 commented Jul 15, 2018

When the join key is long or int in a broadcast join, Spark uses LongToUnsafeRowMap to store the key-value pairs of the table that will be broadcast. But when LongToUnsafeRowMap is broadcast to executors and is too big to hold in memory, it is stored on disk. At that point, because write uses a variable cursor to determine how many bytes in the page of LongToUnsafeRowMap will be written out, and the cursor is not restored when deserializing, the executor writes out nothing from the page to disk.

What changes were proposed in this pull request?

Restore cursor value when deserializing.
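
For illustration, a minimal, self-contained sketch of the write/read pattern and the fix, with names simplified from LongToUnsafeRowMap (the LONG_ARRAY_OFFSET constant stands in for Platform.LONG_ARRAY_OFFSET; this is a sketch, not the actual Spark code):

import java.io.{DataInputStream, DataOutputStream}

object CursorSketch {
  val LONG_ARRAY_OFFSET = 16L // stand-in for Platform.LONG_ARRAY_OFFSET

  final class SketchMap {
    var page: Array[Long] = Array.empty
    // cursor tracks the byte position just past the used part of page.
    var cursor: Long = LONG_ARRAY_OFFSET

    def write(out: DataOutputStream): Unit = {
      // write() derives the used length from cursor ...
      val usedWords = ((cursor - LONG_ARRAY_OFFSET) / 8).toInt
      out.writeInt(usedWords)
      page.take(usedWords).foreach(out.writeLong)
    }

    def read(in: DataInputStream): Unit = {
      val usedWords = in.readInt()
      page = Array.fill(usedWords)(in.readLong())
      // ... so the fix restores cursor here; without this line a second
      // write() on the executor computes usedWords == 0 and emits nothing.
      cursor = usedWords * 8L + LONG_ARRAY_OFFSET
    }
  }
}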

@hvanhovell
Contributor

ok to test

@hvanhovell
Contributor

@liutang123 can you explain why we are losing data when serializing to disk? Also, can you add a unit test?

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93112 has finished for PR 21772 at commit a72fe61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

writeLong(used)
val cursorFlag = cursor - Platform.LONG_ARRAY_OFFSET
writeLong(cursorFlag)
val used = (cursorFlag / 8).toInt
Member


Are you saying that when (cursor - Platform.LONG_ARRAY_OFFSET) / 8 exceeds the range of Int, we will have an overflow? But later you still do toInt and use the value?
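
For reference, a quick arithmetic check of this concern (assuming cursor counts bytes and page is an Array[Long], so at most Int.MaxValue words): dividing by 8 before toInt keeps the result within Int range even for the largest possible page:

// Illustrative only: the raw byte count can exceed Int range, but the
// word count after dividing by 8 cannot, since page.length <= Int.MaxValue.
val offset = 16L                        // stand-in for Platform.LONG_ARRAY_OFFSET
val cursor = offset + 8L * Int.MaxValue // byte cursor for the largest page
assert(cursor - offset > Int.MaxValue)  // the byte count alone would overflow an Int
val usedWords = ((cursor - offset) / 8).toInt
assert(usedWords == Int.MaxValue)       // the word count fits exactly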

Contributor Author


Data is lost when serializing LongHashedRelation in the executor. Can you see this picture? In the executor, the cursor is 0.

Member


Can you post the image in this PR? The web site you refer to contains too many ads.

Contributor Author

@liutang123 liutang123 Jul 18, 2018


Sorry, I didn't know how to post an image in the PR at first.
The image is as follows:
[image]

@liutang123
Contributor Author

liutang123 commented Jul 18, 2018

@hvanhovell Thanks for reviewing. Data is lost because the variable cursor in the executor is Platform.LONG_ARRAY_OFFSET and serialization depends on it. I will add a unit test later.
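
To make the failure mode concrete, a minimal sketch (not the actual Spark code) of what the executor computed before this fix:

// On the executor the map is freshly constructed, then read() fills page
// but (before this fix) leaves cursor at its initial value.
val LONG_ARRAY_OFFSET = 16L        // stand-in for Platform.LONG_ARRAY_OFFSET
var cursor = LONG_ARRAY_OFFSET     // never updated by the old read()

val used = ((cursor - LONG_ARRAY_OFFSET) / 8).toInt
assert(used == 0) // so writeLongArray(writeBuffer, page, used) emits nothing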

@viirya
Member

viirya commented Jul 18, 2018

Let me clarify it. So this means that when LongToUnsafeRowMap is broadcasted to executors, and it is too big to hold in memory, it will be stored in disk. At that time, because write uses cursor to determine used, it will write out nothing from page into disk.

Is this what you mean?

@liutang123
Contributor Author

@viirya Yes, absolutely right. :)

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93267 has finished for PR 21772 at commit f67ff4d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64

Jenkins, test this please

@SparkQA

SparkQA commented Jul 20, 2018

Test build #93314 has finished for PR 21772 at commit f67ff4d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liutang123
Contributor Author

@viirya Hi, could you take some more time to review this PR?


val usedWordsNumber = ((cursor - Platform.LONG_ARRAY_OFFSET) / 8).toInt
writeLong(usedWordsNumber)
writeLongArray(writeBuffer, page, usedWordsNumber)
Member


If there is no good reason, shall we revert this change? It looks like you only renamed it?

val usedWordsNumber = readLong().toInt
// Set cursor because cursor is used in write function.
cursor = usedWordsNumber * 8 + Platform.LONG_ARRAY_OFFSET
page = readLongArray(readBuffer, usedWordsNumber)
Member


Ditto. Can you just update cursor and revert the other unrelated changes?

map.free()
}

test("SPARK-24809: Serializing LongHashedRelation in executor may result in data error") {
Member


Is it possible to have an end-to-end test for this?

Contributor Author


I think this UT covers the case I hit.
An end-to-end test is too hard to construct, because this case only occurs when the executor's memory is not enough to hold the block and the broadcast cache is removed by the garbage collector.

@viirya
Member

viirya commented Jul 23, 2018

@liutang123 Thanks for this work. I'm curious whether this is an actual problem you hit in a real application, or you just think it is problematic?

@liutang123
Contributor Author

@viirya This case occurred in our cluster, and it took us a lot of time to find this bug.
For some man-made reasons, the small table's max id had become abnormally large. The LongHashedRelation generated from that table was not optimized to dense and became abnormally big (approximately 400MB).

array = readLongArray(readBuffer, length)
val pageLength = readLong().toInt
page = readLongArray(readBuffer, pageLength)
// Set cursor because cursor is used in write function.
Member


maybe: Restore cursor variable to make this map able to be serialized again on executors?

@viirya
Member

viirya commented Jul 24, 2018

As you actually modify LongToUnsafeRowMap, is it better to update the PR title and description to reflect that?

val value1 = new Random().nextLong()

val key2 = 2L
val value2 = new Random().nextLong()
Member


Is it necessary to use Random here? Can we use two arbitrary long values?


val resultRow = new UnsafeRow(1)
assert(originalMap.getValue(key1, resultRow).getLong(0) === value1)
assert(originalMap.getValue(key2, resultRow).getLong(0) === value2)
Member


We don't need to test LongToUnsafeRowMap's normal features here. We just need to verify that the map still works normally after two rounds of ser/de.

val ser = new KryoSerializer(
(new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()

val mapSerializedInDriver = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
Member


nit:

// Simulate serialize/deserialize twice on driver and executor
val firstTimeSerialized = ...
val secondTimeSerialized = ...
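
Assembled from the snippets quoted above, a sketch of how that shape might look (it assumes the surrounding test fixture — originalMap, key1/value1, key2/value2 — and is not the merged test verbatim):

val ser = new KryoSerializer(
  (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()

// Simulate serialize/deserialize twice: once on the driver, once on the executor.
val mapSerializedInDriver =
  ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
val mapSerializedInExecutor =
  ser.deserialize[LongToUnsafeRowMap](ser.serialize(mapSerializedInDriver))

// Per the comment above, only verify that the twice-round-tripped map
// still answers lookups correctly.
val resultRow = new UnsafeRow(1)
assert(mapSerializedInExecutor.getValue(key1, resultRow).getLong(0) === value1)
assert(mapSerializedInExecutor.getValue(key2, resultRow).getLong(0) === value2)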

@viirya
Member

viirya commented Jul 24, 2018

cc @cloud-fan

@liutang123 liutang123 changed the title [SPARK-24809] [SQL] Serializing LongHashedRelation in executor may result in data error [SPARK-24809] [SQL] Serializing LongToUnsafeRowMap in executor may result in data error Jul 24, 2018
@SparkQA

SparkQA commented Jul 24, 2018

Test build #93473 has finished for PR 21772 at commit 06a9547.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 24, 2018

Test build #93480 has finished for PR 21772 at commit c9ebfd0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liutang123
Contributor Author

Jenkins, test this please

@kiszk
Member

kiszk commented Jul 24, 2018

retest this please

@SparkQA

SparkQA commented Jul 24, 2018

Test build #93516 has finished for PR 21772 at commit c9ebfd0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

originalMap.append(key2, unsafeProj(InternalRow(value2)))
originalMap.optimize()

val ser = new KryoSerializer(
Contributor


we can write sparkContext.env.serializer.newInstance()

@cloud-fan
Contributor

good catch! LGTM

@viirya
Member

viirya commented Jul 29, 2018

LGTM too.

@SparkQA

SparkQA commented Jul 29, 2018

Test build #93749 has finished for PR 21772 at commit 6246dfa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@gatorsmile gatorsmile left a comment


Thanks! Merged to master/2.3/2.2/2.1

asfgit pushed a commit that referenced this pull request Jul 29, 2018
…ult in data error

When the join key is long or int in a broadcast join, Spark uses `LongToUnsafeRowMap` to store the key-value pairs of the table that will be broadcast. But when `LongToUnsafeRowMap` is broadcast to executors and is too big to hold in memory, it is stored on disk. At that point, because `write` uses a variable `cursor` to determine how many bytes in the `page` of `LongToUnsafeRowMap` will be written out, and the `cursor` is not restored when deserializing, the executor writes out nothing from the page to disk.

## What changes were proposed in this pull request?
Restore cursor value when deserializing.

Author: liulijia <[email protected]>

Closes #21772 from liutang123/SPARK-24809.

(cherry picked from commit 2c54aae)
Signed-off-by: Xiao Li <[email protected]>
asfgit pushed a commit that referenced this pull request Jul 29, 2018
asfgit pushed a commit that referenced this pull request Jul 29, 2018
@asfgit asfgit closed this in 2c54aae Jul 29, 2018
rdblue pushed a commit to rdblue/spark that referenced this pull request May 19, 2019
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 26, 2019
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 27, 2019
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023