rob@76
|
1 /*
|
rob@76
|
2 oscpack -- Open Sound Control (OSC) packet manipulation library
|
rob@76
|
3 http://www.rossbencina.com/code/oscpack
|
rob@76
|
4
|
rob@76
|
5 Copyright (c) 2004-2013 Ross Bencina <rossb@audiomulch.com>
|
rob@76
|
6
|
rob@76
|
7 Permission is hereby granted, free of charge, to any person obtaining
|
rob@76
|
8 a copy of this software and associated documentation files
|
rob@76
|
9 (the "Software"), to deal in the Software without restriction,
|
rob@76
|
10 including without limitation the rights to use, copy, modify, merge,
|
rob@76
|
11 publish, distribute, sublicense, and/or sell copies of the Software,
|
rob@76
|
12 and to permit persons to whom the Software is furnished to do so,
|
rob@76
|
13 subject to the following conditions:
|
rob@76
|
14
|
rob@76
|
15 The above copyright notice and this permission notice shall be
|
rob@76
|
16 included in all copies or substantial portions of the Software.
|
rob@76
|
17
|
rob@76
|
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
rob@76
|
19 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
rob@76
|
20 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
rob@76
|
21 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
|
rob@76
|
22 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
|
rob@76
|
23 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
rob@76
|
24 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
rob@76
|
25 */
|
rob@76
|
26
|
rob@76
|
27 /*
|
rob@76
|
28 The text above constitutes the entire oscpack license; however,
|
rob@76
|
29 the oscpack developer(s) also make the following non-binding requests:
|
rob@76
|
30
|
rob@76
|
31 Any person wishing to distribute modifications to the Software is
|
rob@76
|
32 requested to send the modifications to the original developer so that
|
rob@76
|
33 they can be incorporated into the canonical version. It is also
|
rob@76
|
34 requested that these non-binding requests be included whenever the
|
rob@76
|
35 above license is reproduced.
|
rob@76
|
36 */
|
rob@76
|
37 #include "ip/UdpSocket.h"
|
rob@76
|
38
|
rob@76
|
39 #include <pthread.h>
|
rob@76
|
40 #include <unistd.h>
|
rob@76
|
41 #include <stdlib.h>
|
rob@76
|
42 #include <stdio.h>
|
rob@76
|
43 #include <netdb.h>
|
rob@76
|
44 #include <sys/types.h>
|
rob@76
|
45 #include <sys/socket.h>
|
rob@76
|
46 #include <sys/time.h>
|
rob@76
|
47 #include <netinet/in.h> // for sockaddr_in
|
rob@76
|
48
|
rob@76
|
49 #include <signal.h>
|
rob@76
|
50 #include <math.h>
|
rob@76
|
51 #include <errno.h>
|
rob@76
|
52 #include <string.h>
|
rob@76
|
53
|
rob@76
|
54 #include <algorithm>
|
rob@76
|
55 #include <cassert>
|
rob@76
|
56 #include <cstring> // for memset
|
rob@76
|
57 #include <stdexcept>
|
rob@76
|
58 #include <vector>
|
rob@76
|
59
|
rob@76
|
60 #include "ip/PacketListener.h"
|
rob@76
|
61 #include "ip/TimerListener.h"
|
rob@76
|
62
|
rob@76
|
63
|
rob@76
|
64 #if defined(__APPLE__) && !defined(_SOCKLEN_T)
|
rob@76
|
65 // pre system 10.3 didn't have socklen_t
|
rob@76
|
66 typedef ssize_t socklen_t;
|
rob@76
|
67 #endif
|
rob@76
|
68
|
rob@76
|
69
|
rob@76
|
70 static void SockaddrFromIpEndpointName( struct sockaddr_in& sockAddr, const IpEndpointName& endpoint )
|
rob@76
|
71 {
|
rob@76
|
72 std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
|
rob@76
|
73 sockAddr.sin_family = AF_INET;
|
rob@76
|
74
|
rob@76
|
75 sockAddr.sin_addr.s_addr =
|
rob@76
|
76 (endpoint.address == IpEndpointName::ANY_ADDRESS)
|
rob@76
|
77 ? INADDR_ANY
|
rob@76
|
78 : htonl( endpoint.address );
|
rob@76
|
79
|
rob@76
|
80 sockAddr.sin_port =
|
rob@76
|
81 (endpoint.port == IpEndpointName::ANY_PORT)
|
rob@76
|
82 ? 0
|
rob@76
|
83 : htons( endpoint.port );
|
rob@76
|
84 }
|
rob@76
|
85
|
rob@76
|
86
|
rob@76
|
87 static IpEndpointName IpEndpointNameFromSockaddr( const struct sockaddr_in& sockAddr )
|
rob@76
|
88 {
|
rob@76
|
89 return IpEndpointName(
|
rob@76
|
90 (sockAddr.sin_addr.s_addr == INADDR_ANY)
|
rob@76
|
91 ? IpEndpointName::ANY_ADDRESS
|
rob@76
|
92 : ntohl( sockAddr.sin_addr.s_addr ),
|
rob@76
|
93 (sockAddr.sin_port == 0)
|
rob@76
|
94 ? IpEndpointName::ANY_PORT
|
rob@76
|
95 : ntohs( sockAddr.sin_port )
|
rob@76
|
96 );
|
rob@76
|
97 }
|
rob@76
|
98
|
rob@76
|
99
|
rob@76
|
100 class UdpSocket::Implementation{
|
rob@76
|
101 bool isBound_;
|
rob@76
|
102 bool isConnected_;
|
rob@76
|
103
|
rob@76
|
104 int socket_;
|
rob@76
|
105 struct sockaddr_in connectedAddr_;
|
rob@76
|
106 struct sockaddr_in sendToAddr_;
|
rob@76
|
107
|
rob@76
|
108 public:
|
rob@76
|
109
|
rob@76
|
110 Implementation()
|
rob@76
|
111 : isBound_( false )
|
rob@76
|
112 , isConnected_( false )
|
rob@76
|
113 , socket_( -1 )
|
rob@76
|
114 {
|
rob@76
|
115 if( (socket_ = socket( AF_INET, SOCK_DGRAM, 0 )) == -1 ){
|
rob@76
|
116 throw std::runtime_error("unable to create udp socket\n");
|
rob@76
|
117 }
|
rob@76
|
118
|
rob@76
|
119 std::memset( &sendToAddr_, 0, sizeof(sendToAddr_) );
|
rob@76
|
120 sendToAddr_.sin_family = AF_INET;
|
rob@76
|
121 }
|
rob@76
|
122
|
rob@76
|
123 ~Implementation()
|
rob@76
|
124 {
|
rob@76
|
125 if (socket_ != -1) close(socket_);
|
rob@76
|
126 }
|
rob@76
|
127
|
rob@76
|
128 void SetEnableBroadcast( bool enableBroadcast )
|
rob@76
|
129 {
|
rob@76
|
130 int broadcast = (enableBroadcast) ? 1 : 0; // int on posix
|
rob@76
|
131 setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &broadcast, sizeof(broadcast));
|
rob@76
|
132 }
|
rob@76
|
133
|
rob@76
|
134 void SetAllowReuse( bool allowReuse )
|
rob@76
|
135 {
|
rob@76
|
136 int reuseAddr = (allowReuse) ? 1 : 0; // int on posix
|
rob@76
|
137 setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr));
|
rob@76
|
138
|
rob@76
|
139 #ifdef __APPLE__
|
rob@76
|
140 // needed also for OS X - enable multiple listeners for a single port on same network interface
|
rob@76
|
141 int reusePort = (allowReuse) ? 1 : 0; // int on posix
|
rob@76
|
142 setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reusePort, sizeof(reusePort));
|
rob@76
|
143 #endif
|
rob@76
|
144 }
|
rob@76
|
145
|
rob@76
|
146 IpEndpointName LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
|
rob@76
|
147 {
|
rob@76
|
148 assert( isBound_ );
|
rob@76
|
149
|
rob@76
|
150 // first connect the socket to the remote server
|
rob@76
|
151
|
rob@76
|
152 struct sockaddr_in connectSockAddr;
|
rob@76
|
153 SockaddrFromIpEndpointName( connectSockAddr, remoteEndpoint );
|
rob@76
|
154
|
rob@76
|
155 if (connect(socket_, (struct sockaddr *)&connectSockAddr, sizeof(connectSockAddr)) < 0) {
|
rob@76
|
156 throw std::runtime_error("unable to connect udp socket\n");
|
rob@76
|
157 }
|
rob@76
|
158
|
rob@76
|
159 // get the address
|
rob@76
|
160
|
rob@76
|
161 struct sockaddr_in sockAddr;
|
rob@76
|
162 std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
|
rob@76
|
163 socklen_t length = sizeof(sockAddr);
|
rob@76
|
164 if (getsockname(socket_, (struct sockaddr *)&sockAddr, &length) < 0) {
|
rob@76
|
165 throw std::runtime_error("unable to getsockname\n");
|
rob@76
|
166 }
|
rob@76
|
167
|
rob@76
|
168 if( isConnected_ ){
|
rob@76
|
169 // reconnect to the connected address
|
rob@76
|
170
|
rob@76
|
171 if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
|
rob@76
|
172 throw std::runtime_error("unable to connect udp socket\n");
|
rob@76
|
173 }
|
rob@76
|
174
|
rob@76
|
175 }else{
|
rob@76
|
176 // unconnect from the remote address
|
rob@76
|
177
|
rob@76
|
178 struct sockaddr_in unconnectSockAddr;
|
rob@76
|
179 std::memset( (char *)&unconnectSockAddr, 0, sizeof(unconnectSockAddr ) );
|
rob@76
|
180 unconnectSockAddr.sin_family = AF_UNSPEC;
|
rob@76
|
181 // address fields are zero
|
rob@76
|
182 int connectResult = connect(socket_, (struct sockaddr *)&unconnectSockAddr, sizeof(unconnectSockAddr));
|
rob@76
|
183 if ( connectResult < 0 && errno != EAFNOSUPPORT ) {
|
rob@76
|
184 throw std::runtime_error("unable to un-connect udp socket\n");
|
rob@76
|
185 }
|
rob@76
|
186 }
|
rob@76
|
187
|
rob@76
|
188 return IpEndpointNameFromSockaddr( sockAddr );
|
rob@76
|
189 }
|
rob@76
|
190
|
rob@76
|
191 void Connect( const IpEndpointName& remoteEndpoint )
|
rob@76
|
192 {
|
rob@76
|
193 SockaddrFromIpEndpointName( connectedAddr_, remoteEndpoint );
|
rob@76
|
194
|
rob@76
|
195 if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
|
rob@76
|
196 throw std::runtime_error("unable to connect udp socket\n");
|
rob@76
|
197 }
|
rob@76
|
198
|
rob@76
|
199 isConnected_ = true;
|
rob@76
|
200 }
|
rob@76
|
201
|
rob@76
|
202 void Send( const char *data, std::size_t size )
|
rob@76
|
203 {
|
rob@76
|
204 assert( isConnected_ );
|
rob@76
|
205
|
rob@76
|
206 send( socket_, data, size, 0 );
|
rob@76
|
207 }
|
rob@76
|
208
|
rob@76
|
209 void SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size )
|
rob@76
|
210 {
|
rob@76
|
211 sendToAddr_.sin_addr.s_addr = htonl( remoteEndpoint.address );
|
rob@76
|
212 sendToAddr_.sin_port = htons( remoteEndpoint.port );
|
rob@76
|
213
|
rob@76
|
214 sendto( socket_, data, size, 0, (sockaddr*)&sendToAddr_, sizeof(sendToAddr_) );
|
rob@76
|
215 }
|
rob@76
|
216
|
rob@76
|
217 void Bind( const IpEndpointName& localEndpoint )
|
rob@76
|
218 {
|
rob@76
|
219 struct sockaddr_in bindSockAddr;
|
rob@76
|
220 SockaddrFromIpEndpointName( bindSockAddr, localEndpoint );
|
rob@76
|
221
|
rob@76
|
222 if (bind(socket_, (struct sockaddr *)&bindSockAddr, sizeof(bindSockAddr)) < 0) {
|
rob@76
|
223 throw std::runtime_error("unable to bind udp socket\n");
|
rob@76
|
224 }
|
rob@76
|
225
|
rob@76
|
226 isBound_ = true;
|
rob@76
|
227 }
|
rob@76
|
228
|
rob@76
|
229 bool IsBound() const { return isBound_; }
|
rob@76
|
230
|
rob@76
|
231 std::size_t ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size )
|
rob@76
|
232 {
|
rob@76
|
233 assert( isBound_ );
|
rob@76
|
234
|
rob@76
|
235 struct sockaddr_in fromAddr;
|
rob@76
|
236 socklen_t fromAddrLen = sizeof(fromAddr);
|
rob@76
|
237
|
rob@76
|
238 ssize_t result = recvfrom(socket_, data, size, 0,
|
rob@76
|
239 (struct sockaddr *) &fromAddr, (socklen_t*)&fromAddrLen);
|
rob@76
|
240 if( result < 0 )
|
rob@76
|
241 return 0;
|
rob@76
|
242
|
rob@76
|
243 remoteEndpoint.address = ntohl(fromAddr.sin_addr.s_addr);
|
rob@76
|
244 remoteEndpoint.port = ntohs(fromAddr.sin_port);
|
rob@76
|
245
|
rob@76
|
246 return (std::size_t)result;
|
rob@76
|
247 }
|
rob@76
|
248
|
rob@76
|
249 int Socket() { return socket_; }
|
rob@76
|
250 };
|
rob@76
|
251
|
rob@76
|
252 UdpSocket::UdpSocket()
|
rob@76
|
253 {
|
rob@76
|
254 impl_ = new Implementation();
|
rob@76
|
255 }
|
rob@76
|
256
|
rob@76
|
257 UdpSocket::~UdpSocket()
|
rob@76
|
258 {
|
rob@76
|
259 delete impl_;
|
rob@76
|
260 }
|
rob@76
|
261
|
rob@76
|
262 void UdpSocket::SetEnableBroadcast( bool enableBroadcast )
|
rob@76
|
263 {
|
rob@76
|
264 impl_->SetEnableBroadcast( enableBroadcast );
|
rob@76
|
265 }
|
rob@76
|
266
|
rob@76
|
267 void UdpSocket::SetAllowReuse( bool allowReuse )
|
rob@76
|
268 {
|
rob@76
|
269 impl_->SetAllowReuse( allowReuse );
|
rob@76
|
270 }
|
rob@76
|
271
|
rob@76
|
272 IpEndpointName UdpSocket::LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
|
rob@76
|
273 {
|
rob@76
|
274 return impl_->LocalEndpointFor( remoteEndpoint );
|
rob@76
|
275 }
|
rob@76
|
276
|
rob@76
|
277 void UdpSocket::Connect( const IpEndpointName& remoteEndpoint )
|
rob@76
|
278 {
|
rob@76
|
279 impl_->Connect( remoteEndpoint );
|
rob@76
|
280 }
|
rob@76
|
281
|
rob@76
|
282 void UdpSocket::Send( const char *data, std::size_t size )
|
rob@76
|
283 {
|
rob@76
|
284 impl_->Send( data, size );
|
rob@76
|
285 }
|
rob@76
|
286
|
rob@76
|
287 void UdpSocket::SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size )
|
rob@76
|
288 {
|
rob@76
|
289 impl_->SendTo( remoteEndpoint, data, size );
|
rob@76
|
290 }
|
rob@76
|
291
|
rob@76
|
292 void UdpSocket::Bind( const IpEndpointName& localEndpoint )
|
rob@76
|
293 {
|
rob@76
|
294 impl_->Bind( localEndpoint );
|
rob@76
|
295 }
|
rob@76
|
296
|
rob@76
|
297 bool UdpSocket::IsBound() const
|
rob@76
|
298 {
|
rob@76
|
299 return impl_->IsBound();
|
rob@76
|
300 }
|
rob@76
|
301
|
rob@76
|
302 std::size_t UdpSocket::ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size )
|
rob@76
|
303 {
|
rob@76
|
304 return impl_->ReceiveFrom( remoteEndpoint, data, size );
|
rob@76
|
305 }
|
rob@76
|
306
|
rob@76
|
307
|
rob@76
|
308 struct AttachedTimerListener{
|
rob@76
|
309 AttachedTimerListener( int id, int p, TimerListener *tl )
|
rob@76
|
310 : initialDelayMs( id )
|
rob@76
|
311 , periodMs( p )
|
rob@76
|
312 , listener( tl ) {}
|
rob@76
|
313 int initialDelayMs;
|
rob@76
|
314 int periodMs;
|
rob@76
|
315 TimerListener *listener;
|
rob@76
|
316 };
|
rob@76
|
317
|
rob@76
|
318
|
rob@76
|
319 static bool CompareScheduledTimerCalls(
|
rob@76
|
320 const std::pair< double, AttachedTimerListener > & lhs, const std::pair< double, AttachedTimerListener > & rhs )
|
rob@76
|
321 {
|
rob@76
|
322 return lhs.first < rhs.first;
|
rob@76
|
323 }
|
rob@76
|
324
|
rob@76
|
325
|
rob@76
|
326 SocketReceiveMultiplexer *multiplexerInstanceToAbortWithSigInt_ = 0;
|
rob@76
|
327
|
rob@76
|
328 extern "C" /*static*/ void InterruptSignalHandler( int );
|
rob@76
|
329 /*static*/ void InterruptSignalHandler( int )
|
rob@76
|
330 {
|
rob@76
|
331 multiplexerInstanceToAbortWithSigInt_->AsynchronousBreak();
|
rob@76
|
332 signal( SIGINT, SIG_DFL );
|
rob@76
|
333 }
|
rob@76
|
334
|
rob@76
|
335
|
rob@76
|
336 class SocketReceiveMultiplexer::Implementation{
|
rob@76
|
337 std::vector< std::pair< PacketListener*, UdpSocket* > > socketListeners_;
|
rob@76
|
338 std::vector< AttachedTimerListener > timerListeners_;
|
rob@76
|
339
|
rob@76
|
340 volatile bool break_;
|
rob@76
|
341 int breakPipe_[2]; // [0] is the reader descriptor and [1] the writer
|
rob@76
|
342
|
rob@76
|
343 double GetCurrentTimeMs() const
|
rob@76
|
344 {
|
rob@76
|
345 struct timeval t;
|
rob@76
|
346
|
rob@76
|
347 gettimeofday( &t, 0 );
|
rob@76
|
348
|
rob@76
|
349 return ((double)t.tv_sec*1000.) + ((double)t.tv_usec / 1000.);
|
rob@76
|
350 }
|
rob@76
|
351
|
rob@76
|
352 public:
|
rob@76
|
353 Implementation()
|
rob@76
|
354 {
|
rob@76
|
355 if( pipe(breakPipe_) != 0 )
|
rob@76
|
356 throw std::runtime_error( "creation of asynchronous break pipes failed\n" );
|
rob@76
|
357 }
|
rob@76
|
358
|
rob@76
|
359 ~Implementation()
|
rob@76
|
360 {
|
rob@76
|
361 close( breakPipe_[0] );
|
rob@76
|
362 close( breakPipe_[1] );
|
rob@76
|
363 }
|
rob@76
|
364
|
rob@76
|
365 void AttachSocketListener( UdpSocket *socket, PacketListener *listener )
|
rob@76
|
366 {
|
rob@76
|
367 assert( std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ) == socketListeners_.end() );
|
rob@76
|
368 // we don't check that the same socket has been added multiple times, even though this is an error
|
rob@76
|
369 socketListeners_.push_back( std::make_pair( listener, socket ) );
|
rob@76
|
370 }
|
rob@76
|
371
|
rob@76
|
372 void DetachSocketListener( UdpSocket *socket, PacketListener *listener )
|
rob@76
|
373 {
|
rob@76
|
374 std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i =
|
rob@76
|
375 std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) );
|
rob@76
|
376 assert( i != socketListeners_.end() );
|
rob@76
|
377
|
rob@76
|
378 socketListeners_.erase( i );
|
rob@76
|
379 }
|
rob@76
|
380
|
rob@76
|
381 void AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
|
rob@76
|
382 {
|
rob@76
|
383 timerListeners_.push_back( AttachedTimerListener( periodMilliseconds, periodMilliseconds, listener ) );
|
rob@76
|
384 }
|
rob@76
|
385
|
rob@76
|
386 void AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
|
rob@76
|
387 {
|
rob@76
|
388 timerListeners_.push_back( AttachedTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ) );
|
rob@76
|
389 }
|
rob@76
|
390
|
rob@76
|
391 void DetachPeriodicTimerListener( TimerListener *listener )
|
rob@76
|
392 {
|
rob@76
|
393 std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
|
rob@76
|
394 while( i != timerListeners_.end() ){
|
rob@76
|
395 if( i->listener == listener )
|
rob@76
|
396 break;
|
rob@76
|
397 ++i;
|
rob@76
|
398 }
|
rob@76
|
399
|
rob@76
|
400 assert( i != timerListeners_.end() );
|
rob@76
|
401
|
rob@76
|
402 timerListeners_.erase( i );
|
rob@76
|
403 }
|
rob@76
|
404
|
rob@76
|
405 void Run()
|
rob@76
|
406 {
|
rob@76
|
407 break_ = false;
|
rob@76
|
408 char *data = 0;
|
rob@76
|
409
|
rob@76
|
410 try{
|
rob@76
|
411
|
rob@76
|
412 // configure the master fd_set for select()
|
rob@76
|
413
|
rob@76
|
414 fd_set masterfds, tempfds;
|
rob@76
|
415 FD_ZERO( &masterfds );
|
rob@76
|
416 FD_ZERO( &tempfds );
|
rob@76
|
417
|
rob@76
|
418 // in addition to listening to the inbound sockets we
|
rob@76
|
419 // also listen to the asynchronous break pipe, so that AsynchronousBreak()
|
rob@76
|
420 // can break us out of select() from another thread.
|
rob@76
|
421 FD_SET( breakPipe_[0], &masterfds );
|
rob@76
|
422 int fdmax = breakPipe_[0];
|
rob@76
|
423
|
rob@76
|
424 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
|
rob@76
|
425 i != socketListeners_.end(); ++i ){
|
rob@76
|
426
|
rob@76
|
427 if( fdmax < i->second->impl_->Socket() )
|
rob@76
|
428 fdmax = i->second->impl_->Socket();
|
rob@76
|
429 FD_SET( i->second->impl_->Socket(), &masterfds );
|
rob@76
|
430 }
|
rob@76
|
431
|
rob@76
|
432
|
rob@76
|
433 // configure the timer queue
|
rob@76
|
434 double currentTimeMs = GetCurrentTimeMs();
|
rob@76
|
435
|
rob@76
|
436 // expiry time ms, listener
|
rob@76
|
437 std::vector< std::pair< double, AttachedTimerListener > > timerQueue_;
|
rob@76
|
438 for( std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
|
rob@76
|
439 i != timerListeners_.end(); ++i )
|
rob@76
|
440 timerQueue_.push_back( std::make_pair( currentTimeMs + i->initialDelayMs, *i ) );
|
rob@76
|
441 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
|
rob@76
|
442
|
rob@76
|
443 const int MAX_BUFFER_SIZE = 4098;
|
rob@76
|
444 data = new char[ MAX_BUFFER_SIZE ];
|
rob@76
|
445 IpEndpointName remoteEndpoint;
|
rob@76
|
446
|
rob@76
|
447 struct timeval timeout;
|
rob@76
|
448
|
rob@76
|
449 while( !break_ ){
|
rob@76
|
450 tempfds = masterfds;
|
rob@76
|
451
|
rob@76
|
452 struct timeval *timeoutPtr = 0;
|
rob@76
|
453 if( !timerQueue_.empty() ){
|
rob@76
|
454 double timeoutMs = timerQueue_.front().first - GetCurrentTimeMs();
|
rob@76
|
455 if( timeoutMs < 0 )
|
rob@76
|
456 timeoutMs = 0;
|
rob@76
|
457
|
rob@76
|
458 long timoutSecondsPart = (long)(timeoutMs * .001);
|
rob@76
|
459 timeout.tv_sec = (time_t)timoutSecondsPart;
|
rob@76
|
460 // 1000000 microseconds in a second
|
rob@76
|
461 timeout.tv_usec = (suseconds_t)((timeoutMs - (timoutSecondsPart * 1000)) * 1000);
|
rob@76
|
462 timeoutPtr = &timeout;
|
rob@76
|
463 }
|
rob@76
|
464
|
rob@76
|
465 if( select( fdmax + 1, &tempfds, 0, 0, timeoutPtr ) < 0 ){
|
rob@76
|
466 if( break_ ){
|
rob@76
|
467 break;
|
rob@76
|
468 }else if( errno == EINTR ){
|
rob@76
|
469 // on returning an error, select() doesn't clear tempfds.
|
rob@76
|
470 // so tempfds would remain all set, which would cause read( breakPipe_[0]...
|
rob@76
|
471 // below to block indefinitely. therefore if select returns EINTR we restart
|
rob@76
|
472 // the while() loop instead of continuing on to below.
|
rob@76
|
473 continue;
|
rob@76
|
474 }else{
|
rob@76
|
475 throw std::runtime_error("select failed\n");
|
rob@76
|
476 }
|
rob@76
|
477 }
|
rob@76
|
478
|
rob@76
|
479 if( FD_ISSET( breakPipe_[0], &tempfds ) ){
|
rob@76
|
480 // clear pending data from the asynchronous break pipe
|
rob@76
|
481 char c;
|
rob@76
|
482 read( breakPipe_[0], &c, 1 );
|
rob@76
|
483 }
|
rob@76
|
484
|
rob@76
|
485 if( break_ )
|
rob@76
|
486 break;
|
rob@76
|
487
|
rob@76
|
488 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
|
rob@76
|
489 i != socketListeners_.end(); ++i ){
|
rob@76
|
490
|
rob@76
|
491 if( FD_ISSET( i->second->impl_->Socket(), &tempfds ) ){
|
rob@76
|
492
|
rob@76
|
493 std::size_t size = i->second->ReceiveFrom( remoteEndpoint, data, MAX_BUFFER_SIZE );
|
rob@76
|
494 if( size > 0 ){
|
rob@76
|
495 i->first->ProcessPacket( data, (int)size, remoteEndpoint );
|
rob@76
|
496 if( break_ )
|
rob@76
|
497 break;
|
rob@76
|
498 }
|
rob@76
|
499 }
|
rob@76
|
500 }
|
rob@76
|
501
|
rob@76
|
502 // execute any expired timers
|
rob@76
|
503 currentTimeMs = GetCurrentTimeMs();
|
rob@76
|
504 bool resort = false;
|
rob@76
|
505 for( std::vector< std::pair< double, AttachedTimerListener > >::iterator i = timerQueue_.begin();
|
rob@76
|
506 i != timerQueue_.end() && i->first <= currentTimeMs; ++i ){
|
rob@76
|
507
|
rob@76
|
508 i->second.listener->TimerExpired();
|
rob@76
|
509 if( break_ )
|
rob@76
|
510 break;
|
rob@76
|
511
|
rob@76
|
512 i->first += i->second.periodMs;
|
rob@76
|
513 resort = true;
|
rob@76
|
514 }
|
rob@76
|
515 if( resort )
|
rob@76
|
516 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
|
rob@76
|
517 }
|
rob@76
|
518
|
rob@76
|
519 delete [] data;
|
rob@76
|
520 }catch(...){
|
rob@76
|
521 if( data )
|
rob@76
|
522 delete [] data;
|
rob@76
|
523 throw;
|
rob@76
|
524 }
|
rob@76
|
525 }
|
rob@76
|
526
|
rob@76
|
527 void Break()
|
rob@76
|
528 {
|
rob@76
|
529 break_ = true;
|
rob@76
|
530 }
|
rob@76
|
531
|
rob@76
|
532 void AsynchronousBreak()
|
rob@76
|
533 {
|
rob@76
|
534 break_ = true;
|
rob@76
|
535
|
rob@76
|
536 // Send a termination message to the asynchronous break pipe, so select() will return
|
rob@76
|
537 write( breakPipe_[1], "!", 1 );
|
rob@76
|
538 }
|
rob@76
|
539 };
|
rob@76
|
540
|
rob@76
|
541
|
rob@76
|
542
|
rob@76
|
543 SocketReceiveMultiplexer::SocketReceiveMultiplexer()
|
rob@76
|
544 {
|
rob@76
|
545 impl_ = new Implementation();
|
rob@76
|
546 }
|
rob@76
|
547
|
rob@76
|
548 SocketReceiveMultiplexer::~SocketReceiveMultiplexer()
|
rob@76
|
549 {
|
rob@76
|
550 delete impl_;
|
rob@76
|
551 }
|
rob@76
|
552
|
rob@76
|
553 void SocketReceiveMultiplexer::AttachSocketListener( UdpSocket *socket, PacketListener *listener )
|
rob@76
|
554 {
|
rob@76
|
555 impl_->AttachSocketListener( socket, listener );
|
rob@76
|
556 }
|
rob@76
|
557
|
rob@76
|
558 void SocketReceiveMultiplexer::DetachSocketListener( UdpSocket *socket, PacketListener *listener )
|
rob@76
|
559 {
|
rob@76
|
560 impl_->DetachSocketListener( socket, listener );
|
rob@76
|
561 }
|
rob@76
|
562
|
rob@76
|
563 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
|
rob@76
|
564 {
|
rob@76
|
565 impl_->AttachPeriodicTimerListener( periodMilliseconds, listener );
|
rob@76
|
566 }
|
rob@76
|
567
|
rob@76
|
568 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
|
rob@76
|
569 {
|
rob@76
|
570 impl_->AttachPeriodicTimerListener( initialDelayMilliseconds, periodMilliseconds, listener );
|
rob@76
|
571 }
|
rob@76
|
572
|
rob@76
|
573 void SocketReceiveMultiplexer::DetachPeriodicTimerListener( TimerListener *listener )
|
rob@76
|
574 {
|
rob@76
|
575 impl_->DetachPeriodicTimerListener( listener );
|
rob@76
|
576 }
|
rob@76
|
577
|
rob@76
|
578 void SocketReceiveMultiplexer::Run()
|
rob@76
|
579 {
|
rob@76
|
580 impl_->Run();
|
rob@76
|
581 }
|
rob@76
|
582
|
rob@76
|
583 void SocketReceiveMultiplexer::RunUntilSigInt()
|
rob@76
|
584 {
|
rob@76
|
585 assert( multiplexerInstanceToAbortWithSigInt_ == 0 ); /* at present we support only one multiplexer instance running until sig int */
|
rob@76
|
586 multiplexerInstanceToAbortWithSigInt_ = this;
|
rob@76
|
587 signal( SIGINT, InterruptSignalHandler );
|
rob@76
|
588 impl_->Run();
|
rob@76
|
589 signal( SIGINT, SIG_DFL );
|
rob@76
|
590 multiplexerInstanceToAbortWithSigInt_ = 0;
|
rob@76
|
591 }
|
rob@76
|
592
|
rob@76
|
593 void SocketReceiveMultiplexer::Break()
|
rob@76
|
594 {
|
rob@76
|
595 impl_->Break();
|
rob@76
|
596 }
|
rob@76
|
597
|
rob@76
|
598 void SocketReceiveMultiplexer::AsynchronousBreak()
|
rob@76
|
599 {
|
rob@76
|
600 impl_->AsynchronousBreak();
|
rob@76
|
601 }
|
rob@76
|
602
|