Mercurial > hg > beaglert
changeset 135:e77e2e712fbc ClockSync
To work with the ClockSync plugin
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Sat, 12 Sep 2015 20:05:55 +0100 |
parents | 04b1678614c9 |
children | 772dbb57442b |
files | .cproject core/Clock.cpp core/ClockSync.cpp core/ClockSyncThread.cpp core/ReceiveAudioThread.cpp core/UdpClient.cpp core/UdpServer.cpp core/VirtualClock.cpp include/Clock.h include/ClockSync.h include/ClockSyncThread.h include/UdpClient.h include/VirtualClock.h include/stats.hpp projects/scope/render.cpp |
diffstat | 15 files changed, 738 insertions(+), 74 deletions(-) [+] |
line wrap: on
line diff
--- a/.cproject Thu Aug 27 03:33:32 2015 +0100 +++ b/.cproject Sat Sep 12 20:05:55 2015 +0100 @@ -108,13 +108,13 @@ </extensions> </storageModule> <storageModule moduleId="cdtBuildSystem" version="4.0.0"> - <configuration artifactName="${ProjName}" buildArtefactType="org.eclipse.cdt.build.core.buildArtefactType.exe" buildProperties="org.eclipse.cdt.build.core.buildType=org.eclipse.cdt.build.core.buildType.release,org.eclipse.cdt.build.core.buildArtefactType=org.eclipse.cdt.build.core.buildArtefactType.exe" cleanCommand="rm -rf" description="" id="cdt.managedbuild.config.gnu.exe.release.1521194538" name="Release" parent="cdt.managedbuild.config.gnu.exe.release" postannouncebuildStep="Stopping process on BBB and copying new binary" postbuildStep="ssh root@192.168.7.2 "kill -s 9 \`pidof ${BuildArtifactFileName}\` 2>/dev/null; sleep 0.5; "; scp ${PWD}/${BuildArtifactFileName} root@192.168.7.2:~/beaglert/ && echo 'done copying\n' | wall || echo 'error'|wall"> + <configuration artifactName="${ProjName}" buildArtefactType="org.eclipse.cdt.build.core.buildArtefactType.exe" buildProperties="org.eclipse.cdt.build.core.buildType=org.eclipse.cdt.build.core.buildType.release,org.eclipse.cdt.build.core.buildArtefactType=org.eclipse.cdt.build.core.buildArtefactType.exe" cleanCommand="rm -rf" description="" id="cdt.managedbuild.config.gnu.exe.release.1521194538" name="Release" parent="cdt.managedbuild.config.gnu.exe.release" postannouncebuildStep="Stopping process on BBB and copying new binary" postbuildStep="scp ${PWD}/${BuildArtifactFileName} root@192.168.7.2:~/beaglert/ && echo 'done copying\n' | wall || echo 'error'|wall"> <folderInfo id="cdt.managedbuild.config.gnu.exe.release.1521194538." name="/" resourcePath=""> <toolChain id="cdt.managedbuild.toolchain.gnu.exe.release.1612059942" name="Linux GCC" superClass="cdt.managedbuild.toolchain.gnu.exe.release"> <targetPlatform id="cdt.managedbuild.target.gnu.platform.exe.release.908983575" name="Debug Platform" superClass="cdt.managedbuild.target.gnu.platform.exe.release"/> <builder buildPath="${workspace_loc:/BBB_audio+input/Release}" id="cdt.managedbuild.target.gnu.builder.exe.release.511190290" keepEnvironmentInBuildfile="false" managedBuildOn="true" name="Gnu Make Builder" superClass="cdt.managedbuild.target.gnu.builder.exe.release"/> <tool command="arm-linux-gnueabihf-g++ " id="cdt.managedbuild.tool.gnu.archiver.base.810674388" name="GCC Archiver" superClass="cdt.managedbuild.tool.gnu.archiver.base"/> - <tool command="arm-linux-gnueabihf-g++ " commandLinePattern="${COMMAND} ${FLAGS} ${OUTPUT_FLAG} ${OUTPUT_PREFIX}${OUTPUT} ${INPUTS}" id="cdt.managedbuild.tool.gnu.cpp.compiler.exe.release.163790048" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.exe.release"> + <tool command="arm-linux-gnueabihf-g++" commandLinePattern="${COMMAND} ${FLAGS} ${OUTPUT_FLAG} ${OUTPUT_PREFIX}${OUTPUT} ${INPUTS}" id="cdt.managedbuild.tool.gnu.cpp.compiler.exe.release.163790048" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.exe.release"> <option id="gnu.cpp.compiler.exe.release.option.optimization.level.2031085570" name="Optimization Level" superClass="gnu.cpp.compiler.exe.release.option.optimization.level" value="gnu.cpp.compiler.optimization.level.more" valueType="enumerated"/> <option id="gnu.cpp.compiler.exe.release.option.debugging.level.701035863" name="Debug Level" superClass="gnu.cpp.compiler.exe.release.option.debugging.level" value="gnu.cpp.compiler.debugging.level.none" valueType="enumerated"/> <option id="gnu.cpp.compiler.option.include.paths.823255770" name="Include paths (-I)" superClass="gnu.cpp.compiler.option.include.paths" valueType="includePath">
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/Clock.cpp Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,18 @@ +#include "Clock.h" + +namespace Clock +{ + myClock_t getTimeUs(){ + struct timeval tv; + struct timezone tz; + int ret=gettimeofday(&tv, &tz); + if(ret == -1){ + printf("Error while getting time of the day\n"); + return -1; + } + myClock_t time=tv.tv_usec + tv.tv_sec*1e6; + // printf("Time is: %.6f\n", time/1000000.0); + // printf("Time is: %lld\n", time); + return time; + } +}; \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/ClockSync.cpp Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,250 @@ +#include "ClockSync.h" + +void ClockSync::setVirtualClock(VirtualClock &aVirtualClock){ + virtualClock=&aVirtualClock; +} +void ClockSync::init(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){ + setVirtualClock(aVirtualClock); + slave=thisIsSlave; + setPort(aPort); + // isSlave() ? client.setServer("127.0.0.1") : client.setServer("127.0.0.1"); + isSlave() ? client.setServer("192.168.7.1") : client.setServer("192.168.7.2"); + bufferLength=kSyncMessageLength; + T1=-1; + T1p=-1; + T2=-1; + T2p=-1; + receiveLoopSleepUs=10; + receiveLoopTimeout=1e5; + movingAverage.setLength(201); + expectedClockSyncType=isSlave() ? kSync : kNone; +} +ClockSync::ClockSync(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){ + init(thisIsSlave, aPort, aVirtualClock); +} +void* ClockSync::getBuffer(){ + return buffer; +} +bool ClockSync::isSlave(){ + return slave; +} +bool ClockSync::isMaster(){ + return !slave; +} +int ClockSync::getType(){ + return ((int*)buffer)[0]; +} +myClock_t ClockSync::getTimestamp(){ + return *((myClock_t*)((char*)buffer+sizeof(int))); +} +void ClockSync::setType(int clockSyncType){ + ((int*)buffer)[0]=clockSyncType; +} +void ClockSync::setTimestamp(myClock_t timestamp){ + ((myClock_t*)((char*)buffer+sizeof(int)))[0]=timestamp; +} +void ClockSync::print(){ + //printf("type: %d, timestamp: %lld\n",*((int*)buffer),*((myClock_t*)(((int*)buffer)+1))); +} +void ClockSync::setPort(int aPort){ + if(aPort>0){ + int inPort=isSlave() ? aPort : aPort+1; + int outPort=isSlave() ? aPort+1: aPort; + server.bindToPort(inPort); + client.setPort(outPort); + //printf("Receiving on port %d\n", inPort); + //printf("Sending to port %d\n", outPort); + } +} +/** + * sends a clockSync without blocking, checks results and returns the timestamp + * immediately after the clockSync has been sent or -1 if there was an error or timeout expired. +*/ +myClock_t ClockSync::send(){ + // print(); + int ret; + ret=client.waitUntilReady(false, 0); + if(ret<=0){ //don't retry + return -1; + } + ret=client.send(buffer, bufferLength); + myClock_t timestamp=(myClock_t)virtualClock->getNow(); + if(ret<0){ + //if there was an error while sending, don't retry + return -1; + } + return timestamp; //get the accurate time *after* you sent the sync clockSync +} +/** + * receives a clockSync without blocking, checks results and returns the timestamp + * immediately after the clockSync has been received, or -1 if there was an error + * or 0 if timeout expired. +*/ +myClock_t ClockSync::receive(){ + int ret; + ret=server.waitUntilReady(true, 0); + if(ret<=0){ //don't retry + return 0; + } + ret=server.read(buffer, bufferLength, false); + myClock_t timestamp=(myClock_t)virtualClock->getNow(); + if(timestamp==0){ + //printf("The virtualClock period is <=0\n"); + return -3; + } + if(ret==-1){ + //if there was an error while receiving, don't retry + return -1; + } + if(ret!=bufferLength){ + //printf("Received a clockSync of the wrong size: %d\n", ret); + return -2; + } + // print(); + return timestamp; //get the accurate time *after* you sent the sync clockSync +} + +int ClockSync::masterSendSync(){ + //let's send a sync clockSync! + //printf("Sending a sync clockSync\n"); + setType(kSync); + setTimestamp(-1);//do not care about sending the timestamp, a more accurate one will be sent in the follow up + localTimestamp=send(); + if(localTimestamp<0){ + //printf("Could not send sync clockSync\n"); + return -1; //error, don't retry, start over + } + //let's send a followUp + //printf("sent a sync clockSync\n"); + setType(kFollowUp); + setTimestamp(localTimestamp); + if(localTimestamp<0){ + //printf("Could not send followup clockSync\n"); + return -2; //error, don't retry, start over + } + int ret=send(); + if(ret<0){ + //printf("Error while sending followup\n"); + return -3; + } + //printf("sent a followUp clockSync\n"); + expectedClockSyncType=kDelayReq; + return 1; +} +#ifdef USE_JUCE +#define NOTGSHOULDSTOP 1 +#else +extern bool gShouldStop; +#define NOTGSHOULDSTOP (!gShouldStop) +#endif /* USE_JUCE */ +int ClockSync::receiveLoop(){ + int receiveLoopElapsed=0; + while( NOTGSHOULDSTOP && (isSlave() || (receiveLoopElapsed<receiveLoopTimeout))){ //when slave, does not timeout! + receiveLoopElapsed+=receiveLoopSleepUs; + usleep(receiveLoopSleepUs); //how often to check for new clockSyncs; + // //printf("waiting for clockSyncs\n"); + localTimestamp=receive(); + if(localTimestamp<=0){ + if(localTimestamp==0){ + // //printf("Socket not ready to be read: %lld\n", localTimestamp); + } + else if(localTimestamp==-1){ + //printf("Error while receiving: %lld\n", localTimestamp); + } + else if(localTimestamp==-2){ + // //printf("Wrong size of the received clockSync: %lld\n", localTimestamp); + } + continue ; //keep waiting + } + clockSyncType=getType(); + clockSyncTimestamp=getTimestamp(); + if(clockSyncType!=expectedClockSyncType){ + //printf("Wrong clockSync type: %d, expected: %d\n",clockSyncType, expectedClockSyncType); + return -2; //start over + } + //printf("Received clockSync type: %d, clockSyncTimestamp: %lld\n", clockSyncType, clockSyncTimestamp); + if(isSlave()==true){ + int ret=slaveHandleMessage(); + if(ret==1 && clockSyncType==kDelayResp){ //we are done, end of a cycle! + return 1; + } else if (ret!=1) { + return -1; // + } else { + continue; + } + } + if(isMaster()==true){ //this is master + int ret=masterHandleMessage(); + if(ret==1 && clockSyncType==kDelayReq){ //we are done, end of a cycle! + return 1; + } else { + return -2; //we are done but something was wrong + } + } + } + //printf("Receive loop timeout\n"); + return -1; +} + +int ClockSync::slaveHandleMessage(){ + switch(clockSyncType){ + case kSync: //the clockSync timestamp is meaningless, the localTimestamp is when kSync was received + T1p=localTimestamp; + expectedClockSyncType=kFollowUp; + break; + case kFollowUp: //the clockSyncTimestamp is the time when kSync was sent, the localTimestamp is meaningless + T1=clockSyncTimestamp; + //send delayReq + setType(kDelayReq); + setTimestamp(-1); + T2=send(); + if(T2<0){ + //printf("Error while sending delayReq\n"); + return -1; + } + expectedClockSyncType=kDelayResp; + break; + case kDelayResp: {//the clockSyncTimestamp is the instant when the master received the kDelayResp clockSync, the localTimestamp is meaningless + T2p=clockSyncTimestamp; + //TODO: evaluate things + double offset=(T1p-T1-T2p+T2)/2.0d; + printf("-----------OFFSET IS : %04.1f, average: %04.2f\n", offset, movingAverage.add(offset)); + expectedClockSyncType=kSync; //end of the cycle, wait for next sync. + break; + } + default: + //printf("Unexpected message type\n"); // we should never get here + return -1; + } + return 1; +} + +int ClockSync::masterHandleMessage(){ + switch(clockSyncType){ + case kDelayReq: + //TODO: do something with it + //send kDelayResp + setType(kDelayResp); + setTimestamp(localTimestamp); + send(); + expectedClockSyncType=kNone; + break; + } +} + +int ClockSync::sendReceiveLoop(){ + if(isSlave()==true){ + //printf("Waiting for a sync clockSync\n"); + } else { //if this is master + usleep(100000); //this times (roughly) how often sync clockSyncs are being sent. + int ret=masterSendSync(); + if(ret<=0) + return -1; + } + int ret=receiveLoop(); + if(ret<=0) + return -2; + return 1; +} + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/ClockSyncThread.cpp Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,67 @@ +#include "ClockSyncThread.h" +#ifdef USE_JUCE +#else //declare static members TODO: rather refactor this similar to other threads so that only run and clockSyncTask are static + myClock_t ClockSyncThread::lastTime; // Used for clock synchronization + bool ClockSyncThread::listening; + ClockSync ClockSyncThread::clockSync; + VirtualClock* ClockSyncThread::virtualClock; + bool ClockSyncThread::threadIsExiting; + AuxiliaryTask ClockSyncThread::clockSyncTask; +#endif +#ifdef USE_JUCE +ClockSyncThread::ClockSyncThread(const String &threadName) : + Thread(threadName) +#else +ClockSyncThread::ClockSyncThread() +#endif /* USE_JUCE */ +{ +}; +ClockSyncThread::~ClockSyncThread(){ +#ifdef USE_JUCE + stopThread(1000); +#else + stopThread(); +#endif /* USE_JUCE */ +} +void ClockSyncThread::init(bool isSlave, int aPort, VirtualClock &aVirtualClock){ + setVirtualClock(aVirtualClock); + listening=false; + clockSync.init(isSlave, aPort, *virtualClock); +#ifdef USE_JUCE + startThread(5); +#else + threadIsExiting=false; + clockSyncTask=BeagleRT_createAuxiliaryTask(&ClockSyncThread::run,98, "clockSyncTask"); + //TODO: the thread cannot be started here at the moment because init() is called in setup(), where tasks cannot be scheduled +#endif /* USE_JUCE */ +} + +#ifdef USE_JUCE +#else +void ClockSyncThread::startThread(){ + printf("starting\n"); + BeagleRT_scheduleAuxiliaryTask(clockSyncTask); + printf("started\n"); +} +void ClockSyncThread::stopThread(){ + threadIsExiting=true; +} +bool ClockSyncThread::threadShouldExit(){ + return(gShouldStop || threadIsExiting ); +} +#endif /* USE_JUCE */ + +void ClockSyncThread::setVirtualClock(VirtualClock &aVirtualClock){ + virtualClock=&aVirtualClock; +}; + +void ClockSyncThread::run(){ + while(!threadShouldExit()){ + clockSync.sendReceiveLoop(); + usleep(5000); +// double now=virtualClock->getNow(); +// printf("th(end+1)=%f;\n", now/44100.0f); +// printf("act(end+1)=%lld;\n", Clock::getTimeUs()); + } + printf("Thread is not running \n"); +}
--- a/core/ReceiveAudioThread.cpp Thu Aug 27 03:33:32 2015 +0100 +++ b/core/ReceiveAudioThread.cpp Sat Sep 12 20:05:55 2015 +0100 @@ -203,7 +203,7 @@ if(isListening()==false) return -1; static int numCalls=0; - if(writePointer<0 || (numCalls&16383)==0){ //if writePointer has not been initalized yet ... + if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initalized yet ... #ifdef USE_JUCE #else //debug rt_printf("reinit the writePointer, readPointer: %f;\n",readPointer);
--- a/core/UdpClient.cpp Thu Aug 27 03:33:32 2015 +0100 +++ b/core/UdpClient.cpp Sat Sep 12 20:05:55 2015 +0100 @@ -1,62 +1,77 @@ -/* - * udpClient.cpp - * - * Created on: 19 May 2015 - * Author: giulio moro - */ -#include "../include/UdpClient.h" - - UdpClient::UdpClient(){ - outSocket=socket(AF_INET, SOCK_DGRAM, 0); - isSetPort=false; - isSetServer=false; - enabled=false; - } - UdpClient::UdpClient(int aPort, const char* aServerName){ - outSocket=socket(AF_INET, SOCK_DGRAM, 0); - if(outSocket<0){ - enabled=false; - return; - } - setPort(aPort); - setServer(aServerName); - isSetPort=true; - isSetServer=true; - enabled=true; - } - UdpClient::~UdpClient(){ - close(outSocket); - } - void UdpClient::setPort(int aPort){ - port=aPort; - destinationServer.sin_port = htons(port); - destinationServer.sin_family = AF_INET; - isSetPort=true; - if(isSetServer){ - enabled=true; - } - }; - void UdpClient::setServer(const char* aServerName){ - inet_pton(AF_INET,aServerName,&destinationServer.sin_addr); - isSetServer=true; - if(isSetPort){ - enabled=true; - } - }; - int UdpClient::send(void * message, int size){ - if(!enabled) - return -1; - unsigned int length; - length=sizeof(struct sockaddr_in); - int n=sendto(outSocket,message,size,0,(const struct sockaddr *)&destinationServer,length); - if (n < 0){ - return n; - } - return 1; - }; - int UdpClient::write(const char* remoteHostname, int remotePortNumber, void* sourceBuffer, int numBytesToWrite){ - setServer(remoteHostname); - setPort(remotePortNumber); - send(sourceBuffer, numBytesToWrite); - } - +/* + * udpClient.cpp + * + * Created on: 19 May 2015 + * Author: giulio moro + */ +#include "UdpClient.h" + + UdpClient::UdpClient(){ + outSocket=socket(AF_INET, SOCK_DGRAM, 0); + isSetPort=false; + isSetServer=false; + enabled=false; + } + UdpClient::UdpClient(int aPort, const char* aServerName){ + outSocket=socket(AF_INET, SOCK_DGRAM, 0); + if(outSocket<0){ + enabled=false; + return; + } + setPort(aPort); + setServer(aServerName); + isSetPort=true; + isSetServer=true; + enabled=true; + memset(&stTimeOut, 0, sizeof(struct timeval)); + } + UdpClient::~UdpClient(){ + close(outSocket); + } + void UdpClient::setPort(int aPort){ + port=aPort; + destinationServer.sin_port = htons(port); + destinationServer.sin_family = AF_INET; + isSetPort=true; + if(isSetServer){ + enabled=true; + } + }; + void UdpClient::setServer(const char* aServerName){ + inet_pton(AF_INET,aServerName,&destinationServer.sin_addr); + isSetServer=true; + if(isSetPort){ + enabled=true; + } + }; + int UdpClient::send(void * message, int size){ + if(!enabled) + return -1; + unsigned int length; + length=sizeof(struct sockaddr_in); + int n=sendto(outSocket,message,size,0,(const struct sockaddr *)&destinationServer,length); + if (n < 0){ + return n; + } + return 1; + }; + int UdpClient::write(const char* remoteHostname, int remotePortNumber, void* sourceBuffer, int numBytesToWrite){ + setServer(remoteHostname); + setPort(remotePortNumber); + send(sourceBuffer, numBytesToWrite); + } + int UdpClient::waitUntilReady(bool readyForReading, int timeoutMsecs){ +// If the socket is ready on return, this returns 1. If it times-out before the socket becomes ready, it returns 0. If an error occurs, it returns -1. + if(enabled==false) + return -1; + if(timeoutMsecs<0) + return select(outSocket+1, NULL, &stWriteFDS, NULL, NULL); //calling this with a NULL timeout will block indefinitely + FD_ZERO(&stWriteFDS); + FD_SET(outSocket, &stWriteFDS); + float timeOutSecs=timeoutMsecs*0.001; + stTimeOut.tv_sec=(int)timeOutSecs; + timeOutSecs-=(int)timeOutSecs; + stTimeOut.tv_usec=(int)(timeOutSecs*1000000); + int descriptorReady= select(outSocket+1, NULL, &stWriteFDS, NULL, &stTimeOut); + return descriptorReady>0? 1 : descriptorReady; + } \ No newline at end of file
--- a/core/UdpServer.cpp Thu Aug 27 03:33:32 2015 +0100 +++ b/core/UdpServer.cpp Sat Sep 12 20:05:55 2015 +0100 @@ -4,7 +4,7 @@ * Created on: 19 May 2015 * Author: giulio moro */ -#include "../include/UdpServer.h" +#include "UdpServer.h" UdpServer::UdpServer(int aPort){ init(aPort); @@ -29,6 +29,7 @@ enabled=bindToPort(aPort); wasteBufferSize=2048; wasteBuffer=malloc(wasteBufferSize); + memset(&stTimeOut,0,sizeof(struct timeval)); return enabled; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/VirtualClock.cpp Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,40 @@ +#include "VirtualClock.h" +void VirtualClock::init(){ + firstRun=true; + movingAverage.setLength(31); //TODO: a better filtering algorithm ( Did you say Kalman?) + period=-1; +} + +VirtualClock::VirtualClock(){ + init(); +} +void VirtualClock::sync(){ + sync(1); +} +void VirtualClock::sync(double count){ + myClock_t currentTime=Clock::getTimeUs(); + if(firstRun==true){ + firstRun=false; + startTime=currentTime; + } else { + period=movingAverage.add((currentTime-lastSync)/count); //TODO: replace with Kalman filter + } + lastSync=currentTime; +} + +double VirtualClock::getNow(){ + myClock_t now=Clock::getTimeUs(); + if(period<=0){ + return now; + } + // double beginningOfPeriod=lastSync; // TODO: if sync() does not get called every time (but e.g. only every so often), + // then this line (and the class) needs editing + myClock_t elapsed=(now-startTime); + double frac=elapsed/(double)period; + // printf("now=%lld; beginningOfPeriod=%f; lastSync=%lld; period=%lld; frac=%f\n", now, beginningOfPeriod, lastSync, period, frac); + return frac; +} + +double VirtualClock::getPeriod(){ + return period; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/Clock.h Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,12 @@ +#ifndef CLOCK_H_INCLUDED +#define CLOCK_H_INCLUDED +#include <sys/time.h> +#include <stdio.h> + +typedef long long int myClock_t; + +namespace Clock +{ + myClock_t getTimeUs(); +} +#endif /* CLOCK_H_INCLUDED */ \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/ClockSync.h Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,81 @@ +#ifndef CLOCK_SYNC_H_INCLUDED +#define CLOCK_SYNC_H_INCLUDED +#include "stats.hpp" +#include "UdpServer.h" +#include "UdpClient.h" +#include "Clock.h" +#include "VirtualClock.h" + +enum ptpMessageConsts{ + kSyncMessageLength=sizeof(myClock_t)+sizeof(int) +}; +enum ptpMessageType{ + kSync=0, + kFollowUp=1, + kDelayReq=2, + kDelayResp=3, + kNone=4 +}; + +enum ptpStatus{ + kSyncSent, + kFollowUpSent, + kDelayReqSent, + kDelayRespSent +}; + +class ClockSync{ +private: + MovingAverage<double> movingAverage; + UdpServer server; + UdpClient client; + bool slave; + int bufferLength; + int clockSyncType; + int expectedClockSyncType; + myClock_t clockSyncTimestamp; + myClock_t localTimestamp; + myClock_t T1; + myClock_t T1p; + myClock_t T2; + myClock_t T2p; + int receiveLoopSleepUs; + int receiveLoopTimeout; + char buffer[kSyncMessageLength]; + VirtualClock *virtualClock; +public: + ClockSync(){}; + ClockSync(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock); + void init(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock); + void* getBuffer(); + bool isSlave(); + bool isMaster(); + int getType(); + myClock_t getTimestamp(); + void setVirtualClock(VirtualClock &aVirtualClock); + void setPort(int aPort); + void setType(int clockSyncType); + void setTimestamp(myClock_t timestamp); + void print(); + /** + * sends a clockSync without blocking, checks results and returns the timestamp + * immediately after the clockSync has been sent or -1 if there was an error or timeout expired. + */ + myClock_t send(); + /** + * receives a clockSync without blocking, checks results and returns the timestamp + * immediately after the clockSync has been received, or -1 if there was an error + * or 0 if timeout expired. + */ + myClock_t receive(); + int masterSendSync(); + int receiveLoop(); + int slaveHandleMessage(); + int masterHandleMessage(); + int sendReceiveLoop(); + operator void*(){ + return getBuffer(); + } +}; + +#endif /* CLOCK_SYNC_H_INCLUDED */ \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/ClockSyncThread.h Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,48 @@ +#ifndef CLOCK_SYNC_THREAD_H_INCLUDED +#define CLOCK_SYNC_THREAD_H_INCLUDED + +#ifdef USE_JUCE +#include <JuceHeader.h> +#define IS_STATIC +#else +#define IS_STATIC static +#include <BeagleRT.h> +#endif /*USE_JUCE*/ + +#include "ClockSync.h" + +#ifdef USE_JUCE +class ClockSyncThread : public Thread { +#else +class ClockSyncThread { +#endif /* USE_JUCE */ +private: + IS_STATIC myClock_t lastTime; // Used for clock synchronization + IS_STATIC bool listening; + IS_STATIC ClockSync clockSync; + IS_STATIC VirtualClock *virtualClock; +#ifdef USE_JUCE +#else + IS_STATIC bool threadIsExiting; + IS_STATIC AuxiliaryTask clockSyncTask; +#endif /* USE_JUCE */ + +public: +#ifdef USE_JUCE + ClockSyncThread(const String &threadName); +#else + ClockSyncThread(); +#endif /* USE_JUCE */ + ~ClockSyncThread(); + IS_STATIC void init(bool isSlave, int aPort, VirtualClock &aVirtualClock); + IS_STATIC void setVirtualClock(VirtualClock &aVirtualClock); + + IS_STATIC void run(); +#ifdef USE_JUCE +#else + IS_STATIC void startThread(); + IS_STATIC void stopThread(); + IS_STATIC bool threadShouldExit(); +#endif // USE_JUCE +}; +#endif // CLOCK_SYNC_THREAD_H_INCLUDED
--- a/include/UdpClient.h Thu Aug 27 03:33:32 2015 +0100 +++ b/include/UdpClient.h Sat Sep 12 20:05:55 2015 +0100 @@ -23,6 +23,8 @@ int port; int enabled; int outSocket; + struct timeval stTimeOut; + fd_set stWriteFDS; bool isSetPort; bool isSetServer; struct sockaddr_in destinationServer; @@ -34,6 +36,7 @@ void setServer(const char* aServerName); int send(void* message, int size); int write(const char* remoteHostname, int remotePortNumber, void* sourceBuffer, int numBytesToWrite); + int waitUntilReady(bool readyForReading, int timeoutMsecs); };
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/VirtualClock.h Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,39 @@ +#ifndef VIRTUAL_CLOCK_H_INCLUDED +#define VIRTUAL_CLOCK_H_INCLUDED + +#include "stats.hpp" +#include "Clock.h" + +class VirtualClock{ +private: + myClock_t startTime; + myClock_t lastSync; + bool firstRun; + double period; + MovingAverage<double> movingAverage; +public: + void init(); + VirtualClock(); +/** + Call this method at regular intervals to sync che virtual clock +*/ + void sync(); +/** + Call this method asynchronously, passing a number of equally spaced events that have elapsed since the last call. +*/ + void sync(double count); +/** + Get the current time according to the VirtualClock. + + @return Time elapsed since the first call to sync(), in period units. +*/ + double getNow(); +/** + Get the length of the period. + + Get the length of the period (difference between calls to sync() after various filtering operations) +*/ + double getPeriod(); +}; + +#endif /* VIRTUAL_CLOCK_H_INCLUDED */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/stats.hpp Sat Sep 12 20:05:55 2015 +0100 @@ -0,0 +1,64 @@ +#ifndef STATS_HPP_INCLUDED +#define STATS_HPP_INCLUDED +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +template<class TYPE> +class MovingAverage{ +private: + TYPE* array; + int length; + int pointer; + TYPE sum; + double scale; + double average; + void dealloc(){ + free(array); + // delete array; + } + + void init(int aLength){ + length=aLength; + scale=1.0/length; + // array= new TYPE(length); // for some reason this causes memory corruption, so I am using malloc() instead... + array=(TYPE*)malloc(sizeof(TYPE)*length); + sum=0; + if(array==NULL) + printf("Error while allocating array\n"); + memset(array, 0, sizeof(TYPE)*length); + pointer=0; + } +public: + MovingAverage(){ + init(0); + } + MovingAverage(int aLength){ + init(aLength); + } + ~MovingAverage(){ + dealloc(); + } + int getLength(){ + return length; + } + void setLength(int aLength){ + dealloc(); + init(aLength); + } + double add(TYPE newElement){ + sum-=array[pointer]; + array[pointer]=newElement; + sum+=newElement; + average=sum*scale; + pointer++; + if(pointer==length) + pointer=0; + return average; + } + double getAverage(){ + return average; + } +}; + +#endif /* STATS_HPP_INCLUDED */ \ No newline at end of file
--- a/projects/scope/render.cpp Thu Aug 27 03:33:32 2015 +0100 +++ b/projects/scope/render.cpp Sat Sep 12 20:05:55 2015 +0100 @@ -3,13 +3,14 @@ #include <ReceiveAudioThread.h> #include <ClockSynchronizer.h> #include <cmath> +#include <ClockSyncThread.h> float gPhase1, gPhase2; float gFrequency1, gFrequency2; float gInverseSampleRate; //Scope scope(2); //create a scope object with 2 channels -NetworkSend networkSend; +//NetworkSend networkSend; // initialise_render() is called once before the audio rendering starts. // Use it to perform any initialisation and allocation which is dependent @@ -19,20 +20,24 @@ // in from the call to initAudio(). // // Return true on success; returning false halts the program. -ReceiveAudioThread receiveAudio0; +//ReceiveAudioThread receiveAudio0; //ReceiveAudioThread receiveAudio1; ClockSynchronizer clockSynchronizer; extern I2c_Codec* gAudioCodec; +VirtualClock virtualClock; +ClockSyncThread clockSyncThread; bool setup(BeagleRTContext *context, void *userData) { - receiveAudio0.init(10000, context->audioFrames, 0); +// receiveAudio0.init(10000, context->audioFrames, 0); // receiveAudio1.init(10000, context->audioFrames, 1); // scope.setup(); //call this once in setup to initialise the scope // scope.setPort(0, 9999); // scope.setPort(1, 10000); - networkSend.setup(context->audioSampleRate, context->audioFrames, 0, 9999, "192.168.7.1"); +// networkSend.setup(context->audioSampleRate, context->audioFrames, 0, 9999, "192.168.7.1"); clockSynchronizer.setup(); + virtualClock.init(); + clockSyncThread.init(true, 5000, virtualClock); //start as slave gInverseSampleRate = 1.0/context->audioSampleRate; gPhase1 = 0.0; @@ -51,10 +56,26 @@ void render(BeagleRTContext *context, void *userData) { + virtualClock.sync(context->audioFrames); static int count=0; + if(count==0) + clockSyncThread.startThread(); + static float phase=0; + float phaseInc=200.0/44100.0*2*M_PI; +// rt_printf("phaseInc: %f, phase: %f\n",phaseInc,phase); + for(unsigned int n=0; n<context->audioFrames; n++){ + context->audioOut[n*2]=sinf(phaseInc);//context->audioIn[n*2]; + context->audioOut[n*2+1]=sinf(phaseInc);//context->audioIn[n*2]; + phase+=200.0/44100.0*2*M_PI; + if(phase>=2*M_PI) + phase-=2*M_PI; +// context->audioOut[n*2+1]=rand()/(float)RAND_MAX;context->audioIn[n*2]; + } + count++; + /* // if((count&262143)==0){ // static int nextCall=160000; - if( ((count&(2047-1))==0 /*&& count>200000*/)){ + if( ((count&(2047))==0)){ // rt_printf("b %d\n", count); clockSynchronizer.update(networkSend.getTimestamp(), receiveAudio0.getTimestamp(), receiveAudio0.getLastTime()); // nextCall=count+100000; @@ -64,7 +85,7 @@ // clockSynchronizer.update(networkSend.getTimestamp(), receiveAudio0.getTimestamp(), receiveAudio0.getLastTime()); // } if(count==0){ - gAudioCodec->setAudioSamplingRate( 44101); + gAudioCodec->setAudioSamplingRate( 44100); rt_printf("startHread\n"); ReceiveAudioThread::startThread(); } @@ -100,12 +121,15 @@ gPhase1 -= 2.0 * M_PI; if(gPhase2 > 2.0 * M_PI) gPhase2 -= 2.0 * M_PI; + int value=count%1000; + context->audioOut[n*2]=value>=500 ? 1 : -1; + context->audioOut[n*2+1]=context->audioOut[n*2]; count++; } if(count>0){ float samplingRateRatio=1; int channelsInDestinationBuffer=2; - int channelToWriteTo=0; + int channelToWriteTo=1; int length=receiveAudio0.getSamplesSrc(context->audioOut, context->audioFrames, samplingRateRatio, channelsInDestinationBuffer, channelToWriteTo); if((unsigned int)length!=context->audioFrames){ @@ -114,8 +138,10 @@ // int readPointer1=receiveAudio1.getSamplesSrc(context->audioOut, context->audioFrames, 1, 2, 1); } for(unsigned int n=0; n<context->audioFrames; n++){ - context->audioOut[n*2+1]=context->audioOut[n*2]; +// context->audioOut[n*2+1]=context->audioOut[n*2]; } + */ + } // cleanup_render() is called once at the end, after the audio has stopped.