Conversation

@JoshRosen
Contributor

This patch extends TungstenAggregate to support ImperativeAggregate functions. The existing TungstenAggregate operator only supported DeclarativeAggregate functions, which are defined in terms of Catalyst expressions and can be evaluated via generated projections. ImperativeAggregate functions, on the other hand, are evaluated by calling their initialize, update, merge, and eval methods.

The basic strategy here is similar to how SortBasedAggregate evaluates both types of aggregate functions: use a generated projection to evaluate the expression-based declarative aggregates with dummy placeholder expressions inserted in place of the imperative aggregate function output, then invoke the imperative aggregate functions and target them against the aggregation buffer. The bulk of the diff here consists of code that was copied and adapted from SortBasedAggregate, with some key changes to handle TungstenAggregate's sort fallback path.
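To make that strategy concrete, here is a minimal, self-contained sketch (hypothetical simplified types, not the actual TungstenAggregate code): declarative aggregates are updated through a single projection-like function over the shared buffer, while imperative aggregates are driven by direct method calls against the same buffer, each writing only at its own offset.

// Sketch only: simplified stand-in types, not Spark's real classes.
object MixedAggregationSketch {
  type Row = Array[Any]

  // Stand-in for an ImperativeAggregate: it owns slots in the shared buffer and is
  // driven by initialize/update method calls rather than generated expressions.
  trait ImperativeAgg {
    def mutableAggBufferOffset: Int
    def initialize(buffer: Row): Unit
    def update(buffer: Row, input: Row): Unit
  }

  final class CountAgg(val mutableAggBufferOffset: Int) extends ImperativeAgg {
    def initialize(buffer: Row): Unit = buffer(mutableAggBufferOffset) = 0L
    def update(buffer: Row, input: Row): Unit =
      buffer(mutableAggBufferOffset) = buffer(mutableAggBufferOffset).asInstanceOf[Long] + 1L
  }

  def processRow(
      // Stand-in for the generated projection over declarative aggregates; it updates
      // only the slots it owns and leaves the imperative (placeholder) slots untouched.
      declarativeUpdate: (Row, Row) => Unit,
      imperativeAggs: Seq[ImperativeAgg],
      buffer: Row,
      input: Row): Unit = {
    declarativeUpdate(buffer, input)
    imperativeAggs.foreach(_.update(buffer, input))
  }
}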

@JoshRosen
Contributor Author

/cc @yhuai for review. There are a few specific questions I'd like to clarify; I'll comment on them inline.

Contributor Author

To explain what's going on here: it turns out that the refactoring in #8973 to lift this into InterpretedAggregate ended up introducing a bug due to lazy val initialization issues. To avoid this, I just pushed these copies to the leaves of our inheritance hierarchy and removed the default implementation from the parent class.
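For context on the class of bug being avoided, here is a generic, self-contained illustration (not the actual Spark code; the PR's issue involved lazy val initialization, and this shows the simpler eager-val variant of the same initialization-order pitfall):

// Illustration only: a parent-class val that depends on an abstract member is evaluated
// during the parent's constructor, before the subclass's fields are initialized, so it
// silently observes default values.
abstract class Parent {
  def width: Int                 // provided by subclasses
  val doubled: Int = width * 2   // evaluated while constructing Parent
}

class Child extends Parent {
  private val w = 21
  override def width: Int = w    // w is still 0 when Parent evaluates doubled
}

object InitOrderDemo extends App {
  println(new Child().doubled)   // prints 0, not 42
}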

Contributor

So, if anyone tries to move it back to ImperativeAggregate, they will see a test failure? Should we add a comment to ImperativeAggregate to explain why a leaf class needs to implement this?

@SparkQA

SparkQA commented Oct 8, 2015

Test build #43438 has finished for PR 9038 at commit 7a34e03.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait Encoder[T]
    • case class ClassEncoder[T](
    • case class StaticInvoke(
    • case class Invoke(
    • case class NewInstance(
    • case class UnwrapOption(
    • case class LambdaVariable(value: String, isNull: String, dataType: DataType) extends Expression
    • case class MapObjects(

@rxin
Contributor

rxin commented Oct 9, 2015

YAY

@JoshRosen changed the title from "[SPARK-11017] Support ImperativeAggregates in TungstenAggregate" to "[SPARK-11017] [SQL] Support ImperativeAggregates in TungstenAggregate" on Oct 9, 2015
Contributor

Can we also remove it from planAggregateWithOneDistinct?

For planAggregateWithoutPartial, we can add the usesTungstenAggregate flag and use TungstenAggregate whenever possible (we can do it in a separate PR; I created https://issues.apache.org/jira/browse/SPARK-11028 for this).

Contributor Author

Whoops, good catch. I removed this from planAggregateWithOneDistinct and it led to a failure in the "single distinct column set" test:

[info]   == Results ==
[info]   !== Correct Answer - 4 ==                                                        == Spark Answer - 4 ==
[info]   ![100.0,0.3333333333333333,1.0,2,99.33333333333333,10.0,2.0]                     [100.0,0.3333333333333333,1.0,2,101.0,10.0,2.0]
[info]   ![110.0,10.0,20.0,null,109.0,11.0,30.0]                                          [110.0,10.0,20.0,null,120.0,11.0,30.0]
[info]   ![120.0,23.333333333333332,-3.3333333333333335,1,122.33333333333333,12.0,20.0]   [120.0,23.333333333333332,-3.3333333333333335,1,96.66666666666667,12.0,20.0]
[info]   ![null,null,3.0,3,null,null,null]                                                [null,null,3.0,3,103.0,null,null] (AggregationQuerySuite.scala:690)

Based on the failure mode here, I suspect that another one of the buffer offsets needs to be reset in response to the sort fallback.

Contributor Author

This also reminds me: are there planner tests that should be updated to assert that we're actually using the new Tungsten path? Let me check and see.

@yhuai
Contributor

yhuai commented Oct 11, 2015

Ah, assert(initialInputBufferOffset > 0) failed because when there is no grouping expression and the mode is not partial, initialInputBufferOffset will be 0.

@yhuai
Contributor

yhuai commented Oct 11, 2015

For that failed test, I think the problem is that those two agg operators in the reducer are holding the same references to those aggregate functions. Essentially, we want a way to clone an agg function without changing its aggBufferAttributes and inputAggBufferAttributes. How about we address this issue in a separate PR?

Contributor

I tried

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Clone the aggregate function via Java serialization so that each operator gets its
// own instance of the ImperativeAggregate.
val originalFunc = allAggregateExpressions(i).aggregateFunction
val baos = new ByteArrayOutputStream()
val funcOut = new ObjectOutputStream(baos)
funcOut.writeObject(originalFunc)
funcOut.close() // flush buffered object data before grabbing the bytes
val byteArray = baos.toByteArray
val funcIn = new ObjectInputStream(new ByteArrayInputStream(byteArray))
val func = funcIn.readObject().asInstanceOf[AggregateFunction2]

to confirm my guess (multiple operators are holding the same ImperativeAggregate instance). After adding this, all tests in TungstenAggregationQueryWithControlledFallbackSuite pass (I also removed the two assertions on initialInputBufferOffset).

Contributor Author

Good catch! I wonder whether we can address this by treating ImperativeAggregate instances as immutable and changing the with*Offset methods to return new instances. That seems a bit easier and safer than having to remember to clone.
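A minimal sketch of what that might look like, using a simplified stand-in trait rather than Spark's real ImperativeAggregate: each with*Offset call returns a fresh copy via case-class copy, so a function object shared between operators can never have its offsets mutated out from under one of them.

// Sketch only: hypothetical simplified types, not the actual Spark interfaces.
trait OffsetAware {
  def mutableAggBufferOffset: Int
  def inputAggBufferOffset: Int
  def withNewMutableAggBufferOffset(newOffset: Int): OffsetAware
  def withNewInputAggBufferOffset(newOffset: Int): OffsetAware
}

case class MaxAggSketch(
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0) extends OffsetAware {
  // Returning copies keeps every instance immutable with respect to its offsets.
  def withNewMutableAggBufferOffset(newOffset: Int): MaxAggSketch =
    copy(mutableAggBufferOffset = newOffset)
  def withNewInputAggBufferOffset(newOffset: Int): MaxAggSketch =
    copy(inputAggBufferOffset = newOffset)
}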

Contributor Author

I guess that this would also help to avoid potential issues caused by executor-side mutable state in the aggregate functions, should they wind up being used multiple times in different parts of a pipeline.

Contributor

Yeah, it would be good to return a new instance when we call with*Offset. For UDAFs, I think it may make sense to add a newInstance interface so that, if a user-defined one has internal state, we can create copies of it on the executor side. One difficulty is that we need to preserve the exprIds of aggBufferAttributes and inputAggBufferAttributes when we create new instances, so that binding references still works correctly without requiring too many changes. Is there an easy way to do that?

Contributor Author

One thought: instead of trying to preserve aggBufferAttributes and inputAggBufferAttributes when returning a new instance, what if we also updated the other expressions which referenced those attributes? Maybe that would be easier.

@JoshRosen
Contributor Author

@yhuai, I think we need to understand why my latest fix works, since the test failure might affect non-distinct aggregates as well (in which case we can't easily split this ticket into distinct and non-distinct cases).

Contributor

Should we create a PR containing the changes we made before we deleted this check? That way we can merge those changes first and then spend time figuring out the current issue. What do you think?

Contributor Author

Perhaps. Let me see if I'm convinced that the other tests are still valid even if we skip this distinct case.

@yhuai
Contributor

yhuai commented Oct 12, 2015

What is the failed test (and the stack trace)?

@SparkQA

SparkQA commented Oct 13, 2015

Test build #43589 has finished for PR 9038 at commit 17b1d7a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class HyperLogLogPlusPlus(

@JoshRosen
Contributor Author

To elaborate on the test failures:

All tests pass as of 8695e3f, when the Tungsten code path is not used for imperative distinct aggregates.

As of a09c51b, which enables this for distinct aggregates, the "single distinct column set" query fails by returning the wrong answer. Here, I think the problem was that the attribute bindings were being mixed up, causing us to return the wrong columns in one of the projections.

After trying to fix things by copying when changing the buffer offsets, I wound up getting binding errors. My current understanding of attribute binding is that we shouldn't be creating new attributes on executors, because that could lead to expression ids being re-used incorrectly. Given this, I think the cleanest fix will be the ImperativeAggregate mutable-part/immutable-part interface refactoring that I proposed earlier.

@JoshRosen
Contributor Author

I'm fairly concerned about the implications of having mutable state inside ImperativeAggregates that isn't reset when switching to the sort fallback and could be shared across stages of a pipeline, so I'm slightly worried about the idea of reverting to 8695e3f and merging this PR as of that commit: the same underlying problems could exist there, too.

@JoshRosen
Contributor Author

See #9093 for an exploration of some sketchy-looking newExprId calls which might be involved in the problems that we're experiencing here.

@SparkQA

SparkQA commented Oct 13, 2015

Test build #43683 has finished for PR 9038 at commit 2547b29.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class HyperLogLogPlusPlus(
    • case class ExprId(id: Long, jvmId: UUID)

Contributor

Had a discussion with @JoshRosen offline. Here is the explanation for this change.

Our ImperativeAggregate and DeclarativeAggregate are quite different. For an ImperativeAggregate, correctness depends on mutableAggBufferOffset and inputAggBufferOffset being correct, and, when there is no fallback, it does not rely on attribute ids for binding. To make mutableAggBufferOffset and inputAggBufferOffset immutable, whenever we call withNewMutableAggBufferOffset or withNewInputAggBufferOffset we create a new copy of the ImperativeAggregate. Although a new copy contains a new set of attribute ids for aggBufferAttributes and inputAggBufferAttributes, in most cases those new ids do not introduce any problem because an ImperativeAggregate does not rely on them. However, once we fall back to sort-based aggregation and need to extract input aggregation buffers, we need matching attribute ids for binding to work. Since allAggregateFunctions.flatMap(_.inputAggBufferAttributes) will have attributes with a new set of ids, we would see a binding error here; originalInputAttributes.drop(initialInputBufferOffset) is fine here.
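To make the offset-versus-id distinction concrete, a small hedged sketch (hypothetical types, not the Spark binding code): the imperative path reads the buffer purely by position, while expression binding resolves a column by matching exprIds against a schema and therefore fails as soon as the ids are regenerated.

// Sketch only: offset-based access ignores attribute ids, id-based binding does not.
object BindingSketch {
  final case class AttrRef(name: String, exprId: Long)

  // Imperative-style access: the function only needs its slot position in the buffer.
  def readByOffset(buffer: Array[Any], mutableAggBufferOffset: Int): Any =
    buffer(mutableAggBufferOffset)

  // Binding-style access: resolve an attribute to an ordinal by exprId; if the schema
  // was rebuilt with fresh ids, the lookup fails even though the column "looks" the same.
  def bindByExprId(schema: Seq[AttrRef], attr: AttrRef): Int =
    schema.indexWhere(_.exprId == attr.exprId) match {
      case -1      => sys.error(s"Couldn't find ${attr.name}#${attr.exprId} in schema")
      case ordinal => ordinal
    }
}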

Contributor Author

I'm folding this into the comments right now. Thanks for summarizing.

@yhuai
Contributor

yhuai commented Oct 14, 2015

I tried the UDAF from https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html and ran the following test in build/sbt -Phive sparkShell.

// Generating data
sc
  .parallelize(1 to 1000, 1)
  .flatMap(i => (1 to 20000).map(j => (i, j, j, j, j, j, j, j, j, j, j)))
  .toDF("i", "j1", "j2", "j3", "j4", "j5", "j6", "j7", "j8", "j9", "j10")
  .write
  .format("parquet")
  .saveAsTable("testAgg")

// Run query.
import org.apache.spark.sql.functions._

val gm = new GeometricMean
sqlContext.udf.register("gm", gm)

sqlContext.sql("set spark.sql.shuffle.partitions=1")
sqlContext.read.load("/user/hive/warehouse/testagg").registerTempTable("testAgg")

val start = System.currentTimeMillis
(1 to 10).foreach { _ =>
  sqlContext.sql("""
    select
      i,
      gm(j1),
      gm(j2),
      gm(j3),
      gm(j4),
      gm(j5),
      sum(j6),
      sum(j7),
      sum(j8),
      sum(j9),
      sum(j10)
    from testAgg
    group by i""").queryExecution.executedPlan.execute().foreach(_ => ())
}
println("took " + ((System.currentTimeMillis - start).toDouble / 1000 / 10))

Without this PR, the avg execution time was 31.6s. With this PR, the avg execution time was 27.8s.

@yhuai
Contributor

yhuai commented Oct 14, 2015

test this please

@yhuai
Contributor

yhuai commented Oct 14, 2015

LGTM pending jenkins.

@yhuai
Contributor

yhuai commented Oct 14, 2015

I took a look at the query plans of the DataFrames used in those TungstenAggregation* tests. Those plans look good.

@SparkQA

SparkQA commented Oct 15, 2015

Test build #1903 has finished for PR 9038 at commit 4867a3d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class HyperLogLogPlusPlus(

Contributor

Seems it should be [key, value]. Will fix it while merging.

@yhuai
Contributor

yhuai commented Oct 15, 2015

Thank you @JoshRosen! I am merging this to master.

@asfgit closed this in 4ace4f8 on Oct 15, 2015
@SparkQA

SparkQA commented Oct 15, 2015

Test build #43746 has finished for PR 9038 at commit 4867a3d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class HyperLogLogPlusPlus(

@JoshRosen deleted the support-interpreted-in-tungsten-agg-final branch on August 29, 2016 at 19:23