diff --git a/txn/flusher.go b/txn/flusher.go index 64b06c3ec..d668da8d7 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -62,7 +62,8 @@ func (f *flusher) run() (err error) { f.debugf("Processing %s", f.goal) seen := make(map[bson.ObjectId]*transaction) - if err := f.recurse(f.goal, seen); err != nil { + preloaded := make(map[bson.ObjectId]*transaction) + if err := f.recurse(f.goal, seen, preloaded); err != nil { return err } if f.goal.done() { @@ -154,26 +155,54 @@ func (f *flusher) run() (err error) { return nil } -func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error { +const preloadBatchSize = 100 + +func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t + delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } for _, dkey := range t.docKeys() { + remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) + toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) for _, dtt := range f.queue[dkey] { id := dtt.id() - if seen[id] != nil { + if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } - qt, err := f.load(id) - if err != nil { - return err + toPreload[id] = struct{}{} + remaining = append(remaining, id) + } + // done with this map + toPreload = nil + for len(remaining) > 0 { + batch := remaining + if len(batch) > preloadBatchSize { + batch = remaining[:preloadBatchSize] } - err = f.recurse(qt, seen) + remaining = remaining[len(batch):] + err := f.loadMulti(batch, preloaded) if err != nil { return err } + for _, id := range batch { + if seen[id] != nil { + continue + } + qt, ok := preloaded[id] + if !ok { + qt, err = f.load(id) + if err != nil { + return err + } + } + err = f.recurse(qt, seen, preloaded) + if err != nil { + return err + } + } } } return nil diff --git a/txn/txn.go b/txn/txn.go index 204b3cf1d..3bc6e640f 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -459,6 +459,26 @@ func (r *Runner) load(id bson.ObjectId) (*transaction, error) { return &t, nil } +func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*transaction) error { + txns := make([]transaction, 0, len(ids)) + + query := r.tc.Find(bson.M{"_id": bson.M{"$in": ids}}) + // Not sure that this actually has much of an effect when using All() + query.Batch(len(ids)) + err := query.All(&txns) + if err == mgo.ErrNotFound { + return fmt.Errorf("could not find a transaction in batch: %v", ids) + } else if err != nil { + return err + } + for i := range txns { + t := &txns[i] + preloaded[t.Id] = t + } + return nil +} + + type typeNature int const (