1- import { type Document } from '../../bson' ;
1+ import { BSON , type Document } from '../../bson' ;
22import { DocumentSequence } from '../../cmap/commands' ;
33import { type PkFactory } from '../../mongo_client' ;
44import type { Filter , OptionalId , UpdateFilter , WithoutId } from '../../mongo_types' ;
@@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand {
2828 comment ?: any ;
2929}
3030
31+ /**
32+ * The bytes overhead for the extra fields added post command generation.
33+ */
34+ const MESSAGE_OVERHEAD_BYTES = 1000 ;
35+
3136/** @internal */
3237export class ClientBulkWriteCommandBuilder {
3338 models : AnyClientBulkWriteModel [ ] ;
@@ -62,32 +67,101 @@ export class ClientBulkWriteCommandBuilder {
6267 /**
6368 * Build the bulk write commands from the models.
6469 */
65- buildCommands ( ) : ClientBulkWriteCommand [ ] {
70+ buildCommands ( maxMessageSizeBytes ?: number ) : ClientBulkWriteCommand [ ] {
71+ // If we don't know the maxMessageSizeBytes or for some reason it's 0
72+ // then we cannot calculate the batch.
73+ if ( ! maxMessageSizeBytes ) {
74+ throw new Error ( '' ) ;
75+ }
76+
6677 // Iterate the models to build the ops and nsInfo fields.
67- const operations = [ ] ;
78+ // We need to do this in a loop which creates one command each up
79+ // to the max bson size or max message size.
80+ const commands : ClientBulkWriteCommand [ ] = [ ] ;
81+ let currentCommandLength = MESSAGE_OVERHEAD_BYTES ;
6882 let currentNamespaceIndex = 0 ;
83+ let currentCommand : ClientBulkWriteCommand = this . baseCommand ( ) ;
6984 const namespaces = new Map < string , number > ( ) ;
70- for ( const model of this . models ) {
85+ for ( const [ modelIndex , model ] of this . models . entries ( ) ) {
7186 const ns = model . namespace ;
7287 const index = namespaces . get ( ns ) ;
7388 if ( index != null ) {
74- operations . push ( buildOperation ( model , index , this . pkFactory ) ) ;
89+ // Pushing to the ops document sequence returns the bytes length added.
90+ const operation = buildOperation ( model , index , this . pkFactory ) ;
91+ const operationBuffer = BSON . serialize ( operation ) ;
92+
93+ // Check if the operation buffer can fit in the current command. If it can,
94+ // then add the operation to the document sequence and increment the
95+ // current length.
96+ if ( currentCommandLength + operationBuffer . length < maxMessageSizeBytes ) {
97+ // Pushing to the ops document sequence returns the bytes length added.
98+ const opsLength = currentCommand . ops . push ( operation , operationBuffer ) ;
99+ currentCommandLength += opsLength ;
100+
101+ // If this is the last model in the array, push the current command.
102+ if ( modelIndex === this . models . length - 1 ) {
103+ commands . push ( currentCommand ) ;
104+ }
105+ } else {
106+ // We need to batch. Push the current command to the commands
107+ // array and create a new current command if there are more models
108+ // that need to be iterated.
109+ commands . push ( currentCommand ) ;
110+ if ( modelIndex < this . models . length - 1 ) {
111+ currentCommand = this . baseCommand ( ) ;
112+ }
113+ }
75114 } else {
76115 namespaces . set ( ns , currentNamespaceIndex ) ;
77- operations . push ( buildOperation ( model , currentNamespaceIndex , this . pkFactory ) ) ;
78- currentNamespaceIndex ++ ;
116+ const nsInfo = { ns : ns } ;
117+ const nsInfoBuffer = BSON . serialize ( nsInfo ) ;
118+ const operation = buildOperation ( model , currentNamespaceIndex , this . pkFactory ) ;
119+ const operationBuffer = BSON . serialize ( operation ) ;
120+
121+ // Check if the operation and nsInfo buffers can fit in the command. If they
122+ // can, then add the operation and nsInfo to their respective document
123+ // sequences and increment the current length.
124+ if (
125+ currentCommandLength + nsInfoBuffer . length + operationBuffer . length <
126+ maxMessageSizeBytes
127+ ) {
128+ // Pushing to the nsInfo document sequence returns the bytes length added.
129+ const nsInfoLength = currentCommand . nsInfo . push ( nsInfo , nsInfoBuffer ) ;
130+ currentCommandLength += nsInfoLength ;
131+
132+ // Pushing to the ops document sequence returns the bytes length added.
133+ const opsLength = currentCommand . ops . push ( operation , operationBuffer ) ;
134+ currentCommandLength += opsLength ;
135+
136+ // We've added a new namespace, increment the namespace index.
137+ currentNamespaceIndex ++ ;
138+
139+ // If this is the last model in the array, push the current command.
140+ if ( modelIndex === this . models . length - 1 ) {
141+ commands . push ( currentCommand ) ;
142+ }
143+ } else {
144+ // We need to batch. Push the current command to the commands
145+ // array and create a new current command if there are more models
146+ // that need to be iterated.
147+ commands . push ( currentCommand ) ;
148+ if ( modelIndex < this . models . length - 1 ) {
149+ currentCommand = this . baseCommand ( ) ;
150+ }
151+ }
79152 }
80153 }
81154
82- const nsInfo = Array . from ( namespaces . keys ( ) , ns => ( { ns } ) ) ;
155+ return commands ;
156+ }
83157
84- // The base command.
158+ private baseCommand ( ) : ClientBulkWriteCommand {
85159 const command : ClientBulkWriteCommand = {
86160 bulkWrite : 1 ,
87161 errorsOnly : this . errorsOnly ,
88162 ordered : this . options . ordered ?? true ,
89- ops : new DocumentSequence ( operations ) ,
90- nsInfo : new DocumentSequence ( nsInfo )
163+ ops : new DocumentSequence ( 'ops' ) ,
164+ nsInfo : new DocumentSequence ( ' nsInfo' )
91165 } ;
92166 // Add bypassDocumentValidation if it was present in the options.
93167 if ( this . options . bypassDocumentValidation != null ) {
@@ -103,7 +177,8 @@ export class ClientBulkWriteCommandBuilder {
103177 if ( this . options . comment !== undefined ) {
104178 command . comment = this . options . comment ;
105179 }
106- return [ command ] ;
180+
181+ return command ;
107182 }
108183}
109184
0 commit comments