comparison core/ReceiveAudioThread.cpp @ 119:c692827083e1 scope-refactoring

Enabled multi channel audio receive
author Giulio Moro <giuliomoro@yahoo.it>
date Fri, 21 Aug 2015 15:21:34 +0100
parents ada68d50e56a
children cdd441a304a9
comparison
equal deleted inserted replaced
118:26ad97b8aa9e 119:c692827083e1
1 #include "ReceiveAudioThread.h" 1 #include "ReceiveAudioThread.h"
2
3 #ifdef JUCE
4 #else
2 //initialise static members 5 //initialise static members
6 bool ReceiveAudioThread::staticConstructed=false;
3 AuxiliaryTask ReceiveAudioThread::receiveDataTask=NULL; 7 AuxiliaryTask ReceiveAudioThread::receiveDataTask=NULL;
8 std::vector<ReceiveAudioThread *> ReceiveAudioThread::objAddrs(0);
9 bool ReceiveAudioThread::threadRunning;
10 bool ReceiveAudioThread::threadIsExiting;
11 int ReceiveAudioThread::sleepTime;
12
13 void receiveData(){
14 ReceiveAudioThread::run();
15 }
16 void ReceiveAudioThread::staticConstructor(){
17 if(staticConstructed==true)
18 return;
19 staticConstructed=true;
20 threadIsExiting=false;
21 receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities
22 }
23 #endif
4 24
5 void ReceiveAudioThread::dealloc(){ 25 void ReceiveAudioThread::dealloc(){
6 free(buffer); 26 free(buffer);
7 buffer=NULL; 27 buffer=NULL;
8 free(stackBuffer); 28 free(stackBuffer);
43 // but it would have made it more difficult for the thread to be killed 63 // but it would have made it more difficult for the thread to be killed
44 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 64 pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0
45 //read header+payload 65 //read header+payload
46 //JUCE int numBytes=socket.read(buffer+writePointer, bytesToRead,1); 66 //JUCE int numBytes=socket.read(buffer+writePointer, bytesToRead,1);
47 int numBytes=socket.read(buffer+writePointer, bytesToRead); 67 int numBytes=socket.read(buffer+writePointer, bytesToRead);
48 //TODO: do something with the data in the header (e.g.: check that timestamp is sequential,
49 // check the channel number)
50 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header 68 //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header
51 if(numBytes<0){ 69 if(numBytes<0){
52 printf("error numBytes1\n"); 70 printf("error numBytes1\n");
53 return -3; //TODO: something went wrong, you have to discard the rest of the packet! 71 return -3; //TODO: something went wrong, you have to discard the rest of the packet!
54 } 72 }
55 if(numBytes==0){//TODO: when implementing waitUntilReady, this should not happen unless you actually receive a packet of size zero (is it at all possible?) 73 if(numBytes==0){//TODO: when implementing waitUntilReady, this should not happen unless you actually receive a packet of size zero (is it at all possible?)
56 // printf("received 0 bytes\n"); 74 // printf("received 0 bytes\n");
57 return 0; 75 return 0;
58 } 76 }
59 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<numBytes) 77 if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead)
60 printf("error numBytes2: %d\n", numBytes); 78 printf("error numBytes2: %d\n", numBytes);
61 return -4; //TODO: something went wrong, fewer bytes than expected in the payload. 79 return -4; //TODO: something went wrong, fewer bytes than expected in the payload.
62 } 80 }
63 if(buffer[writePointer]!=0) 81 if(channel!=(int)buffer[writePointer]){
64 return 0; //wrong channel 82 // printf("I am channel %d, but I received data for channel %d\n", channel, (int)buffer[writePointer]);
65 // printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]); 83 return -5;
84 }
85 //TODO: do something else with the data in the header (e.g.: check that timestamp is sequential)
86 // rt_printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]);
66 popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0 87 popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0
67 88
68 //even though we just wrote (payloadLength+headerLength) samples in the buffer, 89 //even though we just wrote (payloadLength+headerLength) samples in the buffer,
69 //we only increment by payloadLength. This way, next time a socket.read is performed, we will 90 //we only increment by payloadLength. This way, next time a socket.read is performed, we will
70 //backup the last headerLength samples that we just wrote and we will overwrite them with 91 //backup the last headerLength samples that we just wrote and we will overwrite them with
71 //the header from the new read. After parsing the header we will then restore the backed up samples. 92 //the header from the new read. After parsing the header we will then restore the backed up samples.
72 //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer! 93 //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer!
73 printf("writepointer:%d\n", writePointer); 94 // printf("writepointer:%d\n", writePointer);
74 writePointer+=payloadLength; 95 writePointer+=payloadLength;
75 96
76 if(writePointer>lastValidPointer){ 97 if(writePointer>lastValidPointer){
77 // lastValidPointer=writePointer+headerLength; 98 // lastValidPointer=writePointer+headerLength;
78 } 99 }
84 ReceiveAudioThread::ReceiveAudioThread() : 105 ReceiveAudioThread::ReceiveAudioThread() :
85 //JUCE Thread(threadName), 106 //JUCE Thread(threadName),
86 socket(0), 107 socket(0),
87 listening(false), 108 listening(false),
88 bufferReady(false), 109 bufferReady(false),
89 threadIsExiting(false),
90 buffer(NULL), 110 buffer(NULL),
91 stackBuffer(NULL), 111 stackBuffer(NULL),
92 bufferLength(0), 112 bufferLength(0),
93 lastValidPointer(0), 113 lastValidPointer(0),
94 sleepTime(2000),
95 waitForSocketTime(100), 114 waitForSocketTime(100),
96 threadPriority(95) 115 threadPriority(95)
97 {}; 116 {};
98 ReceiveAudioThread::~ReceiveAudioThread(){ 117 ReceiveAudioThread::~ReceiveAudioThread(){
99 //JUCE stopThread(1000); 118 //JUCE stopThread(1000);
100 while(threadRunning){ 119 while(threadRunning){
101 sleep(sleepTime*2); //wait for thread to stop 120 usleep(sleepTime*2); //wait for thread to stop
102 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; 121 std::cout<< "Waiting for receiveAudioTask to stop" << std::endl;
103 } 122 }
104 //TODO: check if thread stopped, otherwise kill it before dealloc 123 //TODO: check if thread stopped, otherwise kill it before dealloc
105 dealloc(); 124 dealloc();
106 } 125 }
107 ReceiveAudioThread *gRAT; 126 void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){
108 void receiveData(){
109 gRAT->run();
110 }
111 void ReceiveAudioThread::init(int aSamplesPerBlock){
112 dealloc(); 127 dealloc();
128 #ifdef JUCE
129 #else
130 staticConstructor();
131 objAddrs.push_back(this);//TODO: this line should be in the constructor
132 #endif
133 bindToPort(aPort);
134 channel=aChannel;
113 // fd=fopen("output.m","w"); //DEBUG 135 // fd=fopen("output.m","w"); //DEBUG
114 // fprintf(fd,"var=["); //DEBUG 136 // fprintf(fd,"var=["); //DEBUG
115 gRAT=this;
116 headerLength=2; 137 headerLength=2;
117 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending. 138 payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending.
118 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... 139 bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ...
119 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket 140 //We keep a headerLength padding at the beginning of the array to allow full reads from the socket
120 buffer=(float*)malloc(sizeof(float)*bufferLength); 141 buffer=(float*)malloc(sizeof(float)*bufferLength);
126 stackBuffer=(float*)malloc(sizeof(float)*headerLength); 147 stackBuffer=(float*)malloc(sizeof(float)*headerLength);
127 bytesToRead=sizeof(float)*(payloadLength + headerLength); 148 bytesToRead=sizeof(float)*(payloadLength + headerLength);
128 writePointer=-1; 149 writePointer=-1;
129 readPointer=0; //TODO: this *4 is sortof a security margin 150 readPointer=0; //TODO: this *4 is sortof a security margin
130 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently 151 sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently
131 receiveDataTask=BeagleRT_createAuxiliaryTask( receiveData, threadPriority, "receiveDataTask");
132 //JUCE startThread(threadPriority); 152 //JUCE startThread(threadPriority);
133 } 153 }
134 154
135 void ReceiveAudioThread::bindToPort(int aPort){ 155 void ReceiveAudioThread::bindToPort(int aPort){
136 listening=socket.bindToPort(aPort); 156 listening=socket.bindToPort(aPort);
157 #ifdef JUCE
158 #else
159 if(listening==false)
160 printf("Could not bind to port %d\n",aPort);
161 #endif
137 } 162 }
138 bool ReceiveAudioThread::isListening(){ 163 bool ReceiveAudioThread::isListening(){
139 return listening; 164 return listening;
140 } 165 }
141 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples 166 float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples
194 void ReceiveAudioThread::run(){ 219 void ReceiveAudioThread::run(){
195 // fd2=fopen("buffer.m","w"); //DEBUG 220 // fd2=fopen("buffer.m","w"); //DEBUG
196 // fprintf(fd2, "buf=["); //DEBUG 221 // fprintf(fd2, "buf=["); //DEBUG
197 threadRunning=true; 222 threadRunning=true;
198 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting 223 while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting
224 #ifdef JUCE
199 readUdpToBuffer(); // read into the oldBuffer 225 readUdpToBuffer(); // read into the oldBuffer
200 usleep(sleepTime); 226 usleep(sleepTime);
227 #else
228 for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){
229 ReceiveAudioThread::objAddrs[n]->readUdpToBuffer();
230 }
231 usleep(sleepTime); //TODO: use rt_task_sleep instead
232 #endif
201 } 233 }
202 threadRunning=false; 234 threadRunning=false;
203 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG 235 // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG
204 // fclose(fd);//DEBUG 236 // fclose(fd);//DEBUG
205 // fprintf(fd2,"];");//DEBUG 237 // fprintf(fd2,"];");//DEBUG