Mercurial > hg > beaglert
comparison core/NetworkSend.cpp @ 131:ff28e56e5b7e scope-refactoring
Updated Network files to match Udpioplugin 18:fb5a61b10223
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Wed, 26 Aug 2015 02:02:10 +0100 |
parents | cce58e6ec2a2 |
children | e24c531220ee |
comparison
equal
deleted
inserted
replaced
130:da1c61aa97ea | 131:ff28e56e5b7e |
---|---|
1 #include <NetworkSend.h> | 1 #include "NetworkSend.h" |
2 | 2 |
3 #ifdef USE_JUCE | 3 #ifdef USE_JUCE |
4 #else | 4 #else |
5 //initialize the static members of NetworkSend | 5 //initialize the static members of NetworkSend |
6 bool NetworkSend::staticConstructed=false; | 6 bool NetworkSend::staticConstructed=false; |
7 int NetworkSend::sleepTimeMs; | |
8 bool NetworkSend::threadIsExiting; | |
9 bool NetworkSend::threadRunning; | |
7 std::vector<NetworkSend*> NetworkSend::objAddrs(0); | 10 std::vector<NetworkSend*> NetworkSend::objAddrs(0); |
8 AuxiliaryTask NetworkSend::sendDataTask=NULL; | 11 AuxiliaryTask NetworkSend::sendDataTask=NULL; |
9 | 12 |
10 void sendData(){ | 13 void sendData(){ |
11 NetworkSend::sendAllData(); | 14 NetworkSend::run(); |
12 } | 15 } |
13 void NetworkSend::staticConstructor(){ | 16 void NetworkSend::staticConstructor(){ |
14 if(staticConstructed==true) | 17 if(staticConstructed==true) |
15 return; | 18 return; |
16 staticConstructed=true; | 19 staticConstructed=true; |
20 threadIsExiting=false; | |
21 threadRunning=false; | |
17 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority | 22 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority |
18 } | 23 } |
19 void NetworkSend::sendAllData(){ | 24 void NetworkSend::sendAllData(){ |
20 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){ | 25 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){ |
21 NetworkSend::objAddrs[n]->sendData(); | 26 NetworkSend::objAddrs[n]->sendData(); |
22 } | 27 } |
23 } | 28 } |
24 int NetworkSend::getNumInstances(){ | 29 int NetworkSend::getNumInstances(){ |
25 return objAddrs.size(); | 30 return objAddrs.size(); |
26 } | 31 } |
27 #endif /* USE_JUCE */ | 32 void NetworkSend::startThread(){ |
28 | 33 BeagleRT_scheduleAuxiliaryTask(sendDataTask); |
34 } | |
35 void NetworkSend::stopThread(){ | |
36 threadIsExiting=true; | |
37 } | |
38 bool NetworkSend::threadShouldExit(){ | |
39 return(gShouldStop || threadIsExiting); | |
40 } | |
41 bool NetworkSend::isThreadRunning(){ | |
42 return threadRunning; | |
43 } | |
44 #endif /* USE_JUCE */ | |
45 | |
46 #ifdef USE_JUCE | |
47 NetworkSend::NetworkSend(const String &threadName): | |
48 Thread(threadName) | |
49 #else | |
29 NetworkSend::NetworkSend() | 50 NetworkSend::NetworkSend() |
51 #endif /* USE_JUCE */ | |
30 { | 52 { |
31 sampleCount = 0; | 53 channel.buffers=NULL; |
32 channel.doneOnTime=true; | 54 channel.doneOnTime=NULL; |
33 channel.index=channel.headerLength; //leave space for the heading message (channel, timestamp) | 55 channel.readyToBeSent=NULL; |
34 channel.activeBuffer=0; | 56 channel.enabled=false; |
35 channel.readyToBeSent=false; | 57 sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable |
58 channel.sampleCount=0; | |
36 } | 59 } |
37 | 60 |
38 NetworkSend::~NetworkSend(){ | 61 NetworkSend::~NetworkSend(){ |
62 #ifdef USE_JUCE | |
63 stopThread(1000); | |
64 #else | |
65 stopThread(); | |
39 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; | 66 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; |
40 if(objAddrs[n]==this){ | 67 if(objAddrs[n]==this){ |
41 objAddrs.erase(objAddrs.begin()+n); | 68 objAddrs.erase(objAddrs.begin()+n); |
42 break; | 69 break; |
43 } | 70 } |
44 } | 71 } |
45 } | 72 #endif |
46 | 73 dealloc(); |
47 void NetworkSend::setup(float aSampleRate){//TODO: remove this method | 74 } |
48 setup(aSampleRate, 0, 9999, "192.168.7.1");//channelNumber=0 | 75 void NetworkSend::dealloc(){ |
49 } | 76 channel.enabled=false; |
50 | 77 if(channel.buffers!=NULL){ |
51 void NetworkSend::setup(float aSampleRate, int aChannelNumber, int aPort, const char *aServer){ | 78 for(int n=0; n<channel.numBuffers; n++){ |
79 free(channel.buffers[n]); | |
80 channel.buffers[n]=NULL; | |
81 } | |
82 free(channel.buffers); | |
83 channel.buffers=NULL; | |
84 } | |
85 free(channel.readyToBeSent); | |
86 channel.readyToBeSent=NULL; | |
87 free(channel.doneOnTime); | |
88 channel.doneOnTime=NULL; | |
89 } | |
90 void NetworkSend::cleanup(){ | |
91 dealloc(); | |
92 } | |
93 | |
94 void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){ | |
95 #ifdef USE_JUCE | |
96 #else | |
52 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible | 97 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible |
53 //because of limitations in BeagleRT_createAuxiliaryTask() | 98 //because of limitations in BeagleRT_createAuxiliaryTask() |
54 //keep track of added active instances | 99 //keep track of added active instances |
55 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if | 100 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if |
56 // an instance of NetworkSend is then declared globally: the constructor gets called, | 101 // an instance of NetworkSend is then declared globally: the constructor gets called, |
57 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without | 102 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without |
58 // any destructor being called in between ... | 103 // any destructor being called in between ... |
104 #endif /* USE_JUCE */ | |
105 cleanup(); | |
106 int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength; | |
107 channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division | |
108 channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*)); | |
109 printf("NumBuffers: %d\n", channel.numBuffers); | |
110 if(channel.buffers==NULL) | |
111 return; | |
112 for(int n=0; n<channel.numBuffers; n++){ | |
113 channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float)); | |
114 if(channel.buffers[n]==NULL) | |
115 return; | |
116 } | |
117 channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool)); | |
118 channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool)); | |
119 for(int n=0; n<channel.numBuffers; n++){ | |
120 channel.readyToBeSent[n]=false; | |
121 channel.doneOnTime[n]=true; | |
122 } | |
123 if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL) | |
124 return; | |
125 channel.writePointer=0; | |
126 channel.writeBuffer=0; | |
127 channel.readBuffer=0; | |
59 setChannelNumber(aChannelNumber); | 128 setChannelNumber(aChannelNumber); |
60 setPort(aPort); | 129 setPort(aPort); //TODO: check for the return value |
61 setServer(aServer); | 130 setServer(aServer); //TODO: check for the return value |
62 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); | 131 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); |
132 channel.enabled=true; | |
63 } | 133 } |
64 | 134 |
65 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method | 135 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method |
66 if(channel.index==(NETWORK_AUDIO_BUFFER_SIZE)){ // when the buffer is ready ... | 136 if(channel.enabled==false) |
67 channel.readyToBeSent=true; | 137 return; |
68 channel.index=channel.headerLength; //reset the counter | 138 if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ... |
69 if(channel.doneOnTime==false){ | 139 channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such |
70 printf("Network buffer underrun. timestamp: %d :-{\n", (int)channel.buffers[!channel.activeBuffer][1]); | 140 // printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); |
71 } | 141 channel.writePointer=channel.headerLength; //reset the writePointer |
72 channel.activeBuffer=!channel.activeBuffer; //switch buffer | 142 channel.writeBuffer=(channel.writeBuffer+1); //switch buffer |
73 channel.doneOnTime=false; | 143 if(channel.writeBuffer==channel.numBuffers) // and wrap it |
74 BeagleRT_scheduleAuxiliaryTask(NetworkSend::sendDataTask); //send the buffer | 144 channel.writeBuffer=0; |
75 // TODO: maybe we should have transmitAudioTask running in a loop instead of scheduling it multiple times? | 145 // printf("WriteBuffer:%d\n", channel.writeBuffer); |
76 // The current solution allows to minimize latency when a single channel is used, as there is no inherent | 146 if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ... |
77 // rt_task_sleep in the thread, as we are signaling it every time. | 147 printf("Network buffer underrun. timestamp: %d :-{\n", |
78 // Although, there is a possible race condition: if the auxiliaryTask is scheduled by channel 0, | 148 (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); |
79 // it might still be executing when channel 1 schedules it. But if the AuxTask has already skipped | 149 } |
80 // over channel 1, then we are at risk that channel 1 never gets sent. | 150 channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag |
81 } | 151 #ifdef USE_JUCE |
82 if(channel.index==channel.headerLength){ | 152 if(isThreadRunning()==false){ |
83 channel.buffers[channel.activeBuffer][0] = (float)channel.channelNumber; //TODO: this could actually be done just once in setup() | 153 startThread(10); |
84 channel.buffers[channel.activeBuffer][1]=(float)sampleCount; //timestamp | 154 } |
155 #else | |
156 if(isThreadRunning()==false){ | |
157 startThread(); | |
158 } | |
159 #endif /* USE_JUCE */ | |
160 } | |
161 if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header | |
162 //set dynamic header values here. Static values are set in setup() and setChannelNumber(). | |
163 channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp | |
164 channel.sampleCount++; | |
85 //add here more header fields | 165 //add here more header fields |
86 } | 166 } |
87 channel.buffers[channel.activeBuffer][channel.index++]=value; | 167 channel.buffers[channel.writeBuffer][channel.writePointer++]=value; |
88 sampleCount++; | 168 // sampleCount++; |
89 }; | 169 }; |
90 | 170 |
91 void NetworkSend::setServer(const char *aServer){ | 171 void NetworkSend::setServer(const char *aServer){ |
172 #ifdef USE_JUCE | |
173 remoteHostname=String::fromUTF8(aServer); | |
174 #else | |
92 udpClient.setServer(aServer); | 175 udpClient.setServer(aServer); |
176 #endif /* USE_JUCE */ | |
93 } | 177 } |
94 void NetworkSend::setPort(int aPort){ | 178 void NetworkSend::setPort(int aPort){ |
179 #ifdef USE_JUCE | |
180 remotePortNumber=aPort; | |
181 #else | |
95 udpClient.setPort(aPort); | 182 udpClient.setPort(aPort); |
183 #endif /* USE_JUCE */ | |
96 } | 184 } |
97 | 185 |
98 void NetworkSend::setChannelNumber(int aChannelNumber){ | 186 void NetworkSend::setChannelNumber(int aChannelNumber){ |
99 channel.channelNumber=aChannelNumber; | 187 channel.channelNumber=aChannelNumber; |
188 for(int n=0; n<channel.numBuffers; n++){ //initialize the header | |
189 channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber; | |
190 //add here more static header fields | |
191 } | |
100 }; | 192 }; |
101 int NetworkSend::getChannelNumber(){ | 193 int NetworkSend::getChannelNumber(){ |
102 return channel.channelNumber; | 194 return channel.channelNumber; |
103 }; | 195 }; |
104 | 196 |
105 void NetworkSend::sendData(){ | 197 void NetworkSend::sendData(){ |
106 if(channel.readyToBeSent){ | 198 if(channel.enabled==false) |
107 channel.readyToBeSent=false; | 199 return; |
108 udpClient.send( | 200 while(channel.readyToBeSent[channel.readBuffer]==true){ |
109 channel.buffers[!channel.activeBuffer], | 201 channel.readyToBeSent[channel.readBuffer]=false; |
110 NETWORK_AUDIO_BUFFER_SIZE*sizeof(float) | 202 void* sourceBuffer=channel.buffers[channel.readBuffer]; |
111 ); | 203 // printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]); |
112 channel.doneOnTime=true; | 204 // printf("ReadBuffer:%d\n", channel.readBuffer); |
113 } | 205 unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float); |
114 } | 206 //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?) |
115 | 207 #ifdef USE_JUCE |
208 if(1==udpClient.waitUntilReady(0, 5)){ | |
209 udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend); | |
210 channel.doneOnTime[channel.readBuffer]=true; | |
211 // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]); | |
212 } else { | |
213 // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]); | |
214 } | |
215 #else | |
216 udpClient.send(sourceBuffer, numBytesToSend); | |
217 channel.doneOnTime[channel.readBuffer]=true; | |
218 #endif /* USE_JUCE */ | |
219 channel.readBuffer++; | |
220 if(channel.readBuffer==channel.numBuffers) | |
221 channel.readBuffer=0; | |
222 } | |
223 } | |
224 | |
225 void NetworkSend::run(){ | |
226 #ifdef USE_JUCE | |
227 // std::chrono::high_resolution_clock::time_point t1; | |
228 // std::chrono::high_resolution_clock::time_point t2; | |
229 // std::chrono::high_resolution_clock::time_point t3; | |
230 while(threadShouldExit()==false){ | |
231 // t3 = std::chrono::high_resolution_clock::now(); | |
232 // t1 = std::chrono::high_resolution_clock::now(); | |
233 sendData(); | |
234 // t2 = std::chrono::high_resolution_clock::now(); | |
235 // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count(); | |
236 // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count(); | |
237 // if(duration2>0) | |
238 // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n"; | |
239 sleep(1); | |
240 } | |
241 #else | |
242 threadRunning=true; | |
243 while(threadShouldExit()==false){ | |
244 sendAllData(); | |
245 usleep(sleepTimeMs*1000); | |
246 } | |
247 threadRunning=false; | |
248 #endif | |
249 } | |
116 #ifdef USE_JUCE | 250 #ifdef USE_JUCE |
117 #else | 251 #else |
118 Scope::Scope(int aNumChannels): | 252 Scope::Scope(int aNumChannels): |
119 channels(aNumChannels) | 253 channels(aNumChannels) |
120 {}; | 254 {}; |
130 setup(44100, 9999, "127.0.0.1"); | 264 setup(44100, 9999, "127.0.0.1"); |
131 } | 265 } |
132 | 266 |
133 void Scope::setup(float sampleRate, int aPort, const char* aServer){ | 267 void Scope::setup(float sampleRate, int aPort, const char* aServer){ |
134 for(int n=0; n<getNumChannels(); n++){ | 268 for(int n=0; n<getNumChannels(); n++){ |
135 channels[n].setup(sampleRate, n, aPort, aServer); | 269 channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size |
136 } | 270 } |
137 } | 271 } |
138 | 272 |
139 void Scope::setPort(int port){ | 273 void Scope::setPort(int port){ |
140 for(int n=0; n<getNumChannels(); n++){ | 274 for(int n=0; n<getNumChannels(); n++){ |