Chris@16: ////////////////////////////////////////////////////////////////////////////// Chris@16: // Chris@16: // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost Chris@16: // Software License, Version 1.0. (See accompanying file Chris@16: // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) Chris@16: // Chris@16: // See http://www.boost.org/libs/interprocess for documentation. Chris@16: // Chris@16: ////////////////////////////////////////////////////////////////////////////// Chris@16: Chris@16: #ifndef BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP Chris@16: #define BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP Chris@16: Chris@101: #ifndef BOOST_CONFIG_HPP Chris@101: # include Chris@101: #endif Chris@101: # Chris@101: #if defined(BOOST_HAS_PRAGMA_ONCE) Chris@101: # pragma once Chris@101: #endif Chris@101: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@101: #include Chris@16: #include Chris@16: Chris@16: namespace boost { Chris@16: namespace interprocess { Chris@16: namespace ipcdetail { Chris@16: Chris@16: class spin_condition Chris@16: { Chris@16: spin_condition(const spin_condition &); Chris@16: spin_condition &operator=(const spin_condition &); Chris@16: public: Chris@16: spin_condition(); Chris@16: ~spin_condition(); Chris@16: Chris@16: void notify_one(); Chris@16: void notify_all(); Chris@16: Chris@16: template Chris@16: bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time) Chris@16: { Chris@101: if (!lock) Chris@101: throw lock_exception(); Chris@101: //Handle infinity absolute time here to avoid complications in do_timed_wait Chris@16: if(abs_time == boost::posix_time::pos_infin){ Chris@16: this->wait(lock); Chris@16: return true; Chris@16: } Chris@16: return this->do_timed_wait(abs_time, *lock.mutex()); Chris@16: } Chris@16: Chris@16: template Chris@16: bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred) Chris@16: { Chris@101: if (!lock) Chris@101: throw lock_exception(); Chris@101: //Handle infinity absolute time here to avoid complications in do_timed_wait Chris@16: if(abs_time == boost::posix_time::pos_infin){ Chris@16: this->wait(lock, pred); Chris@16: return true; Chris@16: } Chris@16: while (!pred()){ Chris@16: if (!this->do_timed_wait(abs_time, *lock.mutex())) Chris@16: return pred(); Chris@16: } Chris@16: return true; Chris@16: } Chris@16: Chris@16: template Chris@16: void wait(L& lock) Chris@16: { Chris@16: if (!lock) Chris@16: throw lock_exception(); Chris@16: do_wait(*lock.mutex()); Chris@16: } Chris@16: Chris@16: template Chris@16: void wait(L& lock, Pr pred) Chris@16: { Chris@16: if (!lock) Chris@16: throw lock_exception(); Chris@16: Chris@16: while (!pred()) Chris@16: do_wait(*lock.mutex()); Chris@16: } Chris@16: Chris@16: template Chris@16: void do_wait(InterprocessMutex &mut); Chris@16: Chris@16: template Chris@16: bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); Chris@16: Chris@16: private: Chris@16: template Chris@16: bool do_timed_wait(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); Chris@16: Chris@16: enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL }; Chris@16: spin_mutex m_enter_mut; Chris@16: volatile boost::uint32_t m_command; Chris@16: volatile boost::uint32_t m_num_waiters; Chris@16: void notify(boost::uint32_t command); Chris@16: }; Chris@16: Chris@16: inline spin_condition::spin_condition() Chris@16: { Chris@16: //Note that this class is initialized to zero. Chris@16: //So zeroed memory can be interpreted as an initialized Chris@16: //condition variable Chris@16: m_command = SLEEP; Chris@16: m_num_waiters = 0; Chris@16: } Chris@16: Chris@16: inline spin_condition::~spin_condition() Chris@16: { Chris@101: //Notify all waiting threads Chris@101: //to allow POSIX semantics on condition destruction Chris@101: this->notify_all(); Chris@16: } Chris@16: Chris@16: inline void spin_condition::notify_one() Chris@16: { Chris@16: this->notify(NOTIFY_ONE); Chris@16: } Chris@16: Chris@16: inline void spin_condition::notify_all() Chris@16: { Chris@16: this->notify(NOTIFY_ALL); Chris@16: } Chris@16: Chris@16: inline void spin_condition::notify(boost::uint32_t command) Chris@16: { Chris@16: //This mutex guarantees that no other thread can enter to the Chris@16: //do_timed_wait method logic, so that thread count will be Chris@16: //constant until the function writes a NOTIFY_ALL command. Chris@16: //It also guarantees that no other notification can be signaled Chris@16: //on this spin_condition before this one ends Chris@16: m_enter_mut.lock(); Chris@16: Chris@16: //Return if there are no waiters Chris@16: if(!atomic_read32(&m_num_waiters)) { Chris@16: m_enter_mut.unlock(); Chris@16: return; Chris@16: } Chris@16: Chris@16: //Notify that all threads should execute wait logic Chris@16: spin_wait swait; Chris@16: while(SLEEP != atomic_cas32(const_cast(&m_command), command, SLEEP)){ Chris@16: swait.yield(); Chris@16: } Chris@16: //The enter mutex will rest locked until the last waiting thread unlocks it Chris@16: } Chris@16: Chris@16: template Chris@16: inline void spin_condition::do_wait(InterprocessMutex &mut) Chris@16: { Chris@16: this->do_timed_wait(false, boost::posix_time::ptime(), mut); Chris@16: } Chris@16: Chris@16: template Chris@16: inline bool spin_condition::do_timed_wait Chris@16: (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut) Chris@16: { Chris@16: return this->do_timed_wait(true, abs_time, mut); Chris@16: } Chris@16: Chris@16: template Chris@16: inline bool spin_condition::do_timed_wait(bool tout_enabled, Chris@16: const boost::posix_time::ptime &abs_time, Chris@16: InterprocessMutex &mut) Chris@16: { Chris@16: boost::posix_time::ptime now = microsec_clock::universal_time(); Chris@16: Chris@16: if(tout_enabled){ Chris@16: if(now >= abs_time) return false; Chris@16: } Chris@16: Chris@16: typedef boost::interprocess::scoped_lock InternalLock; Chris@16: //The enter mutex guarantees that while executing a notification, Chris@16: //no other thread can execute the do_timed_wait method. Chris@16: { Chris@16: //--------------------------------------------------------------- Chris@16: InternalLock lock; Chris@16: if(tout_enabled){ Chris@16: InternalLock dummy(m_enter_mut, abs_time); Chris@16: lock = boost::move(dummy); Chris@16: } Chris@16: else{ Chris@16: InternalLock dummy(m_enter_mut); Chris@16: lock = boost::move(dummy); Chris@16: } Chris@16: Chris@16: if(!lock) Chris@16: return false; Chris@16: //--------------------------------------------------------------- Chris@16: //We increment the waiting thread count protected so that it will be Chris@16: //always constant when another thread enters the notification logic. Chris@16: //The increment marks this thread as "waiting on spin_condition" Chris@16: atomic_inc32(const_cast(&m_num_waiters)); Chris@16: Chris@16: //We unlock the external mutex atomically with the increment Chris@16: mut.unlock(); Chris@16: } Chris@16: Chris@16: //By default, we suppose that no timeout has happened Chris@16: bool timed_out = false, unlock_enter_mut= false; Chris@16: Chris@16: //Loop until a notification indicates that the thread should Chris@16: //exit or timeout occurs Chris@16: while(1){ Chris@16: //The thread sleeps/spins until a spin_condition commands a notification Chris@16: //Notification occurred, we will lock the checking mutex so that Chris@16: spin_wait swait; Chris@16: while(atomic_read32(&m_command) == SLEEP){ Chris@16: swait.yield(); Chris@16: Chris@16: //Check for timeout Chris@16: if(tout_enabled){ Chris@16: now = microsec_clock::universal_time(); Chris@16: Chris@16: if(now >= abs_time){ Chris@16: //If we can lock the mutex it means that no notification Chris@16: //is being executed in this spin_condition variable Chris@16: timed_out = m_enter_mut.try_lock(); Chris@16: Chris@16: //If locking fails, indicates that another thread is executing Chris@16: //notification, so we play the notification game Chris@16: if(!timed_out){ Chris@16: //There is an ongoing notification, we will try again later Chris@16: continue; Chris@16: } Chris@16: //No notification in execution, since enter mutex is locked. Chris@16: //We will execute time-out logic, so we will decrement count, Chris@16: //release the enter mutex and return false. Chris@16: break; Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: //If a timeout occurred, the mutex will not execute checking logic Chris@16: if(tout_enabled && timed_out){ Chris@16: //Decrement wait count Chris@16: atomic_dec32(const_cast(&m_num_waiters)); Chris@16: unlock_enter_mut = true; Chris@16: break; Chris@16: } Chris@16: else{ Chris@16: boost::uint32_t result = atomic_cas32 Chris@16: (const_cast(&m_command), SLEEP, NOTIFY_ONE); Chris@16: if(result == SLEEP){ Chris@16: //Other thread has been notified and since it was a NOTIFY one Chris@16: //command, this thread must sleep again Chris@16: continue; Chris@16: } Chris@16: else if(result == NOTIFY_ONE){ Chris@16: //If it was a NOTIFY_ONE command, only this thread should Chris@16: //exit. This thread has atomically marked command as sleep before Chris@16: //so no other thread will exit. Chris@16: //Decrement wait count. Chris@16: unlock_enter_mut = true; Chris@16: atomic_dec32(const_cast(&m_num_waiters)); Chris@16: break; Chris@16: } Chris@16: else{ Chris@16: //If it is a NOTIFY_ALL command, all threads should return Chris@16: //from do_timed_wait function. Decrement wait count. Chris@16: unlock_enter_mut = 1 == atomic_dec32(const_cast(&m_num_waiters)); Chris@16: //Check if this is the last thread of notify_all waiters Chris@16: //Only the last thread will release the mutex Chris@16: if(unlock_enter_mut){ Chris@16: atomic_cas32(const_cast(&m_command), SLEEP, NOTIFY_ALL); Chris@16: } Chris@16: break; Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@16: //Unlock the enter mutex if it is a single notification, if this is Chris@16: //the last notified thread in a notify_all or a timeout has occurred Chris@16: if(unlock_enter_mut){ Chris@16: m_enter_mut.unlock(); Chris@16: } Chris@16: Chris@16: //Lock external again before returning from the method Chris@16: mut.lock(); Chris@16: return !timed_out; Chris@16: } Chris@16: Chris@16: } //namespace ipcdetail Chris@16: } //namespace interprocess Chris@16: } //namespace boost Chris@16: Chris@16: #include Chris@16: Chris@16: #endif //BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP