diff --git a/CODEOWNERS b/CODEOWNERS index d984c513..6350370d 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -11,6 +11,7 @@ conversational-agent-app @vivian-xie-db @yuanchaoma-db database-diagram-builder @alexott downstreams @nfx @alexott feature-registry-app @yang-chengg @mparkhe @mingyangge-db @stephanielu5 +filepush @chi-yang-db go-libs @nfx @alexott ip_access_list_analyzer @alexott ka-chat-bot @taiga-db @@ -19,4 +20,4 @@ runtime-packages @nfx @alexott sql_migration_copilot @robertwhiffin tacklebox @Jonathan-Choi uc-catalog-cloning @esiol-db @vasco-lopes -.github @nfx @alexott @gueniai \ No newline at end of file +.github @nfx @alexott @gueniai diff --git a/filepush/.gitignore b/filepush/.gitignore new file mode 100644 index 00000000..722d5e71 --- /dev/null +++ b/filepush/.gitignore @@ -0,0 +1 @@ +.vscode diff --git a/filepush/README.md b/filepush/README.md new file mode 100644 index 00000000..64750f30 --- /dev/null +++ b/filepush/README.md @@ -0,0 +1,179 @@ +--- +title: "Managed File Push" +language: python +author: "Chi Yang" +date: 2025-08-07 + +tags: +- ingestion +- file +- nocode +--- + +# Managed File Push + +A lightweight, no‑code file ingestion workflow. Configure a set of tables, get a volume path for each, and drop files into those paths—your data lands in Unity Catalog tables via Auto Loader. + +## Table of Contents +- [Quick Start](#quick-start) + - [Step 1. Configure tables](#step-1-configure-tables) + - [Step 2. Deploy & set up](#step-2-deploy--set-up) + - [Step 3. Retrieve endpoint & push files](#step-3-retrieve-endpoint--push-files) +- [Debug Table Issues](#debug-table-issues) + - [Step 1. Configure tables to debug](#step-1-configure-tables-to-debug) + - [Step 2. Deploy & set up in dev mode](#step-2-deploy--set-up-in-dev-mode) + - [Step 3. Retrieve endpoint & push files to debug](#step-3-retrieve-endpoint--push-files-to-debug) + - [Step 4. Debug table configs](#step-4-debug-table-configs) + - [Step 5. Fix the table configs in production](#step-5-fix-the-table-configs-in-production) + +--- + +## Quick Start + +### Step 1. Configure tables +Define the catalog and a **new** schema name where the tables will land in `./dab/databricks.yml`: + +```yaml +variables: + catalog_name: + description: The existing catalog where the NEW schema will be created. + default: main + schema_name: + description: The name of the NEW schema where the tables will be created. + default: filepushschema +``` + +Edit table configs in `./dab/src/configs/tables.json`. Only `name` and `format` are required. + +For supported `format_options`, see the [Auto Loader options](https://docs.databricks.com/aws/en/ingestion/cloud-object-storage/auto-loader/options). Not all options are supported here. If unsure, specify only `name` and `format`, or follow [Debug Table Issues](#debug-table-issues) to discover the correct options. + +```json +[ + { + "name": "table1", + "format": "csv", + "format_options": { "escape": "\"" }, + "schema_hints": "id int, name string" + }, + { + "name": "table2", + "format": "json" + } +] +``` + +> **Tip:** Keep `schema_hints` minimal; Auto Loader can evolve the schema as new columns appear. + +### Step 2. Deploy & set up + +```bash +cd dab +databricks bundle deploy +databricks bundle run configuration_job +``` + +Wait for the configuration job to finish before moving on. + +### Step 3. Retrieve endpoint & push files +Fetch the volume path for uploading files to a specific table (example: `table1`): + +```bash +databricks tables get main.filepushschema.table1 --output json \ + | jq -r '.properties["filepush.table_volume_path_data"]' +``` + +Example output: + +```text +/Volumes/main/filepushschema/main_filepushschema_filepush_volume/data/table1 +``` + +Upload files to the path above using any of the [Volumes file APIs](https://docs.databricks.com/aws/en/volumes/volume-files#methods-for-managing-files-in-volumes). + +**REST API example**: + +```bash +# prerequisites: export DATABRICKS_HOST and DATABRICKS_TOKEN (PAT token) +curl -X PUT "$DATABRICKS_HOST/api/2.0/fs/files/Volumes/main/filepushschema/main_filepushschema_filepush_volume/data/table1/datafile1.csv" \ + -H "Authorization: Bearer $DATABRICKS_TOKEN" \ + -H "Content-Type: application/octet-stream" \ + --data-binary @"/local/file/path/datafile1.csv" +``` + +**Databricks CLI example** (destination uses the `dbfs:` scheme): + +```bash +databricks fs cp /local/file/path/datafile1.csv \ + dbfs:/Volumes/main/filepushschema/main_filepushschema_filepush_volume/data/table1 +``` + +Within about a minute, the data should appear in the table `main.filepushschema.table1`. + +--- + +## Debug Table Issues +If data isn’t parsed as expected, use **dev mode** to iterate on table options safely. + +### Step 1. Configure tables to debug +Configure tables as in [Step 1 of Quick Start](#step-1-configure-tables). + +### Step 2. Deploy & set up in **dev mode** + +```bash +cd dab +databricks bundle deploy -t dev +databricks bundle run configuration_job -t dev +``` + +Wait for the configuration job to finish. Example output: + +```text +2025-09-23 22:03:04,938 [INFO] initialization - ========== +catalog_name: main +schema_name: dev_chi_yang_filepushschema +volume_path_root: /Volumes/main/dev_chi_yang_filepushschema/main_filepushschema_filepush_volume +volume_path_data: /Volumes/main/dev_chi_yang_filepushschema/main_filepushschema_filepush_volume/data +volume_path_archive: /Volumes/main/dev_chi_yang_filepushschema/main_filepushschema_filepush_volume/archive +========== +``` + +> **Note:** In **dev mode**, the schema name is **prefixed**. Use the printed schema name for the remaining steps. + +### Step 3. Retrieve endpoint & push files to debug +Get the dev volume path (note the prefixed schema): + +```bash +databricks tables get main.dev_chi_yang_filepushschema.table1 --output json \ + | jq -r '.properties["filepush.table_volume_path_data"]' +``` + +Example output: + +```text +/Volumes/main/dev_chi_yang_filepushschema/main_filepushschema_filepush_volume/data/table1 +``` + +Then follow the upload instructions from [Quick Start → Step 3](#step-3-retrieve-endpoint--push-files) to send test files. + +### Step 4. Debug table configs +Open the pipeline in the workspace: + +```bash +databricks bundle open refresh_pipeline -t dev +``` + +Click **Edit pipeline** to launch the development UI. Open the `debug_table_config` notebook and follow its guidance to refine the table options. When satisfied, copy the final config back to `./dab/src/configs/tables.json`. + +### Step 5. Fix the table configs in production +Redeploy the updated config and run a full refresh to correct existing data for an affected table: + +```bash +cd dab +databricks bundle deploy +databricks bundle run refresh_pipeline --full-refresh table1 +``` + +--- + +**That’s it!** You now have a managed file‑push workflow with debuggable table configs and repeatable deployments. + diff --git a/filepush/dab/databricks.yml b/filepush/dab/databricks.yml new file mode 100644 index 00000000..c9c1729b --- /dev/null +++ b/filepush/dab/databricks.yml @@ -0,0 +1,36 @@ +# databricks.yml +# This is the configuration for the file push DAB dab. + +bundle: + name: dab + +include: + - resources/*.yml + +targets: + # The deployment targets. See https://docs.databricks.com/en/dev-tools/bundles/deployment-modes.html + dev: + mode: development + workspace: + host: https://e2-dogfood.staging.cloud.databricks.com + + prod: + mode: production + default: true + workspace: + host: https://e2-dogfood.staging.cloud.databricks.com + root_path: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target} + permissions: + - user_name: ${workspace.current_user.userName} + level: CAN_MANAGE + +variables: + catalog_name: + description: The existing catalog where the NEW schema will be created. + default: chi_catalog + schema_name: + description: The name of the NEW schema where the tables will be created. + default: filepushschema + resource_name_prefix: + description: The prefix for the resource names. + default: ${var.catalog_name}_${var.schema_name}_ diff --git a/filepush/dab/resources/job.yml b/filepush/dab/resources/job.yml new file mode 100644 index 00000000..f8fdaac9 --- /dev/null +++ b/filepush/dab/resources/job.yml @@ -0,0 +1,46 @@ +# The main job for schema dab +# This job will trigger in the schema pipeline + +resources: + jobs: + filetrigger_job: + name: ${var.resource_name_prefix}filetrigger_job + tasks: + - task_key: pipeline_refresh + pipeline_task: + pipeline_id: ${resources.pipelines.refresh_pipeline.id} + trigger: + file_arrival: + url: ${resources.volumes.filepush_volume.volume_path}/data/ + configuration_job: + name: ${var.resource_name_prefix}configuration_job + tasks: + - task_key: initialization + spark_python_task: + python_file: ../src/utils/initialization.py + parameters: + - "--catalog_name" + - "{{job.parameters.catalog_name}}" + - "--schema_name" + - "{{job.parameters.schema_name}}" + - "--volume_path_root" + - "{{job.parameters.volume_path_root}}" + - "--logging_level" + - "${bundle.target}" + environment_key: serverless + - task_key: trigger_refresh + run_job_task: + job_id: ${resources.jobs.filetrigger_job.id} + depends_on: + - task_key: initialization + environments: + - environment_key: serverless + spec: + client: "3" + parameters: + - name: catalog_name + default: ${var.catalog_name} + - name: schema_name + default: ${resources.schemas.main_schema.name} + - name: volume_path_root + default: ${resources.volumes.filepush_volume.volume_path} diff --git a/filepush/dab/resources/pipeline.yml b/filepush/dab/resources/pipeline.yml new file mode 100644 index 00000000..e30c4ae6 --- /dev/null +++ b/filepush/dab/resources/pipeline.yml @@ -0,0 +1,15 @@ +# The table refresh pipeline for schema dab + +resources: + pipelines: + refresh_pipeline: + name: ${var.resource_name_prefix}refresh_pipeline + catalog: ${var.catalog_name} + schema: ${resources.schemas.main_schema.name} + serverless: true + libraries: + - file: + path: ../src/ingestion.py + root_path: ../src + configuration: + filepush.volume_path_root: ${resources.volumes.filepush_volume.volume_path} diff --git a/filepush/dab/resources/schema.yml b/filepush/dab/resources/schema.yml new file mode 100644 index 00000000..72500a02 --- /dev/null +++ b/filepush/dab/resources/schema.yml @@ -0,0 +1,7 @@ +# The schema dab + +resources: + schemas: + main_schema: + name: ${var.schema_name} + catalog_name: ${var.catalog_name} diff --git a/filepush/dab/resources/volume.yml b/filepush/dab/resources/volume.yml new file mode 100644 index 00000000..ac8929c8 --- /dev/null +++ b/filepush/dab/resources/volume.yml @@ -0,0 +1,8 @@ +# The file staging volume for schema dab + +resources: + volumes: + filepush_volume: + name: ${var.resource_name_prefix}filepush_volume + catalog_name: ${var.catalog_name} + schema_name: ${var.schema_name} diff --git a/filepush/dab/src/configs/tables.json b/filepush/dab/src/configs/tables.json new file mode 100644 index 00000000..3926a1bc --- /dev/null +++ b/filepush/dab/src/configs/tables.json @@ -0,0 +1,10 @@ +[ + { + "name": "example_table", + "format": "csv", + "format_options": { + "escape": "\"" + }, + "schema_hints": "id int, name string" + } +] diff --git a/filepush/dab/src/debug_table_config.py b/filepush/dab/src/debug_table_config.py new file mode 100644 index 00000000..0d697fcd --- /dev/null +++ b/filepush/dab/src/debug_table_config.py @@ -0,0 +1,63 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC ## Paste the table config JSON you would like to debug from `./configs/tables.json` and assign to variable `table_config` +# MAGIC For example, +# MAGIC ``` +# MAGIC table_config = r''' +# MAGIC { +# MAGIC "name": "all_employees", +# MAGIC "format": "csv", +# MAGIC "format_options": { +# MAGIC "escape": "\"", +# MAGIC "multiLine": "false" +# MAGIC } +# MAGIC "schema_hints": "id int, name string" +# MAGIC } +# MAGIC ''' +# MAGIC ``` +# MAGIC Only `name` and `format` are required for a table. + +# COMMAND ---------- + +table_config = r''' + { + "name": "employees", + "format": "csv", + "format_options": { + "escape": "\"" + }, + "schema_hints": "id int, name string" + } +''' + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Click `Run all` and inspect the parsed result. Iterate on the config until the result looks good + +# COMMAND ---------- + +import json +import tempfile +from utils import tablemanager +from utils import envmanager + +if not envmanager.has_default_storage(): + print("WARNING: Current catalog is not using default storage, some file push feature may not be available") + +# Load table config +table_config_json = json.loads(table_config) +tablemanager.validate_config(table_config_json) +table_name = table_config_json["name"] +table_volume_path_data = tablemanager.get_table_volume_path(table_name) + +assert tablemanager.has_data_file(table_name), f"No data file found in {table_volume_path_data}. Please upload at least 1 file to {table_volume_path_data}" + +# Put schema location in temp directory +with tempfile.TemporaryDirectory() as tmpdir: + display(tablemanager.get_df_with_config(spark, table_config_json, tmpdir)) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Copy and paste the modified config back to the `./configs/tables.json` in the DAB folder \ No newline at end of file diff --git a/filepush/dab/src/ingestion.py b/filepush/dab/src/ingestion.py new file mode 100644 index 00000000..1046a140 --- /dev/null +++ b/filepush/dab/src/ingestion.py @@ -0,0 +1,34 @@ +import dlt +from utils import tablemanager +from utils import formatmanager + +def _make_append_flow(table_name, table_config, table_volume_path): + def _body(): + # use _rescued_data as placeholder when no data file is present + if not tablemanager.has_data_file(table_name): + return tablemanager.get_placeholder_df_with_config(spark, table_config) + else: + return tablemanager.get_df_with_config(spark, table_config) + + # give the function a unique name (nice for logs / debug) + _body.__name__ = f"append_{table_name.lower()}" + + # apply the decorator programmatically + dlt.append_flow(target=table_name, name=table_name)(_body) + +table_configs = tablemanager.get_configs() + +for cfg in table_configs: + tablemanager.validate_config(cfg) + tbl = cfg["name"] + path = tablemanager.get_table_volume_path(tbl) + fmt = formatmanager.get_format_manager(cfg["format"]) + expts = fmt.expectations + + dlt.create_streaming_table( + name=tbl, + comment="File push created table", + table_properties={"filepush.table_volume_path_data": path}, + expect_all=expts + ) + _make_append_flow(tbl, cfg, path) diff --git a/filepush/dab/src/utils/envmanager.py b/filepush/dab/src/utils/envmanager.py new file mode 100644 index 00000000..ba822a98 --- /dev/null +++ b/filepush/dab/src/utils/envmanager.py @@ -0,0 +1,38 @@ +import os +import json +from databricks.sdk import WorkspaceClient + +def get_config() -> dict: + json_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "configs", "environment.json") + if not os.path.exists(json_path): + raise RuntimeError(f"Missing environment file: {json_path}. Have you run `databricks bundle run configuration_job`?") + with open(json_path, "r") as f: + configs = json.load(f) + return configs + +def has_default_storage() -> bool: + catalog = get_config()["catalog_name"] + + w = WorkspaceClient() + + # Try SDK model first + info = w.catalogs.get(catalog) + storage_root = getattr(info, "storage_root", None) + storage_location = getattr(info, "storage_location", None) + props = getattr(info, "properties", {}) or {} + + # Some workspaces expose fields only via raw JSON; fall back if all empty + if not (storage_root or storage_location or props): + j = w.api_client.do("GET", f"/api/2.1/unity-catalog/catalogs/{catalog}") + storage_root = j.get("storage_root") or j.get("storageLocation") + storage_location = j.get("storage_location") or j.get("storageLocation") + props = j.get("properties", {}) or {} + + # Heuristics: any of these indicates “default storage” is set + return bool( + storage_root or + storage_location or + props.get("defaultManagedLocation") or + props.get("delta.defaultLocation") + ) + \ No newline at end of file diff --git a/filepush/dab/src/utils/formatmanager.py b/filepush/dab/src/utils/formatmanager.py new file mode 100644 index 00000000..663b897a --- /dev/null +++ b/filepush/dab/src/utils/formatmanager.py @@ -0,0 +1,105 @@ +from dataclasses import dataclass +from . import envmanager + +@dataclass(frozen=True, slots=True) +class AutoLoaderOption: + key: str + value: str + hidden: bool = False + def __iter__(self): + yield (self.key, self) + +class AutoLoaderFormat: + def __init__(self): + self.name = None + self.options: set[AutoLoaderOption] = { + AutoLoaderOption("cloudFiles.inferColumnTypes", "true", True), + AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns", True), + AutoLoaderOption("cloudFiles.cleanSource", "MOVE", True), + AutoLoaderOption("cloudFiles.cleanSource.retentionDuration", "14 days", True), + AutoLoaderOption("cloudFiles.cleanSource.moveDestination", f"{envmanager.get_config()['volume_path_archive']}/{{table_name}}", True) + } + self.expectations: dict[str, str] = { + "Rescued data should be null": "_rescued_data IS NULL" + } + self.default_schema: set[str] = {"_rescued_data STRING"} + + def get_default_schema(self) -> str: + return ", ".join(self.default_schema) + + def get_userfacing_options(self) -> dict[str, str]: + return {opt.key: opt.value for opt in self.options if not opt.hidden} + + def validate_user_options(self, options: dict[str, str]) -> None: + allowed = set(self.get_userfacing_options()) + illegal = set(options) - allowed + if illegal: + raise ValueError( + f"Unsupported or protected options: {sorted(illegal)}. " + f"Allowed user options: {sorted(allowed)}" + ) + + def get_modified_options(self, options: dict[str, str]) -> dict[str, str]: + self.validate_user_options(options) + defaults = self.get_userfacing_options() + return {k: v for k, v in options.items() if k in defaults and v != defaults[k]} + + def get_merged_options(self, options: dict[str, str], table_name: str) -> dict[str, str]: + self.validate_user_options(options) + defaults = self.get_userfacing_options() + + merged = defaults.copy() + merged.update({k: v for k, v in options.items() if k in defaults}) + + # Format the moveDestination with table_name + move_dest_key = "cloudFiles.cleanSource.moveDestination" + if move_dest_key in merged: + merged[move_dest_key] = merged[move_dest_key].format(table_name=table_name) + + return merged + +class CSV(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "CSV" + self.options |= { + AutoLoaderOption("header", "true", True), + AutoLoaderOption("mergeSchema", "true", True), + AutoLoaderOption("mode", "PERMISSIVE", True), + AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), + AutoLoaderOption("delimiter", ","), + AutoLoaderOption("escape", "\""), + AutoLoaderOption("multiLine", "false"), + } + self.expectations |= { + "Corrupted record should be null": "_corrupt_record IS NULL" + } + self.default_schema |= {"_corrupt_record STRING"} + +class JSON(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "JSON" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + AutoLoaderOption("mode", "PERMISSIVE", True), + AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), + AutoLoaderOption("allowComments", "true"), + AutoLoaderOption("allowSingleQuotes", "true"), + AutoLoaderOption("inferTimestamp", "true"), + AutoLoaderOption("multiLine", "true"), + } + self.expectations |= { + "Corrupted record should be null": "_corrupt_record IS NULL" + } + self.default_schema |= {"_corrupt_record STRING"} + +_supported_formats: dict[str, AutoLoaderFormat] = {f.name: f for f in (CSV(), JSON())} + +def get_format_manager(fmt: str) -> dict[str, str]: + key = fmt.strip().upper() + try: + return _supported_formats[key] + except KeyError: + supported = ", ".join(sorted(_supported_formats)) + raise ValueError(f"{fmt!r} is not a supported format. Supported formats: {supported}") diff --git a/filepush/dab/src/utils/initialization.py b/filepush/dab/src/utils/initialization.py new file mode 100644 index 00000000..aa106072 --- /dev/null +++ b/filepush/dab/src/utils/initialization.py @@ -0,0 +1,69 @@ +from databricks.sdk import WorkspaceClient +import argparse +import json +import logging + +# Parse arguments +parser = argparse.ArgumentParser() +parser.add_argument("--catalog_name", type=str, required=True) +parser.add_argument("--schema_name", type=str, required=True) +parser.add_argument("--volume_path_root", type=str, required=True) +parser.add_argument("--logging_level", type=str, required=False, default="dev") +args = parser.parse_args() + +catalog_name = args.catalog_name +schema_name = args.schema_name +volume_path_root = args.volume_path_root +volume_path_data = args.volume_path_root + "/data" +volume_path_archive = args.volume_path_root + "/archive" +logging_level = logging.DEBUG if args.logging_level == "dev" else logging.INFO + +# Logging +logging.basicConfig( + level=logging_level, + format="%(asctime)s [%(levelname)s] %(module)s - %(message)s" +) +logger = logging.getLogger(__name__) # per-module logger + +# Initialize workspace client +ws = WorkspaceClient() + +# Set property to schema +logger.info(f"Setting property to schema {catalog_name}.{schema_name}") +logger.debug(f"Volume path root: {volume_path_root}") +logger.debug(f"Volume path data: {volume_path_data}") +ws.schemas.update(full_name=f"{catalog_name}.{schema_name}", properties={ + "filepush.volume_path_root": volume_path_root, + "filepush.volume_path_data": volume_path_data, + "filepush.volume_path_data": volume_path_archive +}) +logger.info(f"Schema {catalog_name}.{schema_name} configured") + +# Initialize volume folder structure +logger.info(f"Initializing volume folder structure {volume_path_root}") +logger.debug(f"Creating data directory {volume_path_data}") +ws.files.create_directory(volume_path_data) +logger.debug(f"Creating archive directory {volume_path_archive}") +ws.files.create_directory(volume_path_archive) +with open("../configs/tables.json", "r") as f: + for table in json.load(f): + table_volume_path_data = f"{volume_path_data}/{table['name']}" + logger.debug(f"Creating table directory {table_volume_path_data}") + ws.files.create_directory(table_volume_path_data) + table_volume_path_archive = f"{volume_path_archive}/{table['name']}" + logger.debug(f"Creating table archive directory {table_volume_path_archive}") + ws.files.create_directory(table_volume_path_archive) +logger.info(f"Volume {volume_path_root} configured") + +# Dump configs to environment json +all_configs = { + "catalog_name": catalog_name, + "schema_name": schema_name, + "volume_path_root": volume_path_root, + "volume_path_data": volume_path_data, + "volume_path_archive": volume_path_archive +} +with open("../configs/environment.json", "w") as f: + json.dump(all_configs, f) + +logger.info(f"==========\n%s\n==========", "\n".join(f"{k}: {v}" for k, v in all_configs.items())) diff --git a/filepush/dab/src/utils/tablemanager.py b/filepush/dab/src/utils/tablemanager.py new file mode 100644 index 00000000..5aaf9a55 --- /dev/null +++ b/filepush/dab/src/utils/tablemanager.py @@ -0,0 +1,98 @@ +import os +import json +from . import envmanager +from . import formatmanager +from pyspark.sql.streaming import DataStreamReader +from pyspark.sql import DataFrame, SparkSession +from databricks.sdk import WorkspaceClient +from databricks.sdk.errors.platform import NotFound + +def validate_config(table_config: dict): + if not table_config.get("name"): + raise ValueError("name is required for table config") + if not table_config.get("format"): + raise ValueError("format is required for table config") + +def validate_configs(table_configs: list): + names = [cfg.get("name") for cfg in table_configs] + duplicates = set([name for name in names if names.count(name) > 1 and name is not None]) + if duplicates: + raise ValueError(f"Duplicate table names found in table configs: {sorted(duplicates)}") + for table_config in table_configs: + validate_config(table_config) + +def get_configs() -> list: + json_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "configs", "tables.json") + if not os.path.exists(json_path): + raise RuntimeError(f"Missing table configs file: {json_path}. Please following README.md to create one, deploy and run configuration_job.") + with open(json_path, "r") as f: + configs = json.load(f) + validate_configs(configs) + return configs + +def get_table_volume_path(table_name: str) -> str: + ws = WorkspaceClient() + table_volume_path_data = os.path.join(envmanager.get_config()["volume_path_data"], table_name) + try: + ws.files.get_directory_metadata(table_volume_path_data) + except NotFound: + raise RuntimeError(f"Table data path not found for table `{table_name}`. Have you run `databricks bundle run configuration_job`?") + return table_volume_path_data + +def has_data_file(table_name: str) -> bool: + ws = WorkspaceClient() + table_volume_path_data = get_table_volume_path(table_name) + try: + iter = ws.files.list_directory_contents(table_volume_path_data) + next(iter) + except StopIteration: + return False + return True + +def is_table_created(table_name: str) -> bool: + ws = WorkspaceClient() + return ws.tables.exists(full_name=f"{envmanager.get_config()['catalog_name']}.{envmanager.get_config()['schema_name']}.{table_name}").table_exists + +def _apply_table_options(reader: DataStreamReader, table_config: dict, fmt_mgr) -> DataStreamReader: + name = table_config.get("name") + fmt = table_config.get("format") + + # format options + user_fmt_opts = table_config.get("format_options", {}) + final_fmt_opts = fmt_mgr.get_merged_options(user_fmt_opts, name) + reader = reader.option("cloudFiles.format", fmt) + for k, v in final_fmt_opts.items(): + reader = reader.option(k, v) + + # schema hints + schema_hints = table_config.get("schema_hints") + if schema_hints: + reader = reader.option("cloudFiles.schemaHints", ", ".join({schema_hints} | fmt_mgr.default_schema)) + else: + reader = reader.option("cloudFiles.schemaHints", ", ".join(fmt_mgr.default_schema)) + + return reader + +def get_df_with_config(spark: SparkSession, table_config: dict, schema_location: str = None) -> DataFrame: + validate_config(table_config) + fmt = table_config.get("format") + fmt_mgr = formatmanager.get_format_manager(fmt) + + reader = spark.readStream.format("cloudFiles") + reader = _apply_table_options(reader, table_config, fmt_mgr) + if schema_location: + reader = reader.option("cloudFiles.schemaLocation", schema_location) + + # include file metadata + return reader.load(get_table_volume_path(table_config.get("name"))).selectExpr("*", "_metadata") + +def get_placeholder_df_with_config(spark: SparkSession, table_config: dict) -> DataFrame: + validate_config(table_config) + fmt = table_config.get("format") + fmt_mgr = formatmanager.get_format_manager(fmt) + + reader = spark.readStream.format("cloudFiles") + reader = _apply_table_options(reader, table_config, fmt_mgr).schema(fmt_mgr.get_default_schema()) + + return reader.load(get_table_volume_path(table_config.get("name"))) + \ No newline at end of file