Mercurial > hg > beaglert
comparison core/ReceiveAudioThread.cpp @ 125:850a4a9bd832 scope-refactoring
Added ifdefs and unified the code with udpioplugin ... the latter has not been tested (or committed). TODO: still it hangs after ctrl-c BeagleRT (auxiliary tasks do not terminate). TODO: sometimes you can hear dropouts in the transmission. Maybe it is due to pointer drifting. Rebooting BBB fixes/affects this issue.
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Sat, 22 Aug 2015 02:53:36 +0100 |
parents | bc514f29c3aa |
children | 6c8fb6f07b47 |
comparison
equal
deleted
inserted
replaced
124:23137a333c93 | 125:850a4a9bd832 |
---|---|
18 return; | 18 return; |
19 staticConstructed=true; | 19 staticConstructed=true; |
20 threadIsExiting=false; | 20 threadIsExiting=false; |
21 receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities | 21 receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities |
22 } | 22 } |
23 #endif | 23 #endif /* JUCE */ |
24 | 24 |
25 void ReceiveAudioThread::dealloc(){ | 25 void ReceiveAudioThread::dealloc(){ |
26 free(buffer); | 26 free(buffer); |
27 buffer=NULL; | 27 buffer=NULL; |
28 free(stackBuffer); | 28 free(stackBuffer); |
61 if(socket.waitUntilReady(true, waitForSocketTime)){ // waitForSocketTime could have been set to -1 (wait forever), | 61 if(socket.waitUntilReady(true, waitForSocketTime)){ // waitForSocketTime could have been set to -1 (wait forever), |
62 // but it would have made it more difficult for the thread to be killed | 62 // but it would have made it more difficult for the thread to be killed |
63 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 | 63 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 |
64 //read header+payload | 64 //read header+payload |
65 //JUCE int numBytes=socket.read(buffer+writePointer, bytesToRead,1); | 65 //JUCE int numBytes=socket.read(buffer+writePointer, bytesToRead,1); |
66 int numBytes=socket.read(buffer+writePointer, bytesToRead, false); //read without waiting. | 66 int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting. |
67 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header | 67 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header |
68 if(numBytes<0){ | 68 if(numBytes<0){ |
69 printf("error numBytes1\n"); | 69 printf("error numBytes1\n"); |
70 return -3; //TODO: something went wrong, you have to discard the rest of the packet! | 70 return -3; //TODO: something went wrong, you have to discard the rest of the packet! |
71 } | 71 } |
72 if(numBytes==0){//TODO: when inplementing waitUntilReady, this should not happen unless you actually receive a packate of size zero (is it at all possible?) | 72 if(numBytes==0){//TODO: this should not happen unless you actually receive a packet of size zero (is it at all possible?) |
73 // printf("received 0 bytes\n"); | 73 // printf("received 0 bytes\n"); |
74 return 0; | 74 return 0; |
75 } | 75 } |
76 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead) | 76 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead) |
77 printf("error numBytes2: %d\n", numBytes); | 77 printf("error numBytes2: %d\n", numBytes); |
99 wrapWritePointer(); | 99 wrapWritePointer(); |
100 return numBytes; | 100 return numBytes; |
101 } | 101 } |
102 return 0; //timeout occurred | 102 return 0; //timeout occurred |
103 } | 103 } |
104 //JUCE Thread(threadName), | |
105 #ifdef JUCE | |
106 ReceiveAudioThread::ReceiveAudioThread(const String &threadName) : | |
107 Thread(threadName), | |
108 #else | |
104 ReceiveAudioThread::ReceiveAudioThread() : | 109 ReceiveAudioThread::ReceiveAudioThread() : |
105 //JUCE Thread(threadName), | 110 #endif /* JUCE */ |
106 socket(0), | 111 socket(NULL), |
107 listening(false), | 112 listening(false), |
108 bufferReady(false), | 113 bufferReady(false), |
109 buffer(NULL), | 114 buffer(NULL), |
110 stackBuffer(NULL), | 115 stackBuffer(NULL), |
111 bufferLength(0), | 116 bufferLength(0), |
112 lastValidPointer(0), | 117 lastValidPointer(0), |
113 waitForSocketTime(100), | 118 waitForSocketTime(100), |
114 threadPriority(95) | 119 #ifdef JUCE |
120 threadPriority(5) | |
121 #else | |
122 threadPriority(88) | |
123 #endif /* JUCE */ | |
115 {}; | 124 {}; |
116 ReceiveAudioThread::~ReceiveAudioThread(){ | 125 ReceiveAudioThread::~ReceiveAudioThread(){ |
117 //JUCE stopThread(1000); | 126 #ifdef JUCE |
127 stopThread(1000); | |
128 #else | |
129 printf("inside the destructor\n"); | |
118 while(threadRunning){ | 130 while(threadRunning){ |
131 printf("while in the destructor\n"); | |
119 usleep(sleepTime*2); //wait for thread to stop | 132 usleep(sleepTime*2); //wait for thread to stop |
120 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; | 133 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; |
121 } | 134 } |
135 #endif /* JUCE */ | |
122 //TODO: check if thread stopped, otherwise kill it before dealloc | 136 //TODO: check if thread stopped, otherwise kill it before dealloc |
137 printf("dealloc\n"); | |
123 dealloc(); | 138 dealloc(); |
124 } | 139 } |
125 void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){ | 140 void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){ |
126 dealloc(); | 141 dealloc(); |
127 #ifdef JUCE | 142 #ifdef JUCE |
128 #else | 143 #else |
129 staticConstructor(); | 144 staticConstructor(); |
130 objAddrs.push_back(this);//TODO: this line should be in the constructor | 145 objAddrs.push_back(this);//TODO: this line should be in the constructor |
131 #endif | 146 #endif /* JUCE */ |
132 bindToPort(aPort); | 147 bindToPort(aPort); |
133 channel=aChannel; | 148 channel=aChannel; |
134 printf("Channel %d is receiving on port %d\n",aChannel, aPort); | 149 printf("Channel %d is receiving on port %d\n",aChannel, aPort); |
135 // fd=fopen("output.m","w"); //DEBUG | 150 // fd=fopen("output.m","w"); //DEBUG |
136 // fprintf(fd,"var=["); //DEBUG | 151 // fprintf(fd,"var=["); //DEBUG |
139 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... | 154 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... |
140 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket | 155 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket |
141 buffer=(float*)malloc(sizeof(float)*bufferLength); | 156 buffer=(float*)malloc(sizeof(float)*bufferLength); |
142 if(buffer==NULL) // something wrong | 157 if(buffer==NULL) // something wrong |
143 return; | 158 return; |
144 bufferReady=true; | |
145 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength; | 159 lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength; |
146 memset(buffer,0,bufferLength*sizeof(float)); | 160 memset(buffer,0,bufferLength*sizeof(float)); |
147 stackBuffer=(float*)malloc(sizeof(float)*headerLength); | 161 stackBuffer=(float*)malloc(sizeof(float)*headerLength); |
162 if(stackBuffer==NULL) // something wrong | |
163 return; | |
164 bufferReady=true; | |
148 bytesToRead=sizeof(float)*(payloadLength + headerLength); | 165 bytesToRead=sizeof(float)*(payloadLength + headerLength); |
149 writePointer=-1; | 166 writePointer=-1; |
150 readPointer=0; | 167 readPointer=0; |
151 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently | 168 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently |
152 //JUCE startThread(threadPriority); | 169 #ifdef JUCE |
170 startThread(threadPriority); | |
171 #else | |
172 //TODO: the thread cannot be started here at the moment because init() is called in setup(), where tasks cannot be scheduled | |
173 #endif /* JUCE */ | |
153 } | 174 } |
154 | 175 |
155 void ReceiveAudioThread::bindToPort(int aPort){ | 176 void ReceiveAudioThread::bindToPort(int aPort){ |
156 listening=socket.bindToPort(aPort); | 177 listening=socket.bindToPort(aPort); |
157 #ifdef JUCE | 178 #ifdef JUCE |
158 #else | 179 #else |
159 if(listening==false) | 180 if(listening==false) //this condition is valid also for JUCE, but we do not printf in JUCE |
160 printf("Could not bind to port %d\n",aPort); | 181 printf("Could not bind to port %d\n",aPort); |
161 #endif | 182 #endif /* JUCE */ |
162 } | 183 } |
163 bool ReceiveAudioThread::isListening(){ | 184 bool ReceiveAudioThread::isListening(){ |
164 return listening; | 185 return listening; |
165 } | 186 } |
166 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples | 187 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples |
202 readPointer+=samplingRateRatio; | 223 readPointer+=samplingRateRatio; |
203 if((int)(0.5+readPointer)>=lastValidPointer){ | 224 if((int)(0.5+readPointer)>=lastValidPointer){ |
204 readPointer=readPointer-lastValidPointer+headerLength; | 225 readPointer=readPointer-lastValidPointer+headerLength; |
205 } | 226 } |
206 } | 227 } |
207 return readPointer; | 228 return length; |
208 } | 229 } |
209 int ReceiveAudioThread::getSamplesSrc(float *destination, int length, float samplingRateRatio){ | 230 int ReceiveAudioThread::getSamplesSrc(float *destination, int length, float samplingRateRatio){ |
210 return getSamplesSrc(destination, length, samplingRateRatio, 1,0); | 231 return getSamplesSrc(destination, length, samplingRateRatio, 1,0); |
211 // TODO: rewriting this so that it does not call the override method we can save a multiply and add | 232 // TODO: rewriting this so that it does not call the override method we can save a multiply and add |
212 // for each sample. | 233 // for each sample. |
213 } | 234 } |
214 bool ReceiveAudioThread::isBufferReady(){ | 235 bool ReceiveAudioThread::isBufferReady(){ |
215 return bufferReady; | 236 return bufferReady; |
216 } | 237 } |
238 #ifdef JUCE | |
239 #else | |
217 void ReceiveAudioThread::startThread(){ | 240 void ReceiveAudioThread::startThread(){ |
218 printf("receivedata is going to be scheduled\n"); | 241 printf("receivedata is going to be scheduled\n"); |
219 BeagleRT_scheduleAuxiliaryTask(receiveDataTask); | 242 BeagleRT_scheduleAuxiliaryTask(receiveDataTask); |
220 printf("receivedata has been scheduled\n"); | 243 printf("receivedata has been scheduled\n"); |
221 } | 244 } |
223 threadIsExiting=true; | 246 threadIsExiting=true; |
224 } | 247 } |
225 bool ReceiveAudioThread::threadShouldExit(){ | 248 bool ReceiveAudioThread::threadShouldExit(){ |
226 return(gShouldStop || threadIsExiting ); | 249 return(gShouldStop || threadIsExiting ); |
227 } | 250 } |
251 #endif /* JUCE */ | |
228 void ReceiveAudioThread::run(){ | 252 void ReceiveAudioThread::run(){ |
229 // fd2=fopen("buffer.m","w"); //DEBUG | 253 // fd2=fopen("buffer.m","w"); //DEBUG |
230 // fprintf(fd2, "buf=["); //DEBUG | 254 // fprintf(fd2, "buf=["); //DEBUG |
231 threadRunning=true; | 255 threadRunning=true; |
232 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting | 256 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting |
237 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ | 261 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ |
238 // printf("%d\n", n); | 262 // printf("%d\n", n); |
239 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); | 263 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); |
240 } | 264 } |
241 usleep(sleepTime); //TODO: use rt_task_sleep instead | 265 usleep(sleepTime); //TODO: use rt_task_sleep instead |
242 #endif | 266 #endif /* JUCE */ |
243 } | 267 } |
244 threadRunning=false; | 268 threadRunning=false; |
269 printf("Thread is not running \n"); | |
245 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG | 270 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG |
246 // fclose(fd);//DEBUG | 271 // fclose(fd);//DEBUG |
247 // fprintf(fd2,"];");//DEBUG | 272 // fprintf(fd2,"];");//DEBUG |
248 // fclose(fd2); //DEBUG | 273 // fclose(fd2); //DEBUG |
249 } | 274 } |