
Conversation

@kiszk
Member

@kiszk kiszk commented Jul 2, 2017

What changes were proposed in this pull request?

This PR ensures that Unsafe.sizeInBytes is a multiple of 8. If it is not satisfied, Unsafe.hashCode causes an assertion violation.

How was this patch tested?

Will add test cases
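For context, UnsafeRow processes its bytes as 8-byte words (for example in hashCode), so a size that is not a multiple of 8 trips a word-alignment assertion. A minimal Scala sketch of that word-at-a-time assumption, purely illustrative and not Spark's actual hashing code:

// Illustrative only: word-at-a-time processing assumes an 8-byte-aligned length,
// which is the invariant this PR enforces for sizeInBytes.
def hashWords(bytes: Array[Byte]): Int = {
  require(bytes.length % 8 == 0, s"lengthInBytes (${bytes.length}) must be a multiple of 8")
  val buf = java.nio.ByteBuffer.wrap(bytes)
  var h = 42                       // arbitrary seed
  while (buf.remaining() >= 8) {
    val word = buf.getLong()       // consume one 8-byte word
    h = 31 * h + (word ^ (word >>> 32)).toInt
  }
  h
}
// hashWords(new Array[Byte](28))  // 28 % 8 != 0: fails, like the reported assertion
// hashWords(new Array[Byte](32))  // OK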

@SparkQA

SparkQA commented Jul 2, 2017

Test build #79042 has finished for PR 18503 at commit eb87c63.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 2, 2017

Test build #79048 has finished for PR 18503 at commit ccc820f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jul 4, 2017

@cloud-fan I believe that this failure occurs because the checkpoint file has a value record whose size is not a multiple of 8 (i.e. 28).

We could solve this failure by regenerating these files. However, I think the real issue is that checkpoint files whose record sizes may not be a multiple of 8 already exist in production environments. Should we round the value record size for UnsafeRow up to a multiple of 8 when it is not?

What do you think?

@kiszk kiszk changed the title [WIP][SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of 8 [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a multiple of 8 Jul 4, 2017
keyRowId = numRows;
keyRow.pointTo(base, recordOffset, klen);
- valueRow.pointTo(base, recordOffset + klen, vlen + 4);
+ valueRow.pointTo(base, recordOffset + klen, vlen + 8);
Contributor

why add 8 here?

Member Author

Line 59 puts a long value whose length is 8. In summary, lines 57 and 59 consume vlen + 8 bytes from base + recordOffset + klen.

Contributor

Strictly speaking, the final long value doesn't belong to the value row, so why are we doing this?

Member Author

Good catch. While the long consumes 8 bytes in the page, it is not part of the UnsafeRow.
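To make the record layout in this thread concrete, here is a small Scala sketch of one fixed-length record as described above; the offsets and lengths are illustrative assumptions, not the actual FixedLengthRowBasedKeyValueBatch code:

// Assumed layout of one record in the page, per the discussion above:
//   [ key: klen bytes ][ value: vlen bytes ][ trailing long: 8 bytes of page bookkeeping ]
val klen = 16
val vlen = 24
val recordOffset = 0L
val keyOffset   = recordOffset                 // keyRow.pointTo(base, keyOffset, klen)
val valueOffset = recordOffset + klen          // valueRow should cover exactly vlen bytes
val longOffset  = recordOffset + klen + vlen   // the trailing long is not part of the value row
// Pointing the value row at vlen + 4 (or vlen + 8) bytes makes it overlap the trailing long,
// and vlen + 4 is also not a multiple of 8, which is what trips the assertion.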

@SparkQA

SparkQA commented Jul 5, 2017

Test build #79214 has finished for PR 18503 at commit 8a7a948.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


keyRowId = numRows;
keyRow.pointTo(base, recordOffset, klen);
valueRow.pointTo(base, recordOffset + klen, vlen + 4);
Contributor

I'm wondering why we did this before. Was it a mistake?

Member Author
@kiszk kiszk Jul 5, 2017

I have the same question.
@sameeragarwal had a similar question one year ago, but there was no response from @ooq.

Contributor

I recall it being intentional.
See discussion here.

Member Author

@ooq thank you for pointing out that interesting discussion.
The discussion seems to make sense for page management. The question @cloud-fan and I have is whether valueRow should use only vlen. I think the +4 is for page management.

- val keyRowBuffer = new Array[Byte](keySize)
+ // If key size in an existing file is not a multiple of 8, round it to multiple of 8
+ val keyAllocationSize = ((keySize + 7) / 8) * 8
+ val keyRowBuffer = new Array[Byte](keyAllocationSize)
Contributor

so RowBasedKeyValueBatch is the format for state store? cc @zsxwing

Member Author

I think that RowBasedKeyValueBatch is not used by the state store (HDFSBackedStateStoreProvider) for now.

Contributor

Why do we have this logic? What do we write into the state store?

Member Author

Here is why I added this logic.
I believe that this failure occurs because the checkpoint file has a value record whose size is not a multiple of 8 (i.e. 28). Thus, I always round its size up to a multiple of 8.
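A minimal Scala sketch of the round-up used at this point in the PR; note that later comments below argue for flooring instead when reading existing files:

// Round a size up to the next multiple of 8, as in the keyAllocationSize /
// valueAllocationSize change above; e.g. the 28-byte value record becomes 32.
def roundUpTo8(size: Int): Int = ((size + 7) / 8) * 8

// roundUpTo8(28) == 32
// roundUpTo8(24) == 24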

Contributor

I think all unsafe rows have a size that is a multiple of 8, except in RowBasedKeyValueBatch in the previous code. So I'm wondering how the state store can have unsafe rows with the wrong size; does the state store use RowBasedKeyValueBatch?

Contributor

I agree. If we have to regenerate, then we are breaking something, and we must not, since we have been guaranteeing backward compatibility. If @cloud-fan claims that all unsafe rows except RowBasedKeyValueBatch should have a size that is a multiple of 8, then we need to understand what is going on: why does reading the checkpoint files fail?

Contributor

OK, we need to figure out what's going on; it seems there are other places where we may have the wrong size in UnsafeRow.

Member Author
@kiszk kiszk Jul 7, 2017

We have to know exactly how these checkpoint files were generated. Were these files generated by the method that @kunalkhamar mentioned, or were they generated by another tool?

Member Author

Should we add the program that @kunalkhamar pointed out here as a new test case, to check whether all of the sizes in UnsafeRow are correct?

Member Author

@cloud-fan @zsxwing @tdas @kunalkhamar
I misunderstood. HDFSBackedStateStoreProvider is used to store state.

I added a new test suite to check HDFSBackedStateStoreProvider for storing and restoring state, as @kunalkhamar suggested here.

Do you think it makes sense?

@SparkQA

SparkQA commented Jul 13, 2017

Test build #79583 has finished for PR 18503 at commit 7f5a269.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

In this PR we changed FixedLengthRowBasedKeyValueBatch. I want to know whether it has any external impact, such as on checkpoint data, or whether it is just used internally.

@kiszk
Member Author

kiszk commented Jul 14, 2017

@tdas @marmbrus @zsxwing could you please let us know about any impact of the change to FixedLengthRowBasedKeyValueBatch? Can we change this structure?

} else {
- val valueRowBuffer = new Array[Byte](valueSize)
+ // If value size in an existing file is not a multiple of 8, round it to multiple of 8
+ val valueAllocationSize = ((valueSize + 7) / 8) * 8
Contributor

Can this be made into a utility function inside UnsafeRow? It seems like this sort of adjustment should not be a concern of external users of UnsafeRow.

Contributor
@tdas tdas Jul 18, 2017

For example, how about something like this

class UnsafeRow { 
  def readFromStream(byteStream: InputStream, bytes: Int): UnsafeRow = ???
}
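A hypothetical Scala sketch of what such a helper might look like; the name, parameters, and round-up behavior are assumptions for illustration, not code from this PR:

import java.io.DataInputStream
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical helper: read `bytes` bytes of row data from the stream, but back the
// row with a buffer rounded up to a multiple of 8 so UnsafeRow's size invariant holds.
def readRowFromStream(in: DataInputStream, bytes: Int, numFields: Int): UnsafeRow = {
  val allocated = ((bytes + 7) / 8) * 8   // round the backing buffer up to a multiple of 8
  val buffer = new Array[Byte](allocated)
  in.readFully(buffer, 0, bytes)          // only `bytes` bytes exist in the stream
  val row = new UnsafeRow(numFields)
  row.pointTo(buffer, allocated)          // sizeInBytes is now 8-byte aligned
  row
}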

Member Author

@cloud-fan what do you think?

CheckAnswer((1, 2), (2, 2), (3, 2)))
}

testQuietly("store to and recover from a checkpoint") {
Contributor

I don't think this test is needed. There are existing tests that already cover reading from checkpoints, etc. The critical test was reading 2.1 checkpoint files, which seems to be passing.

Member Author

This test also checks whether the length of the checkpoint records is a multiple of 8. Does that make sense? Or is there another test suite that checks the length?

Contributor

It does not really check it explicitly, does it? It tests it implicitly by creating checkpoints and then restarting. There are other tests that already do the same thing, e.g. this test is effectively the same as
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala#L88

Member Author

Ah, you are right. This test currently relies on the internal assert in Unsafe.pointTo to check for a multiple of 8.
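If an explicit check were preferred over the internal assert, it could look roughly like this sketch; restoredRows is a hypothetical name for the rows read back from the checkpoint in the test:

// Explicit length check instead of relying on the assert inside pointTo.
restoredRows.foreach { row =>
  assert(row.getSizeInBytes % 8 == 0,
    s"row size ${row.getSizeInBytes} is not a multiple of 8")
}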

Contributor
@cloud-fan cloud-fan Jul 20, 2017

shall we remove this test?

Member Author

Yes, I will remove this since the test that @tdas pointed out causes the same assertion failure as my test case expected.

*/
public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
Contributor

I think we only need the assertion here, in pointTo, and in setTotalSize. Other places are just checking length for existing unsafe rows, which is unnecessary.

Member Author

Yes, done.

s"Error reading delta file $fileToRead of $this: key size cannot be $keySize")
} else {
val keyRowBuffer = new Array[Byte](keySize)
// If key size in an existing file is not a multiple of 8, round it to multiple of 8
Contributor

I don't think we can round. Assume the actual length of an unsafe row is 8: previously we would append 4 bytes, producing an unsafe row of 12 bytes, and save it to the checkpoint. So here, when reading an old checkpoint, we need to read 12 bytes but set the length to 8.
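A simplified Scala sketch of the approach described in this comment (illustrative only, not the actual HDFSBackedStateStoreProvider code; the size 12 is the example above):

// Read everything that was written so the stream position stays correct, but floor the
// row size to the 8-byte boundary when pointing the UnsafeRow at the buffer.
val storedValueSize = 12                       // 8 real bytes + 4 mistakenly appended pre-2.3
val valueRowBuffer = new Array[Byte](storedValueSize)
// input.readFully(valueRowBuffer, 0, storedValueSize)
val actualRowSize = (storedValueSize / 8) * 8  // 12 -> 8; the extra 4 bytes are never read as row data
// valueRow.pointTo(valueRowBuffer, actualRowSize)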

Contributor

BTW, we only need to do this for the value, not the key.

@SparkQA

SparkQA commented Jul 19, 2017

Test build #79770 has finished for PR 18503 at commit 762f02a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val valueRow = new UnsafeRow(valueSchema.fields.length)
- valueRow.pointTo(valueRowBuffer, valueSize)
+ // If valueSize in existing file is not multiple of 8, round it down to multiple of 8
+ valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
Contributor
@tdas tdas Jul 20, 2017

Nit: this isn't rounding; it essentially floors to a multiple of 8.
@cloud-fan is this safe to do with ANY row generated in earlier Spark 2.0 - 2.2? I want to be 100% sure.

Contributor

Yes, because the extra bytes that exceed the 8-byte boundary in an UnsafeRow will never be read, so it's safe to ignore them. And here we still respect the row length when reading the binary, so no data will be missed.

@kiszk can we add more comments to explain why this can happen? We should say that before Spark 2.3 we mistakenly appended 4 bytes to the value row in the aggregate buffer, which gets persisted into the checkpoint data, and so on.

Member Author

Sure, added more comments.

@SparkQA

SparkQA commented Jul 25, 2017

Test build #79924 has finished for PR 18503 at commit 0159701.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 25, 2017

Test build #79929 has finished for PR 18503 at commit 54be80e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// This is work around for the following.
// Pre-Spark 2.3 mistakenly append 4 bytes to the value row in
// `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
Contributor

@cloud-fan @kiszk Just to confirm again: are we absolutely sure that the issue is that there are 4 extra bytes in checkpointed rows, and therefore this truncation is safe to do?

Contributor

Yes, I'm absolutely sure, as we store aggregate buffers in checkpoint files.

BTW, I ran this test without the workaround here, and it still passes. This means that after this PR, RowBasedKeyValueBatch is fixed and the generated checkpoint files have the correct row length.

I also ran the test without the fix for RowBasedKeyValueBatch, and it fails. I think this proves that RowBasedKeyValueBatch is the reason why we have the wrong row length in checkpoint files.

val valueRow = new UnsafeRow(valueSchema.fields.length)
- valueRow.pointTo(valueRowBuffer, valueSize)
+ // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
+ // This is work around for the following.
Contributor

nit: This is a workaround for the following:

- valueRow.pointTo(valueRowBuffer, valueSize)
+ // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
+ // This is work around for the following.
+ // Pre-Spark 2.3 mistakenly append 4 bytes to the value row in
Contributor

nit: Prior to Spark 2.3, we mistakenly ...

// If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
// This is work around for the following.
// Pre-Spark 2.3 mistakenly append 4 bytes to the value row in
// `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
Contributor

and also VariableLengthRowBasedKeyValueBatch; let's just say RowBasedKeyValueBatch.

@cloud-fan
Contributor

LGTM, pending tests

@SparkQA

SparkQA commented Jul 26, 2017

Test build #79956 has finished for PR 18503 at commit cc467de.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@tdas
Contributor

tdas commented Jul 26, 2017

@cloud-fan @kiszk thanks for confirming. LGTM pending tests.

@SparkQA

SparkQA commented Jul 26, 2017

Test build #79969 has finished for PR 18503 at commit cc467de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in ebbe589 Jul 27, 2017