comparison oscpack/ip/posix/UdpSocket.cpp @ 76:0ae87af84e2f

added oscgroups
author Rob Canning <rob@foo.net>
date Sun, 13 Jul 2014 10:07:41 +0100
parents
children
comparison
equal deleted inserted replaced
75:3a2845e3156e 76:0ae87af84e2f
1 /*
2 oscpack -- Open Sound Control (OSC) packet manipulation library
3 http://www.rossbencina.com/code/oscpack
4
5 Copyright (c) 2004-2013 Ross Bencina <rossb@audiomulch.com>
6
7 Permission is hereby granted, free of charge, to any person obtaining
8 a copy of this software and associated documentation files
9 (the "Software"), to deal in the Software without restriction,
10 including without limitation the rights to use, copy, modify, merge,
11 publish, distribute, sublicense, and/or sell copies of the Software,
12 and to permit persons to whom the Software is furnished to do so,
13 subject to the following conditions:
14
15 The above copyright notice and this permission notice shall be
16 included in all copies or substantial portions of the Software.
17
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
21 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
22 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
23 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 */
26
27 /*
28 The text above constitutes the entire oscpack license; however,
29 the oscpack developer(s) also make the following non-binding requests:
30
31 Any person wishing to distribute modifications to the Software is
32 requested to send the modifications to the original developer so that
33 they can be incorporated into the canonical version. It is also
34 requested that these non-binding requests be included whenever the
35 above license is reproduced.
36 */
37 #include "ip/UdpSocket.h"
38
39 #include <pthread.h>
40 #include <unistd.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <netdb.h>
44 #include <sys/types.h>
45 #include <sys/socket.h>
46 #include <sys/time.h>
47 #include <netinet/in.h> // for sockaddr_in
48
49 #include <signal.h>
50 #include <math.h>
51 #include <errno.h>
52 #include <string.h>
53
54 #include <algorithm>
55 #include <cassert>
56 #include <cstring> // for memset
57 #include <stdexcept>
58 #include <vector>
59
60 #include "ip/PacketListener.h"
61 #include "ip/TimerListener.h"
62
63
64 #if defined(__APPLE__) && !defined(_SOCKLEN_T)
65 // pre system 10.3 didn't have socklen_t
66 typedef ssize_t socklen_t;
67 #endif
68
69
70 static void SockaddrFromIpEndpointName( struct sockaddr_in& sockAddr, const IpEndpointName& endpoint )
71 {
72 std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
73 sockAddr.sin_family = AF_INET;
74
75 sockAddr.sin_addr.s_addr =
76 (endpoint.address == IpEndpointName::ANY_ADDRESS)
77 ? INADDR_ANY
78 : htonl( endpoint.address );
79
80 sockAddr.sin_port =
81 (endpoint.port == IpEndpointName::ANY_PORT)
82 ? 0
83 : htons( endpoint.port );
84 }
85
86
87 static IpEndpointName IpEndpointNameFromSockaddr( const struct sockaddr_in& sockAddr )
88 {
89 return IpEndpointName(
90 (sockAddr.sin_addr.s_addr == INADDR_ANY)
91 ? IpEndpointName::ANY_ADDRESS
92 : ntohl( sockAddr.sin_addr.s_addr ),
93 (sockAddr.sin_port == 0)
94 ? IpEndpointName::ANY_PORT
95 : ntohs( sockAddr.sin_port )
96 );
97 }
98
99
100 class UdpSocket::Implementation{
101 bool isBound_;
102 bool isConnected_;
103
104 int socket_;
105 struct sockaddr_in connectedAddr_;
106 struct sockaddr_in sendToAddr_;
107
108 public:
109
110 Implementation()
111 : isBound_( false )
112 , isConnected_( false )
113 , socket_( -1 )
114 {
115 if( (socket_ = socket( AF_INET, SOCK_DGRAM, 0 )) == -1 ){
116 throw std::runtime_error("unable to create udp socket\n");
117 }
118
119 std::memset( &sendToAddr_, 0, sizeof(sendToAddr_) );
120 sendToAddr_.sin_family = AF_INET;
121 }
122
123 ~Implementation()
124 {
125 if (socket_ != -1) close(socket_);
126 }
127
128 void SetEnableBroadcast( bool enableBroadcast )
129 {
130 int broadcast = (enableBroadcast) ? 1 : 0; // int on posix
131 setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &broadcast, sizeof(broadcast));
132 }
133
134 void SetAllowReuse( bool allowReuse )
135 {
136 int reuseAddr = (allowReuse) ? 1 : 0; // int on posix
137 setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr));
138
139 #ifdef __APPLE__
140 // needed also for OS X - enable multiple listeners for a single port on same network interface
141 int reusePort = (allowReuse) ? 1 : 0; // int on posix
142 setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reusePort, sizeof(reusePort));
143 #endif
144 }
145
146 IpEndpointName LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
147 {
148 assert( isBound_ );
149
150 // first connect the socket to the remote server
151
152 struct sockaddr_in connectSockAddr;
153 SockaddrFromIpEndpointName( connectSockAddr, remoteEndpoint );
154
155 if (connect(socket_, (struct sockaddr *)&connectSockAddr, sizeof(connectSockAddr)) < 0) {
156 throw std::runtime_error("unable to connect udp socket\n");
157 }
158
159 // get the address
160
161 struct sockaddr_in sockAddr;
162 std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
163 socklen_t length = sizeof(sockAddr);
164 if (getsockname(socket_, (struct sockaddr *)&sockAddr, &length) < 0) {
165 throw std::runtime_error("unable to getsockname\n");
166 }
167
168 if( isConnected_ ){
169 // reconnect to the connected address
170
171 if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
172 throw std::runtime_error("unable to connect udp socket\n");
173 }
174
175 }else{
176 // unconnect from the remote address
177
178 struct sockaddr_in unconnectSockAddr;
179 std::memset( (char *)&unconnectSockAddr, 0, sizeof(unconnectSockAddr ) );
180 unconnectSockAddr.sin_family = AF_UNSPEC;
181 // address fields are zero
182 int connectResult = connect(socket_, (struct sockaddr *)&unconnectSockAddr, sizeof(unconnectSockAddr));
183 if ( connectResult < 0 && errno != EAFNOSUPPORT ) {
184 throw std::runtime_error("unable to un-connect udp socket\n");
185 }
186 }
187
188 return IpEndpointNameFromSockaddr( sockAddr );
189 }
190
191 void Connect( const IpEndpointName& remoteEndpoint )
192 {
193 SockaddrFromIpEndpointName( connectedAddr_, remoteEndpoint );
194
195 if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
196 throw std::runtime_error("unable to connect udp socket\n");
197 }
198
199 isConnected_ = true;
200 }
201
202 void Send( const char *data, std::size_t size )
203 {
204 assert( isConnected_ );
205
206 send( socket_, data, size, 0 );
207 }
208
209 void SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size )
210 {
211 sendToAddr_.sin_addr.s_addr = htonl( remoteEndpoint.address );
212 sendToAddr_.sin_port = htons( remoteEndpoint.port );
213
214 sendto( socket_, data, size, 0, (sockaddr*)&sendToAddr_, sizeof(sendToAddr_) );
215 }
216
217 void Bind( const IpEndpointName& localEndpoint )
218 {
219 struct sockaddr_in bindSockAddr;
220 SockaddrFromIpEndpointName( bindSockAddr, localEndpoint );
221
222 if (bind(socket_, (struct sockaddr *)&bindSockAddr, sizeof(bindSockAddr)) < 0) {
223 throw std::runtime_error("unable to bind udp socket\n");
224 }
225
226 isBound_ = true;
227 }
228
229 bool IsBound() const { return isBound_; }
230
231 std::size_t ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size )
232 {
233 assert( isBound_ );
234
235 struct sockaddr_in fromAddr;
236 socklen_t fromAddrLen = sizeof(fromAddr);
237
238 ssize_t result = recvfrom(socket_, data, size, 0,
239 (struct sockaddr *) &fromAddr, (socklen_t*)&fromAddrLen);
240 if( result < 0 )
241 return 0;
242
243 remoteEndpoint.address = ntohl(fromAddr.sin_addr.s_addr);
244 remoteEndpoint.port = ntohs(fromAddr.sin_port);
245
246 return (std::size_t)result;
247 }
248
249 int Socket() { return socket_; }
250 };
251
252 UdpSocket::UdpSocket()
253 {
254 impl_ = new Implementation();
255 }
256
257 UdpSocket::~UdpSocket()
258 {
259 delete impl_;
260 }
261
262 void UdpSocket::SetEnableBroadcast( bool enableBroadcast )
263 {
264 impl_->SetEnableBroadcast( enableBroadcast );
265 }
266
267 void UdpSocket::SetAllowReuse( bool allowReuse )
268 {
269 impl_->SetAllowReuse( allowReuse );
270 }
271
272 IpEndpointName UdpSocket::LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
273 {
274 return impl_->LocalEndpointFor( remoteEndpoint );
275 }
276
277 void UdpSocket::Connect( const IpEndpointName& remoteEndpoint )
278 {
279 impl_->Connect( remoteEndpoint );
280 }
281
282 void UdpSocket::Send( const char *data, std::size_t size )
283 {
284 impl_->Send( data, size );
285 }
286
287 void UdpSocket::SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size )
288 {
289 impl_->SendTo( remoteEndpoint, data, size );
290 }
291
292 void UdpSocket::Bind( const IpEndpointName& localEndpoint )
293 {
294 impl_->Bind( localEndpoint );
295 }
296
297 bool UdpSocket::IsBound() const
298 {
299 return impl_->IsBound();
300 }
301
302 std::size_t UdpSocket::ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size )
303 {
304 return impl_->ReceiveFrom( remoteEndpoint, data, size );
305 }
306
307
308 struct AttachedTimerListener{
309 AttachedTimerListener( int id, int p, TimerListener *tl )
310 : initialDelayMs( id )
311 , periodMs( p )
312 , listener( tl ) {}
313 int initialDelayMs;
314 int periodMs;
315 TimerListener *listener;
316 };
317
318
319 static bool CompareScheduledTimerCalls(
320 const std::pair< double, AttachedTimerListener > & lhs, const std::pair< double, AttachedTimerListener > & rhs )
321 {
322 return lhs.first < rhs.first;
323 }
324
325
326 SocketReceiveMultiplexer *multiplexerInstanceToAbortWithSigInt_ = 0;
327
328 extern "C" /*static*/ void InterruptSignalHandler( int );
329 /*static*/ void InterruptSignalHandler( int )
330 {
331 multiplexerInstanceToAbortWithSigInt_->AsynchronousBreak();
332 signal( SIGINT, SIG_DFL );
333 }
334
335
336 class SocketReceiveMultiplexer::Implementation{
337 std::vector< std::pair< PacketListener*, UdpSocket* > > socketListeners_;
338 std::vector< AttachedTimerListener > timerListeners_;
339
340 volatile bool break_;
341 int breakPipe_[2]; // [0] is the reader descriptor and [1] the writer
342
343 double GetCurrentTimeMs() const
344 {
345 struct timeval t;
346
347 gettimeofday( &t, 0 );
348
349 return ((double)t.tv_sec*1000.) + ((double)t.tv_usec / 1000.);
350 }
351
352 public:
353 Implementation()
354 {
355 if( pipe(breakPipe_) != 0 )
356 throw std::runtime_error( "creation of asynchronous break pipes failed\n" );
357 }
358
359 ~Implementation()
360 {
361 close( breakPipe_[0] );
362 close( breakPipe_[1] );
363 }
364
365 void AttachSocketListener( UdpSocket *socket, PacketListener *listener )
366 {
367 assert( std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ) == socketListeners_.end() );
368 // we don't check that the same socket has been added multiple times, even though this is an error
369 socketListeners_.push_back( std::make_pair( listener, socket ) );
370 }
371
372 void DetachSocketListener( UdpSocket *socket, PacketListener *listener )
373 {
374 std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i =
375 std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) );
376 assert( i != socketListeners_.end() );
377
378 socketListeners_.erase( i );
379 }
380
381 void AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
382 {
383 timerListeners_.push_back( AttachedTimerListener( periodMilliseconds, periodMilliseconds, listener ) );
384 }
385
386 void AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
387 {
388 timerListeners_.push_back( AttachedTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ) );
389 }
390
391 void DetachPeriodicTimerListener( TimerListener *listener )
392 {
393 std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
394 while( i != timerListeners_.end() ){
395 if( i->listener == listener )
396 break;
397 ++i;
398 }
399
400 assert( i != timerListeners_.end() );
401
402 timerListeners_.erase( i );
403 }
404
405 void Run()
406 {
407 break_ = false;
408 char *data = 0;
409
410 try{
411
412 // configure the master fd_set for select()
413
414 fd_set masterfds, tempfds;
415 FD_ZERO( &masterfds );
416 FD_ZERO( &tempfds );
417
418 // in addition to listening to the inbound sockets we
419 // also listen to the asynchronous break pipe, so that AsynchronousBreak()
420 // can break us out of select() from another thread.
421 FD_SET( breakPipe_[0], &masterfds );
422 int fdmax = breakPipe_[0];
423
424 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
425 i != socketListeners_.end(); ++i ){
426
427 if( fdmax < i->second->impl_->Socket() )
428 fdmax = i->second->impl_->Socket();
429 FD_SET( i->second->impl_->Socket(), &masterfds );
430 }
431
432
433 // configure the timer queue
434 double currentTimeMs = GetCurrentTimeMs();
435
436 // expiry time ms, listener
437 std::vector< std::pair< double, AttachedTimerListener > > timerQueue_;
438 for( std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
439 i != timerListeners_.end(); ++i )
440 timerQueue_.push_back( std::make_pair( currentTimeMs + i->initialDelayMs, *i ) );
441 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
442
443 const int MAX_BUFFER_SIZE = 4098;
444 data = new char[ MAX_BUFFER_SIZE ];
445 IpEndpointName remoteEndpoint;
446
447 struct timeval timeout;
448
449 while( !break_ ){
450 tempfds = masterfds;
451
452 struct timeval *timeoutPtr = 0;
453 if( !timerQueue_.empty() ){
454 double timeoutMs = timerQueue_.front().first - GetCurrentTimeMs();
455 if( timeoutMs < 0 )
456 timeoutMs = 0;
457
458 long timoutSecondsPart = (long)(timeoutMs * .001);
459 timeout.tv_sec = (time_t)timoutSecondsPart;
460 // 1000000 microseconds in a second
461 timeout.tv_usec = (suseconds_t)((timeoutMs - (timoutSecondsPart * 1000)) * 1000);
462 timeoutPtr = &timeout;
463 }
464
465 if( select( fdmax + 1, &tempfds, 0, 0, timeoutPtr ) < 0 ){
466 if( break_ ){
467 break;
468 }else if( errno == EINTR ){
469 // on returning an error, select() doesn't clear tempfds.
470 // so tempfds would remain all set, which would cause read( breakPipe_[0]...
471 // below to block indefinitely. therefore if select returns EINTR we restart
472 // the while() loop instead of continuing on to below.
473 continue;
474 }else{
475 throw std::runtime_error("select failed\n");
476 }
477 }
478
479 if( FD_ISSET( breakPipe_[0], &tempfds ) ){
480 // clear pending data from the asynchronous break pipe
481 char c;
482 read( breakPipe_[0], &c, 1 );
483 }
484
485 if( break_ )
486 break;
487
488 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
489 i != socketListeners_.end(); ++i ){
490
491 if( FD_ISSET( i->second->impl_->Socket(), &tempfds ) ){
492
493 std::size_t size = i->second->ReceiveFrom( remoteEndpoint, data, MAX_BUFFER_SIZE );
494 if( size > 0 ){
495 i->first->ProcessPacket( data, (int)size, remoteEndpoint );
496 if( break_ )
497 break;
498 }
499 }
500 }
501
502 // execute any expired timers
503 currentTimeMs = GetCurrentTimeMs();
504 bool resort = false;
505 for( std::vector< std::pair< double, AttachedTimerListener > >::iterator i = timerQueue_.begin();
506 i != timerQueue_.end() && i->first <= currentTimeMs; ++i ){
507
508 i->second.listener->TimerExpired();
509 if( break_ )
510 break;
511
512 i->first += i->second.periodMs;
513 resort = true;
514 }
515 if( resort )
516 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
517 }
518
519 delete [] data;
520 }catch(...){
521 if( data )
522 delete [] data;
523 throw;
524 }
525 }
526
527 void Break()
528 {
529 break_ = true;
530 }
531
532 void AsynchronousBreak()
533 {
534 break_ = true;
535
536 // Send a termination message to the asynchronous break pipe, so select() will return
537 write( breakPipe_[1], "!", 1 );
538 }
539 };
540
541
542
543 SocketReceiveMultiplexer::SocketReceiveMultiplexer()
544 {
545 impl_ = new Implementation();
546 }
547
548 SocketReceiveMultiplexer::~SocketReceiveMultiplexer()
549 {
550 delete impl_;
551 }
552
553 void SocketReceiveMultiplexer::AttachSocketListener( UdpSocket *socket, PacketListener *listener )
554 {
555 impl_->AttachSocketListener( socket, listener );
556 }
557
558 void SocketReceiveMultiplexer::DetachSocketListener( UdpSocket *socket, PacketListener *listener )
559 {
560 impl_->DetachSocketListener( socket, listener );
561 }
562
563 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
564 {
565 impl_->AttachPeriodicTimerListener( periodMilliseconds, listener );
566 }
567
568 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
569 {
570 impl_->AttachPeriodicTimerListener( initialDelayMilliseconds, periodMilliseconds, listener );
571 }
572
573 void SocketReceiveMultiplexer::DetachPeriodicTimerListener( TimerListener *listener )
574 {
575 impl_->DetachPeriodicTimerListener( listener );
576 }
577
578 void SocketReceiveMultiplexer::Run()
579 {
580 impl_->Run();
581 }
582
583 void SocketReceiveMultiplexer::RunUntilSigInt()
584 {
585 assert( multiplexerInstanceToAbortWithSigInt_ == 0 ); /* at present we support only one multiplexer instance running until sig int */
586 multiplexerInstanceToAbortWithSigInt_ = this;
587 signal( SIGINT, InterruptSignalHandler );
588 impl_->Run();
589 signal( SIGINT, SIG_DFL );
590 multiplexerInstanceToAbortWithSigInt_ = 0;
591 }
592
593 void SocketReceiveMultiplexer::Break()
594 {
595 impl_->Break();
596 }
597
598 void SocketReceiveMultiplexer::AsynchronousBreak()
599 {
600 impl_->AsynchronousBreak();
601 }
602