Changeset View
Changeset View
Standalone View
Standalone View
ps/trunk/source/tools/atlas/GameInterface/MessagePasserImpl.cpp
Show All 15 Lines | |||||
*/ | */ | ||||
#include "precompiled.h" | #include "precompiled.h" | ||||
#include <cstdio> | #include <cstdio> | ||||
#include "MessagePasserImpl.h" | #include "MessagePasserImpl.h" | ||||
#include "Messages.h" | #include "Messages.h" | ||||
#include "Handlers/MessageHandler.h" | |||||
#include "lib/timer.h" | #include "lib/timer.h" | ||||
#include "lib/rand.h" | #include "ps/CLogger.h" | ||||
#include "lib/posix/posix_filesystem.h" | |||||
using namespace AtlasMessage; | using namespace AtlasMessage; | ||||
double last_user_activity = 0.0; | |||||
MessagePasserImpl::MessagePasserImpl() | |||||
: m_Trace(false), m_Semaphore(NULL) | |||||
{ | |||||
int tries = 0; | |||||
while (tries++ < 16) // some arbitrary cut-off point to avoid infinite loops | |||||
{ | |||||
static char name[64]; | |||||
sprintf_s(name, ARRAY_SIZE(name), "/wfg-atlas-msgpass-%d-%d", | |||||
(int)rand(1, 1000), (int)(time(0)%1000)); | |||||
sem_t* sem = sem_open(name, O_CREAT | O_EXCL, 0700, 0); | |||||
// This cast should not be necessary, but apparently SEM_FAILED is not | |||||
// a value of a pointer type | |||||
if (sem == (sem_t*)SEM_FAILED || !sem) | |||||
{ | |||||
int err = errno; | |||||
if (err == EEXIST) | |||||
{ | |||||
// Semaphore already exists - try another one | |||||
continue; | |||||
} | |||||
// Otherwise, it's a probably-fatal error | |||||
debug_warn(L"sem_open failed"); | |||||
break; | |||||
} | |||||
// Succeeded - use this semaphore | |||||
m_Semaphore = sem; | |||||
m_SemaphoreName = name; | |||||
break; | |||||
} | |||||
if (! m_Semaphore) | |||||
{ | |||||
debug_warn(L"Failed to create semaphore for Atlas - giving up"); | |||||
// We will probably crash later - maybe we could fall back on sem_init, if this | |||||
// ever fails in practice | |||||
} | |||||
} | |||||
MessagePasserImpl::~MessagePasserImpl() | |||||
{ | |||||
if (m_Semaphore) | |||||
{ | |||||
// Clean up | |||||
sem_close(m_Semaphore); | |||||
sem_unlink(m_SemaphoreName.c_str()); | |||||
} | |||||
} | |||||
void MessagePasserImpl::Add(IMessage* msg) | void MessagePasserImpl::Add(IMessage* msg) | ||||
{ | { | ||||
ENSURE(msg); | ENSURE(msg); | ||||
ENSURE(msg->GetType() == IMessage::Message); | ENSURE(msg->GetType() == IMessage::Message); | ||||
if (m_Trace) | if (m_Trace) | ||||
debug_printf("%8.3f add message: %s\n", timer_Time(), msg->GetName()); | debug_printf("%8.3f add message: %s\n", timer_Time(), msg->GetName()); | ||||
msgHandlers::const_iterator it = GetMsgHandlers().find(msg->GetName()); | |||||
if (it != GetMsgHandlers().end()) | |||||
{ | { | ||||
std::lock_guard<std::mutex> lock(m_Mutex); | it->second(msg); | ||||
m_Queue.push(msg); | |||||
} | } | ||||
} | else | ||||
IMessage* MessagePasserImpl::Retrieve() | |||||
{ | { | ||||
// (It should be fairly easy to use a more efficient thread-safe queue, | debug_warn(L"Unrecognised message"); | ||||
// since there's only one thread adding items and one thread consuming; | // CLogger might not be initialised, but this error will be sent | ||||
// but it's not worthwhile yet.) | // to the debug output window anyway so people can still see it | ||||
LOGERROR("Unrecognised message (%s)", msg->GetName()); | |||||
IMessage* msg = NULL; | |||||
{ | |||||
std::lock_guard<std::mutex> lock(m_Mutex); | |||||
if (! m_Queue.empty()) | |||||
{ | |||||
msg = m_Queue.front(); | |||||
m_Queue.pop(); | |||||
} | |||||
} | } | ||||
// Delete the object - we took ownership of it. | |||||
if (m_Trace && msg) | AtlasMessage::ShareableDelete(msg); | ||||
debug_printf("%8.3f retrieved message: %s\n", timer_Time(), msg->GetName()); | |||||
return msg; | |||||
} | } | ||||
void MessagePasserImpl::Query(QueryMessage* qry, void(* UNUSED(timeoutCallback) )()) | void MessagePasserImpl::Query(QueryMessage* msg, void(* UNUSED(timeoutCallback) )()) | ||||
{ | { | ||||
ENSURE(qry); | ENSURE(msg); | ||||
ENSURE(qry->GetType() == IMessage::Query); | ENSURE(msg->GetType() == IMessage::Query); | ||||
if (m_Trace) | if (m_Trace) | ||||
debug_printf("%8.3f add query: %s\n", timer_Time(), qry->GetName()); | debug_printf("%8.3f add query: %s\n", timer_Time(), msg->GetName()); | ||||
// Set the semaphore, so we can block until the query has been handled | |||||
qry->m_Semaphore = static_cast<void*>(m_Semaphore); | |||||
msgHandlers::const_iterator it = GetMsgHandlers().find(msg->GetName()); | |||||
if (it != GetMsgHandlers().end()) | |||||
{ | { | ||||
std::lock_guard<std::mutex> lock(m_Mutex); | it->second(msg); | ||||
m_Queue.push(qry); | |||||
} | } | ||||
else | |||||
// Wait until the query handler has handled the query and called sem_post: | |||||
// The following code was necessary to avoid deadlock, but it still breaks | |||||
// in some cases (e.g. when Atlas issues a query before its event loop starts | |||||
// running) and doesn't seem to be the simplest possible solution. | |||||
// So currently we're trying to not do anything like that at all, and | |||||
// just stop the game making windows (which is what seems (from experience) to | |||||
// deadlock things) by overriding ah_display_error. Hopefully it'll work like | |||||
// that, and the redundant code below/elsewhere can be removed, but it's | |||||
// left in here in case it needs to be reinserted in the future to make it | |||||
// work. | |||||
// (See http://www.wildfiregames.com/forum/index.php?s=&showtopic=10236&view=findpost&p=174617) | |||||
// // At least on Win32, it is necessary for the UI thread to run its event | |||||
// // loop to avoid deadlocking the system (particularly when the game | |||||
// // tries to show a dialog box); so timeoutCallback is called whenever we | |||||
// // think it's necessary for that to happen. | |||||
// | |||||
// #if OS_WIN | |||||
// // On Win32, use MsgWaitForMultipleObjects, which waits on the semaphore | |||||
// // but is also interrupted by incoming Windows-messages. | |||||
// // while (0 != (err = sem_msgwait_np(psem))) | |||||
// | |||||
// while (0 != (err = sem_wait(psem))) | |||||
// #else | |||||
// // TODO: On non-Win32, I have no idea whether the same problem exists; but | |||||
// // it might do, so call the callback every few seconds just in case it helps. | |||||
// struct timespec abs_timeout; | |||||
// clock_gettime(CLOCK_REALTIME, &abs_timeout); | |||||
// abs_timeout.tv_sec += 2; | |||||
// while (0 != (err = sem_timedwait(psem, &abs_timeout))) | |||||
// #endif | |||||
while (0 != sem_wait(m_Semaphore)) | |||||
{ | |||||
// If timed out, call callback and try again | |||||
// if (errno == ETIMEDOUT) | |||||
// timeoutCallback(); | |||||
// else | |||||
// Keep retrying while EINTR, but other errors are probably fatal | |||||
if (errno != EINTR) | |||||
{ | { | ||||
debug_warn(L"Semaphore wait failed"); | debug_warn(L"Unrecognised message"); | ||||
return; // (leaks the semaphore) | // CLogger might not be initialised, but this error will be sent | ||||
// to the debug output window anyway so people can still see it | |||||
LOGERROR("Unrecognised message (%s)", msg->GetName()); | |||||
} | } | ||||
} | } | ||||
// Clean up | |||||
qry->m_Semaphore = NULL; | |||||
} | |||||
bool MessagePasserImpl::IsEmpty() | |||||
{ | |||||
std::lock_guard<std::mutex> lock(m_Mutex); | |||||
return m_Queue.empty(); | |||||
} | |||||
void MessagePasserImpl::SetTrace(bool t) | void MessagePasserImpl::SetTrace(bool t) | ||||
{ | { | ||||
m_Trace = t; | m_Trace = t; | ||||
} | } |
Wildfire Games · Phabricator