rob@76: /* rob@76: oscpack -- Open Sound Control (OSC) packet manipulation library rob@76: http://www.rossbencina.com/code/oscpack rob@76: rob@76: Copyright (c) 2004-2013 Ross Bencina rob@76: rob@76: Permission is hereby granted, free of charge, to any person obtaining rob@76: a copy of this software and associated documentation files rob@76: (the "Software"), to deal in the Software without restriction, rob@76: including without limitation the rights to use, copy, modify, merge, rob@76: publish, distribute, sublicense, and/or sell copies of the Software, rob@76: and to permit persons to whom the Software is furnished to do so, rob@76: subject to the following conditions: rob@76: rob@76: The above copyright notice and this permission notice shall be rob@76: included in all copies or substantial portions of the Software. rob@76: rob@76: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, rob@76: EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF rob@76: MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. rob@76: IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR rob@76: ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF rob@76: CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION rob@76: WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. rob@76: */ rob@76: rob@76: /* rob@76: The text above constitutes the entire oscpack license; however, rob@76: the oscpack developer(s) also make the following non-binding requests: rob@76: rob@76: Any person wishing to distribute modifications to the Software is rob@76: requested to send the modifications to the original developer so that rob@76: they can be incorporated into the canonical version. It is also rob@76: requested that these non-binding requests be included whenever the rob@76: above license is reproduced. rob@76: */ rob@76: #include "ip/UdpSocket.h" rob@76: rob@76: #include rob@76: #include rob@76: #include rob@76: #include rob@76: #include rob@76: #include rob@76: #include rob@76: #include rob@76: #include // for sockaddr_in rob@76: rob@76: #include rob@76: #include rob@76: #include rob@76: #include rob@76: rob@76: #include rob@76: #include rob@76: #include // for memset rob@76: #include rob@76: #include rob@76: rob@76: #include "ip/PacketListener.h" rob@76: #include "ip/TimerListener.h" rob@76: rob@76: rob@76: #if defined(__APPLE__) && !defined(_SOCKLEN_T) rob@76: // pre system 10.3 didn't have socklen_t rob@76: typedef ssize_t socklen_t; rob@76: #endif rob@76: rob@76: rob@76: static void SockaddrFromIpEndpointName( struct sockaddr_in& sockAddr, const IpEndpointName& endpoint ) rob@76: { rob@76: std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) ); rob@76: sockAddr.sin_family = AF_INET; rob@76: rob@76: sockAddr.sin_addr.s_addr = rob@76: (endpoint.address == IpEndpointName::ANY_ADDRESS) rob@76: ? INADDR_ANY rob@76: : htonl( endpoint.address ); rob@76: rob@76: sockAddr.sin_port = rob@76: (endpoint.port == IpEndpointName::ANY_PORT) rob@76: ? 0 rob@76: : htons( endpoint.port ); rob@76: } rob@76: rob@76: rob@76: static IpEndpointName IpEndpointNameFromSockaddr( const struct sockaddr_in& sockAddr ) rob@76: { rob@76: return IpEndpointName( rob@76: (sockAddr.sin_addr.s_addr == INADDR_ANY) rob@76: ? IpEndpointName::ANY_ADDRESS rob@76: : ntohl( sockAddr.sin_addr.s_addr ), rob@76: (sockAddr.sin_port == 0) rob@76: ? IpEndpointName::ANY_PORT rob@76: : ntohs( sockAddr.sin_port ) rob@76: ); rob@76: } rob@76: rob@76: rob@76: class UdpSocket::Implementation{ rob@76: bool isBound_; rob@76: bool isConnected_; rob@76: rob@76: int socket_; rob@76: struct sockaddr_in connectedAddr_; rob@76: struct sockaddr_in sendToAddr_; rob@76: rob@76: public: rob@76: rob@76: Implementation() rob@76: : isBound_( false ) rob@76: , isConnected_( false ) rob@76: , socket_( -1 ) rob@76: { rob@76: if( (socket_ = socket( AF_INET, SOCK_DGRAM, 0 )) == -1 ){ rob@76: throw std::runtime_error("unable to create udp socket\n"); rob@76: } rob@76: rob@76: std::memset( &sendToAddr_, 0, sizeof(sendToAddr_) ); rob@76: sendToAddr_.sin_family = AF_INET; rob@76: } rob@76: rob@76: ~Implementation() rob@76: { rob@76: if (socket_ != -1) close(socket_); rob@76: } rob@76: rob@76: void SetEnableBroadcast( bool enableBroadcast ) rob@76: { rob@76: int broadcast = (enableBroadcast) ? 1 : 0; // int on posix rob@76: setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &broadcast, sizeof(broadcast)); rob@76: } rob@76: rob@76: void SetAllowReuse( bool allowReuse ) rob@76: { rob@76: int reuseAddr = (allowReuse) ? 1 : 0; // int on posix rob@76: setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr)); rob@76: rob@76: #ifdef __APPLE__ rob@76: // needed also for OS X - enable multiple listeners for a single port on same network interface rob@76: int reusePort = (allowReuse) ? 1 : 0; // int on posix rob@76: setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reusePort, sizeof(reusePort)); rob@76: #endif rob@76: } rob@76: rob@76: IpEndpointName LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const rob@76: { rob@76: assert( isBound_ ); rob@76: rob@76: // first connect the socket to the remote server rob@76: rob@76: struct sockaddr_in connectSockAddr; rob@76: SockaddrFromIpEndpointName( connectSockAddr, remoteEndpoint ); rob@76: rob@76: if (connect(socket_, (struct sockaddr *)&connectSockAddr, sizeof(connectSockAddr)) < 0) { rob@76: throw std::runtime_error("unable to connect udp socket\n"); rob@76: } rob@76: rob@76: // get the address rob@76: rob@76: struct sockaddr_in sockAddr; rob@76: std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) ); rob@76: socklen_t length = sizeof(sockAddr); rob@76: if (getsockname(socket_, (struct sockaddr *)&sockAddr, &length) < 0) { rob@76: throw std::runtime_error("unable to getsockname\n"); rob@76: } rob@76: rob@76: if( isConnected_ ){ rob@76: // reconnect to the connected address rob@76: rob@76: if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) { rob@76: throw std::runtime_error("unable to connect udp socket\n"); rob@76: } rob@76: rob@76: }else{ rob@76: // unconnect from the remote address rob@76: rob@76: struct sockaddr_in unconnectSockAddr; rob@76: std::memset( (char *)&unconnectSockAddr, 0, sizeof(unconnectSockAddr ) ); rob@76: unconnectSockAddr.sin_family = AF_UNSPEC; rob@76: // address fields are zero rob@76: int connectResult = connect(socket_, (struct sockaddr *)&unconnectSockAddr, sizeof(unconnectSockAddr)); rob@76: if ( connectResult < 0 && errno != EAFNOSUPPORT ) { rob@76: throw std::runtime_error("unable to un-connect udp socket\n"); rob@76: } rob@76: } rob@76: rob@76: return IpEndpointNameFromSockaddr( sockAddr ); rob@76: } rob@76: rob@76: void Connect( const IpEndpointName& remoteEndpoint ) rob@76: { rob@76: SockaddrFromIpEndpointName( connectedAddr_, remoteEndpoint ); rob@76: rob@76: if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) { rob@76: throw std::runtime_error("unable to connect udp socket\n"); rob@76: } rob@76: rob@76: isConnected_ = true; rob@76: } rob@76: rob@76: void Send( const char *data, std::size_t size ) rob@76: { rob@76: assert( isConnected_ ); rob@76: rob@76: send( socket_, data, size, 0 ); rob@76: } rob@76: rob@76: void SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size ) rob@76: { rob@76: sendToAddr_.sin_addr.s_addr = htonl( remoteEndpoint.address ); rob@76: sendToAddr_.sin_port = htons( remoteEndpoint.port ); rob@76: rob@76: sendto( socket_, data, size, 0, (sockaddr*)&sendToAddr_, sizeof(sendToAddr_) ); rob@76: } rob@76: rob@76: void Bind( const IpEndpointName& localEndpoint ) rob@76: { rob@76: struct sockaddr_in bindSockAddr; rob@76: SockaddrFromIpEndpointName( bindSockAddr, localEndpoint ); rob@76: rob@76: if (bind(socket_, (struct sockaddr *)&bindSockAddr, sizeof(bindSockAddr)) < 0) { rob@76: throw std::runtime_error("unable to bind udp socket\n"); rob@76: } rob@76: rob@76: isBound_ = true; rob@76: } rob@76: rob@76: bool IsBound() const { return isBound_; } rob@76: rob@76: std::size_t ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size ) rob@76: { rob@76: assert( isBound_ ); rob@76: rob@76: struct sockaddr_in fromAddr; rob@76: socklen_t fromAddrLen = sizeof(fromAddr); rob@76: rob@76: ssize_t result = recvfrom(socket_, data, size, 0, rob@76: (struct sockaddr *) &fromAddr, (socklen_t*)&fromAddrLen); rob@76: if( result < 0 ) rob@76: return 0; rob@76: rob@76: remoteEndpoint.address = ntohl(fromAddr.sin_addr.s_addr); rob@76: remoteEndpoint.port = ntohs(fromAddr.sin_port); rob@76: rob@76: return (std::size_t)result; rob@76: } rob@76: rob@76: int Socket() { return socket_; } rob@76: }; rob@76: rob@76: UdpSocket::UdpSocket() rob@76: { rob@76: impl_ = new Implementation(); rob@76: } rob@76: rob@76: UdpSocket::~UdpSocket() rob@76: { rob@76: delete impl_; rob@76: } rob@76: rob@76: void UdpSocket::SetEnableBroadcast( bool enableBroadcast ) rob@76: { rob@76: impl_->SetEnableBroadcast( enableBroadcast ); rob@76: } rob@76: rob@76: void UdpSocket::SetAllowReuse( bool allowReuse ) rob@76: { rob@76: impl_->SetAllowReuse( allowReuse ); rob@76: } rob@76: rob@76: IpEndpointName UdpSocket::LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const rob@76: { rob@76: return impl_->LocalEndpointFor( remoteEndpoint ); rob@76: } rob@76: rob@76: void UdpSocket::Connect( const IpEndpointName& remoteEndpoint ) rob@76: { rob@76: impl_->Connect( remoteEndpoint ); rob@76: } rob@76: rob@76: void UdpSocket::Send( const char *data, std::size_t size ) rob@76: { rob@76: impl_->Send( data, size ); rob@76: } rob@76: rob@76: void UdpSocket::SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size ) rob@76: { rob@76: impl_->SendTo( remoteEndpoint, data, size ); rob@76: } rob@76: rob@76: void UdpSocket::Bind( const IpEndpointName& localEndpoint ) rob@76: { rob@76: impl_->Bind( localEndpoint ); rob@76: } rob@76: rob@76: bool UdpSocket::IsBound() const rob@76: { rob@76: return impl_->IsBound(); rob@76: } rob@76: rob@76: std::size_t UdpSocket::ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size ) rob@76: { rob@76: return impl_->ReceiveFrom( remoteEndpoint, data, size ); rob@76: } rob@76: rob@76: rob@76: struct AttachedTimerListener{ rob@76: AttachedTimerListener( int id, int p, TimerListener *tl ) rob@76: : initialDelayMs( id ) rob@76: , periodMs( p ) rob@76: , listener( tl ) {} rob@76: int initialDelayMs; rob@76: int periodMs; rob@76: TimerListener *listener; rob@76: }; rob@76: rob@76: rob@76: static bool CompareScheduledTimerCalls( rob@76: const std::pair< double, AttachedTimerListener > & lhs, const std::pair< double, AttachedTimerListener > & rhs ) rob@76: { rob@76: return lhs.first < rhs.first; rob@76: } rob@76: rob@76: rob@76: SocketReceiveMultiplexer *multiplexerInstanceToAbortWithSigInt_ = 0; rob@76: rob@76: extern "C" /*static*/ void InterruptSignalHandler( int ); rob@76: /*static*/ void InterruptSignalHandler( int ) rob@76: { rob@76: multiplexerInstanceToAbortWithSigInt_->AsynchronousBreak(); rob@76: signal( SIGINT, SIG_DFL ); rob@76: } rob@76: rob@76: rob@76: class SocketReceiveMultiplexer::Implementation{ rob@76: std::vector< std::pair< PacketListener*, UdpSocket* > > socketListeners_; rob@76: std::vector< AttachedTimerListener > timerListeners_; rob@76: rob@76: volatile bool break_; rob@76: int breakPipe_[2]; // [0] is the reader descriptor and [1] the writer rob@76: rob@76: double GetCurrentTimeMs() const rob@76: { rob@76: struct timeval t; rob@76: rob@76: gettimeofday( &t, 0 ); rob@76: rob@76: return ((double)t.tv_sec*1000.) + ((double)t.tv_usec / 1000.); rob@76: } rob@76: rob@76: public: rob@76: Implementation() rob@76: { rob@76: if( pipe(breakPipe_) != 0 ) rob@76: throw std::runtime_error( "creation of asynchronous break pipes failed\n" ); rob@76: } rob@76: rob@76: ~Implementation() rob@76: { rob@76: close( breakPipe_[0] ); rob@76: close( breakPipe_[1] ); rob@76: } rob@76: rob@76: void AttachSocketListener( UdpSocket *socket, PacketListener *listener ) rob@76: { rob@76: assert( std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ) == socketListeners_.end() ); rob@76: // we don't check that the same socket has been added multiple times, even though this is an error rob@76: socketListeners_.push_back( std::make_pair( listener, socket ) ); rob@76: } rob@76: rob@76: void DetachSocketListener( UdpSocket *socket, PacketListener *listener ) rob@76: { rob@76: std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = rob@76: std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ); rob@76: assert( i != socketListeners_.end() ); rob@76: rob@76: socketListeners_.erase( i ); rob@76: } rob@76: rob@76: void AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener ) rob@76: { rob@76: timerListeners_.push_back( AttachedTimerListener( periodMilliseconds, periodMilliseconds, listener ) ); rob@76: } rob@76: rob@76: void AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener ) rob@76: { rob@76: timerListeners_.push_back( AttachedTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ) ); rob@76: } rob@76: rob@76: void DetachPeriodicTimerListener( TimerListener *listener ) rob@76: { rob@76: std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin(); rob@76: while( i != timerListeners_.end() ){ rob@76: if( i->listener == listener ) rob@76: break; rob@76: ++i; rob@76: } rob@76: rob@76: assert( i != timerListeners_.end() ); rob@76: rob@76: timerListeners_.erase( i ); rob@76: } rob@76: rob@76: void Run() rob@76: { rob@76: break_ = false; rob@76: char *data = 0; rob@76: rob@76: try{ rob@76: rob@76: // configure the master fd_set for select() rob@76: rob@76: fd_set masterfds, tempfds; rob@76: FD_ZERO( &masterfds ); rob@76: FD_ZERO( &tempfds ); rob@76: rob@76: // in addition to listening to the inbound sockets we rob@76: // also listen to the asynchronous break pipe, so that AsynchronousBreak() rob@76: // can break us out of select() from another thread. rob@76: FD_SET( breakPipe_[0], &masterfds ); rob@76: int fdmax = breakPipe_[0]; rob@76: rob@76: for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin(); rob@76: i != socketListeners_.end(); ++i ){ rob@76: rob@76: if( fdmax < i->second->impl_->Socket() ) rob@76: fdmax = i->second->impl_->Socket(); rob@76: FD_SET( i->second->impl_->Socket(), &masterfds ); rob@76: } rob@76: rob@76: rob@76: // configure the timer queue rob@76: double currentTimeMs = GetCurrentTimeMs(); rob@76: rob@76: // expiry time ms, listener rob@76: std::vector< std::pair< double, AttachedTimerListener > > timerQueue_; rob@76: for( std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin(); rob@76: i != timerListeners_.end(); ++i ) rob@76: timerQueue_.push_back( std::make_pair( currentTimeMs + i->initialDelayMs, *i ) ); rob@76: std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls ); rob@76: rob@76: const int MAX_BUFFER_SIZE = 4098; rob@76: data = new char[ MAX_BUFFER_SIZE ]; rob@76: IpEndpointName remoteEndpoint; rob@76: rob@76: struct timeval timeout; rob@76: rob@76: while( !break_ ){ rob@76: tempfds = masterfds; rob@76: rob@76: struct timeval *timeoutPtr = 0; rob@76: if( !timerQueue_.empty() ){ rob@76: double timeoutMs = timerQueue_.front().first - GetCurrentTimeMs(); rob@76: if( timeoutMs < 0 ) rob@76: timeoutMs = 0; rob@76: rob@76: long timoutSecondsPart = (long)(timeoutMs * .001); rob@76: timeout.tv_sec = (time_t)timoutSecondsPart; rob@76: // 1000000 microseconds in a second rob@76: timeout.tv_usec = (suseconds_t)((timeoutMs - (timoutSecondsPart * 1000)) * 1000); rob@76: timeoutPtr = &timeout; rob@76: } rob@76: rob@76: if( select( fdmax + 1, &tempfds, 0, 0, timeoutPtr ) < 0 ){ rob@76: if( break_ ){ rob@76: break; rob@76: }else if( errno == EINTR ){ rob@76: // on returning an error, select() doesn't clear tempfds. rob@76: // so tempfds would remain all set, which would cause read( breakPipe_[0]... rob@76: // below to block indefinitely. therefore if select returns EINTR we restart rob@76: // the while() loop instead of continuing on to below. rob@76: continue; rob@76: }else{ rob@76: throw std::runtime_error("select failed\n"); rob@76: } rob@76: } rob@76: rob@76: if( FD_ISSET( breakPipe_[0], &tempfds ) ){ rob@76: // clear pending data from the asynchronous break pipe rob@76: char c; rob@76: read( breakPipe_[0], &c, 1 ); rob@76: } rob@76: rob@76: if( break_ ) rob@76: break; rob@76: rob@76: for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin(); rob@76: i != socketListeners_.end(); ++i ){ rob@76: rob@76: if( FD_ISSET( i->second->impl_->Socket(), &tempfds ) ){ rob@76: rob@76: std::size_t size = i->second->ReceiveFrom( remoteEndpoint, data, MAX_BUFFER_SIZE ); rob@76: if( size > 0 ){ rob@76: i->first->ProcessPacket( data, (int)size, remoteEndpoint ); rob@76: if( break_ ) rob@76: break; rob@76: } rob@76: } rob@76: } rob@76: rob@76: // execute any expired timers rob@76: currentTimeMs = GetCurrentTimeMs(); rob@76: bool resort = false; rob@76: for( std::vector< std::pair< double, AttachedTimerListener > >::iterator i = timerQueue_.begin(); rob@76: i != timerQueue_.end() && i->first <= currentTimeMs; ++i ){ rob@76: rob@76: i->second.listener->TimerExpired(); rob@76: if( break_ ) rob@76: break; rob@76: rob@76: i->first += i->second.periodMs; rob@76: resort = true; rob@76: } rob@76: if( resort ) rob@76: std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls ); rob@76: } rob@76: rob@76: delete [] data; rob@76: }catch(...){ rob@76: if( data ) rob@76: delete [] data; rob@76: throw; rob@76: } rob@76: } rob@76: rob@76: void Break() rob@76: { rob@76: break_ = true; rob@76: } rob@76: rob@76: void AsynchronousBreak() rob@76: { rob@76: break_ = true; rob@76: rob@76: // Send a termination message to the asynchronous break pipe, so select() will return rob@76: write( breakPipe_[1], "!", 1 ); rob@76: } rob@76: }; rob@76: rob@76: rob@76: rob@76: SocketReceiveMultiplexer::SocketReceiveMultiplexer() rob@76: { rob@76: impl_ = new Implementation(); rob@76: } rob@76: rob@76: SocketReceiveMultiplexer::~SocketReceiveMultiplexer() rob@76: { rob@76: delete impl_; rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::AttachSocketListener( UdpSocket *socket, PacketListener *listener ) rob@76: { rob@76: impl_->AttachSocketListener( socket, listener ); rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::DetachSocketListener( UdpSocket *socket, PacketListener *listener ) rob@76: { rob@76: impl_->DetachSocketListener( socket, listener ); rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener ) rob@76: { rob@76: impl_->AttachPeriodicTimerListener( periodMilliseconds, listener ); rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener ) rob@76: { rob@76: impl_->AttachPeriodicTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ); rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::DetachPeriodicTimerListener( TimerListener *listener ) rob@76: { rob@76: impl_->DetachPeriodicTimerListener( listener ); rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::Run() rob@76: { rob@76: impl_->Run(); rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::RunUntilSigInt() rob@76: { rob@76: assert( multiplexerInstanceToAbortWithSigInt_ == 0 ); /* at present we support only one multiplexer instance running until sig int */ rob@76: multiplexerInstanceToAbortWithSigInt_ = this; rob@76: signal( SIGINT, InterruptSignalHandler ); rob@76: impl_->Run(); rob@76: signal( SIGINT, SIG_DFL ); rob@76: multiplexerInstanceToAbortWithSigInt_ = 0; rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::Break() rob@76: { rob@76: impl_->Break(); rob@76: } rob@76: rob@76: void SocketReceiveMultiplexer::AsynchronousBreak() rob@76: { rob@76: impl_->AsynchronousBreak(); rob@76: } rob@76: