Mercurial > hg > beaglert
changeset 143:dd24379336f1 ClockSync
Merged
author | Giulio Moro <giuliomoro@yahoo.it> |
---|---|
date | Mon, 14 Sep 2015 17:35:18 +0100 |
parents | 44d07fa9bd03 (diff) d064234468cd (current diff) |
children | 55c1e591cb2e |
files | |
diffstat | 29 files changed, 2454 insertions(+), 347 deletions(-) [+] |
line wrap: on
line diff
--- a/.cproject Mon Sep 14 17:31:24 2015 +0100 +++ b/.cproject Mon Sep 14 17:35:18 2015 +0100 @@ -14,11 +14,11 @@ </extensions> </storageModule> <storageModule moduleId="cdtBuildSystem" version="4.0.0"> - <configuration artifactName="${ProjName}" buildArtefactType="org.eclipse.cdt.build.core.buildArtefactType.exe" buildProperties="org.eclipse.cdt.build.core.buildType=org.eclipse.cdt.build.core.buildType.debug,org.eclipse.cdt.build.core.buildArtefactType=org.eclipse.cdt.build.core.buildArtefactType.exe" cleanCommand="rm -rf" description="" id="cdt.managedbuild.config.gnu.exe.debug.528876549" name="Debug" parent="cdt.managedbuild.config.gnu.exe.debug" postannouncebuildStep="Kills the process running on the BeagleBone (if any) and copies the new binary to the BeagleBone in beaglert/" postbuildStep="ssh root@192.168.7.2 "kill -s 2 \`pidof ${BuildArtifactFileName}\` 2>/dev/null; sleep 0.5; scp host:${PWD}/${BuildArtifactFileName} ~/beaglert/ && echo 'done copying\n' | wall || echo 'error'|wall""> + <configuration artifactName="${ProjName}" buildArtefactType="org.eclipse.cdt.build.core.buildArtefactType.exe" buildProperties="org.eclipse.cdt.build.core.buildType=org.eclipse.cdt.build.core.buildType.debug,org.eclipse.cdt.build.core.buildArtefactType=org.eclipse.cdt.build.core.buildArtefactType.exe" cleanCommand="rm -rf" description="" id="cdt.managedbuild.config.gnu.exe.debug.528876549" name="Debug" parent="cdt.managedbuild.config.gnu.exe.debug" postannouncebuildStep="Kills the process running on the BeagleBone (if any) and copies the new binary to the BeagleBone in beaglert/" postbuildStep="ssh root@192.168.7.2 "kill -s 9 \`pidof ${BuildArtifactFileName}\` 2>/dev/null; sleep 0.5; scp host:${PWD}/${BuildArtifactFileName} ~/beaglert/ && echo 'done copying\n' | wall || echo 'error'|wall""> <folderInfo id="cdt.managedbuild.config.gnu.exe.debug.528876549." name="/" resourcePath=""> <toolChain id="cdt.managedbuild.toolchain.gnu.exe.debug.681872250" name="Linux GCC" superClass="cdt.managedbuild.toolchain.gnu.exe.debug"> <targetPlatform id="cdt.managedbuild.target.gnu.platform.exe.debug.295375065" name="Debug Platform" superClass="cdt.managedbuild.target.gnu.platform.exe.debug"/> - <builder buildPath="${workspace_loc:/BBB_audio+input/Debug}" id="cdt.managedbuild.target.gnu.builder.exe.debug.26322421" keepEnvironmentInBuildfile="false" managedBuildOn="true" name="Gnu Make Builder" superClass="cdt.managedbuild.target.gnu.builder.exe.debug"> + <builder arguments="-j6" buildPath="${workspace_loc:/BBB_audio+input/Debug}" command="make" id="cdt.managedbuild.target.gnu.builder.exe.debug.26322421" keepEnvironmentInBuildfile="false" managedBuildOn="true" name="Gnu Make Builder" superClass="cdt.managedbuild.target.gnu.builder.exe.debug"> <outputEntries> <entry flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="outputPath" name="Debug"/> <entry flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="outputPath" name="Release"/> @@ -31,8 +31,6 @@ <option id="gnu.cpp.compiler.option.include.paths.2031219124" name="Include paths (-I)" superClass="gnu.cpp.compiler.option.include.paths" valueType="includePath"> <listOptionValue builtIn="false" value="/usr/xenomai/include"/> <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/include/xenomai/include"/> - <listOptionValue builtIn="false" value="/import/teaching/ECS732/arm-gcc/arm-linux-gnueabihf/include"/> - <listOptionValue builtIn="false" value="/import/teaching/ECS732/arm-gcc/arm-linux-gnueabihf/include/xenomai/include"/> <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/include/ne10"/> <listOptionValue builtIn="false" value=""${workspace_loc:/BeagleRT/include}""/> </option> @@ -47,10 +45,8 @@ <option id="gnu.c.compiler.option.include.paths.358825414" name="Include paths (-I)" superClass="gnu.c.compiler.option.include.paths" valueType="includePath"> <listOptionValue builtIn="false" value="/usr/xenomai/include"/> <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/include/xenomai/include"/> - <listOptionValue builtIn="false" value="/import/teaching/ECS732/arm-gcc/arm-linux-gnueabihf/include"/> - <listOptionValue builtIn="false" value="/import/teaching/ECS732/arm-gcc/arm-linux-gnueabihf/include/xenomai/include"/> <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/include/ne10"/> - <listOptionValue builtIn="false" value=""${workspace_loc:/BeagleRT/include}""/> + <listOptionValue builtIn="false" value="/BeagleRT/include"/> </option> <option id="gnu.c.compiler.option.misc.other.835792865" name="Other flags" superClass="gnu.c.compiler.option.misc.other" value="-c -fmessage-length=0 -Wpointer-arith -Wunused-result -D_GNU_SOURCE -D_REENTRANT -D__XENO__ -std=gnu99" valueType="string"/> <option id="gnu.c.compiler.option.warnings.allwarn.1145989346" name="All warnings (-Wall)" superClass="gnu.c.compiler.option.warnings.allwarn" value="true" valueType="boolean"/> @@ -61,9 +57,10 @@ <option id="gnu.cpp.link.option.paths.462980690" name="Library search path (-L)" superClass="gnu.cpp.link.option.paths" valueType="libPaths"> <listOptionValue builtIn="false" value="/usr/xenomai/lib"/> <listOptionValue builtIn="false" value="/usr/local/linaro/arm-linux-gnueabihf/include/xenomai/lib"/> - <listOptionValue builtIn="false" value="/import/teaching/ECS732/arm-gcc/arm-linux-gnueabihf/lib"/> <listOptionValue builtIn="false" value="/usr/lib/arm-linux-gnueabihf"/> - <listOptionValue builtIn="false" value="/import/teaching/ECS732/arm-gcc/arm-linux-gnueabihf/lib/xenomai"/> + <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/lib"/> + <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/lib/xenomai"/> + <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/include/xenomai/lib"/> </option> <option id="gnu.cpp.link.option.libs.139390951" name="Libraries (-l)" superClass="gnu.cpp.link.option.libs" valueType="libs"> <listOptionValue builtIn="false" value="rt"/> @@ -82,10 +79,6 @@ </tool> <tool command="arm-linux-gnueabihf-as" id="cdt.managedbuild.tool.gnu.assembler.exe.debug.37270610" name="GCC Assembler" superClass="cdt.managedbuild.tool.gnu.assembler.exe.debug"> <option id="gnu.both.asm.option.include.paths.1403814918" name="Include paths (-I)" superClass="gnu.both.asm.option.include.paths" valueType="includePath"> - <listOptionValue builtIn="false" value="/import/teaching/ECS732/arm-gcc/arm-linux-gnueabihf/include/xenomai/include"/> - <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/include/xenomai/include"/> - <listOptionValue builtIn="false" value="/import/teaching/ECS732/arm-gcc/arm-linux-gnueabihf/include"/> - <listOptionValue builtIn="false" value="/usr/arm-linux-gnueabihf/include/ne10"/> <listOptionValue builtIn="false" value=""${workspace_loc:/BeagleRT/include}""/> </option> <inputType id="cdt.managedbuild.tool.gnu.assembler.input.1788972942" superClass="cdt.managedbuild.tool.gnu.assembler.input"/> @@ -95,6 +88,7 @@ <sourceEntries> <entry excluding="default_main.cpp|audio_routines_old.S" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name="core"/> <entry flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name="include"/> + <entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="projects/scope"/> <entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="source"/> </sourceEntries> </configuration> @@ -114,13 +108,13 @@ </extensions> </storageModule> <storageModule moduleId="cdtBuildSystem" version="4.0.0"> - <configuration artifactName="${ProjName}" buildArtefactType="org.eclipse.cdt.build.core.buildArtefactType.exe" buildProperties="org.eclipse.cdt.build.core.buildType=org.eclipse.cdt.build.core.buildType.release,org.eclipse.cdt.build.core.buildArtefactType=org.eclipse.cdt.build.core.buildArtefactType.exe" cleanCommand="rm -rf" description="" id="cdt.managedbuild.config.gnu.exe.release.1521194538" name="Release" parent="cdt.managedbuild.config.gnu.exe.release" postannouncebuildStep="Stopping process on BBB and copying new binary" postbuildStep="ssh root@192.168.7.2 "kill -s 2 \`pidof ${BuildArtifactFileName}\` 2>/dev/null"; scp ${BuildArtifactFilePrefix}${BuildArtifactFileName} root@192.168.7.2:~/beaglert/ ; echo 'done copying' | wall"> + <configuration artifactName="${ProjName}" buildArtefactType="org.eclipse.cdt.build.core.buildArtefactType.exe" buildProperties="org.eclipse.cdt.build.core.buildType=org.eclipse.cdt.build.core.buildType.release,org.eclipse.cdt.build.core.buildArtefactType=org.eclipse.cdt.build.core.buildArtefactType.exe" cleanCommand="rm -rf" description="" id="cdt.managedbuild.config.gnu.exe.release.1521194538" name="Release" parent="cdt.managedbuild.config.gnu.exe.release" postannouncebuildStep="Stopping process on BBB and copying new binary" postbuildStep="ssh root@192.168.7.2 "kill -s 9 \`pidof ${BuildArtifactFileName}\` 2>/dev/null; sleep 0.5;"; scp ${PWD}/${BuildArtifactFileName} root@192.168.7.2:beaglert/ && echo 'done copying\n' | wall || echo 'error'|wall"> <folderInfo id="cdt.managedbuild.config.gnu.exe.release.1521194538." name="/" resourcePath=""> <toolChain id="cdt.managedbuild.toolchain.gnu.exe.release.1612059942" name="Linux GCC" superClass="cdt.managedbuild.toolchain.gnu.exe.release"> <targetPlatform id="cdt.managedbuild.target.gnu.platform.exe.release.908983575" name="Debug Platform" superClass="cdt.managedbuild.target.gnu.platform.exe.release"/> - <builder buildPath="${workspace_loc:/BBB_audio+input/Release}" id="cdt.managedbuild.target.gnu.builder.exe.release.511190290" keepEnvironmentInBuildfile="false" managedBuildOn="true" name="Gnu Make Builder" superClass="cdt.managedbuild.target.gnu.builder.exe.release"/> + <builder arguments="-j6" buildPath="${workspace_loc:/BBB_audio+input/Release}" command="make" id="cdt.managedbuild.target.gnu.builder.exe.release.511190290" keepEnvironmentInBuildfile="false" managedBuildOn="true" name="Gnu Make Builder" superClass="cdt.managedbuild.target.gnu.builder.exe.release"/> <tool command="arm-linux-gnueabihf-g++ " id="cdt.managedbuild.tool.gnu.archiver.base.810674388" name="GCC Archiver" superClass="cdt.managedbuild.tool.gnu.archiver.base"/> - <tool command="arm-linux-gnueabihf-g++ " commandLinePattern="${COMMAND} ${FLAGS} ${OUTPUT_FLAG} ${OUTPUT_PREFIX}${OUTPUT} ${INPUTS}" id="cdt.managedbuild.tool.gnu.cpp.compiler.exe.release.163790048" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.exe.release"> + <tool command="arm-linux-gnueabihf-g++" commandLinePattern="${COMMAND} ${FLAGS} ${OUTPUT_FLAG} ${OUTPUT_PREFIX}${OUTPUT} ${INPUTS}" id="cdt.managedbuild.tool.gnu.cpp.compiler.exe.release.163790048" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.exe.release"> <option id="gnu.cpp.compiler.exe.release.option.optimization.level.2031085570" name="Optimization Level" superClass="gnu.cpp.compiler.exe.release.option.optimization.level" value="gnu.cpp.compiler.optimization.level.more" valueType="enumerated"/> <option id="gnu.cpp.compiler.exe.release.option.debugging.level.701035863" name="Debug Level" superClass="gnu.cpp.compiler.exe.release.option.debugging.level" value="gnu.cpp.compiler.debugging.level.none" valueType="enumerated"/> <option id="gnu.cpp.compiler.option.include.paths.823255770" name="Include paths (-I)" superClass="gnu.cpp.compiler.option.include.paths" valueType="includePath">
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/Clock.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,24 @@ +#include "Clock.h" + +namespace Clock +{ +#ifdef USE_JUCE + myClock_t getTimeUs(){ + struct timeval tv; + struct timezone tz; + int ret=gettimeofday(&tv, &tz); + if(ret == -1){ + printf("Error while getting time of the day\n"); + return -1; + } + myClock_t time=tv.tv_usec + tv.tv_sec*1e6; + // printf("Time is: %.6f\n", time/1000000.0); + // printf("Time is: %lld\n", time); + return time; + } +#else + myClock_t getTimeUs(){ + return (myClock_t)((rt_timer_read()+500)/1000); + } +#endif /* USE_JUCE */ +};
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/ClockSync.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,337 @@ +#include "ClockSync.h" + +void ClockSync::setVirtualClock(VirtualClock &aVirtualClock){ + virtualClock=&aVirtualClock; +} +void ClockSync::init(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){ + setVirtualClock(aVirtualClock); + slave=thisIsSlave; + setPort(aPort); + // isSlave() ? client.setServer("127.0.0.1") : client.setServer("127.0.0.1"); + isSlave() ? client.setServer("192.168.7.1") : client.setServer("192.168.7.2"); + bufferLength=kSyncMessageLength; + resetTs(); + receiveLoopSleepUs=100; + receiveLoopTimeout=1e5; + movingAverage.setLength(31); + expectedClockSyncType=isSlave() ? kSync : kNone; +} +void ClockSync::resetTs(){ + T1=-1; + T1p=-1; + T2=-1; + T2p=-1; +} +bool ClockSync::areTsValid(){ + return T1>0 && T1p>0 && T2>0 && T2p>0; +} +ClockSync::ClockSync(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock){ + init(thisIsSlave, aPort, aVirtualClock); +} +void* ClockSync::getBuffer(){ + return buffer; +} +bool ClockSync::isSlave(){ + return slave; +} +bool ClockSync::isMaster(){ + return !slave; +} +int ClockSync::getType(){ + return ((int*)buffer)[0]; +} +myClock_t ClockSync::getTimestamp(){ + return *((myClock_t*)((char*)buffer+sizeof(int))); +} +void ClockSync::setType(int clockSyncType){ + ((int*)buffer)[0]=clockSyncType; +} +void ClockSync::setTimestamp(myClock_t timestamp){ +// printf("setting timestamp: %lld\n", timestamp); + ((myClock_t*)((char*)buffer+sizeof(int)))[0]=timestamp; +} +void ClockSync::print(){ + //printf("type: %d, timestamp: %lld\n",*((int*)buffer),*((myClock_t*)(((int*)buffer)+1))); +} +void ClockSync::setPort(int aPort){ + if(aPort>0){ + int inPort=isSlave() ? aPort : aPort+1; + int outPort=isSlave() ? aPort+1: aPort; + server.bindToPort(inPort); + client.setPort(outPort); + //printf("Receiving on port %d\n", inPort); + //printf("Sending to port %d\n", outPort); + } +} +/** + * sends a clockSync without blocking, checks results and returns the timestamp + * immediately after the clockSync has been sent or -1 if there was an error or timeout expired. +*/ +myClock_t ClockSync::send(){ + // print(); + int ret; + ret=client.waitUntilReady(false, 0); + if(ret<=0){ //don't retry + return -1; + } + ret=client.send(buffer, bufferLength); + myClock_t timestamp=(myClock_t)virtualClock->getNow(); + if(ret<0){ + //if there was an error while sending, don't retry + return -1; + } + return timestamp; //get the accurate time *after* you sent the sync clockSync +} +/** + * receives a clockSync without blocking, checks results and returns the timestamp + * immediately after the clockSync has been received, or -1 if there was an error + * or 0 if timeout expired. +*/ +myClock_t ClockSync::receive(){ + int ret; + ret=server.waitUntilReady(true, 0); + if(ret<=0){ //don't retry + return 0; + } + ret=server.read(buffer, bufferLength, false); + myClock_t timestamp=(myClock_t)virtualClock->getNow(); + if(timestamp==0){ + //printf("The virtualClock period is <=0\n"); + return -3; + } + if(ret==-1){ + //if there was an error while receiving, don't retry + return -1; + } + if(ret!=bufferLength){ + //printf("Received a clockSync of the wrong size: %d\n", ret); + return -2; + } + // print(); + return timestamp; //get the accurate time *after* you sent the sync clockSync +} + +int ClockSync::masterSendSync(){ + //let's send a sync clockSync! + //printf("Sending a sync clockSync\n"); + setType(kSync); + setTimestamp(-1);//do not care about sending the timestamp, a more accurate one will be sent in the follow up + localTimestamp=send(); + if(localTimestamp<0){ + //printf("Could not send sync clockSync\n"); + return -1; //error, don't retry, start over + } + //let's send a followUp + //printf("sent a sync clockSync\n"); + setType(kFollowUp); + setTimestamp(localTimestamp); + if(localTimestamp<0){ + //printf("Could not send followup clockSync\n"); + return -2; //error, don't retry, start over + } + int ret=send(); + if(ret<0){ + //printf("Error while sending followup\n"); + return -3; + } + //printf("sent a followUp clockSync\n"); + expectedClockSyncType=kDelayReq; + return 1; +} +#ifdef USE_JUCE +#define NOTGSHOULDSTOP 1 +#else +extern bool gShouldStop; +#define NOTGSHOULDSTOP (!gShouldStop) +#endif /* USE_JUCE */ +int ClockSync::receiveLoop(){ + int receiveLoopElapsed=0; + while( NOTGSHOULDSTOP && (isSlave() || (receiveLoopElapsed<receiveLoopTimeout))){ //when slave, does not timeout! + receiveLoopElapsed+=receiveLoopSleepUs; + usleep(receiveLoopSleepUs); //how often to check for new clockSyncs; + // //printf("waiting for clockSyncs\n"); + localTimestamp=receive(); + if(localTimestamp<=0){ + if(localTimestamp==0){ +// printf("Socket not ready to be read: %lld\n", localTimestamp); + } + else if(localTimestamp==-1){ + printf("Error while receiving: %lld\n", localTimestamp); + } + else if(localTimestamp==-2){ + printf("Wrong size of the received clockSync: %lld\n", localTimestamp); + } + continue ; //keep waiting + } + clockSyncType=getType(); + clockSyncTimestamp=getTimestamp(); + if(clockSyncType!=expectedClockSyncType){ + //printf("Wrong clockSync type: %d, expected: %d\n",clockSyncType, expectedClockSyncType); + return -2; //start over + } +// printf("Received clockSync type: %d, clockSyncTimestamp: %lld\n", clockSyncType, clockSyncTimestamp); + if(isSlave()==true){ + int ret=slaveHandleMessage(); + if(ret==1 && clockSyncType==kDelayResp){ //we are done, end of a cycle! + return 1; + } else if (ret!=1) { + return -1; // + } else { + continue; + } + } + if(isMaster()==true){ //this is master + int ret=masterHandleMessage(); + if(ret==1 && clockSyncType==kDelayReq){ //we are done, end of a cycle! + return 1; + } else { + return -2; //we are done but something was wrong + } + } + } + //printf("Receive loop timeout\n"); + return -1; +} + +int ClockSync::slaveHandleMessage(){ + switch(clockSyncType){ + case kSync: //the clockSync timestamp is meaningless, the localTimestamp is when kSync was received + resetTs(); + T1p=localTimestamp; + expectedClockSyncType=kFollowUp; + break; + case kFollowUp: //the clockSyncTimestamp is the time when kSync was sent, the localTimestamp is meaningless + T1=clockSyncTimestamp; + //send delayReq + setType(kDelayReq); + setTimestamp(-1); + T2=send(); + if(T2<0){ + //printf("Error while sending delayReq\n"); + return -1; + } + expectedClockSyncType=kDelayResp; + break; + case kDelayResp: {//the clockSyncTimestamp is the instant when the master received the kDelayResp clockSync, the localTimestamp is meaningless + T2p=clockSyncTimestamp; + //TODO: evaluate things + double offset=(T1p-T1-T2p+T2)/2.0d; + if(areTsValid()){ + processOffset(offset); + + /* + static int calls=0; + static double referenceOffset=0; + + if(calls<100){ // start by averaging everything + movingAverage.add(offset); + } else { //once we get an estimate, start discarding outliers + float maxOffsetDeviation=20; + float deviation=fabsf(movingAverage.getAverage()-offset); + if(deviation<maxOffsetDeviation){ + movingAverage.add(offset); + printf("w(end+1)=%4.1f;\n", movingAverage.getAverage()); + } else { +// printf("Out of range: %f \n", deviation); + } + } + printf("offset(end+1)=%f;\n", offset); + if (calls==100){ +// printf("Update reference\n"); + referenceOffset=movingAverage.getAverage(); + } else if (calls==200){ + calls=99; + } + calls++; +*/ + +// printf("%lld, %lld, %lld, %lld \n", T1, T1p, T2, T2p); +// if(T2-T1p<0){ +// printf("Negative: %lld, %lld, %lld, %lld \n", T1, T1p, T2, T2p); +// } + } else { + printf("InvalidTs:\n %lld, %lld, %lld, %lld \n", T1, T1p, T2, T2p); + } + expectedClockSyncType=kSync; //end of the cycle, wait for next sync. + break; + } + default: + //printf("Unexpected message type\n"); // we should never get here + return -1; + } + return 1; +} +#include <I2c_Codec.h> +extern I2c_Codec* gAudioCodec; +void ClockSync::processOffset(double offset){ + static int calls=0; + // TODO: change the flow control below so that it can happen multiple times + //(base it upon the length of movingAverage rather than the number of calls) + if(calls<10) { //get an initial guess + movingAverage.add(offset); +// printf("-----------OFFSET IS : %04.4f samples, average: %04.4f samples\n", +// offset, movingAverage.getAverage()); + } else if (calls==10){ //then compensate for initial offset +// printf("compensating for offset: %f\n", offset); + virtualClock->addOffset(movingAverage.getAverage()); + movingAverage.reset(); + } else if (calls>=10){ //use IIR filter from now on + //filter coefficients obtained from Matlab : [B,A]=butter(2,0.005); +// static float B[3]={6.10061787580662e-05, 0.000122012357516132, 6.10061787580662e-05}; +// static float A[3]={1, -1.97778648377676, 0.978030508491796}; + static float B[3]={6.10061787580662e-05, 0.000122012357516132, 6.10061787580662e-05}; + static float A[3]={1, -1.97778648377676, 0.978030508491796}; + static float pastOut[3]={0,0,0}; + static float pastIn[3]={0,0,0}; + float in=offset; + float out= -pastOut[1]*A[1] -pastOut[2]*A[2] +in*B[0] +pastIn[1]*B[1] +pastIn[2]*B[2]; + pastOut[2]=pastOut[1]; + pastOut[1]=out; + pastIn[2]=pastIn[1]; + pastIn[1]=in; + offset=out; + static float maxOffset=0; + maxOffset=fabsf(offset) > fabsf(maxOffset) ? offset : maxOffset; + printf("%10.3f, %10.3f, %10.3f, %10.3f\n", in, offset, offset-pastOut[2], maxOffset); //unfiltered, filtered + if(fabsf(offset)>10 && calls>30){ + calls=11; + //TODO: correct for offset + float targetSamplingRate=offset>0 ? 44097 : 44103; + gAudioCodec->setAudioSamplingRate(targetSamplingRate); +// pastOut[1]=pastOut[2]=pastIn[1]=pastIn[2]=offset; + printf("------setAudioSmplingRate to %f\n", targetSamplingRate); + } + } + calls++; +} +int ClockSync::masterHandleMessage(){ + switch(clockSyncType){ + case kDelayReq: + //send kDelayResp + setType(kDelayResp); + setTimestamp(localTimestamp); + send(); + expectedClockSyncType=kNone; + return 1; + break; + default: + return -1; + } +} + +int ClockSync::sendReceiveLoop(){ + if(isSlave()==true){ + //printf("Waiting for a sync clockSync\n"); + } else { //if this is master + usleep(100000); //this times (roughly) how often sync clockSyncs are being sent. + int ret=masterSendSync(); + if(ret<=0) + return -1; + } + int ret=receiveLoop(); + if(ret<=0) + return -2; + return 1; +} + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/ClockSyncThread.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,66 @@ +#include "ClockSyncThread.h" +#ifdef USE_JUCE +#else //declare static members TODO: rather refactor this similar to other threads so that only run and clockSyncTask are static + myClock_t ClockSyncThread::lastTime; // Used for clock synchronization + bool ClockSyncThread::listening; + ClockSync ClockSyncThread::clockSync; + VirtualClock* ClockSyncThread::virtualClock; + bool ClockSyncThread::threadIsExiting; + AuxiliaryTask ClockSyncThread::clockSyncTask; +#endif +#ifdef USE_JUCE +ClockSyncThread::ClockSyncThread(const String &threadName) : + Thread(threadName) +#else +ClockSyncThread::ClockSyncThread() +#endif /* USE_JUCE */ +{ +}; +ClockSyncThread::~ClockSyncThread(){ +#ifdef USE_JUCE + stopThread(1000); +#else + stopThread(); +#endif /* USE_JUCE */ +} +void ClockSyncThread::init(bool isSlave, int aPort, VirtualClock &aVirtualClock){ + setVirtualClock(aVirtualClock); + listening=false; + clockSync.init(isSlave, aPort, *virtualClock); +#ifdef USE_JUCE + startThread(5); +#else + threadIsExiting=false; + clockSyncTask=BeagleRT_createAuxiliaryTask(&ClockSyncThread::run,60, "clockSyncTask"); + //TODO: the thread cannot be started here at the moment because init() is called in setup(), where tasks cannot be scheduled +#endif /* USE_JUCE */ +} + +#ifdef USE_JUCE +#else +void ClockSyncThread::startThread(){ + BeagleRT_scheduleAuxiliaryTask(clockSyncTask); +} +void ClockSyncThread::stopThread(){ + threadIsExiting=true; +} +bool ClockSyncThread::threadShouldExit(){ + return(gShouldStop || threadIsExiting ); +} +#endif /* USE_JUCE */ + +void ClockSyncThread::setVirtualClock(VirtualClock &aVirtualClock){ + virtualClock=&aVirtualClock; +}; + +void ClockSyncThread::run(){ + printf("var=["); + while(!threadShouldExit()){ + clockSync.sendReceiveLoop(); +// double now=virtualClock->getNow(); +// printf("th(end+1)=%f;\n", now); +// printf("act(end+1)=%lld;\n", Clock::getTimeUs()); + } + printf("];\n"); +// printf("Thread is not running \n"); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/ClockSynchronizer.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,136 @@ +/* + * ClockSynchronizer.cpp + * + * Created on: 26 Aug 2015 + * Author: giulio + */ + +#include "ClockSynchronizer.h" + +// declare static members +float ClockSynchronizer::targetSamplingRate; +float ClockSynchronizer::currentSamplingRate; +bool ClockSynchronizer::threadRunning; +int ClockSynchronizer::threadWasRunning; +bool ClockSynchronizer::staticConstructed=false; +AuxiliaryTask ClockSynchronizer::setClockTask; + +void ClockSynchronizer::staticConstructor(){ + int priority=90; + setClockTask=BeagleRT_createAuxiliaryTask(&ClockSynchronizer::setClock, priority, "setClockTask"); + threadRunning=false; +} + +void ClockSynchronizer::setClock(){ +// rt_printf("Setting clock to %f\n",targetSamplingRate); + threadRunning=true; +//I2C magic + gAudioCodec->setAudioSamplingRate(targetSamplingRate); + threadRunning=false; + threadWasRunning=1; +// rt_printf("Exiting thread\n"); +}; + +ClockSynchronizer::ClockSynchronizer() { + reset(); +} + +ClockSynchronizer::~ClockSynchronizer(){}; + +void ClockSynchronizer::setup(){ + if(staticConstructed==false) //This should be called in the constructor, but because of the current limitations of + // BeagleRT, it is here and setup() needs to be called in BeagleRT setup(); + staticConstructor(); +} +void ClockSynchronizer::reset(){ + localCounter=0; + remoteCounter=0; + lastTime=0; + localOffset=-1; + remoteOffset=-1; + timeOffset=-1; + currentSamplingRate=44100; +} + +void ClockSynchronizer::update(int aLocalCounter, int aRemoteCounter, RTIME aLastTime){ + if(threadRunning==true){ //do nothing if clock is being adjusted + rt_printf("do nothing if clock is being adjusted\n"); + return; + } + if(threadWasRunning > 0){ //reset variables after clock has been adjusted + threadWasRunning--; // wait a few calls to make sure the clock stabilizes + rt_printf("wait a few calls to make sure the clock stabilizes\n"); + return; + }/* + if(threadWasRunning==1){ + threadWasRunning=0; + // reset offsets after a correction + localOffset=aLocalCounter; + remoteOffset=aRemoteCounter; + timeOffset=aLastTime; + rt_printf("reset variables after clock has been adjusted\n"); + return; + }*/ + if (localOffset<=0 || remoteOffset<=0){ // probably this is the first run + localOffset=aLocalCounter; + remoteOffset=aRemoteCounter; + timeOffset=aLastTime; + rt_printf("First run of update(), localOffset: %d, remoteOffset: %d, timeOffset: %llu\n", + localOffset, remoteOffset, timeOffset); + return; + } + localCounter=aLocalCounter-localOffset; + remoteCounter=aRemoteCounter-remoteOffset; + lastTime=aLastTime-timeOffset; + if (localCounter<=0 || remoteCounter<=0 || timeOffset<=0) {// did anything wrong happened? + rt_printf("fourth\n"); + return; + } + // TODO: make sure we do not get actually adjust the clock too often (e.g.: limits on the timestamp) + // Should keep track of last time a change has been made, so that when a large compensation is needed + // (e.g.: as the program has just started), adjustClock() is called more frequently + // but gets called less often later. + // Should also try to avoid drastic correction after a while that the program + // has started, so to avoid glitches. But this can maybe be handled by the thread itself. + // TODO: should keep a log and maybe compensate for (and prevent) overshoot according to previous changes + static RTIME lastlastTime=0; +// rt_printf("interval: %f, \n",(double)(remoteCounter*300.0)/lastTime*1000000000.0); + lastlastTime=lastTime; + currentSamplingRate=gAudioCodec->getAudioSamplingRate(); + float T=1/currentSamplingRate; + int elapsedSamples=remoteCounter*(NetworkBuffer::bufferLength-NetworkBuffer::headerLength); + double expectedTimeUs=T*elapsedSamples*1000000; + double actualTimeS=lastTime/1000000000.0; + double actualTimeUs=lastTime/1000.0; + static float averageBuffer[101]={0}; + static int averageBufferPointer=0; + static float average=0; + static int bufferFull=0; + average-=averageBuffer[averageBufferPointer]; + averageBuffer[averageBufferPointer]=elapsedSamples/actualTimeS; + average+=averageBuffer[averageBufferPointer]; + averageBufferPointer++; + if(averageBufferPointer==101){ + averageBufferPointer=0; + bufferFull++; + } + +// rt_printf("Fs: %f, expectedTimeUs: %f, lastTime: %ul\n", expectedTimeUs, lastTime); +// rt_printf("Fs: %.1f, actualTimeS: %4.6f, targetFs_ %.3f\n", + if((averageBufferPointer&3)==0){ + static float oldTargetSamplingRate=0; + targetSamplingRate=average/101; + if(bufferFull>=3 && fabsf(targetSamplingRate-oldTargetSamplingRate) < 1){ + rt_printf("%.1f, %4.6f, %.3f;\n", + gAudioCodec->getAudioSamplingRate(), actualTimeS, targetSamplingRate, targetSamplingRate-oldTargetSamplingRate); + adjustClock(); + } + oldTargetSamplingRate=targetSamplingRate; + } +} + +void ClockSynchronizer::adjustClock(){ + if(fabsf(currentSamplingRate-targetSamplingRate)>0.7){ //TODO: actually check that the difference is less than the quantization error in the PLL + BeagleRT_scheduleAuxiliaryTask(setClockTask); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/NetworkSend.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,294 @@ +#include "NetworkSend.h" + +#ifdef USE_JUCE +#else +//initialize the static members of NetworkSend +bool NetworkSend::staticConstructed=false; +int NetworkSend::sleepTimeMs; +bool NetworkSend::threadIsExiting; +bool NetworkSend::threadRunning; +std::vector<NetworkSend*> NetworkSend::objAddrs(0); +AuxiliaryTask NetworkSend::sendDataTask=NULL; + +void sendData(){ + NetworkSend::run(); +} +void NetworkSend::staticConstructor(){ + if(staticConstructed==true) + return; + staticConstructed=true; + threadIsExiting=false; + threadRunning=false; + sendDataTask = BeagleRT_createAuxiliaryTask(::sendData, 95, "sendDataTask"); //TODO: allow variable priority +} +void NetworkSend::sendAllData(){ + for(unsigned int n=0; n<NetworkSend::objAddrs.size(); n++){ + NetworkSend::objAddrs[n]->sendData(); + } +} +int NetworkSend::getNumInstances(){ + return objAddrs.size(); +} +void NetworkSend::startThread(){ + BeagleRT_scheduleAuxiliaryTask(sendDataTask); +} +void NetworkSend::stopThread(){ + threadIsExiting=true; +} +bool NetworkSend::threadShouldExit(){ + return(gShouldStop || threadIsExiting); +} +bool NetworkSend::isThreadRunning(){ + return threadRunning; +} +#endif /* USE_JUCE */ + +#ifdef USE_JUCE +NetworkSend::NetworkSend(const String &threadName): + Thread(threadName) +#else +NetworkSend::NetworkSend() +#endif /* USE_JUCE */ +{ + channel.buffers=NULL; + channel.doneOnTime=NULL; + channel.readyToBeSent=NULL; + channel.enabled=false; + sleepTimeMs=2; //This should actually be initialized in the staticConstructor for non-Juce code, but doing it here makes it more portable + channel.sampleCount=0; +} + +NetworkSend::~NetworkSend(){ +#ifdef USE_JUCE + stopThread(1000); +#else + stopThread(); + for(unsigned int n=0; n<objAddrs.size(); n++){ //keep track of deleted instances; + if(objAddrs[n]==this){ + objAddrs.erase(objAddrs.begin()+n); + break; + } + } +#endif + dealloc(); +} +void NetworkSend::dealloc(){ + channel.enabled=false; + if(channel.buffers!=NULL){ + for(int n=0; n<channel.numBuffers; n++){ + free(channel.buffers[n]); + channel.buffers[n]=NULL; + } + free(channel.buffers); + channel.buffers=NULL; + } + free(channel.readyToBeSent); + channel.readyToBeSent=NULL; + free(channel.doneOnTime); + channel.doneOnTime=NULL; +} +void NetworkSend::cleanup(){ + dealloc(); +} + +void NetworkSend::setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer){ +#ifdef USE_JUCE +#else + staticConstructor(); //FIXME: ideally this should be in the constructor, but this is not currently possible + //because of limitations in BeagleRT_createAuxiliaryTask() + //keep track of added active instances + objAddrs.push_back(this);//TODO: this line should be in the constructor, but something weird happens if + // an instance of NetworkSend is then declared globally: the constructor gets called, + // and objAddrs.size()==1 but when you get to setup, objAddrs.size() has reverted back to 0, without + // any destructor being called in between ... +#endif /* USE_JUCE */ + cleanup(); + int numSamples=blockSize*4>4*channel.bufferLength ? blockSize*4 : 4*channel.bufferLength; + channel.numBuffers= 1+numSamples/channel.bufferLength; //the +1 takes the ceil() of the division + channel.buffers=(float**)malloc(channel.numBuffers*sizeof(float*)); + printf("NumBuffers: %d\n", channel.numBuffers); + if(channel.buffers==NULL) + return; + for(int n=0; n<channel.numBuffers; n++){ + channel.buffers[n]=(float*)malloc(channel.bufferLength*sizeof(float)); + if(channel.buffers[n]==NULL) + return; + } + channel.readyToBeSent=(bool*)malloc(channel.numBuffers*sizeof(bool)); + channel.doneOnTime=(bool*)malloc(channel.numBuffers*sizeof(bool)); + for(int n=0; n<channel.numBuffers; n++){ + channel.readyToBeSent[n]=false; + channel.doneOnTime[n]=true; + } + if(channel.readyToBeSent==NULL || channel.doneOnTime==NULL) + return; + channel.writePointer=0; + channel.writeBuffer=0; + channel.readBuffer=0; + setChannelNumber(aChannelNumber); + setPort(aPort); //TODO: check for the return value + setServer(aServer); //TODO: check for the return value + printf("Channel %d is sending messages to: %s:%d at %fHz\n", getChannelNumber(), aServer, aPort, aSampleRate); + channel.enabled=true; +} + +void NetworkSend::log(float value){ //TODO: add a vectorized version of this method + if(channel.enabled==false) + return; + if(channel.writePointer==channel.bufferLength){ // when the buffer is filled ... + channel.readyToBeSent[channel.writeBuffer]=true; // flag it as such +// printf("Scheduling for send %d\n",(int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); + channel.writePointer=channel.headerLength; //reset the writePointer + channel.writeBuffer=(channel.writeBuffer+1); //switch buffer + if(channel.writeBuffer==channel.numBuffers) // and wrap it + channel.writeBuffer=0; +// printf("WriteBuffer:%d\n", channel.writeBuffer); + if(channel.doneOnTime[channel.writeBuffer]==false){ //check if this buffer's last sending has completed on time ... + printf("Network buffer underrun. timestamp: %d :-{\n", + (int)channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]); + } + channel.doneOnTime[channel.writeBuffer]=false; // ... and then reset the flag +#ifdef USE_JUCE + if(isThreadRunning()==false){ + startThread(10); + } +#else + if(isThreadRunning()==false){ + startThread(); + } +#endif /* USE_JUCE */ + } + if(channel.writePointer==channel.headerLength){ // we are about to start writing in the buffer, let's set the header + //set dynamic header values here. Static values are set in setup() and setChannelNumber(). + channel.buffers[channel.writeBuffer][channel.headerTimestampIndex]=(float)channel.sampleCount; //timestamp + channel.sampleCount++; + //add here more header fields + } + channel.buffers[channel.writeBuffer][channel.writePointer++]=value; +// sampleCount++; +}; + +void NetworkSend::setServer(const char *aServer){ +#ifdef USE_JUCE + remoteHostname=String::fromUTF8(aServer); +#else + udpClient.setServer(aServer); +#endif /* USE_JUCE */ +} +void NetworkSend::setPort(int aPort){ +#ifdef USE_JUCE + remotePortNumber=aPort; +#else + udpClient.setPort(aPort); +#endif /* USE_JUCE */ +} + +void NetworkSend::setChannelNumber(int aChannelNumber){ + channel.channelNumber=aChannelNumber; + for(int n=0; n<channel.numBuffers; n++){ //initialize the header + channel.buffers[n][channel.headerChannelIndex]=channel.channelNumber; + //add here more static header fields + } +}; +int NetworkSend::getChannelNumber(){ + return channel.channelNumber; +}; + +int NetworkSend::getTimestamp(){ + return channel.buffers[channel.readBuffer][channel.headerTimestampIndex]; +} + +void NetworkSend::sendData(){ + if(channel.enabled==false) + return; + while(channel.readyToBeSent[channel.readBuffer]==true){ + channel.readyToBeSent[channel.readBuffer]=false; + void* sourceBuffer=channel.buffers[channel.readBuffer]; +// printf("Trying to send timestamp %d\n",(int)((float*)sourceBuffer)[channel.headerTimestampIndex]); +// printf("ReadBuffer:%d\n", channel.readBuffer); + unsigned int numBytesToSend=NETWORK_AUDIO_BUFFER_SIZE*sizeof(float); + //TODO: call waitUntilReady before trying to write/send, to avoid blocks! (OR NOT?) +#ifdef USE_JUCE + if(1==udpClient.waitUntilReady(0, 5)){ + udpClient.write(remoteHostname, remotePortNumber, sourceBuffer, numBytesToSend); + channel.doneOnTime[channel.readBuffer]=true; + // printf ("Sent timestamp: %d\n", (int)((float*)sourceBuffer)[1]); + } else { + // printf ("Not ready timestamp: %d\n", (int)((float*)sourceBuffer)[1]); + } +#else + udpClient.send(sourceBuffer, numBytesToSend); + channel.doneOnTime[channel.readBuffer]=true; +#endif /* USE_JUCE */ + channel.readBuffer++; + if(channel.readBuffer==channel.numBuffers) + channel.readBuffer=0; + } +} + +void NetworkSend::run(){ +#ifdef USE_JUCE + // std::chrono::high_resolution_clock::time_point t1; + // std::chrono::high_resolution_clock::time_point t2; + // std::chrono::high_resolution_clock::time_point t3; + while(threadShouldExit()==false){ + // t3 = std::chrono::high_resolution_clock::now(); + // t1 = std::chrono::high_resolution_clock::now(); + sendData(); + // t2 = std::chrono::high_resolution_clock::now(); + // auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>( t3 - t1 ).count(); + // auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count(); + // if(duration2>0) + // std::cout << "Duration is: " << duration2 <<". Whole loop is: " << duration1 << "\n"; + sleep(1); + } +#else + threadRunning=true; + while(threadShouldExit()==false){ + sendAllData(); + usleep(sleepTimeMs*1000); + } + threadRunning=false; +#endif +} +#ifdef USE_JUCE +#else +Scope::Scope(int aNumChannels): + channels(aNumChannels) +{}; +Scope::~Scope(){}; + +void Scope::log(int channel, float value){ + if(channel>=getNumChannels()) //TODO: assert this + return; + channels[channel].log(value); +} + +void Scope::setup(){ + setup(44100, 9999, "127.0.0.1"); +} + +void Scope::setup(float sampleRate, int aPort, const char* aServer){ + for(int n=0; n<getNumChannels(); n++){ + channels[n].setup(sampleRate, 128, n, aPort, aServer); //TODO: replace 128 with the actual block size + } +} + +void Scope::setPort(int port){ + for(int n=0; n<getNumChannels(); n++){ + channels[n].setPort(port); + } +} +void Scope::setPort(int channel, int aPort){ + channels[channel].setPort(aPort); + printf("Channel %d is now sending to port %d\n", channel, aPort); +} + +int Scope::getNumChannels(){ + return channels.size(); +} + +void Scope::sendData(){ + NetworkSend::sendAllData(); +} +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/ReceiveAudioThread.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,310 @@ +#include "ReceiveAudioThread.h" + +#ifdef USE_JUCE +#else +//initialise static members +bool ReceiveAudioThread::staticConstructed=false; +AuxiliaryTask ReceiveAudioThread::receiveDataTask=NULL; +std::vector<ReceiveAudioThread *> ReceiveAudioThread::objAddrs(0); +bool ReceiveAudioThread::threadRunning; +bool ReceiveAudioThread::threadIsExiting; +int ReceiveAudioThread::sleepTime; + +void receiveData(){ + ReceiveAudioThread::run(); +} +void ReceiveAudioThread::staticConstructor(){ + if(staticConstructed==true) + return; + staticConstructed=true; + threadIsExiting=false; + receiveDataTask=BeagleRT_createAuxiliaryTask(receiveData, 90, "receiveDataTask"); //TODO: allow different priorities +} +#endif /* USE_JUCE */ + +void ReceiveAudioThread::dealloc(){ + free(buffer); + buffer=NULL; + free(stackBuffer); + stackBuffer=NULL; +} +void ReceiveAudioThread::wrapWritePointer(){ + //this is not quite a simple wrapping as you would do in a circular buffer, + //as there is no guarantee the buffer will be full at all times, given that there must alwas be enough space at the end of it + //to hold a full payload + // lastValidPointer indicates the last pointer in the buffer containing valid data + // + if(writePointer+payloadLength+headerLength>bufferLength){ //if we are going to exceed the length of the buffer with the next reading + // lastValidPointer=writePointer+headerLength; //remember where the last valid data are + // for(int n=headerLength;n<lastValidPointer; n++){ + // fprintf(fd2, "%f\n",buffer[n]); //DEBUG + // } + writePointer=0; //and reset to beginning of the buffer + } +} +void ReceiveAudioThread::pushPayload(int startIndex){ //backup the payload samples that will be overwritten by the new header + for(int n=0; n<headerLength; n++){ + stackBuffer[n]=buffer[startIndex+n]; + } +} +void ReceiveAudioThread::popPayload(int startIndex){ + for(int n=0; n<headerLength; n++){ + buffer[startIndex+n]=stackBuffer[n]; + } +} + +int ReceiveAudioThread::readUdpToBuffer(){ + if(listening==false || bufferReady==false) + return 0; + if(writePointer<0) + return 0; + if(socket.waitUntilReady(true, waitForSocketTime)){// TODO: if waitForSocketTime here is >>5, the +#ifdef USE_JUCE +#else + lastTime=rt_timer_read(); +// rt_printf("lastTimeread= %llu\n", lastTime); +#endif /* USE_JUCE */ + // destructor (always or sometimes) never actually gets called, despite run() returns ...see issue #1381 + pushPayload(writePointer); //backup headerLength samples. This could be skipped if writePointer==0 + //read header+payload + int numBytes=socket.read(buffer+writePointer, bytesToRead, true); //read without waiting. + //TODO: (if using variable-length payload) validate the actual numBytes read against the size declared in the header + if(numBytes<0){ + printf("error numBytes1\n"); + return -3; //TODO: something went wrong, you have to discard the rest of the packet! + } + if(numBytes==0){//TODO: this should not happen unless you actually receive a packet of size zero (is it at all possible?) +// printf("received 0 bytes\n"); + return 0; + } + if(numBytes != bytesToRead){ //this is equivalent to (numBytes<bytesToRead) + printf("error numBytes2: %d\n", numBytes); + return -4; //TODO: something went wrong, less bytes than expected in the payload. + } + if(channel!=(int)buffer[writePointer]){ +// printf("I am channel %d, but I received data for channel %d\n", channel, (int)buffer[writePointer]); + return -5; + } + if(buffer[writePointer+1]!=timestamp+1) + printf("missing a timestamp: %d\n",timestamp+1); + timestamp=buffer[writePointer+1]; +// rt_printf("Received a message of length %d, it was on channel %d and timestamp %d\n", numBytes, (int)buffer[writePointer], (int)buffer[writePointer+1]); + + popPayload(writePointer); //restore headerLength payload samples. This could be skipped if writePointer==0 + //even though we just wrote (payloadLength+headerLength) samples in the buffer, + //we only increment by payloadLength. This way, next time a socket.read is performed, we will + //backup the last headerLength samples that we just wrote and we will overwrite them with + //the header from the new read. After parsing the header we will then restore the backed up samples. + //This way we guarantee that, apart from the first headerLength samples, buffer is a circular buffer! + writePointer+=payloadLength; + wrapWritePointer(); + return numBytes; + } + return 0; //timeout occurred +} +//USE_JUCE Thread(threadName), +#ifdef USE_JUCE +ReceiveAudioThread::ReceiveAudioThread(const String &threadName) : + Thread(threadName), +#else +ReceiveAudioThread::ReceiveAudioThread() : +#endif /* USE_JUCE */ + socket(0), + listening(false), + bufferReady(false), + buffer(NULL), + stackBuffer(NULL), + bufferLength(0), + lastValidPointer(0), + waitForSocketTime(5), +#ifdef USE_JUCE + threadPriority(5) +#else + threadPriority(88) +#endif /* USE_JUCE */ +{}; +ReceiveAudioThread::~ReceiveAudioThread(){ +#ifdef USE_JUCE + stopThread(1000); +#else + stopThread(); + while(threadRunning){ + usleep(sleepTime*2); //wait for thread to stop + std::cout<< "Waiting for receiveAudioTask to stop" << std::endl; + } +#endif /* USE_JUCE */ + //TODO: check if thread stopped, otherwise kill it before dealloc + dealloc(); +} +void ReceiveAudioThread::init(int aPort, int aSamplesPerBlock, int aChannel){ + dealloc(); +#ifdef USE_JUCE +#else + staticConstructor(); + objAddrs.push_back(this);//TODO: this line should be in the constructor +#endif /* USE_JUCE */ + bindToPort(aPort); + channel=aChannel; + printf("Channel %d is receiving on port %d\n",aChannel, aPort); + // fd=fopen("output.m","w"); //DEBUG + // fprintf(fd,"var=["); //DEBUG + headerLength=2; + payloadLength=300; //TODO: make sure that payloadLength and headerLength are the same as the client is sending. + bufferLength=std::max(headerLength+(payloadLength*4), headerLength+(aSamplesPerBlock*4)); //there are many considerations that can be done here ... + //We keep a headerLength padding at the beginning of the array to allow full reads from the socket + buffer=(float*)malloc(sizeof(float)*bufferLength); + if(buffer==NULL) // something wrong + return; + lastValidPointer=headerLength+ ((bufferLength-headerLength)/payloadLength)*payloadLength; + memset(buffer,0,bufferLength*sizeof(float)); + stackBuffer=(float*)malloc(sizeof(float)*headerLength); + if(stackBuffer==NULL) // something wrong + return; + bufferReady=true; + bytesToRead=sizeof(float)*(payloadLength + headerLength); + writePointer=-1; + readPointer=0; + sleepTime=payloadLength/(float)44100 /4.0; //set sleepTime so that you do not check too often or too infrequently + timestamp=0; +#ifdef USE_JUCE + startThread(threadPriority); +#else + //TODO: the thread cannot be started here at the moment because init() is called in setup(), where tasks cannot be scheduled +#endif /* USE_JUCE */ +} + +void ReceiveAudioThread::bindToPort(int aPort){ + listening=socket.bindToPort(aPort); +#ifdef USE_JUCE +#else + if(listening==false) //this condition is valid also for USE_JUCE, but we do not printf in USE_JUCE + printf("Could not bind to port %d\n",aPort); +#endif /* USE_JUCE */ +} +bool ReceiveAudioThread::isListening(){ + return listening; +} +float* ReceiveAudioThread::getCurrentBuffer(int length){ // NOTE: this cannot work all the time unless samplesPerBuffer and payloadLength are multiples + //TODO: make it return the number of samples actually available at the specified location + if(isListening()==false || length>bufferLength) + return NULL; + readPointer+=length; + if(readPointer>lastValidPointer){ + readPointer=headerLength; + } + return buffer+(int)readPointer; +}; +int ReceiveAudioThread::getSamplesSrc(float *destination, int length, + float samplingRateRatio, int numChannelsInDestination, + int channelToWriteTo) +{ + if (!(samplingRateRatio>0 && samplingRateRatio<=2)) + return -2; + if(isListening()==false) + return -1; + static int numCalls=0; + if(writePointer<0 /*|| (numCalls&16383)==0*/){ //if writePointer has not been initalized yet ... +#ifdef USE_JUCE +#else //debug + rt_printf("reinit the writePointer, readPointer: %f;\n",readPointer); + readPointer=0; +#endif /* USE_JUCE */ + writePointer=2*length; // do it, so that it starts writing at a safety margin from where we write. + // This will help keeping them in sync. + //TODO: handle what happens when the remote stream is interrupted and then restarted + } + numCalls++; + if(length>lastValidPointer) { + //not enough samples available, we fill the buffer with what is available, but the destination buffer will not be filled completely + //at this very moment the other thread might be writing at most one payload into the buffer. + //To avoid a race condition, we need to let alone the buffer where we are currently writing + //as writing the payload also temporarily overwrites the previous headerLength samples, we need to account for them as well + //TODO: This assumes that the writePointer and readPointer do not drift. When doing clock synchronization we will find out that it is not true! + length=lastValidPointer-payloadLength-headerLength; + if(length<0) //no samples available at all! + return 0; + } + for(int n=0; n<length; n++){ + destination[n*numChannelsInDestination+channelToWriteTo]=buffer[(int)(0.5+readPointer)];//simple ZOH non-interpolation (nearest neighbour) + // fprintf(fd,"%f, %d, %f;\n",readPointer,writePointer,destination[n]); //DEBUG + readPointer+=samplingRateRatio; + if((int)(0.5+readPointer)>=lastValidPointer){ + readPointer=readPointer-lastValidPointer+headerLength; + } + } + return length; +} +int ReceiveAudioThread::getSamplesSrc(float *destination, int length, float samplingRateRatio){ + return getSamplesSrc(destination, length, samplingRateRatio, 1,0); + // TODO: rewriting this so that it does not call the override method we can save a multiply and add + // for each sample. +} +bool ReceiveAudioThread::isBufferReady(){ + return bufferReady; +} +#ifdef USE_JUCE +#else +void ReceiveAudioThread::startThread(){ + BeagleRT_scheduleAuxiliaryTask(receiveDataTask); +} +void ReceiveAudioThread::stopThread(){ + threadIsExiting=true; +} +bool ReceiveAudioThread::threadShouldExit(){ + return(gShouldStop || threadIsExiting ); +} +RTIME ReceiveAudioThread::getLastTime(){ + return lastTime; +} +#endif /* USE_JUCE */ +int ReceiveAudioThread::getTimestamp(){ + return timestamp; +} +void ReceiveAudioThread::run(){ + // fd2=fopen("buffer.m","w"); //DEBUG + // fprintf(fd2, "buf=["); //DEBUG + threadRunning=true; + int maxCount=10; + int count=0; + // Clean the socket from anything that is currently in it. +#ifdef USE_JUCE + // this is borrowed from BeagleRT's UdpServer class. + int n; + do { + float waste; + if(socket.waitUntilReady(true, 0)==0) + break; + n=socket.read((void*)&waste, sizeof(float), false); + count++; + if(n<0){ + printf("error\n"); + break; + } + printf("n: %d\n",n); + } while (n>0 && (maxCount<=0 || count<maxCount)); +#else + for(unsigned int n=0; n<objAddrs.size(); n++){ + count=objAddrs[n]->socket.empty(maxCount); + } +#endif /* USE_JUCE */ + printf("socket emptied with %d reads\n", count); + + while(!threadShouldExit()){ //TODO: check that the socket buffer is empty before starting +#ifdef USE_JUCE + readUdpToBuffer(); // read into the oldBuffer + sleep(sleepTime); +#else + for(unsigned int n=0; n<ReceiveAudioThread::objAddrs.size(); n++){ +// printf("%d\n", n); + ReceiveAudioThread::objAddrs[n]->readUdpToBuffer(); + } + usleep(sleepTime); //TODO: use rt_task_sleep instead +#endif /* USE_JUCE */ + } + threadRunning=false; + printf("Thread is not running \n"); + // fprintf(fd,"];readPointer,writePointer,lastValidPointer,destination]=deal(var(:,1), var(:,2), var(:,3), var(:,4));"); //DEBUG + // fclose(fd);//DEBUG + // fprintf(fd2,"];");//DEBUG + // fclose(fd2); //DEBUG +}
--- a/core/UdpClient.cpp Mon Sep 14 17:31:24 2015 +0100 +++ b/core/UdpClient.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -1,57 +1,77 @@ -/* - * udpClient.cpp - * - * Created on: 19 May 2015 - * Author: giulio moro - */ -#include "../include/UdpClient.h" - - UdpClient::UdpClient(){ - outSocket=socket(AF_INET, SOCK_DGRAM, 0); - isSetPort=false; - isSetServer=false; - enabled=false; - } - UdpClient::UdpClient(int aPort, const char* aServerName){ - outSocket=socket(AF_INET, SOCK_DGRAM, 0); - if(outSocket<0){ - enabled=false; - return; - } - setPort(aPort); - setServer(aServerName); - isSetPort=true; - isSetServer=true; - enabled=true; - } - UdpClient::~UdpClient(){ - close(outSocket); - } - void UdpClient::setPort(int aPort){ - port=aPort; - destinationServer.sin_port = htons(port); - destinationServer.sin_family = AF_INET; - isSetPort=true; - if(isSetServer){ - enabled=true; - } - }; - void UdpClient::setServer(const char* aServerName){ - inet_pton(AF_INET,aServerName,&destinationServer.sin_addr); - isSetServer=true; - if(isSetPort){ - enabled=true; - } - }; - int UdpClient::send(void * message, int size){ - if(!enabled) - return -1; - unsigned int length; - length=sizeof(struct sockaddr_in); - int n=sendto(outSocket,message,size,0,(const struct sockaddr *)&destinationServer,length); - if (n < 0){ - return n; - } - return 1; - }; - +/* + * udpClient.cpp + * + * Created on: 19 May 2015 + * Author: giulio moro + */ +#include "UdpClient.h" + + UdpClient::UdpClient(){ + outSocket=socket(AF_INET, SOCK_DGRAM, 0); + isSetPort=false; + isSetServer=false; + enabled=false; + } + UdpClient::UdpClient(int aPort, const char* aServerName){ + outSocket=socket(AF_INET, SOCK_DGRAM, 0); + if(outSocket<0){ + enabled=false; + return; + } + setPort(aPort); + setServer(aServerName); + isSetPort=true; + isSetServer=true; + enabled=true; + memset(&stTimeOut, 0, sizeof(struct timeval)); + } + UdpClient::~UdpClient(){ + close(outSocket); + } + void UdpClient::setPort(int aPort){ + port=aPort; + destinationServer.sin_port = htons(port); + destinationServer.sin_family = AF_INET; + isSetPort=true; + if(isSetServer){ + enabled=true; + } + }; + void UdpClient::setServer(const char* aServerName){ + inet_pton(AF_INET,aServerName,&destinationServer.sin_addr); + isSetServer=true; + if(isSetPort){ + enabled=true; + } + }; + int UdpClient::send(void * message, int size){ + if(!enabled) + return -1; + unsigned int length; + length=sizeof(struct sockaddr_in); + int n=sendto(outSocket,message,size,0,(const struct sockaddr *)&destinationServer,length); + if (n < 0){ + return n; + } + return 1; + }; + int UdpClient::write(const char* remoteHostname, int remotePortNumber, void* sourceBuffer, int numBytesToWrite){ + setServer(remoteHostname); + setPort(remotePortNumber); + send(sourceBuffer, numBytesToWrite); + } + int UdpClient::waitUntilReady(bool readyForReading, int timeoutMsecs){ +// If the socket is ready on return, this returns 1. If it times-out before the socket becomes ready, it returns 0. If an error occurs, it returns -1. + if(enabled==false) + return -1; + if(timeoutMsecs<0) + return select(outSocket+1, NULL, &stWriteFDS, NULL, NULL); //calling this with a NULL timeout will block indefinitely + FD_ZERO(&stWriteFDS); + FD_SET(outSocket, &stWriteFDS); + float timeOutSecs=timeoutMsecs*0.001; + stTimeOut.tv_sec=(int)timeOutSecs; + timeOutSecs-=(int)timeOutSecs; + stTimeOut.tv_usec=(int)(timeOutSecs*1000000); + int descriptorReady= select(outSocket+1, NULL, &stWriteFDS, NULL, &stTimeOut); + return descriptorReady>0? 1 : descriptorReady; + } \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/UdpServer.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,119 @@ +/* + * udpServer.cpp + * + * Created on: 19 May 2015 + * Author: giulio moro + */ +#include "UdpServer.h" + +UdpServer::UdpServer(int aPort){ + init(aPort); +}; +UdpServer::UdpServer(){ + init(0); +} +UdpServer::~UdpServer(){ + close(); +}; +bool UdpServer::init(int aPort){ + enabled=true; + stZeroTimeOut.tv_sec = 0; //set timeout to 0 + stZeroTimeOut.tv_usec = 0; + inSocket=socket(AF_INET, SOCK_DGRAM, 0); + if (inSocket < 0){ + enabled=false; + } + length = sizeof(server); + server.sin_family=AF_INET; + server.sin_addr.s_addr=INADDR_ANY; + enabled=bindToPort(aPort); + wasteBufferSize=2048; + wasteBuffer=malloc(wasteBufferSize); + memset(&stTimeOut,0,sizeof(struct timeval)); + return enabled; +} + +bool UdpServer::bindToPort(int aPort){ + port=aPort; + if(port<1){ + enabled=false; + return false; + } + server.sin_port=htons(port); + if (bind(inSocket,(struct sockaddr *)&server,length)<0){ + enabled=false; + return false; + } + enabled=true; + return true; +} + +void UdpServer::close(){ + int ret=::close(inSocket); + if(ret != 0) + printf("Error while closing socket, errno: %d\n", errno);//Stop receiving data for this socket. If further data arrives, reject it. + inSocket=0; +} + +int UdpServer::waitUntilReady(bool readyForReading, int timeoutMsecs){ +// If the socket is ready on return, this returns 1. If it times-out before the socket becomes ready, it returns 0. If an error occurs, it returns -1. + if(enabled==false) + return -1; + if(timeoutMsecs<0) + return select(inSocket+1, &stReadFDS, NULL, NULL, NULL); //calling this with a NULL timeout will block indefinitely + FD_ZERO(&stReadFDS); + FD_SET(inSocket, &stReadFDS); + if(timeoutMsecs>=1000){ + float timeOutSecs=timeoutMsecs*0.001; + stTimeOut.tv_sec=(long int)timeOutSecs; + timeOutSecs-=(int)timeOutSecs; + stTimeOut.tv_usec=(long int)(timeOutSecs*1000000); + } else //faster! + stTimeOut.tv_usec=(long int)timeoutMsecs*1000; + int descriptorReady= select(inSocket+1, &stReadFDS, NULL, NULL, &stTimeOut); + return descriptorReady>0? 1 : descriptorReady; +} + +int UdpServer::read(//Returns the number of bytes read, or -1 if there was an error. + void *destBuffer, + int maxBytesToRead, + bool blockUntilSpecifiedAmountHasArrived) +{ + if(enabled==false) + return -1; + FD_ZERO(&stReadFDS); + FD_SET(inSocket, &stReadFDS); + int descriptorReady= select(inSocket+1, &stReadFDS, NULL, NULL, &stZeroTimeOut); //TODO: this is not JUCE-compliant + if(descriptorReady<0){ //an error occurred + return -1; + } + int numberOfBytes=0; +// do + { + if (FD_ISSET(inSocket, &stReadFDS)) + { + // numberOfBytes=recvfrom(inSocket,destBuffer,maxBytesToRead,0,(struct sockaddr *)&from,&fromLength); + numberOfBytes+=recv(inSocket,destBuffer,maxBytesToRead-numberOfBytes,0); + if(numberOfBytes<0) + return -1; + } + } +// while (blockUntilSpecifiedAmountHasArrived && numberOfBytes==maxBytesToRead); + return numberOfBytes; +} +int UdpServer::empty(){ + return empty(0); +} +int UdpServer::empty(int maxCount){ + int count=0; + int n; + do { + if(waitUntilReady(true, 0)==0) + return 0; + float waste; + n=read(&waste, sizeof(float), false); + count++; + } while (n>0 && (maxCount<=0 || maxCount<count)); + printf("socket emptied with %d reads\n", count); + return count; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/VirtualClock.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,69 @@ +#include "VirtualClock.h" +void VirtualClock::init(){ + firstRun=true; + movingAverage.setLength(101); //TODO: a better filtering algorithm ( Did you say Kalman?) + period=-1; + elapsedPeriods=0; + startTime=0; + startTimeOffset=0; + elapsedPeriodsOffset=0; +} + +VirtualClock::VirtualClock(){ + init(); +} +void VirtualClock::sync(){ + sync(1); +} +void VirtualClock::sync(double numPeriods){ + myClock_t currentTime=Clock::getTimeUs(); + static int calls=0; + elapsedPeriods+=numPeriods; + if(calls==50){ //TODO: this is dangerous as the clock might jump suddenly if currentTime is not precise + calls=0; + startTimeOffset=startTime; + startTime=currentTime; + elapsedPeriodsOffset=elapsedPeriods; + } + calls++; + if(firstRun==true){ + firstRun=false; + startTime=currentTime; + } else { + double newPeriod=(currentTime-lastSync)/numPeriods; + double expectedPeriod=22.67; + double maxPeriodDeviation=10; + if(fabs(newPeriod-expectedPeriod)<maxPeriodDeviation){ // filtering outliers + period=movingAverage.add(newPeriod); //TODO: replace with Kalman filter + } else { + printf("period out of range: %f\n", newPeriod); + } + } + lastSync=currentTime; +// printf("lastSync: %lld\n",lastSync-startTime); +} + +double VirtualClock::getNow(){ + myClock_t currentSystemTime=Clock::getTimeUs(); + if(period<=0){ + return currentSystemTime; // TODO: this is not very meaningful. + } + // double beginningOfPeriod=lastSync; // TODO: if sync() does not get called every time (but e.g. only every so often), + // then this line (and the class) needs editing + myClock_t elapsed=(currentSystemTime-startTime); + double now=elapsedPeriodsOffset + elapsed/(double)period; +// printf("elapsed=%lld; sincelastSync=%lld; period=%f; now=%f\n", elapsed, currentSystemTime-lastSync, period, now); + return now; +} + +void VirtualClock::addOffset(double periodOffset){ + elapsedPeriods-=periodOffset; + if(elapsedPeriods<0){ + printf("ERROR: periodOffset adjustment of %f resulted in elapsedPeriods=%f\n", periodOffset, elapsedPeriods); + exit(1); + } + movingAverage.reset(); +} +double VirtualClock::getPeriod(){ + return period; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/intervals.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,94 @@ +/* + * intervals.h + * + * Created on: 18 May 2015 + * Author: unmanaged + */ + +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> + +#include <native/timer.h> +#include <rtdk.h> + +#include "../include/intervals.h" +void Interval::init(int aNumAverages, int aNumFrames, float aSamplingRate, const char *aName){ + enabled=false; + numAverages=aNumAverages; + intervals=(RTIME *)malloc(sizeof(RTIME)*numAverages); + samplingRate=aSamplingRate; + numFrames=aNumFrames; + maxTimeus=0; + intervalsPointer=0; + sum=0; + startTime=0; + + if(intervals!=0) + enabled=true; + int len=strlen(aName); + name=(char *)malloc(sizeof(char)*(len+1)); + strcpy(name, aName); + if(name == 0) + enabled=false; +}; +Interval::Interval(){ + init(100,1,44100,""); +} +Interval::Interval(int aNumAverages){ + init(aNumAverages,1,44100,""); +} +Interval::Interval(int aNumAverages, int aNumFrames, float aSamplingRate, const char *aName){ + init(aNumAverages,aNumFrames,aSamplingRate,aName); +} +Interval::~Interval(){ + free(intervals); +// free(name); +} +void Interval::setNumFrames(int aNumFrames){ + numFrames=aNumFrames; +}; + +int Interval::start(){ +// printf("start: intervals: 0x%x, intervalsPointer: %d, numAverages: %d\n", intervals,intervalsPointer,numAverages); + if(!enabled) + return 0; + startTime=rt_timer_read(); + return 1; +} +int Interval::resetMax(){ + maxTimeus=0; + return 1; +} +int Interval::split(){ //updates + if(!enabled) + return 0; + int currentInterval=rt_timer_read()-startTime; + RTIME *currentPointer=&(intervals[intervalsPointer]); + sum-=*currentPointer; //take out the oldest value from the sum + *currentPointer=currentInterval; + sum+=*currentPointer; //add the new value to the sum + timeus=((float)sum)/numAverages/1000.0; + maxTimeus=timeus>maxTimeus?timeus:maxTimeus; + intervalsPointer++; + if(intervalsPointer>=(numAverages-1)){ + intervalsPointer=0; + } + return 1; +} +void Interval::setEnabled(bool aActive){ + enabled=aActive; +} +float Interval::getTimeus(){ + return timeus; +} +float Interval::getMaxTimeus(){ + return maxTimeus; +} +void Interval::print(){ + rt_printf(//"sleepTimer time: 29.484us, (1.30 samples, numFrames: 2, 65.01%%). MaxTime: 30.439us, (1.34 samples, 67.12%%)\n"); +"%s time: %.3fus, (%.2f samples, numFrames: %d, %.2f%%). MaxTime: %.3fus, (%.2f samples, %.2f%%)\n", + name, + timeus, timeus/1000000.0*samplingRate, numFrames, timeus/1000000.0*samplingRate/numFrames * 100, + maxTimeus, maxTimeus/1000000.0*samplingRate, maxTimeus/1000000.0*samplingRate/numFrames * 100); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/Clock.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,18 @@ +#ifndef CLOCK_H_INCLUDED +#define CLOCK_H_INCLUDED +#include <sys/time.h> +#include <stdio.h> + +#ifdef USE_JUCE +#else +//#include <BeagleRT.h> +#include <native/timer.h> +#endif /* USE_JUCE */ + +typedef long long int myClock_t; + +namespace Clock +{ + myClock_t getTimeUs(); +} +#endif /* CLOCK_H_INCLUDED */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/ClockSync.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,84 @@ +#ifndef CLOCK_SYNC_H_INCLUDED +#define CLOCK_SYNC_H_INCLUDED +#include "stats.hpp" +#include "UdpServer.h" +#include "UdpClient.h" +#include "Clock.h" +#include "VirtualClock.h" + +enum ptpMessageConsts{ + kSyncMessageLength=sizeof(myClock_t)+sizeof(int) +}; +enum ptpMessageType{ + kSync=0, + kFollowUp=1, + kDelayReq=2, + kDelayResp=3, + kNone=4 +}; + +enum ptpStatus{ + kSyncSent, + kFollowUpSent, + kDelayReqSent, + kDelayRespSent +}; + +class ClockSync{ +private: + MovingAverage<double> movingAverage; + UdpServer server; + UdpClient client; + bool slave; + int bufferLength; + int clockSyncType; + int expectedClockSyncType; + myClock_t clockSyncTimestamp; + myClock_t localTimestamp; + myClock_t T1; + myClock_t T1p; + myClock_t T2; + myClock_t T2p; + int receiveLoopSleepUs; + int receiveLoopTimeout; + char buffer[kSyncMessageLength]; + VirtualClock *virtualClock; + void resetTs(); + bool areTsValid(); + void processOffset(double offset); +public: + ClockSync(){}; + ClockSync(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock); + void init(bool thisIsSlave, int aPort, VirtualClock &aVirtualClock); + void* getBuffer(); + bool isSlave(); + bool isMaster(); + int getType(); + myClock_t getTimestamp(); + void setVirtualClock(VirtualClock &aVirtualClock); + void setPort(int aPort); + void setType(int clockSyncType); + void setTimestamp(myClock_t timestamp); + void print(); + /** + * sends a clockSync without blocking, checks results and returns the timestamp + * immediately after the clockSync has been sent or -1 if there was an error or timeout expired. + */ + myClock_t send(); + /** + * receives a clockSync without blocking, checks results and returns the timestamp + * immediately after the clockSync has been received, or -1 if there was an error + * or 0 if timeout expired. + */ + myClock_t receive(); + int masterSendSync(); + int receiveLoop(); + int slaveHandleMessage(); + int masterHandleMessage(); + int sendReceiveLoop(); + operator void*(){ + return getBuffer(); + } +}; + +#endif /* CLOCK_SYNC_H_INCLUDED */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/ClockSyncThread.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,48 @@ +#ifndef CLOCK_SYNC_THREAD_H_INCLUDED +#define CLOCK_SYNC_THREAD_H_INCLUDED + +#ifdef USE_JUCE +#include <JuceHeader.h> +#define IS_STATIC +#else +#define IS_STATIC static +#include <BeagleRT.h> +#endif /*USE_JUCE*/ + +#include "ClockSync.h" + +#ifdef USE_JUCE +class ClockSyncThread : public Thread { +#else +class ClockSyncThread { +#endif /* USE_JUCE */ +private: + IS_STATIC myClock_t lastTime; // Used for clock synchronization + IS_STATIC bool listening; + IS_STATIC ClockSync clockSync; + IS_STATIC VirtualClock *virtualClock; +#ifdef USE_JUCE +#else + IS_STATIC bool threadIsExiting; + IS_STATIC AuxiliaryTask clockSyncTask; +#endif /* USE_JUCE */ + +public: +#ifdef USE_JUCE + ClockSyncThread(const String &threadName); +#else + ClockSyncThread(); +#endif /* USE_JUCE */ + ~ClockSyncThread(); + IS_STATIC void init(bool isSlave, int aPort, VirtualClock &aVirtualClock); + IS_STATIC void setVirtualClock(VirtualClock &aVirtualClock); + + IS_STATIC void run(); +#ifdef USE_JUCE +#else + IS_STATIC void startThread(); + IS_STATIC void stopThread(); + IS_STATIC bool threadShouldExit(); +#endif // USE_JUCE +}; +#endif // CLOCK_SYNC_THREAD_H_INCLUDED
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/ClockSynchronizer.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,42 @@ +/* + * ClockSynchronizer.h + * + * Created on: 26 Aug 2015 + * Author: giulio + */ + +#ifndef CLOCKSYNCHRONIZER_H_ +#define CLOCKSYNCHRONIZER_H_ + +#include <BeagleRT.h> +#include <I2c_Codec.h> +#include <NetworkSend.h> +#include <native/timer.h> +extern I2c_Codec *gAudioCodec; + +class ClockSynchronizer { +private: + int localOffset; + int remoteOffset; + RTIME timeOffset; + int localCounter; + int remoteCounter; + RTIME lastTime; + void reset(); + static bool staticConstructed; + static void staticConstructor(); + static float currentSamplingRate; + static float targetSamplingRate; + static bool threadRunning; + static int threadWasRunning; + static AuxiliaryTask setClockTask; +public: + ClockSynchronizer(); + ~ClockSynchronizer(); + void setup(); + void update(int aLocalCounter, int aRemoteCounter, RTIME aLastTime); + void adjustClock(); + static void setClock(); +}; + +#endif /* CLOCKSYNCHRONIZER_H_ */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/NetworkSend.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,115 @@ +//scope.cpp +#ifndef SCOPE_H_ +#define SCOPE_H_ + +#ifdef USE_JUCE +#include <JuceHeader.h> +#else +#include <BeagleRT.h> +#include <rtdk.h> +#include <cmath> +#include <UdpClient.h> +#include <vector> +#include <string> +extern bool gShouldStop; +#endif /* USE_JUCE */ + +#define NETWORK_AUDIO_BUFFER_SIZE 302 +#define UDP_BUFFER_HEADER_CHANNEL_INDEX 0 +#define UDP_BUFFER_HEADER_TIMESTAMP_INDEX 1 +#define UDP_BUFFER_HEADER_LENGTH 2 + +struct NetworkBuffer{ + int channelNumber; + int numBuffers; + int writeBuffer; + int readBuffer; + int writePointer; + float** buffers; + bool* doneOnTime; + bool* readyToBeSent; + bool enabled; + int sampleCount; + static const int bufferLength=NETWORK_AUDIO_BUFFER_SIZE; + static const int headerLength=UDP_BUFFER_HEADER_LENGTH; + static const int headerChannelIndex=UDP_BUFFER_HEADER_CHANNEL_INDEX; + static const int headerTimestampIndex=UDP_BUFFER_HEADER_TIMESTAMP_INDEX; +}; + +#ifdef USE_JUCE +class NetworkSend: public Thread { +#else +class NetworkSend { +#endif /* USE_JUCE */ + float sampleRate; +#ifdef USE_JUCE + DatagramSocket udpClient; + int sleepTimeMs; + String remoteHostname; + int remotePortNumber; +#else + UdpClient udpClient; + bool isThreadRunning(); + static int sleepTimeMs; + static bool threadShouldExit(); + static bool threadIsExiting; + static bool threadRunning; + static bool staticConstructed; + static void staticConstructor(); + static AuxiliaryTask sendDataTask; //TODO: allow different AuxiliaryTasks for different priorities (e.g.: audio vs scope) + static std::vector<NetworkSend *> objAddrs; +#endif /* USE_JUCE */ + void dealloc(); +public: + NetworkBuffer channel; +#ifdef USE_JUCE + NetworkSend(const String &threadName); +#else + NetworkSend(); +#endif + ~NetworkSend(); + void setup(float aSampleRate, int blockSize, int aChannelNumber, int aPort, const char *aServer); + void cleanup(); + void sendData(); + void log(float value); + void setPort(int aPort); + void setServer(const char* aServer); + void setChannelNumber(int aChannelNumber); + int getChannelNumber(); + int getTimestamp(); +#ifdef USE_JUCE + void run(); +#else + static int getNumInstances(); + static void sendAllData(); + static void startThread(); + static void stopThread(); + static void run(); +#endif /* USE_JUCE */ +}; + +#ifdef USE_JUCE +#else +/** + * An array of NetworkSend objects with some default parameters + * + * All sending on the same port (defaults to 9999) + * All sending to the same server (defaults to 127.0.0.1) +*/ +class Scope { + std::vector<NetworkSend> channels; + void deallocate(); +public: + Scope(int aNumChannels); + ~Scope(); + void log(int channel, float value); + void setup(); + void setup(float sampleRate, int aPort, const char* aServer); + void sendData(); + void setPort(int port); + void setPort(int channel, int port); + int getNumChannels(); +}; +#endif /* USE_JUCE */ + +#endif /* SCOPE_H */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/ReceiveAudioThread.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,105 @@ +#ifndef RECEIVEAUDIOTHREAD_H_INCLUDED +#define RECEIVEAUDIOTHREAD_H_INCLUDED + +#ifdef USE_JUCE +#include <JuceHeader.h> +#else +#include <BeagleRT.h> +#include <UdpServer.h> +#include <vector> +#include <iostream> +#include <native/task.h> +#include <native/timer.h> +#include <math.h> + +#endif /*USE_JUCE*/ + +#ifdef USE_JUCE +class ReceiveAudioThread : public Thread { +#else +class ReceiveAudioThread{ +#endif /* USE_JUCE */ +private: + // FILE *fd; //DEBUG + // FILE *fd2; //DEBUG +#ifdef USE_JUCE + DatagramSocket socket; +#else + UdpServer socket; +#endif /* USE_JUCE */ + bool listening; + bool bufferReady; +#ifdef USE_JUCE + bool threadRunning; //do we really need this ? +#else + static bool threadRunning; + static bool threadIsExiting; +#endif + float *buffer; + float *stackBuffer; + int bufferLength; + float readPointer; + int writePointer; + int lastValidPointer; +#ifdef USE_JUCE + int sleepTime; +#else + static int sleepTime; +#endif + int waitForSocketTime; + int payloadLength; //size of the payload of each datagram + int headerLength; //size of the header of each datagram + int bytesToRead; + int threadPriority; + int channel; + int timestamp; + void dealloc(); + void wrapWritePointer(); + void pushPayload(int startIndex); + void popPayload(int startIndex); + int readUdpToBuffer(); +#ifdef USE_JUCE +#else + RTIME lastTime; // Used for clock synchronization + static bool threadShouldExit(); + static bool staticConstructed; + static void staticConstructor(); + static AuxiliaryTask receiveDataTask; //TODO: allow different AuxiliaryTasks for different priorities (e.g.: audio vs scope) + static std::vector<ReceiveAudioThread *> objAddrs; +#endif +public: +#ifdef USE_JUCE + ReceiveAudioThread(const String &threadName); +#else + ReceiveAudioThread(); +#endif + ~ReceiveAudioThread(); + void init(int port, int aSamplesPerBlock, int channel); + void bindToPort(int aPort); + bool isListening(); + float* getCurrentBuffer(int length); + /** + * Copies the samples to a non-interleaved buffer. + */ + int getSamplesSrc(float *destination, int length, float samplingRateRatio); + /** + * Copies the samples to an interleaved buffer. + */ + int getSamplesSrc(float *destination, int length, + float samplingRateRatio, int numChannelsInDestination, + int channelToWriteTo); + bool isBufferReady(); + int getTimestamp(); +#ifdef USE_JUCE // if we are in Juce, then we run a separate thread for each receiver + // (as each of them are typically receiving on a mono or stereo track) + void run(); +#else + RTIME getLastTime(); + void static run(); //while in BeagleRT we have a single thread that receives for all the instances. + //TODO: make run() private in BeagleRT + static void startThread(); + static void stopThread(); + static int getNumInstances(); +#endif // USE_JUCE +}; +#endif // RECEIVEAUDIOTHREAD_H_INCLUDED
--- a/include/Scope.h Mon Sep 14 17:31:24 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,103 +0,0 @@ -//scope.cpp -#include <BeagleRT.h> -#include <rtdk.h> -#include <cmath> -#include <UdpClient.h> - -#define BUILD_FOR_UDPRECEIVE_PLUGIN -#define NETWORK_AUDIO_BUFFER_SIZE 302 -struct networkAudio{ - int timestamp; - int currentBuffer; - int index; - float buffers[2][NETWORK_AUDIO_BUFFER_SIZE]; - int doneOnTime; - bool toBeSent; - UdpClient udpClient; -}; - -#define NUM_SCOPE_CHANNELS 6 - -static void SendScopeData(); - -class Scope { - int sampleCount; - float sampleRate; - AuxiliaryTask scopeTask; - public: - int numChannels; - networkAudio channel[NUM_SCOPE_CHANNELS]; - Scope(){ - numChannels = NUM_SCOPE_CHANNELS; - sampleCount = 0; -#ifdef BUILD_FOR_UDPRECEIVE_PLUGIN - char server[]="192.168.7.1"; -#else - char server[]="127.0.0.1"; -#endif /* BUILD_FOR_UDPRECEIVE_PLUGIN */ - printf("Sending messages to : %s\n", server); - for(int n=0; n<numChannels; n++){ - channel[n].doneOnTime=1; - channel[n].index=2; //leave space for the heading message (channel, timestamp) - channel[n].timestamp=0; - channel[n].currentBuffer=0; - channel[n].toBeSent=false; - channel[n].udpClient.setServer(server); -#ifdef BUILD_FOR_UDPRECEIVE_PLUGIN - channel[n].udpClient.setPort(9999+n); -#else - channel[n].udpClient.setPort(9999); -#endif /* BUILD_FOR_UDPRECEIVE_PLUGIN */ - } - } - void setup(float _sampleRate); - void log(float channel1=0.0, float channel2=0.0, float channel3=0.0, float channel4=0.0, float channel5=0.0, float channel6=0.0){ - - for(int j=0; j<numChannels; j++){ - if(channel[j].index==(NETWORK_AUDIO_BUFFER_SIZE)){ // when the buffer is ready ... - channel[j].buffers[channel[j].currentBuffer][0] = (float)j; - channel[j].buffers[channel[j].currentBuffer][1] = (float)channel[j].timestamp; - channel[j].toBeSent=true; - channel[j].index=2; //reset the counter - if(channel[j].doneOnTime==0) - rt_printf("Network buffer underrun :-{\n"); - channel[j].timestamp=sampleCount; - channel[j].currentBuffer=!channel[j].currentBuffer; //switch buffer - channel[j].doneOnTime=0; - BeagleRT_scheduleAuxiliaryTask(scopeTask); //send the buffer - } - } - - channel[0].buffers[channel[0].currentBuffer][channel[0].index++]=channel1; - channel[1].buffers[channel[1].currentBuffer][channel[1].index++]=channel2; - channel[2].buffers[channel[2].currentBuffer][channel[2].index++]=channel3; - channel[3].buffers[channel[3].currentBuffer][channel[3].index++]=channel4; - channel[4].buffers[channel[4].currentBuffer][channel[4].index++]=channel5; - channel[5].buffers[channel[5].currentBuffer][channel[5].index++]=channel6; - - sampleCount++; - } -}; - -Scope* gOscilloscopeInstance; - -void Scope::setup(float _sampleRate){ - sampleRate = _sampleRate; - gOscilloscopeInstance = this; - scopeTask = BeagleRT_createAuxiliaryTask(*SendScopeData, 98, "transmit-receive-audio"); -} - -//Scope scope; - -static void SendScopeData(){ - for(int n=0; n<gOscilloscopeInstance->numChannels; n++){ - if(gOscilloscopeInstance->channel[n].toBeSent){ - gOscilloscopeInstance->channel[n].toBeSent=false; - gOscilloscopeInstance->channel[n].udpClient.send( - gOscilloscopeInstance->channel[n].buffers[!gOscilloscopeInstance->channel[n].currentBuffer], - NETWORK_AUDIO_BUFFER_SIZE*sizeof(float) - ); - gOscilloscopeInstance->channel[n].doneOnTime=1; - } - } -}
--- a/include/UdpClient.h Mon Sep 14 17:31:24 2015 +0100 +++ b/include/UdpClient.h Mon Sep 14 17:35:18 2015 +0100 @@ -23,6 +23,8 @@ int port; int enabled; int outSocket; + struct timeval stTimeOut; + fd_set stWriteFDS; bool isSetPort; bool isSetServer; struct sockaddr_in destinationServer; @@ -33,6 +35,8 @@ void setPort(int aPort); void setServer(const char* aServerName); int send(void* message, int size); + int write(const char* remoteHostname, int remotePortNumber, void* sourceBuffer, int numBytesToWrite); + int waitUntilReady(bool readyForReading, int timeoutMsecs); };
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/UdpServer.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,70 @@ +/* + * udpServer.h + * + * Created on: 19 May 2015 + * Author: giulio moro + */ + +#ifndef UDPSERVER_H_ +#define UDPSERVER_H_ + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <errno.h> +#include <netdb.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> + +class UdpServer{ + private: + int port; + int enabled; + int inSocket; + struct sockaddr_in server; + struct timeval stTimeOut; + struct timeval stZeroTimeOut; + fd_set stReadFDS; + int size; + void *wasteBuffer; + int wasteBufferSize; + int length; + socklen_t fromLength; + struct sockaddr_in from; + public: + UdpServer(); + UdpServer(int aPort); + ~UdpServer(); + bool init(int aPort); + bool bindToPort(int aPort); + int getBoundPort() const; + /* + * Reads bytes from the socket. + * + * Drop-in replacement for JUCE DatagramSocket::read() + * + If blockUntilSpecifiedAmountHasArrived is true, the method will block until maxBytesToRead + bytes have been read, (or until an error occurs). If this flag is false, the method will + return as much data as is currently available without blocking. + */ + int read(void* destBuffer, int maxBytesToRead, bool blockUntilSpecifiedAmountHasArrived); + void close(); + int empty(); + int empty(int maxCount); + /* + * Waits until the socket is ready for reading or writing. + * + Drop-in replacement for JUCE DatagramSocket::waitUntilReady. + If readyForReading is true, it will wait until the socket is ready for reading; if false, it will wait until it's ready for writing. + If the timeout is < 0, it will wait forever, or else will give up after the specified time. + If the socket is ready on return, this returns 1. If it times-out before the socket becomes ready, it returns 0. If an error occurs, it returns -1. + */ + int waitUntilReady(bool readyForReading, int timeoutMsecs); +}; + + + +#endif /* UDPSERVER_H_ */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/VirtualClock.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,53 @@ +#ifndef VIRTUAL_CLOCK_H_INCLUDED +#define VIRTUAL_CLOCK_H_INCLUDED + +#include "math.h" +#include "stats.hpp" +#include "Clock.h" +#ifdef USE_JUCE +#else +#include <BeagleRT.h> +#endif /* USE_JUCE */ + +class VirtualClock{ +private: + myClock_t startTime; + myClock_t startTimeOffset; + myClock_t lastSync; + bool firstRun; + double elapsedPeriods; + double elapsedPeriodsOffset; + double period; + MovingAverage<double> movingAverage; +public: + void init(); + VirtualClock(); +/** + Call this method at regular intervals to sync che virtual clock +*/ + void sync(); +/** + Call this method asynchronously, passing a number of equally spaced events that have elapsed since the last call. +*/ + void sync(double numPeriods); +/** + Get the current time according to the VirtualClock. + + @return Time elapsed since the first call to sync(), in period units. +*/ + double getNow(); +/** + Get the length of the period. + + Get the length of the period (difference between calls to sync() after various filtering operations) +*/ + double getPeriod(); +/** + * Add an offset to the number of elapsed periods. + * + * Add an offset to the number of elapsed periods. It also compensates for the corresponding time offset. + */ + void addOffset(double periodOffset); +}; + +#endif /* VIRTUAL_CLOCK_H_INCLUDED */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/intervals.h Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,50 @@ +/* + * intervals.h + * + * Created on: 18 May 2015 + * Author: unmanaged + */ + +#ifndef INTERVALS_H_ +#define INTERVALS_H_ + +#define TEN_POW_9 1000000000 + +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> + +#include <native/timer.h> +#include <rtdk.h> + +class Interval +{ + private: + int intervalsPointer; + long sum; + RTIME startTime; + RTIME *intervals; + float maxTimeus; + float timeus; + float samplingRate; //used for getPrint() + int numFrames; + bool enabled; //whether it actually reads the clock or not + int numAverages; + char *name; + void init(int aNumAverages, int aNumFrames, float aSamplingRate, const char *aName); + public: + Interval(); + Interval(int aNumAverages); + Interval(int aNumAverages, int aNumFrames, float aSamplingRate, const char *aName); + ~Interval(); + void setNumFrames(int aNumFrames); + int start(); + int resetMax(); + int split(); + void setEnabled(bool aActive); + float getTimeus(); + float getMaxTimeus(); + void print(); +}; + +#endif /* INTERVALS_H_ */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/stats.hpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,76 @@ +#ifndef STATS_HPP_INCLUDED +#define STATS_HPP_INCLUDED +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +template<class TYPE> +class MovingAverage{ +private: + TYPE* array; + int length; + bool bufferFull; + int pointer; + TYPE sum; + double scale; + double average; + void dealloc(){ + free(array); + // delete array; + } + + void init(int aLength){ + length=aLength; + scale=1.0/length; + // array= new TYPE(length); // for some reason this causes memory corruption, so I am using malloc() instead... + array=(TYPE*)malloc(sizeof(TYPE)*length); + sum=0; + if(array==NULL) + printf("Error while allocating array\n"); + memset(array, 0, sizeof(TYPE)*length); + reset(); + } +public: + MovingAverage(){ + init(0); + } + MovingAverage(int aLength){ + init(aLength); + } + ~MovingAverage(){ + dealloc(); + } + int getLength(){ + return bufferFull ? length : pointer; + } + void setLength(int aLength){ + dealloc(); + init(aLength); + } + double add(TYPE newElement){ + sum-=array[pointer]; + array[pointer]=newElement; + sum+=newElement; + if(bufferFull==true){ + average=sum*scale; + } + else{ + average=sum/(double)(1+pointer); + } + pointer++; + if(pointer==length){ + pointer=0; + bufferFull=true; + } + return average; + } + double getAverage(){ + return average; + } + void reset(){ + pointer=0; + bufferFull=false; + } +}; + +#endif /* STATS_HPP_INCLUDED */
--- a/projects/basic_network/render.cpp Mon Sep 14 17:31:24 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,150 +0,0 @@ -/* - * render.cpp - * - * Created on: Oct 24, 2014 - * Author: parallels - */ - -#include <BeagleRT.h> -//#include <rtdk.h> -#include <cmath> -#include <UdpClient.h> -#include <Utilities.h> - -AuxiliaryTask transmitReceiveDataTask; - -#define NETWORK_AUDIO_BUFFER_SIZE 400 //1400/4 //maximum payload for a UDP datagram over ethernet is 1472 bytes, I leave some headroom and divide by 4 to get the number of floats -struct networkAudio{ - int timestamp; - int currentBuffer; - int index; - float buffers[2][NETWORK_AUDIO_BUFFER_SIZE]; - int doneOnTime; - bool toBeSent; - UdpClient udpClient; -}; - -float gFrequency; -float gPhase; -float gInverseSampleRate; -int gCount=0; -//networkData networkObject; -#define numNetAudio 3 -networkAudio netAudio[numNetAudio]; -AuxiliaryTask printIntervalTask; -AuxiliaryTask transmitReceiveAudioTask; - -void transmitReceiveAudio(){ //transmit and receive audio buffers - for(int n=0;n<numNetAudio; n++){ - if(netAudio[n].toBeSent){ - netAudio[n].toBeSent=false; - netAudio[n].udpClient.send(netAudio[n].buffers[!netAudio[n].currentBuffer],NETWORK_AUDIO_BUFFER_SIZE*sizeof(float)); - netAudio[n].doneOnTime=1; - } - } -} - -// setup() is called once before the audio rendering starts. -// Use it to perform any initialisation and allocation which is dependent -// on the period size or sample rate. -// -// userData holds an opaque pointer to a data structure that was passed -// in from the call to initAudio(). -// -// Return true on success; returning false halts the program. -bool setup(BeagleRTContext *context, void *userData) -{ - // Retrieve a parameter passed in from the initAudio() call - gFrequency = *(float *)userData; - - gInverseSampleRate = 1.0 / context->audioSampleRate; - gPhase = 0.0; - -// networkObject.counter=&gCount; -// networkObject.variables[0]=&gFrequency; -// networkObject.variables[1]=&gPhase; -// networkObject.numVariables=2; - for(int n=0; n<numNetAudio; n++){ - netAudio[n].doneOnTime=1; - netAudio[n].index=0; - netAudio[n].currentBuffer=0; - netAudio[n].toBeSent=false; -// netAudio[n].udpClient.setPort(settings->transmitPort+n); -// netAudio[n].udpClient.setServer(settings->serverName); - netAudio[n].udpClient.setPort(9999+n); - netAudio[n].udpClient.setServer("192.168.7.1"); - } -// setupSockets(settings->receivePort, settings->transmitPort, settings->serverName); - -// transmitReceiveDataTask=createAuxiliaryTask(*transmitReceiveData, 10, "transmit-receive-data"); -// scheduleAuxiliaryTask(transmitReceiveDataTask); //here it does not work - transmitReceiveAudioTask=BeagleRT_createAuxiliaryTask(*transmitReceiveAudio, 98, "transmit-receive-audio"); - return true; -} - -// render() is called regularly at the highest priority by the audio engine. -// Input and output are given from the audio hardware and the other -// ADCs and DACs (if available). If only audio is available, numMatrixFrames -// will be 0. - -void render(BeagleRTContext *context, void *userData) -{/* - for(unsigned int n = 0; n < context->audioFrames; n++) { - float out = 0.7f * sinf(gPhase); - gPhase += 2.0 * M_PI * gFrequency * gInverseSampleRate; - if(gPhase > 2.0 * M_PI) - gPhase -= 2.0 * M_PI; - - for(unsigned int channel = 0; channel < context->audioChannels; channel++) - context->audioOut[n * context->audioChannels + channel] = out; - - if(gCount == 0){ - BeagleRT_scheduleAuxiliaryTask(transmitReceiveDataTask); - } - gCount++; - } - - -*/ - for(int n = 0; n < context->audioFrames; n++) { - float out = 0.7f * sinf(gPhase); - gPhase += 2.0 * M_PI * gFrequency * gInverseSampleRate; - if(gPhase > 2.0 * M_PI) - gPhase -= 2.0 * M_PI; - -// for(int channel = 0; channel < context->audioChannels; channel++) -// context->audioOut[n * context->audioChannels + channel] = context->audioIn[n * context->audioChannels + 0]+context->audioIn[n * context->audioChannels + 1]; - context->audioOut[n * context->audioChannels] = context->audioIn[n*context->audioChannels+0]; - context->audioOut[n * context->audioChannels+1]=out; - if(0==gCount){ -// scheduleAuxiliaryTask(transmitReceiveDataTask); - } - for(int j=0; j<numNetAudio; j++){ - if(netAudio[j].index==(NETWORK_AUDIO_BUFFER_SIZE)){ // when the buffer is ready ... - netAudio[j].toBeSent=true; - netAudio[j].index=0; //reset the counter - if(netAudio[j].doneOnTime==0) - rt_printf("Network buffer underrun :-{\n"); - netAudio[j].timestamp=gCount; - netAudio[j].currentBuffer=!netAudio[j].currentBuffer; //switch buffer - netAudio[j].doneOnTime=0; - BeagleRT_scheduleAuxiliaryTask(transmitReceiveAudioTask); //send the buffer - } - } - if((gCount&1)==0){ - netAudio[1].buffers[netAudio[1].currentBuffer][netAudio[1].index++]=analogReadFrame(context,n/2,0)+context->audioOut[n*context->audioChannels + 0]; - netAudio[2].buffers[netAudio[2].currentBuffer][netAudio[2].index++]=analogReadFrame(context,n/2,1)+context->audioOut[n*context->audioChannels + 0]; - } - netAudio[0].buffers[netAudio[0].currentBuffer][netAudio[0].index++]=0.5*(out+context->audioOut[n*context->audioChannels + 0]);//copy channel 0 to the buffer -// netAudio[1].buffers[netAudio[1].currentBuffer][netAudio[1].index++]=0.5*(out+context->audioOut[n*context->audioChannels + 0]); -// netAudio[2].buffers[netAudio[2].currentBuffer][netAudio[2].index++]=0.5*(out+context->audioOut[n*context->audioChannels + 0]); - gCount++; - } -} - -// cleanup() is called once at the end, after the audio has stopped. -// Release any resources that were allocated in setup(). - -void cleanup(BeagleRTContext *context, void *userData) -{ -}
--- a/projects/scope/render.cpp Mon Sep 14 17:31:24 2015 +0100 +++ b/projects/scope/render.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -1,12 +1,16 @@ #include <BeagleRT.h> -#include <Scope.h> +#include <NetworkSend.h> +#include <ReceiveAudioThread.h> +#include <ClockSynchronizer.h> #include <cmath> +#include <ClockSyncThread.h> float gPhase1, gPhase2; float gFrequency1, gFrequency2; float gInverseSampleRate; -Scope scope; //create a scope object +//Scope scope(2); //create a scope object with 2 channels +//NetworkSend networkSend; // initialise_render() is called once before the audio rendering starts. // Use it to perform any initialisation and allocation which is dependent @@ -16,10 +20,37 @@ // in from the call to initAudio(). // // Return true on success; returning false halts the program. +//ReceiveAudioThread receiveAudio0; +//ReceiveAudioThread receiveAudio1; +ClockSynchronizer clockSynchronizer; +extern I2c_Codec* gAudioCodec; +VirtualClock virtualClock; +ClockSyncThread clockSyncThread; +AuxiliaryTask testTime; +void testTimeFunction(){ + rt_printf("time=["); + while(!gShouldStop){ + rt_task_sleep(50000*1e3); + rt_printf("%f, ", virtualClock.getNow()); + rt_printf("%f, ", virtualClock.getPeriod()); + rt_task_sleep(20000); + rt_printf("%f,", virtualClock.getNow()); + rt_printf("%f\n", virtualClock.getPeriod()); + } + rt_printf("];"); +} bool setup(BeagleRTContext *context, void *userData) { - scope.setup(context->audioSampleRate); //call this once in setup to initialise the scope - +// receiveAudio0.init(10000, context->audioFrames, 0); +// receiveAudio1.init(10000, context->audioFrames, 1); + +// scope.setup(); //call this once in setup to initialise the scope +// scope.setPort(0, 9999); +// scope.setPort(1, 10000); +// networkSend.setup(context->audioSampleRate, context->audioFrames, 0, 9999, "192.168.7.1"); +// clockSynchronizer.setup(); + virtualClock.init(); + clockSyncThread.init(true, 5000, virtualClock); //start as slave gInverseSampleRate = 1.0/context->audioSampleRate; gPhase1 = 0.0; @@ -27,6 +58,8 @@ gFrequency1 = 200.0; gFrequency2 = 201.0; + +// testTime=BeagleRT_createAuxiliaryTask(testTimeFunction, 80, "testTimeTask"); return true; } @@ -37,33 +70,94 @@ void render(BeagleRTContext *context, void *userData) { + virtualClock.sync(context->audioFrames); + static int count=0; + if(count==0){ +// BeagleRT_scheduleAuxiliaryTask(testTime); + clockSyncThread.startThread(); //make sure you uncomment .init in setup() + } + static float phase=0; + float phaseInc=200.0/44100.0*2*M_PI; +// rt_printf("phaseInc: %f, phase: %f\n",phaseInc,phase); + for(unsigned int n=0; n<context->audioFrames; n++){ + context->audioOut[n*2]=sinf(phaseInc);//context->audioIn[n*2]; + context->audioOut[n*2+1]=sinf(phaseInc);//context->audioIn[n*2]; + phase+=200.0/44100.0*2*M_PI; + if(phase>=2*M_PI) + phase-=2*M_PI; +// context->audioOut[n*2+1]=rand()/(float)RAND_MAX;context->audioIn[n*2]; + } + count++; + /* +// if((count&262143)==0){ +// static int nextCall=160000; + if( ((count&(2047))==0)){ +// rt_printf("b %d\n", count); + clockSynchronizer.update(networkSend.getTimestamp(), receiveAudio0.getTimestamp(), receiveAudio0.getLastTime()); +// nextCall=count+100000; +// rt_printf("a %d\n", count); + } +// if(count == nextCall){ +// clockSynchronizer.update(networkSend.getTimestamp(), receiveAudio0.getTimestamp(), receiveAudio0.getLastTime()); +// } + if(count==0){ + gAudioCodec->setAudioSamplingRate( 44100); + rt_printf("startHread\n"); + ReceiveAudioThread::startThread(); + } for(unsigned int n = 0; n < context->audioFrames; n++) { - float chn1 = sinf(gPhase1); - float chn2 = sinf(gPhase2); - - float chn3 = context->audioIn[n*2 + 0]; - float chn4 = context->audioIn[n*2 + 1]; - - float chn5 = context->analogIn[(int)n/2*8 + 0]; - float chn6 = context->analogIn[(int)n/2*8 + 1]; + float chn0 = sinf(gPhase1); +// float chn1 = sinf(gPhase2); + + // float chn2 = context->audioIn[n*2 + 0]; + // float chn3 = context->audioIn[n*2 + 1]; + + // float chn4 = context->analogIn[(int)n/2*8 + 0]; + // float chn5 = context->analogIn[(int)n/2*8 + 1]; +// networkSend.log(context->audioIn[n]); + networkSend.log(chn0); +// scope.log(0, chn0); +// scope.log(1, chn1); + // scope.log(2, chn2); + // scope.log(3, chn3); + // scope.log(4, chn4); + // scope.log(5, chn5); // scope.log(chn1, chn2, chn3, chn4, chn5, chn6); - scope.log(chn1); //call this once every audio frame //takes six or fewer floats as parameters //first parameter becomes channel 1 etc //to view, click the 'oscilloscope' button on the toolbar while BeagleRT is NOT running //then click the big red button on the toolbar on this page - gPhase1 += 2.0 * M_PI * gFrequency1 * gInverseSampleRate; - gPhase2 += 2.0 * M_PI * gFrequency2 * gInverseSampleRate; + gPhase1 += 2.0 * M_PI * gFrequency1 * gInverseSampleRate * ((count&65535)/65535.0+1); + gPhase2 += 2.0 * M_PI * gFrequency2 * gInverseSampleRate; if(gPhase1 > 2.0 * M_PI) gPhase1 -= 2.0 * M_PI; if(gPhase2 > 2.0 * M_PI) gPhase2 -= 2.0 * M_PI; - + int value=count%1000; + context->audioOut[n*2]=value>=500 ? 1 : -1; + context->audioOut[n*2+1]=context->audioOut[n*2]; + count++; } + if(count>0){ + float samplingRateRatio=1; + int channelsInDestinationBuffer=2; + int channelToWriteTo=1; + int length=receiveAudio0.getSamplesSrc(context->audioOut, context->audioFrames, + samplingRateRatio, channelsInDestinationBuffer, channelToWriteTo); + if((unsigned int)length!=context->audioFrames){ + rt_printf("Length mismatch: %d\n", length); + } +// int readPointer1=receiveAudio1.getSamplesSrc(context->audioOut, context->audioFrames, 1, 2, 1); + } + for(unsigned int n=0; n<context->audioFrames; n++){ +// context->audioOut[n*2+1]=context->audioOut[n*2]; + } + */ + } // cleanup_render() is called once at the end, after the audio has stopped.
--- a/resources/network/udp-server.c Mon Sep 14 17:31:24 2015 +0100 +++ b/resources/network/udp-server.c Mon Sep 14 17:35:18 2015 +0100 @@ -47,11 +47,12 @@ n = recvfrom(sock,buf,2048,0,(struct sockaddr *)&from,&fromlen); if (n < 0) error("recvfrom"); printf("Received a datagram of size %d: \n", n); - for(i=0; i<n/sizeof(float); i+=8) - printf("[%05d]: %+f, %+f, %+f, %+f, %+f, %+f, %+f, %+f\n",i,buf[0+i],buf[1+i],buf[2+i],buf[3+i],buf[4+i],buf[5+i],buf[6+i],buf[7+i]); - n = sendto(sock,"Got your message\n",17, - 0,(struct sockaddr *)&from,fromlen); - if (n < 0) error("sendto"); + printf("Header: channel: %d, timestamp: %d\n", (int)buf[0], (int)buf[1]); + // for(i=2; i<n/sizeof(float); i+=8) + // printf("%+f, %+f, %+f, %+f, %+f, %+f, %+f, %+f\n",buf[0+i],buf[1+i],buf[2+i],buf[3+i],buf[4+i],buf[5+i],buf[6+i],buf[7+i]); + // n = sendto(sock,"Got your message\n",17, + // 0,(struct sockaddr *)&from,fromlen); + // if (n < 0) error("sendto"); } return 0; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/resources/tests/UdpClientUdpServerTest.cpp Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,128 @@ +#include "../../include/UdpServer.h" +#include "../../include/UdpClient.h" +#include <unistd.h> + +int test1(UdpServer *server, UdpClient *client){ + int buffer; + int tot=100; + int errors=0; + for(int n=0; n<tot; n++){ + if(client->send(&n,sizeof(int))!=1){ + printf("error: while sending\n"); + errors++; + } + } + for(int n=0; n<tot; n++){ + if(server->read(&buffer,sizeof(int))<0){ + printf("error: unable to read\n"); + errors++; + continue; + }; + if(n!=buffer){ + printf("error: %d!=%d\n",n,buffer); + errors++; + } + } + int n=server->emptySocket(); + if(n!=0) + printf("Error: the socket had %d bytes\n",n); + return errors; +} +int compareStrings(char * str1, char * str2){ + if(strlen(str1)!=strlen(str2)) + return -1; + for(int n=0; n<strlen(str1); n++){ + if(str1[n]!=str2[n]) + return -1; + } + return 0; +} + +int test2(UdpServer *server, UdpClient *client){ + char buffer[1000]; + int tot=100; + int errors=0; + for(int n=0; n<tot; n++){ + int num=sprintf(buffer,"%08.8f",n/1000.0); + client->send(&buffer,sizeof(char)*(num+1)); + } + char auxBuffer[100]; + for(int n=0; n<tot; n++){ + int num=sprintf(auxBuffer,"%08.8f",n/1000.0); + server->read(&buffer,num*sizeof(char)); + if(compareStrings(auxBuffer,buffer)==-1){ + printf("error: %s!=%s\n",auxBuffer,buffer); + errors++; + } + } + + return errors; +} + +int test3(UdpServer *server, UdpClient *client){ + char buffer[1000]; + int tot=100; + int errors=0; + int totNum=0; + for(int n=0; n<tot; n++){ + int num=sprintf(buffer,"%.8f",n/1000.0); + client->send(&buffer,sizeof(char)*(num+1)); + totNum+=1+num; + } + int n=server->emptySocket(); + if(n!=totNum){ + errors=1; + printf("retrieved bytes differs from sent bytes: %d!=%d\n",n,totNum); + } + return errors; +} + + +int main(){ + int port=1234; + char serverName[]="127.0.0.1"; + UdpServer server(port); + UdpClient client(port,serverName); + int errors=0; + int ret=0; + ret=test1(&server,&client); + errors+=ret; + if(ret) + printf("test1 failed with %d errors\n", ret); + else + printf("test1 passed\n"); + + ret=test2(&server,&client); + errors+=ret; + if(ret) + printf("test2 failed with %d errors\n", ret); + else + printf("test2 passed\n"); + + ret=test3(&server,&client); + errors+=ret; + if(ret) + printf("test3 failed with %d errors\n", ret); + else + printf("test3 passed\n"); +//now test if the setPort and setServer methods work + client.~UdpClient(); + server.~UdpServer(); + port=1235; + UdpServer server1; + UdpClient client1; + client1.setPort(port); + client1.setServer(serverName); + if(server1.bindToPort(port)==false){ + printf("unable to bind to port %d\n",port); + errors+=1; + } + ret=test1(&server1, &client1); + errors+=ret; + if(ret) + printf("test1 failed with %d errors\n", ret); + else + printf("test1 passed\n"); + + return errors; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/resources/tests/make_tests.sh Mon Sep 14 17:35:18 2015 +0100 @@ -0,0 +1,7 @@ +#!/bin/bash +mkdir -p build +mkdir -p bin +g++ -o build/UdpServer.o -O2 -c ../../core/UdpServer.cpp &&\ +g++ -O2 -o build/UdpClient.o -c ../../core/UdpClient.cpp && \ +g++ -O2 -o build/UdpClientUdpServerTest.o -c UdpClientUdpServerTest.cpp && \ +g++ -o bin/UdpClientUdpServerTest build/UdpClientUdpServerTest.o build/UdpClient.o build/UdpServer.o