view core/ClockSync.cpp @ 151:e9c9404e3d1f ClockSync

Pff partially working. No PID. When setting the audio clock on the bbb to 44098 the master and slave clock keep diverging instead of converging ...
author Giulio Moro <giuliomoro@yahoo.it>
date Tue, 22 Sep 2015 04:10:07 +0100
parents 134bff10e561
children 8f98b32d0e23
line wrap: on
line source
#include "ClockSync.h"

void ClockSync::setVirtualClock(VirtualClock &aVirtualClock){
	virtualClock=&aVirtualClock;
}
void ClockSync::init(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){
	setVirtualClock(aVirtualClock);
	slave=thisIsSlave;
	setPort(aPort);
	//  isSlave() ? client.setServer("127.0.0.1") : client.setServer("127.0.0.1");
	isSlave() ? client.setServer("192.168.7.1") : client.setServer("192.168.7.2");
	bufferLength=kSyncMessageLength;
	resetTs();
	receiveLoopSleepUs=100;
	receiveLoopTimeout=1e5;
	movingAverage.setLength(31);
	expectedClockSyncType=isSlave() ? kSync : kNone;
}
void ClockSync::resetTs(){
	T1=-1;
	T1p=-1;
	T2=-1;
	T2p=-1;
}
bool ClockSync::areTsValid(){
	return T1>0 && T1p>0 && T2>0 && T2p>0;
}
ClockSync::ClockSync(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){
	init(thisIsSlave, aPort, aVirtualClock);
}
void* ClockSync::getBuffer(){
	return buffer;
}
bool ClockSync::isSlave(){
	return slave;
}
bool ClockSync::isMaster(){
	return !slave;
}
int ClockSync::getType(){
	return ((int*)buffer)[0];
}
myClock_t ClockSync::getTimestamp(){
	return *((myClock_t*)((char*)buffer+sizeof(int)));
}
void ClockSync::setType(int clockSyncType){
	((int*)buffer)[0]=clockSyncType;
}
void ClockSync::setTimestamp(myClock_t timestamp){
//	printf("setting timestamp: %lld\n", timestamp);
	((myClock_t*)((char*)buffer+sizeof(int)))[0]=timestamp;
}
void ClockSync::print(){
	 //printf("type: %d, timestamp: %lld\n",*((int*)buffer),*((myClock_t*)(((int*)buffer)+1)));
}
void ClockSync::setPort(int aPort){
	if(aPort>0){
		int inPort=isSlave() ? aPort : aPort+1;
		int outPort=isSlave() ? aPort+1: aPort;
		server.bindToPort(inPort);
		client.setPort(outPort);
		//printf("Receiving on port %d\n", inPort);
		//printf("Sending to port %d\n", outPort);
	}
}
/** 
 * sends a clockSync without blocking, checks results and returns the timestamp
 * immediately after the clockSync has been sent or -1 if there was an error or timeout expired.
*/
myClock_t ClockSync::send(){
	//  print();
	int ret;
	ret=client.waitUntilReady(false, isSlave() ? 110 : 5);
	if(ret<=0){ //don't retry
		return -1;
	}
	ret=client.send(buffer, bufferLength);
	myClock_t timestamp=(myClock_t)virtualClock->getNow();
	if(ret<0){
	//if there was an error while sending, don't retry
		return -1;
	}
	return timestamp; //get the accurate time *after* you sent the sync clockSync
}
/** 
 * receives a clockSync without blocking, checks results and returns the timestamp
 * immediately after the clockSync has been received, or -1 if there was an error 
 * or 0 if timeout expired.
*/
myClock_t ClockSync::receive(){
	int ret;
	ret=server.waitUntilReady(true, 0);
	if(ret<=0){ //don't retry
		return 0;
	}
	ret=server.read(buffer, bufferLength, false);
	myClock_t timestamp=(myClock_t)virtualClock->getNow();
	if(timestamp==0){
		//printf("The virtualClock period is <=0\n");
		return -3;
	}
	if(ret==-1){
		//if there was an error while receiving, don't retry
		return -1;
	}
	if(ret!=bufferLength){
		//printf("Received a clockSync of the wrong size: %d\n", ret);
		return -2;
	}
	//  print();
	return timestamp; //get the accurate time *after* you sent the sync clockSync
}

int ClockSync::masterSendSync(){
	//let's send a sync clockSync!
	//printf("Sending a sync clockSync\n");
	setType(kSync);
	setTimestamp(-1);//do not care about sending the timestamp, a more accurate one will be sent in the follow up
	localTimestamp=send();
	if(localTimestamp<0){
		//printf("Could not send sync clockSync\n");
		return -1; //error, don't retry, start over
	}
	//let's send a followUp
	//printf("sent a sync clockSync\n");
	setType(kFollowUp);
	setTimestamp(localTimestamp);
	if(localTimestamp<0){
		//printf("Could not send followup clockSync\n");
		return -2; //error, don't retry, start over
	}
	int ret=send();
	if(ret<0){
		//printf("Error while sending followup\n");
		return -3;
	}
	//printf("sent a followUp clockSync\n");
	expectedClockSyncType=kDelayReq;
	return 1;
}
#ifdef USE_JUCE
#define NOTGSHOULDSTOP 1
#else
extern bool gShouldStop;
#define NOTGSHOULDSTOP (!gShouldStop)
#endif /* USE_JUCE */
int ClockSync::receiveLoop(){
	int receiveLoopElapsed=0;
	while( NOTGSHOULDSTOP && (isSlave() || (receiveLoopElapsed<receiveLoopTimeout))){ //when slave, does not timeout!
		receiveLoopElapsed+=receiveLoopSleepUs;
		usleep(receiveLoopSleepUs); //how often to check for new clockSyncs;
		//   //printf("waiting for clockSyncs\n");
		localTimestamp=receive();
		if(localTimestamp<=0){
			if(localTimestamp==0){
//				  printf("Socket not ready to be read: %lld\n", localTimestamp);
			}
			else if(localTimestamp==-1){
				printf("Error while receiving: %lld\n", localTimestamp);
			}
			else if(localTimestamp==-2){
				  printf("Wrong size of the received clockSync: %lld\n", localTimestamp);
			}
			continue ; //keep waiting
		}
		clockSyncType=getType();
		clockSyncTimestamp=getTimestamp();
		if(clockSyncType!=expectedClockSyncType){
			//printf("Wrong clockSync type: %d, expected: %d\n",clockSyncType, expectedClockSyncType);
			return -2; //start over
		}
//		printf("Received clockSync type: %d, clockSyncTimestamp: %lld\n", clockSyncType, clockSyncTimestamp);
		if(isSlave()==true){
			int ret=slaveHandleMessage();
			if(ret==1 && clockSyncType==kDelayResp){ //we are done, end of a cycle!
				return 1;
			} else if (ret!=1) {
				return -1; //
			} else {
				continue;
			}
		}
		if(isMaster()==true){ //this is master
			int ret=masterHandleMessage();
			if(ret==1 && clockSyncType==kDelayReq){ //we are done, end of a cycle!
				return 1;
			} else {
				return -2; //we are done but something was wrong
			}
		}
	}
	//printf("Receive loop timeout\n");
	return -1;
}

int ClockSync::slaveHandleMessage(){
	switch(clockSyncType){
		case kSync: //the clockSync timestamp is meaningless, the localTimestamp is when kSync was received
			resetTs();
			T1p=localTimestamp;
			expectedClockSyncType=kFollowUp;
			break;
		case kFollowUp: //the clockSyncTimestamp is the time when kSync was sent, the localTimestamp is meaningless
			T1=clockSyncTimestamp;
			//send delayReq
			setType(kDelayReq);
			setTimestamp(-1);
			T2=send();
			if(T2<0){
				//printf("Error while sending delayReq\n");
				return -1;
			}
			expectedClockSyncType=kDelayResp;
			break;
		case kDelayResp: {//the clockSyncTimestamp is the instant when the master received the kDelayResp clockSync, the localTimestamp is meaningless
			T2p=clockSyncTimestamp;
			//TODO: evaluate things
			double offset=(T1p-T1-T2p+T2)/2.0d;
			if(areTsValid()){
				processOffset(offset);

				/*
				static int calls=0;
				static double referenceOffset=0;

				if(calls<100){ // start by averaging everything
					movingAverage.add(offset);
				} else { //once we get an estimate, start discarding outliers
					float maxOffsetDeviation=20;
					float deviation=fabsf(movingAverage.getAverage()-offset);
					if(deviation<maxOffsetDeviation){
						movingAverage.add(offset);
						printf("w(end+1)=%4.1f;\n", movingAverage.getAverage());
					} else {
//						printf("Out of range: %f \n", deviation);
					}
				}
				printf("offset(end+1)=%f;\n", offset);
				if (calls==100){
//					printf("Update reference\n");
					referenceOffset=movingAverage.getAverage();
				} else 	if (calls==200){
					calls=99;
				}
				calls++;
*/

//				printf("%lld, %lld, %lld, %lld \n", T1, T1p, T2, T2p);
//				if(T2-T1p<0){
//					printf("Negative: %lld, %lld, %lld, %lld \n", T1, T1p, T2, T2p);
//				}
			} else {
				printf("InvalidTs:\n  %lld, %lld, %lld, %lld \n", T1, T1p, T2, T2p);
			}
			expectedClockSyncType=kSync; //end of the cycle, wait for next sync.
			break;
		}
		default:
			//printf("Unexpected message type\n"); // we should never get here
			return -1;
	}
	return 1;
}
void ClockSync::processOffset(double offset){
	static int calls=0;
	// TODO: change the flow control below so that it can happen multiple times
	//(base it upon the length of movingAverage rather than the number of calls)
	if(calls<30){
		 // wait for the VirtualClock to stabilize and produce meaningful outputs
	} else if(calls<50) { //get an initial guess
		movingAverage.add(offset);
//		printf("-----------OFFSET IS : %04.4f samples, average: %04.4f samples\n",
//				offset, movingAverage.getAverage());
	} else if (calls==50){ //then compensate for initial offset
//		printf("compensating for offset: %f\n", movingAverage.getAverage());
		virtualClock->addOffset(movingAverage.getAverage());
		movingAverage.reset();
	} else if (calls>50){ //use IIR filter from now on
		//filter coefficients obtained from Matlab : [B,A]=butter(2,0.005);
		static IirFilter iir(1);
		static bool init = false;
		if(init == false){
//			double coeffs[5]={0.00554271721028068, 0.0110854344205614, 0.00554271721028068,
//							     -1.77863177782459, 0.800802646665708};
			double coeffs[5]={0.0200833655642113, 0.0401667311284225, 0.0200833655642113,
						-1.56101807580072, 0.641351538057563};
			iir.setCoefficients(coeffs);
			init = true;
		}
		float expectedOffsetDifferenceMean = 0.2432;
		float expectedOffsetDifferenceStd = 6.487;
		float nlOffset;
		static float pastOffset = 0;
		float offsetIncrement = offset-pastOffset;
		if ( fabsf(offsetIncrement) > 1*expectedOffsetDifferenceStd && calls>51){
			// non-linear filtering: remove outliers
			nlOffset = pastOffset;
//			printf("%f %f                                             0,0,0,0,0,0\n", offset,pastOffset);
		} else {
			nlOffset=offset;
		}
		pastOffset=nlOffset;
		float iirOffset = iir.process(nlOffset);
		static float maxOffset=0;
		static float pastIirOffset=0;
		if( calls > 0 ) {
			maxOffset=fabsf(iirOffset) > fabsf(maxOffset) ? iirOffset : maxOffset;
			pid.setError(iirOffset);
			float correction=pid.getOutput()*0.0001;
			static float oldSamplingRate=44100;
			printf("%10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f\n",
					offset, nlOffset, iirOffset, iirOffset - pastIirOffset, correction, oldSamplingRate); //unfiltered, filtered
			pastIirOffset = iirOffset;
			float targetSamplingRate;
			static int direction=1;
			float thresholdL = -1;
//			targetSamplingRate = oldSamplingRate - correction;
			float thresholdH = 3;

			targetSamplingRate = 44100;
			if(
				(iirOffset > thresholdH && direction == 1) ||
				(iirOffset > thresholdL && direction == -1)
			){
				targetSamplingRate = 44097;
				direction = -1;
			}
			else if (
				(iirOffset < -thresholdH && direction == -1) ||
				(iirOffset < -thresholdL && direction == 1)
			)
			{
				iirOffset = 44102;
				direction = 1;
			}
#ifndef USE_JUCE
			if(oldSamplingRate != targetSamplingRate){
				gAudioCodec->setAudioSamplingRate(targetSamplingRate);
			}
#endif
			oldSamplingRate=targetSamplingRate;
		}
	}
	calls++;
}
int ClockSync::masterHandleMessage(){
	switch(clockSyncType){
		case kDelayReq:
			//send kDelayResp
			setType(kDelayResp);
			setTimestamp(localTimestamp);
			send();
			expectedClockSyncType=kNone;
			return 1;
		break;
		default:
			return -1;
	}
}

int ClockSync::sendReceiveLoop(){
	if(isSlave()==true){
		 //printf("Waiting for a sync clockSync\n");
	} else { //if this is master
		usleep(30000); //this times (roughly) how often sync clockSyncs are being sent.
		int ret=masterSendSync();
		if(ret<=0)
			return -1;
	}
	int ret=receiveLoop();
	if(ret<=0)
		return -2;
	return 1;
}