[FIX] Snowflake - Insert Multiple Rows #17722

Merged · 1 commit · Jul 23, 2025
@@ -2,7 +2,7 @@ import snowflake from "../../snowflake.app.mjs";

export default {
name: "Execute SQL Query",
version: "0.2.0",
version: "0.2.1",
key: "snowflake-execute-sql-query",
description: "Execute a custom Snowflake query. See [our docs](https://pipedream.com/docs/databases/working-with-sql) to learn more about working with SQL in Pipedream.",
type: "action",
@@ -6,7 +6,7 @@ export default {
key: "snowflake-insert-multiple-rows",
name: "Insert Multiple Rows",
description: "Insert multiple rows into a table",
version: "0.1.2",
version: "0.1.3",
props: {
snowflake,
database: {
@@ -50,29 +50,136 @@ export default {
"values",
],
},
batchSize: {
type: "integer",
label: "Batch Size",
description: "Number of rows to process per batch. Automatically calculated based on data size if not specified. Recommended: `50-200` for wide tables, `100-500` for narrow tables.",
optional: true,
default: 100,
min: 10,
max: 1000,
},
maxPayloadSizeMB: {
type: "integer",
label: "Max Payload Size (MB)",
description: "Maximum payload size per batch in MB. Helps prevent `413 Payload Too Large` errors.",
optional: true,
default: 5,
min: 1,
max: 10,
},
enableBatching: {
type: "boolean",
label: "Enable Batch Processing",
description: "Enable automatic batch processing for large datasets. Disable only for small datasets (< 50 rows) or troubleshooting.",
optional: true,
default: true,
},
},
async run({ $ }) {
let rows = this.values;

let inputValidated = true;

if (!Array.isArray(rows)) {
- rows = JSON.parse(rows);
+ try {
+   rows = JSON.parse(rows);
+ } catch (parseError) {
+   throw new ConfigurationError("The row data could not be parsed as JSON. Please ensure it's a valid JSON array of arrays.");
+ }
}

if (!rows || !rows.length || !Array.isArray(rows)) {
inputValidated = false;
} else {
- rows.forEach((row) => { if (!Array.isArray(row)) { inputValidated = false; } });
+ rows.forEach((row, index) => {
+   if (!Array.isArray(row)) {
+     console.log(`Row ${index + 1} is not an array:`, row);
+     inputValidated = false;
+   }
+ });
}

// Throw an error if input validation failed
if (!inputValidated) {
throw new ConfigurationError("The row data you passed is not an array of arrays. Please enter an array of arrays in the `Values` parameter above. If you're trying to add a single row to Snowflake, select the **Insert Single Row** action.");
}

- const response = await this.snowflake.insertRows(this.tableName, this.columns, this.values);
- $.export("$summary", `Successfully inserted ${this.values.length} rows in ${this.tableName}`);
- return response;
const expectedColumnCount = this.columns.length;
const invalidRows = rows.filter((row, index) => {
if (row.length !== expectedColumnCount) {
console.error(`Row ${index + 1} has ${row.length} values but ${expectedColumnCount} columns specified`);
return true;
}
return false;
});

if (invalidRows.length > 0) {
throw new ConfigurationError(`${invalidRows.length} rows have a different number of values than the specified columns. Each row must have exactly ${expectedColumnCount} values to match the selected columns.`);
}

// Add batch processing options
const batchOptions = {
batchSize: this.batchSize,
maxPayloadSizeMB: this.maxPayloadSizeMB,
enableBatching: this.enableBatching,
};

try {
const response = await this.snowflake.insertRows(
this.tableName,
this.columns,
rows,
batchOptions,
);

// Handle different response formats (batched vs single insert)
if (response.summary) {
// Batched response
const { summary } = response;
$.export("$summary", `Successfully inserted ${summary.totalRowsProcessed} rows into ${this.tableName} using ${summary.totalBatches} batches`);

// Export detailed batch information
$.export("batchDetails", {
totalRows: summary.totalRows,
totalBatches: summary.totalBatches,
successfulBatches: summary.successfulBatches,
failedBatches: summary.failedBatches,
batchSize: summary.batchSize,
processingTime: new Date().toISOString(),
});

// Export batch results for debugging if needed
$.export("batchResults", summary.results);

return response;

} else {
// Single insert response (small dataset or batching disabled)
$.export("$summary", `Successfully inserted ${rows.length} rows into ${this.tableName}`);
return response;
}

} catch (error) {
// Enhanced error handling for batch processing
if (error.summary) {
// Partial failure in batch processing
const { summary } = error;
$.export("$summary", `Partial success: ${summary.totalRowsProcessed}/${summary.totalRows} rows inserted. ${summary.failedBatches} batches failed.`);
$.export("batchDetails", summary);
$.export("failedBatches", summary.results.filter((r) => !r.success));
}

// Re-throw the error with additional context
if (error.message.includes("413") || error.message.includes("Payload Too Large")) {
throw new ConfigurationError(
`Payload too large error detected. Try reducing the batch size (current: ${this.batchSize}) or enable batching if disabled. ` +
`You're trying to insert ${rows.length} rows with ${this.columns.length} columns each. ` +
`Original error: ${error.message}`,
);
}

throw error;
}
},
};
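For reference, the validation above expects the `Values` input to be an array of arrays, with each inner array holding exactly one value per selected column. A minimal sketch of a payload that would pass both checks (the column names and data here are hypothetical, not from this PR):

```javascript
// Hypothetical columns selected in the step
const columns = ["ID", "NAME", "EMAIL"];

// Valid: an array of arrays, each with exactly columns.length values.
// A JSON string of the same shape also works, since the action
// falls back to JSON.parse for non-array input.
const values = [
  [1, "Ada Lovelace", "ada@example.com"],
  [2, "Alan Turing", "alan@example.com"],
];

// Invalid: a row like [3, "no email"] has only 2 of the 3 expected
// values and would trip the row-length check, throwing a ConfigurationError.
```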
2 changes: 1 addition & 1 deletion components/snowflake/actions/insert-row/insert-row.mjs
@@ -5,7 +5,7 @@ export default {
key: "snowflake-insert-row",
name: "Insert Single Row",
description: "Insert a row into a table",
version: "1.1.2",
version: "1.1.3",
props: {
snowflake,
database: {
24 changes: 24 additions & 0 deletions components/snowflake/common/utils.mjs
@@ -0,0 +1,24 @@
function estimatePayloadSize(data) {
try {
const jsonString = JSON.stringify(data);
// Use Buffer.byteLength for accurate size calculation in Node.js
return Buffer.byteLength(jsonString, "utf8");
} catch (error) {
// Fallback estimation if JSON.stringify fails
return data.length * 1000; // Conservative estimate
}
}

// Helper method to split rows into batches
function createBatches(rows, batchSize) {
const batches = [];
for (let i = 0; i < rows.length; i += batchSize) {
batches.push(rows.slice(i, i + batchSize));
}
return batches;
}

export default {
estimatePayloadSize,
createBatches,
};
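A quick usage sketch of these helpers (the data is illustrative):

```javascript
import utils from "./common/utils.mjs";

const rows = [
  [1, "a"],
  [2, "b"],
  [3, "c"],
  [4, "d"],
  [5, "e"],
];

// createBatches splits rows into chunks of at most 2:
// [[row1, row2], [row3, row4], [row5]]
const batches = utils.createBatches(rows, 2);
console.log(batches.length); // 3

// estimatePayloadSize returns the byte length of the JSON-serialized
// data, which insertRows uses below to derive a safe batch size.
console.log(utils.estimatePayloadSize(rows));
```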
2 changes: 1 addition & 1 deletion components/snowflake/package.json
@@ -1,6 +1,6 @@
{
"name": "@pipedream/snowflake",
"version": "0.13.0",
"version": "0.13.1",
"description": "Pipedream Snowflake Components",
"main": "snowflake.app.mjs",
"keywords": [
117 changes: 114 additions & 3 deletions components/snowflake/snowflake.app.mjs
@@ -2,8 +2,9 @@ import { createPrivateKey } from "crypto";
import { snowflake } from "@pipedream/snowflake-sdk";
import { promisify } from "util";
import {
- sqlProxy, sqlProp,
+ sqlProxy, sqlProp, ConfigurationError,
} from "@pipedream/platform";
import utils from "./common/utils.mjs";

snowflake.configure({
logLevel: "WARN",
@@ -368,13 +369,123 @@ export default {
};
return this.executeQuery(statement);
},
- async insertRows(tableName, columns, binds) {
-   const sqlText = `INSERT INTO ${tableName} (${columns.join(",")}) VALUES (${columns.map(() => "?").join(", ")});`;
async _insertRowsOriginal(tableName, columns, values) {
// Create placeholders for all rows
const rowPlaceholders = values.map(() =>
`(${columns.map(() => "?").join(", ")})`).join(", ");

const sqlText = `INSERT INTO ${tableName} (${columns.join(",")}) VALUES ${rowPlaceholders}`;

// Flatten all values into a single array for binding
const binds = values.flat();

const statement = {
sqlText,
binds,
};

return this.executeQuery(statement);
},
async insertRows(tableName, columns, values, options = {}) {
const {
batchSize = 100,
maxPayloadSizeMB = 5,
enableBatching = true,
} = options;

// If batching is disabled or small dataset, use original approach
if (!enableBatching || values.length <= 50) {
return this._insertRowsOriginal(tableName, columns, values);
}

// Estimate payload size for dynamic batch sizing
const sampleRowData = values.slice(0, Math.min(10, values.length));
const sampleSize = utils.estimatePayloadSize(sampleRowData);
const avgRowSize = sampleSize / sampleRowData.length;
const maxSizeBytes = maxPayloadSizeMB * 1024 * 1024;

// Calculate optimal batch size with safety margin
const calculatedBatchSize = Math.floor((maxSizeBytes * 0.8) / avgRowSize);
const optimalBatchSize = Math.min(
Math.max(calculatedBatchSize, 10), // Minimum 10 rows per batch
Math.min(batchSize, 500), // Maximum 500 rows per batch
);

console.log(`Processing ${values.length} rows in batches of ${optimalBatchSize}`);

// Split into batches
const batches = utils.createBatches(values, optimalBatchSize);

// Process batches sequentially
const results = [];
let totalRowsProcessed = 0;
let successfulBatches = 0;
let failedBatches = 0;

for (let i = 0; i < batches.length; i++) {
const batch = batches[i];

try {
console.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} rows)`);

const batchResult = await this._insertRowsOriginal(tableName, columns, batch);

results.push({
batchIndex: i + 1,
rowsProcessed: batch.length,
success: true,
result: batchResult,
});

totalRowsProcessed += batch.length;
successfulBatches++;

// Small delay between batches to prevent overwhelming the server
if (i < batches.length - 1) {
await new Promise((resolve) => setTimeout(resolve, 100));
}

} catch (error) {
console.log(`Batch ${i + 1} failed:`, error.message);

results.push({
batchIndex: i + 1,
rowsProcessed: 0,
success: false,
error: error.message,
});

failedBatches++;

// Continue processing remaining batches
}
}

const summary = {
totalRows: values.length,
totalBatches: batches.length,
successfulBatches,
failedBatches,
totalRowsProcessed,
batchSize: optimalBatchSize,
results,
};

console.log(`Batch processing completed: ${totalRowsProcessed}/${values.length} rows processed`);

if (failedBatches > 0) {
const error = new ConfigurationError(
`Batch insert partially failed: \`${totalRowsProcessed}/${values.length}\` rows inserted. \`${failedBatches}\` batches failed.`,
);
error.summary = summary;
throw error;
}

return {
success: true,
message: `Successfully inserted ${totalRowsProcessed} rows in ${batches.length} batches`,
summary,
};
},
},
};
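To make the dynamic sizing above concrete, here is the same arithmetic worked through under assumed inputs (an average row of roughly 2 KB after serialization, with the default 5 MB cap and 100-row batch size; these numbers are illustrative, not from the PR):

```javascript
// Assumed inputs, not taken from the PR
const avgRowSize = 2048; // bytes per row after JSON.stringify
const maxPayloadSizeMB = 5; // default prop value
const batchSize = 100; // default prop value

const maxSizeBytes = maxPayloadSizeMB * 1024 * 1024; // 5,242,880
// 80% safety margin: floor(4,194,304 / 2048) = 2048 rows would fit
const calculatedBatchSize = Math.floor((maxSizeBytes * 0.8) / avgRowSize);
// Clamp to [10, min(batchSize, 500)] => the configured 100-row setting wins
const optimalBatchSize = Math.min(
  Math.max(calculatedBatchSize, 10),
  Math.min(batchSize, 500),
);
console.log(optimalBatchSize); // 100
```

Only when rows are large enough that `calculatedBatchSize` falls below the configured `batchSize` does the payload cap actually shrink the batches.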
@@ -17,7 +17,7 @@ export default {
// eslint-disable-next-line
name: "New, Updated, or Deleted Warehouse",
description: "Emit new events when a warehouse is created, altered, or dropped",
version: "0.1.2",
version: "0.1.3",
async run() {
await this.watchObjectsAndEmitChanges("WAREHOUSE", this.warehouses, this.queryTypes);
},
2 changes: 1 addition & 1 deletion components/snowflake/sources/deleted-role/deleted-role.mjs
@@ -6,7 +6,7 @@ export default {
key: "snowflake-deleted-role",
name: "New Deleted Role",
description: "Emit new event when a role is deleted",
version: "0.1.2",
version: "0.1.3",
methods: {
...common.methods,
getSqlText() {
2 changes: 1 addition & 1 deletion components/snowflake/sources/deleted-user/deleted-user.mjs
@@ -6,7 +6,7 @@ export default {
key: "snowflake-deleted-user",
name: "New Deleted User",
description: "Emit new event when a user is deleted",
version: "0.1.2",
version: "0.1.3",
methods: {
...common.methods,
getSqlText() {
@@ -39,7 +39,7 @@ export default {
// eslint-disable-next-line
name: "Failed Task in Schema",
description: "Emit new events when a task fails in a database schema",
version: "0.1.2",
version: "0.1.3",
async run() {
await this.emitFailedTasks({
database: this.database,
2 changes: 1 addition & 1 deletion components/snowflake/sources/new-database/new-database.mjs
@@ -7,7 +7,7 @@ export default {
key: "snowflake-new-database",
name: "New Database",
description: "Emit new event when a database is created",
version: "0.1.2",
version: "0.1.3",
methods: {
...common.methods,
alwaysRunInSingleProcessMode() {
2 changes: 1 addition & 1 deletion components/snowflake/sources/new-role/new-role.mjs
@@ -6,7 +6,7 @@ export default {
key: "snowflake-new-role",
name: "New Role",
description: "Emit new event when a role is created",
version: "0.1.2",
version: "0.1.3",
methods: {
...common.methods,
alwaysRunInSingleProcessMode() {