Description
Apache Iceberg version
0.7.1
Please describe the bug 🐞
Summary
I'm currently trying to migrate a couple of dataframes with a custom Hive-like storage scheme to Iceberg. After a lot of fiddling I managed to load the dataframes from Azure Blob Storage, create the table in the Iceberg catalog (currently using SQLite + local fs), and append fragments from the Parquet dataset. But as soon as I add a thread pool, I always run into concurrency issues.
Errors
I get either of the following two error messages:
CommitFailedException: Requirement failed: branch main has changed: expected id 7548527194257629329, found 8136001929437813453
or
CommitFailedException: Requirement failed: branch main was created concurrently
Sources
I use Dataset.get_fragments and insert the data into an Iceberg table with identical partitioning.
I can work around this error with a GIL (global iceberg lock, pun intended), which is just a threading.Lock() that ensures every load_table() + table.append happens atomically. But that kills almost all of the performance gains that could be had. I also plan on using this in some Celery runners, so a threading.Lock() is not an option in the future anyway.
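For illustration, the workaround looks roughly like this (a minimal sketch; the module-level ICEBERG_LOCK and the download_fragment_locked name are just my shorthand for the workaround described above, and create_iceberg_catalog refers to the helper in the script below):

import threading

# Global "iceberg lock" shared by every worker thread. This only works because
# all workers run in the same process, which is exactly why it cannot carry
# over to Celery workers.
ICEBERG_LOCK = threading.Lock()


def download_fragment_locked(table_identifier, fragment_table):
    catalog = create_iceberg_catalog()
    with ICEBERG_LOCK:
        # Serialize the whole read-modify-write cycle so that load_table()
        # always sees the snapshot that append() will commit against.
        table = catalog.load_table(table_identifier)
        table.append(fragment_table)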
azure_import.py
#!/bin/env -S poetry run python

from concurrent.futures import ThreadPoolExecutor, as_completed

import pyarrow as pa
import pyarrow.dataset as pd
from adlfs import AzureBlobFileSystem
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from pyarrow.dataset import HivePartitioning
from pyiceberg.catalog import Catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.table.name_mapping import MappedField, NameMapping
from pyiceberg.transforms import IdentityTransform

import settings


class AzureStorage:
    def __init__(self):
        credential = DefaultAzureCredential()
        blob_service_client = BlobServiceClient(
            settings.AZURE_BLOB_URL, credential
        )
        self.container_client = blob_service_client.get_container_client(
            settings.AZURE_BLOB_CONTAINER
        )
        # The AzureBlobFileSystem doesn't cleanly shut down and currently
        # always raises an exception at the end of this program. See:
        # https://github.com/fsspec/adlfs/issues/431
        self.abfs = AzureBlobFileSystem(
            account_name=settings.AZURE_BLOB_ACCOUNT_NAME,
            credential=credential,
        )

    def list_tables(self):
        return self.container_client.walk_blobs(
            settings.AZURE_LIVE_PATH, delimiter="/"
        )

    def load_dataset(self, table_name) -> pd.Dataset:
        name = "/".join((settings.AZURE_LIVE_PATH.rstrip("/"), table_name))
        dataset = pd.dataset(
            "/".join([settings.AZURE_LIVE_CONTAINER, name]),
            format="parquet",
            filesystem=self.abfs,
            partitioning=HivePartitioning(
                pa.schema(
                    [
                        ("dataset", pa.string()),
                        ("flavor", pa.string()),
                    ]
                )
            ),
        )
        return dataset


def create_iceberg_catalog():
    catalog = SqlCatalog(
        "default",
        **{
            "uri": settings.ICEBERG_DATABASE_URI,
            "warehouse": settings.ICEBERG_WAREHOUSE,
        },
    )
    return catalog


def download_table(catalog: Catalog, table_name: str, ds: pd.Dataset):
    name_mapping = NameMapping(
        root=[
            MappedField(field_id=field_id, names=[field.name])
            for field_id, field in enumerate(ds.schema, 1)
        ]
    )
    schema = pyarrow_to_schema(ds.schema, name_mapping=name_mapping)
    assert isinstance(ds.partitioning, HivePartitioning), ds.partitioning
    partitioning_spec = PartitionSpec(
        *(
            PartitionField(
                source_id=name_mapping.find(field.name).field_id,
                field_id=-1,
                transform=IdentityTransform(),
                name=field.name,
            )
            for field in ds.partitioning.schema
        )
    )
    table = catalog.create_table(
        f"{settings.ICEBERG_NAMESPACE}.{table_name}",
        schema=schema,
        partition_spec=partitioning_spec,
    )
    fragments = list(ds.get_fragments())
    with ThreadPoolExecutor(8) as executor:
        futures = [
            executor.submit(
                download_fragment,
                table.identifier,
                fragment,
            )
            for fragment in fragments
        ]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                executor.shutdown(wait=False, cancel_futures=True)
                raise e from None


def download_fragment(
    table_identifier: str,
    fragment,
):
    catalog = create_iceberg_catalog()
    partition_keys = pd.get_partition_keys(fragment.partition_expression)
    fragment_table = fragment.to_table()
    for k, v in partition_keys.items():
        fragment_table = fragment_table.append_column(
            pa.field(k, pa.string(), nullable=False),
            pa.repeat(pa.scalar(v), fragment_table.num_rows),
        )
    table = catalog.load_table(table_identifier)
    table.append(fragment_table)


def import_data(storage: AzureStorage, catalog, table_name):
    dataset = storage.load_dataset(table_name)
    download_table(catalog, table_name, dataset)


def main():
    catalog = create_iceberg_catalog()
    catalog.create_namespace_if_not_exists(settings.ICEBERG_NAMESPACE)
    storage = AzureStorage()
    for table_name in storage.list_tables():
        import_data(storage, catalog, table_name)


if __name__ == "__main__":
    main()

pyproject.toml
[tool.poetry]
name = "iceberg-azure-importer"
version = "0.1.0"
description = ""
authors = ["Michael P. Jung <[email protected]>"]
package-mode = false
[tool.poetry.dependencies]
python = "^3.12"
pyiceberg = { extras = ["sql-postgres"], version = "^0.7.1" }
azure-identity = "^1.17.1"
adlfs = "^2024"
psutil = "^6.0.0"
pyarrow = "^17.0.0"
fsspec = "^2024"