Chris@16
|
1 //
|
Chris@16
|
2 // detail/impl/kqueue_reactor.ipp
|
Chris@16
|
3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
Chris@16
|
4 //
|
Chris@16
|
5 // Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
|
Chris@16
|
6 // Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
|
Chris@16
|
7 //
|
Chris@16
|
8 // Distributed under the Boost Software License, Version 1.0. (See accompanying
|
Chris@16
|
9 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
Chris@16
|
10 //
|
Chris@16
|
11
|
Chris@16
|
12 #ifndef BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
|
Chris@16
|
13 #define BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
|
Chris@16
|
14
|
Chris@16
|
15 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
|
Chris@16
|
16 # pragma once
|
Chris@16
|
17 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
|
Chris@16
|
18
|
Chris@16
|
19 #include <boost/asio/detail/config.hpp>
|
Chris@16
|
20
|
Chris@16
|
21 #if defined(BOOST_ASIO_HAS_KQUEUE)
|
Chris@16
|
22
|
Chris@16
|
23 #include <boost/asio/detail/kqueue_reactor.hpp>
|
Chris@16
|
24 #include <boost/asio/detail/throw_error.hpp>
|
Chris@16
|
25 #include <boost/asio/error.hpp>
|
Chris@16
|
26
|
Chris@16
|
27 #include <boost/asio/detail/push_options.hpp>
|
Chris@16
|
28
|
Chris@16
|
29 #if defined(__NetBSD__)
|
Chris@16
|
30 # define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
|
Chris@16
|
31 EV_SET(ev, ident, filt, flags, fflags, data, \
|
Chris@16
|
32 reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
|
Chris@16
|
33 #else
|
Chris@16
|
34 # define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
|
Chris@16
|
35 EV_SET(ev, ident, filt, flags, fflags, data, udata)
|
Chris@16
|
36 #endif
|
Chris@16
|
37
|
Chris@16
|
38 namespace boost {
|
Chris@16
|
39 namespace asio {
|
Chris@16
|
40 namespace detail {
|
Chris@16
|
41
|
Chris@16
|
42 kqueue_reactor::kqueue_reactor(boost::asio::io_service& io_service)
|
Chris@16
|
43 : boost::asio::detail::service_base<kqueue_reactor>(io_service),
|
Chris@16
|
44 io_service_(use_service<io_service_impl>(io_service)),
|
Chris@16
|
45 mutex_(),
|
Chris@16
|
46 kqueue_fd_(do_kqueue_create()),
|
Chris@16
|
47 interrupter_(),
|
Chris@16
|
48 shutdown_(false)
|
Chris@16
|
49 {
|
Chris@16
|
50 // The interrupter is put into a permanently readable state. Whenever we want
|
Chris@16
|
51 // to interrupt the blocked kevent call we register a read operation against
|
Chris@16
|
52 // the descriptor.
|
Chris@16
|
53 interrupter_.interrupt();
|
Chris@16
|
54 }
|
Chris@16
|
55
|
Chris@16
|
56 kqueue_reactor::~kqueue_reactor()
|
Chris@16
|
57 {
|
Chris@16
|
58 close(kqueue_fd_);
|
Chris@16
|
59 }
|
Chris@16
|
60
|
Chris@16
|
61 void kqueue_reactor::shutdown_service()
|
Chris@16
|
62 {
|
Chris@16
|
63 mutex::scoped_lock lock(mutex_);
|
Chris@16
|
64 shutdown_ = true;
|
Chris@16
|
65 lock.unlock();
|
Chris@16
|
66
|
Chris@16
|
67 op_queue<operation> ops;
|
Chris@16
|
68
|
Chris@16
|
69 while (descriptor_state* state = registered_descriptors_.first())
|
Chris@16
|
70 {
|
Chris@16
|
71 for (int i = 0; i < max_ops; ++i)
|
Chris@16
|
72 ops.push(state->op_queue_[i]);
|
Chris@16
|
73 state->shutdown_ = true;
|
Chris@16
|
74 registered_descriptors_.free(state);
|
Chris@16
|
75 }
|
Chris@16
|
76
|
Chris@16
|
77 timer_queues_.get_all_timers(ops);
|
Chris@16
|
78
|
Chris@16
|
79 io_service_.abandon_operations(ops);
|
Chris@16
|
80 }
|
Chris@16
|
81
|
Chris@16
|
82 void kqueue_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
|
Chris@16
|
83 {
|
Chris@16
|
84 if (fork_ev == boost::asio::io_service::fork_child)
|
Chris@16
|
85 {
|
Chris@16
|
86 // The kqueue descriptor is automatically closed in the child.
|
Chris@16
|
87 kqueue_fd_ = -1;
|
Chris@16
|
88 kqueue_fd_ = do_kqueue_create();
|
Chris@16
|
89
|
Chris@16
|
90 interrupter_.recreate();
|
Chris@16
|
91
|
Chris@16
|
92 // Re-register all descriptors with kqueue.
|
Chris@16
|
93 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
|
Chris@16
|
94 for (descriptor_state* state = registered_descriptors_.first();
|
Chris@16
|
95 state != 0; state = state->next_)
|
Chris@16
|
96 {
|
Chris@16
|
97 struct kevent events[2];
|
Chris@16
|
98 int num_events = 0;
|
Chris@16
|
99
|
Chris@16
|
100 if (!state->op_queue_[read_op].empty())
|
Chris@16
|
101 BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
|
Chris@16
|
102 EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
|
Chris@16
|
103 else if (!state->op_queue_[except_op].empty())
|
Chris@16
|
104 BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
|
Chris@16
|
105 EVFILT_READ, EV_ADD | EV_CLEAR, EV_OOBAND, 0, state);
|
Chris@16
|
106
|
Chris@16
|
107 if (!state->op_queue_[write_op].empty())
|
Chris@16
|
108 BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
|
Chris@16
|
109 EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
|
Chris@16
|
110
|
Chris@16
|
111 if (num_events && ::kevent(kqueue_fd_, events, num_events, 0, 0, 0) == -1)
|
Chris@16
|
112 {
|
Chris@16
|
113 boost::system::error_code error(errno,
|
Chris@16
|
114 boost::asio::error::get_system_category());
|
Chris@16
|
115 boost::asio::detail::throw_error(error);
|
Chris@16
|
116 }
|
Chris@16
|
117 }
|
Chris@16
|
118 }
|
Chris@16
|
119 }
|
Chris@16
|
120
|
Chris@16
|
121 void kqueue_reactor::init_task()
|
Chris@16
|
122 {
|
Chris@16
|
123 io_service_.init_task();
|
Chris@16
|
124 }
|
Chris@16
|
125
|
Chris@16
|
126 int kqueue_reactor::register_descriptor(socket_type descriptor,
|
Chris@16
|
127 kqueue_reactor::per_descriptor_data& descriptor_data)
|
Chris@16
|
128 {
|
Chris@16
|
129 descriptor_data = allocate_descriptor_state();
|
Chris@16
|
130
|
Chris@16
|
131 mutex::scoped_lock lock(descriptor_data->mutex_);
|
Chris@16
|
132
|
Chris@16
|
133 descriptor_data->descriptor_ = descriptor;
|
Chris@16
|
134 descriptor_data->shutdown_ = false;
|
Chris@16
|
135
|
Chris@16
|
136 return 0;
|
Chris@16
|
137 }
|
Chris@16
|
138
|
Chris@16
|
139 int kqueue_reactor::register_internal_descriptor(
|
Chris@16
|
140 int op_type, socket_type descriptor,
|
Chris@16
|
141 kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
|
Chris@16
|
142 {
|
Chris@16
|
143 descriptor_data = allocate_descriptor_state();
|
Chris@16
|
144
|
Chris@16
|
145 mutex::scoped_lock lock(descriptor_data->mutex_);
|
Chris@16
|
146
|
Chris@16
|
147 descriptor_data->descriptor_ = descriptor;
|
Chris@16
|
148 descriptor_data->shutdown_ = false;
|
Chris@16
|
149 descriptor_data->op_queue_[op_type].push(op);
|
Chris@16
|
150
|
Chris@16
|
151 struct kevent event;
|
Chris@16
|
152 switch (op_type)
|
Chris@16
|
153 {
|
Chris@16
|
154 case read_op:
|
Chris@16
|
155 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
|
Chris@16
|
156 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
|
Chris@16
|
157 break;
|
Chris@16
|
158 case write_op:
|
Chris@16
|
159 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
|
Chris@16
|
160 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
|
Chris@16
|
161 break;
|
Chris@16
|
162 case except_op:
|
Chris@16
|
163 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
|
Chris@16
|
164 EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
|
Chris@16
|
165 break;
|
Chris@16
|
166 }
|
Chris@16
|
167 ::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
|
Chris@16
|
168
|
Chris@16
|
169 return 0;
|
Chris@16
|
170 }
|
Chris@16
|
171
|
Chris@16
|
172 void kqueue_reactor::move_descriptor(socket_type,
|
Chris@16
|
173 kqueue_reactor::per_descriptor_data& target_descriptor_data,
|
Chris@16
|
174 kqueue_reactor::per_descriptor_data& source_descriptor_data)
|
Chris@16
|
175 {
|
Chris@16
|
176 target_descriptor_data = source_descriptor_data;
|
Chris@16
|
177 source_descriptor_data = 0;
|
Chris@16
|
178 }
|
Chris@16
|
179
|
Chris@16
|
180 void kqueue_reactor::start_op(int op_type, socket_type descriptor,
|
Chris@16
|
181 kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
|
Chris@16
|
182 bool is_continuation, bool allow_speculative)
|
Chris@16
|
183 {
|
Chris@16
|
184 if (!descriptor_data)
|
Chris@16
|
185 {
|
Chris@16
|
186 op->ec_ = boost::asio::error::bad_descriptor;
|
Chris@16
|
187 post_immediate_completion(op, is_continuation);
|
Chris@16
|
188 return;
|
Chris@16
|
189 }
|
Chris@16
|
190
|
Chris@16
|
191 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
|
Chris@16
|
192
|
Chris@16
|
193 if (descriptor_data->shutdown_)
|
Chris@16
|
194 {
|
Chris@16
|
195 post_immediate_completion(op, is_continuation);
|
Chris@16
|
196 return;
|
Chris@16
|
197 }
|
Chris@16
|
198
|
Chris@16
|
199 bool first = descriptor_data->op_queue_[op_type].empty();
|
Chris@16
|
200 if (first)
|
Chris@16
|
201 {
|
Chris@16
|
202 if (allow_speculative)
|
Chris@16
|
203 {
|
Chris@16
|
204 if (op_type != read_op || descriptor_data->op_queue_[except_op].empty())
|
Chris@16
|
205 {
|
Chris@16
|
206 if (op->perform())
|
Chris@16
|
207 {
|
Chris@16
|
208 descriptor_lock.unlock();
|
Chris@16
|
209 io_service_.post_immediate_completion(op, is_continuation);
|
Chris@16
|
210 return;
|
Chris@16
|
211 }
|
Chris@16
|
212 }
|
Chris@16
|
213 }
|
Chris@16
|
214 }
|
Chris@16
|
215
|
Chris@16
|
216 descriptor_data->op_queue_[op_type].push(op);
|
Chris@16
|
217 io_service_.work_started();
|
Chris@16
|
218
|
Chris@16
|
219 if (first)
|
Chris@16
|
220 {
|
Chris@16
|
221 struct kevent event;
|
Chris@16
|
222 switch (op_type)
|
Chris@16
|
223 {
|
Chris@16
|
224 case read_op:
|
Chris@16
|
225 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
|
Chris@16
|
226 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
|
Chris@16
|
227 break;
|
Chris@16
|
228 case write_op:
|
Chris@16
|
229 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
|
Chris@16
|
230 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
|
Chris@16
|
231 break;
|
Chris@16
|
232 case except_op:
|
Chris@16
|
233 if (!descriptor_data->op_queue_[read_op].empty())
|
Chris@16
|
234 return; // Already registered for read events.
|
Chris@16
|
235 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
|
Chris@16
|
236 EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
|
Chris@16
|
237 break;
|
Chris@16
|
238 }
|
Chris@16
|
239
|
Chris@16
|
240 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
|
Chris@16
|
241 {
|
Chris@16
|
242 op->ec_ = boost::system::error_code(errno,
|
Chris@16
|
243 boost::asio::error::get_system_category());
|
Chris@16
|
244 descriptor_data->op_queue_[op_type].pop();
|
Chris@16
|
245 io_service_.post_deferred_completion(op);
|
Chris@16
|
246 }
|
Chris@16
|
247 }
|
Chris@16
|
248 }
|
Chris@16
|
249
|
Chris@16
|
250 void kqueue_reactor::cancel_ops(socket_type,
|
Chris@16
|
251 kqueue_reactor::per_descriptor_data& descriptor_data)
|
Chris@16
|
252 {
|
Chris@16
|
253 if (!descriptor_data)
|
Chris@16
|
254 return;
|
Chris@16
|
255
|
Chris@16
|
256 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
|
Chris@16
|
257
|
Chris@16
|
258 op_queue<operation> ops;
|
Chris@16
|
259 for (int i = 0; i < max_ops; ++i)
|
Chris@16
|
260 {
|
Chris@16
|
261 while (reactor_op* op = descriptor_data->op_queue_[i].front())
|
Chris@16
|
262 {
|
Chris@16
|
263 op->ec_ = boost::asio::error::operation_aborted;
|
Chris@16
|
264 descriptor_data->op_queue_[i].pop();
|
Chris@16
|
265 ops.push(op);
|
Chris@16
|
266 }
|
Chris@16
|
267 }
|
Chris@16
|
268
|
Chris@16
|
269 descriptor_lock.unlock();
|
Chris@16
|
270
|
Chris@16
|
271 io_service_.post_deferred_completions(ops);
|
Chris@16
|
272 }
|
Chris@16
|
273
|
Chris@16
|
274 void kqueue_reactor::deregister_descriptor(socket_type descriptor,
|
Chris@16
|
275 kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
|
Chris@16
|
276 {
|
Chris@16
|
277 if (!descriptor_data)
|
Chris@16
|
278 return;
|
Chris@16
|
279
|
Chris@16
|
280 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
|
Chris@16
|
281
|
Chris@16
|
282 if (!descriptor_data->shutdown_)
|
Chris@16
|
283 {
|
Chris@16
|
284 if (closing)
|
Chris@16
|
285 {
|
Chris@16
|
286 // The descriptor will be automatically removed from the kqueue when it
|
Chris@16
|
287 // is closed.
|
Chris@16
|
288 }
|
Chris@16
|
289 else
|
Chris@16
|
290 {
|
Chris@16
|
291 struct kevent events[2];
|
Chris@16
|
292 BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
|
Chris@16
|
293 EVFILT_READ, EV_DELETE, 0, 0, 0);
|
Chris@16
|
294 BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
|
Chris@16
|
295 EVFILT_WRITE, EV_DELETE, 0, 0, 0);
|
Chris@16
|
296 ::kevent(kqueue_fd_, events, 2, 0, 0, 0);
|
Chris@16
|
297 }
|
Chris@16
|
298
|
Chris@16
|
299 op_queue<operation> ops;
|
Chris@16
|
300 for (int i = 0; i < max_ops; ++i)
|
Chris@16
|
301 {
|
Chris@16
|
302 while (reactor_op* op = descriptor_data->op_queue_[i].front())
|
Chris@16
|
303 {
|
Chris@16
|
304 op->ec_ = boost::asio::error::operation_aborted;
|
Chris@16
|
305 descriptor_data->op_queue_[i].pop();
|
Chris@16
|
306 ops.push(op);
|
Chris@16
|
307 }
|
Chris@16
|
308 }
|
Chris@16
|
309
|
Chris@16
|
310 descriptor_data->descriptor_ = -1;
|
Chris@16
|
311 descriptor_data->shutdown_ = true;
|
Chris@16
|
312
|
Chris@16
|
313 descriptor_lock.unlock();
|
Chris@16
|
314
|
Chris@16
|
315 free_descriptor_state(descriptor_data);
|
Chris@16
|
316 descriptor_data = 0;
|
Chris@16
|
317
|
Chris@16
|
318 io_service_.post_deferred_completions(ops);
|
Chris@16
|
319 }
|
Chris@16
|
320 }
|
Chris@16
|
321
|
Chris@16
|
322 void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
|
Chris@16
|
323 kqueue_reactor::per_descriptor_data& descriptor_data)
|
Chris@16
|
324 {
|
Chris@16
|
325 if (!descriptor_data)
|
Chris@16
|
326 return;
|
Chris@16
|
327
|
Chris@16
|
328 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
|
Chris@16
|
329
|
Chris@16
|
330 if (!descriptor_data->shutdown_)
|
Chris@16
|
331 {
|
Chris@16
|
332 struct kevent events[2];
|
Chris@16
|
333 BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
|
Chris@16
|
334 EVFILT_READ, EV_DELETE, 0, 0, 0);
|
Chris@16
|
335 BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
|
Chris@16
|
336 EVFILT_WRITE, EV_DELETE, 0, 0, 0);
|
Chris@16
|
337 ::kevent(kqueue_fd_, events, 2, 0, 0, 0);
|
Chris@16
|
338
|
Chris@16
|
339 op_queue<operation> ops;
|
Chris@16
|
340 for (int i = 0; i < max_ops; ++i)
|
Chris@16
|
341 ops.push(descriptor_data->op_queue_[i]);
|
Chris@16
|
342
|
Chris@16
|
343 descriptor_data->descriptor_ = -1;
|
Chris@16
|
344 descriptor_data->shutdown_ = true;
|
Chris@16
|
345
|
Chris@16
|
346 descriptor_lock.unlock();
|
Chris@16
|
347
|
Chris@16
|
348 free_descriptor_state(descriptor_data);
|
Chris@16
|
349 descriptor_data = 0;
|
Chris@16
|
350 }
|
Chris@16
|
351 }
|
Chris@16
|
352
|
Chris@16
|
353 void kqueue_reactor::run(bool block, op_queue<operation>& ops)
|
Chris@16
|
354 {
|
Chris@16
|
355 mutex::scoped_lock lock(mutex_);
|
Chris@16
|
356
|
Chris@16
|
357 // Determine how long to block while waiting for events.
|
Chris@16
|
358 timespec timeout_buf = { 0, 0 };
|
Chris@16
|
359 timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;
|
Chris@16
|
360
|
Chris@16
|
361 lock.unlock();
|
Chris@16
|
362
|
Chris@16
|
363 // Block on the kqueue descriptor.
|
Chris@16
|
364 struct kevent events[128];
|
Chris@16
|
365 int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
|
Chris@16
|
366
|
Chris@16
|
367 // Dispatch the waiting events.
|
Chris@16
|
368 for (int i = 0; i < num_events; ++i)
|
Chris@16
|
369 {
|
Chris@16
|
370 int descriptor = static_cast<int>(events[i].ident);
|
Chris@16
|
371 void* ptr = reinterpret_cast<void*>(events[i].udata);
|
Chris@16
|
372 if (ptr == &interrupter_)
|
Chris@16
|
373 {
|
Chris@16
|
374 // No need to reset the interrupter since we're leaving the descriptor
|
Chris@16
|
375 // in a ready-to-read state and relying on edge-triggered notifications.
|
Chris@16
|
376 }
|
Chris@16
|
377 else
|
Chris@16
|
378 {
|
Chris@16
|
379 descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
|
Chris@16
|
380 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
|
Chris@16
|
381
|
Chris@16
|
382 // Exception operations must be processed first to ensure that any
|
Chris@16
|
383 // out-of-band data is read before normal data.
|
Chris@16
|
384 #if defined(__NetBSD__)
|
Chris@16
|
385 static const unsigned int filter[max_ops] =
|
Chris@16
|
386 #else
|
Chris@16
|
387 static const int filter[max_ops] =
|
Chris@16
|
388 #endif
|
Chris@16
|
389 { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
|
Chris@16
|
390 for (int j = max_ops - 1; j >= 0; --j)
|
Chris@16
|
391 {
|
Chris@16
|
392 if (events[i].filter == filter[j])
|
Chris@16
|
393 {
|
Chris@16
|
394 if (j != except_op || events[i].flags & EV_OOBAND)
|
Chris@16
|
395 {
|
Chris@16
|
396 while (reactor_op* op = descriptor_data->op_queue_[j].front())
|
Chris@16
|
397 {
|
Chris@16
|
398 if (events[i].flags & EV_ERROR)
|
Chris@16
|
399 {
|
Chris@16
|
400 op->ec_ = boost::system::error_code(
|
Chris@16
|
401 static_cast<int>(events[i].data),
|
Chris@16
|
402 boost::asio::error::get_system_category());
|
Chris@16
|
403 descriptor_data->op_queue_[j].pop();
|
Chris@16
|
404 ops.push(op);
|
Chris@16
|
405 }
|
Chris@16
|
406 if (op->perform())
|
Chris@16
|
407 {
|
Chris@16
|
408 descriptor_data->op_queue_[j].pop();
|
Chris@16
|
409 ops.push(op);
|
Chris@16
|
410 }
|
Chris@16
|
411 else
|
Chris@16
|
412 break;
|
Chris@16
|
413 }
|
Chris@16
|
414 }
|
Chris@16
|
415 }
|
Chris@16
|
416 }
|
Chris@16
|
417
|
Chris@16
|
418 // Renew registration for event notifications.
|
Chris@16
|
419 struct kevent event;
|
Chris@16
|
420 switch (events[i].filter)
|
Chris@16
|
421 {
|
Chris@16
|
422 case EVFILT_READ:
|
Chris@16
|
423 if (!descriptor_data->op_queue_[read_op].empty())
|
Chris@16
|
424 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
|
Chris@16
|
425 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
|
Chris@16
|
426 else if (!descriptor_data->op_queue_[except_op].empty())
|
Chris@16
|
427 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
|
Chris@16
|
428 EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
|
Chris@16
|
429 else
|
Chris@16
|
430 continue;
|
Chris@16
|
431 break;
|
Chris@16
|
432 case EVFILT_WRITE:
|
Chris@16
|
433 if (!descriptor_data->op_queue_[write_op].empty())
|
Chris@16
|
434 BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
|
Chris@16
|
435 EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
|
Chris@16
|
436 else
|
Chris@16
|
437 continue;
|
Chris@16
|
438 break;
|
Chris@16
|
439 default:
|
Chris@16
|
440 break;
|
Chris@16
|
441 }
|
Chris@16
|
442 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
|
Chris@16
|
443 {
|
Chris@16
|
444 boost::system::error_code error(errno,
|
Chris@16
|
445 boost::asio::error::get_system_category());
|
Chris@16
|
446 for (int j = 0; j < max_ops; ++j)
|
Chris@16
|
447 {
|
Chris@16
|
448 while (reactor_op* op = descriptor_data->op_queue_[j].front())
|
Chris@16
|
449 {
|
Chris@16
|
450 op->ec_ = error;
|
Chris@16
|
451 descriptor_data->op_queue_[j].pop();
|
Chris@16
|
452 ops.push(op);
|
Chris@16
|
453 }
|
Chris@16
|
454 }
|
Chris@16
|
455 }
|
Chris@16
|
456 }
|
Chris@16
|
457 }
|
Chris@16
|
458
|
Chris@16
|
459 lock.lock();
|
Chris@16
|
460 timer_queues_.get_ready_timers(ops);
|
Chris@16
|
461 }
|
Chris@16
|
462
|
Chris@16
|
463 void kqueue_reactor::interrupt()
|
Chris@16
|
464 {
|
Chris@16
|
465 struct kevent event;
|
Chris@16
|
466 BOOST_ASIO_KQUEUE_EV_SET(&event, interrupter_.read_descriptor(),
|
Chris@16
|
467 EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &interrupter_);
|
Chris@16
|
468 ::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
|
Chris@16
|
469 }
|
Chris@16
|
470
|
Chris@16
|
471 int kqueue_reactor::do_kqueue_create()
|
Chris@16
|
472 {
|
Chris@16
|
473 int fd = ::kqueue();
|
Chris@16
|
474 if (fd == -1)
|
Chris@16
|
475 {
|
Chris@16
|
476 boost::system::error_code ec(errno,
|
Chris@16
|
477 boost::asio::error::get_system_category());
|
Chris@16
|
478 boost::asio::detail::throw_error(ec, "kqueue");
|
Chris@16
|
479 }
|
Chris@16
|
480 return fd;
|
Chris@16
|
481 }
|
Chris@16
|
482
|
Chris@16
|
483 kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
|
Chris@16
|
484 {
|
Chris@16
|
485 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
|
Chris@16
|
486 return registered_descriptors_.alloc();
|
Chris@16
|
487 }
|
Chris@16
|
488
|
Chris@16
|
489 void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
|
Chris@16
|
490 {
|
Chris@16
|
491 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
|
Chris@16
|
492 registered_descriptors_.free(s);
|
Chris@16
|
493 }
|
Chris@16
|
494
|
Chris@16
|
495 void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
|
Chris@16
|
496 {
|
Chris@16
|
497 mutex::scoped_lock lock(mutex_);
|
Chris@16
|
498 timer_queues_.insert(&queue);
|
Chris@16
|
499 }
|
Chris@16
|
500
|
Chris@16
|
501 void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
|
Chris@16
|
502 {
|
Chris@16
|
503 mutex::scoped_lock lock(mutex_);
|
Chris@16
|
504 timer_queues_.erase(&queue);
|
Chris@16
|
505 }
|
Chris@16
|
506
|
Chris@16
|
507 timespec* kqueue_reactor::get_timeout(timespec& ts)
|
Chris@16
|
508 {
|
Chris@16
|
509 // By default we will wait no longer than 5 minutes. This will ensure that
|
Chris@16
|
510 // any changes to the system clock are detected after no longer than this.
|
Chris@16
|
511 long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
|
Chris@16
|
512 ts.tv_sec = usec / 1000000;
|
Chris@16
|
513 ts.tv_nsec = (usec % 1000000) * 1000;
|
Chris@16
|
514 return &ts;
|
Chris@16
|
515 }
|
Chris@16
|
516
|
Chris@16
|
517 } // namespace detail
|
Chris@16
|
518 } // namespace asio
|
Chris@16
|
519 } // namespace boost
|
Chris@16
|
520
|
Chris@16
|
521 #undef BOOST_ASIO_KQUEUE_EV_SET
|
Chris@16
|
522
|
Chris@16
|
523 #include <boost/asio/detail/pop_options.hpp>
|
Chris@16
|
524
|
Chris@16
|
525 #endif // defined(BOOST_ASIO_HAS_KQUEUE)
|
Chris@16
|
526
|
Chris@16
|
527 #endif // BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
|