Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions scripts/cleanup-webhook-meta.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#!/usr/bin/env bun

import Redis from "ioredis";
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Switch to Bun.redis for Redis access

Scripts under scripts/ must rely on Bun’s built-in Redis client rather than ioredis. Please refactor this script to use Bun.redis so it aligns with the required runtime stack. As per coding guidelines

🤖 Prompt for AI Agents
In scripts/cleanup-webhook-meta.ts around line 3, the file currently imports
ioredis; replace that with Bun's built-in Redis client by removing the ioredis
import and creating a Bun.redis client configured from the same environment
variables (e.g. REDIS_URL/REDIS_PASSWORD) used previously. Update any
ioredis-specific usage to the Bun.redis equivalents (get/set/del/scan/expire or
pipeline patterns) and ensure you properly close/disconnect the Bun.redis client
at the end of the script or on errors; keep the rest of the cleanup logic
unchanged.


if (!process.env.REDIS_URL) {
throw new Error("REDIS_URL is not set");
}

// Configuration
const CONFIG = {
redisUrl: process.env.REDIS_URL,
batchSize: 5000,
dryRun: false, // Set to false to actually delete
maxAgeHours: 3, // Delete jobs finished more than 3 hours ago
} as const;

class WebhookMetaCleanup {
private redis: Redis;
private stats = {
totalScanned: 0,
totalDeleted: 0,
totalSkipped: 0,
errors: 0,
invalidTimestamps: 0,
};

constructor() {
this.redis = new Redis(CONFIG.redisUrl);
}

async run(): Promise<void> {
console.log(`🚀 Starting cleanup (DRY_RUN: ${CONFIG.dryRun})`);
console.log("🎯 Target pattern:");
console.log(" - twmq:engine-cloud_webhook:job:*:meta");
console.log(` - Max age: ${CONFIG.maxAgeHours} hours`);
console.log("");

try {
await this.cleanOldJobMeta();
this.printFinalStats();
} catch (error) {
console.error(`💥 Fatal error: ${error}`);
throw error;
} finally {
await this.redis.quit();
}
}

private async cleanOldJobMeta(): Promise<void> {
const pattern = "twmq:engine-cloud_webhook:job:*:meta";
console.log(`🔍 Scanning pattern: ${pattern}`);

let cursor = "0";
// Unix timestamps are always in UTC (seconds since Jan 1, 1970 00:00:00 UTC)
const now = Math.floor(Date.now() / 1000);
const cutoffTimestamp = now - (CONFIG.maxAgeHours * 60 * 60);

console.log(` Current time (UTC): ${now} (${new Date(now * 1000).toISOString()})`);
console.log(` Cutoff time (UTC): ${cutoffTimestamp} (${new Date(cutoffTimestamp * 1000).toISOString()})`);
console.log("");

do {
const [newCursor, keys] = await this.redis.scan(
cursor,
"MATCH",
pattern,
"COUNT",
CONFIG.batchSize
);
cursor = newCursor;

if (keys.length > 0) {
this.stats.totalScanned += keys.length;
console.log(` Scanned ${keys.length} keys (total: ${this.stats.totalScanned})`);

await this.processKeyBatch(keys, cutoffTimestamp);
}
} while (cursor !== "0");

console.log(`✅ Scan complete: ${pattern} (scanned ${this.stats.totalScanned} keys)`);
console.log("");
}

private async processKeyBatch(keys: string[], cutoffTimestamp: number): Promise<void> {
const keysToDelete: string[] = [];

// Batch fetch all finished_at timestamps using pipeline
const pipeline = this.redis.pipeline();
for (const key of keys) {
pipeline.hget(key, "finished_at");
}

let results;
try {
results = await pipeline.exec();
} catch (error) {
console.error(` 💥 Error fetching timestamps batch: ${error}`);
this.stats.errors += keys.length;
return;
}

// Process results
for (let i = 0; i < keys.length; i++) {
const key = keys[i];
if (!key) continue;

const [err, finishedAt] = results?.[i] ?? [null, null];

if (err) {
console.error(` 💥 Error processing key ${key}: ${err}`);
this.stats.errors += 1;
continue;
}

if (!finishedAt) {
this.stats.totalSkipped += 1;
continue;
}

const finishedAtTimestamp = parseInt(finishedAt as string, 10);

if (isNaN(finishedAtTimestamp)) {
this.stats.invalidTimestamps += 1;
continue;
}

if (finishedAtTimestamp < cutoffTimestamp) {
const age = Math.floor((Date.now() / 1000 - finishedAtTimestamp) / 3600);
if (keysToDelete.length < 10) {
// Only log first 10 to avoid spam
console.log(` 🗑️ Marking for deletion: ${key} (finished ${age}h ago)`);
}
keysToDelete.push(key);
} else {
this.stats.totalSkipped += 1;
}
}

// Delete the marked keys
if (keysToDelete.length > 0) {
console.log(` Found ${keysToDelete.length} keys to delete in this batch`);
if (CONFIG.dryRun) {
console.log(` [DRY RUN] Would delete ${keysToDelete.length} keys`);
this.stats.totalDeleted += keysToDelete.length;
} else {
await this.deleteKeys(keysToDelete);
}
}
}

private async deleteKeys(keys: string[]): Promise<void> {
try {
const pipeline = this.redis.pipeline();
for (const key of keys) {
pipeline.del(key);
}

const results = await pipeline.exec();
const deletedCount = results?.filter(([err]) => err === null).length || 0;
const failedCount = keys.length - deletedCount;

console.log(` ✅ Deleted ${deletedCount} keys`);
if (failedCount > 0) {
console.log(` ❌ Failed to delete ${failedCount} keys`);
this.stats.errors += failedCount;
}

this.stats.totalDeleted += deletedCount;
} catch (error) {
console.error(` 💥 Error deleting batch: ${error}`);
this.stats.errors += keys.length;
}
}

private printFinalStats(): void {
console.log("📈 Final Statistics:");
console.log(` Total Scanned: ${this.stats.totalScanned.toLocaleString()}`);
console.log(` Total ${CONFIG.dryRun ? 'Would Delete' : 'Deleted'}: ${this.stats.totalDeleted.toLocaleString()}`);
console.log(` Total Skipped (not old enough): ${this.stats.totalSkipped.toLocaleString()}`);
if (this.stats.invalidTimestamps > 0) {
console.log(` Invalid Timestamps: ${this.stats.invalidTimestamps.toLocaleString()}`);
}
if (this.stats.errors > 0) {
console.log(` Errors: ${this.stats.errors.toLocaleString()}`);
}
console.log("");

if (CONFIG.dryRun) {
console.log("💡 This was a DRY RUN - no data was actually deleted");
console.log("💡 Set CONFIG.dryRun = false to actually delete the keys");
} else {
console.log("✅ CLEANUP COMPLETED - Data has been permanently deleted");
}
}
}

// Main execution
async function main() {
const cleaner = new WebhookMetaCleanup();
await cleaner.run();
}

if (import.meta.main) {
main().catch(console.error);
}

87 changes: 83 additions & 4 deletions scripts/simple-redis-cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ class SimpleRedisCleanup {
private redis: Redis;
private stats = {
useropErrors: 0,
useropResults: 0,
eip7702Errors: 0,
eip7702Results: 0,
webhookErrors: 0,
webhookResults: 0,
externalBundlerErrors: 0,
externalBundlerResults: 0,
totalDeleted: 0,
errors: 0,
};
Expand All @@ -30,15 +36,31 @@ class SimpleRedisCleanup {
console.log(`🚀 Starting cleanup (DRY_RUN: ${CONFIG.dryRun})`);
console.log("🎯 Target patterns:");
console.log(" - twmq:engine-cloud_userop_confirm:job:*:errors");
console.log(" - twmq:engine-cloud_userop_confirm:jobs:result (hash)");
console.log(" - twmq:engine-cloud_eip7702_send:job:*:errors");
console.log(" - twmq:engine-cloud_eip7702_send:jobs:result (hash)");
console.log(" - twmq:engine-cloud_webhook:job:*:errors");
console.log(" - twmq:engine-cloud_webhook:jobs:result (hash)");
console.log(" - twmq:engine-cloud_external_bundler_send:job:*:errors");
console.log(" - twmq:engine-cloud_external_bundler_send:jobs:result (hash)");
console.log("");

try {
// Clean userop confirm error keys
// Clean userop confirm keys
await this.cleanPattern("twmq:engine-cloud_userop_confirm:job:*:errors");
await this.cleanHash("twmq:engine-cloud_userop_confirm:jobs:result", "userop_confirm");

// Clean eip7702 send error keys
// Clean eip7702 send keys
await this.cleanPattern("twmq:engine-cloud_eip7702_send:job:*:errors");
await this.cleanHash("twmq:engine-cloud_eip7702_send:jobs:result", "eip7702_send");

// Clean webhook keys
await this.cleanPattern("twmq:engine-cloud_webhook:job:*:errors");
await this.cleanHash("twmq:engine-cloud_webhook:jobs:result", "webhook");

// Clean external bundler send keys
await this.cleanPattern("twmq:engine-cloud_external_bundler_send:job:*:errors");
await this.cleanHash("twmq:engine-cloud_external_bundler_send:jobs:result", "external_bundler_send");

this.printFinalStats();
} catch (error) {
Expand Down Expand Up @@ -83,6 +105,37 @@ class SimpleRedisCleanup {
console.log("");
}

private async cleanHash(key: string, queueType: string): Promise<void> {
console.log(`🔍 Checking hash: ${key}`);

try {
const exists = await this.redis.exists(key);

if (exists) {
const fieldCount = await this.redis.hlen(key);
console.log(` Found hash with ${fieldCount} fields`);

if (CONFIG.dryRun) {
console.log(` [DRY RUN] Would delete hash with ${fieldCount} fields`);
this.updateStatsForHash(queueType, fieldCount);
} else {
await this.redis.del(key);
console.log(` ✅ Deleted hash with ${fieldCount} fields`);
this.updateStatsForHash(queueType, fieldCount);
this.stats.totalDeleted += 1;
}
} else {
console.log(` Hash does not exist`);
}

console.log(`✅ Hash complete: ${key}`);
console.log("");
} catch (error) {
console.error(` 💥 Error handling hash: ${error}`);
this.stats.errors += 1;
}
}

private async deleteKeys(keys: string[]): Promise<void> {
try {
const pipeline = this.redis.pipeline();
Expand Down Expand Up @@ -112,13 +165,39 @@ class SimpleRedisCleanup {
this.stats.useropErrors += count;
} else if (pattern.includes("eip7702_send")) {
this.stats.eip7702Errors += count;
} else if (pattern.includes("webhook")) {
this.stats.webhookErrors += count;
} else if (pattern.includes("external_bundler_send")) {
this.stats.externalBundlerErrors += count;
}
}

private updateStatsForHash(queueType: string, count: number): void {
if (queueType === "userop_confirm") {
this.stats.useropResults += count;
} else if (queueType === "eip7702_send") {
this.stats.eip7702Results += count;
} else if (queueType === "webhook") {
this.stats.webhookResults += count;
} else if (queueType === "external_bundler_send") {
this.stats.externalBundlerResults += count;
}
}

private printFinalStats(): void {
console.log("📈 Final Statistics:");
console.log(` Userop Confirm Errors: ${this.stats.useropErrors.toLocaleString()}`);
console.log(` EIP-7702 Send Errors: ${this.stats.eip7702Errors.toLocaleString()}`);
console.log(` Userop Confirm:`);
console.log(` - Errors: ${this.stats.useropErrors.toLocaleString()}`);
console.log(` - Result Hash Fields: ${this.stats.useropResults.toLocaleString()}`);
console.log(` EIP-7702 Send:`);
console.log(` - Errors: ${this.stats.eip7702Errors.toLocaleString()}`);
console.log(` - Result Hash Fields: ${this.stats.eip7702Results.toLocaleString()}`);
console.log(` Webhook:`);
console.log(` - Errors: ${this.stats.webhookErrors.toLocaleString()}`);
console.log(` - Result Hash Fields: ${this.stats.webhookResults.toLocaleString()}`);
console.log(` External Bundler Send:`);
console.log(` - Errors: ${this.stats.externalBundlerErrors.toLocaleString()}`);
console.log(` - Result Hash Fields: ${this.stats.externalBundlerResults.toLocaleString()}`);
console.log(` Total ${CONFIG.dryRun ? 'Would Delete' : 'Deleted'}: ${this.stats.totalDeleted.toLocaleString()}`);
if (this.stats.errors > 0) {
console.log(` Errors: ${this.stats.errors}`);
Expand Down
Loading
Loading