Mercurial > hg > beaglert
diff core/NetworkSend.cpp @ 217:c42a6b4dc2d4 mergingClockSync
Recovered some files from ClockSync
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Sat, 13 Feb 2016 04:09:12 +0000 |
parents | |
children | 0183185cab1b |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/NetworkSend.cpp Sat Feb 13 04:09:12 2016 +0000 @@ -0,0 +1,294 @@ +#include "NetworkSend.h" + +#ifdef USE_JUCE +#else +//initialize the static members of NetworkSend +bool NetworkSend::staticConstructed=false; +int NetworkSend::sleepTimeMs; +bool NetworkSend::threadIsExiting; +bool NetworkSend::threadRunning; +std::vector<NetworkSend*> NetworkSend::objAddrs(0); +AuxiliaryTask NetworkSend::sendDataTask=NULL; + +void sendData(){ + NetworkSend::run(); +} +void NetworkSend::staticConstructor(){ + if(staticConstructed==true) + return; + staticConstructed=true; + threadIsExiting=false; + threadRunning=false; + sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority +} +void NetworkSend::sendAllData(){ + for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){ + NetworkSend::objAddrs[n]->sendData(); + } +} +int NetworkSend::getNumInstances(){ + return objAddrs.size(); +} +void NetworkSend::startThread(){ + BeagleRT_scheduleAuxiliaryTask(sendDataTask); +} +void NetworkSend::stopThread(){ + threadIsExiting=true; +} +bool NetworkSend::threadShouldExit(){ + return(gShouldStop || threadIsExiting); +} +bool NetworkSend::isThreadRunning(){ + return threadRunning; +} +#endif /* USE_JUCE */ + +#ifdef USE_JUCE +NetworkSend::NetworkSend(const String &threadName): + Thread(threadName) +#else +NetworkSend::NetworkSend() +#endif /* USE_JUCE */ +{ + channel.buffers=NULL; + channel.doneOnTime=NULL; + channel.readyToBeSent=NULL; + channel.enabled=false; + sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable + channel.sampleCount=0; +} + +NetworkSend::~NetworkSend(){ +#ifdef USE_JUCE + stopThread(1000); +#else + stopThread(); + for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; + if(objAddrs[n]==this){ + objAddrs.erase(objAddrs.begin()+n); + break; + } + } +#endif + dealloc(); +} +void NetworkSend::dealloc(){ + channel.enabled=false; + if(channel.buffers!=NULL){ + for(int n=0; n<channel.numBuffers; n++){ + free(channel.buffers[n]); + channel.buffers[n]=NULL; + } + free(channel.buffers); + channel.buffers=NULL; + } + free(channel.readyToBeSent); + channel.readyToBeSent=NULL; + free(channel.doneOnTime); + channel.doneOnTime=NULL; +} +void NetworkSend::cleanup(){ + dealloc(); +} + +void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){ +#ifdef USE_JUCE +#else + staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible + //because of limitations in BeagleRT_createAuxiliaryTask() + //keep track of added active instances + objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if + // an instance of NetworkSend is then declared globally: the constructor gets called, + // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without + // any destructor being called in between ... +#endif /* USE_JUCE */ + cleanup(); + int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength; + channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division + channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*)); + printf("NumBuffers: %d\n", channel.numBuffers); + if(channel.buffers==NULL) + return; + for(int n=0; n<channel.numBuffers; n++){ + channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float)); + if(channel.buffers[n]==NULL) + return; + } + channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool)); + channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool)); + for(int n=0; n<channel.numBuffers; n++){ + channel.readyToBeSent[n]=false; + channel.doneOnTime[n]=true; + } + if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL) + return; + channel.writePointer=0; + channel.writeBuffer=0; + channel.readBuffer=0; + setChannelNumber(aChannelNumber); + setPort(aPort); //TODO: check for the return value + setServer(aServer); //TODO: check for the return value + printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); + channel.enabled=true; +} + +void NetworkSend::log(float value){ //TODO: add a vectorized version of this method + if(channel.enabled==false) + return; + if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ... + channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such +// printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); + channel.writePointer=channel.headerLength; //reset the writePointer + channel.writeBuffer=(channel.writeBuffer+1); //switch buffer + if(channel.writeBuffer==channel.numBuffers) // and wrap it + channel.writeBuffer=0; +// printf("WriteBuffer:%d\n", channel.writeBuffer); + if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ... + printf("Network buffer underrun. timestamp: %d :-{\n", + (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); + } + channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag +#ifdef USE_JUCE + if(isThreadRunning()==false){ + startThread(10); + } +#else + if(isThreadRunning()==false){ + startThread(); + } +#endif /* USE_JUCE */ + } + if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header + //set dynamic header values here. Static values are set in setup() and setChannelNumber(). + channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp + channel.sampleCount++; + //add here more header fields + } + channel.buffers[channel.writeBuffer][channel.writePointer++]=value; +// sampleCount++; +}; + +void NetworkSend::setServer(const char *aServer){ +#ifdef USE_JUCE + remoteHostname=String::fromUTF8(aServer); +#else + udpClient.setServer(aServer); +#endif /* USE_JUCE */ +} +void NetworkSend::setPort(int aPort){ +#ifdef USE_JUCE + remotePortNumber=aPort; +#else + udpClient.setPort(aPort); +#endif /* USE_JUCE */ +} + +void NetworkSend::setChannelNumber(int aChannelNumber){ + channel.channelNumber=aChannelNumber; + for(int n=0; n<channel.numBuffers; n++){ //initialize the header + channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber; + //add here more static header fields + } +}; +int NetworkSend::getChannelNumber(){ + return channel.channelNumber; +}; + +int NetworkSend::getTimestamp(){ + return channel.buffers[channel.readBuffer][channel.headerTimestampIndex]; +} + +void NetworkSend::sendData(){ + if(channel.enabled==false) + return; + while(channel.readyToBeSent[channel.readBuffer]==true){ + channel.readyToBeSent[channel.readBuffer]=false; + void* sourceBuffer=channel.buffers[channel.readBuffer]; +// printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]); +// printf("ReadBuffer:%d\n", channel.readBuffer); + unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float); + //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?) +#ifdef USE_JUCE + if(1==udpClient.waitUntilReady(0, 5)){ + udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend); + channel.doneOnTime[channel.readBuffer]=true; + // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]); + } else { + // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]); + } +#else + udpClient.send(sourceBuffer, numBytesToSend); + channel.doneOnTime[channel.readBuffer]=true; +#endif /* USE_JUCE */ + channel.readBuffer++; + if(channel.readBuffer==channel.numBuffers) + channel.readBuffer=0; + } +} + +void NetworkSend::run(){ +#ifdef USE_JUCE + // std::chrono::high_resolution_clock::time_point t1; + // std::chrono::high_resolution_clock::time_point t2; + // std::chrono::high_resolution_clock::time_point t3; + while(threadShouldExit()==false){ + // t3 = std::chrono::high_resolution_clock::now(); + // t1 = std::chrono::high_resolution_clock::now(); + sendData(); + // t2 = std::chrono::high_resolution_clock::now(); + // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count(); + // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count(); + // if(duration2>0) + // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n"; + sleep(1); + } +#else + threadRunning=true; + while(threadShouldExit()==false){ + sendAllData(); + usleep(sleepTimeMs*1000); + } + threadRunning=false; +#endif +} +#ifdef USE_JUCE +#else +Scope::Scope(int aNumChannels): + channels(aNumChannels) +{}; +Scope::~Scope(){}; + +void Scope::log(int channel, float value){ + if(channel>=getNumChannels()) //TODO: assert this + return; + channels[channel].log(value); +} + +void Scope::setup(){ + setup(44100, 9999, "127.0.0.1"); +} + +void Scope::setup(float sampleRate, int aPort, const char* aServer){ + for(int n=0; n<getNumChannels(); n++){ + channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size + } +} + +void Scope::setPort(int port){ + for(int n=0; n<getNumChannels(); n++){ + channels[n].setPort(port); + } +} +void Scope::setPort(int channel, int aPort){ + channels[channel].setPort(aPort); + printf("Channel %d is now sending to port %d\n", channel, aPort); +} + +int Scope::getNumChannels(){ + return channels.size(); +} + +void Scope::sendData(){ + NetworkSend::sendAllData(); +} +#endif