
Conversation

@ConeyLiu (Contributor) commented Sep 5, 2017

What changes were proposed in this pull request?

When Spark persists data to off-heap (unsafe) memory, we call MemoryStore.putIteratorAsBytes, which needs to synchronize on the MemoryManager for every record written. This is not necessary: we can request more memory at a time and reduce the unnecessary synchronization.
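For context, here is a minimal standalone sketch of the per-record pattern described above. The object, lock, and counters are simplified stand-ins, not the actual MemoryStore/MemoryManager code:

```scala
// Simplified stand-in: every record written acquires the single
// memory-manager lock before it can proceed.
object PerRecordReservationSketch {
  private val memoryManagerLock = new Object   // stands in for the MemoryManager monitor
  private var unrollMemoryReserved = 0L

  // One synchronized call per record -- the contention this PR tries to remove.
  def reserveUnrollMemory(bytes: Long): Unit = memoryManagerLock.synchronized {
    unrollMemoryReserved += bytes
  }

  def main(args: Array[String]): Unit = {
    val records = Iterator.fill(1000000)(new Array[Byte](64))
    records.foreach(r => reserveUnrollMemory(r.length))
    println(s"reserved $unrollMemoryReserved bytes across 1000000 lock acquisitions")
  }
}
```

With many tasks running concurrently on one executor, each of those lock acquisitions can contend with every other task's unroll bookkeeping.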

How was this patch tested?

Test case (1 executor, 20 cores):

```scala
import org.apache.spark.storage.StorageLevel

val start = System.currentTimeMillis()
val count = sc.parallelize(0 until Integer.MAX_VALUE, 100)
  .persist(StorageLevel.OFF_HEAP)
  .count()

println(System.currentTimeMillis() - start)
```

Test result (ms):

| run    | 1     | 2     | 3     | 4     | 5     |
|--------|-------|-------|-------|-------|-------|
| before | 27647 | 29108 | 28591 | 28264 | 27232 |
| after  | 26868 | 26358 | 27767 | 26653 | 26693 |

@ConeyLiu (Contributor, Author) commented Sep 5, 2017

Hi @cloud-fan, @jerryshao, would you mind taking a look? Thanks a lot.

@ConeyLiu (Contributor, Author) commented Sep 5, 2017

Here https://github.com/apache/spark/pull/19135/files?diff=unified#diff-870cd3693df7a5add2ac3119d7d91d34L373, we call reserveAdditionalMemoryIfNecessary() for every record write.

@jerryshao (Contributor):

Sorry, I'm not so familiar with this part, but from the test results it seems the performance only improved a little. I suspect the way you generate the RDD (0 until Integer.MAX_VALUE) might take most of the time (since a large integer array needs to be serialized with the tasks and shipped to the executors).

Also, I see you use 1 executor with 20 cores for the test. In normal usage we will not allocate so many cores to one executor. Could you please test with 2-4 cores per executor? I guess with fewer cores the contention on the MemoryManager lock should be alleviated, and the performance might be close.

@ConeyLiu (Contributor, Author) commented Sep 5, 2017

First, serialization did not take long, as the attached screenshot shows.

Second, I do not think every executor in a distributed system should be given very few cores and little memory: more processes also mean more communication between processes, which means more data serialization and deserialization.

Third, thread synchronization only causes performance problems when there are enough concurrent threads. On our servers we have 70 to 80 cores, and the number of concurrent tasks is even higher.

The change is really small and the affected code is only a small fraction of the whole task, so the impact on the total time is not large, but in this test case it still improved by about 5%.

@cloud-fan (Contributor):

Does this patch have regressions? It seems to me that allocating more memory may starve other tasks/operators and reduce the overall performance.

@ConeyLiu (Contributor, Author) commented Sep 6, 2017

Hi @cloud-fan, the previous version behaved the same as putIteratorAsValues. I have now modified the code so that each request asks for an additional chunkSize bytes of memory, because ChunkedByteBufferOutputStream grows by exactly chunkSize each time.
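A rough sketch of this intermediate approach, assuming the unroll buffer grows in fixed chunkSize steps (names and sizes are illustrative, not the real MemoryStore or ChunkedByteBufferOutputStream code):

```scala
// Illustrative only: reserve unroll memory one chunk at a time, matching the
// chunk-wise growth of the output buffer, instead of once per record.
object ChunkSizedReservationSketch {
  def main(args: Array[String]): Unit = {
    val chunkSize = 4L * 1024 * 1024   // assumed chunk size of the output stream
    var reserved = 0L                  // unroll memory reserved so far
    var bufferSize = 0L                // bytes actually written
    var reservations = 0

    val records = Iterator.fill(1 << 20)(new Array[Byte](64))
    for (r <- records) {
      bufferSize += r.length
      if (bufferSize > reserved) {     // buffer is about to outgrow the reservation
        reserved += chunkSize          // one lock hit per chunk, not per record
        reservations += 1
      }
    }
    println(s"wrote $bufferSize bytes, reserved $reserved bytes in $reservations calls")
  }
}
```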

@jiangxb1987 (Contributor):

It would be great to test the perf on executors with various coresPerExecutor settings to ensure the code change doesn't introduce regressions.

@ConeyLiu (Contributor, Author) commented Sep 7, 2017

@jiangxb1987 OK, I can test it later. The attached profiling screenshot is from running k-means with the source data cached in off-heap memory; you can see that the CPU time spent in reserveUnrollMemoryForThisTask is very high.

@cloud-fan (Contributor):

Is it better to do batch unrolling? I.e., we could check memory usage and request memory every 10 records or so, instead of doing it for every record.
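A minimal sketch of such a batched check, reusing the memoryCheckPeriod/memoryGrowthFactor scheme from putIteratorAsValues (the constants and the standalone driver here are illustrative, not the actual Spark code):

```scala
// Illustrative only: check the buffer size every memoryCheckPeriod records and
// grow the reservation by memoryGrowthFactor, so the memory-manager lock is hit
// far less often than once per record.
object BatchedUnrollSketch {
  def main(args: Array[String]): Unit = {
    val memoryCheckPeriod = 16
    val memoryGrowthFactor = 1.5
    var memoryThreshold = 1L << 20     // assumed initial unroll memory (1 MiB)
    var elementsUnrolled = 0L
    var bufferSize = 0L                // stands in for the serialized output size
    var reservations = 0

    val records = Iterator.fill(1 << 20)(new Array[Byte](64))
    for (r <- records) {
      bufferSize += r.length
      elementsUnrolled += 1
      if (elementsUnrolled % memoryCheckPeriod == 0 && bufferSize >= memoryThreshold) {
        // One reservation grows the threshold past the current buffer size.
        val amountToRequest = (bufferSize * memoryGrowthFactor - memoryThreshold).toLong
        memoryThreshold += amountToRequest
        reservations += 1
      }
    }
    println(s"unrolled $elementsUnrolled records with $reservations reservations")
  }
}
```

With a 16-record check period and a 1.5x growth factor, the reservation call is made only a handful of times per block instead of once per record.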

@ConeyLiu (Contributor, Author):

@cloud-fan Very sorry for the late reply. I have updated the code following your suggestion. Does this need a performance test? If so, I will test it later.

@ConeyLiu ConeyLiu changed the title [SPARK-21923][CORE]Avoid call reserveUnrollMemoryForThisTask every record [SPARK-21923][CORE]Avoid calling reserveUnrollMemoryForThisTask for every record Sep 11, 2017
@ConeyLiu (Contributor, Author) commented Sep 14, 2017

Hi @cloud-fan, @jiangxb1987. Below are the test results; I have moved the test to a server.
2 cores with 150 GB memory:

| after (ms) | before (ms) | proportion (after/before) |
|-----------:|------------:|--------------------------:|
| 135881 | 135183 | |
| 139352 | 135652 | |
| 138356 | 136431 | |
| 137041 | 137980 | |
| 137191 | 139708 | |
| avg 137564.2 | avg 136990.8 | 1.004186 |

20 cores with 150 GB memory:

| after (ms) | before (ms) | proportion (after/before) |
|-----------:|------------:|--------------------------:|
| 27519 | 27683 | |
| 27319 | 28055 | |
| 25905 | 27357 | |
| 26362 | 28709 | |
| 25042 | 29306 | |
| avg 26429.4 | avg 28222 | 0.936482 |

50 cores with 150 GB memory:

| after (ms) | before (ms) | proportion (after/before) |
|-----------:|------------:|--------------------------:|
| 29740 | 30570 | |
| 28573 | 29881 | |
| 28922 | 30221 | |
| 29809 | 30096 | |
| 29069 | 30349 | |
| avg 29222.6 | avg 30223.4 | 0.966887 |

@jerryshao (Contributor):

So this somehow reflects that CPU core contention is the main issue for the up-front memory reservation, am I right?

AFAIK from our customers, we usually don't allocate so many cores to one executor, and we also avoid big executors (many cores + large heap memory) to reduce GC and other memory/CPU contention.

@ConeyLiu (Contributor, Author) commented Sep 14, 2017

Hi @jerryshao, thanks for reviewing.

> So this somehow reflects that CPU core contention is the main issue for the up-front memory reservation

I have modified the code; it no longer requests extra memory up front, it just reduces the number of calls to reserveUnrollMemoryForThisTask, following @cloud-fan's comments. The approach is also the same as in putIteratorAsValues.

Yeah, its impact will be small with few cores. In the test results above it doesn't introduce any regressions, and it is better with many cores. For machine learning we need to cache the source data to OFF_HEAP in order to reduce GC problems.

As for the configuration, I think it may differ between application scenarios.

```scala
// Number of elements unrolled so far
var elementsUnrolled = 0L
// How often to check whether we need to request more memory
val memoryCheckPeriod = 16
```
Review comment (Contributor):

This is the same approach we used in putIteratorAsValues. One thing still missing is memoryGrowthFactor; can we add that too?

Review comment (Contributor):

In the future we should do some refactoring between putIteratorAsValues and putIteratorAsBytes to reduce duplicated code.

Review comment (Contributor, Author):

OK, I'll add it. I think I can take a try at refactoring it. Do you want that in a separate PR?

Review comment (Contributor):

Yeah, definitely not in this PR.

```scala
// Number of elements unrolled so far
var elementsUnrolled = 0L
// How often to check whether we need to request more memory
val memoryCheckPeriod = 16
```
Review comment (Contributor):

Should this be made configurable? Or can we make sure it's the best magic number that we can choose?

Review comment (Contributor):

It's hard-coded in putIteratorAsValues too; we can improve it later.

Review comment (Contributor, Author):

I have just made it configurable. I'm not sure whether this way of writing it is reasonable.

```diff
 val initialMemoryThreshold = unrollMemoryThreshold
 // How often to check whether we need to request more memory
-val memoryCheckPeriod = 16
+val memoryCheckPeriod = conf.getLong("spark.storage.unrollMemoryCheckPeriod", 16)
```
Review comment (Contributor):

We should move these 2 configs to org.apache.spark.internal.config and add some documentation. They should be internal configs, I think.

Review comment (Contributor, Author):

OK, moved. Please take a look.

```scala
private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
  ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
    .doc("The memory check period is used to determine how often we should check whether "
      + "there is a need to request more memory when we try to put the given block in memory.")
```
Review comment (Contributor):

put -> unroll

@cloud-fan (Contributor):

retest this please

@cloud-fan (Contributor):

LGTM except one minor comment


```scala
private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
  ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
    .doc("The memory check period is used to determine how often we should check whether "
```
Review comment (Contributor):

Let's call .internal() here to make them internal configs, as we don't expect users to tune them.
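For reference, a hedged sketch of what the two internal config entries might look like after this change (doc wording, value types, and defaults are assumptions here, not copied from the merged patch):

```scala
// In the org.apache.spark.internal.config package object, where ConfigBuilder is in scope.
private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
  ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
    .internal()
    .doc("The memory check period is used to determine how often we should check whether "
      + "there is a need to request more memory when we try to unroll the given block in memory.")
    .longConf
    .createWithDefault(16)

private[spark] val UNROLL_MEMORY_GROWTH_FACTOR =
  ConfigBuilder("spark.storage.unrollMemoryGrowthFactor")
    .internal()
    .doc("Memory to request as a multiple of the size used to unroll the block so far.")
    .doubleConf
    .createWithDefault(1.5)
```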

@SparkQA commented Sep 18, 2017

Test build #81868 has finished for PR 19135 at commit 7f97cc2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ConeyLiu (Contributor, Author):

Hi @cloud-fan, thanks for reviewing. The code has been updated; please take a look.

@cloud-fan (Contributor):

retest this please

@SparkQA commented Sep 18, 2017

Test build #81878 has finished for PR 19135 at commit e1dc7a4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!
