Skip to content

Commit 008abc5

Browse files
authored
Merge branch 'master' into package
2 parents 69b7cef + f67140a commit 008abc5

File tree: 12 files changed, +270 −35 lines

src/csharp/Microsoft.Spark.E2ETest/IpcTests/RDDTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void TestParallelize()
3434
[Fact]
3535
public void TestTextFile()
3636
{
37-
RDD<string> rdd = _sc.TextFile(TestEnvironment.ResourceDirectory + "people.txt");
37+
RDD<string> rdd = _sc.TextFile($"{TestEnvironment.ResourceDirectory}people.txt");
3838
var strs = new[] { "Michael, 29", "Andy, 30", "Justin, 19" };
3939
Assert.Equal(strs, rdd.Collect());
4040

src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5+
using Microsoft.Spark.E2ETest.Utils;
56
using Xunit;
67

78
namespace Microsoft.Spark.E2ETest.IpcTests
@@ -30,11 +31,14 @@ public void TestSignaturesV2_3_X()
3031

3132
sc.ClearJobGroup();
3233

33-
string filePath = TestEnvironment.ResourceDirectory + "people.txt";
34+
string filePath = $"{TestEnvironment.ResourceDirectory}people.txt";
3435
sc.AddFile(filePath);
3536
sc.AddFile(filePath, true);
3637

37-
sc.SetCheckpointDir(TestEnvironment.ResourceDirectory);
38+
using (var tempDir = new TemporaryDirectory())
39+
{
40+
sc.SetCheckpointDir(TestEnvironment.ResourceDirectory);
41+
}
3842
}
3943
}
4044
}

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameFunctionsTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public DataFrameFunctionsTests(SparkFixture fixture)
1919
_spark = fixture.Spark;
2020
_df = _spark
2121
.Read()
22-
.Json(TestEnvironment.ResourceDirectory + "people.json");
22+
.Json($"{TestEnvironment.ResourceDirectory}people.json");
2323
}
2424

2525
[Fact]
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
    [Collection("Spark E2E Tests")]
    public class DataFrameReaderTests
    {
        private readonly SparkSession _spark;

        public DataFrameReaderTests(SparkFixture fixture)
        {
            _spark = fixture.Spark;
        }

        /// <summary>
        /// Test signatures for APIs up to Spark 2.3.*.
        /// </summary>
        [Fact]
        public void TestSignaturesV2_3_X()
        {
            DataFrameReader reader = _spark.Read();

            // Builder-style members hand back the reader itself so calls can chain.
            Assert.IsType<DataFrameReader>(reader.Format("json"));
            Assert.IsType<DataFrameReader>(reader.Schema("age INT, name STRING"));

            // One Option() overload per supported value type.
            Assert.IsType<DataFrameReader>(reader.Option("stringOption", "value"));
            Assert.IsType<DataFrameReader>(reader.Option("boolOption", true));
            Assert.IsType<DataFrameReader>(reader.Option("longOption", 1L));
            Assert.IsType<DataFrameReader>(reader.Option("doubleOption", 3D));

            Assert.IsType<DataFrameReader>(
                reader.Options(
                    new Dictionary<string, string>
                    {
                        { "option1", "value1" },
                        { "option2", "value2" }
                    }));

            // Source-specific loaders accept one or more paths and yield a DataFrame.
            string jsonPath = $"{TestEnvironment.ResourceDirectory}people.json";
            Assert.IsType<DataFrame>(reader.Load(jsonPath));
            Assert.IsType<DataFrame>(reader.Load(jsonPath, jsonPath));

            Assert.IsType<DataFrame>(reader.Json(jsonPath));
            Assert.IsType<DataFrame>(reader.Json(jsonPath, jsonPath));

            string csvPath = $"{TestEnvironment.ResourceDirectory}people.csv";
            Assert.IsType<DataFrame>(reader.Csv(csvPath));
            Assert.IsType<DataFrame>(reader.Csv(csvPath, csvPath));

            string parquetPath = $"{TestEnvironment.ResourceDirectory}users.parquet";
            Assert.IsType<DataFrame>(reader.Parquet(parquetPath));
            Assert.IsType<DataFrame>(reader.Parquet(parquetPath, parquetPath));

            string orcPath = $"{TestEnvironment.ResourceDirectory}users.orc";
            Assert.IsType<DataFrame>(reader.Orc(orcPath));
            Assert.IsType<DataFrame>(reader.Orc(orcPath, orcPath));

            // A fresh reader is fetched for Text(); presumably the schema/options
            // applied above are incompatible with the text source — TODO confirm.
            reader = _spark.Read();
            string textPath = $"{TestEnvironment.ResourceDirectory}people.txt";
            Assert.IsType<DataFrame>(reader.Text(textPath));
            Assert.IsType<DataFrame>(reader.Text(textPath, textPath));
        }
    }
}

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public DataFrameTests(SparkFixture fixture)
2929
_df = _spark
3030
.Read()
3131
.Schema("age INT, name STRING")
32-
.Json(TestEnvironment.ResourceDirectory + "people.json");
32+
.Json($"{TestEnvironment.ResourceDirectory}people.json");
3333
}
3434

3535
[Fact]
@@ -135,14 +135,17 @@ public void TestSignaturesV2_3_X()
135135

136136
_df.IsStreaming();
137137

138-
// The following is required for *CheckPoint().
139-
_spark.SparkContext.SetCheckpointDir(TestEnvironment.ResourceDirectory);
140-
141-
_df.Checkpoint();
142-
_df.Checkpoint(false);
143-
144-
_df.LocalCheckpoint();
145-
_df.LocalCheckpoint(false);
138+
using (var tempDir = new TemporaryDirectory())
139+
{
140+
// The following is required for *CheckPoint().
141+
_spark.SparkContext.SetCheckpointDir(tempDir.Path);
142+
143+
_df.Checkpoint();
144+
_df.Checkpoint(false);
145+
146+
_df.LocalCheckpoint();
147+
_df.LocalCheckpoint(false);
148+
}
146149

147150
_df.WithWatermark("time", "10 minutes");
148151

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
    [Collection("Spark E2E Tests")]
    public class DataFrameWriterTests
    {
        private readonly SparkSession _spark;

        public DataFrameWriterTests(SparkFixture fixture)
        {
            _spark = fixture.Spark;
        }

        /// <summary>
        /// Test signatures for APIs up to Spark 2.3.*.
        /// </summary>
        [Fact]
        public void TestSignaturesV2_3_X()
        {
            // Builder-style members: each call should hand the writer back for chaining.
            {
                DataFrameWriter writer = _spark
                    .Read()
                    .Schema("age INT, name STRING")
                    .Json($"{TestEnvironment.ResourceDirectory}people.json")
                    .Write();

                Assert.IsType<DataFrameWriter>(writer.Mode(SaveMode.Ignore));
                Assert.IsType<DataFrameWriter>(writer.Mode("overwrite"));
                Assert.IsType<DataFrameWriter>(writer.Format("json"));

                // One Option() overload per supported value type.
                Assert.IsType<DataFrameWriter>(writer.Option("stringOption", "value"));
                Assert.IsType<DataFrameWriter>(writer.Option("boolOption", true));
                Assert.IsType<DataFrameWriter>(writer.Option("longOption", 1L));
                Assert.IsType<DataFrameWriter>(writer.Option("doubleOption", 3D));

                Assert.IsType<DataFrameWriter>(
                    writer.Options(
                        new Dictionary<string, string>
                        {
                            { "option1", "value1" },
                            { "option2", "value2" }
                        }));

                Assert.IsType<DataFrameWriter>(writer.PartitionBy("age"));
                Assert.IsType<DataFrameWriter>(writer.PartitionBy("age", "name"));

                Assert.IsType<DataFrameWriter>(writer.BucketBy(3, "age"));
                Assert.IsType<DataFrameWriter>(writer.BucketBy(3, "age", "name"));

                Assert.IsType<DataFrameWriter>(writer.SortBy("name"));
            }

            // Save operations produce real output, so they target a temp folder
            // that is removed when this scope ends.
            using (var tempDir = new TemporaryDirectory())
            {
                DataFrameWriter writer = _spark
                    .Read()
                    .Csv($"{TestEnvironment.ResourceDirectory}people.csv")
                    .Write();

                // TODO: Test writer.Jdbc without running a local db.

                writer.Option("path", tempDir.Path).SaveAsTable("TestTable");
                writer.InsertInto("TestTable");

                // TemporaryDirectory.Path ends with a separator, so names append directly.
                writer.Option("path", $"{tempDir.Path}TestSavePath1").Save();
                writer.Save($"{tempDir.Path}TestSavePath2");

                writer.Json($"{tempDir.Path}TestJsonPath");
                writer.Parquet($"{tempDir.Path}TestParquetPath");
                writer.Orc($"{tempDir.Path}TestOrcPath");
                writer.Text($"{tempDir.Path}TestTextPath");
                writer.Csv($"{tempDir.Path}TestCsvPath");
            }
        }
    }
}

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/FunctionsTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public void TestSignaturesV2_3_X()
207207

208208
DataFrame df = _spark
209209
.Read()
210-
.Json(TestEnvironment.ResourceDirectory + "people.json");
210+
.Json($"{TestEnvironment.ResourceDirectory}people.json");
211211
df = Broadcast(df);
212212

213213
col = Coalesce();
547 Bytes
Binary file not shown.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.IO;

namespace Microsoft.Spark.E2ETest.Utils
{
    /// <summary>
    /// Creates a temporary folder that is automatically cleaned up when disposed.
    /// </summary>
    internal sealed class TemporaryDirectory : IDisposable
    {
        // Guards against running the cleanup more than once.
        private bool _disposed = false;

        /// <summary>
        /// Path to the temporary folder. The value always ends with
        /// <see cref="System.IO.Path.DirectorySeparatorChar"/> so that file
        /// names can be appended to it directly.
        /// </summary>
        public string Path { get; }

        public TemporaryDirectory()
        {
            Path = System.IO.Path.Combine(
                System.IO.Path.GetTempPath(),
                Guid.NewGuid().ToString());

            // Defensive: remove any leftover entry at this (freshly generated)
            // path before creating the folder.
            Cleanup();
            Directory.CreateDirectory(Path);
            Path = $"{Path}{System.IO.Path.DirectorySeparatorChar}";
        }

        /// <summary>
        /// Deletes the temporary folder. Safe to call multiple times.
        /// </summary>
        public void Dispose()
        {
            // This class is sealed and owns no unmanaged resources, so the full
            // Dispose(bool)/finalizer pattern (and GC.SuppressFinalize) is not
            // needed; a simple guarded cleanup is sufficient.
            if (_disposed)
            {
                return;
            }

            Cleanup();
            _disposed = true;
        }

        // Removes whatever exists at Path, whether it is a file or a directory.
        private void Cleanup()
        {
            if (File.Exists(Path))
            {
                File.Delete(Path);
            }
            else if (Directory.Exists(Path))
            {
                Directory.Delete(Path, true);
            }
        }
    }
}

src/csharp/Microsoft.Spark/Sql/DataFrameReader.cs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ public DataFrameReader Schema(string schemaString)
5858
/// <returns>This DataFrameReader object</returns>
5959
public DataFrameReader Option(string key, string value)
6060
{
61-
OptionInternal(key, value);
62-
return this;
61+
return OptionInternal(key, value);
6362
}
6463

6564
/// <summary>
@@ -70,8 +69,7 @@ public DataFrameReader Option(string key, string value)
7069
/// <returns>This DataFrameReader object</returns>
7170
public DataFrameReader Option(string key, bool value)
7271
{
73-
OptionInternal(key, value);
74-
return this;
72+
return OptionInternal(key, value);
7573
}
7674

7775
/// <summary>
@@ -82,8 +80,7 @@ public DataFrameReader Option(string key, bool value)
8280
/// <returns>This DataFrameReader object</returns>
8381
public DataFrameReader Option(string key, long value)
8482
{
85-
OptionInternal(key, value);
86-
return this;
83+
return OptionInternal(key, value);
8784
}
8885

8986
/// <summary>
@@ -94,8 +91,7 @@ public DataFrameReader Option(string key, long value)
9491
/// <returns>This DataFrameReader object</returns>
9592
public DataFrameReader Option(string key, double value)
9693
{
97-
OptionInternal(key, value);
98-
return this;
94+
return OptionInternal(key, value);
9995
}
10096

10197
/// <summary>
@@ -119,7 +115,7 @@ public DataFrameReader Options(Dictionary<string, string> options)
119115
/// <param name="paths">Input paths</param>
120116
/// <returns>DataFrame object</returns>
121117
public DataFrame Load(params string[] paths) =>
122-
new DataFrame((JvmObjectReference)_jvmObject.Invoke("load", paths));
118+
new DataFrame((JvmObjectReference)_jvmObject.Invoke("load", (object)paths));
123119

124120
/// <summary>
125121
/// Loads a JSON file (one object per line) and returns the result as a DataFrame.
@@ -182,7 +178,7 @@ private DataFrame LoadSource(string source, params string[] paths)
182178
throw new ArgumentException($"paths cannot be empty for source: {source}");
183179
}
184180

185-
return new DataFrame((JvmObjectReference)_jvmObject.Invoke(source, paths));
181+
return new DataFrame((JvmObjectReference)_jvmObject.Invoke(source, (object)paths));
186182
}
187183
}
188184
}

Comments (0)