comparison core/ReceiveAudioThread.cpp @ 222:6a23c07d0fbb mergingClockSync

Working with UdpIoPlugin
author Giulio Moro <giuliomoro@yahoo.it>
date Sun, 14 Feb 2016 01:09:23 +0000
parents c42a6b4dc2d4
children cb47043c8c28
comparison
equal deleted inserted replaced
221:dbff109f64c2 222:6a23c07d0fbb
37 if(writePointer+payloadLength+headerLength>bufferLength){ //if we are going to exceed the length of the buffer with the next reading 37 if(writePointer+payloadLength+headerLength>bufferLength){ //if we are going to exceed the length of the buffer with the next reading
38 // lastValidPointer=writePointer+headerLength; //remember where the last valid data are 38 // lastValidPointer=writePointer+headerLength; //remember where the last valid data are
39 // for(int n=headerLength;n<lastValidPointer; n++){ 39 // for(int n=headerLength;n<lastValidPointer; n++){
40 // fprintf(fd2, "%f\n",buffer[n]); //DEBUG 40 // fprintf(fd2, "%f\n",buffer[n]); //DEBUG
41 // } 41 // }
42 writePointer=0; //and reset to beginning of the buffer 42 writePointer=0; //and reset to beginning of the buffer
43 } 43 }
44 } 44 }
45 void ReceiveAudioThread::pushPayload(int startIndex){ //backup the payload samples that will be overwritten by the new header 45 void ReceiveAudioThread::pushPayload(int startIndex){ //backup the payload samples that will be overwritten by the new header
46 for(int n=0; n<headerLength; n++){ 46 for(int n=0; n<headerLength; n++){
47 stackBuffer[n]=buffer[startIndex+n]; 47 stackBuffer[n]=buffer[startIndex+n];
52 buffer[startIndex+n]=stackBuffer[n]; 52 buffer[startIndex+n]=stackBuffer[n];
53 } 53 }
54 } 54 }
55 55
56 int ReceiveAudioThread::readUdpToBuffer(){ 56 int ReceiveAudioThread::readUdpToBuffer(){
57 if(listening==false || bufferReady==false) 57
58 if(listening==false || bufferReady==false)
58 return 0; 59 return 0;
59 if(writePointer<0) 60 if(writePointer<0)
60 return 0; 61 return 0;
61 if(socket.waitUntilReady(true, waitForSocketTime)){// TODO: if waitForSocketTime here is >>5, the 62 if(socket.waitUntilReady(true, waitForSocketTime)){// TODO: if waitForSocketTime here is >>5, the
63 // destructor (always or sometimes) never actually gets called, even though run() returns ... see issue #1381
62 #ifdef USE_JUCE 64 #ifdef USE_JUCE
63 #else 65 #else
64 lastTime=rt_timer_read(); 66 lastTime=rt_timer_read();
65 // rt_printf("lastTimeread= %llu\n", lastTime); 67 // rt_printf("lastTimeread= %llu\n", lastTime);
66 #endif /* USE_JUCE */ 68 #endif /* USE_JUCE */
67 // destructor (always or sometimes) never actually gets called, even though run() returns ... see issue #1381
68 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 69 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0
69 //read header+payload 70 //read header+payload
70 int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting. 71 int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting.
71 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header 72 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header
72 if(numBytes<0){ 73 if(numBytes<0){
148 printf("Channel %d is receiving on port %d\n",aChannel, aPort); 149 printf("Channel %d is receiving on port %d\n",aChannel, aPort);
149 // fd=fopen("output.m","w"); //DEBUG 150 // fd=fopen("output.m","w"); //DEBUG
150 // fprintf(fd,"var=["); //DEBUG 151 // fprintf(fd,"var=["); //DEBUG
151 headerLength=2; 152 headerLength=2;
152 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending. 153 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending.
153 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... 154 bufferLength=10 * std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ...
154 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket 155 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket
155 buffer=(float*)malloc(sizeof(float)*bufferLength); 156 buffer=(float*)malloc(sizeof(float)*bufferLength);
156 if(buffer==NULL) // something wrong 157 if(buffer==NULL) // something wrong
157 return; 158 return;
158 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength; 159 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength;
204 return -1; 205 return -1;
205 static int numCalls=0; 206 static int numCalls=0;
206 if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initialized yet ... 207 if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initialized yet ...
207 #ifdef USE_JUCE 208 #ifdef USE_JUCE
208 #else //debug 209 #else //debug
209 rt_printf("reinit the writePointer, readPointer: %f;\n",readPointer); 210 readPointer = headerLength;
210 readPointer=0; 211 #endif /* USE_JUCE */
211 #endif /* USE_JUCE */ 212 // this cumbersome line means: start writing at a position which is as close as possible
212 writePointer=2*length; // do it, so that it starts writing at a safety margin from where we write. 213 // to the center of the buffer, but still is aligned to (payloadLength*x)+headerLength
214 // thus allowing buffering to allow clock drift to go either way
215 writePointer = headerLength + ((bufferLength-headerLength)/payloadLength/2)*payloadLength;
213 // This will help keeping them in sync. 216 // This will help keeping them in sync.
214 //TODO: handle what happens when the remote stream is interrupted and then restarted 217 //TODO: handle what happens when the remote stream is interrupted and then restarted
218 printf("write pointer inited at: %d\n", writePointer);
215 } 219 }
216 numCalls++; 220 numCalls++;
217 if(length>lastValidPointer) { 221 if(length>lastValidPointer) {
218 //not enough samples available, we fill the buffer with what is available, but the destination buffer will not be filled completely 222 //not enough samples available, we fill the buffer with what is available, but the destination buffer will not be filled completely
219 //at this very moment the other thread might be writing at most one payload into the buffer. 223 //at this very moment the other thread might be writing at most one payload into the buffer.
293 #ifdef USE_JUCE 297 #ifdef USE_JUCE
294 readUdpToBuffer(); // read into the oldBuffer 298 readUdpToBuffer(); // read into the oldBuffer
295 sleep(sleepTime); 299 sleep(sleepTime);
296 #else 300 #else
297 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ 301 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){
298 // printf("%d\n", n);
299 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); 302 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer();
300 } 303 }
301 usleep(sleepTime); //TODO: use rt_task_sleep instead 304 usleep(sleepTime); //TODO: use rt_task_sleep instead
302 #endif /* USE_JUCE */ 305 #endif /* USE_JUCE */
303 } 306 }