diff --git a/.travis.yml b/.travis.yml index b6f063217..55955e15c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,16 +5,22 @@ go: - 1.9 - tip +services: + - mongodb + env: - - FIX_TEST= - - FIX_TEST=fix40 - - FIX_TEST=fix41 - - FIX_TEST=fix42 - - FIX_TEST=fix43 - - FIX_TEST=fix44 - - FIX_TEST=fix50 - - FIX_TEST=fix50sp1 - - FIX_TEST=fix50sp2 + global: + - MONGODB_TEST_CXN=localhost + matrix: + - FIX_TEST= + - FIX_TEST=fix40 + - FIX_TEST=fix41 + - FIX_TEST=fix42 + - FIX_TEST=fix43 + - FIX_TEST=fix44 + - FIX_TEST=fix50 + - FIX_TEST=fix50sp1 + - FIX_TEST=fix50sp2 matrix: allow_failures: diff --git a/Gopkg.lock b/Gopkg.lock index 156d40f26..4d6e80a80 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,50 +2,92 @@ [[projects]] + digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" name = "github.com/davecgh/go-spew" packages = ["spew"] + pruneopts = "" revision = "346938d642f2ec3594ed81d874461961cd0faa76" version = "v1.1.0" [[projects]] + branch = "master" + digest = "1:e9ffb9315dce0051beb757d0f0fc25db57c4da654efc4eada4ea109c2d9da815" + name = "github.com/globalsign/mgo" + packages = [ + ".", + "bson", + "internal/json", + "internal/sasl", + "internal/scram", + ] + pruneopts = "" + revision = "eeefdecb41b842af6dc652aaea4026e8403e62df" + +[[projects]] + digest = "1:1cc12f4618ce8d71ca28ef3708f4e98e1318ab6f06ecfffb6781b893f271c89c" name = "github.com/mattn/go-sqlite3" packages = ["."] + pruneopts = "" revision = "ca5e3819723d8eeaf170ad510e7da1d6d2e94a08" version = "v1.2.0" [[projects]] + digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" name = "github.com/pmezard/go-difflib" packages = ["difflib"] + pruneopts = "" revision = "792786c7400a136282c1664665ae0a8db921c6c2" version = "v1.0.0" [[projects]] branch = "master" + digest = "1:68a81aa25065b50a4bf1ffd115ff3634704f61f675d0140b31492e9fcca55421" name = "github.com/shopspring/decimal" packages = ["."] + pruneopts = "" revision = "aed1bfe463fa3c9cc268d60dcc1491db613bff7e" [[projects]] branch = "master" + digest = "1:ed7ac53c7d59041f27964d3f04e021b45ecb5f23c842c84d778a7f1fb67e2ce9" name = "github.com/stretchr/objx" packages = ["."] + pruneopts = "" revision = "1a9d0bb9f541897e62256577b352fdbc1fb4fd94" [[projects]] + digest = "1:3926a4ec9a4ff1a072458451aa2d9b98acd059a45b38f7335d31e06c3d6a0159" name = "github.com/stretchr/testify" - packages = ["assert","mock","require","suite"] + packages = [ + "assert", + "mock", + "require", + "suite", + ] + pruneopts = "" revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0" version = "v1.1.4" [[projects]] branch = "master" + digest = "1:898bc7c802c1e0c20cecd65811e90b7b9bc5651b4a07aefd159451bfb200b2b3" name = "golang.org/x/net" packages = ["context"] + pruneopts = "" revision = "a04bdaca5b32abe1c069418fb7088ae607de5bd0" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "6efc9f467166be5af0c9b9f4b98d7860ba12b50ce641a2fba765049bd1ea4f27" + input-imports = [ + "github.com/globalsign/mgo", + "github.com/globalsign/mgo/bson", + "github.com/mattn/go-sqlite3", + "github.com/shopspring/decimal", + "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", + "github.com/stretchr/testify/require", + "github.com/stretchr/testify/suite", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/config/configuration.go b/config/configuration.go index cf208cfa7..dbc74e86c 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -41,6 +41,8 @@ const ( SQLStoreDriver string = "SQLStoreDriver" SQLStoreDataSourceName string = "SQLStoreDataSourceName" SQLStoreConnMaxLifetime string = "SQLStoreConnMaxLifetime" + MongoStoreConnection string = "MongoStoreConnection" + MongoStoreDatabase string = "MongoStoreDatabase" ValidateFieldsOutOfOrder string = "ValidateFieldsOutOfOrder" ResendRequestChunkSize string = "ResendRequestChunkSize" EnableLastMsgSeqNumProcessed string = "EnableLastMsgSeqNumProcessed" diff --git a/config/doc.go b/config/doc.go index 2338543f0..58af30adb 100644 --- a/config/doc.go +++ b/config/doc.go @@ -290,6 +290,14 @@ FileStorePath Directory to store sequence number and message files. Only used with FileStoreFactory. +MongoStoreConnection + +The MongoDB connection URL to use (see https://godoc.org/github.com/globalsign/mgo#Dial for the URL Format). Only used with MongoStoreFactory. + +MongoStoreDatabase + +The MongoDB-specific name of the database to use. Only used with MongoStoreFactory. + SQLStoreDriver The name of the database driver to use (see https://github.com/golang/go/wiki/SQLDrivers for the list of available drivers). Only used with SqlStoreFactory. diff --git a/mongostore.go b/mongostore.go new file mode 100644 index 000000000..b57232c40 --- /dev/null +++ b/mongostore.go @@ -0,0 +1,257 @@ +package quickfix + +import ( + "fmt" + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" + "time" + + "github.com/quickfixgo/quickfix/config" +) + +type mongoStoreFactory struct { + settings *Settings + messagesCollection string + sessionsCollection string +} + +type mongoStore struct { + sessionID SessionID + cache *memoryStore + mongoUrl string + mongoDatabase string + db *mgo.Session + messagesCollection string + sessionsCollection string +} + +// NewMongoStoreFactory returns a mongo-based implementation of MessageStoreFactory +func NewMongoStoreFactory(settings *Settings) MessageStoreFactory { + return NewMongoStoreFactoryPrefixed(settings, "") +} + +// NewMongoStoreFactoryPrefixed returns a mongo-based implementation of MessageStoreFactory, with prefix on collections +func NewMongoStoreFactoryPrefixed(settings *Settings, collectionsPrefix string) MessageStoreFactory { + return mongoStoreFactory{ + settings: settings, + messagesCollection: collectionsPrefix + "messages", + sessionsCollection: collectionsPrefix + "sessions", + } +} + +// Create creates a new MongoStore implementation of the MessageStore interface +func (f mongoStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, err error) { + sessionSettings, ok := f.settings.SessionSettings()[sessionID] + if !ok { + return nil, fmt.Errorf("unknown session: %v", sessionID) + } + mongoConnectionUrl, err := sessionSettings.Setting(config.MongoStoreConnection) + if err != nil { + return nil, err + } + mongoDatabase, err := sessionSettings.Setting(config.MongoStoreDatabase) + if err != nil { + return nil, err + } + return newMongoStore(sessionID, mongoConnectionUrl, mongoDatabase, f.messagesCollection, f.sessionsCollection) +} + +func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string, messagesCollection string, sessionsCollection string) (store *mongoStore, err error) { + store = &mongoStore{ + sessionID: sessionID, + cache: &memoryStore{}, + mongoUrl: mongoUrl, + mongoDatabase: mongoDatabase, + messagesCollection: messagesCollection, + sessionsCollection: sessionsCollection, + } + store.cache.Reset() + + if store.db, err = mgo.Dial(mongoUrl); err != nil { + return + } + err = store.populateCache() + + return +} + +func generateMessageFilter(s *SessionID) (messageFilter *mongoQuickFixEntryData) { + messageFilter = &mongoQuickFixEntryData{ + BeginString: s.BeginString, + SessionQualifier: s.Qualifier, + SenderCompId: s.SenderCompID, + SenderSubId: s.SenderSubID, + SenderLocId: s.SenderLocationID, + TargetCompId: s.TargetCompID, + TargetSubId: s.TargetSubID, + TargetLocId: s.TargetLocationID, + } + return +} + +type mongoQuickFixEntryData struct { + //Message specific data + Msgseq int `bson:"msgseq,omitempty"` + Message []byte `bson:"message,omitempty"` + //Session specific data + CreationTime time.Time `bson:"creation_time,omitempty"` + IncomingSeqNum int `bson:"incoming_seq_num,omitempty"` + OutgoingSeqNum int `bson:"outgoing_seq_num,omitempty"` + //Indexed data + BeginString string `bson:"begin_string"` + SessionQualifier string `bson:"session_qualifier"` + SenderCompId string `bson:"sender_comp_id"` + SenderSubId string `bson:"sender_sub_id"` + SenderLocId string `bson:"sender_loc_id"` + TargetCompId string `bson:"target_comp_id"` + TargetSubId string `bson:"target_sub_id"` + TargetLocId string `bson:"target_loc_id"` +} + +// Reset deletes the store records and sets the seqnums back to 1 +func (store *mongoStore) Reset() error { + msgFilter := generateMessageFilter(&store.sessionID) + _, err := store.db.DB(store.mongoDatabase).C(store.messagesCollection).RemoveAll(msgFilter) + + if err != nil { + return err + } + + if err = store.cache.Reset(); err != nil { + return err + } + + sessionUpdate := generateMessageFilter(&store.sessionID) + sessionUpdate.CreationTime = store.cache.CreationTime() + sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() + sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() + err = store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate) + + return err +} + +// Refresh reloads the store from the database +func (store *mongoStore) Refresh() error { + if err := store.cache.Reset(); err != nil { + return err + } + return store.populateCache() +} + +func (store *mongoStore) populateCache() (err error) { + msgFilter := generateMessageFilter(&store.sessionID) + query := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Find(msgFilter) + + if cnt, err := query.Count(); err == nil && cnt > 0 { + // session record found, load it + sessionData := &mongoQuickFixEntryData{} + err = query.One(&sessionData) + if err == nil { + store.cache.creationTime = sessionData.CreationTime + store.cache.SetNextTargetMsgSeqNum(sessionData.IncomingSeqNum) + store.cache.SetNextSenderMsgSeqNum(sessionData.OutgoingSeqNum) + } + } else if err == nil && cnt == 0 { + // session record not found, create it + msgFilter.CreationTime = store.cache.creationTime + msgFilter.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() + msgFilter.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() + err = store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Insert(msgFilter) + } + return +} + +// NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent +func (store *mongoStore) NextSenderMsgSeqNum() int { + return store.cache.NextSenderMsgSeqNum() +} + +// NextTargetMsgSeqNum returns the next MsgSeqNum that should be received +func (store *mongoStore) NextTargetMsgSeqNum() int { + return store.cache.NextTargetMsgSeqNum() +} + +// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent +func (store *mongoStore) SetNextSenderMsgSeqNum(next int) error { + msgFilter := generateMessageFilter(&store.sessionID) + sessionUpdate := generateMessageFilter(&store.sessionID) + sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() + sessionUpdate.OutgoingSeqNum = next + sessionUpdate.CreationTime = store.cache.CreationTime() + if err := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate); err != nil { + return err + } + return store.cache.SetNextSenderMsgSeqNum(next) +} + +// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received +func (store *mongoStore) SetNextTargetMsgSeqNum(next int) error { + msgFilter := generateMessageFilter(&store.sessionID) + sessionUpdate := generateMessageFilter(&store.sessionID) + sessionUpdate.IncomingSeqNum = next + sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() + sessionUpdate.CreationTime = store.cache.CreationTime() + if err := store.db.DB(store.mongoDatabase).C(store.sessionsCollection).Update(msgFilter, sessionUpdate); err != nil { + return err + } + return store.cache.SetNextTargetMsgSeqNum(next) +} + +// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent +func (store *mongoStore) IncrNextSenderMsgSeqNum() error { + store.cache.IncrNextSenderMsgSeqNum() + return store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum()) +} + +// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received +func (store *mongoStore) IncrNextTargetMsgSeqNum() error { + store.cache.IncrNextTargetMsgSeqNum() + return store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum()) +} + +// CreationTime returns the creation time of the store +func (store *mongoStore) CreationTime() time.Time { + return store.cache.CreationTime() +} + +func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) { + msgFilter := generateMessageFilter(&store.sessionID) + msgFilter.Msgseq = seqNum + msgFilter.Message = msg + err = store.db.DB(store.mongoDatabase).C(store.messagesCollection).Insert(msgFilter) + return +} + +func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) (msgs [][]byte, err error) { + msgFilter := generateMessageFilter(&store.sessionID) + //Marshal into database form + msgFilterBytes, err := bson.Marshal(msgFilter) + if err != nil { + return + } + seqFilter := bson.M{} + err = bson.Unmarshal(msgFilterBytes, &seqFilter) + if err != nil { + return + } + //Modify the query to use a range for the sequence filter + seqFilter["msgseq"] = bson.M{ + "$gte": beginSeqNum, + "$lte": endSeqNum, + } + + iter := store.db.DB(store.mongoDatabase).C(store.messagesCollection).Find(seqFilter).Sort("msgseq").Iter() + for iter.Next(msgFilter) { + msgs = append(msgs, msgFilter.Message) + } + return +} + +// Close closes the store's database connection +func (store *mongoStore) Close() error { + if store.db != nil { + store.db.Close() + store.db = nil + } + return nil +} diff --git a/mongostore_test.go b/mongostore_test.go new file mode 100644 index 000000000..21ca84a8d --- /dev/null +++ b/mongostore_test.go @@ -0,0 +1,52 @@ +package quickfix + +import ( + "fmt" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "log" + "os" + "strings" + "testing" +) + +// MongoStoreTestSuite runs all tests in the MessageStoreTestSuite against the MongoStore implementation +type MongoStoreTestSuite struct { + MessageStoreTestSuite +} + +func (suite *MongoStoreTestSuite) SetupTest() { + mongoDbCxn := os.Getenv("MONGODB_TEST_CXN") + if len(mongoDbCxn) <= 0 { + log.Println("MONGODB_TEST_CXN environment arg is not provided, skipping...") + suite.T().SkipNow() + } + mongoDatabase := "automated_testing_database" + + // create settings + sessionID := SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} + settings, err := ParseSettings(strings.NewReader(fmt.Sprintf(` +[DEFAULT] +MongoStoreConnection=%s +MongoStoreDatabase=%s + +[SESSION] +BeginString=%s +SenderCompID=%s +TargetCompID=%s`, mongoDbCxn, mongoDatabase, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID))) + require.Nil(suite.T(), err) + + // create store + suite.msgStore, err = NewMongoStoreFactory(settings).Create(sessionID) + require.Nil(suite.T(), err) + err = suite.msgStore.Reset() + require.Nil(suite.T(), err) +} + +func (suite *MongoStoreTestSuite) TearDownTest() { + suite.msgStore.Close() +} + +func TestMongoStoreTestSuite(t *testing.T) { + suite.Run(t, new(MongoStoreTestSuite)) +}