Mercurial > hg > vamp-build-and-test
diff DEPENDENCIES/generic/include/boost/interprocess/ipc/message_queue.hpp @ 101:c530137014c0
Update Boost headers (1.58.0)
author | Chris Cannam |
---|---|
date | Mon, 07 Sep 2015 11:12:49 +0100 |
parents | 2665513ce2d3 |
children |
line wrap: on
line diff
--- a/DEPENDENCIES/generic/include/boost/interprocess/ipc/message_queue.hpp Fri Sep 04 12:01:02 2015 +0100 +++ b/DEPENDENCIES/generic/include/boost/interprocess/ipc/message_queue.hpp Mon Sep 07 11:12:49 2015 +0100 @@ -11,6 +11,14 @@ #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP +#ifndef BOOST_CONFIG_HPP +# include <boost/config.hpp> +#endif +# +#if defined(BOOST_HAS_PRAGMA_ONCE) +# pragma once +#endif + #include <boost/interprocess/detail/config_begin.hpp> #include <boost/interprocess/detail/workaround.hpp> @@ -24,11 +32,10 @@ #include <boost/interprocess/creation_tags.hpp> #include <boost/interprocess/exceptions.hpp> #include <boost/interprocess/permissions.hpp> -#include <boost/detail/no_exceptions_support.hpp> +#include <boost/core/no_exceptions_support.hpp> #include <boost/interprocess/detail/type_traits.hpp> #include <boost/intrusive/pointer_traits.hpp> -#include <boost/type_traits/make_unsigned.hpp> -#include <boost/type_traits/alignment_of.hpp> +#include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of #include <boost/intrusive/pointer_traits.hpp> #include <boost/assert.hpp> #include <algorithm> //std::lower_bound @@ -54,12 +61,12 @@ template<class VoidPointer> class message_queue_t { - /// @cond + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) //Blocking modes enum block_t { blocking, timed, non_blocking }; message_queue_t(); - /// @endcond + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED public: typedef VoidPointer void_pointer; @@ -67,7 +74,7 @@ pointer_traits<void_pointer>::template rebind_pointer<char>::type char_ptr; typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; - typedef typename boost::make_unsigned<difference_type>::type size_type; + typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type; //!Creates a process shared message queue with name "name". For this message queue, //!the maximum number of messages will be "max_num_msg" and the maximum message size @@ -168,7 +175,7 @@ //!Returns false on error. Never throws static bool remove(const char *name); - /// @cond + #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) private: typedef boost::posix_time::ptime ptime; @@ -188,10 +195,10 @@ static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; open_create_impl_t m_shmem; - /// @endcond + #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED }; -/// @cond +#if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) namespace ipcdetail { @@ -204,7 +211,7 @@ pointer_traits<void_pointer>::template rebind_pointer<char>::type char_ptr; typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; - typedef typename boost::make_unsigned<difference_type>::type size_type; + typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type; public: size_type len; // Message length @@ -250,7 +257,7 @@ //! if two messages have the same priority. So the next message to be //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg] //! and the first free message ready to be used in a "send" operation is -//! [cur_first_msg] if circular buffer is extended from front, +//! [cur_first_msg] if circular buffer is extended from front, //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise. //! //! This transforms the index in a circular buffer with an embedded free @@ -289,7 +296,8 @@ rebind_pointer<msg_header>::type msg_hdr_ptr_t; typedef typename boost::intrusive::pointer_traits <msg_hdr_ptr_t>::difference_type difference_type; - typedef typename boost::make_unsigned<difference_type>::type size_type; + typedef typename boost::container:: + container_detail::make_unsigned<difference_type>::type size_type; typedef typename boost::intrusive:: pointer_traits<void_pointer>::template rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t; @@ -306,6 +314,8 @@ m_cur_num_msg(0) #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) ,m_cur_first_msg(0u) + ,m_blocked_senders(0u) + ,m_blocked_receivers(0u) #endif { this->initialize_memory(); } @@ -354,7 +364,7 @@ iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end()); if(end < begin){ iterator idx_end = &mp_index[m_max_num_msg]; - iterator ret = std::lower_bound(begin, idx_end, value, func); + iterator ret = std::lower_bound(begin, idx_end, value, func); if(idx_end == ret){ iterator idx_beg = &mp_index[0]; ret = std::lower_bound(idx_beg, end, value, func); @@ -376,17 +386,17 @@ { iterator it_inserted_ptr_end = this->inserted_ptr_end(); iterator it_inserted_ptr_beg = this->inserted_ptr_begin(); - if(where == it_inserted_ptr_end){ - ++m_cur_num_msg; - return **it_inserted_ptr_end; - } - else if(where == it_inserted_ptr_beg){ + if(where == it_inserted_ptr_beg){ //unsigned integer guarantees underflow m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; --m_cur_first_msg; ++m_cur_num_msg; return *mp_index[m_cur_first_msg]; } + else if(where == it_inserted_ptr_end){ + ++m_cur_num_msg; + return **it_inserted_ptr_end; + } else{ size_type pos = where - &mp_index[0]; size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg); @@ -452,7 +462,7 @@ } } - #else + #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX typedef msg_hdr_ptr_t *iterator; @@ -482,7 +492,7 @@ return **pos; } - #endif + #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX //!Inserts the first free message in the priority queue msg_header & queue_free_msg(unsigned int priority) @@ -507,7 +517,6 @@ //Check where the free message should be placed it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this)); } - } //Insert the free message in the correct position return this->insert_at(it); @@ -520,11 +529,11 @@ (size_type max_msg_size, size_type max_num_msg) { const size_type - msg_hdr_align = ::boost::alignment_of<msg_header>::value, - index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value, + msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value, + index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value, r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, - r_index_size = ipcdetail::get_rounded_size(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), - r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header); + r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), + r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header); return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + open_create_impl_t::ManagedOpenOrCreateUserOffset; } @@ -534,11 +543,11 @@ void initialize_memory() { const size_type - msg_hdr_align = ::boost::alignment_of<msg_header>::value, - index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value, + msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value, + index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value, r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, - r_index_size = ipcdetail::get_rounded_size(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), - r_max_msg_size = ipcdetail::get_rounded_size(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); + r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), + r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); //Pointer to the index msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*> @@ -577,6 +586,8 @@ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) //Current start offset in the circular index size_type m_cur_first_msg; + size_type m_blocked_senders; + size_type m_blocked_receivers; #endif }; @@ -589,9 +600,11 @@ public: typedef typename boost::intrusive:: pointer_traits<VoidPointer>::template - rebind_pointer<char>::type char_ptr; - typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; - typedef typename boost::make_unsigned<difference_type>::type size_type; + rebind_pointer<char>::type char_ptr; + typedef typename boost::intrusive::pointer_traits<char_ptr>:: + difference_type difference_type; + typedef typename boost::container::container_detail:: + make_unsigned<difference_type>::type size_type; msg_queue_initialization_func_t(size_type maxmsg = 0, size_type maxmsgsize = 0) @@ -714,41 +727,67 @@ throw interprocess_exception(size_error); } - bool was_empty = false; + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + bool notify_blocked_receivers = false; + #endif //--------------------------------------------- scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); //--------------------------------------------- { //If the queue is full execute blocking logic if (p_hdr->is_full()) { - switch(block){ - case non_blocking : - return false; - break; + BOOST_TRY{ + #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX + ++p_hdr->m_blocked_senders; + #endif + switch(block){ + case non_blocking : + #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX + --p_hdr->m_blocked_senders; + #endif + return false; + break; - case blocking : - do{ - p_hdr->m_cond_send.wait(lock); - } - while (p_hdr->is_full()); - break; + case blocking : + do{ + p_hdr->m_cond_send.wait(lock); + } + while (p_hdr->is_full()); + break; - case timed : - do{ - if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ - if(p_hdr->is_full()) - return false; - break; + case timed : + do{ + if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ + if(p_hdr->is_full()){ + #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX + --p_hdr->m_blocked_senders; + #endif + return false; + } + break; + } } - } - while (p_hdr->is_full()); - break; - default: - break; + while (p_hdr->is_full()); + break; + default: + break; + } + #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX + --p_hdr->m_blocked_senders; + #endif } + BOOST_CATCH(...){ + #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX + --p_hdr->m_blocked_senders; + #endif + BOOST_RETHROW; + } + BOOST_CATCH_END } - was_empty = p_hdr->is_empty(); + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers; + #endif //Insert the first free message in the priority queue ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority); @@ -767,9 +806,13 @@ //Notify outside lock to avoid contention. This might produce some //spurious wakeups, but it's usually far better than notifying inside. //If this message changes the queue empty state, notify it to receivers - if (was_empty){ + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + if (notify_blocked_receivers){ p_hdr->m_cond_recv.notify_one(); } + #else + p_hdr->m_cond_recv.notify_one(); + #endif return true; } @@ -811,42 +854,70 @@ throw interprocess_exception(size_error); } - bool was_full = false; + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + bool notify_blocked_senders = false; + #endif //--------------------------------------------- scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); //--------------------------------------------- { //If there are no messages execute blocking logic if (p_hdr->is_empty()) { - switch(block){ - case non_blocking : - return false; - break; + BOOST_TRY{ + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + ++p_hdr->m_blocked_receivers; + #endif + switch(block){ + case non_blocking : + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + --p_hdr->m_blocked_receivers; + #endif + return false; + break; - case blocking : - do{ - p_hdr->m_cond_recv.wait(lock); - } - while (p_hdr->is_empty()); - break; + case blocking : + do{ + p_hdr->m_cond_recv.wait(lock); + } + while (p_hdr->is_empty()); + break; - case timed : - do{ - if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ - if(p_hdr->is_empty()) - return false; - break; + case timed : + do{ + if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ + if(p_hdr->is_empty()){ + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + --p_hdr->m_blocked_receivers; + #endif + return false; + } + break; + } } - } - while (p_hdr->is_empty()); - break; + while (p_hdr->is_empty()); + break; - //Paranoia check - default: - break; + //Paranoia check + default: + break; + } + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + --p_hdr->m_blocked_receivers; + #endif } + BOOST_CATCH(...){ + #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + --p_hdr->m_blocked_receivers; + #endif + BOOST_RETHROW; + } + BOOST_CATCH_END } + #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX + notify_blocked_senders = 0 != p_hdr->m_blocked_senders; + #endif + //There is at least one message ready to pick, get the top one ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg(); @@ -861,8 +932,6 @@ //Copy data to receiver's bufers std::memcpy(buffer, top_msg.data(), recvd_size); - was_full = p_hdr->is_full(); - //Free top message and put it in the free message list p_hdr->free_top_msg(); } //Lock end @@ -870,9 +939,13 @@ //Notify outside lock to avoid contention. This might produce some //spurious wakeups, but it's usually far better than notifying inside. //If this reception changes the queue full state, notify senders - if (was_full){ + #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX + if (notify_blocked_senders){ p_hdr->m_cond_send.notify_one(); } + #else + p_hdr->m_cond_send.notify_one(); + #endif return true; } @@ -908,7 +981,13 @@ inline bool message_queue_t<VoidPointer>::remove(const char *name) { return shared_memory_object::remove(name); } -/// @endcond +#else + +//!Typedef for a default message queue +//!to be used between processes +typedef message_queue_t<offset_ptr<void> > message_queue; + +#endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED }} //namespace boost{ namespace interprocess{