@@ -3021,41 +3021,54 @@ test_that("dapplyCollect() on DataFrame with a binary column", {
 })
 
 test_that("repartition by columns on DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-
-  # no column and number of partitions specified
-  retError <- tryCatch(repartition(df), error = function(e) e)
-  expect_equal(grepl
-    ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
-
-  # repartition by column and number of partitions
-  actual <- repartition(df, 3, col = df$"a")
-
-  # Checking that at least the dimensions are identical
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 3L)
-
-  # repartition by number of partitions
-  actual <- repartition(df, 13L)
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 13L)
-
-  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
-
-  # a test case with a column and dapply
-  schema <- structType(structField("a", "integer"), structField("avg", "double"))
-  df <- repartition(df, col = df$"a")
-  df1 <- dapply(
-    df,
-    function(x) {
-      y <- (data.frame(x$a[1], mean(x$b)))
-    },
-    schema)
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+
+    # no column and number of partitions specified
+    retError <- tryCatch(repartition(df), error = function(e) e)
+    expect_equal(grepl
+      ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
+
+    # repartition by column and number of partitions
+    actual <- repartition(df, 3, col = df$"a")
+
+    # Checking that at least the dimensions are identical
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 3L)
+
+    # repartition by number of partitions
+    actual <- repartition(df, 13L)
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 13L)
+
+    expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
+
+    # a test case with a column and dapply
+    schema <- structType(structField("a", "integer"), structField("avg", "double"))
+    df <- repartition(df, col = df$"a")
+
+    df1 <- dapply(
+      df,
+      function(x) {
+        y <- (data.frame(x$a[1], mean(x$b)))
+      },
+      schema)
 
-  # Number of partitions is equal to 2
-  expect_equal(nrow(df1), 2)
+    # Number of partitions is equal to 2
+    expect_equal(nrow(df1), 2)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })
 
 test_that("coalesce, repartition, numPartitions", {
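
Both hunks in this commit follow the same pattern: read the session's JVM RuntimeConfig, save the current value of spark.sql.shuffle.partitions, override it, and restore it in a finally block so that a failing expectation cannot leak the lowered setting into the shared session used by later tests. Below is a minimal, self-contained sketch of that pattern, assuming a local SparkR session and the exported sparkR.callJMethod() wrapper (the test file itself uses SparkR's internal callJMethod()); withShufflePartitions() is a hypothetical helper, not part of SparkR:

    library(SparkR)

    spark <- sparkR.session(master = "local[1]")  # assumption: local session, for illustration only
    conf <- sparkR.callJMethod(spark, "conf")     # the session's JVM RuntimeConfig

    # Hypothetical helper: run `body` with a temporary shuffle-partition count
    # and restore the previous value afterwards, even if `body` throws.
    withShufflePartitions <- function(n, body) {
      old <- sparkR.callJMethod(conf, "get", "spark.sql.shuffle.partitions")
      sparkR.callJMethod(conf, "set", "spark.sql.shuffle.partitions", as.character(n))
      tryCatch(body(), finally = {
        sparkR.callJMethod(conf, "set", "spark.sql.shuffle.partitions", old)
      })
    }

    # Usage, mirroring the repartition-plus-dapply step above: with only two
    # shuffle partitions, the shuffle schedules two tasks, so at most two R
    # workers are launched.
    withShufflePartitions(2, function() {
      df <- createDataFrame(data.frame(a = c(1L, 1L, 3L), b = c(1, 2, 3)))
      schema <- structType(structField("a", "integer"), structField("avg", "double"))
      collect(dapply(repartition(df, col = df$"a"),
                     function(x) data.frame(x$a[1], mean(x$b)),
                     schema))
    })

The finally block is the important part: testthat's expect_* functions signal an error when an expectation fails, and without the restore step the lowered value would persist for every test that runs afterwards.
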
@@ -3078,101 +3091,117 @@ test_that("coalesce, repartition, numPartitions", {
 })
 
 test_that("gapply() and gapplyCollect() on a DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-  expected <- collect(df)
-  df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
-  expect_identical(df1Collect, expected)
-
-  # gapply on empty grouping columns.
-  df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  # Computes the sum of second column by grouping on the first and third columns
-  # and checks if the sum is larger than 2
-  schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
-                  "a INT, e BOOLEAN")
-  for (schema in schemas) {
-    df2 <- gapply(
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  # TODO: A lower number of 'spark.sql.shuffle.partitions' causes test failures
+  # for an unknown reason. Probably we should fix it.
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "16")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+    expected <- collect(df)
+    df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
+    expect_identical(df1Collect, expected)
+
+    # gapply on empty grouping columns.
+    df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    # Computes the sum of the second column by grouping on the first and third columns
+    # and checks if the sum is larger than 2
+    schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
+                    "a INT, e BOOLEAN")
+    for (schema in schemas) {
+      df2 <- gapply(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+        },
+        schema)
+      actual <- collect(df2)$e
+      expected <- c(TRUE, TRUE)
+      expect_identical(actual, expected)
+
+      df2Collect <- gapplyCollect(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+          colnames(y) <- c("a", "e")
+          y
+        })
+      actual <- df2Collect$e
+      expect_identical(actual, expected)
+    }
+
+    # Computes the arithmetic mean of the second column by grouping
+    # on the first and third columns. Outputs the grouping value and the average.
+    schema <- structType(structField("a", "integer"), structField("c", "string"),
+                         structField("avg", "double"))
+    df3 <- gapply(
       df,
-      c(df$"a", df$"c"),
+      c("a", "c"),
       function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
       },
       schema)
-    actual <- collect(df2)$e
-    expected <- c(TRUE, TRUE)
+    actual <- collect(df3)
+    actual <- actual[order(actual$a), ]
+    rownames(actual) <- NULL
+    expected <- collect(select(df, "a", "b", "c"))
+    expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
+    colnames(expected) <- c("a", "c", "avg")
+    expected <- expected[order(expected$a), ]
+    rownames(expected) <- NULL
     expect_identical(actual, expected)
 
-    df2Collect <- gapplyCollect(
+    df3Collect <- gapplyCollect(
       df,
-      c(df$"a", df$"c"),
+      c("a", "c"),
       function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
-        colnames(y) <- c("a", "e")
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+        colnames(y) <- c("a", "c", "avg")
         y
       })
-    actual <- df2Collect$e
-    expect_identical(actual, expected)
-  }
-
-  # Computes the arithmetic mean of the second column by grouping
-  # on the first and third columns. Output the groupping value and the average.
-  schema <- structType(structField("a", "integer"), structField("c", "string"),
-                       structField("avg", "double"))
-  df3 <- gapply(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df3)
-  actual <- actual[order(actual$a), ]
-  rownames(actual) <- NULL
-  expected <- collect(select(df, "a", "b", "c"))
-  expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
-  colnames(expected) <- c("a", "c", "avg")
-  expected <- expected[order(expected$a), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
-
-  df3Collect <- gapplyCollect(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-      colnames(y) <- c("a", "c", "avg")
-      y
-    })
-  actual <- df3Collect[order(df3Collect$a), ]
-  expect_identical(actual$avg, expected$avg)
-
-  irisDF <- suppressWarnings(createDataFrame(iris))
-  schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
-  # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
-  df4 <- gapply(
-    cols = "Sepal_Length",
-    irisDF,
-    function(key, x) {
-      y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df4)
-  actual <- actual[order(actual$Sepal_Length), ]
-  rownames(actual) <- NULL
-  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
-                             stringsAsFactors = FALSE)
-  colnames(agg_local_df) <- c("Sepal_Length", "Avg")
-  expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
+    actual <- df3Collect[order(df3Collect$a), ]
+    expect_identical(actual$avg, expected$avg)
+
+    irisDF <- suppressWarnings(createDataFrame(iris))
+    schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
+    # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
+    df4 <- gapply(
+      cols = "Sepal_Length",
+      irisDF,
+      function(key, x) {
+        y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
+      },
+      schema)
+    actual <- collect(df4)
+    actual <- actual[order(actual$Sepal_Length), ]
+    rownames(actual) <- NULL
+    agg_local_df <- data.frame(aggregate(iris$Sepal.Width,
+                                         by = list(iris$Sepal.Length),
+                                         FUN = mean),
+                               stringsAsFactors = FALSE)
+    colnames(agg_local_df) <- c("Sepal_Length", "Avg")
+    expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
+    rownames(expected) <- NULL
+    expect_identical(actual, expected)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })
 
 test_that("Window functions on a DataFrame", {
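
All of the gapply() variants exercised above share one shape: group a SparkDataFrame by one or more columns, run an R function over each group's rows as a local data.frame, and declare the output columns with a schema. A minimal sketch of that shape, assuming an active local SparkR session (the data and names here are illustrative, not taken from the suite):

    library(SparkR)
    sparkR.session(master = "local[1]")  # assumption: local session, for illustration only

    df <- createDataFrame(data.frame(a = c(1L, 1L, 3L), b = c(1, 2, 3)))

    # Output columns must be declared up front; they are matched by position,
    # so the data.frame built inside the function needs no special names.
    schema <- structType(structField("a", "integer"), structField("avg", "double"))

    means <- gapply(df, "a", function(key, x) {
      # `key` is a list holding this group's grouping value(s); `x` is a plain
      # data.frame with the group's rows, so base R functions apply directly.
      data.frame(key[[1]], mean(x$b))
    }, schema)

    head(means)  # a = 1 -> avg 1.5; a = 3 -> avg 3

gapplyCollect() is the same operation fused with collect(); it takes no schema, which is why the functions in the tests above set colnames() on the data.frame they return.
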