comparison DEPENDENCIES/generic/include/boost/interprocess/ipc/message_queue.hpp @ 101:c530137014c0

Update Boost headers (1.58.0)
author Chris Cannam
date Mon, 07 Sep 2015 11:12:49 +0100
parents 2665513ce2d3
children
comparison
equal deleted inserted replaced
100:793467b5e61c 101:c530137014c0
8 // 8 //
9 ////////////////////////////////////////////////////////////////////////////// 9 //////////////////////////////////////////////////////////////////////////////
10 10
11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP 11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP 12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
13
14 #ifndef BOOST_CONFIG_HPP
15 # include <boost/config.hpp>
16 #endif
17 #
18 #if defined(BOOST_HAS_PRAGMA_ONCE)
19 # pragma once
20 #endif
13 21
14 #include <boost/interprocess/detail/config_begin.hpp> 22 #include <boost/interprocess/detail/config_begin.hpp>
15 #include <boost/interprocess/detail/workaround.hpp> 23 #include <boost/interprocess/detail/workaround.hpp>
16 24
17 #include <boost/interprocess/shared_memory_object.hpp> 25 #include <boost/interprocess/shared_memory_object.hpp>
22 #include <boost/interprocess/detail/utilities.hpp> 30 #include <boost/interprocess/detail/utilities.hpp>
23 #include <boost/interprocess/offset_ptr.hpp> 31 #include <boost/interprocess/offset_ptr.hpp>
24 #include <boost/interprocess/creation_tags.hpp> 32 #include <boost/interprocess/creation_tags.hpp>
25 #include <boost/interprocess/exceptions.hpp> 33 #include <boost/interprocess/exceptions.hpp>
26 #include <boost/interprocess/permissions.hpp> 34 #include <boost/interprocess/permissions.hpp>
27 #include <boost/detail/no_exceptions_support.hpp> 35 #include <boost/core/no_exceptions_support.hpp>
28 #include <boost/interprocess/detail/type_traits.hpp> 36 #include <boost/interprocess/detail/type_traits.hpp>
29 #include <boost/intrusive/pointer_traits.hpp> 37 #include <boost/intrusive/pointer_traits.hpp>
30 #include <boost/type_traits/make_unsigned.hpp> 38 #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of
31 #include <boost/type_traits/alignment_of.hpp>
32 #include <boost/intrusive/pointer_traits.hpp> 39 #include <boost/intrusive/pointer_traits.hpp>
33 #include <boost/assert.hpp> 40 #include <boost/assert.hpp>
34 #include <algorithm> //std::lower_bound 41 #include <algorithm> //std::lower_bound
35 #include <cstddef> //std::size_t 42 #include <cstddef> //std::size_t
36 #include <cstring> //memcpy 43 #include <cstring> //memcpy
52 //!A class that allows sending messages 59 //!A class that allows sending messages
53 //!between processes. 60 //!between processes.
54 template<class VoidPointer> 61 template<class VoidPointer>
55 class message_queue_t 62 class message_queue_t
56 { 63 {
57 /// @cond 64 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
58 //Blocking modes 65 //Blocking modes
59 enum block_t { blocking, timed, non_blocking }; 66 enum block_t { blocking, timed, non_blocking };
60 67
61 message_queue_t(); 68 message_queue_t();
62 /// @endcond 69 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
63 70
64 public: 71 public:
65 typedef VoidPointer void_pointer; 72 typedef VoidPointer void_pointer;
66 typedef typename boost::intrusive:: 73 typedef typename boost::intrusive::
67 pointer_traits<void_pointer>::template 74 pointer_traits<void_pointer>::template
68 rebind_pointer<char>::type char_ptr; 75 rebind_pointer<char>::type char_ptr;
69 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; 76 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
70 typedef typename boost::make_unsigned<difference_type>::type size_type; 77 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type;
71 78
72 //!Creates a process shared message queue with name "name". For this message queue, 79 //!Creates a process shared message queue with name "name". For this message queue,
73 //!the maximum number of messages will be "max_num_msg" and the maximum message size 80 //!the maximum number of messages will be "max_num_msg" and the maximum message size
74 //!will be "max_msg_size". Throws on error and if the queue was previously created. 81 //!will be "max_msg_size". Throws on error and if the queue was previously created.
75 message_queue_t(create_only_t create_only, 82 message_queue_t(create_only_t create_only,
166 173
167 //!Removes the message queue from the system. 174 //!Removes the message queue from the system.
168 //!Returns false on error. Never throws 175 //!Returns false on error. Never throws
169 static bool remove(const char *name); 176 static bool remove(const char *name);
170 177
171 /// @cond 178 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
172 private: 179 private:
173 typedef boost::posix_time::ptime ptime; 180 typedef boost::posix_time::ptime ptime;
174 181
175 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>; 182 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
176 183
186 //!Returns the needed memory size for the shared message queue. 193 //!Returns the needed memory size for the shared message queue.
187 //!Never throws 194 //!Never throws
188 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); 195 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
189 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; 196 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
190 open_create_impl_t m_shmem; 197 open_create_impl_t m_shmem;
191 /// @endcond 198 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
192 }; 199 };
193 200
194 /// @cond 201 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
195 202
196 namespace ipcdetail { 203 namespace ipcdetail {
197 204
198 //!This header is the prefix of each message in the queue 205 //!This header is the prefix of each message in the queue
199 template<class VoidPointer> 206 template<class VoidPointer>
202 typedef VoidPointer void_pointer; 209 typedef VoidPointer void_pointer;
203 typedef typename boost::intrusive:: 210 typedef typename boost::intrusive::
204 pointer_traits<void_pointer>::template 211 pointer_traits<void_pointer>::template
205 rebind_pointer<char>::type char_ptr; 212 rebind_pointer<char>::type char_ptr;
206 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; 213 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
207 typedef typename boost::make_unsigned<difference_type>::type size_type; 214 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type;
208 215
209 public: 216 public:
210 size_type len; // Message length 217 size_type len; // Message length
211 unsigned int priority;// Message priority 218 unsigned int priority;// Message priority
212 //!Returns the data buffer associated with this this message 219 //!Returns the data buffer associated with this this message
248 //! point to free messages. Those "cur_num_msg" pointers are 255 //! point to free messages. Those "cur_num_msg" pointers are
249 //! ordered by the priority of the pointed message and by insertion order 256 //! ordered by the priority of the pointed message and by insertion order
250 //! if two messages have the same priority. So the next message to be 257 //! if two messages have the same priority. So the next message to be
251 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg] 258 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
252 //! and the first free message ready to be used in a "send" operation is 259 //! and the first free message ready to be used in a "send" operation is
253 //! [cur_first_msg] if circular buffer is extended from front, 260 //! [cur_first_msg] if circular buffer is extended from front,
254 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise. 261 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
255 //! 262 //!
256 //! This transforms the index in a circular buffer with an embedded free 263 //! This transforms the index in a circular buffer with an embedded free
257 //! message queue. 264 //! message queue.
258 //! 265 //!
287 typedef typename boost::intrusive:: 294 typedef typename boost::intrusive::
288 pointer_traits<void_pointer>::template 295 pointer_traits<void_pointer>::template
289 rebind_pointer<msg_header>::type msg_hdr_ptr_t; 296 rebind_pointer<msg_header>::type msg_hdr_ptr_t;
290 typedef typename boost::intrusive::pointer_traits 297 typedef typename boost::intrusive::pointer_traits
291 <msg_hdr_ptr_t>::difference_type difference_type; 298 <msg_hdr_ptr_t>::difference_type difference_type;
292 typedef typename boost::make_unsigned<difference_type>::type size_type; 299 typedef typename boost::container::
300 container_detail::make_unsigned<difference_type>::type size_type;
293 typedef typename boost::intrusive:: 301 typedef typename boost::intrusive::
294 pointer_traits<void_pointer>::template 302 pointer_traits<void_pointer>::template
295 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t; 303 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t;
296 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; 304 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
297 305
304 : m_max_num_msg(max_num_msg), 312 : m_max_num_msg(max_num_msg),
305 m_max_msg_size(max_msg_size), 313 m_max_msg_size(max_msg_size),
306 m_cur_num_msg(0) 314 m_cur_num_msg(0)
307 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) 315 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
308 ,m_cur_first_msg(0u) 316 ,m_cur_first_msg(0u)
317 ,m_blocked_senders(0u)
318 ,m_blocked_receivers(0u)
309 #endif 319 #endif
310 { this->initialize_memory(); } 320 { this->initialize_memory(); }
311 321
312 //!Returns true if the message queue is full 322 //!Returns true if the message queue is full
313 bool is_full() const 323 bool is_full() const
352 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func) 362 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
353 { 363 {
354 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end()); 364 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
355 if(end < begin){ 365 if(end < begin){
356 iterator idx_end = &mp_index[m_max_num_msg]; 366 iterator idx_end = &mp_index[m_max_num_msg];
357 iterator ret = std::lower_bound(begin, idx_end, value, func); 367 iterator ret = std::lower_bound(begin, idx_end, value, func);
358 if(idx_end == ret){ 368 if(idx_end == ret){
359 iterator idx_beg = &mp_index[0]; 369 iterator idx_beg = &mp_index[0];
360 ret = std::lower_bound(idx_beg, end, value, func); 370 ret = std::lower_bound(idx_beg, end, value, func);
361 //sanity check, these cases should not call lower_bound (optimized out) 371 //sanity check, these cases should not call lower_bound (optimized out)
362 BOOST_ASSERT(ret != end); 372 BOOST_ASSERT(ret != end);
374 384
375 msg_header & insert_at(iterator where) 385 msg_header & insert_at(iterator where)
376 { 386 {
377 iterator it_inserted_ptr_end = this->inserted_ptr_end(); 387 iterator it_inserted_ptr_end = this->inserted_ptr_end();
378 iterator it_inserted_ptr_beg = this->inserted_ptr_begin(); 388 iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
379 if(where == it_inserted_ptr_end){ 389 if(where == it_inserted_ptr_beg){
380 ++m_cur_num_msg;
381 return **it_inserted_ptr_end;
382 }
383 else if(where == it_inserted_ptr_beg){
384 //unsigned integer guarantees underflow 390 //unsigned integer guarantees underflow
385 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; 391 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
386 --m_cur_first_msg; 392 --m_cur_first_msg;
387 ++m_cur_num_msg; 393 ++m_cur_num_msg;
388 return *mp_index[m_cur_first_msg]; 394 return *mp_index[m_cur_first_msg];
395 }
396 else if(where == it_inserted_ptr_end){
397 ++m_cur_num_msg;
398 return **it_inserted_ptr_end;
389 } 399 }
390 else{ 400 else{
391 size_type pos = where - &mp_index[0]; 401 size_type pos = where - &mp_index[0];
392 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg); 402 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
393 //Check if it's more efficient to move back or move front 403 //Check if it's more efficient to move back or move front
450 return **where; 460 return **where;
451 } 461 }
452 } 462 }
453 } 463 }
454 464
455 #else 465 #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
456 466
457 typedef msg_hdr_ptr_t *iterator; 467 typedef msg_hdr_ptr_t *iterator;
458 468
459 //!Returns the inserted message with top priority 469 //!Returns the inserted message with top priority
460 msg_header &top_msg() 470 msg_header &top_msg()
480 *pos = backup; 490 *pos = backup;
481 ++m_cur_num_msg; 491 ++m_cur_num_msg;
482 return **pos; 492 return **pos;
483 } 493 }
484 494
485 #endif 495 #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
486 496
487 //!Inserts the first free message in the priority queue 497 //!Inserts the first free message in the priority queue
488 msg_header & queue_free_msg(unsigned int priority) 498 msg_header & queue_free_msg(unsigned int priority)
489 { 499 {
490 //Get priority queue's range 500 //Get priority queue's range
505 msg_hdr_ptr_t dummy_ptr(&dummy_hdr); 515 msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
506 516
507 //Check where the free message should be placed 517 //Check where the free message should be placed
508 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this)); 518 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
509 } 519 }
510
511 } 520 }
512 //Insert the free message in the correct position 521 //Insert the free message in the correct position
513 return this->insert_at(it); 522 return this->insert_at(it);
514 } 523 }
515 524
518 //!message size. Never throws. 527 //!message size. Never throws.
519 static size_type get_mem_size 528 static size_type get_mem_size
520 (size_type max_msg_size, size_type max_num_msg) 529 (size_type max_msg_size, size_type max_num_msg)
521 { 530 {
522 const size_type 531 const size_type
523 msg_hdr_align = ::boost::alignment_of<msg_header>::value, 532 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value,
524 index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value, 533 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value,
525 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, 534 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
526 r_index_size = ipcdetail::get_rounded_size(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), 535 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
527 r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header); 536 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
528 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + 537 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
529 open_create_impl_t::ManagedOpenOrCreateUserOffset; 538 open_create_impl_t::ManagedOpenOrCreateUserOffset;
530 } 539 }
531 540
532 //!Initializes the memory structures to preallocate messages and constructs the 541 //!Initializes the memory structures to preallocate messages and constructs the
533 //!message index. Never throws. 542 //!message index. Never throws.
534 void initialize_memory() 543 void initialize_memory()
535 { 544 {
536 const size_type 545 const size_type
537 msg_hdr_align = ::boost::alignment_of<msg_header>::value, 546 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value,
538 index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value, 547 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value,
539 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, 548 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
540 r_index_size = ipcdetail::get_rounded_size(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), 549 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
541 r_max_msg_size = ipcdetail::get_rounded_size(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); 550 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
542 551
543 //Pointer to the index 552 //Pointer to the index
544 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*> 553 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*>
545 (reinterpret_cast<char*>(this)+r_hdr_size); 554 (reinterpret_cast<char*>(this)+r_hdr_size);
546 555
575 //Condition block senders when the queue is full 584 //Condition block senders when the queue is full
576 interprocess_condition m_cond_send; 585 interprocess_condition m_cond_send;
577 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) 586 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
578 //Current start offset in the circular index 587 //Current start offset in the circular index
579 size_type m_cur_first_msg; 588 size_type m_cur_first_msg;
589 size_type m_blocked_senders;
590 size_type m_blocked_receivers;
580 #endif 591 #endif
581 }; 592 };
582 593
583 594
584 //!This is the atomic functor to be executed when creating or opening 595 //!This is the atomic functor to be executed when creating or opening
587 class msg_queue_initialization_func_t 598 class msg_queue_initialization_func_t
588 { 599 {
589 public: 600 public:
590 typedef typename boost::intrusive:: 601 typedef typename boost::intrusive::
591 pointer_traits<VoidPointer>::template 602 pointer_traits<VoidPointer>::template
592 rebind_pointer<char>::type char_ptr; 603 rebind_pointer<char>::type char_ptr;
593 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; 604 typedef typename boost::intrusive::pointer_traits<char_ptr>::
594 typedef typename boost::make_unsigned<difference_type>::type size_type; 605 difference_type difference_type;
606 typedef typename boost::container::container_detail::
607 make_unsigned<difference_type>::type size_type;
595 608
596 msg_queue_initialization_func_t(size_type maxmsg = 0, 609 msg_queue_initialization_func_t(size_type maxmsg = 0,
597 size_type maxmsgsize = 0) 610 size_type maxmsgsize = 0)
598 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} 611 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
599 612
712 //Check if buffer is smaller than maximum allowed 725 //Check if buffer is smaller than maximum allowed
713 if (buffer_size > p_hdr->m_max_msg_size) { 726 if (buffer_size > p_hdr->m_max_msg_size) {
714 throw interprocess_exception(size_error); 727 throw interprocess_exception(size_error);
715 } 728 }
716 729
717 bool was_empty = false; 730 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
731 bool notify_blocked_receivers = false;
732 #endif
718 //--------------------------------------------- 733 //---------------------------------------------
719 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); 734 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
720 //--------------------------------------------- 735 //---------------------------------------------
721 { 736 {
722 //If the queue is full execute blocking logic 737 //If the queue is full execute blocking logic
723 if (p_hdr->is_full()) { 738 if (p_hdr->is_full()) {
724 switch(block){ 739 BOOST_TRY{
725 case non_blocking : 740 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
726 return false; 741 ++p_hdr->m_blocked_senders;
727 break; 742 #endif
728 743 switch(block){
729 case blocking : 744 case non_blocking :
730 do{ 745 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
731 p_hdr->m_cond_send.wait(lock); 746 --p_hdr->m_blocked_senders;
732 } 747 #endif
733 while (p_hdr->is_full()); 748 return false;
734 break; 749 break;
735 750
736 case timed : 751 case blocking :
737 do{ 752 do{
738 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ 753 p_hdr->m_cond_send.wait(lock);
739 if(p_hdr->is_full())
740 return false;
741 break;
742 } 754 }
743 } 755 while (p_hdr->is_full());
744 while (p_hdr->is_full()); 756 break;
745 break; 757
746 default: 758 case timed :
747 break; 759 do{
748 } 760 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
761 if(p_hdr->is_full()){
762 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
763 --p_hdr->m_blocked_senders;
764 #endif
765 return false;
766 }
767 break;
768 }
769 }
770 while (p_hdr->is_full());
771 break;
772 default:
773 break;
774 }
775 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
776 --p_hdr->m_blocked_senders;
777 #endif
778 }
779 BOOST_CATCH(...){
780 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
781 --p_hdr->m_blocked_senders;
782 #endif
783 BOOST_RETHROW;
784 }
785 BOOST_CATCH_END
749 } 786 }
750 787
751 was_empty = p_hdr->is_empty(); 788 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
789 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
790 #endif
752 //Insert the first free message in the priority queue 791 //Insert the first free message in the priority queue
753 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority); 792 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
754 793
755 //Sanity check, free msgs are always cleaned when received 794 //Sanity check, free msgs are always cleaned when received
756 BOOST_ASSERT(free_msg_hdr.priority == 0); 795 BOOST_ASSERT(free_msg_hdr.priority == 0);
765 } // Lock end 804 } // Lock end
766 805
767 //Notify outside lock to avoid contention. This might produce some 806 //Notify outside lock to avoid contention. This might produce some
768 //spurious wakeups, but it's usually far better than notifying inside. 807 //spurious wakeups, but it's usually far better than notifying inside.
769 //If this message changes the queue empty state, notify it to receivers 808 //If this message changes the queue empty state, notify it to receivers
770 if (was_empty){ 809 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
810 if (notify_blocked_receivers){
771 p_hdr->m_cond_recv.notify_one(); 811 p_hdr->m_cond_recv.notify_one();
772 } 812 }
813 #else
814 p_hdr->m_cond_recv.notify_one();
815 #endif
773 816
774 return true; 817 return true;
775 } 818 }
776 819
777 template<class VoidPointer> 820 template<class VoidPointer>
809 //Check if buffer is big enough for any message 852 //Check if buffer is big enough for any message
810 if (buffer_size < p_hdr->m_max_msg_size) { 853 if (buffer_size < p_hdr->m_max_msg_size) {
811 throw interprocess_exception(size_error); 854 throw interprocess_exception(size_error);
812 } 855 }
813 856
814 bool was_full = false; 857 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
858 bool notify_blocked_senders = false;
859 #endif
815 //--------------------------------------------- 860 //---------------------------------------------
816 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); 861 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
817 //--------------------------------------------- 862 //---------------------------------------------
818 { 863 {
819 //If there are no messages execute blocking logic 864 //If there are no messages execute blocking logic
820 if (p_hdr->is_empty()) { 865 if (p_hdr->is_empty()) {
821 switch(block){ 866 BOOST_TRY{
822 case non_blocking : 867 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
823 return false; 868 ++p_hdr->m_blocked_receivers;
824 break; 869 #endif
825 870 switch(block){
826 case blocking : 871 case non_blocking :
827 do{ 872 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
828 p_hdr->m_cond_recv.wait(lock); 873 --p_hdr->m_blocked_receivers;
829 } 874 #endif
830 while (p_hdr->is_empty()); 875 return false;
831 break; 876 break;
832 877
833 case timed : 878 case blocking :
834 do{ 879 do{
835 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ 880 p_hdr->m_cond_recv.wait(lock);
836 if(p_hdr->is_empty())
837 return false;
838 break;
839 } 881 }
840 } 882 while (p_hdr->is_empty());
841 while (p_hdr->is_empty()); 883 break;
842 break; 884
843 885 case timed :
844 //Paranoia check 886 do{
845 default: 887 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
846 break; 888 if(p_hdr->is_empty()){
847 } 889 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
890 --p_hdr->m_blocked_receivers;
891 #endif
892 return false;
893 }
894 break;
895 }
896 }
897 while (p_hdr->is_empty());
898 break;
899
900 //Paranoia check
901 default:
902 break;
903 }
904 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
905 --p_hdr->m_blocked_receivers;
906 #endif
907 }
908 BOOST_CATCH(...){
909 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
910 --p_hdr->m_blocked_receivers;
911 #endif
912 BOOST_RETHROW;
913 }
914 BOOST_CATCH_END
848 } 915 }
916
917 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
918 notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
919 #endif
849 920
850 //There is at least one message ready to pick, get the top one 921 //There is at least one message ready to pick, get the top one
851 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg(); 922 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
852 923
853 //Get data from the message 924 //Get data from the message
859 top_msg.priority = 0; 930 top_msg.priority = 0;
860 931
861 //Copy data to receiver's bufers 932 //Copy data to receiver's bufers
862 std::memcpy(buffer, top_msg.data(), recvd_size); 933 std::memcpy(buffer, top_msg.data(), recvd_size);
863 934
864 was_full = p_hdr->is_full();
865
866 //Free top message and put it in the free message list 935 //Free top message and put it in the free message list
867 p_hdr->free_top_msg(); 936 p_hdr->free_top_msg();
868 } //Lock end 937 } //Lock end
869 938
870 //Notify outside lock to avoid contention. This might produce some 939 //Notify outside lock to avoid contention. This might produce some
871 //spurious wakeups, but it's usually far better than notifying inside. 940 //spurious wakeups, but it's usually far better than notifying inside.
872 //If this reception changes the queue full state, notify senders 941 //If this reception changes the queue full state, notify senders
873 if (was_full){ 942 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
943 if (notify_blocked_senders){
874 p_hdr->m_cond_send.notify_one(); 944 p_hdr->m_cond_send.notify_one();
875 } 945 }
946 #else
947 p_hdr->m_cond_send.notify_one();
948 #endif
876 949
877 return true; 950 return true;
878 } 951 }
879 952
880 template<class VoidPointer> 953 template<class VoidPointer>
906 979
907 template<class VoidPointer> 980 template<class VoidPointer>
908 inline bool message_queue_t<VoidPointer>::remove(const char *name) 981 inline bool message_queue_t<VoidPointer>::remove(const char *name)
909 { return shared_memory_object::remove(name); } 982 { return shared_memory_object::remove(name); }
910 983
911 /// @endcond 984 #else
985
986 //!Typedef for a default message queue
987 //!to be used between processes
988 typedef message_queue_t<offset_ptr<void> > message_queue;
989
990 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
912 991
913 }} //namespace boost{ namespace interprocess{ 992 }} //namespace boost{ namespace interprocess{
914 993
915 #include <boost/interprocess/detail/config_end.hpp> 994 #include <boost/interprocess/detail/config_end.hpp>
916 995