Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,27 @@ latest_schema_id: [[null,0,0,0]]
latest_sequence_number: [[null,0,0,0]]
```

### History

To show a table's history:

```python
table.inspect.history()
```

```
pyarrow.Table
made_current_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
is_current_ancestor: bool not null
----
made_current_at: [[2024-06-18 16:17:48.768,2024-06-18 16:17:49.240,2024-06-18 16:17:49.343,2024-06-18 16:17:49.511]]
snapshot_id: [[4358109269873137077,3380769165026943338,4358109269873137077,3089420140651211776]]
parent_id: [[null,4358109269873137077,null,4358109269873137077]]
is_current_ancestor: [[true,false,true,true]]
```

## Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
Expand Down
28 changes: 28 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
SnapshotLogEntry,
SnapshotSummaryCollector,
Summary,
ancestors_of,
update_snapshot_summaries,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -3879,6 +3880,33 @@ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any
schema=table_schema,
)

def history(self) -> "pa.Table":
    """Return the table's snapshot history as a PyArrow table.

    Mirrors Spark's ``<table>.history`` metadata table: one row per entry in
    the snapshot log, flagging whether each snapshot is an ancestor of the
    current snapshot.

    Returns:
        pa.Table with columns ``made_current_at`` (naive UTC, ms),
        ``snapshot_id``, ``parent_id`` (nullable) and ``is_current_ancestor``.
    """
    # Local imports: pyarrow is an optional dependency of the package.
    import pyarrow as pa
    from datetime import timezone

    history_schema = pa.schema([
        pa.field("made_current_at", pa.timestamp(unit="ms"), nullable=False),
        pa.field("snapshot_id", pa.int64(), nullable=False),
        pa.field("parent_id", pa.int64(), nullable=True),
        pa.field("is_current_ancestor", pa.bool_(), nullable=False),
    ])

    metadata = self.tbl.metadata

    # Snapshot ids on the current snapshot's ancestry chain; entries outside
    # this set were made current at some point but later rolled back.
    ancestors_ids = {snapshot.snapshot_id for snapshot in ancestors_of(self.tbl.current_snapshot(), metadata)}

    history = []
    for snapshot_entry in metadata.snapshot_log:
        # The log can reference a snapshot no longer present in the metadata
        # (presumably expired) — snapshot_by_id then returns None.
        snapshot = metadata.snapshot_by_id(snapshot_entry.snapshot_id)

        history.append({
            # datetime.utcfromtimestamp is deprecated since Python 3.12;
            # build the same naive UTC datetime via an aware conversion and
            # drop the tzinfo so the value is unchanged.
            "made_current_at": datetime.fromtimestamp(snapshot_entry.timestamp_ms / 1000.0, tz=timezone.utc).replace(
                tzinfo=None
            ),
            "snapshot_id": snapshot_entry.snapshot_id,
            "parent_id": snapshot.parent_snapshot_id if snapshot else None,
            "is_current_ancestor": snapshot_entry.snapshot_id in ancestors_ids,
        })

    return pa.Table.from_pylist(history, schema=history_schema)


@dataclass(frozen=True)
class TablePartition:
Expand Down
69 changes: 69 additions & 0 deletions tests/integration/test_inspect_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,72 @@ def test_inspect_metadata_log_entries(
if column == "timestamp":
continue
assert left == right, f"Difference in column {column}: {left} != {right}"


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
    """Check inspect.history() against Spark's ``<table>.history`` metadata table.

    Builds a history containing a rollback so that both ancestor and
    non-ancestor snapshots appear in the log.
    """
    identifier = "default.table_history"

    # Start from a clean slate — the table may survive a previous run.
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    spark.sql(
        f"""
    CREATE TABLE {identifier} (
        id int,
        data string
    )
    PARTITIONED BY (data)
    """
    )

    spark.sql(
        f"""
    INSERT INTO {identifier} VALUES (1, "a")
    """
    )

    table = session_catalog.load_table(identifier)
    first_snapshot = table.current_snapshot()
    snapshot_id = first_snapshot.snapshot_id if first_snapshot else None

    spark.sql(
        f"""
    INSERT INTO {identifier} VALUES (2, "b")
    """
    )

    # Rolling back makes the second snapshot a non-ancestor of the final state.
    spark.sql(
        f"""
    CALL integration.system.rollback_to_snapshot('{identifier}', {snapshot_id})
    """
    )

    spark.sql(
        f"""
    INSERT INTO {identifier} VALUES (3, "c")
    """
    )

    table.refresh()

    df = table.inspect.history()

    assert df.column_names == [
        "made_current_at",
        "snapshot_id",
        "parent_id",
        "is_current_ancestor",
    ]

    spark_history = spark.table(f"{identifier}.history").toPandas()
    pyiceberg_history = df.to_pandas()
    for column in df.column_names:
        for left, right in zip(spark_history[column].to_list(), pyiceberg_history[column].to_list()):
            both_nan = (
                isinstance(left, float)
                and math.isnan(left)
                and isinstance(right, float)
                and math.isnan(right)
            )
            # NaN != NaN in Python
            if both_nan:
                continue
            assert left == right, f"Difference in column {column}: {left} != {right}"