@@ -10,10 +10,12 @@ import (
1010	"github.com/10gen/migration-verifier/internal/types" 
1111	"github.com/10gen/migration-verifier/internal/util" 
1212	"github.com/pkg/errors" 
13+ 	"github.com/samber/lo" 
1314	"go.mongodb.org/mongo-driver/bson" 
1415	"go.mongodb.org/mongo-driver/bson/bsontype" 
1516	"go.mongodb.org/mongo-driver/mongo" 
1617	"go.mongodb.org/mongo-driver/mongo/options" 
18+ 	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore" 
1719)
1820
1921const  (
@@ -31,9 +33,22 @@ const (
3133// sorting by _id will guarantee that all rechecks for a given 
3234// namespace appear consecutively. 
3335type  RecheckPrimaryKey  struct  {
34- 	SrcDatabaseName    string  `bson:"db"` 
35- 	SrcCollectionName  string  `bson:"coll"` 
36- 	DocumentID         any     `bson:"docID"` 
36+ 	SrcDatabaseName    string         `bson:"db"` 
37+ 	SrcCollectionName  string         `bson:"coll"` 
38+ 	DocumentID         bson.RawValue  `bson:"docID"` 
39+ }
40+ 
41+ var  _  bson.Marshaler  =  & RecheckPrimaryKey {}
42+ 
43+ func  (rk  * RecheckPrimaryKey ) MarshalBSON () ([]byte , error ) {
44+ 	return  bsoncore .NewDocumentBuilder ().
45+ 		AppendString ("db" , rk .SrcDatabaseName ).
46+ 		AppendString ("coll" , rk .SrcCollectionName ).
47+ 		AppendValue ("docID" , bsoncore.Value {
48+ 			Type : rk .DocumentID .Type ,
49+ 			Data : rk .DocumentID .Value ,
50+ 		}).
51+ 		Build (), nil 
3752}
3853
3954// RecheckDoc stores the necessary information to know which documents must be rechecked. 
@@ -46,6 +61,15 @@ type RecheckDoc struct {
4661	DataSize  int  `bson:"dataSize"` 
4762}
4863
64+ var  _  bson.Marshaler  =  & RecheckDoc {}
65+ 
66+ func  (rd  * RecheckDoc ) MarshalBSON () ([]byte , error ) {
67+ 	return  bsoncore .NewDocumentBuilder ().
68+ 		AppendDocument ("_id" , lo .Must (bson .Marshal (rd .PrimaryKey ))).
69+ 		AppendInt64 ("dataSize" , int64 (rd .DataSize )).
70+ 		Build (), nil 
71+ }
72+ 
4973// InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check. 
5074func  (verifier  * Verifier ) InsertFailedCompareRecheckDocs (
5175	ctx  context.Context ,
@@ -95,50 +119,16 @@ func (verifier *Verifier) insertRecheckDocs(
95119
96120	genCollection  :=  verifier .getRecheckQueueCollection (generation )
97121
98- 	var  recheckBatches  [][]mongo.WriteModel 
99- 	var  curRechecks  []mongo.WriteModel 
100- 	curBatchSize  :=  0 
101- 	for  i , dbName  :=  range  dbNames  {
102- 		recheckDoc  :=  RecheckDoc {
103- 			PrimaryKey : RecheckPrimaryKey {
104- 				SrcDatabaseName :   dbName ,
105- 				SrcCollectionName : collNames [i ],
106- 				DocumentID :        rawDocIDs [i ],
107- 			},
108- 			DataSize : dataSizes [i ],
109- 		}
110- 
111- 		recheckRaw , err  :=  bson .Marshal (recheckDoc )
112- 		if  err  !=  nil  {
113- 			return  errors .Wrapf (err , "marshaling recheck for %#q" , dbName + "." + collNames [i ])
114- 		}
115- 
116- 		curRechecks  =  append (
117- 			curRechecks ,
118- 			mongo .NewInsertOneModel ().SetDocument (recheckDoc ),
119- 		)
120- 		curBatchSize  +=  len (recheckRaw )
121- 		if  curBatchSize  >  recheckBatchByteLimit  ||  len (curRechecks ) >=  recheckBatchCountLimit  {
122- 			recheckBatches  =  append (recheckBatches , curRechecks )
123- 			curRechecks  =  nil 
124- 			curBatchSize  =  0 
125- 		}
126- 	}
127- 
128- 	if  len (curRechecks ) >  0  {
129- 		recheckBatches  =  append (recheckBatches , curRechecks )
130- 	}
131- 
132- 	for  _ , models  :=  range  recheckBatches  {
122+ 	sendRechecks  :=  func (rechecks  []bson.Raw ) {
133123		eg .Go (func () error  {
134124
135125			retryer  :=  retry .New ()
136126			err  :=  retryer .WithCallback (
137127				func (retryCtx  context.Context , _  * retry.FuncInfo ) error  {
138- 					_ , err  :=  genCollection .BulkWrite (
128+ 					_ , err  :=  genCollection .InsertMany (
139129						retryCtx ,
140- 						models ,
141- 						options .BulkWrite ().SetOrdered (false ),
130+ 						lo . ToAnySlice ( rechecks ) ,
131+ 						options .InsertMany ().SetOrdered (false ),
142132					)
143133
144134					// We expect duplicate-key errors from the above because: 
@@ -157,20 +147,54 @@ func (verifier *Verifier) insertRecheckDocs(
157147					// and document sizes probably remain stable(-ish) across updates. 
158148					err  =  util .TolerateSimpleDuplicateKeyInBulk (
159149						verifier .logger ,
160- 						len (models ),
150+ 						len (rechecks ),
161151						err ,
162152					)
163153
164154					return  err 
165155				},
166156				"persisting %d recheck(s)" ,
167- 				len (models ),
157+ 				len (rechecks ),
168158			).Run (groupCtx , verifier .logger )
169159
170- 			return  errors .Wrapf (err , "batch of %d rechecks" , len (models ))
160+ 			return  errors .Wrapf (err , "batch of %d rechecks" , len (rechecks ))
171161		})
172162	}
173163
164+ 	curRechecks  :=  make ([]bson.Raw , 0 , recheckBatchCountLimit )
165+ 	curBatchBytes  :=  0 
166+ 	for  i , dbName  :=  range  dbNames  {
167+ 		recheckDoc  :=  RecheckDoc {
168+ 			PrimaryKey : RecheckPrimaryKey {
169+ 				SrcDatabaseName :   dbName ,
170+ 				SrcCollectionName : collNames [i ],
171+ 				DocumentID :        rawDocIDs [i ],
172+ 			},
173+ 			DataSize : dataSizes [i ],
174+ 		}
175+ 
176+ 		recheckRaw , err  :=  bson .Marshal (recheckDoc )
177+ 		if  err  !=  nil  {
178+ 			return  errors .Wrapf (err , "marshaling recheck for %#q" , dbName + "." + collNames [i ])
179+ 		}
180+ 
181+ 		curRechecks  =  append (
182+ 			curRechecks ,
183+ 			bson .Raw (recheckRaw ),
184+ 		)
185+ 
186+ 		curBatchBytes  +=  len (recheckRaw )
187+ 		if  curBatchBytes  >  recheckBatchByteLimit  ||  len (curRechecks ) >=  recheckBatchCountLimit  {
188+ 			sendRechecks (curRechecks )
189+ 			curRechecks  =  make ([]bson.Raw , 0 , recheckBatchCountLimit )
190+ 			curBatchBytes  =  0 
191+ 		}
192+ 	}
193+ 
194+ 	if  len (curRechecks ) >  0  {
195+ 		sendRechecks (curRechecks )
196+ 	}
197+ 
174198	if  err  :=  eg .Wait (); err  !=  nil  {
175199		return  errors .Wrapf (
176200			err ,
0 commit comments