comparison core/NetworkSend.cpp @ 235:3d41a6fa1830

Merge
author Giulio Moro <giuliomoro@yahoo.it>
date Sun, 10 Apr 2016 04:08:06 +0200
parents 6a23c07d0fbb
children e4392164b458
comparison
equal deleted inserted replaced
216:869f5e703844 235:3d41a6fa1830
1 #include "NetworkSend.h"
2
3 #ifdef USE_JUCE
4 #else
5 //initialize the static members of NetworkSend
6 bool NetworkSend::staticConstructed=false;
7 int NetworkSend::sleepTimeMs;
8 bool NetworkSend::threadIsExiting;
9 bool NetworkSend::threadRunning;
10 std::vector<NetworkSend*> NetworkSend::objAddrs(0);
11 AuxiliaryTask NetworkSend::sendDataTask=NULL;
12
13 void sendData(){
14 NetworkSend::run();
15 }
16 void NetworkSend::staticConstructor(){
17 if(staticConstructed==true)
18 return;
19 staticConstructed=true;
20 threadIsExiting=false;
21 threadRunning=false;
22 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority
23 }
24 void NetworkSend::sendAllData(){
25 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){
26 NetworkSend::objAddrs[n]->sendData();
27 }
28 }
29 int NetworkSend::getNumInstances(){
30 return objAddrs.size();
31 }
32 void NetworkSend::startThread(){
33 BeagleRT_scheduleAuxiliaryTask(sendDataTask);
34 }
35 void NetworkSend::stopThread(){
36 threadIsExiting=true;
37 }
38 bool NetworkSend::threadShouldExit(){
39 return(gShouldStop || threadIsExiting);
40 }
41 bool NetworkSend::isThreadRunning(){
42 return threadRunning;
43 }
44 #endif /* USE_JUCE */
45
46 #ifdef USE_JUCE
47 NetworkSend::NetworkSend(const String &threadName):
48 Thread(threadName)
49 #else
50 NetworkSend::NetworkSend()
51 #endif /* USE_JUCE */
52 {
53 channel.buffers=NULL;
54 channel.doneOnTime=NULL;
55 channel.readyToBeSent=NULL;
56 channel.enabled=false;
57 sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable
58 channel.sampleCount=0;
59 }
60
61 NetworkSend::~NetworkSend(){
62 #ifdef USE_JUCE
63 stopThread(1000);
64 #else
65 stopThread();
66 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances;
67 if(objAddrs[n]==this){
68 objAddrs.erase(objAddrs.begin()+n);
69 break;
70 }
71 }
72 #endif
73 dealloc();
74 }
75 void NetworkSend::dealloc(){
76 channel.enabled=false;
77 if(channel.buffers!=NULL){
78 for(int n=0; n<channel.numBuffers; n++){
79 free(channel.buffers[n]);
80 channel.buffers[n]=NULL;
81 }
82 free(channel.buffers);
83 channel.buffers=NULL;
84 }
85 free(channel.readyToBeSent);
86 channel.readyToBeSent=NULL;
87 free(channel.doneOnTime);
88 channel.doneOnTime=NULL;
89 }
90 void NetworkSend::cleanup(){
91 dealloc();
92 }
93
94 void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){
95 #ifdef USE_JUCE
96 #else
97 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible
98 //because of limitations in BeagleRT_createAuxiliaryTask()
99 //keep track of added active instances
100 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if
101 // an instance of NetworkSend is then declared globally: the constructor gets called,
102 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without
103 // any destructor being called in between ... Have a look here
104 // http://stackoverflow.com/questions/7542054/global-vector-emptying-itself-between-calls .
105 // and maybe use accessor function instead of global, as was done in #1374
106 #endif /* USE_JUCE */
107 cleanup();
108 int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength;
109 channel.numBuffers= (1+numSamples/channel.bufferLength) * 3; //the +1 takes the ceil() of the division
110 channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*));
111 printf("NumBuffers: %d\n", channel.numBuffers);
112 if(channel.buffers==NULL)
113 return;
114 for(int n=0; n<channel.numBuffers; n++){
115 channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float));
116 if(channel.buffers[n]==NULL)
117 return;
118 }
119 channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool));
120 channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool));
121 for(int n=0; n<channel.numBuffers; n++){
122 channel.readyToBeSent[n]=false;
123 channel.doneOnTime[n]=true;
124 }
125 if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL)
126 return;
127 channel.writePointer=channel.headerLength;
128 channel.writeBuffer=0;
129 channel.readBuffer=0;
130 setChannelNumber(aChannelNumber);
131 setPort(aPort); //TODO: check for the return value
132 setServer(aServer); //TODO: check for the return value
133 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate);
134 channel.enabled=true;
135 }
136
137 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method
138 if(channel.enabled==false)
139 return;
140 if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ...
141 channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such
142 // printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
143 channel.writePointer=channel.headerLength; //reset the writePointer
144 channel.writeBuffer=(channel.writeBuffer+1); //switch buffer
145 if(channel.writeBuffer==channel.numBuffers) // and wrap it
146 channel.writeBuffer=0;
147 // printf("WriteBuffer:%d\n", channel.writeBuffer);
148 if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ...
149 printf("NetworkSend buffer underrun. timestamp: %d :-{\n",
150 (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
151 }
152 channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag
153 #ifdef USE_JUCE
154 if(isThreadRunning()==false){
155 startThread(10);
156 }
157 #else
158 if(isThreadRunning()==false){
159 startThread();
160 }
161 #endif /* USE_JUCE */
162 }
163 if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header
164 //set dynamic header values here. Static values are set in setup() and setChannelNumber().
165 channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp
166 channel.sampleCount++;
167 //add here more header fields
168 }
169 channel.buffers[channel.writeBuffer][channel.writePointer++]=value;
170 // sampleCount++;
171 };
172
173 void NetworkSend::setServer(const char *aServer){
174 #ifdef USE_JUCE
175 remoteHostname=String::fromUTF8(aServer);
176 #else
177 udpClient.setServer(aServer);
178 #endif /* USE_JUCE */
179 }
180 void NetworkSend::setPort(int aPort){
181 #ifdef USE_JUCE
182 remotePortNumber=aPort;
183 #else
184 udpClient.setPort(aPort);
185 #endif /* USE_JUCE */
186 }
187
188 void NetworkSend::setChannelNumber(int aChannelNumber){
189 channel.channelNumber=aChannelNumber;
190 for(int n=0; n < channel.numBuffers; n++){ //initialize the header
191 channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber;
192 //add here more static header fields
193 }
194 };
195 int NetworkSend::getChannelNumber(){
196 return channel.channelNumber;
197 };
198
199 int NetworkSend::getTimestamp(){
200 return channel.buffers[channel.readBuffer][channel.headerTimestampIndex];
201 }
202
203 void NetworkSend::sendData(){
204 if(channel.enabled==false)
205 return;
206 while(channel.readyToBeSent[channel.readBuffer]==true){
207 channel.readyToBeSent[channel.readBuffer]=false;
208 void* sourceBuffer=channel.buffers[channel.readBuffer];
209 // printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]);
210 // printf("ReadBuffer:%d\n", channel.readBuffer);
211 unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float);
212 //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?)
213 #ifdef USE_JUCE
214 if(1==udpClient.waitUntilReady(0, 5)){
215 udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend);
216 channel.doneOnTime[channel.readBuffer]=true;
217 // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
218 } else {
219 // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
220 }
221 #else
222 udpClient.send(sourceBuffer, numBytesToSend);
223 // printf("sent sourceBuffer: %d, channel: %f, timestamp: %f\n", channel.readBuffer, channel.buffers[channel.readBuffer][0],
224 // channel.buffers[channel.readBuffer][1]);
225 channel.doneOnTime[channel.readBuffer]=true;
226 #endif /* USE_JUCE */
227 channel.readBuffer++;
228 if(channel.readBuffer==channel.numBuffers)
229 channel.readBuffer=0;
230 }
231 }
232
233 void NetworkSend::run(){
234 #ifdef USE_JUCE
235 // std::chrono::high_resolution_clock::time_point t1;
236 // std::chrono::high_resolution_clock::time_point t2;
237 // std::chrono::high_resolution_clock::time_point t3;
238 while(threadShouldExit()==false){
239 // t3 = std::chrono::high_resolution_clock::now();
240 // t1 = std::chrono::high_resolution_clock::now();
241 sendData();
242 // t2 = std::chrono::high_resolution_clock::now();
243 // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count();
244 // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
245 // if(duration2>0)
246 // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n";
247 usleep(1000);
248 }
249 #else
250 threadRunning=true;
251 while(threadShouldExit()==false){
252 sendAllData();
253 usleep(sleepTimeMs*1000);
254 }
255 threadRunning=false;
256 #endif
257 }