comparison core/ReceiveAudioThread.cpp @ 125:850a4a9bd832 scope-refactoring

Added ifdefs and unified the code with udpioplugin ... the latter has not been tested (or committed). TODO: still it hangs after ctrl-c BeagleRT (auxiliary tasks do not terminate). TODO: sometimes you can hear dropouts in the transmission. Maybe it is due to pointer drifting. Rebooting BBB fixes/affects this issue.
author Giulio Moro <giuliomoro@yahoo.it>
date Sat, 22 Aug 2015 02:53:36 +0100
parents bc514f29c3aa
children 6c8fb6f07b47
comparison
equal deleted inserted replaced
124:23137a333c93 125:850a4a9bd832
18 return; 18 return;
19 staticConstructed=true; 19 staticConstructed=true;
20 threadIsExiting=false; 20 threadIsExiting=false;
21 receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities 21 receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities
22 } 22 }
23 #endif 23 #endif /* JUCE */
24 24
25 void ReceiveAudioThread::dealloc(){ 25 void ReceiveAudioThread::dealloc(){
26 free(buffer); 26 free(buffer);
27 buffer=NULL; 27 buffer=NULL;
28 free(stackBuffer); 28 free(stackBuffer);
61 if(socket.waitUntilReady(true, waitForSocketTime)){ // waitForSocketTime could have been set to -1 (wait forever), 61 if(socket.waitUntilReady(true, waitForSocketTime)){ // waitForSocketTime could have been set to -1 (wait forever),
62 // but it would have made it more difficult for the thread to be killed 62 // but it would have made it more difficult for the thread to be killed
63 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 63 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0
64 //read header+payload 64 //read header+payload
65 //JUCE int numBytes=socket.read(buffer+writePointer, bytesToRead,1); 65 //JUCE int numBytes=socket.read(buffer+writePointer, bytesToRead,1);
66 int numBytes=socket.read(buffer+writePointer, bytesToRead, false); //read without waiting. 66 int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting.
67 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header 67 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header
68 if(numBytes<0){ 68 if(numBytes<0){
69 printf("error numBytes1\n"); 69 printf("error numBytes1\n");
70 return -3; //TODO: something went wrong, you have to discard the rest of the packet! 70 return -3; //TODO: something went wrong, you have to discard the rest of the packet!
71 } 71 }
72 if(numBytes==0){//TODO: when inplementing waitUntilReady, this should not happen unless you actually receive a packate of size zero (is it at all possible?) 72 if(numBytes==0){//TODO: this should not happen unless you actually receive a packet of size zero (is it at all possible?)
73 // printf("received 0 bytes\n"); 73 // printf("received 0 bytes\n");
74 return 0; 74 return 0;
75 } 75 }
76 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead) 76 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead)
77 printf("error numBytes2: %d\n", numBytes); 77 printf("error numBytes2: %d\n", numBytes);
99 wrapWritePointer(); 99 wrapWritePointer();
100 return numBytes; 100 return numBytes;
101 } 101 }
102 return 0; //timeout occurred 102 return 0; //timeout occurred
103 } 103 }
104 //JUCE Thread(threadName),
105 #ifdef JUCE
106 ReceiveAudioThread::ReceiveAudioThread(const String &threadName) :
107 Thread(threadName),
108 #else
104 ReceiveAudioThread::ReceiveAudioThread() : 109 ReceiveAudioThread::ReceiveAudioThread() :
105 //JUCE Thread(threadName), 110 #endif /* JUCE */
106 socket(0), 111 socket(NULL),
107 listening(false), 112 listening(false),
108 bufferReady(false), 113 bufferReady(false),
109 buffer(NULL), 114 buffer(NULL),
110 stackBuffer(NULL), 115 stackBuffer(NULL),
111 bufferLength(0), 116 bufferLength(0),
112 lastValidPointer(0), 117 lastValidPointer(0),
113 waitForSocketTime(100), 118 waitForSocketTime(100),
114 threadPriority(95) 119 #ifdef JUCE
120 threadPriority(5)
121 #else
122 threadPriority(88)
123 #endif /* JUCE */
115 {}; 124 {};
116 ReceiveAudioThread::~ReceiveAudioThread(){ 125 ReceiveAudioThread::~ReceiveAudioThread(){
117 //JUCE stopThread(1000); 126 #ifdef JUCE
127 stopThread(1000);
128 #else
129 printf("inside the destructor\n");
118 while(threadRunning){ 130 while(threadRunning){
131 printf("while in the destructor\n");
119 usleep(sleepTime*2); //wait for thread to stop 132 usleep(sleepTime*2); //wait for thread to stop
120 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; 133 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl;
121 } 134 }
135 #endif /* JUCE */
122 //TODO: check if thread stopped, otherwise kill it before dealloc 136 //TODO: check if thread stopped, otherwise kill it before dealloc
137 printf("dealloc\n");
123 dealloc(); 138 dealloc();
124 } 139 }
125 void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){ 140 void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){
126 dealloc(); 141 dealloc();
127 #ifdef JUCE 142 #ifdef JUCE
128 #else 143 #else
129 staticConstructor(); 144 staticConstructor();
130 objAddrs.push_back(this);//TODO: this line should be in the constructor 145 objAddrs.push_back(this);//TODO: this line should be in the constructor
131 #endif 146 #endif /* JUCE */
132 bindToPort(aPort); 147 bindToPort(aPort);
133 channel=aChannel; 148 channel=aChannel;
134 printf("Channel %d is receiving on port %d\n",aChannel, aPort); 149 printf("Channel %d is receiving on port %d\n",aChannel, aPort);
135 // fd=fopen("output.m","w"); //DEBUG 150 // fd=fopen("output.m","w"); //DEBUG
136 // fprintf(fd,"var=["); //DEBUG 151 // fprintf(fd,"var=["); //DEBUG
139 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... 154 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ...
140 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket 155 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket
141 buffer=(float*)malloc(sizeof(float)*bufferLength); 156 buffer=(float*)malloc(sizeof(float)*bufferLength);
142 if(buffer==NULL) // something wrong 157 if(buffer==NULL) // something wrong
143 return; 158 return;
144 bufferReady=true;
145 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength; 159 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength;
146 memset(buffer,0,bufferLength*sizeof(float)); 160 memset(buffer,0,bufferLength*sizeof(float));
147 stackBuffer=(float*)malloc(sizeof(float)*headerLength); 161 stackBuffer=(float*)malloc(sizeof(float)*headerLength);
162 if(stackBuffer==NULL) // something wrong
163 return;
164 bufferReady=true;
148 bytesToRead=sizeof(float)*(payloadLength + headerLength); 165 bytesToRead=sizeof(float)*(payloadLength + headerLength);
149 writePointer=-1; 166 writePointer=-1;
150 readPointer=0; 167 readPointer=0;
151 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently 168 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently
152 //JUCE startThread(threadPriority); 169 #ifdef JUCE
170 startThread(threadPriority);
171 #else
172 //TODO: the thread cannot be started here at the moment because init() is called in setup(), where tasks cannot be scheduled
173 #endif /* JUCE */
153 } 174 }
154 175
155 void ReceiveAudioThread::bindToPort(int aPort){ 176 void ReceiveAudioThread::bindToPort(int aPort){
156 listening=socket.bindToPort(aPort); 177 listening=socket.bindToPort(aPort);
157 #ifdef JUCE 178 #ifdef JUCE
158 #else 179 #else
159 if(listening==false) 180 if(listening==false) //this condition is valid also for JUCE, but we do not printf in JUCE
160 printf("Could not bind to port %d\n",aPort); 181 printf("Could not bind to port %d\n",aPort);
161 #endif 182 #endif /* JUCE */
162 } 183 }
163 bool ReceiveAudioThread::isListening(){ 184 bool ReceiveAudioThread::isListening(){
164 return listening; 185 return listening;
165 } 186 }
166 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples 187 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples
202 readPointer+=samplingRateRatio; 223 readPointer+=samplingRateRatio;
203 if((int)(0.5+readPointer)>=lastValidPointer){ 224 if((int)(0.5+readPointer)>=lastValidPointer){
204 readPointer=readPointer-lastValidPointer+headerLength; 225 readPointer=readPointer-lastValidPointer+headerLength;
205 } 226 }
206 } 227 }
207 return readPointer; 228 return length;
208 } 229 }
209 int ReceiveAudioThread::getSamplesSrc(float *destination, int length, float samplingRateRatio){ 230 int ReceiveAudioThread::getSamplesSrc(float *destination, int length, float samplingRateRatio){
210 return getSamplesSrc(destination, length, samplingRateRatio, 1,0); 231 return getSamplesSrc(destination, length, samplingRateRatio, 1,0);
211 // TODO: rewriting this so that it does not call the override method we can save a multiply and add 232 // TODO: rewriting this so that it does not call the override method we can save a multiply and add
212 // for each sample. 233 // for each sample.
213 } 234 }
214 bool ReceiveAudioThread::isBufferReady(){ 235 bool ReceiveAudioThread::isBufferReady(){
215 return bufferReady; 236 return bufferReady;
216 } 237 }
238 #ifdef JUCE
239 #else
217 void ReceiveAudioThread::startThread(){ 240 void ReceiveAudioThread::startThread(){
218 printf("receivedata is going to be scheduled\n"); 241 printf("receivedata is going to be scheduled\n");
219 BeagleRT_scheduleAuxiliaryTask(receiveDataTask); 242 BeagleRT_scheduleAuxiliaryTask(receiveDataTask);
220 printf("receivedata has been scheduled\n"); 243 printf("receivedata has been scheduled\n");
221 } 244 }
223 threadIsExiting=true; 246 threadIsExiting=true;
224 } 247 }
225 bool ReceiveAudioThread::threadShouldExit(){ 248 bool ReceiveAudioThread::threadShouldExit(){
226 return(gShouldStop || threadIsExiting ); 249 return(gShouldStop || threadIsExiting );
227 } 250 }
251 #endif /* JUCE */
228 void ReceiveAudioThread::run(){ 252 void ReceiveAudioThread::run(){
229 // fd2=fopen("buffer.m","w"); //DEBUG 253 // fd2=fopen("buffer.m","w"); //DEBUG
230 // fprintf(fd2, "buf=["); //DEBUG 254 // fprintf(fd2, "buf=["); //DEBUG
231 threadRunning=true; 255 threadRunning=true;
232 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting 256 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting
237 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ 261 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){
238 // printf("%d\n", n); 262 // printf("%d\n", n);
239 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); 263 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer();
240 } 264 }
241 usleep(sleepTime); //TODO: use rt_task_sleep instead 265 usleep(sleepTime); //TODO: use rt_task_sleep instead
242 #endif 266 #endif /* JUCE */
243 } 267 }
244 threadRunning=false; 268 threadRunning=false;
269 printf("Thread is not running \n");
245 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG 270 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG
246 // fclose(fd);//DEBUG 271 // fclose(fd);//DEBUG
247 // fprintf(fd2,"];");//DEBUG 272 // fprintf(fd2,"];");//DEBUG
248 // fclose(fd2); //DEBUG 273 // fclose(fd2); //DEBUG
249 } 274 }