Skip to content

Commit 6c3be11

Browse files
pentaflopsqaisjp
andauthored
Multi-thread DB Queue (#1242)
* Multi-thread DB Queue * Missing chg * Some fixes Co-authored-by: Qais Patankar <[email protected]>
1 parent a3695fb commit 6c3be11

File tree

8 files changed

+119
-98
lines changed

8 files changed

+119
-98
lines changed

Server/mods/deathmatch/logic/CDatabaseJobQueue.cpp

Lines changed: 51 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class CDatabaseJobQueueImpl : public CDatabaseJobQueue
3636
virtual bool FreeCommand(CDbJobData* pJobData);
3737
virtual CDbJobData* FindCommandFromId(SDbJobId id);
3838
virtual void IgnoreConnectionResults(SConnectionHandle connectionHandle);
39-
virtual bool UsesConnection(SConnectionHandle connectionHandle);
39+
virtual bool IsConnectionClosed();
40+
virtual int GetQueueSize();
4041

4142
protected:
4243
void StopThread();
@@ -66,10 +67,12 @@ class CDatabaseJobQueueImpl : public CDatabaseJobQueue
6667
uint m_uiJobCount10sMin;
6768
CElapsedTime m_JobCountElpasedTime;
6869
std::set<SConnectionHandle> m_PendingFlushMap;
70+
CDatabaseConnection* m_pConnection = nullptr;
71+
SConnectionHandle m_connectionHandle;
72+
bool m_bConnectionClosed = true;
6973

7074
// Other thread variables
7175
std::map<SString, CDatabaseType*> m_DatabaseTypeMap;
72-
uint m_uiConnectionCountWarnThresh;
7376
EJobLogLevelType m_LogLevel;
7477
SString m_strLogFilename;
7578

@@ -81,7 +84,6 @@ class CDatabaseJobQueueImpl : public CDatabaseJobQueue
8184
CJobQueueType m_CommandQueue;
8285
CJobQueueType m_ResultQueue;
8386
CComboMutex m_Mutex;
84-
std::map<SConnectionHandle, CDatabaseConnection*> m_HandleConnectionMap;
8587
} shared;
8688
};
8789

@@ -100,7 +102,7 @@ CDatabaseJobQueue* NewDatabaseJobQueue()
100102
// Init known database types and start the job service thread
101103
//
102104
///////////////////////////////////////////////////////////////
103-
CDatabaseJobQueueImpl::CDatabaseJobQueueImpl() : m_uiJobCountWarnThresh(200), m_uiConnectionCountWarnThresh(20)
105+
CDatabaseJobQueueImpl::CDatabaseJobQueueImpl() : m_uiJobCountWarnThresh(200)
104106
{
105107
// Add known database types
106108
CDatabaseType* pDatabaseTypeSqlite = NewDatabaseTypeSqlite();
@@ -293,13 +295,6 @@ void CDatabaseJobQueueImpl::UpdateDebugData()
293295

294296
shared.m_Mutex.Lock();
295297

296-
// Log to console if connection count is creeping up
297-
if (shared.m_HandleConnectionMap.size() > m_uiConnectionCountWarnThresh)
298-
{
299-
m_uiConnectionCountWarnThresh = shared.m_HandleConnectionMap.size() * 2;
300-
CLogger::LogPrintf("Notice: There are now %d database connections\n", shared.m_HandleConnectionMap.size());
301-
}
302-
303298
// Log to console if job count is creeping up
304299
m_uiJobCount10sMin = std::min<uint>(m_uiJobCount10sMin, m_ActiveJobHandles.size());
305300
if (m_uiJobCount10sMin > m_uiJobCountWarnThresh)
@@ -502,16 +497,16 @@ void CDatabaseJobQueueImpl::IgnoreJobResults(CDbJobData* pJobData)
502497
pJobData->result.bIgnoreResult = true;
503498
}
504499

505-
///////////////////////////////////////////////////////////////
506-
//
507-
// CDatabaseJobQueueImpl::UsesConnection
508-
//
509-
// Return true if supplied connection is used by this queue
510-
//
511-
///////////////////////////////////////////////////////////////
512-
bool CDatabaseJobQueueImpl::UsesConnection(SConnectionHandle connectionHandle)
500+
///////////////////////////////////////////////////////////////
501+
//
502+
// CDatabaseJobQueueImpl::IsConnectionClose
503+
//
504+
// Return true if connection was closed
505+
//
506+
///////////////////////////////////////////////////////////////
507+
bool CDatabaseJobQueueImpl::IsConnectionClosed()
513508
{
514-
return GetConnectionFromHandle(connectionHandle) != nullptr;
509+
return m_bConnectionClosed || !m_pConnection;
515510
}
516511

517512
//
@@ -659,10 +654,10 @@ void CDatabaseJobQueueImpl::ProcessConnect(CDbJobData* pJobData)
659654
if (pTypeManager->GetDataSourceTag() != "mysql")
660655
pConnection->m_SuppressedErrorCodes.clear();
661656

662-
// Associate handle with CDatabaseConnection*
663-
shared.m_Mutex.Lock();
664-
MapSet(shared.m_HandleConnectionMap, pJobData->command.connectionHandle, pConnection);
665-
shared.m_Mutex.Unlock();
657+
// Set current connection
658+
m_pConnection = pConnection;
659+
m_connectionHandle = pJobData->command.connectionHandle;
660+
m_bConnectionClosed = false;
666661

667662
// Set result
668663
pJobData->result.status = EJobResult::SUCCESS;
@@ -677,19 +672,18 @@ void CDatabaseJobQueueImpl::ProcessConnect(CDbJobData* pJobData)
677672
///////////////////////////////////////////////////////////////
678673
void CDatabaseJobQueueImpl::ProcessDisconnect(CDbJobData* pJobData)
679674
{
680-
// CDatabaseConnection* from handle
681-
CDatabaseConnection* pConnection = GetConnectionFromHandle(pJobData->command.connectionHandle);
682-
if (!pConnection)
675+
// Check connection active
676+
if (IsConnectionClosed())
683677
{
684678
pJobData->result.status = EJobResult::FAIL;
685679
pJobData->result.strReason = "Invalid connection";
686680
return;
687681
}
688682

689683
// And disconnect
690-
RemoveHandleForConnection(pJobData->command.connectionHandle, pConnection);
691-
pConnection->Release();
692-
pConnection = NULL;
684+
m_pConnection->Release();
685+
m_pConnection = NULL;
686+
m_bConnectionClosed = true;
693687

694688
// Set result
695689
pJobData->result.status = EJobResult::SUCCESS;
@@ -704,22 +698,21 @@ void CDatabaseJobQueueImpl::ProcessDisconnect(CDbJobData* pJobData)
704698
///////////////////////////////////////////////////////////////
705699
void CDatabaseJobQueueImpl::ProcessQuery(CDbJobData* pJobData)
706700
{
707-
// CDatabaseConnection* from handle
708-
CDatabaseConnection* pConnection = GetConnectionFromHandle(pJobData->command.connectionHandle);
709-
if (!pConnection)
701+
// Check connection active
702+
if (IsConnectionClosed())
710703
{
711704
pJobData->result.status = EJobResult::FAIL;
712705
pJobData->result.strReason = "Invalid connection";
713706
return;
714707
}
715708

716709
// And query
717-
if (!pConnection->Query(pJobData->command.strData, pJobData->result.registryResult))
710+
if (!m_pConnection->Query(pJobData->command.strData, pJobData->result.registryResult))
718711
{
719712
pJobData->result.status = EJobResult::FAIL;
720-
pJobData->result.strReason = pConnection->GetLastErrorMessage();
721-
pJobData->result.uiErrorCode = pConnection->GetLastErrorCode();
722-
pJobData->result.bErrorSuppressed = MapContains(pConnection->m_SuppressedErrorCodes, pConnection->GetLastErrorCode());
713+
pJobData->result.strReason = m_pConnection->GetLastErrorMessage();
714+
pJobData->result.uiErrorCode = m_pConnection->GetLastErrorCode();
715+
pJobData->result.bErrorSuppressed = MapContains(m_pConnection->m_SuppressedErrorCodes, m_pConnection->GetLastErrorCode());
723716
}
724717
else
725718
{
@@ -739,16 +732,15 @@ void CDatabaseJobQueueImpl::ProcessQuery(CDbJobData* pJobData)
739732
///////////////////////////////////////////////////////////////
740733
void CDatabaseJobQueueImpl::ProcessFlush(CDbJobData* pJobData)
741734
{
742-
CDatabaseConnection* pConnection = GetConnectionFromHandle(pJobData->command.connectionHandle);
743-
if (!pConnection)
735+
if (IsConnectionClosed())
744736
{
745737
pJobData->result.status = EJobResult::FAIL;
746738
pJobData->result.strReason = "Invalid connection";
747739
return;
748740
}
749741

750742
// Do flush
751-
pConnection->Flush();
743+
m_pConnection->Flush();
752744
pJobData->result.status = EJobResult::SUCCESS;
753745
}
754746

@@ -766,38 +758,6 @@ void CDatabaseJobQueueImpl::ProcessSetLogLevel(CDbJobData* pJobData)
766758
pJobData->result.status = EJobResult::SUCCESS;
767759
}
768760

769-
///////////////////////////////////////////////////////////////
770-
//
771-
// CDatabaseJobQueueImpl::GetConnectionFromHandle
772-
//
773-
//
774-
//
775-
///////////////////////////////////////////////////////////////
776-
CDatabaseConnection* CDatabaseJobQueueImpl::GetConnectionFromHandle(SConnectionHandle connectionHandle)
777-
{
778-
shared.m_Mutex.Lock();
779-
CDatabaseConnection* pConnection = MapFindRef(shared.m_HandleConnectionMap, connectionHandle);
780-
shared.m_Mutex.Unlock();
781-
return pConnection;
782-
}
783-
784-
///////////////////////////////////////////////////////////////
785-
//
786-
// CDatabaseJobQueueImpl::RemoveHandleForConnection
787-
//
788-
//
789-
//
790-
///////////////////////////////////////////////////////////////
791-
void CDatabaseJobQueueImpl::RemoveHandleForConnection(SConnectionHandle connectionHandle, CDatabaseConnection* pConnection)
792-
{
793-
shared.m_Mutex.Lock();
794-
if (!MapContains(shared.m_HandleConnectionMap, connectionHandle))
795-
CLogger::ErrorPrintf("RemoveHandleForConnection: Serious problem here\n");
796-
797-
MapRemove(shared.m_HandleConnectionMap, connectionHandle);
798-
shared.m_Mutex.Unlock();
799-
}
800-
801761
///////////////////////////////////////////////////////////////
802762
//
803763
// CDatabaseJobQueueImpl::LogResult
@@ -812,15 +772,14 @@ void CDatabaseJobQueueImpl::LogResult(CDbJobData* pJobData)
812772
return;
813773

814774
// Check logging status of connection
815-
CDatabaseConnection* pConnection = GetConnectionFromHandle(pJobData->command.connectionHandle);
816-
if (!pConnection || !pConnection->m_bLoggingEnabled)
775+
if (IsConnectionClosed() || !m_pConnection->m_bLoggingEnabled)
817776
return;
818777

819778
if (pJobData->result.status == EJobResult::SUCCESS)
820779
{
821780
if (m_LogLevel >= EJobLogLevel::ALL)
822781
{
823-
SString strLine("%s: [%s] SUCCESS: Affected rows:%d [Query:%s]\n", *GetLocalTimeString(true, true), *pConnection->m_strLogTag,
782+
SString strLine("%s: [%s] SUCCESS: Affected rows:%d [Query:%s]\n", *GetLocalTimeString(true, true), *m_pConnection->m_strLogTag,
824783
pJobData->result.registryResult->uiNumAffectedRows, *pJobData->GetCommandStringForLog());
825784
LogString(strLine);
826785
}
@@ -833,7 +792,7 @@ void CDatabaseJobQueueImpl::LogResult(CDbJobData* pJobData)
833792
if (pJobData->result.bErrorSuppressed && m_LogLevel != EJobLogLevel::ALL)
834793
return;
835794

836-
SString strLine("%s: [%s] FAIL: (%d) %s [Query:%s]\n", *GetLocalTimeString(true, true), *pConnection->m_strLogTag, pJobData->result.uiErrorCode,
795+
SString strLine("%s: [%s] FAIL: (%d) %s [Query:%s]\n", *GetLocalTimeString(true, true), *m_pConnection->m_strLogTag, pJobData->result.uiErrorCode,
837796
*pJobData->result.strReason, *pJobData->GetCommandStringForLog());
838797
LogString(strLine);
839798
}
@@ -851,3 +810,19 @@ void CDatabaseJobQueueImpl::LogString(const SString& strText)
851810
{
852811
FileAppend(m_strLogFilename, strText);
853812
}
813+
814+
///////////////////////////////////////////////////////////////
815+
//
816+
// CDatabaseJobQueueImpl::GetQueueSize
817+
//
818+
// Get count elements in queue
819+
//
820+
///////////////////////////////////////////////////////////////
821+
int CDatabaseJobQueueImpl::GetQueueSize()
822+
{
823+
shared.m_Mutex.Lock();
824+
int count = shared.m_CommandQueue.size();
825+
shared.m_Mutex.Unlock();
826+
827+
return count;
828+
}

Server/mods/deathmatch/logic/CDatabaseJobQueue.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class CDatabaseJobQueue
2727
virtual bool FreeCommand(CDbJobData* pJobData) = 0;
2828
virtual CDbJobData* FindCommandFromId(SDbJobId id) = 0;
2929
virtual void IgnoreConnectionResults(SConnectionHandle connectionHandle) = 0;
30-
virtual bool UsesConnection(SConnectionHandle connectionHandle) = 0;
30+
virtual bool IsConnectionClosed() = 0;
31+
virtual int GetQueueSize() = 0;
3132
};
3233

3334
CDatabaseJobQueue* NewDatabaseJobQueue();

Server/mods/deathmatch/logic/CDatabaseJobQueueManager.cpp

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ CDbJobData* CDatabaseJobQueueManager::AddCommand(EJobCommandType jobType, SConne
4242
if (jobType == EJobCommand::CONNECT)
4343
{
4444
connectionHandle = GetNextConnectionHandle();
45-
pJobQueue = GetQueueFromConnectCommand(strData);
45+
pJobQueue = GetQueueFromConnectCommand(connectionHandle);
4646
}
4747
else
4848
{
@@ -162,12 +162,7 @@ void CDatabaseJobQueueManager::SetLogLevel(EJobLogLevelType logLevel, const SStr
162162
///////////////////////////////////////////////////////////////
163163
CDatabaseJobQueue* CDatabaseJobQueueManager::FindQueueFromConnection(SConnectionHandle connectionHandle)
164164
{
165-
for (const auto iter : m_QueueNameMap)
166-
{
167-
if (iter.second->UsesConnection(connectionHandle))
168-
return iter.second;
169-
}
170-
return nullptr;
165+
return MapFindRef(m_QueueNameMap, connectionHandle);
171166
}
172167

173168
///////////////////////////////////////////////////////////////
@@ -178,21 +173,15 @@ CDatabaseJobQueue* CDatabaseJobQueueManager::FindQueueFromConnection(SConnection
178173
// Can't fail
179174
//
180175
///////////////////////////////////////////////////////////////
181-
CDatabaseJobQueue* CDatabaseJobQueueManager::GetQueueFromConnectCommand(const SString& strData)
176+
CDatabaseJobQueue* CDatabaseJobQueueManager::GetQueueFromConnectCommand(SConnectionHandle connectionHandle)
182177
{
183-
// Extract queue name from options
184-
std::vector<SString> parts;
185-
strData.Split("\1", parts);
186-
SString strQueueName;
187-
GetOption<CDbOptionsMap>(parts[4], "queue", strQueueName);
188-
189178
// Find queue with name
190-
CDatabaseJobQueue* pQueue = MapFindRef(m_QueueNameMap, strQueueName);
179+
CDatabaseJobQueue* pQueue = MapFindRef(m_QueueNameMap, connectionHandle);
191180
if (!pQueue)
192181
{
193182
// Add new queue
194183
pQueue = NewDatabaseJobQueue();
195-
MapSet(m_QueueNameMap, strQueueName, pQueue);
184+
MapSet(m_QueueNameMap, connectionHandle, pQueue);
196185
}
197186
return pQueue;
198187
}
@@ -212,7 +201,23 @@ SConnectionHandle CDatabaseJobQueueManager::GetNextConnectionHandle()
212201
m_ConnectionHandleCounter &= 0x000FFFFF;
213202
m_ConnectionHandleCounter |= 0x00200000;
214203
// TODO - check when all (1,048,575) ids are in use
215-
} while (FindQueueFromConnection(m_ConnectionHandleCounter));
204+
} while (MapContains(m_QueueNameMap, m_ConnectionHandleCounter));
216205

217206
return m_ConnectionHandleCounter;
218207
}
208+
209+
///////////////////////////////////////////////////////////////
210+
//
211+
// CDatabaseJobQueueManager::GetQueueSizeFromConnection
212+
//
213+
// Return count elements in queue
214+
//
215+
///////////////////////////////////////////////////////////////
216+
int CDatabaseJobQueueManager::GetQueueSizeFromConnection(SConnectionHandle connectionHandle)
217+
{
218+
CDatabaseJobQueue* pJobQueue = FindQueueFromConnection(connectionHandle);
219+
if (!pJobQueue)
220+
return -1;
221+
222+
return pJobQueue->GetQueueSize();
223+
}

Server/mods/deathmatch/logic/CDatabaseJobQueueManager.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ class CDatabaseJobQueueManager
2727
CDbJobData* FindCommandFromId(SDbJobId id);
2828
void IgnoreConnectionResults(SConnectionHandle connectionHandle);
2929
void SetLogLevel(EJobLogLevelType logLevel, const SString& strLogFilename);
30+
int GetQueueSizeFromConnection(SConnectionHandle connectionHandle);
3031

3132
protected:
32-
CDatabaseJobQueue* GetQueueFromConnectCommand(const SString& strData);
33+
CDatabaseJobQueue* GetQueueFromConnectCommand(SConnectionHandle connectionHandle);
3334
CDatabaseJobQueue* FindQueueFromConnection(SConnectionHandle connectionHandle);
3435
SConnectionHandle GetNextConnectionHandle();
3536

36-
std::map<SString, CDatabaseJobQueue*> m_QueueNameMap;
37-
SConnectionHandle m_ConnectionHandleCounter;
37+
std::map<SConnectionHandle, CDatabaseJobQueue*> m_QueueNameMap;
38+
SConnectionHandle m_ConnectionHandleCounter;
3839
};

Server/mods/deathmatch/logic/CDatabaseManager.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class CDatabaseManagerImpl : public CDatabaseManager
5050
CLuaArguments* pArgs = nullptr);
5151
virtual bool QueryWithCallbackf(SConnectionHandle hConnection, PFN_DBRESULT pfnDbResult, void* pCallbackContext, const char* szQuery, ...);
5252
virtual void SetLogLevel(EJobLogLevelType logLevel, const SString& strLogFilename);
53+
virtual int GetQueueSizeFromConnection(SConnectionHandle connectionHandle);
5354

5455
// CDatabaseManagerImpl
5556
SString InsertQueryArguments(SConnectionHandle hConnection, const SString& strQuery, CLuaArguments* pArgs);
@@ -564,6 +565,18 @@ void CDatabaseManagerImpl::SetLogLevel(EJobLogLevelType logLevel, const SString&
564565
return m_JobQueue->SetLogLevel(logLevel, strLogFilename);
565566
}
566567

568+
///////////////////////////////////////////////////////////////
569+
//
570+
// CDatabaseManagerImpl::GetQueueSizeFromConnection
571+
//
572+
//
573+
//
574+
///////////////////////////////////////////////////////////////
575+
int CDatabaseManagerImpl::GetQueueSizeFromConnection(SConnectionHandle connectionHandle)
576+
{
577+
return m_JobQueue->GetQueueSizeFromConnection(connectionHandle);
578+
}
579+
567580
///////////////////////////////////////////////////////////////
568581
//
569582
// CDatabaseManagerImpl::InsertQueryArguments

Server/mods/deathmatch/logic/CDatabaseManager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ class CDatabaseManager
165165
CLuaArguments* pArgs = nullptr) = 0;
166166
virtual bool QueryWithCallbackf(SConnectionHandle hConnection, PFN_DBRESULT pfnDbResult, void* pCallbackContext, const char* szQuery, ...) = 0;
167167
virtual void SetLogLevel(EJobLogLevelType logLevel, const SString& strLogFilename) = 0;
168+
virtual int GetQueueSizeFromConnection(SConnectionHandle connectionHandle) = 0;
168169
};
169170

170171
CDatabaseManager* NewDatabaseManager();

0 commit comments

Comments
 (0)