From 60608b4b6209b7d2f0ffeb0d7087bf906d42f744 Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Fri, 20 Dec 2024 17:08:51 +0200 Subject: [PATCH 01/12] wip --- .github/workflows/flink-sql-runner.yaml | 4 ++ .vscode/settings.json | 3 ++ pom.xml | 48 +++++++++++++++++++ src/main/java/io/ecraft/SqlRunner.java | 46 ++++++++++++++++++ src/test/java/io/ecraft/SqlRunnerTest.java | 44 +++++++++++++++++ .../io/ecraft/fixtures/deployableconfig1.json | 12 +++++ 6 files changed, 157 insertions(+) create mode 100644 .vscode/settings.json create mode 100644 src/test/java/io/ecraft/fixtures/deployableconfig1.json diff --git a/.github/workflows/flink-sql-runner.yaml b/.github/workflows/flink-sql-runner.yaml index 7329a42..2f78b74 100644 --- a/.github/workflows/flink-sql-runner.yaml +++ b/.github/workflows/flink-sql-runner.yaml @@ -20,6 +20,10 @@ jobs: with: distribution: temurin java-version-file: .tool-versions + - name: Run tests with Maven + run: | + mvn test + working-directory: . - name: Build with Maven run: | mvn clean package diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..1133129 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.configuration.updateBuildConfiguration": "automatic" +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 01afb8d..d4e1812 100644 --- a/pom.xml +++ b/pom.xml @@ -10,6 +10,7 @@ 1.18.1 1.18 + 2.12 3.1.3 UTF-8 1.8 @@ -125,6 +126,11 @@ 2.3 + + org.json + json + 20210307 + @@ -133,6 +139,48 @@ ${junit.jupiter.version} test + + + org.apache.flink + flink-core + ${flink.version} + test + + + + org.apache.flink + flink-streaming-java + ${flink.version} + test + + + + org.apache.flink + flink-table-api-java + ${flink.version} + test + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + test + + + + org.apache.flink + flink-connector-files + ${flink.version} + test + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + diff --git a/src/main/java/io/ecraft/SqlRunner.java b/src/main/java/io/ecraft/SqlRunner.java index 7c6a318..62d3c9b 100644 --- a/src/main/java/io/ecraft/SqlRunner.java +++ b/src/main/java/io/ecraft/SqlRunner.java @@ -15,6 +15,7 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogStore; @@ -22,6 +23,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.velocity.VelocityContext; import org.apache.velocity.app.Velocity; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,6 +93,19 @@ public static void main(String[] args) throws Exception { transferTo(zipInputStream, zipEntryOutputStream); } + // Read the json file + // String jsonName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".json"; + // Path jsonPath = new Path("/tmp/" + jobName + "/" + jsonName); + // FileSystem jsonFs = jsonPath.getFileSystem(); + // FSDataInputStream jsonInputStream = jsonFs.open(jsonPath); + // BufferedReader jsonStreamReader = new BufferedReader(new InputStreamReader(jsonInputStream, "UTF-8")); + // StringBuilder responseStrBuilder = new StringBuilder(); + + // String inputStr; + // while ((inputStr = jsonStreamReader.readLine()) != null) + // responseStrBuilder.append(inputStr); + // JSONObject deployableConfiguration = new JSONObject(responseStrBuilder.toString()); + // Read the sql file String sqlName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".sql"; Path sqlPath = new Path("/tmp/" + jobName + "/" + sqlName); @@ -107,6 +122,37 @@ public static void main(String[] args) throws Exception { } } + public static void configureTableEnvironment(String currentEnv, JSONObject deployableConfiguration, TableEnvironment tableEnvironment) { + TableConfig tableConfig = tableEnvironment.getConfig(); + + if (!deployableConfiguration.has("environments")) { + // If there is no environment config, do nothing + return; + } + + // Extract the environment configuration + JSONObject environments = deployableConfiguration.getJSONObject("environments"); + if (!environments.has(currentEnv)) { + // If the current environment has no config defined, do nothing + return; + } + + JSONObject currentEnvironment = environments.getJSONObject(currentEnv); + if (!currentEnvironment.has("tableConfig")) { + // If the "tableConfig" is not set, do nothing + return; + } + + // Extract the tableConfig from the current environment + JSONObject tableConfigJson = currentEnvironment.getJSONObject("tableConfig"); + + // Iterate over the keys in the tableConfig and set them in the TableConfig object + for (String key : tableConfigJson.keySet()) { + String value = tableConfigJson.getString(key); + tableConfig.getConfiguration().setString(key, value); + } + } + public static void transferTo(InputStream input, OutputStream output) throws IOException { try { byte[] buffer = new byte[1024]; diff --git a/src/test/java/io/ecraft/SqlRunnerTest.java b/src/test/java/io/ecraft/SqlRunnerTest.java index 67009c8..7a46347 100644 --- a/src/test/java/io/ecraft/SqlRunnerTest.java +++ b/src/test/java/io/ecraft/SqlRunnerTest.java @@ -1,5 +1,11 @@ package io.ecraft; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.json.JSONObject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -7,6 +13,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Map; import java.util.HashMap; @@ -32,4 +40,40 @@ public void testTemplating() throws Exception { assertEquals(sql, "SELECT * FROM T;\n"); } + + @Test + public void testTableConfig() throws Exception { + EnvironmentSettings settings = EnvironmentSettings + .newInstance() + .inStreamingMode() + .build(); + TableEnvironment tableEnv = TableEnvironment.create(settings); + + String filePath = "src/test/java/io/ecraft/fixtures/deployableconfig1.json"; + JSONObject jsonConfig = readJsonFile(filePath); + + SqlRunner.configureTableEnvironment("dev", jsonConfig, tableEnv); + + // Correctly define the ConfigOption + ConfigOption miniBatchEnabled = ConfigOptions.key("table.exec.mini-batch.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Enable mini-batch execution."); + + assertEquals(tableEnv.getConfig().getConfiguration().get(miniBatchEnabled), true); + + // define config option for table.exec.source.idle-timeout duration + ConfigOption sourceIdleTimeout = ConfigOptions.key("table.exec.source.idle-timeout") + .stringType() + .defaultValue("0") + .withDescription("The time that a source will wait for new data before shutting down."); + + assertEquals(tableEnv.getConfig().getConfiguration().get(sourceIdleTimeout), "5 min"); + } + + public static JSONObject readJsonFile(String filePath) throws IOException { + String content = new String(Files.readAllBytes(Paths.get(filePath))); + return new JSONObject(content); + } + } diff --git a/src/test/java/io/ecraft/fixtures/deployableconfig1.json b/src/test/java/io/ecraft/fixtures/deployableconfig1.json new file mode 100644 index 0000000..1fab19b --- /dev/null +++ b/src/test/java/io/ecraft/fixtures/deployableconfig1.json @@ -0,0 +1,12 @@ +{ + "name": "test01", + "type": "sql", + "environments": { + "dev": { + "tableConfig": { + "table.exec.mini-batch.enabled": true, + "table.exec.source.idle-timeout": "5 min" + } + } + } +} From 2bce171584ce4f113cc2e10de4b233c9902f876b Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 14:02:59 +0200 Subject: [PATCH 02/12] Eureka, found packages from docs --- pom.xml | 68 +++++++++++++++++++-------------------------------------- 1 file changed, 23 insertions(+), 45 deletions(-) diff --git a/pom.xml b/pom.xml index d4e1812..58e6d60 100644 --- a/pom.xml +++ b/pom.xml @@ -43,12 +43,29 @@ ${flink.version} provided + org.apache.flink flink-table-api-java-bridge ${flink.version} provided + + + + org.apache.flink + flink-table-planner-loader + ${flink.version} + provided + + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + org.apache.flink @@ -139,56 +156,17 @@ ${junit.jupiter.version} test - - - org.apache.flink - flink-core - ${flink.version} - test - - - - org.apache.flink - flink-streaming-java - ${flink.version} - test - - - - org.apache.flink - flink-table-api-java - ${flink.version} - test - - - - org.apache.flink - flink-table-api-java-bridge - ${flink.version} - test - - - - org.apache.flink - flink-connector-files - ${flink.version} - test - - - - org.apache.flink - flink-table-planner_${scala.binary.version} - ${flink.version} - test - - org.apache.maven.plugins - maven-surefire-plugin - ${maven-surefire-plugin.version} + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + false + From f8b31cbdeb2208118643acb13592799f8b4b8ae7 Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 14:23:39 +0200 Subject: [PATCH 03/12] Pass environment as first arg to apply the right JSON config. --- src/main/java/io/ecraft/SqlRunner.java | 29 +++++++++++++++----------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/ecraft/SqlRunner.java b/src/main/java/io/ecraft/SqlRunner.java index 62d3c9b..dad7d98 100644 --- a/src/main/java/io/ecraft/SqlRunner.java +++ b/src/main/java/io/ecraft/SqlRunner.java @@ -68,8 +68,10 @@ public static void main(String[] args) throws Exception { LOG.debug(" - {}", t); } + String environment = args[0]; + Path remoteArchivePath = new Path(args[1]); + // Read the tar file from azure blob store to a local file - Path remoteArchivePath = new Path(args[0]); FileSystem remoteArchiveFs = remoteArchivePath.getFileSystem(); FSDataInputStream remoteArchiveStream = remoteArchiveFs.open(remoteArchivePath); // We name everything after the full name of the archive without extension (including hashes) @@ -92,19 +94,21 @@ public static void main(String[] args) throws Exception { InputStream zipInputStream = zipFile.getInputStream(entry); transferTo(zipInputStream, zipEntryOutputStream); } + zipFile.close(); // Read the json file - // String jsonName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".json"; - // Path jsonPath = new Path("/tmp/" + jobName + "/" + jsonName); - // FileSystem jsonFs = jsonPath.getFileSystem(); - // FSDataInputStream jsonInputStream = jsonFs.open(jsonPath); - // BufferedReader jsonStreamReader = new BufferedReader(new InputStreamReader(jsonInputStream, "UTF-8")); - // StringBuilder responseStrBuilder = new StringBuilder(); + String jsonName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".json"; + Path jsonPath = new Path("/tmp/" + jobName + "/" + jsonName); + FileSystem jsonFs = jsonPath.getFileSystem(); + FSDataInputStream jsonInputStream = jsonFs.open(jsonPath); + BufferedReader jsonStreamReader = new BufferedReader(new InputStreamReader(jsonInputStream, "UTF-8")); + StringBuilder responseStrBuilder = new StringBuilder(); - // String inputStr; - // while ((inputStr = jsonStreamReader.readLine()) != null) - // responseStrBuilder.append(inputStr); - // JSONObject deployableConfiguration = new JSONObject(responseStrBuilder.toString()); + String inputStr; + while ((inputStr = jsonStreamReader.readLine()) != null) + responseStrBuilder.append(inputStr); + JSONObject deployableConfiguration = new JSONObject(responseStrBuilder.toString()); + configureTableEnvironment(environment, deployableConfiguration, tableEnv); // Read the sql file String sqlName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".sql"; @@ -112,7 +116,8 @@ public static void main(String[] args) throws Exception { FileSystem sqlFs = sqlPath.getFileSystem(); FSDataInputStream sqlInputStream = sqlFs.open(sqlPath); InputStreamReader reader = new InputStreamReader(sqlInputStream); - String script = new BufferedReader(reader).lines().parallel().collect(Collectors.joining("\n")); + BufferedReader scriptReader = new BufferedReader(reader); + String script = scriptReader.lines().parallel().collect(Collectors.joining("\n")); List statements = parseStatements(script, SqlRunner.loadEnvironment()); for (String statement : statements) { From 949cea157eaa2bc0db7ce1b68debbf79ca0117ae Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 14:24:40 +0200 Subject: [PATCH 04/12] Next version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 58e6d60..8bd5677 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.ecraft flink-sql-runner - 0.0.0 + 0.0.1 Flink SQL Runner From 63bd8b6fe161f99790007b1bf3a118e2a22b63f7 Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 14:25:32 +0200 Subject: [PATCH 05/12] already built 0.0.1 with the wrong package version. Bump once. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8bd5677..48d8b7a 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.ecraft flink-sql-runner - 0.0.1 + 0.0.2 Flink SQL Runner From 4a5ddc8dd83a0b7344463d28ccc15fd00cd95329 Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 14:31:26 +0200 Subject: [PATCH 06/12] Force move if either wrong or correct version file --- .github/workflows/flink-sql-runner.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink-sql-runner.yaml b/.github/workflows/flink-sql-runner.yaml index 2f78b74..65eb37d 100644 --- a/.github/workflows/flink-sql-runner.yaml +++ b/.github/workflows/flink-sql-runner.yaml @@ -30,7 +30,7 @@ jobs: working-directory: . - name: Rename version run: | - mv target/flink-sql-runner-*.jar target/flink-sql-runner-${{ steps.tagName.outputs.tag }}.jar + mv -f target/flink-sql-runner-*.jar target/flink-sql-runner-${{ steps.tagName.outputs.tag }}.jar - name: Release uses: softprops/action-gh-release@v2 with: From e1aef05541631646bc1b1b31e2ba089adbfda9f1 Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 14:35:55 +0200 Subject: [PATCH 07/12] Use 0.0.0 in pom since we want this to work all the time. --- .github/workflows/flink-sql-runner.yaml | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flink-sql-runner.yaml b/.github/workflows/flink-sql-runner.yaml index 65eb37d..2f78b74 100644 --- a/.github/workflows/flink-sql-runner.yaml +++ b/.github/workflows/flink-sql-runner.yaml @@ -30,7 +30,7 @@ jobs: working-directory: . - name: Rename version run: | - mv -f target/flink-sql-runner-*.jar target/flink-sql-runner-${{ steps.tagName.outputs.tag }}.jar + mv target/flink-sql-runner-*.jar target/flink-sql-runner-${{ steps.tagName.outputs.tag }}.jar - name: Release uses: softprops/action-gh-release@v2 with: diff --git a/pom.xml b/pom.xml index 48d8b7a..58e6d60 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.ecraft flink-sql-runner - 0.0.2 + 0.0.0 Flink SQL Runner From 027f6a8720fc6e65c67c0f4177e2887a83583184 Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 14:49:27 +0200 Subject: [PATCH 08/12] Exactly one argument is allowed in flinksessionjob. Use parametertool to get a named parameter instead. --- src/main/java/io/ecraft/SqlRunner.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/ecraft/SqlRunner.java b/src/main/java/io/ecraft/SqlRunner.java index dad7d98..54f43c7 100644 --- a/src/main/java/io/ecraft/SqlRunner.java +++ b/src/main/java/io/ecraft/SqlRunner.java @@ -7,6 +7,7 @@ import java.io.*; import java.nio.file.*; import org.apache.commons.io.FilenameUtils; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; @@ -68,8 +69,11 @@ public static void main(String[] args) throws Exception { LOG.debug(" - {}", t); } - String environment = args[0]; - Path remoteArchivePath = new Path(args[1]); + ParameterTool parameters = ParameterTool.fromArgs(args); + String environment = parameters.getRequired("environment"); + + // Only one argument is allowed + Path remoteArchivePath = new Path(args[0]); // Read the tar file from azure blob store to a local file FileSystem remoteArchiveFs = remoteArchivePath.getFileSystem(); From 4b74789aa097465968e279b323fc1327bf13f259 Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 14:56:09 +0200 Subject: [PATCH 09/12] No single arg useful here, all from params then --- src/main/java/io/ecraft/SqlRunner.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/ecraft/SqlRunner.java b/src/main/java/io/ecraft/SqlRunner.java index 54f43c7..951d94b 100644 --- a/src/main/java/io/ecraft/SqlRunner.java +++ b/src/main/java/io/ecraft/SqlRunner.java @@ -71,9 +71,9 @@ public static void main(String[] args) throws Exception { ParameterTool parameters = ParameterTool.fromArgs(args); String environment = parameters.getRequired("environment"); - - // Only one argument is allowed - Path remoteArchivePath = new Path(args[0]); + String archiveUri = parameters.getRequired("archiveUri"); + + Path remoteArchivePath = new Path(archiveUri); // Read the tar file from azure blob store to a local file FileSystem remoteArchiveFs = remoteArchivePath.getFileSystem(); From 463e20160a4beafb82959d4160215043c71b9eaf Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 15:20:14 +0200 Subject: [PATCH 10/12] Don't limit arg input, and debug log the params. --- src/main/java/io/ecraft/SqlRunner.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/ecraft/SqlRunner.java b/src/main/java/io/ecraft/SqlRunner.java index 951d94b..26bf0c4 100644 --- a/src/main/java/io/ecraft/SqlRunner.java +++ b/src/main/java/io/ecraft/SqlRunner.java @@ -39,10 +39,6 @@ public class SqlRunner { public static void main(String[] args) throws Exception { - if (args.length != 1) { - throw new Exception("Exactly one argument is expected."); - } - EnvironmentSettings settings = EnvironmentSettings .newInstance() .inStreamingMode() @@ -70,9 +66,15 @@ public static void main(String[] args) throws Exception { } ParameterTool parameters = ParameterTool.fromArgs(args); - String environment = parameters.getRequired("environment"); + + // Debug log the keys and values of the parameters + for (String key : parameters.toMap().keySet()) { + LOG.debug("Parameter: {} = {}", key, parameters.get(key)); + } + String archiveUri = parameters.getRequired("archiveUri"); - + String environment = parameters.getRequired("environment"); + Path remoteArchivePath = new Path(archiveUri); // Read the tar file from azure blob store to a local file From 1f52073cca9861142485351f8f6ae2db44a454dd Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 23 Dec 2024 15:28:25 +0200 Subject: [PATCH 11/12] Debug log the table config set --- src/main/java/io/ecraft/SqlRunner.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/io/ecraft/SqlRunner.java b/src/main/java/io/ecraft/SqlRunner.java index 26bf0c4..9442403 100644 --- a/src/main/java/io/ecraft/SqlRunner.java +++ b/src/main/java/io/ecraft/SqlRunner.java @@ -161,6 +161,9 @@ public static void configureTableEnvironment(String currentEnv, JSONObject deplo for (String key : tableConfigJson.keySet()) { String value = tableConfigJson.getString(key); tableConfig.getConfiguration().setString(key, value); + + // Log the value that was set + LOG.debug("Setting table config {} to {}", key, value); } } From 3f6db85253cd788a16780531b0d4f3643bf0d975 Mon Sep 17 00:00:00 2001 From: jonasslotte Date: Mon, 30 Dec 2024 11:18:58 +0200 Subject: [PATCH 12/12] Read execution mode from configuration. Make table config function have matching flow control. --- src/main/java/io/ecraft/SqlRunner.java | 114 +++++++++--------- src/test/java/io/ecraft/SqlRunnerTest.java | 12 ++ .../io/ecraft/fixtures/deployableconfig1.json | 1 + 3 files changed, 73 insertions(+), 54 deletions(-) diff --git a/src/main/java/io/ecraft/SqlRunner.java b/src/main/java/io/ecraft/SqlRunner.java index 9442403..aadb6e4 100644 --- a/src/main/java/io/ecraft/SqlRunner.java +++ b/src/main/java/io/ecraft/SqlRunner.java @@ -12,6 +12,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.Environment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.EnvironmentSettings; @@ -38,33 +39,6 @@ public class SqlRunner { private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))"; public static void main(String[] args) throws Exception { - - EnvironmentSettings settings = EnvironmentSettings - .newInstance() - .inStreamingMode() - .build(); - TableEnvironment tableEnv = TableEnvironment.create(settings); - - String name = "hive"; - String defaultDatabase = "default"; - String hiveConfDir = "/conf/hive-conf"; - - HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); - tableEnv.registerCatalog(name, hive); - - // set the HiveCatalog as the current catalog of the session - tableEnv.useCatalog(name); - - tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); - - LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog()); - LOG.debug("Current database: {}", tableEnv.getCurrentDatabase()); - LOG.debug("Available tables:"); - - for (String t: tableEnv.listTables()) { - LOG.debug(" - {}", t); - } - ParameterTool parameters = ParameterTool.fromArgs(args); // Debug log the keys and values of the parameters @@ -75,6 +49,14 @@ public static void main(String[] args) throws Exception { String archiveUri = parameters.getRequired("archiveUri"); String environment = parameters.getRequired("environment"); + + + String name = "hive"; + String defaultDatabase = "default"; + String hiveConfDir = "/conf/hive-conf"; + + HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); + Path remoteArchivePath = new Path(archiveUri); // Read the tar file from azure blob store to a local file @@ -114,6 +96,23 @@ public static void main(String[] args) throws Exception { while ((inputStr = jsonStreamReader.readLine()) != null) responseStrBuilder.append(inputStr); JSONObject deployableConfiguration = new JSONObject(responseStrBuilder.toString()); + + EnvironmentSettings settings = configureEnvironmentSettings(environment, deployableConfiguration, EnvironmentSettings.newInstance()).build(); + TableEnvironment tableEnv = TableEnvironment.create(settings); + tableEnv.registerCatalog(name, hive); + + // set the HiveCatalog as the current catalog of the session + tableEnv.useCatalog(name); + + tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + + LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog()); + LOG.debug("Current database: {}", tableEnv.getCurrentDatabase()); + LOG.debug("Available tables:"); + + for (String t: tableEnv.listTables()) { + LOG.debug(" - {}", t); + } configureTableEnvironment(environment, deployableConfiguration, tableEnv); // Read the sql file @@ -133,37 +132,44 @@ public static void main(String[] args) throws Exception { } } - public static void configureTableEnvironment(String currentEnv, JSONObject deployableConfiguration, TableEnvironment tableEnvironment) { - TableConfig tableConfig = tableEnvironment.getConfig(); - - if (!deployableConfiguration.has("environments")) { - // If there is no environment config, do nothing - return; - } - - // Extract the environment configuration - JSONObject environments = deployableConfiguration.getJSONObject("environments"); - if (!environments.has(currentEnv)) { - // If the current environment has no config defined, do nothing - return; - } - - JSONObject currentEnvironment = environments.getJSONObject(currentEnv); - if (!currentEnvironment.has("tableConfig")) { - // If the "tableConfig" is not set, do nothing - return; + public static EnvironmentSettings.Builder configureEnvironmentSettings(String currentEnv, JSONObject deployableConfiguration, EnvironmentSettings.Builder builder) { + if (deployableConfiguration.has("environments")) { + JSONObject environments = deployableConfiguration.getJSONObject("environments"); + if (environments.has(currentEnv)) { + JSONObject currentEnvironment = environments.getJSONObject(currentEnv); + if (currentEnvironment.has("mode")) { + String mode = currentEnvironment.getString("mode"); + if (mode.equals("batch")) { + builder.inBatchMode(); + } else if (mode.equals("streaming")) { + builder.inStreamingMode(); + } else { + throw new RuntimeException("Invalid deployable configuration: '"+ mode + "' is not a valid mode"); + } + } + } } - // Extract the tableConfig from the current environment - JSONObject tableConfigJson = currentEnvironment.getJSONObject("tableConfig"); + return builder; + } - // Iterate over the keys in the tableConfig and set them in the TableConfig object - for (String key : tableConfigJson.keySet()) { - String value = tableConfigJson.getString(key); - tableConfig.getConfiguration().setString(key, value); + public static void configureTableEnvironment(String currentEnv, JSONObject deployableConfiguration, TableEnvironment tableEnvironment) { + TableConfig tableConfig = tableEnvironment.getConfig(); - // Log the value that was set - LOG.debug("Setting table config {} to {}", key, value); + if (deployableConfiguration.has("environments")) { + JSONObject environments = deployableConfiguration.getJSONObject("environments"); + if (environments.has(currentEnv)) { + JSONObject currentEnvironment = environments.getJSONObject(currentEnv); + if (currentEnvironment.has("tableConfig")) { + JSONObject tableConfigJson = currentEnvironment.getJSONObject("tableConfig"); + for (String key : tableConfigJson.keySet()) { + String value = tableConfigJson.getString(key); + tableConfig.getConfiguration().setString(key, value); + + LOG.debug("Setting table config {} to {}", key, value); + } + } + } } } diff --git a/src/test/java/io/ecraft/SqlRunnerTest.java b/src/test/java/io/ecraft/SqlRunnerTest.java index 7a46347..989ad2d 100644 --- a/src/test/java/io/ecraft/SqlRunnerTest.java +++ b/src/test/java/io/ecraft/SqlRunnerTest.java @@ -71,6 +71,18 @@ public void testTableConfig() throws Exception { assertEquals(tableEnv.getConfig().getConfiguration().get(sourceIdleTimeout), "5 min"); } + @Test + public void testEnvironmentConfig() throws Exception { + EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance(); + + String filePath = "src/test/java/io/ecraft/fixtures/deployableconfig1.json"; + JSONObject jsonConfig = readJsonFile(filePath); + + EnvironmentSettings settings = SqlRunner.configureEnvironmentSettings("dev", jsonConfig, builder).build(); + + assertEquals(settings.isStreamingMode(), true); + } + public static JSONObject readJsonFile(String filePath) throws IOException { String content = new String(Files.readAllBytes(Paths.get(filePath))); return new JSONObject(content); diff --git a/src/test/java/io/ecraft/fixtures/deployableconfig1.json b/src/test/java/io/ecraft/fixtures/deployableconfig1.json index 1fab19b..596c5b3 100644 --- a/src/test/java/io/ecraft/fixtures/deployableconfig1.json +++ b/src/test/java/io/ecraft/fixtures/deployableconfig1.json @@ -3,6 +3,7 @@ "type": "sql", "environments": { "dev": { + "mode": "streaming", "tableConfig": { "table.exec.mini-batch.enabled": true, "table.exec.source.idle-timeout": "5 min"