Chris@16: ////////////////////////////////////////////////////////////////////////////// Chris@16: // Chris@16: // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost Chris@16: // Software License, Version 1.0. (See accompanying file Chris@16: // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) Chris@16: // Chris@16: // See http://www.boost.org/libs/interprocess for documentation. Chris@16: // Chris@16: ////////////////////////////////////////////////////////////////////////////// Chris@16: Chris@16: #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP Chris@16: #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP Chris@16: Chris@101: #ifndef BOOST_CONFIG_HPP Chris@101: # include Chris@101: #endif Chris@101: # Chris@101: #if defined(BOOST_HAS_PRAGMA_ONCE) Chris@101: # pragma once Chris@101: #endif Chris@101: Chris@16: #include Chris@16: #include Chris@16: Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@16: #include Chris@101: #include Chris@16: #include Chris@16: #include Chris@101: #include //make_unsigned, alignment_of Chris@16: #include Chris@16: #include Chris@16: #include //std::lower_bound Chris@16: #include //std::size_t Chris@16: #include //memcpy Chris@16: Chris@16: Chris@16: //!\file Chris@16: //!Describes an inter-process message queue. This class allows sending Chris@16: //!messages between processes and allows blocking, non-blocking and timed Chris@16: //!sending and receiving. Chris@16: Chris@16: namespace boost{ namespace interprocess{ Chris@16: Chris@16: namespace ipcdetail Chris@16: { Chris@16: template Chris@16: class msg_queue_initialization_func_t; Chris@16: } Chris@16: Chris@16: //!A class that allows sending messages Chris@16: //!between processes. Chris@16: template Chris@16: class message_queue_t Chris@16: { Chris@101: #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) Chris@16: //Blocking modes Chris@16: enum block_t { blocking, timed, non_blocking }; Chris@16: Chris@16: message_queue_t(); Chris@101: #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED Chris@16: Chris@16: public: Chris@16: typedef VoidPointer void_pointer; Chris@16: typedef typename boost::intrusive:: Chris@16: pointer_traits::template Chris@16: rebind_pointer::type char_ptr; Chris@16: typedef typename boost::intrusive::pointer_traits::difference_type difference_type; Chris@101: typedef typename boost::container::container_detail::make_unsigned::type size_type; Chris@16: Chris@16: //!Creates a process shared message queue with name "name". For this message queue, Chris@16: //!the maximum number of messages will be "max_num_msg" and the maximum message size Chris@16: //!will be "max_msg_size". Throws on error and if the queue was previously created. Chris@16: message_queue_t(create_only_t create_only, Chris@16: const char *name, Chris@16: size_type max_num_msg, Chris@16: size_type max_msg_size, Chris@16: const permissions &perm = permissions()); Chris@16: Chris@16: //!Opens or creates a process shared message queue with name "name". Chris@16: //!If the queue is created, the maximum number of messages will be "max_num_msg" Chris@16: //!and the maximum message size will be "max_msg_size". If queue was previously Chris@16: //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters Chris@16: //!are ignored. Throws on error. Chris@16: message_queue_t(open_or_create_t open_or_create, Chris@16: const char *name, Chris@16: size_type max_num_msg, Chris@16: size_type max_msg_size, Chris@16: const permissions &perm = permissions()); Chris@16: Chris@16: //!Opens a previously created process shared message queue with name "name". Chris@16: //!If the queue was not previously created or there are no free resources, Chris@16: //!throws an error. Chris@16: message_queue_t(open_only_t open_only, Chris@16: const char *name); Chris@16: Chris@16: //!Destroys *this and indicates that the calling process is finished using Chris@16: //!the resource. All opened message queues are still Chris@16: //!valid after destruction. The destructor function will deallocate Chris@16: //!any system resources allocated by the system for use by this process for Chris@16: //!this resource. The resource can still be opened again calling Chris@16: //!the open constructor overload. To erase the message queue from the system Chris@16: //!use remove(). Chris@16: ~message_queue_t(); Chris@16: Chris@16: //!Sends a message stored in buffer "buffer" with size "buffer_size" in the Chris@16: //!message queue with priority "priority". If the message queue is full Chris@16: //!the sender is blocked. Throws interprocess_error on error. Chris@16: void send (const void *buffer, size_type buffer_size, Chris@16: unsigned int priority); Chris@16: Chris@16: //!Sends a message stored in buffer "buffer" with size "buffer_size" through the Chris@16: //!message queue with priority "priority". If the message queue is full Chris@16: //!the sender is not blocked and returns false, otherwise returns true. Chris@16: //!Throws interprocess_error on error. Chris@16: bool try_send (const void *buffer, size_type buffer_size, Chris@16: unsigned int priority); Chris@16: Chris@16: //!Sends a message stored in buffer "buffer" with size "buffer_size" in the Chris@16: //!message queue with priority "priority". If the message queue is full Chris@16: //!the sender retries until time "abs_time" is reached. Returns true if Chris@16: //!the message has been successfully sent. Returns false if timeout is reached. Chris@16: //!Throws interprocess_error on error. Chris@16: bool timed_send (const void *buffer, size_type buffer_size, Chris@16: unsigned int priority, const boost::posix_time::ptime& abs_time); Chris@16: Chris@16: //!Receives a message from the message queue. The message is stored in buffer Chris@16: //!"buffer", which has size "buffer_size". The received message has size Chris@16: //!"recvd_size" and priority "priority". If the message queue is empty Chris@16: //!the receiver is blocked. Throws interprocess_error on error. Chris@16: void receive (void *buffer, size_type buffer_size, Chris@16: size_type &recvd_size,unsigned int &priority); Chris@16: Chris@16: //!Receives a message from the message queue. The message is stored in buffer Chris@16: //!"buffer", which has size "buffer_size". The received message has size Chris@16: //!"recvd_size" and priority "priority". If the message queue is empty Chris@16: //!the receiver is not blocked and returns false, otherwise returns true. Chris@16: //!Throws interprocess_error on error. Chris@16: bool try_receive (void *buffer, size_type buffer_size, Chris@16: size_type &recvd_size,unsigned int &priority); Chris@16: Chris@16: //!Receives a message from the message queue. The message is stored in buffer Chris@16: //!"buffer", which has size "buffer_size". The received message has size Chris@16: //!"recvd_size" and priority "priority". If the message queue is empty Chris@16: //!the receiver retries until time "abs_time" is reached. Returns true if Chris@16: //!the message has been successfully sent. Returns false if timeout is reached. Chris@16: //!Throws interprocess_error on error. Chris@16: bool timed_receive (void *buffer, size_type buffer_size, Chris@16: size_type &recvd_size,unsigned int &priority, Chris@16: const boost::posix_time::ptime &abs_time); Chris@16: Chris@16: //!Returns the maximum number of messages allowed by the queue. The message Chris@16: //!queue must be opened or created previously. Otherwise, returns 0. Chris@16: //!Never throws Chris@16: size_type get_max_msg() const; Chris@16: Chris@16: //!Returns the maximum size of message allowed by the queue. The message Chris@16: //!queue must be opened or created previously. Otherwise, returns 0. Chris@16: //!Never throws Chris@16: size_type get_max_msg_size() const; Chris@16: Chris@16: //!Returns the number of messages currently stored. Chris@16: //!Never throws Chris@16: size_type get_num_msg() const; Chris@16: Chris@16: //!Removes the message queue from the system. Chris@16: //!Returns false on error. Never throws Chris@16: static bool remove(const char *name); Chris@16: Chris@101: #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) Chris@16: private: Chris@16: typedef boost::posix_time::ptime ptime; Chris@16: Chris@16: friend class ipcdetail::msg_queue_initialization_func_t; Chris@16: Chris@16: bool do_receive(block_t block, Chris@16: void *buffer, size_type buffer_size, Chris@16: size_type &recvd_size, unsigned int &priority, Chris@16: const ptime &abs_time); Chris@16: Chris@16: bool do_send(block_t block, Chris@16: const void *buffer, size_type buffer_size, Chris@16: unsigned int priority, const ptime &abs_time); Chris@16: Chris@16: //!Returns the needed memory size for the shared message queue. Chris@16: //!Never throws Chris@16: static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); Chris@16: typedef ipcdetail::managed_open_or_create_impl open_create_impl_t; Chris@16: open_create_impl_t m_shmem; Chris@101: #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED Chris@16: }; Chris@16: Chris@101: #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) Chris@16: Chris@16: namespace ipcdetail { Chris@16: Chris@16: //!This header is the prefix of each message in the queue Chris@16: template Chris@16: class msg_hdr_t Chris@16: { Chris@16: typedef VoidPointer void_pointer; Chris@16: typedef typename boost::intrusive:: Chris@16: pointer_traits::template Chris@16: rebind_pointer::type char_ptr; Chris@16: typedef typename boost::intrusive::pointer_traits::difference_type difference_type; Chris@101: typedef typename boost::container::container_detail::make_unsigned::type size_type; Chris@16: Chris@16: public: Chris@16: size_type len; // Message length Chris@16: unsigned int priority;// Message priority Chris@16: //!Returns the data buffer associated with this this message Chris@16: void * data(){ return this+1; } // Chris@16: }; Chris@16: Chris@16: //!This functor is the predicate to order stored messages by priority Chris@16: template Chris@16: class priority_functor Chris@16: { Chris@16: typedef typename boost::intrusive:: Chris@16: pointer_traits::template Chris@16: rebind_pointer >::type msg_hdr_ptr_t; Chris@16: Chris@16: public: Chris@16: bool operator()(const msg_hdr_ptr_t &msg1, Chris@16: const msg_hdr_ptr_t &msg2) const Chris@16: { return msg1->priority < msg2->priority; } Chris@16: }; Chris@16: Chris@16: //!This header is placed in the beginning of the shared memory and contains Chris@16: //!the data to control the queue. This class initializes the shared memory Chris@16: //!in the following way: in ascending memory address with proper alignment Chris@16: //!fillings: Chris@16: //! Chris@16: //!-> mq_hdr_t: Chris@16: //! Main control block that controls the rest of the elements Chris@16: //! Chris@16: //!-> offset_ptr index [max_num_msg] Chris@16: //! An array of pointers with size "max_num_msg" called index. Each pointer Chris@16: //! points to a preallocated message. Elements of this array are Chris@16: //! reordered in runtime in the following way: Chris@16: //! Chris@16: //! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined: Chris@16: //! Chris@16: //! When the current number of messages is "cur_num_msg", the array Chris@16: //! is treated like a circular buffer. Starting from position "cur_first_msg" Chris@16: //! "cur_num_msg" in a circular way, pointers point to inserted messages and the rest Chris@16: //! point to free messages. Those "cur_num_msg" pointers are Chris@16: //! ordered by the priority of the pointed message and by insertion order Chris@16: //! if two messages have the same priority. So the next message to be Chris@16: //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg] Chris@16: //! and the first free message ready to be used in a "send" operation is Chris@101: //! [cur_first_msg] if circular buffer is extended from front, Chris@16: //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise. Chris@16: //! Chris@16: //! This transforms the index in a circular buffer with an embedded free Chris@16: //! message queue. Chris@16: //! Chris@16: //! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined): Chris@16: //! Chris@16: //! When the current number of messages is "cur_num_msg", the first Chris@16: //! "cur_num_msg" pointers point to inserted messages and the rest Chris@16: //! point to free messages. The first "cur_num_msg" pointers are Chris@16: //! ordered by the priority of the pointed message and by insertion order Chris@16: //! if two messages have the same priority. So the next message to be Chris@16: //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free Chris@16: //! message ready to be used in a "send" operation is index [cur_num_msg]. Chris@16: //! Chris@16: //! This transforms the index in a fixed size priority queue with an embedded free Chris@16: //! message queue. Chris@16: //! Chris@16: //!-> struct message_t Chris@16: //! { Chris@16: //! msg_hdr_t header; Chris@16: //! char[max_msg_size] data; Chris@16: //! } messages [max_num_msg]; Chris@16: //! Chris@16: //! An array of buffers of preallocated messages, each one prefixed with the Chris@16: //! msg_hdr_t structure. Each of this message is pointed by one pointer of Chris@16: //! the index structure. Chris@16: template Chris@16: class mq_hdr_t Chris@16: : public ipcdetail::priority_functor Chris@16: { Chris@16: typedef VoidPointer void_pointer; Chris@16: typedef msg_hdr_t msg_header; Chris@16: typedef typename boost::intrusive:: Chris@16: pointer_traits::template Chris@16: rebind_pointer::type msg_hdr_ptr_t; Chris@16: typedef typename boost::intrusive::pointer_traits Chris@16: ::difference_type difference_type; Chris@101: typedef typename boost::container:: Chris@101: container_detail::make_unsigned::type size_type; Chris@16: typedef typename boost::intrusive:: Chris@16: pointer_traits::template Chris@16: rebind_pointer::type msg_hdr_ptr_ptr_t; Chris@16: typedef ipcdetail::managed_open_or_create_impl open_create_impl_t; Chris@16: Chris@16: public: Chris@16: //!Constructor. This object must be constructed in the beginning of the Chris@16: //!shared memory of the size returned by the function "get_mem_size". Chris@16: //!This constructor initializes the needed resources and creates Chris@16: //!the internal structures like the priority index. This can throw. Chris@16: mq_hdr_t(size_type max_num_msg, size_type max_msg_size) Chris@16: : m_max_num_msg(max_num_msg), Chris@16: m_max_msg_size(max_msg_size), Chris@16: m_cur_num_msg(0) Chris@16: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@16: ,m_cur_first_msg(0u) Chris@101: ,m_blocked_senders(0u) Chris@101: ,m_blocked_receivers(0u) Chris@16: #endif Chris@16: { this->initialize_memory(); } Chris@16: Chris@16: //!Returns true if the message queue is full Chris@16: bool is_full() const Chris@16: { return m_cur_num_msg == m_max_num_msg; } Chris@16: Chris@16: //!Returns true if the message queue is empty Chris@16: bool is_empty() const Chris@16: { return !m_cur_num_msg; } Chris@16: Chris@16: //!Frees the top priority message and saves it in the free message list Chris@16: void free_top_msg() Chris@16: { --m_cur_num_msg; } Chris@16: Chris@16: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@16: Chris@16: typedef msg_hdr_ptr_t *iterator; Chris@16: Chris@16: size_type end_pos() const Chris@16: { Chris@16: const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg; Chris@16: return space_until_bufend > m_cur_num_msg Chris@16: ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend; Chris@16: } Chris@16: Chris@16: //!Returns the inserted message with top priority Chris@16: msg_header &top_msg() Chris@16: { Chris@16: size_type pos = this->end_pos(); Chris@16: return *mp_index[pos ? --pos : m_max_num_msg - 1]; Chris@16: } Chris@16: Chris@16: //!Returns the inserted message with bottom priority Chris@16: msg_header &bottom_msg() Chris@16: { return *mp_index[m_cur_first_msg]; } Chris@16: Chris@16: iterator inserted_ptr_begin() const Chris@16: { return &mp_index[m_cur_first_msg]; } Chris@16: Chris@16: iterator inserted_ptr_end() const Chris@16: { return &mp_index[this->end_pos()]; } Chris@16: Chris@16: iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor func) Chris@16: { Chris@16: iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end()); Chris@16: if(end < begin){ Chris@16: iterator idx_end = &mp_index[m_max_num_msg]; Chris@101: iterator ret = std::lower_bound(begin, idx_end, value, func); Chris@16: if(idx_end == ret){ Chris@16: iterator idx_beg = &mp_index[0]; Chris@16: ret = std::lower_bound(idx_beg, end, value, func); Chris@16: //sanity check, these cases should not call lower_bound (optimized out) Chris@16: BOOST_ASSERT(ret != end); Chris@16: BOOST_ASSERT(ret != begin); Chris@16: return ret; Chris@16: } Chris@16: else{ Chris@16: return ret; Chris@16: } Chris@16: } Chris@16: else{ Chris@16: return std::lower_bound(begin, end, value, func); Chris@16: } Chris@16: } Chris@16: Chris@16: msg_header & insert_at(iterator where) Chris@16: { Chris@16: iterator it_inserted_ptr_end = this->inserted_ptr_end(); Chris@16: iterator it_inserted_ptr_beg = this->inserted_ptr_begin(); Chris@101: if(where == it_inserted_ptr_beg){ Chris@16: //unsigned integer guarantees underflow Chris@16: m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; Chris@16: --m_cur_first_msg; Chris@16: ++m_cur_num_msg; Chris@16: return *mp_index[m_cur_first_msg]; Chris@16: } Chris@101: else if(where == it_inserted_ptr_end){ Chris@101: ++m_cur_num_msg; Chris@101: return **it_inserted_ptr_end; Chris@101: } Chris@16: else{ Chris@16: size_type pos = where - &mp_index[0]; Chris@16: size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg); Chris@16: //Check if it's more efficient to move back or move front Chris@16: if(circ_pos < m_cur_num_msg/2){ Chris@16: //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos Chris@16: //indicates two step insertion Chris@16: if(!pos){ Chris@16: pos = m_max_num_msg; Chris@16: where = &mp_index[m_max_num_msg-1]; Chris@16: } Chris@16: else{ Chris@16: --where; Chris@16: } Chris@16: const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos; Chris@16: const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u; Chris@16: const size_type first_segment_end = pos; Chris@16: const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg; Chris@16: const size_type second_segment_end = m_max_num_msg; Chris@16: const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1); Chris@16: Chris@16: //First segment Chris@16: if(!unique_segment){ Chris@16: std::copy( &mp_index[0] + second_segment_beg Chris@16: , &mp_index[0] + second_segment_end Chris@16: , &mp_index[0] + second_segment_beg - 1); Chris@16: mp_index[m_max_num_msg-1] = mp_index[0]; Chris@16: } Chris@16: std::copy( &mp_index[0] + first_segment_beg Chris@16: , &mp_index[0] + first_segment_end Chris@16: , &mp_index[0] + first_segment_beg - 1); Chris@16: *where = backup; Chris@16: m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; Chris@16: --m_cur_first_msg; Chris@16: ++m_cur_num_msg; Chris@16: return **where; Chris@16: } Chris@16: else{ Chris@16: //The queue can't be full so end_pos < m_cur_first_msg Chris@16: //indicates two step insertion Chris@16: const size_type pos_end = this->end_pos(); Chris@16: const bool unique_segment = pos < pos_end; Chris@16: const size_type first_segment_beg = pos; Chris@16: const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1; Chris@16: const size_type second_segment_beg = 0u; Chris@16: const size_type second_segment_end = unique_segment ? 0u : pos_end; Chris@16: const msg_hdr_ptr_t backup = *it_inserted_ptr_end; Chris@16: Chris@16: //First segment Chris@16: if(!unique_segment){ Chris@16: std::copy_backward( &mp_index[0] + second_segment_beg Chris@16: , &mp_index[0] + second_segment_end Chris@16: , &mp_index[0] + second_segment_end + 1); Chris@16: mp_index[0] = mp_index[m_max_num_msg-1]; Chris@16: } Chris@16: std::copy_backward( &mp_index[0] + first_segment_beg Chris@16: , &mp_index[0] + first_segment_end Chris@16: , &mp_index[0] + first_segment_end + 1); Chris@16: *where = backup; Chris@16: ++m_cur_num_msg; Chris@16: return **where; Chris@16: } Chris@16: } Chris@16: } Chris@16: Chris@101: #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@16: Chris@16: typedef msg_hdr_ptr_t *iterator; Chris@16: Chris@16: //!Returns the inserted message with top priority Chris@16: msg_header &top_msg() Chris@16: { return *mp_index[m_cur_num_msg-1]; } Chris@16: Chris@16: //!Returns the inserted message with bottom priority Chris@16: msg_header &bottom_msg() Chris@16: { return *mp_index[0]; } Chris@16: Chris@16: iterator inserted_ptr_begin() const Chris@16: { return &mp_index[0]; } Chris@16: Chris@16: iterator inserted_ptr_end() const Chris@16: { return &mp_index[m_cur_num_msg]; } Chris@16: Chris@16: iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor func) Chris@16: { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); } Chris@16: Chris@16: msg_header & insert_at(iterator pos) Chris@16: { Chris@16: const msg_hdr_ptr_t backup = *inserted_ptr_end(); Chris@16: std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1); Chris@16: *pos = backup; Chris@16: ++m_cur_num_msg; Chris@16: return **pos; Chris@16: } Chris@16: Chris@101: #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@16: Chris@16: //!Inserts the first free message in the priority queue Chris@16: msg_header & queue_free_msg(unsigned int priority) Chris@16: { Chris@16: //Get priority queue's range Chris@16: iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end()); Chris@16: //Optimize for non-priority usage Chris@16: if(m_cur_num_msg && priority > this->bottom_msg().priority){ Chris@16: //Check for higher priority than all stored messages Chris@16: if(priority > this->top_msg().priority){ Chris@16: it = it_end; Chris@16: } Chris@16: else{ Chris@16: //Since we don't now which free message we will pick Chris@16: //build a dummy header for searches Chris@16: msg_header dummy_hdr; Chris@16: dummy_hdr.priority = priority; Chris@16: Chris@16: //Get free msg Chris@16: msg_hdr_ptr_t dummy_ptr(&dummy_hdr); Chris@16: Chris@16: //Check where the free message should be placed Chris@16: it = this->lower_bound(dummy_ptr, static_cast&>(*this)); Chris@16: } Chris@16: } Chris@16: //Insert the free message in the correct position Chris@16: return this->insert_at(it); Chris@16: } Chris@16: Chris@16: //!Returns the number of bytes needed to construct a message queue with Chris@16: //!"max_num_size" maximum number of messages and "max_msg_size" maximum Chris@16: //!message size. Never throws. Chris@16: static size_type get_mem_size Chris@16: (size_type max_msg_size, size_type max_num_msg) Chris@16: { Chris@16: const size_type Chris@101: msg_hdr_align = ::boost::container::container_detail::alignment_of::value, Chris@101: index_align = ::boost::container::container_detail::alignment_of::value, Chris@16: r_hdr_size = ipcdetail::ct_rounded_size::value, Chris@101: r_index_size = ipcdetail::get_rounded_size(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), Chris@101: r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header); Chris@16: return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + Chris@16: open_create_impl_t::ManagedOpenOrCreateUserOffset; Chris@16: } Chris@16: Chris@16: //!Initializes the memory structures to preallocate messages and constructs the Chris@16: //!message index. Never throws. Chris@16: void initialize_memory() Chris@16: { Chris@16: const size_type Chris@101: msg_hdr_align = ::boost::container::container_detail::alignment_of::value, Chris@101: index_align = ::boost::container::container_detail::alignment_of::value, Chris@16: r_hdr_size = ipcdetail::ct_rounded_size::value, Chris@101: r_index_size = ipcdetail::get_rounded_size(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), Chris@101: r_max_msg_size = ipcdetail::get_rounded_size(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); Chris@16: Chris@16: //Pointer to the index Chris@16: msg_hdr_ptr_t *index = reinterpret_cast Chris@16: (reinterpret_cast(this)+r_hdr_size); Chris@16: Chris@16: //Pointer to the first message header Chris@16: msg_header *msg_hdr = reinterpret_cast Chris@16: (reinterpret_cast(this)+r_hdr_size+r_index_size); Chris@16: Chris@16: //Initialize the pointer to the index Chris@16: mp_index = index; Chris@16: Chris@16: //Initialize the index so each slot points to a preallocated message Chris@16: for(size_type i = 0; i < m_max_num_msg; ++i){ Chris@16: index[i] = msg_hdr; Chris@16: msg_hdr = reinterpret_cast Chris@16: (reinterpret_cast(msg_hdr)+r_max_msg_size); Chris@16: } Chris@16: } Chris@16: Chris@16: public: Chris@16: //Pointer to the index Chris@16: msg_hdr_ptr_ptr_t mp_index; Chris@16: //Maximum number of messages of the queue Chris@16: const size_type m_max_num_msg; Chris@16: //Maximum size of messages of the queue Chris@16: const size_type m_max_msg_size; Chris@16: //Current number of messages Chris@16: size_type m_cur_num_msg; Chris@16: //Mutex to protect data structures Chris@16: interprocess_mutex m_mutex; Chris@16: //Condition block receivers when there are no messages Chris@16: interprocess_condition m_cond_recv; Chris@16: //Condition block senders when the queue is full Chris@16: interprocess_condition m_cond_send; Chris@16: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@16: //Current start offset in the circular index Chris@16: size_type m_cur_first_msg; Chris@101: size_type m_blocked_senders; Chris@101: size_type m_blocked_receivers; Chris@16: #endif Chris@16: }; Chris@16: Chris@16: Chris@16: //!This is the atomic functor to be executed when creating or opening Chris@16: //!shared memory. Never throws Chris@16: template Chris@16: class msg_queue_initialization_func_t Chris@16: { Chris@16: public: Chris@16: typedef typename boost::intrusive:: Chris@16: pointer_traits::template Chris@101: rebind_pointer::type char_ptr; Chris@101: typedef typename boost::intrusive::pointer_traits:: Chris@101: difference_type difference_type; Chris@101: typedef typename boost::container::container_detail:: Chris@101: make_unsigned::type size_type; Chris@16: Chris@16: msg_queue_initialization_func_t(size_type maxmsg = 0, Chris@16: size_type maxmsgsize = 0) Chris@16: : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} Chris@16: Chris@16: bool operator()(void *address, size_type, bool created) Chris@16: { Chris@16: char *mptr; Chris@16: Chris@16: if(created){ Chris@16: mptr = reinterpret_cast(address); Chris@16: //Construct the message queue header at the beginning Chris@16: BOOST_TRY{ Chris@16: new (mptr) mq_hdr_t(m_maxmsg, m_maxmsgsize); Chris@16: } Chris@16: BOOST_CATCH(...){ Chris@16: return false; Chris@16: } Chris@16: BOOST_CATCH_END Chris@16: } Chris@16: return true; Chris@16: } Chris@16: Chris@16: std::size_t get_min_size() const Chris@16: { Chris@16: return mq_hdr_t::get_mem_size(m_maxmsgsize, m_maxmsg) Chris@16: - message_queue_t::open_create_impl_t::ManagedOpenOrCreateUserOffset; Chris@16: } Chris@16: Chris@16: const size_type m_maxmsg; Chris@16: const size_type m_maxmsgsize; Chris@16: }; Chris@16: Chris@16: } //namespace ipcdetail { Chris@16: Chris@16: template Chris@16: inline message_queue_t::~message_queue_t() Chris@16: {} Chris@16: Chris@16: template Chris@16: inline typename message_queue_t::size_type message_queue_t::get_mem_size Chris@16: (size_type max_msg_size, size_type max_num_msg) Chris@16: { return ipcdetail::mq_hdr_t::get_mem_size(max_msg_size, max_num_msg); } Chris@16: Chris@16: template Chris@16: inline message_queue_t::message_queue_t(create_only_t, Chris@16: const char *name, Chris@16: size_type max_num_msg, Chris@16: size_type max_msg_size, Chris@16: const permissions &perm) Chris@16: //Create shared memory and execute functor atomically Chris@16: : m_shmem(create_only, Chris@16: name, Chris@16: get_mem_size(max_msg_size, max_num_msg), Chris@16: read_write, Chris@16: static_cast(0), Chris@16: //Prepare initialization functor Chris@16: ipcdetail::msg_queue_initialization_func_t (max_num_msg, max_msg_size), Chris@16: perm) Chris@16: {} Chris@16: Chris@16: template Chris@16: inline message_queue_t::message_queue_t(open_or_create_t, Chris@16: const char *name, Chris@16: size_type max_num_msg, Chris@16: size_type max_msg_size, Chris@16: const permissions &perm) Chris@16: //Create shared memory and execute functor atomically Chris@16: : m_shmem(open_or_create, Chris@16: name, Chris@16: get_mem_size(max_msg_size, max_num_msg), Chris@16: read_write, Chris@16: static_cast(0), Chris@16: //Prepare initialization functor Chris@16: ipcdetail::msg_queue_initialization_func_t (max_num_msg, max_msg_size), Chris@16: perm) Chris@16: {} Chris@16: Chris@16: template Chris@16: inline message_queue_t::message_queue_t(open_only_t, const char *name) Chris@16: //Create shared memory and execute functor atomically Chris@16: : m_shmem(open_only, Chris@16: name, Chris@16: read_write, Chris@16: static_cast(0), Chris@16: //Prepare initialization functor Chris@16: ipcdetail::msg_queue_initialization_func_t ()) Chris@16: {} Chris@16: Chris@16: template Chris@16: inline void message_queue_t::send Chris@16: (const void *buffer, size_type buffer_size, unsigned int priority) Chris@16: { this->do_send(blocking, buffer, buffer_size, priority, ptime()); } Chris@16: Chris@16: template Chris@16: inline bool message_queue_t::try_send Chris@16: (const void *buffer, size_type buffer_size, unsigned int priority) Chris@16: { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); } Chris@16: Chris@16: template Chris@16: inline bool message_queue_t::timed_send Chris@16: (const void *buffer, size_type buffer_size Chris@16: ,unsigned int priority, const boost::posix_time::ptime &abs_time) Chris@16: { Chris@16: if(abs_time == boost::posix_time::pos_infin){ Chris@16: this->send(buffer, buffer_size, priority); Chris@16: return true; Chris@16: } Chris@16: return this->do_send(timed, buffer, buffer_size, priority, abs_time); Chris@16: } Chris@16: Chris@16: template Chris@16: inline bool message_queue_t::do_send(block_t block, Chris@16: const void *buffer, size_type buffer_size, Chris@16: unsigned int priority, const boost::posix_time::ptime &abs_time) Chris@16: { Chris@16: ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); Chris@16: //Check if buffer is smaller than maximum allowed Chris@16: if (buffer_size > p_hdr->m_max_msg_size) { Chris@16: throw interprocess_exception(size_error); Chris@16: } Chris@16: Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: bool notify_blocked_receivers = false; Chris@101: #endif Chris@16: //--------------------------------------------- Chris@16: scoped_lock lock(p_hdr->m_mutex); Chris@16: //--------------------------------------------- Chris@16: { Chris@16: //If the queue is full execute blocking logic Chris@16: if (p_hdr->is_full()) { Chris@101: BOOST_TRY{ Chris@101: #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@101: ++p_hdr->m_blocked_senders; Chris@101: #endif Chris@101: switch(block){ Chris@101: case non_blocking : Chris@101: #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@101: --p_hdr->m_blocked_senders; Chris@101: #endif Chris@101: return false; Chris@101: break; Chris@16: Chris@101: case blocking : Chris@101: do{ Chris@101: p_hdr->m_cond_send.wait(lock); Chris@101: } Chris@101: while (p_hdr->is_full()); Chris@101: break; Chris@16: Chris@101: case timed : Chris@101: do{ Chris@101: if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ Chris@101: if(p_hdr->is_full()){ Chris@101: #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@101: --p_hdr->m_blocked_senders; Chris@101: #endif Chris@101: return false; Chris@101: } Chris@101: break; Chris@101: } Chris@16: } Chris@101: while (p_hdr->is_full()); Chris@101: break; Chris@101: default: Chris@101: break; Chris@101: } Chris@101: #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@101: --p_hdr->m_blocked_senders; Chris@101: #endif Chris@16: } Chris@101: BOOST_CATCH(...){ Chris@101: #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@101: --p_hdr->m_blocked_senders; Chris@101: #endif Chris@101: BOOST_RETHROW; Chris@101: } Chris@101: BOOST_CATCH_END Chris@16: } Chris@16: Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers; Chris@101: #endif Chris@16: //Insert the first free message in the priority queue Chris@16: ipcdetail::msg_hdr_t &free_msg_hdr = p_hdr->queue_free_msg(priority); Chris@16: Chris@16: //Sanity check, free msgs are always cleaned when received Chris@16: BOOST_ASSERT(free_msg_hdr.priority == 0); Chris@16: BOOST_ASSERT(free_msg_hdr.len == 0); Chris@16: Chris@16: //Copy control data to the free message Chris@16: free_msg_hdr.priority = priority; Chris@16: free_msg_hdr.len = buffer_size; Chris@16: Chris@16: //Copy user buffer to the message Chris@16: std::memcpy(free_msg_hdr.data(), buffer, buffer_size); Chris@16: } // Lock end Chris@16: Chris@16: //Notify outside lock to avoid contention. This might produce some Chris@16: //spurious wakeups, but it's usually far better than notifying inside. Chris@16: //If this message changes the queue empty state, notify it to receivers Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: if (notify_blocked_receivers){ Chris@16: p_hdr->m_cond_recv.notify_one(); Chris@16: } Chris@101: #else Chris@101: p_hdr->m_cond_recv.notify_one(); Chris@101: #endif Chris@16: Chris@16: return true; Chris@16: } Chris@16: Chris@16: template Chris@16: inline void message_queue_t::receive(void *buffer, size_type buffer_size, Chris@16: size_type &recvd_size, unsigned int &priority) Chris@16: { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); } Chris@16: Chris@16: template Chris@16: inline bool Chris@16: message_queue_t::try_receive(void *buffer, size_type buffer_size, Chris@16: size_type &recvd_size, unsigned int &priority) Chris@16: { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); } Chris@16: Chris@16: template Chris@16: inline bool Chris@16: message_queue_t::timed_receive(void *buffer, size_type buffer_size, Chris@16: size_type &recvd_size, unsigned int &priority, Chris@16: const boost::posix_time::ptime &abs_time) Chris@16: { Chris@16: if(abs_time == boost::posix_time::pos_infin){ Chris@16: this->receive(buffer, buffer_size, recvd_size, priority); Chris@16: return true; Chris@16: } Chris@16: return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time); Chris@16: } Chris@16: Chris@16: template Chris@16: inline bool Chris@16: message_queue_t::do_receive(block_t block, Chris@16: void *buffer, size_type buffer_size, Chris@16: size_type &recvd_size, unsigned int &priority, Chris@16: const boost::posix_time::ptime &abs_time) Chris@16: { Chris@16: ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); Chris@16: //Check if buffer is big enough for any message Chris@16: if (buffer_size < p_hdr->m_max_msg_size) { Chris@16: throw interprocess_exception(size_error); Chris@16: } Chris@16: Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: bool notify_blocked_senders = false; Chris@101: #endif Chris@16: //--------------------------------------------- Chris@16: scoped_lock lock(p_hdr->m_mutex); Chris@16: //--------------------------------------------- Chris@16: { Chris@16: //If there are no messages execute blocking logic Chris@16: if (p_hdr->is_empty()) { Chris@101: BOOST_TRY{ Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: ++p_hdr->m_blocked_receivers; Chris@101: #endif Chris@101: switch(block){ Chris@101: case non_blocking : Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: --p_hdr->m_blocked_receivers; Chris@101: #endif Chris@101: return false; Chris@101: break; Chris@16: Chris@101: case blocking : Chris@101: do{ Chris@101: p_hdr->m_cond_recv.wait(lock); Chris@101: } Chris@101: while (p_hdr->is_empty()); Chris@101: break; Chris@16: Chris@101: case timed : Chris@101: do{ Chris@101: if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ Chris@101: if(p_hdr->is_empty()){ Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: --p_hdr->m_blocked_receivers; Chris@101: #endif Chris@101: return false; Chris@101: } Chris@101: break; Chris@101: } Chris@16: } Chris@101: while (p_hdr->is_empty()); Chris@101: break; Chris@16: Chris@101: //Paranoia check Chris@101: default: Chris@101: break; Chris@101: } Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: --p_hdr->m_blocked_receivers; Chris@101: #endif Chris@16: } Chris@101: BOOST_CATCH(...){ Chris@101: #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) Chris@101: --p_hdr->m_blocked_receivers; Chris@101: #endif Chris@101: BOOST_RETHROW; Chris@101: } Chris@101: BOOST_CATCH_END Chris@16: } Chris@16: Chris@101: #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@101: notify_blocked_senders = 0 != p_hdr->m_blocked_senders; Chris@101: #endif Chris@101: Chris@16: //There is at least one message ready to pick, get the top one Chris@16: ipcdetail::msg_hdr_t &top_msg = p_hdr->top_msg(); Chris@16: Chris@16: //Get data from the message Chris@16: recvd_size = top_msg.len; Chris@16: priority = top_msg.priority; Chris@16: Chris@16: //Some cleanup to ease debugging Chris@16: top_msg.len = 0; Chris@16: top_msg.priority = 0; Chris@16: Chris@16: //Copy data to receiver's bufers Chris@16: std::memcpy(buffer, top_msg.data(), recvd_size); Chris@16: Chris@16: //Free top message and put it in the free message list Chris@16: p_hdr->free_top_msg(); Chris@16: } //Lock end Chris@16: Chris@16: //Notify outside lock to avoid contention. This might produce some Chris@16: //spurious wakeups, but it's usually far better than notifying inside. Chris@16: //If this reception changes the queue full state, notify senders Chris@101: #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX Chris@101: if (notify_blocked_senders){ Chris@16: p_hdr->m_cond_send.notify_one(); Chris@16: } Chris@101: #else Chris@101: p_hdr->m_cond_send.notify_one(); Chris@101: #endif Chris@16: Chris@16: return true; Chris@16: } Chris@16: Chris@16: template Chris@16: inline typename message_queue_t::size_type message_queue_t::get_max_msg() const Chris@16: { Chris@16: ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); Chris@16: return p_hdr ? p_hdr->m_max_num_msg : 0; } Chris@16: Chris@16: template Chris@16: inline typename message_queue_t::size_type message_queue_t::get_max_msg_size() const Chris@16: { Chris@16: ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); Chris@16: return p_hdr ? p_hdr->m_max_msg_size : 0; Chris@16: } Chris@16: Chris@16: template Chris@16: inline typename message_queue_t::size_type message_queue_t::get_num_msg() const Chris@16: { Chris@16: ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); Chris@16: if(p_hdr){ Chris@16: //--------------------------------------------- Chris@16: scoped_lock lock(p_hdr->m_mutex); Chris@16: //--------------------------------------------- Chris@16: return p_hdr->m_cur_num_msg; Chris@16: } Chris@16: Chris@16: return 0; Chris@16: } Chris@16: Chris@16: template Chris@16: inline bool message_queue_t::remove(const char *name) Chris@16: { return shared_memory_object::remove(name); } Chris@16: Chris@101: #else Chris@101: Chris@101: //!Typedef for a default message queue Chris@101: //!to be used between processes Chris@101: typedef message_queue_t > message_queue; Chris@101: Chris@101: #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED Chris@16: Chris@16: }} //namespace boost{ namespace interprocess{ Chris@16: Chris@16: #include Chris@16: Chris@16: #endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP