Mercurial > hg > beaglert
comparison core/ReceiveAudioThread.cpp @ 119:c692827083e1 scope-refactoring
Enabled multi channel audio receive
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Fri, 21 Aug 2015 15:21:34 +0100 |
parents | ada68d50e56a |
children | cdd441a304a9 |
comparison
equal
deleted
inserted
replaced
118:26ad97b8aa9e | 119:c692827083e1 |
---|---|
1 #include "ReceiveAudioThread.h" | 1 #include "ReceiveAudioThread.h" |
2 | |
3 #ifdef JUCE | |
4 #else | |
2 //initialise static members | 5 //initialise static members |
6 bool ReceiveAudioThread::staticConstructed=false; | |
3 AuxiliaryTask ReceiveAudioThread::receiveDataTask=NULL; | 7 AuxiliaryTask ReceiveAudioThread::receiveDataTask=NULL; |
8 std::vector<ReceiveAudioThread *> ReceiveAudioThread::objAddrs(0); | |
9 bool ReceiveAudioThread::threadRunning; | |
10 bool ReceiveAudioThread::threadIsExiting; | |
11 int ReceiveAudioThread::sleepTime; | |
12 | |
13 void receiveData(){ | |
14 ReceiveAudioThread::run(); | |
15 } | |
16 void ReceiveAudioThread::staticConstructor(){ | |
17 if(staticConstructed==true) | |
18 return; | |
19 staticConstructed=true; | |
20 threadIsExiting=false; | |
21 receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities | |
22 } | |
23 #endif | |
4 | 24 |
5 void ReceiveAudioThread::dealloc(){ | 25 void ReceiveAudioThread::dealloc(){ |
6 free(buffer); | 26 free(buffer); |
7 buffer=NULL; | 27 buffer=NULL; |
8 free(stackBuffer); | 28 free(stackBuffer); |
43 // but it would have made it more difficult for the thread to be killed | 63 // but it would have made it more difficult for the thread to be killed |
44 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 | 64 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 |
45 //read header+payload | 65 //read header+payload |
46 //JUCE int numBytes=socket.read(buffer+writePointer, bytesToRead,1); | 66 //JUCE int numBytes=socket.read(buffer+writePointer, bytesToRead,1); |
47 int numBytes=socket.read(buffer+writePointer, bytesToRead); | 67 int numBytes=socket.read(buffer+writePointer, bytesToRead); |
48 //TODO: do something with the data in the header (e.g.: check that timestamp is sequential, | |
49 // check the channel number) | |
50 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header | 68 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header |
51 if(numBytes<0){ | 69 if(numBytes<0){ |
52 printf("error numBytes1\n"); | 70 printf("error numBytes1\n"); |
53 return -3; //TODO: something went wrong, you have to discard the rest of the packet! | 71 return -3; //TODO: something went wrong, you have to discard the rest of the packet! |
54 } | 72 } |
55 if(numBytes==0){//TODO: when inplementing waitUntilReady, this should not happen unless you actually receive a packate of size zero (is it at all possible?) | 73 if(numBytes==0){//TODO: when inplementing waitUntilReady, this should not happen unless you actually receive a packate of size zero (is it at all possible?) |
56 // printf("received 0 bytes\n"); | 74 // printf("received 0 bytes\n"); |
57 return 0; | 75 return 0; |
58 } | 76 } |
59 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<numBytes) | 77 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead) |
60 printf("error numBytes2: %d\n", numBytes); | 78 printf("error numBytes2: %d\n", numBytes); |
61 return -4; //TODO: something went wrong, less bytes than expected in the payload. | 79 return -4; //TODO: something went wrong, less bytes than expected in the payload. |
62 } | 80 } |
63 if(buffer[writePointer]!=0) | 81 if(channel!=(int)buffer[writePointer]){ |
64 return 0; //wrong channel | 82 // printf("I am channel %d, but I received data for channel %d\n", channel, (int)buffer[writePointer]); |
65 // printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]); | 83 return -5; |
84 } | |
85 //TODO: do something else with the data in the header (e.g.: check that timestamp is sequential) | |
86 // rt_printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]); | |
66 popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0 | 87 popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0 |
67 | 88 |
68 //even though we just wrote (payloadLength+headerLength) samples in the buffer, | 89 //even though we just wrote (payloadLength+headerLength) samples in the buffer, |
69 //we only increment by payloadLength. This way, next time a socket.read is performed, we will | 90 //we only increment by payloadLength. This way, next time a socket.read is performed, we will |
70 //backup the last headerLength samples that we just wrote and we will overwrite them with | 91 //backup the last headerLength samples that we just wrote and we will overwrite them with |
71 //the header from the new read. After parsing the header we will then restore the backed up samples. | 92 //the header from the new read. After parsing the header we will then restore the backed up samples. |
72 //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer! | 93 //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer! |
73 printf("writepointer:%d\n", writePointer); | 94 // printf("writepointer:%d\n", writePointer); |
74 writePointer+=payloadLength; | 95 writePointer+=payloadLength; |
75 | 96 |
76 if(writePointer>lastValidPointer){ | 97 if(writePointer>lastValidPointer){ |
77 // lastValidPointer=writePointer+headerLength; | 98 // lastValidPointer=writePointer+headerLength; |
78 } | 99 } |
84 ReceiveAudioThread::ReceiveAudioThread() : | 105 ReceiveAudioThread::ReceiveAudioThread() : |
85 //JUCE Thread(threadName), | 106 //JUCE Thread(threadName), |
86 socket(0), | 107 socket(0), |
87 listening(false), | 108 listening(false), |
88 bufferReady(false), | 109 bufferReady(false), |
89 threadIsExiting(false), | |
90 buffer(NULL), | 110 buffer(NULL), |
91 stackBuffer(NULL), | 111 stackBuffer(NULL), |
92 bufferLength(0), | 112 bufferLength(0), |
93 lastValidPointer(0), | 113 lastValidPointer(0), |
94 sleepTime(2000), | |
95 waitForSocketTime(100), | 114 waitForSocketTime(100), |
96 threadPriority(95) | 115 threadPriority(95) |
97 {}; | 116 {}; |
98 ReceiveAudioThread::~ReceiveAudioThread(){ | 117 ReceiveAudioThread::~ReceiveAudioThread(){ |
99 //JUCE stopThread(1000); | 118 //JUCE stopThread(1000); |
100 while(threadRunning){ | 119 while(threadRunning){ |
101 sleep(sleepTime*2); //wait for thread to stop | 120 usleep(sleepTime*2); //wait for thread to stop |
102 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; | 121 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; |
103 } | 122 } |
104 //TODO: check if thread stopped, otherwise kill it before dealloc | 123 //TODO: check if thread stopped, otherwise kill it before dealloc |
105 dealloc(); | 124 dealloc(); |
106 } | 125 } |
107 ReceiveAudioThread *gRAT; | 126 void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){ |
108 void receiveData(){ | |
109 gRAT->run(); | |
110 } | |
111 void ReceiveAudioThread::init(int aSamplesPerBlock){ | |
112 dealloc(); | 127 dealloc(); |
128 #ifdef JUCE | |
129 #else | |
130 staticConstructor(); | |
131 objAddrs.push_back(this);//TODO: this line should be in the constructor | |
132 #endif | |
133 bindToPort(aPort); | |
134 channel=aChannel; | |
113 // fd=fopen("output.m","w"); //DEBUG | 135 // fd=fopen("output.m","w"); //DEBUG |
114 // fprintf(fd,"var=["); //DEBUG | 136 // fprintf(fd,"var=["); //DEBUG |
115 gRAT=this; | |
116 headerLength=2; | 137 headerLength=2; |
117 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending. | 138 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending. |
118 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... | 139 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... |
119 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket | 140 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket |
120 buffer=(float*)malloc(sizeof(float)*bufferLength); | 141 buffer=(float*)malloc(sizeof(float)*bufferLength); |
126 stackBuffer=(float*)malloc(sizeof(float)*headerLength); | 147 stackBuffer=(float*)malloc(sizeof(float)*headerLength); |
127 bytesToRead=sizeof(float)*(payloadLength + headerLength); | 148 bytesToRead=sizeof(float)*(payloadLength + headerLength); |
128 writePointer=-1; | 149 writePointer=-1; |
129 readPointer=0; //TODO: this *4 is sortof a security margin | 150 readPointer=0; //TODO: this *4 is sortof a security margin |
130 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently | 151 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently |
131 receiveDataTask=BeagleRT_createAuxiliaryTask( receiveData, threadPriority, "receiveDataTask"); | |
132 //JUCE startThread(threadPriority); | 152 //JUCE startThread(threadPriority); |
133 } | 153 } |
134 | 154 |
135 void ReceiveAudioThread::bindToPort(int aPort){ | 155 void ReceiveAudioThread::bindToPort(int aPort){ |
136 listening=socket.bindToPort(aPort); | 156 listening=socket.bindToPort(aPort); |
157 #ifdef JUCE | |
158 #else | |
159 if(listening==false) | |
160 printf("Could not bind to port %d\n",aPort); | |
161 #endif | |
137 } | 162 } |
138 bool ReceiveAudioThread::isListening(){ | 163 bool ReceiveAudioThread::isListening(){ |
139 return listening; | 164 return listening; |
140 } | 165 } |
141 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples | 166 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples |
194 void ReceiveAudioThread::run(){ | 219 void ReceiveAudioThread::run(){ |
195 // fd2=fopen("buffer.m","w"); //DEBUG | 220 // fd2=fopen("buffer.m","w"); //DEBUG |
196 // fprintf(fd2, "buf=["); //DEBUG | 221 // fprintf(fd2, "buf=["); //DEBUG |
197 threadRunning=true; | 222 threadRunning=true; |
198 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting | 223 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting |
224 #ifdef JUCE | |
199 readUdpToBuffer(); // read into the oldBuffer | 225 readUdpToBuffer(); // read into the oldBuffer |
200 usleep(sleepTime); | 226 usleep(sleepTime); |
227 #else | |
228 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ | |
229 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); | |
230 } | |
231 usleep(sleepTime); //TODO: use rt_task_sleep instead | |
232 #endif | |
201 } | 233 } |
202 threadRunning=false; | 234 threadRunning=false; |
203 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG | 235 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG |
204 // fclose(fd);//DEBUG | 236 // fclose(fd);//DEBUG |
205 // fprintf(fd2,"];");//DEBUG | 237 // fprintf(fd2,"];");//DEBUG |