From 3df931a4192707c645203129bfa26718ef72d36a Mon Sep 17 00:00:00 2001 From: Michael Wilner Date: Tue, 30 Oct 2018 08:57:34 -0500 Subject: [PATCH 1/6] Implement Mongo backing store for persistence --- Gopkg.lock | 46 +++++++- config/configuration.go | 2 + mongostore.go | 249 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 295 insertions(+), 2 deletions(-) create mode 100644 mongostore.go diff --git a/Gopkg.lock b/Gopkg.lock index 156d40f26..4d6e80a80 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,50 +2,92 @@ [[projects]] + digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" name = "github.com/davecgh/go-spew" packages = ["spew"] + pruneopts = "" revision = "346938d642f2ec3594ed81d874461961cd0faa76" version = "v1.1.0" [[projects]] + branch = "master" + digest = "1:e9ffb9315dce0051beb757d0f0fc25db57c4da654efc4eada4ea109c2d9da815" + name = "github.com/globalsign/mgo" + packages = [ + ".", + "bson", + "internal/json", + "internal/sasl", + "internal/scram", + ] + pruneopts = "" + revision = "eeefdecb41b842af6dc652aaea4026e8403e62df" + +[[projects]] + digest = "1:1cc12f4618ce8d71ca28ef3708f4e98e1318ab6f06ecfffb6781b893f271c89c" name = "github.com/mattn/go-sqlite3" packages = ["."] + pruneopts = "" revision = "ca5e3819723d8eeaf170ad510e7da1d6d2e94a08" version = "v1.2.0" [[projects]] + digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" name = "github.com/pmezard/go-difflib" packages = ["difflib"] + pruneopts = "" revision = "792786c7400a136282c1664665ae0a8db921c6c2" version = "v1.0.0" [[projects]] branch = "master" + digest = "1:68a81aa25065b50a4bf1ffd115ff3634704f61f675d0140b31492e9fcca55421" name = "github.com/shopspring/decimal" packages = ["."] + pruneopts = "" revision = "aed1bfe463fa3c9cc268d60dcc1491db613bff7e" [[projects]] branch = "master" + digest = "1:ed7ac53c7d59041f27964d3f04e021b45ecb5f23c842c84d778a7f1fb67e2ce9" name = "github.com/stretchr/objx" packages = ["."] + pruneopts = "" revision = "1a9d0bb9f541897e62256577b352fdbc1fb4fd94" [[projects]] + digest = "1:3926a4ec9a4ff1a072458451aa2d9b98acd059a45b38f7335d31e06c3d6a0159" name = "github.com/stretchr/testify" - packages = ["assert","mock","require","suite"] + packages = [ + "assert", + "mock", + "require", + "suite", + ] + pruneopts = "" revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0" version = "v1.1.4" [[projects]] branch = "master" + digest = "1:898bc7c802c1e0c20cecd65811e90b7b9bc5651b4a07aefd159451bfb200b2b3" name = "golang.org/x/net" packages = ["context"] + pruneopts = "" revision = "a04bdaca5b32abe1c069418fb7088ae607de5bd0" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "6efc9f467166be5af0c9b9f4b98d7860ba12b50ce641a2fba765049bd1ea4f27" + input-imports = [ + "github.com/globalsign/mgo", + "github.com/globalsign/mgo/bson", + "github.com/mattn/go-sqlite3", + "github.com/shopspring/decimal", + "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", + "github.com/stretchr/testify/require", + "github.com/stretchr/testify/suite", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/config/configuration.go b/config/configuration.go index cf208cfa7..dbc74e86c 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -41,6 +41,8 @@ const ( SQLStoreDriver string = "SQLStoreDriver" SQLStoreDataSourceName string = "SQLStoreDataSourceName" SQLStoreConnMaxLifetime string = "SQLStoreConnMaxLifetime" + MongoStoreConnection string = "MongoStoreConnection" + MongoStoreDatabase string = "MongoStoreDatabase" ValidateFieldsOutOfOrder string = "ValidateFieldsOutOfOrder" ResendRequestChunkSize string = "ResendRequestChunkSize" EnableLastMsgSeqNumProcessed string = "EnableLastMsgSeqNumProcessed" diff --git a/mongostore.go b/mongostore.go new file mode 100644 index 000000000..c22ab0af6 --- /dev/null +++ b/mongostore.go @@ -0,0 +1,249 @@ +package quickfix + +import ( + "fmt" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" + "time" + + "github.com/quickfixgo/quickfix/config" +) + +type mongoStoreFactory struct { + settings *Settings +} + +type mongoStore struct { + sessionID SessionID + cache *memoryStore + mongoUrl string + mongoDatabase string + db *mgo.Session +} + +// NewMongoStoreFactory returns a mongo-based implementation of MessageStoreFactory +func NewMongoStoreFactory(settings *Settings) MessageStoreFactory { + return mongoStoreFactory{settings: settings} +} + +// Create creates a new MongoStore implementation of the MessageStore interface +func (f mongoStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, err error) { + sessionSettings, ok := f.settings.SessionSettings()[sessionID] + if !ok { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } + mongoConnectionUrl, err := sessionSettings.Setting(config.MongoStoreConnection) + if err != nil { + return nil, err + } + mongoDatabase, err := sessionSettings.Setting(config.MongoStoreDatabase) + if err != nil { + return nil, err + } + return newMongoStore(sessionID, mongoConnectionUrl, mongoDatabase) +} + +func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string) (*mongoStore, error) { + store := &mongoStore{ + sessionID: sessionID, + cache: &memoryStore{}, + mongoUrl: mongoUrl, + mongoDatabase: mongoDatabase, + } + store.cache.Reset() + + if conn, err := mgo.Dial(mongoUrl); err != nil { + return nil, err + } else { + store.db = conn + } + + if err := store.populateCache(); err != nil { + return nil, err + } + + return store, nil +} + +func generateMessageFilter(s *SessionID) (messageFilter *MongoQuickFixEntryData) { + messageFilter = &MongoQuickFixEntryData{ + BeginString: s.BeginString, + SessionQualifier: s.Qualifier, + SenderCompId: s.SenderCompID, + SenderSubId: s.SenderSubID, + SenderLocId: s.SenderLocationID, + TargetCompId: s.TargetCompID, + TargetSubId: s.TargetSubID, + TargetLocId: s.TargetLocationID, + } + return +} + +type MongoQuickFixEntryData struct { + //Message specific data + Msgseq int `bson:"msgseq,omitempty"` + Message []byte `bson:"message,omitempty"` + //Session specific data + CreationTime time.Time `bson:"creation_time,omitempty"` + IncomingSeqNum int `bson:"incoming_seq_num,omitempty"` + OutgoingSeqNum int `bson:"outgoing_seq_num,omitempty"` + //Indexed data + BeginString string `bson:"begin_string"` + SessionQualifier string `bson:"session_qualifier"` + SenderCompId string `bson:"sender_comp_id"` + SenderSubId string `bson:"sender_sub_id"` + SenderLocId string `bson:"sender_loc_id"` + TargetCompId string `bson:"target_comp_id"` + TargetSubId string `bson:"target_sub_id"` + TargetLocId string `bson:"target_loc_id"` +} + +// Reset deletes the store records and sets the seqnums back to 1 +func (store *mongoStore) Reset() error { + msgFilter := generateMessageFilter(&store.sessionID) + _, err := store.db.DB(store.mongoDatabase).C("messages").RemoveAll(msgFilter) + + if err != nil { + return err + } + + if err = store.cache.Reset(); err != nil { + return err + } + + sessionUpdate := generateMessageFilter(&store.sessionID) + sessionUpdate.CreationTime = store.cache.CreationTime() + sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() + sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() + err = store.db.DB(store.mongoDatabase).C("sessions").Update(msgFilter, sessionUpdate) + + return err +} + +// Refresh reloads the store from the database +func (store *mongoStore) Refresh() error { + if err := store.cache.Reset(); err != nil { + return err + } + return store.populateCache() +} + +func (store *mongoStore) populateCache() (err error) { + msgFilter := generateMessageFilter(&store.sessionID) + query := store.db.DB(store.mongoDatabase).C("sessions").Find(msgFilter) + + if cnt, err := query.Count(); err == nil && cnt > 0 { + // session record found, load it + sessionData := &MongoQuickFixEntryData{} + err = query.One(&sessionData) + if err == nil { + store.cache.creationTime = sessionData.CreationTime + store.cache.SetNextTargetMsgSeqNum(sessionData.IncomingSeqNum) + store.cache.SetNextSenderMsgSeqNum(sessionData.OutgoingSeqNum) + } + } else if err == nil && cnt == 0 { + // session record not found, create it + msgFilter.CreationTime = store.cache.creationTime + msgFilter.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() + msgFilter.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() + err = store.db.DB(store.mongoDatabase).C("sessions").Insert(msgFilter) + } + return +} + +// NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent +func (store *mongoStore) NextSenderMsgSeqNum() int { + return store.cache.NextSenderMsgSeqNum() +} + +// NextTargetMsgSeqNum returns the next MsgSeqNum that should be received +func (store *mongoStore) NextTargetMsgSeqNum() int { + return store.cache.NextTargetMsgSeqNum() +} + +// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent +func (store *mongoStore) SetNextSenderMsgSeqNum(next int) error { + msgFilter := generateMessageFilter(&store.sessionID) + sessionUpdate := generateMessageFilter(&store.sessionID) + sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() + sessionUpdate.OutgoingSeqNum = next + sessionUpdate.CreationTime = store.cache.CreationTime() + err := store.db.DB(store.mongoDatabase).C("sessions").Update(msgFilter, sessionUpdate) + if err != nil { + return err + } + return store.cache.SetNextSenderMsgSeqNum(next) +} + +// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received +func (store *mongoStore) SetNextTargetMsgSeqNum(next int) error { + msgFilter := generateMessageFilter(&store.sessionID) + sessionUpdate := generateMessageFilter(&store.sessionID) + sessionUpdate.IncomingSeqNum = next + sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() + sessionUpdate.CreationTime = store.cache.CreationTime() + err := store.db.DB(store.mongoDatabase).C("sessions").Update(msgFilter, sessionUpdate) + if err != nil { + return err + } + return store.cache.SetNextTargetMsgSeqNum(next) +} + +// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent +func (store *mongoStore) IncrNextSenderMsgSeqNum() error { + store.cache.IncrNextSenderMsgSeqNum() + return store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum()) +} + +// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received +func (store *mongoStore) IncrNextTargetMsgSeqNum() error { + store.cache.IncrNextTargetMsgSeqNum() + return store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum()) +} + +// CreationTime returns the creation time of the store +func (store *mongoStore) CreationTime() time.Time { + return store.cache.CreationTime() +} + +func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) { + msgFilter := generateMessageFilter(&store.sessionID) + msgFilter.Msgseq = seqNum + msgFilter.Message = msg + err = store.db.DB(store.mongoDatabase).C("messages").Insert(msgFilter) + return +} + +func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, err error) { + msgFilter := generateMessageFilter(&store.sessionID) + //Marshal into database form + msgFilterBytes, err := bson.Marshal(msgFilter) + if err != nil { + return + } + seqFilter := bson.M{} + err = bson.Unmarshal(msgFilterBytes, &seqFilter) + if err != nil { + return + } + //Modify the query to use a range for the sequence filter + seqFilter["msgseq"] = bson.M{ + "$gte": beginSeqNum, + "$lte": endSeqNum, + } + + iter := store.db.DB(store.mongoDatabase).C("messages").Find(seqFilter).Sort("msgseq").Iter() + for iter.Next(msgFilter) { + msgs = append(msgs, msgFilter.Message) + } + return +} + +// Close closes the store's database connection +func (store *mongoStore) Close() error { + if store.db != nil { + store.db.Close() + store.db = nil + } + return nil +} From 114f6049ce6e46c3e8d543de5a736d30206dbb70 Mon Sep 17 00:00:00 2001 From: Michael Wilner Date: Tue, 30 Oct 2018 08:58:10 -0500 Subject: [PATCH 2/6] Implement tests for the Mongo backing store --- mongostore_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 mongostore_test.go diff --git a/mongostore_test.go b/mongostore_test.go new file mode 100644 index 000000000..21ca84a8d --- /dev/null +++ b/mongostore_test.go @@ -0,0 +1,52 @@ +package quickfix + +import ( + "fmt" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "log" + "os" + "strings" + "testing" +) + +// MongoStoreTestSuite runs all tests in the MessageStoreTestSuite against the MongoStore implementation +type MongoStoreTestSuite struct { + MessageStoreTestSuite +} + +func (suite *MongoStoreTestSuite) SetupTest() { + mongoDbCxn := os.Getenv("MONGODB_TEST_CXN") + if len(mongoDbCxn) <= 0 { + log.Println("MONGODB_TEST_CXN environment arg is not provided, skipping...") + suite.T().SkipNow() + } + mongoDatabase := "automated_testing_database" + + // create settings + sessionID := SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} + settings, err := ParseSettings(strings.NewReader(fmt.Sprintf(` +[DEFAULT] +MongoStoreConnection=%s +MongoStoreDatabase=%s + +[SESSION] +BeginString=%s +SenderCompID=%s +TargetCompID=%s`, mongoDbCxn, mongoDatabase, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID))) + require.Nil(suite.T(), err) + + // create store + suite.msgStore, err = NewMongoStoreFactory(settings).Create(sessionID) + require.Nil(suite.T(), err) + err = suite.msgStore.Reset() + require.Nil(suite.T(), err) +} + +func (suite *MongoStoreTestSuite) TearDownTest() { + suite.msgStore.Close() +} + +func TestMongoStoreTestSuite(t *testing.T) { + suite.Run(t, new(MongoStoreTestSuite)) +} From 6db55c044892d56df0f4064da3fa850277f73663 Mon Sep 17 00:00:00 2001 From: Michael Wilner Date: Tue, 30 Oct 2018 09:05:10 -0500 Subject: [PATCH 3/6] Add mongo testing to travis --- .travis.yml | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index b6f063217..55955e15c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,16 +5,22 @@ go: - 1.9 - tip +services: + - mongodb + env: - - FIX_TEST= - - FIX_TEST=fix40 - - FIX_TEST=fix41 - - FIX_TEST=fix42 - - FIX_TEST=fix43 - - FIX_TEST=fix44 - - FIX_TEST=fix50 - - FIX_TEST=fix50sp1 - - FIX_TEST=fix50sp2 + global: + - MONGODB_TEST_CXN=localhost + matrix: + - FIX_TEST= + - FIX_TEST=fix40 + - FIX_TEST=fix41 + - FIX_TEST=fix42 + - FIX_TEST=fix43 + - FIX_TEST=fix44 + - FIX_TEST=fix50 + - FIX_TEST=fix50sp1 + - FIX_TEST=fix50sp2 matrix: allow_failures: From 11e701f3f708bb66ca53c973bedc33b3a08fed60 Mon Sep 17 00:00:00 2001 From: Michael Wilner Date: Tue, 30 Oct 2018 09:46:35 -0500 Subject: [PATCH 4/6] Update docs for the new MongoDB config params --- config/doc.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/config/doc.go b/config/doc.go index 2338543f0..58af30adb 100644 --- a/config/doc.go +++ b/config/doc.go @@ -290,6 +290,14 @@ FileStorePath Directory to store sequence number and message files. Only used with FileStoreFactory. +MongoStoreConnection + +The MongoDB connection URL to use (see https://godoc.org/github.com/globalsign/mgo#Dial for the URL Format). Only used with MongoStoreFactory. + +MongoStoreDatabase + +The MongoDB-specific name of the database to use. Only used with MongoStoreFactory. + SQLStoreDriver The name of the database driver to use (see https://github.com/golang/go/wiki/SQLDrivers for the list of available drivers). Only used with SqlStoreFactory. From 11fb6d75cd902647d58ea0a60361380a782e2461 Mon Sep 17 00:00:00 2001 From: Michael Wilner Date: Tue, 30 Oct 2018 10:20:26 -0500 Subject: [PATCH 5/6] Enable custom prefixes for mongo storage --- mongostore.go | 57 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/mongostore.go b/mongostore.go index c22ab0af6..08656c2b5 100644 --- a/mongostore.go +++ b/mongostore.go @@ -10,20 +10,33 @@ import ( ) type mongoStoreFactory struct { - settings *Settings + settings *Settings + messagesCollection string + sessionsCollection string } type mongoStore struct { - sessionID SessionID - cache *memoryStore - mongoUrl string - mongoDatabase string - db *mgo.Session + sessionID SessionID + cache *memoryStore + mongoUrl string + mongoDatabase string + db *mgo.Session + messagesCollection string + sessionsCollection string } // NewMongoStoreFactory returns a mongo-based implementation of MessageStoreFactory func NewMongoStoreFactory(settings *Settings) MessageStoreFactory { - return mongoStoreFactory{settings: settings} + return NewMongoStoreFactoryPrefixed(settings, "") +} + +// NewMongoStoreFactoryPrefixed returns a mongo-based implementation of MessageStoreFactory, with prefix on collections +func NewMongoStoreFactoryPrefixed(settings *Settings, collectionsPrefix string) MessageStoreFactory { + return mongoStoreFactory{ + settings: settings, + messagesCollection: collectionsPrefix + "messages", + sessionsCollection: collectionsPrefix + "sessions", + } } // Create creates a new MongoStore implementation of the MessageStore interface @@ -40,15 +53,17 @@ func (f mongoStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, e if err != nil { return nil, err } - return newMongoStore(sessionID, mongoConnectionUrl, mongoDatabase) + return newMongoStore(sessionID, mongoConnectionUrl, mongoDatabase, f.messagesCollection, f.sessionsCollection) } -func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string) (*mongoStore, error) { +func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string, messagesCollection string, sessionsCollection string) (*mongoStore, error) { store := &mongoStore{ - sessionID: sessionID, - cache: &memoryStore{}, - mongoUrl: mongoUrl, - mongoDatabase: mongoDatabase, + sessionID: sessionID, + cache: &memoryStore{}, + mongoUrl: mongoUrl, + mongoDatabase: mongoDatabase, + messagesCollection: messagesCollection, + sessionsCollection: sessionsCollection, } store.cache.Reset() @@ -101,7 +116,7 @@ type MongoQuickFixEntryData struct { // Reset deletes the store records and sets the seqnums back to 1 func (store *mongoStore) Reset() error { msgFilter := generateMessageFilter(&store.sessionID) - _, err := store.db.DB(store.mongoDatabase).C("messages").RemoveAll(msgFilter) + _, err := store.db.DB(store.mongoDatabase).C(store.messagesCollection).RemoveAll(msgFilter) if err != nil { return err @@ -115,7 +130,7 @@ func (store *mongoStore) Reset() error { sessionUpdate.CreationTime = store.cache.CreationTime() sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() - err = store.db.DB(store.mongoDatabase).C("sessions").Update(msgFilter, sessionUpdate) + err = store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate) return err } @@ -130,7 +145,7 @@ func (store *mongoStore) Refresh() error { func (store *mongoStore) populateCache() (err error) { msgFilter := generateMessageFilter(&store.sessionID) - query := store.db.DB(store.mongoDatabase).C("sessions").Find(msgFilter) + query := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Find(msgFilter) if cnt, err := query.Count(); err == nil && cnt > 0 { // session record found, load it @@ -146,7 +161,7 @@ func (store *mongoStore) populateCache() (err error) { msgFilter.CreationTime = store.cache.creationTime msgFilter.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() msgFilter.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() - err = store.db.DB(store.mongoDatabase).C("sessions").Insert(msgFilter) + err = store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Insert(msgFilter) } return } @@ -168,7 +183,7 @@ func (store *mongoStore) SetNextSenderMsgSeqNum(next int) error { sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() sessionUpdate.OutgoingSeqNum = next sessionUpdate.CreationTime = store.cache.CreationTime() - err := store.db.DB(store.mongoDatabase).C("sessions").Update(msgFilter, sessionUpdate) + err := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate) if err != nil { return err } @@ -182,7 +197,7 @@ func (store *mongoStore) SetNextTargetMsgSeqNum(next int) error { sessionUpdate.IncomingSeqNum = next sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() sessionUpdate.CreationTime = store.cache.CreationTime() - err := store.db.DB(store.mongoDatabase).C("sessions").Update(msgFilter, sessionUpdate) + err := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate) if err != nil { return err } @@ -210,7 +225,7 @@ func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) { msgFilter := generateMessageFilter(&store.sessionID) msgFilter.Msgseq = seqNum msgFilter.Message = msg - err = store.db.DB(store.mongoDatabase).C("messages").Insert(msgFilter) + err = store.db.DB(store.mongoDatabase).C(store.messagesCollection).Insert(msgFilter) return } @@ -232,7 +247,7 @@ func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, "$lte": endSeqNum, } - iter := store.db.DB(store.mongoDatabase).C("messages").Find(seqFilter).Sort("msgseq").Iter() + iter := store.db.DB(store.mongoDatabase).C(store.messagesCollection).Find(seqFilter).Sort("msgseq").Iter() for iter.Next(msgFilter) { msgs = append(msgs, msgFilter.Message) } From 2ae607bc4bb0cbb95140fe6501dba4c364cc5003 Mon Sep 17 00:00:00 2001 From: Michael Wilner Date: Tue, 30 Oct 2018 13:31:23 -0500 Subject: [PATCH 6/6] Address action items from review --- mongostore.go | 31 ++++++++++++------------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/mongostore.go b/mongostore.go index 08656c2b5..b57232c40 100644 --- a/mongostore.go +++ b/mongostore.go @@ -56,8 +56,8 @@ func (f mongoStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, e return newMongoStore(sessionID, mongoConnectionUrl, mongoDatabase, f.messagesCollection, f.sessionsCollection) } -func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string, messagesCollection string, sessionsCollection string) (*mongoStore, error) { - store := &mongoStore{ +func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string, messagesCollection string, sessionsCollection string) (store *mongoStore, err error) { + store = &mongoStore{ sessionID: sessionID, cache: &memoryStore{}, mongoUrl: mongoUrl, @@ -67,21 +67,16 @@ func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string, m } store.cache.Reset() - if conn, err := mgo.Dial(mongoUrl); err != nil { - return nil, err - } else { - store.db = conn - } - - if err := store.populateCache(); err != nil { - return nil, err + if store.db, err = mgo.Dial(mongoUrl); err != nil { + return } + err = store.populateCache() - return store, nil + return } -func generateMessageFilter(s *SessionID) (messageFilter *MongoQuickFixEntryData) { - messageFilter = &MongoQuickFixEntryData{ +func generateMessageFilter(s *SessionID) (messageFilter *mongoQuickFixEntryData) { + messageFilter = &mongoQuickFixEntryData{ BeginString: s.BeginString, SessionQualifier: s.Qualifier, SenderCompId: s.SenderCompID, @@ -94,7 +89,7 @@ func generateMessageFilter(s *SessionID) (messageFilter *MongoQuickFixEntryData) return } -type MongoQuickFixEntryData struct { +type mongoQuickFixEntryData struct { //Message specific data Msgseq int `bson:"msgseq,omitempty"` Message []byte `bson:"message,omitempty"` @@ -149,7 +144,7 @@ func (store *mongoStore) populateCache() (err error) { if cnt, err := query.Count(); err == nil && cnt > 0 { // session record found, load it - sessionData := &MongoQuickFixEntryData{} + sessionData := &mongoQuickFixEntryData{} err = query.One(&sessionData) if err == nil { store.cache.creationTime = sessionData.CreationTime @@ -183,8 +178,7 @@ func (store *mongoStore) SetNextSenderMsgSeqNum(next int) error { sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() sessionUpdate.OutgoingSeqNum = next sessionUpdate.CreationTime = store.cache.CreationTime() - err := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate) - if err != nil { + if err := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate); err != nil { return err } return store.cache.SetNextSenderMsgSeqNum(next) @@ -197,8 +191,7 @@ func (store *mongoStore) SetNextTargetMsgSeqNum(next int) error { sessionUpdate.IncomingSeqNum = next sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() sessionUpdate.CreationTime = store.cache.CreationTime() - err := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate) - if err != nil { + if err := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate); err != nil { return err } return store.cache.SetNextTargetMsgSeqNum(next)