Source code for gigaanalysis.diglock

"""GigaAnalysis - Digital Lock In - :mod:`gigaanalysis.diglock`

This program is to recreate what a lock in would do for slower measurements
but for our high field experiments. This is based around what the program in 
DRS and WUH did. This module also includes the :func:`scanning_fft` which is 
used for PDO and TDO measurements.

from .data import *

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.signal import (blackmanharris,  # for find_freq
    hamming,  # for hamming_window
    butter, filtfilt,  # for butter_bandpass and butter_bandpass_filter
    get_window)  # For sfft
from numpy.fft import rfft, rfftfreq # for find_freq
from scipy.optimize import minimize  # for find_phase, phase_in

[docs]def polypeak(signal, fit_point=3, low_f_skip=0): """Finds the largest value in a data set by fitting a parabola. It picks the largest point in the dataset and fits a quadratic parabola to it. It then uses that to get the interpolated maximum. Parameters ---------- signal : numpy.ndarray The data to interpolate the highest value of. fit_point : int, optional The number of points to use in the fit. Needs to be odd. low_f_skip : int, optional The number of points to disregard at the start of the data. Returns ------- x : float The x position as a rational number in relation to the index of the maximal value. y : float The y position of the interpolated maximum value. """ if fit_point%2 != 1: ValueError( f"fit_point needs to be odd, was {fit_point} .") m = int((fit_point - 1)/2) # Num of points either side i = low_f_skip + np.argmax(signal[low_f_skip:]) # Find highest point # Fit a quadratic and get the x and y of the highest point a, b, c = np.polyfit(np.arange(i-m,i+m+1), signal[i-m:i+m+1], 2) x = -0.5*b/a y = a*x**2 + b*x + c return x, y
[docs]def find_freq(data, samp_freq, padding=1, fit_point=3, plot=False, amp=False, skip_start=40): """Finds the dominate frequency in oscillatory data. It performs an FFT and then finds the maximal frequency using :func:`polypeak`. Parameters ---------- data : numpy.ndarray The signal in evenly spaced points samp_freq : float The measurement frequency of the data points padding : float, optional Pads the data my multiplying it before the FFT, default is 1. fit_point : int, optional Number of fit points to be used in :func:`polypeak`, default is 3. plot : bool, optional If `True` plots a figure to check the identification of the peak. amp : bool, optional If `True` returns the FFT amplitude of the frequency as well. skip_start : int, optional The number of points to skip the low frequency tail of the FFT, the default is 40. Returns ------- peak_freq : float The value of the dominate frequency. peak_amp : float If amp is `True` also returns the amplitude of the dominate frequency. """ windowed = data*blackmanharris(len(data)) n_fft = round(padding*len(windowed)) fft = abs(rfft(windowed, n=n_fft)) peak_freq, peak_amp = polypeak( fft[skip_start:], fit_point=fit_point) peak_freq = samp_freq*(peak_freq+skip_start)/n_fft if plot: # This plots the FFT it uses to find the frequency f_freq = rfftfreq(padding*len(windowed), 1/samp_freq) plt.plot(f_freq, fft, '.') plt.plot(peak_freq, peak_amp, '.') if amp: return peak_freq, peak_amp else: return peak_freq
[docs]def butter_bandpass(lowcut, highcut, fs, order=5): """Produces the polynomial values for the Butterworth bandpass. This make use of :func:`scipy.signal.butter`, and supplies values for :func:`scipy.signal.filtfilt`. Parameters ---------- lowcut : float The low frequency cut off. highcut : float The high frequency cut off. fs : float The sample frequency of the data. order : int, optional The order of the Butterworth filter, default is 5. Returns ------- b, numpy.ndarray, The numerator of the polynomials of the IIR filter. a, numpy.ndarray, The denominator of the polynomials of the IIR filter. """ nyq = 0.5 * fs low = lowcut / nyq high = highcut / nyq b, a = butter(order, [low, high], btype='band') return b, a
[docs]def butter_bandpass_filter(data, lowcut, highcut, fs, order=5): """Applies a Butterworth bandpass filter to a set of data. This makes use of :func:`butter_bandpass` and applied that filter to a given signal. Parameters ---------- data : numpy.ndarray A array containing the signal. lowcut : float The low frequency cut off. highcut : float The high frequency cut off. fs : float The sample frequency of the data points. order : int, optional The order of the filter to apply, default is 5. Returns ------- filtered : numpy.ndarray The filtered data. """ b, a = butter_bandpass(lowcut, highcut, fs, order=order) y = filtfilt(b, a, data) return y
[docs]def gen_ref(freq, fs, phase, number): """Produces the reference signal for the digital lock in. Parameters ---------- freq : float Frequency of the signal. fs : float Sample frequency of the data. phase : float Phase of the signal in degrees. number : int The number of points to generate. Returns ------- ref_signal : numpy.ndarray An array containing the reference signal. """ return np.sin( 2*np.pi*freq*np.arange(0, number, 1)/fs + phase*np.pi/180)
[docs]def find_phase(data, fs, freq): """Finds the phase of a oscillatory signal. Parameters ---------- data : numpy.ndarray An array containing the signal. fs : float Sample frequency of the data points. freq : float The frequency of the oscillatory signal. Returns ------- phase : float The phase in degrees of the oscillatory signal. """ def to_min(p): """Produces a reference signal and sums the product with the data. This is used in the optimisation routine.""" return -np.sum(data*gen_ref(freq, fs, p, len(data))) return float(minimize(to_min, 180, bounds=[(0, 360)]).x)
[docs]def round_oscillation(time_const, freq): """Rounds to nearest number of whole oscillations. Used for minimising aliasing issues. Parameters ---------- time_const : float The averaging time freq : float Frequency of the signal Returns ------- number_osc : int The closet number of oscillations in that time window. """ return round(time_const*freq)
[docs]def flat_window(time_const, fs, freq): """Produces a flat window for averaging. Uses :func:`round_oscillation` to set the window as the same length as a whole number of oscillations. Parameters ---------- time_const : float The time for averaging fs : float The sample frequency of the signal freq : float The frequency of the oscillatory signal. Returns ------- flat_window : numpy.ndarray An array to convolve with the signal that has a unit total. """ window = np.full(int(round_oscillation(time_const, freq)*fs/freq), 1.0) return window/np.sum(window)
[docs]def hamming_window(time_const, fs): """Produces a hamming window for averaging the signal. Uses a Hamming filter shape :func:`scipy.signal.hamming`. Parameters ---------- time_const : float The time for averaging, this is like the 'mean' time. fs : float The sample frequency of the data. Returns ------- hamming_window : numpy.ndarray An array to convolve with the signal that has unit total. """ window = hamming(round(time_const*fs/0.54)) return window/np.sum(window)
[docs]def ham_lock_in(signal, time_const, fs, freq, phase): """Performs a lock in of the signal and averages with a hamming window. The window from :func:`hamming_window` is convolved with the signal that has been multiplied by the reference from :func:`gen_ref`. Parameters ---------- signal : numpy.ndarray The oscillatory signal to lock in to. time_const : float The time constant for the averaging. fs : float The sample frequency of the signal data. freq : float The frequency of the oscillatory signal to lock in to. phase : float The phase of the signal is degrees. Returns ------- signal_amp : numpy.ndarray The signal after the lock in processes which is equal to the amplitude of the oscillatory signal at the given frequency. """ if time_const*fs > len(signal): raise ValueError( f"The averaging window is longer than signal.") window = hamming_window(time_const, fs) lock_ref = gen_ref(freq, fs, phase, len(signal)) return np.convolve(signal*lock_ref, window, mode='same')*np.sqrt(2)
[docs]def flat_lock_in(signal, time_const, fs, freq, phase): """Performs a lock in of the signal and averages with a flat window. The window from :func:`flat_window` is convolved with the signal that has been multiplied by the reference from :func:`gen_ref`. Parameters ---------- signal : numpy.ndarray The oscillatory signal to lock in to. time_const : float The time constant for the averaging. fs : float The sample frequency of the signal data. freq : float The frequency of the oscillatory signal to lock in to. phase : float The phase of the signal is degrees. Returns ------- signal_amp : numpy.ndarray The signal after the lock in processes which is equal to the amplitude of the oscillatory signal at the given frequency. """ if time_const*fs > len(signal): raise ValueError( f"The averaging window is longer than signal.") window = flat_window(time_const, fs, freq) lock_ref = gen_ref(freq, fs, phase, len(signal)) return np.convolve(signal*lock_ref, window, mode='same')*np.sqrt(2)
[docs]def phase_in_change(signal_in, signal_out, **kwargs): """Picks a phase to capture the majority of the change of the signal. This given an in and out of phase signal returns the phase shift to move the majority of the change in signal into the in phase component. It also chooses the phase shift so the signal is positive and between 0 and 360 deg. Uses :func:`scipy.optimize.minimize` and keyword arguments are passed to it. Parameters --------- signal_in : numpy.ndarray The values containing the in phase signal. signal_out : numpy.ndarray The values containing the out of phase signal needs to be the same shape as `signal_in`. Returns ------- max_phase : float The phase in degrees between 0 deg and 360 deg where the change in signal in the out of phase is minimised. """ if signal_in.shape != signal_out.shape: raise ValueError( f"The singal_in and singal_out arrays need to be the same " f"shape. They are {signal_in.shape} and {signal_out.shape}") scaling = 1./np.sum( np.sqrt(signal_in*signal_in + signal_out*signal_out)) def to_min(phase): """This returns the average of the square of the out of phase signal minus its mean. Used for the optimisation.""" new_out = signal_out*np.cos(phase*np.pi/180) - \ signal_in*np.sin(phase*np.pi/180) resisual = (new_out - np.average(new_out))*scaling return np.average(resisual*resisual) # Find the minimum max_phase = np.asscalar( minimize(to_min, x0=0., method='Nelder-Mead', **kwargs).x ) % 180 # Pick double value that makes signal positive if np.sum(signal_in*np.cos(max_phase*np.pi/180) + \ signal_out*np.sin(max_phase*np.pi/180)) < 0: return max_phase + 180 else: return max_phase
[docs]def phase_in_value(signal_in, signal_out, **kwargs): """Picks a phase to capture the majority of the amplitude of the signal. This given an in and out of phase signal returns the phase shift to move the majority of the signal into the in phase component. It also chooses the phase shift so the signal is positive and between 0 and 360 deg. Uses :func:`scipy.optimize.minimize` and keyword arguments are passed to it. Parameters --------- signal_in : numpy.ndarray The values containing the in phase signal. signal_out : numpy.ndarray The values containing the out of phase signal needs to be the same shape as `signal_in`. Returns ------- max_phase : float The phase in degrees between 0 deg and 360 deg where amplitude of the signal is maximised. """ if signal_in.shape != signal_out.shape: raise ValueError( f"The singal_in and singal_out arrays need to be the same " f"shape. They are {signal_in.shape} and {signal_out.shape}") scaling = 1./np.sum( np.sqrt(signal_in*signal_in + signal_out*signal_out)) def to_min(phase): """This returns the average of the square of the out of phase signal. Used for the optimisation. Scaled to help optimisation.""" new_out = signal_out*np.cos(phase*np.pi/180) - \ signal_in*np.sin(phase*np.pi/180) out_abs = np.abs(new_out*scaling) return np.average(out_abs) # Find the minimum max_phase = np.asscalar( minimize(to_min, x0=0., method='Nelder-Mead', **kwargs).x) % 180 # Pick double value that makes signal positive if np.sum(signal_in*np.cos(max_phase*np.pi/180) + \ signal_out*np.sin(max_phase*np.pi/180)) < 0: return max_phase + 180 else: return max_phase
[docs]def phase_in(signal_in, signal_out, aim='change', **kwargs): """Picks a phase that maximises something about the signal. This makes use of either :func:`phase_in_change` or :func:`phase_in_value', depending on the aim keyword. Uses :func:`scipy.optimize.minimize` and keyword arguments are passed to it. Parameters --------- signal_in : numpy.ndarray The values containing the in phase signal. signal_out : numpy.ndarray The values containing the out of phase signal needs to be the same shape as `signal_in`. aim : str, {'change', 'value'}, optional What to maximise. The default is 'change'. Returns ------- max_phase : float The best phase in degrees between 0 deg and 360 deg. """ if signal_in.shape != signal_out.shape: raise ValueError( f"The singal_in and singal_out arrays need to be the same " f"shape. They are {signal_in.shape} and {signal_out.shape}") if aim == 'change': return phase_in_change(signal_in, signal_out, **kwargs) elif aim == 'value': return phase_in_value(signal_in, signal_out, **kwargs) else: raise ValueError( f"Aim must be either 'change' or 'value', but was '{aim}'.")
[docs]def select_not_spikes(data, sdl=2., region=1001): """Identifies spikes in the data and returns a boolean array. This finds spikes in a set of data an excludes the region around them too. It does this by looking at where the value changes unusually quickly. Parameters ---------- data : numpy.ndarray The signal to check for spikes. sdl : float, optional The number of standard deviations the data need to deviate by to be considered an outlier. region : int, optional The number of points around an outlier to exclude. Default is 1001. Returns ------- good_vals : numpy.ndarray A boolean array with the same shape as the signal data with points near spikes labelled `False` and the unaffected points labelled `True`. """ change = np.append(np.diff(data), 0) spikes = abs(change - np.mean(change)) > sdl*np.std(change) good_vals = np.convolve(spikes, np.ones(region), mode='same') == 0 return good_vals
[docs]def spike_lock_in(signal, time_const, fs, freq, phase, sdl=2., region=1001): """Performs a lock in of the signal but with also spike removal. This lock in makes use of :func:`ham_lock_in`. It also removes the points effected by spikes by using :func:`select_not_spikes`. It tries to interpolate between the points. The spike removal works better with a smaller time constant. Parameters ---------- signal : numpy.ndarray The AC signal to lock in to. time_const : float The time for averaging with the hamming window. fs : float The sample frequency. freq : float The frequency of the AC signal. phase : float The phase of the AC signal in degrees. sdl : float, optional The number of standard deviations that will trigger a spike detection. The default is 2. Returns ------- locked_signal : numpy.ndarray The signal after the spike removal and lock in process. """ window = hamming_window(time_const, fs) lock_ref = gen_ref(freq, fs, phase, len(signal)) times = np.arange(len(signal)) good_points = select_not_spikes(signal*lock_ref, sdl, region) locked_in = np.convolve((signal*lock_ref)[good_points], window, mode='same')*np.sqrt(2) # step_size = int(np.ceil(len(window)/pp_window)) return np.interp(times, times[good_points], locked_in)
[docs]def scanning_fft(signal, fs, tseg, tstep, nfft=None, window='hamming', fit_point=5, low_f_skip=100, tqdm_bar=None): """Finds the changing dominate frequency of a oscillatory signal. Finds how the frequency of a oscillatory signal changes with time. This is achieved by performing many FFTs over a small window of signal which is slid along the complete signal. This is useful for extracting the measurement from PDO and TDO experiments. Parameters ---------- signal : numpy.ndarray The data to extract the signal from in the form of a 1d array. fs : float The sample frequency of the measurement signal in Hertz. tseg : float The length in time to examine for each FFT in seconds. tstep : float How far to shift the window between each FFT in seconds. nfft : None, optional The number of points to use for the FFT extra points will be zero padded. The number of points used by default is ``20*tseg*fs``, where ``tseg*fs`` is the length of the unpadded signal. window : str, optional The windowing function to used for the FFT that will be passed to :func:`scipy.signal.get_window`. THe default is 'hamming'. fit_points : int, optional The number of points to fit a parabola to identify the peak of the FFT. THe default is `5` and this is passed to :func:`polypeak`. low_f_slip : int, optional The number of points to skip when identifying the peak at the beginning of the FFT to ignore the low freq upturn. The default is `100` and this is passed to :func:`polypeak`. tqdm_bar : `tqdm.tqdm`, optional This function can be slow so a tqdm progress bar can be passed using this keyword which will be updated to show the progress of the calculation. This is done by:: from tqdm import tqdm with tqdm() as bar: res = scanning_fft(signal, fs, tseg, tstep, tqdm_bar=bar) Returns ------- times : numpy.ndarray The midpoint of the time windows which the FFTs where taken at in seconds. freqs : numpy.ndarray The frequencies of the dominate oscillatory signal against time in Hertz. amps : numpy.ndarray The amplitude of the oscillatory signal from the FFT. This should be in the units of the signal. """ nperseg = int(tseg*fs) nstep = int(tstep*fs) if nfft is None: nfft = nperseg*20 ntimepoint = int((len(signal)-nperseg+nstep)/nstep) times = (nperseg/2+np.arange(ntimepoint)*nstep)/fs freqs_i = np.zeros(ntimepoint) amps = np.zeros(ntimepoint) window = get_window('hamming', nperseg) if tqdm_bar is not None: tqdm_bar.reset(total=ntimepoint) for x in range(ntimepoint): if tqdm_bar is not None: tqdm_bar.update() freqs_i[x], amps[x] = polypeak( np.abs( np.fft.rfft(signal[x*nstep:nperseg+x*nstep]*window, n=nfft)), fit_point=fit_point, low_f_skip=low_f_skip) freqs = freqs_i*fs/nfft amps = 2*amps/np.average(window)/nperseg return times, freqs, amps