From e74a16b63ed2492f53831dec64cd2df7b803ee0c Mon Sep 17 00:00:00 2001 From: "helmi.nour" Date: Thu, 13 Oct 2022 01:29:14 +0100 Subject: [PATCH 1/6] load data v2 protocol --- railib/api.py | 16 ++++++++-- tests/integration.py | 71 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 3 deletions(-) diff --git a/railib/api.py b/railib/api.py index 3574401..232b116 100644 --- a/railib/api.py +++ b/railib/api.py @@ -715,7 +715,7 @@ def list_models(ctx: Context, database: str, engine: str) -> list: # Generate a rel literal relation for the given dict. def _gen_literal_dict(items: dict) -> str: result = [] - for k, v in items: + for k, v in items.items(): result.append(f"{_gen_literal(k)},{_gen_literal(v)}") return "{" + ";".join(result) + "}" @@ -761,6 +761,14 @@ def _gen_syntax_config(syntax: dict = {}) -> str: return result +# Generate list of config schema options for `load_csv` +def _gen_schema_config(schema: dict = {}) -> str: + result = "" + for k, v in schema.items(): + result += f"def config:schema{k} = \"{v}\"\n" + return result + + # `syntax`: # * header: a map from col number to name (base 1) # * header_row: row number of header, 0 means no header (default: 1) @@ -777,6 +785,7 @@ def load_csv( relation: str, data: str or io.TextIOBase, syntax: dict = {}, + schema: dict = {}, ) -> dict: if isinstance(data, str): pass # ok @@ -786,8 +795,9 @@ def load_csv( raise TypeError(f"bad type for arg 'data': {data.__class__.__name__}") inputs = {"data": data} command = _gen_syntax_config(syntax) + command += _gen_schema_config(schema) command += "def config:data = data\n" "def insert:%s = load_csv[config]" % relation - return exec_v1(ctx, database, engine, command, inputs=inputs, readonly=False) + return exec(ctx, database, engine, command, inputs=inputs, readonly=False) def load_json( @@ -805,7 +815,7 @@ def load_json( raise TypeError(f"bad type for arg 'data': {data.__class__.__name__}") inputs = {"data": data} command = "def config:data = data\n" "def insert:%s = load_json[config]" % relation - return exec_v1(ctx, database, engine, command, inputs=inputs, readonly=False) + return exec(ctx, database, engine, command, inputs=inputs, readonly=False) def exec_v1( diff --git a/tests/integration.py b/tests/integration.py index 3e34791..a4f43a5 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -91,5 +91,76 @@ def tearDown(self): api.delete_database(ctx, dbname) +class TestDataload(unittest.TestCase): + def setUp(self): + create_engine_wait(ctx, engine) + api.create_database(ctx, dbname) + + def test_load_json(self): + json = '{ "test" : 123 }' + resp = api.load_json(ctx, dbname, engine, 'test_relation', json) + self.assertEqual("COMPLETED", resp.transaction["state"]) + + resp = api.exec(ctx, dbname, engine, 'def output = test_relation') + self.assertEqual("COMPLETED", resp.transaction["state"]) + self.assertEqual({'v1': [123]}, resp.results[0]["table"].to_pydict()) + + def test_load_csv(self): + csv = 'foo,bar\n1,2' + resp = api.load_csv(ctx, dbname, engine, 'test_relation_1', csv) + self.assertEqual("COMPLETED", resp.transaction["state"]) + + resp = api.exec(ctx, dbname, engine, 'def output = test_relation_1') + self.assertEqual("COMPLETED", resp.transaction["state"]) + self.assertEqual({'v1': [2], 'v2': ['2']}, resp.results[0]["table"].to_pydict()) + self.assertEqual({'v1': [2], 'v2': ['1']}, resp.results[1]["table"].to_pydict()) + + def test_load_csv_with_syntax(self): + csv = 'foo|bar\n1,2' + resp = api.load_csv( + ctx, + dbname, + engine, + 'test_relation_2', + csv, + { + 'header': {1: 'foo', 2: 'bar'}, + 'delim': '|', + 'quotechar': "'", + 'header_row': 0, + 'escapechar': ']' + } + ) + self.assertEqual("COMPLETED", resp.transaction["state"]) + + resp = api.exec(ctx, dbname, engine, 'def output = test_relation_2') + self.assertEqual("COMPLETED", resp.transaction["state"]) + self.assertEqual({'v1': [2], 'v2': [2], 'v3': ['1,2']}, resp.results[0]["table"].to_pydict()) + + def test_load_csv_with_schema(self): + csv = 'foo,bar\n1,test' + resp = api.load_csv( + ctx, + dbname, + engine, + 'test_relation_3', + csv, + schema={ + ':foo': 'int', + ':bar': 'string' + } + ) + self.assertEqual("COMPLETED", resp.transaction["state"]) + + resp = api.exec(ctx, dbname, engine, 'def output = test_relation_3') + self.assertEqual("COMPLETED", resp.transaction["state"]) + self.assertEqual({'v1': [2], 'v2': ['test']}, resp.results[0]["table"].to_pydict()) + self.assertEqual({'v1': [2], 'v2': [1]}, resp.results[1]["table"].to_pydict()) + + def tearDown(self): + api.delete_engine(ctx, engine) + api.delete_database(ctx, dbname) + + if __name__ == '__main__': unittest.main() From e0ce2d8976007a0c0459070ef8ce70bbcb6059a3 Mon Sep 17 00:00:00 2001 From: "helmi.nour" Date: Thu, 13 Oct 2022 11:38:14 +0100 Subject: [PATCH 2/6] fix --- tests/integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration.py b/tests/integration.py index a4f43a5..ba4e028 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -61,7 +61,7 @@ def setUp(self): def test_v2_exec(self): cmd = "x, x^2, x^3, x^4 from x in {1; 2; 3; 4; 5}" - rsp = api.exec(ctx, "hnr-db", "hnr-engine", cmd) + rsp = api.exec(ctx, dbname, engine, cmd) # transaction self.assertEqual("COMPLETED", rsp.transaction["state"]) From f3c477debfbef0934c1aa098ca17db0d8853f312 Mon Sep 17 00:00:00 2001 From: "helmi.nour" Date: Thu, 13 Oct 2022 12:04:10 +0100 Subject: [PATCH 3/6] make tests independent from engine/db names --- tests/integration.py | 52 +++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/tests/integration.py b/tests/integration.py index ba4e028..cd51ad8 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -30,7 +30,6 @@ def create_engine_wait(ctx: api.Context, engine: str): client_credentials_url = os.getenv("CLIENT_CREDENTIALS_URL") if client_id is None: - print("not using config from path") cfg = config.read() else: file = tempfile.NamedTemporaryFile(mode="w") @@ -49,19 +48,18 @@ def create_engine_wait(ctx: api.Context, engine: str): ctx = api.Context(**cfg) -suffix = uuid.uuid4() -engine = f"python-sdk-{suffix}" -dbname = f"python-sdk-{suffix}" - - class TestTransactionAsync(unittest.TestCase): def setUp(self): - create_engine_wait(ctx, engine) - api.create_database(ctx, dbname) + suffix = uuid.uuid4() + self.engine = f"python-sdk-{suffix}" + self.dbname = f"python-sdk-{suffix}" + + create_engine_wait(ctx, self.engine) + api.create_database(ctx, self.dbname) def test_v2_exec(self): cmd = "x, x^2, x^3, x^4 from x in {1; 2; 3; 4; 5}" - rsp = api.exec(ctx, dbname, engine, cmd) + rsp = api.exec(ctx, self.dbname, self.engine, cmd) # transaction self.assertEqual("COMPLETED", rsp.transaction["state"]) @@ -87,30 +85,34 @@ def test_v2_exec(self): 1, 16, 81, 256, 625]}, rsp.results[0]["table"].to_pydict()) def tearDown(self): - api.delete_engine(ctx, engine) - api.delete_database(ctx, dbname) + api.delete_engine(ctx, self.engine) + api.delete_database(ctx, self.dbname) class TestDataload(unittest.TestCase): def setUp(self): - create_engine_wait(ctx, engine) - api.create_database(ctx, dbname) + suffix = uuid.uuid4() + self.engine = f"python-sdk-{suffix}" + self.dbname = f"python-sdk-{suffix}" + + create_engine_wait(ctx, self.engine) + api.create_database(ctx, self.dbname) def test_load_json(self): json = '{ "test" : 123 }' - resp = api.load_json(ctx, dbname, engine, 'test_relation', json) + resp = api.load_json(ctx, self.dbname, self.engine, 'test_relation', json) self.assertEqual("COMPLETED", resp.transaction["state"]) - resp = api.exec(ctx, dbname, engine, 'def output = test_relation') + resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation') self.assertEqual("COMPLETED", resp.transaction["state"]) self.assertEqual({'v1': [123]}, resp.results[0]["table"].to_pydict()) def test_load_csv(self): csv = 'foo,bar\n1,2' - resp = api.load_csv(ctx, dbname, engine, 'test_relation_1', csv) + resp = api.load_csv(ctx, self.dbname, self.engine, 'test_relation_1', csv) self.assertEqual("COMPLETED", resp.transaction["state"]) - resp = api.exec(ctx, dbname, engine, 'def output = test_relation_1') + resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation_1') self.assertEqual("COMPLETED", resp.transaction["state"]) self.assertEqual({'v1': [2], 'v2': ['2']}, resp.results[0]["table"].to_pydict()) self.assertEqual({'v1': [2], 'v2': ['1']}, resp.results[1]["table"].to_pydict()) @@ -119,8 +121,8 @@ def test_load_csv_with_syntax(self): csv = 'foo|bar\n1,2' resp = api.load_csv( ctx, - dbname, - engine, + self.dbname, + self.engine, 'test_relation_2', csv, { @@ -133,7 +135,7 @@ def test_load_csv_with_syntax(self): ) self.assertEqual("COMPLETED", resp.transaction["state"]) - resp = api.exec(ctx, dbname, engine, 'def output = test_relation_2') + resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation_2') self.assertEqual("COMPLETED", resp.transaction["state"]) self.assertEqual({'v1': [2], 'v2': [2], 'v3': ['1,2']}, resp.results[0]["table"].to_pydict()) @@ -141,8 +143,8 @@ def test_load_csv_with_schema(self): csv = 'foo,bar\n1,test' resp = api.load_csv( ctx, - dbname, - engine, + self.dbname, + self.engine, 'test_relation_3', csv, schema={ @@ -152,14 +154,14 @@ def test_load_csv_with_schema(self): ) self.assertEqual("COMPLETED", resp.transaction["state"]) - resp = api.exec(ctx, dbname, engine, 'def output = test_relation_3') + resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation_3') self.assertEqual("COMPLETED", resp.transaction["state"]) self.assertEqual({'v1': [2], 'v2': ['test']}, resp.results[0]["table"].to_pydict()) self.assertEqual({'v1': [2], 'v2': [1]}, resp.results[1]["table"].to_pydict()) def tearDown(self): - api.delete_engine(ctx, engine) - api.delete_database(ctx, dbname) + api.delete_engine(ctx, self.engine) + api.delete_database(ctx, self.dbname) if __name__ == '__main__': From 6f45ea42e9838a9257f9a2204890ca92b1e48548 Mon Sep 17 00:00:00 2001 From: "helmi.nour" Date: Thu, 13 Oct 2022 12:06:39 +0100 Subject: [PATCH 4/6] fix linter --- tests/integration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration.py b/tests/integration.py index cd51ad8..15a9680 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -48,6 +48,7 @@ def create_engine_wait(ctx: api.Context, engine: str): ctx = api.Context(**cfg) + class TestTransactionAsync(unittest.TestCase): def setUp(self): suffix = uuid.uuid4() From 2e40b4ffe93d218d5b0437475bb1552a5726a579 Mon Sep 17 00:00:00 2001 From: "helmi.nour" Date: Thu, 13 Oct 2022 13:47:08 +0100 Subject: [PATCH 5/6] debug --- tests/integration.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/tests/integration.py b/tests/integration.py index 15a9680..bb7892c 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -51,6 +51,7 @@ def create_engine_wait(ctx: api.Context, engine: str): class TestTransactionAsync(unittest.TestCase): def setUp(self): + print("==> setup 1") suffix = uuid.uuid4() self.engine = f"python-sdk-{suffix}" self.dbname = f"python-sdk-{suffix}" @@ -59,6 +60,7 @@ def setUp(self): api.create_database(ctx, self.dbname) def test_v2_exec(self): + print("=> test v2 exec") cmd = "x, x^2, x^3, x^4 from x in {1; 2; 3; 4; 5}" rsp = api.exec(ctx, self.dbname, self.engine, cmd) @@ -86,20 +88,25 @@ def test_v2_exec(self): 1, 16, 81, 256, 625]}, rsp.results[0]["table"].to_pydict()) def tearDown(self): + print("==> tear down 1") api.delete_engine(ctx, self.engine) api.delete_database(ctx, self.dbname) class TestDataload(unittest.TestCase): def setUp(self): + print("==> setup 2") suffix = uuid.uuid4() self.engine = f"python-sdk-{suffix}" self.dbname = f"python-sdk-{suffix}" + print(self.engine) + print(self.dbname) create_engine_wait(ctx, self.engine) api.create_database(ctx, self.dbname) def test_load_json(self): + print("=> test load json") json = '{ "test" : 123 }' resp = api.load_json(ctx, self.dbname, self.engine, 'test_relation', json) self.assertEqual("COMPLETED", resp.transaction["state"]) @@ -109,22 +116,24 @@ def test_load_json(self): self.assertEqual({'v1': [123]}, resp.results[0]["table"].to_pydict()) def test_load_csv(self): + print("=> test load csv") csv = 'foo,bar\n1,2' - resp = api.load_csv(ctx, self.dbname, self.engine, 'test_relation_1', csv) + resp = api.load_csv(ctx, self.dbname, self.engine, 'test_relation', csv) self.assertEqual("COMPLETED", resp.transaction["state"]) - resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation_1') + resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation') self.assertEqual("COMPLETED", resp.transaction["state"]) self.assertEqual({'v1': [2], 'v2': ['2']}, resp.results[0]["table"].to_pydict()) self.assertEqual({'v1': [2], 'v2': ['1']}, resp.results[1]["table"].to_pydict()) def test_load_csv_with_syntax(self): + print("=> load csv with syntax") csv = 'foo|bar\n1,2' resp = api.load_csv( ctx, self.dbname, self.engine, - 'test_relation_2', + 'test_relation', csv, { 'header': {1: 'foo', 2: 'bar'}, @@ -136,17 +145,18 @@ def test_load_csv_with_syntax(self): ) self.assertEqual("COMPLETED", resp.transaction["state"]) - resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation_2') + resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation') self.assertEqual("COMPLETED", resp.transaction["state"]) self.assertEqual({'v1': [2], 'v2': [2], 'v3': ['1,2']}, resp.results[0]["table"].to_pydict()) def test_load_csv_with_schema(self): + print("=> load csv with schema") csv = 'foo,bar\n1,test' resp = api.load_csv( ctx, self.dbname, self.engine, - 'test_relation_3', + 'test_relation', csv, schema={ ':foo': 'int', @@ -155,12 +165,15 @@ def test_load_csv_with_schema(self): ) self.assertEqual("COMPLETED", resp.transaction["state"]) - resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation_3') + resp = api.exec(ctx, self.dbname, self.engine, 'def output = test_relation') self.assertEqual("COMPLETED", resp.transaction["state"]) self.assertEqual({'v1': [2], 'v2': ['test']}, resp.results[0]["table"].to_pydict()) self.assertEqual({'v1': [2], 'v2': [1]}, resp.results[1]["table"].to_pydict()) def tearDown(self): + print("==> tear down 2") + print(self.engine) + print(self.dbname) api.delete_engine(ctx, self.engine) api.delete_database(ctx, self.dbname) From 87d61a8fef0696018997adb2b976779e0ad6b3d0 Mon Sep 17 00:00:00 2001 From: "helmi.nour" Date: Thu, 13 Oct 2022 13:57:41 +0100 Subject: [PATCH 6/6] cleanup --- tests/integration.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/tests/integration.py b/tests/integration.py index bb7892c..0b98b7b 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -51,7 +51,6 @@ def create_engine_wait(ctx: api.Context, engine: str): class TestTransactionAsync(unittest.TestCase): def setUp(self): - print("==> setup 1") suffix = uuid.uuid4() self.engine = f"python-sdk-{suffix}" self.dbname = f"python-sdk-{suffix}" @@ -60,7 +59,6 @@ def setUp(self): api.create_database(ctx, self.dbname) def test_v2_exec(self): - print("=> test v2 exec") cmd = "x, x^2, x^3, x^4 from x in {1; 2; 3; 4; 5}" rsp = api.exec(ctx, self.dbname, self.engine, cmd) @@ -88,25 +86,20 @@ def test_v2_exec(self): 1, 16, 81, 256, 625]}, rsp.results[0]["table"].to_pydict()) def tearDown(self): - print("==> tear down 1") api.delete_engine(ctx, self.engine) api.delete_database(ctx, self.dbname) class TestDataload(unittest.TestCase): def setUp(self): - print("==> setup 2") suffix = uuid.uuid4() self.engine = f"python-sdk-{suffix}" self.dbname = f"python-sdk-{suffix}" - print(self.engine) - print(self.dbname) create_engine_wait(ctx, self.engine) api.create_database(ctx, self.dbname) def test_load_json(self): - print("=> test load json") json = '{ "test" : 123 }' resp = api.load_json(ctx, self.dbname, self.engine, 'test_relation', json) self.assertEqual("COMPLETED", resp.transaction["state"]) @@ -116,7 +109,6 @@ def test_load_json(self): self.assertEqual({'v1': [123]}, resp.results[0]["table"].to_pydict()) def test_load_csv(self): - print("=> test load csv") csv = 'foo,bar\n1,2' resp = api.load_csv(ctx, self.dbname, self.engine, 'test_relation', csv) self.assertEqual("COMPLETED", resp.transaction["state"]) @@ -127,7 +119,6 @@ def test_load_csv(self): self.assertEqual({'v1': [2], 'v2': ['1']}, resp.results[1]["table"].to_pydict()) def test_load_csv_with_syntax(self): - print("=> load csv with syntax") csv = 'foo|bar\n1,2' resp = api.load_csv( ctx, @@ -150,7 +141,6 @@ def test_load_csv_with_syntax(self): self.assertEqual({'v1': [2], 'v2': [2], 'v3': ['1,2']}, resp.results[0]["table"].to_pydict()) def test_load_csv_with_schema(self): - print("=> load csv with schema") csv = 'foo,bar\n1,test' resp = api.load_csv( ctx, @@ -171,9 +161,6 @@ def test_load_csv_with_schema(self): self.assertEqual({'v1': [2], 'v2': [1]}, resp.results[1]["table"].to_pydict()) def tearDown(self): - print("==> tear down 2") - print(self.engine) - print(self.dbname) api.delete_engine(ctx, self.engine) api.delete_database(ctx, self.dbname)