
Commit 4018d2d

[FIX] Snowflake - Insert Multiple Rows
1 parent 8138a13 commit 4018d2d

File tree

6 files changed: +254 −12 lines changed

components/snowflake/actions/execute-sql-query/execute-sql-query.mjs

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ import snowflake from "../../snowflake.app.mjs";
 
 export default {
   name: "Execute SQL Query",
-  version: "0.2.0",
+  version: "0.2.1",
   key: "snowflake-execute-sql-query",
   description: "Execute a custom Snowflake query. See [our docs](https://pipedream.com/docs/databases/working-with-sql) to learn more about working with SQL in Pipedream.",
   type: "action",

components/snowflake/actions/insert-multiple-rows/insert-multiple-rows.mjs

Lines changed: 113 additions & 6 deletions
@@ -6,7 +6,7 @@ export default {
   key: "snowflake-insert-multiple-rows",
   name: "Insert Multiple Rows",
   description: "Insert multiple rows into a table",
-  version: "0.1.2",
+  version: "0.1.3",
   props: {
     snowflake,
     database: {
@@ -50,29 +50,136 @@ export default {
        "values",
      ],
    },
+    batchSize: {
+      type: "integer",
+      label: "Batch Size",
+      description: "Number of rows to process per batch. Automatically calculated based on data size if not specified. Recommended: `50-200` for wide tables, `100-500` for narrow tables.",
+      optional: true,
+      default: 100,
+      min: 10,
+      max: 1000,
+    },
+    maxPayloadSizeMB: {
+      type: "integer",
+      label: "Max Payload Size (MB)",
+      description: "Maximum payload size per batch in MB. Helps prevent `413 Payload Too Large` errors.",
+      optional: true,
+      default: 5,
+      min: 1,
+      max: 10,
+    },
+    enableBatching: {
+      type: "boolean",
+      label: "Enable Batch Processing",
+      description: "Enable automatic batch processing for large datasets. Disable only for small datasets (< 50 rows) or troubleshooting.",
+      optional: true,
+      default: true,
+    },
   },
   async run({ $ }) {
     let rows = this.values;
 
     let inputValidated = true;
 
     if (!Array.isArray(rows)) {
-      rows = JSON.parse(rows);
+      try {
+        rows = JSON.parse(rows);
+      } catch (parseError) {
+        throw new ConfigurationError("The row data could not be parsed as JSON. Please ensure it's a valid JSON array of arrays.");
+      }
     }
 
     if (!rows || !rows.length || !Array.isArray(rows)) {
       inputValidated = false;
     } else {
-      rows.forEach((row) => { if (!Array.isArray(row)) { inputValidated = false; } });
+      rows.forEach((row, index) => {
+        if (!Array.isArray(row)) {
+          console.log(`Row ${index + 1} is not an array:`, row);
+          inputValidated = false;
+        }
+      });
     }
 
     // Throw an error if input validation failed
     if (!inputValidated) {
       throw new ConfigurationError("The row data you passed is not an array of arrays. Please enter an array of arrays in the `Values` parameter above. If you're trying to add a single row to Snowflake, select the **Insert Single Row** action.");
     }
 
-    const response = await this.snowflake.insertRows(this.tableName, this.columns, this.values);
-    $.export("$summary", `Successfully inserted ${this.values.length} rows in ${this.tableName}`);
-    return response;
+    const expectedColumnCount = this.columns.length;
+    const invalidRows = rows.filter((row, index) => {
+      if (row.length !== expectedColumnCount) {
+        console.error(`Row ${index + 1} has ${row.length} values but ${expectedColumnCount} columns specified`);
+        return true;
+      }
+      return false;
+    });
+
+    if (invalidRows.length > 0) {
+      throw new ConfigurationError(`${invalidRows.length} rows have a different number of values than the specified columns. Each row must have exactly ${expectedColumnCount} values to match the selected columns.`);
+    }
+
+    // Add batch processing options
+    const batchOptions = {
+      batchSize: this.batchSize,
+      maxPayloadSizeMB: this.maxPayloadSizeMB,
+      enableBatching: this.enableBatching,
+    };
+
+    try {
+      const response = await this.snowflake.insertRows(
+        this.tableName,
+        this.columns,
+        rows,
+        batchOptions,
+      );
+
+      // Handle different response formats (batched vs single insert)
+      if (response.summary) {
+        // Batched response
+        const { summary } = response;
+        $.export("$summary", `Successfully inserted ${summary.totalRowsProcessed} rows into ${this.tableName} using ${summary.totalBatches} batches`);
+
+        // Export detailed batch information
+        $.export("batchDetails", {
+          totalRows: summary.totalRows,
+          totalBatches: summary.totalBatches,
+          successfulBatches: summary.successfulBatches,
+          failedBatches: summary.failedBatches,
+          batchSize: summary.batchSize,
+          processingTime: new Date().toISOString(),
+        });
+
+        // Export batch results for debugging if needed
+        $.export("batchResults", summary.results);
+
+        return response;
+
+      } else {
+        // Single insert response (small dataset or batching disabled)
+        $.export("$summary", `Successfully inserted ${rows.length} rows into ${this.tableName}`);
+        return response;
+      }
+
+    } catch (error) {
+      // Enhanced error handling for batch processing
+      if (error.summary) {
+        // Partial failure in batch processing
+        const { summary } = error;
+        $.export("$summary", `Partial success: ${summary.totalRowsProcessed}/${summary.totalRows} rows inserted. ${summary.failedBatches} batches failed.`);
+        $.export("batchDetails", summary);
+        $.export("failedBatches", summary.results.filter((r) => !r.success));
+      }
+
+      // Re-throw the error with additional context
+      if (error.message.includes("413") || error.message.includes("Payload Too Large")) {
+        throw new ConfigurationError(
+          `Payload too large error detected. Try reducing the batch size (current: ${this.batchSize}) or enable batching if disabled. ` +
+          `You're trying to insert ${rows.length} rows with ${this.columns.length} columns each. ` +
+          `Original error: ${error.message}`,
+        );
+      }
+
+      throw error;
+    }
   },
 };
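
For context on what this action now validates: the `values` prop must be an array of arrays, one inner array per row, each exactly as long as the selected columns. A minimal sketch, with hypothetical column names and data:

// Hypothetical columns and rows for an imaginary USERS table.
const columns = ["ID", "NAME", "EMAIL"];
const values = [
  [1, "Ada Lovelace", "ada@example.com"],
  [2, "Alan Turing", "alan@example.com"],
];

// A JSON string is also accepted; the action JSON.parses it and
// rejects anything that is not an array of equal-length arrays.
const asJsonString = JSON.stringify(values);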

components/snowflake/actions/insert-row/insert-row.mjs

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ export default {
   key: "snowflake-insert-row",
   name: "Insert Single Row",
   description: "Insert a row into a table",
-  version: "1.1.2",
+  version: "1.1.3",
   props: {
     snowflake,
     database: {

components/snowflake/common/utils.mjs

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+function estimatePayloadSize(data) {
+  try {
+    const jsonString = JSON.stringify(data);
+    // Use Buffer.byteLength for accurate size calculation in Node.js
+    return Buffer.byteLength(jsonString, "utf8");
+  } catch (error) {
+    // Fallback estimation if JSON.stringify fails
+    return data.length * 1000; // Conservative estimate
+  }
+}
+
+// Helper method to split rows into batches
+function createBatches(rows, batchSize) {
+  const batches = [];
+  for (let i = 0; i < rows.length; i += batchSize) {
+    batches.push(rows.slice(i, i + batchSize));
+  }
+  return batches;
+}
+
+export default {
+  estimatePayloadSize,
+  createBatches,
+};
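
A brief usage sketch of the two helpers above (row contents and counts are illustrative, not from the commit):

import utils from "./common/utils.mjs";

// 1,050 illustrative two-column rows.
const rows = Array.from({ length: 1050 }, (_, i) => [i, `name-${i}`]);

// Approximate serialized size of a 10-row sample, in bytes; callers
// divide by the sample length to get an average per-row size.
const sampleBytes = utils.estimatePayloadSize(rows.slice(0, 10));
console.log(sampleBytes > 0); // true

// Splitting 1,050 rows at batch size 500 yields batches of 500, 500, and 50.
const batches = utils.createBatches(rows, 500);
console.log(batches.length); // 3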

components/snowflake/package.json

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 {
   "name": "@pipedream/snowflake",
-  "version": "0.13.0",
+  "version": "0.13.1",
   "description": "Pipedream Snowflake Components",
   "main": "snowflake.app.mjs",
   "keywords": [

components/snowflake/snowflake.app.mjs

Lines changed: 114 additions & 3 deletions
@@ -2,8 +2,9 @@ import { createPrivateKey } from "crypto";
 import { snowflake } from "@pipedream/snowflake-sdk";
 import { promisify } from "util";
 import {
-  sqlProxy, sqlProp,
+  sqlProxy, sqlProp, ConfigurationError,
 } from "@pipedream/platform";
+import utils from "./common/utils.mjs";
 
 snowflake.configure({
   logLevel: "WARN",
@@ -368,13 +369,123 @@ export default {
      };
      return this.executeQuery(statement);
    },
-    async insertRows(tableName, columns, binds) {
-      const sqlText = `INSERT INTO ${tableName} (${columns.join(",")}) VALUES (${columns.map(() => "?").join(", ")});`;
+    async _insertRowsOriginal(tableName, columns, values) {
+      // Create placeholders for all rows
+      const rowPlaceholders = values.map(() =>
+        `(${columns.map(() => "?").join(", ")})`).join(", ");
+
+      const sqlText = `INSERT INTO ${tableName} (${columns.join(",")}) VALUES ${rowPlaceholders}`;
+
+      // Flatten all values into a single array for binding
+      const binds = values.flat();
+
       const statement = {
         sqlText,
         binds,
       };
+
       return this.executeQuery(statement);
     },
+    async insertRows(tableName, columns, values, options = {}) {
+      const {
+        batchSize = 100,
+        maxPayloadSizeMB = 5,
+        enableBatching = true,
+      } = options;
+
+      // If batching is disabled or small dataset, use original approach
+      if (!enableBatching || values.length <= 50) {
+        return this._insertRowsOriginal(tableName, columns, values);
+      }
+
+      // Estimate payload size for dynamic batch sizing
+      const sampleRowData = values.slice(0, Math.min(10, values.length));
+      const sampleSize = utils.estimatePayloadSize(sampleRowData);
+      const avgRowSize = sampleSize / sampleRowData.length;
+      const maxSizeBytes = maxPayloadSizeMB * 1024 * 1024;
+
+      // Calculate optimal batch size with safety margin
+      const calculatedBatchSize = Math.floor((maxSizeBytes * 0.8) / avgRowSize);
+      const optimalBatchSize = Math.min(
+        Math.max(calculatedBatchSize, 10), // Minimum 10 rows per batch
+        Math.min(batchSize, 500), // Maximum 500 rows per batch
+      );
+
+      console.log(`Processing ${values.length} rows in batches of ${optimalBatchSize}`);
+
+      // Split into batches
+      const batches = utils.createBatches(values, optimalBatchSize);
+
+      // Process batches sequentially
+      const results = [];
+      let totalRowsProcessed = 0;
+      let successfulBatches = 0;
+      let failedBatches = 0;
+
+      for (let i = 0; i < batches.length; i++) {
+        const batch = batches[i];
+
+        try {
+          console.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} rows)`);
+
+          const batchResult = await this._insertRowsOriginal(tableName, columns, batch);
+
+          results.push({
+            batchIndex: i + 1,
+            rowsProcessed: batch.length,
+            success: true,
+            result: batchResult,
+          });
+
+          totalRowsProcessed += batch.length;
+          successfulBatches++;
+
+          // Small delay between batches to prevent overwhelming the server
+          if (i < batches.length - 1) {
+            await new Promise((resolve) => setTimeout(resolve, 100));
+          }
+
+        } catch (error) {
+          console.log(`Batch ${i + 1} failed:`, error.message);
+
+          results.push({
+            batchIndex: i + 1,
+            rowsProcessed: 0,
+            success: false,
+            error: error.message,
+          });
+
+          failedBatches++;
+
+          // Continue processing remaining batches
+        }
+      }
+
+      const summary = {
+        totalRows: values.length,
+        totalBatches: batches.length,
+        successfulBatches,
+        failedBatches,
+        totalRowsProcessed,
+        batchSize: optimalBatchSize,
+        results,
+      };
+
+      console.log(`Batch processing completed: ${totalRowsProcessed}/${values.length} rows processed`);
+
+      if (failedBatches > 0) {
+        const error = new ConfigurationError(
+          `Batch insert partially failed: \`${totalRowsProcessed}/${values.length}\` rows inserted. \`${failedBatches}\` batches failed.`,
+        );
+        error.summary = summary;
+        throw error;
+      }
+
+      return {
+        success: true,
+        message: `Successfully inserted ${totalRowsProcessed} rows in ${batches.length} batches`,
+        summary,
+      };
+    },
  },
};
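
To make the dynamic batch sizing concrete, here is the arithmetic `insertRows` runs under assumed inputs: a ~2 KB average serialized row, the default 5 MB payload cap, and the default `batchSize` of 100 (all three numbers are illustrative):

const maxSizeBytes = 5 * 1024 * 1024;              // 5,242,880 bytes
const avgRowSize = 2048;                           // assumed ~2 KB per serialized row
// 80% safety margin: floor(4,194,304 / 2048) = 2048
const calculatedBatchSize = Math.floor((maxSizeBytes * 0.8) / avgRowSize);
const optimalBatchSize = Math.min(
  Math.max(calculatedBatchSize, 10),               // never below 10 rows -> 2048
  Math.min(100, 500),                              // never above batchSize or 500 -> 100
);
console.log(optimalBatchSize); // 100: the configured batchSize wins here

Note the clamp only ever shrinks batches below the configured `batchSize` when rows are large; it never grows them past it.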
