view core/ClockSync.cpp @ 152:8f98b32d0e23 ClockSync

Last commit on this branch for a while. Overall not very succesful
author Giulio Moro <giuliomoro@yahoo.it>
date Mon, 05 Oct 2015 13:06:14 +0100
parents e9c9404e3d1f
children
line wrap: on
line source
#include "ClockSync.h"

void ClockSync::setVirtualClock(VirtualClock &aVirtualClock){
	virtualClock=&aVirtualClock;
}
Pid* gClockSyncPid;
void ClockSync::init(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){
	setVirtualClock(aVirtualClock);
	slave=thisIsSlave;
	setPort(aPort);
	//  isSlave() ? client.setServer("127.0.0.1") : client.setServer("127.0.0.1");
	isSlave() ? client.setServer("192.168.7.1") : client.setServer("192.168.7.2");
	bufferLength=kSyncMessageLength;
	resetTs();
	receiveLoopSleepUs=100;
	receiveLoopTimeout=1e5;
	movingAverage.setLength(31);
	expectedClockSyncType=isSlave() ? kSync : kNone;
	gClockSyncPid = &pid;
}
void ClockSync::resetTs(){
	T1=-1;
	T1p=-1;
	T2=-1;
	T2p=-1;
}
bool ClockSync::areTsValid(){
	return T1>0 && T1p>0 && T2>0 && T2p>0;
}
ClockSync::ClockSync(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){
	init(thisIsSlave, aPort, aVirtualClock);
}
void* ClockSync::getBuffer(){
	return buffer;
}
bool ClockSync::isSlave(){
	return slave;
}
bool ClockSync::isMaster(){
	return !slave;
}
int ClockSync::getType(){
	return ((int*)buffer)[0];
}
myClock_t ClockSync::getTimestamp(){
	return *((myClock_t*)((char*)buffer+sizeof(int)));
}
void ClockSync::setType(int clockSyncType){
	((int*)buffer)[0]=clockSyncType;
}
void ClockSync::setTimestamp(myClock_t timestamp){
//	printf("setting timestamp: %lld\n", timestamp);
	((myClock_t*)((char*)buffer+sizeof(int)))[0]=timestamp;
}
void ClockSync::print(){
	 //printf("type: %d, timestamp: %lld\n",*((int*)buffer),*((myClock_t*)(((int*)buffer)+1)));
}
void ClockSync::setPort(int aPort){
	if(aPort>0){
		int inPort=isSlave() ? aPort : aPort+1;
		int outPort=isSlave() ? aPort+1: aPort;
		server.bindToPort(inPort);
		client.setPort(outPort);
		//printf("Receiving on port %d\n", inPort);
		//printf("Sending to port %d\n", outPort);
	}
}
/** 
 * sends a clockSync without blocking, checks results and returns the timestamp
 * immediately after the clockSync has been sent or -1 if there was an error or timeout expired.
*/
myClock_t ClockSync::send(){
	//  print();
	int ret;
	ret=client.waitUntilReady(false, isSlave() ? 110 : 5);
	if(ret<=0){ //don't retry
		return -1;
	}
	ret=client.send(buffer, bufferLength);
	myClock_t timestamp=(myClock_t)virtualClock->getNow();
	if(ret<0){
	//if there was an error while sending, don't retry
		return -1;
	}
	return timestamp; //get the accurate time *after* you sent the sync clockSync
}
/** 
 * receives a clockSync without blocking, checks results and returns the timestamp
 * immediately after the clockSync has been received, or -1 if there was an error 
 * or 0 if timeout expired.
*/
myClock_t ClockSync::receive(){
	int ret;
	ret=server.waitUntilReady(true, 0);
	if(ret<=0){ //don't retry
		return 0;
	}
	ret=server.read(buffer, bufferLength, false);
	myClock_t timestamp=(myClock_t)virtualClock->getNow();
	if(timestamp==0){
		//printf("The virtualClock period is <=0\n");
		return -3;
	}
	if(ret==-1){
		//if there was an error while receiving, don't retry
		return -1;
	}
	if(ret!=bufferLength){
		//printf("Received a clockSync of the wrong size: %d\n", ret);
		return -2;
	}
	//  print();
	return timestamp; //get the accurate time *after* you sent the sync clockSync
}

int ClockSync::masterSendSync(){
	//let's send a sync clockSync!
	//printf("Sending a sync clockSync\n");
	setType(kSync);
	setTimestamp(-1);//do not care about sending the timestamp, a more accurate one will be sent in the follow up
	localTimestamp=send();
	if(localTimestamp<0){
		//printf("Could not send sync clockSync\n");
		return -1; //error, don't retry, start over
	}
	//let's send a followUp
	//printf("sent a sync clockSync\n");
	setType(kFollowUp);
	setTimestamp(localTimestamp);
	if(localTimestamp<0){
		//printf("Could not send followup clockSync\n");
		return -2; //error, don't retry, start over
	}
	int ret=send();
	if(ret<0){
		//printf("Error while sending followup\n");
		return -3;
	}
	//printf("sent a followUp clockSync\n");
	expectedClockSyncType=kDelayReq;
	return 1;
}
#ifdef USE_JUCE
#define NOTGSHOULDSTOP 1
#else
extern bool gShouldStop;
#define NOTGSHOULDSTOP (!gShouldStop)
#endif /* USE_JUCE */
int ClockSync::receiveLoop(){
	int receiveLoopElapsed=0;
	while( NOTGSHOULDSTOP && (isSlave() || (receiveLoopElapsed<receiveLoopTimeout))){ //when slave, does not timeout!
		receiveLoopElapsed+=receiveLoopSleepUs;
		usleep(receiveLoopSleepUs); //how often to check for new clockSyncs;
		//   //printf("waiting for clockSyncs\n");
		localTimestamp=receive();
		if(localTimestamp<=0){
			if(localTimestamp==0){
//				  printf("Socket not ready to be read: %lld\n", localTimestamp);
			}
			else if(localTimestamp==-1){
				printf("Error while receiving: %lld\n", localTimestamp);
			}
			else if(localTimestamp==-2){
				  printf("Wrong size of the received clockSync: %lld\n", localTimestamp);
			}
			continue ; //keep waiting
		}
		clockSyncType=getType();
		clockSyncTimestamp=getTimestamp();
		if(clockSyncType!=expectedClockSyncType){
			//printf("Wrong clockSync type: %d, expected: %d\n",clockSyncType, expectedClockSyncType);
			return -2; //start over
		}
//		printf("Received clockSync type: %d, clockSyncTimestamp: %lld\n", clockSyncType, clockSyncTimestamp);
		if(isSlave()==true){
			int ret=slaveHandleMessage();
			if(ret==1 && clockSyncType==kDelayResp){ //we are done, end of a cycle!
				return 1;
			} else if (ret!=1) {
				return -1; //
			} else {
				continue;
			}
		}
		if(isMaster()==true){ //this is master
			int ret=masterHandleMessage();
			if(ret==1 && clockSyncType==kDelayReq){ //we are done, end of a cycle!
				return 1;
			} else {
				return -2; //we are done but something was wrong
			}
		}
	}
	//printf("Receive loop timeout\n");
	return -1;
}

int ClockSync::slaveHandleMessage(){
	switch(clockSyncType){
		case kSync: //the clockSync timestamp is meaningless, the localTimestamp is when kSync was received
			resetTs();
			T1p=localTimestamp;
			expectedClockSyncType=kFollowUp;
			break;
		case kFollowUp: //the clockSyncTimestamp is the time when kSync was sent, the localTimestamp is meaningless
			T1=clockSyncTimestamp;
			//send delayReq
			setType(kDelayReq);
			setTimestamp(-1);
			T2=send();
			if(T2<0){
				//printf("Error while sending delayReq\n");
				return -1;
			}
			expectedClockSyncType=kDelayResp;
			break;
		case kDelayResp: {//the clockSyncTimestamp is the instant when the master received the kDelayResp clockSync, the localTimestamp is meaningless
			T2p=clockSyncTimestamp;
			//TODO: evaluate things
			if(areTsValid()){
				offset = (T1p-T1-T2p+T2)/2.0d;
				processOffset();
			} else {
				printf("InvalidTs:\n  %lld, %lld, %lld, %lld \n", T1, T1p, T2, T2p);
			}
			expectedClockSyncType=kSync; //end of the cycle, wait for next sync.
			break;
		}
		default:
			//printf("Unexpected message type\n"); // we should never get here
			return -1;
	}
	return 1;
}
void ClockSync::processOffset(){
	static int calls=0;
	// TODO: change the flow control below so that it can happen multiple times
	//(base it upon the length of movingAverage rather than the number of calls)
	if(calls<30){
		 // wait for the VirtualClock to stabilize and produce meaningful outputs
	} else if(calls<50) { //get an initial guess
		movingAverage.add(offset);
//		printf("-----------OFFSET IS : %04.4f samples, average: %04.4f samples\n",
//				offset, movingAverage.getAverage());
	} else if (calls==50){ //then compensate for initial offset
//		printf("compensating for offset: %f\n", movingAverage.getAverage());
		virtualClock->addOffset(movingAverage.getAverage());
		movingAverage.reset();
	} else if (calls>50){ //use IIR filter from now on
		//filter coefficients obtained from Matlab : [B,A]=butter(2,0.005);
		static IirFilter iir(1);
		static bool init = false;
		if(init == false){
			//cutoff 0.05
//			double coeffs[5]={0.00554271721028068, 0.0110854344205614, 0.00554271721028068,
//							     -1.77863177782459, 0.800802646665708};
			//cutoff 0.015
			double coeffs[5]={0.000537169774812052, 0.0010743395496241, 0.000537169774812052,
					-1.93338022587993, 0.935528904979178};
//			double coeffs[5]={0.0200833655642113, 0.0401667311284225, 0.0200833655642113,
//						-1.56101807580072, 0.641351538057563};
			iir.setCoefficients(coeffs);
			init = true;
		}
		float expectedOffsetDifferenceMean = 0.2432;
		float expectedOffsetDifference = 3.5;
		static float pastNlOffset = 0;
		static float pastIncrement = 0;
		float offsetIncrement = offset-pastNlOffset;
		static int filteredOut=0;
		if (calls>51 && (fabsf(offsetIncrement) > expectedOffsetDifference && filteredOut < 8)){
			// non-linear filtering: remove outliers
			nlOffset = pastNlOffset;
//			printf("this actually happened\n");
//nlOffset=offset; // BYPASS non linear filter
//			printf("%f %f                                             0,0,0,0,0,0\n", offset,pastOffset);
			filteredOut++;
		} else {
			nlOffset=offset;
			filteredOut = 0;
		}
		pastIncrement = nlOffset - pastNlOffset;
		pastNlOffset = nlOffset;
		iirOffset = iir.process(nlOffset);
		static float maxOffset=0;
		static float pastIirOffset=0;
		if( calls > 0 ) {
			maxOffset=fabsf(iirOffset) > fabsf(maxOffset) ? iirOffset : maxOffset;
			pid.setError(iirOffset);
			float correction=pid.getOutput();
			static float oldSamplingRate=44100;
			if( (calls&0) == 0){
				printf("%10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f\n",
						offset, nlOffset, iirOffset, iirOffset - pastIirOffset,
						pid.getProportionalGain(), pid.getIntegralGain(), pid.getDerivativeGain(), pid.getGlobalGain(),
						correction, pid.getIntegralError(), oldSamplingRate); //unfiltered, filtered
			}
			pastIirOffset = iirOffset;
			float targetSamplingRate;
			// Applit PID
			targetSamplingRate = oldSamplingRate - correction;
			if(targetSamplingRate> 44110) //clip the Pid !
				targetSamplingRate = 44110;
			if (targetSamplingRate < 44090)
				targetSamplingRate = 44090;

			// or use a naive hysteresis comparator
			static int direction=1;
			float thresholdL = -1;
			float thresholdH = 3;
			targetSamplingRate = 44100;
			if(
				(iirOffset > thresholdH && direction == 1) ||
				(iirOffset > thresholdL && direction == -1)
			){
				targetSamplingRate = 44097;
				direction = -1;
			}
			else if (
				(iirOffset < -thresholdH && direction == -1) ||
				(iirOffset < -thresholdL && direction == 1)
			)
			{
				iirOffset = 44103;
				direction = 1;
			}
#ifndef USE_JUCE
			if(oldSamplingRate != targetSamplingRate){
				gAudioCodec->setAudioSamplingRate(targetSamplingRate);
			}
#endif
			oldSamplingRate=targetSamplingRate;
		}
	}
	calls++;
}
int ClockSync::masterHandleMessage(){
	switch(clockSyncType){
		case kDelayReq:
			//send kDelayResp
			setType(kDelayResp);
			setTimestamp(localTimestamp);
			send();
			expectedClockSyncType=kNone;
			return 1;
		break;
		default:
			return -1;
	}
}

int ClockSync::sendReceiveLoop(){
	if(isSlave()==true){
		 //printf("Waiting for a sync clockSync\n");
	} else { //if this is master
		usleep(10000); //this times (roughly) how often sync clockSyncs are being sent.
		int ret=masterSendSync();
		if(ret<=0)
			return -1;
	}
	int ret=receiveLoop();
	if(ret<=0)
		return -2;
	return 1;
}

float ClockSync::getOffset(){
	return offset;
}
float ClockSync::getIirOffset(){
	return iirOffset;
}