view core/NetworkSend.cpp @ 132:e24c531220ee scope-refactoring

Added some sort of synchronization, not working great though
author Giulio Moro <giuliomoro@yahoo.it>
date Thu, 27 Aug 2015 01:42:04 +0100
parents ff28e56e5b7e
children
line wrap: on
line source
#include "NetworkSend.h"

#ifdef USE_JUCE
#else
//initialize the static members of NetworkSend
bool NetworkSend::staticConstructed=false;
int NetworkSend::sleepTimeMs;
bool NetworkSend::threadIsExiting;
bool NetworkSend::threadRunning;
std::vector<NetworkSend*> NetworkSend::objAddrs(0);
AuxiliaryTask NetworkSend::sendDataTask=NULL;

void sendData(){
	NetworkSend::run();
}
void NetworkSend::staticConstructor(){
	if(staticConstructed==true)
		return;
	staticConstructed=true;
	threadIsExiting=false;
	threadRunning=false;
	sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority
}
void NetworkSend::sendAllData(){
	for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){
		NetworkSend::objAddrs[n]->sendData();
	}
}
int NetworkSend::getNumInstances(){
	return objAddrs.size();
}
void NetworkSend::startThread(){
	BeagleRT_scheduleAuxiliaryTask(sendDataTask);
}
void NetworkSend::stopThread(){
	threadIsExiting=true;
}
bool NetworkSend::threadShouldExit(){
	return(gShouldStop || threadIsExiting);
}
bool NetworkSend::isThreadRunning(){
	return threadRunning;
}
#endif /* USE_JUCE */

#ifdef USE_JUCE
NetworkSend::NetworkSend(const String &threadName):
		Thread(threadName)
#else
NetworkSend::NetworkSend()
#endif /* USE_JUCE */
{
	channel.buffers=NULL;
	channel.doneOnTime=NULL;
	channel.readyToBeSent=NULL;
	channel.enabled=false;
	sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable
	channel.sampleCount=0;
}

NetworkSend::~NetworkSend(){
#ifdef USE_JUCE
	stopThread(1000);
#else
	stopThread();
	for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances;
		if(objAddrs[n]==this){
			objAddrs.erase(objAddrs.begin()+n);
			break;
		}
	}
#endif
	dealloc();
}
void NetworkSend::dealloc(){
	channel.enabled=false;
	if(channel.buffers!=NULL){
		for(int n=0; n<channel.numBuffers; n++){
			free(channel.buffers[n]);
			channel.buffers[n]=NULL;
		}
		free(channel.buffers);
		channel.buffers=NULL;
	}
	free(channel.readyToBeSent);
	channel.readyToBeSent=NULL;
	free(channel.doneOnTime);
	channel.doneOnTime=NULL;
}
void NetworkSend::cleanup(){
	dealloc();
}

void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){
#ifdef USE_JUCE
#else
	staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible
						//because of limitations in BeagleRT_createAuxiliaryTask()
	//keep track of added active instances
	objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if
	// an instance of NetworkSend is then declared globally: the constructor gets called,
	// and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without
	// any destructor being called in between ...
#endif /* USE_JUCE */
	cleanup();
	int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength;
	channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division
	channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*));
	printf("NumBuffers: %d\n", channel.numBuffers);
	if(channel.buffers==NULL)
		return;
	for(int n=0; n<channel.numBuffers; n++){
		channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float));
		if(channel.buffers[n]==NULL)
			return;
	}
	channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool));
	channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool));
	for(int n=0; n<channel.numBuffers; n++){
		channel.readyToBeSent[n]=false;
		channel.doneOnTime[n]=true;
	}
	if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL)
		return;
	channel.writePointer=0;
	channel.writeBuffer=0;
	channel.readBuffer=0;
	setChannelNumber(aChannelNumber);
	setPort(aPort); //TODO: check for the return value
	setServer(aServer); //TODO: check for the return value
	printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate);
	channel.enabled=true;
}

void NetworkSend::log(float value){ //TODO: add a vectorized version of this method
	if(channel.enabled==false)
		return;
	if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ...
		channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such
//		printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
		channel.writePointer=channel.headerLength; //reset the writePointer
		channel.writeBuffer=(channel.writeBuffer+1); //switch buffer
		if(channel.writeBuffer==channel.numBuffers) // and wrap it
			channel.writeBuffer=0;
//		printf("WriteBuffer:%d\n", channel.writeBuffer);
		if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ...
			printf("Network buffer underrun. timestamp: %d :-{\n",
					(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]);
		}
		channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag
#ifdef USE_JUCE
		if(isThreadRunning()==false){
			startThread(10);
		}
#else
		if(isThreadRunning()==false){
			startThread();
		}
#endif /* USE_JUCE */
	}
	if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header
		//set dynamic header values here. Static values are set in setup() and setChannelNumber().
		channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp
		channel.sampleCount++;
		//add here more header fields
	}
    channel.buffers[channel.writeBuffer][channel.writePointer++]=value;
//	sampleCount++;
};

void NetworkSend::setServer(const char *aServer){
#ifdef USE_JUCE
	remoteHostname=String::fromUTF8(aServer);
#else
	udpClient.setServer(aServer);
#endif /* USE_JUCE */
}
void NetworkSend::setPort(int aPort){
#ifdef USE_JUCE
	remotePortNumber=aPort;
#else
	udpClient.setPort(aPort);
#endif /* USE_JUCE */
}

void NetworkSend::setChannelNumber(int aChannelNumber){
	channel.channelNumber=aChannelNumber;
	for(int n=0; n<channel.numBuffers; n++){ //initialize the header
		channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber;
		//add here more static header fields
	}
};
int NetworkSend::getChannelNumber(){
	return channel.channelNumber;
};

int NetworkSend::getTimestamp(){
	return channel.buffers[channel.readBuffer][channel.headerTimestampIndex];
}

void NetworkSend::sendData(){
	if(channel.enabled==false)
		return;
	while(channel.readyToBeSent[channel.readBuffer]==true){
		channel.readyToBeSent[channel.readBuffer]=false;
		void* sourceBuffer=channel.buffers[channel.readBuffer];
//		printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]);
//		printf("ReadBuffer:%d\n", channel.readBuffer);
		unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float);
		//TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?)
#ifdef USE_JUCE
		if(1==udpClient.waitUntilReady(0, 5)){
			udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend);
			channel.doneOnTime[channel.readBuffer]=true;
			//  printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
		} else {
			//  printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]);
		}
#else
		udpClient.send(sourceBuffer, numBytesToSend);
		channel.doneOnTime[channel.readBuffer]=true;
#endif /* USE_JUCE */
		channel.readBuffer++;
		if(channel.readBuffer==channel.numBuffers)
			channel.readBuffer=0;
	}
}

void NetworkSend::run(){
#ifdef USE_JUCE
	//  std::chrono::high_resolution_clock::time_point t1;
    //  std::chrono::high_resolution_clock::time_point t2;
    //  std::chrono::high_resolution_clock::time_point t3;
	while(threadShouldExit()==false){
		//  t3 = std::chrono::high_resolution_clock::now();
		//  t1 = std::chrono::high_resolution_clock::now();
		sendData();
		//  t2 = std::chrono::high_resolution_clock::now();
		//  auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count();
		//  auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
		//  if(duration2>0)
			//  std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n"; 
		sleep(1);
	}
#else
	threadRunning=true;
	while(threadShouldExit()==false){
		sendAllData();
		usleep(sleepTimeMs*1000);
	}
	threadRunning=false;
#endif
}
#ifdef USE_JUCE
#else
Scope::Scope(int aNumChannels):
		channels(aNumChannels)
{};
Scope::~Scope(){};

void Scope::log(int channel, float value){
	if(channel>=getNumChannels()) //TODO: assert this
		return;
	channels[channel].log(value);
}

void Scope::setup(){
	setup(44100, 9999, "127.0.0.1");
}

void Scope::setup(float sampleRate, int aPort, const char* aServer){
	for(int n=0; n<getNumChannels(); n++){
		channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size
	}
}

void Scope::setPort(int port){
	for(int n=0; n<getNumChannels(); n++){
		channels[n].setPort(port);
	}
}
void Scope::setPort(int channel, int aPort){
	channels[channel].setPort(aPort);
	printf("Channel %d is now sending to port %d\n", channel, aPort);
}

int Scope::getNumChannels(){
	return channels.size();
}

void Scope::sendData(){
	NetworkSend::sendAllData();
}
#endif