@@ -59,8 +59,8 @@ private[streaming] trait ReceiverSchedulingPolicy {
5959 * If a receiver is scheduled to an executor but has not yet run, it contributes
6060 * `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
6161 * </ul>
62- * At last, we will randomly select one of the executors that have the least weight and add it
63- * to the candidate list .
62+ * At last, if there are more than 3 idle executors (weight = 0), returns all idle executors.
63+ * Otherwise, we only return 3 best options according to the weights .
6464 * </li>
6565 * </ol>
6666 *
@@ -96,12 +96,14 @@ private[streaming] class LoadBalanceReceiverSchedulingPolicyImpl extends Receive
9696 }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
9797
9898 val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
99- if (idleExecutors.nonEmpty ) {
100- // If there are idle executors, randomly select one
101- locations += idleExecutors( Random .nextInt(idleExecutors.size))
99+ if (idleExecutors.size >= 3 ) {
100+ // If there are more than 3 idle executors, return all of them
101+ locations ++ = idleExecutors
102102 } else {
103- // Use the executor that runs the least receivers
104- locations += executorWeights.minBy(_._2)._1
103+ // If there are less than 3 idle executors, return 3 best options
104+ locations ++= idleExecutors
105+ val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
106+ locations ++= (idleExecutors ++ sortedExecutors).take(3 )
105107 }
106108 locations.toSeq
107109 }
0 commit comments