11const { Worker } = require ( "worker_threads" ) ;
22const { performance } = require ( "perf_hooks" ) ;
33const path = require ( "path" ) ;
4+ const admin = require ( "firebase-admin" ) ;
45
5- const totalDocs = 10000000 ; // Total number of documents to write
6+ // Initialize Firebase Admin SDK
7+ admin . initializeApp ( {
8+ projectId : "vertex-testing-1efc3" ,
9+ } ) ;
10+
11+ // Get a reference to the Firestore service
12+ const db = admin . firestore ( ) ;
13+
14+ const totalDocs = 1000000 ; // Total number of documents to write
615const maxThreads = 20 ; // Maximum number of worker threads
716const batchSize = 500 ; // Documents per batch
8- const rampUpDelay = 2000 ; // 5 seconds delay between ramp-ups
9- const rampUps = 20 ; // Number of ramp-ups (planned)
10-
17+ const targetRate = 500 ; // Target docs per second
18+ const rampUpDelay = 1000 ; // Delay between ramp-ups
19+ const rampUps = 5 ; // Number of ramp-ups
1120const docsPerRampUp = Math . ceil ( totalDocs / rampUps ) ; // Documents per ramp-up
1221
22+ // Calculate the delay needed to meet the target rate (in milliseconds)
23+ const delayBetweenBatches = Math . max ( 1000 / ( targetRate / batchSize ) , 0 ) ; // Delay between batches in ms
24+
25+ // Hardcoded collection paths with the form: A/{aid}/B/{bid}/C/{cid}/D/{did}/E/{eid}/F/{fid}/G
26+ const collectionPaths = [
27+ "A/aid1/B/bid1/C/cid1/D/did1/E/eid1/F/fid1/G" ,
28+ "A/aid2/B/bid2/C/cid2/D/did2/E/eid2/F/fid2/G" ,
29+ "A/aid3/B/bid3/C/cid3/D/did3/E/eid3/F/fid3/G" ,
30+ "A/aid4/B/bid4/C/cid4/D/did4/E/eid4/F/fid4/G" ,
31+ "A/aid5/B/bid5/C/cid5/D/did5/E/eid5/F/fid5/G" ,
32+ ] ;
33+
1334// Start measuring total execution time
1435const totalStartTime = performance . now ( ) ;
1536
1637const workerJsPath = path . resolve ( __dirname , "worker.js" ) ;
1738
1839// Function to spawn worker threads for a specific ramp-up
19- const spawnWorkers = async ( activeThreads , startDoc , docsPerRampUp ) => {
20- console . log ( `Spawning ${ activeThreads } worker(s)...` ) ;
40+ const spawnWorkers = async (
41+ activeThreads ,
42+ startDoc ,
43+ docsPerRampUp ,
44+ collectionPath
45+ ) => {
46+ console . log (
47+ `Spawning ${ activeThreads } worker(s) for collection ${ collectionPath } ...`
48+ ) ;
2149 let promises = [ ] ;
2250 const docsPerThread = Math . ceil ( docsPerRampUp / activeThreads ) ;
2351
2452 for ( let i = 0 ; i < activeThreads ; i ++ ) {
2553 const docsForThisThread = Math . min ( docsPerThread , docsPerRampUp ) ;
26- const start = startDoc + i * docsPerThread ;
54+ const start = startDoc + i * docsForThisThread ;
2755 const end = Math . min ( start + docsForThisThread , startDoc + docsPerRampUp ) ;
2856
2957 promises . push (
@@ -33,6 +61,8 @@ const spawnWorkers = async (activeThreads, startDoc, docsPerRampUp) => {
3361 start,
3462 end,
3563 batchSize,
64+ collectionPath, // Pass the collection path to the worker
65+ delayBetweenBatches, // Pass the delay to the worker
3666 } ,
3767 } ) ;
3868
@@ -64,13 +94,44 @@ const spawnWorkers = async (activeThreads, startDoc, docsPerRampUp) => {
6494 }
6595} ;
6696
/**
 * Queries Firestore for the document count of every path in `collectionPaths`
 * using the count() aggregation.
 *
 * The per-collection queries do not depend on each other, so they are issued
 * concurrently instead of one after another. Results are still logged and
 * stored in the order of `collectionPaths`.
 *
 * @returns {Promise<Object<string, number>>} Map of collection path to its
 *   document count.
 * @throws Rejects with the first error raised by any count query.
 */
const getCollectionCounts = async () => {
  const perPathCounts = await Promise.all(
    collectionPaths.map(async (collectionPath) => {
      const snapshot = await db.collection(collectionPath).count().get();
      return snapshot.data().count;
    })
  );

  // Build the result after all queries settle so key order and log order
  // follow `collectionPaths` regardless of which query finished first.
  const counts = {};
  collectionPaths.forEach((collectionPath, index) => {
    const count = perPathCounts[index];
    counts[collectionPath] = count;
    console.log(`Collection ${collectionPath} has ${count} documents.`);
  });

  return counts;
};
111+
/**
 * Sums the per-collection change in document count between two snapshots.
 *
 * Every collection that appears in either snapshot is considered; a
 * collection missing from one side is treated as having 0 documents there,
 * so collections that only exist after the run still contribute to the
 * total. Each per-collection difference is logged.
 *
 * @param {Object<string, number>} beforeCounts - Counts keyed by collection
 *   path, taken before the operation.
 * @param {Object<string, number>} afterCounts - Counts keyed by collection
 *   path, taken after the operation.
 * @returns {number} Sum of (after - before) over all collections.
 */
const calculateCountDifference = (beforeCounts, afterCounts) => {
  const before = beforeCounts || {};
  const after = afterCounts || {};

  // Union of own keys: before-keys first (original order), then any
  // collection seen only in the "after" snapshot.
  const allPaths = new Set([...Object.keys(before), ...Object.keys(after)]);

  let totalDifference = 0;
  for (const collectionPath of allPaths) {
    const beforeCount = before[collectionPath] || 0;
    const afterCount = after[collectionPath] || 0;
    const difference = afterCount - beforeCount;
    console.log(`Collection ${collectionPath} difference: ${difference}`);
    totalDifference += difference;
  }

  return totalDifference;
};
126+
67127// Function to execute ramp-ups
68128const executeRampUps = async ( ) => {
69129 let activeThreads = 1 ;
70130 let startDoc = 0 ;
71131
72132 for ( let i = 0 ; i < rampUps ; i ++ ) {
73- await spawnWorkers ( activeThreads , startDoc , docsPerRampUp ) ;
133+ const collectionPath = collectionPaths [ i % collectionPaths . length ] ; // Rotate through collections
134+ await spawnWorkers ( activeThreads , startDoc , docsPerRampUp , collectionPath ) ;
74135 startDoc += docsPerRampUp ;
75136
76137 if ( activeThreads < maxThreads ) {
@@ -88,17 +149,38 @@ const executeRampUps = async () => {
88149 }
89150} ;
90151
91- // Run the ramp-ups
92- executeRampUps ( )
93- . then ( ( ) => {
/**
 * Main execution flow: counts the documents in every target collection, runs
 * the ramp-up write workload, counts again, and reports how many documents
 * were added and how long the run took.
 *
 * Any error is logged and reflected in a non-zero process exit code; the
 * returned promise itself never rejects.
 *
 * @returns {Promise<void>}
 */
const main = async () => {
  try {
    // Count documents before writing
    console.log("Counting documents before the operation...");
    const beforeCounts = await getCollectionCounts();

    // Perform the writing operation
    await executeRampUps();

    // Count documents after writing
    console.log("Counting documents after the operation...");
    const afterCounts = await getCollectionCounts();

    // Calculate and log the difference
    const totalDocsWritten = calculateCountDifference(
      beforeCounts,
      afterCounts
    );
    console.log(`Total documents written: ${totalDocsWritten}`);

    // Elapsed time is measured from the module-level totalStartTime, so it
    // covers both count passes as well as the writes.
    const totalEndTime = performance.now();
    const totalDuration = (totalEndTime - totalStartTime) / 1000; // Convert to seconds
    console.log(
      `Successfully written ${totalDocsWritten} documents in ${totalDuration.toFixed(
        2
      )} seconds.`
    );
  } catch (error) {
    console.error("Error during execution: ", error);
    // Report failure through the exit status without calling process.exit(),
    // so pending output can still flush.
    process.exitCode = 1;
  }
};
184+
185+ // Run the main function
186+ main ( ) ;
0 commit comments