Mercurial > hg > beaglert
changeset 255:0f813ddf5773 aux_task_args
Removed core files which use 'old' aux_tasks.
author | Liam Donovan <l.b.donovan@qmul.ac.uk> |
---|---|
date | Tue, 03 May 2016 11:10:12 +0100 (2016-05-03) |
parents | 173978a5ab6a |
children | |
files | core/Midi.cpp core/NetworkSend.cpp core/ReceiveAudioThread.cpp core/WriteFile.cpp |
diffstat | 4 files changed, 0 insertions(+), 1125 deletions(-) [+] |
line wrap: on
line diff
--- a/core/Midi.cpp Tue May 03 11:04:56 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,259 +0,0 @@ -/* - * Midi.cpp - * - * Created on: 15 Jan 2016 - * Author: giulio - */ - -#include "Midi.h" -#include <fcntl.h> -#include <errno.h> - -#define kMidiInput 0 -#define kMidiOutput 1 - - -midi_byte_t midiMessageStatusBytes[midiMessageStatusBytesLength]= -{ - 0x80, - 0x90, - 0xA0, - 0xB0, - 0xC0, - 0xD0, - 0xE0, - 0 -}; - -unsigned int midiMessageNumDataBytes[midiMessageStatusBytesLength]={2, 2, 2, 2, 1, 1, 2, 0}; - -bool Midi::staticConstructed; -AuxiliaryTask Midi::midiInputTask; -AuxiliaryTask Midi::midiOutputTask; -std::vector<Midi *> Midi::objAddrs[2]; - -int MidiParser::parse(midi_byte_t* input, unsigned int length){ - unsigned int consumedBytes = 0; - for(unsigned int n = 0; n < length; n++){ - consumedBytes++; - if(waitingForStatus == true){ - int statusByte = input[n]; - MidiMessageType newType = kmmNone; - if (statusByte >= 0x80){//it actually is a status byte - for(int n = 0; n < midiMessageStatusBytesLength; n++){ //find the statusByte in the array - if(midiMessageStatusBytes[n] == (statusByte&0xf0)){ - newType = (MidiMessageType)n; - break; - } - } - elapsedDataBytes = 0; - waitingForStatus = false; - messages[writePointer].setType(newType); - messages[writePointer].setChannel((midi_byte_t)(statusByte&0xf)); - consumedBytes++; - } else { // either something went wrong or it's a system message - continue; - } - } else { - messages[writePointer].setDataByte(elapsedDataBytes, input[n]); - elapsedDataBytes++; - if(elapsedDataBytes == messages[writePointer].getNumDataBytes()){ - // done with the current message - // call the callback if available - if(isCallbackEnabled() == true){ - messageReadyCallback(getNextChannelMessage(), callbackArg); - } - waitingForStatus = true; - writePointer++; - if(writePointer == messages.size()){ - writePointer = 0; - } - } - } - } - - return consumedBytes; -}; - - -Midi::Midi(){ - outputPort = -1; - inputPort = -1; - inputParser = 0; - size_t inputBytesInitialSize = 1000; - inputBytes.resize(inputBytesInitialSize); - outputBytes.resize(inputBytesInitialSize); - inputBytesWritePointer = 0; - inputBytesReadPointer = inputBytes.size() - 1; - if(!staticConstructed){ - staticConstructor(); - } -} - -void Midi::staticConstructor(){ - staticConstructed = true; - midiInputTask = BeagleRT_createAuxiliaryTask(Midi::midiInputLoop, 50, "MidiInput"); - midiOutputTask = BeagleRT_createAuxiliaryTask(Midi::midiOutputLoop, 50, "MidiOutupt"); -} - -Midi::~Midi(){} - -void Midi::enableParser(bool enable){ - if(enable == true){ - delete inputParser; - inputParser = new MidiParser(); - parserEnabled = true; - } else { - delete inputParser; - parserEnabled = false; - } -} - -void Midi::midiInputLoop(){ - for(unsigned int n = 0; n < objAddrs[kMidiInput].size(); n++){ - objAddrs[kMidiInput][n] -> readInputLoop(); - } -} - -void Midi::midiOutputLoop(){ - for(unsigned int n = 0; n < objAddrs[kMidiOutput].size(); n++){ - objAddrs[kMidiOutput][n] -> writeOutputLoop(); - } -} - -void Midi::readInputLoop(){ - while(!gShouldStop){ - int maxBytesToRead = inputBytes.size() - inputBytesWritePointer; - int ret = read(inputPort, &inputBytes[inputBytesWritePointer], sizeof(midi_byte_t)*maxBytesToRead); - if(ret < 0){ - if(errno != EAGAIN){ // read() would return EAGAIN when no data are available to read just now - rt_printf("Error while reading midi %d\n", errno); - } - usleep(1000); - continue; - } - inputBytesWritePointer += ret; - if(inputBytesWritePointer == inputBytes.size()){ //wrap pointer around - inputBytesWritePointer = 0; - } - - if(parserEnabled == true && ret > 0){ // if the parser is enabled and there is new data, send the data to it - int input; - while((input=_getInput()) >= 0){ - midi_byte_t inputByte = (midi_byte_t)(input); - inputParser->parse(&inputByte, 1); - } - } - if(ret < maxBytesToRead){ //no more data to retrieve at the moment - usleep(1000); - } // otherwise there might be more data ready to be read (we were at the end of the buffer), so don't sleep - } -} - -void Midi::writeOutputLoop(){ - while(!gShouldStop){ - usleep(1000); - int length = outputBytesWritePointer - outputBytesReadPointer; - if(length < 0){ - length = outputBytes.size() - outputBytesReadPointer; - } - if(length == 0){ //nothing to be written - continue; - } - int ret; - ret = write(outputPort, &outputBytes[outputBytesReadPointer], sizeof(midi_byte_t)*length); - if(ret < 0){ //error occurred - rt_printf("error occurred while writing: %d\n", errno); - usleep(10000); //wait before retrying - continue; - } - } -} -int Midi::readFrom(int port){ - objAddrs[kMidiInput].push_back(this); - inputPort = open("/dev/midi1", O_RDONLY | O_NONBLOCK | O_NOCTTY); - if(inputPort < 0){ - return -1; - } else { - printf("Reading from Midi port %d\n", port); - BeagleRT_scheduleAuxiliaryTask(midiInputTask); - return 1; - } -} - -int Midi::writeTo(int port){ - objAddrs[kMidiOutput].push_back(this); - outputPort = open("/dev/midi1", O_WRONLY, 0); - if(outputPort < 0){ - return -1; - } else { - printf("Writing to Midi port %d\n", port); - BeagleRT_scheduleAuxiliaryTask(midiOutputTask); - return 1; - } -} - -int Midi::_getInput(){ - if(inputPort < 0) - return -2; - if(inputBytesReadPointer == inputBytesWritePointer){ - return -1; // no bytes to read - } - midi_byte_t inputMessage = inputBytes[inputBytesReadPointer++]; - if(inputBytesReadPointer == inputBytes.size()){ // wrap pointer - inputBytesReadPointer = 0; - } - return inputMessage; -} - -int Midi::getInput(){ - if(parserEnabled == true) { - return -3; - } - return _getInput(); -} - -MidiParser* Midi::getParser(){ - if(parserEnabled == false){ - return 0; - } - return inputParser; -}; - -int Midi::writeOutput(midi_byte_t byte){ - return writeOutput(&byte, 1); -} - -int Midi::writeOutput(midi_byte_t* bytes, unsigned int length){ - int ret = write(outputPort, bytes, length); - if(ret < 0) - return -1; - else - return 1; -} - -MidiChannelMessage::MidiChannelMessage(){}; -MidiChannelMessage::MidiChannelMessage(MidiMessageType type){ - setType(type); -}; -MidiChannelMessage::~MidiChannelMessage(){}; -MidiMessageType MidiChannelMessage::getType(){ - return _type; -}; -int MidiChannelMessage::getChannel(){ - return _channel; -}; -//int MidiChannelMessage::set(midi_byte_t* input); -// -//int MidiControlChangeMessage ::getValue(); -//int MidiControlChangeMessage::set(midi_byte_t* input){ -// channel = input[0]; -// number = input[1]; -// value = input[2]; -// return 3; -//} - -//int MidiNoteMessage::getNote(); -//int MidiNoteMessage::getVelocity(); - -//midi_byte_t MidiProgramChangeMessage::getProgram();
--- a/core/NetworkSend.cpp Tue May 03 11:04:56 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,257 +0,0 @@ -#include "NetworkSend.h" - -#ifdef USE_JUCE -#else -//initialize the static members of NetworkSend -bool NetworkSend::staticConstructed=false; -int NetworkSend::sleepTimeMs; -bool NetworkSend::threadIsExiting; -bool NetworkSend::threadRunning; -std::vector<NetworkSend*> NetworkSend::objAddrs(0); -AuxiliaryTask NetworkSend::sendDataTask=NULL; - -void sendData(){ - NetworkSend::run(); -} -void NetworkSend::staticConstructor(){ - if(staticConstructed==true) - return; - staticConstructed=true; - threadIsExiting=false; - threadRunning=false; - sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority -} -void NetworkSend::sendAllData(){ - for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){ - NetworkSend::objAddrs[n]->sendData(); - } -} -int NetworkSend::getNumInstances(){ - return objAddrs.size(); -} -void NetworkSend::startThread(){ - BeagleRT_scheduleAuxiliaryTask(sendDataTask); -} -void NetworkSend::stopThread(){ - threadIsExiting=true; -} -bool NetworkSend::threadShouldExit(){ - return(gShouldStop || threadIsExiting); -} -bool NetworkSend::isThreadRunning(){ - return threadRunning; -} -#endif /* USE_JUCE */ - -#ifdef USE_JUCE -NetworkSend::NetworkSend(const String &threadName): - Thread(threadName) -#else -NetworkSend::NetworkSend() -#endif /* USE_JUCE */ -{ - channel.buffers=NULL; - channel.doneOnTime=NULL; - channel.readyToBeSent=NULL; - channel.enabled=false; - sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable - channel.sampleCount=0; -} - -NetworkSend::~NetworkSend(){ -#ifdef USE_JUCE - stopThread(1000); -#else - stopThread(); - for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; - if(objAddrs[n]==this){ - objAddrs.erase(objAddrs.begin()+n); - break; - } - } -#endif - dealloc(); -} -void NetworkSend::dealloc(){ - channel.enabled=false; - if(channel.buffers!=NULL){ - for(int n=0; n<channel.numBuffers; n++){ - free(channel.buffers[n]); - channel.buffers[n]=NULL; - } - free(channel.buffers); - channel.buffers=NULL; - } - free(channel.readyToBeSent); - channel.readyToBeSent=NULL; - free(channel.doneOnTime); - channel.doneOnTime=NULL; -} -void NetworkSend::cleanup(){ - dealloc(); -} - -void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){ -#ifdef USE_JUCE -#else - staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible - //because of limitations in BeagleRT_createAuxiliaryTask() - //keep track of added active instances - objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if - // an instance of NetworkSend is then declared globally: the constructor gets called, - // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without - // any destructor being called in between ... Have a look here - // http://stackoverflow.com/questions/7542054/global-vector-emptying-itself-between-calls . - // and maybe use accessor function instead of global, as was done in #1374 -#endif /* USE_JUCE */ - cleanup(); - int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength; - channel.numBuffers= (1+numSamples/channel.bufferLength) * 3; //the +1 takes the ceil() of the division - channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*)); - printf("NumBuffers: %d\n", channel.numBuffers); - if(channel.buffers==NULL) - return; - for(int n=0; n<channel.numBuffers; n++){ - channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float)); - if(channel.buffers[n]==NULL) - return; - } - channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool)); - channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool)); - for(int n=0; n<channel.numBuffers; n++){ - channel.readyToBeSent[n]=false; - channel.doneOnTime[n]=true; - } - if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL) - return; - channel.writePointer=channel.headerLength; - channel.writeBuffer=0; - channel.readBuffer=0; - setChannelNumber(aChannelNumber); - setPort(aPort); //TODO: check for the return value - setServer(aServer); //TODO: check for the return value - printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); - channel.enabled=true; -} - -void NetworkSend::log(float value){ //TODO: add a vectorized version of this method - if(channel.enabled==false) - return; - if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ... - channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such -// printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); - channel.writePointer=channel.headerLength; //reset the writePointer - channel.writeBuffer=(channel.writeBuffer+1); //switch buffer - if(channel.writeBuffer==channel.numBuffers) // and wrap it - channel.writeBuffer=0; -// printf("WriteBuffer:%d\n", channel.writeBuffer); - if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ... - printf("NetworkSend buffer underrun. timestamp: %d :-{\n", - (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); - } - channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag -#ifdef USE_JUCE - if(isThreadRunning()==false){ - startThread(10); - } -#else - if(isThreadRunning()==false){ - startThread(); - } -#endif /* USE_JUCE */ - } - if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header - //set dynamic header values here. Static values are set in setup() and setChannelNumber(). - channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp - channel.sampleCount++; - //add here more header fields - } - channel.buffers[channel.writeBuffer][channel.writePointer++]=value; -// sampleCount++; -}; - -void NetworkSend::setServer(const char *aServer){ -#ifdef USE_JUCE - remoteHostname=String::fromUTF8(aServer); -#else - udpClient.setServer(aServer); -#endif /* USE_JUCE */ -} -void NetworkSend::setPort(int aPort){ -#ifdef USE_JUCE - remotePortNumber=aPort; -#else - udpClient.setPort(aPort); -#endif /* USE_JUCE */ -} - -void NetworkSend::setChannelNumber(int aChannelNumber){ - channel.channelNumber=aChannelNumber; - for(int n=0; n < channel.numBuffers; n++){ //initialize the header - channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber; - //add here more static header fields - } -}; -int NetworkSend::getChannelNumber(){ - return channel.channelNumber; -}; - -int NetworkSend::getTimestamp(){ - return channel.buffers[channel.readBuffer][channel.headerTimestampIndex]; -} - -void NetworkSend::sendData(){ - if(channel.enabled==false) - return; - while(channel.readyToBeSent[channel.readBuffer]==true){ - channel.readyToBeSent[channel.readBuffer]=false; - void* sourceBuffer=channel.buffers[channel.readBuffer]; -// printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]); -// printf("ReadBuffer:%d\n", channel.readBuffer); - unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float); - //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?) -#ifdef USE_JUCE - if(1==udpClient.waitUntilReady(0, 5)){ - udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend); - channel.doneOnTime[channel.readBuffer]=true; - // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]); - } else { - // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]); - } -#else - udpClient.send(sourceBuffer, numBytesToSend); -// printf("sent sourceBuffer: %d, channel: %f, timestamp: %f\n", channel.readBuffer, channel.buffers[channel.readBuffer][0], -// channel.buffers[channel.readBuffer][1]); - channel.doneOnTime[channel.readBuffer]=true; -#endif /* USE_JUCE */ - channel.readBuffer++; - if(channel.readBuffer==channel.numBuffers) - channel.readBuffer=0; - } -} - -void NetworkSend::run(){ -#ifdef USE_JUCE - // std::chrono::high_resolution_clock::time_point t1; - // std::chrono::high_resolution_clock::time_point t2; - // std::chrono::high_resolution_clock::time_point t3; - while(threadShouldExit()==false){ - // t3 = std::chrono::high_resolution_clock::now(); - // t1 = std::chrono::high_resolution_clock::now(); - sendData(); - // t2 = std::chrono::high_resolution_clock::now(); - // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count(); - // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count(); - // if(duration2>0) - // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n"; - usleep(1000); - } -#else - threadRunning=true; - while(threadShouldExit()==false){ - sendAllData(); - usleep(sleepTimeMs*1000); - } - threadRunning=false; -#endif -}
--- a/core/ReceiveAudioThread.cpp Tue May 03 11:04:56 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,313 +0,0 @@ -#include <ReceiveAudioThread.h> - -#ifdef USE_JUCE -#else -//initialise static members -bool ReceiveAudioThread::staticConstructed=false; -AuxiliaryTask ReceiveAudioThread::receiveDataTask=NULL; -std::vector<ReceiveAudioThread *> ReceiveAudioThread::objAddrs(0); -bool ReceiveAudioThread::threadRunning; -bool ReceiveAudioThread::threadIsExiting; -int ReceiveAudioThread::sleepTime; - -void receiveData(){ - ReceiveAudioThread::run(); -} -void ReceiveAudioThread::staticConstructor(){ - if(staticConstructed==true) - return; - staticConstructed=true; - threadIsExiting=false; - receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities -} -#endif /* USE_JUCE */ - -void ReceiveAudioThread::dealloc(){ - free(buffer); - buffer=NULL; - free(stackBuffer); - stackBuffer=NULL; -} -void ReceiveAudioThread::wrapWritePointer(){ - //this is not quite a simple wrapping as you would do in a circular buffer, - //as there is no guarantee the buffer will be full at all times, given that there must alwas be enough space at the end of it - //to hold a full payload - // lastValidPointer indicates the last pointer in the buffer containing valid data - // - if(writePointer+payloadLength+headerLength>bufferLength){ //if we are going to exceed the length of the buffer with the next reading - // lastValidPointer=writePointer+headerLength; //remember where the last valid data are - // for(int n=headerLength;n<lastValidPointer; n++){ - // fprintf(fd2, "%f\n",buffer[n]); //DEBUG - // } - writePointer=0; //and reset to beginning of the buffer - } -} -void ReceiveAudioThread::pushPayload(int startIndex){ //backup the payload samples that will be overwritten by the new header - for(int n=0; n<headerLength; n++){ - stackBuffer[n]=buffer[startIndex+n]; - } -} -void ReceiveAudioThread::popPayload(int startIndex){ - for(int n=0; n<headerLength; n++){ - buffer[startIndex+n]=stackBuffer[n]; - } -} - -int ReceiveAudioThread::readUdpToBuffer(){ - - if(listening==false || bufferReady==false) - return 0; - if(writePointer<0) - return 0; - if(socket.waitUntilReady(true, waitForSocketTime)){// TODO: if waitForSocketTime here is >>5, the - // destructor (always or sometimes) never actually gets called, despite run() returns ...see issue #1381 -#ifdef USE_JUCE -#else - lastTime=rt_timer_read(); -// rt_printf("lastTimeread= %llu\n", lastTime); -#endif /* USE_JUCE */ - pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 - //read header+payload - int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting. - //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header - if(numBytes<0){ - printf("error numBytes1\n"); - return -3; //TODO: something went wrong, you have to discard the rest of the packet! - } - if(numBytes==0){//TODO: this should not happen unless you actually receive a packet of size zero (is it at all possible?) -// printf("received 0 bytes\n"); - return 0; - } - if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead) - printf("error numBytes2: %d\n", numBytes); - return -4; //TODO: something went wrong, less bytes than expected in the payload. - } - if(channel!=(int)buffer[writePointer]){ -// printf("I am channel %d, but I received data for channel %d\n", channel, (int)buffer[writePointer]); - return -5; - } - if(buffer[writePointer+1]!=timestamp+1) - printf("missing a timestamp: %d\n",timestamp+1); - timestamp=buffer[writePointer+1]; -// rt_printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]); - - popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0 - //even though we just wrote (payloadLength+headerLength) samples in the buffer, - //we only increment by payloadLength. This way, next time a socket.read is performed, we will - //backup the last headerLength samples that we just wrote and we will overwrite them with - //the header from the new read. After parsing the header we will then restore the backed up samples. - //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer! - writePointer+=payloadLength; - wrapWritePointer(); - return numBytes; - } - return 0; //timeout occurred -} -//USE_JUCE Thread(threadName), -#ifdef USE_JUCE -ReceiveAudioThread::ReceiveAudioThread(const String &threadName) : - Thread(threadName), -#else -ReceiveAudioThread::ReceiveAudioThread() : -#endif /* USE_JUCE */ - socket(0), - listening(false), - bufferReady(false), - buffer(NULL), - stackBuffer(NULL), - bufferLength(0), - lastValidPointer(0), - waitForSocketTime(5), -#ifdef USE_JUCE - threadPriority(5) -#else - threadPriority(88) -#endif /* USE_JUCE */ -{}; -ReceiveAudioThread::~ReceiveAudioThread(){ -#ifdef USE_JUCE - stopThread(1000); -#else - stopThread(); - while(threadRunning){ - usleep(sleepTime*2); //wait for thread to stop - std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; - } -#endif /* USE_JUCE */ - //TODO: check if thread stopped, otherwise kill it before dealloc - dealloc(); -} -void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){ - dealloc(); -#ifdef USE_JUCE -#else - staticConstructor(); - objAddrs.push_back(this);//TODO: this line should be in the constructor -#endif /* USE_JUCE */ - bindToPort(aPort); - channel=aChannel; - printf("Channel %d is receiving on port %d\n",aChannel, aPort); - // fd=fopen("output.m","w"); //DEBUG - // fprintf(fd,"var=["); //DEBUG - headerLength=2; - payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending. - bufferLength=10 * std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... - //We keep a headerLength padding at the beginning of the array to allow full reads from the socket - buffer=(float*)malloc(sizeof(float)*bufferLength); - if(buffer==NULL) // something wrong - return; - lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength; - memset(buffer,0,bufferLength*sizeof(float)); - stackBuffer=(float*)malloc(sizeof(float)*headerLength); - if(stackBuffer==NULL) // something wrong - return; - bufferReady=true; - bytesToRead=sizeof(float)*(payloadLength + headerLength); - writePointer=-1; - readPointer=0; - sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently - timestamp=0; -#ifdef USE_JUCE - startThread(threadPriority); -#else - //TODO: the thread cannot be started here at the moment because init() is called in setup(), where tasks cannot be scheduled -#endif /* USE_JUCE */ -} - -void ReceiveAudioThread::bindToPort(int aPort){ - listening=socket.bindToPort(aPort); -#ifdef USE_JUCE -#else - if(listening==false) //this condition is valid also for USE_JUCE, but we do not printf in USE_JUCE - printf("Could not bind to port %d\n",aPort); -#endif /* USE_JUCE */ -} -bool ReceiveAudioThread::isListening(){ - return listening; -} -float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples - //TODO: make it return the number of samples actually available at the specified location - if(isListening()==false || length>bufferLength) - return NULL; - readPointer+=length; - if(readPointer>lastValidPointer){ - readPointer=headerLength; - } - return buffer+(int)readPointer; -}; -int ReceiveAudioThread::getSamplesSrc(float *destination, int length, - float samplingRateRatio, int numChannelsInDestination, - int channelToWriteTo) -{ - if (!(samplingRateRatio>0 && samplingRateRatio<=2)) - return -2; - if(isListening()==false) - return -1; - static int numCalls=0; - if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initalized yet ... -#ifdef USE_JUCE -#else //debug - readPointer = headerLength; -#endif /* USE_JUCE */ - // this cumbersome line means: start writing at a position which is as close as possible - // to the center of the buffer, but still is aligned to (payloadLength*x)+headerLength - // thus allowing buffering to allow clock drift to go either way - writePointer = headerLength + ((bufferLength-headerLength)/payloadLength/2)*payloadLength; - // This will help keeping them in sync. - //TODO: handle what happens when the remote stream is interrupted and then restarted - printf("write pointer inited at: %d\n", writePointer); - } - numCalls++; - if(length>lastValidPointer) { - //not enough samples available, we fill the buffer with what is available, but the destination buffer will not be filled completely - //at this very moment the other thread might be writing at most one payload into the buffer. - //To avoid a race condition, we need to let alone the buffer where we are currently writing - //as writing the payload also temporarily overwrites the previous headerLength samples, we need to account for them as well - //TODO: This assumes that the writePointer and readPointer do not drift. When doing clock synchronization we will find out that it is not true! - length=lastValidPointer-payloadLength-headerLength; - if(length<0) //no samples available at all! - return 0; - } - for(int n=0; n<length; n++){ - destination[n*numChannelsInDestination+channelToWriteTo]=buffer[(int)(0.5+readPointer)];//simple ZOH non-interpolation (nearest neighbour) - // fprintf(fd,"%f, %d, %f;\n",readPointer,writePointer,destination[n]); //DEBUG - readPointer+=samplingRateRatio; - if((int)(0.5+readPointer)>=lastValidPointer){ - readPointer=readPointer-lastValidPointer+headerLength; - } - } - return length; -} -int ReceiveAudioThread::getSamplesSrc(float *destination, int length, float samplingRateRatio){ - return getSamplesSrc(destination, length, samplingRateRatio, 1,0); - // TODO: rewriting this so that it does not call the override method we can save a multiply and add - // for each sample. -} -bool ReceiveAudioThread::isBufferReady(){ - return bufferReady; -} -#ifdef USE_JUCE -#else -void ReceiveAudioThread::startThread(){ - BeagleRT_scheduleAuxiliaryTask(receiveDataTask); -} -void ReceiveAudioThread::stopThread(){ - threadIsExiting=true; -} -bool ReceiveAudioThread::threadShouldExit(){ - return(gShouldStop || threadIsExiting ); -} -RTIME ReceiveAudioThread::getLastTime(){ - return lastTime; -} -#endif /* USE_JUCE */ -int ReceiveAudioThread::getTimestamp(){ - return timestamp; -} -void ReceiveAudioThread::run(){ - // fd2=fopen("buffer.m","w"); //DEBUG - // fprintf(fd2, "buf=["); //DEBUG - threadRunning=true; - int maxCount=10; - int count=0; - // Clean the socket from anything that is currently in it. -#ifdef USE_JUCE - // this is borrowed from BeagleRT's UdpServer class. - int n; - do { - float waste; - if(socket.waitUntilReady(true, 0)==0) - break; - n=socket.read((void*)&waste, sizeof(float), false); - count++; - if(n<0){ - printf("error\n"); - break; - } - printf("n: %d\n",n); - } while (n>0 && (maxCount<=0 || count<maxCount)); -#else - for(unsigned int n=0; n<objAddrs.size(); n++){ - count=objAddrs[n]->socket.empty(maxCount); - } -#endif /* USE_JUCE */ - printf("socket emptied with %d reads\n", count); - - while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting -#ifdef USE_JUCE - readUdpToBuffer(); // read into the oldBuffer - sleep(sleepTime); -#else - for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ - ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); - } - usleep(sleepTime); //TODO: use rt_task_sleep instead -#endif /* USE_JUCE */ - } - threadRunning=false; - printf("Thread is not running \n"); - // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG - // fclose(fd);//DEBUG - // fprintf(fd2,"];");//DEBUG - // fclose(fd2); //DEBUG -}
--- a/core/WriteFile.cpp Tue May 03 11:04:56 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,296 +0,0 @@ -/* - * WriteFile.cpp - * - * Created on: 5 Oct 2015 - * Author: giulio - */ - -#include "WriteFile.h" -//initialise static members -bool WriteFile::staticConstructed=false; -AuxiliaryTask WriteFile::writeAllFilesTask=NULL; -std::vector<WriteFile *> WriteFile::objAddrs(0); -bool WriteFile::threadRunning; -bool WriteFile::threadIsExiting; -int WriteFile::sleepTimeMs; - -void WriteFile::staticConstructor(){ - if(staticConstructed==true) - return; - staticConstructed=true; - threadIsExiting=false; - threadRunning=false; - writeAllFilesTask = BeagleRT_createAuxiliaryTask(WriteFile::run, 60, "writeAllFilesTask"); -} - -WriteFile::WriteFile(){ - buffer = NULL; - format = NULL; - header = NULL; - footer = NULL; - stringBuffer = NULL; -}; - -void WriteFile::init(const char* filename){ //if you do not call this before using the object, results are undefined - file = fopen(filename, "w"); - variableOpen = false; - lineLength = 0; - setEcho(false); - bufferLength = 0; - textReadPointer = 0; - binaryReadPointer = 0; - writePointer = 0; - sleepTimeMs = 1; - stringBufferLength = 1000; - stringBuffer = (char*)malloc(sizeof(char) * (stringBufferLength)); - setHeader("variable=[\n"); - setFooter("];\n"); - staticConstructor(); //TODO: this line should be in the constructor, but cannot be because of a bug in BeagleRT - objAddrs.push_back(this); - echoedLines = 0; - echoPeriod = 1; -} - -void WriteFile::setFileType(WriteFileType newFileType){ - fileType = newFileType; - if(fileType == kBinary) - setLineLength(1); -} -void WriteFile::setEcho(bool newEcho){ - echo=newEcho; -} -void WriteFile::setEchoInterval(int newEchoPeriod){ - echoPeriod = newEchoPeriod; - if(echoPeriod != 0) - echo = true; - else - echo = false; -} -void WriteFile::print(const char* string){ - if(echo == true){ - echoedLines++; - if (echoedLines >= echoPeriod){ - echoedLines = 0; - printf("%s", string); - } - } - if(file != NULL && fileType != kBinary){ - fprintf(file, "%s", string); - } -} - -void WriteFile::writeLine(){ - if(echo == true || fileType != kBinary){ - int stringBufferPointer = 0; - for(unsigned int n = 0; n < formatTokens.size(); n++){ - int numOfCharsWritten = snprintf( &stringBuffer[stringBufferPointer], stringBufferLength - stringBufferPointer, - formatTokens[n], buffer[textReadPointer]); - stringBufferPointer += numOfCharsWritten; - textReadPointer++; - if(textReadPointer >= bufferLength){ - textReadPointer -= bufferLength; - } - } - print(stringBuffer); - } -} - -void WriteFile::setLineLength(int newLineLength){ - lineLength=newLineLength; - free(buffer); - bufferLength = lineLength * (int)1e7; // circular buffer of length 1e7 lineLenghts - buffer = (float*)malloc(sizeof(float) * bufferLength); -} - -void WriteFile::log(float value){ - if(fileType != kBinary && (format == NULL || buffer == NULL)) - return; - buffer[writePointer] = value; - writePointer++; - if(writePointer == bufferLength){ - writePointer = 0; - } - if((fileType == kText && writePointer == textReadPointer - 1) || - (fileType == kBinary && writePointer == binaryReadPointer - 1)){ - fprintf(stderr, "%d %d WriteFile: pointers crossed, you should probably slow down your writing to disk\n", writePointer, binaryReadPointer); - } - if(threadRunning == false){ - startThread(); - } -} - -void WriteFile::log(float* array, int length){ - for(int n = 0; n < length; n++){ - log(array[n]); - } -} - -WriteFile::~WriteFile() { - free(format); - free(buffer); - free(header); - free(footer); - free(stringBuffer); -} - -void WriteFile::setFormat(const char* newFormat){ - allocateAndCopyString(newFormat, &format); - for(unsigned int n = 0; n < formatTokens.size(); n++){ - free(formatTokens[n]); - } - formatTokens.clear(); - int tokenStart = 0; - bool firstToken = true; - for(unsigned int n = 0; n < strlen(format)+1; n++){ - if(format[n] == '%' && format[n + 1] == '%'){ - n++; - } else if (format[n] == '%' || format[n] == 0){ - if(firstToken == true){ - firstToken = false; - continue; - } - char* string; - unsigned int tokenLength = n - tokenStart; - if(tokenLength == 0) - continue; - string = (char*)malloc((1+tokenLength)*sizeof(char)); - for(unsigned int i = 0; i < tokenLength; i++){ - string[i] = format[tokenStart + i]; - } - string[tokenLength] = 0; - formatTokens.push_back(string); - tokenStart = n; - } - } - setLineLength(formatTokens.size()); -} - -int WriteFile::getNumInstances(){ - return objAddrs.size(); -} - -void WriteFile::startThread(){ - BeagleRT_scheduleAuxiliaryTask(writeAllFilesTask); -} - -void WriteFile::stopThread(){ - threadIsExiting=true; -} - -bool WriteFile::threadShouldExit(){ - return(gShouldStop || threadIsExiting); -} - -bool WriteFile::isThreadRunning(){ - return threadRunning; -} - -float WriteFile::getBufferStatus(){ - return 1-getOffset()/(float)bufferLength; -} - -int WriteFile::getOffsetFromPointer(int aReadPointer){ - int offset = writePointer - aReadPointer; - if( offset < 0) - offset += bufferLength; - return offset; -} -int WriteFile::getOffset(){ - if(fileType == kBinary){ - return getOffsetFromPointer(binaryReadPointer); - } - else{ - return getOffsetFromPointer(textReadPointer); - } -} - -void WriteFile::writeOutput(bool flush){ - while((echo == true || fileType == kText) && getOffsetFromPointer(textReadPointer) >= lineLength){ //if there is less than one line worth of data to write, skip over. - // So we make sure we only write full lines - writeLine(); - } - if(fileType == kBinary){ - int numBinaryElementsToWriteAtOnce = 3*(int)1e5; - while(getOffsetFromPointer(binaryReadPointer) > numBinaryElementsToWriteAtOnce){ - int elementsToEndOfBuffer = bufferLength - binaryReadPointer; - int numberElementsToWrite = numBinaryElementsToWriteAtOnce < elementsToEndOfBuffer ? - numBinaryElementsToWriteAtOnce : elementsToEndOfBuffer; - numberElementsToWrite = fwrite(&(buffer[binaryReadPointer]), sizeof(float), numberElementsToWrite, file); - binaryReadPointer += numberElementsToWrite; - if(binaryReadPointer >= bufferLength){ - binaryReadPointer = 0; - } - } - if(flush == true){ // flush all the buffer to the file - while(getOffsetFromPointer(binaryReadPointer) != 0){ - binaryReadPointer += fwrite(&(buffer[binaryReadPointer]), sizeof(float), 1, file); - if(binaryReadPointer >= bufferLength){ - binaryReadPointer = 0; - } - } - } - } -} - -void WriteFile::writeAllOutputs(bool flush){ - for(unsigned int n = 0; n < objAddrs.size(); n++){ - objAddrs[n] -> writeOutput(flush); - } -} - -void WriteFile::writeAllHeaders(){ - for(unsigned int n = 0; n < objAddrs.size(); n++){ - objAddrs[n] -> writeHeader(); - } -} - -void WriteFile::writeAllFooters(){ - for(unsigned int n = 0; n < objAddrs.size(); n++){ - objAddrs[n] -> writeFooter(); - } -} - -void WriteFile::writeHeader(){ - print(header); -} - -void WriteFile::writeFooter(){ - print(footer); - fflush(file); - fclose(file); -} - -void WriteFile::setHeader(const char* newHeader){ - allocateAndCopyString(newHeader, &header); - sanitizeString(header); -} - -void WriteFile::setFooter(const char* newFooter){ - allocateAndCopyString(newFooter, &footer); -} - -void WriteFile::sanitizeString(char* string){ - for(int unsigned n = 0; n < strlen(string); n++){ //purge %'s from the string - if(string[n] == '%'){ - string[n] = ' '; - } - } -} - -void WriteFile::run(){ - threadRunning = true; - writeAllHeaders(); - while(threadShouldExit()==false){ - writeAllOutputs(false); - usleep(sleepTimeMs*1000); - } - writeAllOutputs(true); - writeAllFooters(); // when ctrl-c is pressed, the last line is closed and the file is closed - threadRunning = false; -} - -void WriteFile::allocateAndCopyString(const char* source, char** destination){ - free(*destination); - *destination = (char*)malloc(sizeof(char) * (strlen(source) + 1)); - strcpy(*destination, source); -}