Source code for glinda.content

import logging

from ietfparse import algorithms, datastructures, errors, headers
from tornado import web, escape


LOGGER = logging.getLogger(__name__)


class _ContentHandler(object):
    """
    Translate between dictionaries and bytes.

    Instances of this class translate between request and response
    objects represented as dictionaries and the byte strings that
    Tornado wants.  The translation is implemented using callable
    hooks that handle bytes and/or strings.  If a byte-related hook
    is available, then it is used; otherwise, the string-related hook
    is used and the translation between bytes and strings is handled
    inside of the method.

    """

    def __init__(self, content_type):
        super(_ContentHandler, self).__init__()
        self.content_type = content_type
        self.dict_to_string = None
        self.string_to_dict = None
        self.dict_to_bytes = None
        self.bytes_to_dict = None
        self.default_encoding = None

    def unpack_bytes(self, obj_bytes, encoding=None):
        """Unpack a byte stream into a dictionary."""
        assert self.bytes_to_dict or self.string_to_dict
        encoding = encoding or self.default_encoding
        LOGGER.debug('%r decoding %d bytes with encoding of %s',
                     self, len(obj_bytes), encoding)
        if self.bytes_to_dict:
            return escape.recursive_unicode(self.bytes_to_dict(obj_bytes))
        return self.string_to_dict(obj_bytes.decode(encoding))

    def pack_bytes(self, obj_dict, encoding=None):
        """Pack a dictionary into a byte stream."""
        assert self.dict_to_bytes or self.dict_to_string
        encoding = encoding or self.default_encoding or 'utf-8'
        LOGGER.debug('%r encoding dict with encoding %s', self, encoding)
        if self.dict_to_bytes:
            return None, self.dict_to_bytes(obj_dict)
        try:
            return encoding, self.dict_to_string(obj_dict).encode(encoding)
        except LookupError as error:
            raise web.HTTPError(
                406, 'failed to encode result %r', error,
                reason='target charset {0} not found'.format(encoding))
        except UnicodeEncodeError as error:
            LOGGER.warning('failed to encode text as %s - %s, trying utf-8',
                           encoding, str(error))
            return 'utf-8', self.dict_to_string(obj_dict).encode('utf-8')

    def __repr__(self):
        return '<{}.{} for {} unpacks {}, packs {}>'.format(
            self.__module__, self.__class__.__name__,
            self.content_type,
            'binary' if self.bytes_to_dict else 'str',
            'binary' if self.dict_to_bytes else 'str',
        )


_content_handlers = {}
_content_types = {}


[docs]def register_text_type(content_type, default_encoding, dumper, loader): """ Register handling for a text-based content type. :param str content_type: content type to register the hooks for :param str default_encoding: encoding to use if none is present in the request :param dumper: called to decode a string into a dictionary. Calling convention: ``dumper(obj_dict).encode(encoding) -> bytes`` :param loader: called to encode a dictionary to a string. Calling convention: ``loader(obj_bytes.decode(encoding)) -> dict`` The decoding of a text content body takes into account decoding the binary request body into a string before calling the underlying dump/load routines. """ content_type = headers.parse_content_type(content_type) content_type.parameters.clear() key = str(content_type) _content_types[key] = content_type handler = _content_handlers.setdefault(key, _ContentHandler(key)) handler.dict_to_string = dumper handler.string_to_dict = loader handler.default_encoding = default_encoding or handler.default_encoding
[docs]def register_binary_type(content_type, dumper, loader): """ Register handling for a binary content type. :param str content_type: content type to register the hooks for :param dumper: called to decode bytes into a dictionary. Calling convention: ``dumper(obj_dict) -> bytes``. :param loader: called to encode a dictionary into a byte string. Calling convention: ``loader(obj_bytes) -> dict`` """ content_type = headers.parse_content_type(content_type) content_type.parameters.clear() key = str(content_type) _content_types[key] = content_type handler = _content_handlers.setdefault(key, _ContentHandler(key)) handler.dict_to_bytes = dumper handler.bytes_to_dict = loader
[docs]def clear_handlers(): """Clears registered type handlers.""" _content_handlers.clear() _content_types.clear()
[docs]class HandlerMixin(object): """ Mix this in over ``RequestHandler`` to enable content handling. """ def __init__(self, *args, **kwargs): super(HandlerMixin, self).__init__(*args, **kwargs) self._request_body = None @property def registered_content_types(self): """Yields the currently registered content types in some order.""" for content_type in _content_types.keys(): yield content_type
[docs] def get_request_body(self): """ Decodes the request body and returns it. :return: the decoded request body as a :class:`dict` instance. :raises: :class:`tornado.web.HTTPError` if the body cannot be decoded (415) or if decoding fails (400) """ if self._request_body is None: content_type_str = self.request.headers.get( 'Content-Type', 'application/octet-stream') LOGGER.debug('decoding request body of type %s', content_type_str) content_type = headers.parse_content_type(content_type_str) try: selected, requested = algorithms.select_content_type( [content_type], _content_types.values()) except errors.NoMatch: raise web.HTTPError( 415, 'cannot decoded content type %s', content_type_str, reason='Unexpected content type') handler = _content_handlers[str(selected)] try: self._request_body = handler.unpack_bytes( self.request.body, encoding=content_type.parameters.get('charset'), ) except ValueError as error: raise web.HTTPError( 400, 'failed to decode content body - %r', error, reason='Content body decode failure') return self._request_body
[docs] def send_response(self, response_dict): """ Encode a response according to the request. :param dict response_dict: the response to send :raises: :class:`tornado.web.HTTPError` if no acceptable content type exists This method will encode `response_dict` using the most appropriate encoder based on the :mailheader:`Accept` request header and the available encoders. The result is written to the client by calling ``self.write`` after setting the response content type using ``self.set_header``. """ accept = headers.parse_http_accept_header( self.request.headers.get('Accept', '*/*')) try: selected, _ = algorithms.select_content_type( accept, _content_types.values()) except errors.NoMatch: raise web.HTTPError(406, 'no acceptable content type for %s in %r', accept, _content_types.values(), reason='Content Type Not Acceptable') LOGGER.debug('selected %s as outgoing content type', selected) handler = _content_handlers[str(selected)] accept = self.request.headers.get('Accept-Charset', '*') charsets = headers.parse_accept_charset(accept) charset = charsets[0] if charsets[0] != '*' else None LOGGER.debug('encoding response body using %r with encoding %s', handler, charset) encoding, response_bytes = handler.pack_bytes(response_dict, encoding=charset) if encoding: # don't overwrite the value in _content_types copied = datastructures.ContentType(selected.content_type, selected.content_subtype, selected.parameters) copied.parameters['charset'] = encoding selected = copied self.set_header('Content-Type', str(selected)) self.write(response_bytes)