Mercurial > hg > beaglert
view core/ClockSync.cpp @ 152:8f98b32d0e23 ClockSync
Last commit on this branch for a while. Overall not very successful
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Mon, 05 Oct 2015 13:06:14 +0100 |
parents | e9c9404e3d1f |
children |
line wrap: on
line source
#include "ClockSync.h"

/**
 * Stores a pointer to the VirtualClock used to timestamp sent and received messages.
 * The object is not copied: the caller must keep it alive while this ClockSync is in use.
 */
void ClockSync::setVirtualClock(VirtualClock &aVirtualClock){
	virtualClock=&aVirtualClock;
}

// Global pointer to the Pid of the most recently initialised ClockSync (set in init()).
Pid* gClockSyncPid;

/**
 * Initialises this object: role, ports, peer address, message length,
 * receive loop timing, moving average length and the initial expected message type.
 *
 * @param thisIsSlave true if this end is the slave, false if it is the master
 * @param aPort base port, see setPort()
 * @param aVirtualClock clock used for timestamping, see setVirtualClock()
 */
void ClockSync::init(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){
	setVirtualClock(aVirtualClock);
	slave=thisIsSlave;
	setPort(aPort);
	// isSlave() ? client.setServer("127.0.0.1") : client.setServer("127.0.0.1");
	// The peer address is hard-coded: the slave sends to 192.168.7.1, the master to 192.168.7.2.
	isSlave() ? client.setServer("192.168.7.1") : client.setServer("192.168.7.2");
	bufferLength=kSyncMessageLength;
	resetTs();
	receiveLoopSleepUs=100;
	receiveLoopTimeout=1e5;
	movingAverage.setLength(31);
	// The slave starts by waiting for a kSync; the master expects nothing until it has sent one.
	expectedClockSyncType=isSlave() ? kSync : kNone;
	gClockSyncPid = &pid;
}

/**
 * Marks all four timestamps of the current exchange as invalid (-1).
 */
void ClockSync::resetTs(){
	T1=-1;
	T1p=-1;
	T2=-1;
	T2p=-1;
}

/**
 * @return true if all four timestamps of the current exchange are strictly positive.
 */
bool ClockSync::areTsValid(){
	return T1>0 && T1p>0 && T2>0 && T2p>0;
}

/**
 * Constructs the object by forwarding all the arguments to init().
 */
ClockSync::ClockSync(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){
	init(thisIsSlave, aPort, aVirtualClock);
}

/**
 * @return the message buffer used for both sending and receiving.
 */
void* ClockSync::getBuffer(){
	return buffer;
}

/**
 * @return true if this end was initialised as the slave.
 */
bool ClockSync::isSlave(){
	return slave;
}

/**
 * @return true if this end was initialised as the master.
 */
bool ClockSync::isMaster(){
	return !slave;
}

/**
 * @return the message type, stored as an int at the beginning of the buffer.
 */
int ClockSync::getType(){
	return ((int*)buffer)[0];
}

/**
 * @return the timestamp stored in the buffer immediately after the int message type.
 */
myClock_t ClockSync::getTimestamp(){
	return *((myClock_t*)((char*)buffer+sizeof(int)));
}

/**
 * Writes the message type as an int at the beginning of the buffer.
 */
void ClockSync::setType(int clockSyncType){
	((int*)buffer)[0]=clockSyncType;
}

/**
 * Writes the timestamp in the buffer immediately after the int message type.
 */
void ClockSync::setTimestamp(myClock_t timestamp){
	// printf("setting timestamp: %lld\n", timestamp);
	((myClock_t*)((char*)buffer+sizeof(int)))[0]=timestamp;
}

/**
 * Debug helper; the body is currently commented out so this is a no-op.
 */
void ClockSync::print(){
	//printf("type: %d, timestamp: %lld\n",*((int*)buffer),*((myClock_t*)(((int*)buffer)+1)));
}

/**
 * Binds the receiving socket and sets the destination port of the sending socket.
 * The slave receives on aPort and sends to aPort+1, the master does the opposite.
 * Does nothing if aPort is not strictly positive.
 */
void ClockSync::setPort(int aPort){
	if(aPort>0){
		int inPort=isSlave() ? aPort : aPort+1;
		int outPort=isSlave() ? aPort+1: aPort;
		server.bindToPort(inPort);
		client.setPort(outPort);
		//printf("Receiving on port %d\n", inPort);
		//printf("Sending to port %d\n", outPort);
	}
}

/**
 * sends a clockSync without blocking, checks results and returns the timestamp
 * immediately after the clockSync has been sent or -1 if there was an error or timeout expired.
 */
myClock_t ClockSync::send(){
	// print();
	int ret;
	ret=client.waitUntilReady(false, isSlave() ? 110 : 5);
	if(ret<=0){ //don't retry
		return -1;
	}
	ret=client.send(buffer, bufferLength);
	// read the clock right after the send call, before inspecting its result
	myClock_t timestamp=(myClock_t)virtualClock->getNow();
	if(ret<0){
		//if there was an error while sending, don't retry
		return -1;
	}
	return timestamp; //get the accurate time *after* you sent the sync clockSync
}

/**
 * receives a clockSync without blocking, checks results and returns the timestamp
 * immediately after the clockSync has been received, or -1 if there was an error
 * or 0 if timeout expired.
 * It also returns -2 if the received message has the wrong size and -3 if the
 * virtual clock reads 0.
 */
myClock_t ClockSync::receive(){
	int ret;
	ret=server.waitUntilReady(true, 0);
	if(ret<=0){ //don't retry
		return 0;
	}
	ret=server.read(buffer, bufferLength, false);
	// read the clock right after the read call, before inspecting its result
	myClock_t timestamp=(myClock_t)virtualClock->getNow();
	if(timestamp==0){
		//printf("The virtualClock period is <=0\n");
		return -3;
	}
	if(ret==-1){
		//if there was an error while receiving, don't retry
		return -1;
	}
	if(ret!=bufferLength){
		//printf("Received a clockSync of the wrong size: %d\n", ret);
		return -2;
	}
	// print();
	return timestamp; //get the accurate time *after* you received the clockSync
}

/**
 * Master side: sends a kSync message followed by a kFollowUp message carrying the
 * local time at which the kSync was sent, then starts expecting a kDelayReq.
 *
 * @return 1 on success, -1 if the kSync could not be sent, -3 if the kFollowUp
 * could not be sent.
 */
int ClockSync::masterSendSync(){
	//let's send a sync clockSync!
	//printf("Sending a sync clockSync\n");
	setType(kSync);
	setTimestamp(-1);//do not care about sending the timestamp, a more accurate one will be sent in the follow up
	localTimestamp=send();
	if(localTimestamp<0){
		//printf("Could not send sync clockSync\n");
		return -1; //error, don't retry, start over
	}
	//let's send a followUp
	//printf("sent a sync clockSync\n");
	setType(kFollowUp);
	setTimestamp(localTimestamp);
	// Keep the full-width timestamp: narrowing it to int could turn a valid
	// (large) timestamp into a negative number and be mistaken for an error.
	myClock_t followUpTimestamp=send();
	if(followUpTimestamp<0){
		//printf("Error while sending followup\n");
		return -3;
	}
	//printf("sent a followUp clockSync\n");
	expectedClockSyncType=kDelayReq;
	return 1;
}

#ifdef USE_JUCE
#define NOTGSHOULDSTOP 1
#else
extern bool gShouldStop;
#define NOTGSHOULDSTOP (!gShouldStop)
#endif /* USE_JUCE */

/**
 * Polls for incoming messages every receiveLoopSleepUs microseconds and hands each
 * valid one to slaveHandleMessage() or masterHandleMessage().
 * The master gives up once the accumulated sleep time reaches receiveLoopTimeout;
 * the slave only stops when gShouldStop is set (never, when built with USE_JUCE).
 *
 * @return 1 at the end of a successful cycle, -1 on timeout/stop or if the slave
 * failed to handle a message, -2 if a message of an unexpected type was received
 * or the master failed to handle a message.
 */
int ClockSync::receiveLoop(){
	int receiveLoopElapsed=0;
	while( NOTGSHOULDSTOP && (isSlave() || (receiveLoopElapsed<receiveLoopTimeout))){ //when slave, does not timeout!
		if(isMaster()){
			// Only the master uses the timeout: not advancing the counter on the
			// slave avoids overflowing it during an arbitrarily long wait.
			receiveLoopElapsed+=receiveLoopSleepUs;
		}
		usleep(receiveLoopSleepUs); //how often to check for new clockSyncs;
		// //printf("waiting for clockSyncs\n");
		localTimestamp=receive();
		if(localTimestamp<=0){
			if(localTimestamp==0){
				// printf("Socket not ready to be read: %lld\n", localTimestamp);
			}
			else if(localTimestamp==-1){
				printf("Error while receiving: %lld\n", localTimestamp);
			}
			else if(localTimestamp==-2){
				printf("Wrong size of the received clockSync: %lld\n", localTimestamp);
			}
			continue ; //keep waiting
		}
		clockSyncType=getType();
		clockSyncTimestamp=getTimestamp();
		if(clockSyncType!=expectedClockSyncType){
			//printf("Wrong clockSync type: %d, expected: %d\n",clockSyncType, expectedClockSyncType);
			if(isSlave()){
				// Re-arm for the beginning of a new cycle. Without this, a slave that
				// missed a message would keep rejecting everything the master sends.
				expectedClockSyncType=kSync;
			}
			return -2; //start over
		}
		// printf("Received clockSync type: %d, clockSyncTimestamp: %lld\n", clockSyncType, clockSyncTimestamp);
		if(isSlave()==true){
			int ret=slaveHandleMessage();
			if(ret==1 && clockSyncType==kDelayResp){ //we are done, end of a cycle!
				return 1;
			} else if (ret!=1) {
				return -1;
			} else {
				continue; // message handled, wait for the next one of this cycle
			}
		}
		if(isMaster()==true){ //this is master
			int ret=masterHandleMessage();
			if(ret==1 && clockSyncType==kDelayReq){ //we are done, end of a cycle!
				return 1;
			} else {
				return -2; //we are done but something was wrong
			}
		}
	}
	//printf("Receive loop timeout\n");
	return -1;
}

/**
 * Slave side: handles the message currently described by clockSyncType,
 * clockSyncTimestamp and localTimestamp, and advances expectedClockSyncType.
 *
 * - kSync: stores the local reception time in T1p
 * - kFollowUp: stores the master's sending time in T1, sends a kDelayReq and
 *   stores the local sending time in T2
 * - kDelayResp: stores the timestamp sent by the master in T2p, computes the
 *   offset and calls processOffset() if all the timestamps are valid
 *
 * @return 1 on success, -1 if the kDelayReq could not be sent or the message type
 * is unknown.
 */
int ClockSync::slaveHandleMessage(){
	switch(clockSyncType){
	case kSync: //the clockSync timestamp is meaningless, the localTimestamp is when kSync was received
		resetTs();
		T1p=localTimestamp;
		expectedClockSyncType=kFollowUp;
		break;
	case kFollowUp: //the clockSyncTimestamp is the time when kSync was sent, the localTimestamp is meaningless
		T1=clockSyncTimestamp;
		//send delayReq
		setType(kDelayReq);
		setTimestamp(-1);
		T2=send();
		if(T2<0){
			//printf("Error while sending delayReq\n");
			// this cycle cannot be completed: wait for the next kSync
			expectedClockSyncType=kSync;
			return -1;
		}
		expectedClockSyncType=kDelayResp;
		break;
	case kDelayResp: {//the clockSyncTimestamp is the timestamp the master put in the kDelayResp (its local time when it received the kDelayReq), the localTimestamp is meaningless
		T2p=clockSyncTimestamp;
		//TODO: evaluate things
		if(areTsValid()){
			// half the difference between the master-to-slave and slave-to-master delays
			offset = (T1p-T1-T2p+T2)/2.0;
			processOffset();
		} else {
			printf("InvalidTs:\n %lld, %lld, %lld, %lld \n", T1, T1p, T2, T2p);
		}
		expectedClockSyncType=kSync; //end of the cycle, wait for next sync.
		break;
	}
	default:
		//printf("Unexpected message type\n"); // we should never get here
		return -1;
	}
	return 1;
}

/**
 * Uses the latest value of offset to steer the local clock. The behaviour depends
 * on how many times this function has been called (the counter is a function-level
 * static, so it is shared by all the instances):
 * - calls 0 to 29: does nothing
 * - calls 30 to 49: accumulates the offset in movingAverage
 * - call 50: adds the average offset to the virtual clock and resets movingAverage
 * - following calls: removes outliers, low-pass filters the offset, feeds the Pid
 *   and selects the audio sampling rate with a hysteresis comparator
 */
void ClockSync::processOffset(){
	static int calls=0;
	// TODO: change the flow control below so that it can happen multiple times
	//(base it upon the length of movingAverage rather than the number of calls)
	if(calls<30){
		// wait for the VirtualClock to stabilize and produce meaningful outputs
	} else if(calls<50) { //get an initial guess
		movingAverage.add(offset);
		// printf("-----------OFFSET IS : %04.4f samples, average: %04.4f samples\n",
		// offset, movingAverage.getAverage());
	} else if (calls==50){ //then compensate for initial offset
		// printf("compensating for offset: %f\n", movingAverage.getAverage());
		virtualClock->addOffset(movingAverage.getAverage());
		movingAverage.reset();
	} else if (calls>50){ //use IIR filter from now on
		//filter coefficients obtained from Matlab : [B,A]=butter(2,0.005);
		static IirFilter iir(1);
		static bool init = false;
		if(init == false){
			//cutoff 0.05
			// double coeffs[5]={0.00554271721028068, 0.0110854344205614, 0.00554271721028068,
			// -1.77863177782459, 0.800802646665708};
			//cutoff 0.015
			double coeffs[5]={0.000537169774812052, 0.0010743395496241, 0.000537169774812052,
				-1.93338022587993, 0.935528904979178};
			// double coeffs[5]={0.0200833655642113, 0.0401667311284225, 0.0200833655642113,
			// -1.56101807580072, 0.641351538057563};
			iir.setCoefficients(coeffs);
			init = true;
		}
		float expectedOffsetDifference = 3.5;
		static float pastNlOffset = 0;
		float offsetIncrement = offset-pastNlOffset;
		static int filteredOut=0;
		if (calls>51 && (fabsf(offsetIncrement) > expectedOffsetDifference && filteredOut < 8)){
			// non-linear filtering: remove outliers by holding the previous value,
			// but only for a limited number of consecutive calls
			nlOffset = pastNlOffset;
			// printf("this actually happened\n");
			//nlOffset=offset; // BYPASS non linear filter
			// printf("%f %f 0,0,0,0,0,0\n", offset,pastOffset);
			filteredOut++;
		} else {
			nlOffset=offset;
			filteredOut = 0;
		}
		pastNlOffset = nlOffset;
		iirOffset = iir.process(nlOffset);
		static float pastIirOffset=0;

		pid.setError(iirOffset);
		float correction=pid.getOutput();
		static float oldSamplingRate=44100;
		if( (calls&0) == 0){ // the mask is 0, so this prints at every call
			printf("%10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f, %10.3f\n",
				offset, nlOffset, iirOffset, iirOffset - pastIirOffset,
				pid.getProportionalGain(), pid.getIntegralGain(), pid.getDerivativeGain(), pid.getGlobalGain(),
				correction, pid.getIntegralError(), oldSamplingRate); //unfiltered, filtered
		}
		pastIirOffset = iirOffset;
		float targetSamplingRate;
		// Apply PID
		// NOTE: the result of this section is currently overridden by the
		// hysteresis comparator below
		targetSamplingRate = oldSamplingRate - correction;
		if(targetSamplingRate> 44110) //clip the Pid !
			targetSamplingRate = 44110;
		if (targetSamplingRate < 44090)
			targetSamplingRate = 44090;
		// or use a naive hysteresis comparator
		static int direction=1;
		float thresholdL = -1;
		float thresholdH = 3;
		targetSamplingRate = 44100;
		if(
			(iirOffset > thresholdH && direction == 1) ||
			(iirOffset > thresholdL && direction == -1)
		){
			targetSamplingRate = 44097;
			direction = -1;
		}
		else if (
			(iirOffset < -thresholdH && direction == -1) ||
			(iirOffset < -thresholdL && direction == 1)
		)
		{
			// This used to assign 44103 to iirOffset, which corrupted the filtered
			// offset and never raised the sampling rate.
			targetSamplingRate = 44103;
			direction = 1;
		}
#ifndef USE_JUCE
		if(oldSamplingRate != targetSamplingRate){
			gAudioCodec->setAudioSamplingRate(targetSamplingRate);
		}
#endif
		oldSamplingRate=targetSamplingRate;
	}
	calls++;
}

/**
 * Master side: replies to a kDelayReq with a kDelayResp carrying the local time at
 * which the kDelayReq was received, then stops expecting messages.
 *
 * @return 1 if the message was a kDelayReq, -1 otherwise.
 */
int ClockSync::masterHandleMessage(){
	switch(clockSyncType){
	case kDelayReq:
		//send kDelayResp
		setType(kDelayResp);
		setTimestamp(localTimestamp);
		// NOTE(review): the outcome of this send is not checked, so a failure
		// here is still reported as success to the caller.
		send();
		expectedClockSyncType=kNone;
		return 1;
	default:
		return -1;
	}
}

/**
 * Runs one synchronisation cycle. The master sleeps for 10ms, sends the
 * kSync/kFollowUp pair and then waits for the reply; the slave just waits for
 * incoming messages.
 *
 * @return 1 on success, -1 if the master could not send the sync messages,
 * -2 if receiveLoop() failed.
 */
int ClockSync::sendReceiveLoop(){
	if(isSlave()==true){
		//printf("Waiting for a sync clockSync\n");
	} else { //if this is master
		usleep(10000); //this times (roughly) how often sync clockSyncs are being sent.
		int ret=masterSendSync();
		if(ret<=0)
			return -1;
	}
	int ret=receiveLoop();
	if(ret<=0)
		return -2;
	return 1;
}

/**
 * @return the most recently computed raw offset.
 */
float ClockSync::getOffset(){
	return offset;
}

/**
 * @return the most recent output of the IIR filter applied to the offset.
 */
float ClockSync::getIirOffset(){
	return iirOffset;
}