From e16f95fd5db332eee13d1204d13401adbfe99796 Mon Sep 17 00:00:00 2001 From: mkfreeman Date: Sun, 23 Oct 2022 15:04:42 -0700 Subject: [PATCH 1/8] Add sketch of duckdb --- bin/resolve-dependencies | 4 + src/dependencies.mjs | 10 +- src/duckdb.mjs | 320 +++++++++++++++++++++++++++++++++++++++ src/library.mjs | 2 + test/index-test.mjs | 3 +- 5 files changed, 337 insertions(+), 2 deletions(-) create mode 100644 src/duckdb.mjs diff --git a/bin/resolve-dependencies b/bin/resolve-dependencies index dcf2af1d..4f71475a 100755 --- a/bin/resolve-dependencies +++ b/bin/resolve-dependencies @@ -86,6 +86,10 @@ const mains = ["unpkg", "jsdelivr", "browser", "main"]; const package = await resolve("leaflet"); console.log(`export const leaflet = dependency("${package.name}", "${package.version}", "${package.export.replace(/-src\.js$/, ".js")}");`); } + { + const package = await resolve("@duckdb/duckdb-wasm@1.17.0"); + console.log(`export const duckdb = dependency("${package.name}", "${package.version}", "${package.export}");`); + } })(); async function resolve(specifier) { diff --git a/src/dependencies.mjs b/src/dependencies.mjs index 9dc6652b..d63ef879 100644 --- a/src/dependencies.mjs +++ b/src/dependencies.mjs @@ -1,4 +1,5 @@ import dependency from "./dependency.mjs"; +import * as ddb from "@duckdb/duckdb-wasm" export const d3 = dependency("d3", "7.6.1", "dist/d3.min.js"); export const inputs = dependency("@observablehq/inputs", "0.10.4", "dist/inputs.min.js"); export const plot = dependency("@observablehq/plot", "0.6.0", "dist/plot.umd.min.js"); @@ -13,9 +14,16 @@ export const sql = dependency("sql.js", "1.7.0", "dist/sql-wasm.js"); export const vega = dependency("vega", "5.22.1", "build/vega.min.js"); export const vegalite = dependency("vega-lite", "5.5.0", "build/vega-lite.min.js"); export const vegaliteApi = dependency("vega-lite-api", "5.0.0", "build/vega-lite-api.min.js"); -export const arrow = dependency("apache-arrow", "4.0.1", "Arrow.es2015.min.js"); +export const arrow = dependency("apache-arrow", "^8", "Arrow.es2015.min.js"); export const arquero = dependency("arquero", "4.8.8", "dist/arquero.min.js"); export const topojson = dependency("topojson-client", "3.1.0", "dist/topojson-client.min.js"); export const exceljs = dependency("exceljs", "4.3.0", "dist/exceljs.min.js"); export const mermaid = dependency("mermaid", "9.1.6", "dist/mermaid.min.js"); export const leaflet = dependency("leaflet", "1.8.0", "dist/leaflet.js"); +export const duckdb = ddb; +// export const duckdb = dependency("@duckdb/duckdb-wasm", "1.17.0", "dist/duckdb-browser.cjs"); +// export const duckdb = dependency("@duckdb/duckdb-wasm", "1.17.0", ""); +// export const duckdb = dependency("@duckdb/duckdb-wasm", "1.17.0", "?min"); +// tried: dependency("@duckdb/duckdb-wasm", "1.17.0", "+esm"); +// dependency("@duckdb/duckdb-wasm", "1.17.0", "dist/duckdb-browser.cjs"); +// dependency("@duckdb/duckdb-wasm", "1.17.0", "dist/duckdb-browser.mjs"); \ No newline at end of file diff --git a/src/duckdb.mjs b/src/duckdb.mjs new file mode 100644 index 00000000..e32be85d --- /dev/null +++ b/src/duckdb.mjs @@ -0,0 +1,320 @@ +// TODO I wasn't able to use the `.resolve()` approach with a .mjs file +import {arrow as arr, duckdb as duck} from "./dependencies.mjs"; + +export default async function duckdb(require) { + + const arrow = await require(arr.resolve()); // TODO is this right...? + const bundles = await duck.getJsDelivrBundles(); + const bundle = await duck.selectBundle(bundles); + async function makeDB() { + const logger = new duck.ConsoleLogger(); + const worker = await duck.createWorker(bundle.mainWorker); + const db = new duck.AsyncDuckDB(logger, worker); + await db.instantiate(bundle.mainModule); + return db; + } + +// Adapted from: https://observablehq.com/@cmudig/duckdb-client +// Follows the DatabaseClient specification: https://observablehq.com/@observablehq/database-client-specification +class DuckDBClient { + constructor(_db) { + this._db = _db; + this._counter = 0; + } + + async queryStream(query, params) { + const conn = await this.connection(); + let result; + + if (params) { + const stmt = await conn.prepare(query); + result = await stmt.query(...params); + } else { + result = await conn.query(query); + } + // Populate the schema of the results + const schema = result.schema.fields.map(({ name, type }) => ({ + name, + type: getType(String(type)), + databaseType: String(type) + })); + return { + schema, + async *readRows() { + let rows = result.toArray().map((r) => Object.fromEntries(r)); + yield rows; + } + }; + } + + // This function gets called to prepare the `query` parameter of the `queryStream` method + queryTag(strings, ...params) { + return [strings.join("?"), params]; + } + async db() { + if (!this._db) { + this._db = await makeDB(); + await this._db.open({ + query: { + castTimestampToDate: true + } + }); + } + return this._db; + } + + async connection() { + if (!this._conn) { + const db = await this.db(); + this._conn = await db.connect(); + } + return this._conn; + } + + async reconnect() { + if (this._conn) { + this._conn.close(); + } + delete this._conn; + } + + // query a single row + async queryRow(query, params) { + const key = `Query ${this._counter++}: ${query}`; + + console.time(key); + const conn = await this.connection(); + // use send as we can stop iterating after we get the first batch + const result = await conn.send(query, params); + const batch = (await result.next()).value; + console.timeEnd(key); + + return batch?.get(0); + } + + async insertJSON(name, buffer, options) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.insertJSONFromPath(name, { name, schema: "main", ...options }); + await conn.close(); + + return this; + } + + async insertCSV(name, buffer, options) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.insertCSVFromPath(name, { name, schema: "main", ...options }); + await conn.close(); + + return this; + } + + async insertParquet(name, buffer) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.query( + `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')` + ); + await conn.close(); + + return this; + } + + async insertArrowTable(name, table, options) { + const buffer = arrow.tableToIPC(table); + return this.insertArrowFromIPCStream(name, buffer, options); + } + + async insertArrowFromIPCStream(name, buffer, options) { + const db = await this.db(); + const conn = await db.connect(); + await conn.insertArrowFromIPCStream(buffer, { + name, + schema: "main", + ...options + }); + await conn.close(); + + return this; + } + + // Create a database from FileArrachments + static async of(files = []) { + const db = await makeDB(); + await db.open({ + query: { + castTimestampToDate: true + } + }); + + const toName = (file) => + file.name.split(".").slice(0, -1).join(".").replace(/\@.+?/, ""); // remove the "@X" versions Observable adds to file names + + if (files.constructor.name === "FileAttachment") { + files = [[toName(files), files]]; + } else if (!Array.isArray(files)) { + files = Object.entries(files); + } + + // Add all files to the database. Import JSON and CSV. Create view for Parquet. + await Promise.all( + files.map(async (entry) => { + let file; + let name; + let options = {}; + + if (Array.isArray(entry)) { + [name, file] = entry; + if (file.hasOwnProperty("file")) { + ({ file, ...options } = file); + } + } else if (entry.constructor.name === "FileAttachment") { + [name, file] = [toName(entry), entry]; + } else if (typeof entry === "object") { + ({ file, name, ...options } = entry); + name = name ?? toName(file); + } else { + console.error("Unrecognized entry", entry); + } + + if (!file.url && Array.isArray(file)) { + const data = file; + + const table = arrow.tableFromJSON(data); + const buffer = arrow.tableToIPC(table); + + const conn = await db.connect(); + await conn.insertArrowFromIPCStream(buffer, { + name, + schema: "main", + ...options + }); + await conn.close(); + return; + } else { + const url = await file.url(); + if (url.indexOf("blob:") === 0) { + const buffer = await file.arrayBuffer(); + await db.registerFileBuffer(file.name, new Uint8Array(buffer)); + } else { + await db.registerFileURL(file.name, url); + } + } + + const conn = await db.connect(); + if (file.name.endsWith(".csv")) { + await conn.insertCSVFromPath(file.name, { + name, + schema: "main", + ...options + }); + } else if (file.name.endsWith(".json")) { + await conn.insertJSONFromPath(file.name, { + name, + schema: "main", + ...options + }); + } else if (file.name.endsWith(".parquet")) { + await conn.query( + `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')` + ); + } else { + console.warn(`Don't know how to handle file type of ${file.name}`); + } + await conn.close(); + }) + ); + + return new DuckDBClient(db); + } + } + return function duckdb(obj) { + return DuckDBClient.of(obj) + }; +} + +function getType (type) { + const typeLower = type.toLowerCase(); + switch (typeLower) { + case "bigint": + case "int8": + case "long": + return "bigint"; + + case "double": + case "float8": + case "numeric": + case "decimal": + case "decimal(s, p)": + case "real": + case "float4": + case "float": + case "float64": + return "number"; + + case "hugeint": + case "integer": + case "smallint": + case "tinyint": + case "ubigint": + case "uinteger": + case "usmallint": + case "utinyint": + case "smallint": + case "tinyint": + case "ubigint": + case "uinteger": + case "usmallint": + case "utinyint": + case "int4": + case "int": + case "signed": + case "int2": + case "short": + case "int1": + case "int64": + case "int32": + return "integer"; + + case "boolean": + case "bool": + case "logical": + return "boolean"; + + case "date": + case "interval": // date or time delta + case "time": + case "timestamp": + case "timestamp with time zone": + case "datetime": + case "timestamptz": + return "date"; + + case "uuid": + case "varchar": + case "char": + case "bpchar": + case "text": + case "string": + case "utf8": // this type is unlisted in the `types`, but is returned by the db as `column_type`... + return "string"; + default: + return "other"; + } +} + +function element(name, props, children) { + if (arguments.length === 2) children = props, props = undefined; + const element = document.createElement(name); + if (props !== undefined) for (const p in props) element[p] = props[p]; + if (children !== undefined) for (const c of children) element.appendChild(c); + return element; +} + +function text(value) { +return document.createTextNode(value); +} \ No newline at end of file diff --git a/src/library.mjs b/src/library.mjs index a8de4208..c097c19e 100644 --- a/src/library.mjs +++ b/src/library.mjs @@ -3,6 +3,7 @@ import DOM from "./dom/index.mjs"; import Files from "./files/index.mjs"; import {AbstractFile, FileAttachment, NoFileAttachments} from "./fileAttachment.mjs"; import Generators from "./generators/index.mjs"; +import duckdb from "./duckdb.mjs"; import html from "./html.mjs"; import leaflet from "./leaflet.mjs"; import md from "./md.mjs"; @@ -53,6 +54,7 @@ export default Object.assign(Object.defineProperties(function Library(resolver) SQLiteDatabaseClient: () => SQLiteDatabaseClient, topojson: () => require(topojson.resolve()), vl: () => vegalite(require), + duckdb: () => duckdb(require), // Sample datasets // https://observablehq.com/@observablehq/datasets diff --git a/test/index-test.mjs b/test/index-test.mjs index 1b09bc43..2fa57aae 100644 --- a/test/index-test.mjs +++ b/test/index-test.mjs @@ -25,6 +25,7 @@ it("new Library returns a library with the expected keys", () => { "d3", "diamonds", "dot", + "duckdb", "flare", "htl", "html", @@ -42,6 +43,6 @@ it("new Library returns a library with the expected keys", () => { "topojson", "vl", "weather", - "width" + "width" ]); }); From 916522dc50f6727ceabc6474b01e0847c6dd1355 Mon Sep 17 00:00:00 2001 From: mkfreeman Date: Mon, 24 Oct 2022 11:12:57 -0400 Subject: [PATCH 2/8] Expose entire DuckDBClient --- src/dependencies.mjs | 2 +- src/duckdb.mjs | 484 +++++++++++++++++++++++++------------------ src/library.mjs | 2 +- test/index-test.mjs | 2 +- 4 files changed, 285 insertions(+), 205 deletions(-) diff --git a/src/dependencies.mjs b/src/dependencies.mjs index d63ef879..b7c6bd49 100644 --- a/src/dependencies.mjs +++ b/src/dependencies.mjs @@ -1,5 +1,5 @@ import dependency from "./dependency.mjs"; -import * as ddb from "@duckdb/duckdb-wasm" +import * as ddb from "@duckdb/duckdb-wasm"; export const d3 = dependency("d3", "7.6.1", "dist/d3.min.js"); export const inputs = dependency("@observablehq/inputs", "0.10.4", "dist/inputs.min.js"); export const plot = dependency("@observablehq/plot", "0.6.0", "dist/plot.umd.min.js"); diff --git a/src/duckdb.mjs b/src/duckdb.mjs index e32be85d..a31b8892 100644 --- a/src/duckdb.mjs +++ b/src/duckdb.mjs @@ -17,224 +17,304 @@ export default async function duckdb(require) { // Adapted from: https://observablehq.com/@cmudig/duckdb-client // Follows the DatabaseClient specification: https://observablehq.com/@observablehq/database-client-specification class DuckDBClient { - constructor(_db) { - this._db = _db; - this._counter = 0; + constructor(_db) { + this._db = _db; + this._counter = 0; + } + + async queryStream(query, params) { + const conn = await this.connection(); + let result; + + if (params) { + const stmt = await conn.prepare(query); + result = await stmt.query(...params); + } else { + result = await conn.query(query); } - - async queryStream(query, params) { - const conn = await this.connection(); - let result; - - if (params) { - const stmt = await conn.prepare(query); - result = await stmt.query(...params); - } else { - result = await conn.query(query); + // Populate the schema of the results + const schema = result.schema.fields.map(({ name, type }) => ({ + name, + type: getType(String(type)), + databaseType: String(type) + })); + return { + schema, + async *readRows() { + let rows = result.toArray().map((r) => Object.fromEntries(r)); + yield rows; } - // Populate the schema of the results - const schema = result.schema.fields.map(({ name, type }) => ({ - name, - type: getType(String(type)), - databaseType: String(type) - })); + }; + } + + // This function gets called to prepare the `query` parameter of the `queryStream` method + queryTag(strings, ...params) { + return [strings.join("?"), params]; + } + + escape(name) { + return `"${name}"`; + } + + async describeTables() { + const conn = await this.connection(); + const tables = (await conn.query(`SHOW TABLES`)).toArray(); + return tables.map(({ name }) => ({ name })); + } + + async describeColumns({ table } = {}) { + const conn = await this.connection(); + const columns = (await conn.query(`DESCRIBE ${table}`)).toArray(); + return columns.map(({ column_name, column_type }) => { return { - schema, - async *readRows() { - let rows = result.toArray().map((r) => Object.fromEntries(r)); - yield rows; - } + name: column_name, + type: getType(column_type), + databaseType: column_type }; - } - - // This function gets called to prepare the `query` parameter of the `queryStream` method - queryTag(strings, ...params) { - return [strings.join("?"), params]; - } - async db() { - if (!this._db) { - this._db = await makeDB(); - await this._db.open({ - query: { - castTimestampToDate: true - } - }); + }); + } + + async db() { + if (!this._db) { + this._db = await makeDB(); + await this._db.open({ + query: { + castTimestampToDate: true } - return this._db; - } - - async connection() { - if (!this._conn) { - const db = await this.db(); - this._conn = await db.connect(); - } - return this._conn; - } - - async reconnect() { - if (this._conn) { - this._conn.close(); - } - delete this._conn; - } - - // query a single row - async queryRow(query, params) { - const key = `Query ${this._counter++}: ${query}`; - - console.time(key); - const conn = await this.connection(); - // use send as we can stop iterating after we get the first batch - const result = await conn.send(query, params); - const batch = (await result.next()).value; - console.timeEnd(key); - - return batch?.get(0); - } - - async insertJSON(name, buffer, options) { - const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.insertJSONFromPath(name, { name, schema: "main", ...options }); - await conn.close(); - - return this; - } - - async insertCSV(name, buffer, options) { - const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.insertCSVFromPath(name, { name, schema: "main", ...options }); - await conn.close(); - - return this; + }); } - - async insertParquet(name, buffer) { + return this._db; + } + + async connection() { + if (!this._conn) { const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.query( - `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')` - ); - await conn.close(); - - return this; + this._conn = await db.connect(); } - - async insertArrowTable(name, table, options) { - const buffer = arrow.tableToIPC(table); - return this.insertArrowFromIPCStream(name, buffer, options); + return this._conn; + } + + async reconnect() { + if (this._conn) { + this._conn.close(); } - - async insertArrowFromIPCStream(name, buffer, options) { - const db = await this.db(); - const conn = await db.connect(); - await conn.insertArrowFromIPCStream(buffer, { - name, - schema: "main", - ...options - }); - await conn.close(); - - return this; + delete this._conn; + } + + // The `.queryStream` function will supercede this for SQL and Table cells + // Keeping this for backwards compatibility + async query(query, params) { + const conn = await this.connection(); + let result; + + if (params) { + const stmt = await conn.prepare(query); + result = stmt.query(...params); + } else { + result = await conn.query(query); } - - // Create a database from FileArrachments - static async of(files = []) { - const db = await makeDB(); - await db.open({ - query: { - castTimestampToDate: true - } - }); - - const toName = (file) => - file.name.split(".").slice(0, -1).join(".").replace(/\@.+?/, ""); // remove the "@X" versions Observable adds to file names - - if (files.constructor.name === "FileAttachment") { - files = [[toName(files), files]]; - } else if (!Array.isArray(files)) { - files = Object.entries(files); + return result; + } + + // The `.queryStream` function will supercede this for SQL and Table cells + // Keeping this for backwards compatibility + async sql(strings, ...args) { + // expected to be used like db.sql`select * from table where foo = ${param}` + + const results = await this.query(strings.join("?"), args); + + // return rows as a JavaScript array of objects for now + let rows = results.toArray().map(Object.fromEntries); + rows.columns = results.schema.fields.map((d) => d.name); + return rows; + } + + async table(query, params, opts) { + const result = await this.query(query, params); + return Inputs.table(result, { layout: "auto", ...(opts || {}) }); + } + + // get the client after the query ran + async client(query, params) { + await this.query(query, params); + return this; + } + + // query a single row + async queryRow(query, params) { + const key = `Query ${this._counter++}: ${query}`; + const conn = await this.connection(); + // use send as we can stop iterating after we get the first batch + const result = await conn.send(query, params); + const batch = (await result.next()).value; + return batch && batch.get(0); + } + + async explain(query, params) { + const row = await this.queryRow(`EXPLAIN ${query}`, params); + return element("pre", { className: "observablehq--inspect" }, [ + text(row["explain_value"]) + ]); + } + + // Describe the database (no arg) or a table + async describe(object) { + const result = await (object === undefined + ? this.query(`SHOW TABLES`) + : this.query(`DESCRIBE ${object}`)); + return Inputs.table(result); + } + + // Summarize a query result + async summarize(query) { + const result = await this.query(`SUMMARIZE ${query}`); + return Inputs.table(result); + } + + async insertJSON(name, buffer, options) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.insertJSONFromPath(name, { name, schema: "main", ...options }); + await conn.close(); + + return this; + } + + async insertCSV(name, buffer, options) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.insertCSVFromPath(name, { name, schema: "main", ...options }); + await conn.close(); + + return this; + } + + async insertParquet(name, buffer) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.query( + `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')` + ); + await conn.close(); + + return this; + } + + async insertArrowTable(name, table, options) { + const buffer = arrow.tableToIPC(table); + return this.insertArrowFromIPCStream(name, buffer, options); + } + + async insertArrowFromIPCStream(name, buffer, options) { + const db = await this.db(); + const conn = await db.connect(); + await conn.insertArrowFromIPCStream(buffer, { + name, + schema: "main", + ...options + }); + await conn.close(); + + return this; + } + + // Create a database from FileArrachments + static async of(files = []) { + const db = await makeDB(); + await db.open({ + query: { + castTimestampToDate: true } - - // Add all files to the database. Import JSON and CSV. Create view for Parquet. - await Promise.all( - files.map(async (entry) => { - let file; - let name; - let options = {}; - - if (Array.isArray(entry)) { - [name, file] = entry; - if (file.hasOwnProperty("file")) { - ({ file, ...options } = file); - } - } else if (entry.constructor.name === "FileAttachment") { - [name, file] = [toName(entry), entry]; - } else if (typeof entry === "object") { - ({ file, name, ...options } = entry); - name = name ?? toName(file); - } else { - console.error("Unrecognized entry", entry); - } - - if (!file.url && Array.isArray(file)) { - const data = file; - - const table = arrow.tableFromJSON(data); - const buffer = arrow.tableToIPC(table); - - const conn = await db.connect(); - await conn.insertArrowFromIPCStream(buffer, { - name, - schema: "main", - ...options - }); - await conn.close(); - return; - } else { - const url = await file.url(); - if (url.indexOf("blob:") === 0) { - const buffer = await file.arrayBuffer(); - await db.registerFileBuffer(file.name, new Uint8Array(buffer)); - } else { - await db.registerFileURL(file.name, url); - } + }); + + const toName = (file) => + file.name.split(".").slice(0, -1).join(".").replace(/\@.+?/, ""); // remove the "@X" versions Observable adds to file names + + if (files.constructor.name === "FileAttachment") { + files = [[toName(files), files]]; + } else if (!Array.isArray(files)) { + files = Object.entries(files); + } + + // Add all files to the database. Import JSON and CSV. Create view for Parquet. + await Promise.all( + files.map(async (entry) => { + let file; + let name; + let options = {}; + + if (Array.isArray(entry)) { + [name, file] = entry; + if (file.hasOwnProperty("file")) { + ({ file, ...options } = file); } - + } else if (entry.constructor.name === "FileAttachment") { + [name, file] = [toName(entry), entry]; + } else if (typeof entry === "object") { + ({ file, name, ...options } = entry); + name = name ?? toName(file); + } else { + console.error("Unrecognized entry", entry); + } + + if (!file.url && Array.isArray(file)) { + const data = file; + // file = { name: name + ".json" }; + // db.registerFileText(`${name}.json`, JSON.stringify(data)); + + const table = arrow.tableFromJSON(data); + const buffer = arrow.tableToIPC(table); + const conn = await db.connect(); - if (file.name.endsWith(".csv")) { - await conn.insertCSVFromPath(file.name, { - name, - schema: "main", - ...options - }); - } else if (file.name.endsWith(".json")) { - await conn.insertJSONFromPath(file.name, { - name, - schema: "main", - ...options - }); - } else if (file.name.endsWith(".parquet")) { - await conn.query( - `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')` - ); + await conn.insertArrowFromIPCStream(buffer, { + name, + schema: "main", + ...options + }); + await conn.close(); + return; + } else { + const url = await file.url(); + if (url.indexOf("blob:") === 0) { + const buffer = await file.arrayBuffer(); + await db.registerFileBuffer(file.name, new Uint8Array(buffer)); } else { - console.warn(`Don't know how to handle file type of ${file.name}`); + await db.registerFileURL(file.name, url); } - await conn.close(); - }) - ); - - return new DuckDBClient(db); - } + } + + const conn = await db.connect(); + if (file.name.endsWith(".csv")) { + await conn.insertCSVFromPath(file.name, { + name, + schema: "main", + ...options + }); + } else if (file.name.endsWith(".json")) { + await conn.insertJSONFromPath(file.name, { + name, + schema: "main", + ...options + }); + } else if (file.name.endsWith(".parquet")) { + await conn.query( + `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')` + ); + } else { + console.warn(`Don't know how to handle file type of ${file.name}`); + } + await conn.close(); + }) + ); + + return new DuckDBClient(db); } - return function duckdb(obj) { - return DuckDBClient.of(obj) - }; +} + return DuckDBClient + } function getType (type) { diff --git a/src/library.mjs b/src/library.mjs index c097c19e..7558b3a9 100644 --- a/src/library.mjs +++ b/src/library.mjs @@ -43,6 +43,7 @@ export default Object.assign(Object.defineProperties(function Library(resolver) aq: () => require.alias({"apache-arrow": arrow.resolve()})(arquero.resolve()), Arrow: () => require(arrow.resolve()), d3: () => require(d3.resolve()), + DuckDBClient: () => duckdb(require), Inputs: () => require(inputs.resolve()).then(Inputs => ({...Inputs, file: Inputs.fileOf(AbstractFile)})), L: () => leaflet(require), mermaid: () => mermaid(require), @@ -54,7 +55,6 @@ export default Object.assign(Object.defineProperties(function Library(resolver) SQLiteDatabaseClient: () => SQLiteDatabaseClient, topojson: () => require(topojson.resolve()), vl: () => vegalite(require), - duckdb: () => duckdb(require), // Sample datasets // https://observablehq.com/@observablehq/datasets diff --git a/test/index-test.mjs b/test/index-test.mjs index 2fa57aae..2e4bdbec 100644 --- a/test/index-test.mjs +++ b/test/index-test.mjs @@ -5,6 +5,7 @@ it("new Library returns a library with the expected keys", () => { assert.deepStrictEqual(Object.keys(new Library()).sort(), [ "Arrow", "DOM", + "DuckDBClient", "FileAttachment", "Files", "Generators", @@ -25,7 +26,6 @@ it("new Library returns a library with the expected keys", () => { "d3", "diamonds", "dot", - "duckdb", "flare", "htl", "html", From 8f86915f17af9f6958615c78a22727f442366b41 Mon Sep 17 00:00:00 2001 From: mkfreeman Date: Mon, 24 Oct 2022 11:15:04 -0400 Subject: [PATCH 3/8] Beautify file --- src/duckdb.mjs | 570 ++++++++++++++++++++++++++----------------------- 1 file changed, 304 insertions(+), 266 deletions(-) diff --git a/src/duckdb.mjs b/src/duckdb.mjs index a31b8892..b05c623e 100644 --- a/src/duckdb.mjs +++ b/src/duckdb.mjs @@ -1,8 +1,11 @@ // TODO I wasn't able to use the `.resolve()` approach with a .mjs file -import {arrow as arr, duckdb as duck} from "./dependencies.mjs"; +import { + arrow as arr, + duckdb as duck +} from "./dependencies.mjs"; export default async function duckdb(require) { - + const arrow = await require(arr.resolve()); // TODO is this right...? const bundles = await duck.getJsDelivrBundles(); const bundle = await duck.selectBundle(bundles); @@ -14,310 +17,343 @@ export default async function duckdb(require) { return db; } -// Adapted from: https://observablehq.com/@cmudig/duckdb-client -// Follows the DatabaseClient specification: https://observablehq.com/@observablehq/database-client-specification -class DuckDBClient { - constructor(_db) { - this._db = _db; - this._counter = 0; - } - - async queryStream(query, params) { - const conn = await this.connection(); - let result; - - if (params) { - const stmt = await conn.prepare(query); - result = await stmt.query(...params); - } else { - result = await conn.query(query); + // Adapted from: https://observablehq.com/@cmudig/duckdb-client + // Follows the DatabaseClient specification: https://observablehq.com/@observablehq/database-client-specification + class DuckDBClient { + constructor(_db) { + this._db = _db; + this._counter = 0; } - // Populate the schema of the results - const schema = result.schema.fields.map(({ name, type }) => ({ - name, - type: getType(String(type)), - databaseType: String(type) - })); - return { - schema, - async *readRows() { - let rows = result.toArray().map((r) => Object.fromEntries(r)); - yield rows; - } - }; - } - // This function gets called to prepare the `query` parameter of the `queryStream` method - queryTag(strings, ...params) { - return [strings.join("?"), params]; - } - - escape(name) { - return `"${name}"`; - } - - async describeTables() { - const conn = await this.connection(); - const tables = (await conn.query(`SHOW TABLES`)).toArray(); - return tables.map(({ name }) => ({ name })); - } + async queryStream(query, params) { + const conn = await this.connection(); + let result; - async describeColumns({ table } = {}) { - const conn = await this.connection(); - const columns = (await conn.query(`DESCRIBE ${table}`)).toArray(); - return columns.map(({ column_name, column_type }) => { + if (params) { + const stmt = await conn.prepare(query); + result = await stmt.query(...params); + } else { + result = await conn.query(query); + } + // Populate the schema of the results + const schema = result.schema.fields.map(({ + name, + type + }) => ({ + name, + type: getType(String(type)), + databaseType: String(type) + })); return { - name: column_name, - type: getType(column_type), - databaseType: column_type + schema, + async *readRows() { + let rows = result.toArray().map((r) => Object.fromEntries(r)); + yield rows; + } }; - }); - } + } - async db() { - if (!this._db) { - this._db = await makeDB(); - await this._db.open({ - query: { - castTimestampToDate: true - } - }); + // This function gets called to prepare the `query` parameter of the `queryStream` method + queryTag(strings, ...params) { + return [strings.join("?"), params]; } - return this._db; - } - async connection() { - if (!this._conn) { - const db = await this.db(); - this._conn = await db.connect(); + escape(name) { + return `"${name}"`; } - return this._conn; - } - async reconnect() { - if (this._conn) { - this._conn.close(); + async describeTables() { + const conn = await this.connection(); + const tables = (await conn.query(`SHOW TABLES`)).toArray(); + return tables.map(({ + name + }) => ({ + name + })); } - delete this._conn; - } - // The `.queryStream` function will supercede this for SQL and Table cells - // Keeping this for backwards compatibility - async query(query, params) { - const conn = await this.connection(); - let result; - - if (params) { - const stmt = await conn.prepare(query); - result = stmt.query(...params); - } else { - result = await conn.query(query); + async describeColumns({ + table + } = {}) { + const conn = await this.connection(); + const columns = (await conn.query(`DESCRIBE ${table}`)).toArray(); + return columns.map(({ + column_name, + column_type + }) => { + return { + name: column_name, + type: getType(column_type), + databaseType: column_type + }; + }); } - return result; - } - // The `.queryStream` function will supercede this for SQL and Table cells - // Keeping this for backwards compatibility - async sql(strings, ...args) { - // expected to be used like db.sql`select * from table where foo = ${param}` + async db() { + if (!this._db) { + this._db = await makeDB(); + await this._db.open({ + query: { + castTimestampToDate: true + } + }); + } + return this._db; + } - const results = await this.query(strings.join("?"), args); + async connection() { + if (!this._conn) { + const db = await this.db(); + this._conn = await db.connect(); + } + return this._conn; + } - // return rows as a JavaScript array of objects for now - let rows = results.toArray().map(Object.fromEntries); - rows.columns = results.schema.fields.map((d) => d.name); - return rows; - } + async reconnect() { + if (this._conn) { + this._conn.close(); + } + delete this._conn; + } - async table(query, params, opts) { - const result = await this.query(query, params); - return Inputs.table(result, { layout: "auto", ...(opts || {}) }); - } + // The `.queryStream` function will supercede this for SQL and Table cells + // Keeping this for backwards compatibility + async query(query, params) { + const conn = await this.connection(); + let result; + + if (params) { + const stmt = await conn.prepare(query); + result = stmt.query(...params); + } else { + result = await conn.query(query); + } + return result; + } - // get the client after the query ran - async client(query, params) { - await this.query(query, params); - return this; - } + // The `.queryStream` function will supercede this for SQL and Table cells + // Keeping this for backwards compatibility + async sql(strings, ...args) { + // expected to be used like db.sql`select * from table where foo = ${param}` - // query a single row - async queryRow(query, params) { - const key = `Query ${this._counter++}: ${query}`; - const conn = await this.connection(); - // use send as we can stop iterating after we get the first batch - const result = await conn.send(query, params); - const batch = (await result.next()).value; - return batch && batch.get(0); - } + const results = await this.query(strings.join("?"), args); - async explain(query, params) { - const row = await this.queryRow(`EXPLAIN ${query}`, params); - return element("pre", { className: "observablehq--inspect" }, [ - text(row["explain_value"]) - ]); - } + // return rows as a JavaScript array of objects for now + let rows = results.toArray().map(Object.fromEntries); + rows.columns = results.schema.fields.map((d) => d.name); + return rows; + } - // Describe the database (no arg) or a table - async describe(object) { - const result = await (object === undefined - ? this.query(`SHOW TABLES`) - : this.query(`DESCRIBE ${object}`)); - return Inputs.table(result); - } + async table(query, params, opts) { + const result = await this.query(query, params); + return Inputs.table(result, { + layout: "auto", + ...(opts || {}) + }); + } - // Summarize a query result - async summarize(query) { - const result = await this.query(`SUMMARIZE ${query}`); - return Inputs.table(result); - } + // get the client after the query ran + async client(query, params) { + await this.query(query, params); + return this; + } - async insertJSON(name, buffer, options) { - const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.insertJSONFromPath(name, { name, schema: "main", ...options }); - await conn.close(); + // query a single row + async queryRow(query, params) { + const key = `Query ${this._counter++}: ${query}`; + const conn = await this.connection(); + // use send as we can stop iterating after we get the first batch + const result = await conn.send(query, params); + const batch = (await result.next()).value; + return batch && batch.get(0); + } - return this; - } + async explain(query, params) { + const row = await this.queryRow(`EXPLAIN ${query}`, params); + return element("pre", { + className: "observablehq--inspect" + }, [ + text(row["explain_value"]) + ]); + } - async insertCSV(name, buffer, options) { - const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.insertCSVFromPath(name, { name, schema: "main", ...options }); - await conn.close(); + // Describe the database (no arg) or a table + async describe(object) { + const result = await (object === undefined ? + this.query(`SHOW TABLES`) : + this.query(`DESCRIBE ${object}`)); + return Inputs.table(result); + } - return this; - } + // Summarize a query result + async summarize(query) { + const result = await this.query(`SUMMARIZE ${query}`); + return Inputs.table(result); + } - async insertParquet(name, buffer) { - const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.query( - `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')` - ); - await conn.close(); + async insertJSON(name, buffer, options) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.insertJSONFromPath(name, { + name, + schema: "main", + ...options + }); + await conn.close(); - return this; - } + return this; + } - async insertArrowTable(name, table, options) { - const buffer = arrow.tableToIPC(table); - return this.insertArrowFromIPCStream(name, buffer, options); - } + async insertCSV(name, buffer, options) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.insertCSVFromPath(name, { + name, + schema: "main", + ...options + }); + await conn.close(); - async insertArrowFromIPCStream(name, buffer, options) { - const db = await this.db(); - const conn = await db.connect(); - await conn.insertArrowFromIPCStream(buffer, { - name, - schema: "main", - ...options - }); - await conn.close(); - - return this; - } + return this; + } - // Create a database from FileArrachments - static async of(files = []) { - const db = await makeDB(); - await db.open({ - query: { - castTimestampToDate: true - } - }); + async insertParquet(name, buffer) { + const db = await this.db(); + await db.registerFileBuffer(name, new Uint8Array(buffer)); + const conn = await db.connect(); + await conn.query( + `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')` + ); + await conn.close(); + + return this; + } + + async insertArrowTable(name, table, options) { + const buffer = arrow.tableToIPC(table); + return this.insertArrowFromIPCStream(name, buffer, options); + } - const toName = (file) => - file.name.split(".").slice(0, -1).join(".").replace(/\@.+?/, ""); // remove the "@X" versions Observable adds to file names + async insertArrowFromIPCStream(name, buffer, options) { + const db = await this.db(); + const conn = await db.connect(); + await conn.insertArrowFromIPCStream(buffer, { + name, + schema: "main", + ...options + }); + await conn.close(); - if (files.constructor.name === "FileAttachment") { - files = [[toName(files), files]]; - } else if (!Array.isArray(files)) { - files = Object.entries(files); + return this; } - // Add all files to the database. Import JSON and CSV. Create view for Parquet. - await Promise.all( - files.map(async (entry) => { - let file; - let name; - let options = {}; - - if (Array.isArray(entry)) { - [name, file] = entry; - if (file.hasOwnProperty("file")) { - ({ file, ...options } = file); - } - } else if (entry.constructor.name === "FileAttachment") { - [name, file] = [toName(entry), entry]; - } else if (typeof entry === "object") { - ({ file, name, ...options } = entry); - name = name ?? toName(file); - } else { - console.error("Unrecognized entry", entry); + // Create a database from FileArrachments + static async of(files = []) { + const db = await makeDB(); + await db.open({ + query: { + castTimestampToDate: true } + }); - if (!file.url && Array.isArray(file)) { - const data = file; - // file = { name: name + ".json" }; - // db.registerFileText(`${name}.json`, JSON.stringify(data)); + const toName = (file) => + file.name.split(".").slice(0, -1).join(".").replace(/\@.+?/, ""); // remove the "@X" versions Observable adds to file names - const table = arrow.tableFromJSON(data); - const buffer = arrow.tableToIPC(table); + if (files.constructor.name === "FileAttachment") { + files = [ + [toName(files), files] + ]; + } else if (!Array.isArray(files)) { + files = Object.entries(files); + } - const conn = await db.connect(); - await conn.insertArrowFromIPCStream(buffer, { - name, - schema: "main", - ...options - }); - await conn.close(); - return; - } else { - const url = await file.url(); - if (url.indexOf("blob:") === 0) { - const buffer = await file.arrayBuffer(); - await db.registerFileBuffer(file.name, new Uint8Array(buffer)); + // Add all files to the database. Import JSON and CSV. Create view for Parquet. + await Promise.all( + files.map(async (entry) => { + let file; + let name; + let options = {}; + + if (Array.isArray(entry)) { + [name, file] = entry; + if (file.hasOwnProperty("file")) { + ({ + file, + ...options + } = file); + } + } else if (entry.constructor.name === "FileAttachment") { + [name, file] = [toName(entry), entry]; + } else if (typeof entry === "object") { + ({ + file, + name, + ...options + } = entry); + name = name ? ? toName(file); } else { - await db.registerFileURL(file.name, url); + console.error("Unrecognized entry", entry); } - } - const conn = await db.connect(); - if (file.name.endsWith(".csv")) { - await conn.insertCSVFromPath(file.name, { - name, - schema: "main", - ...options - }); - } else if (file.name.endsWith(".json")) { - await conn.insertJSONFromPath(file.name, { - name, - schema: "main", - ...options - }); - } else if (file.name.endsWith(".parquet")) { - await conn.query( - `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')` - ); - } else { - console.warn(`Don't know how to handle file type of ${file.name}`); - } - await conn.close(); - }) - ); + if (!file.url && Array.isArray(file)) { + const data = file; + // file = { name: name + ".json" }; + // db.registerFileText(`${name}.json`, JSON.stringify(data)); + + const table = arrow.tableFromJSON(data); + const buffer = arrow.tableToIPC(table); + + const conn = await db.connect(); + await conn.insertArrowFromIPCStream(buffer, { + name, + schema: "main", + ...options + }); + await conn.close(); + return; + } else { + const url = await file.url(); + if (url.indexOf("blob:") === 0) { + const buffer = await file.arrayBuffer(); + await db.registerFileBuffer(file.name, new Uint8Array(buffer)); + } else { + await db.registerFileURL(file.name, url); + } + } + + const conn = await db.connect(); + if (file.name.endsWith(".csv")) { + await conn.insertCSVFromPath(file.name, { + name, + schema: "main", + ...options + }); + } else if (file.name.endsWith(".json")) { + await conn.insertJSONFromPath(file.name, { + name, + schema: "main", + ...options + }); + } else if (file.name.endsWith(".parquet")) { + await conn.query( + `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')` + ); + } else { + console.warn(`Don't know how to handle file type of ${file.name}`); + } + await conn.close(); + }) + ); - return new DuckDBClient(db); + return new DuckDBClient(db); + } } -} - return DuckDBClient - + return DuckDBClient; } -function getType (type) { +function getType(type) { const typeLower = type.toLowerCase(); switch (typeLower) { case "bigint": @@ -388,13 +424,15 @@ function getType (type) { } function element(name, props, children) { - if (arguments.length === 2) children = props, props = undefined; - const element = document.createElement(name); - if (props !== undefined) for (const p in props) element[p] = props[p]; - if (children !== undefined) for (const c of children) element.appendChild(c); - return element; + if (arguments.length === 2) children = props, props = undefined; + const element = document.createElement(name); + if (props !== undefined) + for (const p in props) element[p] = props[p]; + if (children !== undefined) + for (const c of children) element.appendChild(c); + return element; } function text(value) { -return document.createTextNode(value); + return document.createTextNode(value); } \ No newline at end of file From 198cfbbf845156d2e87e17cc63a0971ea0a0a901 Mon Sep 17 00:00:00 2001 From: Mike Bostock Date: Wed, 2 Nov 2022 15:15:58 -0700 Subject: [PATCH 4/8] DuckDBClient --- .eslintrc.json | 2 +- bin/resolve-dependencies | 10 +- rollup.config.js | 1 + src/dependencies.mjs | 12 +- src/duckdb.mjs | 548 ++++++++++++++------------------------- src/fileAttachment.mjs | 4 +- src/library.mjs | 10 +- test/index-test.mjs | 2 +- 8 files changed, 217 insertions(+), 372 deletions(-) diff --git a/.eslintrc.json b/.eslintrc.json index 0cc3e774..48aa23da 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -2,7 +2,7 @@ "extends": "eslint:recommended", "parserOptions": { "sourceType": "module", - "ecmaVersion": 2018 + "ecmaVersion": 2020 }, "env": { "es6": true, diff --git a/bin/resolve-dependencies b/bin/resolve-dependencies index 4f71475a..d3b22bf9 100755 --- a/bin/resolve-dependencies +++ b/bin/resolve-dependencies @@ -64,7 +64,11 @@ const mains = ["unpkg", "jsdelivr", "browser", "main"]; } { const package = await resolve("apache-arrow@4"); - console.log(`export const arrow = dependency("${package.name}", "${package.version}", "${package.export}");`); + console.log(`export const arrow4 = dependency("${package.name}", "${package.version}", "${package.export}");`); + } + { + const package = await resolve("apache-arrow@9"); + console.log(`export const arrow9 = dependency("${package.name}", "${package.version}", "+esm");`); } { const package = await resolve("arquero"); @@ -87,8 +91,8 @@ const mains = ["unpkg", "jsdelivr", "browser", "main"]; console.log(`export const leaflet = dependency("${package.name}", "${package.version}", "${package.export.replace(/-src\.js$/, ".js")}");`); } { - const package = await resolve("@duckdb/duckdb-wasm@1.17.0"); - console.log(`export const duckdb = dependency("${package.name}", "${package.version}", "${package.export}");`); + const package = await resolve("@duckdb/duckdb-wasm"); + console.log(`export const duckdb = dependency("${package.name}", "${package.version}", "+esm");`); } })(); diff --git a/rollup.config.js b/rollup.config.js index 8a46a9b1..4af5c2a5 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -15,6 +15,7 @@ export default [ reserved: [ "FileAttachment", "RequireError", + "DuckDBClient", "SQLiteDatabaseClient", "Workbook", "ZipArchive", diff --git a/src/dependencies.mjs b/src/dependencies.mjs index b7c6bd49..9916bbf2 100644 --- a/src/dependencies.mjs +++ b/src/dependencies.mjs @@ -1,5 +1,4 @@ import dependency from "./dependency.mjs"; -import * as ddb from "@duckdb/duckdb-wasm"; export const d3 = dependency("d3", "7.6.1", "dist/d3.min.js"); export const inputs = dependency("@observablehq/inputs", "0.10.4", "dist/inputs.min.js"); export const plot = dependency("@observablehq/plot", "0.6.0", "dist/plot.umd.min.js"); @@ -14,16 +13,11 @@ export const sql = dependency("sql.js", "1.7.0", "dist/sql-wasm.js"); export const vega = dependency("vega", "5.22.1", "build/vega.min.js"); export const vegalite = dependency("vega-lite", "5.5.0", "build/vega-lite.min.js"); export const vegaliteApi = dependency("vega-lite-api", "5.0.0", "build/vega-lite-api.min.js"); -export const arrow = dependency("apache-arrow", "^8", "Arrow.es2015.min.js"); +export const arrow4 = dependency("apache-arrow", "4.0.1", "Arrow.es2015.min.js"); +export const arrow9 = dependency("apache-arrow", "9.0.0", "+esm"); export const arquero = dependency("arquero", "4.8.8", "dist/arquero.min.js"); export const topojson = dependency("topojson-client", "3.1.0", "dist/topojson-client.min.js"); export const exceljs = dependency("exceljs", "4.3.0", "dist/exceljs.min.js"); export const mermaid = dependency("mermaid", "9.1.6", "dist/mermaid.min.js"); export const leaflet = dependency("leaflet", "1.8.0", "dist/leaflet.js"); -export const duckdb = ddb; -// export const duckdb = dependency("@duckdb/duckdb-wasm", "1.17.0", "dist/duckdb-browser.cjs"); -// export const duckdb = dependency("@duckdb/duckdb-wasm", "1.17.0", ""); -// export const duckdb = dependency("@duckdb/duckdb-wasm", "1.17.0", "?min"); -// tried: dependency("@duckdb/duckdb-wasm", "1.17.0", "+esm"); -// dependency("@duckdb/duckdb-wasm", "1.17.0", "dist/duckdb-browser.cjs"); -// dependency("@duckdb/duckdb-wasm", "1.17.0", "dist/duckdb-browser.mjs"); \ No newline at end of file +export const duckdb = dependency("@duckdb/duckdb-wasm", "1.17.0", "+esm"); diff --git a/src/duckdb.mjs b/src/duckdb.mjs index b05c623e..aefb1203 100644 --- a/src/duckdb.mjs +++ b/src/duckdb.mjs @@ -1,361 +1,225 @@ -// TODO I wasn't able to use the `.resolve()` approach with a .mjs file -import { - arrow as arr, - duckdb as duck -} from "./dependencies.mjs"; - -export default async function duckdb(require) { - - const arrow = await require(arr.resolve()); // TODO is this right...? - const bundles = await duck.getJsDelivrBundles(); - const bundle = await duck.selectBundle(bundles); - async function makeDB() { - const logger = new duck.ConsoleLogger(); - const worker = await duck.createWorker(bundle.mainWorker); - const db = new duck.AsyncDuckDB(logger, worker); - await db.instantiate(bundle.mainModule); - return db; +import {arrow9 as arrow, duckdb} from "./dependencies.mjs"; +import {FileAttachment} from "./fileAttachment.mjs"; + +// Adapted from https://observablehq.com/@cmudig/duckdb-client +// Copyright 2021 CMU Data Interaction Group +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors +// may be used to endorse or promote products derived from this software +// without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +// TODO Allow this to be overridden using the Library’s resolver. +const cdn = "https://cdn.observableusercontent.com/npm/"; + +export class DuckDBClient { + constructor(db) { + Object.defineProperties(this, { + _db: {value: db} + }); } - // Adapted from: https://observablehq.com/@cmudig/duckdb-client - // Follows the DatabaseClient specification: https://observablehq.com/@observablehq/database-client-specification - class DuckDBClient { - constructor(_db) { - this._db = _db; - this._counter = 0; + async queryStream(query, params) { + const connection = await this._db.connect(); + let reader, schema, batch; + try { + reader = await connection.send(query, params); + batch = await reader.next(); + if (batch.done) throw new Error("missing first batch"); + schema = batch.value.schema; + } catch (error) { + await connection.close(); + throw error; } - - async queryStream(query, params) { - const conn = await this.connection(); - let result; - - if (params) { - const stmt = await conn.prepare(query); - result = await stmt.query(...params); - } else { - result = await conn.query(query); - } - // Populate the schema of the results - const schema = result.schema.fields.map(({ - name, - type - }) => ({ + return { + schema: schema.fields.map(({name, type}) => ({ name, type: getType(String(type)), databaseType: String(type) - })); - return { - schema, - async *readRows() { - let rows = result.toArray().map((r) => Object.fromEntries(r)); - yield rows; - } - }; - } - - // This function gets called to prepare the `query` parameter of the `queryStream` method - queryTag(strings, ...params) { - return [strings.join("?"), params]; - } - - escape(name) { - return `"${name}"`; - } - - async describeTables() { - const conn = await this.connection(); - const tables = (await conn.query(`SHOW TABLES`)).toArray(); - return tables.map(({ - name - }) => ({ - name - })); - } - - async describeColumns({ - table - } = {}) { - const conn = await this.connection(); - const columns = (await conn.query(`DESCRIBE ${table}`)).toArray(); - return columns.map(({ - column_name, - column_type - }) => { - return { - name: column_name, - type: getType(column_type), - databaseType: column_type - }; - }); - } - - async db() { - if (!this._db) { - this._db = await makeDB(); - await this._db.open({ - query: { - castTimestampToDate: true + })), + async *readRows() { + try { + while (!batch.done) { + yield batch.value.toArray(); + batch = await reader.next(); } - }); - } - return this._db; - } - - async connection() { - if (!this._conn) { - const db = await this.db(); - this._conn = await db.connect(); - } - return this._conn; - } - - async reconnect() { - if (this._conn) { - this._conn.close(); + } finally { + await connection.close(); + } } - delete this._conn; - } - - // The `.queryStream` function will supercede this for SQL and Table cells - // Keeping this for backwards compatibility - async query(query, params) { - const conn = await this.connection(); - let result; + }; + } - if (params) { - const stmt = await conn.prepare(query); - result = stmt.query(...params); - } else { - result = await conn.query(query); + async query(query, params) { + const result = await this.queryStream(query, params); + const results = []; + for await (const rows of result.readRows()) { + for (const row of rows) { + results.push(row); } - return result; - } - - // The `.queryStream` function will supercede this for SQL and Table cells - // Keeping this for backwards compatibility - async sql(strings, ...args) { - // expected to be used like db.sql`select * from table where foo = ${param}` - - const results = await this.query(strings.join("?"), args); - - // return rows as a JavaScript array of objects for now - let rows = results.toArray().map(Object.fromEntries); - rows.columns = results.schema.fields.map((d) => d.name); - return rows; - } - - async table(query, params, opts) { - const result = await this.query(query, params); - return Inputs.table(result, { - layout: "auto", - ...(opts || {}) - }); - } - - // get the client after the query ran - async client(query, params) { - await this.query(query, params); - return this; - } - - // query a single row - async queryRow(query, params) { - const key = `Query ${this._counter++}: ${query}`; - const conn = await this.connection(); - // use send as we can stop iterating after we get the first batch - const result = await conn.send(query, params); - const batch = (await result.next()).value; - return batch && batch.get(0); - } - - async explain(query, params) { - const row = await this.queryRow(`EXPLAIN ${query}`, params); - return element("pre", { - className: "observablehq--inspect" - }, [ - text(row["explain_value"]) - ]); - } - - // Describe the database (no arg) or a table - async describe(object) { - const result = await (object === undefined ? - this.query(`SHOW TABLES`) : - this.query(`DESCRIBE ${object}`)); - return Inputs.table(result); } + results.schema = result.schema; + return results; + } - // Summarize a query result - async summarize(query) { - const result = await this.query(`SUMMARIZE ${query}`); - return Inputs.table(result); - } + async queryRow(query, params) { + const results = await this.query(query, params); + return results.length ? results[0] : null; + } - async insertJSON(name, buffer, options) { - const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.insertJSONFromPath(name, { - name, - schema: "main", - ...options - }); - await conn.close(); + async sql(strings, ...args) { + return await this.query(strings.join("?"), args); + } - return this; - } + queryTag(strings, ...params) { + return [strings.join("?"), params]; + } - async insertCSV(name, buffer, options) { - const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.insertCSVFromPath(name, { - name, - schema: "main", - ...options - }); - await conn.close(); + escape(name) { + return `"${name}"`; + } - return this; - } + async describeTables() { + const tables = await this.query(`SHOW TABLES`); + return tables.map(({name}) => ({name})); + } - async insertParquet(name, buffer) { - const db = await this.db(); - await db.registerFileBuffer(name, new Uint8Array(buffer)); - const conn = await db.connect(); - await conn.query( - `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${name}')` - ); - await conn.close(); + async describeColumns({table} = {}) { + const columns = await this.query(`DESCRIBE ${table}`); + return columns.map(({column_name, column_type}) => { + return { + name: column_name, + type: getType(column_type), + databaseType: column_type + }; + }); + } - return this; - } + static async of(sources = {}, config = {}) { + const db = await createDuckDB(); + await db.open(config); + await Promise.all( + Object.entries(sources).map(async ([name, source]) => { + if ("array" in source) { // array + options + const {array, ...options} = source; + await insertArray(db, name, array, options); + } else if ("file" in source) { // file + options + const {file, ...options} = source; + await insertFile(db, name, file, options); + } else if (source instanceof FileAttachment) { // bare file + await insertFile(db, name, source); + } else if (Array.isArray(source)) { // bare data + await insertArray(db, name, source); + } else { + throw new Error(`invalid source: ${source}`); + } + }) + ); + return new DuckDBClient(db); + } +} - async insertArrowTable(name, table, options) { - const buffer = arrow.tableToIPC(table); - return this.insertArrowFromIPCStream(name, buffer, options); +async function insertFile(database, name, file, options) { + const url = await file.url(); + if (url.startsWith("blob:")) { + const buffer = await file.arrayBuffer(); + await database.registerFileBuffer(file.name, new Uint8Array(buffer)); + } else { + await database.registerFileURL(file.name, url); + } + const connection = await database.connect(); + try { + switch (file.mimeType) { + case "text/csv": + await connection.insertCSVFromPath(file.name, { + name, + schema: "main", + ...options + }); + break; + case "application/json": + await connection.insertJSONFromPath(file.name, { + name, + schema: "main", + ...options + }); + break; + default: + if (file.name.endsWith(".parquet")) { + await connection.query( + `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')` + ); + } else { + throw new Error(`unknown file type: ${file.mimeType}`); + } } + } finally { + await connection.close(); + } +} - async insertArrowFromIPCStream(name, buffer, options) { - const db = await this.db(); - const conn = await db.connect(); - await conn.insertArrowFromIPCStream(buffer, { - name, - schema: "main", - ...options - }); - await conn.close(); +async function insertArray(database, name, array, options) { + const arrow = await loadArrow(); + const table = arrow.tableFromJSON(array); + const buffer = arrow.tableToIPC(table); + const connection = await database.connect(); + try { + await connection.insertArrowFromIPCStream(buffer, { + name, + schema: "main", + ...options + }); + } finally { + await connection.close(); + } +} - return this; +async function createDuckDB() { + const duck = await import(`${cdn}${duckdb.resolve()}`); + const bundle = await duck.selectBundle({ + mvp: { + mainModule: `${cdn}${duckdb.resolve("dist/duckdb-mvp.wasm")}`, + mainWorker: `${cdn}${duckdb.resolve("dist/duckdb-browser-mvp.worker.js")}` + }, + eh: { + mainModule: `${cdn}${duckdb.resolve("dist/duckdb-eh.wasm")}`, + mainWorker: `${cdn}${duckdb.resolve("dist/duckdb-browser-eh.worker.js")}` } + }); + const logger = new duck.ConsoleLogger(); + const worker = await duck.createWorker(bundle.mainWorker); + const db = new duck.AsyncDuckDB(logger, worker); + await db.instantiate(bundle.mainModule); + return db; +} - // Create a database from FileArrachments - static async of(files = []) { - const db = await makeDB(); - await db.open({ - query: { - castTimestampToDate: true - } - }); - - const toName = (file) => - file.name.split(".").slice(0, -1).join(".").replace(/\@.+?/, ""); // remove the "@X" versions Observable adds to file names - - if (files.constructor.name === "FileAttachment") { - files = [ - [toName(files), files] - ]; - } else if (!Array.isArray(files)) { - files = Object.entries(files); - } - - // Add all files to the database. Import JSON and CSV. Create view for Parquet. - await Promise.all( - files.map(async (entry) => { - let file; - let name; - let options = {}; - - if (Array.isArray(entry)) { - [name, file] = entry; - if (file.hasOwnProperty("file")) { - ({ - file, - ...options - } = file); - } - } else if (entry.constructor.name === "FileAttachment") { - [name, file] = [toName(entry), entry]; - } else if (typeof entry === "object") { - ({ - file, - name, - ...options - } = entry); - name = name ? ? toName(file); - } else { - console.error("Unrecognized entry", entry); - } - - if (!file.url && Array.isArray(file)) { - const data = file; - // file = { name: name + ".json" }; - // db.registerFileText(`${name}.json`, JSON.stringify(data)); - - const table = arrow.tableFromJSON(data); - const buffer = arrow.tableToIPC(table); - - const conn = await db.connect(); - await conn.insertArrowFromIPCStream(buffer, { - name, - schema: "main", - ...options - }); - await conn.close(); - return; - } else { - const url = await file.url(); - if (url.indexOf("blob:") === 0) { - const buffer = await file.arrayBuffer(); - await db.registerFileBuffer(file.name, new Uint8Array(buffer)); - } else { - await db.registerFileURL(file.name, url); - } - } - - const conn = await db.connect(); - if (file.name.endsWith(".csv")) { - await conn.insertCSVFromPath(file.name, { - name, - schema: "main", - ...options - }); - } else if (file.name.endsWith(".json")) { - await conn.insertJSONFromPath(file.name, { - name, - schema: "main", - ...options - }); - } else if (file.name.endsWith(".parquet")) { - await conn.query( - `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')` - ); - } else { - console.warn(`Don't know how to handle file type of ${file.name}`); - } - await conn.close(); - }) - ); - - return new DuckDBClient(db); - } - } - return DuckDBClient; +async function loadArrow() { + return await import(`${cdn}${arrow.resolve()}`); } function getType(type) { - const typeLower = type.toLowerCase(); - switch (typeLower) { + switch (type.toLowerCase()) { case "bigint": case "int8": case "long": @@ -369,6 +233,7 @@ function getType(type) { case "real": case "float4": case "float": + case "float32": case "float64": return "number"; @@ -380,12 +245,6 @@ function getType(type) { case "uinteger": case "usmallint": case "utinyint": - case "smallint": - case "tinyint": - case "ubigint": - case "uinteger": - case "usmallint": - case "utinyint": case "int4": case "int": case "signed": @@ -408,6 +267,7 @@ function getType(type) { case "timestamp with time zone": case "datetime": case "timestamptz": + case "date64": return "date"; case "uuid": @@ -422,17 +282,3 @@ function getType(type) { return "other"; } } - -function element(name, props, children) { - if (arguments.length === 2) children = props, props = undefined; - const element = document.createElement(name); - if (props !== undefined) - for (const p in props) element[p] = props[p]; - if (children !== undefined) - for (const c of children) element.appendChild(c); - return element; -} - -function text(value) { - return document.createTextNode(value); -} \ No newline at end of file diff --git a/src/fileAttachment.mjs b/src/fileAttachment.mjs index 43ef6a79..cc6e24aa 100644 --- a/src/fileAttachment.mjs +++ b/src/fileAttachment.mjs @@ -1,5 +1,5 @@ import {autoType, csvParse, csvParseRows, tsvParse, tsvParseRows} from "d3-dsv"; -import {arrow, jszip, exceljs} from "./dependencies.mjs"; +import {arrow4, jszip, exceljs} from "./dependencies.mjs"; import {requireDefault} from "./require.mjs"; import {SQLiteDatabaseClient} from "./sqlite.mjs"; import {Workbook} from "./xlsx.mjs"; @@ -57,7 +57,7 @@ export class AbstractFile { }); } async arrow() { - const [Arrow, response] = await Promise.all([requireDefault(arrow.resolve()), remote_fetch(this)]); + const [Arrow, response] = await Promise.all([requireDefault(arrow4.resolve()), remote_fetch(this)]); // TODO upgrade to apache-arrow@9 return Arrow.Table.from(response); } async sqlite() { diff --git a/src/library.mjs b/src/library.mjs index 7558b3a9..06f26dfe 100644 --- a/src/library.mjs +++ b/src/library.mjs @@ -3,7 +3,7 @@ import DOM from "./dom/index.mjs"; import Files from "./files/index.mjs"; import {AbstractFile, FileAttachment, NoFileAttachments} from "./fileAttachment.mjs"; import Generators from "./generators/index.mjs"; -import duckdb from "./duckdb.mjs"; +import {DuckDBClient} from "./duckdb.mjs"; import html from "./html.mjs"; import leaflet from "./leaflet.mjs"; import md from "./md.mjs"; @@ -18,7 +18,7 @@ import svg from "./svg.mjs"; import tex from "./tex.mjs"; import vegalite from "./vegalite.mjs"; import width from "./width.mjs"; -import {arquero, arrow, d3, graphviz, htl, inputs, lodash, plot, topojson} from "./dependencies.mjs"; +import {arquero, arrow4, d3, graphviz, htl, inputs, lodash, plot, topojson} from "./dependencies.mjs"; import {__query} from "./table.mjs"; export default Object.assign(Object.defineProperties(function Library(resolver) { @@ -40,10 +40,10 @@ export default Object.assign(Object.defineProperties(function Library(resolver) // Recommended libraries // https://observablehq.com/@observablehq/recommended-libraries _: () => require(lodash.resolve()), - aq: () => require.alias({"apache-arrow": arrow.resolve()})(arquero.resolve()), - Arrow: () => require(arrow.resolve()), + aq: () => require.alias({"apache-arrow": arrow4.resolve()})(arquero.resolve()), // TODO upgrade to apache-arrow@9 + Arrow: () => require(arrow4.resolve()), // TODO upgrade to apache-arrow@9 d3: () => require(d3.resolve()), - DuckDBClient: () => duckdb(require), + DuckDBClient: () => DuckDBClient, Inputs: () => require(inputs.resolve()).then(Inputs => ({...Inputs, file: Inputs.fileOf(AbstractFile)})), L: () => leaflet(require), mermaid: () => mermaid(require), diff --git a/test/index-test.mjs b/test/index-test.mjs index 2e4bdbec..f58b18e6 100644 --- a/test/index-test.mjs +++ b/test/index-test.mjs @@ -43,6 +43,6 @@ it("new Library returns a library with the expected keys", () => { "topojson", "vl", "weather", - "width" + "width" ]); }); From a0fef0c392cbfa67ada6a65d91ddad46f11b3280 Mon Sep 17 00:00:00 2001 From: Mike Bostock Date: Wed, 2 Nov 2022 20:01:44 -0700 Subject: [PATCH 5/8] default query.castTimestampToDate --- src/duckdb.mjs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/duckdb.mjs b/src/duckdb.mjs index aefb1203..b5c5aca5 100644 --- a/src/duckdb.mjs +++ b/src/duckdb.mjs @@ -118,6 +118,9 @@ export class DuckDBClient { static async of(sources = {}, config = {}) { const db = await createDuckDB(); + if (config.query?.castTimestampToDate === undefined) { + config = {...config, query: {...config.query, castTimestampToDate: true}}; + } await db.open(config); await Promise.all( Object.entries(sources).map(async ([name, source]) => { From 6bf7aea168843801ddc42714afcc754d86142f25 Mon Sep 17 00:00:00 2001 From: Mike Bostock Date: Mon, 7 Nov 2022 10:56:16 -0800 Subject: [PATCH 6/8] optimize queryRow --- src/duckdb.mjs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/duckdb.mjs b/src/duckdb.mjs index b5c5aca5..d4f24bf0 100644 --- a/src/duckdb.mjs +++ b/src/duckdb.mjs @@ -84,8 +84,14 @@ export class DuckDBClient { } async queryRow(query, params) { - const results = await this.query(query, params); - return results.length ? results[0] : null; + const result = await this.queryStream(query, params); + const reader = result.readRows(); + try { + const {done, value} = await reader.next(); + return done || !value.length ? null : value[0]; + } finally { + await reader.return(); + } } async sql(strings, ...args) { From 990c240d551dfe3a7ecc1224127e0a8a68c39cbf Mon Sep 17 00:00:00 2001 From: Mike Bostock Date: Mon, 7 Nov 2022 19:38:15 -0800 Subject: [PATCH 7/8] use arrow types for schema --- src/arrow.mjs | 44 +++++++++++++++++++++ src/duckdb.mjs | 101 ++++++++++++++++--------------------------------- 2 files changed, 77 insertions(+), 68 deletions(-) create mode 100644 src/arrow.mjs diff --git a/src/arrow.mjs b/src/arrow.mjs new file mode 100644 index 00000000..03121fa8 --- /dev/null +++ b/src/arrow.mjs @@ -0,0 +1,44 @@ +export function getArrowTableSchema(table) { + return table.schema.fields.map(getArrowFieldSchema); +} + +function getArrowFieldSchema(field) { + return { + name: field.name, + type: getArrowType(field.type), + nullable: field.nullable, + databaseType: String(field.type) + }; +} + +// https://github.com/apache/arrow/blob/89f9a0948961f6e94f1ef5e4f310b707d22a3c11/js/src/enum.ts#L140-L141 +function getArrowType(type) { + switch (type.typeId) { + case 2: // Int + return "integer"; + case 3: // Float + case 7: // Decimal + return "number"; + case 4: // Binary + case 15: // FixedSizeBinary + return "buffer"; + case 5: // Utf8 + return "string"; + case 6: // Bool + return "boolean"; + case 8: // Date + case 9: // Time + case 10: // Timestamp + return "date"; + case 12: // List + case 16: // FixedSizeList + return "array"; + case 13: // Struct + case 14: // Union + return "object"; + case 11: // Interval + case 17: // Map + default: + return "other"; + } +} diff --git a/src/duckdb.mjs b/src/duckdb.mjs index d4f24bf0..0c8816a2 100644 --- a/src/duckdb.mjs +++ b/src/duckdb.mjs @@ -1,3 +1,4 @@ +import {getArrowTableSchema} from "./arrow.mjs"; import {arrow9 as arrow, duckdb} from "./dependencies.mjs"; import {FileAttachment} from "./fileAttachment.mjs"; @@ -42,22 +43,17 @@ export class DuckDBClient { async queryStream(query, params) { const connection = await this._db.connect(); - let reader, schema, batch; + let reader, batch; try { reader = await connection.send(query, params); batch = await reader.next(); if (batch.done) throw new Error("missing first batch"); - schema = batch.value.schema; } catch (error) { await connection.close(); throw error; } return { - schema: schema.fields.map(({name, type}) => ({ - name, - type: getType(String(type)), - databaseType: String(type) - })), + schema: getArrowTableSchema(batch.value), async *readRows() { try { while (!batch.done) { @@ -113,13 +109,12 @@ export class DuckDBClient { async describeColumns({table} = {}) { const columns = await this.query(`DESCRIBE ${table}`); - return columns.map(({column_name, column_type}) => { - return { - name: column_name, - type: getType(column_type), - databaseType: column_type - }; - }); + return columns.map(({column_name, column_type, null: nullable}) => ({ + name: column_name, + type: getDuckDBType(column_type), + nullable: nullable !== "NO", + databaseType: column_type + })); } static async of(sources = {}, config = {}) { @@ -227,67 +222,37 @@ async function loadArrow() { return await import(`${cdn}${arrow.resolve()}`); } -function getType(type) { - switch (type.toLowerCase()) { - case "bigint": - case "int8": - case "long": +// https://duckdb.org/docs/sql/data_types/overview +function getDuckDBType(type) { + switch (type) { + case "BIGINT": + case "HUGEINT": + case "UBIGINT": return "bigint"; - - case "double": - case "float8": - case "numeric": - case "decimal": - case "decimal(s, p)": - case "real": - case "float4": - case "float": - case "float32": - case "float64": + case "DOUBLE": + case "REAL": return "number"; - - case "hugeint": - case "integer": - case "smallint": - case "tinyint": - case "ubigint": - case "uinteger": - case "usmallint": - case "utinyint": - case "int4": - case "int": - case "signed": - case "int2": - case "short": - case "int1": - case "int64": - case "int32": + case "INTEGER": + case "SMALLINT": + case "TINYINT": + case "USMALLINT": + case "UINTEGER": + case "UTINYINT": return "integer"; - - case "boolean": - case "bool": - case "logical": + case "BOOLEAN": return "boolean"; - - case "date": - case "interval": // date or time delta - case "time": - case "timestamp": - case "timestamp with time zone": - case "datetime": - case "timestamptz": - case "date64": + case "DATE": + case "TIMESTAMP": + case "TIMESTAMP WITH TIME ZONE": return "date"; - - case "uuid": - case "varchar": - case "char": - case "bpchar": - case "text": - case "string": - case "utf8": // this type is unlisted in the `types`, but is returned by the db as `column_type`... + case "VARCHAR": + case "UUID": return "string"; + // case "BLOB": + // case "INTERVAL": + // case "TIME": default: + if (/^DECIMAL\(/.test(type)) return "integer"; return "other"; } } From 77ccac0e29484042c269b06e4fd503e357b76d2f Mon Sep 17 00:00:00 2001 From: Mike Bostock Date: Mon, 7 Nov 2022 20:06:24 -0800 Subject: [PATCH 8/8] Apache Arrow for DuckDBClient --- src/arrow.mjs | 14 +++++++++++ src/duckdb.mjs | 56 ++++++++++++++++++++++++++---------------- src/fileAttachment.mjs | 19 ++++++++++---- src/index.mjs | 1 + src/require.mjs | 3 +++ 5 files changed, 67 insertions(+), 26 deletions(-) diff --git a/src/arrow.mjs b/src/arrow.mjs index 03121fa8..7ed60993 100644 --- a/src/arrow.mjs +++ b/src/arrow.mjs @@ -1,3 +1,17 @@ +// Returns true if the vaue is an Apache Arrow table. This uses a “duck” test +// (instead of strict instanceof) because we want it to work with a range of +// Apache Arrow versions at least 7.0.0 or above. +// https://arrow.apache.org/docs/7.0/js/classes/Arrow_dom.Table.html +export function isArrowTable(value) { + return ( + value && + typeof value.getChild === "function" && + typeof value.toArray === "function" && + value.schema && + Array.isArray(value.schema.fields) + ); +} + export function getArrowTableSchema(table) { return table.schema.fields.map(getArrowFieldSchema); } diff --git a/src/duckdb.mjs b/src/duckdb.mjs index 0c8816a2..c81432e3 100644 --- a/src/duckdb.mjs +++ b/src/duckdb.mjs @@ -1,6 +1,7 @@ -import {getArrowTableSchema} from "./arrow.mjs"; +import {getArrowTableSchema, isArrowTable} from "./arrow.mjs"; import {arrow9 as arrow, duckdb} from "./dependencies.mjs"; import {FileAttachment} from "./fileAttachment.mjs"; +import {cdn} from "./require.mjs"; // Adapted from https://observablehq.com/@cmudig/duckdb-client // Copyright 2021 CMU Data Interaction Group @@ -31,9 +32,6 @@ import {FileAttachment} from "./fileAttachment.mjs"; // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -// TODO Allow this to be overridden using the Library’s resolver. -const cdn = "https://cdn.observableusercontent.com/npm/"; - export class DuckDBClient { constructor(db) { Object.defineProperties(this, { @@ -125,16 +123,22 @@ export class DuckDBClient { await db.open(config); await Promise.all( Object.entries(sources).map(async ([name, source]) => { - if ("array" in source) { // array + options - const {array, ...options} = source; - await insertArray(db, name, array, options); + if (source instanceof FileAttachment) { // bare file + await insertFile(db, name, source); + } else if (isArrowTable(source)) { // bare arrow table + await insertArrowTable(db, name, source); + } else if (Array.isArray(source)) { // bare array of objects + await insertArray(db, name, source); + } else if ("data" in source) { // data + options + const {data, ...options} = source; + if (isArrowTable(data)) { + await insertArrowTable(db, name, data, options); + } else { + await insertArray(db, name, data, options); + } } else if ("file" in source) { // file + options const {file, ...options} = source; await insertFile(db, name, file, options); - } else if (source instanceof FileAttachment) { // bare file - await insertFile(db, name, source); - } else if (Array.isArray(source)) { // bare data - await insertArray(db, name, source); } else { throw new Error(`invalid source: ${source}`); } @@ -156,36 +160,40 @@ async function insertFile(database, name, file, options) { try { switch (file.mimeType) { case "text/csv": - await connection.insertCSVFromPath(file.name, { + return await connection.insertCSVFromPath(file.name, { name, schema: "main", ...options }); - break; case "application/json": - await connection.insertJSONFromPath(file.name, { + return await connection.insertJSONFromPath(file.name, { name, schema: "main", ...options }); - break; default: - if (file.name.endsWith(".parquet")) { - await connection.query( + if (/\.arrow$/i.test(file.name)) { + const buffer = new Uint8Array(await file.arrayBuffer()); + return await connection.insertArrowFromIPCStream(buffer, { + name, + schema: "main", + ...options + }); + } + if (/\.parquet$/i.test(file.name)) { + return await connection.query( `CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')` ); - } else { - throw new Error(`unknown file type: ${file.mimeType}`); } + throw new Error(`unknown file type: ${file.mimeType}`); } } finally { await connection.close(); } } -async function insertArray(database, name, array, options) { +async function insertArrowTable(database, name, table, options) { const arrow = await loadArrow(); - const table = arrow.tableFromJSON(array); const buffer = arrow.tableToIPC(table); const connection = await database.connect(); try { @@ -199,6 +207,12 @@ async function insertArray(database, name, array, options) { } } +async function insertArray(database, name, array, options) { + const arrow = await loadArrow(); + const table = arrow.tableFromJSON(array); + return await insertArrowTable(database, name, table, options); +} + async function createDuckDB() { const duck = await import(`${cdn}${duckdb.resolve()}`); const bundle = await duck.selectBundle({ diff --git a/src/fileAttachment.mjs b/src/fileAttachment.mjs index cc6e24aa..fbef1cf2 100644 --- a/src/fileAttachment.mjs +++ b/src/fileAttachment.mjs @@ -1,6 +1,6 @@ import {autoType, csvParse, csvParseRows, tsvParse, tsvParseRows} from "d3-dsv"; -import {arrow4, jszip, exceljs} from "./dependencies.mjs"; -import {requireDefault} from "./require.mjs"; +import {arrow4, arrow9, jszip, exceljs} from "./dependencies.mjs"; +import {cdn, requireDefault} from "./require.mjs"; import {SQLiteDatabaseClient} from "./sqlite.mjs"; import {Workbook} from "./xlsx.mjs"; @@ -56,9 +56,18 @@ export class AbstractFile { i.src = url; }); } - async arrow() { - const [Arrow, response] = await Promise.all([requireDefault(arrow4.resolve()), remote_fetch(this)]); // TODO upgrade to apache-arrow@9 - return Arrow.Table.from(response); + async arrow({version = 4} = {}) { + switch (version) { + case 4: { + const [Arrow, response] = await Promise.all([requireDefault(arrow4.resolve()), remote_fetch(this)]); + return Arrow.Table.from(response); + } + case 9: { + const [Arrow, response] = await Promise.all([import(`${cdn}${arrow9.resolve()}`), remote_fetch(this)]); + return Arrow.tableFromIPC(response); + } + default: throw new Error(`unsupported arrow version: ${version}`); + } } async sqlite() { return SQLiteDatabaseClient.open(remote_fetch(this)); diff --git a/src/index.mjs b/src/index.mjs index cd2d65ff..9a158a98 100644 --- a/src/index.mjs +++ b/src/index.mjs @@ -1,3 +1,4 @@ export {default as FileAttachments, AbstractFile} from "./fileAttachment.mjs"; export {default as Library} from "./library.mjs"; +export {getArrowTableSchema, isArrowTable} from "./arrow.mjs"; export {makeQueryTemplate, loadDataSource, arrayIsPrimitive, isDataArray, isDatabaseClient} from "./table.mjs"; diff --git a/src/require.mjs b/src/require.mjs index d75635a8..a826ef1e 100644 --- a/src/require.mjs +++ b/src/require.mjs @@ -1,5 +1,8 @@ import {require as initialRequire, requireFrom} from "d3-require"; +// TODO Allow this to be overridden using the Library’s resolver. +export const cdn = "https://cdn.observableusercontent.com/npm/"; + export let requireDefault = initialRequire; export function setDefaultRequire(require) {