comparison core/NetworkSend.cpp @ 131:ff28e56e5b7e scope-refactoring

Updated Network files to match Udpioplugin 18:fb5a61b10223
author Giulio Moro <giuliomoro@yahoo.it>
date Wed, 26 Aug 2015 02:02:10 +0100
parents cce58e6ec2a2
children e24c531220ee
comparison
equal deleted inserted replaced
130:da1c61aa97ea 131:ff28e56e5b7e
1 #include <NetworkSend.h> 1 #include "NetworkSend.h"
2 2
3 #ifdef USE_JUCE 3 #ifdef USE_JUCE
4 #else 4 #else
5 //initialize the static members of NetworkSend 5 //initialize the static members of NetworkSend
6 bool NetworkSend::staticConstructed=false; 6 bool NetworkSend::staticConstructed=false;
7 int NetworkSend::sleepTimeMs;
8 bool NetworkSend::threadIsExiting;
9 bool NetworkSend::threadRunning;
7 std::vector<NetworkSend*> NetworkSend::objAddrs(0); 10 std::vector<NetworkSend*> NetworkSend::objAddrs(0);
8 AuxiliaryTask NetworkSend::sendDataTask=NULL; 11 AuxiliaryTask NetworkSend::sendDataTask=NULL;
9 12
10 void sendData(){ 13 void sendData(){
11 NetworkSend::sendAllData(); 14 NetworkSend::run();
12 } 15 }
13 void NetworkSend::staticConstructor(){ 16 void NetworkSend::staticConstructor(){
14 if(staticConstructed==true) 17 if(staticConstructed==true)
15 return; 18 return;
16 staticConstructed=true; 19 staticConstructed=true;
20 threadIsExiting=false;
21 threadRunning=false;
17 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority 22 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority
18 } 23 }
19 void NetworkSend::sendAllData(){ 24 void NetworkSend::sendAllData(){
20 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){ 25 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){
21 NetworkSend::objAddrs[n]->sendData(); 26 NetworkSend::objAddrs[n]->sendData();
22 } 27 }
23 } 28 }
24 int NetworkSend::getNumInstances(){ 29 int NetworkSend::getNumInstances(){
25 return objAddrs.size(); 30 return objAddrs.size();
26 } 31 }
27 #endif /* USE_JUCE */ 32 void NetworkSend::startThread(){
28 33 BeagleRT_scheduleAuxiliaryTask(sendDataTask);
34 }
35 void NetworkSend::stopThread(){
36 threadIsExiting=true;
37 }
38 bool NetworkSend::threadShouldExit(){
39 return(gShouldStop || threadIsExiting);
40 }
41 bool NetworkSend::isThreadRunning(){
42 return threadRunning;
43 }
44 #endif /* USE_JUCE */
45
46 #ifdef USE_JUCE
47 NetworkSend::NetworkSend(const String &threadName):
48 Thread(threadName)
49 #else
29 NetworkSend::NetworkSend() 50 NetworkSend::NetworkSend()
51 #endif /* USE_JUCE */
30 { 52 {
31 sampleCount = 0; 53 channel.buffers=NULL;
32 channel.doneOnTime=true; 54 channel.doneOnTime=NULL;
33 channel.index=channel.headerLength; //leave space for the heading message (channel, timestamp) 55 channel.readyToBeSent=NULL;
34 channel.activeBuffer=0; 56 channel.enabled=false;
35 channel.readyToBeSent=false; 57 sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable
58 channel.sampleCount=0;
36 } 59 }
37 60
38 NetworkSend::~NetworkSend(){ 61 NetworkSend::~NetworkSend(){
62 #ifdef USE_JUCE
63 stopThread(1000);
64 #else
65 stopThread();
39 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; 66 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances;
40 if(objAddrs[n]==this){ 67 if(objAddrs[n]==this){
41 objAddrs.erase(objAddrs.begin()+n); 68 objAddrs.erase(objAddrs.begin()+n);
42 break; 69 break;
43 } 70 }
44 } 71 }
45 } 72 #endif
46 73 dealloc();
47 void NetworkSend::setup(float aSampleRate){//TODO: remove this method 74 }
48 setup(aSampleRate, 0, 9999, "192.168.7.1");//channelNumber=0 75 void NetworkSend::dealloc(){
49 } 76 channel.enabled=false;
50 77 if(channel.buffers!=NULL){
51 void NetworkSend::setup(float aSampleRate, int aChannelNumber, int aPort, const char *aServer){ 78 for(int n=0; n<channel.numBuffers; n++){
79 free(channel.buffers[n]);
80 channel.buffers[n]=NULL;
81 }
82 free(channel.buffers);
83 channel.buffers=NULL;
84 }
85 free(channel.readyToBeSent);
86 channel.readyToBeSent=NULL;
87 free(channel.doneOnTime);
88 channel.doneOnTime=NULL;
89 }
90 void NetworkSend::cleanup(){
91 dealloc();
92 }
93
94 void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){
95 #ifdef USE_JUCE
96 #else
52 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible 97 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible
53 //because of limitations in BeagleRT_createAuxiliaryTask() 98 //because of limitations in BeagleRT_createAuxiliaryTask()
54 //keep track of added active instances 99 //keep track of added active instances
55 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if 100 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if
56 // an instance of NetworkSend is then declared globally: the constructor gets called, 101 // an instance of NetworkSend is then declared globally: the constructor gets called,
57 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without 102 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without
58 // any destructor being called in between ... 103 // any destructor being called in between ...
104 #endif /* USE_JUCE */
105 cleanup();
106 int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength;
107 channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division
108 channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*));
109 printf("NumBuffers: %d\n", channel.numBuffers);
110 if(channel.buffers==NULL)
111 return;
112 for(int n=0; n<channel.numBuffers; n++){
113 channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float));
114 if(channel.buffers[n]==NULL)
115 return;
116 }
117 channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool));
118 channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool));
119 for(int n=0; n<channel.numBuffers; n++){
120 channel.readyToBeSent[n]=false;
121 channel.doneOnTime[n]=true;
122 }
123 if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL)
124 return;
125 channel.writePointer=0;
126 channel.writeBuffer=0;
127 channel.readBuffer=0;
59 setChannelNumber(aChannelNumber); 128 setChannelNumber(aChannelNumber);
60 setPort(aPort); 129 setPort(aPort); //TODO: check for the return value
61 setServer(aServer); 130 setServer(aServer); //TODO: check for the return value
62 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); 131 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate);
132 channel.enabled=true;
63 } 133 }
64 134
65 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method 135 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method
66 if(channel.index==(NETWORK_AUDIO_BUFFER_SIZE)){ // when the buffer is ready ... 136 if(channel.enabled==false)
67 channel.readyToBeSent=true; 137 return;
68 channel.index=channel.headerLength; //reset the counter 138 if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ...
69 if(channel.doneOnTime==false){ 139 channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such
70 printf("Network buffer underrun. timestamp: %d :-{\n", (int)channel.buffers[!channel.activeBuffer][1]); 140 // printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
71 } 141 channel.writePointer=channel.headerLength; //reset the writePointer
72 channel.activeBuffer=!channel.activeBuffer; //switch buffer 142 channel.writeBuffer=(channel.writeBuffer+1); //switch buffer
73 channel.doneOnTime=false; 143 if(channel.writeBuffer==channel.numBuffers) // and wrap it
74 BeagleRT_scheduleAuxiliaryTask(NetworkSend::sendDataTask); //send the buffer 144 channel.writeBuffer=0;
75 // TODO: maybe we should have transmitAudioTask running in a loop instead of scheduling it multiple times? 145 // printf("WriteBuffer:%d\n", channel.writeBuffer);
76 // The current solution allows to minimize latency when a single channel is used, as there is no inherent 146 if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ...
77 // rt_task_sleep in the thread, as we are signaling it every time. 147 printf("Network buffer underrun. timestamp: %d :-{\n",
78 // Although, there is a possible race condition: if the auxiliaryTask is scheduled by channel 0, 148 (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
79 // it might still be executing when channel 1 schedules it. But if the AuxTask has already skipped 149 }
80 // over channel 1, then we are at risk that channel 1 never gets sent. 150 channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag
81 } 151 #ifdef USE_JUCE
82 if(channel.index==channel.headerLength){ 152 if(isThreadRunning()==false){
83 channel.buffers[channel.activeBuffer][0] = (float)channel.channelNumber; //TODO: this could actually be done just once in setup() 153 startThread(10);
84 channel.buffers[channel.activeBuffer][1]=(float)sampleCount; //timestamp 154 }
155 #else
156 if(isThreadRunning()==false){
157 startThread();
158 }
159 #endif /* USE_JUCE */
160 }
161 if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header
162 //set dynamic header values here. Static values are set in setup() and setChannelNumber().
163 channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp
164 channel.sampleCount++;
85 //add here more header fields 165 //add here more header fields
86 } 166 }
87 channel.buffers[channel.activeBuffer][channel.index++]=value; 167 channel.buffers[channel.writeBuffer][channel.writePointer++]=value;
88 sampleCount++; 168 // sampleCount++;
89 }; 169 };
90 170
91 void NetworkSend::setServer(const char *aServer){ 171 void NetworkSend::setServer(const char *aServer){
172 #ifdef USE_JUCE
173 remoteHostname=String::fromUTF8(aServer);
174 #else
92 udpClient.setServer(aServer); 175 udpClient.setServer(aServer);
176 #endif /* USE_JUCE */
93 } 177 }
94 void NetworkSend::setPort(int aPort){ 178 void NetworkSend::setPort(int aPort){
179 #ifdef USE_JUCE
180 remotePortNumber=aPort;
181 #else
95 udpClient.setPort(aPort); 182 udpClient.setPort(aPort);
183 #endif /* USE_JUCE */
96 } 184 }
97 185
98 void NetworkSend::setChannelNumber(int aChannelNumber){ 186 void NetworkSend::setChannelNumber(int aChannelNumber){
99 channel.channelNumber=aChannelNumber; 187 channel.channelNumber=aChannelNumber;
188 for(int n=0; n<channel.numBuffers; n++){ //initialize the header
189 channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber;
190 //add here more static header fields
191 }
100 }; 192 };
101 int NetworkSend::getChannelNumber(){ 193 int NetworkSend::getChannelNumber(){
102 return channel.channelNumber; 194 return channel.channelNumber;
103 }; 195 };
104 196
105 void NetworkSend::sendData(){ 197 void NetworkSend::sendData(){
106 if(channel.readyToBeSent){ 198 if(channel.enabled==false)
107 channel.readyToBeSent=false; 199 return;
108 udpClient.send( 200 while(channel.readyToBeSent[channel.readBuffer]==true){
109 channel.buffers[!channel.activeBuffer], 201 channel.readyToBeSent[channel.readBuffer]=false;
110 NETWORK_AUDIO_BUFFER_SIZE*sizeof(float) 202 void* sourceBuffer=channel.buffers[channel.readBuffer];
111 ); 203 // printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]);
112 channel.doneOnTime=true; 204 // printf("ReadBuffer:%d\n", channel.readBuffer);
113 } 205 unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float);
114 } 206 //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?)
115 207 #ifdef USE_JUCE
208 if(1==udpClient.waitUntilReady(0, 5)){
209 udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend);
210 channel.doneOnTime[channel.readBuffer]=true;
211 // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
212 } else {
213 // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
214 }
215 #else
216 udpClient.send(sourceBuffer, numBytesToSend);
217 channel.doneOnTime[channel.readBuffer]=true;
218 #endif /* USE_JUCE */
219 channel.readBuffer++;
220 if(channel.readBuffer==channel.numBuffers)
221 channel.readBuffer=0;
222 }
223 }
224
225 void NetworkSend::run(){
226 #ifdef USE_JUCE
227 // std::chrono::high_resolution_clock::time_point t1;
228 // std::chrono::high_resolution_clock::time_point t2;
229 // std::chrono::high_resolution_clock::time_point t3;
230 while(threadShouldExit()==false){
231 // t3 = std::chrono::high_resolution_clock::now();
232 // t1 = std::chrono::high_resolution_clock::now();
233 sendData();
234 // t2 = std::chrono::high_resolution_clock::now();
235 // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count();
236 // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
237 // if(duration2>0)
238 // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n";
239 sleep(1);
240 }
241 #else
242 threadRunning=true;
243 while(threadShouldExit()==false){
244 sendAllData();
245 usleep(sleepTimeMs*1000);
246 }
247 threadRunning=false;
248 #endif
249 }
116 #ifdef USE_JUCE 250 #ifdef USE_JUCE
117 #else 251 #else
118 Scope::Scope(int aNumChannels): 252 Scope::Scope(int aNumChannels):
119 channels(aNumChannels) 253 channels(aNumChannels)
120 {}; 254 {};
130 setup(44100, 9999, "127.0.0.1"); 264 setup(44100, 9999, "127.0.0.1");
131 } 265 }
132 266
133 void Scope::setup(float sampleRate, int aPort, const char* aServer){ 267 void Scope::setup(float sampleRate, int aPort, const char* aServer){
134 for(int n=0; n<getNumChannels(); n++){ 268 for(int n=0; n<getNumChannels(); n++){
135 channels[n].setup(sampleRate, n, aPort, aServer); 269 channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size
136 } 270 }
137 } 271 }
138 272
139 void Scope::setPort(int port){ 273 void Scope::setPort(int port){
140 for(int n=0; n<getNumChannels(); n++){ 274 for(int n=0; n<getNumChannels(); n++){