annotate trunk/external/oscpack/ip/posix/UdpSocket.cpp @ 706:f8e90b5d85fd tip

Delete CARFAC code from this repository. It has been moved to https://github.com/google/carfac Please email me with your github username to get access. I've also created a new mailing list to discuss CARFAC development: https://groups.google.com/forum/#!forum/carfac-dev
author ronw@google.com
date Thu, 18 Jul 2013 20:56:51 +0000
parents 4b37b53105a3
children
rev   line source
tomwalters@570 1 /*
tomwalters@570 2 oscpack -- Open Sound Control packet manipulation library
tomwalters@570 3 http://www.audiomulch.com/~rossb/oscpack
tomwalters@570 4
tomwalters@570 5 Copyright (c) 2004-2005 Ross Bencina <rossb@audiomulch.com>
tomwalters@570 6
tomwalters@570 7 Permission is hereby granted, free of charge, to any person obtaining
tomwalters@570 8 a copy of this software and associated documentation files
tomwalters@570 9 (the "Software"), to deal in the Software without restriction,
tomwalters@570 10 including without limitation the rights to use, copy, modify, merge,
tomwalters@570 11 publish, distribute, sublicense, and/or sell copies of the Software,
tomwalters@570 12 and to permit persons to whom the Software is furnished to do so,
tomwalters@570 13 subject to the following conditions:
tomwalters@570 14
tomwalters@570 15 The above copyright notice and this permission notice shall be
tomwalters@570 16 included in all copies or substantial portions of the Software.
tomwalters@570 17
tomwalters@570 18 Any person wishing to distribute modifications to the Software is
tomwalters@570 19 requested to send the modifications to the original developer so that
tomwalters@570 20 they can be incorporated into the canonical version.
tomwalters@570 21
tomwalters@570 22 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
tomwalters@570 23 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
tomwalters@570 24 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
tomwalters@570 25 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
tomwalters@570 26 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
tomwalters@570 27 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
tomwalters@570 28 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
tomwalters@570 29 */
tomwalters@570 30 #include "ip/UdpSocket.h"
tomwalters@570 31
tomwalters@570 32 #include <vector>
tomwalters@570 33 #include <algorithm>
tomwalters@570 34 #include <stdexcept>
tomwalters@570 35 #include <assert.h>
tomwalters@570 36 #include <signal.h>
tomwalters@570 37 #include <math.h>
tomwalters@570 38 #include <errno.h>
tomwalters@570 39 #include <string.h> // for memset
tomwalters@570 40
tomwalters@570 41 #include <pthread.h>
tomwalters@570 42 #include <unistd.h>
tomwalters@570 43 #include <stdlib.h>
tomwalters@570 44 #include <stdio.h>
tomwalters@570 45 #include <netdb.h>
tomwalters@570 46 #include <sys/types.h>
tomwalters@570 47 #include <sys/socket.h>
tomwalters@570 48 #include <sys/time.h>
tomwalters@570 49 #include <netinet/in.h> // for sockaddr_in
tomwalters@570 50
tomwalters@570 51 #include "ip/PacketListener.h"
tomwalters@570 52 #include "ip/TimerListener.h"
tomwalters@570 53
tomwalters@570 54
tomwalters@570 55 #if defined(__APPLE__) && !defined(_SOCKLEN_T)
tomwalters@570 56 // pre system 10.3 didn have socklen_t
tomwalters@570 57 typedef ssize_t socklen_t;
tomwalters@570 58 #endif
tomwalters@570 59
tomwalters@570 60
tomwalters@570 61 static void SockaddrFromIpEndpointName( struct sockaddr_in& sockAddr, const IpEndpointName& endpoint )
tomwalters@570 62 {
tomwalters@570 63 memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
tomwalters@570 64 sockAddr.sin_family = AF_INET;
tomwalters@570 65
tomwalters@570 66 sockAddr.sin_addr.s_addr =
tomwalters@570 67 (endpoint.address == IpEndpointName::ANY_ADDRESS)
tomwalters@570 68 ? INADDR_ANY
tomwalters@570 69 : htonl( endpoint.address );
tomwalters@570 70
tomwalters@570 71 sockAddr.sin_port =
tomwalters@570 72 (endpoint.port == IpEndpointName::ANY_PORT)
tomwalters@570 73 ? 0
tomwalters@570 74 : htons( endpoint.port );
tomwalters@570 75 }
tomwalters@570 76
tomwalters@570 77
tomwalters@570 78 static IpEndpointName IpEndpointNameFromSockaddr( const struct sockaddr_in& sockAddr )
tomwalters@570 79 {
tomwalters@570 80 return IpEndpointName(
tomwalters@570 81 (sockAddr.sin_addr.s_addr == INADDR_ANY)
tomwalters@570 82 ? IpEndpointName::ANY_ADDRESS
tomwalters@570 83 : ntohl( sockAddr.sin_addr.s_addr ),
tomwalters@570 84 (sockAddr.sin_port == 0)
tomwalters@570 85 ? IpEndpointName::ANY_PORT
tomwalters@570 86 : ntohs( sockAddr.sin_port )
tomwalters@570 87 );
tomwalters@570 88 }
tomwalters@570 89
tomwalters@570 90
tomwalters@570 91 class UdpSocket::Implementation{
tomwalters@570 92 bool isBound_;
tomwalters@570 93 bool isConnected_;
tomwalters@570 94
tomwalters@570 95 int socket_;
tomwalters@570 96 struct sockaddr_in connectedAddr_;
tomwalters@570 97 struct sockaddr_in sendToAddr_;
tomwalters@570 98
tomwalters@570 99 public:
tomwalters@570 100
tomwalters@570 101 Implementation()
tomwalters@570 102 : isBound_( false )
tomwalters@570 103 , isConnected_( false )
tomwalters@570 104 , socket_( -1 )
tomwalters@570 105 {
tomwalters@570 106 if( (socket_ = socket( AF_INET, SOCK_DGRAM, 0 )) == -1 ){
tomwalters@570 107 throw std::runtime_error("unable to create udp socket\n");
tomwalters@570 108 }
tomwalters@570 109
tomwalters@570 110 memset( &sendToAddr_, 0, sizeof(sendToAddr_) );
tomwalters@570 111 sendToAddr_.sin_family = AF_INET;
tomwalters@570 112 }
tomwalters@570 113
tomwalters@570 114 ~Implementation()
tomwalters@570 115 {
tomwalters@570 116 if (socket_ != -1) close(socket_);
tomwalters@570 117 }
tomwalters@570 118
tomwalters@570 119 IpEndpointName LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
tomwalters@570 120 {
tomwalters@570 121 assert( isBound_ );
tomwalters@570 122
tomwalters@570 123 // first connect the socket to the remote server
tomwalters@570 124
tomwalters@570 125 struct sockaddr_in connectSockAddr;
tomwalters@570 126 SockaddrFromIpEndpointName( connectSockAddr, remoteEndpoint );
tomwalters@570 127
tomwalters@570 128 if (connect(socket_, (struct sockaddr *)&connectSockAddr, sizeof(connectSockAddr)) < 0) {
tomwalters@570 129 throw std::runtime_error("unable to connect udp socket\n");
tomwalters@570 130 }
tomwalters@570 131
tomwalters@570 132 // get the address
tomwalters@570 133
tomwalters@570 134 struct sockaddr_in sockAddr;
tomwalters@570 135 memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
tomwalters@570 136 socklen_t length = sizeof(sockAddr);
tomwalters@570 137 if (getsockname(socket_, (struct sockaddr *)&sockAddr, &length) < 0) {
tomwalters@570 138 throw std::runtime_error("unable to getsockname\n");
tomwalters@570 139 }
tomwalters@570 140
tomwalters@570 141 if( isConnected_ ){
tomwalters@570 142 // reconnect to the connected address
tomwalters@570 143
tomwalters@570 144 if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
tomwalters@570 145 throw std::runtime_error("unable to connect udp socket\n");
tomwalters@570 146 }
tomwalters@570 147
tomwalters@570 148 }else{
tomwalters@570 149 // unconnect from the remote address
tomwalters@570 150
tomwalters@570 151 struct sockaddr_in unconnectSockAddr;
tomwalters@570 152 memset( (char *)&unconnectSockAddr, 0, sizeof(unconnectSockAddr ) );
tomwalters@570 153 unconnectSockAddr.sin_family = AF_UNSPEC;
tomwalters@570 154 // address fields are zero
tomwalters@570 155 int connectResult = connect(socket_, (struct sockaddr *)&unconnectSockAddr, sizeof(unconnectSockAddr));
tomwalters@570 156 if ( connectResult < 0 && errno != EAFNOSUPPORT ) {
tomwalters@570 157 throw std::runtime_error("unable to un-connect udp socket\n");
tomwalters@570 158 }
tomwalters@570 159 }
tomwalters@570 160
tomwalters@570 161 return IpEndpointNameFromSockaddr( sockAddr );
tomwalters@570 162 }
tomwalters@570 163
tomwalters@570 164 void Connect( const IpEndpointName& remoteEndpoint )
tomwalters@570 165 {
tomwalters@570 166 SockaddrFromIpEndpointName( connectedAddr_, remoteEndpoint );
tomwalters@570 167
tomwalters@570 168 if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
tomwalters@570 169 throw std::runtime_error("unable to connect udp socket\n");
tomwalters@570 170 }
tomwalters@570 171
tomwalters@570 172 isConnected_ = true;
tomwalters@570 173 }
tomwalters@570 174
tomwalters@570 175 void Send( const char *data, int size )
tomwalters@570 176 {
tomwalters@570 177 assert( isConnected_ );
tomwalters@570 178
tomwalters@570 179 send( socket_, data, size, 0 );
tomwalters@570 180 }
tomwalters@570 181
tomwalters@570 182 void SendTo( const IpEndpointName& remoteEndpoint, const char *data, int size )
tomwalters@570 183 {
tomwalters@570 184 sendToAddr_.sin_addr.s_addr = htonl( remoteEndpoint.address );
tomwalters@570 185 sendToAddr_.sin_port = htons( remoteEndpoint.port );
tomwalters@570 186
tomwalters@570 187 sendto( socket_, data, size, 0, (sockaddr*)&sendToAddr_, sizeof(sendToAddr_) );
tomwalters@570 188 }
tomwalters@570 189
tomwalters@570 190 void Bind( const IpEndpointName& localEndpoint )
tomwalters@570 191 {
tomwalters@570 192 struct sockaddr_in bindSockAddr;
tomwalters@570 193 SockaddrFromIpEndpointName( bindSockAddr, localEndpoint );
tomwalters@570 194
tomwalters@570 195 if (bind(socket_, (struct sockaddr *)&bindSockAddr, sizeof(bindSockAddr)) < 0) {
tomwalters@570 196 throw std::runtime_error("unable to bind udp socket\n");
tomwalters@570 197 }
tomwalters@570 198
tomwalters@570 199 isBound_ = true;
tomwalters@570 200 }
tomwalters@570 201
tomwalters@570 202 bool IsBound() const { return isBound_; }
tomwalters@570 203
tomwalters@570 204 int ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, int size )
tomwalters@570 205 {
tomwalters@570 206 assert( isBound_ );
tomwalters@570 207
tomwalters@570 208 struct sockaddr_in fromAddr;
tomwalters@570 209 socklen_t fromAddrLen = sizeof(fromAddr);
tomwalters@570 210
tomwalters@570 211 int result = recvfrom(socket_, data, size, 0,
tomwalters@570 212 (struct sockaddr *) &fromAddr, (socklen_t*)&fromAddrLen);
tomwalters@570 213 if( result < 0 )
tomwalters@570 214 return 0;
tomwalters@570 215
tomwalters@570 216 remoteEndpoint.address = ntohl(fromAddr.sin_addr.s_addr);
tomwalters@570 217 remoteEndpoint.port = ntohs(fromAddr.sin_port);
tomwalters@570 218
tomwalters@570 219 return result;
tomwalters@570 220 }
tomwalters@570 221
tomwalters@570 222 int Socket() { return socket_; }
tomwalters@570 223 };
tomwalters@570 224
tomwalters@570 225 UdpSocket::UdpSocket()
tomwalters@570 226 {
tomwalters@570 227 impl_ = new Implementation();
tomwalters@570 228 }
tomwalters@570 229
tomwalters@570 230 UdpSocket::~UdpSocket()
tomwalters@570 231 {
tomwalters@570 232 delete impl_;
tomwalters@570 233 }
tomwalters@570 234
tomwalters@570 235 IpEndpointName UdpSocket::LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
tomwalters@570 236 {
tomwalters@570 237 return impl_->LocalEndpointFor( remoteEndpoint );
tomwalters@570 238 }
tomwalters@570 239
tomwalters@570 240 void UdpSocket::Connect( const IpEndpointName& remoteEndpoint )
tomwalters@570 241 {
tomwalters@570 242 impl_->Connect( remoteEndpoint );
tomwalters@570 243 }
tomwalters@570 244
tomwalters@570 245 void UdpSocket::Send( const char *data, int size )
tomwalters@570 246 {
tomwalters@570 247 impl_->Send( data, size );
tomwalters@570 248 }
tomwalters@570 249
tomwalters@570 250 void UdpSocket::SendTo( const IpEndpointName& remoteEndpoint, const char *data, int size )
tomwalters@570 251 {
tomwalters@570 252 impl_->SendTo( remoteEndpoint, data, size );
tomwalters@570 253 }
tomwalters@570 254
tomwalters@570 255 void UdpSocket::Bind( const IpEndpointName& localEndpoint )
tomwalters@570 256 {
tomwalters@570 257 impl_->Bind( localEndpoint );
tomwalters@570 258 }
tomwalters@570 259
tomwalters@570 260 bool UdpSocket::IsBound() const
tomwalters@570 261 {
tomwalters@570 262 return impl_->IsBound();
tomwalters@570 263 }
tomwalters@570 264
tomwalters@570 265 int UdpSocket::ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, int size )
tomwalters@570 266 {
tomwalters@570 267 return impl_->ReceiveFrom( remoteEndpoint, data, size );
tomwalters@570 268 }
tomwalters@570 269
tomwalters@570 270
tomwalters@570 271 struct AttachedTimerListener{
tomwalters@570 272 AttachedTimerListener( int id, int p, TimerListener *tl )
tomwalters@570 273 : initialDelayMs( id )
tomwalters@570 274 , periodMs( p )
tomwalters@570 275 , listener( tl ) {}
tomwalters@570 276 int initialDelayMs;
tomwalters@570 277 int periodMs;
tomwalters@570 278 TimerListener *listener;
tomwalters@570 279 };
tomwalters@570 280
tomwalters@570 281
tomwalters@570 282 static bool CompareScheduledTimerCalls(
tomwalters@570 283 const std::pair< double, AttachedTimerListener > & lhs, const std::pair< double, AttachedTimerListener > & rhs )
tomwalters@570 284 {
tomwalters@570 285 return lhs.first < rhs.first;
tomwalters@570 286 }
tomwalters@570 287
tomwalters@570 288
tomwalters@570 289 SocketReceiveMultiplexer *multiplexerInstanceToAbortWithSigInt_ = 0;
tomwalters@570 290
tomwalters@570 291 extern "C" /*static*/ void InterruptSignalHandler( int );
tomwalters@570 292 /*static*/ void InterruptSignalHandler( int )
tomwalters@570 293 {
tomwalters@570 294 multiplexerInstanceToAbortWithSigInt_->AsynchronousBreak();
tomwalters@570 295 signal( SIGINT, SIG_DFL );
tomwalters@570 296 }
tomwalters@570 297
tomwalters@570 298
tomwalters@570 299 class SocketReceiveMultiplexer::Implementation{
tomwalters@570 300 std::vector< std::pair< PacketListener*, UdpSocket* > > socketListeners_;
tomwalters@570 301 std::vector< AttachedTimerListener > timerListeners_;
tomwalters@570 302
tomwalters@570 303 volatile bool break_;
tomwalters@570 304 int breakPipe_[2]; // [0] is the reader descriptor and [1] the writer
tomwalters@570 305
tomwalters@570 306 double GetCurrentTimeMs() const
tomwalters@570 307 {
tomwalters@570 308 struct timeval t;
tomwalters@570 309
tomwalters@570 310 gettimeofday( &t, 0 );
tomwalters@570 311
tomwalters@570 312 return ((double)t.tv_sec*1000.) + ((double)t.tv_usec / 1000.);
tomwalters@570 313 }
tomwalters@570 314
tomwalters@570 315 public:
tomwalters@570 316 Implementation()
tomwalters@570 317 {
tomwalters@570 318 if( pipe(breakPipe_) != 0 )
tomwalters@570 319 throw std::runtime_error( "creation of asynchronous break pipes failed\n" );
tomwalters@570 320 }
tomwalters@570 321
tomwalters@570 322 ~Implementation()
tomwalters@570 323 {
tomwalters@570 324 close( breakPipe_[0] );
tomwalters@570 325 close( breakPipe_[1] );
tomwalters@570 326 }
tomwalters@570 327
tomwalters@570 328 void AttachSocketListener( UdpSocket *socket, PacketListener *listener )
tomwalters@570 329 {
tomwalters@570 330 assert( std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ) == socketListeners_.end() );
tomwalters@570 331 // we don't check that the same socket has been added multiple times, even though this is an error
tomwalters@570 332 socketListeners_.push_back( std::make_pair( listener, socket ) );
tomwalters@570 333 }
tomwalters@570 334
tomwalters@570 335 void DetachSocketListener( UdpSocket *socket, PacketListener *listener )
tomwalters@570 336 {
tomwalters@570 337 std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i =
tomwalters@570 338 std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) );
tomwalters@570 339 assert( i != socketListeners_.end() );
tomwalters@570 340
tomwalters@570 341 socketListeners_.erase( i );
tomwalters@570 342 }
tomwalters@570 343
tomwalters@570 344 void AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
tomwalters@570 345 {
tomwalters@570 346 timerListeners_.push_back( AttachedTimerListener( periodMilliseconds, periodMilliseconds, listener ) );
tomwalters@570 347 }
tomwalters@570 348
tomwalters@570 349 void AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
tomwalters@570 350 {
tomwalters@570 351 timerListeners_.push_back( AttachedTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ) );
tomwalters@570 352 }
tomwalters@570 353
tomwalters@570 354 void DetachPeriodicTimerListener( TimerListener *listener )
tomwalters@570 355 {
tomwalters@570 356 std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
tomwalters@570 357 while( i != timerListeners_.end() ){
tomwalters@570 358 if( i->listener == listener )
tomwalters@570 359 break;
tomwalters@570 360 ++i;
tomwalters@570 361 }
tomwalters@570 362
tomwalters@570 363 assert( i != timerListeners_.end() );
tomwalters@570 364
tomwalters@570 365 timerListeners_.erase( i );
tomwalters@570 366 }
tomwalters@570 367
tomwalters@570 368 void Run()
tomwalters@570 369 {
tomwalters@570 370 break_ = false;
tomwalters@570 371
tomwalters@570 372 // configure the master fd_set for select()
tomwalters@570 373
tomwalters@570 374 fd_set masterfds, tempfds;
tomwalters@570 375 FD_ZERO( &masterfds );
tomwalters@570 376 FD_ZERO( &tempfds );
tomwalters@570 377
tomwalters@570 378 // in addition to listening to the inbound sockets we
tomwalters@570 379 // also listen to the asynchronous break pipe, so that AsynchronousBreak()
tomwalters@570 380 // can break us out of select() from another thread.
tomwalters@570 381 FD_SET( breakPipe_[0], &masterfds );
tomwalters@570 382 int fdmax = breakPipe_[0];
tomwalters@570 383
tomwalters@570 384 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
tomwalters@570 385 i != socketListeners_.end(); ++i ){
tomwalters@570 386
tomwalters@570 387 if( fdmax < i->second->impl_->Socket() )
tomwalters@570 388 fdmax = i->second->impl_->Socket();
tomwalters@570 389 FD_SET( i->second->impl_->Socket(), &masterfds );
tomwalters@570 390 }
tomwalters@570 391
tomwalters@570 392
tomwalters@570 393 // configure the timer queue
tomwalters@570 394 double currentTimeMs = GetCurrentTimeMs();
tomwalters@570 395
tomwalters@570 396 // expiry time ms, listener
tomwalters@570 397 std::vector< std::pair< double, AttachedTimerListener > > timerQueue_;
tomwalters@570 398 for( std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
tomwalters@570 399 i != timerListeners_.end(); ++i )
tomwalters@570 400 timerQueue_.push_back( std::make_pair( currentTimeMs + i->initialDelayMs, *i ) );
tomwalters@570 401 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
tomwalters@570 402
tomwalters@570 403 const int MAX_BUFFER_SIZE = 4098;
tomwalters@570 404 char *data = new char[ MAX_BUFFER_SIZE ];
tomwalters@570 405 IpEndpointName remoteEndpoint;
tomwalters@570 406
tomwalters@570 407 struct timeval timeout;
tomwalters@570 408
tomwalters@570 409 while( !break_ ){
tomwalters@570 410 tempfds = masterfds;
tomwalters@570 411
tomwalters@570 412 struct timeval *timeoutPtr = 0;
tomwalters@570 413 if( !timerQueue_.empty() ){
tomwalters@570 414 double timeoutMs = timerQueue_.front().first - GetCurrentTimeMs();
tomwalters@570 415 if( timeoutMs < 0 )
tomwalters@570 416 timeoutMs = 0;
tomwalters@570 417
tomwalters@570 418 // 1000000 microseconds in a second
tomwalters@570 419 timeout.tv_sec = (long)(timeoutMs * .001);
tomwalters@570 420 timeout.tv_usec = (long)((timeoutMs - (timeout.tv_sec * 1000)) * 1000);
tomwalters@570 421 timeoutPtr = &timeout;
tomwalters@570 422 }
tomwalters@570 423
tomwalters@570 424 if( select( fdmax + 1, &tempfds, 0, 0, timeoutPtr ) < 0 && errno != EINTR ){
tomwalters@570 425 throw std::runtime_error("select failed\n");
tomwalters@570 426 }
tomwalters@570 427
tomwalters@570 428 if ( FD_ISSET( breakPipe_[0], &tempfds ) ){
tomwalters@570 429 // clear pending data from the asynchronous break pipe
tomwalters@570 430 char c;
tomwalters@570 431 read( breakPipe_[0], &c, 1 );
tomwalters@570 432 }
tomwalters@570 433
tomwalters@570 434 if( break_ )
tomwalters@570 435 break;
tomwalters@570 436
tomwalters@570 437 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
tomwalters@570 438 i != socketListeners_.end(); ++i ){
tomwalters@570 439
tomwalters@570 440 if( FD_ISSET( i->second->impl_->Socket(), &tempfds ) ){
tomwalters@570 441
tomwalters@570 442 int size = i->second->ReceiveFrom( remoteEndpoint, data, MAX_BUFFER_SIZE );
tomwalters@570 443 if( size > 0 ){
tomwalters@570 444 i->first->ProcessPacket( data, size, remoteEndpoint );
tomwalters@570 445 if( break_ )
tomwalters@570 446 break;
tomwalters@570 447 }
tomwalters@570 448 }
tomwalters@570 449 }
tomwalters@570 450
tomwalters@570 451 // execute any expired timers
tomwalters@570 452 currentTimeMs = GetCurrentTimeMs();
tomwalters@570 453 bool resort = false;
tomwalters@570 454 for( std::vector< std::pair< double, AttachedTimerListener > >::iterator i = timerQueue_.begin();
tomwalters@570 455 i != timerQueue_.end() && i->first <= currentTimeMs; ++i ){
tomwalters@570 456
tomwalters@570 457 i->second.listener->TimerExpired();
tomwalters@570 458 if( break_ )
tomwalters@570 459 break;
tomwalters@570 460
tomwalters@570 461 i->first += i->second.periodMs;
tomwalters@570 462 resort = true;
tomwalters@570 463 }
tomwalters@570 464 if( resort )
tomwalters@570 465 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
tomwalters@570 466 }
tomwalters@570 467
tomwalters@570 468 delete [] data;
tomwalters@570 469 }
tomwalters@570 470
tomwalters@570 471 void Break()
tomwalters@570 472 {
tomwalters@570 473 break_ = true;
tomwalters@570 474 }
tomwalters@570 475
tomwalters@570 476 void AsynchronousBreak()
tomwalters@570 477 {
tomwalters@570 478 break_ = true;
tomwalters@570 479
tomwalters@570 480 // Send a termination message to the asynchronous break pipe, so select() will return
tomwalters@570 481 write( breakPipe_[1], "!", 1 );
tomwalters@570 482 }
tomwalters@570 483 };
tomwalters@570 484
tomwalters@570 485
tomwalters@570 486
tomwalters@570 487 SocketReceiveMultiplexer::SocketReceiveMultiplexer()
tomwalters@570 488 {
tomwalters@570 489 impl_ = new Implementation();
tomwalters@570 490 }
tomwalters@570 491
tomwalters@570 492 SocketReceiveMultiplexer::~SocketReceiveMultiplexer()
tomwalters@570 493 {
tomwalters@570 494 delete impl_;
tomwalters@570 495 }
tomwalters@570 496
tomwalters@570 497 void SocketReceiveMultiplexer::AttachSocketListener( UdpSocket *socket, PacketListener *listener )
tomwalters@570 498 {
tomwalters@570 499 impl_->AttachSocketListener( socket, listener );
tomwalters@570 500 }
tomwalters@570 501
tomwalters@570 502 void SocketReceiveMultiplexer::DetachSocketListener( UdpSocket *socket, PacketListener *listener )
tomwalters@570 503 {
tomwalters@570 504 impl_->DetachSocketListener( socket, listener );
tomwalters@570 505 }
tomwalters@570 506
tomwalters@570 507 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
tomwalters@570 508 {
tomwalters@570 509 impl_->AttachPeriodicTimerListener( periodMilliseconds, listener );
tomwalters@570 510 }
tomwalters@570 511
tomwalters@570 512 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
tomwalters@570 513 {
tomwalters@570 514 impl_->AttachPeriodicTimerListener( initialDelayMilliseconds, periodMilliseconds, listener );
tomwalters@570 515 }
tomwalters@570 516
tomwalters@570 517 void SocketReceiveMultiplexer::DetachPeriodicTimerListener( TimerListener *listener )
tomwalters@570 518 {
tomwalters@570 519 impl_->DetachPeriodicTimerListener( listener );
tomwalters@570 520 }
tomwalters@570 521
tomwalters@570 522 void SocketReceiveMultiplexer::Run()
tomwalters@570 523 {
tomwalters@570 524 impl_->Run();
tomwalters@570 525 }
tomwalters@570 526
tomwalters@570 527 void SocketReceiveMultiplexer::RunUntilSigInt()
tomwalters@570 528 {
tomwalters@570 529 assert( multiplexerInstanceToAbortWithSigInt_ == 0 ); /* at present we support only one multiplexer instance running until sig int */
tomwalters@570 530 multiplexerInstanceToAbortWithSigInt_ = this;
tomwalters@570 531 signal( SIGINT, InterruptSignalHandler );
tomwalters@570 532 impl_->Run();
tomwalters@570 533 signal( SIGINT, SIG_DFL );
tomwalters@570 534 multiplexerInstanceToAbortWithSigInt_ = 0;
tomwalters@570 535 }
tomwalters@570 536
tomwalters@570 537 void SocketReceiveMultiplexer::Break()
tomwalters@570 538 {
tomwalters@570 539 impl_->Break();
tomwalters@570 540 }
tomwalters@570 541
tomwalters@570 542 void SocketReceiveMultiplexer::AsynchronousBreak()
tomwalters@570 543 {
tomwalters@570 544 impl_->AsynchronousBreak();
tomwalters@570 545 }
tomwalters@570 546