annotate core/NetworkSend.cpp @ 151:e9c9404e3d1f ClockSync

Pff partially working. No PID. When setting the audio clock on the bbb to 44098 the master and slave clock keep diverging instead of converging ...
author Giulio Moro <giuliomoro@yahoo.it>
date Tue, 22 Sep 2015 04:10:07 +0100
parents e24c531220ee
children
rev   line source
giuliomoro@131 1 #include "NetworkSend.h"
giuliomoro@111 2
giuliomoro@129 3 #ifdef USE_JUCE
giuliomoro@129 4 #else
giuliomoro@111 5 //initialize the static members of NetworkSend
giuliomoro@111 6 bool NetworkSend::staticConstructed=false;
giuliomoro@131 7 int NetworkSend::sleepTimeMs;
giuliomoro@131 8 bool NetworkSend::threadIsExiting;
giuliomoro@131 9 bool NetworkSend::threadRunning;
giuliomoro@111 10 std::vector<NetworkSend*> NetworkSend::objAddrs(0);
giuliomoro@116 11 AuxiliaryTask NetworkSend::sendDataTask=NULL;
giuliomoro@111 12
giuliomoro@129 13 void sendData(){
giuliomoro@131 14 NetworkSend::run();
giuliomoro@111 15 }
giuliomoro@129 16 void NetworkSend::staticConstructor(){
giuliomoro@129 17 if(staticConstructed==true)
giuliomoro@129 18 return;
giuliomoro@129 19 staticConstructed=true;
giuliomoro@131 20 threadIsExiting=false;
giuliomoro@131 21 threadRunning=false;
giuliomoro@129 22 sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority
giuliomoro@129 23 }
giuliomoro@111 24 void NetworkSend::sendAllData(){
giuliomoro@111 25 for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){
giuliomoro@111 26 NetworkSend::objAddrs[n]->sendData();
giuliomoro@111 27 }
giuliomoro@111 28 }
giuliomoro@129 29 int NetworkSend::getNumInstances(){
giuliomoro@129 30 return objAddrs.size();
giuliomoro@129 31 }
giuliomoro@131 32 void NetworkSend::startThread(){
giuliomoro@131 33 BeagleRT_scheduleAuxiliaryTask(sendDataTask);
giuliomoro@131 34 }
giuliomoro@131 35 void NetworkSend::stopThread(){
giuliomoro@131 36 threadIsExiting=true;
giuliomoro@131 37 }
giuliomoro@131 38 bool NetworkSend::threadShouldExit(){
giuliomoro@131 39 return(gShouldStop || threadIsExiting);
giuliomoro@131 40 }
giuliomoro@131 41 bool NetworkSend::isThreadRunning(){
giuliomoro@131 42 return threadRunning;
giuliomoro@131 43 }
giuliomoro@129 44 #endif /* USE_JUCE */
giuliomoro@111 45
giuliomoro@131 46 #ifdef USE_JUCE
giuliomoro@131 47 NetworkSend::NetworkSend(const String &threadName):
giuliomoro@131 48 Thread(threadName)
giuliomoro@131 49 #else
giuliomoro@111 50 NetworkSend::NetworkSend()
giuliomoro@131 51 #endif /* USE_JUCE */
giuliomoro@111 52 {
giuliomoro@131 53 channel.buffers=NULL;
giuliomoro@131 54 channel.doneOnTime=NULL;
giuliomoro@131 55 channel.readyToBeSent=NULL;
giuliomoro@131 56 channel.enabled=false;
giuliomoro@131 57 sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable
giuliomoro@131 58 channel.sampleCount=0;
giuliomoro@111 59 }
giuliomoro@116 60
giuliomoro@111 61 NetworkSend::~NetworkSend(){
giuliomoro@131 62 #ifdef USE_JUCE
giuliomoro@131 63 stopThread(1000);
giuliomoro@131 64 #else
giuliomoro@131 65 stopThread();
giuliomoro@111 66 for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances;
giuliomoro@111 67 if(objAddrs[n]==this){
giuliomoro@111 68 objAddrs.erase(objAddrs.begin()+n);
giuliomoro@111 69 break;
giuliomoro@111 70 }
giuliomoro@111 71 }
giuliomoro@131 72 #endif
giuliomoro@131 73 dealloc();
giuliomoro@131 74 }
giuliomoro@131 75 void NetworkSend::dealloc(){
giuliomoro@131 76 channel.enabled=false;
giuliomoro@131 77 if(channel.buffers!=NULL){
giuliomoro@131 78 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@131 79 free(channel.buffers[n]);
giuliomoro@131 80 channel.buffers[n]=NULL;
giuliomoro@131 81 }
giuliomoro@131 82 free(channel.buffers);
giuliomoro@131 83 channel.buffers=NULL;
giuliomoro@131 84 }
giuliomoro@131 85 free(channel.readyToBeSent);
giuliomoro@131 86 channel.readyToBeSent=NULL;
giuliomoro@131 87 free(channel.doneOnTime);
giuliomoro@131 88 channel.doneOnTime=NULL;
giuliomoro@131 89 }
giuliomoro@131 90 void NetworkSend::cleanup(){
giuliomoro@131 91 dealloc();
giuliomoro@111 92 }
giuliomoro@111 93
giuliomoro@131 94 void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){
giuliomoro@131 95 #ifdef USE_JUCE
giuliomoro@131 96 #else
giuliomoro@111 97 staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible
giuliomoro@111 98 //because of limitations in BeagleRT_createAuxiliaryTask()
giuliomoro@111 99 //keep track of added active instances
giuliomoro@111 100 objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if
giuliomoro@111 101 // an instance of NetworkSend is then declared globally: the constructor gets called,
giuliomoro@111 102 // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without
giuliomoro@111 103 // any destructor being called in between ...
giuliomoro@131 104 #endif /* USE_JUCE */
giuliomoro@131 105 cleanup();
giuliomoro@131 106 int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength;
giuliomoro@131 107 channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division
giuliomoro@131 108 channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*));
giuliomoro@131 109 printf("NumBuffers: %d\n", channel.numBuffers);
giuliomoro@131 110 if(channel.buffers==NULL)
giuliomoro@131 111 return;
giuliomoro@131 112 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@131 113 channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float));
giuliomoro@131 114 if(channel.buffers[n]==NULL)
giuliomoro@131 115 return;
giuliomoro@131 116 }
giuliomoro@131 117 channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool));
giuliomoro@131 118 channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool));
giuliomoro@131 119 for(int n=0; n<channel.numBuffers; n++){
giuliomoro@131 120 channel.readyToBeSent[n]=false;
giuliomoro@131 121 channel.doneOnTime[n]=true;
giuliomoro@131 122 }
giuliomoro@131 123 if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL)
giuliomoro@131 124 return;
giuliomoro@131 125 channel.writePointer=0;
giuliomoro@131 126 channel.writeBuffer=0;
giuliomoro@131 127 channel.readBuffer=0;
giuliomoro@111 128 setChannelNumber(aChannelNumber);
giuliomoro@131 129 setPort(aPort); //TODO: check for the return value
giuliomoro@131 130 setServer(aServer); //TODO: check for the return value
giuliomoro@120 131 printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate);
giuliomoro@131 132 channel.enabled=true;
giuliomoro@111 133 }
giuliomoro@111 134
giuliomoro@111 135 void NetworkSend::log(float value){ //TODO: add a vectorized version of this method
giuliomoro@131 136 if(channel.enabled==false)
giuliomoro@131 137 return;
giuliomoro@131 138 if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ...
giuliomoro@131 139 channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such
giuliomoro@131 140 // printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
giuliomoro@131 141 channel.writePointer=channel.headerLength; //reset the writePointer
giuliomoro@131 142 channel.writeBuffer=(channel.writeBuffer+1); //switch buffer
giuliomoro@131 143 if(channel.writeBuffer==channel.numBuffers) // and wrap it
giuliomoro@131 144 channel.writeBuffer=0;
giuliomoro@131 145 // printf("WriteBuffer:%d\n", channel.writeBuffer);
giuliomoro@131 146 if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ...
giuliomoro@131 147 printf("Network buffer underrun. timestamp: %d :-{\n",
giuliomoro@131 148 (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
giuliomoro@111 149 }
giuliomoro@131 150 channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag
giuliomoro@131 151 #ifdef USE_JUCE
giuliomoro@131 152 if(isThreadRunning()==false){
giuliomoro@131 153 startThread(10);
giuliomoro@131 154 }
giuliomoro@131 155 #else
giuliomoro@131 156 if(isThreadRunning()==false){
giuliomoro@131 157 startThread();
giuliomoro@131 158 }
giuliomoro@131 159 #endif /* USE_JUCE */
giuliomoro@111 160 }
giuliomoro@131 161 if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header
giuliomoro@131 162 //set dynamic header values here. Static values are set in setup() and setChannelNumber().
giuliomoro@131 163 channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp
giuliomoro@131 164 channel.sampleCount++;
giuliomoro@111 165 //add here more header fields
giuliomoro@111 166 }
giuliomoro@131 167 channel.buffers[channel.writeBuffer][channel.writePointer++]=value;
giuliomoro@131 168 // sampleCount++;
giuliomoro@111 169 };
giuliomoro@111 170
giuliomoro@111 171 void NetworkSend::setServer(const char *aServer){
giuliomoro@131 172 #ifdef USE_JUCE
giuliomoro@131 173 remoteHostname=String::fromUTF8(aServer);
giuliomoro@131 174 #else
giuliomoro@111 175 udpClient.setServer(aServer);
giuliomoro@131 176 #endif /* USE_JUCE */
giuliomoro@111 177 }
giuliomoro@111 178 void NetworkSend::setPort(int aPort){
giuliomoro@131 179 #ifdef USE_JUCE
giuliomoro@131 180 remotePortNumber=aPort;
giuliomoro@131 181 #else
giuliomoro@111 182 udpClient.setPort(aPort);
giuliomoro@131 183 #endif /* USE_JUCE */
giuliomoro@111 184 }
giuliomoro@111 185
giuliomoro@111 186 void NetworkSend::setChannelNumber(int aChannelNumber){
giuliomoro@111 187 channel.channelNumber=aChannelNumber;
giuliomoro@131 188 for(int n=0; n<channel.numBuffers; n++){ //initialize the header
giuliomoro@131 189 channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber;
giuliomoro@131 190 //add here more static header fields
giuliomoro@131 191 }
giuliomoro@111 192 };
giuliomoro@111 193 int NetworkSend::getChannelNumber(){
giuliomoro@111 194 return channel.channelNumber;
giuliomoro@111 195 };
giuliomoro@111 196
giuliomoro@132 197 int NetworkSend::getTimestamp(){
giuliomoro@132 198 return channel.buffers[channel.readBuffer][channel.headerTimestampIndex];
giuliomoro@132 199 }
giuliomoro@132 200
giuliomoro@111 201 void NetworkSend::sendData(){
giuliomoro@131 202 if(channel.enabled==false)
giuliomoro@131 203 return;
giuliomoro@131 204 while(channel.readyToBeSent[channel.readBuffer]==true){
giuliomoro@131 205 channel.readyToBeSent[channel.readBuffer]=false;
giuliomoro@131 206 void* sourceBuffer=channel.buffers[channel.readBuffer];
giuliomoro@131 207 // printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]);
giuliomoro@131 208 // printf("ReadBuffer:%d\n", channel.readBuffer);
giuliomoro@131 209 unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float);
giuliomoro@131 210 //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?)
giuliomoro@131 211 #ifdef USE_JUCE
giuliomoro@131 212 if(1==udpClient.waitUntilReady(0, 5)){
giuliomoro@131 213 udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend);
giuliomoro@131 214 channel.doneOnTime[channel.readBuffer]=true;
giuliomoro@131 215 // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
giuliomoro@131 216 } else {
giuliomoro@131 217 // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
giuliomoro@131 218 }
giuliomoro@131 219 #else
giuliomoro@131 220 udpClient.send(sourceBuffer, numBytesToSend);
giuliomoro@131 221 channel.doneOnTime[channel.readBuffer]=true;
giuliomoro@131 222 #endif /* USE_JUCE */
giuliomoro@131 223 channel.readBuffer++;
giuliomoro@131 224 if(channel.readBuffer==channel.numBuffers)
giuliomoro@131 225 channel.readBuffer=0;
giuliomoro@111 226 }
giuliomoro@111 227 }
giuliomoro@111 228
giuliomoro@131 229 void NetworkSend::run(){
giuliomoro@131 230 #ifdef USE_JUCE
giuliomoro@131 231 // std::chrono::high_resolution_clock::time_point t1;
giuliomoro@131 232 // std::chrono::high_resolution_clock::time_point t2;
giuliomoro@131 233 // std::chrono::high_resolution_clock::time_point t3;
giuliomoro@131 234 while(threadShouldExit()==false){
giuliomoro@131 235 // t3 = std::chrono::high_resolution_clock::now();
giuliomoro@131 236 // t1 = std::chrono::high_resolution_clock::now();
giuliomoro@131 237 sendData();
giuliomoro@131 238 // t2 = std::chrono::high_resolution_clock::now();
giuliomoro@131 239 // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count();
giuliomoro@131 240 // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
giuliomoro@131 241 // if(duration2>0)
giuliomoro@131 242 // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n";
giuliomoro@131 243 sleep(1);
giuliomoro@131 244 }
giuliomoro@131 245 #else
giuliomoro@131 246 threadRunning=true;
giuliomoro@131 247 while(threadShouldExit()==false){
giuliomoro@131 248 sendAllData();
giuliomoro@131 249 usleep(sleepTimeMs*1000);
giuliomoro@131 250 }
giuliomoro@131 251 threadRunning=false;
giuliomoro@131 252 #endif
giuliomoro@131 253 }
giuliomoro@129 254 #ifdef USE_JUCE
giuliomoro@129 255 #else
giuliomoro@111 256 Scope::Scope(int aNumChannels):
giuliomoro@111 257 channels(aNumChannels)
giuliomoro@111 258 {};
giuliomoro@111 259 Scope::~Scope(){};
giuliomoro@111 260
giuliomoro@111 261 void Scope::log(int channel, float value){
giuliomoro@111 262 if(channel>=getNumChannels()) //TODO: assert this
giuliomoro@111 263 return;
giuliomoro@111 264 channels[channel].log(value);
giuliomoro@111 265 }
giuliomoro@111 266
giuliomoro@111 267 void Scope::setup(){
giuliomoro@111 268 setup(44100, 9999, "127.0.0.1");
giuliomoro@111 269 }
giuliomoro@111 270
giuliomoro@111 271 void Scope::setup(float sampleRate, int aPort, const char* aServer){
giuliomoro@111 272 for(int n=0; n<getNumChannels(); n++){
giuliomoro@131 273 channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size
giuliomoro@111 274 }
giuliomoro@111 275 }
giuliomoro@111 276
giuliomoro@119 277 void Scope::setPort(int port){
giuliomoro@119 278 for(int n=0; n<getNumChannels(); n++){
giuliomoro@119 279 channels[n].setPort(port);
giuliomoro@119 280 }
giuliomoro@119 281 }
giuliomoro@120 282 void Scope::setPort(int channel, int aPort){
giuliomoro@120 283 channels[channel].setPort(aPort);
giuliomoro@120 284 printf("Channel %d is now sending to port %d\n", channel, aPort);
giuliomoro@119 285 }
giuliomoro@119 286
giuliomoro@111 287 int Scope::getNumChannels(){
giuliomoro@111 288 return channels.size();
giuliomoro@111 289 }
giuliomoro@111 290
giuliomoro@111 291 void Scope::sendData(){
giuliomoro@111 292 NetworkSend::sendAllData();
giuliomoro@111 293 }
giuliomoro@129 294 #endif