Source code for pliers.stimuli.base

''' Base class for all Stims and associated functionality. '''

from abc import ABCMeta, abstractmethod
from os.path import isdir, join, basename, realpath, isfile
from glob import glob
from urllib.request import urlopen
from urllib.parse import urlparse
from collections import namedtuple
from contextlib import contextmanager
import os
import tempfile
import base64

import pandas as pd

from pliers import config
from pliers.utils import isiterable


class Stim(metaclass=ABCMeta):

    ''' Base class for all classes in the Stim hierarchy.

    Args:
        filename (str): Path to input file, if one exists.
        onset (float): Optional onset of the Stim (in seconds) with
            respect to some more general context or timeline the user wishes
            to keep track of.
        duration (float): Optional duration of the Stim, in seconds.
        order (int): Optional order of Stim within some broader context.
        name (str): Optional name to give the Stim instance. If None is
            provided, the name will be derived from the filename if one is
            defined. If no filename is defined, name will be an empty string.
        url (str): Optional URL associated with the Stim. It is stored on
            the instance unchanged; this base class does not fetch it.
    '''

    def __init__(self, filename=None, onset=None, duration=None, order=None,
                 name=None, url=None):

        self.filename = filename
        self.onset = onset
        self.duration = duration
        self.order = order
        # No transformation has been logged for a freshly created Stim.
        self._history = None
        self.url = url

        if name is None:
            name = '' if self.filename is None else basename(self.filename)
        self.name = name

    @abstractmethod
    def save(self, path):
        ''' Write the Stim to the given path. Subclasses must implement
        this; get_filename() relies on it to materialize a temporary file.
        '''
        pass

    @contextmanager
    def get_filename(self):
        ''' Context manager yielding a path to a file holding this Stim.

        If the Stim has a filename that exists on disk, that path is yielded
        as-is and left untouched. Otherwise the Stim is saved to a temporary
        file, whose path is yielded and which is deleted when the context
        exits, including when the body of the `with` block raises.

        The temporary path uses `self._default_file_extension`, which is not
        defined in this base class and is presumably provided by concrete
        subclasses.
        '''
        if self.filename is not None and os.path.exists(self.filename):
            yield self.filename
            return

        # NOTE(review): tempfile.mktemp only picks a name and does not
        # create the file, so it is race-prone (see the tempfile docs). It
        # is kept so that save() always receives a not-yet-existing path.
        tf = tempfile.mktemp() + self._default_file_extension
        try:
            self.save(tf)
            yield tf
        finally:
            # Clean up even if save() or the caller's block failed; the file
            # may not exist if save() raised before writing anything.
            if os.path.exists(tf):
                os.remove(tf)

    @property
    def history(self):
        ''' Return stimulus history. '''
        return self._history

    @history.setter
    def history(self, history):
        self._history = history

    def __hash__(self):
        # Hash over the identifying metadata plus the transformation history.
        return hash((self.filename, self.name, self.onset, self.duration,
                     self.order, self.history))


def _get_stim_class(name):
    # Resolve a loose, lowercase stim identifier (e.g., 'video' or
    # 'complex_text') to the matching class exported by pliers.stimuli
    # (e.g., VideoStim or ComplexTextStim). Matching ignores case and
    # underscores, and the 'stim' suffix is optional in the input.
    target = name.lower().replace('_', '')
    if not target.endswith('stim'):
        target = target + 'stim'

    # Imported here rather than at module level, as in the original.
    import pliers
    exported = pliers.stimuli.__all__

    # First exported name that equals the target once lowercased, if any.
    match = next((cls_name for cls_name in exported
                  if cls_name.lower() == target), None)
    if match is None:
        raise KeyError(
            "No Stim class matches '%s' (case-insensitive)." % target)

    return getattr(pliers.stimuli, match)


def load_stims(source, dtype=None, fail_silently=False):
    """ Load one or more stimuli directly from file, inferring/extracting
    metadata as needed.

    Args:
        source (str or list): The location to load the stim(s) from. Can be
            the path to a directory, to a single file, a URL, or a list of
            such locations.
        dtype (str): The type of stim to load. If dtype is None, relies on
            the filename extension for guidance. If dtype is provided, must
            be one of 'video', 'image', 'audio', or 'text'.
            NOTE(review): this argument is not consulted anywhere in this
            function body; the stim type is always inferred from the
            detected MIME type (files) or Content-Type header (URLs).
        fail_silently (bool): If True do not raise error when trying to load
            a missing stimulus from a list of sources. Has no effect when
            source is a single string.

    Returns:
        A list of Stims if source was not a string. If source was a single
        string, only the first loaded Stim is returned (this also holds when
        the string names a directory containing several files).

    Raises:
        OSError: If a source is neither a URL, a directory, nor an existing
            file (unless source is a list and fail_silently is True).
        IndexError: If source is a single string and nothing could be loaded
            from it (e.g., its detected type is not a supported stim type).
    """
    from .video import VideoStim, ImageStim
    from .audio import AudioStim
    from .text import TextStim

    if isinstance(source, str):
        return_list = False
        source = [source]
    else:
        return_list = True

    stims = []

    # Maps the major MIME type of a resource onto the Stim class to build.
    stim_map = {
        'image': ImageStim,
        'video': VideoStim,
        'text': TextStim,
        'audio': AudioStim
    }

    def load_file(source):
        # Sniff the file's MIME type; unsupported types are skipped.
        source = realpath(source)
        import magic  # requires libmagic, so import here
        mime = magic.from_file(source, mime=True)
        if not isinstance(mime, str):
            mime = mime.decode('utf-8')
        mime = mime.split('/')[0]
        if mime in stim_map:
            stims.append(stim_map[mime](source))

    def load_url(source):
        # Read the major content type from the response headers. The
        # response is closed as soon as the headers have been inspected,
        # and network errors propagate to the caller instead of being
        # swallowed and retried through a Python 2-only API.
        with urlopen(source) as response:
            main_type = response.info().get_content_maintype()
        if main_type in stim_map:
            stims.append(stim_map[main_type](url=source))

    for s in source:
        if bool(urlparse(s).scheme):
            load_url(s)
        elif isdir(s):
            # Non-recursive: only regular files directly inside the directory.
            for f in glob(join(s, '*')):
                if isfile(f):
                    load_file(f)
        elif isfile(s):
            load_file(s)
        else:
            if not (return_list and fail_silently):
                raise OSError("File not found: %s" % s)

    if return_list:
        return stims

    return stims[0]

def _get_bytestring(stim, encoding='utf-8'): if stim._bytestring is None: with stim.get_filename() as filename: with open(filename, 'rb') as f: data = f.read() stim._bytestring = base64.b64encode(data).decode(encoding=encoding) return stim._bytestring def _log_transformation(source, result, trans=None, implicit=False): if result is None or not config.get_option('log_transformations') or \ (trans is not None and not trans._loggable): return result if isiterable(result): return (_log_transformation(source, r, trans) for r in result) # Converters are no longer restricted to Stim inputs, so ensure name and # filename are set. name = getattr(source, 'name', None) filename = getattr(source, 'filename', None) values = [name, filename, source.__class__.__name__] if isinstance(result, Stim): values.extend([result.name, result.filename]) else: values.extend(['', '']) values.append(result.__class__.__name__) if trans is not None: values.append(trans.__class__.__name__) tr_attrs = [getattr(trans, attr) for attr in trans._log_attributes] values.append(str(dict(zip(trans._log_attributes, tr_attrs)))) else: values.append(['', '']) parent = source.history string = str(parent) if parent else values[2] string += '->{}/{}'.format(values[6], values[5]) values.extend([string, parent]) values.append(implicit) result.history = TransformationLog(*values) return result _trans_log = namedtuple('TransformationLog', "source_name source_file " + "source_class result_name result_file result_class " + " transformer_class transformer_params string " + "parent implicit") class TransformationLog(_trans_log): '''A namedtuple that stores information about a single transformation. ''' __slots__ = () def __str__(self): return self.string def to_df(self): def _append_row(rows, history): rows.append(history[:-3]) if history.parent: _append_row(rows, history.parent) return rows rows = _append_row([], self)[::-1] return pd.DataFrame(rows, columns=self._fields[:-3])