
Commit d2aa2a0

committed
Resolve conflict
2 parents 9c329d8 + cc465fd commit d2aa2a0

File tree: 42 files changed (+908, -284 lines)


R/pkg/inst/profile/shell.R

Lines changed: 15 additions & 1 deletion
@@ -27,7 +27,21 @@
   sc <- SparkR::sparkR.init()
   assign("sc", sc, envir=.GlobalEnv)
   sqlContext <- SparkR::sparkRSQL.init(sc)
+  sparkVer <- SparkR:::callJMethod(sc, "version")
   assign("sqlContext", sqlContext, envir=.GlobalEnv)
-  cat("\n Welcome to SparkR!")
+  cat("\n Welcome to")
+  cat("\n")
+  cat("    ____              __", "\n")
+  cat("   / __/__  ___ _____/ /__", "\n")
+  cat("  _\\ \\/ _ \\/ _ `/ __/  '_/", "\n")
+  cat(" /___/ .__/\\_,_/_/ /_/\\_\\")
+  if (nchar(sparkVer) == 0) {
+    cat("\n")
+  } else {
+    cat("   version ", sparkVer, "\n")
+  }
+  cat("    /_/", "\n")
+  cat("\n")
+
   cat("\n Spark context is available as sc, SQL context is available as sqlContext\n")
 }

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 16 additions & 2 deletions
@@ -139,6 +139,9 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
 
   @Override
   public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
+    // Keep track of success so we know if we encountered an exception.
+    // We do this rather than a standard try/catch/re-throw to handle
+    // generic throwables.
     boolean success = false;
     try {
       while (records.hasNext()) {
@@ -147,8 +150,19 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
       closeAndWriteOutput();
       success = true;
     } finally {
-      if (!success) {
-        sorter.cleanupAfterError();
+      if (sorter != null) {
+        try {
+          sorter.cleanupAfterError();
+        } catch (Exception e) {
+          // Only throw this error if we won't be masking another
+          // error.
+          if (success) {
+            throw e;
+          } else {
+            logger.error("In addition to a failure during writing, we failed during " +
+              "cleanup.", e);
+          }
+        }
       }
     }
   }
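
The change above applies a general pattern: always attempt cleanup after a failure, but never let an exception thrown by the cleanup path mask the error that caused the failure in the first place. A minimal Scala sketch of that pattern (the `doWrite` and `cleanup` functions are hypothetical placeholders, not Spark APIs):

    // Run a task, then attempt cleanup, without letting a cleanup failure
    // hide the original error from the task itself.
    def runWithCleanup(doWrite: () => Unit, cleanup: () => Unit): Unit = {
      var success = false
      try {
        doWrite()
        success = true
      } finally {
        try {
          cleanup()
        } catch {
          case e: Exception =>
            // If the task succeeded there is nothing to mask, so rethrow.
            if (success) throw e
            // Otherwise only log: the original doWrite() failure propagates.
            else System.err.println(s"Cleanup also failed: $e")
        }
      }
    }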

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java

Lines changed: 17 additions & 0 deletions
@@ -253,6 +253,23 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
     createWriter(false).stop(false);
   }
 
+  class PandaException extends RuntimeException {
+  }
+
+  @Test(expected=PandaException.class)
+  public void writeFailurePropagates() throws Exception {
+    class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
+      @Override public boolean hasNext() {
+        throw new PandaException();
+      }
+      @Override public Product2<Object, Object> next() {
+        return null;
+      }
+    }
+    final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
+    writer.write(new BadRecords());
+  }
+
   @Test
   public void writeEmptyIterator() throws Exception {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
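
The test drives the writer with an iterator whose first `hasNext()` call throws, a handy technique for checking that a consumer propagates failures instead of swallowing them. A Scala sketch of the same idea, with a hypothetical consumer standing in for the writer:

    // An iterator that fails as soon as it is consumed.
    class PandaException extends RuntimeException

    class BadRecords extends Iterator[(Any, Any)] {
      override def hasNext: Boolean = throw new PandaException
      override def next(): (Any, Any) = throw new NoSuchElementException("empty")
    }

    object BadRecordsCheck extends App {
      // Stand-in for writer.write(records): just drains the iterator.
      def consume(records: Iterator[(Any, Any)]): Unit = records.foreach(_ => ())

      try {
        consume(new BadRecords)
        sys.error("expected a PandaException")
      } catch {
        case _: PandaException => println("failure propagated as expected")
      }
    }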

docs/sql-programming-guide.md

Lines changed: 117 additions & 7 deletions
@@ -22,7 +22,7 @@ The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.
 All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`, `pyspark` shell, or `sparkR` shell.
 
 
-## Starting Point: `SQLContext`
+## Starting Point: SQLContext
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
@@ -1036,6 +1036,15 @@ for (teenName in collect(teenNames)) {
 
 </div>
 
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+# sqlContext is an existing HiveContext
+sqlContext.sql("REFRESH TABLE my_table")
+{% endhighlight %}
+
+</div>
+
 <div data-lang="sql" markdown="1">
 
 {% highlight sql %}
@@ -1054,7 +1063,7 @@ SELECT * FROM parquetTable
 
 </div>
 
-### Partition discovery
+### Partition Discovery
 
 Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
 table, data are usually stored in different directories, with partitioning column values encoded in
@@ -1108,7 +1117,7 @@ can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, w
 `true`. When type inference is disabled, string type will be used for the partitioning columns.
 
 
-### Schema merging
+### Schema Merging
 
 Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
 a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
@@ -1208,6 +1217,79 @@ printSchema(df3)
 
 </div>
 
+### Hive metastore Parquet table conversion
+
+When reading from and writing to Hive metastore Parquet tables, Spark SQL will try to use its own
+Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the
+`spark.sql.hive.convertMetastoreParquet` configuration, and is turned on by default.
+
+#### Hive/Parquet Schema Reconciliation
+
+There are two key differences between Hive and Parquet from the perspective of table schema
+processing.
+
+1. Hive is case insensitive, while Parquet is not
+1. Hive considers all columns nullable, while nullability in Parquet is significant
+
+For this reason, we must reconcile the Hive metastore schema with the Parquet schema when converting
+a Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:
+
+1. Fields that have the same name in both schemas must have the same data type regardless of
+   nullability. The reconciled field should have the data type of the Parquet side, so that
+   nullability is respected.
+
+1. The reconciled schema contains exactly those fields defined in the Hive metastore schema.
+
+   - Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
+   - Any fields that only appear in the Hive metastore schema are added as nullable fields in the
+     reconciled schema.
+
+#### Metadata Refreshing
+
+Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
+conversion is enabled, metadata of those converted tables are also cached. If these tables are
+updated by Hive or other external tools, you need to refresh them manually to ensure consistent
+metadata.
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+// sqlContext is an existing HiveContext
+sqlContext.refreshTable("my_table")
+{% endhighlight %}
+
+</div>
+
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+// sqlContext is an existing HiveContext
+sqlContext.refreshTable("my_table")
+{% endhighlight %}
+
+</div>
+
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+# sqlContext is an existing HiveContext
+sqlContext.refreshTable("my_table")
+{% endhighlight %}
+
+</div>
+
+<div data-lang="sql" markdown="1">
+
+{% highlight sql %}
+REFRESH TABLE my_table;
+{% endhighlight %}
+
+</div>
+
+</div>
+
 ### Configuration
 
 Configuration of Parquet can be done using the `setConf` method on `SQLContext` or by running
@@ -1266,6 +1348,34 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
     support.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.parquet.output.committer.class</code></td>
+  <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
+  <td>
+    <p>
+      The output committer class used by Parquet. The specified class needs to be a subclass of
+      <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
+      subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
+    </p>
+    <p>
+      <b>Note:</b>
+      <ul>
+        <li>
+          This option must be set via Hadoop <code>Configuration</code> rather than Spark
+          <code>SQLConf</code>.
+        </li>
+        <li>
+          This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
+        </li>
+      </ul>
+    </p>
+    <p>
+      Spark SQL comes with a builtin
+      <code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
+      efficient than the default Parquet output committer when writing data to S3.
+    </p>
+  </td>
+</tr>
 </table>
 
 ## JSON Datasets
@@ -1445,8 +1555,8 @@ This command builds a new assembly jar that includes Hive. Note that this Hive a
 on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries
 (SerDes) in order to access data stored in Hive.
 
-Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. Please note when running
-the query on a YARN cluster (`yarn-cluster` mode), the `datanucleus` jars under the `lib_managed/jars` directory
+Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. Please note when running
+the query on a YARN cluster (`yarn-cluster` mode), the `datanucleus` jars under the `lib_managed/jars` directory
 and `hive-site.xml` under `conf/` directory need to be available on the driver and all executors launched by the
 YARN cluster. The convenient way to do this is adding them through the `--jars` option and `--file` option of the
 `spark-submit` command.
@@ -1794,7 +1904,7 @@ that these options will be deprecated in future release as more optimizations ar
     Configures the number of partitions to use when shuffling data for joins or aggregations.
   </td>
 </tr>
-<tr>
+<tr>
   <td><code>spark.sql.planner.externalSort</code></td>
   <td>false</td>
   <td>
@@ -1889,7 +1999,7 @@ options.
 #### DataFrame data reader/writer interface
 
 Based on user feedback, we created a new, more fluid API for reading data in (`SQLContext.read`)
-and writing data out (`DataFrame.write`),
+and writing data out (`DataFrame.write`),
 and deprecated the old APIs (e.g. `SQLContext.parquetFile`, `SQLContext.jsonFile`).
 
 See the API docs for `SQLContext.read` (
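
The new documentation above introduces two configuration points with different scopes: `spark.sql.hive.convertMetastoreParquet` is a Spark SQL setting, while `spark.sql.parquet.output.committer.class` must go through the Hadoop Configuration. A rough Scala sketch of how they might be wired up in a Spark 1.x application, assuming an existing `sc: SparkContext` and `sqlContext: HiveContext` (the committer class name is the built-in one mentioned in the docs):

    // Disable the Hive metastore Parquet conversion (on by default) via SQLConf.
    sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")

    // The Parquet output committer is read from the Hadoop Configuration,
    // not from SQLConf, so set it on the SparkContext.
    sc.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

    // After Hive or another external tool updates a converted table,
    // refresh the cached metadata before querying it again.
    sqlContext.refreshTable("my_table")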

mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala

Lines changed: 24 additions & 6 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.ml.PredictorParams
 import org.apache.spark.ml.param.ParamMap
-import org.apache.spark.ml.param.shared.{HasElasticNetParam, HasMaxIter, HasRegParam, HasTol}
+import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.linalg.BLAS._
@@ -41,7 +41,8 @@ import org.apache.spark.util.StatCounter
  * Params for linear regression.
  */
 private[regression] trait LinearRegressionParams extends PredictorParams
-  with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
+  with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
+  with HasFitIntercept
 
 /**
  * :: Experimental ::
@@ -72,6 +73,14 @@ class LinearRegression(override val uid: String)
   def setRegParam(value: Double): this.type = set(regParam, value)
   setDefault(regParam -> 0.0)
 
+  /**
+   * Set if we should fit the intercept.
+   * Default is true.
+   * @group setParam
+   */
+  def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
+  setDefault(fitIntercept -> true)
+
   /**
    * Set the ElasticNet mixing parameter.
    * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
@@ -123,6 +132,7 @@ class LinearRegression(override val uid: String)
     val numFeatures = summarizer.mean.size
     val yMean = statCounter.mean
     val yStd = math.sqrt(statCounter.variance)
+    // look at glmnet5.m L761 maaaybe that has info
 
     // If the yStd is zero, then the intercept is yMean with zero weights;
     // as a result, training is not needed.
@@ -142,7 +152,7 @@ class LinearRegression(override val uid: String)
     val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
     val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
 
-    val costFun = new LeastSquaresCostFun(instances, yStd, yMean,
+    val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
       featuresStd, featuresMean, effectiveL2RegParam)
 
     val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
@@ -180,7 +190,7 @@ class LinearRegression(override val uid: String)
     // The intercept in R's GLMNET is computed using closed form after the coefficients are
     // converged. See the following discussion for detail.
     // http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
-    val intercept = yMean - dot(weights, Vectors.dense(featuresMean))
+    val intercept = if ($(fitIntercept)) yMean - dot(weights, Vectors.dense(featuresMean)) else 0.0
     if (handlePersistence) instances.unpersist()
 
     // TODO: Converts to sparse format based on the storage, but may base on the scoring speed.
@@ -234,13 +244,18 @@ class LinearRegressionModel private[ml] (
  * See this discussion for detail.
  * http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
  *
+ * When training with intercept enabled,
  * The objective function in the scaled space is given by
  * {{{
  * L = 1/2n ||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2,
  * }}}
  * where \bar{x_i} is the mean of x_i, \hat{x_i} is the standard deviation of x_i,
 * \bar{y} is the mean of label, and \hat{y} is the standard deviation of label.
 *
+ * If fitting the intercept is disabled (that is, the intercept is forced through 0.0),
+ * we can use the same equation except we set \bar{y} and \bar{x_i} to 0 instead
+ * of the respective means.
+ *
 * This can be rewritten as
 * {{{
 * L = 1/2n ||\sum_i (w_i/\hat{x_i})x_i - \sum_i (w_i/\hat{x_i})\bar{x_i} - y / \hat{y}
@@ -255,6 +270,7 @@ class LinearRegressionModel private[ml] (
 *    \sum_i w_i^\prime x_i - y / \hat{y} + offset
 * }}}
 *
+ *
 * Note that the effective weights and offset don't depend on training dataset,
 * so they can be precomputed.
 *
@@ -301,6 +317,7 @@ private class LeastSquaresAggregator(
     weights: Vector,
     labelStd: Double,
     labelMean: Double,
+    fitIntercept: Boolean,
     featuresStd: Array[Double],
     featuresMean: Array[Double]) extends Serializable {
 
@@ -321,7 +338,7 @@ private class LeastSquaresAggregator(
       }
       i += 1
     }
-    (weightsArray, -sum + labelMean / labelStd, weightsArray.length)
+    (weightsArray, if (fitIntercept) labelMean / labelStd - sum else 0.0, weightsArray.length)
   }
 
   private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)
@@ -404,6 +421,7 @@ private class LeastSquaresCostFun(
     data: RDD[(Double, Vector)],
     labelStd: Double,
     labelMean: Double,
+    fitIntercept: Boolean,
     featuresStd: Array[Double],
     featuresMean: Array[Double],
     effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
@@ -412,7 +430,7 @@ private class LeastSquaresCostFun(
     val w = Vectors.fromBreeze(weights)
 
     val leastSquaresAggregator = data.treeAggregate(new LeastSquaresAggregator(w, labelStd,
-      labelMean, featuresStd, featuresMean))(
+      labelMean, fitIntercept, featuresStd, featuresMean))(
       seqOp = (c, v) => (c, v) match {
        case (aggregator, (label, features)) => aggregator.add(label, features)
      },
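
Since `fitIntercept` is exposed through a standard `ml` param setter, forcing the regression line through the origin is a one-line change when configuring the estimator. A hedged usage sketch, assuming a `training: DataFrame` with the usual `label` and `features` columns (not part of the commit itself):

    import org.apache.spark.ml.regression.LinearRegression

    // With fitIntercept = false the closed-form intercept computation is skipped
    // and the model's intercept is fixed at 0.0.
    val lr = new LinearRegression()
      .setMaxIter(100)
      .setRegParam(0.1)
      .setElasticNetParam(0.5)
      .setFitIntercept(false)

    val model = lr.fit(training)
    println(s"weights: ${model.weights}, intercept: ${model.intercept}")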
