Source code for pliers.extractors.audio

''' Extractors that operate on AudioStim inputs. '''

from abc import ABCMeta
from os import path
import sys
import logging

import numpy as np
from scipy import fft
import pandas as pd

from pliers.stimuli.audio import AudioStim
from pliers.stimuli.text import ComplexTextStim
from pliers.extractors.base import Extractor, ExtractorResult
from pliers.utils import attempt_to_import, verify_dependencies, listify
from pliers.support.exceptions import MissingDependencyError
from pliers.support.setup_yamnet import YAMNET_PATH

librosa = attempt_to_import('librosa')
tf = attempt_to_import('tensorflow')


class AudioExtractor(Extractor):

    ''' Common parent for every extractor in this module that is applied to
    audio; subclasses inherit the AudioStim input restriction. '''

    # Declares AudioStim as the only stimulus type handled by subclasses.
    _input_type = AudioStim


class STFTAudioExtractor(AudioExtractor):

    ''' Short-time Fourier Transform extractor.

    Args:
        frame_size (float): The width of the frame/window to apply an FFT to,
            in seconds.
        hop_size (float): The step size to increment the window by on each
            iteration, in seconds (effectively, the sampling rate).
        freq_bins (list or int): The set of bins or frequency bands to extract
            power for. If an int is passed, this is the number of bins
            returned, with each bin spanning an equal range of frequencies.
            E.g., if bins=5 and the frequency spectrum runs from 0 to 20KHz,
            each bin will span 4KHz. If a list is passed, each element must be
            a tuple or list of lower and upper frequency bounds. E.g., passing
            [(0, 300), (300, 3000)] would compute power in two bands, one
            between 0 and 300Hz, and one between 300Hz and 3KHz.
        spectrogram (bool): If True, plots a spectrogram of the results.

    Notes: code adapted from
    http://stackoverflow.com/questions/2459295/invertible-stft-and-istft-in-python
    '''

    _log_attributes = ('frame_size', 'hop_size', 'freq_bins')
    VERSION = '1.0'

    def __init__(self, frame_size=0.5, hop_size=0.1, freq_bins=5,
                 spectrogram=False):
        self.frame_size = frame_size
        self.hop_size = hop_size
        self.spectrogram = spectrogram
        self.freq_bins = freq_bins
        super().__init__()

    def _stft(self, stim):
        ''' Compute the windowed FFT of the stimulus, one row per frame.

        Returns a 2D array (frames x retained FFT columns) holding the
        absolute value of the log of the first half of each frame's FFT.
        Optionally plots the result when ``self.spectrogram`` is True.
        '''
        x = stim.data
        framesamp = int(self.frame_size * stim.sampling_rate)
        hopsamp = int(self.hop_size * stim.sampling_rate)
        w = np.hanning(framesamp)
        X = np.array([fft.fft(w * x[i:(i + framesamp)])
                      for i in range(0, len(x) - framesamp, hopsamp)])
        # Keep only the first half of the FFT columns.
        nyquist_lim = int(X.shape[1] // 2)
        # NOTE(review): the log is taken of the complex spectrum *before*
        # the absolute value; kept as-is to preserve existing outputs.
        X = np.log(X[:, :nyquist_lim])
        X = np.absolute(X)
        if self.spectrogram:
            # Imported lazily so matplotlib is only needed when plotting.
            import matplotlib.pyplot as plt
            bins = np.fft.fftfreq(framesamp, d=1. / stim.sampling_rate)
            bins = bins[:nyquist_lim]
            plt.imshow(X.T, origin='lower', aspect='auto',
                       interpolation='nearest', cmap='RdYlBu_r',
                       extent=[0, stim.duration, bins.min(), bins.max()])
            plt.xlabel('Time')
            plt.ylabel('Frequency')
            plt.colorbar()
            plt.show()
        return X

    def _resolve_freq_bins(self, n_columns):
        ''' Return the (start, stop) column ranges to average over, without
        modifying ``self.freq_bins``. '''
        if isinstance(self.freq_bins, int):
            n_bins = self.freq_bins
            bin_size = int(n_columns / n_bins)
            bins = [(i * bin_size, (i + 1) * bin_size)
                    for i in range(n_bins - 1)]
            # The last bin absorbs any remainder columns.
            bins.append(((n_bins - 1) * bin_size, n_columns))
            return bins
        # NOTE(review): user-supplied bounds are used directly as column
        # indices into the FFT output, whereas the class docstring describes
        # them as Hz bounds -- confirm the intended units.
        # tuple() lets list-style bounds work with the '%d_%d' formatting.
        return [tuple(fb) for fb in self.freq_bins]

    def _extract(self, stim):
        ''' Average the STFT output within each frequency bin per frame and
        wrap it in an ExtractorResult (one feature per bin, named
        '<start>_<stop>'). NaN and infinite values are replaced by 0. '''
        data = self._stft(stim)

        # Resolved locally: writing the result back to self.freq_bins would
        # change a logged attribute and leak one stimulus's bin layout into
        # later calls.
        freq_bins = self._resolve_freq_bins(data.shape[1])
        features = ['%d_%d' % fb for fb in freq_bins]

        # One onset per computed frame, so the onsets always line up with
        # the rows of `data` (a float arange over the duration can differ in
        # length from the number of frames).
        time_bins = np.arange(data.shape[0]) * self.hop_size
        offset = 0.0 if stim.onset is None else stim.onset
        index = [tb + offset for tb in time_bins]

        values = np.zeros((len(index), len(features)))
        for i, (start, stop) in enumerate(freq_bins):
            values[:, i] = data[:, start:stop].mean(1)
        values[np.isnan(values)] = 0.
        values[np.isinf(values)] = 0.
        return ExtractorResult(values, stim, self, features=features,
                               onsets=index, durations=self.hop_size,
                               orders=list(range(len(index))))
class MeanAmplitudeExtractor(Extractor):

    ''' Computes the mean audio amplitude within each transcribed text
    element of a stimulus that carries both audio and a transcription. '''

    _input_type = (AudioStim, ComplexTextStim)

    def _extract(self, stim):
        samples = stim.audio.data
        rate = stim.audio.sampling_rate
        blocks = stim.complex_text.elements

        values = []
        onsets = []
        durations = []
        for block in blocks:
            # Onset and duration are scaled by the sampling rate, i.e. they
            # are expressed in samples.
            start = rate * block.onset
            onsets.append(start)
            length = rate * block.duration
            durations.append(length)

            first = np.round(start).astype(int)
            last = np.round(start + length).astype(int)
            if last > samples.shape[0]:
                raise Exception('Block ends after data.')

            values.append(np.mean(samples[first:last]))

        return ExtractorResult(values, stim, self,
                               features=['mean_amplitude'],
                               onsets=onsets, durations=durations,
                               orders=list(range(len(blocks))))
class LibrosaFeatureExtractor(AudioExtractor, metaclass=ABCMeta):

    ''' Shared base for audio extractors that delegate to a function of the
    librosa library. The function is selected by the ``_feature`` name,
    which subclasses set as a class attribute or callers pass as
    ``feature``. '''

    _log_attributes = ('hop_length', 'librosa_kwargs')

    def __init__(self, feature=None, hop_length=512, **librosa_kwargs):
        verify_dependencies(['librosa'])
        # Only override the class-level feature name when one is supplied.
        if feature:
            self._feature = feature
        self.hop_length = hop_length
        self.librosa_kwargs = librosa_kwargs
        super().__init__()

    def get_feature_names(self):
        return self._feature

    def _get_values(self, stim):
        ''' Call the librosa function named by ``self._feature`` on the
        stimulus data and return its raw output. '''
        name = self._feature
        rate_kw = {'sr': stim.sampling_rate}
        hop_kw = {'hop_length': self.hop_length}
        rate_and_hop_kw = {**rate_kw, **hop_kw}

        # Pick the librosa namespace that hosts the function, and the
        # keyword arguments that this family of functions is called with.
        if name in ('zero_crossing_rate', 'rms', 'spectral_flatness'):
            namespace, extra = librosa.feature, hop_kw
        elif name == 'tonnetz':
            namespace, extra = librosa.feature, rate_kw
        elif name in ('onset_detect', 'onset_strength_multi'):
            namespace, extra = librosa.onset, rate_and_hop_kw
        elif name in ('tempo', 'beat_track'):
            namespace, extra = librosa.beat, rate_and_hop_kw
        elif name in ('harmonic', 'percussive'):
            namespace, extra = librosa.effects, {}
        elif name == 'yin':
            namespace, extra = librosa, rate_and_hop_kw
        else:
            namespace, extra = librosa.feature, rate_and_hop_kw

        func = getattr(namespace, name)
        # Separate ** expansions keep a duplicated keyword an error, exactly
        # as with explicitly spelled-out keyword arguments.
        return func(y=stim.data, **extra, **self.librosa_kwargs)

    def _extract(self, stim):
        values = self._get_values(stim)
        if self._feature == 'beat_track':
            # Keep only the second element of the returned pair.
            values = np.array(values[1])
        values = values.T
        n_frames = len(values)

        feature_names = listify(self.get_feature_names())

        onsets = librosa.frames_to_time(range(n_frames),
                                        sr=stim.sampling_rate,
                                        hop_length=self.hop_length)
        if stim.onset:
            onsets = onsets + stim.onset

        frame_duration = self.hop_length / float(stim.sampling_rate)
        durations = [frame_duration] * n_frames

        return ExtractorResult(values, stim, self, features=feature_names,
                               onsets=onsets, durations=durations,
                               orders=list(range(n_frames)))
class SpectralCentroidExtractor(LibrosaFeatureExtractor):

    ''' Spectral centroid extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.spectral_centroid.html
    '''

    _feature = 'spectral_centroid'
class SpectralBandwidthExtractor(LibrosaFeatureExtractor):

    ''' p'th-order spectral bandwidth extractor backed by the Librosa
    library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.spectral_bandwidth.html
    '''

    _feature = 'spectral_bandwidth'
class SpectralFlatnessExtractor(LibrosaFeatureExtractor):

    ''' Spectral flatness extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.spectral_flatness.html
    '''

    _feature = 'spectral_flatness'
class SpectralContrastExtractor(LibrosaFeatureExtractor):

    ''' Spectral contrast extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.spectral_contrast.html
    '''

    _feature = 'spectral_contrast'

    def __init__(self, n_bands=6, **kwargs):
        # Stored on the instance for feature naming and also forwarded to
        # the parent as a librosa keyword argument.
        self.n_bands = n_bands
        super().__init__(n_bands=n_bands, **kwargs)

    def get_feature_names(self):
        # One name per band index from 0 through n_bands inclusive.
        n_outputs = self.n_bands + 1
        return ['spectral_contrast_band_%d' % band
                for band in range(n_outputs)]
class SpectralRolloffExtractor(LibrosaFeatureExtractor):

    ''' Roll-off frequency extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.spectral_rolloff.html
    '''

    _feature = 'spectral_rolloff'
class PolyFeaturesExtractor(LibrosaFeatureExtractor):

    ''' Extracts (via Librosa) the coefficients obtained by fitting an
    nth-order polynomial to the columns of an audio's spectrogram.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.poly_features.html
    '''

    _feature = 'poly_features'

    def __init__(self, order=1, **kwargs):
        # Stored on the instance for feature naming and also forwarded to
        # the parent as a librosa keyword argument.
        self.order = order
        super().__init__(order=order, **kwargs)

    def get_feature_names(self):
        # One name per coefficient index from 0 through order inclusive.
        n_coefficients = self.order + 1
        return ['coefficient_%d' % k for k in range(n_coefficients)]
class RMSExtractor(LibrosaFeatureExtractor):

    ''' Root mean square (RMS) extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.rms.html
    '''

    _feature = 'rms'
class OnsetDetectExtractor(LibrosaFeatureExtractor):

    ''' Basic onset detection (onset_detect) on audio, backed by the Librosa
    library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.onset.onset_detect.html
    '''

    _feature = 'onset_detect'
class TempoExtractor(LibrosaFeatureExtractor):

    ''' Tempo detection (tempo) on audio, backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.beat.tempo.html
    '''

    _feature = 'tempo'
class BeatTrackExtractor(LibrosaFeatureExtractor):

    ''' Dynamic programming beat tracker (beat_track) applied to audio,
    backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.beat.beat_track.html
    '''

    _feature = 'beat_track'
class OnsetStrengthMultiExtractor(LibrosaFeatureExtractor):

    ''' Spectral flux onset strength envelope across multiple channels
    (onset_strength_multi), computed from audio with the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.onset.onset_strength_multi.html
    '''

    _feature = 'onset_strength_multi'
class ZeroCrossingRateExtractor(LibrosaFeatureExtractor):

    ''' Zero-crossing rate extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.zero_crossing_rate.html
    '''

    _feature = 'zero_crossing_rate'
class ChromaSTFTExtractor(LibrosaFeatureExtractor):

    ''' Chromagram extractor operating on an audio's waveform, backed by the
    Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.chroma_stft.html
    '''

    _feature = 'chroma_stft'

    def __init__(self, n_chroma=12, **kwargs):
        # Stored on the instance for feature naming and also forwarded to
        # the parent as a librosa keyword argument.
        self.n_chroma = n_chroma
        super().__init__(n_chroma=n_chroma, **kwargs)

    def get_feature_names(self):
        # One name per chroma index.
        return ['chroma_%d' % k for k in range(self.n_chroma)]
class ChromaCQTExtractor(LibrosaFeatureExtractor):

    ''' Constant-q chromagram extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.chroma_cqt.html
    '''

    _feature = 'chroma_cqt'

    def __init__(self, n_chroma=12, **kwargs):
        # Stored on the instance for feature naming and also forwarded to
        # the parent as a librosa keyword argument.
        self.n_chroma = n_chroma
        super().__init__(n_chroma=n_chroma, **kwargs)

    def get_feature_names(self):
        # One name per chroma index.
        return ['chroma_cqt_%d' % k for k in range(self.n_chroma)]
class ChromaCENSExtractor(LibrosaFeatureExtractor):

    ''' Extractor for the "Chroma Energy Normalized" (CENS) chromagram
    variant, computed from audio via Librosa.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.chroma_cens.html
    '''

    _feature = 'chroma_cens'

    def __init__(self, n_chroma=12, **kwargs):
        # Stored on the instance for feature naming and also forwarded to
        # the parent as a librosa keyword argument.
        self.n_chroma = n_chroma
        super().__init__(n_chroma=n_chroma, **kwargs)

    def get_feature_names(self):
        # One name per chroma index.
        return ['chroma_cens_%d' % k for k in range(self.n_chroma)]
class MelspectrogramExtractor(LibrosaFeatureExtractor):

    ''' Mel-scaled spectrogram extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.melspectrogram.html
    '''

    _feature = 'melspectrogram'

    def __init__(self, n_mels=128, **kwargs):
        # Stored on the instance for feature naming and also forwarded to
        # the parent as a librosa keyword argument.
        self.n_mels = n_mels
        super().__init__(n_mels=n_mels, **kwargs)

    def get_feature_names(self):
        # One name per mel band index.
        return ['mel_%d' % band for band in range(self.n_mels)]
class MFCCExtractor(LibrosaFeatureExtractor):

    ''' Mel Frequency Cepstral Coefficient (MFCC) extractor backed by the
    Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.mfcc.html
    '''

    _feature = 'mfcc'

    def __init__(self, n_mfcc=20, **kwargs):
        # Stored on the instance for feature naming and also forwarded to
        # the parent as a librosa keyword argument.
        self.n_mfcc = n_mfcc
        super().__init__(n_mfcc=n_mfcc, **kwargs)

    def get_feature_names(self):
        # One name per coefficient index.
        return ['mfcc_%d' % k for k in range(self.n_mfcc)]
class TonnetzExtractor(LibrosaFeatureExtractor):

    ''' Tonal centroid (tonnetz) extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.tonnetz.html
    '''

    _feature = 'tonnetz'

    def get_feature_names(self):
        # The number of names is fixed at six.
        return ['tonal_centroid_%d' % k for k in range(6)]
class TempogramExtractor(LibrosaFeatureExtractor):

    ''' Tempogram extractor backed by the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.feature.tempogram.html
    '''

    _feature = 'tempogram'

    def __init__(self, win_length=384, **kwargs):
        # Stored on the instance for feature naming and also forwarded to
        # the parent as a librosa keyword argument.
        self.win_length = win_length
        super().__init__(win_length=win_length, **kwargs)

    def get_feature_names(self):
        # One name per index in range(win_length).
        return ['tempo_%d' % k for k in range(self.win_length)]
class HarmonicExtractor(LibrosaFeatureExtractor):

    ''' Extracts the harmonic elements of an audio time-series with the
    Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.effects.harmonic.html
    '''

    _feature = 'harmonic'
class PercussiveExtractor(LibrosaFeatureExtractor):

    ''' Extracts the percussive elements of an audio time-series with the
    Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.effects.percussive.html
    '''

    _feature = 'percussive'
class FundamentalFrequencyExtractor(LibrosaFeatureExtractor):

    ''' Fundamental frequency extractor based on the YIN algorithm as
    implemented in the Librosa library.

    Argument details:
    https://librosa.org/doc/latest/generated/librosa.yin.html
    '''

    _feature = 'yin'

    def __init__(self, fmin=65, fmax=2093, **kwargs):
        # Both bounds are stored on the instance and also forwarded to the
        # parent as librosa keyword arguments.
        self.fmin, self.fmax = fmin, fmax
        super().__init__(fmin=fmin, fmax=fmax, **kwargs)
class AudiosetLabelExtractor(AudioExtractor):

    ''' Extract probability of 521 audio event classes based on AudioSet
    corpus using a YAMNet architecture. Code available at:
    https://github.com/tensorflow/models/tree/master/research/audioset/yamnet

    Args:
        hop_size (float): size of the audio segment (in seconds) on which
            label extraction is performed.
        top_n (int): specifies how many of the highest label probabilities
            are returned. If None, all labels (or those passed to the labels
            argument) are returned. Top_n and labels are mutually exclusive.
        labels (list): specifies subset of labels for which probabilities
            are to be returned. If None, all labels (or top_n) are returned.
            The full list of labels is available in the audioset/yamnet
            repository (see yamnet_class_map.csv).
        weights_path (optional): full path to model weights file. If not
            provided, weights from pretrained YAMNet module are used.
        yamnet_kwargs (optional): Optional named arguments that modify input
            parameters for the model (see params.py file in yamnet
            repository)

    Raises:
        MissingDependencyError: if tensorflow or yamnet cannot be imported.
        ValueError: if both top_n and labels are given, or if none of the
            requested labels exist in the YAMNet class map.
    '''

    _log_attributes = ('hop_size', 'top_n', 'labels', 'weights_path',
                       'yamnet_kwargs')

    def __init__(self, hop_size=0.1, top_n=None, labels=None,
                 weights_path=None, **yamnet_kwargs):
        verify_dependencies(['tensorflow'])
        try:
            # Make the yamnet checkout importable. Only insert the path
            # once, so repeated instantiation does not keep growing
            # sys.path.
            yamnet_dir = str(YAMNET_PATH)
            if yamnet_dir not in sys.path:
                sys.path.insert(0, yamnet_dir)
            self.yamnet = attempt_to_import('yamnet')
            verify_dependencies(['yamnet'])
        except MissingDependencyError:
            msg = ('Yamnet could not be imported. To download and set up '
                   'yamnet, run:\n\tpython -m pliers.support.setup_yamnet')
            raise MissingDependencyError(dependencies=None,
                                         custom_message=msg)

        if top_n and labels:
            raise ValueError('Top_n and labels are mutually exclusive '
                             'arguments. Reinstantiate the extractor setting '
                             'top_n or labels to None (or leaving it '
                             'unspecified).')

        MODULE_PATH = path.dirname(self.yamnet.__file__)
        LABELS_PATH = path.join(MODULE_PATH, 'yamnet_class_map.csv')
        self.weights_path = weights_path or path.join(MODULE_PATH,
                                                      'yamnet.h5')
        self.hop_size = hop_size
        self.yamnet_kwargs = yamnet_kwargs or {}

        # Model parameters come from the yamnet module and are overridden
        # in place with the hop size and any user-supplied values.
        self.params = self.yamnet.params
        self.params.PATCH_HOP_SECONDS = hop_size
        for par, v in self.yamnet_kwargs.items():
            setattr(self.params, par, v)
        if self.params.PATCH_WINDOW_SECONDS != 0.96:
            logging.warning('Custom values for PATCH_WINDOW_SECONDS were '
                            'passed. YAMNet was trained on windows of 0.96s. Different '
                            'values might yield unreliable results.')

        self.top_n = top_n
        all_labels = pd.read_csv(LABELS_PATH)['display_name'].tolist()
        if labels is not None:
            requested = set(labels)
            missing = list(requested - set(all_labels))
            if missing:
                logging.warning(f'Labels {missing} do not exist. Dropping.')
            # Keep the class-map order and remember each label's column.
            selected = [(i, l) for i, l in enumerate(all_labels)
                        if l in requested]
            if not selected:
                # Unpacking an empty selection would otherwise fail with an
                # unhelpful "not enough values to unpack" error.
                raise ValueError('None of the requested labels exist in '
                                 'the YAMNet class map; nothing to extract.')
            self.label_idx, self.labels = zip(*selected)
        else:
            self.labels = all_labels
            self.label_idx = list(range(len(all_labels)))
        super().__init__()

    def _extract(self, stim):
        ''' Run YAMNet on the stimulus and return the per-window scores for
        the selected labels, ordered from highest to lowest mean score.

        Raises:
            ValueError: if the stimulus sampling rate is lower than twice
                MEL_MAX_HZ.
        '''
        params = self.params
        params.SAMPLE_RATE = stim.sampling_rate
        if params.SAMPLE_RATE < 2 * params.MEL_MAX_HZ:
            raise ValueError(
                'The sampling rate of your stimulus '
                f'({params.SAMPLE_RATE}Hz) must be at least twice the '
                f'value of MEL_MAX_HZ ({params.MEL_MAX_HZ}Hz). Upsample'
                ' your audio stimulus (recommended) or pass a lower value of '
                'MEL_MAX_HZ when initializing the extractor.')
        if params.SAMPLE_RATE != 16000:
            logging.warning(
                'The sampling rate of the stimulus is '
                f'{params.SAMPLE_RATE}Hz. YAMNet was trained on '
                ' audio sampled at 16000Hz. This should not impact '
                'predictions, but you can resample the input using '
                'AudioResamplingFilter for full conformity '
                'to training.')
        if params.MEL_MIN_HZ != 125 or params.MEL_MAX_HZ != 7500:
            logging.warning(
                'Custom values for MEL_MIN_HZ and MEL_MAX_HZ '
                'were passed. Changing these defaults might affect '
                'model performance.')

        # The model is rebuilt per call because params.SAMPLE_RATE is set
        # from the stimulus just above.
        model = self.yamnet.yamnet_frames_model(params)
        model.load_weights(self.weights_path)
        preds, _ = model.predict_on_batch(np.reshape(stim.data, [1, -1]))
        preds = preds[:, list(self.label_idx)]

        # Rank labels by mean score over time; keep the top nr_lab in
        # descending order.
        nr_lab = self.top_n or len(self.labels)
        idx = np.mean(preds, axis=0).argsort()
        preds = np.fliplr(preds[:, idx][:, -nr_lab:])
        labels = [self.labels[i] for i in idx][-nr_lab:][::-1]

        hop = params.PATCH_HOP_SECONDS
        window = params.PATCH_WINDOW_SECONDS
        stft_window = params.STFT_WINDOW_SECONDS
        stft_hop = params.STFT_HOP_SECONDS
        dur = window + stft_window - stft_hop
        onsets = np.arange(start=0, stop=stim.duration - dur, step=hop)

        return ExtractorResult(preds, stim, self, features=labels,
                               onsets=onsets,
                               durations=[dur] * len(onsets),
                               orders=list(range(len(onsets))))
class MFCCEnergyExtractor(MFCCExtractor):

    ''' Low-Quefrency and High-Quefrency Mel-Frequency Spectrum extractor.
    Extracts two auditory features representing broad-spectrum information
    (timbre) and fine-scale spectral structure (pitch) respectively.
    Derived from Hanke et al., 2015
    (https://doi.org/10.12688/f1000research.6679.1)

    This extractor maps selected cepstral coefficients back to the spectrum
    domain by reconstructing the n_mfcc mel-frequency spectrum bands using
    the low-quefrency and high-quefrency mfcc coefficients respectively.
    Users can select the top or bottom n_coefs. Non-selected coefficients
    are zeroed out, and the result is mapped back to spectral domain using
    inverse DCT (using librosa's mfcc_to_mel function).

    Args:
        n_mfcc (int): specifies the number of MFCCs
        n_coefs (int): cepstrum coefficients to keep in the high/low
            quefrency spectrum
        hop_length (int): hop length in number of samples
        n_mels (int): Dimensionality of mel frequency spectrum to map back to
        register (str): 'low' or 'high'. Specifies which MFCCs are to be
            kept.
        norm (str): Normalization type for DCT
        dct_type (int): Discrete cosine transform (DCT) type, default is 2.
        lifter (int): If lifter>0, apply inverse liftering (inverse cepstral
            filtering)
        librosa_kwargs (optional): Optional named arguments to pass to
            librosa

    Raises:
        ValueError: if register is not 'low' or 'high', or dct_type is not
            1, 2, or 3.
    '''

    _log_attributes = (
        'n_mfcc', 'n_coefs', 'hop_length', 'n_mels', 'register',
        'norm', 'dct_type', 'lifter', 'librosa_kwargs'
    )

    def __init__(self, n_mfcc=48, n_coefs=13, hop_length=1024, n_mels=128,
                 register='low', norm='ortho', dct_type=2, lifter=0,
                 **librosa_kwargs):
        if register not in ['low', 'high']:
            raise ValueError('register should be \'low\' or \'high\'')
        if dct_type not in [1, 2, 3]:
            raise ValueError('dct_type should be 1, 2, or 3')

        self.n_mfcc = n_mfcc
        self.n_mels = n_mels
        self.n_coefs = n_coefs
        self.register = register
        self.norm = norm
        self.dct_type = dct_type
        self.lifter = lifter
        # hop_length must be forwarded explicitly: the librosa base
        # __init__ assigns self.hop_length from its own hop_length parameter,
        # so omitting it here silently replaces the requested value with
        # that parameter's default. The base __init__ also sets
        # self.librosa_kwargs to the full keyword set passed below.
        super().__init__(n_mfcc=n_mfcc, hop_length=hop_length,
                         lifter=lifter, dct_type=dct_type, norm=norm,
                         n_mels=n_mels, **librosa_kwargs)

    def _get_values(self, stim):
        ''' Compute the MFCCs, zero the coefficients outside the selected
        register, and map the result back to the mel spectrum. '''
        vals = super()._get_values(stim)
        if self.register == 'low':
            # Keep the first n_coefs coefficients.
            vals[self.n_coefs:] = 0
        else:
            # Keep the coefficients from n_coefs onwards.
            vals[:self.n_coefs] = 0
        mels = librosa.feature.inverse.mfcc_to_mel(
            vals, n_mels=self.n_mels, dct_type=self.dct_type,
            norm=self.norm, lifter=self.lifter)
        return mels

    def get_feature_names(self):
        # One name per reconstructed mel band.
        return ['mfcc_energy_%d' % i for i in range(self.n_mels)]