annotate DEPENDENCIES/generic/include/boost/interprocess/ipc/message_queue.hpp @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents c530137014c0
children
rev   line source
Chris@16 1 //////////////////////////////////////////////////////////////////////////////
Chris@16 2 //
Chris@16 3 // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
Chris@16 4 // Software License, Version 1.0. (See accompanying file
Chris@16 5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Chris@16 6 //
Chris@16 7 // See http://www.boost.org/libs/interprocess for documentation.
Chris@16 8 //
Chris@16 9 //////////////////////////////////////////////////////////////////////////////
Chris@16 10
Chris@16 11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
Chris@16 12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
Chris@16 13
Chris@101 14 #ifndef BOOST_CONFIG_HPP
Chris@101 15 # include <boost/config.hpp>
Chris@101 16 #endif
Chris@101 17 #
Chris@101 18 #if defined(BOOST_HAS_PRAGMA_ONCE)
Chris@101 19 # pragma once
Chris@101 20 #endif
Chris@101 21
Chris@16 22 #include <boost/interprocess/detail/config_begin.hpp>
Chris@16 23 #include <boost/interprocess/detail/workaround.hpp>
Chris@16 24
Chris@16 25 #include <boost/interprocess/shared_memory_object.hpp>
Chris@16 26 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
Chris@16 27 #include <boost/interprocess/sync/interprocess_condition.hpp>
Chris@16 28 #include <boost/interprocess/sync/interprocess_mutex.hpp>
Chris@16 29 #include <boost/interprocess/sync/scoped_lock.hpp>
Chris@16 30 #include <boost/interprocess/detail/utilities.hpp>
Chris@16 31 #include <boost/interprocess/offset_ptr.hpp>
Chris@16 32 #include <boost/interprocess/creation_tags.hpp>
Chris@16 33 #include <boost/interprocess/exceptions.hpp>
Chris@16 34 #include <boost/interprocess/permissions.hpp>
Chris@101 35 #include <boost/core/no_exceptions_support.hpp>
Chris@16 36 #include <boost/interprocess/detail/type_traits.hpp>
Chris@16 37 #include <boost/intrusive/pointer_traits.hpp>
Chris@101 38 #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of
Chris@16 39 #include <boost/intrusive/pointer_traits.hpp>
Chris@16 40 #include <boost/assert.hpp>
Chris@16 41 #include <algorithm> //std::lower_bound
Chris@16 42 #include <cstddef> //std::size_t
Chris@16 43 #include <cstring> //memcpy
Chris@16 44
Chris@16 45
Chris@16 46 //!\file
Chris@16 47 //!Describes an inter-process message queue. This class allows sending
Chris@16 48 //!messages between processes and allows blocking, non-blocking and timed
Chris@16 49 //!sending and receiving.
Chris@16 50
Chris@16 51 namespace boost{ namespace interprocess{
Chris@16 52
Chris@16 53 namespace ipcdetail
Chris@16 54 {
Chris@16 55 template<class VoidPointer>
Chris@16 56 class msg_queue_initialization_func_t;
Chris@16 57 }
Chris@16 58
Chris@16 59 //!A class that allows sending messages
Chris@16 60 //!between processes.
Chris@16 61 template<class VoidPointer>
Chris@16 62 class message_queue_t
Chris@16 63 {
Chris@101 64 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
Chris@16 65 //Blocking modes
Chris@16 66 enum block_t { blocking, timed, non_blocking };
Chris@16 67
Chris@16 68 message_queue_t();
Chris@101 69 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
Chris@16 70
Chris@16 71 public:
Chris@16 72 typedef VoidPointer void_pointer;
Chris@16 73 typedef typename boost::intrusive::
Chris@16 74 pointer_traits<void_pointer>::template
Chris@16 75 rebind_pointer<char>::type char_ptr;
Chris@16 76 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
Chris@101 77 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type;
Chris@16 78
Chris@16 79 //!Creates a process shared message queue with name "name". For this message queue,
Chris@16 80 //!the maximum number of messages will be "max_num_msg" and the maximum message size
Chris@16 81 //!will be "max_msg_size". Throws on error and if the queue was previously created.
Chris@16 82 message_queue_t(create_only_t create_only,
Chris@16 83 const char *name,
Chris@16 84 size_type max_num_msg,
Chris@16 85 size_type max_msg_size,
Chris@16 86 const permissions &perm = permissions());
Chris@16 87
Chris@16 88 //!Opens or creates a process shared message queue with name "name".
Chris@16 89 //!If the queue is created, the maximum number of messages will be "max_num_msg"
Chris@16 90 //!and the maximum message size will be "max_msg_size". If queue was previously
Chris@16 91 //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
Chris@16 92 //!are ignored. Throws on error.
Chris@16 93 message_queue_t(open_or_create_t open_or_create,
Chris@16 94 const char *name,
Chris@16 95 size_type max_num_msg,
Chris@16 96 size_type max_msg_size,
Chris@16 97 const permissions &perm = permissions());
Chris@16 98
Chris@16 99 //!Opens a previously created process shared message queue with name "name".
Chris@16 100 //!If the queue was not previously created or there are no free resources,
Chris@16 101 //!throws an error.
Chris@16 102 message_queue_t(open_only_t open_only,
Chris@16 103 const char *name);
Chris@16 104
Chris@16 105 //!Destroys *this and indicates that the calling process is finished using
Chris@16 106 //!the resource. All opened message queues are still
Chris@16 107 //!valid after destruction. The destructor function will deallocate
Chris@16 108 //!any system resources allocated by the system for use by this process for
Chris@16 109 //!this resource. The resource can still be opened again calling
Chris@16 110 //!the open constructor overload. To erase the message queue from the system
Chris@16 111 //!use remove().
Chris@16 112 ~message_queue_t();
Chris@16 113
Chris@16 114 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
Chris@16 115 //!message queue with priority "priority". If the message queue is full
Chris@16 116 //!the sender is blocked. Throws interprocess_error on error.
Chris@16 117 void send (const void *buffer, size_type buffer_size,
Chris@16 118 unsigned int priority);
Chris@16 119
Chris@16 120 //!Sends a message stored in buffer "buffer" with size "buffer_size" through the
Chris@16 121 //!message queue with priority "priority". If the message queue is full
Chris@16 122 //!the sender is not blocked and returns false, otherwise returns true.
Chris@16 123 //!Throws interprocess_error on error.
Chris@16 124 bool try_send (const void *buffer, size_type buffer_size,
Chris@16 125 unsigned int priority);
Chris@16 126
Chris@16 127 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
Chris@16 128 //!message queue with priority "priority". If the message queue is full
Chris@16 129 //!the sender retries until time "abs_time" is reached. Returns true if
Chris@16 130 //!the message has been successfully sent. Returns false if timeout is reached.
Chris@16 131 //!Throws interprocess_error on error.
Chris@16 132 bool timed_send (const void *buffer, size_type buffer_size,
Chris@16 133 unsigned int priority, const boost::posix_time::ptime& abs_time);
Chris@16 134
Chris@16 135 //!Receives a message from the message queue. The message is stored in buffer
Chris@16 136 //!"buffer", which has size "buffer_size". The received message has size
Chris@16 137 //!"recvd_size" and priority "priority". If the message queue is empty
Chris@16 138 //!the receiver is blocked. Throws interprocess_error on error.
Chris@16 139 void receive (void *buffer, size_type buffer_size,
Chris@16 140 size_type &recvd_size,unsigned int &priority);
Chris@16 141
Chris@16 142 //!Receives a message from the message queue. The message is stored in buffer
Chris@16 143 //!"buffer", which has size "buffer_size". The received message has size
Chris@16 144 //!"recvd_size" and priority "priority". If the message queue is empty
Chris@16 145 //!the receiver is not blocked and returns false, otherwise returns true.
Chris@16 146 //!Throws interprocess_error on error.
Chris@16 147 bool try_receive (void *buffer, size_type buffer_size,
Chris@16 148 size_type &recvd_size,unsigned int &priority);
Chris@16 149
Chris@16 150 //!Receives a message from the message queue. The message is stored in buffer
Chris@16 151 //!"buffer", which has size "buffer_size". The received message has size
Chris@16 152 //!"recvd_size" and priority "priority". If the message queue is empty
Chris@16 153 //!the receiver retries until time "abs_time" is reached. Returns true if
Chris@16 154 //!the message has been successfully sent. Returns false if timeout is reached.
Chris@16 155 //!Throws interprocess_error on error.
Chris@16 156 bool timed_receive (void *buffer, size_type buffer_size,
Chris@16 157 size_type &recvd_size,unsigned int &priority,
Chris@16 158 const boost::posix_time::ptime &abs_time);
Chris@16 159
Chris@16 160 //!Returns the maximum number of messages allowed by the queue. The message
Chris@16 161 //!queue must be opened or created previously. Otherwise, returns 0.
Chris@16 162 //!Never throws
Chris@16 163 size_type get_max_msg() const;
Chris@16 164
Chris@16 165 //!Returns the maximum size of message allowed by the queue. The message
Chris@16 166 //!queue must be opened or created previously. Otherwise, returns 0.
Chris@16 167 //!Never throws
Chris@16 168 size_type get_max_msg_size() const;
Chris@16 169
Chris@16 170 //!Returns the number of messages currently stored.
Chris@16 171 //!Never throws
Chris@16 172 size_type get_num_msg() const;
Chris@16 173
Chris@16 174 //!Removes the message queue from the system.
Chris@16 175 //!Returns false on error. Never throws
Chris@16 176 static bool remove(const char *name);
Chris@16 177
Chris@101 178 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
Chris@16 179 private:
Chris@16 180 typedef boost::posix_time::ptime ptime;
Chris@16 181
Chris@16 182 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
Chris@16 183
Chris@16 184 bool do_receive(block_t block,
Chris@16 185 void *buffer, size_type buffer_size,
Chris@16 186 size_type &recvd_size, unsigned int &priority,
Chris@16 187 const ptime &abs_time);
Chris@16 188
Chris@16 189 bool do_send(block_t block,
Chris@16 190 const void *buffer, size_type buffer_size,
Chris@16 191 unsigned int priority, const ptime &abs_time);
Chris@16 192
Chris@16 193 //!Returns the needed memory size for the shared message queue.
Chris@16 194 //!Never throws
Chris@16 195 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
Chris@16 196 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
Chris@16 197 open_create_impl_t m_shmem;
Chris@101 198 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
Chris@16 199 };
Chris@16 200
Chris@101 201 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
Chris@16 202
Chris@16 203 namespace ipcdetail {
Chris@16 204
Chris@16 205 //!This header is the prefix of each message in the queue
Chris@16 206 template<class VoidPointer>
Chris@16 207 class msg_hdr_t
Chris@16 208 {
Chris@16 209 typedef VoidPointer void_pointer;
Chris@16 210 typedef typename boost::intrusive::
Chris@16 211 pointer_traits<void_pointer>::template
Chris@16 212 rebind_pointer<char>::type char_ptr;
Chris@16 213 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
Chris@101 214 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type;
Chris@16 215
Chris@16 216 public:
Chris@16 217 size_type len; // Message length
Chris@16 218 unsigned int priority;// Message priority
Chris@16 219 //!Returns the data buffer associated with this this message
Chris@16 220 void * data(){ return this+1; } //
Chris@16 221 };
Chris@16 222
Chris@16 223 //!This functor is the predicate to order stored messages by priority
Chris@16 224 template<class VoidPointer>
Chris@16 225 class priority_functor
Chris@16 226 {
Chris@16 227 typedef typename boost::intrusive::
Chris@16 228 pointer_traits<VoidPointer>::template
Chris@16 229 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t;
Chris@16 230
Chris@16 231 public:
Chris@16 232 bool operator()(const msg_hdr_ptr_t &msg1,
Chris@16 233 const msg_hdr_ptr_t &msg2) const
Chris@16 234 { return msg1->priority < msg2->priority; }
Chris@16 235 };
Chris@16 236
Chris@16 237 //!This header is placed in the beginning of the shared memory and contains
Chris@16 238 //!the data to control the queue. This class initializes the shared memory
Chris@16 239 //!in the following way: in ascending memory address with proper alignment
Chris@16 240 //!fillings:
Chris@16 241 //!
Chris@16 242 //!-> mq_hdr_t:
Chris@16 243 //! Main control block that controls the rest of the elements
Chris@16 244 //!
Chris@16 245 //!-> offset_ptr<msg_hdr_t> index [max_num_msg]
Chris@16 246 //! An array of pointers with size "max_num_msg" called index. Each pointer
Chris@16 247 //! points to a preallocated message. Elements of this array are
Chris@16 248 //! reordered in runtime in the following way:
Chris@16 249 //!
Chris@16 250 //! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined:
Chris@16 251 //!
Chris@16 252 //! When the current number of messages is "cur_num_msg", the array
Chris@16 253 //! is treated like a circular buffer. Starting from position "cur_first_msg",
Chris@16 254 //! the next "cur_num_msg" pointers (in a circular way) point to inserted messages and the rest
Chris@16 255 //! point to free messages. Those "cur_num_msg" pointers are
Chris@16 256 //! ordered by the priority of the pointed message and by insertion order
Chris@16 257 //! if two messages have the same priority. So the next message to be
Chris@16 258 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
Chris@16 259 //! and the first free message ready to be used in a "send" operation is
Chris@101 260 //! [cur_first_msg] if circular buffer is extended from front,
Chris@16 261 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
Chris@16 262 //!
Chris@16 263 //! This transforms the index in a circular buffer with an embedded free
Chris@16 264 //! message queue.
Chris@16 265 //!
Chris@16 266 //! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined):
Chris@16 267 //!
Chris@16 268 //! When the current number of messages is "cur_num_msg", the first
Chris@16 269 //! "cur_num_msg" pointers point to inserted messages and the rest
Chris@16 270 //! point to free messages. The first "cur_num_msg" pointers are
Chris@16 271 //! ordered by the priority of the pointed message and by insertion order
Chris@16 272 //! if two messages have the same priority. So the next message to be
Chris@16 273 //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free
Chris@16 274 //! message ready to be used in a "send" operation is index [cur_num_msg].
Chris@16 275 //!
Chris@16 276 //! This transforms the index in a fixed size priority queue with an embedded free
Chris@16 277 //! message queue.
Chris@16 278 //!
Chris@16 279 //!-> struct message_t
Chris@16 280 //! {
Chris@16 281 //! msg_hdr_t header;
Chris@16 282 //! char[max_msg_size] data;
Chris@16 283 //! } messages [max_num_msg];
Chris@16 284 //!
Chris@16 285 //! An array of buffers of preallocated messages, each one prefixed with the
Chris@16 286 //! msg_hdr_t structure. Each of these messages is pointed to by one pointer of
Chris@16 287 //! the index structure.
Chris@16 288 template<class VoidPointer>
Chris@16 289 class mq_hdr_t
Chris@16 290 : public ipcdetail::priority_functor<VoidPointer>
Chris@16 291 {
Chris@16 292 typedef VoidPointer void_pointer;
Chris@16 293 typedef msg_hdr_t<void_pointer> msg_header;
Chris@16 294 typedef typename boost::intrusive::
Chris@16 295 pointer_traits<void_pointer>::template
Chris@16 296 rebind_pointer<msg_header>::type msg_hdr_ptr_t;
Chris@16 297 typedef typename boost::intrusive::pointer_traits
Chris@16 298 <msg_hdr_ptr_t>::difference_type difference_type;
Chris@101 299 typedef typename boost::container::
Chris@101 300 container_detail::make_unsigned<difference_type>::type size_type;
Chris@16 301 typedef typename boost::intrusive::
Chris@16 302 pointer_traits<void_pointer>::template
Chris@16 303 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t;
Chris@16 304 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
Chris@16 305
Chris@16 306 public:
Chris@16 307 //!Constructor. This object must be constructed in the beginning of the
Chris@16 308 //!shared memory of the size returned by the function "get_mem_size".
Chris@16 309 //!This constructor initializes the needed resources and creates
Chris@16 310 //!the internal structures like the priority index. This can throw.
Chris@16 311 mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
Chris@16 312 : m_max_num_msg(max_num_msg),
Chris@16 313 m_max_msg_size(max_msg_size),
Chris@16 314 m_cur_num_msg(0)
Chris@16 315 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@16 316 ,m_cur_first_msg(0u)
Chris@101 317 ,m_blocked_senders(0u)
Chris@101 318 ,m_blocked_receivers(0u)
Chris@16 319 #endif
Chris@16 320 { this->initialize_memory(); }
Chris@16 321
Chris@16 322 //!Returns true if the message queue is full
Chris@16 323 bool is_full() const
Chris@16 324 { return m_cur_num_msg == m_max_num_msg; }
Chris@16 325
Chris@16 326 //!Returns true if the message queue is empty
Chris@16 327 bool is_empty() const
Chris@16 328 { return !m_cur_num_msg; }
Chris@16 329
Chris@16 330 //!Frees the top priority message and saves it in the free message list
Chris@16 331 void free_top_msg()
Chris@16 332 { --m_cur_num_msg; }
Chris@16 333
Chris@16 334 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@16 335
Chris@16 336 typedef msg_hdr_ptr_t *iterator;
Chris@16 337
Chris@16 338 size_type end_pos() const
Chris@16 339 {
Chris@16 340 const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
Chris@16 341 return space_until_bufend > m_cur_num_msg
Chris@16 342 ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
Chris@16 343 }
Chris@16 344
Chris@16 345 //!Returns the inserted message with top priority
Chris@16 346 msg_header &top_msg()
Chris@16 347 {
Chris@16 348 size_type pos = this->end_pos();
Chris@16 349 return *mp_index[pos ? --pos : m_max_num_msg - 1];
Chris@16 350 }
Chris@16 351
Chris@16 352 //!Returns the inserted message with bottom priority
Chris@16 353 msg_header &bottom_msg()
Chris@16 354 { return *mp_index[m_cur_first_msg]; }
Chris@16 355
Chris@16 356 iterator inserted_ptr_begin() const
Chris@16 357 { return &mp_index[m_cur_first_msg]; }
Chris@16 358
Chris@16 359 iterator inserted_ptr_end() const
Chris@16 360 { return &mp_index[this->end_pos()]; }
Chris@16 361
Chris@16 362 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
Chris@16 363 {
Chris@16 364 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
Chris@16 365 if(end < begin){
Chris@16 366 iterator idx_end = &mp_index[m_max_num_msg];
Chris@101 367 iterator ret = std::lower_bound(begin, idx_end, value, func);
Chris@16 368 if(idx_end == ret){
Chris@16 369 iterator idx_beg = &mp_index[0];
Chris@16 370 ret = std::lower_bound(idx_beg, end, value, func);
Chris@16 371 //sanity check, these cases should not call lower_bound (optimized out)
Chris@16 372 BOOST_ASSERT(ret != end);
Chris@16 373 BOOST_ASSERT(ret != begin);
Chris@16 374 return ret;
Chris@16 375 }
Chris@16 376 else{
Chris@16 377 return ret;
Chris@16 378 }
Chris@16 379 }
Chris@16 380 else{
Chris@16 381 return std::lower_bound(begin, end, value, func);
Chris@16 382 }
Chris@16 383 }
Chris@16 384
Chris@16 385 msg_header & insert_at(iterator where)
Chris@16 386 {
Chris@16 387 iterator it_inserted_ptr_end = this->inserted_ptr_end();
Chris@16 388 iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
Chris@101 389 if(where == it_inserted_ptr_beg){
Chris@16 390 //unsigned integer guarantees underflow
Chris@16 391 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
Chris@16 392 --m_cur_first_msg;
Chris@16 393 ++m_cur_num_msg;
Chris@16 394 return *mp_index[m_cur_first_msg];
Chris@16 395 }
Chris@101 396 else if(where == it_inserted_ptr_end){
Chris@101 397 ++m_cur_num_msg;
Chris@101 398 return **it_inserted_ptr_end;
Chris@101 399 }
Chris@16 400 else{
Chris@16 401 size_type pos = where - &mp_index[0];
Chris@16 402 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
Chris@16 403 //Check if it's more efficient to move back or move front
Chris@16 404 if(circ_pos < m_cur_num_msg/2){
Chris@16 405 //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos
Chris@16 406 //indicates two step insertion
Chris@16 407 if(!pos){
Chris@16 408 pos = m_max_num_msg;
Chris@16 409 where = &mp_index[m_max_num_msg-1];
Chris@16 410 }
Chris@16 411 else{
Chris@16 412 --where;
Chris@16 413 }
Chris@16 414 const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
Chris@16 415 const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u;
Chris@16 416 const size_type first_segment_end = pos;
Chris@16 417 const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
Chris@16 418 const size_type second_segment_end = m_max_num_msg;
Chris@16 419 const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1);
Chris@16 420
Chris@16 421 //First segment
Chris@16 422 if(!unique_segment){
Chris@16 423 std::copy( &mp_index[0] + second_segment_beg
Chris@16 424 , &mp_index[0] + second_segment_end
Chris@16 425 , &mp_index[0] + second_segment_beg - 1);
Chris@16 426 mp_index[m_max_num_msg-1] = mp_index[0];
Chris@16 427 }
Chris@16 428 std::copy( &mp_index[0] + first_segment_beg
Chris@16 429 , &mp_index[0] + first_segment_end
Chris@16 430 , &mp_index[0] + first_segment_beg - 1);
Chris@16 431 *where = backup;
Chris@16 432 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
Chris@16 433 --m_cur_first_msg;
Chris@16 434 ++m_cur_num_msg;
Chris@16 435 return **where;
Chris@16 436 }
Chris@16 437 else{
Chris@16 438 //The queue can't be full so end_pos < m_cur_first_msg
Chris@16 439 //indicates two step insertion
Chris@16 440 const size_type pos_end = this->end_pos();
Chris@16 441 const bool unique_segment = pos < pos_end;
Chris@16 442 const size_type first_segment_beg = pos;
Chris@16 443 const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1;
Chris@16 444 const size_type second_segment_beg = 0u;
Chris@16 445 const size_type second_segment_end = unique_segment ? 0u : pos_end;
Chris@16 446 const msg_hdr_ptr_t backup = *it_inserted_ptr_end;
Chris@16 447
Chris@16 448 //First segment
Chris@16 449 if(!unique_segment){
Chris@16 450 std::copy_backward( &mp_index[0] + second_segment_beg
Chris@16 451 , &mp_index[0] + second_segment_end
Chris@16 452 , &mp_index[0] + second_segment_end + 1);
Chris@16 453 mp_index[0] = mp_index[m_max_num_msg-1];
Chris@16 454 }
Chris@16 455 std::copy_backward( &mp_index[0] + first_segment_beg
Chris@16 456 , &mp_index[0] + first_segment_end
Chris@16 457 , &mp_index[0] + first_segment_end + 1);
Chris@16 458 *where = backup;
Chris@16 459 ++m_cur_num_msg;
Chris@16 460 return **where;
Chris@16 461 }
Chris@16 462 }
Chris@16 463 }
Chris@16 464
Chris@101 465 #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@16 466
Chris@16 467 typedef msg_hdr_ptr_t *iterator;
Chris@16 468
Chris@16 469 //!Returns the inserted message with top priority
Chris@16 470 msg_header &top_msg()
Chris@16 471 { return *mp_index[m_cur_num_msg-1]; }
Chris@16 472
Chris@16 473 //!Returns the inserted message with bottom priority
Chris@16 474 msg_header &bottom_msg()
Chris@16 475 { return *mp_index[0]; }
Chris@16 476
Chris@16 477 iterator inserted_ptr_begin() const
Chris@16 478 { return &mp_index[0]; }
Chris@16 479
Chris@16 480 iterator inserted_ptr_end() const
Chris@16 481 { return &mp_index[m_cur_num_msg]; }
Chris@16 482
Chris@16 483 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
Chris@16 484 { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); }
Chris@16 485
Chris@16 486 msg_header & insert_at(iterator pos)
Chris@16 487 {
Chris@16 488 const msg_hdr_ptr_t backup = *inserted_ptr_end();
Chris@16 489 std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
Chris@16 490 *pos = backup;
Chris@16 491 ++m_cur_num_msg;
Chris@16 492 return **pos;
Chris@16 493 }
Chris@16 494
Chris@101 495 #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@16 496
Chris@16 497 //!Inserts the first free message in the priority queue
Chris@16 498 msg_header & queue_free_msg(unsigned int priority)
Chris@16 499 {
Chris@16 500 //Get priority queue's range
Chris@16 501 iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end());
Chris@16 502 //Optimize for non-priority usage
Chris@16 503 if(m_cur_num_msg && priority > this->bottom_msg().priority){
Chris@16 504 //Check for higher priority than all stored messages
Chris@16 505 if(priority > this->top_msg().priority){
Chris@16 506 it = it_end;
Chris@16 507 }
Chris@16 508 else{
Chris@16 509 //Since we don't now which free message we will pick
Chris@16 510 //build a dummy header for searches
Chris@16 511 msg_header dummy_hdr;
Chris@16 512 dummy_hdr.priority = priority;
Chris@16 513
Chris@16 514 //Get free msg
Chris@16 515 msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
Chris@16 516
Chris@16 517 //Check where the free message should be placed
Chris@16 518 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
Chris@16 519 }
Chris@16 520 }
Chris@16 521 //Insert the free message in the correct position
Chris@16 522 return this->insert_at(it);
Chris@16 523 }
Chris@16 524
Chris@16 525 //!Returns the number of bytes needed to construct a message queue with
Chris@16 526 //!"max_num_size" maximum number of messages and "max_msg_size" maximum
Chris@16 527 //!message size. Never throws.
Chris@16 528 static size_type get_mem_size
Chris@16 529 (size_type max_msg_size, size_type max_num_msg)
Chris@16 530 {
Chris@16 531 const size_type
Chris@101 532 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value,
Chris@101 533 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value,
Chris@16 534 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
Chris@101 535 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
Chris@101 536 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
Chris@16 537 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
Chris@16 538 open_create_impl_t::ManagedOpenOrCreateUserOffset;
Chris@16 539 }
Chris@16 540
Chris@16 541 //!Initializes the memory structures to preallocate messages and constructs the
Chris@16 542 //!message index. Never throws.
Chris@16 543 void initialize_memory()
Chris@16 544 {
Chris@16 545 const size_type
Chris@101 546 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value,
Chris@101 547 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value,
Chris@16 548 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
Chris@101 549 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
Chris@101 550 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
Chris@16 551
Chris@16 552 //Pointer to the index
Chris@16 553 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*>
Chris@16 554 (reinterpret_cast<char*>(this)+r_hdr_size);
Chris@16 555
Chris@16 556 //Pointer to the first message header
Chris@16 557 msg_header *msg_hdr = reinterpret_cast<msg_header*>
Chris@16 558 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
Chris@16 559
Chris@16 560 //Initialize the pointer to the index
Chris@16 561 mp_index = index;
Chris@16 562
Chris@16 563 //Initialize the index so each slot points to a preallocated message
Chris@16 564 for(size_type i = 0; i < m_max_num_msg; ++i){
Chris@16 565 index[i] = msg_hdr;
Chris@16 566 msg_hdr = reinterpret_cast<msg_header*>
Chris@16 567 (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
Chris@16 568 }
Chris@16 569 }
Chris@16 570
Chris@16 571 public:
Chris@16 572 //Pointer to the index
Chris@16 573 msg_hdr_ptr_ptr_t mp_index;
Chris@16 574 //Maximum number of messages of the queue
Chris@16 575 const size_type m_max_num_msg;
Chris@16 576 //Maximum size of messages of the queue
Chris@16 577 const size_type m_max_msg_size;
Chris@16 578 //Current number of messages
Chris@16 579 size_type m_cur_num_msg;
Chris@16 580 //Mutex to protect data structures
Chris@16 581 interprocess_mutex m_mutex;
Chris@16 582 //Condition block receivers when there are no messages
Chris@16 583 interprocess_condition m_cond_recv;
Chris@16 584 //Condition block senders when the queue is full
Chris@16 585 interprocess_condition m_cond_send;
Chris@16 586 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@16 587 //Current start offset in the circular index
Chris@16 588 size_type m_cur_first_msg;
Chris@101 589 size_type m_blocked_senders;
Chris@101 590 size_type m_blocked_receivers;
Chris@16 591 #endif
Chris@16 592 };
Chris@16 593
Chris@16 594
Chris@16 595 //!This is the atomic functor to be executed when creating or opening
Chris@16 596 //!shared memory. Never throws
Chris@16 597 template<class VoidPointer>
Chris@16 598 class msg_queue_initialization_func_t
Chris@16 599 {
Chris@16 600 public:
Chris@16 601 typedef typename boost::intrusive::
Chris@16 602 pointer_traits<VoidPointer>::template
Chris@101 603 rebind_pointer<char>::type char_ptr;
Chris@101 604 typedef typename boost::intrusive::pointer_traits<char_ptr>::
Chris@101 605 difference_type difference_type;
Chris@101 606 typedef typename boost::container::container_detail::
Chris@101 607 make_unsigned<difference_type>::type size_type;
Chris@16 608
Chris@16 609 msg_queue_initialization_func_t(size_type maxmsg = 0,
Chris@16 610 size_type maxmsgsize = 0)
Chris@16 611 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
Chris@16 612
Chris@16 613 bool operator()(void *address, size_type, bool created)
Chris@16 614 {
Chris@16 615 char *mptr;
Chris@16 616
Chris@16 617 if(created){
Chris@16 618 mptr = reinterpret_cast<char*>(address);
Chris@16 619 //Construct the message queue header at the beginning
Chris@16 620 BOOST_TRY{
Chris@16 621 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
Chris@16 622 }
Chris@16 623 BOOST_CATCH(...){
Chris@16 624 return false;
Chris@16 625 }
Chris@16 626 BOOST_CATCH_END
Chris@16 627 }
Chris@16 628 return true;
Chris@16 629 }
Chris@16 630
Chris@16 631 std::size_t get_min_size() const
Chris@16 632 {
Chris@16 633 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
Chris@16 634 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
Chris@16 635 }
Chris@16 636
Chris@16 637 const size_type m_maxmsg;
Chris@16 638 const size_type m_maxmsgsize;
Chris@16 639 };
Chris@16 640
Chris@16 641 } //namespace ipcdetail {
Chris@16 642
Chris@16 643 template<class VoidPointer>
Chris@16 644 inline message_queue_t<VoidPointer>::~message_queue_t()
Chris@16 645 {}
Chris@16 646
Chris@16 647 template<class VoidPointer>
Chris@16 648 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
Chris@16 649 (size_type max_msg_size, size_type max_num_msg)
Chris@16 650 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); }
Chris@16 651
Chris@16 652 template<class VoidPointer>
Chris@16 653 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
Chris@16 654 const char *name,
Chris@16 655 size_type max_num_msg,
Chris@16 656 size_type max_msg_size,
Chris@16 657 const permissions &perm)
Chris@16 658 //Create shared memory and execute functor atomically
Chris@16 659 : m_shmem(create_only,
Chris@16 660 name,
Chris@16 661 get_mem_size(max_msg_size, max_num_msg),
Chris@16 662 read_write,
Chris@16 663 static_cast<void*>(0),
Chris@16 664 //Prepare initialization functor
Chris@16 665 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
Chris@16 666 perm)
Chris@16 667 {}
Chris@16 668
Chris@16 669 template<class VoidPointer>
Chris@16 670 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
Chris@16 671 const char *name,
Chris@16 672 size_type max_num_msg,
Chris@16 673 size_type max_msg_size,
Chris@16 674 const permissions &perm)
Chris@16 675 //Create shared memory and execute functor atomically
Chris@16 676 : m_shmem(open_or_create,
Chris@16 677 name,
Chris@16 678 get_mem_size(max_msg_size, max_num_msg),
Chris@16 679 read_write,
Chris@16 680 static_cast<void*>(0),
Chris@16 681 //Prepare initialization functor
Chris@16 682 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
Chris@16 683 perm)
Chris@16 684 {}
Chris@16 685
Chris@16 686 template<class VoidPointer>
Chris@16 687 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
Chris@16 688 //Create shared memory and execute functor atomically
Chris@16 689 : m_shmem(open_only,
Chris@16 690 name,
Chris@16 691 read_write,
Chris@16 692 static_cast<void*>(0),
Chris@16 693 //Prepare initialization functor
Chris@16 694 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
Chris@16 695 {}
Chris@16 696
Chris@16 697 template<class VoidPointer>
Chris@16 698 inline void message_queue_t<VoidPointer>::send
Chris@16 699 (const void *buffer, size_type buffer_size, unsigned int priority)
Chris@16 700 { this->do_send(blocking, buffer, buffer_size, priority, ptime()); }
Chris@16 701
Chris@16 702 template<class VoidPointer>
Chris@16 703 inline bool message_queue_t<VoidPointer>::try_send
Chris@16 704 (const void *buffer, size_type buffer_size, unsigned int priority)
Chris@16 705 { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }
Chris@16 706
Chris@16 707 template<class VoidPointer>
Chris@16 708 inline bool message_queue_t<VoidPointer>::timed_send
Chris@16 709 (const void *buffer, size_type buffer_size
Chris@16 710 ,unsigned int priority, const boost::posix_time::ptime &abs_time)
Chris@16 711 {
Chris@16 712 if(abs_time == boost::posix_time::pos_infin){
Chris@16 713 this->send(buffer, buffer_size, priority);
Chris@16 714 return true;
Chris@16 715 }
Chris@16 716 return this->do_send(timed, buffer, buffer_size, priority, abs_time);
Chris@16 717 }
Chris@16 718
//!Shared implementation behind send(), try_send() and timed_send().
//!
//!"block" selects what happens when the queue is full: non_blocking returns
//!false at once, blocking waits on m_cond_send until the queue is no longer
//!full, and timed waits with timed_wait() up to "abs_time" and returns false
//!if that wait fails while the queue is still full. "abs_time" is only read
//!in the timed case.
//!
//!On success the function takes a free message via queue_free_msg(priority),
//!stores "priority" and "buffer_size" in its header, copies "buffer_size"
//!bytes from "buffer" into it, signals m_cond_recv and returns true.
//!
//!Throws interprocess_exception(size_error) if "buffer_size" is greater than
//!the queue's m_max_msg_size. Exceptions raised while waiting are rethrown.
Chris@16 719 template<class VoidPointer>
Chris@16 720 inline bool message_queue_t<VoidPointer>::do_send(block_t block,
Chris@16 721 const void *buffer, size_type buffer_size,
Chris@16 722 unsigned int priority, const boost::posix_time::ptime &abs_time)
Chris@16 723 {
Chris@16 724 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
Chris@16 725 //Check if buffer is smaller than maximum allowed
Chris@16 726 if (buffer_size > p_hdr->m_max_msg_size) {
Chris@16 727 throw interprocess_exception(size_error);
Chris@16 728 }
Chris@16 729
//Circular-index build only: remembers, from inside the braces below,
//whether any receiver was registered as blocked, for use after them
Chris@101 730 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 731 bool notify_blocked_receivers = false;
Chris@101 732 #endif
//NOTE(review): "lock" is declared in the function scope, before the inner
//braces, so it presumably stays locked until the function returns. The
//notification after "Lock end" would then run with the mutex still held,
//despite the "outside lock" comment there. Confirm the intent.
Chris@16 733 //---------------------------------------------
Chris@16 734 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
Chris@16 735 //---------------------------------------------
Chris@16 736 {
Chris@16 737 //If the queue is full execute blocking logic
Chris@16 738 if (p_hdr->is_full()) {
Chris@101 739 BOOST_TRY{
//Circular-index build: m_blocked_senders is incremented here and
//decremented again on every way out of this try block
Chris@101 740 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@101 741 ++p_hdr->m_blocked_senders;
Chris@101 742 #endif
Chris@101 743 switch(block){
Chris@101 744 case non_blocking :
Chris@101 745 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@101 746 --p_hdr->m_blocked_senders;
Chris@101 747 #endif
Chris@101 748 return false;
Chris@101 749 break;
Chris@16 750
//Wait until the queue is no longer full, re-checking after each wake-up
Chris@101 751 case blocking :
Chris@101 752 do{
Chris@101 753 p_hdr->m_cond_send.wait(lock);
Chris@101 754 }
Chris@101 755 while (p_hdr->is_full());
Chris@101 756 break;
Chris@16 757
//When timed_wait() returns false (presumably the deadline passed) the send
//fails only if the queue is still full; otherwise the loop is left and the
//message is stored
Chris@101 758 case timed :
Chris@101 759 do{
Chris@101 760 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
Chris@101 761 if(p_hdr->is_full()){
Chris@101 762 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@101 763 --p_hdr->m_blocked_senders;
Chris@101 764 #endif
Chris@101 765 return false;
Chris@101 766 }
Chris@101 767 break;
Chris@101 768 }
Chris@16 769 }
Chris@101 770 while (p_hdr->is_full());
Chris@101 771 break;
Chris@101 772 default:
Chris@101 773 break;
Chris@101 774 }
Chris@101 775 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@101 776 --p_hdr->m_blocked_senders;
Chris@101 777 #endif
Chris@16 778 }
//Undo the counter increment before letting the exception propagate
Chris@101 779 BOOST_CATCH(...){
Chris@101 780 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@101 781 --p_hdr->m_blocked_senders;
Chris@101 782 #endif
Chris@101 783 BOOST_RETHROW;
Chris@101 784 }
Chris@101 785 BOOST_CATCH_END
Chris@16 786 }
Chris@16 787
//Circular-index build: only signal later if some receiver is counted as blocked
Chris@101 788 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 789 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
Chris@101 790 #endif
Chris@16 791 //Insert the first free message in the priority queue
Chris@16 792 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
Chris@16 793
Chris@16 794 //Sanity check, free msgs are always cleaned when received
Chris@16 795 BOOST_ASSERT(free_msg_hdr.priority == 0);
Chris@16 796 BOOST_ASSERT(free_msg_hdr.len == 0);
Chris@16 797
Chris@16 798 //Copy control data to the free message
Chris@16 799 free_msg_hdr.priority = priority;
Chris@16 800 free_msg_hdr.len = buffer_size;
Chris@16 801
Chris@16 802 //Copy user buffer to the message
Chris@16 803 std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
Chris@16 804 } // Lock end
Chris@16 805
Chris@16 806 //Notify outside lock to avoid contention. This might produce some
Chris@16 807 //spurious wakeups, but it's usually far better than notifying inside.
Chris@16 808 //If this message changes the queue empty state, notify it to receivers
Chris@101 809 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 810 if (notify_blocked_receivers){
Chris@16 811 p_hdr->m_cond_recv.notify_one();
Chris@16 812 }
Chris@101 813 #else
Chris@101 814 p_hdr->m_cond_recv.notify_one();
Chris@101 815 #endif
Chris@16 816
Chris@16 817 return true;
Chris@16 818 }
Chris@16 819
Chris@16 820 template<class VoidPointer>
Chris@16 821 inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size,
Chris@16 822 size_type &recvd_size, unsigned int &priority)
Chris@16 823 { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
Chris@16 824
Chris@16 825 template<class VoidPointer>
Chris@16 826 inline bool
Chris@16 827 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,
Chris@16 828 size_type &recvd_size, unsigned int &priority)
Chris@16 829 { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
Chris@16 830
Chris@16 831 template<class VoidPointer>
Chris@16 832 inline bool
Chris@16 833 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,
Chris@16 834 size_type &recvd_size, unsigned int &priority,
Chris@16 835 const boost::posix_time::ptime &abs_time)
Chris@16 836 {
Chris@16 837 if(abs_time == boost::posix_time::pos_infin){
Chris@16 838 this->receive(buffer, buffer_size, recvd_size, priority);
Chris@16 839 return true;
Chris@16 840 }
Chris@16 841 return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);
Chris@16 842 }
Chris@16 843
//!Shared implementation behind receive(), try_receive() and timed_receive().
//!
//!"block" selects what happens when the queue is empty: non_blocking returns
//!false at once, blocking waits on m_cond_recv until the queue is no longer
//!empty, and timed waits with timed_wait() up to "abs_time" and returns false
//!if that wait fails while the queue is still empty. "abs_time" is only read
//!in the timed case.
//!
//!On success the function takes top_msg(), writes its length to "recvd_size"
//!and its priority to "priority", zeroes both header fields, copies
//!"recvd_size" bytes into "buffer", calls free_top_msg(), signals m_cond_send
//!and returns true.
//!
//!Throws interprocess_exception(size_error) if "buffer_size" is smaller than
//!the queue's m_max_msg_size. Exceptions raised while waiting are rethrown.
Chris@16 844 template<class VoidPointer>
Chris@16 845 inline bool
Chris@16 846 message_queue_t<VoidPointer>::do_receive(block_t block,
Chris@16 847 void *buffer, size_type buffer_size,
Chris@16 848 size_type &recvd_size, unsigned int &priority,
Chris@16 849 const boost::posix_time::ptime &abs_time)
Chris@16 850 {
Chris@16 851 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
Chris@16 852 //Check if buffer is big enough for any message
Chris@16 853 if (buffer_size < p_hdr->m_max_msg_size) {
Chris@16 854 throw interprocess_exception(size_error);
Chris@16 855 }
Chris@16 856
//Circular-index build only: remembers, from inside the braces below,
//whether any sender was registered as blocked, for use after them
Chris@101 857 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 858 bool notify_blocked_senders = false;
Chris@101 859 #endif
//NOTE(review): "lock" is declared in the function scope, before the inner
//braces, so it presumably stays locked until the function returns. The
//notification after "Lock end" would then run with the mutex still held,
//despite the "outside lock" comment there. Confirm the intent.
Chris@16 860 //---------------------------------------------
Chris@16 861 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
Chris@16 862 //---------------------------------------------
Chris@16 863 {
Chris@16 864 //If there are no messages execute blocking logic
Chris@16 865 if (p_hdr->is_empty()) {
Chris@101 866 BOOST_TRY{
//Circular-index build: m_blocked_receivers is incremented here and
//decremented again on every way out of this try block
Chris@101 867 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 868 ++p_hdr->m_blocked_receivers;
Chris@101 869 #endif
Chris@101 870 switch(block){
Chris@101 871 case non_blocking :
Chris@101 872 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 873 --p_hdr->m_blocked_receivers;
Chris@101 874 #endif
Chris@101 875 return false;
Chris@101 876 break;
Chris@16 877
//Wait until the queue is no longer empty, re-checking after each wake-up
Chris@101 878 case blocking :
Chris@101 879 do{
Chris@101 880 p_hdr->m_cond_recv.wait(lock);
Chris@101 881 }
Chris@101 882 while (p_hdr->is_empty());
Chris@101 883 break;
Chris@16 884
//When timed_wait() returns false (presumably the deadline passed) the receive
//fails only if the queue is still empty; otherwise the loop is left and the
//top message is delivered
Chris@101 885 case timed :
Chris@101 886 do{
Chris@101 887 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
Chris@101 888 if(p_hdr->is_empty()){
Chris@101 889 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 890 --p_hdr->m_blocked_receivers;
Chris@101 891 #endif
Chris@101 892 return false;
Chris@101 893 }
Chris@101 894 break;
Chris@101 895 }
Chris@16 896 }
Chris@101 897 while (p_hdr->is_empty());
Chris@101 898 break;
Chris@16 899
Chris@101 900 //Paranoia check
Chris@101 901 default:
Chris@101 902 break;
Chris@101 903 }
Chris@101 904 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 905 --p_hdr->m_blocked_receivers;
Chris@101 906 #endif
Chris@16 907 }
//Undo the counter increment before letting the exception propagate
Chris@101 908 BOOST_CATCH(...){
Chris@101 909 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
Chris@101 910 --p_hdr->m_blocked_receivers;
Chris@101 911 #endif
Chris@101 912 BOOST_RETHROW;
Chris@101 913 }
Chris@101 914 BOOST_CATCH_END
Chris@16 915 }
Chris@16 916
//Circular-index build: only signal later if some sender is counted as blocked
Chris@101 917 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@101 918 notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
Chris@101 919 #endif
Chris@101 920
Chris@16 921 //There is at least one message ready to pick, get the top one
Chris@16 922 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
Chris@16 923
Chris@16 924 //Get data from the message
Chris@16 925 recvd_size = top_msg.len;
Chris@16 926 priority = top_msg.priority;
Chris@16 927
Chris@16 928 //Some cleanup to ease debugging
Chris@16 929 top_msg.len = 0;
Chris@16 930 top_msg.priority = 0;
Chris@16 931
Chris@16 932 //Copy data to receiver's buffers
Chris@16 933 std::memcpy(buffer, top_msg.data(), recvd_size);
Chris@16 934
Chris@16 935 //Free top message and put it in the free message list
Chris@16 936 p_hdr->free_top_msg();
Chris@16 937 } //Lock end
Chris@16 938
Chris@16 939 //Notify outside lock to avoid contention. This might produce some
Chris@16 940 //spurious wakeups, but it's usually far better than notifying inside.
Chris@16 941 //If this reception changes the queue full state, notify senders
Chris@101 942 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
Chris@101 943 if (notify_blocked_senders){
Chris@16 944 p_hdr->m_cond_send.notify_one();
Chris@16 945 }
Chris@101 946 #else
Chris@101 947 p_hdr->m_cond_send.notify_one();
Chris@101 948 #endif
Chris@16 949
Chris@16 950 return true;
Chris@16 951 }
Chris@16 952
Chris@16 953 template<class VoidPointer>
Chris@16 954 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
Chris@16 955 {
Chris@16 956 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
Chris@16 957 return p_hdr ? p_hdr->m_max_num_msg : 0; }
Chris@16 958
Chris@16 959 template<class VoidPointer>
Chris@16 960 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
Chris@16 961 {
Chris@16 962 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
Chris@16 963 return p_hdr ? p_hdr->m_max_msg_size : 0;
Chris@16 964 }
Chris@16 965
Chris@16 966 template<class VoidPointer>
Chris@16 967 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
Chris@16 968 {
Chris@16 969 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
Chris@16 970 if(p_hdr){
Chris@16 971 //---------------------------------------------
Chris@16 972 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
Chris@16 973 //---------------------------------------------
Chris@16 974 return p_hdr->m_cur_num_msg;
Chris@16 975 }
Chris@16 976
Chris@16 977 return 0;
Chris@16 978 }
Chris@16 979
Chris@16 980 template<class VoidPointer>
Chris@16 981 inline bool message_queue_t<VoidPointer>::remove(const char *name)
Chris@16 982 { return shared_memory_object::remove(name); }
Chris@16 983
Chris@101 984 #else
Chris@101 985
Chris@101 986 //!Typedef for a default message queue
Chris@101 987 //!to be used between processes
//!(message_queue_t instantiated with offset_ptr<void> as its VoidPointer).
Chris@101 988 typedef message_queue_t<offset_ptr<void> > message_queue;
Chris@101 989
Chris@101 990 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
Chris@16 991
Chris@16 992 }} //namespace boost{ namespace interprocess{
Chris@16 993
Chris@16 994 #include <boost/interprocess/detail/config_end.hpp>
Chris@16 995
Chris@16 996 #endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP