1+ // TODO I wasn't able to use the `.resolve()` approach with a .mjs file
2+ import { arrow as arr , duckdb as duck } from "./dependencies.mjs" ;
3+
4+ export default async function duckdb ( require ) {
5+
6+ const arrow = await require ( arr . resolve ( ) ) ; // TODO is this right...?
7+ const bundles = await duck . getJsDelivrBundles ( ) ;
8+ const bundle = await duck . selectBundle ( bundles ) ;
/**
 * Builds a new AsyncDuckDB instance from the selected bundle.
 *
 * Creates a console logger and a worker for `bundle.mainWorker`, wires both
 * into an `AsyncDuckDB`, and instantiates it with `bundle.mainModule`.
 * The returned database has been instantiated but not opened.
 *
 * @returns {Promise<object>} the instantiated database.
 */
async function makeDB() {
  const { mainWorker, mainModule } = bundle;
  const consoleLogger = new duck.ConsoleLogger();
  const dbWorker = await duck.createWorker(mainWorker);
  const database = new duck.AsyncDuckDB(consoleLogger, dbWorker);
  await database.instantiate(mainModule);
  return database;
}
16+
// Adapted from: https://observablehq.com/@cmudig/duckdb-client
// Follows the DatabaseClient specification: https://observablehq.com/@observablehq/database-client-specification
// Creates a database with makeDB() and opens it with the query settings every
// client instance relies on (timestamps are cast to dates).
async function openDatabase() {
  const db = await makeDB();
  await db.open({
    query: {
      castTimestampToDate: true
    }
  });
  return db;
}

// Runs `task` with a short-lived connection that is closed even when `task` throws.
async function withConnection(db, task) {
  const conn = await db.connect();
  try {
    return await task(conn);
  } finally {
    await conn.close();
  }
}

// Wraps `value` in single quotes for embedding in a SQL statement, doubling any
// embedded single quote so the name cannot terminate the literal early.
function sqlQuote(value) {
  return `'${String(value).replace(/'/g, "''")}'`;
}

// Derives a table name from a file-like object: drops the final extension and a
// trailing "@<digits>" version suffix (assumes the "@X" versions Observable adds
// to file names are numeric). Returns undefined when the object has no string `name`.
function toTableName(file) {
  const fileName = file.name;
  if (typeof fileName !== "string") {
    return undefined;
  }
  const dot = fileName.lastIndexOf(".");
  // Names without an extension (or dotfiles) are kept whole rather than emptied.
  const stem = dot > 0 ? fileName.slice(0, dot) : fileName;
  return stem.replace(/@\d+$/, "");
}

// Normalizes one entry accepted by DuckDBClient.of() into { name, file, options },
// or returns null when the entry has no usable file or table name.
function normalizeEntry(entry) {
  let file;
  let name;
  let options = {};

  if (Array.isArray(entry)) {
    // [name, file] or [name, { file, ...options }]
    [name, file] = entry;
    if (file != null && Object.prototype.hasOwnProperty.call(file, "file")) {
      ({ file, ...options } = file);
    }
  } else if (entry?.constructor?.name === "FileAttachment") {
    file = entry;
    name = toTableName(entry);
  } else if (entry !== null && typeof entry === "object") {
    // { file, name?, ...options }
    ({ file, name, ...options } = entry);
    if (file != null) {
      name = name ?? toTableName(file);
    }
  }

  return file == null || name == null ? null : { name, file, options };
}

// Loads a single entry into `db`: inline row arrays go through Arrow IPC, CSV and
// JSON files are imported, and Parquet files are exposed as a view.
async function loadEntry(db, entry) {
  const spec = normalizeEntry(entry);
  if (spec === null) {
    // Skip instead of dereferencing an undefined file further down.
    console.error("Unrecognized entry", entry);
    return;
  }

  const { name, file, options } = spec;
  const target = { name, schema: "main", ...options };

  if (!file.url && Array.isArray(file)) {
    // Inline data: an array of row objects rather than a file.
    const buffer = arrow.tableToIPC(arrow.tableFromJSON(file));
    await withConnection(db, (conn) => conn.insertArrowFromIPCStream(buffer, target));
    return;
  }

  const url = await file.url();
  if (url.startsWith("blob:")) {
    // Blob URLs are read here and registered as an in-memory buffer.
    const buffer = await file.arrayBuffer();
    await db.registerFileBuffer(file.name, new Uint8Array(buffer));
  } else {
    await db.registerFileURL(file.name, url);
  }

  if (file.name.endsWith(".csv")) {
    await withConnection(db, (conn) => conn.insertCSVFromPath(file.name, target));
  } else if (file.name.endsWith(".json")) {
    await withConnection(db, (conn) => conn.insertJSONFromPath(file.name, target));
  } else if (file.name.endsWith(".parquet")) {
    await withConnection(db, (conn) =>
      conn.query(
        `CREATE VIEW ${sqlQuote(name)} AS SELECT * FROM parquet_scan(${sqlQuote(file.name)})`
      )
    );
  } else {
    console.warn(`Don't know how to handle file type of ${file.name}`);
  }
}

/**
 * Database client backed by an AsyncDuckDB instance.
 *
 * Exposes `queryStream` / `queryTag` for querying, `queryRow` for single-row
 * lookups, and `insert*` helpers that load data into the "main" schema.
 * The database and a shared query connection are created lazily on first use.
 */
class DuckDBClient {
  /**
   * @param {object} [_db] an already opened database; when omitted, one is
   *   created on the first call to `db()`.
   */
  constructor(_db) {
    this._db = _db;
    this._conn = undefined;
    this._counter = 0;
  }

  /**
   * Runs a query and returns its schema plus an async generator of rows.
   *
   * @param {string} query SQL text, with `?` placeholders when `params` is given.
   * @param {Array} [params] values bound to the placeholders via a prepared statement.
   * @returns {Promise<{schema: Array<{name: string, type: string, databaseType: string}>, readRows: Function}>}
   */
  async queryStream(query, params) {
    const conn = await this.connection();
    let result;

    if (params) {
      const stmt = await conn.prepare(query);
      try {
        result = await stmt.query(...params);
      } finally {
        // The result is already materialized, so the statement can be released.
        await stmt.close();
      }
    } else {
      result = await conn.query(query);
    }

    // Populate the schema of the results
    const schema = result.schema.fields.map(({ name, type }) => {
      const databaseType = String(type);
      return { name, type: getType(databaseType), databaseType };
    });

    return {
      schema,
      async *readRows() {
        // All rows are yielded as a single batch of plain objects.
        yield result.toArray().map((row) => Object.fromEntries(row));
      }
    };
  }

  /**
   * Tagged-template helper that prepares the arguments for `queryStream`:
   * the literal parts are joined with `?` placeholders.
   *
   * @returns {[string, Array]} the SQL text and the interpolated values.
   */
  queryTag(strings, ...params) {
    return [strings.join("?"), params];
  }

  /**
   * Returns the underlying database, creating and opening it on first use.
   * The instance is only cached once it has been opened successfully.
   */
  async db() {
    if (!this._db) {
      this._db = await openDatabase();
    }
    return this._db;
  }

  /** Returns the shared query connection, connecting on first use. */
  async connection() {
    if (!this._conn) {
      const db = await this.db();
      this._conn = await db.connect();
    }
    return this._conn;
  }

  /**
   * Closes the shared connection (if any) so that the next query opens a new one.
   * The cached connection is dropped before closing, so a failing `close()`
   * cannot leave a dead connection behind.
   */
  async reconnect() {
    const conn = this._conn;
    this._conn = undefined;
    if (conn) {
      await conn.close();
    }
  }

  /**
   * Queries a single row and logs the elapsed time with `console.time`.
   *
   * @param {string} query SQL text, with `?` placeholders when `params` is given.
   * @param {Array} [params] values to bind; a prepared statement is used when
   *   non-empty, because `conn.send` is not given the values to bind.
   * @returns {Promise<*>} the first row of the first batch, or undefined when
   *   the result has no batch.
   */
  async queryRow(query, params) {
    const key = `Query ${this._counter++}: ${query}`;

    console.time(key);
    try {
      const conn = await this.connection();
      let batch;

      // use send as we can stop iterating after we get the first batch
      if (params?.length > 0) {
        const stmt = await conn.prepare(query);
        try {
          const reader = await stmt.send(...params);
          batch = (await reader.next()).value;
        } finally {
          await stmt.close();
        }
      } else {
        const reader = await conn.send(query);
        batch = (await reader.next()).value;
      }

      return batch?.get(0);
    } finally {
      // Stop the timer on failure as well.
      console.timeEnd(key);
    }
  }

  /**
   * Registers `buffer` under `name` and imports it as JSON into table `name`.
   *
   * @param {string} name file and table name.
   * @param {ArrayBuffer} buffer file contents.
   * @param {object} [options] extra insert options; they override `name` and `schema`.
   * @returns {Promise<DuckDBClient>} this client, for chaining.
   */
  async insertJSON(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    await withConnection(db, (conn) =>
      conn.insertJSONFromPath(name, { name, schema: "main", ...options })
    );

    return this;
  }

  /**
   * Registers `buffer` under `name` and imports it as CSV into table `name`.
   *
   * @param {string} name file and table name.
   * @param {ArrayBuffer} buffer file contents.
   * @param {object} [options] extra insert options; they override `name` and `schema`.
   * @returns {Promise<DuckDBClient>} this client, for chaining.
   */
  async insertCSV(name, buffer, options) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    await withConnection(db, (conn) =>
      conn.insertCSVFromPath(name, { name, schema: "main", ...options })
    );

    return this;
  }

  /**
   * Registers `buffer` under `name` and creates a view `name` that scans it
   * with `parquet_scan`.
   *
   * @param {string} name file and view name; embedded single quotes are escaped.
   * @param {ArrayBuffer} buffer Parquet file contents.
   * @returns {Promise<DuckDBClient>} this client, for chaining.
   */
  async insertParquet(name, buffer) {
    const db = await this.db();
    await db.registerFileBuffer(name, new Uint8Array(buffer));
    const quoted = sqlQuote(name);
    await withConnection(db, (conn) =>
      conn.query(`CREATE VIEW ${quoted} AS SELECT * FROM parquet_scan(${quoted})`)
    );

    return this;
  }

  /**
   * Serializes an Arrow table to IPC and inserts it as table `name`.
   *
   * @returns {Promise<DuckDBClient>} this client, for chaining.
   */
  async insertArrowTable(name, table, options) {
    const buffer = arrow.tableToIPC(table);
    return this.insertArrowFromIPCStream(name, buffer, options);
  }

  /**
   * Inserts an Arrow IPC stream buffer as table `name`.
   *
   * @param {string} name table name.
   * @param {Uint8Array} buffer Arrow IPC stream bytes.
   * @param {object} [options] extra insert options; they override `name` and `schema`.
   * @returns {Promise<DuckDBClient>} this client, for chaining.
   */
  async insertArrowFromIPCStream(name, buffer, options) {
    const db = await this.db();
    await withConnection(db, (conn) =>
      conn.insertArrowFromIPCStream(buffer, {
        name,
        schema: "main",
        ...options
      })
    );

    return this;
  }

  /**
   * Creates a database from FileAttachments.
   *
   * `files` may be a single FileAttachment, an array of entries, or an object
   * mapping table names to files. Each entry is one of:
   *   - a FileAttachment (table name derived from the file name),
   *   - `[name, file]` or `[name, { file, ...options }]`,
   *   - `{ file, name?, ...options }`,
   * where `file` is either a file-like object or an inline array of row objects.
   * JSON and CSV files are imported; Parquet files become views. Entries that
   * cannot be interpreted are logged with `console.error` and skipped.
   *
   * @param {*} [files=[]] the files to load.
   * @returns {Promise<DuckDBClient>} a client over the populated database.
   */
  static async of(files = []) {
    const db = await openDatabase();

    let entries;
    if (files?.constructor?.name === "FileAttachment") {
      entries = [[toTableName(files), files]];
    } else if (Array.isArray(files)) {
      entries = files;
    } else {
      entries = Object.entries(files ?? {});
    }

    // Add all files to the database. Import JSON and CSV. Create view for Parquet.
    await Promise.all(entries.map((entry) => loadEntry(db, entry)));

    return new DuckDBClient(db);
  }
}
235+ return function duckdb ( obj ) {
236+ return DuckDBClient . of ( obj )
237+ } ;
238+ }
239+
/**
 * Maps a database column type name to one of the coarse type labels used in
 * result schemas: "bigint", "number", "integer", "boolean", "date", "string",
 * or "other" for anything unrecognized.
 *
 * Matching is case-insensitive. Parameterized decimals such as
 * "DECIMAL(18, 3)" or "numeric(10,2)" are reported as "number".
 *
 * @param {string} type the type name as reported by the database.
 * @returns {string} the coarse type label.
 */
function getType(type) {
  const typeLower = type.toLowerCase();

  // "decimal(s, p)" in a type listing is a placeholder: real columns carry
  // concrete parameters, so match any parenthesized decimal/numeric.
  if (/^(?:decimal|numeric)\s*\(.*\)$/.test(typeLower)) {
    return "number";
  }

  switch (typeLower) {
    case "bigint":
    case "int8":
    case "long":
      return "bigint";

    case "double":
    case "float8":
    case "numeric":
    case "decimal":
    case "real":
    case "float4":
    case "float":
    case "float64":
      return "number";

    case "hugeint":
    case "integer":
    case "smallint":
    case "tinyint":
    case "ubigint":
    case "uinteger":
    case "usmallint":
    case "utinyint":
    case "int4":
    case "int":
    case "signed":
    case "int2":
    case "short":
    case "int1":
    case "int64":
    case "int32":
      return "integer";

    case "boolean":
    case "bool":
    case "logical":
      return "boolean";

    case "date":
    case "interval": // date or time delta
    case "time":
    case "timestamp":
    case "timestamp with time zone":
    case "datetime":
    case "timestamptz":
      return "date";

    case "uuid":
    case "varchar":
    case "char":
    case "bpchar":
    case "text":
    case "string":
    case "utf8": // this type is unlisted in the `types`, but is returned by the db as `column_type`...
      return "string";

    default:
      return "other";
  }
}
309+
/**
 * Creates a DOM element, optionally assigning properties and appending children.
 *
 * Called as `element(name, props, children)` or, with exactly two arguments,
 * as `element(name, children)`.
 *
 * @param {string} name tag name passed to `document.createElement`.
 * @param {object} [props] properties copied onto the element by assignment.
 * @param {Iterable<Node>} [children] nodes appended in iteration order.
 * @returns {Element} the new element.
 */
function element(name, props, children) {
  // With exactly two arguments the second one is the children list.
  const [assigned, kids] =
    arguments.length === 2 ? [undefined, props] : [props, children];

  const node = document.createElement(name);

  if (assigned !== undefined) {
    for (const key in assigned) {
      node[key] = assigned[key];
    }
  }

  if (kids !== undefined) {
    for (const child of kids) {
      node.appendChild(child);
    }
  }

  return node;
}
317+
/**
 * Creates a DOM text node holding `value`.
 *
 * @param {*} value content passed to `document.createTextNode`.
 * @returns {Text} the new text node.
 */
function text(value) {
  const node = document.createTextNode(value);
  return node;
}
0 commit comments