annotate core/ReceiveAudioThread.cpp @ 151:e9c9404e3d1f ClockSync

Pff partially working. No PID. When setting the audio clock on the bbb to 44098 the master and slave clock keep diverging instead of converging ...
author Giulio Moro <giuliomoro@yahoo.it>
date Tue, 22 Sep 2015 04:10:07 +0100
parents e77e2e712fbc
children
rev   line source
giuliomoro@117 1 #include "ReceiveAudioThread.h"
giuliomoro@119 2
giuliomoro@128 3 #ifdef USE_JUCE
giuliomoro@119 4 #else
giuliomoro@117 5 //initialise static members
giuliomoro@119 6 bool ReceiveAudioThread::staticConstructed=false;
giuliomoro@117 7 AuxiliaryTask ReceiveAudioThread::receiveDataTask=NULL;
giuliomoro@119 8 std::vector<ReceiveAudioThread *> ReceiveAudioThread::objAddrs(0);
giuliomoro@119 9 bool ReceiveAudioThread::threadRunning;
giuliomoro@119 10 bool ReceiveAudioThread::threadIsExiting;
giuliomoro@119 11 int ReceiveAudioThread::sleepTime;
giuliomoro@119 12
giuliomoro@119 13 void receiveData(){
giuliomoro@119 14 ReceiveAudioThread::run();
giuliomoro@119 15 }
giuliomoro@119 16 void ReceiveAudioThread::staticConstructor(){
giuliomoro@119 17 if(staticConstructed==true)
giuliomoro@119 18 return;
giuliomoro@119 19 staticConstructed=true;
giuliomoro@119 20 threadIsExiting=false;
giuliomoro@119 21 receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities
giuliomoro@119 22 }
giuliomoro@128 23 #endif /* USE_JUCE */
giuliomoro@117 24
giuliomoro@117 25 void ReceiveAudioThread::dealloc(){
giuliomoro@117 26 free(buffer);
giuliomoro@117 27 buffer=NULL;
giuliomoro@117 28 free(stackBuffer);
giuliomoro@117 29 stackBuffer=NULL;
giuliomoro@117 30 }
giuliomoro@117 31 void ReceiveAudioThread::wrapWritePointer(){
giuliomoro@122 32 //this is not quite a simple wrapping as you would do in a circular buffer,
giuliomoro@117 33 //as there is no guarantee the buffer will be full at all times, given that there must alwas be enough space at the end of it
giuliomoro@117 34 //to hold a full payload
giuliomoro@117 35 // lastValidPointer indicates the last pointer in the buffer containing valid data
giuliomoro@117 36 //
giuliomoro@117 37 if(writePointer+payloadLength+headerLength>bufferLength){ //if we are going to exceed the length of the buffer with the next reading
giuliomoro@117 38 // lastValidPointer=writePointer+headerLength; //remember where the last valid data are
giuliomoro@117 39 // for(int n=headerLength;n<lastValidPointer; n++){
giuliomoro@117 40 // fprintf(fd2, "%f\n",buffer[n]); //DEBUG
giuliomoro@117 41 // }
giuliomoro@117 42 writePointer=0; //and reset to beginning of the buffer
giuliomoro@117 43 }
giuliomoro@117 44 }
giuliomoro@117 45 void ReceiveAudioThread::pushPayload(int startIndex){ //backup the payload samples that will be overwritten by the new header
giuliomoro@117 46 for(int n=0; n<headerLength; n++){
giuliomoro@117 47 stackBuffer[n]=buffer[startIndex+n];
giuliomoro@117 48 }
giuliomoro@117 49 }
giuliomoro@117 50 void ReceiveAudioThread::popPayload(int startIndex){
giuliomoro@117 51 for(int n=0; n<headerLength; n++){
giuliomoro@117 52 buffer[startIndex+n]=stackBuffer[n];
giuliomoro@117 53 }
giuliomoro@117 54 }
giuliomoro@117 55
giuliomoro@117 56 int ReceiveAudioThread::readUdpToBuffer(){
giuliomoro@117 57 if(listening==false || bufferReady==false)
giuliomoro@117 58 return 0;
giuliomoro@117 59 if(writePointer<0)
giuliomoro@117 60 return 0;
giuliomoro@127 61 if(socket.waitUntilReady(true, waitForSocketTime)){// TODO: if waitForSocketTime here is >>5, the
giuliomoro@132 62 #ifdef USE_JUCE
giuliomoro@132 63 #else
giuliomoro@132 64 lastTime=rt_timer_read();
giuliomoro@132 65 // rt_printf("lastTimeread= %llu\n", lastTime);
giuliomoro@132 66 #endif /* USE_JUCE */
giuliomoro@127 67 // destructor (always or sometimes) never actually gets called, despite run() returns ...see issue #1381
giuliomoro@117 68 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0
giuliomoro@117 69 //read header+payload
giuliomoro@125 70 int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting.
giuliomoro@117 71 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header
giuliomoro@117 72 if(numBytes<0){
giuliomoro@117 73 printf("error numBytes1\n");
giuliomoro@117 74 return -3; //TODO: something went wrong, you have to discard the rest of the packet!
giuliomoro@117 75 }
giuliomoro@125 76 if(numBytes==0){//TODO: this should not happen unless you actually receive a packet of size zero (is it at all possible?)
giuliomoro@117 77 // printf("received 0 bytes\n");
giuliomoro@117 78 return 0;
giuliomoro@117 79 }
giuliomoro@119 80 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead)
giuliomoro@117 81 printf("error numBytes2: %d\n", numBytes);
giuliomoro@117 82 return -4; //TODO: something went wrong, less bytes than expected in the payload.
giuliomoro@117 83 }
giuliomoro@119 84 if(channel!=(int)buffer[writePointer]){
giuliomoro@119 85 // printf("I am channel %d, but I received data for channel %d\n", channel, (int)buffer[writePointer]);
giuliomoro@119 86 return -5;
giuliomoro@119 87 }
giuliomoro@131 88 if(buffer[writePointer+1]!=timestamp+1)
giuliomoro@131 89 printf("missing a timestamp: %d\n",timestamp+1);
giuliomoro@131 90 timestamp=buffer[writePointer+1];
giuliomoro@119 91 // rt_printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]);
giuliomoro@131 92
giuliomoro@117 93 popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0
giuliomoro@117 94 //even though we just wrote (payloadLength+headerLength) samples in the buffer,
giuliomoro@117 95 //we only increment by payloadLength. This way, next time a socket.read is performed, we will
giuliomoro@117 96 //backup the last headerLength samples that we just wrote and we will overwrite them with
giuliomoro@117 97 //the header from the new read. After parsing the header we will then restore the backed up samples.
giuliomoro@117 98 //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer!
giuliomoro@117 99 writePointer+=payloadLength;
giuliomoro@117 100 wrapWritePointer();
giuliomoro@117 101 return numBytes;
giuliomoro@117 102 }
giuliomoro@117 103 return 0; //timeout occurred
giuliomoro@117 104 }
giuliomoro@128 105 //USE_JUCE Thread(threadName),
giuliomoro@128 106 #ifdef USE_JUCE
giuliomoro@125 107 ReceiveAudioThread::ReceiveAudioThread(const String &threadName) :
giuliomoro@125 108 Thread(threadName),
giuliomoro@125 109 #else
giuliomoro@117 110 ReceiveAudioThread::ReceiveAudioThread() :
giuliomoro@128 111 #endif /* USE_JUCE */
giuliomoro@127 112 socket(0),
giuliomoro@117 113 listening(false),
giuliomoro@117 114 bufferReady(false),
giuliomoro@117 115 buffer(NULL),
giuliomoro@117 116 stackBuffer(NULL),
giuliomoro@117 117 bufferLength(0),
giuliomoro@117 118 lastValidPointer(0),
giuliomoro@127 119 waitForSocketTime(5),
giuliomoro@128 120 #ifdef USE_JUCE
giuliomoro@125 121 threadPriority(5)
giuliomoro@125 122 #else
giuliomoro@125 123 threadPriority(88)
giuliomoro@128 124 #endif /* USE_JUCE */
giuliomoro@117 125 {};
giuliomoro@117 126 ReceiveAudioThread::~ReceiveAudioThread(){
giuliomoro@128 127 #ifdef USE_JUCE
giuliomoro@125 128 stopThread(1000);
giuliomoro@125 129 #else
giuliomoro@131 130 stopThread();
giuliomoro@117 131 while(threadRunning){
giuliomoro@119 132 usleep(sleepTime*2); //wait for thread to stop
giuliomoro@117 133 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl;
giuliomoro@117 134 }
giuliomoro@128 135 #endif /* USE_JUCE */
giuliomoro@117 136 //TODO: check if thread stopped, otherwise kill it before dealloc
giuliomoro@117 137 dealloc();
giuliomoro@117 138 }
giuliomoro@119 139 void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){
giuliomoro@117 140 dealloc();
giuliomoro@128 141 #ifdef USE_JUCE
giuliomoro@119 142 #else
giuliomoro@119 143 staticConstructor();
giuliomoro@119 144 objAddrs.push_back(this);//TODO: this line should be in the constructor
giuliomoro@128 145 #endif /* USE_JUCE */
giuliomoro@119 146 bindToPort(aPort);
giuliomoro@119 147 channel=aChannel;
giuliomoro@120 148 printf("Channel %d is receiving on port %d\n",aChannel, aPort);
giuliomoro@117 149 // fd=fopen("output.m","w"); //DEBUG
giuliomoro@117 150 // fprintf(fd,"var=["); //DEBUG
giuliomoro@117 151 headerLength=2;
giuliomoro@117 152 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending.
giuliomoro@117 153 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ...
giuliomoro@117 154 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket
giuliomoro@117 155 buffer=(float*)malloc(sizeof(float)*bufferLength);
giuliomoro@117 156 if(buffer==NULL) // something wrong
giuliomoro@117 157 return;
giuliomoro@117 158 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength;
giuliomoro@117 159 memset(buffer,0,bufferLength*sizeof(float));
giuliomoro@117 160 stackBuffer=(float*)malloc(sizeof(float)*headerLength);
giuliomoro@125 161 if(stackBuffer==NULL) // something wrong
giuliomoro@125 162 return;
giuliomoro@125 163 bufferReady=true;
giuliomoro@117 164 bytesToRead=sizeof(float)*(payloadLength + headerLength);
giuliomoro@117 165 writePointer=-1;
giuliomoro@120 166 readPointer=0;
giuliomoro@117 167 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently
giuliomoro@132 168 timestamp=0;
giuliomoro@128 169 #ifdef USE_JUCE
giuliomoro@125 170 startThread(threadPriority);
giuliomoro@125 171 #else
giuliomoro@125 172 //TODO: the thread cannot be started here at the moment because init() is called in setup(), where tasks cannot be scheduled
giuliomoro@128 173 #endif /* USE_JUCE */
giuliomoro@117 174 }
giuliomoro@117 175
giuliomoro@117 176 void ReceiveAudioThread::bindToPort(int aPort){
giuliomoro@117 177 listening=socket.bindToPort(aPort);
giuliomoro@128 178 #ifdef USE_JUCE
giuliomoro@119 179 #else
giuliomoro@128 180 if(listening==false) //this condition is valid also for USE_JUCE, but we do not printf in USE_JUCE
giuliomoro@119 181 printf("Could not bind to port %d\n",aPort);
giuliomoro@128 182 #endif /* USE_JUCE */
giuliomoro@117 183 }
giuliomoro@117 184 bool ReceiveAudioThread::isListening(){
giuliomoro@117 185 return listening;
giuliomoro@117 186 }
giuliomoro@117 187 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples
giuliomoro@117 188 //TODO: make it return the number of samples actually available at the specified location
giuliomoro@117 189 if(isListening()==false || length>bufferLength)
giuliomoro@117 190 return NULL;
giuliomoro@117 191 readPointer+=length;
giuliomoro@117 192 if(readPointer>lastValidPointer){
giuliomoro@117 193 readPointer=headerLength;
giuliomoro@117 194 }
giuliomoro@117 195 return buffer+(int)readPointer;
giuliomoro@117 196 };
giuliomoro@120 197 int ReceiveAudioThread::getSamplesSrc(float *destination, int length,
giuliomoro@120 198 float samplingRateRatio, int numChannelsInDestination,
giuliomoro@120 199 int channelToWriteTo)
giuliomoro@120 200 {
giuliomoro@117 201 if (!(samplingRateRatio>0 && samplingRateRatio<=2))
giuliomoro@117 202 return -2;
giuliomoro@117 203 if(isListening()==false)
giuliomoro@117 204 return -1;
giuliomoro@133 205 static int numCalls=0;
giuliomoro@135 206 if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initalized yet ...
giuliomoro@133 207 #ifdef USE_JUCE
giuliomoro@133 208 #else //debug
giuliomoro@133 209 rt_printf("reinit the writePointer, readPointer: %f;\n",readPointer);
giuliomoro@133 210 readPointer=0;
giuliomoro@133 211 #endif /* USE_JUCE */
giuliomoro@117 212 writePointer=2*length; // do it, so that it starts writing at a safety margin from where we write.
giuliomoro@117 213 // This will help keeping them in sync.
giuliomoro@117 214 //TODO: handle what happens when the remote stream is interrupted and then restarted
giuliomoro@117 215 }
giuliomoro@133 216 numCalls++;
giuliomoro@117 217 if(length>lastValidPointer) {
giuliomoro@117 218 //not enough samples available, we fill the buffer with what is available, but the destination buffer will not be filled completely
giuliomoro@117 219 //at this very moment the other thread might be writing at most one payload into the buffer.
giuliomoro@117 220 //To avoid a race condition, we need to let alone the buffer where we are currently writing
giuliomoro@117 221 //as writing the payload also temporarily overwrites the previous headerLength samples, we need to account for them as well
giuliomoro@117 222 //TODO: This assumes that the writePointer and readPointer do not drift. When doing clock synchronization we will find out that it is not true!
giuliomoro@117 223 length=lastValidPointer-payloadLength-headerLength;
giuliomoro@117 224 if(length<0) //no samples available at all!
giuliomoro@117 225 return 0;
giuliomoro@117 226 }
giuliomoro@117 227 for(int n=0; n<length; n++){
giuliomoro@120 228 destination[n*numChannelsInDestination+channelToWriteTo]=buffer[(int)(0.5+readPointer)];//simple ZOH non-interpolation (nearest neighbour)
giuliomoro@117 229 // fprintf(fd,"%f, %d, %f;\n",readPointer,writePointer,destination[n]); //DEBUG
giuliomoro@117 230 readPointer+=samplingRateRatio;
giuliomoro@117 231 if((int)(0.5+readPointer)>=lastValidPointer){
giuliomoro@117 232 readPointer=readPointer-lastValidPointer+headerLength;
giuliomoro@117 233 }
giuliomoro@117 234 }
giuliomoro@125 235 return length;
giuliomoro@117 236 }
giuliomoro@120 237 int ReceiveAudioThread::getSamplesSrc(float *destination, int length, float samplingRateRatio){
giuliomoro@120 238 return getSamplesSrc(destination, length, samplingRateRatio, 1,0);
giuliomoro@120 239 // TODO: rewriting this so that it does not call the override method we can save a multiply and add
giuliomoro@120 240 // for each sample.
giuliomoro@120 241 }
giuliomoro@117 242 bool ReceiveAudioThread::isBufferReady(){
giuliomoro@117 243 return bufferReady;
giuliomoro@117 244 }
giuliomoro@128 245 #ifdef USE_JUCE
giuliomoro@125 246 #else
giuliomoro@117 247 void ReceiveAudioThread::startThread(){
giuliomoro@117 248 BeagleRT_scheduleAuxiliaryTask(receiveDataTask);
giuliomoro@117 249 }
giuliomoro@117 250 void ReceiveAudioThread::stopThread(){
giuliomoro@117 251 threadIsExiting=true;
giuliomoro@117 252 }
giuliomoro@117 253 bool ReceiveAudioThread::threadShouldExit(){
giuliomoro@117 254 return(gShouldStop || threadIsExiting );
giuliomoro@117 255 }
giuliomoro@132 256 RTIME ReceiveAudioThread::getLastTime(){
giuliomoro@132 257 return lastTime;
giuliomoro@132 258 }
giuliomoro@128 259 #endif /* USE_JUCE */
giuliomoro@132 260 int ReceiveAudioThread::getTimestamp(){
giuliomoro@132 261 return timestamp;
giuliomoro@132 262 }
giuliomoro@117 263 void ReceiveAudioThread::run(){
giuliomoro@117 264 // fd2=fopen("buffer.m","w"); //DEBUG
giuliomoro@117 265 // fprintf(fd2, "buf=["); //DEBUG
giuliomoro@117 266 threadRunning=true;
giuliomoro@131 267 int maxCount=10;
giuliomoro@131 268 int count=0;
giuliomoro@131 269 // Clean the socket from anything that is currently in it.
giuliomoro@131 270 #ifdef USE_JUCE
giuliomoro@131 271 // this is borrowed from BeagleRT's UdpServer class.
giuliomoro@131 272 int n;
giuliomoro@131 273 do {
giuliomoro@131 274 float waste;
giuliomoro@131 275 if(socket.waitUntilReady(true, 0)==0)
giuliomoro@131 276 break;
giuliomoro@131 277 n=socket.read((void*)&waste, sizeof(float), false);
giuliomoro@131 278 count++;
giuliomoro@131 279 if(n<0){
giuliomoro@131 280 printf("error\n");
giuliomoro@131 281 break;
giuliomoro@131 282 }
giuliomoro@131 283 printf("n: %d\n",n);
giuliomoro@131 284 } while (n>0 && (maxCount<=0 || count<maxCount));
giuliomoro@131 285 #else
giuliomoro@131 286 for(unsigned int n=0; n<objAddrs.size(); n++){
giuliomoro@131 287 count=objAddrs[n]->socket.empty(maxCount);
giuliomoro@131 288 }
giuliomoro@131 289 #endif /* USE_JUCE */
giuliomoro@131 290 printf("socket emptied with %d reads\n", count);
giuliomoro@131 291
giuliomoro@117 292 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting
giuliomoro@128 293 #ifdef USE_JUCE
giuliomoro@117 294 readUdpToBuffer(); // read into the oldBuffer
giuliomoro@128 295 sleep(sleepTime);
giuliomoro@119 296 #else
giuliomoro@119 297 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){
giuliomoro@122 298 // printf("%d\n", n);
giuliomoro@119 299 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer();
giuliomoro@119 300 }
giuliomoro@119 301 usleep(sleepTime); //TODO: use rt_task_sleep instead
giuliomoro@128 302 #endif /* USE_JUCE */
giuliomoro@117 303 }
giuliomoro@117 304 threadRunning=false;
giuliomoro@125 305 printf("Thread is not running \n");
giuliomoro@117 306 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG
giuliomoro@117 307 // fclose(fd);//DEBUG
giuliomoro@117 308 // fprintf(fd2,"];");//DEBUG
giuliomoro@117 309 // fclose(fd2); //DEBUG
giuliomoro@117 310 }