Mercurial > hg > beaglert
comparison core/NetworkSend.cpp @ 217:c42a6b4dc2d4 mergingClockSync
Recovered some files from ClockSync
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Sat, 13 Feb 2016 04:09:12 +0000 |
parents | |
children | 0183185cab1b |
comparison
equal
deleted
inserted
replaced
216:869f5e703844 | 217:c42a6b4dc2d4 |
---|---|
1 #include "NetworkSend.h" | |
2 | |
3 #ifdef USE_JUCE | |
4 #else | |
5 //initialize the static members of NetworkSend | |
6 bool NetworkSend::staticConstructed=false; | |
7 int NetworkSend::sleepTimeMs; | |
8 bool NetworkSend::threadIsExiting; | |
9 bool NetworkSend::threadRunning; | |
10 std::vector<NetworkSend*> NetworkSend::objAddrs(0); | |
11 AuxiliaryTask NetworkSend::sendDataTask=NULL; | |
12 | |
13 void sendData(){ | |
14 NetworkSend::run(); | |
15 } | |
16 void NetworkSend::staticConstructor(){ | |
17 if(staticConstructed==true) | |
18 return; | |
19 staticConstructed=true; | |
20 threadIsExiting=false; | |
21 threadRunning=false; | |
22 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority | |
23 } | |
24 void NetworkSend::sendAllData(){ | |
25 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){ | |
26 NetworkSend::objAddrs[n]->sendData(); | |
27 } | |
28 } | |
29 int NetworkSend::getNumInstances(){ | |
30 return objAddrs.size(); | |
31 } | |
32 void NetworkSend::startThread(){ | |
33 BeagleRT_scheduleAuxiliaryTask(sendDataTask); | |
34 } | |
35 void NetworkSend::stopThread(){ | |
36 threadIsExiting=true; | |
37 } | |
38 bool NetworkSend::threadShouldExit(){ | |
39 return(gShouldStop || threadIsExiting); | |
40 } | |
41 bool NetworkSend::isThreadRunning(){ | |
42 return threadRunning; | |
43 } | |
44 #endif /* USE_JUCE */ | |
45 | |
46 #ifdef USE_JUCE | |
47 NetworkSend::NetworkSend(const String &threadName): | |
48 Thread(threadName) | |
49 #else | |
50 NetworkSend::NetworkSend() | |
51 #endif /* USE_JUCE */ | |
52 { | |
53 channel.buffers=NULL; | |
54 channel.doneOnTime=NULL; | |
55 channel.readyToBeSent=NULL; | |
56 channel.enabled=false; | |
57 sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable | |
58 channel.sampleCount=0; | |
59 } | |
60 | |
61 NetworkSend::~NetworkSend(){ | |
62 #ifdef USE_JUCE | |
63 stopThread(1000); | |
64 #else | |
65 stopThread(); | |
66 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; | |
67 if(objAddrs[n]==this){ | |
68 objAddrs.erase(objAddrs.begin()+n); | |
69 break; | |
70 } | |
71 } | |
72 #endif | |
73 dealloc(); | |
74 } | |
75 void NetworkSend::dealloc(){ | |
76 channel.enabled=false; | |
77 if(channel.buffers!=NULL){ | |
78 for(int n=0; n<channel.numBuffers; n++){ | |
79 free(channel.buffers[n]); | |
80 channel.buffers[n]=NULL; | |
81 } | |
82 free(channel.buffers); | |
83 channel.buffers=NULL; | |
84 } | |
85 free(channel.readyToBeSent); | |
86 channel.readyToBeSent=NULL; | |
87 free(channel.doneOnTime); | |
88 channel.doneOnTime=NULL; | |
89 } | |
90 void NetworkSend::cleanup(){ | |
91 dealloc(); | |
92 } | |
93 | |
94 void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){ | |
95 #ifdef USE_JUCE | |
96 #else | |
97 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible | |
98 //because of limitations in BeagleRT_createAuxiliaryTask() | |
99 //keep track of added active instances | |
100 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if | |
101 // an instance of NetworkSend is then declared globally: the constructor gets called, | |
102 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without | |
103 // any destructor being called in between ... | |
104 #endif /* USE_JUCE */ | |
105 cleanup(); | |
106 int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength; | |
107 channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division | |
108 channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*)); | |
109 printf("NumBuffers: %d\n", channel.numBuffers); | |
110 if(channel.buffers==NULL) | |
111 return; | |
112 for(int n=0; n<channel.numBuffers; n++){ | |
113 channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float)); | |
114 if(channel.buffers[n]==NULL) | |
115 return; | |
116 } | |
117 channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool)); | |
118 channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool)); | |
119 for(int n=0; n<channel.numBuffers; n++){ | |
120 channel.readyToBeSent[n]=false; | |
121 channel.doneOnTime[n]=true; | |
122 } | |
123 if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL) | |
124 return; | |
125 channel.writePointer=0; | |
126 channel.writeBuffer=0; | |
127 channel.readBuffer=0; | |
128 setChannelNumber(aChannelNumber); | |
129 setPort(aPort); //TODO: check for the return value | |
130 setServer(aServer); //TODO: check for the return value | |
131 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); | |
132 channel.enabled=true; | |
133 } | |
134 | |
135 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method | |
136 if(channel.enabled==false) | |
137 return; | |
138 if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ... | |
139 channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such | |
140 // printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); | |
141 channel.writePointer=channel.headerLength; //reset the writePointer | |
142 channel.writeBuffer=(channel.writeBuffer+1); //switch buffer | |
143 if(channel.writeBuffer==channel.numBuffers) // and wrap it | |
144 channel.writeBuffer=0; | |
145 // printf("WriteBuffer:%d\n", channel.writeBuffer); | |
146 if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ... | |
147 printf("Network buffer underrun. timestamp: %d :-{\n", | |
148 (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); | |
149 } | |
150 channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag | |
151 #ifdef USE_JUCE | |
152 if(isThreadRunning()==false){ | |
153 startThread(10); | |
154 } | |
155 #else | |
156 if(isThreadRunning()==false){ | |
157 startThread(); | |
158 } | |
159 #endif /* USE_JUCE */ | |
160 } | |
161 if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header | |
162 //set dynamic header values here. Static values are set in setup() and setChannelNumber(). | |
163 channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp | |
164 channel.sampleCount++; | |
165 //add here more header fields | |
166 } | |
167 channel.buffers[channel.writeBuffer][channel.writePointer++]=value; | |
168 // sampleCount++; | |
169 }; | |
170 | |
171 void NetworkSend::setServer(const char *aServer){ | |
172 #ifdef USE_JUCE | |
173 remoteHostname=String::fromUTF8(aServer); | |
174 #else | |
175 udpClient.setServer(aServer); | |
176 #endif /* USE_JUCE */ | |
177 } | |
178 void NetworkSend::setPort(int aPort){ | |
179 #ifdef USE_JUCE | |
180 remotePortNumber=aPort; | |
181 #else | |
182 udpClient.setPort(aPort); | |
183 #endif /* USE_JUCE */ | |
184 } | |
185 | |
186 void NetworkSend::setChannelNumber(int aChannelNumber){ | |
187 channel.channelNumber=aChannelNumber; | |
188 for(int n=0; n<channel.numBuffers; n++){ //initialize the header | |
189 channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber; | |
190 //add here more static header fields | |
191 } | |
192 }; | |
193 int NetworkSend::getChannelNumber(){ | |
194 return channel.channelNumber; | |
195 }; | |
196 | |
197 int NetworkSend::getTimestamp(){ | |
198 return channel.buffers[channel.readBuffer][channel.headerTimestampIndex]; | |
199 } | |
200 | |
201 void NetworkSend::sendData(){ | |
202 if(channel.enabled==false) | |
203 return; | |
204 while(channel.readyToBeSent[channel.readBuffer]==true){ | |
205 channel.readyToBeSent[channel.readBuffer]=false; | |
206 void* sourceBuffer=channel.buffers[channel.readBuffer]; | |
207 // printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]); | |
208 // printf("ReadBuffer:%d\n", channel.readBuffer); | |
209 unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float); | |
210 //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?) | |
211 #ifdef USE_JUCE | |
212 if(1==udpClient.waitUntilReady(0, 5)){ | |
213 udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend); | |
214 channel.doneOnTime[channel.readBuffer]=true; | |
215 // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]); | |
216 } else { | |
217 // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]); | |
218 } | |
219 #else | |
220 udpClient.send(sourceBuffer, numBytesToSend); | |
221 channel.doneOnTime[channel.readBuffer]=true; | |
222 #endif /* USE_JUCE */ | |
223 channel.readBuffer++; | |
224 if(channel.readBuffer==channel.numBuffers) | |
225 channel.readBuffer=0; | |
226 } | |
227 } | |
228 | |
229 void NetworkSend::run(){ | |
230 #ifdef USE_JUCE | |
231 // std::chrono::high_resolution_clock::time_point t1; | |
232 // std::chrono::high_resolution_clock::time_point t2; | |
233 // std::chrono::high_resolution_clock::time_point t3; | |
234 while(threadShouldExit()==false){ | |
235 // t3 = std::chrono::high_resolution_clock::now(); | |
236 // t1 = std::chrono::high_resolution_clock::now(); | |
237 sendData(); | |
238 // t2 = std::chrono::high_resolution_clock::now(); | |
239 // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count(); | |
240 // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count(); | |
241 // if(duration2>0) | |
242 // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n"; | |
243 sleep(1); | |
244 } | |
245 #else | |
246 threadRunning=true; | |
247 while(threadShouldExit()==false){ | |
248 sendAllData(); | |
249 usleep(sleepTimeMs*1000); | |
250 } | |
251 threadRunning=false; | |
252 #endif | |
253 } | |
254 #ifdef USE_JUCE | |
255 #else | |
256 Scope::Scope(int aNumChannels): | |
257 channels(aNumChannels) | |
258 {}; | |
259 Scope::~Scope(){}; | |
260 | |
261 void Scope::log(int channel, float value){ | |
262 if(channel>=getNumChannels()) //TODO: assert this | |
263 return; | |
264 channels[channel].log(value); | |
265 } | |
266 | |
267 void Scope::setup(){ | |
268 setup(44100, 9999, "127.0.0.1"); | |
269 } | |
270 | |
271 void Scope::setup(float sampleRate, int aPort, const char* aServer){ | |
272 for(int n=0; n<getNumChannels(); n++){ | |
273 channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size | |
274 } | |
275 } | |
276 | |
277 void Scope::setPort(int port){ | |
278 for(int n=0; n<getNumChannels(); n++){ | |
279 channels[n].setPort(port); | |
280 } | |
281 } | |
282 void Scope::setPort(int channel, int aPort){ | |
283 channels[channel].setPort(aPort); | |
284 printf("Channel %d is now sending to port %d\n", channel, aPort); | |
285 } | |
286 | |
287 int Scope::getNumChannels(){ | |
288 return channels.size(); | |
289 } | |
290 | |
291 void Scope::sendData(){ | |
292 NetworkSend::sendAllData(); | |
293 } | |
294 #endif |