# encoding=utf-8
from base64 import b64decode as real_b64decode, b64encode as real_b64encode
import binascii
from collections import namedtuple
from collections.abc import Mapping, Sequence
from datetime import datetime, timezone
from io import RawIOBase
import math
import re
import struct
from ijson import basic_parse as ijson_basic_parse, items as ijson_items, parse as ijson_parse
import iso8601
from logbook import Logger
log = Logger(__name__)
# sentinel value used when parsing an invalid date
INVALID_DATE = datetime.min.replace(tzinfo=timezone.utc)
# typing note: base64.b64encode takes and returns bytes; wrap it so the result is a str
def b64encode(s):
"""
Encodes `bytes` to a `str` using default Base64 encoding.
    :param s: the `bytes` to encode
:return: *s*, Base64-encoded
:rtype: `str`
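
    For example:

    .. code-block:: python

        b64encode(b'hansken')  # -> 'aGFuc2tlbg=='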
"""
return str(real_b64encode(s), 'ascii')
def b64decode(s, validate=False):
"""
Decode a Base64-encoded string.
:param s: the string to decode
:param validate: validate the input against the Base64 alphabet
:return: decoded byte string
:rtype: `bytes`
:raise TypeError: when an invalid character is found
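
    For example:

    .. code-block:: python

        b64decode('aGFuc2tlbg==')  # -> b'hansken'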
"""
if not isinstance(s, bytes):
s = bytes(s, 'ascii')
try:
return real_b64decode(s, validate=validate)
except binascii.Error as e:
raise TypeError(str(e)) from e
def omit_empty(mapping):
"""
Filters mapping to a new dict with non-``None`` values.
:param mapping: dict to filter
:return: dict containing keys and values from mapping that are non-empty
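
    For example:

    .. code-block:: python

        omit_empty({'name': 'file.bin', 'size': None})  # -> {'name': 'file.bin'}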
"""
return {key: value for key, value in mapping.items() if value is not None}
def glue_url(*path):
"""
    Glues parts of a URL path together with slashes, dropping any steps that
    are None.
:param path: steps to join
:return: a url path
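
    For example (the host below is a placeholder):

    .. code-block:: python

        glue_url('https://example.com/', 'cases', None, 'abc/')
        # -> 'https://example.com/cases/abc'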
"""
return '/'.join(str(step).strip('/ ') for step in path if step is not None) # omit steps that are None
def json_events(fd):
"""
Generates prefixed json parsing events. See also ijson.parse.
:param fd: file-like object to read from
:return: an event generator
"""
return ijson_parse(ijson_basic_parse(fd, use_float=True))
def json_items(events, prefix):
"""
Generates python values parsed from the provided event stream that
conform to prefix. See also ijson.items.
:param events: a json event stream (see json_events)
:param prefix: prefix of elements to generate (dot-separated, use "item"
for array items)
:return: a value generator
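
    A minimal sketch combining the two helpers (assuming an ijson version that
    supports passing event iterables to its `parse` and `items` functions):

    .. code-block:: python

        from io import BytesIO

        events = json_events(BytesIO(b'{"values": [1, 2]}'))
        list(json_items(events, 'values.item'))  # -> [1, 2]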
"""
return ijson_items(events, prefix)
# binary suffixes for byte sizes
_byte_sizes = ('bytes', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB')
def parse_byte_size(value):
"""
Turns a human-readable byte size into a numeric value.
    Suffixes are always interpreted as binary multiples and can take a single
    or triple character form (e.g. *m* or *MiB* for mebibytes); casing is
    ignored.
:param value: a human-readable file size
:return: an `int`-representation of *value*
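
    For example:

    .. code-block:: python

        parse_byte_size('15k')      # -> 15360
        parse_byte_size('1.5 MiB')  # -> 1572864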
"""
try:
# match '15', '15k', '15KiB', '1.2 M', …
match = re.match(r'^(?P<num>(?:\d*\.)?\d+)\s*(?P<suffix>[a-z]*)$', value, re.IGNORECASE)
num, suffix = match.groups()
num = float(num)
# default order 0 (just bytes)
order = 0
if suffix:
            # there was a suffix in value, determine the order by looking at the suffix's index in _byte_sizes
order = next((i for i, size in enumerate(_byte_sizes) if size.lower().startswith(suffix.lower())))
# calculate the byte size as the product of the supplied value and the binary order multiplier
return round(num * (1 << (order * 10)))
except Exception as e:
        # any exception (e.g. when match or next fails) results in an invalid byte size error
raise ValueError('invalid byte size: {}'.format(value)) from e
def format_datetime(ts):
"""
    Converts a `datetime.datetime` object into ISO-8601 wire format, while
    requiring a timezone.
:param ts: the `datetime` to be converted
:return: *ts*, in ISO-8601 format
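
    For example:

    .. code-block:: python

        format_datetime(datetime(2023, 5, 17, 12, 0, tzinfo=timezone.utc))
        # -> '2023-05-17T12:00:00+00:00'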
"""
if not ts.tzinfo:
raise ValueError('datetime without timezone')
return ts.isoformat()
def parse_datetime(value):
"""
Parses an ISO8601-formatted timestamp into a `datetime` object.
Returns `.INVALID_DATE` when an invalid value is passed (e.g. a negative
year component, which is not supported by `datetime`).
:param value: an ISO8601-formatted `str`
:return: a `datetime` object representing the same value as *value*
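
    For example:

    .. code-block:: python

        parse_datetime('2023-05-17T12:00:00+00:00')
        # -> datetime(2023, 5, 17, 12, 0, tzinfo=UTC)
        parse_datetime('not a date')  # -> INVALID_DATE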
"""
try:
return iso8601.parse_date(value)
except iso8601.ParseError:
# NB: value "-1234-01-21…" would technically be valid in ISO 8601, though Python's datetime does not allow it
log.warning('failed to parse {} as datetime, defaulting to {}', value, INVALID_DATE, exc_info=True)
return INVALID_DATE
class GeographicLocation(namedtuple('GeographicLocation', ('latitude', 'longitude'))):
"""
Describes a 2-tuple *(latitude, longitude)* to translate location data to
and from Hansken's wire format.
"""
    _pattern = re.compile(r'^(?P<latitude>[+\-]?\d+\.\d+)(?P<longitude>[+\-]?\d+\.\d+)/?$')
@classmethod
def from_string(cls, value):
"""
Parses a string value into a `GeographicLocation`.
:param value: a string-representation to be parsed
:return: a value of type `cls`, parsed from *value*
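
        For example:

        .. code-block:: python

            GeographicLocation.from_string('+52.37403+004.88969/')
            # -> GeographicLocation(latitude=52.37403, longitude=4.88969)
            str(GeographicLocation(52.37403, 4.88969))
            # -> '+52.37403+004.88969'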
"""
match = cls._pattern.match(value)
if not match:
raise TypeError('invalid {cls.__name__}: {value}'.format(cls=cls, value=value))
# use keyword arguments to instantiate cls (extensions beware :))
return cls(latitude=float(match.group('latitude')), longitude=float(match.group('longitude')))
def __str__(self):
# use zero-padded floating point values with forced sign rounded to a 5 digit fraction
return '{location.latitude:+09.5f}{location.longitude:+010.5f}'.format(location=self)
class Vector(Sequence):
"""
Describes a sequence of floating point numbers to translate vector data to
and from Hansken's wire format.
"""
# compile a struct for values in a Vector: big-endian 32 bit float
_struct = struct.Struct('>f')
@classmethod
def from_base64(cls, value):
"""
        Creates a `.Vector` from *value*, assuming it is a base64-encoded
        sequence of big-endian 4-byte IEEE-754 floating point values.
:param value: a base64-representation to be parsed
:return: a new `.Vector`
"""
return cls(b64decode(value))
@classmethod
def from_sequence(cls, seq):
"""
        Creates a `.Vector` from *seq*, assuming it is a sequence of elements
        compatible with the ``f`` struct format character.
:param seq: a sequence of `float`
:return: a new `.Vector`
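
        A small round-trip sketch:

        .. code-block:: python

            vector = Vector.from_sequence([1.0, 2.0, 3.0])
            vector[1]                                  # -> 2.0
            Vector.from_base64(str(vector)) == vector  # -> True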
"""
size = cls._struct.size
# allocate a buffer that would fit the entire sequence
buffer = bytearray(len(seq) * size)
for idx, element in enumerate(seq):
# pack all elements into the pre-allocated buffer
cls._struct.pack_into(buffer, idx * size, element)
# wrap buffer into bytes to make sure the result's _raw attribute is a bytes (required by __bytes__)
return cls(bytes(buffer))
def __init__(self, data):
"""
Creates a new `.Vector` backed by the provided data.
:param data: a byte buffer, aligned to 4 bytes
"""
if len(data) % self._struct.size:
raise TypeError(f'misaligned data, {len(data)} not a multiple of {self._struct.size}')
self._raw = data
def __len__(self):
return len(self._raw) // self._struct.size
def __getitem__(self, item):
size = self._struct.size
if isinstance(item, slice):
# or self[idx], but bypass a bunch of type and size checks
return [self._struct.unpack_from(self._raw, offset=idx * size)[0]
# let range and slice generate the actual indices we need
for idx in range(*item.indices(len(self)))]
if not isinstance(item, int):
# mimic the error list would create
            raise TypeError(f'indices should be integers or slices, not {item.__class__.__name__}')
if item >= len(self):
# we're responsible for this check (__iter__ relies on this behaviour)
raise IndexError
# unpack a single value from the underlying buffer
# (unpack always returns a tuple, we always need the first and only element)
return self._struct.unpack_from(self._raw, offset=item * size)[0]
def __eq__(self, other):
# define equality as the same type and identical data
return isinstance(other, Vector) and self._raw == other._raw
def __hash__(self):
return hash((13, self._raw)) # avoid a hash collision with the hash of _raw
def __bytes__(self):
return self._raw
def __str__(self):
return b64encode(bytes(self))
def __repr__(self):
return f'{self.__class__.__name__}({list(self)})'
class MultiContext:
"""
Context manager delegating its `__enter__` and `__exit__` methods to the
provided set of context managers.
    `__enter__` is called on all delegates in the order they've been supplied.
    `Exception`\ s raised from these calls are collected; if any of the
    delegates raised an `Exception`, a `ValueError` is raised with an error
    message and the recorded `Exception` instances. `__exit__` is *not* called
    in this case.

    `__exit__` is called on all delegates in the reverse of the order they've
    been supplied. The exception detail arguments for `__exit__` are passed to
    the delegates verbatim. The return value of `__exit__` will be truthy if
    any of the delegate calls returns a truthy value (which would cause the
    interpreter to suppress the error).

    Any delegate raising an `Exception` of its own on `__exit__` produces
    undefined behaviour; errors on `__exit__` should be avoided.
Use `.MultiContext` as a context manager:
.. code-block:: python
with MultiContext(manager1, manager2):
do_things()
# note that this is equivalent to
with manager1, manager2:
do_things()
        # MultiContext can be used as a return value wrapping multiple context managers
with create_context_managers():
do_things()
"""
def __init__(self, *contexts):
"""
Creates a new `.MultiContext`, delegating its `__enter__` and
`__exit__` calls to the provided delegate context managers.
:param contexts: delegate context managers
"""
self.contexts = contexts
def __enter__(self):
errors = []
for context in self.contexts:
try:
context.__enter__()
except Exception as e:
# warn here to indicate which context failed to enter
                log.warning('context {} failed to __enter__: {}', context, e)
errors.append(e)
if errors:
# don't silence errors
raise ValueError('failed to __enter__ MultiContext', *errors)
# always return self to make sure self.__exit__ is called
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# default to falsy return value
suppress = False
for context in reversed(self.contexts):
# allow short circuiting the or, but make sure delegate.__exit__ is called
suppress = context.__exit__(exc_type, exc_val, exc_tb) or suppress
return suppress
class Namespace:
"""
Utility class to create a tiny namespace with some simple attributes.
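
    For example:

    .. code-block:: python

        ns = Namespace(host='localhost')
        ns.extend(port=443)
        ns.host, ns.port  # -> ('localhost', 443)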
"""
def __init__(self, **kwargs):
for attr, value in kwargs.items():
setattr(self, attr, value)
def extend(self, **kwargs):
for attr, value in kwargs.items():
setattr(self, attr, value)
class DictView(Mapping):
"""
    Dict-like class with separator-aware value retrieval. A dot (.) is used as
    the default separator. Values can be retrieved in one of three ways (the
    first two of which work like a normal dict):
- calling get: view.get('some.property')
- indexing: view['some.property']
    - attribute access: view.some.property (only on view classes that define
      attributes, see `view_with_attrs`)
If the retrieved property is a dict, it is wrapped in a DictView to enable
further DictView access methods.
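
    For example:

    .. code-block:: python

        view = DictView({'some': {'property': 42}})
        view.get('some.property')  # -> 42
        view['some.property']      # -> 42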
"""
# sentinel value used to check when a KeyError should be raised rather than returning a value
_no_default = object()
def __init__(self, source, separator='.'):
self._source = source
self._separator = separator
def get(self, key, default=None):
try:
value = self._source
for step in key.split(self._separator):
value = value[step]
if isinstance(value, dict):
return DictView(value)
else:
return value
except KeyError:
return default
def __len__(self):
return len(self._source)
def __iter__(self):
return iter(self._source)
def __contains__(self, item):
return self.get(item) is not None
def __getitem__(self, item):
value = self.get(item, self._no_default)
if value is self._no_default:
raise KeyError(item)
else:
return value
def __str__(self):
return str(self._source)
def view_with_attrs(name=None, attrs=()): # default attrs to an empty tuple (no attributes)
"""
Creates a class definition that is an extension of DictView. Attribute
names are cached and used to lazily get them (either as their original
name or the PEP8-style equivalent) from the instance's source when
accessed using __getattr__. For use in interactive shells, __dir__
includes the named attributes translated to PEP8-style names.
:param name: name of the generated view class
:param attrs: attributes to be defined for the requested class
:return: extension of DictView that loads attributes from the provided
source dict, renamed to python's conventions
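
    A small sketch (the view name and attribute are made up for the example):

    .. code-block:: python

        FileView = view_with_attrs('FileView', ('fileName',))
        view = FileView({'fileName': 'image.bin'})
        view.file_name  # -> 'image.bin'
        view.fileName   # -> 'image.bin'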
"""
# transform a sequence of attribute names to a mapping of {name: factory}
# factory None will not touch the value
if not isinstance(attrs, Mapping):
attrs = {attr: None for attr in attrs}
def to_value(value, name):
if value is None:
# avoid handing factories None, None is identical to 'not set'
return value
factory = attrs.get(name)
# let factory transform the value if available
return factory(value) if factory else value
class AttrView(DictView):
def __init__(self, source):
super().__init__(source)
def __getattr__(self, item):
if item in attrs:
# attribute is defined, fetch from parent
return to_value(self.get(item), item)
prop = to_property_name(item)
if prop in attrs:
# translated attribute is defined, fetch the translation from parent
return to_value(self.get(prop), prop)
# no such attribute
raise AttributeError(item)
def __dir__(self):
base = set(super().__dir__())
# extend the set of attrs reported up the mro with those explicitly defined for this AttrView
# NB: the result of dir()/__dir__ is not guaranteed to be 'correct', it is meant for interactive inspection
return sorted(base | {to_attr_name(attr) for attr in attrs})
if name is not None:
AttrView.__name__ = name
return AttrView
def to_property_name(name):
"""
Converts a name to a property name by convention (*camelCase*) by
converting *snake_case* to *snakeCase*.
:param name: name to convert
:return: *name* converted to a property name
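
    For example:

    .. code-block:: python

        to_property_name('hash_algorithm')  # -> 'hashAlgorithm'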
"""
return re.sub(r'_[a-z]', lambda match: match.group()[1].upper(), name)
def to_attr_name(name):
"""
Converts a name to an attribute name by convention (*snake_case*) by
converting *camelCase* to *camel_case*.
:param name: name to convert
    :return: *name* converted to an attribute name
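
    For example:

    .. code-block:: python

        to_attr_name('hashAlgorithm')  # -> 'hash_algorithm'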
"""
return re.sub(r'[a-z][A-Z]', lambda match: '_'.join(match.group().lower()), name).lower()
def to_class_name(name):
"""
Converts a name to a class name by convention (*PascalCase*) by converting
*snake_case* to *SnakeCase*.
:param name: name to convert
:return: *name* converted to a class name
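
    For example:

    .. code-block:: python

        to_class_name('geographic_location')  # -> 'GeographicLocation'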
"""
    name = re.sub(r'_[a-zA-Z]', lambda match: match.group()[1].upper(), name)
return name[0].upper() + name[1:]
class ChunkedIO(RawIOBase):
"""
Turns a generator of chunks into raw I/O, which can in turn be fed to
something like an `io.BufferedReader`.
.. code-block:: python
# requesting an imaginary big file, to be sent chunked
response = requests.get('http://big-files.net/big-file.bin',
stream=True) # make sure to request a stream
        # using an 8 KiB chunk size, turn the response into a BufferedReader
        reader = io.BufferedReader(
            ChunkedIO(response.iter_content(8192))
        )
"""
def __init__(self, chunks):
"""
Creates a `.ChunkedIO`, reading chunks of `bytes` from *chunks*.
.. note::
`.ChunkedIO` does no type checking on *chunks* or the chunks it
yields; only `bytes` chunks will likely work as expected.
:param chunks: an iterable yielding `bytes`
"""
self._chunks = chunks
self._current = None
def seekable(self):
return False
def readable(self):
return True
def writable(self):
return False
def readinto(self, b):
if self.closed:
return 0
max_read = len(b)
num_read = 0
try:
# reuse current (partial) chunk or use the next chunk if the current one is None or empty
self._current = self._current or next(self._chunks)
# keep reading till num_read reaches max_read
while num_read < max_read:
# split current chunk into a buffer to be used right now and one for the next call
buf, self._current = self._current[:max_read - num_read], self._current[max_read - num_read:]
# put the data to be used now into b
b[num_read:num_read + len(buf)] = buf
num_read += len(buf)
# reuse current (partial) chunk or use the next chunk if the current one is None or empty
self._current = self._current or next(self._chunks)
except StopIteration:
# self._chunks is exhausted, mark self._current as invalid
self._current = None
return num_read
def close(self):
self._chunks.close()
super().close()
def flatten_mapping(mapping, separator='.', prefix=None):
"""
Flattens *mapping* into a single level `dict` by concatenating nested keys.
:param mapping: the mapping to be flattened
:param separator: separator to be used to concatenate nested keys
:param prefix: prefix for all keys in the nested result (typically only
useful for recursive calls)
:return: *mapping*, flattened into a single level `dict`
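
    For example:

    .. code-block:: python

        flatten_mapping({'file': {'name': 'a.bin', 'size': 10}, 'id': 1})
        # -> {'file.name': 'a.bin', 'file.size': 10, 'id': 1}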
"""
# collect dict items (tuples) in a list
items = []
for key, value in mapping.items():
if prefix:
# update the key to be used if there's a prefix
key = separator.join((prefix, key))
try:
# attempt recursive flattening (…)
            items.extend(flatten_mapping(value, separator=separator, prefix=key).items())
except AttributeError:
# (…) not a mapping, apparently, just track this item
items.append((key, value))
# return a flattened version of mapping
return dict(items)