Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 76 additions & 51 deletions Server/mods/deathmatch/logic/CDatabaseJobQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class CDatabaseJobQueueImpl : public CDatabaseJobQueue
virtual bool FreeCommand(CDbJobData* pJobData);
virtual CDbJobData* FindCommandFromId(SDbJobId id);
virtual void IgnoreConnectionResults(SConnectionHandle connectionHandle);
virtual bool IsConnectionClosed();
virtual int GetQueueSize();
virtual bool UsesConnection(SConnectionHandle connectionHandle);

protected:
void StopThread();
Expand Down Expand Up @@ -67,12 +66,10 @@ class CDatabaseJobQueueImpl : public CDatabaseJobQueue
uint m_uiJobCount10sMin;
CElapsedTime m_JobCountElpasedTime;
std::set<SConnectionHandle> m_PendingFlushMap;
CDatabaseConnection* m_pConnection = nullptr;
SConnectionHandle m_connectionHandle;
bool m_bConnectionClosed = true;

// Other thread variables
std::map<SString, CDatabaseType*> m_DatabaseTypeMap;
uint m_uiConnectionCountWarnThresh;
EJobLogLevelType m_LogLevel;
SString m_strLogFilename;

Expand All @@ -84,6 +81,7 @@ class CDatabaseJobQueueImpl : public CDatabaseJobQueue
CJobQueueType m_CommandQueue;
CJobQueueType m_ResultQueue;
CComboMutex m_Mutex;
std::map<SConnectionHandle, CDatabaseConnection*> m_HandleConnectionMap;
} shared;
};

Expand All @@ -102,7 +100,7 @@ CDatabaseJobQueue* NewDatabaseJobQueue()
// Init known database types and start the job service thread
//
///////////////////////////////////////////////////////////////
CDatabaseJobQueueImpl::CDatabaseJobQueueImpl() : m_uiJobCountWarnThresh(200)
CDatabaseJobQueueImpl::CDatabaseJobQueueImpl() : m_uiJobCountWarnThresh(200), m_uiConnectionCountWarnThresh(20)
{
// Add known database types
CDatabaseType* pDatabaseTypeSqlite = NewDatabaseTypeSqlite();
Expand Down Expand Up @@ -295,6 +293,13 @@ void CDatabaseJobQueueImpl::UpdateDebugData()

shared.m_Mutex.Lock();

// Log to console if connection count is creeping up
if (shared.m_HandleConnectionMap.size() > m_uiConnectionCountWarnThresh)
{
m_uiConnectionCountWarnThresh = shared.m_HandleConnectionMap.size() * 2;
CLogger::LogPrintf("Notice: There are now %d database connections\n", shared.m_HandleConnectionMap.size());
}

// Log to console if job count is creeping up
m_uiJobCount10sMin = std::min<uint>(m_uiJobCount10sMin, m_ActiveJobHandles.size());
if (m_uiJobCount10sMin > m_uiJobCountWarnThresh)
Expand Down Expand Up @@ -497,16 +502,16 @@ void CDatabaseJobQueueImpl::IgnoreJobResults(CDbJobData* pJobData)
pJobData->result.bIgnoreResult = true;
}

///////////////////////////////////////////////////////////////
//
// CDatabaseJobQueueImpl::IsConnectionClose
//
// Return true if connection was closed
//
///////////////////////////////////////////////////////////////
bool CDatabaseJobQueueImpl::IsConnectionClosed()
///////////////////////////////////////////////////////////////
//
// CDatabaseJobQueueImpl::UsesConnection
//
// Return true if supplied connection is used by this queue
//
///////////////////////////////////////////////////////////////
bool CDatabaseJobQueueImpl::UsesConnection(SConnectionHandle connectionHandle)
{
return m_bConnectionClosed || !m_pConnection;
return GetConnectionFromHandle(connectionHandle) != nullptr;
}

//
Expand Down Expand Up @@ -654,10 +659,10 @@ void CDatabaseJobQueueImpl::ProcessConnect(CDbJobData* pJobData)
if (pTypeManager->GetDataSourceTag() != "mysql")
pConnection->m_SuppressedErrorCodes.clear();

// Set current connection
m_pConnection = pConnection;
m_connectionHandle = pJobData->command.connectionHandle;
m_bConnectionClosed = false;
// Associate handle with CDatabaseConnection*
shared.m_Mutex.Lock();
MapSet(shared.m_HandleConnectionMap, pJobData->command.connectionHandle, pConnection);
shared.m_Mutex.Unlock();

// Set result
pJobData->result.status = EJobResult::SUCCESS;
Expand All @@ -672,18 +677,19 @@ void CDatabaseJobQueueImpl::ProcessConnect(CDbJobData* pJobData)
///////////////////////////////////////////////////////////////
void CDatabaseJobQueueImpl::ProcessDisconnect(CDbJobData* pJobData)
{
// Check connection active
if (IsConnectionClosed())
// CDatabaseConnection* from handle
CDatabaseConnection* pConnection = GetConnectionFromHandle(pJobData->command.connectionHandle);
if (!pConnection)
{
pJobData->result.status = EJobResult::FAIL;
pJobData->result.strReason = "Invalid connection";
return;
}

// And disconnect
m_pConnection->Release();
m_pConnection = NULL;
m_bConnectionClosed = true;
RemoveHandleForConnection(pJobData->command.connectionHandle, pConnection);
pConnection->Release();
pConnection = NULL;

// Set result
pJobData->result.status = EJobResult::SUCCESS;
Expand All @@ -698,21 +704,22 @@ void CDatabaseJobQueueImpl::ProcessDisconnect(CDbJobData* pJobData)
///////////////////////////////////////////////////////////////
void CDatabaseJobQueueImpl::ProcessQuery(CDbJobData* pJobData)
{
// Check connection active
if (IsConnectionClosed())
// CDatabaseConnection* from handle
CDatabaseConnection* pConnection = GetConnectionFromHandle(pJobData->command.connectionHandle);
if (!pConnection)
{
pJobData->result.status = EJobResult::FAIL;
pJobData->result.strReason = "Invalid connection";
return;
}

// And query
if (!m_pConnection->Query(pJobData->command.strData, pJobData->result.registryResult))
if (!pConnection->Query(pJobData->command.strData, pJobData->result.registryResult))
{
pJobData->result.status = EJobResult::FAIL;
pJobData->result.strReason = m_pConnection->GetLastErrorMessage();
pJobData->result.uiErrorCode = m_pConnection->GetLastErrorCode();
pJobData->result.bErrorSuppressed = MapContains(m_pConnection->m_SuppressedErrorCodes, m_pConnection->GetLastErrorCode());
pJobData->result.strReason = pConnection->GetLastErrorMessage();
pJobData->result.uiErrorCode = pConnection->GetLastErrorCode();
pJobData->result.bErrorSuppressed = MapContains(pConnection->m_SuppressedErrorCodes, pConnection->GetLastErrorCode());
}
else
{
Expand All @@ -732,15 +739,16 @@ void CDatabaseJobQueueImpl::ProcessQuery(CDbJobData* pJobData)
///////////////////////////////////////////////////////////////
void CDatabaseJobQueueImpl::ProcessFlush(CDbJobData* pJobData)
{
if (IsConnectionClosed())
CDatabaseConnection* pConnection = GetConnectionFromHandle(pJobData->command.connectionHandle);
if (!pConnection)
{
pJobData->result.status = EJobResult::FAIL;
pJobData->result.strReason = "Invalid connection";
return;
}

// Do flush
m_pConnection->Flush();
pConnection->Flush();
pJobData->result.status = EJobResult::SUCCESS;
}

Expand All @@ -758,6 +766,38 @@ void CDatabaseJobQueueImpl::ProcessSetLogLevel(CDbJobData* pJobData)
pJobData->result.status = EJobResult::SUCCESS;
}

///////////////////////////////////////////////////////////////
//
// CDatabaseJobQueueImpl::GetConnectionFromHandle
//
//
//
///////////////////////////////////////////////////////////////
CDatabaseConnection* CDatabaseJobQueueImpl::GetConnectionFromHandle(SConnectionHandle connectionHandle)
{
shared.m_Mutex.Lock();
CDatabaseConnection* pConnection = MapFindRef(shared.m_HandleConnectionMap, connectionHandle);
shared.m_Mutex.Unlock();
return pConnection;
}

///////////////////////////////////////////////////////////////
//
// CDatabaseJobQueueImpl::RemoveHandleForConnection
//
//
//
///////////////////////////////////////////////////////////////
void CDatabaseJobQueueImpl::RemoveHandleForConnection(SConnectionHandle connectionHandle, CDatabaseConnection* pConnection)
{
shared.m_Mutex.Lock();
if (!MapContains(shared.m_HandleConnectionMap, connectionHandle))
CLogger::ErrorPrintf("RemoveHandleForConnection: Serious problem here\n");

MapRemove(shared.m_HandleConnectionMap, connectionHandle);
shared.m_Mutex.Unlock();
}

///////////////////////////////////////////////////////////////
//
// CDatabaseJobQueueImpl::LogResult
Expand All @@ -772,14 +812,15 @@ void CDatabaseJobQueueImpl::LogResult(CDbJobData* pJobData)
return;

// Check logging status of connection
if (IsConnectionClosed() || !m_pConnection->m_bLoggingEnabled)
CDatabaseConnection* pConnection = GetConnectionFromHandle(pJobData->command.connectionHandle);
if (!pConnection || !pConnection->m_bLoggingEnabled)
return;

if (pJobData->result.status == EJobResult::SUCCESS)
{
if (m_LogLevel >= EJobLogLevel::ALL)
{
SString strLine("%s: [%s] SUCCESS: Affected rows:%d [Query:%s]\n", *GetLocalTimeString(true, true), *m_pConnection->m_strLogTag,
SString strLine("%s: [%s] SUCCESS: Affected rows:%d [Query:%s]\n", *GetLocalTimeString(true, true), *pConnection->m_strLogTag,
pJobData->result.registryResult->uiNumAffectedRows, *pJobData->GetCommandStringForLog());
LogString(strLine);
}
Expand All @@ -792,7 +833,7 @@ void CDatabaseJobQueueImpl::LogResult(CDbJobData* pJobData)
if (pJobData->result.bErrorSuppressed && m_LogLevel != EJobLogLevel::ALL)
return;

SString strLine("%s: [%s] FAIL: (%d) %s [Query:%s]\n", *GetLocalTimeString(true, true), *m_pConnection->m_strLogTag, pJobData->result.uiErrorCode,
SString strLine("%s: [%s] FAIL: (%d) %s [Query:%s]\n", *GetLocalTimeString(true, true), *pConnection->m_strLogTag, pJobData->result.uiErrorCode,
*pJobData->result.strReason, *pJobData->GetCommandStringForLog());
LogString(strLine);
}
Expand All @@ -810,19 +851,3 @@ void CDatabaseJobQueueImpl::LogString(const SString& strText)
{
FileAppend(m_strLogFilename, strText);
}

///////////////////////////////////////////////////////////////
//
// CDatabaseJobQueueImpl::GetQueueSize
//
// Get count elements in queue
//
///////////////////////////////////////////////////////////////
int CDatabaseJobQueueImpl::GetQueueSize()
{
shared.m_Mutex.Lock();
int count = shared.m_CommandQueue.size();
shared.m_Mutex.Unlock();

return count;
}
3 changes: 1 addition & 2 deletions Server/mods/deathmatch/logic/CDatabaseJobQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ class CDatabaseJobQueue
virtual bool FreeCommand(CDbJobData* pJobData) = 0;
virtual CDbJobData* FindCommandFromId(SDbJobId id) = 0;
virtual void IgnoreConnectionResults(SConnectionHandle connectionHandle) = 0;
virtual bool IsConnectionClosed() = 0;
virtual int GetQueueSize() = 0;
virtual bool UsesConnection(SConnectionHandle connectionHandle) = 0;
};

CDatabaseJobQueue* NewDatabaseJobQueue();
39 changes: 17 additions & 22 deletions Server/mods/deathmatch/logic/CDatabaseJobQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ CDbJobData* CDatabaseJobQueueManager::AddCommand(EJobCommandType jobType, SConne
if (jobType == EJobCommand::CONNECT)
{
connectionHandle = GetNextConnectionHandle();
pJobQueue = GetQueueFromConnectCommand(connectionHandle);
pJobQueue = GetQueueFromConnectCommand(strData);
}
else
{
Expand Down Expand Up @@ -162,7 +162,12 @@ void CDatabaseJobQueueManager::SetLogLevel(EJobLogLevelType logLevel, const SStr
///////////////////////////////////////////////////////////////
CDatabaseJobQueue* CDatabaseJobQueueManager::FindQueueFromConnection(SConnectionHandle connectionHandle)
{
return MapFindRef(m_QueueNameMap, connectionHandle);
for (const auto iter : m_QueueNameMap)
{
if (iter.second->UsesConnection(connectionHandle))
return iter.second;
}
return nullptr;
}

///////////////////////////////////////////////////////////////
Expand All @@ -173,15 +178,21 @@ CDatabaseJobQueue* CDatabaseJobQueueManager::FindQueueFromConnection(SConnection
// Can't fail
//
///////////////////////////////////////////////////////////////
CDatabaseJobQueue* CDatabaseJobQueueManager::GetQueueFromConnectCommand(SConnectionHandle connectionHandle)
CDatabaseJobQueue* CDatabaseJobQueueManager::GetQueueFromConnectCommand(const SString& strData)
{
// Extract queue name from options
std::vector<SString> parts;
strData.Split("\1", parts);
SString strQueueName;
GetOption<CDbOptionsMap>(parts[4], "queue", strQueueName);

// Find queue with name
CDatabaseJobQueue* pQueue = MapFindRef(m_QueueNameMap, connectionHandle);
CDatabaseJobQueue* pQueue = MapFindRef(m_QueueNameMap, strQueueName);
if (!pQueue)
{
// Add new queue
pQueue = NewDatabaseJobQueue();
MapSet(m_QueueNameMap, connectionHandle, pQueue);
MapSet(m_QueueNameMap, strQueueName, pQueue);
}
return pQueue;
}
Expand All @@ -201,23 +212,7 @@ SConnectionHandle CDatabaseJobQueueManager::GetNextConnectionHandle()
m_ConnectionHandleCounter &= 0x000FFFFF;
m_ConnectionHandleCounter |= 0x00200000;
// TODO - check when all (1,048,575) ids are in use
} while (MapContains(m_QueueNameMap, m_ConnectionHandleCounter));
} while (FindQueueFromConnection(m_ConnectionHandleCounter));

return m_ConnectionHandleCounter;
}

///////////////////////////////////////////////////////////////
//
// CDatabaseJobQueueManager::GetQueueSizeFromConnection
//
// Return count elements in queue
//
///////////////////////////////////////////////////////////////
int CDatabaseJobQueueManager::GetQueueSizeFromConnection(SConnectionHandle connectionHandle)
{
CDatabaseJobQueue* pJobQueue = FindQueueFromConnection(connectionHandle);
if (!pJobQueue)
return -1;

return pJobQueue->GetQueueSize();
}
7 changes: 3 additions & 4 deletions Server/mods/deathmatch/logic/CDatabaseJobQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ class CDatabaseJobQueueManager
CDbJobData* FindCommandFromId(SDbJobId id);
void IgnoreConnectionResults(SConnectionHandle connectionHandle);
void SetLogLevel(EJobLogLevelType logLevel, const SString& strLogFilename);
int GetQueueSizeFromConnection(SConnectionHandle connectionHandle);

protected:
CDatabaseJobQueue* GetQueueFromConnectCommand(SConnectionHandle connectionHandle);
CDatabaseJobQueue* GetQueueFromConnectCommand(const SString& strData);
CDatabaseJobQueue* FindQueueFromConnection(SConnectionHandle connectionHandle);
SConnectionHandle GetNextConnectionHandle();

std::map<SConnectionHandle, CDatabaseJobQueue*> m_QueueNameMap;
SConnectionHandle m_ConnectionHandleCounter;
std::map<SString, CDatabaseJobQueue*> m_QueueNameMap;
SConnectionHandle m_ConnectionHandleCounter;
};
13 changes: 0 additions & 13 deletions Server/mods/deathmatch/logic/CDatabaseManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class CDatabaseManagerImpl : public CDatabaseManager
CLuaArguments* pArgs = nullptr);
virtual bool QueryWithCallbackf(SConnectionHandle hConnection, PFN_DBRESULT pfnDbResult, void* pCallbackContext, const char* szQuery, ...);
virtual void SetLogLevel(EJobLogLevelType logLevel, const SString& strLogFilename);
virtual int GetQueueSizeFromConnection(SConnectionHandle connectionHandle);

// CDatabaseManagerImpl
SString InsertQueryArguments(SConnectionHandle hConnection, const SString& strQuery, CLuaArguments* pArgs);
Expand Down Expand Up @@ -565,18 +564,6 @@ void CDatabaseManagerImpl::SetLogLevel(EJobLogLevelType logLevel, const SString&
return m_JobQueue->SetLogLevel(logLevel, strLogFilename);
}

///////////////////////////////////////////////////////////////
//
// CDatabaseManagerImpl::GetQueueSizeFromConnection
//
//
//
///////////////////////////////////////////////////////////////
int CDatabaseManagerImpl::GetQueueSizeFromConnection(SConnectionHandle connectionHandle)
{
return m_JobQueue->GetQueueSizeFromConnection(connectionHandle);
}

///////////////////////////////////////////////////////////////
//
// CDatabaseManagerImpl::InsertQueryArguments
Expand Down
1 change: 0 additions & 1 deletion Server/mods/deathmatch/logic/CDatabaseManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ class CDatabaseManager
CLuaArguments* pArgs = nullptr) = 0;
virtual bool QueryWithCallbackf(SConnectionHandle hConnection, PFN_DBRESULT pfnDbResult, void* pCallbackContext, const char* szQuery, ...) = 0;
virtual void SetLogLevel(EJobLogLevelType logLevel, const SString& strLogFilename) = 0;
virtual int GetQueueSizeFromConnection(SConnectionHandle connectionHandle) = 0;
};

CDatabaseManager* NewDatabaseManager();
Expand Down
Loading