4 | 4 | import traceback |
5 | 5 | import unittest |
6 | 6 | import random |
| 7 | +import re |
7 | 8 | from pyspark.sql.utils import AnalysisException |
8 | 9 | from pyspark.sql import SparkSession |
9 | 10 | from pyspark.sql.functions import * |
@@ -85,8 +86,11 @@ def __exit__(self, etype, evalue, traceback): |
85 | 86 | sql("DROP TABLE IF EXISTS {0}".format(self.table_name)) |
86 | 87 | return False # don't suppress exceptions |
87 | 88 |
| 89 | +def sort_by(col_name, rows): |
| 90 | + return sorted(rows, key=lambda row: row[col_name])
| 91 | + |
88 | 92 | def sort_by_id(rows): |
89 | | - return sorted(rows, cmp = lambda a, b: cmp(a['id'], b['id'])) |
| 93 | + return sort_by('id', rows) |
90 | 94 |
91 | 95 | def collect(df): |
92 | 96 | return list(map(lambda r: r.asDict(), df.collect())) |
@@ -176,6 +180,50 @@ class IcebergDDLTest(unittest.TestCase): |
176 | 180 | # {'id': 3, 'data': 'c'} |
177 | 181 | # ]) |
178 | 182 |
| 183 | + def test_show_create_table(self): |
| 184 | + with temp_table("test_show_create") as t: |
| 185 | + sql("CREATE TABLE {0} (id bigint, data string) USING iceberg", t) |
| 186 | + create_sql = collect(sql("SHOW CREATE TABLE {0}", t))[0]['create_statement'] |
| 187 | +
| 188 | + quoted = '.'.join([ "`" + part + "`" for part in t.split('.') ]) |
| 189 | + expected = "CREATE TABLE {0} ( id bigint, data string) USING iceberg".format(quoted) |
| 190 | + self.assertEqual(expected, re.sub(r"[\s]+", ' ', create_sql)) |
| 191 | +
| 192 | + def test_create_table_like(self): |
| 193 | + with temp_table("test_source") as source: |
| 194 | + sql("CREATE TABLE {0} (id bigint, data string) USING iceberg", source) |
| 195 | + with temp_table("test_copy") as copy: |
| 196 | + sql("CREATE TABLE {0} LIKE {1}", copy, source) |
| 197 | + create_sql = collect(sql("SHOW CREATE TABLE {0}", copy))[0]['create_statement'] |
| 198 | +
| 199 | + quoted = '.'.join([ "`" + part + "`" for part in copy.split('.') ]) |
| 200 | + expected = "CREATE TABLE {0} ( id bigint, data string) USING iceberg".format(quoted) |
| 201 | + self.assertEqual(expected, re.sub(r"[\s]+", ' ', create_sql)) |
| 202 | +
| 203 | + def test_alter_table_properties(self): |
| 204 | + with temp_table("test_table_properties") as t: |
| 205 | + sql("CREATE TABLE {0} (id bigint, data string) USING iceberg", t) |
| 206 | + rows = collect(sql("SHOW TBLPROPERTIES {0}", t)) |
| 207 | + self.assertEqual(sort_by('property', rows), [ |
| 208 | + {'property': 'provider', 'value': 'iceberg'} |
| 209 | + ]) |
| 210 | +
| 211 | + sql("ALTER TABLE {0} SET TBLPROPERTIES ('aa'='AA', 'zz'='ZZ')", t) |
| 212 | +
| 213 | + # test all table properties |
| 214 | + rows = collect(sql("SHOW TBLPROPERTIES {0}", t)) |
| 215 | + self.assertEqual(sort_by('property', rows), [ |
| 216 | + {'property': 'aa', 'value': 'AA'}, |
| 217 | + {'property': 'provider', 'value': 'iceberg'}, |
| 218 | + {'property': 'zz', 'value': 'ZZ'} |
| 219 | + ]) |
| 220 | +
| 221 | + # test single property lookup |
| 222 | + rows = collect(sql("SHOW TBLPROPERTIES {0} ('provider')", t)) |
| 223 | + self.assertEqual(sort_by('property', rows), [ |
| 224 | + {'property': 'provider', 'value': 'iceberg'} |
| 225 | + ]) |
| 226 | +
179 | 227 | def test_alter_table_add_columns(self): |
180 | 228 | with temp_table("test_add_columns") as t: |
181 | 229 | sql("CREATE TABLE {0} (id bigint, data string) USING iceberg", t) |
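
The tests in this diff lean on three helpers defined earlier in the file: `sql`, `collect`, and `temp_table`. Only `collect` and `temp_table.__exit__` are visible here, so the following is a minimal sketch that fills in the rest under stated assumptions: `spark` is a plain `SparkSession`, `sql` forwards optional positional format arguments (the tests call it both ways), and `temp_table` uses the given name directly, although the real helper may generate a unique name.

```python
from pyspark.sql import SparkSession

# Assumption: the test module holds a module-level SparkSession.
spark = SparkSession.builder.getOrCreate()

def sql(stmt, *args):
    # The tests call sql() both with a pre-formatted string and with
    # positional format arguments, e.g. sql("SHOW TBLPROPERTIES {0}", t).
    return spark.sql(stmt.format(*args))

def collect(df):
    # As defined in this file: rows as plain dicts for easy comparison.
    return list(map(lambda r: r.asDict(), df.collect()))

class temp_table(object):
    # Sketch only: the real helper may derive a unique table name
    # (the file imports random); here the given name is used directly.
    def __init__(self, name):
        self.table_name = name

    def __enter__(self):
        return self.table_name

    def __exit__(self, etype, evalue, traceback):
        sql("DROP TABLE IF EXISTS {0}".format(self.table_name))
        return False  # don't suppress exceptions
```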