Mercurial > hg > vamp-build-and-test
comparison DEPENDENCIES/generic/include/boost/interprocess/ipc/message_queue.hpp @ 101:c530137014c0
Update Boost headers (1.58.0)
author | Chris Cannam |
---|---|
date | Mon, 07 Sep 2015 11:12:49 +0100 |
parents | 2665513ce2d3 |
children |
comparison
equal
deleted
inserted
replaced
100:793467b5e61c | 101:c530137014c0 |
---|---|
8 // | 8 // |
9 ////////////////////////////////////////////////////////////////////////////// | 9 ////////////////////////////////////////////////////////////////////////////// |
10 | 10 |
11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP | 11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP |
12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP | 12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP |
13 | |
14 #ifndef BOOST_CONFIG_HPP | |
15 # include <boost/config.hpp> | |
16 #endif | |
17 # | |
18 #if defined(BOOST_HAS_PRAGMA_ONCE) | |
19 # pragma once | |
20 #endif | |
13 | 21 |
14 #include <boost/interprocess/detail/config_begin.hpp> | 22 #include <boost/interprocess/detail/config_begin.hpp> |
15 #include <boost/interprocess/detail/workaround.hpp> | 23 #include <boost/interprocess/detail/workaround.hpp> |
16 | 24 |
17 #include <boost/interprocess/shared_memory_object.hpp> | 25 #include <boost/interprocess/shared_memory_object.hpp> |
22 #include <boost/interprocess/detail/utilities.hpp> | 30 #include <boost/interprocess/detail/utilities.hpp> |
23 #include <boost/interprocess/offset_ptr.hpp> | 31 #include <boost/interprocess/offset_ptr.hpp> |
24 #include <boost/interprocess/creation_tags.hpp> | 32 #include <boost/interprocess/creation_tags.hpp> |
25 #include <boost/interprocess/exceptions.hpp> | 33 #include <boost/interprocess/exceptions.hpp> |
26 #include <boost/interprocess/permissions.hpp> | 34 #include <boost/interprocess/permissions.hpp> |
27 #include <boost/detail/no_exceptions_support.hpp> | 35 #include <boost/core/no_exceptions_support.hpp> |
28 #include <boost/interprocess/detail/type_traits.hpp> | 36 #include <boost/interprocess/detail/type_traits.hpp> |
29 #include <boost/intrusive/pointer_traits.hpp> | 37 #include <boost/intrusive/pointer_traits.hpp> |
30 #include <boost/type_traits/make_unsigned.hpp> | 38 #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of |
31 #include <boost/type_traits/alignment_of.hpp> | |
32 #include <boost/intrusive/pointer_traits.hpp> | 39 #include <boost/intrusive/pointer_traits.hpp> |
33 #include <boost/assert.hpp> | 40 #include <boost/assert.hpp> |
34 #include <algorithm> //std::lower_bound | 41 #include <algorithm> //std::lower_bound |
35 #include <cstddef> //std::size_t | 42 #include <cstddef> //std::size_t |
36 #include <cstring> //memcpy | 43 #include <cstring> //memcpy |
52 //!A class that allows sending messages | 59 //!A class that allows sending messages |
53 //!between processes. | 60 //!between processes. |
54 template<class VoidPointer> | 61 template<class VoidPointer> |
55 class message_queue_t | 62 class message_queue_t |
56 { | 63 { |
57 /// @cond | 64 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) |
58 //Blocking modes | 65 //Blocking modes |
59 enum block_t { blocking, timed, non_blocking }; | 66 enum block_t { blocking, timed, non_blocking }; |
60 | 67 |
61 message_queue_t(); | 68 message_queue_t(); |
62 /// @endcond | 69 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED |
63 | 70 |
64 public: | 71 public: |
65 typedef VoidPointer void_pointer; | 72 typedef VoidPointer void_pointer; |
66 typedef typename boost::intrusive:: | 73 typedef typename boost::intrusive:: |
67 pointer_traits<void_pointer>::template | 74 pointer_traits<void_pointer>::template |
68 rebind_pointer<char>::type char_ptr; | 75 rebind_pointer<char>::type char_ptr; |
69 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; | 76 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; |
70 typedef typename boost::make_unsigned<difference_type>::type size_type; | 77 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type; |
71 | 78 |
72 //!Creates a process shared message queue with name "name". For this message queue, | 79 //!Creates a process shared message queue with name "name". For this message queue, |
73 //!the maximum number of messages will be "max_num_msg" and the maximum message size | 80 //!the maximum number of messages will be "max_num_msg" and the maximum message size |
74 //!will be "max_msg_size". Throws on error and if the queue was previously created. | 81 //!will be "max_msg_size". Throws on error and if the queue was previously created. |
75 message_queue_t(create_only_t create_only, | 82 message_queue_t(create_only_t create_only, |
166 | 173 |
167 //!Removes the message queue from the system. | 174 //!Removes the message queue from the system. |
168 //!Returns false on error. Never throws | 175 //!Returns false on error. Never throws |
169 static bool remove(const char *name); | 176 static bool remove(const char *name); |
170 | 177 |
171 /// @cond | 178 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) |
172 private: | 179 private: |
173 typedef boost::posix_time::ptime ptime; | 180 typedef boost::posix_time::ptime ptime; |
174 | 181 |
175 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>; | 182 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>; |
176 | 183 |
186 //!Returns the needed memory size for the shared message queue. | 193 //!Returns the needed memory size for the shared message queue. |
187 //!Never throws | 194 //!Never throws |
188 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); | 195 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); |
189 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; | 196 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; |
190 open_create_impl_t m_shmem; | 197 open_create_impl_t m_shmem; |
191 /// @endcond | 198 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED |
192 }; | 199 }; |
193 | 200 |
194 /// @cond | 201 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED) |
195 | 202 |
196 namespace ipcdetail { | 203 namespace ipcdetail { |
197 | 204 |
198 //!This header is the prefix of each message in the queue | 205 //!This header is the prefix of each message in the queue |
199 template<class VoidPointer> | 206 template<class VoidPointer> |
202 typedef VoidPointer void_pointer; | 209 typedef VoidPointer void_pointer; |
203 typedef typename boost::intrusive:: | 210 typedef typename boost::intrusive:: |
204 pointer_traits<void_pointer>::template | 211 pointer_traits<void_pointer>::template |
205 rebind_pointer<char>::type char_ptr; | 212 rebind_pointer<char>::type char_ptr; |
206 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; | 213 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; |
207 typedef typename boost::make_unsigned<difference_type>::type size_type; | 214 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type; |
208 | 215 |
209 public: | 216 public: |
210 size_type len; // Message length | 217 size_type len; // Message length |
211 unsigned int priority;// Message priority | 218 unsigned int priority;// Message priority |
212 //!Returns the data buffer associated with this this message | 219 //!Returns the data buffer associated with this this message |
248 //! point to free messages. Those "cur_num_msg" pointers are | 255 //! point to free messages. Those "cur_num_msg" pointers are |
249 //! ordered by the priority of the pointed message and by insertion order | 256 //! ordered by the priority of the pointed message and by insertion order |
250 //! if two messages have the same priority. So the next message to be | 257 //! if two messages have the same priority. So the next message to be |
251 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg] | 258 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg] |
252 //! and the first free message ready to be used in a "send" operation is | 259 //! and the first free message ready to be used in a "send" operation is |
253 //! [cur_first_msg] if circular buffer is extended from front, | 260 //! [cur_first_msg] if circular buffer is extended from front, |
254 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise. | 261 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise. |
255 //! | 262 //! |
256 //! This transforms the index in a circular buffer with an embedded free | 263 //! This transforms the index in a circular buffer with an embedded free |
257 //! message queue. | 264 //! message queue. |
258 //! | 265 //! |
287 typedef typename boost::intrusive:: | 294 typedef typename boost::intrusive:: |
288 pointer_traits<void_pointer>::template | 295 pointer_traits<void_pointer>::template |
289 rebind_pointer<msg_header>::type msg_hdr_ptr_t; | 296 rebind_pointer<msg_header>::type msg_hdr_ptr_t; |
290 typedef typename boost::intrusive::pointer_traits | 297 typedef typename boost::intrusive::pointer_traits |
291 <msg_hdr_ptr_t>::difference_type difference_type; | 298 <msg_hdr_ptr_t>::difference_type difference_type; |
292 typedef typename boost::make_unsigned<difference_type>::type size_type; | 299 typedef typename boost::container:: |
300 container_detail::make_unsigned<difference_type>::type size_type; | |
293 typedef typename boost::intrusive:: | 301 typedef typename boost::intrusive:: |
294 pointer_traits<void_pointer>::template | 302 pointer_traits<void_pointer>::template |
295 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t; | 303 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t; |
296 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; | 304 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; |
297 | 305 |
304 : m_max_num_msg(max_num_msg), | 312 : m_max_num_msg(max_num_msg), |
305 m_max_msg_size(max_msg_size), | 313 m_max_msg_size(max_msg_size), |
306 m_cur_num_msg(0) | 314 m_cur_num_msg(0) |
307 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | 315 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
308 ,m_cur_first_msg(0u) | 316 ,m_cur_first_msg(0u) |
317 ,m_blocked_senders(0u) | |
318 ,m_blocked_receivers(0u) | |
309 #endif | 319 #endif |
310 { this->initialize_memory(); } | 320 { this->initialize_memory(); } |
311 | 321 |
312 //!Returns true if the message queue is full | 322 //!Returns true if the message queue is full |
313 bool is_full() const | 323 bool is_full() const |
352 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func) | 362 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func) |
353 { | 363 { |
354 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end()); | 364 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end()); |
355 if(end < begin){ | 365 if(end < begin){ |
356 iterator idx_end = &mp_index[m_max_num_msg]; | 366 iterator idx_end = &mp_index[m_max_num_msg]; |
357 iterator ret = std::lower_bound(begin, idx_end, value, func); | 367 iterator ret = std::lower_bound(begin, idx_end, value, func); |
358 if(idx_end == ret){ | 368 if(idx_end == ret){ |
359 iterator idx_beg = &mp_index[0]; | 369 iterator idx_beg = &mp_index[0]; |
360 ret = std::lower_bound(idx_beg, end, value, func); | 370 ret = std::lower_bound(idx_beg, end, value, func); |
361 //sanity check, these cases should not call lower_bound (optimized out) | 371 //sanity check, these cases should not call lower_bound (optimized out) |
362 BOOST_ASSERT(ret != end); | 372 BOOST_ASSERT(ret != end); |
374 | 384 |
375 msg_header & insert_at(iterator where) | 385 msg_header & insert_at(iterator where) |
376 { | 386 { |
377 iterator it_inserted_ptr_end = this->inserted_ptr_end(); | 387 iterator it_inserted_ptr_end = this->inserted_ptr_end(); |
378 iterator it_inserted_ptr_beg = this->inserted_ptr_begin(); | 388 iterator it_inserted_ptr_beg = this->inserted_ptr_begin(); |
379 if(where == it_inserted_ptr_end){ | 389 if(where == it_inserted_ptr_beg){ |
380 ++m_cur_num_msg; | |
381 return **it_inserted_ptr_end; | |
382 } | |
383 else if(where == it_inserted_ptr_beg){ | |
384 //unsigned integer guarantees underflow | 390 //unsigned integer guarantees underflow |
385 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; | 391 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; |
386 --m_cur_first_msg; | 392 --m_cur_first_msg; |
387 ++m_cur_num_msg; | 393 ++m_cur_num_msg; |
388 return *mp_index[m_cur_first_msg]; | 394 return *mp_index[m_cur_first_msg]; |
395 } | |
396 else if(where == it_inserted_ptr_end){ | |
397 ++m_cur_num_msg; | |
398 return **it_inserted_ptr_end; | |
389 } | 399 } |
390 else{ | 400 else{ |
391 size_type pos = where - &mp_index[0]; | 401 size_type pos = where - &mp_index[0]; |
392 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg); | 402 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg); |
393 //Check if it's more efficient to move back or move front | 403 //Check if it's more efficient to move back or move front |
450 return **where; | 460 return **where; |
451 } | 461 } |
452 } | 462 } |
453 } | 463 } |
454 | 464 |
455 #else | 465 #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX |
456 | 466 |
457 typedef msg_hdr_ptr_t *iterator; | 467 typedef msg_hdr_ptr_t *iterator; |
458 | 468 |
459 //!Returns the inserted message with top priority | 469 //!Returns the inserted message with top priority |
460 msg_header &top_msg() | 470 msg_header &top_msg() |
480 *pos = backup; | 490 *pos = backup; |
481 ++m_cur_num_msg; | 491 ++m_cur_num_msg; |
482 return **pos; | 492 return **pos; |
483 } | 493 } |
484 | 494 |
485 #endif | 495 #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX |
486 | 496 |
487 //!Inserts the first free message in the priority queue | 497 //!Inserts the first free message in the priority queue |
488 msg_header & queue_free_msg(unsigned int priority) | 498 msg_header & queue_free_msg(unsigned int priority) |
489 { | 499 { |
490 //Get priority queue's range | 500 //Get priority queue's range |
505 msg_hdr_ptr_t dummy_ptr(&dummy_hdr); | 515 msg_hdr_ptr_t dummy_ptr(&dummy_hdr); |
506 | 516 |
507 //Check where the free message should be placed | 517 //Check where the free message should be placed |
508 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this)); | 518 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this)); |
509 } | 519 } |
510 | |
511 } | 520 } |
512 //Insert the free message in the correct position | 521 //Insert the free message in the correct position |
513 return this->insert_at(it); | 522 return this->insert_at(it); |
514 } | 523 } |
515 | 524 |
518 //!message size. Never throws. | 527 //!message size. Never throws. |
519 static size_type get_mem_size | 528 static size_type get_mem_size |
520 (size_type max_msg_size, size_type max_num_msg) | 529 (size_type max_msg_size, size_type max_num_msg) |
521 { | 530 { |
522 const size_type | 531 const size_type |
523 msg_hdr_align = ::boost::alignment_of<msg_header>::value, | 532 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value, |
524 index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value, | 533 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value, |
525 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, | 534 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, |
526 r_index_size = ipcdetail::get_rounded_size(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), | 535 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), |
527 r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header); | 536 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header); |
528 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + | 537 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + |
529 open_create_impl_t::ManagedOpenOrCreateUserOffset; | 538 open_create_impl_t::ManagedOpenOrCreateUserOffset; |
530 } | 539 } |
531 | 540 |
532 //!Initializes the memory structures to preallocate messages and constructs the | 541 //!Initializes the memory structures to preallocate messages and constructs the |
533 //!message index. Never throws. | 542 //!message index. Never throws. |
534 void initialize_memory() | 543 void initialize_memory() |
535 { | 544 { |
536 const size_type | 545 const size_type |
537 msg_hdr_align = ::boost::alignment_of<msg_header>::value, | 546 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value, |
538 index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value, | 547 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value, |
539 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, | 548 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, |
540 r_index_size = ipcdetail::get_rounded_size(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), | 549 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), |
541 r_max_msg_size = ipcdetail::get_rounded_size(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); | 550 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); |
542 | 551 |
543 //Pointer to the index | 552 //Pointer to the index |
544 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*> | 553 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*> |
545 (reinterpret_cast<char*>(this)+r_hdr_size); | 554 (reinterpret_cast<char*>(this)+r_hdr_size); |
546 | 555 |
575 //Condition block senders when the queue is full | 584 //Condition block senders when the queue is full |
576 interprocess_condition m_cond_send; | 585 interprocess_condition m_cond_send; |
577 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | 586 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
578 //Current start offset in the circular index | 587 //Current start offset in the circular index |
579 size_type m_cur_first_msg; | 588 size_type m_cur_first_msg; |
589 size_type m_blocked_senders; | |
590 size_type m_blocked_receivers; | |
580 #endif | 591 #endif |
581 }; | 592 }; |
582 | 593 |
583 | 594 |
584 //!This is the atomic functor to be executed when creating or opening | 595 //!This is the atomic functor to be executed when creating or opening |
587 class msg_queue_initialization_func_t | 598 class msg_queue_initialization_func_t |
588 { | 599 { |
589 public: | 600 public: |
590 typedef typename boost::intrusive:: | 601 typedef typename boost::intrusive:: |
591 pointer_traits<VoidPointer>::template | 602 pointer_traits<VoidPointer>::template |
592 rebind_pointer<char>::type char_ptr; | 603 rebind_pointer<char>::type char_ptr; |
593 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; | 604 typedef typename boost::intrusive::pointer_traits<char_ptr>:: |
594 typedef typename boost::make_unsigned<difference_type>::type size_type; | 605 difference_type difference_type; |
606 typedef typename boost::container::container_detail:: | |
607 make_unsigned<difference_type>::type size_type; | |
595 | 608 |
596 msg_queue_initialization_func_t(size_type maxmsg = 0, | 609 msg_queue_initialization_func_t(size_type maxmsg = 0, |
597 size_type maxmsgsize = 0) | 610 size_type maxmsgsize = 0) |
598 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} | 611 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} |
599 | 612 |
712 //Check if buffer is smaller than maximum allowed | 725 //Check if buffer is smaller than maximum allowed |
713 if (buffer_size > p_hdr->m_max_msg_size) { | 726 if (buffer_size > p_hdr->m_max_msg_size) { |
714 throw interprocess_exception(size_error); | 727 throw interprocess_exception(size_error); |
715 } | 728 } |
716 | 729 |
717 bool was_empty = false; | 730 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
731 bool notify_blocked_receivers = false; | |
732 #endif | |
718 //--------------------------------------------- | 733 //--------------------------------------------- |
719 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); | 734 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); |
720 //--------------------------------------------- | 735 //--------------------------------------------- |
721 { | 736 { |
722 //If the queue is full execute blocking logic | 737 //If the queue is full execute blocking logic |
723 if (p_hdr->is_full()) { | 738 if (p_hdr->is_full()) { |
724 switch(block){ | 739 BOOST_TRY{ |
725 case non_blocking : | 740 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX |
726 return false; | 741 ++p_hdr->m_blocked_senders; |
727 break; | 742 #endif |
728 | 743 switch(block){ |
729 case blocking : | 744 case non_blocking : |
730 do{ | 745 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX |
731 p_hdr->m_cond_send.wait(lock); | 746 --p_hdr->m_blocked_senders; |
732 } | 747 #endif |
733 while (p_hdr->is_full()); | 748 return false; |
734 break; | 749 break; |
735 | 750 |
736 case timed : | 751 case blocking : |
737 do{ | 752 do{ |
738 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ | 753 p_hdr->m_cond_send.wait(lock); |
739 if(p_hdr->is_full()) | |
740 return false; | |
741 break; | |
742 } | 754 } |
743 } | 755 while (p_hdr->is_full()); |
744 while (p_hdr->is_full()); | 756 break; |
745 break; | 757 |
746 default: | 758 case timed : |
747 break; | 759 do{ |
748 } | 760 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ |
761 if(p_hdr->is_full()){ | |
762 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
763 --p_hdr->m_blocked_senders; | |
764 #endif | |
765 return false; | |
766 } | |
767 break; | |
768 } | |
769 } | |
770 while (p_hdr->is_full()); | |
771 break; | |
772 default: | |
773 break; | |
774 } | |
775 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
776 --p_hdr->m_blocked_senders; | |
777 #endif | |
778 } | |
779 BOOST_CATCH(...){ | |
780 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
781 --p_hdr->m_blocked_senders; | |
782 #endif | |
783 BOOST_RETHROW; | |
784 } | |
785 BOOST_CATCH_END | |
749 } | 786 } |
750 | 787 |
751 was_empty = p_hdr->is_empty(); | 788 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
789 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers; | |
790 #endif | |
752 //Insert the first free message in the priority queue | 791 //Insert the first free message in the priority queue |
753 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority); | 792 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority); |
754 | 793 |
755 //Sanity check, free msgs are always cleaned when received | 794 //Sanity check, free msgs are always cleaned when received |
756 BOOST_ASSERT(free_msg_hdr.priority == 0); | 795 BOOST_ASSERT(free_msg_hdr.priority == 0); |
765 } // Lock end | 804 } // Lock end |
766 | 805 |
767 //Notify outside lock to avoid contention. This might produce some | 806 //Notify outside lock to avoid contention. This might produce some |
768 //spurious wakeups, but it's usually far better than notifying inside. | 807 //spurious wakeups, but it's usually far better than notifying inside. |
769 //If this message changes the queue empty state, notify it to receivers | 808 //If this message changes the queue empty state, notify it to receivers |
770 if (was_empty){ | 809 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
810 if (notify_blocked_receivers){ | |
771 p_hdr->m_cond_recv.notify_one(); | 811 p_hdr->m_cond_recv.notify_one(); |
772 } | 812 } |
813 #else | |
814 p_hdr->m_cond_recv.notify_one(); | |
815 #endif | |
773 | 816 |
774 return true; | 817 return true; |
775 } | 818 } |
776 | 819 |
777 template<class VoidPointer> | 820 template<class VoidPointer> |
809 //Check if buffer is big enough for any message | 852 //Check if buffer is big enough for any message |
810 if (buffer_size < p_hdr->m_max_msg_size) { | 853 if (buffer_size < p_hdr->m_max_msg_size) { |
811 throw interprocess_exception(size_error); | 854 throw interprocess_exception(size_error); |
812 } | 855 } |
813 | 856 |
814 bool was_full = false; | 857 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
858 bool notify_blocked_senders = false; | |
859 #endif | |
815 //--------------------------------------------- | 860 //--------------------------------------------- |
816 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); | 861 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); |
817 //--------------------------------------------- | 862 //--------------------------------------------- |
818 { | 863 { |
819 //If there are no messages execute blocking logic | 864 //If there are no messages execute blocking logic |
820 if (p_hdr->is_empty()) { | 865 if (p_hdr->is_empty()) { |
821 switch(block){ | 866 BOOST_TRY{ |
822 case non_blocking : | 867 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
823 return false; | 868 ++p_hdr->m_blocked_receivers; |
824 break; | 869 #endif |
825 | 870 switch(block){ |
826 case blocking : | 871 case non_blocking : |
827 do{ | 872 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
828 p_hdr->m_cond_recv.wait(lock); | 873 --p_hdr->m_blocked_receivers; |
829 } | 874 #endif |
830 while (p_hdr->is_empty()); | 875 return false; |
831 break; | 876 break; |
832 | 877 |
833 case timed : | 878 case blocking : |
834 do{ | 879 do{ |
835 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ | 880 p_hdr->m_cond_recv.wait(lock); |
836 if(p_hdr->is_empty()) | |
837 return false; | |
838 break; | |
839 } | 881 } |
840 } | 882 while (p_hdr->is_empty()); |
841 while (p_hdr->is_empty()); | 883 break; |
842 break; | 884 |
843 | 885 case timed : |
844 //Paranoia check | 886 do{ |
845 default: | 887 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ |
846 break; | 888 if(p_hdr->is_empty()){ |
847 } | 889 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) |
890 --p_hdr->m_blocked_receivers; | |
891 #endif | |
892 return false; | |
893 } | |
894 break; | |
895 } | |
896 } | |
897 while (p_hdr->is_empty()); | |
898 break; | |
899 | |
900 //Paranoia check | |
901 default: | |
902 break; | |
903 } | |
904 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
905 --p_hdr->m_blocked_receivers; | |
906 #endif | |
907 } | |
908 BOOST_CATCH(...){ | |
909 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
910 --p_hdr->m_blocked_receivers; | |
911 #endif | |
912 BOOST_RETHROW; | |
913 } | |
914 BOOST_CATCH_END | |
848 } | 915 } |
916 | |
917 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX | |
918 notify_blocked_senders = 0 != p_hdr->m_blocked_senders; | |
919 #endif | |
849 | 920 |
850 //There is at least one message ready to pick, get the top one | 921 //There is at least one message ready to pick, get the top one |
851 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg(); | 922 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg(); |
852 | 923 |
853 //Get data from the message | 924 //Get data from the message |
859 top_msg.priority = 0; | 930 top_msg.priority = 0; |
860 | 931 |
861 //Copy data to receiver's bufers | 932 //Copy data to receiver's bufers |
862 std::memcpy(buffer, top_msg.data(), recvd_size); | 933 std::memcpy(buffer, top_msg.data(), recvd_size); |
863 | 934 |
864 was_full = p_hdr->is_full(); | |
865 | |
866 //Free top message and put it in the free message list | 935 //Free top message and put it in the free message list |
867 p_hdr->free_top_msg(); | 936 p_hdr->free_top_msg(); |
868 } //Lock end | 937 } //Lock end |
869 | 938 |
870 //Notify outside lock to avoid contention. This might produce some | 939 //Notify outside lock to avoid contention. This might produce some |
871 //spurious wakeups, but it's usually far better than notifying inside. | 940 //spurious wakeups, but it's usually far better than notifying inside. |
872 //If this reception changes the queue full state, notify senders | 941 //If this reception changes the queue full state, notify senders |
873 if (was_full){ | 942 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX |
943 if (notify_blocked_senders){ | |
874 p_hdr->m_cond_send.notify_one(); | 944 p_hdr->m_cond_send.notify_one(); |
875 } | 945 } |
946 #else | |
947 p_hdr->m_cond_send.notify_one(); | |
948 #endif | |
876 | 949 |
877 return true; | 950 return true; |
878 } | 951 } |
879 | 952 |
880 template<class VoidPointer> | 953 template<class VoidPointer> |
906 | 979 |
907 template<class VoidPointer> | 980 template<class VoidPointer> |
908 inline bool message_queue_t<VoidPointer>::remove(const char *name) | 981 inline bool message_queue_t<VoidPointer>::remove(const char *name) |
909 { return shared_memory_object::remove(name); } | 982 { return shared_memory_object::remove(name); } |
910 | 983 |
911 /// @endcond | 984 #else |
985 | |
986 //!Typedef for a default message queue | |
987 //!to be used between processes | |
988 typedef message_queue_t<offset_ptr<void> > message_queue; | |
989 | |
990 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED | |
912 | 991 |
913 }} //namespace boost{ namespace interprocess{ | 992 }} //namespace boost{ namespace interprocess{ |
914 | 993 |
915 #include <boost/interprocess/detail/config_end.hpp> | 994 #include <boost/interprocess/detail/config_end.hpp> |
916 | 995 |