From afb175205dcfc08e4d48e4be4527120e96b2e68c Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Mon, 26 Jul 2021 17:25:40 +0000 Subject: [PATCH 01/18] from arrow need a from bytes for java written data --- python/ray/experimental/data/block.py | 4 ++++ python/ray/experimental/data/impl/arrow_block.py | 5 +++++ python/ray/experimental/data/read_api.py | 2 +- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/python/ray/experimental/data/block.py b/python/ray/experimental/data/block.py index 71db75330d88..e4049cb703a8 100644 --- a/python/ray/experimental/data/block.py +++ b/python/ray/experimental/data/block.py @@ -112,6 +112,10 @@ def for_block(block: Block) -> "BlockAccessor[T]": from ray.experimental.data.impl.arrow_block import \ ArrowBlockAccessor return ArrowBlockAccessor(block) + elif isinstance(block, bytes): + from ray.experimental.data.impl.arrow_block import \ + ArrowBlockAccessor + return ArrowBlockAccessor.from_bytes(block) elif isinstance(block, list): from ray.experimental.data.impl.block_builder import \ SimpleBlockAccessor diff --git a/python/ray/experimental/data/impl/arrow_block.py b/python/ray/experimental/data/impl/arrow_block.py index 5f2ca9ec0381..c3f3b7c5b38d 100644 --- a/python/ray/experimental/data/impl/arrow_block.py +++ b/python/ray/experimental/data/impl/arrow_block.py @@ -124,6 +124,11 @@ def __init__(self, table: "pyarrow.Table"): raise ImportError("Run `pip install pyarrow` for Arrow support") self._table = table + @classmethod + def from_bytes(cls, data: bytes): + reader = pyarrow.ipc.open_stream(data) + return cls(reader.read_all()) + def iter_rows(self) -> Iterator[ArrowRow]: outer = self diff --git a/python/ray/experimental/data/read_api.py b/python/ray/experimental/data/read_api.py index 930b8623ea80..c2cf944772e3 100644 --- a/python/ray/experimental/data/read_api.py +++ b/python/ray/experimental/data/read_api.py @@ -345,7 +345,7 @@ def df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]: @PublicAPI(stability="beta") -def from_arrow(tables: List[ObjectRef["pyarrow.Table"]], +def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]], parallelism: int = 200) -> Dataset[ArrowRow]: """Create a dataset from a set of Arrow tables. From 619404ab3c6473d3f0f53fc987dfd7c731007c51 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Thu, 29 Jul 2021 09:55:18 +0000 Subject: [PATCH 02/18] add bytes to block type --- python/ray/experimental/data/block.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/experimental/data/block.py b/python/ray/experimental/data/block.py index e4049cb703a8..256a21c50a18 100644 --- a/python/ray/experimental/data/block.py +++ b/python/ray/experimental/data/block.py @@ -14,7 +14,7 @@ # # Block data can be accessed in a uniform way via ``BlockAccessors`` such as # ``SimpleBlockAccessor`` and ``ArrowBlockAccessor``. 
-Block = Union[List[T], "pyarrow.Table"] +Block = Union[List[T], "pyarrow.Table", bytes] @DeveloperAPI From 0c8de45d5f26b8169166b075d1505e552ba84131 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Tue, 17 Aug 2021 14:13:36 +0000 Subject: [PATCH 03/18] owner address is needed for ensure locality --- python/ray/_raylet.pyx | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 812068e57ff2..2ee8c318dac7 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1647,6 +1647,11 @@ cdef class CoreWorker: CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference( c_object_id) + def get_owner_address(self, ObjectRef object_ref): + cdef: + CObjectID c_object_id = object_ref.native() + return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(c_object_id).SerializeAsString() + def serialize_and_promote_object_ref(self, ObjectRef object_ref): cdef: CObjectID c_object_id = object_ref.native() From cf9f4f420888b559e7e64911ec392797977dc7a6 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Tue, 17 Aug 2021 14:34:09 +0000 Subject: [PATCH 04/18] use raydp provided functions --- python/ray/data/dataset.py | 5 +++-- python/ray/data/read_api.py | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index f5952449c1cf..c813294143a9 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1275,7 +1275,7 @@ def to_modin(self) -> "modin.DataFrame": """ raise NotImplementedError # P1 - def to_spark(self) -> "pyspark.sql.DataFrame": + def to_spark(self, spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame": """Convert this dataset into a Spark dataframe. Time complexity: O(dataset size / parallelism) @@ -1283,7 +1283,8 @@ def to_spark(self) -> "pyspark.sql.DataFrame": Returns: A Spark dataframe created from this dataset. """ - raise NotImplementedError # P2 + import raydp + raydp.spark.ray_dataset_to_spark_dataframe(spark, self) def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]: """Convert this dataset into a distributed set of Pandas dataframes. diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index f0ef5fedc104..15ad377fa97a 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -502,13 +502,15 @@ def from_spark(df: "pyspark.sql.DataFrame", *, """Create a dataset from a Spark dataframe. Args: + spark: A SparkSession, which must be created by RayDP (Spark-on-Ray). df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray). parallelism: The amount of parallelism to use for the dataset. Returns: Dataset holding Arrow records read from the dataframe. 
""" - raise NotImplementedError # P2 + import raydp + return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism) def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]: From 68e109ff4d632ee47d094cf1d6fdc81cb41a127d Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Wed, 18 Aug 2021 10:22:43 +0000 Subject: [PATCH 05/18] format --- python/ray/_raylet.pyx | 3 ++- python/ray/data/dataset.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index aa025547e7a4..c6c56b05e240 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1670,7 +1670,8 @@ cdef class CoreWorker: def get_owner_address(self, ObjectRef object_ref): cdef: CObjectID c_object_id = object_ref.native() - return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(c_object_id).SerializeAsString() + return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress( + c_object_id).SerializeAsString() def serialize_and_promote_object_ref(self, ObjectRef object_ref): cdef: diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index c813294143a9..d67a64f1d5da 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1275,7 +1275,8 @@ def to_modin(self) -> "modin.DataFrame": """ raise NotImplementedError # P1 - def to_spark(self, spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame": + def to_spark(self, + spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame": """Convert this dataset into a Spark dataframe. Time complexity: O(dataset size / parallelism) From dec202ee419c6372f9bd260b2dd9ae4953cafd2c Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Mon, 23 Aug 2021 14:41:40 +0000 Subject: [PATCH 06/18] add a developer api for getting block locations --- python/ray/data/dataset.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index d67a64f1d5da..90ac6dadf8b8 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1462,6 +1462,11 @@ def get_blocks(self) -> List[ObjectRef[Block]]: """ return list(self._blocks) + @DeveloperAPI + def get_block_locations(self) -> List[bytes]: + core_worker = ray.worker.global_worker.core_worker + return [core_worker.get_owner_address(block) for block in self.get_blocks()] + def __repr__(self) -> str: schema = self.schema() if schema is None: From 93994c953a21b630df55d1cf7c6c832e774a74d7 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Fri, 27 Aug 2021 16:05:20 +0000 Subject: [PATCH 07/18] fix; add test --- python/ray/data/dataset.py | 2 +- python/ray/tests/test_raydp_dataset.py | 38 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 python/ray/tests/test_raydp_dataset.py diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 90ac6dadf8b8..3cc807c8b8cd 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1285,7 +1285,7 @@ def to_spark(self, A Spark dataframe created from this dataset. """ import raydp - raydp.spark.ray_dataset_to_spark_dataframe(spark, self) + return raydp.spark.ray_dataset_to_spark_dataframe(spark, self) def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]: """Convert this dataset into a distributed set of Pandas dataframes. 
diff --git a/python/ray/tests/test_raydp_dataset.py b/python/ray/tests/test_raydp_dataset.py new file mode 100644 index 000000000000..0923e7bbbf34 --- /dev/null +++ b/python/ray/tests/test_raydp_dataset.py @@ -0,0 +1,38 @@ +import pytest +import ray +import raydp + +@pytest.fixture(scope="function") +def spark_on_ray_small(request): + ray.init(num_cpus=2, include_dashboard=False) + spark = raydp.init_spark("test", 1, 1, "500 M") + + def stop_all(): + raydp.stop_spark() + ray.shutdown() + request.addfinalizer(stop_all) + return spark + +def test_raydp_roundtrip(spark_on_ray_small): + spark = spark_on_ray_small + spark_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["one", "two"]) + rows = [(r.one, r.two) for r in spark_df.take(3)] + ds = ray.data.from_spark(spark_df) + values = [(r["one"], r["two"]) for r in ds.take(6)] + assert values == rows + df = ds.to_spark(spark) + rows_2 = [(r.one, r.two) for r in df.take(3)] + assert values == rows_2 + +def test_raydp_to_spark(spark_on_ray_small): + spark = spark_on_ray_small + n = 5 + ds = ray.data.range_arrow(n) + values = [r["value"] for r in ds.take(5)] + df = ds.to_spark(spark) + rows = [r.value for r in df.take(5)] + assert values == rows + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__])) From 407a7e684037795fb5952ccc692569c635ab1c6e Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Fri, 27 Aug 2021 16:12:22 +0000 Subject: [PATCH 08/18] fix --- python/ray/data/block.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/block.py b/python/ray/data/block.py index d2c5ad9c79fb..e8710e795bf0 100644 --- a/python/ray/data/block.py +++ b/python/ray/data/block.py @@ -121,7 +121,7 @@ def for_block(block: Block) -> "BlockAccessor[T]": ArrowBlockAccessor return ArrowBlockAccessor(block) elif isinstance(block, bytes): - from ray.experimental.data.impl.arrow_block import \ + from ray.data.impl.arrow_block import \ ArrowBlockAccessor return ArrowBlockAccessor.from_bytes(block) elif isinstance(block, list): From 6200f00e2af6662adcc121261118f40a79e76117 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Thu, 2 Sep 2021 14:25:01 +0000 Subject: [PATCH 09/18] update --- python/ray/data/read_api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index e03771d6b1a0..9c9640159768 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -507,13 +507,15 @@ def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]], @PublicAPI(stability="beta") def from_spark(df: "pyspark.sql.DataFrame", *, - parallelism: int = 200) -> Dataset[ArrowRow]: + parallelism: int = 0) -> Dataset[ArrowRow]: """Create a dataset from a Spark dataframe. Args: spark: A SparkSession, which must be created by RayDP (Spark-on-Ray). df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray). parallelism: The amount of parallelism to use for the dataset. + If not provided, it will be equal to the number of partitions of the + original Spark dataframe. Returns: Dataset holding Arrow records read from the dataframe. 
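The from_bytes() path added in PATCH 01 (and re-pointed at the new module path in PATCH 08) assumes the raw bytes are an Arrow IPC stream, which is what a JVM writer such as RayDP's Spark executors produces. A minimal standalone sketch of that round-trip, assuming only that pyarrow is installed (the sample table below is made up for illustration):

    import pyarrow as pa

    table = pa.table({"one": [1, 2, 3], "two": ["a", "b", "c"]})

    # Serialize to the Arrow IPC streaming format, as the JVM side would.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    data = sink.getvalue().to_pybytes()

    # This mirrors what ArrowBlockAccessor.from_bytes(data) does.
    restored = pa.ipc.open_stream(data).read_all()
    assert restored.equals(table)

This is why BlockAccessor.for_block() can accept a raw bytes block and only materialize it into a pyarrow.Table when an accessor is created for it.
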
From 6a27cac9d0adb185ff5c8a24c64e8848bddba0ff Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Thu, 2 Sep 2021 15:26:08 +0000 Subject: [PATCH 10/18] format --- python/ray/data/dataset.py | 5 ++++- python/ray/data/read_api.py | 2 ++ python/ray/tests/test_raydp_dataset.py | 8 +++++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 156a370ae380..4bd20e3ac2d0 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1449,7 +1449,10 @@ def get_blocks(self) -> List[ObjectRef[Block]]: @DeveloperAPI def get_block_locations(self) -> List[bytes]: core_worker = ray.worker.global_worker.core_worker - return [core_worker.get_owner_address(block) for block in self.get_blocks()] + return [ + core_worker.get_owner_address(block) + for block in self.get_blocks() + ] def _split(self, index: int, return_right_half: bool) -> ("Dataset[T]", "Dataset[T]"): diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 9c9640159768..28fae326140a 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -487,6 +487,7 @@ def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[np.ndarray]: blocks, metadata = zip(*res) return Dataset(BlockList(blocks, ray.get(list(metadata)))) + @PublicAPI(stability="beta") def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]], *, @@ -505,6 +506,7 @@ def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]], metadata = [get_metadata.remote(t) for t in tables] return Dataset(BlockList(tables, ray.get(metadata))) + @PublicAPI(stability="beta") def from_spark(df: "pyspark.sql.DataFrame", *, parallelism: int = 0) -> Dataset[ArrowRow]: diff --git a/python/ray/tests/test_raydp_dataset.py b/python/ray/tests/test_raydp_dataset.py index 0923e7bbbf34..c86c6a0803c1 100644 --- a/python/ray/tests/test_raydp_dataset.py +++ b/python/ray/tests/test_raydp_dataset.py @@ -2,6 +2,7 @@ import ray import raydp + @pytest.fixture(scope="function") def spark_on_ray_small(request): ray.init(num_cpus=2, include_dashboard=False) @@ -10,12 +11,15 @@ def spark_on_ray_small(request): def stop_all(): raydp.stop_spark() ray.shutdown() + request.addfinalizer(stop_all) return spark + def test_raydp_roundtrip(spark_on_ray_small): spark = spark_on_ray_small - spark_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["one", "two"]) + spark_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], + ["one", "two"]) rows = [(r.one, r.two) for r in spark_df.take(3)] ds = ray.data.from_spark(spark_df) values = [(r["one"], r["two"]) for r in ds.take(6)] @@ -24,6 +28,7 @@ def test_raydp_roundtrip(spark_on_ray_small): rows_2 = [(r.one, r.two) for r in df.take(3)] assert values == rows_2 + def test_raydp_to_spark(spark_on_ray_small): spark = spark_on_ray_small n = 5 @@ -33,6 +38,7 @@ def test_raydp_to_spark(spark_on_ray_small): rows = [r.value for r in df.take(5)] assert values == rows + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__])) From 2bfc10a8be58758d1e4ac495d710151586648d00 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Thu, 2 Sep 2021 16:06:27 +0000 Subject: [PATCH 11/18] add raydp in test dependency --- python/requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/requirements.txt b/python/requirements.txt index ee0e4bb0d372..915cace43959 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -59,6 +59,8 @@ moto mypy networkx numba +raydp-nightly; platform_system != "Windows" + # higher 
version of llvmlite breaks windows llvmlite==0.34.0 openpyxl From b2b5a6fcd568605d93edd56f0f12aa8466d61915 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Tue, 7 Sep 2021 11:09:43 +0000 Subject: [PATCH 12/18] get object locations in ray --- python/ray/data/dataset.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 3803edbc40c7..a366b45e3d66 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1267,7 +1267,12 @@ def to_spark(self, A Spark dataframe created from this dataset. """ import raydp - return raydp.spark.ray_dataset_to_spark_dataframe(spark, self) + core_worker = ray.worker.global_worker.core_worker + locations = [ + core_worker.get_owner_address(block) + for block in self.get_blocks() + ] + return raydp.spark.ray_dataset_to_spark_dataframe(spark, self, locations) def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]: """Convert this dataset into a distributed set of Pandas dataframes. @@ -1460,14 +1465,6 @@ def get_blocks(self) -> List[ObjectRef[Block]]: """ return list(self._blocks) - @DeveloperAPI - def get_block_locations(self) -> List[bytes]: - core_worker = ray.worker.global_worker.core_worker - return [ - core_worker.get_owner_address(block) - for block in self.get_blocks() - ] - def _split(self, index: int, return_right_half: bool) -> ("Dataset[T]", "Dataset[T]"): get_num_rows = cached_remote_fn(_get_num_rows) From 00272ca40ae333584a9c0ddf44faed8736f22fbb Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Tue, 7 Sep 2021 11:11:21 +0000 Subject: [PATCH 13/18] move test location --- python/ray/{ => data}/tests/test_raydp_dataset.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename python/ray/{ => data}/tests/test_raydp_dataset.py (100%) diff --git a/python/ray/tests/test_raydp_dataset.py b/python/ray/data/tests/test_raydp_dataset.py similarity index 100% rename from python/ray/tests/test_raydp_dataset.py rename to python/ray/data/tests/test_raydp_dataset.py From d99c7f44b17124a05a071df1bb9377ac56839977 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Tue, 7 Sep 2021 14:15:51 +0000 Subject: [PATCH 14/18] move raydp dependency --- python/requirements.txt | 2 -- python/requirements/data_processing/requirements.txt | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/python/requirements.txt b/python/requirements.txt index 915cace43959..ee0e4bb0d372 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -59,8 +59,6 @@ moto mypy networkx numba -raydp-nightly; platform_system != "Windows" - # higher version of llvmlite breaks windows llvmlite==0.34.0 openpyxl diff --git a/python/requirements/data_processing/requirements.txt b/python/requirements/data_processing/requirements.txt index 7f49e686a9b1..c835ec7472a3 100644 --- a/python/requirements/data_processing/requirements.txt +++ b/python/requirements/data_processing/requirements.txt @@ -7,3 +7,4 @@ s3fs modin>=0.8.3; python_version < '3.7' modin>=0.10.0; python_version >= '3.7' pytest-repeat +raydp-nightly From c4f62d26a9892ff6d9d299488f54f6fd9667e85e Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Tue, 7 Sep 2021 14:47:39 +0000 Subject: [PATCH 15/18] passing only blocks and schema --- python/ray/data/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index a366b45e3d66..e80e7489c622 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1272,7 +1272,8 @@ def to_spark(self, 
core_worker.get_owner_address(block) for block in self.get_blocks() ] - return raydp.spark.ray_dataset_to_spark_dataframe(spark, self, locations) + return raydp.spark.ray_dataset_to_spark_dataframe( + spark, self.schema(), self.get_blocks(), locations) def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]: """Convert this dataset into a distributed set of Pandas dataframes. From edfae6e2b2561e270b5aacba56f62b668e365117 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Wed, 8 Sep 2021 10:29:57 +0800 Subject: [PATCH 16/18] Update python/ray/data/read_api.py Co-authored-by: Eric Liang --- python/ray/data/read_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index a21fd5c3ddaf..316c399f9cd5 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -509,7 +509,7 @@ def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]], @PublicAPI(stability="beta") def from_spark(df: "pyspark.sql.DataFrame", *, - parallelism: int = 0) -> Dataset[ArrowRow]: + parallelism: Optional[int] = None) -> Dataset[ArrowRow]: """Create a dataset from a Spark dataframe. Args: From c1a59d262acd4ca4368fb63190da15725b889bf3 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Wed, 8 Sep 2021 10:24:15 +0000 Subject: [PATCH 17/18] minor updates --- python/ray/data/read_api.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 316c399f9cd5..cafee4d5a6ff 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -489,15 +489,13 @@ def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[np.ndarray]: @PublicAPI(stability="beta") -def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]], - *, - parallelism: int = 200) -> Dataset[ArrowRow]: +def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]] + ) -> Dataset[ArrowRow]: """Create a dataset from a set of Arrow tables. Args: tables: A list of Ray object references to Arrow tables, or its streaming format in bytes. - parallelism: The amount of parallelism to use for the dataset. Returns: Dataset holding Arrow records from the tables. @@ -508,16 +506,17 @@ def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]], @PublicAPI(stability="beta") -def from_spark(df: "pyspark.sql.DataFrame", *, +def from_spark(df: "pyspark.sql.DataFrame", + *, parallelism: Optional[int] = None) -> Dataset[ArrowRow]: """Create a dataset from a Spark dataframe. Args: spark: A SparkSession, which must be created by RayDP (Spark-on-Ray). df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray). - parallelism: The amount of parallelism to use for the dataset. - If not provided, it will be equal to the number of partitions of the - original Spark dataframe. + parallelism: The amount of parallelism to use for the dataset. + If not provided, it will be equal to the number of partitions of the + original Spark dataframe. Returns: Dataset holding Arrow records read from the dataframe. 
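The locality piece of this series is the new CoreWorker.get_owner_address() binding from PATCH 03/05: to_spark() collects the serialized owner address of every block and hands them to RayDP alongside the block refs and schema, so the Spark side can schedule each partition's read near its data. A rough sketch of the same collection done from outside the Dataset class, assuming it runs against a Ray build that includes this branch:

    import ray
    import ray.data

    ray.init()
    ds = ray.data.range_arrow(10)

    core_worker = ray.worker.global_worker.core_worker
    # Each entry is the owner's serialized Address protobuf (bytes), as
    # returned by the get_owner_address() binding added in this series.
    locations = [
        core_worker.get_owner_address(block) for block in ds.get_blocks()
    ]
    assert len(locations) == len(ds.get_blocks())

This is the same list that to_spark() passes to raydp.spark.ray_dataset_to_spark_dataframe() together with self.schema() and self.get_blocks().
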
From 2734119523d41353374fc36146e38e36dbc4011d Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Thu, 9 Sep 2021 09:57:19 +0000 Subject: [PATCH 18/18] lint --- python/ray/data/read_api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index cafee4d5a6ff..a2daf7732f8e 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -515,8 +515,8 @@ def from_spark(df: "pyspark.sql.DataFrame", spark: A SparkSession, which must be created by RayDP (Spark-on-Ray). df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray). parallelism: The amount of parallelism to use for the dataset. - If not provided, it will be equal to the number of partitions of the - original Spark dataframe. + If not provided, it will be equal to the number of partitions of + the original Spark dataframe. Returns: Dataset holding Arrow records read from the dataframe.
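
Taken together, the series leaves from_spark()/to_spark() usable end to end. A minimal usage sketch, closely following the new test_raydp_dataset.py and assuming a raydp-nightly build that provides spark_dataframe_to_ray_dataset and ray_dataset_to_spark_dataframe:

    import ray
    import raydp

    ray.init(num_cpus=2, include_dashboard=False)
    spark = raydp.init_spark("example", 1, 1, "500M")

    spark_df = spark.createDataFrame(
        [(1, "a"), (2, "b"), (3, "c")], ["one", "two"])

    # Spark dataframe -> Ray Dataset of Arrow blocks.
    ds = ray.data.from_spark(spark_df)
    print(ds.take(3))

    # Ray Dataset -> Spark dataframe, using block owner addresses for locality.
    df2 = ds.to_spark(spark)
    df2.show()

    raydp.stop_spark()
    ray.shutdown()

As PATCH 09 notes, parallelism defaults to the number of partitions of the original Spark dataframe when it is not provided.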