Mercurial > hg > vamp-build-and-test
comparison DEPENDENCIES/generic/include/boost/interprocess/ipc/message_queue.hpp @ 16:2665513ce2d3
Add boost headers
author | Chris Cannam |
---|---|
date | Tue, 05 Aug 2014 11:11:38 +0100 |
parents | |
children | c530137014c0 |
comparison
equal
deleted
inserted
replaced
15:663ca0da4350 | 16:2665513ce2d3 |
---|---|
1 ////////////////////////////////////////////////////////////////////////////// | |
2 // | |
3 // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost | |
4 // Software License, Version 1.0. (See accompanying file | |
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 // | |
7 // See http://www.boost.org/libs/interprocess for documentation. | |
8 // | |
9 ////////////////////////////////////////////////////////////////////////////// | |
10 | |
11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP | |
12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP | |
13 | |
14 #include <boost/interprocess/detail/config_begin.hpp> | |
15 #include <boost/interprocess/detail/workaround.hpp> | |
16 | |
17 #include <boost/interprocess/shared_memory_object.hpp> | |
18 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp> | |
19 #include <boost/interprocess/sync/interprocess_condition.hpp> | |
20 #include <boost/interprocess/sync/interprocess_mutex.hpp> | |
21 #include <boost/interprocess/sync/scoped_lock.hpp> | |
22 #include <boost/interprocess/detail/utilities.hpp> | |
23 #include <boost/interprocess/offset_ptr.hpp> | |
24 #include <boost/interprocess/creation_tags.hpp> | |
25 #include <boost/interprocess/exceptions.hpp> | |
26 #include <boost/interprocess/permissions.hpp> | |
27 #include <boost/detail/no_exceptions_support.hpp> | |
28 #include <boost/interprocess/detail/type_traits.hpp> | |
29 #include <boost/intrusive/pointer_traits.hpp> | |
30 #include <boost/type_traits/make_unsigned.hpp> | |
31 #include <boost/type_traits/alignment_of.hpp> | |
32 #include <boost/intrusive/pointer_traits.hpp> | |
33 #include <boost/assert.hpp> | |
34 #include <algorithm> //std::lower_bound | |
35 #include <cstddef> //std::size_t | |
36 #include <cstring> //memcpy | |
37 | |
38 | |
39 //!\file | |
40 //!Describes an inter-process message queue. This class allows sending | |
41 //!messages between processes and allows blocking, non-blocking and timed | |
42 //!sending and receiving. | |
43 | |
44 namespace boost{ namespace interprocess{ | |
45 | |
46 namespace ipcdetail | |
47 { | |
48 template<class VoidPointer> | |
49 class msg_queue_initialization_func_t; | |
50 } | |
51 | |
52 //!A class that allows sending messages | |
53 //!between processes. | |
54 template<class VoidPointer> | |
55 class message_queue_t | |
56 { | |
57 /// @cond | |
58 //Blocking modes | |
59 enum block_t { blocking, timed, non_blocking }; | |
60 | |
61 message_queue_t(); | |
62 /// @endcond | |
63 | |
64 public: | |
65 typedef VoidPointer void_pointer; | |
66 typedef typename boost::intrusive:: | |
67 pointer_traits<void_pointer>::template | |
68 rebind_pointer<char>::type char_ptr; | |
69 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; | |
70 typedef typename boost::make_unsigned<difference_type>::type size_type; | |
71 | |
72 //!Creates a process shared message queue with name "name". For this message queue, | |
73 //!the maximum number of messages will be "max_num_msg" and the maximum message size | |
74 //!will be "max_msg_size". Throws on error and if the queue was previously created. | |
75 message_queue_t(create_only_t create_only, | |
76 const char *name, | |
77 size_type max_num_msg, | |
78 size_type max_msg_size, | |
79 const permissions &perm = permissions()); | |
80 | |
81 //!Opens or creates a process shared message queue with name "name". | |
82 //!If the queue is created, the maximum number of messages will be "max_num_msg" | |
83 //!and the maximum message size will be "max_msg_size". If queue was previously | |
84 //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters | |
85 //!are ignored. Throws on error. | |
86 message_queue_t(open_or_create_t open_or_create, | |
87 const char *name, | |
88 size_type max_num_msg, | |
89 size_type max_msg_size, | |
90 const permissions &perm = permissions()); | |
91 | |
92 //!Opens a previously created process shared message queue with name "name". | |
93 //!If the queue was not previously created or there are no free resources, | |
94 //!throws an error. | |
95 message_queue_t(open_only_t open_only, | |
96 const char *name); | |
97 | |
98 //!Destroys *this and indicates that the calling process is finished using | |
99 //!the resource. All opened message queues are still | |
100 //!valid after destruction. The destructor function will deallocate | |
101 //!any system resources allocated by the system for use by this process for | |
102 //!this resource. The resource can still be opened again calling | |
103 //!the open constructor overload. To erase the message queue from the system | |
104 //!use remove(). | |
105 ~message_queue_t(); | |
106 | |
107 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the | |
108 //!message queue with priority "priority". If the message queue is full | |
109 //!the sender is blocked. Throws interprocess_error on error. | |
110 void send (const void *buffer, size_type buffer_size, | |
111 unsigned int priority); | |
112 | |
113 //!Sends a message stored in buffer "buffer" with size "buffer_size" through the | |
114 //!message queue with priority "priority". If the message queue is full | |
115 //!the sender is not blocked and returns false, otherwise returns true. | |
116 //!Throws interprocess_error on error. | |
117 bool try_send (const void *buffer, size_type buffer_size, | |
118 unsigned int priority); | |
119 | |
120 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the | |
121 //!message queue with priority "priority". If the message queue is full | |
122 //!the sender retries until time "abs_time" is reached. Returns true if | |
123 //!the message has been successfully sent. Returns false if timeout is reached. | |
124 //!Throws interprocess_error on error. | |
125 bool timed_send (const void *buffer, size_type buffer_size, | |
126 unsigned int priority, const boost::posix_time::ptime& abs_time); | |
127 | |
128 //!Receives a message from the message queue. The message is stored in buffer | |
129 //!"buffer", which has size "buffer_size". The received message has size | |
130 //!"recvd_size" and priority "priority". If the message queue is empty | |
131 //!the receiver is blocked. Throws interprocess_error on error. | |
132 void receive (void *buffer, size_type buffer_size, | |
133 size_type &recvd_size,unsigned int &priority); | |
134 | |
135 //!Receives a message from the message queue. The message is stored in buffer | |
136 //!"buffer", which has size "buffer_size". The received message has size | |
137 //!"recvd_size" and priority "priority". If the message queue is empty | |
138 //!the receiver is not blocked and returns false, otherwise returns true. | |
139 //!Throws interprocess_error on error. | |
140 bool try_receive (void *buffer, size_type buffer_size, | |
141 size_type &recvd_size,unsigned int &priority); | |
142 | |
143 //!Receives a message from the message queue. The message is stored in buffer | |
144 //!"buffer", which has size "buffer_size". The received message has size | |
145 //!"recvd_size" and priority "priority". If the message queue is empty | |
146 //!the receiver retries until time "abs_time" is reached. Returns true if | |
147 //!the message has been successfully sent. Returns false if timeout is reached. | |
148 //!Throws interprocess_error on error. | |
149 bool timed_receive (void *buffer, size_type buffer_size, | |
150 size_type &recvd_size,unsigned int &priority, | |
151 const boost::posix_time::ptime &abs_time); | |
152 | |
153 //!Returns the maximum number of messages allowed by the queue. The message | |
154 //!queue must be opened or created previously. Otherwise, returns 0. | |
155 //!Never throws | |
156 size_type get_max_msg() const; | |
157 | |
158 //!Returns the maximum size of message allowed by the queue. The message | |
159 //!queue must be opened or created previously. Otherwise, returns 0. | |
160 //!Never throws | |
161 size_type get_max_msg_size() const; | |
162 | |
163 //!Returns the number of messages currently stored. | |
164 //!Never throws | |
165 size_type get_num_msg() const; | |
166 | |
167 //!Removes the message queue from the system. | |
168 //!Returns false on error. Never throws | |
169 static bool remove(const char *name); | |
170 | |
171 /// @cond | |
172 private: | |
173 typedef boost::posix_time::ptime ptime; | |
174 | |
175 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>; | |
176 | |
177 bool do_receive(block_t block, | |
178 void *buffer, size_type buffer_size, | |
179 size_type &recvd_size, unsigned int &priority, | |
180 const ptime &abs_time); | |
181 | |
182 bool do_send(block_t block, | |
183 const void *buffer, size_type buffer_size, | |
184 unsigned int priority, const ptime &abs_time); | |
185 | |
186 //!Returns the needed memory size for the shared message queue. | |
187 //!Never throws | |
188 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); | |
189 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; | |
190 open_create_impl_t m_shmem; | |
191 /// @endcond | |
192 }; | |
193 | |
194 /// @cond | |
195 | |
196 namespace ipcdetail { | |
197 | |
198 //!This header is the prefix of each message in the queue | |
199 template<class VoidPointer> | |
200 class msg_hdr_t | |
201 { | |
202 typedef VoidPointer void_pointer; | |
203 typedef typename boost::intrusive:: | |
204 pointer_traits<void_pointer>::template | |
205 rebind_pointer<char>::type char_ptr; | |
206 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; | |
207 typedef typename boost::make_unsigned<difference_type>::type size_type; | |
208 | |
209 public: | |
210 size_type len; // Message length | |
211 unsigned int priority;// Message priority | |
212 //!Returns the data buffer associated with this this message | |
213 void * data(){ return this+1; } // | |
214 }; | |
215 | |
216 //!This functor is the predicate to order stored messages by priority | |
217 template<class VoidPointer> | |
218 class priority_functor | |
219 { | |
220 typedef typename boost::intrusive:: | |
221 pointer_traits<VoidPointer>::template | |
222 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t; | |
223 | |
224 public: | |
225 bool operator()(const msg_hdr_ptr_t &msg1, | |
226 const msg_hdr_ptr_t &msg2) const | |
227 { return msg1->priority < msg2->priority; } | |
228 }; | |
229 | |
230 //!This header is placed in the beginning of the shared memory and contains | |
231 //!the data to control the queue. This class initializes the shared memory | |
232 //!in the following way: in ascending memory address with proper alignment | |
233 //!fillings: | |
234 //! | |
235 //!-> mq_hdr_t: | |
236 //! Main control block that controls the rest of the elements | |
237 //! | |
238 //!-> offset_ptr<msg_hdr_t> index [max_num_msg] | |
239 //! An array of pointers with size "max_num_msg" called index. Each pointer | |
240 //! points to a preallocated message. Elements of this array are | |
241 //! reordered in runtime in the following way: | |
242 //! | |
243 //! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined: | |
244 //! | |
245 //! When the current number of messages is "cur_num_msg", the array | |
246 //! is treated like a circular buffer. Starting from position "cur_first_msg" | |
247 //! "cur_num_msg" in a circular way, pointers point to inserted messages and the rest | |
248 //! point to free messages. Those "cur_num_msg" pointers are | |
249 //! ordered by the priority of the pointed message and by insertion order | |
250 //! if two messages have the same priority. So the next message to be | |
251 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg] | |
252 //! and the first free message ready to be used in a "send" operation is | |
253 //! [cur_first_msg] if circular buffer is extended from front, | |
254 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise. | |
255 //! | |
256 //! This transforms the index in a circular buffer with an embedded free | |
257 //! message queue. | |
258 //! | |
259 //! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined): | |
260 //! | |
261 //! When the current number of messages is "cur_num_msg", the first | |
262 //! "cur_num_msg" pointers point to inserted messages and the rest | |
263 //! point to free messages. The first "cur_num_msg" pointers are | |
264 //! ordered by the priority of the pointed message and by insertion order | |
265 //! if two messages have the same priority. So the next message to be | |
266 //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free | |
267 //! message ready to be used in a "send" operation is index [cur_num_msg]. | |
268 //! | |
269 //! This transforms the index in a fixed size priority queue with an embedded free | |
270 //! message queue. | |
271 //! | |
272 //!-> struct message_t | |
273 //! { | |
274 //! msg_hdr_t header; | |
275 //! char[max_msg_size] data; | |
276 //! } messages [max_num_msg]; | |
277 //! | |
278 //! An array of buffers of preallocated messages, each one prefixed with the | |
279 //! msg_hdr_t structure. Each of this message is pointed by one pointer of | |
280 //! the index structure. | |
281 template<class VoidPointer> | |
282 class mq_hdr_t | |
283 : public ipcdetail::priority_functor<VoidPointer> | |
284 { | |
285 typedef VoidPointer void_pointer; | |
286 typedef msg_hdr_t<void_pointer> msg_header; | |
287 typedef typename boost::intrusive:: | |
288 pointer_traits<void_pointer>::template | |
289 rebind_pointer<msg_header>::type msg_hdr_ptr_t; | |
290 typedef typename boost::intrusive::pointer_traits | |
291 <msg_hdr_ptr_t>::difference_type difference_type; | |
292 typedef typename boost::make_unsigned<difference_type>::type size_type; | |
293 typedef typename boost::intrusive:: | |
294 pointer_traits<void_pointer>::template | |
295 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t; | |
296 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t; | |
297 | |
298 public: | |
299 //!Constructor. This object must be constructed in the beginning of the | |
300 //!shared memory of the size returned by the function "get_mem_size". | |
301 //!This constructor initializes the needed resources and creates | |
302 //!the internal structures like the priority index. This can throw. | |
303 mq_hdr_t(size_type max_num_msg, size_type max_msg_size) | |
304 : m_max_num_msg(max_num_msg), | |
305 m_max_msg_size(max_msg_size), | |
306 m_cur_num_msg(0) | |
307 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
308 ,m_cur_first_msg(0u) | |
309 #endif | |
310 { this->initialize_memory(); } | |
311 | |
312 //!Returns true if the message queue is full | |
313 bool is_full() const | |
314 { return m_cur_num_msg == m_max_num_msg; } | |
315 | |
316 //!Returns true if the message queue is empty | |
317 bool is_empty() const | |
318 { return !m_cur_num_msg; } | |
319 | |
320 //!Frees the top priority message and saves it in the free message list | |
321 void free_top_msg() | |
322 { --m_cur_num_msg; } | |
323 | |
324 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
325 | |
326 typedef msg_hdr_ptr_t *iterator; | |
327 | |
328 size_type end_pos() const | |
329 { | |
330 const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg; | |
331 return space_until_bufend > m_cur_num_msg | |
332 ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend; | |
333 } | |
334 | |
335 //!Returns the inserted message with top priority | |
336 msg_header &top_msg() | |
337 { | |
338 size_type pos = this->end_pos(); | |
339 return *mp_index[pos ? --pos : m_max_num_msg - 1]; | |
340 } | |
341 | |
342 //!Returns the inserted message with bottom priority | |
343 msg_header &bottom_msg() | |
344 { return *mp_index[m_cur_first_msg]; } | |
345 | |
346 iterator inserted_ptr_begin() const | |
347 { return &mp_index[m_cur_first_msg]; } | |
348 | |
349 iterator inserted_ptr_end() const | |
350 { return &mp_index[this->end_pos()]; } | |
351 | |
352 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func) | |
353 { | |
354 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end()); | |
355 if(end < begin){ | |
356 iterator idx_end = &mp_index[m_max_num_msg]; | |
357 iterator ret = std::lower_bound(begin, idx_end, value, func); | |
358 if(idx_end == ret){ | |
359 iterator idx_beg = &mp_index[0]; | |
360 ret = std::lower_bound(idx_beg, end, value, func); | |
361 //sanity check, these cases should not call lower_bound (optimized out) | |
362 BOOST_ASSERT(ret != end); | |
363 BOOST_ASSERT(ret != begin); | |
364 return ret; | |
365 } | |
366 else{ | |
367 return ret; | |
368 } | |
369 } | |
370 else{ | |
371 return std::lower_bound(begin, end, value, func); | |
372 } | |
373 } | |
374 | |
375 msg_header & insert_at(iterator where) | |
376 { | |
377 iterator it_inserted_ptr_end = this->inserted_ptr_end(); | |
378 iterator it_inserted_ptr_beg = this->inserted_ptr_begin(); | |
379 if(where == it_inserted_ptr_end){ | |
380 ++m_cur_num_msg; | |
381 return **it_inserted_ptr_end; | |
382 } | |
383 else if(where == it_inserted_ptr_beg){ | |
384 //unsigned integer guarantees underflow | |
385 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; | |
386 --m_cur_first_msg; | |
387 ++m_cur_num_msg; | |
388 return *mp_index[m_cur_first_msg]; | |
389 } | |
390 else{ | |
391 size_type pos = where - &mp_index[0]; | |
392 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg); | |
393 //Check if it's more efficient to move back or move front | |
394 if(circ_pos < m_cur_num_msg/2){ | |
395 //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos | |
396 //indicates two step insertion | |
397 if(!pos){ | |
398 pos = m_max_num_msg; | |
399 where = &mp_index[m_max_num_msg-1]; | |
400 } | |
401 else{ | |
402 --where; | |
403 } | |
404 const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos; | |
405 const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u; | |
406 const size_type first_segment_end = pos; | |
407 const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg; | |
408 const size_type second_segment_end = m_max_num_msg; | |
409 const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1); | |
410 | |
411 //First segment | |
412 if(!unique_segment){ | |
413 std::copy( &mp_index[0] + second_segment_beg | |
414 , &mp_index[0] + second_segment_end | |
415 , &mp_index[0] + second_segment_beg - 1); | |
416 mp_index[m_max_num_msg-1] = mp_index[0]; | |
417 } | |
418 std::copy( &mp_index[0] + first_segment_beg | |
419 , &mp_index[0] + first_segment_end | |
420 , &mp_index[0] + first_segment_beg - 1); | |
421 *where = backup; | |
422 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; | |
423 --m_cur_first_msg; | |
424 ++m_cur_num_msg; | |
425 return **where; | |
426 } | |
427 else{ | |
428 //The queue can't be full so end_pos < m_cur_first_msg | |
429 //indicates two step insertion | |
430 const size_type pos_end = this->end_pos(); | |
431 const bool unique_segment = pos < pos_end; | |
432 const size_type first_segment_beg = pos; | |
433 const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1; | |
434 const size_type second_segment_beg = 0u; | |
435 const size_type second_segment_end = unique_segment ? 0u : pos_end; | |
436 const msg_hdr_ptr_t backup = *it_inserted_ptr_end; | |
437 | |
438 //First segment | |
439 if(!unique_segment){ | |
440 std::copy_backward( &mp_index[0] + second_segment_beg | |
441 , &mp_index[0] + second_segment_end | |
442 , &mp_index[0] + second_segment_end + 1); | |
443 mp_index[0] = mp_index[m_max_num_msg-1]; | |
444 } | |
445 std::copy_backward( &mp_index[0] + first_segment_beg | |
446 , &mp_index[0] + first_segment_end | |
447 , &mp_index[0] + first_segment_end + 1); | |
448 *where = backup; | |
449 ++m_cur_num_msg; | |
450 return **where; | |
451 } | |
452 } | |
453 } | |
454 | |
455 #else | |
456 | |
457 typedef msg_hdr_ptr_t *iterator; | |
458 | |
459 //!Returns the inserted message with top priority | |
460 msg_header &top_msg() | |
461 { return *mp_index[m_cur_num_msg-1]; } | |
462 | |
463 //!Returns the inserted message with bottom priority | |
464 msg_header &bottom_msg() | |
465 { return *mp_index[0]; } | |
466 | |
467 iterator inserted_ptr_begin() const | |
468 { return &mp_index[0]; } | |
469 | |
470 iterator inserted_ptr_end() const | |
471 { return &mp_index[m_cur_num_msg]; } | |
472 | |
473 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func) | |
474 { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); } | |
475 | |
476 msg_header & insert_at(iterator pos) | |
477 { | |
478 const msg_hdr_ptr_t backup = *inserted_ptr_end(); | |
479 std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1); | |
480 *pos = backup; | |
481 ++m_cur_num_msg; | |
482 return **pos; | |
483 } | |
484 | |
485 #endif | |
486 | |
487 //!Inserts the first free message in the priority queue | |
488 msg_header & queue_free_msg(unsigned int priority) | |
489 { | |
490 //Get priority queue's range | |
491 iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end()); | |
492 //Optimize for non-priority usage | |
493 if(m_cur_num_msg && priority > this->bottom_msg().priority){ | |
494 //Check for higher priority than all stored messages | |
495 if(priority > this->top_msg().priority){ | |
496 it = it_end; | |
497 } | |
498 else{ | |
499 //Since we don't now which free message we will pick | |
500 //build a dummy header for searches | |
501 msg_header dummy_hdr; | |
502 dummy_hdr.priority = priority; | |
503 | |
504 //Get free msg | |
505 msg_hdr_ptr_t dummy_ptr(&dummy_hdr); | |
506 | |
507 //Check where the free message should be placed | |
508 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this)); | |
509 } | |
510 | |
511 } | |
512 //Insert the free message in the correct position | |
513 return this->insert_at(it); | |
514 } | |
515 | |
516 //!Returns the number of bytes needed to construct a message queue with | |
517 //!"max_num_size" maximum number of messages and "max_msg_size" maximum | |
518 //!message size. Never throws. | |
519 static size_type get_mem_size | |
520 (size_type max_msg_size, size_type max_num_msg) | |
521 { | |
522 const size_type | |
523 msg_hdr_align = ::boost::alignment_of<msg_header>::value, | |
524 index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value, | |
525 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, | |
526 r_index_size = ipcdetail::get_rounded_size(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), | |
527 r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header); | |
528 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + | |
529 open_create_impl_t::ManagedOpenOrCreateUserOffset; | |
530 } | |
531 | |
532 //!Initializes the memory structures to preallocate messages and constructs the | |
533 //!message index. Never throws. | |
534 void initialize_memory() | |
535 { | |
536 const size_type | |
537 msg_hdr_align = ::boost::alignment_of<msg_header>::value, | |
538 index_align = ::boost::alignment_of<msg_hdr_ptr_t>::value, | |
539 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value, | |
540 r_index_size = ipcdetail::get_rounded_size(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), | |
541 r_max_msg_size = ipcdetail::get_rounded_size(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); | |
542 | |
543 //Pointer to the index | |
544 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*> | |
545 (reinterpret_cast<char*>(this)+r_hdr_size); | |
546 | |
547 //Pointer to the first message header | |
548 msg_header *msg_hdr = reinterpret_cast<msg_header*> | |
549 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size); | |
550 | |
551 //Initialize the pointer to the index | |
552 mp_index = index; | |
553 | |
554 //Initialize the index so each slot points to a preallocated message | |
555 for(size_type i = 0; i < m_max_num_msg; ++i){ | |
556 index[i] = msg_hdr; | |
557 msg_hdr = reinterpret_cast<msg_header*> | |
558 (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size); | |
559 } | |
560 } | |
561 | |
562 public: | |
563 //Pointer to the index | |
564 msg_hdr_ptr_ptr_t mp_index; | |
565 //Maximum number of messages of the queue | |
566 const size_type m_max_num_msg; | |
567 //Maximum size of messages of the queue | |
568 const size_type m_max_msg_size; | |
569 //Current number of messages | |
570 size_type m_cur_num_msg; | |
571 //Mutex to protect data structures | |
572 interprocess_mutex m_mutex; | |
573 //Condition block receivers when there are no messages | |
574 interprocess_condition m_cond_recv; | |
575 //Condition block senders when the queue is full | |
576 interprocess_condition m_cond_send; | |
577 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) | |
578 //Current start offset in the circular index | |
579 size_type m_cur_first_msg; | |
580 #endif | |
581 }; | |
582 | |
583 | |
584 //!This is the atomic functor to be executed when creating or opening | |
585 //!shared memory. Never throws | |
586 template<class VoidPointer> | |
587 class msg_queue_initialization_func_t | |
588 { | |
589 public: | |
590 typedef typename boost::intrusive:: | |
591 pointer_traits<VoidPointer>::template | |
592 rebind_pointer<char>::type char_ptr; | |
593 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type; | |
594 typedef typename boost::make_unsigned<difference_type>::type size_type; | |
595 | |
596 msg_queue_initialization_func_t(size_type maxmsg = 0, | |
597 size_type maxmsgsize = 0) | |
598 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} | |
599 | |
600 bool operator()(void *address, size_type, bool created) | |
601 { | |
602 char *mptr; | |
603 | |
604 if(created){ | |
605 mptr = reinterpret_cast<char*>(address); | |
606 //Construct the message queue header at the beginning | |
607 BOOST_TRY{ | |
608 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize); | |
609 } | |
610 BOOST_CATCH(...){ | |
611 return false; | |
612 } | |
613 BOOST_CATCH_END | |
614 } | |
615 return true; | |
616 } | |
617 | |
618 std::size_t get_min_size() const | |
619 { | |
620 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg) | |
621 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset; | |
622 } | |
623 | |
624 const size_type m_maxmsg; | |
625 const size_type m_maxmsgsize; | |
626 }; | |
627 | |
628 } //namespace ipcdetail { | |
629 | |
630 template<class VoidPointer> | |
631 inline message_queue_t<VoidPointer>::~message_queue_t() | |
632 {} | |
633 | |
634 template<class VoidPointer> | |
635 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size | |
636 (size_type max_msg_size, size_type max_num_msg) | |
637 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); } | |
638 | |
639 template<class VoidPointer> | |
640 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t, | |
641 const char *name, | |
642 size_type max_num_msg, | |
643 size_type max_msg_size, | |
644 const permissions &perm) | |
645 //Create shared memory and execute functor atomically | |
646 : m_shmem(create_only, | |
647 name, | |
648 get_mem_size(max_msg_size, max_num_msg), | |
649 read_write, | |
650 static_cast<void*>(0), | |
651 //Prepare initialization functor | |
652 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size), | |
653 perm) | |
654 {} | |
655 | |
656 template<class VoidPointer> | |
657 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t, | |
658 const char *name, | |
659 size_type max_num_msg, | |
660 size_type max_msg_size, | |
661 const permissions &perm) | |
662 //Create shared memory and execute functor atomically | |
663 : m_shmem(open_or_create, | |
664 name, | |
665 get_mem_size(max_msg_size, max_num_msg), | |
666 read_write, | |
667 static_cast<void*>(0), | |
668 //Prepare initialization functor | |
669 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size), | |
670 perm) | |
671 {} | |
672 | |
673 template<class VoidPointer> | |
674 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name) | |
675 //Create shared memory and execute functor atomically | |
676 : m_shmem(open_only, | |
677 name, | |
678 read_write, | |
679 static_cast<void*>(0), | |
680 //Prepare initialization functor | |
681 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ()) | |
682 {} | |
683 | |
684 template<class VoidPointer> | |
685 inline void message_queue_t<VoidPointer>::send | |
686 (const void *buffer, size_type buffer_size, unsigned int priority) | |
687 { this->do_send(blocking, buffer, buffer_size, priority, ptime()); } | |
688 | |
689 template<class VoidPointer> | |
690 inline bool message_queue_t<VoidPointer>::try_send | |
691 (const void *buffer, size_type buffer_size, unsigned int priority) | |
692 { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); } | |
693 | |
694 template<class VoidPointer> | |
695 inline bool message_queue_t<VoidPointer>::timed_send | |
696 (const void *buffer, size_type buffer_size | |
697 ,unsigned int priority, const boost::posix_time::ptime &abs_time) | |
698 { | |
699 if(abs_time == boost::posix_time::pos_infin){ | |
700 this->send(buffer, buffer_size, priority); | |
701 return true; | |
702 } | |
703 return this->do_send(timed, buffer, buffer_size, priority, abs_time); | |
704 } | |
705 | |
706 template<class VoidPointer> | |
707 inline bool message_queue_t<VoidPointer>::do_send(block_t block, | |
708 const void *buffer, size_type buffer_size, | |
709 unsigned int priority, const boost::posix_time::ptime &abs_time) | |
710 { | |
711 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
712 //Check if buffer is smaller than maximum allowed | |
713 if (buffer_size > p_hdr->m_max_msg_size) { | |
714 throw interprocess_exception(size_error); | |
715 } | |
716 | |
717 bool was_empty = false; | |
718 //--------------------------------------------- | |
719 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); | |
720 //--------------------------------------------- | |
721 { | |
722 //If the queue is full execute blocking logic | |
723 if (p_hdr->is_full()) { | |
724 switch(block){ | |
725 case non_blocking : | |
726 return false; | |
727 break; | |
728 | |
729 case blocking : | |
730 do{ | |
731 p_hdr->m_cond_send.wait(lock); | |
732 } | |
733 while (p_hdr->is_full()); | |
734 break; | |
735 | |
736 case timed : | |
737 do{ | |
738 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ | |
739 if(p_hdr->is_full()) | |
740 return false; | |
741 break; | |
742 } | |
743 } | |
744 while (p_hdr->is_full()); | |
745 break; | |
746 default: | |
747 break; | |
748 } | |
749 } | |
750 | |
751 was_empty = p_hdr->is_empty(); | |
752 //Insert the first free message in the priority queue | |
753 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority); | |
754 | |
755 //Sanity check, free msgs are always cleaned when received | |
756 BOOST_ASSERT(free_msg_hdr.priority == 0); | |
757 BOOST_ASSERT(free_msg_hdr.len == 0); | |
758 | |
759 //Copy control data to the free message | |
760 free_msg_hdr.priority = priority; | |
761 free_msg_hdr.len = buffer_size; | |
762 | |
763 //Copy user buffer to the message | |
764 std::memcpy(free_msg_hdr.data(), buffer, buffer_size); | |
765 } // Lock end | |
766 | |
767 //Notify outside lock to avoid contention. This might produce some | |
768 //spurious wakeups, but it's usually far better than notifying inside. | |
769 //If this message changes the queue empty state, notify it to receivers | |
770 if (was_empty){ | |
771 p_hdr->m_cond_recv.notify_one(); | |
772 } | |
773 | |
774 return true; | |
775 } | |
776 | |
777 template<class VoidPointer> | |
778 inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size, | |
779 size_type &recvd_size, unsigned int &priority) | |
780 { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); } | |
781 | |
782 template<class VoidPointer> | |
783 inline bool | |
784 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size, | |
785 size_type &recvd_size, unsigned int &priority) | |
786 { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); } | |
787 | |
788 template<class VoidPointer> | |
789 inline bool | |
790 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size, | |
791 size_type &recvd_size, unsigned int &priority, | |
792 const boost::posix_time::ptime &abs_time) | |
793 { | |
794 if(abs_time == boost::posix_time::pos_infin){ | |
795 this->receive(buffer, buffer_size, recvd_size, priority); | |
796 return true; | |
797 } | |
798 return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time); | |
799 } | |
800 | |
801 template<class VoidPointer> | |
802 inline bool | |
803 message_queue_t<VoidPointer>::do_receive(block_t block, | |
804 void *buffer, size_type buffer_size, | |
805 size_type &recvd_size, unsigned int &priority, | |
806 const boost::posix_time::ptime &abs_time) | |
807 { | |
808 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
809 //Check if buffer is big enough for any message | |
810 if (buffer_size < p_hdr->m_max_msg_size) { | |
811 throw interprocess_exception(size_error); | |
812 } | |
813 | |
814 bool was_full = false; | |
815 //--------------------------------------------- | |
816 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); | |
817 //--------------------------------------------- | |
818 { | |
819 //If there are no messages execute blocking logic | |
820 if (p_hdr->is_empty()) { | |
821 switch(block){ | |
822 case non_blocking : | |
823 return false; | |
824 break; | |
825 | |
826 case blocking : | |
827 do{ | |
828 p_hdr->m_cond_recv.wait(lock); | |
829 } | |
830 while (p_hdr->is_empty()); | |
831 break; | |
832 | |
833 case timed : | |
834 do{ | |
835 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ | |
836 if(p_hdr->is_empty()) | |
837 return false; | |
838 break; | |
839 } | |
840 } | |
841 while (p_hdr->is_empty()); | |
842 break; | |
843 | |
844 //Paranoia check | |
845 default: | |
846 break; | |
847 } | |
848 } | |
849 | |
850 //There is at least one message ready to pick, get the top one | |
851 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg(); | |
852 | |
853 //Get data from the message | |
854 recvd_size = top_msg.len; | |
855 priority = top_msg.priority; | |
856 | |
857 //Some cleanup to ease debugging | |
858 top_msg.len = 0; | |
859 top_msg.priority = 0; | |
860 | |
861 //Copy data to receiver's bufers | |
862 std::memcpy(buffer, top_msg.data(), recvd_size); | |
863 | |
864 was_full = p_hdr->is_full(); | |
865 | |
866 //Free top message and put it in the free message list | |
867 p_hdr->free_top_msg(); | |
868 } //Lock end | |
869 | |
870 //Notify outside lock to avoid contention. This might produce some | |
871 //spurious wakeups, but it's usually far better than notifying inside. | |
872 //If this reception changes the queue full state, notify senders | |
873 if (was_full){ | |
874 p_hdr->m_cond_send.notify_one(); | |
875 } | |
876 | |
877 return true; | |
878 } | |
879 | |
880 template<class VoidPointer> | |
881 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const | |
882 { | |
883 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
884 return p_hdr ? p_hdr->m_max_num_msg : 0; } | |
885 | |
886 template<class VoidPointer> | |
887 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const | |
888 { | |
889 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
890 return p_hdr ? p_hdr->m_max_msg_size : 0; | |
891 } | |
892 | |
893 template<class VoidPointer> | |
894 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const | |
895 { | |
896 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address()); | |
897 if(p_hdr){ | |
898 //--------------------------------------------- | |
899 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex); | |
900 //--------------------------------------------- | |
901 return p_hdr->m_cur_num_msg; | |
902 } | |
903 | |
904 return 0; | |
905 } | |
906 | |
907 template<class VoidPointer> | |
908 inline bool message_queue_t<VoidPointer>::remove(const char *name) | |
909 { return shared_memory_object::remove(name); } | |
910 | |
911 /// @endcond | |
912 | |
913 }} //namespace boost{ namespace interprocess{ | |
914 | |
915 #include <boost/interprocess/detail/config_end.hpp> | |
916 | |
917 #endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP |