annotate core/NetworkSend.cpp @ 139:4e2dd3eb1d28 ClockSync

The reported offset is now meaningful. The whole thing is waaay too jittery.
author Giulio Moro <giuliomoro@yahoo.it>
date Sun, 13 Sep 2015 21:34:47 +0100
parents e24c531220ee
children
rev   line source
giuliomoro@131 1 #include "NetworkSend.h"
giuliomoro@111 2
giuliomoro@129 3 #ifdef USE_JUCE
giuliomoro@129 4 #else
giuliomoro@111 5 //initialize the static members of NetworkSend
giuliomoro@111 6 bool NetworkSend::staticConstructed=false;
giuliomoro@131 7 int NetworkSend::sleepTimeMs;
giuliomoro@131 8 bool NetworkSend::threadIsExiting;
giuliomoro@131 9 bool NetworkSend::threadRunning;
giuliomoro@111 10 std::vector<NetworkSend*> NetworkSend::objAddrs(0);
giuliomoro@116 11 AuxiliaryTask NetworkSend::sendDataTask=NULL;
giuliomoro@111 12
giuliomoro@129 13 void sendData(){
giuliomoro@131 14 NetworkSend::run();
giuliomoro@111 15 }
giuliomoro@129 16 void NetworkSend::staticConstructor(){
giuliomoro@129 17 if(staticConstructed==true)
giuliomoro@129 18 return;
giuliomoro@129 19 staticConstructed=true;
giuliomoro@131 20 threadIsExiting=false;
giuliomoro@131 21 threadRunning=false;
giuliomoro@129 22 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority
giuliomoro@129 23 }
giuliomoro@111 24 void NetworkSend::sendAllData(){
giuliomoro@111 25 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){
giuliomoro@111 26 NetworkSend::objAddrs[n]->sendData();
giuliomoro@111 27 }
giuliomoro@111 28 }
giuliomoro@129 29 int NetworkSend::getNumInstances(){
giuliomoro@129 30 return objAddrs.size();
giuliomoro@129 31 }
giuliomoro@131 32 void NetworkSend::startThread(){
giuliomoro@131 33 BeagleRT_scheduleAuxiliaryTask(sendDataTask);
giuliomoro@131 34 }
giuliomoro@131 35 void NetworkSend::stopThread(){
giuliomoro@131 36 threadIsExiting=true;
giuliomoro@131 37 }
giuliomoro@131 38 bool NetworkSend::threadShouldExit(){
giuliomoro@131 39 return(gShouldStop || threadIsExiting);
giuliomoro@131 40 }
giuliomoro@131 41 bool NetworkSend::isThreadRunning(){
giuliomoro@131 42 return threadRunning;
giuliomoro@131 43 }
giuliomoro@129 44 #endif /* USE_JUCE */
giuliomoro@111 45
giuliomoro@131 46 #ifdef USE_JUCE
giuliomoro@131 47 NetworkSend::NetworkSend(const String &threadName):
giuliomoro@131 48 Thread(threadName)
giuliomoro@131 49 #else
giuliomoro@111 50 NetworkSend::NetworkSend()
giuliomoro@131 51 #endif /* USE_JUCE */
giuliomoro@111 52 {
giuliomoro@131 53 channel.buffers=NULL;
giuliomoro@131 54 channel.doneOnTime=NULL;
giuliomoro@131 55 channel.readyToBeSent=NULL;
giuliomoro@131 56 channel.enabled=false;
giuliomoro@131 57 sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable
giuliomoro@131 58 channel.sampleCount=0;
giuliomoro@111 59 }
giuliomoro@116 60
giuliomoro@111 61 NetworkSend::~NetworkSend(){
giuliomoro@131 62 #ifdef USE_JUCE
giuliomoro@131 63 stopThread(1000);
giuliomoro@131 64 #else
giuliomoro@131 65 stopThread();
giuliomoro@111 66 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances;
giuliomoro@111 67 if(objAddrs[n]==this){
giuliomoro@111 68 objAddrs.erase(objAddrs.begin()+n);
giuliomoro@111 69 break;
giuliomoro@111 70 }
giuliomoro@111 71 }
giuliomoro@131 72 #endif
giuliomoro@131 73 dealloc();
giuliomoro@131 74 }
giuliomoro@131 75 void NetworkSend::dealloc(){
giuliomoro@131 76 channel.enabled=false;
giuliomoro@131 77 if(channel.buffers!=NULL){
giuliomoro@131 78 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@131 79 free(channel.buffers[n]);
giuliomoro@131 80 channel.buffers[n]=NULL;
giuliomoro@131 81 }
giuliomoro@131 82 free(channel.buffers);
giuliomoro@131 83 channel.buffers=NULL;
giuliomoro@131 84 }
giuliomoro@131 85 free(channel.readyToBeSent);
giuliomoro@131 86 channel.readyToBeSent=NULL;
giuliomoro@131 87 free(channel.doneOnTime);
giuliomoro@131 88 channel.doneOnTime=NULL;
giuliomoro@131 89 }
giuliomoro@131 90 void NetworkSend::cleanup(){
giuliomoro@131 91 dealloc();
giuliomoro@111 92 }
giuliomoro@111 93
giuliomoro@131 94 void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){
giuliomoro@131 95 #ifdef USE_JUCE
giuliomoro@131 96 #else
giuliomoro@111 97 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible
giuliomoro@111 98 //because of limitations in BeagleRT_createAuxiliaryTask()
giuliomoro@111 99 //keep track of added active instances
giuliomoro@111 100 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if
giuliomoro@111 101 // an instance of NetworkSend is then declared globally: the constructor gets called,
giuliomoro@111 102 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without
giuliomoro@111 103 // any destructor being called in between ...
giuliomoro@131 104 #endif /* USE_JUCE */
giuliomoro@131 105 cleanup();
giuliomoro@131 106 int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength;
giuliomoro@131 107 channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division
giuliomoro@131 108 channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*));
giuliomoro@131 109 printf("NumBuffers: %d\n", channel.numBuffers);
giuliomoro@131 110 if(channel.buffers==NULL)
giuliomoro@131 111 return;
giuliomoro@131 112 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@131 113 channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float));
giuliomoro@131 114 if(channel.buffers[n]==NULL)
giuliomoro@131 115 return;
giuliomoro@131 116 }
giuliomoro@131 117 channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool));
giuliomoro@131 118 channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool));
giuliomoro@131 119 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@131 120 channel.readyToBeSent[n]=false;
giuliomoro@131 121 channel.doneOnTime[n]=true;
giuliomoro@131 122 }
giuliomoro@131 123 if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL)
giuliomoro@131 124 return;
giuliomoro@131 125 channel.writePointer=0;
giuliomoro@131 126 channel.writeBuffer=0;
giuliomoro@131 127 channel.readBuffer=0;
giuliomoro@111 128 setChannelNumber(aChannelNumber);
giuliomoro@131 129 setPort(aPort); //TODO: check for the return value
giuliomoro@131 130 setServer(aServer); //TODO: check for the return value
giuliomoro@120 131 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate);
giuliomoro@131 132 channel.enabled=true;
giuliomoro@111 133 }
giuliomoro@111 134
giuliomoro@111 135 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method
giuliomoro@131 136 if(channel.enabled==false)
giuliomoro@131 137 return;
giuliomoro@131 138 if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ...
giuliomoro@131 139 channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such
giuliomoro@131 140 // printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
giuliomoro@131 141 channel.writePointer=channel.headerLength; //reset the writePointer
giuliomoro@131 142 channel.writeBuffer=(channel.writeBuffer+1); //switch buffer
giuliomoro@131 143 if(channel.writeBuffer==channel.numBuffers) // and wrap it
giuliomoro@131 144 channel.writeBuffer=0;
giuliomoro@131 145 // printf("WriteBuffer:%d\n", channel.writeBuffer);
giuliomoro@131 146 if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ...
giuliomoro@131 147 printf("Network buffer underrun. timestamp: %d :-{\n",
giuliomoro@131 148 (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
giuliomoro@111 149 }
giuliomoro@131 150 channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag
giuliomoro@131 151 #ifdef USE_JUCE
giuliomoro@131 152 if(isThreadRunning()==false){
giuliomoro@131 153 startThread(10);
giuliomoro@131 154 }
giuliomoro@131 155 #else
giuliomoro@131 156 if(isThreadRunning()==false){
giuliomoro@131 157 startThread();
giuliomoro@131 158 }
giuliomoro@131 159 #endif /* USE_JUCE */
giuliomoro@111 160 }
giuliomoro@131 161 if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header
giuliomoro@131 162 //set dynamic header values here. Static values are set in setup() and setChannelNumber().
giuliomoro@131 163 channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp
giuliomoro@131 164 channel.sampleCount++;
giuliomoro@111 165 //add here more header fields
giuliomoro@111 166 }
giuliomoro@131 167 channel.buffers[channel.writeBuffer][channel.writePointer++]=value;
giuliomoro@131 168 // sampleCount++;
giuliomoro@111 169 };
giuliomoro@111 170
giuliomoro@111 171 void NetworkSend::setServer(const char *aServer){
giuliomoro@131 172 #ifdef USE_JUCE
giuliomoro@131 173 remoteHostname=String::fromUTF8(aServer);
giuliomoro@131 174 #else
giuliomoro@111 175 udpClient.setServer(aServer);
giuliomoro@131 176 #endif /* USE_JUCE */
giuliomoro@111 177 }
giuliomoro@111 178 void NetworkSend::setPort(int aPort){
giuliomoro@131 179 #ifdef USE_JUCE
giuliomoro@131 180 remotePortNumber=aPort;
giuliomoro@131 181 #else
giuliomoro@111 182 udpClient.setPort(aPort);
giuliomoro@131 183 #endif /* USE_JUCE */
giuliomoro@111 184 }
giuliomoro@111 185
giuliomoro@111 186 void NetworkSend::setChannelNumber(int aChannelNumber){
giuliomoro@111 187 channel.channelNumber=aChannelNumber;
giuliomoro@131 188 for(int n=0; n<channel.numBuffers; n++){ //initialize the header
giuliomoro@131 189 channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber;
giuliomoro@131 190 //add here more static header fields
giuliomoro@131 191 }
giuliomoro@111 192 };
giuliomoro@111 193 int NetworkSend::getChannelNumber(){
giuliomoro@111 194 return channel.channelNumber;
giuliomoro@111 195 };
giuliomoro@111 196
giuliomoro@132 197 int NetworkSend::getTimestamp(){
giuliomoro@132 198 return channel.buffers[channel.readBuffer][channel.headerTimestampIndex];
giuliomoro@132 199 }
giuliomoro@132 200
giuliomoro@111 201 void NetworkSend::sendData(){
giuliomoro@131 202 if(channel.enabled==false)
giuliomoro@131 203 return;
giuliomoro@131 204 while(channel.readyToBeSent[channel.readBuffer]==true){
giuliomoro@131 205 channel.readyToBeSent[channel.readBuffer]=false;
giuliomoro@131 206 void* sourceBuffer=channel.buffers[channel.readBuffer];
giuliomoro@131 207 // printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]);
giuliomoro@131 208 // printf("ReadBuffer:%d\n", channel.readBuffer);
giuliomoro@131 209 unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float);
giuliomoro@131 210 //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?)
giuliomoro@131 211 #ifdef USE_JUCE
giuliomoro@131 212 if(1==udpClient.waitUntilReady(0, 5)){
giuliomoro@131 213 udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend);
giuliomoro@131 214 channel.doneOnTime[channel.readBuffer]=true;
giuliomoro@131 215 // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
giuliomoro@131 216 } else {
giuliomoro@131 217 // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
giuliomoro@131 218 }
giuliomoro@131 219 #else
giuliomoro@131 220 udpClient.send(sourceBuffer, numBytesToSend);
giuliomoro@131 221 channel.doneOnTime[channel.readBuffer]=true;
giuliomoro@131 222 #endif /* USE_JUCE */
giuliomoro@131 223 channel.readBuffer++;
giuliomoro@131 224 if(channel.readBuffer==channel.numBuffers)
giuliomoro@131 225 channel.readBuffer=0;
giuliomoro@111 226 }
giuliomoro@111 227 }
giuliomoro@111 228
giuliomoro@131 229 void NetworkSend::run(){
giuliomoro@131 230 #ifdef USE_JUCE
giuliomoro@131 231 // std::chrono::high_resolution_clock::time_point t1;
giuliomoro@131 232 // std::chrono::high_resolution_clock::time_point t2;
giuliomoro@131 233 // std::chrono::high_resolution_clock::time_point t3;
giuliomoro@131 234 while(threadShouldExit()==false){
giuliomoro@131 235 // t3 = std::chrono::high_resolution_clock::now();
giuliomoro@131 236 // t1 = std::chrono::high_resolution_clock::now();
giuliomoro@131 237 sendData();
giuliomoro@131 238 // t2 = std::chrono::high_resolution_clock::now();
giuliomoro@131 239 // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count();
giuliomoro@131 240 // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
giuliomoro@131 241 // if(duration2>0)
giuliomoro@131 242 // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n";
giuliomoro@131 243 sleep(1);
giuliomoro@131 244 }
giuliomoro@131 245 #else
giuliomoro@131 246 threadRunning=true;
giuliomoro@131 247 while(threadShouldExit()==false){
giuliomoro@131 248 sendAllData();
giuliomoro@131 249 usleep(sleepTimeMs*1000);
giuliomoro@131 250 }
giuliomoro@131 251 threadRunning=false;
giuliomoro@131 252 #endif
giuliomoro@131 253 }
giuliomoro@129 254 #ifdef USE_JUCE
giuliomoro@129 255 #else
giuliomoro@111 256 Scope::Scope(int aNumChannels):
giuliomoro@111 257 channels(aNumChannels)
giuliomoro@111 258 {};
giuliomoro@111 259 Scope::~Scope(){};
giuliomoro@111 260
giuliomoro@111 261 void Scope::log(int channel, float value){
giuliomoro@111 262 if(channel>=getNumChannels()) //TODO: assert this
giuliomoro@111 263 return;
giuliomoro@111 264 channels[channel].log(value);
giuliomoro@111 265 }
giuliomoro@111 266
giuliomoro@111 267 void Scope::setup(){
giuliomoro@111 268 setup(44100, 9999, "127.0.0.1");
giuliomoro@111 269 }
giuliomoro@111 270
giuliomoro@111 271 void Scope::setup(float sampleRate, int aPort, const char* aServer){
giuliomoro@111 272 for(int n=0; n<getNumChannels(); n++){
giuliomoro@131 273 channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size
giuliomoro@111 274 }
giuliomoro@111 275 }
giuliomoro@111 276
giuliomoro@119 277 void Scope::setPort(int port){
giuliomoro@119 278 for(int n=0; n<getNumChannels(); n++){
giuliomoro@119 279 channels[n].setPort(port);
giuliomoro@119 280 }
giuliomoro@119 281 }
giuliomoro@120 282 void Scope::setPort(int channel, int aPort){
giuliomoro@120 283 channels[channel].setPort(aPort);
giuliomoro@120 284 printf("Channel %d is now sending to port %d\n", channel, aPort);
giuliomoro@119 285 }
giuliomoro@119 286
giuliomoro@111 287 int Scope::getNumChannels(){
giuliomoro@111 288 return channels.size();
giuliomoro@111 289 }
giuliomoro@111 290
giuliomoro@111 291 void Scope::sendData(){
giuliomoro@111 292 NetworkSend::sendAllData();
giuliomoro@111 293 }
giuliomoro@129 294 #endif