Source code for pylablib.core.dataproc.fourier

Routines for Fourier transform.

from __future__ import division

from ..datatable.wrapping import wrap
from ..datatable import column
from . import waveforms, specfunc
import numpy as np
import numpy.fft as fft        

[docs]def truncate_len_pow2(trace, truncate_power=None): """ Truncate trace length to the the nearest power of 2. If `truncate_power` is not ``None``, it determines the minimal power of 2 that has to divide the length. (if it is ``None``, than it's the maximal possible power). """ if truncate_power==0: return trace if truncate_power<0: truncate_power=None l=len(trace) chunk_l=1 power=0 while chunk_l*2<=l: chunk_l=chunk_l*2 power=power+1 if truncate_power is not None and power>=truncate_power: break l=(l//chunk_l)*chunk_l return wrap(trace).t[:l]
[docs]def normalize_fourier_transform(ft, normalization="none"): """ Normalize the Fourier transform data. `ft` is a 2D data with 2 columns: frequency and complex amplitude. `normalization` can be ``'none'`` (none done), ``'sum'`` (the power sum is preserved: ``sum(abs(ft)**2)==sum(abs(trace)**2)``) or ``'density'`` (power spectral density normalization). """ l=len(ft) if normalization=="sum": ft=wrap(ft).copy() ft[:,1]=ft[:,1]/np.sqrt(l) ft=ft.cont elif normalization=="density" or normalization=="dBc": ft=wrap(ft).copy() norm=np.sqrt(l**2*abs(ft[1,0]-ft[0,0])) if normalization=="dBc": norm=norm*ft[len(ft)//2,1]/l ft[:,1]=ft[:,1]/norm ft=ft.cont elif normalization!="none": raise ValueError("unrecognized normalization mode: {0}".format(normalization)) return ft
[docs]def apply_window(trace_values, window="rectangle", window_power_compensate=True): """ Apply FT window to the trace. If ``window_power_compensate==True``, multiply the data is multiplied by a compensating factor to preserve power in the spectrum. """ if window=="rectangle": return trace_values window=specfunc.get_window_func(window) window_trace=window(np.arange(len(trace_values)),len(trace_values),ft_compensated=window_power_compensate) return trace_values*window_trace
[docs]def fourier_transform(trace, truncate=False, truncate_power=None, normalization="none", no_time=False, single_sided=False, window="rectangle", window_power_compensate=True): """ Calculate a fourier transform of the trace. Args: trace: Time trace to be transformed. Either an ``Nx2`` array, where ``trace[:,0]`` is time and ``trace[:,1]`` is data (real or complex), or an ``Nx3`` array, where ``trace[:,0]`` is time, ``trace[:,1]`` is the real part of the signal and ``trace[:,2]`` is the imaginary part. truncate (bool): If ``True``, cut the data to the power of 2. truncate_power: If ``None``, cut to the nearest power of 2; otherwise, cut to the largest possible length that divides ``2**truncate_power``. Only relevant if ``truncate==True``. normalization (str): Fourier transform normalization: - ``'none'``: no normalization; - ``'sum'``: then norm of the data is conserved (``sum(abs(ft[:,1])**2)==sum(abs(trace[:,1])**2)``); - ``'density'``: power spectral density normalization, in ``x/rtHz`` (``sum(abs(ft[:,1])**2)*df==mean(abs(trace[:,1])**2)``); - ``'dBc'``: like ``'density'``, but normalized to the mean trace value. no_time (bool): If ``True``, assume that the time axis is missing and use the standard index instead (if trace is 1D data, `no_time` is always ``True``). single_sided (bool): If ``True``, only leave positive frequency side of the transform. window (str): FT window. Can be ``'rectangle'`` (essentially, no window), ``'hann'`` or ``'hamming'``. window_power_compensate (bool): If ``True``, the data is multiplied by a compensating factor to preserve power in the spectrum. Returns: a two-column array, where the first column is frequency, and the second is complex FT data. """ wrapped=wrap(trace) column_names=["frequency","ft_data"] if trace.ndim==1: trace_values=wrapped[:] else: if wrapped.shape()[1]==(1 if no_time else 2): trace_values=wrapped[:,-1] elif wrapped.shape()[1]==(2 if no_time else 3): trace_values=wrapped[:,-2]+1j*wrapped[:,-1] else: raise ValueError("fourier_transform doesn't work for an array with shape {0}".format(wrapped.shape())) dt=1. if (no_time or wrapped.ndim()==1) else wrapped[1,0]-wrapped[0,0] if len(trace_values)==0: return wrapped.from_array(np.zeros((0,2)),column_names,wrapped=False) if len(trace_values)==1: return wrapped.from_array(np.array([[0,trace_values[0]]]),column_names,wrapped=False) if truncate: trace_values=truncate_len_pow2(trace_values,truncate_power=truncate_power) trace_values=apply_window(trace_values,window,window_power_compensate=window_power_compensate) ft=fft.fftshift(fft.fft(trace_values)) df=1./(dt*len(ft)) frequencies=column.crange(-len(ft)/2.,len(ft)/2.)*df ft=wrapped.from_columns([frequencies.as_array(),ft],column_names,wrapped=False) if wrapped.ndim()>1 else np.column_stack((frequencies,ft)) ft=normalize_fourier_transform(ft,normalization) if single_sided: ft=wrap(ft).t[len(ft)//2:,:] ft[0,0]=0 # numerical error compensation return ft
[docs]def flip_fourier_transform(ft): """ Flip the fourier transform (analogous to making frequencies negative and flipping the order). """ ft=wrap(ft).copy() if len(ft)%2==1: ft[:,1]=ft[::-1,1] else: ft[1::,1]=ft[:0:-1,1] return ft.cont
[docs]def inverse_fourier_transform(ft, truncate=False, truncate_power=None, no_freq=False, zero_loc=None, symmetric_time=False): """ Calculate an inverse fourier transform of the trace. Args: ft: Fourier transform data to be inverted. Is an ``Nx2`` array, where ``ft[:,0]`` is frequency and ``ft[:,1]`` is fourier transform (real or complex). truncate (bool): If ``True``, cut the data to the power of 2. truncate_power: If ``None``, cut to the nearest power of 2; otherwise, cut to the largest possible length that divides ``2**truncate_power``. Only relevant if ``truncate==True``. no_freq (bool): If ``True``, assume that the frequency axis is missing and use the standard index instead (if trace is 1D data, `no_freq` is always ``True``). zero_loc (bool): Location of the zero frequency point. Can be ``None`` (the one with the value of f-axis closest to zero), ``'center'`` (mid-point) or an integer index. symmetric_time (bool): If ``True``, make time axis go from ``(-0.5/df, 0.5/df)`` rather than ``(0, 1./df)``. Returns: a two-column array, where the first column is frequency, and the second is the complex-valued trace data. """ wrapped=wrap(ft) column_names=["time","data"] if len(ft)==0: return wrapped.from_array(np.zeros((0,2)),column_names,wrapped=False) if len(ft)==1: return wrapped.from_array(np.array([[0,wrapped[:,0]]]),column_names,wrapped=False) no_freq=no_freq or wrapped.ndim()==1 if zero_loc is None: if no_freq: zero_freq_point=0 else: zero_freq_point=waveforms.find_closest_arg(wrapped.c[0],0,ordered=True) if zero_freq_point is None: raise ValueError("can't find zero frequency point; closest is {0}".format(wrapped[zero_freq_point,0])) elif zero_loc=="center": zero_freq_point=len(ft)//2 else: zero_freq_point=zero_loc if wrapped.ndim()==1: ft_ordered=np.concatenate(( wrapped[zero_freq_point:], wrapped[:zero_freq_point] )) else: ft_ordered=np.concatenate(( wrapped[zero_freq_point:,-1], wrapped[:zero_freq_point,-1] )) if truncate: ft_ordered=truncate_len_pow2(ft_ordered,truncate_power=truncate_power) trace=fft.ifft(ft_ordered) l=len(trace) df=1. if no_freq else wrapped[1,0]-wrapped[0,0] dt=1./(df*l) times=column.crange(len(ft))*dt if symmetric_time: times=times-times[l//2] trace=np.concatenate((trace[l//2:],trace[:l//2])) if wrapped.ndim()==1: return np.column_stack((times,trace)) else: return wrapped.from_columns([times.as_array(),trace],column_names,wrapped=False)
[docs]def power_spectral_density(trace, truncate=False, truncate_power=None, normalization="density", no_time=False, single_sided=False, window="rectangle", window_power_compensate=True): """ Calculate a power spectral density of the trace. Args: trace: Time trace to be transformed. Either an ``Nx2`` array, where ``trace[:,0]`` is time and ``trace[:,1]`` is data (real or complex), or an ``Nx3`` array, where ``trace[:,0]`` is time, ``trace[:,1]`` is the real part of the signal and ``trace[:,2]`` is the imaginary part. truncate (bool): If ``True``, cut the data to the power of 2. truncate_power: If ``None``, cut to the nearest power of 2; otherwise, cut to the largest possible length that divides ``2**truncate_power``. Only relevant if ``truncate==True``. normalization (str): Fourier transform normalization: - ``'none'``: no normalization; - ``'sum'``: then norm of the data is conserved (``sum(PSD[:,1]))==sum(abs(trace[:,1])**2)``); - ``'density'``: power spectral density normalization, in ``x/rtHz`` (``sum(PSD[:,1])*df==mean(abs(trace[:,1])**2)``); - ``'dBc'``: like ``'density'``, but normalized to the mean trace value. no_time (bool): If ``True``, assume that the time axis is missing and use the standard index instead (if trace is 1D data, `no_time` is always ``True``). single_sided (bool): If ``True``, only leave positive frequency side of the PSD. window (str): FT window. Can be ``'rectangle'`` (essentially, no window), ``'hann'`` or ``'hamming'``. window_power_compensate (bool): If ``True``, the data is multiplied by a compensating factor to preserve power in the spectrum. Returns: a two-column array, where the first column is frequency, and the second is positive PSD. """ column_names=["frequency","PSD"] ft=fourier_transform(trace, truncate=truncate, truncate_power=truncate_power, normalization=normalization, no_time=no_time, single_sided=single_sided, window=window, window_power_compensate=window_power_compensate) wrapped=wrap(ft) PSD=wrapped.from_columns((wrapped.c[0].real,abs(wrapped.c[1])**2),column_names,wrapped=False) return PSD
[docs]def get_real_part(ft): """ Get the fourier transform of the real part only from the fourier transform of a complex variable. """ re_ft=wrap(ft).copy() re_ft[1:,1]=(ft[1:,1]+ft[:0:-1,1].conjugate())*0.5 re_ft[0,1]=np.real(ft[0,1]) return re_ft.cont
[docs]def get_imag_part(ft): """ Get the fourier transform of the imaginary part only from the fourier transform of a complex variable. """ im_ft=wrap(ft).copy() im_ft[1:,1]=(im_ft[1:,1]-im_ft[:0:-1,1].conjugate())/2.j im_ft[0,1]=im_ft[0,1].imag return im_ft.cont
[docs]def get_correlations(ft_a, ft_b, zero_mean=True, normalization="none"): """ Calculate the correlation function of the two variables given their fourier transforms. Args: ft_a: first variable fourier transform ft_b: second variable fourier transform zero_mean (bool): If ``True``, the value corresponding to the zero frequency is set to zero (only fluctuations around means of a and b are calculated). normalization (str): Can be ``'whole'`` (correlations are normalized by product of PSDs derived from `ft_a` and `ft_b`) or ``'individual'`` (normalization is done for each frequency individually, so that the absolute value is always 1). """ if len(ft_a)!=len(ft_b): raise ValueError("transforms should be of the same length") corr=ft_a.copy() corr[:,1]=corr[:,1]*ft_b[:,1].conjugate() if (zero_mean): corr[len(corr)/2,1]=0. if normalization=="whole": norm_a=(abs(ft_a[:,1])**2).sum()-abs(ft_a[len(ft_a)/2,1])**2 norm_b=(abs(ft_b[:,1])**2).sum()-abs(ft_b[len(ft_b)/2,1])**2 corr[:,1]=corr[:,1]/(norm_a*norm_b)**.5 elif normalization=="individual": norm_factors=abs(ft_a[:,1]*ft_b[:,1]) corr[:,1]=corr[:,1]/norm_factors elif normalization!="none": raise ValueError("unrecognized normalization method: {0}".format(normalization)) return corr