Scan does not work as expected #495

@ndrluis

Description

I'm testing with the iceberg-rest image from Tabular as the catalog.

Here's the docker-compose.yml file:

version: '3.8'

services:
  rest:
    image: tabulario/iceberg-rest:0.10.0
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
    depends_on:
      - minio
    ports:
      - "8181:8181"
    networks:
      iceberg_net:

  minio:
    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    expose:
      - 9001
      - 9000
    ports:
      - "9000:9000"
      - "9001:9001"
    command: [ "server", "/data", "--console-address", ":9001" ]

  mc:
    depends_on:
      - minio
    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
        until (/usr/bin/mc config host add minio http://minio:9000 admin password) do
          echo '...waiting...' && sleep 1;
        done;
        /usr/bin/mc mb minio/warehouse;
        /usr/bin/mc policy set public minio/warehouse;
        tail -f /dev/null
      "
    networks:
      iceberg_net:

networks:
  iceberg_net:

I created some data with PyIceberg:

from pyiceberg.catalog import load_catalog
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        # S3 settings must be passed explicitly; see the note at the end
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "warehouse": "demo",
    },
)

catalog.create_namespace_if_not_exists("default")

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table_if_not_exists("default.cities", schema=schema)

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

tbl.append(df)

Then I queried with PyIceberg to verify the data:

from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181/",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "warehouse": "demo",
    },
)

tbl: Table = catalog.load_table("default.cities")

res = tbl.scan().to_arrow()

print(len(res))

It returns 4.

And then I ran the same scan with the Rust implementation:

use std::collections::HashMap;

use futures::TryStreamExt;
use iceberg::{
    io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY},
    Catalog, TableIdent,
};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};

#[tokio::main]
async fn main() {
    // Create catalog
    let config = RestCatalogConfig::builder()
        .uri("http://localhost:8181".to_string())
        .warehouse("demo".to_string())
        .props(HashMap::from([
            (S3_ENDPOINT.to_string(), "http://localhost:9000".to_string()),
            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
            (S3_REGION.to_string(), "us-east-1".to_string()),
        ]))
        .build();

    let catalog = RestCatalog::new(config);

    let table = catalog
        .load_table(&TableIdent::from_strs(["default", "cities"]).unwrap())
        .await
        .unwrap();

    // Build a full-table scan and stream the results back as Arrow batches
    let scan = table.scan().select_all().build().unwrap();
    let batch_stream = scan.to_arrow().await.unwrap();

    dbg!(scan);

    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

    dbg!(batches.len());
}

It returns nothing (zero record batches).
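
To narrow this down, here is a minimal diagnostic sketch (assuming the same catalog/table setup as above, and that TableScan exposes plan_files in the version under test). If the planned task list is empty, the scan loses the data during file planning rather than in the Arrow reader:

// Same setup as the reproduction above, up to `table`.
let scan = table.scan().select_all().build().unwrap();

// Collect the planned file scan tasks: zero tasks means planning found no
// data files; a non-zero count would point at the Arrow conversion instead.
let tasks: Vec<_> = scan.plan_files().await.unwrap().try_collect().await.unwrap();

dbg!(tasks.len());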

We have to define the S3 configuration on the client side because the Tabular image does not return S3 credentials from the catalog's get-config endpoint (GET /v1/config).
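
For reference, one way to confirm what the catalog hands out (a standalone sketch, not part of the reproduction; it assumes the reqwest and tokio crates are available) is to query the config endpoint directly. Per the REST catalog spec, GET /v1/config returns "defaults" and "overrides" maps, and with the Tabular image neither contains the s3.* credentials:

// Standalone check; assumes `reqwest` and `tokio` as dependencies.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Print the raw config response to inspect its "defaults"/"overrides".
    let body = reqwest::get("http://localhost:8181/v1/config?warehouse=demo")
        .await?
        .text()
        .await?;
    println!("{body}");
    Ok(())
}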
