mi@0
|
1 #!/usr/bin/env python
|
mi@0
|
2 # encoding: utf-8
|
mi@0
|
3 """
|
mi@0
|
4 acorr.py
|
mi@0
|
5
|
mi@0
|
6 This function provides similar fuctionality to Matlab's xcorr() but for computing autocorrelation only.
|
mi@0
|
7
|
mi@0
|
8 It gives equivalent results given the following Matlab test script:
|
mi@0
|
9
|
mi@0
|
10 %==== MATLAB ====
|
mi@0
|
11 a=[1,2,3,4,5,1,2,3,4,5,2,3,4,5,6,1,2,3,4,5];
|
mi@0
|
12 [xcr,lag] = xcorr(a,a,20,'unbiased');
|
mi@0
|
13 disp(xcr);
|
mi@0
|
14 plot(lag,xcr);
|
mi@0
|
15 %==== MATLAB ====
|
mi@0
|
16
|
mi@0
|
17 Created by George Fazekas on 2014-02-26.
|
mi@0
|
18 Copyright (c) 2014 . All rights reserved.
|
mi@0
|
19 """
|
mi@0
|
20
|
mi@0
|
21 import sys,os
|
mi@0
|
22 import numpy as np
|
mi@0
|
23 from scipy.fftpack import fft, ifft
|
mi@0
|
24
|
mi@0
|
25 class ACORR(object):
|
mi@0
|
26
|
mi@0
|
27 def nextpow2(self, n):
|
mi@0
|
28 '''Return the next power of 2 such as 2^p >= n'''
|
mi@0
|
29 if np.any(n < 0):
|
mi@0
|
30 raise ValueError("n should be > 0")
|
mi@0
|
31 if np.isscalar(n):
|
mi@0
|
32 f, p = np.frexp(n)
|
mi@0
|
33 if f == 0.5:
|
mi@0
|
34 return p-1
|
mi@0
|
35 elif np.isfinite(f):
|
mi@0
|
36 return p
|
mi@0
|
37 else:
|
mi@0
|
38 return f
|
mi@0
|
39 else:
|
mi@0
|
40 f, p = np.frexp(n)
|
mi@0
|
41 res = f
|
mi@0
|
42 bet = np.isfinite(f)
|
mi@0
|
43 exa = (f == 0.5)
|
mi@0
|
44 res[bet] = p[bet]
|
mi@0
|
45 res[exa] = p[exa] - 1
|
mi@0
|
46 return res
|
mi@0
|
47
|
mi@0
|
48 def acorr_ifft_fft(self, x, nfft, lag, onesided=False, scale='none'):
|
mi@0
|
49 '''Compute the actual autocorrelation via IFFT/FFT '''
|
mi@0
|
50 ra = np.real(ifft(np.abs(fft(x, n=nfft) ** 2)))
|
mi@0
|
51 if onesided:
|
mi@0
|
52 b = ra[..., :lag]
|
mi@0
|
53 else:
|
mi@0
|
54 b = np.concatenate([ra[..., nfft-lag+1:nfft], ra[..., :lag]], axis=-1)
|
mi@0
|
55 #print b, ra[..., 0][..., np.newaxis], b / ra[..., 0][..., np.newaxis]
|
mi@0
|
56 if scale == 'coeff':
|
mi@0
|
57 return b / ra[..., 0][..., np.newaxis]
|
mi@0
|
58 elif scale == 'unbiased':
|
mi@0
|
59 '''scale = len(x)-abs(lags)'''
|
mi@0
|
60 lags = np.array(range(-lag+1,lag))
|
mi@0
|
61 # print lags,len(x)
|
mi@0
|
62 scale = len(x) - abs(lags)
|
mi@0
|
63 scale[scale<=0] = 1.0
|
mi@0
|
64 # print scale
|
mi@0
|
65 return b/scale
|
mi@0
|
66 else:
|
mi@0
|
67 return b
|
mi@0
|
68
|
mi@0
|
69 def acorr(self, x, axis=-1, onesided=False, lag=None, scale='none'):
|
mi@0
|
70 '''Compute autocorrelation of x along given axis.
|
mi@0
|
71 x : array-like
|
mi@0
|
72 signal to correlate.
|
mi@0
|
73 axis : int
|
mi@0
|
74 axis along which autocorrelation is computed.
|
mi@0
|
75 onesided: bool, optional
|
mi@0
|
76 if True, only returns the right side of the autocorrelation.
|
mi@0
|
77 scale: {'none', 'coeff'} scaling mode.
|
mi@0
|
78 If 'coeff', the correlation is normalised such as the 0-lag is equal to 1.
|
mi@0
|
79 if 'unbiased' the correlation is normalised using len(x) - abs(lags) '''
|
mi@0
|
80
|
mi@0
|
81 if not scale in ['none', 'coeff','unbiased']:
|
mi@0
|
82 raise ValueError("scale mode %s not understood" % scale)
|
mi@0
|
83 if not lag :
|
mi@0
|
84 lag = x.shape[axis]
|
mi@0
|
85 lag += 1
|
mi@0
|
86 nfft = 2 ** self.nextpow2(2 * lag - 1)
|
mi@0
|
87 lags = np.array(range(-lag+1, lag))
|
mi@0
|
88 if axis != -1:
|
mi@0
|
89 x = np.swapaxes(x, -1, axis)
|
mi@0
|
90 a = self.acorr_ifft_fft(x, nfft, lag, onesided, scale)
|
mi@0
|
91 if axis != -1:
|
mi@0
|
92 a = np.swapaxes(a, -1, axis)
|
mi@0
|
93 return a,lags
|
mi@0
|
94
|