diff --git a/.github/workflows/flink-sql-runner.yaml b/.github/workflows/flink-sql-runner.yaml
index 7329a42..2f78b74 100644
--- a/.github/workflows/flink-sql-runner.yaml
+++ b/.github/workflows/flink-sql-runner.yaml
@@ -20,6 +20,10 @@ jobs:
         with:
           distribution: temurin
           java-version-file: .tool-versions
+      - name: Run tests with Maven
+        run: |
+          mvn test
+        working-directory: .
       - name: Build with Maven
         run: |
           mvn clean package
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..1133129
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,3 @@
+{
+  "java.configuration.updateBuildConfiguration": "automatic"
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 01afb8d..58e6d60 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,6 +10,7 @@
     <flink.version>1.18.1</flink.version>
     <flink.binary.version>1.18</flink.binary.version>
+    <scala.binary.version>2.12</scala.binary.version>
     <maven-surefire-plugin.version>3.1.3</maven-surefire-plugin.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.compiler.source>1.8</maven.compiler.source>
@@ -42,12 +43,29 @@
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
+
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-api-java-bridge</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner-loader</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.flink</groupId>
@@ -125,6 +143,11 @@
       <version>2.3</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+      <version>20210307</version>
+    </dependency>
   </dependencies>
@@ -138,9 +161,12 @@
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>${maven-surefire-plugin.version}</version>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <skipTests>false</skipTests>
+        </configuration>
       </plugin>
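Note: the new org.json dependency is what SqlRunner (below) uses to parse the deployable configuration. A minimal, self-contained sketch of that API — the file name and keys here are illustrative, mirroring the fixture added at the end of this change:

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.json.JSONObject;

    public class DeployableConfigSketch {
      public static void main(String[] args) throws Exception {
        // Parse a deployable configuration file into a JSONObject
        String content = new String(Files.readAllBytes(Paths.get("deployableconfig1.json")));
        JSONObject config = new JSONObject(content);
        // has()/getJSONObject()/getString() are the calls SqlRunner relies on
        if (config.has("environments")) {
          JSONObject dev = config.getJSONObject("environments").getJSONObject("dev");
          System.out.println("mode = " + dev.getString("mode"));
        }
      }
    }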
diff --git a/src/main/java/io/ecraft/SqlRunner.java b/src/main/java/io/ecraft/SqlRunner.java
index 7c6a318..aadb6e4 100644
--- a/src/main/java/io/ecraft/SqlRunner.java
+++ b/src/main/java/io/ecraft/SqlRunner.java
@@ -7,14 +7,16 @@
 import java.io.*;
 import java.nio.file.*;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogStore;
@@ -22,6 +24,7 @@
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.velocity.VelocityContext;
 import org.apache.velocity.app.Velocity;
+import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,39 +38,27 @@
   private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";
 
   public static void main(String[] args) throws Exception {
+    ParameterTool parameters = ParameterTool.fromArgs(args);
 
-    if (args.length != 1) {
-      throw new Exception("Exactly one argument is expected.");
+    // Debug log the keys and values of the parameters
+    for (String key : parameters.toMap().keySet()) {
+      LOG.debug("Parameter: {} = {}", key, parameters.get(key));
     }
 
-    EnvironmentSettings settings = EnvironmentSettings
-      .newInstance()
-      .inStreamingMode()
-      .build();
-    TableEnvironment tableEnv = TableEnvironment.create(settings);
+    String archiveUri = parameters.getRequired("archiveUri");
+    String environment = parameters.getRequired("environment");
+
     String name = "hive";
     String defaultDatabase = "default";
     String hiveConfDir = "/conf/hive-conf";
     HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
-    tableEnv.registerCatalog(name, hive);
 
-    // set the HiveCatalog as the current catalog of the session
-    tableEnv.useCatalog(name);
-
-    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
-
-    LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog());
-    LOG.debug("Current database: {}", tableEnv.getCurrentDatabase());
-    LOG.debug("Available tables:");
-
-    for (String t: tableEnv.listTables()) {
-      LOG.debug(" - {}", t);
-    }
+    Path remoteArchivePath = new Path(archiveUri);
 
     // Read the tar file from azure blob store to a local file
-    Path remoteArchivePath = new Path(args[0]);
     FileSystem remoteArchiveFs = remoteArchivePath.getFileSystem();
     FSDataInputStream remoteArchiveStream = remoteArchiveFs.open(remoteArchivePath);
 
     // We name everything after the full name of the archive without extension (including hashes)
@@ -90,6 +81,38 @@ public static void main(String[] args) throws Exception {
       InputStream zipInputStream = zipFile.getInputStream(entry);
       transferTo(zipInputStream, zipEntryOutputStream);
     }
+    zipFile.close();
+
+    // Read the json file
+    String jsonName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".json";
+    Path jsonPath = new Path("/tmp/" + jobName + "/" + jsonName);
+    FileSystem jsonFs = jsonPath.getFileSystem();
+    FSDataInputStream jsonInputStream = jsonFs.open(jsonPath);
+    BufferedReader jsonStreamReader = new BufferedReader(new InputStreamReader(jsonInputStream, "UTF-8"));
+    StringBuilder responseStrBuilder = new StringBuilder();
+
+    String inputStr;
+    while ((inputStr = jsonStreamReader.readLine()) != null)
+      responseStrBuilder.append(inputStr);
+    JSONObject deployableConfiguration = new JSONObject(responseStrBuilder.toString());
+
+    EnvironmentSettings settings = configureEnvironmentSettings(environment, deployableConfiguration, EnvironmentSettings.newInstance()).build();
+    TableEnvironment tableEnv = TableEnvironment.create(settings);
+    tableEnv.registerCatalog(name, hive);
+
+    // set the HiveCatalog as the current catalog of the session
+    tableEnv.useCatalog(name);
+
+    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+
+    LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog());
+    LOG.debug("Current database: {}", tableEnv.getCurrentDatabase());
+    LOG.debug("Available tables:");
+
+    for (String t: tableEnv.listTables()) {
+      LOG.debug(" - {}", t);
+    }
+    configureTableEnvironment(environment, deployableConfiguration, tableEnv);
 
     // Read the sql file
     String sqlName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".sql";
@@ -97,7 +120,8 @@ public static void main(String[] args) throws Exception {
     FileSystem sqlFs = sqlPath.getFileSystem();
     FSDataInputStream sqlInputStream = sqlFs.open(sqlPath);
     InputStreamReader reader = new InputStreamReader(sqlInputStream);
-    String script = new BufferedReader(reader).lines().parallel().collect(Collectors.joining("\n"));
+    BufferedReader scriptReader = new BufferedReader(reader);
+    String script = scriptReader.lines().parallel().collect(Collectors.joining("\n"));
 
     List<String> statements = parseStatements(script, SqlRunner.loadEnvironment());
     for (String statement : statements) {
@@ -107,6 +131,47 @@ public static void main(String[] args) throws Exception {
     }
   }
 
+  public static EnvironmentSettings.Builder configureEnvironmentSettings(String currentEnv, JSONObject deployableConfiguration, EnvironmentSettings.Builder builder) {
+    if (deployableConfiguration.has("environments")) {
+      JSONObject environments = deployableConfiguration.getJSONObject("environments");
+      if (environments.has(currentEnv)) {
+        JSONObject currentEnvironment = environments.getJSONObject(currentEnv);
+        if (currentEnvironment.has("mode")) {
+          String mode = currentEnvironment.getString("mode");
+          if (mode.equals("batch")) {
+            builder.inBatchMode();
+          } else if (mode.equals("streaming")) {
+            builder.inStreamingMode();
+          } else {
+            throw new RuntimeException("Invalid deployable configuration: '" + mode + "' is not a valid mode");
+          }
+        }
+      }
+    }
+
+    return builder;
+  }
+
+  public static void configureTableEnvironment(String currentEnv, JSONObject deployableConfiguration, TableEnvironment tableEnvironment) {
+    TableConfig tableConfig = tableEnvironment.getConfig();
+
+    if (deployableConfiguration.has("environments")) {
+      JSONObject environments = deployableConfiguration.getJSONObject("environments");
+      if (environments.has(currentEnv)) {
+        JSONObject currentEnvironment = environments.getJSONObject(currentEnv);
+        if (currentEnvironment.has("tableConfig")) {
+          JSONObject tableConfigJson = currentEnvironment.getJSONObject("tableConfig");
+          for (String key : tableConfigJson.keySet()) {
+            // Values may be non-string JSON scalars (e.g. booleans), so stringify via get().toString()
+            String value = tableConfigJson.get(key).toString();
+            tableConfig.getConfiguration().setString(key, value);
+
+            LOG.debug("Setting table config {} to {}", key, value);
+          }
+        }
+      }
+    }
+  }
+
   public static void transferTo(InputStream input, OutputStream output) throws IOException {
     try {
       byte[] buffer = new byte[1024];
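The jsonName and sqlName lookups above assume the archive name carries a trailing hash suffix; everything up to the last dash names the job's files. A small sketch of the convention (the hash value is made up for illustration):

    // remoteArchivePath.getName() for an archive like ".../test01-3f9a2c.tar"
    String archiveFileName = "test01-3f9a2c.tar";
    // Cut at the last '-', which also drops the ".tar" extension
    String base = archiveFileName.substring(0, archiveFileName.lastIndexOf("-")); // "test01"
    String jsonName = base + ".json"; // the deployable configuration read above
    String sqlName = base + ".sql";   // the SQL script executed afterwards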
diff --git a/src/test/java/io/ecraft/SqlRunnerTest.java b/src/test/java/io/ecraft/SqlRunnerTest.java
index 67009c8..989ad2d 100644
--- a/src/test/java/io/ecraft/SqlRunnerTest.java
+++ b/src/test/java/io/ecraft/SqlRunnerTest.java
@@ -1,5 +1,10 @@
 package io.ecraft;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.json.JSONObject;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -7,6 +12,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.HashMap;
@@ -32,4 +39,52 @@ public void testTemplating() throws Exception {
     assertEquals(sql, "SELECT * FROM T;\n");
   }
+
+  @Test
+  public void testTableConfig() throws Exception {
+    EnvironmentSettings settings = EnvironmentSettings
+      .newInstance()
+      .inStreamingMode()
+      .build();
+    TableEnvironment tableEnv = TableEnvironment.create(settings);
+
+    String filePath = "src/test/java/io/ecraft/fixtures/deployableconfig1.json";
+    JSONObject jsonConfig = readJsonFile(filePath);
+
+    SqlRunner.configureTableEnvironment("dev", jsonConfig, tableEnv);
+
+    // Typed ConfigOption matching the boolean key set from the fixture
+    ConfigOption<Boolean> miniBatchEnabled = ConfigOptions.key("table.exec.mini-batch.enabled")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Enable mini-batch execution.");
+
+    assertEquals(true, tableEnv.getConfig().getConfiguration().get(miniBatchEnabled));
+
+    // Config option for the table.exec.source.idle-timeout duration
+    ConfigOption<String> sourceIdleTimeout = ConfigOptions.key("table.exec.source.idle-timeout")
+      .stringType()
+      .defaultValue("0")
+      .withDescription("The time that a source will wait for new data before shutting down.");
+
+    assertEquals("5 min", tableEnv.getConfig().getConfiguration().get(sourceIdleTimeout));
+  }
+
+  @Test
+  public void testEnvironmentConfig() throws Exception {
+    EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
+
+    String filePath = "src/test/java/io/ecraft/fixtures/deployableconfig1.json";
+    JSONObject jsonConfig = readJsonFile(filePath);
+
+    EnvironmentSettings settings = SqlRunner.configureEnvironmentSettings("dev", jsonConfig, builder).build();
+
+    assertEquals(true, settings.isStreamingMode());
+  }
+
+  public static JSONObject readJsonFile(String filePath) throws IOException {
+    String content = new String(Files.readAllBytes(Paths.get(filePath)));
+    return new JSONObject(content);
+  }
 }
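The tests above only exercise the streaming branch of configureEnvironmentSettings. A batch-mode check built the same way might look like this (inline JSON instead of a fixture; purely illustrative):

    JSONObject batchConfig = new JSONObject("{\"environments\": {\"dev\": {\"mode\": \"batch\"}}}");
    EnvironmentSettings batchSettings = SqlRunner
        .configureEnvironmentSettings("dev", batchConfig, EnvironmentSettings.newInstance())
        .build();
    // inBatchMode() was applied, so streaming mode should be off
    assertEquals(false, batchSettings.isStreamingMode());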
diff --git a/src/test/java/io/ecraft/fixtures/deployableconfig1.json b/src/test/java/io/ecraft/fixtures/deployableconfig1.json
new file mode 100644
index 0000000..596c5b3
--- /dev/null
+++ b/src/test/java/io/ecraft/fixtures/deployableconfig1.json
@@ -0,0 +1,13 @@
+{
+  "name": "test01",
+  "type": "sql",
+  "environments": {
+    "dev": {
+      "mode": "streaming",
+      "tableConfig": {
+        "table.exec.mini-batch.enabled": true,
+        "table.exec.source.idle-timeout": "5 min"
+      }
+    }
+  }
+}
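With this fixture packaged into an archive such as test01-<hash>.tar next to a test01.sql script, the runner would be launched roughly as follows; the URI is a placeholder, not a path used anywhere in this change:

    SqlRunner.main(new String[] {
        "--archiveUri", "wasbs://jobs@example.blob.core.windows.net/test01-3f9a2c.tar",
        "--environment", "dev"
    });
    // "dev" selects streaming mode and applies the two table options above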