import peakutils #peak detection
import numpy as np #to handle datas
import math #to handle mathematical stuff (example power of 2)
import scipy
from scipy.signal import butter, lfilter, welch, square #for signal filtering
import matplotlib.pyplot as plt
###############################################################################
# #
# TIME DOMAIN FEATURES #
# #
###############################################################################
""" Features have been taken from: Phinyomark, A., Phukpattaranont, P., & Limsakul, C. (2012). """
[docs]def getIEMG(rawEMGSignal):
""" This function compute the sum of absolute values of EMG signal Amplitude.::
IEMG = sum(|xi|) for i = 1 --> N
* Input:
* raw EMG Signal as list
* Output:
* integrated EMG
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: the IEMG of the EMG Signal
:rtype: float
"""
IEMG = np.sum([abs(x) for x in rawEMGSignal])
return(IEMG)
[docs]def getMAV(rawEMGSignal):
""" Thif functions compute the average of EMG signal Amplitude.::
MAV = 1/N * sum(|xi|) for i = 1 --> N
* Input:
* raw EMG Signal as list
* Output:
* Mean Absolute Value
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: the MAV of the EMG Signal
:rtype: float
"""
MAV = 1/len(rawEMGSignal) * np.sum([abs(x) for x in rawEMGSignal])
return(MAV)
[docs]def getMAV1(rawEMGSignal):
""" This functoin evaluate Average of EMG signal Amplitude, using the modified version n°.1.::
IEMG = 1/N * sum(wi|xi|) for i = 1 --> N
wi = {
1 if 0.25N <= i <= 0.75N,
0.5 otherwise
}
* Input:
* raw EMG Signal as list
* Output:
* Mean Absolute Value
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: the MAV (modified version n. 1) of the EMG Signal
:rtype: float
"""
wIndexMin = int(0.25 * len(rawEMGSignal))
wIndexMax = int(0.75 * len(rawEMGSignal))
absoluteSignal = [abs(x) for x in rawEMGSignal]
IEMG = 0.5 * np.sum([x for x in absoluteSignal[0:wIndexMin]]) + np.sum([x for x in absoluteSignal[wIndexMin:wIndexMax]]) + 0.5 * np.sum([x for x in absoluteSignal[wIndexMax:]])
MAV1 = IEMG / len(rawEMGSignal)
return(MAV1)
[docs]def getMAV2(rawEMGSignal):
""" This functoin evaluate Average of EMG signal Amplitude, using the modified version n°.2.::
IEMG = 1/N * sum(wi|xi|) for i = 1 --> N
wi = {
1 if 0.25N <= i <= 0.75N,
4i/N if i < 0.25N
4(i-N)/N otherwise
}
* Input:
* raw EMG Signal as list
* Output:
* Mean Absolute Value
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: the MAV (modified version n. 2) of the EMG Signal
:rtype: float
"""
N = len(rawEMGSignal)
wIndexMin = int(0.25 * N) #get the index at 0.25N
wIndexMax = int(0.75 * N)#get the index at 0.75N
temp = [] #create an empty list
for i in range(0,wIndexMin): #case 1: i < 0.25N
x = abs(rawEMGSignal[i] * (4*i/N))
temp.append(x)
for i in range(wIndexMin,wIndexMax+1): #case2: 0.25 <= i <= 0.75N
x = abs(rawEMGSignal[i])
temp.append(x)
for i in range(wIndexMax+1,N): #case3; i > 0.75N
x = abs(rawEMGSignal[i]) * (4*(i - N) / N)
temp.append(x)
MAV2 = np.sum(temp) / N
return(MAV2)
[docs]def getSSI(rawEMGSignal):
""" This function compute the summation of square values of the EMG signal.::
SSI = sum(xi**2) for i = 1 --> N
* Input:
* raw EMG Signal as list
* Output:
* Simple Square Integral
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: SSI of the signal
:rtype: float
"""
SSI = np.sum([x**2 for x in rawEMGSignal])
return(SSI)
[docs]def getVAR(rawEMGSignal):
""" Summation of average square values of the deviation of a variable.::
VAR = (1 / (N - 1)) * sum(xi**2) for i = 1 --> N
* Input:
* raw EMG Signal as list
* Output:
* Summation of the average square values
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: the VAR of the EMG Signal
:rtype: float
"""
SSI = np.sum([x**2 for x in rawEMGSignal])
N = len(rawEMGSignal)
VAR = SSI* (1 / (N - 1))
return(VAR)
[docs]def getTM(rawEMGSignal, order):
"""
This function compute the Temporal Moment of order X of the EMG signal.::
TM = (1 / N * sum(xi**order) for i = 1 --> N
* Input:
* raw EMG Signal as list
* Output:
* TM of order = order
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param order: order the the TM function
:type order: int
:return: Temporal Moment of order X of the EMG signal
:rtype: float
"""
N = len(rawEMGSignal)
TM = abs((1/N) * np.sum([x**order for x in rawEMGSignal]))
return(TM)
[docs]def getRMS(rawEMGSignal):
""" Get the root mean square of a signal.::
RMS = (sqrt( (1 / N) * sum(xi**2))) for i = 1 --> N
* Input:
* raw EMG Signal as list
* Output:
* Root mean square of the signal
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: Root mean square of the EMG signal
:rtype: float
"""
N = len(rawEMGSignal)
RMS = np.sqrt((1/N) * np.sum([x**2 for x in rawEMGSignal]))
return(RMS)
[docs]def getLOG(rawEMGSignal):
""" LOG is a feature that provides an estimate of the muscle contraction force.::
LOG = e^((1/N) * sum(|xi|)) for x i = 1 --> N
* Input:
* raw EMG Signal
* Output = * LOG
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: LOG feature of the EMG Signal
:rtype: float
"""
LOG = math.exp( (1/len(rawEMGSignal)) * sum([abs(x) for x in rawEMGSignal]))
return(LOG)
[docs]def getWL(rawEMGSignal):
""" Get the waveform length of the signal, a measure of complexity of the EMG Signal.::
WL = sum(|x(i+1) - xi|) for i = 1 --> N-1
* Input:
* raw EMG Signal as list
* Output:
* wavelength of the signal
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: Waveform length of the signal
:rtype: float
"""
N = len(rawEMGSignal)
temp = []
for i in range(0,N-1):
temp.append(abs(rawEMGSignal[i+1] - rawEMGSignal[i]))
WL = sum(temp)
return(WL)
[docs]def getAAC(rawEMGSignal):
""" Get the Average amplitude change.::
AAC = 1/N * sum(|x(i+1) - xi|) for i = 1 --> N-1
* Input:
* raw EMG Signal as list
* Output:
* Average amplitude change of the signal
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: Average Amplitude Change of the signal
:rtype: float
"""
N = len(rawEMGSignal)
WL = getWL(rawEMGSignal)
ACC = 1/N * WL
return(ACC)
[docs]def getDASDV(rawEMGSignal):
""" Get the standard deviation value of the the wavelength.::
DASDV = sqrt( (1 / (N-1)) * sum((x[i+1] - x[i])**2 ) for i = 1 --> N - 1
* Input:
* raw EMG Signal
* Output:
* DASDV
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:return: standard deviation value of the the wavelength
:rtype: float
"""
N = len(rawEMGSignal)
temp = []
for i in range(0,N-1):
temp.append((rawEMGSignal[i+1] - rawEMGSignal[i])**2)
DASDV = (1 / (N - 1)) * sum(temp)
return(DASDV)
[docs]def getAFB(rawEMGSignal,samplerate, windowSize=32):
""" Get the amplitude at first Burst.
Reference: Du, S., & Vuskovic, M. (2004, November). Temporal vs. spectral approach to feature extraction from prehensile EMG signals. In Information Reuse and Integration, 2004. IRI 2004. Proceedings of the 2004 IEEE International Conference on (pp. 344-350). IEEE.
* Input:
* rawEMGSignal as list
* samplerate of the signal in Hz (sample / s)
* windowSize = window size in ms
* Output:
* amplitude at first burst
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param samplerate: samplerate of the signal int Hz
:type samplerate: int
:param windowSize: window size in ms to use for the analysis
:type windowsSize: int
:return: Amplitute ad first Burst
:rtype: float
"""
squaredSignal = square(rawEMGSignal) #squaring the signal
windowSample = int((windowSize * 1000) / samplerate) #get the number of samples for each window
w = np.hamming(windowSample)
#From: http://scipy-cookbook.readthedocs.io/items/SignalSmooth.html
filteredSignal = np.convolve(w/w.sum(),squaredSignal,mode='valid')
peak = peakutils.indexes(filteredSignal)[0]
AFB = filteredSignal[peak]
return(AFB)
[docs]def getZC(rawEMGSignal, threshold):
""" How many times does the signal crosses the 0 (+-threshold).::
ZC = sum([sgn(x[i] X x[i+1]) intersecated |x[i] - x[i+1]| >= threshold]) for i = 1 --> N - 1
sign(x) = {
1, if x >= threshold
0, otherwise
}
* Input:
* rawEMGSignal = EMG signal as list
* threshold = threshold to use in order to avoid fluctuations caused by noise and low voltage fluctuations
* Output:
* ZC index
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param threshold: value to sum / substract to the zero when evaluating the crossing.
:type threshold: int
:return: Number of times the signal crosses the 0 (+- threshold)
:rtype: float
"""
positive = (rawEMGSignal[0] > threshold)
ZC = 0
for x in rawEMGSignal[1:]:
if(positive):
if(x < 0 -threshold):
positive = False
ZC += 1
else:
if(x > 0 + threshold):
positive = True
ZC += 1
return(ZC)
[docs]def getMYOP(rawEMGSignal, threshold):
""" The myopulse percentage rate (MYOP) is an average value of myopulse output.
It is defined as one absolute value of the EMG signal exceed a pre-defined thershold value. ::
MYOP = (1/N) * sum(|f(xi)|) for i = 1 --> N
f(x) = {
1 if x >= threshold
0 otherwise
}
* Input:
* rawEMGSignal = EMG signal as list
* threshold = threshold to avoid fluctuations caused by noise and low voltage fluctuations
* Output:
* Myopulse percentage rate
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param threshold: value to sum / substract to the zero when evaluating the crossing.
:type threshold: int
:return: Myopulse percentage rate of the signal
:rtype: float
"""
N = len(rawEMGSignal)
MYOP = len([1 for x in rawEMGSignal if abs(x) >= threshold]) / N
return(MYOP)
[docs]def getWAMP(rawEMGSignal, threshold):
""" Wilson or Willison amplitude is a measure of frequency information.
It is a number of time resulting from difference between the EMG signal of two adjoining segments, that exceed a threshold.::
WAMP = sum( f(|x[i] - x[i+1]|)) for n = 1 --> n-1
f(x){
1 if x >= threshold
0 otherwise
}
* Input:
* rawEMGSignal = EMG signal as list
* threshold = threshold to avoid fluctuations caused by noise and low voltage fluctuations
* Output:
* Wilson Amplitude value
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param threshold: value to sum / substract to the zero when evaluating the crossing.
:type threshold: int
:return: Willison amplitude
:rtype: float
"""
N = len(rawEMGSignal)
WAMP = 0
for i in range(0,N-1):
x = rawEMGSignal[i] - rawEMGSignal[i+1]
if(x >= threshold):
WAMP += 1
return(WAMP)
[docs]def getSSC(rawEMGSignal,threshold):
""" Number of times the slope of the EMG signal changes sign.::
SSC = sum(f( (x[i] - x[i-1]) X (x[i] - x[i+1]))) for i = 2 --> n-1
f(x){
1 if x >= threshold
0 otherwise
}
* Input:
* raw EMG Signal
* Output:
* number of Slope Changes
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param threshold: value to sum / substract to the zero when evaluating the crossing.
:type threshold: int
:return: Number of slope's sign changes
:rtype: int
"""
N = len(rawEMGSignal)
SSC = 0
for i in range(1,N-1):
a,b,c = [rawEMGSignal[i-1],rawEMGSignal[i],rawEMGSignal[i+1]]
if(a + b + c >= threshold *3 ): #computed only if the 3 values are above the threshold
if(a < b > c or a > b < c ): #if there's change in the slope
SSC += 1
return(SSC)
[docs]def getMAVSLPk(rawEMGSignal, nseg):
""" Mean Absolute value slope is a modified versions of MAV feature.
The MAVs of adiacent segments are determinated. ::
MAVSLPk = MAV[k+1] - MAV[k]; k = 1,..,k+1
* Input:
* raw EMG signal as list
* nseg = number of segments to evaluate
* Output:
* list of MAVs
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param nseg: number of segments to evaluate
:type nseg: int
:return: Mean absolute slope value
:rtype: float
"""
N = len(rawEMGSignal)
lenK = int(N / nseg) #length of each segment to compute
MAVSLPk = []
for s in range(0,N,lenK):
MAVSLPk.append(getMAV(rawEMGSignal[s:s+lenK]))
return(MAVSLPk)
[docs]def getHIST(rawEMGSignal,nseg=9,threshold=50):
""" Histograms is an extension version of ZC and WAMP features.
* Input:
* raw EMG Signal as list
* nseg = number of segment to analyze
* threshold = threshold to use to avoid DC fluctuations
* Output:
* get zc/wamp for each segment
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param nseg: number of segments to analyze
:type nseg: int
:param threshold: value to sum / substract to the zero when evaluating the crossing.
:type threshold: int
:return: Willison amplitude
:rtype: float
"""
segmentLength = int(len(rawEMGSignal) / nseg)
HIST = {}
for seg in range(0,nseg):
HIST[seg+1] = {}
thisSegment = rawEMGSignal[seg * segmentLength: (seg+1) * segmentLength]
HIST[seg+1]["ZC"] = getZC(thisSegment,threshold)
HIST[seg+1]["WAMP"] = getWAMP(thisSegment,threshold)
return(HIST)
###############################################################################
# #
# FREQUENCY DOMAIN FEATURES #
# #
###############################################################################
""" This section contains all the functions used in frequency analysis """
[docs]def getMNF(rawEMGPowerSpectrum, frequencies):
""" Obtain the mean frequency of the EMG signal, evaluated as the sum of
product of the EMG power spectrum and the frequency divided by total sum of the spectrum intensity::
MNF = sum(fPj) / sum(Pj) for j = 1 -> M
M = length of the frequency bin
Pj = power at freqeuncy bin j
fJ = frequency of the spectrum at frequency bin j
* Input:
* rawEMGPowerSpectrum: PSD as list
* frequencies: frequencies of the PSD spectrum as list
* Output:
* Mean Frequency of the PSD
:param rawEMGPowerSpectrum: power spectrum of the EMG signal
:type rawEMGPowerSpectrum: list
:param frequencies: frequencies of the PSD
:type frequencies: list
:return: mean frequency of the EMG power spectrum
:rtype: float
"""
a = []
for i in range(0, len(frequencies)):
a.append(frequencies[i] * rawEMGPowerSpectrum[i])
b = sum(rawEMGPowerSpectrum)
MNF = sum(a) / b
return(MNF)
[docs]def getMDF(rawEMGPowerSpectrum, frequencies):
""" Obtain the Median Frequency of the PSD.
MDF is a frequency at which the spectrum is divided into two regions with equal amplitude, in other words, MDF is half of TTP feature
* Input:
* raw EMG Power Spectrum
* frequencies
* Output:
* Median Frequency (Hz)
:param rawEMGPowerSpectrum: power spectrum of the EMG signal
:type rawEMGPowerSpectrum: list
:param frequencies: frequencies of the PSD
:type frequencies: list
:return: median frequency of the EMG power spectrum
:rtype: float
"""
MDP = sum(rawEMGPowerSpectrum) * (1/2)
for i in range(1, len(rawEMGPowerSpectrum)):
if(sum(rawEMGPowerSpectrum[0:i]) >= MDP):
return(frequencies[i])
[docs]def getPeakFrequency(rawEMGPowerSpectrum, frequencies):
""" Obtain the frequency at which the maximum peak occur
* Input:
* raw EMG Power Spectrum as list
* frequencies as list
* Output:
* frequency in Hz
:param rawEMGPowerSpectrum: power spectrum of the EMG signal
:type rawEMGPowerSpectrum: list
:param frequencies: frequencies of the PSD
:type frequencies: list
:return: peakfrequency of the EMG Power spectrum
:rtype: float
"""
peakFrequency = frequencies[np.argmax(rawEMGPowerSpectrum)]
return(peakFrequency)
[docs]def getMNP(rawEMGPowerSpectrum):
""" This functions evaluate the mean power of the spectrum.::
Mean Power = sum(Pj) / M, j = 1 --> M, M = len of the spectrum
* Input:
* EMG power spectrum
* Output:
* mean power
:param rawEMGPowerSpectrum: power spectrum of the EMG signal
:type rawEMGPowerSpectrum: list
:param frequencies: frequencies of the PSD
:type frequencies: list
:return: mean power of the EMG power spectrum
:rtype: float
"""
MNP = np.mean(rawEMGPowerSpectrum)
return(MNP)
[docs]def getTTP(rawEMGPowerSpectrum):
""" This functions evaluate the aggregate of the EMG power spectrum (aka Zero Spectral Moment)
* Input:
* raw EMG Power Spectrum
* Output:
* Total Power
:param rawEMGPowerSpectrum: power spectrum of the EMG signal
:type rawEMGPowerSpectrum: list
:param frequencies: frequencies of the PSD
:type frequencies: list
:return: total power of the EMG power spectrum
:rtype: float
"""
TTP = sum(rawEMGPowerSpectrum)
return(TTP)
[docs]def getSM(rawEMGPowerSpectrum, frequencies, order):
""" Get the spectral moment of a spectrum::
SM = sum(fj*(Pj**order)), j = 1 --> M
* Input:
* raw EMG Power Spectrum
* frequencies as list
* order (int)
* Output:
* SM of order = order
:param rawEMGPowerSpectrum: power spectrum of the EMG signal
:type rawEMGPowerSpectrum: list
:param frequencies: frequencies of the PSD
:type frequencies: list
:param order: order to the moment
:type order: int
:return: Spectral moment of order X of the EMG power spectrum
:rtype: float
"""
SMo = []
for j in range(0, len(frequencies)):
SMo.append(frequencies[j]*(rawEMGPowerSpectrum[j] ** order))
SMo = sum(SMo)
return(SMo)
[docs]def getFR(rawEMGPowerSpectrum, frequencies, llc=30, ulc=250, lhc=250,uhc=500):
""" This functions evaluate the frequency ratio of the power spectrum.
Cut-off value can be decidec experimentally or from the MNF Feature See: Oskoei, M.A., Hu, H. (2006). GA-based feature subset selection for myoelectric classification.
* Input:
* raw EMG power spectrum as list,
* frequencies as list,
* llc = lower low cutoff
* ulc = upper low cutoff
* lhc = lower high cutoff
* uhc = upper high cutoff
* Output:
* Frequency Ratio
:param rawEMGPowerSpectrum: power spectrum of the EMG signal
:type rawEMGPowerSpectrum: list
:param frequencies: frequencies of the PSD
:type frequencies: list
:param llc: lower cutoff frequency for the low frequency components
:type llc: float
:param ulc: upper cutoff frequency for the low frequency components
:type ulc: float
:param lhc: lower cutoff frequency for the high frequency components
:type lhc: float
:param uhc: upper cutoff frequency for the high frequency components
:type uhc: float
:return: frequencies ratio of the EMG power spectrum
:rtype: float
"""
frequencies = list(frequencies)
#First we check for the closest value into the frequency list to the cutoff frequencies
llc = min(frequencies, key=lambda x:abs(x-llc))
ulc = min(frequencies, key=lambda x:abs(x-ulc))
lhc = min(frequencies, key=lambda x:abs(x-lhc))
uhc = min(frequencies, key=lambda x:abs(x-uhc))
LF = sum([P for P in rawEMGPowerSpectrum[frequencies.index(llc):frequencies.index(ulc)]])
HF = sum([P for P in rawEMGPowerSpectrum[frequencies.index(lhc):frequencies.index(uhc)]])
FR = LF / HF
return(FR)
[docs]def getPSR(rawEMGPowerSpectrum,frequencies,n=20,fmin=10,fmax=500):
""" This function computes the Power Spectrum Ratio of the signal, defined as:
Ratio between the energy P0 which is nearby the maximum value of the EMG power spectrum and the energy P which is the whole energy of the EMG power spectrum
* Input:
* EMG power spectrum
* frequencies as list
* n = range around f0 to evaluate P0
* fmin = min frequency
* fmax = max frequency
:param rawEMGPowerSpectrum: power spectrum of the EMG signal
:type rawEMGPowerSpectrum: list
:param frequencies: frequencies of the PSD
:type frequencies: list
:param n: range of frequencies around f0 to evaluate
:type n: int
:param fmin: min frequency to evaluate
:type fmin: int
:param fmax: lmaximum frequency to evaluate
:type fmax: int
:return: Power spectrum ratio of the EMG power spectrum
:rtype: float
"""
frequencies = list(frequencies)
#The maximum peak and frequencies are evaluate using the getPeakFrequency functions
#First we check for the closest value into the frequency list to the cutoff frequencies
peakFrequency = getPeakFrequency(rawEMGPowerSpectrum, frequencies)
f0min = peakFrequency - n
f0max = peakFrequency + n
f0min = min(frequencies, key=lambda x:abs(x-f0min))
f0max = min(frequencies, key=lambda x:abs(x-f0max))
fmin = min(frequencies, key=lambda x:abs(x-fmin))
fmax = min(frequencies, key=lambda x:abs(x-fmax))
#here we evaluate P0 and P
P0 = sum(rawEMGPowerSpectrum[frequencies.index(f0min):frequencies.index(f0max)])
P = sum(rawEMGPowerSpectrum[frequencies.index(fmin):frequencies.index(fmax)])
PSR = P0 / P
return(PSR)
[docs]def getVCF(SM0,SM1,SM2):
"""This function evaluate the variance of the central freuency of the PSD.::
VCF = (1 / SM0)*sum(Pj*(fj - fc)**2),j = 1 --> M, = SM2 / SM0 - (SM1 /SM0) **2
* Input:
* SM0: spectral moment of order 0
* SM1: spectral moment of order 1
* SM2: spectral moment of order 0
* Output:
* Variance of Central frequency of the Power spectrum
:param SM0: Spectral moment of order 0
:type SM0: float
:param SM1: Spectral moment of order 1
:type SM1: float
:param SM2: Spectral moment of order 2
:type SM2: float
:return: Variance of central frequency
:rtype: float
"""
VCF = (SM2 / SM0) - (SM1/SM0)**2
return(VCF)
###############################################################################
# #
# PREPROCESSING #
# #
###############################################################################
[docs]def phasicFilter(rawEMGSignal,samplerate, seconds=4):
""" Apply a phasic filter to the signal, with +-4 seconds from each sample
* Input:
* rawEMGSignal = emg signal as list
* samplerate = samplerate of the signal
* Output:
* phasic filtered signal
:param rawEMGSignal: the raw EMG signal
:type rawEMGSignal: list
:param samplerate: samplerate of the signal in Hz
:type samplerate: int
:return: the phasic filtered signal
:rtype: list
"""
phasicSignal = []
for sample in range(0,len(rawEMGSignal)):
smin = sample - 4 * samplerate #min sample index
smax = sample + 4 * samplerate #max sample index
#is smin is < 0 or smax > signal length, fix it to the closest real sample
if(smin < 0):
smin = sample
if(smax > len(rawEMGSignal)):
smax = sample
#substract the mean of the segment
newsample = rawEMGSignal[sample] - np.median(rawEMGSignal[smin:smax])
#move to th
phasicSignal.append(newsample)
return(phasicSignal)
def getPSD(rawEMGSignal, samplerate):
frequencies, psd = welch(rawEMGSignal, fs=samplerate,
window='hanning', # apply a Hanning window before taking the DFT
nperseg=256, # compute periodograms of 256-long segments of x
detrend='constant',scaling="spectrum") # detrend x by subtracting the mean
return([psd,frequencies])
###############################################################################
# #
# FILTERS #
# #
###############################################################################
#Define the filters
[docs]def butter_lowpass(cutoff, fs, order=5):
""" This functions generates a lowpass butter filter
:param cutoff: cutoff frequency
:type cutoff: float
:param cutoff: cutoff frequency
:type cutoff: float
:param fs: samplerate of the signal
:type fs: float
:param order: order of the Butter Filter
:type order: int
:return: butter lowpass filter
:rtype: list
"""
nyq = 0.5 * fs #Nyquist frequeny is half the sampling frequency
normal_cutoff = cutoff / nyq
b, a = butter(order, normal_cutoff, btype='low', analog=False)
return(b, a)
[docs]def butter_highpass(cutoff, fs, order=5):
""" This functions generates a higpass butter filter
:param cutoff: cutoff frequency
:type cutoff: float
:param cutoff: cutoff frequency
:type cutoff: float
:param fs: samplerate of the signal
:type fs: float
:param order: order of the Butter Filter
:type order: int
:return: butter highpass filter
:rtype: list
"""
nyq = 0.5 * fs #Nyquist frequeny is half the sampling frequency
normal_cutoff = cutoff / nyq
b, a = butter(order, normal_cutoff, btype='high', analog=False)
return(b, a)
[docs]def butter_lowpass_filter(data, cutoff, fs, order):
""" This functions apply a butter lowpass filter to a signal
:param data: ECG signal
:type data: list
:param cutoff: cutoff frequency
:type cutoff: float
:param cutoff: cutoff frequency
:type cutoff: float
:param fs: samplerate of the signal
:type fs: float
:param order: order of the Butter Filter
:type order: int
:return: lowpass filtered ECG signal
:rtype: list
"""
b, a = butter_lowpass(cutoff, fs, order=order)
y = lfilter(b, a, data)
return(y)
[docs]def butter_highpass_filter(data, cutoff, fs, order):
""" This functions apply a butter highpass filter to a signal
:param data: ECG signal
:type data: list
:param cutoff: cutoff frequency
:type cutoff: float
:param cutoff: cutoff frequency
:type cutoff: float
:param fs: samplerate of the signal
:type fs: float
:param order: order of the Butter Filter
:type order: int
:return: highpass filtered ECG signal
:rtype: list
"""
b, a = butter_highpass(cutoff, fs, order=order)
y = lfilter(b, a, data)
return(y)
###############################################################################
# #
# ENTRYPOINT #
# #
###############################################################################
[docs]def analyzeEMG(rawEMGSignal, samplerate, preprocessing=True,lowpass=50,highpass=20,threshold = 0.01 ,nseg=3,phasic_seconds=4):
""" This functions acts as entrypoint for the EMG Analysis.
* Input:
* rawEMGSignal = raw signal as list
* samplerate = samplerate of the signal
* lowpass = lowpass cutoff in Hz
* highpass = highpass cutoff in Hz
* threshold for the evaluation of ZC,MYOP,WAMP,SSC
* nseg = number of segments for MAVSLPk, MHW,MTW
* Output:
* results dictionary
"""
resultsdict = {"TimeDomain":{},"FrequencyDomain":{}}
if(preprocessing):
#Preprocessing
filteredEMGSignal = butter_lowpass_filter(rawEMGSignal, lowpass, samplerate, 2)#filter the signal with a cutoff at 1Hz and a 2th order Butterworth filter
filteredEMGSignal = butter_highpass_filter(filteredEMGSignal, highpass, samplerate, 2)#filter the signal with a cutoff at 0.05Hz and a 2th order Butterworth filter
filteredEMGSignal = phasicFilter(filteredEMGSignal, samplerate,seconds=phasic_seconds)
else:
filteredEMGSignal = rawEMGSignal
#Time Domain Analysis
resultsdict["TimeDomain"]["IEMG"] = getIEMG(filteredEMGSignal)
resultsdict["TimeDomain"]["MAV"] = getMAV(filteredEMGSignal)
resultsdict["TimeDomain"]["MAV1"] = getMAV1(filteredEMGSignal)
resultsdict["TimeDomain"]["MAV2"] = getMAV2(filteredEMGSignal)
resultsdict["TimeDomain"]["SSI"] = getSSI(filteredEMGSignal)
resultsdict["TimeDomain"]["VAR"] = getVAR(filteredEMGSignal)
resultsdict["TimeDomain"]["TM3"] = getTM(filteredEMGSignal,3)
resultsdict["TimeDomain"]["TM4"] = getTM(filteredEMGSignal,4)
resultsdict["TimeDomain"]["TM5"] = getTM(filteredEMGSignal,5)
resultsdict["TimeDomain"]["LOG"] = getLOG(filteredEMGSignal)
resultsdict["TimeDomain"]["RMS"] = getRMS(filteredEMGSignal)
resultsdict["TimeDomain"]["WL"] = getWL(filteredEMGSignal)
resultsdict["TimeDomain"]["AAC"] = getAAC(filteredEMGSignal)
resultsdict["TimeDomain"]["DASDV"] = getDASDV(filteredEMGSignal)
resultsdict["TimeDomain"]["AFB"] = getAFB(filteredEMGSignal,samplerate)
resultsdict["TimeDomain"]["ZC"] = getZC(filteredEMGSignal,threshold)
resultsdict["TimeDomain"]["MYOP"] = getMYOP(filteredEMGSignal,threshold)
resultsdict["TimeDomain"]["WAMP"] = getWAMP(filteredEMGSignal,threshold)
resultsdict["TimeDomain"]["SSC"] = getSSC(filteredEMGSignal,threshold)
resultsdict["TimeDomain"]["MAVSLPk"] = getMAVSLPk(filteredEMGSignal,nseg)
resultsdict["TimeDomain"]["HIST"] = getHIST(filteredEMGSignal,threshold=threshold)
#Frequency Domain Analysis
rawEMGPowerSpectrum, frequencies = getPSD(filteredEMGSignal,samplerate)
resultsdict["FrequencyDomain"]["MNF"] = getMNF(rawEMGPowerSpectrum, frequencies)
resultsdict["FrequencyDomain"]["MDF"] = getMDF(rawEMGPowerSpectrum, frequencies)
resultsdict["FrequencyDomain"]["PeakFrequency"] = getPeakFrequency(rawEMGPowerSpectrum, frequencies)
resultsdict["FrequencyDomain"]["MNP"] = getMNP(rawEMGPowerSpectrum)
resultsdict["FrequencyDomain"]["TTP"] = getTTP(rawEMGPowerSpectrum)
resultsdict["FrequencyDomain"]["SM1"] = getSM(rawEMGPowerSpectrum,frequencies,1)
resultsdict["FrequencyDomain"]["SM2"] = getSM(rawEMGPowerSpectrum,frequencies,2)
resultsdict["FrequencyDomain"]["SM3"] = getSM(rawEMGPowerSpectrum,frequencies,3)
resultsdict["FrequencyDomain"]["FR"] = getFR(rawEMGPowerSpectrum,frequencies)
resultsdict["FrequencyDomain"]["PSR"] = getPSR(rawEMGPowerSpectrum,frequencies)
resultsdict["FrequencyDomain"]["VCF"] = getVCF(resultsdict["FrequencyDomain"]["TTP"],resultsdict["FrequencyDomain"]["SM1"],resultsdict["FrequencyDomain"]["SM2"])
return(resultsdict)
###############################################################################
# #
# DEBUG #
# #
###############################################################################
""" For debug purposes. This runs only if this file is loaded directly and not imported """
if(__name__=='__main__'):
import os
import pickle
import pprint
import sampledata
fakesignal = sampledata.loadsampleEMG()
events = [30] #set a fake event
tmin = 0 #start from the beginning of the events
tmax = 5 #end from the beginning of the events
samplerate = 1000 #samplerate of the fake signal
for event in events: #for each event
smin = tmin*samplerate + event
smax = tmax*samplerate + event
eventSignal = fakesignal[smin:smax]
analyzedEMG = analyzeEMG(eventSignal,samplerate,preprocessing=False) #analyze it
pprint.pprint(analyzedEMG) #print the results of the analysis