Mercurial > hg > beaglert
changeset 131:ff28e56e5b7e scope-refactoring
Updated Network files to match Udpioplugin 18:fb5a61b10223
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Wed, 26 Aug 2015 02:02:10 +0100 |
parents | da1c61aa97ea |
children | e24c531220ee |
files | core/NetworkSend.cpp core/ReceiveAudioThread.cpp core/UdpServer.cpp include/NetworkSend.h include/ReceiveAudioThread.h include/UdpServer.h projects/scope/render.cpp |
diffstat | 7 files changed, 285 insertions(+), 94 deletions(-) [+] |
line wrap: on
line diff
--- a/core/NetworkSend.cpp Tue Aug 25 11:14:25 2015 +0100 +++ b/core/NetworkSend.cpp Wed Aug 26 02:02:10 2015 +0100 @@ -1,19 +1,24 @@ -#include <NetworkSend.h> +#include "NetworkSend.h" #ifdef USE_JUCE #else //initialize the static members of NetworkSend bool NetworkSend::staticConstructed=false; +int NetworkSend::sleepTimeMs; +bool NetworkSend::threadIsExiting; +bool NetworkSend::threadRunning; std::vector<NetworkSend*> NetworkSend::objAddrs(0); AuxiliaryTask NetworkSend::sendDataTask=NULL; void sendData(){ - NetworkSend::sendAllData(); + NetworkSend::run(); } void NetworkSend::staticConstructor(){ if(staticConstructed==true) return; staticConstructed=true; + threadIsExiting=false; + threadRunning=false; sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority } void NetworkSend::sendAllData(){ @@ -24,31 +29,71 @@ int NetworkSend::getNumInstances(){ return objAddrs.size(); } +void NetworkSend::startThread(){ + BeagleRT_scheduleAuxiliaryTask(sendDataTask); +} +void NetworkSend::stopThread(){ + threadIsExiting=true; +} +bool NetworkSend::threadShouldExit(){ + return(gShouldStop || threadIsExiting); +} +bool NetworkSend::isThreadRunning(){ + return threadRunning; +} #endif /* USE_JUCE */ +#ifdef USE_JUCE +NetworkSend::NetworkSend(const String &threadName): + Thread(threadName) +#else NetworkSend::NetworkSend() +#endif /* USE_JUCE */ { - sampleCount = 0; - channel.doneOnTime=true; - channel.index=channel.headerLength; //leave space for the heading message (channel, timestamp) - channel.activeBuffer=0; - channel.readyToBeSent=false; + channel.buffers=NULL; + channel.doneOnTime=NULL; + channel.readyToBeSent=NULL; + channel.enabled=false; + sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable + channel.sampleCount=0; } NetworkSend::~NetworkSend(){ +#ifdef USE_JUCE + stopThread(1000); +#else + stopThread(); for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; if(objAddrs[n]==this){ objAddrs.erase(objAddrs.begin()+n); break; } } +#endif + dealloc(); +} +void NetworkSend::dealloc(){ + channel.enabled=false; + if(channel.buffers!=NULL){ + for(int n=0; n<channel.numBuffers; n++){ + free(channel.buffers[n]); + channel.buffers[n]=NULL; + } + free(channel.buffers); + channel.buffers=NULL; + } + free(channel.readyToBeSent); + channel.readyToBeSent=NULL; + free(channel.doneOnTime); + channel.doneOnTime=NULL; +} +void NetworkSend::cleanup(){ + dealloc(); } -void NetworkSend::setup(float aSampleRate){//TODO: remove this method - setup(aSampleRate, 0, 9999, "192.168.7.1");//channelNumber=0 -} - -void NetworkSend::setup(float aSampleRate, int aChannelNumber, int aPort, const char *aServer){ +void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){ +#ifdef USE_JUCE +#else staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible //because of limitations in BeagleRT_createAuxiliaryTask() //keep track of added active instances @@ -56,63 +101,152 @@ // an instance of NetworkSend is then declared globally: the constructor gets called, // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without // any destructor being called in between ... +#endif /* USE_JUCE */ + cleanup(); + int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength; + channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division + channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*)); + printf("NumBuffers: %d\n", channel.numBuffers); + if(channel.buffers==NULL) + return; + for(int n=0; n<channel.numBuffers; n++){ + channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float)); + if(channel.buffers[n]==NULL) + return; + } + channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool)); + channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool)); + for(int n=0; n<channel.numBuffers; n++){ + channel.readyToBeSent[n]=false; + channel.doneOnTime[n]=true; + } + if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL) + return; + channel.writePointer=0; + channel.writeBuffer=0; + channel.readBuffer=0; setChannelNumber(aChannelNumber); - setPort(aPort); - setServer(aServer); + setPort(aPort); //TODO: check for the return value + setServer(aServer); //TODO: check for the return value printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); + channel.enabled=true; } void NetworkSend::log(float value){ //TODO: add a vectorized version of this method - if(channel.index==(NETWORK_AUDIO_BUFFER_SIZE)){ // when the buffer is ready ... - channel.readyToBeSent=true; - channel.index=channel.headerLength; //reset the counter - if(channel.doneOnTime==false){ - printf("Network buffer underrun. timestamp: %d :-{\n", (int)channel.buffers[!channel.activeBuffer][1]); + if(channel.enabled==false) + return; + if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ... + channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such +// printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); + channel.writePointer=channel.headerLength; //reset the writePointer + channel.writeBuffer=(channel.writeBuffer+1); //switch buffer + if(channel.writeBuffer==channel.numBuffers) // and wrap it + channel.writeBuffer=0; +// printf("WriteBuffer:%d\n", channel.writeBuffer); + if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ... + printf("Network buffer underrun. timestamp: %d :-{\n", + (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); } - channel.activeBuffer=!channel.activeBuffer; //switch buffer - channel.doneOnTime=false; - BeagleRT_scheduleAuxiliaryTask(NetworkSend::sendDataTask); //send the buffer - // TODO: maybe we should have transmitAudioTask running in a loop instead of scheduling it multiple times? - // The current solution allows to minimize latency when a single channel is used, as there is no inherent - // rt_task_sleep in the thread, as we are signaling it every time. - // Although, there is a possible race condition: if the auxiliaryTask is scheduled by channel 0, - // it might still be executing when channel 1 schedules it. But if the AuxTask has already skipped - // over channel 1, then we are at risk that channel 1 never gets sent. + channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag +#ifdef USE_JUCE + if(isThreadRunning()==false){ + startThread(10); + } +#else + if(isThreadRunning()==false){ + startThread(); + } +#endif /* USE_JUCE */ } - if(channel.index==channel.headerLength){ - channel.buffers[channel.activeBuffer][0] = (float)channel.channelNumber; //TODO: this could actually be done just once in setup() - channel.buffers[channel.activeBuffer][1]=(float)sampleCount; //timestamp + if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header + //set dynamic header values here. Static values are set in setup() and setChannelNumber(). + channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp + channel.sampleCount++; //add here more header fields } - channel.buffers[channel.activeBuffer][channel.index++]=value; - sampleCount++; + channel.buffers[channel.writeBuffer][channel.writePointer++]=value; +// sampleCount++; }; void NetworkSend::setServer(const char *aServer){ +#ifdef USE_JUCE + remoteHostname=String::fromUTF8(aServer); +#else udpClient.setServer(aServer); +#endif /* USE_JUCE */ } void NetworkSend::setPort(int aPort){ +#ifdef USE_JUCE + remotePortNumber=aPort; +#else udpClient.setPort(aPort); +#endif /* USE_JUCE */ } void NetworkSend::setChannelNumber(int aChannelNumber){ channel.channelNumber=aChannelNumber; + for(int n=0; n<channel.numBuffers; n++){ //initialize the header + channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber; + //add here more static header fields + } }; int NetworkSend::getChannelNumber(){ return channel.channelNumber; }; void NetworkSend::sendData(){ - if(channel.readyToBeSent){ - channel.readyToBeSent=false; - udpClient.send( - channel.buffers[!channel.activeBuffer], - NETWORK_AUDIO_BUFFER_SIZE*sizeof(float) - ); - channel.doneOnTime=true; + if(channel.enabled==false) + return; + while(channel.readyToBeSent[channel.readBuffer]==true){ + channel.readyToBeSent[channel.readBuffer]=false; + void* sourceBuffer=channel.buffers[channel.readBuffer]; +// printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]); +// printf("ReadBuffer:%d\n", channel.readBuffer); + unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float); + //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?) +#ifdef USE_JUCE + if(1==udpClient.waitUntilReady(0, 5)){ + udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend); + channel.doneOnTime[channel.readBuffer]=true; + // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]); + } else { + // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]); + } +#else + udpClient.send(sourceBuffer, numBytesToSend); + channel.doneOnTime[channel.readBuffer]=true; +#endif /* USE_JUCE */ + channel.readBuffer++; + if(channel.readBuffer==channel.numBuffers) + channel.readBuffer=0; } } +void NetworkSend::run(){ +#ifdef USE_JUCE + // std::chrono::high_resolution_clock::time_point t1; + // std::chrono::high_resolution_clock::time_point t2; + // std::chrono::high_resolution_clock::time_point t3; + while(threadShouldExit()==false){ + // t3 = std::chrono::high_resolution_clock::now(); + // t1 = std::chrono::high_resolution_clock::now(); + sendData(); + // t2 = std::chrono::high_resolution_clock::now(); + // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count(); + // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count(); + // if(duration2>0) + // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n"; + sleep(1); + } +#else + threadRunning=true; + while(threadShouldExit()==false){ + sendAllData(); + usleep(sleepTimeMs*1000); + } + threadRunning=false; +#endif +} #ifdef USE_JUCE #else Scope::Scope(int aNumChannels): @@ -132,7 +266,7 @@ void Scope::setup(float sampleRate, int aPort, const char* aServer){ for(int n=0; n<getNumChannels(); n++){ - channels[n].setup(sampleRate, n, aPort, aServer); + channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size } }
--- a/core/ReceiveAudioThread.cpp Tue Aug 25 11:14:25 2015 +0100 +++ b/core/ReceiveAudioThread.cpp Wed Aug 26 02:02:10 2015 +0100 @@ -80,21 +80,19 @@ // printf("I am channel %d, but I received data for channel %d\n", channel, (int)buffer[writePointer]); return -5; } - //TODO: do something else with the data in the header (e.g.: check that timestamp is sequential) + static int timestamp=0; + if(buffer[writePointer+1]!=timestamp+1) + printf("missing a timestamp: %d\n",timestamp+1); + timestamp=buffer[writePointer+1]; // rt_printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]); + popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0 - //even though we just wrote (payloadLength+headerLength) samples in the buffer, //we only increment by payloadLength. This way, next time a socket.read is performed, we will //backup the last headerLength samples that we just wrote and we will overwrite them with //the header from the new read. After parsing the header we will then restore the backed up samples. //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer! -// printf("writepointer:%d\n", writePointer); writePointer+=payloadLength; - - if(writePointer>lastValidPointer){ - // lastValidPointer=writePointer+headerLength; - } wrapWritePointer(); return numBytes; } @@ -125,6 +123,7 @@ #ifdef USE_JUCE stopThread(1000); #else + stopThread(); while(threadRunning){ usleep(sleepTime*2); //wait for thread to stop std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; @@ -247,6 +246,31 @@ // fd2=fopen("buffer.m","w"); //DEBUG // fprintf(fd2, "buf=["); //DEBUG threadRunning=true; + int maxCount=10; + int count=0; + // Clean the socket from anything that is currently in it. +#ifdef USE_JUCE + // this is borrowed from BeagleRT's UdpServer class. + int n; + do { + float waste; + if(socket.waitUntilReady(true, 0)==0) + break; + n=socket.read((void*)&waste, sizeof(float), false); + count++; + if(n<0){ + printf("error\n"); + break; + } + printf("n: %d\n",n); + } while (n>0 && (maxCount<=0 || count<maxCount)); +#else + for(unsigned int n=0; n<objAddrs.size(); n++){ + count=objAddrs[n]->socket.empty(maxCount); + } +#endif /* USE_JUCE */ + printf("socket emptied with %d reads\n", count); + while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting #ifdef USE_JUCE readUdpToBuffer(); // read into the oldBuffer
--- a/core/UdpServer.cpp Tue Aug 25 11:14:25 2015 +0100 +++ b/core/UdpServer.cpp Wed Aug 26 02:02:10 2015 +0100 @@ -82,7 +82,7 @@ return -1; FD_ZERO(&stReadFDS); FD_SET(inSocket, &stReadFDS); - int descriptorReady= select(inSocket+1, &stReadFDS, NULL, NULL, &stZeroTimeOut); + int descriptorReady= select(inSocket+1, &stReadFDS, NULL, NULL, &stZeroTimeOut); //TODO: this is not JUCE-compliant if(descriptorReady<0){ //an error occurred return -1; } @@ -100,24 +100,19 @@ // while (blockUntilSpecifiedAmountHasArrived && numberOfBytes==maxBytesToRead); return numberOfBytes; } -int UdpServer::emptySocket(){ - return emptySocket(0); +int UdpServer::empty(){ + return empty(0); } -int UdpServer::emptySocket(int maxBytes){//discards up to maxBytes from the socket. Returns the number of bytes discarded. - if(wasteBuffer==NULL) - return -1; - int numberOfBytes=0; - while(int n=read(wasteBuffer, wasteBufferSize, false)){// calls the read function until it does not return any more bytes (i.e.: socket is empty) - if(n>0) - numberOfBytes+=n; - if(n<0) - return -1; - if(maxBytes>0 && numberOfBytes>=maxBytes) - break; - }; - return numberOfBytes; +int UdpServer::empty(int maxCount){ + int count=0; + int n; + do { + if(waitUntilReady(true, 0)==0) + return 0; + float waste; + n=read(&waste, sizeof(float), false); + count++; + } while (n>0 && (maxCount<=0 || maxCount<count)); + printf("socket emptied with %d reads\n", count); + return count; } -void* UdpServer::getWaste(){ //returns the last datagram retrieved by emptySocket() - return wasteBuffer; -} -
--- a/include/NetworkSend.h Tue Aug 25 11:14:25 2015 +0100 +++ b/include/NetworkSend.h Wed Aug 26 02:02:10 2015 +0100 @@ -3,24 +3,37 @@ #define SCOPE_H_ #ifdef USE_JUCE +#include <JuceHeader.h> #else #include <BeagleRT.h> #include <rtdk.h> #include <cmath> #include <UdpClient.h> #include <vector> +#include <string> +extern bool gShouldStop; #endif /* USE_JUCE */ #define NETWORK_AUDIO_BUFFER_SIZE 302 +#define UDP_BUFFER_HEADER_CHANNEL_INDEX 0 +#define UDP_BUFFER_HEADER_TIMESTAMP_INDEX 1 +#define UDP_BUFFER_HEADER_LENGTH 2 struct NetworkBuffer{ int channelNumber; - int activeBuffer; - int index; - float buffers[2][NETWORK_AUDIO_BUFFER_SIZE]; - bool doneOnTime; - bool readyToBeSent; - static const int headerLength=2; + int numBuffers; + int writeBuffer; + int readBuffer; + int writePointer; + float** buffers; + bool* doneOnTime; + bool* readyToBeSent; + bool enabled; + int sampleCount; + static const int bufferLength=NETWORK_AUDIO_BUFFER_SIZE; + static const int headerLength=UDP_BUFFER_HEADER_LENGTH; + static const int headerChannelIndex=UDP_BUFFER_HEADER_CHANNEL_INDEX; + static const int headerTimestampIndex=UDP_BUFFER_HEADER_TIMESTAMP_INDEX; }; #ifdef USE_JUCE @@ -28,23 +41,35 @@ #else class NetworkSend { #endif /* USE_JUCE */ - int sampleCount; float sampleRate; #ifdef USE_JUCE DatagramSocket udpClient; + int sleepTimeMs; + String remoteHostname; + int remotePortNumber; #else UdpClient udpClient; + bool isThreadRunning(); + static int sleepTimeMs; + static bool threadShouldExit(); + static bool threadIsExiting; + static bool threadRunning; static bool staticConstructed; static void staticConstructor(); static AuxiliaryTask sendDataTask; //TODO: allow different AuxiliaryTasks for different priorities (e.g.: audio vs scope) static std::vector<NetworkSend *> objAddrs; #endif /* USE_JUCE */ - public: + void dealloc(); +public: NetworkBuffer channel; +#ifdef USE_JUCE + NetworkSend(const String &threadName); +#else NetworkSend(); +#endif ~NetworkSend(); - void setup(float aSampleRate); - void setup(float aSampleRate, int aChannelNumber, int aPort, const char *aServer); + void setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer); + void cleanup(); void sendData(); void log(float value); void setPort(int aPort); @@ -56,6 +81,8 @@ #else static int getNumInstances(); static void sendAllData(); + static void startThread(); + static void stopThread(); static void run(); #endif /* USE_JUCE */ };
--- a/include/ReceiveAudioThread.h Tue Aug 25 11:14:25 2015 +0100 +++ b/include/ReceiveAudioThread.h Wed Aug 26 02:02:10 2015 +0100 @@ -9,6 +9,7 @@ #include <UdpServer.h> #include <BeagleRT.h> #include <native/task.h> +#include <math.h> #endif /*USE_JUCE*/ #ifdef USE_JUCE
--- a/include/UdpServer.h Tue Aug 25 11:14:25 2015 +0100 +++ b/include/UdpServer.h Wed Aug 26 02:02:10 2015 +0100 @@ -52,9 +52,8 @@ */ int read(void* destBuffer, int maxBytesToRead, bool blockUntilSpecifiedAmountHasArrived); void close(); - int emptySocket(); - int emptySocket(int maxBytes); - void *getWaste(); + int empty(); + int empty(int maxCount); /* * Waits until the socket is ready for reading or writing. *
--- a/projects/scope/render.cpp Tue Aug 25 11:14:25 2015 +0100 +++ b/projects/scope/render.cpp Wed Aug 26 02:02:10 2015 +0100 @@ -18,17 +18,17 @@ // in from the call to initAudio(). // // Return true on success; returning false halts the program. -//ReceiveAudioThread receiveAudio0; +ReceiveAudioThread receiveAudio0; //ReceiveAudioThread receiveAudio1; bool setup(BeagleRTContext *context, void *userData) { -// receiveAudio0.init(9999, context->audioFrames, 0); + receiveAudio0.init(10000, context->audioFrames, 0); // receiveAudio1.init(10000, context->audioFrames, 1); -// + // scope.setup(); //call this once in setup to initialise the scope // scope.setPort(0, 9999); // scope.setPort(1, 10000); - networkSend.setup(context->audioSampleRate, 0, 9999, "192.168.7.1"); + networkSend.setup(context->audioSampleRate, context->audioFrames, 0, 9999, "192.168.7.1"); gInverseSampleRate = 1.0/context->audioSampleRate; @@ -49,23 +49,24 @@ void render(BeagleRTContext *context, void *userData) { static int count=0; -// if(count==0){ -// printf("startHread\n"); -// ReceiveAudioThread::startThread(); -// } + if(count==0){ + printf("startHread\n"); + ReceiveAudioThread::startThread(); + } for(unsigned int n = 0; n < context->audioFrames; n++) { float chn0 = sinf(gPhase1); - float chn1 = sinf(gPhase2); +// float chn1 = sinf(gPhase2); // float chn2 = context->audioIn[n*2 + 0]; // float chn3 = context->audioIn[n*2 + 1]; // float chn4 = context->analogIn[(int)n/2*8 + 0]; // float chn5 = context->analogIn[(int)n/2*8 + 1]; - networkSend.log(chn0); -// scope.log(0, chn0); -// scope.log(1, chn1); + networkSend.log(context->audioIn[n]); +// networkSend.log(chn0); +// scope.log(0, chn0); +// scope.log(1, chn1); // scope.log(2, chn2); // scope.log(3, chn3); // scope.log(4, chn4); @@ -87,9 +88,19 @@ } if(count>0){ -// int readPointer0=receiveAudio0.getSamplesSrc(context->audioOut, context->audioFrames, 1, 2, 0); + float samplingRateRatio=1; + int channelsInDestinationBuffer=2; + int channelToWriteTo=0; + int length=receiveAudio0.getSamplesSrc(context->audioOut, context->audioFrames, + samplingRateRatio, channelsInDestinationBuffer, channelToWriteTo); + if(length!=context->audioFrames){ + rt_printf("Length mismatch: %d\n", length); + } // int readPointer1=receiveAudio1.getSamplesSrc(context->audioOut, context->audioFrames, 1, 2, 1); } + for(int n=0; n<context->audioFrames; n++){ + context->audioOut[n*2+1]=context->audioOut[n*2]; + } count++; }