andrew@0: /* andrew@0: andrew@0: Copyright (c) 2007-2009, Damian Stewart andrew@0: All rights reserved. andrew@0: andrew@0: Redistribution and use in source and binary forms, with or without andrew@0: modification, are permitted provided that the following conditions are met: andrew@0: * Redistributions of source code must retain the above copyright andrew@0: notice, this list of conditions and the following disclaimer. andrew@0: * Redistributions in binary form must reproduce the above copyright andrew@0: notice, this list of conditions and the following disclaimer in the andrew@0: documentation and/or other materials provided with the distribution. andrew@0: * Neither the name of the developer nor the andrew@0: names of its contributors may be used to endorse or promote products andrew@0: derived from this software without specific prior written permission. andrew@0: andrew@0: THIS SOFTWARE IS PROVIDED BY DAMIAN STEWART ''AS IS'' AND ANY andrew@0: EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED andrew@0: WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE andrew@0: DISCLAIMED. IN NO EVENT SHALL DAMIAN STEWART BE LIABLE FOR ANY andrew@0: DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES andrew@0: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; andrew@0: LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND andrew@0: ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT andrew@0: (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS andrew@0: SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. andrew@0: */ andrew@0: andrew@0: #include "ofxOscReceiver.h" andrew@0: andrew@0: #ifndef TARGET_WIN32 andrew@0: #include andrew@0: #endif andrew@0: #include andrew@0: #include andrew@0: andrew@0: ofxOscReceiver::ofxOscReceiver() andrew@0: { andrew@0: listen_socket = NULL; andrew@0: } andrew@0: andrew@0: void ofxOscReceiver::setup( int listen_port ) andrew@0: { andrew@0: // if we're already running, shutdown before running again andrew@0: if ( listen_socket ) andrew@0: shutdown(); andrew@0: andrew@0: // create the mutex andrew@0: #ifdef TARGET_WIN32 andrew@0: mutex = CreateMutexA( NULL, FALSE, NULL ); andrew@0: #else andrew@0: pthread_mutex_init( &mutex, NULL ); andrew@0: #endif andrew@0: andrew@0: // create socket andrew@0: socketHasShutdown = false; andrew@0: listen_socket = new UdpListeningReceiveSocket( IpEndpointName( IpEndpointName::ANY_ADDRESS, listen_port ), this ); andrew@0: andrew@0: // start thread andrew@0: #ifdef TARGET_WIN32 andrew@0: thread = CreateThread( andrew@0: NULL, // default security attributes andrew@0: 0, // use default stack size andrew@0: &ofxOscReceiver::startThread, // thread function andrew@0: (void*)this, // argument to thread function andrew@0: 0, // use default creation flags andrew@0: NULL); // we don't the the thread id andrew@0: andrew@0: #else andrew@0: pthread_create( &thread, NULL, &ofxOscReceiver::startThread, (void*)this ); andrew@0: #endif andrew@0: } andrew@0: andrew@0: void ofxOscReceiver::shutdown() andrew@0: { andrew@0: if ( listen_socket ) andrew@0: { andrew@0: // tell the socket to shutdown andrew@0: listen_socket->AsynchronousBreak(); andrew@0: // wait for shutdown to complete andrew@0: while (!socketHasShutdown) andrew@0: { andrew@0: #ifdef TARGET_WIN32 andrew@0: Sleep(1); andrew@0: #else andrew@0: // sleep 0.1ms andrew@0: usleep(100); andrew@0: #endif andrew@0: } andrew@0: andrew@0: // thread will clean up itself andrew@0: andrew@0: // clean up the mutex andrew@0: #ifdef TARGET_WIN32 andrew@0: ReleaseMutex( mutex ); andrew@0: #else andrew@0: pthread_mutex_destroy( &mutex ); andrew@0: #endif andrew@0: andrew@0: // delete the socket andrew@0: delete listen_socket; andrew@0: listen_socket = NULL; andrew@0: } andrew@0: } andrew@0: andrew@0: ofxOscReceiver::~ofxOscReceiver() andrew@0: { andrew@0: shutdown(); andrew@0: } andrew@0: andrew@0: #ifdef TARGET_WIN32 andrew@0: DWORD WINAPI andrew@0: #else andrew@0: void* andrew@0: #endif andrew@0: ofxOscReceiver::startThread( void* receiverInstance ) andrew@0: { andrew@0: // cast the instance andrew@0: ofxOscReceiver* instance = (ofxOscReceiver*)receiverInstance; andrew@0: // start the socket listener andrew@0: instance->listen_socket->Run(); andrew@0: // socket listener has finished - remember this fact andrew@0: instance->socketHasShutdown = true; andrew@0: // return andrew@0: #ifdef TARGET_WIN32 andrew@0: return 0; andrew@0: #else andrew@0: return NULL; andrew@0: #endif andrew@0: } andrew@0: andrew@0: void ofxOscReceiver::ProcessMessage( const osc::ReceivedMessage &m, const IpEndpointName& remoteEndpoint ) andrew@0: { andrew@0: // convert the message to an ofxOscMessage andrew@0: ofxOscMessage* ofMessage = new ofxOscMessage(); andrew@0: andrew@0: // set the address andrew@0: ofMessage->setAddress( m.AddressPattern() ); andrew@0: andrew@0: // set the sender ip/host andrew@0: char endpoint_host[ IpEndpointName::ADDRESS_STRING_LENGTH ]; andrew@0: remoteEndpoint.AddressAsString( endpoint_host ); andrew@0: ofMessage->setRemoteEndpoint( endpoint_host, remoteEndpoint.port ); andrew@0: andrew@0: // transfer the arguments andrew@0: for ( osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); andrew@0: arg != m.ArgumentsEnd(); andrew@0: ++arg ) andrew@0: { andrew@0: if ( arg->IsInt32() ) andrew@0: ofMessage->addIntArg( arg->AsInt32Unchecked() ); andrew@0: else if ( arg->IsFloat() ) andrew@0: ofMessage->addFloatArg( arg->AsFloatUnchecked() ); andrew@0: else if ( arg->IsString() ) andrew@0: ofMessage->addStringArg( arg->AsStringUnchecked() ); andrew@0: else andrew@0: { andrew@0: assert( false && "message argument is not int, float, or string" ); andrew@0: } andrew@0: } andrew@0: andrew@0: // now add to the queue andrew@0: andrew@0: // at this point we are running inside the thread created by startThread, andrew@0: // so anyone who calls hasWaitingMessages() or getNextMessage() is coming andrew@0: // from a different thread andrew@0: andrew@0: // so we have to practise shared memory management andrew@0: andrew@0: // grab a lock on the queue andrew@0: grabMutex(); andrew@0: andrew@0: // add incoming message on to the queue andrew@0: messages.push_back( ofMessage ); andrew@0: andrew@0: // release the lock andrew@0: releaseMutex(); andrew@0: } andrew@0: andrew@0: bool ofxOscReceiver::hasWaitingMessages() andrew@0: { andrew@0: // grab a lock on the queue andrew@0: grabMutex(); andrew@0: andrew@0: // check the length of the queue andrew@0: int queue_length = (int)messages.size(); andrew@0: andrew@0: // release the lock andrew@0: releaseMutex(); andrew@0: andrew@0: // return whether we have any messages andrew@0: return queue_length > 0; andrew@0: } andrew@0: andrew@0: bool ofxOscReceiver::getNextMessage( ofxOscMessage* message ) andrew@0: { andrew@0: // grab a lock on the queue andrew@0: grabMutex(); andrew@0: andrew@0: // check if there are any to be got andrew@0: if ( messages.size() == 0 ) andrew@0: { andrew@0: // no: release the mutex andrew@0: releaseMutex(); andrew@0: return false; andrew@0: } andrew@0: andrew@0: // copy the message from the queue to message andrew@0: ofxOscMessage* src_message = messages.front(); andrew@0: message->copy( *src_message ); andrew@0: andrew@0: // now delete the src message andrew@0: delete src_message; andrew@0: // and remove it from the queue andrew@0: messages.pop_front(); andrew@0: andrew@0: // release the lock on the queue andrew@0: releaseMutex(); andrew@0: andrew@0: // return success andrew@0: return true; andrew@0: } andrew@0: andrew@0: void ofxOscReceiver::grabMutex() andrew@0: { andrew@0: #ifdef TARGET_WIN32 andrew@0: WaitForSingleObject( mutex, INFINITE ); andrew@0: #else andrew@0: pthread_mutex_lock( &mutex ); andrew@0: #endif andrew@0: } andrew@0: andrew@0: void ofxOscReceiver::releaseMutex() andrew@0: { andrew@0: #ifdef TARGET_WIN32 andrew@0: ReleaseMutex( mutex ); andrew@0: #else andrew@0: pthread_mutex_unlock( &mutex ); andrew@0: #endif andrew@0: }