Chris@16
|
1 //////////////////////////////////////////////////////////////////////////////
|
Chris@16
|
2 //
|
Chris@16
|
3 // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
|
Chris@16
|
4 // Software License, Version 1.0. (See accompanying file
|
Chris@16
|
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
Chris@16
|
6 //
|
Chris@16
|
7 // See http://www.boost.org/libs/interprocess for documentation.
|
Chris@16
|
8 //
|
Chris@16
|
9 //////////////////////////////////////////////////////////////////////////////
|
Chris@16
|
10
|
Chris@16
|
11 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
|
Chris@16
|
12 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
|
Chris@16
|
13
|
Chris@101
|
14 #ifndef BOOST_CONFIG_HPP
|
Chris@101
|
15 # include <boost/config.hpp>
|
Chris@101
|
16 #endif
|
Chris@101
|
17 #
|
Chris@101
|
18 #if defined(BOOST_HAS_PRAGMA_ONCE)
|
Chris@101
|
19 # pragma once
|
Chris@101
|
20 #endif
|
Chris@101
|
21
|
Chris@16
|
22 #include <boost/interprocess/detail/config_begin.hpp>
|
Chris@16
|
23 #include <boost/interprocess/detail/workaround.hpp>
|
Chris@16
|
24
|
Chris@16
|
25 #include <boost/interprocess/shared_memory_object.hpp>
|
Chris@16
|
26 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
|
Chris@16
|
27 #include <boost/interprocess/sync/interprocess_condition.hpp>
|
Chris@16
|
28 #include <boost/interprocess/sync/interprocess_mutex.hpp>
|
Chris@16
|
29 #include <boost/interprocess/sync/scoped_lock.hpp>
|
Chris@16
|
30 #include <boost/interprocess/detail/utilities.hpp>
|
Chris@16
|
31 #include <boost/interprocess/offset_ptr.hpp>
|
Chris@16
|
32 #include <boost/interprocess/creation_tags.hpp>
|
Chris@16
|
33 #include <boost/interprocess/exceptions.hpp>
|
Chris@16
|
34 #include <boost/interprocess/permissions.hpp>
|
Chris@101
|
35 #include <boost/core/no_exceptions_support.hpp>
|
Chris@16
|
36 #include <boost/interprocess/detail/type_traits.hpp>
|
Chris@16
|
37 #include <boost/intrusive/pointer_traits.hpp>
|
Chris@101
|
38 #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of
|
Chris@16
|
39 #include <boost/intrusive/pointer_traits.hpp>
|
Chris@16
|
40 #include <boost/assert.hpp>
|
Chris@16
|
41 #include <algorithm> //std::lower_bound
|
Chris@16
|
42 #include <cstddef> //std::size_t
|
Chris@16
|
43 #include <cstring> //memcpy
|
Chris@16
|
44
|
Chris@16
|
45
|
Chris@16
|
46 //!\file
|
Chris@16
|
47 //!Describes an inter-process message queue. This class allows sending
|
Chris@16
|
48 //!messages between processes and allows blocking, non-blocking and timed
|
Chris@16
|
49 //!sending and receiving.
|
Chris@16
|
50
|
Chris@16
|
51 namespace boost{ namespace interprocess{
|
Chris@16
|
52
|
Chris@16
|
namespace ipcdetail {

//Forward declaration: message_queue_t names this functor as a friend
template<class VoidPointer>
class msg_queue_initialization_func_t;

}  //namespace ipcdetail
|
Chris@16
|
58
|
Chris@16
|
59 //!A class that allows sending messages
|
Chris@16
|
60 //!between processes.
|
Chris@16
|
61 template<class VoidPointer>
|
Chris@16
|
62 class message_queue_t
|
Chris@16
|
63 {
|
Chris@101
|
64 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
|
Chris@16
|
65 //Blocking modes
|
Chris@16
|
66 enum block_t { blocking, timed, non_blocking };
|
Chris@16
|
67
|
Chris@16
|
68 message_queue_t();
|
Chris@101
|
69 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
|
Chris@16
|
70
|
Chris@16
|
71 public:
|
Chris@16
|
72 typedef VoidPointer void_pointer;
|
Chris@16
|
73 typedef typename boost::intrusive::
|
Chris@16
|
74 pointer_traits<void_pointer>::template
|
Chris@16
|
75 rebind_pointer<char>::type char_ptr;
|
Chris@16
|
76 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
|
Chris@101
|
77 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type;
|
Chris@16
|
78
|
Chris@16
|
79 //!Creates a process shared message queue with name "name". For this message queue,
|
Chris@16
|
80 //!the maximum number of messages will be "max_num_msg" and the maximum message size
|
Chris@16
|
81 //!will be "max_msg_size". Throws on error and if the queue was previously created.
|
Chris@16
|
82 message_queue_t(create_only_t create_only,
|
Chris@16
|
83 const char *name,
|
Chris@16
|
84 size_type max_num_msg,
|
Chris@16
|
85 size_type max_msg_size,
|
Chris@16
|
86 const permissions &perm = permissions());
|
Chris@16
|
87
|
Chris@16
|
88 //!Opens or creates a process shared message queue with name "name".
|
Chris@16
|
89 //!If the queue is created, the maximum number of messages will be "max_num_msg"
|
Chris@16
|
90 //!and the maximum message size will be "max_msg_size". If queue was previously
|
Chris@16
|
91 //!created the queue will be opened and "max_num_msg" and "max_msg_size" parameters
|
Chris@16
|
92 //!are ignored. Throws on error.
|
Chris@16
|
93 message_queue_t(open_or_create_t open_or_create,
|
Chris@16
|
94 const char *name,
|
Chris@16
|
95 size_type max_num_msg,
|
Chris@16
|
96 size_type max_msg_size,
|
Chris@16
|
97 const permissions &perm = permissions());
|
Chris@16
|
98
|
Chris@16
|
99 //!Opens a previously created process shared message queue with name "name".
|
Chris@16
|
100 //!If the queue was not previously created or there are no free resources,
|
Chris@16
|
101 //!throws an error.
|
Chris@16
|
102 message_queue_t(open_only_t open_only,
|
Chris@16
|
103 const char *name);
|
Chris@16
|
104
|
Chris@16
|
105 //!Destroys *this and indicates that the calling process is finished using
|
Chris@16
|
106 //!the resource. All opened message queues are still
|
Chris@16
|
107 //!valid after destruction. The destructor function will deallocate
|
Chris@16
|
108 //!any system resources allocated by the system for use by this process for
|
Chris@16
|
109 //!this resource. The resource can still be opened again calling
|
Chris@16
|
110 //!the open constructor overload. To erase the message queue from the system
|
Chris@16
|
111 //!use remove().
|
Chris@16
|
112 ~message_queue_t();
|
Chris@16
|
113
|
Chris@16
|
114 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
|
Chris@16
|
115 //!message queue with priority "priority". If the message queue is full
|
Chris@16
|
116 //!the sender is blocked. Throws interprocess_error on error.
|
Chris@16
|
117 void send (const void *buffer, size_type buffer_size,
|
Chris@16
|
118 unsigned int priority);
|
Chris@16
|
119
|
Chris@16
|
120 //!Sends a message stored in buffer "buffer" with size "buffer_size" through the
|
Chris@16
|
121 //!message queue with priority "priority". If the message queue is full
|
Chris@16
|
122 //!the sender is not blocked and returns false, otherwise returns true.
|
Chris@16
|
123 //!Throws interprocess_error on error.
|
Chris@16
|
124 bool try_send (const void *buffer, size_type buffer_size,
|
Chris@16
|
125 unsigned int priority);
|
Chris@16
|
126
|
Chris@16
|
127 //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
|
Chris@16
|
128 //!message queue with priority "priority". If the message queue is full
|
Chris@16
|
129 //!the sender retries until time "abs_time" is reached. Returns true if
|
Chris@16
|
130 //!the message has been successfully sent. Returns false if timeout is reached.
|
Chris@16
|
131 //!Throws interprocess_error on error.
|
Chris@16
|
132 bool timed_send (const void *buffer, size_type buffer_size,
|
Chris@16
|
133 unsigned int priority, const boost::posix_time::ptime& abs_time);
|
Chris@16
|
134
|
Chris@16
|
135 //!Receives a message from the message queue. The message is stored in buffer
|
Chris@16
|
136 //!"buffer", which has size "buffer_size". The received message has size
|
Chris@16
|
137 //!"recvd_size" and priority "priority". If the message queue is empty
|
Chris@16
|
138 //!the receiver is blocked. Throws interprocess_error on error.
|
Chris@16
|
139 void receive (void *buffer, size_type buffer_size,
|
Chris@16
|
140 size_type &recvd_size,unsigned int &priority);
|
Chris@16
|
141
|
Chris@16
|
142 //!Receives a message from the message queue. The message is stored in buffer
|
Chris@16
|
143 //!"buffer", which has size "buffer_size". The received message has size
|
Chris@16
|
144 //!"recvd_size" and priority "priority". If the message queue is empty
|
Chris@16
|
145 //!the receiver is not blocked and returns false, otherwise returns true.
|
Chris@16
|
146 //!Throws interprocess_error on error.
|
Chris@16
|
147 bool try_receive (void *buffer, size_type buffer_size,
|
Chris@16
|
148 size_type &recvd_size,unsigned int &priority);
|
Chris@16
|
149
|
Chris@16
|
150 //!Receives a message from the message queue. The message is stored in buffer
|
Chris@16
|
151 //!"buffer", which has size "buffer_size". The received message has size
|
Chris@16
|
152 //!"recvd_size" and priority "priority". If the message queue is empty
|
Chris@16
|
153 //!the receiver retries until time "abs_time" is reached. Returns true if
|
Chris@16
|
154 //!the message has been successfully sent. Returns false if timeout is reached.
|
Chris@16
|
155 //!Throws interprocess_error on error.
|
Chris@16
|
156 bool timed_receive (void *buffer, size_type buffer_size,
|
Chris@16
|
157 size_type &recvd_size,unsigned int &priority,
|
Chris@16
|
158 const boost::posix_time::ptime &abs_time);
|
Chris@16
|
159
|
Chris@16
|
160 //!Returns the maximum number of messages allowed by the queue. The message
|
Chris@16
|
161 //!queue must be opened or created previously. Otherwise, returns 0.
|
Chris@16
|
162 //!Never throws
|
Chris@16
|
163 size_type get_max_msg() const;
|
Chris@16
|
164
|
Chris@16
|
165 //!Returns the maximum size of message allowed by the queue. The message
|
Chris@16
|
166 //!queue must be opened or created previously. Otherwise, returns 0.
|
Chris@16
|
167 //!Never throws
|
Chris@16
|
168 size_type get_max_msg_size() const;
|
Chris@16
|
169
|
Chris@16
|
170 //!Returns the number of messages currently stored.
|
Chris@16
|
171 //!Never throws
|
Chris@16
|
172 size_type get_num_msg() const;
|
Chris@16
|
173
|
Chris@16
|
174 //!Removes the message queue from the system.
|
Chris@16
|
175 //!Returns false on error. Never throws
|
Chris@16
|
176 static bool remove(const char *name);
|
Chris@16
|
177
|
Chris@101
|
178 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
|
Chris@16
|
179 private:
|
Chris@16
|
180 typedef boost::posix_time::ptime ptime;
|
Chris@16
|
181
|
Chris@16
|
182 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
|
Chris@16
|
183
|
Chris@16
|
184 bool do_receive(block_t block,
|
Chris@16
|
185 void *buffer, size_type buffer_size,
|
Chris@16
|
186 size_type &recvd_size, unsigned int &priority,
|
Chris@16
|
187 const ptime &abs_time);
|
Chris@16
|
188
|
Chris@16
|
189 bool do_send(block_t block,
|
Chris@16
|
190 const void *buffer, size_type buffer_size,
|
Chris@16
|
191 unsigned int priority, const ptime &abs_time);
|
Chris@16
|
192
|
Chris@16
|
193 //!Returns the needed memory size for the shared message queue.
|
Chris@16
|
194 //!Never throws
|
Chris@16
|
195 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
|
Chris@16
|
196 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
|
Chris@16
|
197 open_create_impl_t m_shmem;
|
Chris@101
|
198 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
|
Chris@16
|
199 };
|
Chris@16
|
200
|
Chris@101
|
201 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
|
Chris@16
|
202
|
Chris@16
|
203 namespace ipcdetail {
|
Chris@16
|
204
|
Chris@16
|
205 //!This header is the prefix of each message in the queue
|
Chris@16
|
206 template<class VoidPointer>
|
Chris@16
|
207 class msg_hdr_t
|
Chris@16
|
208 {
|
Chris@16
|
209 typedef VoidPointer void_pointer;
|
Chris@16
|
210 typedef typename boost::intrusive::
|
Chris@16
|
211 pointer_traits<void_pointer>::template
|
Chris@16
|
212 rebind_pointer<char>::type char_ptr;
|
Chris@16
|
213 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
|
Chris@101
|
214 typedef typename boost::container::container_detail::make_unsigned<difference_type>::type size_type;
|
Chris@16
|
215
|
Chris@16
|
216 public:
|
Chris@16
|
217 size_type len; // Message length
|
Chris@16
|
218 unsigned int priority;// Message priority
|
Chris@16
|
219 //!Returns the data buffer associated with this this message
|
Chris@16
|
220 void * data(){ return this+1; } //
|
Chris@16
|
221 };
|
Chris@16
|
222
|
Chris@16
|
223 //!This functor is the predicate to order stored messages by priority
|
Chris@16
|
224 template<class VoidPointer>
|
Chris@16
|
225 class priority_functor
|
Chris@16
|
226 {
|
Chris@16
|
227 typedef typename boost::intrusive::
|
Chris@16
|
228 pointer_traits<VoidPointer>::template
|
Chris@16
|
229 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t;
|
Chris@16
|
230
|
Chris@16
|
231 public:
|
Chris@16
|
232 bool operator()(const msg_hdr_ptr_t &msg1,
|
Chris@16
|
233 const msg_hdr_ptr_t &msg2) const
|
Chris@16
|
234 { return msg1->priority < msg2->priority; }
|
Chris@16
|
235 };
|
Chris@16
|
236
|
Chris@16
|
237 //!This header is placed in the beginning of the shared memory and contains
|
Chris@16
|
238 //!the data to control the queue. This class initializes the shared memory
|
Chris@16
|
239 //!in the following way: in ascending memory address with proper alignment
|
Chris@16
|
240 //!fillings:
|
Chris@16
|
241 //!
|
Chris@16
|
242 //!-> mq_hdr_t:
|
Chris@16
|
243 //! Main control block that controls the rest of the elements
|
Chris@16
|
244 //!
|
Chris@16
|
245 //!-> offset_ptr<msg_hdr_t> index [max_num_msg]
|
Chris@16
|
246 //! An array of pointers with size "max_num_msg" called index. Each pointer
|
Chris@16
|
247 //! points to a preallocated message. Elements of this array are
|
Chris@16
|
248 //! reordered in runtime in the following way:
|
Chris@16
|
249 //!
|
Chris@16
|
250 //! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined:
|
Chris@16
|
251 //!
|
Chris@16
|
252 //! When the current number of messages is "cur_num_msg", the array
|
Chris@16
|
253 //! is treated like a circular buffer. Starting from position "cur_first_msg"
|
Chris@16
|
254 //! "cur_num_msg" in a circular way, pointers point to inserted messages and the rest
|
Chris@16
|
255 //! point to free messages. Those "cur_num_msg" pointers are
|
Chris@16
|
256 //! ordered by the priority of the pointed message and by insertion order
|
Chris@16
|
257 //! if two messages have the same priority. So the next message to be
|
Chris@16
|
258 //! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
|
Chris@16
|
259 //! and the first free message ready to be used in a "send" operation is
|
Chris@101
|
260 //! [cur_first_msg] if circular buffer is extended from front,
|
Chris@16
|
261 //! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
|
Chris@16
|
262 //!
|
Chris@16
|
263 //! This transforms the index into a circular buffer with an embedded free
|
Chris@16
|
264 //! message queue.
|
Chris@16
|
265 //!
|
Chris@16
|
266 //! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined):
|
Chris@16
|
267 //!
|
Chris@16
|
268 //! When the current number of messages is "cur_num_msg", the first
|
Chris@16
|
269 //! "cur_num_msg" pointers point to inserted messages and the rest
|
Chris@16
|
270 //! point to free messages. The first "cur_num_msg" pointers are
|
Chris@16
|
271 //! ordered by the priority of the pointed message and by insertion order
|
Chris@16
|
272 //! if two messages have the same priority. So the next message to be
|
Chris@16
|
273 //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free
|
Chris@16
|
274 //! message ready to be used in a "send" operation is index [cur_num_msg].
|
Chris@16
|
275 //!
|
Chris@16
|
276 //! This transforms the index into a fixed size priority queue with an embedded free
|
Chris@16
|
277 //! message queue.
|
Chris@16
|
278 //!
|
Chris@16
|
279 //!-> struct message_t
|
Chris@16
|
280 //! {
|
Chris@16
|
281 //! msg_hdr_t header;
|
Chris@16
|
282 //! char[max_msg_size] data;
|
Chris@16
|
283 //! } messages [max_num_msg];
|
Chris@16
|
284 //!
|
Chris@16
|
285 //! An array of buffers of preallocated messages, each one prefixed with the
|
Chris@16
|
286 //! msg_hdr_t structure. Each of these messages is pointed to by one pointer of
|
Chris@16
|
287 //! the index structure.
|
Chris@16
|
288 template<class VoidPointer>
|
Chris@16
|
289 class mq_hdr_t
|
Chris@16
|
290 : public ipcdetail::priority_functor<VoidPointer>
|
Chris@16
|
291 {
|
Chris@16
|
292 typedef VoidPointer void_pointer;
|
Chris@16
|
293 typedef msg_hdr_t<void_pointer> msg_header;
|
Chris@16
|
294 typedef typename boost::intrusive::
|
Chris@16
|
295 pointer_traits<void_pointer>::template
|
Chris@16
|
296 rebind_pointer<msg_header>::type msg_hdr_ptr_t;
|
Chris@16
|
297 typedef typename boost::intrusive::pointer_traits
|
Chris@16
|
298 <msg_hdr_ptr_t>::difference_type difference_type;
|
Chris@101
|
299 typedef typename boost::container::
|
Chris@101
|
300 container_detail::make_unsigned<difference_type>::type size_type;
|
Chris@16
|
301 typedef typename boost::intrusive::
|
Chris@16
|
302 pointer_traits<void_pointer>::template
|
Chris@16
|
303 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t;
|
Chris@16
|
304 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
|
Chris@16
|
305
|
Chris@16
|
306 public:
|
Chris@16
|
307 //!Constructor. This object must be constructed in the beginning of the
|
Chris@16
|
308 //!shared memory of the size returned by the function "get_mem_size".
|
Chris@16
|
309 //!This constructor initializes the needed resources and creates
|
Chris@16
|
310 //!the internal structures like the priority index. This can throw.
|
Chris@16
|
311 mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
|
Chris@16
|
312 : m_max_num_msg(max_num_msg),
|
Chris@16
|
313 m_max_msg_size(max_msg_size),
|
Chris@16
|
314 m_cur_num_msg(0)
|
Chris@16
|
315 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@16
|
316 ,m_cur_first_msg(0u)
|
Chris@101
|
317 ,m_blocked_senders(0u)
|
Chris@101
|
318 ,m_blocked_receivers(0u)
|
Chris@16
|
319 #endif
|
Chris@16
|
320 { this->initialize_memory(); }
|
Chris@16
|
321
|
Chris@16
|
322 //!Returns true if the message queue is full
|
Chris@16
|
323 bool is_full() const
|
Chris@16
|
324 { return m_cur_num_msg == m_max_num_msg; }
|
Chris@16
|
325
|
Chris@16
|
326 //!Returns true if the message queue is empty
|
Chris@16
|
327 bool is_empty() const
|
Chris@16
|
328 { return !m_cur_num_msg; }
|
Chris@16
|
329
|
Chris@16
|
330 //!Frees the top priority message and saves it in the free message list
|
Chris@16
|
331 void free_top_msg()
|
Chris@16
|
332 { --m_cur_num_msg; }
|
Chris@16
|
333
|
Chris@16
|
334 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@16
|
335
|
Chris@16
|
336 typedef msg_hdr_ptr_t *iterator;
|
Chris@16
|
337
|
Chris@16
|
338 size_type end_pos() const
|
Chris@16
|
339 {
|
Chris@16
|
340 const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
|
Chris@16
|
341 return space_until_bufend > m_cur_num_msg
|
Chris@16
|
342 ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
|
Chris@16
|
343 }
|
Chris@16
|
344
|
Chris@16
|
345 //!Returns the inserted message with top priority
|
Chris@16
|
346 msg_header &top_msg()
|
Chris@16
|
347 {
|
Chris@16
|
348 size_type pos = this->end_pos();
|
Chris@16
|
349 return *mp_index[pos ? --pos : m_max_num_msg - 1];
|
Chris@16
|
350 }
|
Chris@16
|
351
|
Chris@16
|
352 //!Returns the inserted message with bottom priority
|
Chris@16
|
353 msg_header &bottom_msg()
|
Chris@16
|
354 { return *mp_index[m_cur_first_msg]; }
|
Chris@16
|
355
|
Chris@16
|
356 iterator inserted_ptr_begin() const
|
Chris@16
|
357 { return &mp_index[m_cur_first_msg]; }
|
Chris@16
|
358
|
Chris@16
|
359 iterator inserted_ptr_end() const
|
Chris@16
|
360 { return &mp_index[this->end_pos()]; }
|
Chris@16
|
361
|
Chris@16
|
362 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
|
Chris@16
|
363 {
|
Chris@16
|
364 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
|
Chris@16
|
365 if(end < begin){
|
Chris@16
|
366 iterator idx_end = &mp_index[m_max_num_msg];
|
Chris@101
|
367 iterator ret = std::lower_bound(begin, idx_end, value, func);
|
Chris@16
|
368 if(idx_end == ret){
|
Chris@16
|
369 iterator idx_beg = &mp_index[0];
|
Chris@16
|
370 ret = std::lower_bound(idx_beg, end, value, func);
|
Chris@16
|
371 //sanity check, these cases should not call lower_bound (optimized out)
|
Chris@16
|
372 BOOST_ASSERT(ret != end);
|
Chris@16
|
373 BOOST_ASSERT(ret != begin);
|
Chris@16
|
374 return ret;
|
Chris@16
|
375 }
|
Chris@16
|
376 else{
|
Chris@16
|
377 return ret;
|
Chris@16
|
378 }
|
Chris@16
|
379 }
|
Chris@16
|
380 else{
|
Chris@16
|
381 return std::lower_bound(begin, end, value, func);
|
Chris@16
|
382 }
|
Chris@16
|
383 }
|
Chris@16
|
384
|
Chris@16
|
385 msg_header & insert_at(iterator where)
|
Chris@16
|
386 {
|
Chris@16
|
387 iterator it_inserted_ptr_end = this->inserted_ptr_end();
|
Chris@16
|
388 iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
|
Chris@101
|
389 if(where == it_inserted_ptr_beg){
|
Chris@16
|
390 //unsigned integer guarantees underflow
|
Chris@16
|
391 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
|
Chris@16
|
392 --m_cur_first_msg;
|
Chris@16
|
393 ++m_cur_num_msg;
|
Chris@16
|
394 return *mp_index[m_cur_first_msg];
|
Chris@16
|
395 }
|
Chris@101
|
396 else if(where == it_inserted_ptr_end){
|
Chris@101
|
397 ++m_cur_num_msg;
|
Chris@101
|
398 return **it_inserted_ptr_end;
|
Chris@101
|
399 }
|
Chris@16
|
400 else{
|
Chris@16
|
401 size_type pos = where - &mp_index[0];
|
Chris@16
|
402 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
|
Chris@16
|
403 //Check if it's more efficient to move back or move front
|
Chris@16
|
404 if(circ_pos < m_cur_num_msg/2){
|
Chris@16
|
405 //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos
|
Chris@16
|
406 //indicates two step insertion
|
Chris@16
|
407 if(!pos){
|
Chris@16
|
408 pos = m_max_num_msg;
|
Chris@16
|
409 where = &mp_index[m_max_num_msg-1];
|
Chris@16
|
410 }
|
Chris@16
|
411 else{
|
Chris@16
|
412 --where;
|
Chris@16
|
413 }
|
Chris@16
|
414 const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
|
Chris@16
|
415 const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u;
|
Chris@16
|
416 const size_type first_segment_end = pos;
|
Chris@16
|
417 const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
|
Chris@16
|
418 const size_type second_segment_end = m_max_num_msg;
|
Chris@16
|
419 const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1);
|
Chris@16
|
420
|
Chris@16
|
421 //First segment
|
Chris@16
|
422 if(!unique_segment){
|
Chris@16
|
423 std::copy( &mp_index[0] + second_segment_beg
|
Chris@16
|
424 , &mp_index[0] + second_segment_end
|
Chris@16
|
425 , &mp_index[0] + second_segment_beg - 1);
|
Chris@16
|
426 mp_index[m_max_num_msg-1] = mp_index[0];
|
Chris@16
|
427 }
|
Chris@16
|
428 std::copy( &mp_index[0] + first_segment_beg
|
Chris@16
|
429 , &mp_index[0] + first_segment_end
|
Chris@16
|
430 , &mp_index[0] + first_segment_beg - 1);
|
Chris@16
|
431 *where = backup;
|
Chris@16
|
432 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
|
Chris@16
|
433 --m_cur_first_msg;
|
Chris@16
|
434 ++m_cur_num_msg;
|
Chris@16
|
435 return **where;
|
Chris@16
|
436 }
|
Chris@16
|
437 else{
|
Chris@16
|
438 //The queue can't be full so end_pos < m_cur_first_msg
|
Chris@16
|
439 //indicates two step insertion
|
Chris@16
|
440 const size_type pos_end = this->end_pos();
|
Chris@16
|
441 const bool unique_segment = pos < pos_end;
|
Chris@16
|
442 const size_type first_segment_beg = pos;
|
Chris@16
|
443 const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1;
|
Chris@16
|
444 const size_type second_segment_beg = 0u;
|
Chris@16
|
445 const size_type second_segment_end = unique_segment ? 0u : pos_end;
|
Chris@16
|
446 const msg_hdr_ptr_t backup = *it_inserted_ptr_end;
|
Chris@16
|
447
|
Chris@16
|
448 //First segment
|
Chris@16
|
449 if(!unique_segment){
|
Chris@16
|
450 std::copy_backward( &mp_index[0] + second_segment_beg
|
Chris@16
|
451 , &mp_index[0] + second_segment_end
|
Chris@16
|
452 , &mp_index[0] + second_segment_end + 1);
|
Chris@16
|
453 mp_index[0] = mp_index[m_max_num_msg-1];
|
Chris@16
|
454 }
|
Chris@16
|
455 std::copy_backward( &mp_index[0] + first_segment_beg
|
Chris@16
|
456 , &mp_index[0] + first_segment_end
|
Chris@16
|
457 , &mp_index[0] + first_segment_end + 1);
|
Chris@16
|
458 *where = backup;
|
Chris@16
|
459 ++m_cur_num_msg;
|
Chris@16
|
460 return **where;
|
Chris@16
|
461 }
|
Chris@16
|
462 }
|
Chris@16
|
463 }
|
Chris@16
|
464
|
Chris@101
|
465 #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@16
|
466
|
Chris@16
|
467 typedef msg_hdr_ptr_t *iterator;
|
Chris@16
|
468
|
Chris@16
|
469 //!Returns the inserted message with top priority
|
Chris@16
|
470 msg_header &top_msg()
|
Chris@16
|
471 { return *mp_index[m_cur_num_msg-1]; }
|
Chris@16
|
472
|
Chris@16
|
473 //!Returns the inserted message with bottom priority
|
Chris@16
|
474 msg_header &bottom_msg()
|
Chris@16
|
475 { return *mp_index[0]; }
|
Chris@16
|
476
|
Chris@16
|
477 iterator inserted_ptr_begin() const
|
Chris@16
|
478 { return &mp_index[0]; }
|
Chris@16
|
479
|
Chris@16
|
480 iterator inserted_ptr_end() const
|
Chris@16
|
481 { return &mp_index[m_cur_num_msg]; }
|
Chris@16
|
482
|
Chris@16
|
483 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
|
Chris@16
|
484 { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); }
|
Chris@16
|
485
|
Chris@16
|
486 msg_header & insert_at(iterator pos)
|
Chris@16
|
487 {
|
Chris@16
|
488 const msg_hdr_ptr_t backup = *inserted_ptr_end();
|
Chris@16
|
489 std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
|
Chris@16
|
490 *pos = backup;
|
Chris@16
|
491 ++m_cur_num_msg;
|
Chris@16
|
492 return **pos;
|
Chris@16
|
493 }
|
Chris@16
|
494
|
Chris@101
|
495 #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@16
|
496
|
Chris@16
|
497 //!Inserts the first free message in the priority queue
|
Chris@16
|
498 msg_header & queue_free_msg(unsigned int priority)
|
Chris@16
|
499 {
|
Chris@16
|
500 //Get priority queue's range
|
Chris@16
|
501 iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end());
|
Chris@16
|
502 //Optimize for non-priority usage
|
Chris@16
|
503 if(m_cur_num_msg && priority > this->bottom_msg().priority){
|
Chris@16
|
504 //Check for higher priority than all stored messages
|
Chris@16
|
505 if(priority > this->top_msg().priority){
|
Chris@16
|
506 it = it_end;
|
Chris@16
|
507 }
|
Chris@16
|
508 else{
|
Chris@16
|
509 //Since we don't now which free message we will pick
|
Chris@16
|
510 //build a dummy header for searches
|
Chris@16
|
511 msg_header dummy_hdr;
|
Chris@16
|
512 dummy_hdr.priority = priority;
|
Chris@16
|
513
|
Chris@16
|
514 //Get free msg
|
Chris@16
|
515 msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
|
Chris@16
|
516
|
Chris@16
|
517 //Check where the free message should be placed
|
Chris@16
|
518 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
|
Chris@16
|
519 }
|
Chris@16
|
520 }
|
Chris@16
|
521 //Insert the free message in the correct position
|
Chris@16
|
522 return this->insert_at(it);
|
Chris@16
|
523 }
|
Chris@16
|
524
|
Chris@16
|
525 //!Returns the number of bytes needed to construct a message queue with
|
Chris@16
|
526 //!"max_num_size" maximum number of messages and "max_msg_size" maximum
|
Chris@16
|
527 //!message size. Never throws.
|
Chris@16
|
528 static size_type get_mem_size
|
Chris@16
|
529 (size_type max_msg_size, size_type max_num_msg)
|
Chris@16
|
530 {
|
Chris@16
|
531 const size_type
|
Chris@101
|
532 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value,
|
Chris@101
|
533 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value,
|
Chris@16
|
534 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
|
Chris@101
|
535 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
|
Chris@101
|
536 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
|
Chris@16
|
537 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
|
Chris@16
|
538 open_create_impl_t::ManagedOpenOrCreateUserOffset;
|
Chris@16
|
539 }
|
Chris@16
|
540
|
Chris@16
|
541 //!Initializes the memory structures to preallocate messages and constructs the
|
Chris@16
|
542 //!message index. Never throws.
|
Chris@16
|
543 void initialize_memory()
|
Chris@16
|
544 {
|
Chris@16
|
545 const size_type
|
Chris@101
|
546 msg_hdr_align = ::boost::container::container_detail::alignment_of<msg_header>::value,
|
Chris@101
|
547 index_align = ::boost::container::container_detail::alignment_of<msg_hdr_ptr_t>::value,
|
Chris@16
|
548 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
|
Chris@101
|
549 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
|
Chris@101
|
550 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
|
Chris@16
|
551
|
Chris@16
|
552 //Pointer to the index
|
Chris@16
|
553 msg_hdr_ptr_t *index = reinterpret_cast<msg_hdr_ptr_t*>
|
Chris@16
|
554 (reinterpret_cast<char*>(this)+r_hdr_size);
|
Chris@16
|
555
|
Chris@16
|
556 //Pointer to the first message header
|
Chris@16
|
557 msg_header *msg_hdr = reinterpret_cast<msg_header*>
|
Chris@16
|
558 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
|
Chris@16
|
559
|
Chris@16
|
560 //Initialize the pointer to the index
|
Chris@16
|
561 mp_index = index;
|
Chris@16
|
562
|
Chris@16
|
563 //Initialize the index so each slot points to a preallocated message
|
Chris@16
|
564 for(size_type i = 0; i < m_max_num_msg; ++i){
|
Chris@16
|
565 index[i] = msg_hdr;
|
Chris@16
|
566 msg_hdr = reinterpret_cast<msg_header*>
|
Chris@16
|
567 (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
|
Chris@16
|
568 }
|
Chris@16
|
569 }
|
Chris@16
|
570
|
Chris@16
|
571 public:
|
Chris@16
|
572 //Pointer to the index
|
Chris@16
|
573 msg_hdr_ptr_ptr_t mp_index;
|
Chris@16
|
574 //Maximum number of messages of the queue
|
Chris@16
|
575 const size_type m_max_num_msg;
|
Chris@16
|
576 //Maximum size of messages of the queue
|
Chris@16
|
577 const size_type m_max_msg_size;
|
Chris@16
|
578 //Current number of messages
|
Chris@16
|
579 size_type m_cur_num_msg;
|
Chris@16
|
580 //Mutex to protect data structures
|
Chris@16
|
581 interprocess_mutex m_mutex;
|
Chris@16
|
582 //Condition block receivers when there are no messages
|
Chris@16
|
583 interprocess_condition m_cond_recv;
|
Chris@16
|
584 //Condition block senders when the queue is full
|
Chris@16
|
585 interprocess_condition m_cond_send;
|
Chris@16
|
586 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@16
|
587 //Current start offset in the circular index
|
Chris@16
|
588 size_type m_cur_first_msg;
|
Chris@101
|
589 size_type m_blocked_senders;
|
Chris@101
|
590 size_type m_blocked_receivers;
|
Chris@16
|
591 #endif
|
Chris@16
|
592 };
|
Chris@16
|
593
|
Chris@16
|
594
|
Chris@16
|
595 //!This is the atomic functor to be executed when creating or opening
|
Chris@16
|
596 //!shared memory. Never throws
|
Chris@16
|
597 template<class VoidPointer>
|
Chris@16
|
598 class msg_queue_initialization_func_t
|
Chris@16
|
599 {
|
Chris@16
|
600 public:
|
Chris@16
|
601 typedef typename boost::intrusive::
|
Chris@16
|
602 pointer_traits<VoidPointer>::template
|
Chris@101
|
603 rebind_pointer<char>::type char_ptr;
|
Chris@101
|
604 typedef typename boost::intrusive::pointer_traits<char_ptr>::
|
Chris@101
|
605 difference_type difference_type;
|
Chris@101
|
606 typedef typename boost::container::container_detail::
|
Chris@101
|
607 make_unsigned<difference_type>::type size_type;
|
Chris@16
|
608
|
Chris@16
|
609 msg_queue_initialization_func_t(size_type maxmsg = 0,
|
Chris@16
|
610 size_type maxmsgsize = 0)
|
Chris@16
|
611 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
|
Chris@16
|
612
|
Chris@16
|
613 bool operator()(void *address, size_type, bool created)
|
Chris@16
|
614 {
|
Chris@16
|
615 char *mptr;
|
Chris@16
|
616
|
Chris@16
|
617 if(created){
|
Chris@16
|
618 mptr = reinterpret_cast<char*>(address);
|
Chris@16
|
619 //Construct the message queue header at the beginning
|
Chris@16
|
620 BOOST_TRY{
|
Chris@16
|
621 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
|
Chris@16
|
622 }
|
Chris@16
|
623 BOOST_CATCH(...){
|
Chris@16
|
624 return false;
|
Chris@16
|
625 }
|
Chris@16
|
626 BOOST_CATCH_END
|
Chris@16
|
627 }
|
Chris@16
|
628 return true;
|
Chris@16
|
629 }
|
Chris@16
|
630
|
Chris@16
|
631 std::size_t get_min_size() const
|
Chris@16
|
632 {
|
Chris@16
|
633 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
|
Chris@16
|
634 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
|
Chris@16
|
635 }
|
Chris@16
|
636
|
Chris@16
|
637 const size_type m_maxmsg;
|
Chris@16
|
638 const size_type m_maxmsgsize;
|
Chris@16
|
639 };
|
Chris@16
|
640
|
Chris@16
|
641 } //namespace ipcdetail {
|
Chris@16
|
642
|
Chris@16
|
643 template<class VoidPointer>
|
Chris@16
|
644 inline message_queue_t<VoidPointer>::~message_queue_t()
|
Chris@16
|
645 {}
|
Chris@16
|
646
|
Chris@16
|
647 template<class VoidPointer>
|
Chris@16
|
648 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
|
Chris@16
|
649 (size_type max_msg_size, size_type max_num_msg)
|
Chris@16
|
650 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); }
|
Chris@16
|
651
|
Chris@16
|
652 template<class VoidPointer>
|
Chris@16
|
653 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
|
Chris@16
|
654 const char *name,
|
Chris@16
|
655 size_type max_num_msg,
|
Chris@16
|
656 size_type max_msg_size,
|
Chris@16
|
657 const permissions &perm)
|
Chris@16
|
658 //Create shared memory and execute functor atomically
|
Chris@16
|
659 : m_shmem(create_only,
|
Chris@16
|
660 name,
|
Chris@16
|
661 get_mem_size(max_msg_size, max_num_msg),
|
Chris@16
|
662 read_write,
|
Chris@16
|
663 static_cast<void*>(0),
|
Chris@16
|
664 //Prepare initialization functor
|
Chris@16
|
665 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
|
Chris@16
|
666 perm)
|
Chris@16
|
667 {}
|
Chris@16
|
668
|
Chris@16
|
669 template<class VoidPointer>
|
Chris@16
|
670 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
|
Chris@16
|
671 const char *name,
|
Chris@16
|
672 size_type max_num_msg,
|
Chris@16
|
673 size_type max_msg_size,
|
Chris@16
|
674 const permissions &perm)
|
Chris@16
|
675 //Create shared memory and execute functor atomically
|
Chris@16
|
676 : m_shmem(open_or_create,
|
Chris@16
|
677 name,
|
Chris@16
|
678 get_mem_size(max_msg_size, max_num_msg),
|
Chris@16
|
679 read_write,
|
Chris@16
|
680 static_cast<void*>(0),
|
Chris@16
|
681 //Prepare initialization functor
|
Chris@16
|
682 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
|
Chris@16
|
683 perm)
|
Chris@16
|
684 {}
|
Chris@16
|
685
|
Chris@16
|
686 template<class VoidPointer>
|
Chris@16
|
687 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
|
Chris@16
|
688 //Create shared memory and execute functor atomically
|
Chris@16
|
689 : m_shmem(open_only,
|
Chris@16
|
690 name,
|
Chris@16
|
691 read_write,
|
Chris@16
|
692 static_cast<void*>(0),
|
Chris@16
|
693 //Prepare initialization functor
|
Chris@16
|
694 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
|
Chris@16
|
695 {}
|
Chris@16
|
696
|
Chris@16
|
697 template<class VoidPointer>
|
Chris@16
|
698 inline void message_queue_t<VoidPointer>::send
|
Chris@16
|
699 (const void *buffer, size_type buffer_size, unsigned int priority)
|
Chris@16
|
700 { this->do_send(blocking, buffer, buffer_size, priority, ptime()); }
|
Chris@16
|
701
|
Chris@16
|
702 template<class VoidPointer>
|
Chris@16
|
703 inline bool message_queue_t<VoidPointer>::try_send
|
Chris@16
|
704 (const void *buffer, size_type buffer_size, unsigned int priority)
|
Chris@16
|
705 { return this->do_send(non_blocking, buffer, buffer_size, priority, ptime()); }
|
Chris@16
|
706
|
Chris@16
|
707 template<class VoidPointer>
|
Chris@16
|
708 inline bool message_queue_t<VoidPointer>::timed_send
|
Chris@16
|
709 (const void *buffer, size_type buffer_size
|
Chris@16
|
710 ,unsigned int priority, const boost::posix_time::ptime &abs_time)
|
Chris@16
|
711 {
|
Chris@16
|
712 if(abs_time == boost::posix_time::pos_infin){
|
Chris@16
|
713 this->send(buffer, buffer_size, priority);
|
Chris@16
|
714 return true;
|
Chris@16
|
715 }
|
Chris@16
|
716 return this->do_send(timed, buffer, buffer_size, priority, abs_time);
|
Chris@16
|
717 }
|
Chris@16
|
718
|
Chris@16
|
719 template<class VoidPointer>
|
Chris@16
|
720 inline bool message_queue_t<VoidPointer>::do_send(block_t block,
|
Chris@16
|
721 const void *buffer, size_type buffer_size,
|
Chris@16
|
722 unsigned int priority, const boost::posix_time::ptime &abs_time)
|
Chris@16
|
723 {
|
Chris@16
|
724 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
|
Chris@16
|
725 //Check if buffer is smaller than maximum allowed
|
Chris@16
|
726 if (buffer_size > p_hdr->m_max_msg_size) {
|
Chris@16
|
727 throw interprocess_exception(size_error);
|
Chris@16
|
728 }
|
Chris@16
|
729
|
Chris@101
|
730 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
731 bool notify_blocked_receivers = false;
|
Chris@101
|
732 #endif
|
Chris@16
|
733 //---------------------------------------------
|
Chris@16
|
734 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
|
Chris@16
|
735 //---------------------------------------------
|
Chris@16
|
736 {
|
Chris@16
|
737 //If the queue is full execute blocking logic
|
Chris@16
|
738 if (p_hdr->is_full()) {
|
Chris@101
|
739 BOOST_TRY{
|
Chris@101
|
740 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@101
|
741 ++p_hdr->m_blocked_senders;
|
Chris@101
|
742 #endif
|
Chris@101
|
743 switch(block){
|
Chris@101
|
744 case non_blocking :
|
Chris@101
|
745 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@101
|
746 --p_hdr->m_blocked_senders;
|
Chris@101
|
747 #endif
|
Chris@101
|
748 return false;
|
Chris@101
|
749 break;
|
Chris@16
|
750
|
Chris@101
|
751 case blocking :
|
Chris@101
|
752 do{
|
Chris@101
|
753 p_hdr->m_cond_send.wait(lock);
|
Chris@101
|
754 }
|
Chris@101
|
755 while (p_hdr->is_full());
|
Chris@101
|
756 break;
|
Chris@16
|
757
|
Chris@101
|
758 case timed :
|
Chris@101
|
759 do{
|
Chris@101
|
760 if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
|
Chris@101
|
761 if(p_hdr->is_full()){
|
Chris@101
|
762 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@101
|
763 --p_hdr->m_blocked_senders;
|
Chris@101
|
764 #endif
|
Chris@101
|
765 return false;
|
Chris@101
|
766 }
|
Chris@101
|
767 break;
|
Chris@101
|
768 }
|
Chris@16
|
769 }
|
Chris@101
|
770 while (p_hdr->is_full());
|
Chris@101
|
771 break;
|
Chris@101
|
772 default:
|
Chris@101
|
773 break;
|
Chris@101
|
774 }
|
Chris@101
|
775 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@101
|
776 --p_hdr->m_blocked_senders;
|
Chris@101
|
777 #endif
|
Chris@16
|
778 }
|
Chris@101
|
779 BOOST_CATCH(...){
|
Chris@101
|
780 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@101
|
781 --p_hdr->m_blocked_senders;
|
Chris@101
|
782 #endif
|
Chris@101
|
783 BOOST_RETHROW;
|
Chris@101
|
784 }
|
Chris@101
|
785 BOOST_CATCH_END
|
Chris@16
|
786 }
|
Chris@16
|
787
|
Chris@101
|
788 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
789 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
|
Chris@101
|
790 #endif
|
Chris@16
|
791 //Insert the first free message in the priority queue
|
Chris@16
|
792 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
|
Chris@16
|
793
|
Chris@16
|
794 //Sanity check, free msgs are always cleaned when received
|
Chris@16
|
795 BOOST_ASSERT(free_msg_hdr.priority == 0);
|
Chris@16
|
796 BOOST_ASSERT(free_msg_hdr.len == 0);
|
Chris@16
|
797
|
Chris@16
|
798 //Copy control data to the free message
|
Chris@16
|
799 free_msg_hdr.priority = priority;
|
Chris@16
|
800 free_msg_hdr.len = buffer_size;
|
Chris@16
|
801
|
Chris@16
|
802 //Copy user buffer to the message
|
Chris@16
|
803 std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
|
Chris@16
|
804 } // Lock end
|
Chris@16
|
805
|
Chris@16
|
806 //Notify outside lock to avoid contention. This might produce some
|
Chris@16
|
807 //spurious wakeups, but it's usually far better than notifying inside.
|
Chris@16
|
808 //If this message changes the queue empty state, notify it to receivers
|
Chris@101
|
809 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
810 if (notify_blocked_receivers){
|
Chris@16
|
811 p_hdr->m_cond_recv.notify_one();
|
Chris@16
|
812 }
|
Chris@101
|
813 #else
|
Chris@101
|
814 p_hdr->m_cond_recv.notify_one();
|
Chris@101
|
815 #endif
|
Chris@16
|
816
|
Chris@16
|
817 return true;
|
Chris@16
|
818 }
|
Chris@16
|
819
|
Chris@16
|
820 template<class VoidPointer>
|
Chris@16
|
821 inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size,
|
Chris@16
|
822 size_type &recvd_size, unsigned int &priority)
|
Chris@16
|
823 { this->do_receive(blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
|
Chris@16
|
824
|
Chris@16
|
825 template<class VoidPointer>
|
Chris@16
|
826 inline bool
|
Chris@16
|
827 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,
|
Chris@16
|
828 size_type &recvd_size, unsigned int &priority)
|
Chris@16
|
829 { return this->do_receive(non_blocking, buffer, buffer_size, recvd_size, priority, ptime()); }
|
Chris@16
|
830
|
Chris@16
|
831 template<class VoidPointer>
|
Chris@16
|
832 inline bool
|
Chris@16
|
833 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,
|
Chris@16
|
834 size_type &recvd_size, unsigned int &priority,
|
Chris@16
|
835 const boost::posix_time::ptime &abs_time)
|
Chris@16
|
836 {
|
Chris@16
|
837 if(abs_time == boost::posix_time::pos_infin){
|
Chris@16
|
838 this->receive(buffer, buffer_size, recvd_size, priority);
|
Chris@16
|
839 return true;
|
Chris@16
|
840 }
|
Chris@16
|
841 return this->do_receive(timed, buffer, buffer_size, recvd_size, priority, abs_time);
|
Chris@16
|
842 }
|
Chris@16
|
843
|
Chris@16
|
844 template<class VoidPointer>
|
Chris@16
|
845 inline bool
|
Chris@16
|
846 message_queue_t<VoidPointer>::do_receive(block_t block,
|
Chris@16
|
847 void *buffer, size_type buffer_size,
|
Chris@16
|
848 size_type &recvd_size, unsigned int &priority,
|
Chris@16
|
849 const boost::posix_time::ptime &abs_time)
|
Chris@16
|
850 {
|
Chris@16
|
851 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
|
Chris@16
|
852 //Check if buffer is big enough for any message
|
Chris@16
|
853 if (buffer_size < p_hdr->m_max_msg_size) {
|
Chris@16
|
854 throw interprocess_exception(size_error);
|
Chris@16
|
855 }
|
Chris@16
|
856
|
Chris@101
|
857 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
858 bool notify_blocked_senders = false;
|
Chris@101
|
859 #endif
|
Chris@16
|
860 //---------------------------------------------
|
Chris@16
|
861 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
|
Chris@16
|
862 //---------------------------------------------
|
Chris@16
|
863 {
|
Chris@16
|
864 //If there are no messages execute blocking logic
|
Chris@16
|
865 if (p_hdr->is_empty()) {
|
Chris@101
|
866 BOOST_TRY{
|
Chris@101
|
867 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
868 ++p_hdr->m_blocked_receivers;
|
Chris@101
|
869 #endif
|
Chris@101
|
870 switch(block){
|
Chris@101
|
871 case non_blocking :
|
Chris@101
|
872 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
873 --p_hdr->m_blocked_receivers;
|
Chris@101
|
874 #endif
|
Chris@101
|
875 return false;
|
Chris@101
|
876 break;
|
Chris@16
|
877
|
Chris@101
|
878 case blocking :
|
Chris@101
|
879 do{
|
Chris@101
|
880 p_hdr->m_cond_recv.wait(lock);
|
Chris@101
|
881 }
|
Chris@101
|
882 while (p_hdr->is_empty());
|
Chris@101
|
883 break;
|
Chris@16
|
884
|
Chris@101
|
885 case timed :
|
Chris@101
|
886 do{
|
Chris@101
|
887 if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
|
Chris@101
|
888 if(p_hdr->is_empty()){
|
Chris@101
|
889 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
890 --p_hdr->m_blocked_receivers;
|
Chris@101
|
891 #endif
|
Chris@101
|
892 return false;
|
Chris@101
|
893 }
|
Chris@101
|
894 break;
|
Chris@101
|
895 }
|
Chris@16
|
896 }
|
Chris@101
|
897 while (p_hdr->is_empty());
|
Chris@101
|
898 break;
|
Chris@16
|
899
|
Chris@101
|
900 //Paranoia check
|
Chris@101
|
901 default:
|
Chris@101
|
902 break;
|
Chris@101
|
903 }
|
Chris@101
|
904 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
905 --p_hdr->m_blocked_receivers;
|
Chris@101
|
906 #endif
|
Chris@16
|
907 }
|
Chris@101
|
908 BOOST_CATCH(...){
|
Chris@101
|
909 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
|
Chris@101
|
910 --p_hdr->m_blocked_receivers;
|
Chris@101
|
911 #endif
|
Chris@101
|
912 BOOST_RETHROW;
|
Chris@101
|
913 }
|
Chris@101
|
914 BOOST_CATCH_END
|
Chris@16
|
915 }
|
Chris@16
|
916
|
Chris@101
|
917 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@101
|
918 notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
|
Chris@101
|
919 #endif
|
Chris@101
|
920
|
Chris@16
|
921 //There is at least one message ready to pick, get the top one
|
Chris@16
|
922 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
|
Chris@16
|
923
|
Chris@16
|
924 //Get data from the message
|
Chris@16
|
925 recvd_size = top_msg.len;
|
Chris@16
|
926 priority = top_msg.priority;
|
Chris@16
|
927
|
Chris@16
|
928 //Some cleanup to ease debugging
|
Chris@16
|
929 top_msg.len = 0;
|
Chris@16
|
930 top_msg.priority = 0;
|
Chris@16
|
931
|
Chris@16
|
932 //Copy data to receiver's bufers
|
Chris@16
|
933 std::memcpy(buffer, top_msg.data(), recvd_size);
|
Chris@16
|
934
|
Chris@16
|
935 //Free top message and put it in the free message list
|
Chris@16
|
936 p_hdr->free_top_msg();
|
Chris@16
|
937 } //Lock end
|
Chris@16
|
938
|
Chris@16
|
939 //Notify outside lock to avoid contention. This might produce some
|
Chris@16
|
940 //spurious wakeups, but it's usually far better than notifying inside.
|
Chris@16
|
941 //If this reception changes the queue full state, notify senders
|
Chris@101
|
942 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
|
Chris@101
|
943 if (notify_blocked_senders){
|
Chris@16
|
944 p_hdr->m_cond_send.notify_one();
|
Chris@16
|
945 }
|
Chris@101
|
946 #else
|
Chris@101
|
947 p_hdr->m_cond_send.notify_one();
|
Chris@101
|
948 #endif
|
Chris@16
|
949
|
Chris@16
|
950 return true;
|
Chris@16
|
951 }
|
Chris@16
|
952
|
Chris@16
|
953 template<class VoidPointer>
|
Chris@16
|
954 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
|
Chris@16
|
955 {
|
Chris@16
|
956 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
|
Chris@16
|
957 return p_hdr ? p_hdr->m_max_num_msg : 0; }
|
Chris@16
|
958
|
Chris@16
|
959 template<class VoidPointer>
|
Chris@16
|
960 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
|
Chris@16
|
961 {
|
Chris@16
|
962 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
|
Chris@16
|
963 return p_hdr ? p_hdr->m_max_msg_size : 0;
|
Chris@16
|
964 }
|
Chris@16
|
965
|
Chris@16
|
966 template<class VoidPointer>
|
Chris@16
|
967 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
|
Chris@16
|
968 {
|
Chris@16
|
969 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
|
Chris@16
|
970 if(p_hdr){
|
Chris@16
|
971 //---------------------------------------------
|
Chris@16
|
972 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
|
Chris@16
|
973 //---------------------------------------------
|
Chris@16
|
974 return p_hdr->m_cur_num_msg;
|
Chris@16
|
975 }
|
Chris@16
|
976
|
Chris@16
|
977 return 0;
|
Chris@16
|
978 }
|
Chris@16
|
979
|
Chris@16
|
980 template<class VoidPointer>
|
Chris@16
|
981 inline bool message_queue_t<VoidPointer>::remove(const char *name)
|
Chris@16
|
982 { return shared_memory_object::remove(name); }
|
Chris@16
|
983
|
Chris@101
|
984 #else
|
Chris@101
|
985
|
Chris@101
|
986 //!Typedef for a default message queue
|
Chris@101
|
987 //!to be used between processes
|
Chris@101
|
988 typedef message_queue_t<offset_ptr<void> > message_queue;
|
Chris@101
|
989
|
Chris@101
|
990 #endif //#ifndef BOOST_INTERPROCESS_DOXYGEN_INVOKED
|
Chris@16
|
991
|
Chris@16
|
992 }} //namespace boost{ namespace interprocess{
|
Chris@16
|
993
|
Chris@16
|
994 #include <boost/interprocess/detail/config_end.hpp>
|
Chris@16
|
995
|
Chris@16
|
996 #endif //#ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
|