annotate oscpack/ip/posix/UdpSocket.cpp @ 101:52e44ee1c791 tip master

enabled all scores in autostart script
author Rob Canning <rc@kiben.net>
date Tue, 21 Apr 2015 16:20:57 +0100
parents 0ae87af84e2f
children
rev   line source
rob@76 1 /*
rob@76 2 oscpack -- Open Sound Control (OSC) packet manipulation library
rob@76 3 http://www.rossbencina.com/code/oscpack
rob@76 4
rob@76 5 Copyright (c) 2004-2013 Ross Bencina <rossb@audiomulch.com>
rob@76 6
rob@76 7 Permission is hereby granted, free of charge, to any person obtaining
rob@76 8 a copy of this software and associated documentation files
rob@76 9 (the "Software"), to deal in the Software without restriction,
rob@76 10 including without limitation the rights to use, copy, modify, merge,
rob@76 11 publish, distribute, sublicense, and/or sell copies of the Software,
rob@76 12 and to permit persons to whom the Software is furnished to do so,
rob@76 13 subject to the following conditions:
rob@76 14
rob@76 15 The above copyright notice and this permission notice shall be
rob@76 16 included in all copies or substantial portions of the Software.
rob@76 17
rob@76 18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
rob@76 19 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
rob@76 20 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
rob@76 21 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
rob@76 22 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
rob@76 23 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
rob@76 24 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
rob@76 25 */
rob@76 26
rob@76 27 /*
rob@76 28 The text above constitutes the entire oscpack license; however,
rob@76 29 the oscpack developer(s) also make the following non-binding requests:
rob@76 30
rob@76 31 Any person wishing to distribute modifications to the Software is
rob@76 32 requested to send the modifications to the original developer so that
rob@76 33 they can be incorporated into the canonical version. It is also
rob@76 34 requested that these non-binding requests be included whenever the
rob@76 35 above license is reproduced.
rob@76 36 */
rob@76 37 #include "ip/UdpSocket.h"
rob@76 38
rob@76 39 #include <pthread.h>
rob@76 40 #include <unistd.h>
rob@76 41 #include <stdlib.h>
rob@76 42 #include <stdio.h>
rob@76 43 #include <netdb.h>
rob@76 44 #include <sys/types.h>
rob@76 45 #include <sys/socket.h>
rob@76 46 #include <sys/time.h>
rob@76 47 #include <netinet/in.h> // for sockaddr_in
rob@76 48
rob@76 49 #include <signal.h>
rob@76 50 #include <math.h>
rob@76 51 #include <errno.h>
rob@76 52 #include <string.h>
rob@76 53
rob@76 54 #include <algorithm>
rob@76 55 #include <cassert>
rob@76 56 #include <cstring> // for memset
rob@76 57 #include <stdexcept>
rob@76 58 #include <vector>
rob@76 59
rob@76 60 #include "ip/PacketListener.h"
rob@76 61 #include "ip/TimerListener.h"
rob@76 62
rob@76 63
rob@76 64 #if defined(__APPLE__) && !defined(_SOCKLEN_T)
rob@76 65 // pre system 10.3 didn't have socklen_t
rob@76 66 typedef ssize_t socklen_t;
rob@76 67 #endif
rob@76 68
rob@76 69
rob@76 70 static void SockaddrFromIpEndpointName( struct sockaddr_in& sockAddr, const IpEndpointName& endpoint )
rob@76 71 {
rob@76 72 std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
rob@76 73 sockAddr.sin_family = AF_INET;
rob@76 74
rob@76 75 sockAddr.sin_addr.s_addr =
rob@76 76 (endpoint.address == IpEndpointName::ANY_ADDRESS)
rob@76 77 ? INADDR_ANY
rob@76 78 : htonl( endpoint.address );
rob@76 79
rob@76 80 sockAddr.sin_port =
rob@76 81 (endpoint.port == IpEndpointName::ANY_PORT)
rob@76 82 ? 0
rob@76 83 : htons( endpoint.port );
rob@76 84 }
rob@76 85
rob@76 86
rob@76 87 static IpEndpointName IpEndpointNameFromSockaddr( const struct sockaddr_in& sockAddr )
rob@76 88 {
rob@76 89 return IpEndpointName(
rob@76 90 (sockAddr.sin_addr.s_addr == INADDR_ANY)
rob@76 91 ? IpEndpointName::ANY_ADDRESS
rob@76 92 : ntohl( sockAddr.sin_addr.s_addr ),
rob@76 93 (sockAddr.sin_port == 0)
rob@76 94 ? IpEndpointName::ANY_PORT
rob@76 95 : ntohs( sockAddr.sin_port )
rob@76 96 );
rob@76 97 }
rob@76 98
rob@76 99
rob@76 100 class UdpSocket::Implementation{
rob@76 101 bool isBound_;
rob@76 102 bool isConnected_;
rob@76 103
rob@76 104 int socket_;
rob@76 105 struct sockaddr_in connectedAddr_;
rob@76 106 struct sockaddr_in sendToAddr_;
rob@76 107
rob@76 108 public:
rob@76 109
rob@76 110 Implementation()
rob@76 111 : isBound_( false )
rob@76 112 , isConnected_( false )
rob@76 113 , socket_( -1 )
rob@76 114 {
rob@76 115 if( (socket_ = socket( AF_INET, SOCK_DGRAM, 0 )) == -1 ){
rob@76 116 throw std::runtime_error("unable to create udp socket\n");
rob@76 117 }
rob@76 118
rob@76 119 std::memset( &sendToAddr_, 0, sizeof(sendToAddr_) );
rob@76 120 sendToAddr_.sin_family = AF_INET;
rob@76 121 }
rob@76 122
rob@76 123 ~Implementation()
rob@76 124 {
rob@76 125 if (socket_ != -1) close(socket_);
rob@76 126 }
rob@76 127
rob@76 128 void SetEnableBroadcast( bool enableBroadcast )
rob@76 129 {
rob@76 130 int broadcast = (enableBroadcast) ? 1 : 0; // int on posix
rob@76 131 setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &broadcast, sizeof(broadcast));
rob@76 132 }
rob@76 133
rob@76 134 void SetAllowReuse( bool allowReuse )
rob@76 135 {
rob@76 136 int reuseAddr = (allowReuse) ? 1 : 0; // int on posix
rob@76 137 setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr));
rob@76 138
rob@76 139 #ifdef __APPLE__
rob@76 140 // needed also for OS X - enable multiple listeners for a single port on same network interface
rob@76 141 int reusePort = (allowReuse) ? 1 : 0; // int on posix
rob@76 142 setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reusePort, sizeof(reusePort));
rob@76 143 #endif
rob@76 144 }
rob@76 145
rob@76 146 IpEndpointName LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
rob@76 147 {
rob@76 148 assert( isBound_ );
rob@76 149
rob@76 150 // first connect the socket to the remote server
rob@76 151
rob@76 152 struct sockaddr_in connectSockAddr;
rob@76 153 SockaddrFromIpEndpointName( connectSockAddr, remoteEndpoint );
rob@76 154
rob@76 155 if (connect(socket_, (struct sockaddr *)&connectSockAddr, sizeof(connectSockAddr)) < 0) {
rob@76 156 throw std::runtime_error("unable to connect udp socket\n");
rob@76 157 }
rob@76 158
rob@76 159 // get the address
rob@76 160
rob@76 161 struct sockaddr_in sockAddr;
rob@76 162 std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
rob@76 163 socklen_t length = sizeof(sockAddr);
rob@76 164 if (getsockname(socket_, (struct sockaddr *)&sockAddr, &length) < 0) {
rob@76 165 throw std::runtime_error("unable to getsockname\n");
rob@76 166 }
rob@76 167
rob@76 168 if( isConnected_ ){
rob@76 169 // reconnect to the connected address
rob@76 170
rob@76 171 if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
rob@76 172 throw std::runtime_error("unable to connect udp socket\n");
rob@76 173 }
rob@76 174
rob@76 175 }else{
rob@76 176 // unconnect from the remote address
rob@76 177
rob@76 178 struct sockaddr_in unconnectSockAddr;
rob@76 179 std::memset( (char *)&unconnectSockAddr, 0, sizeof(unconnectSockAddr ) );
rob@76 180 unconnectSockAddr.sin_family = AF_UNSPEC;
rob@76 181 // address fields are zero
rob@76 182 int connectResult = connect(socket_, (struct sockaddr *)&unconnectSockAddr, sizeof(unconnectSockAddr));
rob@76 183 if ( connectResult < 0 && errno != EAFNOSUPPORT ) {
rob@76 184 throw std::runtime_error("unable to un-connect udp socket\n");
rob@76 185 }
rob@76 186 }
rob@76 187
rob@76 188 return IpEndpointNameFromSockaddr( sockAddr );
rob@76 189 }
rob@76 190
rob@76 191 void Connect( const IpEndpointName& remoteEndpoint )
rob@76 192 {
rob@76 193 SockaddrFromIpEndpointName( connectedAddr_, remoteEndpoint );
rob@76 194
rob@76 195 if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
rob@76 196 throw std::runtime_error("unable to connect udp socket\n");
rob@76 197 }
rob@76 198
rob@76 199 isConnected_ = true;
rob@76 200 }
rob@76 201
rob@76 202 void Send( const char *data, std::size_t size )
rob@76 203 {
rob@76 204 assert( isConnected_ );
rob@76 205
rob@76 206 send( socket_, data, size, 0 );
rob@76 207 }
rob@76 208
rob@76 209 void SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size )
rob@76 210 {
rob@76 211 sendToAddr_.sin_addr.s_addr = htonl( remoteEndpoint.address );
rob@76 212 sendToAddr_.sin_port = htons( remoteEndpoint.port );
rob@76 213
rob@76 214 sendto( socket_, data, size, 0, (sockaddr*)&sendToAddr_, sizeof(sendToAddr_) );
rob@76 215 }
rob@76 216
rob@76 217 void Bind( const IpEndpointName& localEndpoint )
rob@76 218 {
rob@76 219 struct sockaddr_in bindSockAddr;
rob@76 220 SockaddrFromIpEndpointName( bindSockAddr, localEndpoint );
rob@76 221
rob@76 222 if (bind(socket_, (struct sockaddr *)&bindSockAddr, sizeof(bindSockAddr)) < 0) {
rob@76 223 throw std::runtime_error("unable to bind udp socket\n");
rob@76 224 }
rob@76 225
rob@76 226 isBound_ = true;
rob@76 227 }
rob@76 228
rob@76 229 bool IsBound() const { return isBound_; }
rob@76 230
rob@76 231 std::size_t ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size )
rob@76 232 {
rob@76 233 assert( isBound_ );
rob@76 234
rob@76 235 struct sockaddr_in fromAddr;
rob@76 236 socklen_t fromAddrLen = sizeof(fromAddr);
rob@76 237
rob@76 238 ssize_t result = recvfrom(socket_, data, size, 0,
rob@76 239 (struct sockaddr *) &fromAddr, (socklen_t*)&fromAddrLen);
rob@76 240 if( result < 0 )
rob@76 241 return 0;
rob@76 242
rob@76 243 remoteEndpoint.address = ntohl(fromAddr.sin_addr.s_addr);
rob@76 244 remoteEndpoint.port = ntohs(fromAddr.sin_port);
rob@76 245
rob@76 246 return (std::size_t)result;
rob@76 247 }
rob@76 248
rob@76 249 int Socket() { return socket_; }
rob@76 250 };
rob@76 251
rob@76 252 UdpSocket::UdpSocket()
rob@76 253 {
rob@76 254 impl_ = new Implementation();
rob@76 255 }
rob@76 256
rob@76 257 UdpSocket::~UdpSocket()
rob@76 258 {
rob@76 259 delete impl_;
rob@76 260 }
rob@76 261
rob@76 262 void UdpSocket::SetEnableBroadcast( bool enableBroadcast )
rob@76 263 {
rob@76 264 impl_->SetEnableBroadcast( enableBroadcast );
rob@76 265 }
rob@76 266
rob@76 267 void UdpSocket::SetAllowReuse( bool allowReuse )
rob@76 268 {
rob@76 269 impl_->SetAllowReuse( allowReuse );
rob@76 270 }
rob@76 271
rob@76 272 IpEndpointName UdpSocket::LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
rob@76 273 {
rob@76 274 return impl_->LocalEndpointFor( remoteEndpoint );
rob@76 275 }
rob@76 276
rob@76 277 void UdpSocket::Connect( const IpEndpointName& remoteEndpoint )
rob@76 278 {
rob@76 279 impl_->Connect( remoteEndpoint );
rob@76 280 }
rob@76 281
rob@76 282 void UdpSocket::Send( const char *data, std::size_t size )
rob@76 283 {
rob@76 284 impl_->Send( data, size );
rob@76 285 }
rob@76 286
rob@76 287 void UdpSocket::SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size )
rob@76 288 {
rob@76 289 impl_->SendTo( remoteEndpoint, data, size );
rob@76 290 }
rob@76 291
rob@76 292 void UdpSocket::Bind( const IpEndpointName& localEndpoint )
rob@76 293 {
rob@76 294 impl_->Bind( localEndpoint );
rob@76 295 }
rob@76 296
rob@76 297 bool UdpSocket::IsBound() const
rob@76 298 {
rob@76 299 return impl_->IsBound();
rob@76 300 }
rob@76 301
rob@76 302 std::size_t UdpSocket::ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size )
rob@76 303 {
rob@76 304 return impl_->ReceiveFrom( remoteEndpoint, data, size );
rob@76 305 }
rob@76 306
rob@76 307
rob@76 308 struct AttachedTimerListener{
rob@76 309 AttachedTimerListener( int id, int p, TimerListener *tl )
rob@76 310 : initialDelayMs( id )
rob@76 311 , periodMs( p )
rob@76 312 , listener( tl ) {}
rob@76 313 int initialDelayMs;
rob@76 314 int periodMs;
rob@76 315 TimerListener *listener;
rob@76 316 };
rob@76 317
rob@76 318
rob@76 319 static bool CompareScheduledTimerCalls(
rob@76 320 const std::pair< double, AttachedTimerListener > & lhs, const std::pair< double, AttachedTimerListener > & rhs )
rob@76 321 {
rob@76 322 return lhs.first < rhs.first;
rob@76 323 }
rob@76 324
rob@76 325
rob@76 326 SocketReceiveMultiplexer *multiplexerInstanceToAbortWithSigInt_ = 0;
rob@76 327
rob@76 328 extern "C" /*static*/ void InterruptSignalHandler( int );
rob@76 329 /*static*/ void InterruptSignalHandler( int )
rob@76 330 {
rob@76 331 multiplexerInstanceToAbortWithSigInt_->AsynchronousBreak();
rob@76 332 signal( SIGINT, SIG_DFL );
rob@76 333 }
rob@76 334
rob@76 335
rob@76 336 class SocketReceiveMultiplexer::Implementation{
rob@76 337 std::vector< std::pair< PacketListener*, UdpSocket* > > socketListeners_;
rob@76 338 std::vector< AttachedTimerListener > timerListeners_;
rob@76 339
rob@76 340 volatile bool break_;
rob@76 341 int breakPipe_[2]; // [0] is the reader descriptor and [1] the writer
rob@76 342
rob@76 343 double GetCurrentTimeMs() const
rob@76 344 {
rob@76 345 struct timeval t;
rob@76 346
rob@76 347 gettimeofday( &t, 0 );
rob@76 348
rob@76 349 return ((double)t.tv_sec*1000.) + ((double)t.tv_usec / 1000.);
rob@76 350 }
rob@76 351
rob@76 352 public:
rob@76 353 Implementation()
rob@76 354 {
rob@76 355 if( pipe(breakPipe_) != 0 )
rob@76 356 throw std::runtime_error( "creation of asynchronous break pipes failed\n" );
rob@76 357 }
rob@76 358
rob@76 359 ~Implementation()
rob@76 360 {
rob@76 361 close( breakPipe_[0] );
rob@76 362 close( breakPipe_[1] );
rob@76 363 }
rob@76 364
rob@76 365 void AttachSocketListener( UdpSocket *socket, PacketListener *listener )
rob@76 366 {
rob@76 367 assert( std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ) == socketListeners_.end() );
rob@76 368 // we don't check that the same socket has been added multiple times, even though this is an error
rob@76 369 socketListeners_.push_back( std::make_pair( listener, socket ) );
rob@76 370 }
rob@76 371
rob@76 372 void DetachSocketListener( UdpSocket *socket, PacketListener *listener )
rob@76 373 {
rob@76 374 std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i =
rob@76 375 std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) );
rob@76 376 assert( i != socketListeners_.end() );
rob@76 377
rob@76 378 socketListeners_.erase( i );
rob@76 379 }
rob@76 380
rob@76 381 void AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
rob@76 382 {
rob@76 383 timerListeners_.push_back( AttachedTimerListener( periodMilliseconds, periodMilliseconds, listener ) );
rob@76 384 }
rob@76 385
rob@76 386 void AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
rob@76 387 {
rob@76 388 timerListeners_.push_back( AttachedTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ) );
rob@76 389 }
rob@76 390
rob@76 391 void DetachPeriodicTimerListener( TimerListener *listener )
rob@76 392 {
rob@76 393 std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
rob@76 394 while( i != timerListeners_.end() ){
rob@76 395 if( i->listener == listener )
rob@76 396 break;
rob@76 397 ++i;
rob@76 398 }
rob@76 399
rob@76 400 assert( i != timerListeners_.end() );
rob@76 401
rob@76 402 timerListeners_.erase( i );
rob@76 403 }
rob@76 404
rob@76 405 void Run()
rob@76 406 {
rob@76 407 break_ = false;
rob@76 408 char *data = 0;
rob@76 409
rob@76 410 try{
rob@76 411
rob@76 412 // configure the master fd_set for select()
rob@76 413
rob@76 414 fd_set masterfds, tempfds;
rob@76 415 FD_ZERO( &masterfds );
rob@76 416 FD_ZERO( &tempfds );
rob@76 417
rob@76 418 // in addition to listening to the inbound sockets we
rob@76 419 // also listen to the asynchronous break pipe, so that AsynchronousBreak()
rob@76 420 // can break us out of select() from another thread.
rob@76 421 FD_SET( breakPipe_[0], &masterfds );
rob@76 422 int fdmax = breakPipe_[0];
rob@76 423
rob@76 424 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
rob@76 425 i != socketListeners_.end(); ++i ){
rob@76 426
rob@76 427 if( fdmax < i->second->impl_->Socket() )
rob@76 428 fdmax = i->second->impl_->Socket();
rob@76 429 FD_SET( i->second->impl_->Socket(), &masterfds );
rob@76 430 }
rob@76 431
rob@76 432
rob@76 433 // configure the timer queue
rob@76 434 double currentTimeMs = GetCurrentTimeMs();
rob@76 435
rob@76 436 // expiry time ms, listener
rob@76 437 std::vector< std::pair< double, AttachedTimerListener > > timerQueue_;
rob@76 438 for( std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
rob@76 439 i != timerListeners_.end(); ++i )
rob@76 440 timerQueue_.push_back( std::make_pair( currentTimeMs + i->initialDelayMs, *i ) );
rob@76 441 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
rob@76 442
rob@76 443 const int MAX_BUFFER_SIZE = 4098;
rob@76 444 data = new char[ MAX_BUFFER_SIZE ];
rob@76 445 IpEndpointName remoteEndpoint;
rob@76 446
rob@76 447 struct timeval timeout;
rob@76 448
rob@76 449 while( !break_ ){
rob@76 450 tempfds = masterfds;
rob@76 451
rob@76 452 struct timeval *timeoutPtr = 0;
rob@76 453 if( !timerQueue_.empty() ){
rob@76 454 double timeoutMs = timerQueue_.front().first - GetCurrentTimeMs();
rob@76 455 if( timeoutMs < 0 )
rob@76 456 timeoutMs = 0;
rob@76 457
rob@76 458 long timoutSecondsPart = (long)(timeoutMs * .001);
rob@76 459 timeout.tv_sec = (time_t)timoutSecondsPart;
rob@76 460 // 1000000 microseconds in a second
rob@76 461 timeout.tv_usec = (suseconds_t)((timeoutMs - (timoutSecondsPart * 1000)) * 1000);
rob@76 462 timeoutPtr = &timeout;
rob@76 463 }
rob@76 464
rob@76 465 if( select( fdmax + 1, &tempfds, 0, 0, timeoutPtr ) < 0 ){
rob@76 466 if( break_ ){
rob@76 467 break;
rob@76 468 }else if( errno == EINTR ){
rob@76 469 // on returning an error, select() doesn't clear tempfds.
rob@76 470 // so tempfds would remain all set, which would cause read( breakPipe_[0]...
rob@76 471 // below to block indefinitely. therefore if select returns EINTR we restart
rob@76 472 // the while() loop instead of continuing on to below.
rob@76 473 continue;
rob@76 474 }else{
rob@76 475 throw std::runtime_error("select failed\n");
rob@76 476 }
rob@76 477 }
rob@76 478
rob@76 479 if( FD_ISSET( breakPipe_[0], &tempfds ) ){
rob@76 480 // clear pending data from the asynchronous break pipe
rob@76 481 char c;
rob@76 482 read( breakPipe_[0], &c, 1 );
rob@76 483 }
rob@76 484
rob@76 485 if( break_ )
rob@76 486 break;
rob@76 487
rob@76 488 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
rob@76 489 i != socketListeners_.end(); ++i ){
rob@76 490
rob@76 491 if( FD_ISSET( i->second->impl_->Socket(), &tempfds ) ){
rob@76 492
rob@76 493 std::size_t size = i->second->ReceiveFrom( remoteEndpoint, data, MAX_BUFFER_SIZE );
rob@76 494 if( size > 0 ){
rob@76 495 i->first->ProcessPacket( data, (int)size, remoteEndpoint );
rob@76 496 if( break_ )
rob@76 497 break;
rob@76 498 }
rob@76 499 }
rob@76 500 }
rob@76 501
rob@76 502 // execute any expired timers
rob@76 503 currentTimeMs = GetCurrentTimeMs();
rob@76 504 bool resort = false;
rob@76 505 for( std::vector< std::pair< double, AttachedTimerListener > >::iterator i = timerQueue_.begin();
rob@76 506 i != timerQueue_.end() && i->first <= currentTimeMs; ++i ){
rob@76 507
rob@76 508 i->second.listener->TimerExpired();
rob@76 509 if( break_ )
rob@76 510 break;
rob@76 511
rob@76 512 i->first += i->second.periodMs;
rob@76 513 resort = true;
rob@76 514 }
rob@76 515 if( resort )
rob@76 516 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
rob@76 517 }
rob@76 518
rob@76 519 delete [] data;
rob@76 520 }catch(...){
rob@76 521 if( data )
rob@76 522 delete [] data;
rob@76 523 throw;
rob@76 524 }
rob@76 525 }
rob@76 526
rob@76 527 void Break()
rob@76 528 {
rob@76 529 break_ = true;
rob@76 530 }
rob@76 531
rob@76 532 void AsynchronousBreak()
rob@76 533 {
rob@76 534 break_ = true;
rob@76 535
rob@76 536 // Send a termination message to the asynchronous break pipe, so select() will return
rob@76 537 write( breakPipe_[1], "!", 1 );
rob@76 538 }
rob@76 539 };
rob@76 540
rob@76 541
rob@76 542
rob@76 543 SocketReceiveMultiplexer::SocketReceiveMultiplexer()
rob@76 544 {
rob@76 545 impl_ = new Implementation();
rob@76 546 }
rob@76 547
rob@76 548 SocketReceiveMultiplexer::~SocketReceiveMultiplexer()
rob@76 549 {
rob@76 550 delete impl_;
rob@76 551 }
rob@76 552
rob@76 553 void SocketReceiveMultiplexer::AttachSocketListener( UdpSocket *socket, PacketListener *listener )
rob@76 554 {
rob@76 555 impl_->AttachSocketListener( socket, listener );
rob@76 556 }
rob@76 557
rob@76 558 void SocketReceiveMultiplexer::DetachSocketListener( UdpSocket *socket, PacketListener *listener )
rob@76 559 {
rob@76 560 impl_->DetachSocketListener( socket, listener );
rob@76 561 }
rob@76 562
rob@76 563 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
rob@76 564 {
rob@76 565 impl_->AttachPeriodicTimerListener( periodMilliseconds, listener );
rob@76 566 }
rob@76 567
rob@76 568 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
rob@76 569 {
rob@76 570 impl_->AttachPeriodicTimerListener( initialDelayMilliseconds, periodMilliseconds, listener );
rob@76 571 }
rob@76 572
rob@76 573 void SocketReceiveMultiplexer::DetachPeriodicTimerListener( TimerListener *listener )
rob@76 574 {
rob@76 575 impl_->DetachPeriodicTimerListener( listener );
rob@76 576 }
rob@76 577
rob@76 578 void SocketReceiveMultiplexer::Run()
rob@76 579 {
rob@76 580 impl_->Run();
rob@76 581 }
rob@76 582
rob@76 583 void SocketReceiveMultiplexer::RunUntilSigInt()
rob@76 584 {
rob@76 585 assert( multiplexerInstanceToAbortWithSigInt_ == 0 ); /* at present we support only one multiplexer instance running until sig int */
rob@76 586 multiplexerInstanceToAbortWithSigInt_ = this;
rob@76 587 signal( SIGINT, InterruptSignalHandler );
rob@76 588 impl_->Run();
rob@76 589 signal( SIGINT, SIG_DFL );
rob@76 590 multiplexerInstanceToAbortWithSigInt_ = 0;
rob@76 591 }
rob@76 592
rob@76 593 void SocketReceiveMultiplexer::Break()
rob@76 594 {
rob@76 595 impl_->Break();
rob@76 596 }
rob@76 597
rob@76 598 void SocketReceiveMultiplexer::AsynchronousBreak()
rob@76 599 {
rob@76 600 impl_->AsynchronousBreak();
rob@76 601 }
rob@76 602