annotate core/NetworkSend.cpp @ 258:88cf310417cd aux_task_args

Add a parameter 'autoSchedule' to createAuxiliaryTask() which when true causes the task to be automatically scheduled after every render function call, without the user needing to call scheduleAuxiliaryTask()
author Liam Donovan <l.b.donovan@qmul.ac.uk>
date Sat, 07 May 2016 13:23:15 +0100
parents 6a23c07d0fbb
children e4392164b458
rev   line source
giuliomoro@217 1 #include "NetworkSend.h"
giuliomoro@217 2
giuliomoro@217 3 #ifdef USE_JUCE
giuliomoro@217 4 #else
giuliomoro@217 5 //initialize the static members of NetworkSend
giuliomoro@217 6 bool NetworkSend::staticConstructed=false;
giuliomoro@217 7 int NetworkSend::sleepTimeMs;
giuliomoro@217 8 bool NetworkSend::threadIsExiting;
giuliomoro@217 9 bool NetworkSend::threadRunning;
giuliomoro@217 10 std::vector<NetworkSend*> NetworkSend::objAddrs(0);
giuliomoro@217 11 AuxiliaryTask NetworkSend::sendDataTask=NULL;
giuliomoro@217 12
giuliomoro@217 13 void sendData(){
giuliomoro@217 14 NetworkSend::run();
giuliomoro@217 15 }
giuliomoro@217 16 void NetworkSend::staticConstructor(){
giuliomoro@217 17 if(staticConstructed==true)
giuliomoro@217 18 return;
giuliomoro@217 19 staticConstructed=true;
giuliomoro@217 20 threadIsExiting=false;
giuliomoro@217 21 threadRunning=false;
giuliomoro@217 22 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority
giuliomoro@217 23 }
giuliomoro@217 24 void NetworkSend::sendAllData(){
giuliomoro@217 25 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){
giuliomoro@217 26 NetworkSend::objAddrs[n]->sendData();
giuliomoro@217 27 }
giuliomoro@217 28 }
giuliomoro@217 29 int NetworkSend::getNumInstances(){
giuliomoro@217 30 return objAddrs.size();
giuliomoro@217 31 }
giuliomoro@217 32 void NetworkSend::startThread(){
giuliomoro@217 33 BeagleRT_scheduleAuxiliaryTask(sendDataTask);
giuliomoro@217 34 }
giuliomoro@217 35 void NetworkSend::stopThread(){
giuliomoro@217 36 threadIsExiting=true;
giuliomoro@217 37 }
giuliomoro@217 38 bool NetworkSend::threadShouldExit(){
giuliomoro@217 39 return(gShouldStop || threadIsExiting);
giuliomoro@217 40 }
giuliomoro@217 41 bool NetworkSend::isThreadRunning(){
giuliomoro@217 42 return threadRunning;
giuliomoro@217 43 }
giuliomoro@217 44 #endif /* USE_JUCE */
giuliomoro@217 45
giuliomoro@217 46 #ifdef USE_JUCE
giuliomoro@217 47 NetworkSend::NetworkSend(const String &threadName):
giuliomoro@217 48 Thread(threadName)
giuliomoro@217 49 #else
giuliomoro@217 50 NetworkSend::NetworkSend()
giuliomoro@217 51 #endif /* USE_JUCE */
giuliomoro@217 52 {
giuliomoro@217 53 channel.buffers=NULL;
giuliomoro@217 54 channel.doneOnTime=NULL;
giuliomoro@217 55 channel.readyToBeSent=NULL;
giuliomoro@217 56 channel.enabled=false;
giuliomoro@217 57 sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable
giuliomoro@217 58 channel.sampleCount=0;
giuliomoro@217 59 }
giuliomoro@217 60
giuliomoro@217 61 NetworkSend::~NetworkSend(){
giuliomoro@217 62 #ifdef USE_JUCE
giuliomoro@217 63 stopThread(1000);
giuliomoro@217 64 #else
giuliomoro@217 65 stopThread();
giuliomoro@217 66 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances;
giuliomoro@217 67 if(objAddrs[n]==this){
giuliomoro@217 68 objAddrs.erase(objAddrs.begin()+n);
giuliomoro@217 69 break;
giuliomoro@217 70 }
giuliomoro@217 71 }
giuliomoro@217 72 #endif
giuliomoro@217 73 dealloc();
giuliomoro@217 74 }
giuliomoro@217 75 void NetworkSend::dealloc(){
giuliomoro@217 76 channel.enabled=false;
giuliomoro@217 77 if(channel.buffers!=NULL){
giuliomoro@217 78 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@217 79 free(channel.buffers[n]);
giuliomoro@217 80 channel.buffers[n]=NULL;
giuliomoro@217 81 }
giuliomoro@217 82 free(channel.buffers);
giuliomoro@217 83 channel.buffers=NULL;
giuliomoro@217 84 }
giuliomoro@217 85 free(channel.readyToBeSent);
giuliomoro@217 86 channel.readyToBeSent=NULL;
giuliomoro@217 87 free(channel.doneOnTime);
giuliomoro@217 88 channel.doneOnTime=NULL;
giuliomoro@217 89 }
giuliomoro@217 90 void NetworkSend::cleanup(){
giuliomoro@217 91 dealloc();
giuliomoro@217 92 }
giuliomoro@217 93
giuliomoro@217 94 void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){
giuliomoro@217 95 #ifdef USE_JUCE
giuliomoro@217 96 #else
giuliomoro@217 97 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible
giuliomoro@217 98 //because of limitations in BeagleRT_createAuxiliaryTask()
giuliomoro@217 99 //keep track of added active instances
giuliomoro@217 100 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if
giuliomoro@217 101 // an instance of NetworkSend is then declared globally: the constructor gets called,
giuliomoro@217 102 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without
giuliomoro@220 103 // any destructor being called in between ... Have a look here
giuliomoro@220 104 // http://stackoverflow.com/questions/7542054/global-vector-emptying-itself-between-calls .
giuliomoro@220 105 // and maybe use accessor function instead of global, as was done in #1374
giuliomoro@217 106 #endif /* USE_JUCE */
giuliomoro@217 107 cleanup();
giuliomoro@217 108 int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength;
giuliomoro@222 109 channel.numBuffers= (1+numSamples/channel.bufferLength) * 3; //the +1 takes the ceil() of the division
giuliomoro@217 110 channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*));
giuliomoro@217 111 printf("NumBuffers: %d\n", channel.numBuffers);
giuliomoro@217 112 if(channel.buffers==NULL)
giuliomoro@217 113 return;
giuliomoro@217 114 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@217 115 channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float));
giuliomoro@217 116 if(channel.buffers[n]==NULL)
giuliomoro@217 117 return;
giuliomoro@217 118 }
giuliomoro@217 119 channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool));
giuliomoro@217 120 channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool));
giuliomoro@217 121 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@217 122 channel.readyToBeSent[n]=false;
giuliomoro@217 123 channel.doneOnTime[n]=true;
giuliomoro@217 124 }
giuliomoro@217 125 if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL)
giuliomoro@217 126 return;
giuliomoro@220 127 channel.writePointer=channel.headerLength;
giuliomoro@217 128 channel.writeBuffer=0;
giuliomoro@217 129 channel.readBuffer=0;
giuliomoro@217 130 setChannelNumber(aChannelNumber);
giuliomoro@217 131 setPort(aPort); //TODO: check for the return value
giuliomoro@217 132 setServer(aServer); //TODO: check for the return value
giuliomoro@217 133 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate);
giuliomoro@217 134 channel.enabled=true;
giuliomoro@217 135 }
giuliomoro@217 136
giuliomoro@217 137 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method
giuliomoro@217 138 if(channel.enabled==false)
giuliomoro@217 139 return;
giuliomoro@217 140 if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ...
giuliomoro@217 141 channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such
giuliomoro@217 142 // printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
giuliomoro@217 143 channel.writePointer=channel.headerLength; //reset the writePointer
giuliomoro@217 144 channel.writeBuffer=(channel.writeBuffer+1); //switch buffer
giuliomoro@217 145 if(channel.writeBuffer==channel.numBuffers) // and wrap it
giuliomoro@222 146 channel.writeBuffer=0;
giuliomoro@217 147 // printf("WriteBuffer:%d\n", channel.writeBuffer);
giuliomoro@217 148 if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ...
giuliomoro@220 149 printf("NetworkSend buffer underrun. timestamp: %d :-{\n",
giuliomoro@217 150 (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
giuliomoro@217 151 }
giuliomoro@217 152 channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag
giuliomoro@217 153 #ifdef USE_JUCE
giuliomoro@217 154 if(isThreadRunning()==false){
giuliomoro@217 155 startThread(10);
giuliomoro@217 156 }
giuliomoro@217 157 #else
giuliomoro@217 158 if(isThreadRunning()==false){
giuliomoro@217 159 startThread();
giuliomoro@217 160 }
giuliomoro@217 161 #endif /* USE_JUCE */
giuliomoro@217 162 }
giuliomoro@217 163 if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header
giuliomoro@217 164 //set dynamic header values here. Static values are set in setup() and setChannelNumber().
giuliomoro@217 165 channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp
giuliomoro@217 166 channel.sampleCount++;
giuliomoro@217 167 //add here more header fields
giuliomoro@217 168 }
giuliomoro@217 169 channel.buffers[channel.writeBuffer][channel.writePointer++]=value;
giuliomoro@217 170 // sampleCount++;
giuliomoro@217 171 };
giuliomoro@217 172
giuliomoro@217 173 void NetworkSend::setServer(const char *aServer){
giuliomoro@217 174 #ifdef USE_JUCE
giuliomoro@217 175 remoteHostname=String::fromUTF8(aServer);
giuliomoro@217 176 #else
giuliomoro@217 177 udpClient.setServer(aServer);
giuliomoro@217 178 #endif /* USE_JUCE */
giuliomoro@217 179 }
giuliomoro@217 180 void NetworkSend::setPort(int aPort){
giuliomoro@217 181 #ifdef USE_JUCE
giuliomoro@217 182 remotePortNumber=aPort;
giuliomoro@217 183 #else
giuliomoro@217 184 udpClient.setPort(aPort);
giuliomoro@217 185 #endif /* USE_JUCE */
giuliomoro@217 186 }
giuliomoro@217 187
giuliomoro@217 188 void NetworkSend::setChannelNumber(int aChannelNumber){
giuliomoro@217 189 channel.channelNumber=aChannelNumber;
giuliomoro@220 190 for(int n=0; n < channel.numBuffers; n++){ //initialize the header
giuliomoro@217 191 channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber;
giuliomoro@217 192 //add here more static header fields
giuliomoro@217 193 }
giuliomoro@217 194 };
giuliomoro@217 195 int NetworkSend::getChannelNumber(){
giuliomoro@217 196 return channel.channelNumber;
giuliomoro@217 197 };
giuliomoro@217 198
giuliomoro@217 199 int NetworkSend::getTimestamp(){
giuliomoro@217 200 return channel.buffers[channel.readBuffer][channel.headerTimestampIndex];
giuliomoro@217 201 }
giuliomoro@217 202
giuliomoro@217 203 void NetworkSend::sendData(){
giuliomoro@217 204 if(channel.enabled==false)
giuliomoro@217 205 return;
giuliomoro@217 206 while(channel.readyToBeSent[channel.readBuffer]==true){
giuliomoro@217 207 channel.readyToBeSent[channel.readBuffer]=false;
giuliomoro@217 208 void* sourceBuffer=channel.buffers[channel.readBuffer];
giuliomoro@217 209 // printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]);
giuliomoro@217 210 // printf("ReadBuffer:%d\n", channel.readBuffer);
giuliomoro@217 211 unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float);
giuliomoro@217 212 //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?)
giuliomoro@217 213 #ifdef USE_JUCE
giuliomoro@217 214 if(1==udpClient.waitUntilReady(0, 5)){
giuliomoro@217 215 udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend);
giuliomoro@217 216 channel.doneOnTime[channel.readBuffer]=true;
giuliomoro@217 217 // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
giuliomoro@217 218 } else {
giuliomoro@217 219 // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
giuliomoro@217 220 }
giuliomoro@217 221 #else
giuliomoro@217 222 udpClient.send(sourceBuffer, numBytesToSend);
giuliomoro@220 223 // printf("sent sourceBuffer: %d, channel: %f, timestamp: %f\n", channel.readBuffer, channel.buffers[channel.readBuffer][0],
giuliomoro@220 224 // channel.buffers[channel.readBuffer][1]);
giuliomoro@217 225 channel.doneOnTime[channel.readBuffer]=true;
giuliomoro@217 226 #endif /* USE_JUCE */
giuliomoro@217 227 channel.readBuffer++;
giuliomoro@217 228 if(channel.readBuffer==channel.numBuffers)
giuliomoro@217 229 channel.readBuffer=0;
giuliomoro@217 230 }
giuliomoro@217 231 }
giuliomoro@217 232
giuliomoro@217 233 void NetworkSend::run(){
giuliomoro@217 234 #ifdef USE_JUCE
giuliomoro@217 235 // std::chrono::high_resolution_clock::time_point t1;
giuliomoro@217 236 // std::chrono::high_resolution_clock::time_point t2;
giuliomoro@217 237 // std::chrono::high_resolution_clock::time_point t3;
giuliomoro@217 238 while(threadShouldExit()==false){
giuliomoro@217 239 // t3 = std::chrono::high_resolution_clock::now();
giuliomoro@217 240 // t1 = std::chrono::high_resolution_clock::now();
giuliomoro@217 241 sendData();
giuliomoro@217 242 // t2 = std::chrono::high_resolution_clock::now();
giuliomoro@217 243 // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count();
giuliomoro@217 244 // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
giuliomoro@217 245 // if(duration2>0)
giuliomoro@217 246 // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n";
giuliomoro@219 247 usleep(1000);
giuliomoro@217 248 }
giuliomoro@217 249 #else
giuliomoro@217 250 threadRunning=true;
giuliomoro@217 251 while(threadShouldExit()==false){
giuliomoro@217 252 sendAllData();
giuliomoro@217 253 usleep(sleepTimeMs*1000);
giuliomoro@217 254 }
giuliomoro@217 255 threadRunning=false;
giuliomoro@217 256 #endif
giuliomoro@217 257 }