Skip to content

Commit ef96baf

Browse files
committed
Feat: Add fail_if_exists param to create_table
1 parent e9e265a commit ef96baf

File tree

12 files changed

+159
-32
lines changed

12 files changed

+159
-32
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ def create_table(
297297
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
298298
sort_order: SortOrder = UNSORTED_SORT_ORDER,
299299
properties: Properties = EMPTY_DICT,
300+
fail_if_exists: bool = True,
300301
) -> Table:
301302
"""Create a table.
302303
@@ -307,6 +308,7 @@ def create_table(
307308
partition_spec (PartitionSpec): PartitionSpec for the table.
308309
sort_order (SortOrder): SortOrder for the table.
309310
properties (Properties): Table properties that can be a string based dictionary.
311+
fail_if_exists (bool): If True, raise an error if the table already exists.
310312
311313
Returns:
312314
Table: the created table instance.

pyiceberg/catalog/dynamodb.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ def create_table(
136136
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
137137
sort_order: SortOrder = UNSORTED_SORT_ORDER,
138138
properties: Properties = EMPTY_DICT,
139+
fail_if_exists: bool = True,
139140
) -> Table:
140141
"""
141142
Create an Iceberg table.
@@ -147,6 +148,7 @@ def create_table(
147148
partition_spec: PartitionSpec for the table.
148149
sort_order: SortOrder for the table.
149150
properties: Table properties that can be a string based dictionary.
151+
fail_if_exists: If True, raise an error if the table already exists.
150152
151153
Returns:
152154
Table: the created table instance.
@@ -178,7 +180,8 @@ def create_table(
178180
condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})",
179181
)
180182
except ConditionalCheckFailedException as e:
181-
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
183+
if fail_if_exists:
184+
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
182185

183186
return self.load_table(identifier=identifier)
184187

pyiceberg/catalog/glue.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,18 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
302302
catalog=self,
303303
)
304304

305-
def _create_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef) -> None:
305+
def _create_glue_table(
306+
self,
307+
database_name: str,
308+
table_name: str,
309+
table_input: TableInputTypeDef,
310+
fail_if_exists: bool = True,
311+
) -> None:
306312
try:
307313
self.glue.create_table(DatabaseName=database_name, TableInput=table_input)
308314
except self.glue.exceptions.AlreadyExistsException as e:
309-
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
315+
if fail_if_exists:
316+
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
310317
except self.glue.exceptions.EntityNotFoundException as e:
311318
raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e
312319

@@ -340,6 +347,7 @@ def create_table(
340347
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
341348
sort_order: SortOrder = UNSORTED_SORT_ORDER,
342349
properties: Properties = EMPTY_DICT,
350+
fail_if_exists: bool = True,
343351
) -> Table:
344352
"""
345353
Create an Iceberg table.
@@ -351,6 +359,7 @@ def create_table(
351359
partition_spec: PartitionSpec for the table.
352360
sort_order: SortOrder for the table.
353361
properties: Table properties that can be a string based dictionary.
362+
fail_if_exists: If True, raise an error if the table already exists.
354363
355364
Returns:
356365
Table: the created table instance.
@@ -374,7 +383,9 @@ def create_table(
374383

375384
table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
376385
database_name, table_name = self.identifier_to_database_and_table(identifier)
377-
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
386+
self._create_glue_table(
387+
database_name=database_name, table_name=table_name, table_input=table_input, fail_if_exists=fail_if_exists
388+
)
378389

379390
return self.load_table(identifier=identifier)
380391

pyiceberg/catalog/hive.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ def create_table(
269269
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
270270
sort_order: SortOrder = UNSORTED_SORT_ORDER,
271271
properties: Properties = EMPTY_DICT,
272+
fail_if_exists: bool = True,
272273
) -> Table:
273274
"""Create a table.
274275
@@ -279,6 +280,7 @@ def create_table(
279280
partition_spec: PartitionSpec for the table.
280281
sort_order: SortOrder for the table.
281282
properties: Table properties that can be a string based dictionary.
283+
fail_if_exists: If True, raise an error if the table already exists.
282284
283285
Returns:
284286
Table: the created table instance.
@@ -321,7 +323,10 @@ def create_table(
321323
open_client.create_table(tbl)
322324
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
323325
except AlreadyExistsException as e:
324-
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
326+
if fail_if_exists:
327+
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
328+
else:
329+
hive_table = self.load_table(identifier)
325330

326331
return self._convert_hive_into_iceberg(hive_table, io)
327332

pyiceberg/catalog/noop.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def create_table(
4747
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
4848
sort_order: SortOrder = UNSORTED_SORT_ORDER,
4949
properties: Properties = EMPTY_DICT,
50+
fail_if_exists: bool = True,
5051
) -> Table:
5152
raise NotImplementedError
5253

pyiceberg/catalog/rest.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ def create_table(
446446
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
447447
sort_order: SortOrder = UNSORTED_SORT_ORDER,
448448
properties: Properties = EMPTY_DICT,
449+
fail_if_exists: bool = True,
449450
) -> Table:
450451
iceberg_schema = self._convert_schema_if_needed(schema)
451452
fresh_schema = assign_fresh_schema_ids(iceberg_schema)
@@ -468,11 +469,16 @@ def create_table(
468469
)
469470
try:
470471
response.raise_for_status()
472+
table_response = TableResponse(**response.json())
473+
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
471474
except HTTPError as exc:
472-
self._handle_non_200_response(exc, {409: TableAlreadyExistsError})
473-
474-
table_response = TableResponse(**response.json())
475-
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
475+
try:
476+
self._handle_non_200_response(exc, {409: TableAlreadyExistsError})
477+
except TableAlreadyExistsError:
478+
if fail_if_exists:
479+
raise
480+
return self.load_table(identifier)
481+
raise
476482

477483
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
478484
"""Register a new table using existing metadata.

pyiceberg/catalog/sql.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ def create_table(
153153
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
154154
sort_order: SortOrder = UNSORTED_SORT_ORDER,
155155
properties: Properties = EMPTY_DICT,
156+
fail_if_exists: bool = True,
156157
) -> Table:
157158
"""
158159
Create an Iceberg table.
@@ -164,6 +165,7 @@ def create_table(
164165
partition_spec: PartitionSpec for the table.
165166
sort_order: SortOrder for the table.
166167
properties: Table properties that can be a string based dictionary.
168+
fail_if_exists: If True, raise an error if the table already exists.
167169
168170
Returns:
169171
Table: the created table instance.
@@ -200,8 +202,8 @@ def create_table(
200202
)
201203
session.commit()
202204
except IntegrityError as e:
203-
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
204-
205+
if fail_if_exists:
206+
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
205207
return self.load_table(identifier=identifier)
206208

207209
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:

tests/catalog/integration_test_dynamodb.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
from typing import Generator, List
18+
from typing import Generator, List, Type
1919

2020
import boto3
2121
import pytest
@@ -89,11 +89,29 @@ def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_
8989
test_catalog.create_table(identifier, table_schema_nested)
9090

9191

92-
def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
92+
@pytest.mark.parametrize(
93+
"fail_if_exists, expected_exception",
94+
[
95+
(True, TableAlreadyExistsError),
96+
(False, None),
97+
],
98+
)
99+
def test_create_duplicated_table(
100+
test_catalog: Catalog,
101+
table_schema_nested: Schema,
102+
database_name: str,
103+
table_name: str,
104+
fail_if_exists: bool,
105+
expected_exception: Type[Exception],
106+
) -> None:
93107
test_catalog.create_namespace(database_name)
94108
test_catalog.create_table((database_name, table_name), table_schema_nested)
95-
with pytest.raises(TableAlreadyExistsError):
96-
test_catalog.create_table((database_name, table_name), table_schema_nested)
109+
if expected_exception:
110+
with pytest.raises(expected_exception):
111+
test_catalog.create_table((database_name, table_name), table_schema_nested, fail_if_exists=fail_if_exists)
112+
else:
113+
table = test_catalog.create_table((database_name, table_name), table_schema_nested, fail_if_exists=fail_if_exists)
114+
assert table.identifier == (test_catalog.name, database_name, table_name)
97115

98116

99117
def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:

tests/catalog/integration_test_glue.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717

1818
import time
19-
from typing import Any, Dict, Generator, List
19+
from typing import Any, Dict, Generator, List, Type
2020
from uuid import uuid4
2121

2222
import boto3
@@ -193,11 +193,29 @@ def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_
193193
test_catalog.create_table(identifier, table_schema_nested)
194194

195195

196-
def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
196+
@pytest.mark.parametrize(
197+
"fail_if_exists, expected_exception",
198+
[
199+
(True, TableAlreadyExistsError),
200+
(False, None),
201+
],
202+
)
203+
def test_create_duplicated_table(
204+
test_catalog: Catalog,
205+
table_schema_nested: Schema,
206+
table_name: str,
207+
database_name: str,
208+
fail_if_exists: bool,
209+
expected_exception: Type[Exception],
210+
) -> None:
197211
test_catalog.create_namespace(database_name)
198212
test_catalog.create_table((database_name, table_name), table_schema_nested)
199-
with pytest.raises(TableAlreadyExistsError):
200-
test_catalog.create_table((database_name, table_name), table_schema_nested)
213+
if expected_exception:
214+
with pytest.raises(expected_exception):
215+
test_catalog.create_table((database_name, table_name), table_schema_nested, fail_if_exists=fail_if_exists)
216+
else:
217+
table = test_catalog.create_table((database_name, table_name), table_schema_nested, fail_if_exists=fail_if_exists)
218+
assert table.identifier == (CATALOG_NAME, database_name, table_name)
201219

202220

203221
def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:

tests/catalog/test_dynamodb.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
from typing import List
17+
from typing import List, Type
1818
from unittest import mock
1919

2020
import boto3
@@ -164,16 +164,33 @@ def test_create_table_with_no_database(
164164
test_catalog.create_table(identifier=identifier, schema=table_schema_nested)
165165

166166

167+
@pytest.mark.parametrize(
168+
"fail_if_exists, expected_exception",
169+
[
170+
(True, TableAlreadyExistsError),
171+
(False, None),
172+
],
173+
)
167174
@mock_aws
168175
def test_create_duplicated_table(
169-
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
176+
_bucket_initialize: None,
177+
moto_endpoint_url: str,
178+
table_schema_nested: Schema,
179+
database_name: str,
180+
table_name: str,
181+
fail_if_exists: bool,
182+
expected_exception: Type[Exception],
170183
) -> None:
171184
identifier = (database_name, table_name)
172185
test_catalog = DynamoDbCatalog("test_ddb_catalog", **{"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url})
173186
test_catalog.create_namespace(namespace=database_name)
174187
test_catalog.create_table(identifier, table_schema_nested)
175-
with pytest.raises(TableAlreadyExistsError):
176-
test_catalog.create_table(identifier, table_schema_nested)
188+
if fail_if_exists:
189+
with pytest.raises(expected_exception):
190+
test_catalog.create_table(identifier, table_schema_nested, fail_if_exists=fail_if_exists)
191+
else:
192+
table = test_catalog.create_table(identifier, table_schema_nested, fail_if_exists=fail_if_exists)
193+
assert table.identifier == ("test_ddb_catalog",) + identifier
177194

178195

179196
@mock_aws

0 commit comments

Comments
 (0)