Conversation

@BryanCutler
Member

@BryanCutler BryanCutler commented May 24, 2018

What changes were proposed in this pull request?

Currently, a pandas_udf of type PandasUDFType.GROUPED_MAP assigns the resulting columns based on the position of columns in the returned pandas.DataFrame. If the returned DataFrame is constructed from a dict, the order of its columns can be arbitrary and differ from the schema defined for the UDF. If the types still match, no error is raised and the user sees column names and column data mixed up.

This change first tries to assign columns using the return type's field names. If a KeyError occurs, the column index is checked: if it is string-based, the error is raised since it is most likely a naming mistake; otherwise, it falls back to assigning columns by position and raises a TypeError if the field types do not match.
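The mix-up can be reproduced with plain pandas alone. The sketch below assumes a hypothetical two-field schema ("id", "v") and stands in for the UDF's return handling; it is not the actual Spark code:

```python
import pandas as pd

# Hypothetical schema field order for the UDF
schema_fields = ["id", "v"]

# The UDF author builds the result from a dict whose key order differs
# from the schema.
result = pd.DataFrame({"v": [1.0, 2.0], "id": [7, 8]})

# Positional assignment (current behavior): columns are taken in the
# DataFrame's own order, so the "v" data is silently labeled "id".
by_position = [result[result.columns[i]] for i, _ in enumerate(schema_fields)]

# Name-based assignment (this change): columns are looked up by the
# schema field names, so the dict's key order no longer matters.
by_name = [result[name] for name in schema_fields]
```

Since both columns here have distinct types only by convention, nothing in the positional path flags the swap, which is exactly the silent-correctness hazard described above.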

How was this patch tested?

Added a test that returns a new DataFrame with a column order different from the schema.

@BryanCutler
Member Author

I think it's pretty common that users will construct a pd.DataFrame from a dict and see this issue. So I think we have to use the column name instead of index. I can't remember if this was previously discussed or not. cc @icexelloss @HyukjinKwon @ueshin

@BryanCutler
Member Author

Also, if this fix looks ok should we backport? I can see a lot of people hitting this and getting wrong data, so seems like a correctness bug to me.

@SparkQA

SparkQA commented May 24, 2018

Test build #91124 has finished for PR 21427 at commit d67a8a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@icexelloss
Contributor

I agree with the choice here.

Contributor

@icexelloss icexelloss left a comment

LGTM

@BryanCutler
Member Author

BryanCutler commented May 25, 2018

At first glance, I thought this issue was slightly different from https://issues.apache.org/jira/browse/SPARK-23929, but yeah, it seems to be the same. After reading through that discussion, I guess we need to be careful about any changes. I'm not used to creating DataFrames by position, but it is possible to do so with a list of tuples, like the example from the doctest:

       >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
       ... def mean_udf(key, pdf):
       ...     # key is a tuple of one numpy.int64, which is the value
       ...     # of 'id' for the current group
       ...     return pd.DataFrame([key + (pdf.v.mean(),)])
  

Then this would be a breaking change... so maybe it would be best to add better documentation for now like @HyukjinKwon mentioned in SPARK-23929, and target a change for Spark 3.0?

@HyukjinKwon
Member

HyukjinKwon commented May 25, 2018

Hmmmm .. I agree this one is preferable, and I don't think we've had a discussion about this so far, if I remember correctly.

Do you feel strongly about this @icexelloss and @BryanCutler? If so, let's update the migration guide for 2.4.0 ... and I hope we can document this feature as experimental (separate JIRA?). I think it could be a-okay.

Otherwise, I'd somewhat prefer to target this for 3.0.0 and just document the behavior for now .. Another option is to add a configuration to control this behaviour, but I remember it's tricky to inject the configuration there.

@icexelloss
Contributor

icexelloss commented May 25, 2018

I do think the current default behavior might be confusing to users and hard to debug. I have also received similar complaints.

Two simple options are:
(1) Keep the current behavior unchanged (match by indices)
(2) Match by column names, throw exception when names don't match exactly

I think there is another possible choice:
(3) Match by name if "same column name, different order"; Match by indices if the column names are "unspecified" (i.e., if you create a pd.DataFrame without specifying column names, column names will be 0, 1, 2, ...). In other cases, we can throw exception.

I know this is a breaking change, but I feel it's better to change it now rather than later, because the feature is still new and more people are going to be hit by the current behavior. I think (3) seems like the least disruptive change - the only two cases it will change are (a) same column names, different order, in which case I think people probably want "match by name", and (b) different column names between the schema and the return value, in which case it's likely a typo and the user should fix the code.

I do think we want to fix this at least in 2.4 if not 2.3.x...

@HyukjinKwon
Member

Just for clarification, I am okay @BryanCutler if you feel in this way too.

@gatorsmile
Member

How about making it configurable? Users can choose either resolve by names or resolve by positions. It is hard to say which one is right. If the names do not match when users want to resolve by names, we should issue an error.

@gatorsmile
Member

Do not backport this to 2.3. This is a behavior change.

@HyukjinKwon
Member

Yea, agree with not backporting and agree with a configuration. Thing is, the configuration is inaccessible on the worker.py side. That's why I was hesitant. The safest way is just to target 3.0.0, but on the other hand there are currently many complaints too.

@HyukjinKwon
Member

HyukjinKwon commented May 25, 2018

Also, I really think we should mark this feature as experimental. Actually, I think we should make a blocker JIRA to mark it as experimental, and then backport that to branch-2.3 so that we can claim it's experimental.

Member

seems col is not used btw.

@gatorsmile
Member

cc @rxin

@rxin
Contributor

rxin commented May 25, 2018

If this has been released you can't just change it like this; it will break users' programs immediately. At the very least introduce a flag so it can be set by the user to avoid breaking their code.

@HyukjinKwon
Member

but as I said, it's difficult to have a configuration there. Shall we just target 3.0.0 and mark this as experimental, as I suggested from the first place? That should be the safest way.

@rxin
Contributor

rxin commented May 25, 2018 via email

@HyukjinKwon
Member

because it needs a change to our pickle protocol to access the configuration, if I remember this correctly. cc @ueshin too.

@HyukjinKwon
Member

BTW, what do you think about adding a blocker to set this feature as experimental @rxin? It's a pretty new feature, and it should be reasonable to call it experimental.

@rxin
Contributor

rxin commented May 25, 2018 via email

@HyukjinKwon
Member

I believe it was just a mistake to correct - we forgot to mark it experimental. It's pretty unstable and many JIRAs are being opened. @BryanCutler, mind if I ask you to go ahead if you find some time? If you are busy, I will do it myself.

cc @vanzin FYI.

@rxin
Contributor

rxin commented May 25, 2018 via email

@HyukjinKwon
Member

HyukjinKwon commented May 25, 2018

I need to take a look too, but it sounds possible. WDYT @BryanCutler? BTW, the current fix here should be the most appropriate place to fix it, since that's actually where the problem started.

@icexelloss
Contributor

@rxin @gatorsmile thanks for joining the discussion!

On the configuration side, we have already some mechanism to do so for the "timezone" config:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L48
I'd imagine we could extend the mechanism to support an arbitrary configuration map.

On the behavior side, I thought more about this, and I feel a desirable behavior is to support both matching by name and by index, i.e.
(1) If the output dataframe has the same column names as the schema, we match by column name; this is the desirable behavior when the user does:

return pd.DataFrame({'a': ..., 'b': ...})

(2) If the output dataframe has column names "0, 1, 2, ...", we match by indices; this is because when the user doesn't specify column names when creating a pd.DataFrame, those are the default column names, e.g.

>>> pd.DataFrame([[1, 2.0, "hello"], [4, 5.0, "xxx"]])
   0    1      2
0  1  2.0  hello
1  4  5.0    xxx

(3) Throw an exception otherwise.
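A rough sketch of rules (1)-(3) in plain Python (match_columns is a hypothetical helper for illustration, not the actual Spark code):

```python
import pandas as pd

def match_columns(result, field_names):
    # Hypothetical helper sketching rules (1)-(3); not the actual Spark code.
    cols = list(result.columns)
    if set(cols) == set(field_names):
        # (1) same names, any order: match by name
        return [result[name] for name in field_names]
    if cols == list(range(len(cols))):
        # (2) default integer labels 0, 1, 2, ...: match by position
        return [result[c] for c in cols]
    # (3) anything else is most likely a typo in the user's code
    raise ValueError("returned DataFrame columns do not match the schema")
```

For a DataFrame built as `pd.DataFrame({'v': ..., 'id': ...})` against a schema `id, v`, rule (1) reorders by name; for `pd.DataFrame([[...]])` with default columns, rule (2) keeps positional semantics intact.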

What do you think of having the new configuration support this behavior?

@HyukjinKwon
Member

HyukjinKwon commented May 25, 2018

Yea, that's what I meant. Having a configuration should be desirable, but I doubt we should extend it that way further. A one-time thing could be fine too. I roughly guess that's going to be a minimal fix to address all the concerns here? If fixing on the JVM side is smaller, let's go that way.

@HyukjinKwon
Member

HyukjinKwon commented May 25, 2018

btw, we should really mark this as experimental and allow a bit of behaviour change. I guess that's what we meant by Experimental:

* Experimental API's might change or be removed in minor versions of Spark, or be adopted as

@BryanCutler
Member Author

I opened https://issues.apache.org/jira/browse/SPARK-24392 to continue the discussion about changing this to experimental. IMO it was a bit of an oversight to not do so initially, but agree that it is a little strange to change it after being released.

@BryanCutler
Member Author

I've been thinking about this and came to the same conclusion as @icexelloss here #21427 (comment) that we could really support both names and position, and fix this without changing behavior.

When the user defines a grouped map udf, the StructType has field names, so if the returned DataFrame has column names they should match. If the user returned a DataFrame with positional columns only, pandas will name the columns with an integer index (not an integer string). We could change the logic to do the following:

1. Assign columns by name, catching a KeyError exception
2. If the column names are all integers, then fall back to assigning by position
3. Else, raise the KeyError (most likely the user has a typo in the column name)

I think that will solve this issue without changing the behavior, but I would need to check that this holds for different pandas versions. How does that sound?
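A minimal sketch of that try/fallback logic (assign_columns is a hypothetical helper; the real change lives in worker.py):

```python
import pandas as pd

def assign_columns(result, field_names):
    # Hypothetical sketch of the proposed worker-side logic.
    try:
        # 1. Assign columns by name
        return [result[name] for name in field_names]
    except KeyError:
        if all(isinstance(c, int) for c in result.columns):
            # 2. All-integer labels mean a positional DataFrame
            #    (pandas defaults to 0, 1, 2, ...), so fall back to position.
            return [result[result.columns[i]] for i in range(len(field_names))]
        # 3. String labels that don't match the schema: most likely a typo
        raise
```

Note that the integer check is on the label type, not on integer strings, which is what distinguishes a default positional DataFrame from a misnamed one.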

@rxin
Contributor

rxin commented May 25, 2018 via email

@BryanCutler BryanCutler force-pushed the arrow-grouped-map-mixesup-cols-SPARK-24324 branch from e322e1a to 59972d6 Compare June 18, 2018 22:07
.internal()
.doc("When true, a grouped map Pandas UDF will assign columns from the returned " +
"Pandas DataFrame based on position, regardless of column label type. When false, " +
"columns will be looked up by name if labeled with a string and fallback to use" +
Member Author

need a space at end

Member Author

This can also be marked as deprecated right?

Member

Yup, I think so.

val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
s"stdout writer for $pythonExec", 0, Long.MaxValue)
val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
Member Author

change this back, accidental

@BryanCutler
Member Author

I updated it to use a conf that reverts to column assignment by position, regardless of the type of the column labels in the Pandas DataFrame, and added a test for this. I also put this in a generic Map[String, String] so it will be less trouble to add additional confs in the future, or remove deprecated ones.

@SparkQA

SparkQA commented Jun 19, 2018

Test build #92048 has finished for PR 21427 at commit 59972d6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jun 19, 2018

Test build #92066 has finished for PR 21427 at commit 59972d6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jun 19, 2018

Test build #92077 has finished for PR 21427 at commit 59972d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/** Return Map with conf settings to be used in ArrowPythonRunner */
def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
Contributor

@icexelloss icexelloss Jun 19, 2018

Maybe move this function out of ArrowUtils? Doesn't seem to be Arrow specific.

Edit: Actually, nvm

private val batchSize = conf.arrowMaxRecordsPerBatch
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
private val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
private val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
Contributor

nit: runnerConf -> pythonRunnerConf?

Member Author

ok

PythonRDD.writeUTF(timeZoneId, dataOut)
} else {
dataOut.writeInt(SpecialLengths.NULL)
dataOut.writeInt(conf.size)
Contributor

@icexelloss icexelloss Jun 19, 2018

maybe put this in a writeConf method to be more specific?

Member Author

I think it's fine, but I will add some comments

Contributor

Ok, SGTM.

val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
Contributor

nit: pythonRunnerConf?

val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
Contributor

nit: pythonRunnerConf?

@HyukjinKwon
Member

HyukjinKwon commented Jun 19, 2018

Seems fine and I am okay with it. Should be good to address nits anyway.

} else {
Nil
}
val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnssByPosition) {
Contributor

Can we do:

val pandasColByPosition = Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION.key -> conf.pandasGroupedMapAssignColumnssByPosition)

Member Author

I think it's better to just omit the config for the default case, that way it's easier to process in the worker.

Contributor

@icexelloss icexelloss Jun 22, 2018

Sorry, can you explain why it's easier to process in the worker?

I think we just need to remove the default value here?
https://github.com/apache/spark/pull/21427/files#diff-d33eea00c68dfd120f4ceae6381f34cdR100

Also, one thing that's not great about omitting the conf for the default case is that you need to put the default value in two places (both Python and Java).

# Assign result columns by schema name
return [(result[field.name], to_arrow_type(field.dataType))
for field in return_type]
except KeyError:
Contributor

I think we want to be a little more careful here; for example, a KeyError in to_arrow_type could lead to unexpected behavior.

How about sth like this:

if any(isinstance(name, basestring) for name in result.columns):
    return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type]
else:
    return [(result.iloc[:,i], to_arrow_type(field.dataType)) for i, field in enumerate(return_type)]

Member Author

This seems ok to me since it's basically the same, but I don't think we need to worry about to_arrow_type throwing a KeyError. Is there any particular reason you suggested handling position like this?

[(result.iloc[:,i], to_arrow_type(field.dataType)) for i, field in enumerate(return_type)]

To me it seems better to look up by column labels, how it is currently

[(result[result.columns[i]], to_arrow_type(field.dataType))
                for i, field in enumerate(return_type)]

Contributor

I think result.iloc[:,i] and result[result.columns[i]] are the same; you don't have to change it if you prefer result.columns[i].

I agree to_arrow_type doesn't throw KeyError, but in general I feel it's more robust not to assume the implementation details of to_arrow_type. I think the code is more concise and readable with if/else too (compared to except KeyError).
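For what it's worth, the two positional lookups under discussion are interchangeable on a plain DataFrame:

```python
import pandas as pd

df = pd.DataFrame({"b": [1, 2], "a": [3.0, 4.0]})

# Both forms select the first column by position; they return the same Series.
left = df.iloc[:, 0]
right = df[df.columns[0]]
```

(The only caveat would be duplicate column labels, where label-based lookup returns a DataFrame; that situation doesn't arise for a schema with distinct field names.)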

@SparkQA

SparkQA commented Jun 22, 2018

Test build #92223 has finished for PR 21427 at commit 2d2ced6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in a5849ad Jun 24, 2018
@BryanCutler BryanCutler deleted the arrow-grouped-map-mixesup-cols-SPARK-24324 branch June 25, 2018 17:54
@BryanCutler
Member Author

Thanks all for the discussion and reviews!
