Mercurial > hg > beaglert
comparison core/ReceiveAudioThread.cpp @ 222:6a23c07d0fbb mergingClockSync
Working with UdpIoPlugin
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Sun, 14 Feb 2016 01:09:23 +0000 |
parents | c42a6b4dc2d4 |
children | cb47043c8c28 |
comparison
equal
deleted
inserted
replaced
221:dbff109f64c2 | 222:6a23c07d0fbb |
---|---|
37 if(writePointer+payloadLength+headerLength>bufferLength){ //if we are going to exceed the length of the buffer with the next reading | 37 if(writePointer+payloadLength+headerLength>bufferLength){ //if we are going to exceed the length of the buffer with the next reading |
38 // lastValidPointer=writePointer+headerLength; //remember where the last valid data are | 38 // lastValidPointer=writePointer+headerLength; //remember where the last valid data are |
39 // for(int n=headerLength;n<lastValidPointer; n++){ | 39 // for(int n=headerLength;n<lastValidPointer; n++){ |
40 // fprintf(fd2, "%f\n",buffer[n]); //DEBUG | 40 // fprintf(fd2, "%f\n",buffer[n]); //DEBUG |
41 // } | 41 // } |
42 writePointer=0; //and reset to beginning of the buffer | 42 writePointer=0; //and reset to beginning of the buffer |
43 } | 43 } |
44 } | 44 } |
45 void ReceiveAudioThread::pushPayload(int startIndex){ //backup the payload samples that will be overwritten by the new header | 45 void ReceiveAudioThread::pushPayload(int startIndex){ //backup the payload samples that will be overwritten by the new header |
46 for(int n=0; n<headerLength; n++){ | 46 for(int n=0; n<headerLength; n++){ |
47 stackBuffer[n]=buffer[startIndex+n]; | 47 stackBuffer[n]=buffer[startIndex+n]; |
52 buffer[startIndex+n]=stackBuffer[n]; | 52 buffer[startIndex+n]=stackBuffer[n]; |
53 } | 53 } |
54 } | 54 } |
55 | 55 |
56 int ReceiveAudioThread::readUdpToBuffer(){ | 56 int ReceiveAudioThread::readUdpToBuffer(){ |
57 if(listening==false || bufferReady==false) | 57 |
58 if(listening==false || bufferReady==false) | |
58 return 0; | 59 return 0; |
59 if(writePointer<0) | 60 if(writePointer<0) |
60 return 0; | 61 return 0; |
61 if(socket.waitUntilReady(true, waitForSocketTime)){// TODO: if waitForSocketTime here is >>5, the | 62 if(socket.waitUntilReady(true, waitForSocketTime)){// TODO: if waitForSocketTime here is >>5, the |
63 // destructor (always or sometimes) never actually gets called, despite run() returns ...see issue #1381 | |
62 #ifdef USE_JUCE | 64 #ifdef USE_JUCE |
63 #else | 65 #else |
64 lastTime=rt_timer_read(); | 66 lastTime=rt_timer_read(); |
65 // rt_printf("lastTimeread= %llu\n", lastTime); | 67 // rt_printf("lastTimeread= %llu\n", lastTime); |
66 #endif /* USE_JUCE */ | 68 #endif /* USE_JUCE */ |
67 // destructor (always or sometimes) never actually gets called, despite run() returns ...see issue #1381 | |
68 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 | 69 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 |
69 //read header+payload | 70 //read header+payload |
70 int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting. | 71 int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting. |
71 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header | 72 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header |
72 if(numBytes<0){ | 73 if(numBytes<0){ |
148 printf("Channel %d is receiving on port %d\n",aChannel, aPort); | 149 printf("Channel %d is receiving on port %d\n",aChannel, aPort); |
149 // fd=fopen("output.m","w"); //DEBUG | 150 // fd=fopen("output.m","w"); //DEBUG |
150 // fprintf(fd,"var=["); //DEBUG | 151 // fprintf(fd,"var=["); //DEBUG |
151 headerLength=2; | 152 headerLength=2; |
152 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending. | 153 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending. |
153 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... | 154 bufferLength=10 * std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... |
154 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket | 155 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket |
155 buffer=(float*)malloc(sizeof(float)*bufferLength); | 156 buffer=(float*)malloc(sizeof(float)*bufferLength); |
156 if(buffer==NULL) // something wrong | 157 if(buffer==NULL) // something wrong |
157 return; | 158 return; |
158 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength; | 159 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength; |
204 return -1; | 205 return -1; |
205 static int numCalls=0; | 206 static int numCalls=0; |
206 if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initalized yet ... | 207 if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initalized yet ... |
207 #ifdef USE_JUCE | 208 #ifdef USE_JUCE |
208 #else //debug | 209 #else //debug |
209 rt_printf("reinit the writePointer, readPointer: %f;\n",readPointer); | 210 readPointer = headerLength; |
210 readPointer=0; | 211 #endif /* USE_JUCE */ |
211 #endif /* USE_JUCE */ | 212 // this cumbersome line means: start writing at a position which is as close as possible |
212 writePointer=2*length; // do it, so that it starts writing at a safety margin from where we write. | 213 // to the center of the buffer, but still is aligned to (payloadLength*x)+headerLength |
214 // thus allowing buffering to allow clock drift to go either way | |
215 writePointer = headerLength + ((bufferLength-headerLength)/payloadLength/2)*payloadLength; | |
213 // This will help keeping them in sync. | 216 // This will help keeping them in sync. |
214 //TODO: handle what happens when the remote stream is interrupted and then restarted | 217 //TODO: handle what happens when the remote stream is interrupted and then restarted |
218 printf("write pointer inited at: %d\n", writePointer); | |
215 } | 219 } |
216 numCalls++; | 220 numCalls++; |
217 if(length>lastValidPointer) { | 221 if(length>lastValidPointer) { |
218 //not enough samples available, we fill the buffer with what is available, but the destination buffer will not be filled completely | 222 //not enough samples available, we fill the buffer with what is available, but the destination buffer will not be filled completely |
219 //at this very moment the other thread might be writing at most one payload into the buffer. | 223 //at this very moment the other thread might be writing at most one payload into the buffer. |
293 #ifdef USE_JUCE | 297 #ifdef USE_JUCE |
294 readUdpToBuffer(); // read into the oldBuffer | 298 readUdpToBuffer(); // read into the oldBuffer |
295 sleep(sleepTime); | 299 sleep(sleepTime); |
296 #else | 300 #else |
297 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ | 301 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ |
298 // printf("%d\n", n); | |
299 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); | 302 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); |
300 } | 303 } |
301 usleep(sleepTime); //TODO: use rt_task_sleep instead | 304 usleep(sleepTime); //TODO: use rt_task_sleep instead |
302 #endif /* USE_JUCE */ | 305 #endif /* USE_JUCE */ |
303 } | 306 } |