Mercurial > hg > beaglert
comparison core/NetworkSend.cpp @ 235:3d41a6fa1830
Merge
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Sun, 10 Apr 2016 04:08:06 +0200 |
parents | 6a23c07d0fbb |
children | e4392164b458 |
comparison
equal
deleted
inserted
replaced
216:869f5e703844 | 235:3d41a6fa1830 |
---|---|
1 #include "NetworkSend.h" | |
2 | |
3 #ifdef USE_JUCE | |
4 #else | |
5 //initialize the static members of NetworkSend | |
6 bool NetworkSend::staticConstructed=false; | |
7 int NetworkSend::sleepTimeMs; | |
8 bool NetworkSend::threadIsExiting; | |
9 bool NetworkSend::threadRunning; | |
10 std::vector<NetworkSend*> NetworkSend::objAddrs(0); | |
11 AuxiliaryTask NetworkSend::sendDataTask=NULL; | |
12 | |
13 void sendData(){ | |
14 NetworkSend::run(); | |
15 } | |
16 void NetworkSend::staticConstructor(){ | |
17 if(staticConstructed==true) | |
18 return; | |
19 staticConstructed=true; | |
20 threadIsExiting=false; | |
21 threadRunning=false; | |
22 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority | |
23 } | |
24 void NetworkSend::sendAllData(){ | |
25 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){ | |
26 NetworkSend::objAddrs[n]->sendData(); | |
27 } | |
28 } | |
29 int NetworkSend::getNumInstances(){ | |
30 return objAddrs.size(); | |
31 } | |
32 void NetworkSend::startThread(){ | |
33 BeagleRT_scheduleAuxiliaryTask(sendDataTask); | |
34 } | |
35 void NetworkSend::stopThread(){ | |
36 threadIsExiting=true; | |
37 } | |
38 bool NetworkSend::threadShouldExit(){ | |
39 return(gShouldStop || threadIsExiting); | |
40 } | |
41 bool NetworkSend::isThreadRunning(){ | |
42 return threadRunning; | |
43 } | |
44 #endif /* USE_JUCE */ | |
45 | |
46 #ifdef USE_JUCE | |
47 NetworkSend::NetworkSend(const String &threadName): | |
48 Thread(threadName) | |
49 #else | |
50 NetworkSend::NetworkSend() | |
51 #endif /* USE_JUCE */ | |
52 { | |
53 channel.buffers=NULL; | |
54 channel.doneOnTime=NULL; | |
55 channel.readyToBeSent=NULL; | |
56 channel.enabled=false; | |
57 sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable | |
58 channel.sampleCount=0; | |
59 } | |
60 | |
61 NetworkSend::~NetworkSend(){ | |
62 #ifdef USE_JUCE | |
63 stopThread(1000); | |
64 #else | |
65 stopThread(); | |
66 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; | |
67 if(objAddrs[n]==this){ | |
68 objAddrs.erase(objAddrs.begin()+n); | |
69 break; | |
70 } | |
71 } | |
72 #endif | |
73 dealloc(); | |
74 } | |
75 void NetworkSend::dealloc(){ | |
76 channel.enabled=false; | |
77 if(channel.buffers!=NULL){ | |
78 for(int n=0; n<channel.numBuffers; n++){ | |
79 free(channel.buffers[n]); | |
80 channel.buffers[n]=NULL; | |
81 } | |
82 free(channel.buffers); | |
83 channel.buffers=NULL; | |
84 } | |
85 free(channel.readyToBeSent); | |
86 channel.readyToBeSent=NULL; | |
87 free(channel.doneOnTime); | |
88 channel.doneOnTime=NULL; | |
89 } | |
90 void NetworkSend::cleanup(){ | |
91 dealloc(); | |
92 } | |
93 | |
94 void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){ | |
95 #ifdef USE_JUCE | |
96 #else | |
97 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible | |
98 //because of limitations in BeagleRT_createAuxiliaryTask() | |
99 //keep track of added active instances | |
100 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if | |
101 // an instance of NetworkSend is then declared globally: the constructor gets called, | |
102 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without | |
103 // any destructor being called in between ... Have a look here | |
104 // http://stackoverflow.com/questions/7542054/global-vector-emptying-itself-between-calls . | |
105 // and maybe use accessor function instead of global, as was done in #1374 | |
106 #endif /* USE_JUCE */ | |
107 cleanup(); | |
108 int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength; | |
109 channel.numBuffers= (1+numSamples/channel.bufferLength) * 3; //the +1 takes the ceil() of the division | |
110 channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*)); | |
111 printf("NumBuffers: %d\n", channel.numBuffers); | |
112 if(channel.buffers==NULL) | |
113 return; | |
114 for(int n=0; n<channel.numBuffers; n++){ | |
115 channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float)); | |
116 if(channel.buffers[n]==NULL) | |
117 return; | |
118 } | |
119 channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool)); | |
120 channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool)); | |
121 for(int n=0; n<channel.numBuffers; n++){ | |
122 channel.readyToBeSent[n]=false; | |
123 channel.doneOnTime[n]=true; | |
124 } | |
125 if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL) | |
126 return; | |
127 channel.writePointer=channel.headerLength; | |
128 channel.writeBuffer=0; | |
129 channel.readBuffer=0; | |
130 setChannelNumber(aChannelNumber); | |
131 setPort(aPort); //TODO: check for the return value | |
132 setServer(aServer); //TODO: check for the return value | |
133 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); | |
134 channel.enabled=true; | |
135 } | |
136 | |
137 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method | |
138 if(channel.enabled==false) | |
139 return; | |
140 if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ... | |
141 channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such | |
142 // printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); | |
143 channel.writePointer=channel.headerLength; //reset the writePointer | |
144 channel.writeBuffer=(channel.writeBuffer+1); //switch buffer | |
145 if(channel.writeBuffer==channel.numBuffers) // and wrap it | |
146 channel.writeBuffer=0; | |
147 // printf("WriteBuffer:%d\n", channel.writeBuffer); | |
148 if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ... | |
149 printf("NetworkSend buffer underrun. timestamp: %d :-{\n", | |
150 (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); | |
151 } | |
152 channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag | |
153 #ifdef USE_JUCE | |
154 if(isThreadRunning()==false){ | |
155 startThread(10); | |
156 } | |
157 #else | |
158 if(isThreadRunning()==false){ | |
159 startThread(); | |
160 } | |
161 #endif /* USE_JUCE */ | |
162 } | |
163 if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header | |
164 //set dynamic header values here. Static values are set in setup() and setChannelNumber(). | |
165 channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp | |
166 channel.sampleCount++; | |
167 //add here more header fields | |
168 } | |
169 channel.buffers[channel.writeBuffer][channel.writePointer++]=value; | |
170 // sampleCount++; | |
171 }; | |
172 | |
173 void NetworkSend::setServer(const char *aServer){ | |
174 #ifdef USE_JUCE | |
175 remoteHostname=String::fromUTF8(aServer); | |
176 #else | |
177 udpClient.setServer(aServer); | |
178 #endif /* USE_JUCE */ | |
179 } | |
180 void NetworkSend::setPort(int aPort){ | |
181 #ifdef USE_JUCE | |
182 remotePortNumber=aPort; | |
183 #else | |
184 udpClient.setPort(aPort); | |
185 #endif /* USE_JUCE */ | |
186 } | |
187 | |
188 void NetworkSend::setChannelNumber(int aChannelNumber){ | |
189 channel.channelNumber=aChannelNumber; | |
190 for(int n=0; n < channel.numBuffers; n++){ //initialize the header | |
191 channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber; | |
192 //add here more static header fields | |
193 } | |
194 }; | |
195 int NetworkSend::getChannelNumber(){ | |
196 return channel.channelNumber; | |
197 }; | |
198 | |
199 int NetworkSend::getTimestamp(){ | |
200 return channel.buffers[channel.readBuffer][channel.headerTimestampIndex]; | |
201 } | |
202 | |
203 void NetworkSend::sendData(){ | |
204 if(channel.enabled==false) | |
205 return; | |
206 while(channel.readyToBeSent[channel.readBuffer]==true){ | |
207 channel.readyToBeSent[channel.readBuffer]=false; | |
208 void* sourceBuffer=channel.buffers[channel.readBuffer]; | |
209 // printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]); | |
210 // printf("ReadBuffer:%d\n", channel.readBuffer); | |
211 unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float); | |
212 //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?) | |
213 #ifdef USE_JUCE | |
214 if(1==udpClient.waitUntilReady(0, 5)){ | |
215 udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend); | |
216 channel.doneOnTime[channel.readBuffer]=true; | |
217 // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]); | |
218 } else { | |
219 // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]); | |
220 } | |
221 #else | |
222 udpClient.send(sourceBuffer, numBytesToSend); | |
223 // printf("sent sourceBuffer: %d, channel: %f, timestamp: %f\n", channel.readBuffer, channel.buffers[channel.readBuffer][0], | |
224 // channel.buffers[channel.readBuffer][1]); | |
225 channel.doneOnTime[channel.readBuffer]=true; | |
226 #endif /* USE_JUCE */ | |
227 channel.readBuffer++; | |
228 if(channel.readBuffer==channel.numBuffers) | |
229 channel.readBuffer=0; | |
230 } | |
231 } | |
232 | |
233 void NetworkSend::run(){ | |
234 #ifdef USE_JUCE | |
235 // std::chrono::high_resolution_clock::time_point t1; | |
236 // std::chrono::high_resolution_clock::time_point t2; | |
237 // std::chrono::high_resolution_clock::time_point t3; | |
238 while(threadShouldExit()==false){ | |
239 // t3 = std::chrono::high_resolution_clock::now(); | |
240 // t1 = std::chrono::high_resolution_clock::now(); | |
241 sendData(); | |
242 // t2 = std::chrono::high_resolution_clock::now(); | |
243 // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count(); | |
244 // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count(); | |
245 // if(duration2>0) | |
246 // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n"; | |
247 usleep(1000); | |
248 } | |
249 #else | |
250 threadRunning=true; | |
251 while(threadShouldExit()==false){ | |
252 sendAllData(); | |
253 usleep(sleepTimeMs*1000); | |
254 } | |
255 threadRunning=false; | |
256 #endif | |
257 } |