annotate core/ReceiveAudioThread.cpp @ 269:ac8eb07afcf5

Doxygen text added to each render.cpp file for the default projects. Text includes project explanation from Wiki, edited in places. Empty project added as a default project. Doxyfile updated. Each of the project locations added to INPUT configuration option. Consider just watching the whole project file so all new projects are automatically pulled through.
author Robert Jack <robert.h.jack@gmail.com>
date Tue, 17 May 2016 15:40:16 +0100
parents cb47043c8c28
children e4392164b458
rev   line source
giuliomoro@236 1 #include <ReceiveAudioThread.h>
giuliomoro@217 2
giuliomoro@217 3 #ifdef USE_JUCE
giuliomoro@217 4 #else
giuliomoro@217 5 //initialise static members
giuliomoro@217 6 bool ReceiveAudioThread::staticConstructed=false;
giuliomoro@217 7 AuxiliaryTask ReceiveAudioThread::receiveDataTask=NULL;
giuliomoro@217 8 std::vector<ReceiveAudioThread *> ReceiveAudioThread::objAddrs(0);
giuliomoro@217 9 bool ReceiveAudioThread::threadRunning;
giuliomoro@217 10 bool ReceiveAudioThread::threadIsExiting;
giuliomoro@217 11 int ReceiveAudioThread::sleepTime;
giuliomoro@217 12
giuliomoro@217 13 void receiveData(){
giuliomoro@217 14 ReceiveAudioThread::run();
giuliomoro@217 15 }
giuliomoro@217 16 void ReceiveAudioThread::staticConstructor(){
giuliomoro@217 17 if(staticConstructed==true)
giuliomoro@217 18 return;
giuliomoro@217 19 staticConstructed=true;
giuliomoro@217 20 threadIsExiting=false;
giuliomoro@217 21 receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities
giuliomoro@217 22 }
giuliomoro@217 23 #endif /* USE_JUCE */
giuliomoro@217 24
giuliomoro@217 25 void ReceiveAudioThread::dealloc(){
giuliomoro@217 26 free(buffer);
giuliomoro@217 27 buffer=NULL;
giuliomoro@217 28 free(stackBuffer);
giuliomoro@217 29 stackBuffer=NULL;
giuliomoro@217 30 }
giuliomoro@217 31 void ReceiveAudioThread::wrapWritePointer(){
giuliomoro@217 32 //this is not quite a simple wrapping as you would do in a circular buffer,
giuliomoro@217 33 //as there is no guarantee the buffer will be full at all times, given that there must alwas be enough space at the end of it
giuliomoro@217 34 //to hold a full payload
giuliomoro@217 35 // lastValidPointer indicates the last pointer in the buffer containing valid data
giuliomoro@217 36 //
giuliomoro@217 37 if(writePointer+payloadLength+headerLength>bufferLength){ //if we are going to exceed the length of the buffer with the next reading
giuliomoro@217 38 // lastValidPointer=writePointer+headerLength; //remember where the last valid data are
giuliomoro@217 39 // for(int n=headerLength;n<lastValidPointer; n++){
giuliomoro@217 40 // fprintf(fd2, "%f\n",buffer[n]); //DEBUG
giuliomoro@217 41 // }
giuliomoro@222 42 writePointer=0; //and reset to beginning of the buffer
giuliomoro@217 43 }
giuliomoro@217 44 }
giuliomoro@217 45 void ReceiveAudioThread::pushPayload(int startIndex){ //backup the payload samples that will be overwritten by the new header
giuliomoro@217 46 for(int n=0; n<headerLength; n++){
giuliomoro@217 47 stackBuffer[n]=buffer[startIndex+n];
giuliomoro@217 48 }
giuliomoro@217 49 }
giuliomoro@217 50 void ReceiveAudioThread::popPayload(int startIndex){
giuliomoro@217 51 for(int n=0; n<headerLength; n++){
giuliomoro@217 52 buffer[startIndex+n]=stackBuffer[n];
giuliomoro@217 53 }
giuliomoro@217 54 }
giuliomoro@217 55
giuliomoro@217 56 int ReceiveAudioThread::readUdpToBuffer(){
giuliomoro@222 57
giuliomoro@222 58 if(listening==false || bufferReady==false)
giuliomoro@217 59 return 0;
giuliomoro@217 60 if(writePointer<0)
giuliomoro@217 61 return 0;
giuliomoro@217 62 if(socket.waitUntilReady(true, waitForSocketTime)){// TODO: if waitForSocketTime here is >>5, the
giuliomoro@222 63 // destructor (always or sometimes) never actually gets called, despite run() returns ...see issue #1381
giuliomoro@217 64 #ifdef USE_JUCE
giuliomoro@217 65 #else
giuliomoro@217 66 lastTime=rt_timer_read();
giuliomoro@217 67 // rt_printf("lastTimeread= %llu\n", lastTime);
giuliomoro@217 68 #endif /* USE_JUCE */
giuliomoro@217 69 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0
giuliomoro@217 70 //read header+payload
giuliomoro@217 71 int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting.
giuliomoro@217 72 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header
giuliomoro@217 73 if(numBytes<0){
giuliomoro@217 74 printf("error numBytes1\n");
giuliomoro@217 75 return -3; //TODO: something went wrong, you have to discard the rest of the packet!
giuliomoro@217 76 }
giuliomoro@217 77 if(numBytes==0){//TODO: this should not happen unless you actually receive a packet of size zero (is it at all possible?)
giuliomoro@217 78 // printf("received 0 bytes\n");
giuliomoro@217 79 return 0;
giuliomoro@217 80 }
giuliomoro@217 81 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead)
giuliomoro@217 82 printf("error numBytes2: %d\n", numBytes);
giuliomoro@217 83 return -4; //TODO: something went wrong, less bytes than expected in the payload.
giuliomoro@217 84 }
giuliomoro@217 85 if(channel!=(int)buffer[writePointer]){
giuliomoro@217 86 // printf("I am channel %d, but I received data for channel %d\n", channel, (int)buffer[writePointer]);
giuliomoro@217 87 return -5;
giuliomoro@217 88 }
giuliomoro@217 89 if(buffer[writePointer+1]!=timestamp+1)
giuliomoro@217 90 printf("missing a timestamp: %d\n",timestamp+1);
giuliomoro@217 91 timestamp=buffer[writePointer+1];
giuliomoro@217 92 // rt_printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]);
giuliomoro@217 93
giuliomoro@217 94 popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0
giuliomoro@217 95 //even though we just wrote (payloadLength+headerLength) samples in the buffer,
giuliomoro@217 96 //we only increment by payloadLength. This way, next time a socket.read is performed, we will
giuliomoro@217 97 //backup the last headerLength samples that we just wrote and we will overwrite them with
giuliomoro@217 98 //the header from the new read. After parsing the header we will then restore the backed up samples.
giuliomoro@217 99 //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer!
giuliomoro@217 100 writePointer+=payloadLength;
giuliomoro@217 101 wrapWritePointer();
giuliomoro@217 102 return numBytes;
giuliomoro@217 103 }
giuliomoro@217 104 return 0; //timeout occurred
giuliomoro@217 105 }
giuliomoro@217 106 //USE_JUCE Thread(threadName),
giuliomoro@217 107 #ifdef USE_JUCE
giuliomoro@217 108 ReceiveAudioThread::ReceiveAudioThread(const String &threadName) :
giuliomoro@217 109 Thread(threadName),
giuliomoro@217 110 #else
giuliomoro@217 111 ReceiveAudioThread::ReceiveAudioThread() :
giuliomoro@217 112 #endif /* USE_JUCE */
giuliomoro@217 113 socket(0),
giuliomoro@217 114 listening(false),
giuliomoro@217 115 bufferReady(false),
giuliomoro@217 116 buffer(NULL),
giuliomoro@217 117 stackBuffer(NULL),
giuliomoro@217 118 bufferLength(0),
giuliomoro@217 119 lastValidPointer(0),
giuliomoro@217 120 waitForSocketTime(5),
giuliomoro@217 121 #ifdef USE_JUCE
giuliomoro@217 122 threadPriority(5)
giuliomoro@217 123 #else
giuliomoro@217 124 threadPriority(88)
giuliomoro@217 125 #endif /* USE_JUCE */
giuliomoro@217 126 {};
giuliomoro@217 127 ReceiveAudioThread::~ReceiveAudioThread(){
giuliomoro@217 128 #ifdef USE_JUCE
giuliomoro@217 129 stopThread(1000);
giuliomoro@217 130 #else
giuliomoro@217 131 stopThread();
giuliomoro@217 132 while(threadRunning){
giuliomoro@217 133 usleep(sleepTime*2); //wait for thread to stop
giuliomoro@217 134 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl;
giuliomoro@217 135 }
giuliomoro@217 136 #endif /* USE_JUCE */
giuliomoro@217 137 //TODO: check if thread stopped, otherwise kill it before dealloc
giuliomoro@217 138 dealloc();
giuliomoro@217 139 }
giuliomoro@217 140 void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){
giuliomoro@217 141 dealloc();
giuliomoro@217 142 #ifdef USE_JUCE
giuliomoro@217 143 #else
giuliomoro@217 144 staticConstructor();
giuliomoro@217 145 objAddrs.push_back(this);//TODO: this line should be in the constructor
giuliomoro@217 146 #endif /* USE_JUCE */
giuliomoro@217 147 bindToPort(aPort);
giuliomoro@217 148 channel=aChannel;
giuliomoro@217 149 printf("Channel %d is receiving on port %d\n",aChannel, aPort);
giuliomoro@217 150 // fd=fopen("output.m","w"); //DEBUG
giuliomoro@217 151 // fprintf(fd,"var=["); //DEBUG
giuliomoro@217 152 headerLength=2;
giuliomoro@217 153 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending.
giuliomoro@222 154 bufferLength=10 * std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ...
giuliomoro@217 155 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket
giuliomoro@217 156 buffer=(float*)malloc(sizeof(float)*bufferLength);
giuliomoro@217 157 if(buffer==NULL) // something wrong
giuliomoro@217 158 return;
giuliomoro@217 159 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength;
giuliomoro@217 160 memset(buffer,0,bufferLength*sizeof(float));
giuliomoro@217 161 stackBuffer=(float*)malloc(sizeof(float)*headerLength);
giuliomoro@217 162 if(stackBuffer==NULL) // something wrong
giuliomoro@217 163 return;
giuliomoro@217 164 bufferReady=true;
giuliomoro@217 165 bytesToRead=sizeof(float)*(payloadLength + headerLength);
giuliomoro@217 166 writePointer=-1;
giuliomoro@217 167 readPointer=0;
giuliomoro@217 168 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently
giuliomoro@217 169 timestamp=0;
giuliomoro@217 170 #ifdef USE_JUCE
giuliomoro@217 171 startThread(threadPriority);
giuliomoro@217 172 #else
giuliomoro@217 173 //TODO: the thread cannot be started here at the moment because init() is called in setup(), where tasks cannot be scheduled
giuliomoro@217 174 #endif /* USE_JUCE */
giuliomoro@217 175 }
giuliomoro@217 176
giuliomoro@217 177 void ReceiveAudioThread::bindToPort(int aPort){
giuliomoro@217 178 listening=socket.bindToPort(aPort);
giuliomoro@217 179 #ifdef USE_JUCE
giuliomoro@217 180 #else
giuliomoro@217 181 if(listening==false) //this condition is valid also for USE_JUCE, but we do not printf in USE_JUCE
giuliomoro@217 182 printf("Could not bind to port %d\n",aPort);
giuliomoro@217 183 #endif /* USE_JUCE */
giuliomoro@217 184 }
giuliomoro@217 185 bool ReceiveAudioThread::isListening(){
giuliomoro@217 186 return listening;
giuliomoro@217 187 }
giuliomoro@217 188 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples
giuliomoro@217 189 //TODO: make it return the number of samples actually available at the specified location
giuliomoro@217 190 if(isListening()==false || length>bufferLength)
giuliomoro@217 191 return NULL;
giuliomoro@217 192 readPointer+=length;
giuliomoro@217 193 if(readPointer>lastValidPointer){
giuliomoro@217 194 readPointer=headerLength;
giuliomoro@217 195 }
giuliomoro@217 196 return buffer+(int)readPointer;
giuliomoro@217 197 };
giuliomoro@217 198 int ReceiveAudioThread::getSamplesSrc(float *destination, int length,
giuliomoro@217 199 float samplingRateRatio, int numChannelsInDestination,
giuliomoro@217 200 int channelToWriteTo)
giuliomoro@217 201 {
giuliomoro@217 202 if (!(samplingRateRatio>0 && samplingRateRatio<=2))
giuliomoro@217 203 return -2;
giuliomoro@217 204 if(isListening()==false)
giuliomoro@217 205 return -1;
giuliomoro@217 206 static int numCalls=0;
giuliomoro@217 207 if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initalized yet ...
giuliomoro@217 208 #ifdef USE_JUCE
giuliomoro@217 209 #else //debug
giuliomoro@222 210 readPointer = headerLength;
giuliomoro@217 211 #endif /* USE_JUCE */
giuliomoro@222 212 // this cumbersome line means: start writing at a position which is as close as possible
giuliomoro@222 213 // to the center of the buffer, but still is aligned to (payloadLength*x)+headerLength
giuliomoro@222 214 // thus allowing buffering to allow clock drift to go either way
giuliomoro@222 215 writePointer = headerLength + ((bufferLength-headerLength)/payloadLength/2)*payloadLength;
giuliomoro@217 216 // This will help keeping them in sync.
giuliomoro@217 217 //TODO: handle what happens when the remote stream is interrupted and then restarted
giuliomoro@222 218 printf("write pointer inited at: %d\n", writePointer);
giuliomoro@217 219 }
giuliomoro@217 220 numCalls++;
giuliomoro@217 221 if(length>lastValidPointer) {
giuliomoro@217 222 //not enough samples available, we fill the buffer with what is available, but the destination buffer will not be filled completely
giuliomoro@217 223 //at this very moment the other thread might be writing at most one payload into the buffer.
giuliomoro@217 224 //To avoid a race condition, we need to let alone the buffer where we are currently writing
giuliomoro@217 225 //as writing the payload also temporarily overwrites the previous headerLength samples, we need to account for them as well
giuliomoro@217 226 //TODO: This assumes that the writePointer and readPointer do not drift. When doing clock synchronization we will find out that it is not true!
giuliomoro@217 227 length=lastValidPointer-payloadLength-headerLength;
giuliomoro@217 228 if(length<0) //no samples available at all!
giuliomoro@217 229 return 0;
giuliomoro@217 230 }
giuliomoro@217 231 for(int n=0; n<length; n++){
giuliomoro@217 232 destination[n*numChannelsInDestination+channelToWriteTo]=buffer[(int)(0.5+readPointer)];//simple ZOH non-interpolation (nearest neighbour)
giuliomoro@217 233 // fprintf(fd,"%f, %d, %f;\n",readPointer,writePointer,destination[n]); //DEBUG
giuliomoro@217 234 readPointer+=samplingRateRatio;
giuliomoro@217 235 if((int)(0.5+readPointer)>=lastValidPointer){
giuliomoro@217 236 readPointer=readPointer-lastValidPointer+headerLength;
giuliomoro@217 237 }
giuliomoro@217 238 }
giuliomoro@217 239 return length;
giuliomoro@217 240 }
giuliomoro@217 241 int ReceiveAudioThread::getSamplesSrc(float *destination, int length, float samplingRateRatio){
giuliomoro@217 242 return getSamplesSrc(destination, length, samplingRateRatio, 1,0);
giuliomoro@217 243 // TODO: rewriting this so that it does not call the override method we can save a multiply and add
giuliomoro@217 244 // for each sample.
giuliomoro@217 245 }
giuliomoro@217 246 bool ReceiveAudioThread::isBufferReady(){
giuliomoro@217 247 return bufferReady;
giuliomoro@217 248 }
giuliomoro@217 249 #ifdef USE_JUCE
giuliomoro@217 250 #else
giuliomoro@217 251 void ReceiveAudioThread::startThread(){
giuliomoro@217 252 BeagleRT_scheduleAuxiliaryTask(receiveDataTask);
giuliomoro@217 253 }
giuliomoro@217 254 void ReceiveAudioThread::stopThread(){
giuliomoro@217 255 threadIsExiting=true;
giuliomoro@217 256 }
giuliomoro@217 257 bool ReceiveAudioThread::threadShouldExit(){
giuliomoro@217 258 return(gShouldStop || threadIsExiting );
giuliomoro@217 259 }
giuliomoro@217 260 RTIME ReceiveAudioThread::getLastTime(){
giuliomoro@217 261 return lastTime;
giuliomoro@217 262 }
giuliomoro@217 263 #endif /* USE_JUCE */
giuliomoro@217 264 int ReceiveAudioThread::getTimestamp(){
giuliomoro@217 265 return timestamp;
giuliomoro@217 266 }
giuliomoro@217 267 void ReceiveAudioThread::run(){
giuliomoro@217 268 // fd2=fopen("buffer.m","w"); //DEBUG
giuliomoro@217 269 // fprintf(fd2, "buf=["); //DEBUG
giuliomoro@217 270 threadRunning=true;
giuliomoro@217 271 int maxCount=10;
giuliomoro@217 272 int count=0;
giuliomoro@217 273 // Clean the socket from anything that is currently in it.
giuliomoro@217 274 #ifdef USE_JUCE
giuliomoro@217 275 // this is borrowed from BeagleRT's UdpServer class.
giuliomoro@217 276 int n;
giuliomoro@217 277 do {
giuliomoro@217 278 float waste;
giuliomoro@217 279 if(socket.waitUntilReady(true, 0)==0)
giuliomoro@217 280 break;
giuliomoro@217 281 n=socket.read((void*)&waste, sizeof(float), false);
giuliomoro@217 282 count++;
giuliomoro@217 283 if(n<0){
giuliomoro@217 284 printf("error\n");
giuliomoro@217 285 break;
giuliomoro@217 286 }
giuliomoro@217 287 printf("n: %d\n",n);
giuliomoro@217 288 } while (n>0 && (maxCount<=0 || count<maxCount));
giuliomoro@217 289 #else
giuliomoro@217 290 for(unsigned int n=0; n<objAddrs.size(); n++){
giuliomoro@217 291 count=objAddrs[n]->socket.empty(maxCount);
giuliomoro@217 292 }
giuliomoro@217 293 #endif /* USE_JUCE */
giuliomoro@217 294 printf("socket emptied with %d reads\n", count);
giuliomoro@217 295
giuliomoro@217 296 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting
giuliomoro@217 297 #ifdef USE_JUCE
giuliomoro@217 298 readUdpToBuffer(); // read into the oldBuffer
giuliomoro@217 299 sleep(sleepTime);
giuliomoro@217 300 #else
giuliomoro@217 301 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){
giuliomoro@217 302 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer();
giuliomoro@217 303 }
giuliomoro@217 304 usleep(sleepTime); //TODO: use rt_task_sleep instead
giuliomoro@217 305 #endif /* USE_JUCE */
giuliomoro@217 306 }
giuliomoro@217 307 threadRunning=false;
giuliomoro@217 308 printf("Thread is not running \n");
giuliomoro@217 309 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG
giuliomoro@217 310 // fclose(fd);//DEBUG
giuliomoro@217 311 // fprintf(fd2,"];");//DEBUG
giuliomoro@217 312 // fclose(fd2); //DEBUG
giuliomoro@217 313 }