Source code for pliers.extractors.api.clarifai

'''
Extractors that interact with the Clarifai API.
'''

import logging
import os
from contextlib import ExitStack

import pandas as pd

from pliers.extractors.image import ImageExtractor
from pliers.extractors.video import VideoExtractor
from pliers.extractors.base import ExtractorResult
from pliers.transformers import BatchTransformerMixin
from pliers.transformers.api import APITransformer
from pliers.utils import listify, attempt_to_import, verify_dependencies


clarifai_channel = attempt_to_import('clarifai_grpc.channel.clarifai_channel', 'clarifai_channel',
                                    ['ClarifaiChannel'])

clarifai_api = attempt_to_import('clarifai_grpc.grpc.api', 'clarifai_api', ['resources_pb2, service_pb2, service_pb2_grpc'])

class ClarifaiAPIExtractor(APITransformer):

    ''' Uses the Clarifai API to extract tags of visual stimuli.

    Args:
        api_key (str): A valid API_KEY for the Clarifai API. Only needs to be
            passed the first time the extractor is initialized.
        model (str): The name of the Clarifai model to use. If None, defaults
            to the general image tagger.
        min_value (float): A value between 0.0 and 1.0 indicating the minimum
            confidence required to return a prediction. Defaults to 0.0.
        max_concepts (int): A value between 0 and 200 indicating the maximum
            number of label predictions returned.
        select_concepts (list): List of concepts (strings) to query from the
            API. For example, ['food', 'animal'].
        rate_limit (int): The minimum number of seconds required between
            transform calls on this Transformer.
    '''

    _log_attributes = ('access_token', 'model_name', 'min_value',
                       'max_concepts', 'select_concepts')
    _env_keys = ('CLARIFAI_ACCESS_TOKEN', 'CLARIFAI_APP_ID', 'CLARIFAI_USER_ID')
    VERSION = '1.0'

    def __init__(self, access_token=None, user_id=None, app_id=None, model='general-image-recognition', min_value=None,
                 max_concepts=None, select_concepts=None, rate_limit=None,
                 batch_size=None):
        verify_dependencies(['clarifai_channel', 'clarifai_api'])
        if access_token is None:
            try:
                access_token = os.environ['CLARIFAI_ACCESS_TOKEN']
            except KeyError:
                raise ValueError("A valid Clarifai API ACCESS_TOKEN "
                                 "must be passed the first time a Clarifai "
                                 "extractor is initialized.")

        if user_id is None:
            try:
                user_id = os.environ['CLARIFAI_USER_ID']
            except KeyError:
                raise ValueError("A valid Clarifai API CLARIFAI_USER_ID "
                                 "must be passed the first time a Clarifai "
                                 "extractor is initialized.")

        if app_id is None:
            try:
                app_id = os.environ['CLARIFAI_APP_ID']
            except KeyError:
                raise ValueError("A valid Clarifai API CLARIFAI_APP_ID "
                                 "must be passed the first time a Clarifai "
                                 "extractor is initialized.")

        self.access_token = access_token
        self.api = clarifai_api.service_pb2_grpc.V2Stub(clarifai_channel.ClarifaiChannel.get_grpc_channel())
        self.metadata =  (('authorization', 'Key ' + access_token),)
        self.user_id= user_id
        self.app_id = app_id

        self.model_name = model
        self.application_id = None
        self.min_value = min_value #NA 
        self.max_concepts = max_concepts
        self.select_concepts = select_concepts
        if select_concepts:
            select_concepts = listify(select_concepts)
            self.select_concepts = [clarifai_api.resources_pb2.Concept(name=n)
                                    for n in select_concepts]
        super().__init__(rate_limit=rate_limit)

    @property
    def api_keys(self):
        return [self.access_token]

    def check_valid_keys(self):
        return None

    def _query_api(self, objects):
        verify_dependencies(['clarifai_api'])
        model_options = None
        if self.select_concepts or self.max_concepts or self.min_value:
            model_options = clarifai_api.resources_pb2.Model(
            output_info=clarifai_api.resources_pb2.OutputInfo(
                output_config=clarifai_api.resources_pb2.OutputConfig(
                    select_concepts=self.select_concepts,
                    min_value = self.min_value,
                    max_concepts = self.max_concepts
                )
            )
        )
        request = clarifai_api.service_pb2.PostModelOutputsRequest(
            model_id=self.model_name,
            user_app_id=clarifai_api.resources_pb2.UserAppIDSet(user_id=self.user_id, app_id=self.app_id),
            inputs=objects,
            model=model_options
        )
        response = self.api.PostModelOutputs(request, metadata=self.metadata)
        return response.outputs

    def _parse_annotations(self, annotation, handle_annotations=None):
        """
        Parse outputs from a clarifai extraction.

        Args:
            handle_annotations (str): How returned face annotations should be
                handled in cases where there are multiple faces.
                'first' indicates to only use the first face JSON object, all
                other values will default to including every face.
        """
        # check whether the model is the face detection model
        if self.model_name == 'face-detection':

            # if a face was detected, get at least the boundaries
            if annotation.data:
                # if specified, only return first face
                if handle_annotations == 'first':
                    annotation = [annotation.data.regions[0]]
                # else collate all faces into a multi-row dataframe
                face_results = []
                for i, d in enumerate(annotation.data.regions):
                    data_dict = {}
                    for k, v in d.region_info.bounding_box.ListFields():
                        data_dict[k.name] = v

                    for tag in d.data.concepts:
                        data_dict[tag.name] = tag.value
                    
                    face_results.append(data_dict)
                return face_results
            # return an empty dict if there was no face
        else:
            data_dict = {}
            for tag in annotation.data.concepts:
                data_dict[tag.name] = tag.value
            return data_dict


[docs]class ClarifaiAPIImageExtractor(ClarifaiAPIExtractor, BatchTransformerMixin, ImageExtractor): ''' Uses the Clarifai API to extract tags of images. Args: api_key (str): A valid API_KEY for the Clarifai API. Only needs to be passed the first time the extractor is initialized. model (str): The name of the Clarifai model to use. If None, defaults to the general image tagger. min_value (float): A value between 0.0 and 1.0 indicating the minimum confidence required to return a prediction. Defaults to 0.0. max_concepts (int): A value between 0 and 200 indicating the maximum number of label predictions returned. select_concepts (list): List of concepts (strings) to query from the API. For example, ['food', 'animal']. rate_limit (int): The minimum number of seconds required between transform calls on this Transformer. batch_size (int): Number of stims to send per batched API request. ''' _batch_size = 32
[docs] def __init__(self, access_token=None, user_id=None, app_id=None, model='general-image-recognition', min_value=None, max_concepts=None, select_concepts=None, rate_limit=None, batch_size=None): super().__init__(access_token=access_token, user_id=user_id, app_id=app_id, model=model, min_value=min_value, max_concepts=max_concepts, select_concepts=select_concepts, rate_limit=rate_limit, batch_size=batch_size)
def _extract(self, stims): verify_dependencies(['clarifai_api']) # ExitStack lets us use filename context managers simultaneously with ExitStack() as stack: imgs = [] for s in stims: if s.url: image=clarifai_api.resources_pb2.Image(url=s.url) else: f_name = stack.enter_context(s.get_filename()) with open(f_name, "rb") as f: file_bytes = f.read() image = clarifai_api.resources_pb2.Image( base64=file_bytes ) image = clarifai_api.resources_pb2.Input( data=clarifai_api.resources_pb2.Data( image=image ) ) imgs.append(image) outputs = self._query_api(imgs) extractions = [] for i, resp in enumerate(outputs): extractions.append(ExtractorResult(resp, stims[i], self)) return extractions def _to_df(self, result): if self.model_name == 'face-detection': # is a list already, no need to wrap it in one return pd.DataFrame(self._parse_annotations(result._data)) return pd.DataFrame([self._parse_annotations(result._data)])
[docs]class ClarifaiAPIVideoExtractor(ClarifaiAPIExtractor, VideoExtractor): ''' Uses the Clarifai API to extract tags from videos. Args: api_key (str): A valid API_KEY for the Clarifai API. Only needs to be passed the first time the extractor is initialized. model (str): The name of the Clarifai model to use. If None, defaults to the general image tagger. min_value (float): A value between 0.0 and 1.0 indicating the minimum confidence required to return a prediction. Defaults to 0.0. max_concepts (int): A value between 0 and 200 indicating the maximum number of label predictions returned. select_concepts (list): List of concepts (strings) to query from the API. For example, ['food', 'animal']. rate_limit (int): The minimum number of seconds required between transform calls on this Transformer. batch_size (int): Number of stims to send per batched API request. ''' def _extract(self, stim): verify_dependencies(['clarifai_api']) with stim.get_filename() as filename: with open(filename, "rb") as f: file_bytes = f.read() vids = [clarifai_api.resources_pb2.Input( data=clarifai_api.resources_pb2.Data( video=clarifai_api.resources_pb2.Video(base64=file_bytes) ) )] outputs = self._query_api(vids) return ExtractorResult(outputs, stim, self) def _to_df(self, result): onsets = [] durations = [] data = [] frames = result._data[0].data.frames for i, frame_res in enumerate(frames): tmp_res = self._parse_annotations(frame_res) # if we detect multiple faces, the parsed annotation can be multi-line if type(tmp_res) == list: for d in tmp_res: data.append(d) onset = frame_res.frame_info.time / 1000.0 if (i + 1) == len(frames): end = result.stim.duration else: end = frames[i + 1].frame_info.time / 1000.0 onsets.append(onset) durations.append(max([end - onset, 0])) result._onsets = onsets result._durations = durations df = pd.DataFrame(data) result.features = list(df.columns) else: data.append(tmp_res) onset = frame_res.frame_info.time / 1000.0 if (i + 1) == len(frames): end = result.stim.duration else: end = frames[i+1].frame_info.time / 1000.0 onsets.append(onset) # NOTE: As of Clarifai API v2 and client library 2.6.1, the API # returns more frames than it should—at least for some videos. # E.g., given a 5.5 second clip, it may return 7 frames, with the # last beginning at 6000 ms. Since this appears to be a problem on # the Clarifai end, and it's not actually clear how they're getting # this imaginary frame (I'm guessing it's the very last frame?), # we're not going to do anything about it here, except to make sure # that durations aren't negative. durations.append(max([end - onset, 0])) result._onsets = onsets result._durations = durations df = pd.DataFrame(data) result.features = list(df.columns) return df