comparison DEPENDENCIES/generic/include/boost/interprocess/ipc/message_queue.hpp @ 16:2665513ce2d3

Add boost headers
author Chris Cannam
date Tue, 05 Aug 2014 11:11:38 +0100
parents
children c530137014c0
comparison
equal deleted inserted replaced
15:663ca0da4350 16:2665513ce2d3
1 //////////////////////////////////////////////////////////////////////////////
2 //
3 // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
4 // Software License, Version 1.0. (See accompanying file
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // See http://www.boost.org/libs/interprocess for documentation.
8 //
9 //////////////////////////////////////////////////////////////////////////////
10
11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
13
14 #include <boost/interprocess/detail/config_begin.hpp>
15 #include <boost/interprocess/detail/workaround.hpp>
16
17 #include <boost/interprocess/shared_memory_object.hpp>
18 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
19 #include <boost/interprocess/sync/interprocess_condition.hpp>
20 #include <boost/interprocess/sync/interprocess_mutex.hpp>
21 #include <boost/interprocess/sync/scoped_lock.hpp>
22 #include <boost/interprocess/detail/utilities.hpp>
23 #include <boost/interprocess/offset_ptr.hpp>
24 #include <boost/interprocess/creation_tags.hpp>
25 #include <boost/interprocess/exceptions.hpp>
26 #include <boost/interprocess/permissions.hpp>
27 #include <boost/detail/no_exceptions_support.hpp>
28 #include <boost/interprocess/detail/type_traits.hpp>
29 #include <boost/intrusive/pointer_traits.hpp>
30 #include <boost/type_traits/make_unsigned.hpp>
31 #include <boost/type_traits/alignment_of.hpp>
32 #include <boost/intrusive/pointer_traits.hpp>
33 #include <boost/assert.hpp>
34 #include <algorithm> //std::lower_bound
35 #include <cstddef> //std::size_t
36 #include <cstring> //memcpy
37
38
39 //!\file
40 //!Describes an inter-process message queue. This class allows sending
41 //!messages between processes and allows blocking, non-blocking and timed
42 //!sending and receiving.
43
44 namespace boost{ namespace interprocess{
45
46 namespace ipcdetail
47 {
48 template<class VoidPointer>
49 class msg_queue_initialization_func_t;
50 }
51
//!A class that allows sending messages
//!between processes.
//!
//!The queue state lives in a shared memory segment mapped through m_shmem;
//!VoidPointer selects the pointer family used for the index stored in that
//!segment.
template<class VoidPointer>
class message_queue_t
{
   /// @cond
   //Blocking modes
   enum block_t   {  blocking,   timed,   non_blocking   };

   //Declared but not defined in this header: a queue can only be built
   //through one of the tagged constructors below.
   message_queue_t();
   /// @endcond

   public:
   typedef VoidPointer                                                 void_pointer;
   typedef typename boost::intrusive::
      pointer_traits<void_pointer>::template
         rebind_pointer<char>::type                                    char_ptr;
   typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
   typedef typename boost::make_unsigned<difference_type>::type        size_type;

   //!Creates a process shared message queue with name "name". For this message queue,
   //!the maximum number of messages will be "max_num_msg" and the maximum message size
   //!will be "max_msg_size". Throws on error and if the queue was previously created.
   message_queue_t(create_only_t create_only,
                 const char *name,
                 size_type max_num_msg,
                 size_type max_msg_size,
                 const permissions &perm = permissions());

   //!Opens or creates a process shared message queue with name "name".
   //!If the queue is created, the maximum number of messages will be "max_num_msg"
   //!and the maximum message size will be "max_msg_size". If queue was previously
   //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
   //!are ignored. Throws on error.
   message_queue_t(open_or_create_t open_or_create,
                 const char *name,
                 size_type max_num_msg,
                 size_type max_msg_size,
                 const permissions &perm = permissions());

   //!Opens a previously created process shared message queue with name "name".
   //!If the queue was not previously created or there are no free resources,
   //!throws an error.
   message_queue_t(open_only_t open_only,
                 const char *name);

   //!Destroys *this and indicates that the calling process is finished using
   //!the resource. All opened message queues are still
   //!valid after destruction. The destructor function will deallocate
   //!any system resources allocated by the system for use by this process for
   //!this resource. The resource can still be opened again calling
   //!the open constructor overload. To erase the message queue from the system
   //!use remove().
   ~message_queue_t();

   //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
   //!message queue with priority "priority". If the message queue is full
   //!the sender is blocked. Throws interprocess_error on error.
   void send (const void *buffer,     size_type buffer_size,
              unsigned int priority);

   //!Sends a message stored in buffer "buffer" with size "buffer_size" through the
   //!message queue with priority "priority". If the message queue is full
   //!the sender is not blocked and returns false, otherwise returns true.
   //!Throws interprocess_error on error.
   bool try_send    (const void *buffer,     size_type buffer_size,
                         unsigned int priority);

   //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
   //!message queue with priority "priority". If the message queue is full
   //!the sender retries until time "abs_time" is reached. Returns true if
   //!the message has been successfully sent. Returns false if timeout is reached.
   //!Throws interprocess_error on error.
   bool timed_send    (const void *buffer,     size_type buffer_size,
                           unsigned int priority,  const boost::posix_time::ptime& abs_time);

   //!Receives a message from the message queue. The message is stored in buffer
   //!"buffer", which has size "buffer_size". The received message has size
   //!"recvd_size" and priority "priority". If the message queue is empty
   //!the receiver is blocked. Throws interprocess_error on error.
   void receive (void *buffer,           size_type buffer_size,
                 size_type &recvd_size,unsigned int &priority);

   //!Receives a message from the message queue. The message is stored in buffer
   //!"buffer", which has size "buffer_size". The received message has size
   //!"recvd_size" and priority "priority". If the message queue is empty
   //!the receiver is not blocked and returns false, otherwise returns true.
   //!Throws interprocess_error on error.
   bool try_receive (void *buffer,           size_type buffer_size,
                     size_type &recvd_size,unsigned int &priority);

   //!Receives a message from the message queue. The message is stored in buffer
   //!"buffer", which has size "buffer_size". The received message has size
   //!"recvd_size" and priority "priority". If the message queue is empty
   //!the receiver retries until time "abs_time" is reached. Returns true if
   //!the message has been successfully received. Returns false if timeout is reached.
   //!Throws interprocess_error on error.
   bool timed_receive (void *buffer,           size_type buffer_size,
                       size_type &recvd_size,unsigned int &priority,
                       const boost::posix_time::ptime &abs_time);

   //!Returns the maximum number of messages allowed by the queue. The message
   //!queue must be opened or created previously. Otherwise, returns 0.
   //!Never throws
   size_type get_max_msg() const;

   //!Returns the maximum size of message allowed by the queue. The message
   //!queue must be opened or created previously. Otherwise, returns 0.
   //!Never throws
   size_type get_max_msg_size() const;

   //!Returns the number of messages currently stored.
   //!Never throws
   size_type get_num_msg() const;

   //!Removes the message queue from the system.
   //!Returns false on error. Never throws
   static bool remove(const char *name);

   /// @cond
   private:
   typedef boost::posix_time::ptime ptime;

   //The initialization functor reads the private open_create_impl_t typedef
   friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;

   //Shared implementation of receive(), try_receive() and timed_receive();
   //"block" selects the waiting policy and "abs_time" is only consulted
   //in timed mode.
   bool do_receive(block_t block,
                   void *buffer,         size_type buffer_size,
                   size_type &recvd_size, unsigned int &priority,
                   const ptime &abs_time);

   //Shared implementation of send(), try_send() and timed_send();
   //"block" selects the waiting policy and "abs_time" is only consulted
   //in timed mode.
   bool do_send(block_t block,
                const void *buffer,      size_type buffer_size,
                unsigned int priority,   const ptime &abs_time);

   //!Returns the needed memory size for the shared message queue.
   //!Never throws
   static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
   typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
   //Shared memory segment holding the queue header, index and messages
   open_create_impl_t m_shmem;
   /// @endcond
};
193
194 /// @cond
195
196 namespace ipcdetail {
197
198 //!This header is the prefix of each message in the queue
199 template<class VoidPointer>
200 class msg_hdr_t
201 {
202 typedef VoidPointer void_pointer;
203 typedef typename boost::intrusive::
204 pointer_traits<void_pointer>::template
205 rebind_pointer<char>::type char_ptr;
206 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
207 typedef typename boost::make_unsigned<difference_type>::type size_type;
208
209 public:
210 size_type len; // Message length
211 unsigned int priority;// Message priority
212 //!Returns the data buffer associated with this this message
213 void * data(){ return this+1; } //
214 };
215
216 //!This functor is the predicate to order stored messages by priority
217 template<class VoidPointer>
218 class priority_functor
219 {
220 typedef typename boost::intrusive::
221 pointer_traits<VoidPointer>::template
222 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t;
223
224 public:
225 bool operator()(const msg_hdr_ptr_t &msg1,
226 const msg_hdr_ptr_t &msg2) const
227 { return msg1->priority < msg2->priority; }
228 };
229
230 //!This header is placed in the beginning of the shared memory and contains
231 //!the data to control the queue. This class initializes the shared memory
232 //!in the following way: in ascending memory address with proper alignment
233 //!fillings:
234 //!
235 //!-> mq_hdr_t:
236 //! Main control block that controls the rest of the elements
237 //!
238 //!-> offset_ptr<msg_hdr_t> index [max_num_msg]
239 //! An array of pointers with size "max_num_msg" called index. Each pointer
240 //! points to a preallocated message. Elements of this array are
241 //! reordered in runtime in the following way:
242 //!
243 //! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined:
244 //!
245 //! When the current number of messages is "cur_num_msg", the array
246 //! is treated like a circular buffer. Starting from position "cur_first_msg"
247 //! "cur_num_msg" in a circular way, pointers point to inserted messages and the rest
248 //! point to free messages. Those "cur_num_msg" pointers are
249 //! ordered by the priority of the pointed message and by insertion order
250 //! if two messages have the same priority. So the next message to be
251 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
252 //! and the first free message ready to be used in a "send" operation is
253 //! [cur_first_msg] if circular buffer is extended from front,
254 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
255 //!
256 //! This transforms the index in a circular buffer with an embedded free
257 //! message queue.
258 //!
259 //! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined):
260 //!
261 //! When the current number of messages is "cur_num_msg", the first
262 //! "cur_num_msg" pointers point to inserted messages and the rest
263 //! point to free messages. The first "cur_num_msg" pointers are
264 //! ordered by the priority of the pointed message and by insertion order
265 //! if two messages have the same priority. So the next message to be
266 //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free
267 //! message ready to be used in a "send" operation is index [cur_num_msg].
268 //!
269 //! This transforms the index in a fixed size priority queue with an embedded free
270 //! message queue.
271 //!
272 //!-> struct message_t
273 //! {
274 //! msg_hdr_t header;
275 //! char[max_msg_size] data;
276 //! } messages [max_num_msg];
277 //!
278 //! An array of buffers of preallocated messages, each one prefixed with the
279 //! msg_hdr_t structure. Each of this message is pointed by one pointer of
280 //! the index structure.
//Control block constructed at the start of the shared memory segment.
//It owns the priority-ordered index of message pointers, the queue limits,
//the current message count and the mutex/conditions used by
//message_queue_t::do_send and message_queue_t::do_receive.
//It inherits from priority_functor so the comparison object can be obtained
//from *this (see queue_free_msg).
template<class VoidPointer>
class mq_hdr_t
   : public ipcdetail::priority_functor<VoidPointer>
{
   typedef VoidPointer                                                     void_pointer;
   typedef msg_hdr_t<void_pointer>                                         msg_header;
   typedef typename boost::intrusive::
      pointer_traits<void_pointer>::template
         rebind_pointer<msg_header>::type                                  msg_hdr_ptr_t;
   typedef typename boost::intrusive::pointer_traits
      <msg_hdr_ptr_t>::difference_type                                     difference_type;
   typedef typename boost::make_unsigned<difference_type>::type            size_type;
   typedef typename boost::intrusive::
      pointer_traits<void_pointer>::template
         rebind_pointer<msg_hdr_ptr_t>::type                              msg_hdr_ptr_ptr_t;
   typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;

   public:
   //!Constructor. This object must be constructed in the beginning of the
   //!shared memory of the size returned by the function "get_mem_size".
   //!This constructor initializes the needed resources and creates
   //!the internal structures like the priority index. This can throw.
   mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
      : m_max_num_msg(max_num_msg),
         m_max_msg_size(max_msg_size),
         m_cur_num_msg(0)
         #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
         ,m_cur_first_msg(0u)
         #endif
      {  this->initialize_memory(); }

   //!Returns true if the message queue is full
   bool is_full() const
      {  return m_cur_num_msg == m_max_num_msg;  }

   //!Returns true if the message queue is empty
   bool is_empty() const
      {  return !m_cur_num_msg;  }

   //!Frees the top priority message and saves it in the free message list
   //!(only the count is decremented: the top slot becomes the first free slot)
   void free_top_msg()
      {  --m_cur_num_msg;  }

   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)

   typedef msg_hdr_ptr_t *iterator;

   //!Returns the index slot one past the last inserted message, wrapping
   //!around the end of the index. The result is always < m_max_num_msg.
   size_type end_pos() const
   {
      const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
      return space_until_bufend > m_cur_num_msg
         ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
   }

   //!Returns the inserted message with top priority
   msg_header &top_msg()
   {
      size_type pos = this->end_pos();
      //The top message is the slot before end_pos(); slot 0 wraps to the last slot
      return *mp_index[pos ? --pos : m_max_num_msg - 1];
   }

   //!Returns the inserted message with bottom priority
   msg_header &bottom_msg()
      {  return *mp_index[m_cur_first_msg];   }

   iterator inserted_ptr_begin() const
   {  return &mp_index[m_cur_first_msg]; }

   iterator inserted_ptr_end() const
      {  return &mp_index[this->end_pos()];  }

   //!Binary search for the insertion point of "value" among the inserted
   //!messages. When the inserted range wraps around the end of the index
   //!(end < begin) the two contiguous pieces are searched in turn.
   iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
   {
      iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
      if(end < begin){
         iterator idx_end = &mp_index[m_max_num_msg];
         iterator ret = std::lower_bound(begin, idx_end, value, func);
         if(idx_end == ret){
            iterator idx_beg = &mp_index[0];
            ret = std::lower_bound(idx_beg, end, value, func);
            //sanity check, these cases should not call lower_bound (optimized out)
            BOOST_ASSERT(ret != end);
            BOOST_ASSERT(ret != begin);
            return ret;
         }
         else{
            return ret;
         }
      }
      else{
         return std::lower_bound(begin, end, value, func);
      }
   }

   //!Makes room at "where" by shifting index pointers and returns the free
   //!message header that ends up in that slot. Insertion at either end of
   //!the inserted range needs no shifting; otherwise the shorter side of
   //!the range is moved by one slot.
   msg_header & insert_at(iterator where)
   {
      iterator it_inserted_ptr_end = this->inserted_ptr_end();
      iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
      if(where == it_inserted_ptr_end){
         ++m_cur_num_msg;
         return **it_inserted_ptr_end;
      }
      else if(where == it_inserted_ptr_beg){
         //Step the start index back one slot, mapping 0 to m_max_num_msg
         //first so the decrement wraps to the last slot without underflow
         m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
         --m_cur_first_msg;
         ++m_cur_num_msg;
         return *mp_index[m_cur_first_msg];
      }
      else{
         size_type pos  = where - &mp_index[0];
         //Distance of "where" from the first inserted message, in circular order
         size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
         //Check if it's more efficient to move back or move front
         if(circ_pos < m_cur_num_msg/2){
            //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos
            //indicates two step insertion
            if(!pos){
               pos   = m_max_num_msg;
               where = &mp_index[m_max_num_msg-1];
            }
            else{
               --where;
            }
            const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
            const size_type first_segment_beg  = unique_segment ? m_cur_first_msg : 1u;
            const size_type first_segment_end  = pos;
            const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
            const size_type second_segment_end = m_max_num_msg;
            //Save the free slot pointer that the shift below overwrites
            const msg_hdr_ptr_t backup   = *(&mp_index[0] + (unique_segment ?  first_segment_beg : second_segment_beg) - 1);

            //First segment
            if(!unique_segment){
               std::copy( &mp_index[0] + second_segment_beg
                        , &mp_index[0] + second_segment_end
                        , &mp_index[0] + second_segment_beg - 1);
               mp_index[m_max_num_msg-1] = mp_index[0];
            }
            std::copy( &mp_index[0] + first_segment_beg
                     , &mp_index[0] + first_segment_end
                     , &mp_index[0] + first_segment_beg - 1);
            *where = backup;
            m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
            --m_cur_first_msg;
            ++m_cur_num_msg;
            return **where;
         }
         else{
            //The queue can't be full so end_pos < m_cur_first_msg
            //indicates two step insertion
            const size_type pos_end = this->end_pos();
            const bool unique_segment = pos < pos_end;
            const size_type first_segment_beg  = pos;
            const size_type first_segment_end  = unique_segment  ? pos_end : m_max_num_msg-1;
            const size_type second_segment_beg = 0u;
            const size_type second_segment_end = unique_segment ? 0u : pos_end;
            //Save the free slot pointer that the shift below overwrites
            const msg_hdr_ptr_t backup   = *it_inserted_ptr_end;

            //First segment
            if(!unique_segment){
               std::copy_backward( &mp_index[0] + second_segment_beg
                                 , &mp_index[0] + second_segment_end
                                 , &mp_index[0] + second_segment_end + 1);
               mp_index[0] = mp_index[m_max_num_msg-1];
            }
            std::copy_backward( &mp_index[0] + first_segment_beg
                              , &mp_index[0] + first_segment_end
                              , &mp_index[0] + first_segment_end + 1);
            *where = backup;
            ++m_cur_num_msg;
            return **where;
         }
      }
   }

   #else

   typedef msg_hdr_ptr_t *iterator;

   //!Returns the inserted message with top priority
   msg_header &top_msg()
      {  return *mp_index[m_cur_num_msg-1];   }

   //!Returns the inserted message with bottom priority
   msg_header &bottom_msg()
      {  return *mp_index[0];   }

   iterator inserted_ptr_begin() const
   {  return &mp_index[0]; }

   iterator inserted_ptr_end() const
   {  return &mp_index[m_cur_num_msg]; }

   iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
   {  return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func);  }

   //!Shifts [pos, end) up by one slot and stores the first free message
   //!pointer (previously at end) in "pos". Returns that free message.
   msg_header & insert_at(iterator pos)
   {
      const msg_hdr_ptr_t backup = *inserted_ptr_end();
      std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
      *pos = backup;
      ++m_cur_num_msg;
      return **pos;
   }

   #endif

   //!Inserts the first free message in the priority queue
   //!and returns its header; the caller fills in len, priority and data.
   //!The queue must not be full when this is called.
   msg_header & queue_free_msg(unsigned int priority)
   {
      //Get priority queue's range
      iterator it  (inserted_ptr_begin()), it_end(inserted_ptr_end());
      //Optimize for non-priority usage
      if(m_cur_num_msg && priority > this->bottom_msg().priority){
         //Check for higher priority than all stored messages
         if(priority > this->top_msg().priority){
            it = it_end;
         }
         else{
            //Since we don't know which free message we will pick
            //build a dummy header for searches
            msg_header dummy_hdr;
            dummy_hdr.priority = priority;

            //Get free msg
            msg_hdr_ptr_t dummy_ptr(&dummy_hdr);

            //Check where the free message should be placed
            it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
         }

      }
      //Insert the free message in the correct position
      return this->insert_at(it);
   }

   //!Returns the number of bytes needed to construct a message queue with
   //!"max_num_msg" maximum number of messages and "max_msg_size" maximum
   //!message size. Never throws.
   //!NOTE(review): the products below are not checked for overflow.
   static size_type get_mem_size
      (size_type max_msg_size, size_type max_num_msg)
   {
      const size_type
         msg_hdr_align  = ::boost::alignment_of<msg_header>::value,
         index_align    = ::boost::alignment_of<msg_hdr_ptr_t>::value,
         r_hdr_size     = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
         r_index_size   = ipcdetail::get_rounded_size(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
         r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header);
      return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
         open_create_impl_t::ManagedOpenOrCreateUserOffset;
   }

   //!Initializes the memory structures to preallocate messages and constructs the
   //!message index. Never throws.
   void initialize_memory()
   {
      //Same layout arithmetic as get_mem_size: header, then index, then messages
      const size_type
        msg_hdr_align  = ::boost::alignment_of<msg_header>::value,
        index_align    = ::boost::alignment_of<msg_hdr_ptr_t>::value,
         r_hdr_size     = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
         r_index_size   = ipcdetail::get_rounded_size(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
         r_max_msg_size = ipcdetail::get_rounded_size(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);

      //Pointer to the index
      msg_hdr_ptr_t *index =  reinterpret_cast<msg_hdr_ptr_t*>
                                 (reinterpret_cast<char*>(this)+r_hdr_size);

      //Pointer to the first message header
      msg_header *msg_hdr   =  reinterpret_cast<msg_header*>
                                 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);

      //Initialize the pointer to the index
      mp_index             = index;

      //Initialize the index so each slot points to a preallocated message
      for(size_type i = 0; i < m_max_num_msg; ++i){
         index[i] = msg_hdr;
         msg_hdr  = reinterpret_cast<msg_header*>
                        (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
      }
   }

   public:
   //Pointer to the index
   msg_hdr_ptr_ptr_t          mp_index;
   //Maximum number of messages of the queue
   const size_type            m_max_num_msg;
   //Maximum size of messages of the queue
   const size_type            m_max_msg_size;
   //Current number of messages
   size_type                  m_cur_num_msg;
   //Mutex to protect data structures
   interprocess_mutex         m_mutex;
   //Condition block receivers when there are no messages
   interprocess_condition     m_cond_recv;
   //Condition block senders when the queue is full
   interprocess_condition     m_cond_send;
   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
   //Current start offset in the circular index
   size_type                  m_cur_first_msg;
   #endif
};
582
583
584 //!This is the atomic functor to be executed when creating or opening
585 //!shared memory. Never throws
586 template<class VoidPointer>
587 class msg_queue_initialization_func_t
588 {
589 public:
590 typedef typename boost::intrusive::
591 pointer_traits<VoidPointer>::template
592 rebind_pointer<char>::type char_ptr;
593 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
594 typedef typename boost::make_unsigned<difference_type>::type size_type;
595
596 msg_queue_initialization_func_t(size_type maxmsg = 0,
597 size_type maxmsgsize = 0)
598 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
599
600 bool operator()(void *address, size_type, bool created)
601 {
602 char *mptr;
603
604 if(created){
605 mptr = reinterpret_cast<char*>(address);
606 //Construct the message queue header at the beginning
607 BOOST_TRY{
608 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
609 }
610 BOOST_CATCH(...){
611 return false;
612 }
613 BOOST_CATCH_END
614 }
615 return true;
616 }
617
618 std::size_t get_min_size() const
619 {
620 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
621 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
622 }
623
624 const size_type m_maxmsg;
625 const size_type m_maxmsgsize;
626 };
627
628 } //namespace ipcdetail {
629
//Empty on purpose: the only data member, m_shmem, is released by its own
//destructor. The queue is not removed from the system (see remove()).
template<class VoidPointer>
inline message_queue_t<VoidPointer>::~message_queue_t()
{}
633
634 template<class VoidPointer>
635 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
636 (size_type max_msg_size, size_type max_num_msg)
637 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); }
638
//Create-only constructor: the segment is sized with get_mem_size() and the
//initialization functor constructs the queue header inside it.
template<class VoidPointer>
inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
                                    const char *name,
                                    size_type max_num_msg,
                                    size_type max_msg_size,
                                    const permissions &perm)
      //Create shared memory and execute functor atomically
   :  m_shmem(create_only,
              name,
              get_mem_size(max_msg_size, max_num_msg),
              read_write,
              static_cast<void*>(0),
              //Prepare initialization functor
              ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
              perm)
{}
655
//Open-or-create constructor: the initialization functor only constructs the
//queue header when the segment is newly created, so the limits passed here
//have no effect on an already existing queue.
template<class VoidPointer>
inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
                                    const char *name,
                                    size_type max_num_msg,
                                    size_type max_msg_size,
                                    const permissions &perm)
      //Create shared memory and execute functor atomically
   :  m_shmem(open_or_create,
              name,
              get_mem_size(max_msg_size, max_num_msg),
              read_write,
              static_cast<void*>(0),
              //Prepare initialization functor
              ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
              perm)
{}
672
//Open-only constructor: attaches to a queue that must already exist.
template<class VoidPointer>
inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
   //Open the existing shared memory; nothing is created here
   :  m_shmem(open_only,
              name,
              read_write,
              static_cast<void*>(0),
              //Default-constructed functor (limits 0, 0): it only constructs
              //a header when told the segment was just created
              ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
{}
683
684 template<class VoidPointer>
685 inline void message_queue_t<VoidPointer>::send
686 (const void *buffer, size_type buffer_size, unsigned int priority)
687 { this->do_send(blocking, buffer, buffer_size, priority, ptime()); }
688
689 template<class VoidPointer>
690 inline bool message_queue_t<VoidPointer>::try_send
691 (const void *buffer, size_type buffer_size, unsigned int priority)
692 { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }
693
694 template<class VoidPointer>
695 inline bool message_queue_t<VoidPointer>::timed_send
696 (const void *buffer, size_type buffer_size
697 ,unsigned int priority, const boost::posix_time::ptime &abs_time)
698 {
699 if(abs_time == boost::posix_time::pos_infin){
700 this->send(buffer, buffer_size, priority);
701 return true;
702 }
703 return this->do_send(timed, buffer, buffer_size, priority, abs_time);
704 }
705
706 template<class VoidPointer>
707 inline bool message_queue_t<VoidPointer>::do_send(block_t block,
708 const void *buffer, size_type buffer_size,
709 unsigned int priority, const boost::posix_time::ptime &abs_time)
710 {
711 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
712 //Check if buffer is smaller than maximum allowed
713 if (buffer_size > p_hdr->m_max_msg_size) {
714 throw interprocess_exception(size_error);
715 }
716
717 bool was_empty = false;
718 //---------------------------------------------
719 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
720 //---------------------------------------------
721 {
722 //If the queue is full execute blocking logic
723 if (p_hdr->is_full()) {
724 switch(block){
725 case non_blocking :
726 return false;
727 break;
728
729 case blocking :
730 do{
731 p_hdr->m_cond_send.wait(lock);
732 }
733 while (p_hdr->is_full());
734 break;
735
736 case timed :
737 do{
738 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
739 if(p_hdr->is_full())
740 return false;
741 break;
742 }
743 }
744 while (p_hdr->is_full());
745 break;
746 default:
747 break;
748 }
749 }
750
751 was_empty = p_hdr->is_empty();
752 //Insert the first free message in the priority queue
753 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
754
755 //Sanity check, free msgs are always cleaned when received
756 BOOST_ASSERT(free_msg_hdr.priority == 0);
757 BOOST_ASSERT(free_msg_hdr.len == 0);
758
759 //Copy control data to the free message
760 free_msg_hdr.priority = priority;
761 free_msg_hdr.len = buffer_size;
762
763 //Copy user buffer to the message
764 std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
765 } // Lock end
766
767 //Notify outside lock to avoid contention. This might produce some
768 //spurious wakeups, but it's usually far better than notifying inside.
769 //If this message changes the queue empty state, notify it to receivers
770 if (was_empty){
771 p_hdr->m_cond_recv.notify_one();
772 }
773
774 return true;
775 }
776
777 template<class VoidPointer>
778 inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size,
779 size_type &recvd_size, unsigned int &priority)
780 { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
781
782 template<class VoidPointer>
783 inline bool
784 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,
785 size_type &recvd_size, unsigned int &priority)
786 { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
787
788 template<class VoidPointer>
789 inline bool
790 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,
791 size_type &recvd_size, unsigned int &priority,
792 const boost::posix_time::ptime &abs_time)
793 {
794 if(abs_time == boost::posix_time::pos_infin){
795 this->receive(buffer, buffer_size, recvd_size, priority);
796 return true;
797 }
798 return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);
799 }
800
801 template<class VoidPointer>
802 inline bool
803 message_queue_t<VoidPointer>::do_receive(block_t block,
804 void *buffer, size_type buffer_size,
805 size_type &recvd_size, unsigned int &priority,
806 const boost::posix_time::ptime &abs_time)
807 {
808 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
809 //Check if buffer is big enough for any message
810 if (buffer_size < p_hdr->m_max_msg_size) {
811 throw interprocess_exception(size_error);
812 }
813
814 bool was_full = false;
815 //---------------------------------------------
816 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
817 //---------------------------------------------
818 {
819 //If there are no messages execute blocking logic
820 if (p_hdr->is_empty()) {
821 switch(block){
822 case non_blocking :
823 return false;
824 break;
825
826 case blocking :
827 do{
828 p_hdr->m_cond_recv.wait(lock);
829 }
830 while (p_hdr->is_empty());
831 break;
832
833 case timed :
834 do{
835 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
836 if(p_hdr->is_empty())
837 return false;
838 break;
839 }
840 }
841 while (p_hdr->is_empty());
842 break;
843
844 //Paranoia check
845 default:
846 break;
847 }
848 }
849
850 //There is at least one message ready to pick, get the top one
851 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
852
853 //Get data from the message
854 recvd_size = top_msg.len;
855 priority = top_msg.priority;
856
857 //Some cleanup to ease debugging
858 top_msg.len = 0;
859 top_msg.priority = 0;
860
861 //Copy data to receiver's bufers
862 std::memcpy(buffer, top_msg.data(), recvd_size);
863
864 was_full = p_hdr->is_full();
865
866 //Free top message and put it in the free message list
867 p_hdr->free_top_msg();
868 } //Lock end
869
870 //Notify outside lock to avoid contention. This might produce some
871 //spurious wakeups, but it's usually far better than notifying inside.
872 //If this reception changes the queue full state, notify senders
873 if (was_full){
874 p_hdr->m_cond_send.notify_one();
875 }
876
877 return true;
878 }
879
880 template<class VoidPointer>
881 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
882 {
883 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
884 return p_hdr ? p_hdr->m_max_num_msg : 0; }
885
886 template<class VoidPointer>
887 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
888 {
889 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
890 return p_hdr ? p_hdr->m_max_msg_size : 0;
891 }
892
893 template<class VoidPointer>
894 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
895 {
896 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
897 if(p_hdr){
898 //---------------------------------------------
899 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
900 //---------------------------------------------
901 return p_hdr->m_cur_num_msg;
902 }
903
904 return 0;
905 }
906
907 template<class VoidPointer>
908 inline bool message_queue_t<VoidPointer>::remove(const char *name)
909 { return shared_memory_object::remove(name); }
910
911 /// @endcond
912
913 }} //namespace boost{ namespace interprocess{
914
915 #include <boost/interprocess/detail/config_end.hpp>
916
917 #endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP