Skip to content

Commit f66e365

Browse files
authored
Create directories on a local filesystem (#301)
* add a test for writing data to disk * create dir first * add fsspec test * make lint * add fs tests
1 parent acc934f commit f66e365

File tree

6 files changed

+114
-7
lines changed

6 files changed

+114
-7
lines changed

pyiceberg/io/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
275275
"s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
276276
"s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
277277
"gs": [ARROW_FILE_IO],
278-
"file": [ARROW_FILE_IO],
278+
"file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
279279
"hdfs": [ARROW_FILE_IO],
280280
"abfs": [FSSPEC_FILE_IO],
281281
"abfss": [FSSPEC_FILE_IO],

pyiceberg/io/fsspec.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def s3v4_rest_signer(properties: Properties, request: AWSRequest, **_: Any) -> A
9999

100100

101101
def _file(_: Properties) -> LocalFileSystem:
102-
return LocalFileSystem()
102+
return LocalFileSystem(auto_mkdir=True)
103103

104104

105105
def _s3(properties: Properties) -> AbstractFileSystem:
@@ -173,6 +173,7 @@ def _adlfs(properties: Properties) -> AbstractFileSystem:
173173

174174

175175
SCHEME_TO_FS = {
176+
"": _file,
176177
"file": _file,
177178
"s3": _s3,
178179
"s3a": _s3,

pyiceberg/io/pyarrow.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@
173173
T = TypeVar("T")
174174

175175

176+
class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
177+
def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
178+
# In LocalFileSystem, parent directories must be first created before opening an output stream
179+
self.create_dir(os.path.dirname(path), recursive=True)
180+
return super().open_output_stream(path, *args, **kwargs)
181+
182+
176183
class PyArrowFile(InputFile, OutputFile):
177184
"""A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.
178185
@@ -379,9 +386,7 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
379386

380387
return GcsFileSystem(**gcs_kwargs)
381388
elif scheme == "file":
382-
from pyarrow.fs import LocalFileSystem
383-
384-
return LocalFileSystem()
389+
return PyArrowLocalFileSystem()
385390
else:
386391
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
387392

tests/catalog/test_sql.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from pathlib import Path
2020
from typing import Generator, List
2121

22+
import pyarrow as pa
2223
import pytest
2324
from pytest import TempPathFactory
2425
from pytest_lazyfixture import lazy_fixture
@@ -35,7 +36,10 @@
3536
NoSuchTableError,
3637
TableAlreadyExistsError,
3738
)
39+
from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
40+
from pyiceberg.io.pyarrow import schema_to_pyarrow
3841
from pyiceberg.schema import Schema
42+
from pyiceberg.table.snapshots import Operation
3943
from pyiceberg.table.sorting import (
4044
NullOrder,
4145
SortDirection,
@@ -80,7 +84,7 @@ def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]:
8084
@pytest.fixture(scope="module")
8185
def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
8286
props = {
83-
"uri": "sqlite:////tmp/sql-catalog.db",
87+
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
8488
"warehouse": f"file://{warehouse}",
8589
}
8690
catalog = SqlCatalog("test_sql_catalog", **props)
@@ -92,7 +96,7 @@ def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
9296
@pytest.fixture(scope="module")
9397
def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, None, None]:
9498
props = {
95-
"uri": "sqlite:////tmp/sql-catalog.db",
99+
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
96100
"warehouse": f"file://{warehouse}",
97101
}
98102
catalog = SqlCatalog("test_sql_catalog", **props)
@@ -102,6 +106,19 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, No
102106
catalog.destroy_tables()
103107

104108

109+
@pytest.fixture(scope="module")
110+
def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None, None]:
111+
props = {
112+
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
113+
"warehouse": f"file://{warehouse}",
114+
PY_IO_IMPL: FSSPEC_FILE_IO,
115+
}
116+
catalog = SqlCatalog("test_sql_catalog", **props)
117+
catalog.create_tables()
118+
yield catalog
119+
catalog.destroy_tables()
120+
121+
105122
def test_creation_with_no_uri() -> None:
106123
with pytest.raises(NoSuchPropertyException):
107124
SqlCatalog("test_ddb_catalog", not_uri="unused")
@@ -722,6 +739,47 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_i
722739
assert new_schema.find_field("b").field_type == IntegerType()
723740

724741

742+
@pytest.mark.parametrize(
743+
'catalog',
744+
[
745+
lazy_fixture('catalog_memory'),
746+
lazy_fixture('catalog_sqlite'),
747+
lazy_fixture('catalog_sqlite_without_rowcount'),
748+
lazy_fixture('catalog_sqlite_fsspec'),
749+
],
750+
)
751+
def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None:
752+
database_name, _table_name = random_identifier
753+
catalog.create_namespace(database_name)
754+
table = catalog.create_table(random_identifier, table_schema_simple)
755+
756+
df = pa.Table.from_pydict(
757+
{
758+
"foo": ["a"],
759+
"bar": [1],
760+
"baz": [True],
761+
},
762+
schema=schema_to_pyarrow(table_schema_simple),
763+
)
764+
765+
table.append(df)
766+
767+
# new snapshot is written in APPEND mode
768+
assert len(table.metadata.snapshots) == 1
769+
assert table.metadata.snapshots[0].snapshot_id == table.metadata.current_snapshot_id
770+
assert table.metadata.snapshots[0].parent_snapshot_id is None
771+
assert table.metadata.snapshots[0].sequence_number == 1
772+
assert table.metadata.snapshots[0].summary is not None
773+
assert table.metadata.snapshots[0].summary.operation == Operation.APPEND
774+
assert table.metadata.snapshots[0].summary['added-data-files'] == '1'
775+
assert table.metadata.snapshots[0].summary['added-records'] == '1'
776+
assert table.metadata.snapshots[0].summary['total-data-files'] == '1'
777+
assert table.metadata.snapshots[0].summary['total-records'] == '1'
778+
779+
# read back the data
780+
assert df == table.scan().to_arrow()
781+
782+
725783
@pytest.mark.parametrize(
726784
'catalog',
727785
[

tests/io/test_fsspec.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import os
19+
import tempfile
1820
import uuid
1921

2022
import pytest
2123
from botocore.awsrequest import AWSRequest
24+
from fsspec.implementations.local import LocalFileSystem
2225
from requests_mock import Mocker
2326

2427
from pyiceberg.exceptions import SignError
@@ -27,6 +30,26 @@
2730
from pyiceberg.io.pyarrow import PyArrowFileIO
2831

2932

33+
def test_fsspec_infer_local_fs_from_path(fsspec_fileio: FsspecFileIO) -> None:
34+
"""Test path with `file` scheme and no scheme both use LocalFileSystem"""
35+
assert isinstance(fsspec_fileio.new_output("file://tmp/warehouse")._fs, LocalFileSystem)
36+
assert isinstance(fsspec_fileio.new_output("/tmp/warehouse")._fs, LocalFileSystem)
37+
38+
39+
def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: FsspecFileIO) -> None:
40+
"""Test LocalFileSystem can create path without first creating the parent directories"""
41+
with tempfile.TemporaryDirectory() as tmpdirname:
42+
file_path = f"{tmpdirname}/foo/bar/baz.txt"
43+
output_file = fsspec_fileio.new_output(file_path)
44+
parent_path = os.path.dirname(file_path)
45+
assert output_file._fs.exists(parent_path) is False
46+
try:
47+
with output_file.create() as f:
48+
f.write(b"foo")
49+
except Exception:
50+
pytest.fail("Failed to write to file without parent directory")
51+
52+
3053
@pytest.mark.s3
3154
def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
3255
"""Test creating a new input file from a fsspec file-io"""

tests/io/test_pyarrow.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,26 @@
9292
)
9393

9494

95+
def test_pyarrow_infer_local_fs_from_path() -> None:
96+
"""Test path with `file` scheme and no scheme both use LocalFileSystem"""
97+
assert isinstance(PyArrowFileIO().new_output("file://tmp/warehouse")._filesystem, LocalFileSystem)
98+
assert isinstance(PyArrowFileIO().new_output("/tmp/warehouse")._filesystem, LocalFileSystem)
99+
100+
101+
def test_pyarrow_local_fs_can_create_path_without_parent_dir() -> None:
102+
"""Test LocalFileSystem can create path without first creating the parent directories"""
103+
with tempfile.TemporaryDirectory() as tmpdirname:
104+
file_path = f"{tmpdirname}/foo/bar/baz.txt"
105+
output_file = PyArrowFileIO().new_output(file_path)
106+
parent_path = os.path.dirname(file_path)
107+
assert output_file._filesystem.get_file_info(parent_path).type == FileType.NotFound
108+
try:
109+
with output_file.create() as f:
110+
f.write(b"foo")
111+
except Exception:
112+
pytest.fail("Failed to write to file without parent directory")
113+
114+
95115
def test_pyarrow_input_file() -> None:
96116
"""Test reading a file using PyArrowFile"""
97117

0 commit comments

Comments
 (0)