Source code for hansken.trace

# encoding=utf-8

from collections import namedtuple
from collections.abc import Mapping
from enum import Enum, unique
from itertools import chain
import json
import warnings

from logbook import Logger

from hansken import fetch
from hansken.abstract_trace import AbstractTrace
from hansken.query import Term, to_query
from hansken.util import (b64decode, b64encode,
                          DictView, flatten_mapping,
                          format_datetime, GeographicLocation, INVALID_DATE,
                          parse_datetime, to_attr_name, Vector, view_with_attrs)


log = Logger(__name__)


# pair of callables for a model type: serialize turns a python value into its wire format,
# deserialize turns a wire value back into a python value
_converter = namedtuple('_converter', ('serialize', 'deserialize'))


# collection of wire / python type converters by model type
CONVERTERS = {
    # binary values travel as base64
    'binary': _converter(serialize=b64encode, deserialize=b64decode),
    # dates are converted to / from datetime.datetime objects (*requiring* a timezone)
    'date': _converter(serialize=format_datetime, deserialize=parse_datetime),
    # latLong values are converted to / from GeographicLocation
    # (serializing trusts we'll hit GeographicLocation.__str__ or coerce a str to itself)
    'latLong': _converter(serialize=str, deserialize=GeographicLocation.from_string),
    # vector values are converted to / from Vector
    # (serializing trusts we'll hit Vector.__str__ or coerce a str to itself)
    'vector': _converter(serialize=str, deserialize=Vector.from_base64),
}


def image_from(trace_uid):
    """
    Deprecated alias for `image_from_uid`.

    Emits a `DeprecationWarning` and delegates to `image_from_uid`.

    :param str trace_uid: a trace uid
    :return: whatever `image_from_uid` returns for *trace_uid*
    """
    # stacklevel=2 attributes the warning to the code calling this shim, not to the shim itself
    warnings.warn('image_from is deprecated, use image_from_uid', DeprecationWarning, stacklevel=2)
    return image_from_uid(trace_uid)


def image_from_uid(trace_uid):
    """
    Takes the *image* part from *trace_uid*, which consists of two parts
    (*image* and *id*) separated by a colon. Everything before the first
    colon is returned; a value without a colon is returned unchanged.

    Note that a `.Trace` object will provide both parts as properties
    `image_id` and `id`.

    :param str trace_uid: a trace uid
    :return: the image UUID from *trace_uid*
    :rtype: `str`
    """
    image_id, _, _ = trace_uid.partition(':')
    return image_id
def image_from_trace(trace):
    """
    Determines the image id for *trace*, which can be either a `.Trace`
    object or something `dict`-like. Sources are tried in this order:

    1. an ``image_id`` attribute on *trace*;
    2. key ``'image'`` in *trace*;
    3. key ``'uid'`` in *trace* (the image part of that uid is used).

    :param trace: a `.Trace` or `dict`-like trace
    :return: the image UUID from *trace*
    :rtype: `str`
    :raise ValueError: when none of the sources above is available
    """
    # Trace objects carry the image id as an attribute
    if hasattr(trace, 'image_id'):
        return trace.image_id

    # dict-like traces: prefer the explicit image property
    if 'image' in trace:
        return trace.get('image')

    # last resort is deriving the image from the uid, give up if that's not there either
    if 'uid' not in trace:
        raise ValueError('unable to get image id from trace {}'.format(trace))

    return image_from_uid(trace.get('uid'))
@unique
class Privileged(Enum):
    """
    Possible privileged states of a `.Trace`. Values that correspond to
    'not privileged' (`None` or `.rejected`) are falsy, making them suitable
    to check whether a trace is privileged.

    Members compare equal to their `str` value and hash like that value, so
    they can be used in sets and as `dict` keys alongside plain strings.
    """
    suspected = 'suspected'  #: trace is suspected of being privileged
    confirmed = 'confirmed'  #: trace is confirmed to be privileged
    rejected = 'rejected'  #: trace is confirmed to be *not* privileged

    def __bool__(self):
        # rejected value should be falsy (see class doc)
        return self is not Privileged.rejected

    def __eq__(self, other):
        if isinstance(other, str):
            # enable comparisons with str values
            return str(self) == other

        return super().__eq__(other)

    def __hash__(self):
        # defining __eq__ without __hash__ would make members unhashable
        # hash like the str value, members compare equal to it (see __eq__)
        return hash(self.value)

    def __str__(self):
        return self.value
class Snippet(DictView):
    """
    Snippet result, enabling rendering of a highlighted snippet of text
    content. Usable as a dictionary where key ``'content'`` contains a snippet
    of text and key ``'highlights'`` contains a list of dictionaries encoding
    highlighted terms in the content.
    """

    def render(self, start='[[', end=']]'):
        """
        Render this snippet by surrounding highlights with *start* and *end*
        marker strings, e.g.:

        .. code-block:: python

            >>> my_snippet.render()
            'A [[snippet]] with the term "[[snippet]]" highlighted.'
            >>> my_snippet.render(start='<em>', end='</em>')
            'A <em>snippet</em> with the term "<em>snippet</em>" highlighted.'

        A snippet without highlights renders as its plain content, a snippet
        without content renders as an empty string.

        :param start: start marker around highlights
        :param end: end marker around highlights
        :return: this `.Snippet`, highlighted as a `str`
        """
        # tolerate absent content / highlights rather than crashing on None
        content = self.get('content') or ''
        highlights = self.get('highlights') or ()

        # gather chunks of text to be joined together later
        chunks = []
        # start at text offset 0
        prev_end = 0
        for highlight in highlights:
            hl_start, hl_end = highlight['start'], highlight['end']
            chunks.extend((
                # content from the last highlight up to the current start (…)
                content[prev_end:hl_start],
                # (…) then a highlight start marker (…)
                start,
                # (…) followed by the actual highlight content (…)
                content[hl_start:hl_end],
                # (…) and a highlight end marker
                end,
            ))
            prev_end = hl_end

        # add a final chunk from the end of the last highlight to the end of the content string
        chunks.append(content[prev_end:])

        return ''.join(chunks)

    def __str__(self):
        # __str__ is required to return a str, fall back to an empty one when there's no content
        return self.get('content') or ''
class Trace(AbstractTrace):
    """
    Base class for traces. Defines convenience methods to navigate or
    manipulate a trace. Trace data may be accessed using `open <.Trace.open>`.
    """
    # value separating sequence numbers in trace ids (e.g. 0-1-23)
    ID_SEP = '-'
    # signal value for lazy post-init values
    _UNINITIALIZED = object()

    def __init__(self, source, context=None):
        """
        Create a new `.Trace`.

        :param source: the trace's source mapping
        :param context: the project context this trace was retrieved with
            (optional, but required for anything that needs the remote)
        """
        super().__init__(source)

        # a super will likely also set these attributes, but we need these for operation without any super classes
        # retrieve the values from source as we're only interested in intrinsics, self.get() inspects the trace model
        self.image_id = source.get('image')  # this will get an intrinsic property, leaving the image type as member
        self.id = source.get('id')
        self.uid = source.get('uid')
        self.name = source.get('name')

        # set parent identifiers if applicable (id/uid might not be available or trace is a root)
        self.parent_id = self._parent_identifier(self.id)
        self.parent_uid = self._parent_identifier(self.uid)

        # default to an empty set, 'type' in trace.types should be possible
        # note that a modeled trace class below would override this
        self.types = self.get('types', set())
        # invocations of the update method are counted, see Trace.update method
        self._update_count = 0

        self._tags = None
        self._notes = None
        self._privileged = Trace._UNINITIALIZED  # None is a valid value, use signal value
        self._audits = None

        self._context = context
        self._project_id = context.project_id if context else None

    @staticmethod
    def _parent_identifier(identifier):
        """
        Derive the identifier of a parent trace from a trace id or uid.

        :param identifier: a trace id (``0-1-23``) or uid (``image:0-1-23``), possibly `None`
        :return: *identifier* without its last sequence number, or `None` if
            *identifier* is empty or identifies a root trace
        """
        if not identifier:
            return None

        # only the trace id part encodes the hierarchy, an image id in front of the colon can contain
        # separator characters of its own (a uid without a colon is treated as a plain trace id)
        trace_id = identifier.rpartition(':')[2]
        if Trace.ID_SEP not in trace_id:
            # single sequence number: a root trace, no parent
            return None

        # the trace id is the tail of the identifier, the last separator is guaranteed to be part of it
        return identifier[:identifier.rindex(Trace.ID_SEP)]

    @property
    def context(self):
        """
        The `ProjectContext <hansken.remote.ProjectContext>` instance that
        created this `.Trace`.

        :raise ValueError: when no context was set for this `.Trace`
        """
        if self._context:
            return self._context
        else:
            raise ValueError('project context for trace {} not set'.format(self.uid))

    @property
    def image_name(self):
        """
        The name / description of this `.Trace`'s ``image_id``, or ``None``.
        """
        return self.context.image_name(self.image_id) if self.image_id else None

    @property
    def parent(self):
        """
        This `.Trace`'s parent `.Trace`, or ``None`` if not applicable.
        """
        return self.context.trace(self.parent_uid) if self.parent_uid else None

    def note(self, note, refresh=None):
        """
        Add a note to this `.Trace`.

        :param str note: the note itself
        :param refresh: if `True`, force a full project refresh, making this
            note immediately searchable
        """
        self.context.note(self.uid, note, refresh=refresh)

    @property
    def notes(self):
        """
        The notes attached to this `.Trace`. Note that this does not include
        notes added by `note <.Trace.note>`.
        """
        if self._notes is None:
            # TODO: wrap with something that allows to delete a note (HANSKEN-2247)
            self._notes = self.get('user.annotated.#note') or ()

        return self._notes

    def tag(self, tag, refresh=None):
        """
        Tag this trace.

        :param str tag: the tag to set
        :param refresh: if `True`, force a full project refresh, making this
            tag immediately searchable
        """
        self.context.tag(self.uid, tag, refresh=refresh)

    @property
    def tags(self):
        """
        The tags attached to this `.Trace`. Note that this does not include
        tags added by `tag <.Trace.tag>`.
        """
        if self._tags is None:
            # TODO: wrap with something that allows to delete a tag (HANSKEN-2247)
            self._tags = self.get('user.annotated.tags') or ()

        return self._tags

    @property
    def privileged(self):
        """
        The privileged state of this trace, either `None` or one of
        `.Privileged`. Note that `None` is not a valid value when *setting*
        the `.privileged` attribute, an operation that requires authorization.

        :raise ValueError: when the trace carries a privileged state that is
            not one of `.Privileged`
        """
        if self._privileged is Trace._UNINITIALIZED:
            self._privileged = self.get('user.annotated.privileged', None)
            if self._privileged:
                # value has been set, turn it into a Privileged
                try:
                    self._privileged = Privileged(self._privileged)
                except ValueError as e:
                    # 'reset' the attribute as if freshly constructed
                    self._privileged = Trace._UNINITIALIZED
                    raise ValueError(
                        'unknown privileged state: {}'.format(self.get('user.annotated.privileged'))
                    ) from e

        return self._privileged

    @privileged.setter
    def privileged(self, status):
        # validate the requested status before sending anything to the remote
        status = Privileged(status)
        self.context.mark_privileged(self.uid, status)
        self._privileged = status

    @property
    def creator(self):
        """
        The tool that created this `.Trace`, or ``None`` if unknown. Includes
        the version of that tool, e.g.: ``toolname 1.2.3``.

        .. note::
            This value is formatted by ``hansken.py``, it is not suitable for
            use with queries (like finding other traces created by the same
            tool).
        """
        # attempt to retrieve new-style creator metadata, fall back to old-style metadata
        tool = self.get('system.processed.origin.createdBy') or self.get('system.processed.tool.meta.creator')
        if tool:
            # provide the creator of this trace including its version
            return '{} {}'.format(tool, self.tool_versions.get(tool) or '(unknown version)')

        return None

    @property
    def tool_versions(self):
        """
        The tools and versions that are responsible for this `.Trace`'s
        metadata, as a `dict` mapping the names of tools to their respective
        versions.

        Tool versions typically include the versions of critical software
        libraries used by those tools.
        """
        # attempt to retrieve new-style tool information
        toolruns = self.get('system.processed.toolrun')
        if toolruns:
            # property should be returning {tool name: tool version}, take version from last occurrence
            return {run.get('tool'): run.get('version') for run in toolruns}

        # fall back to old tool metadata (but never None)
        return self.get('system.processed.tool.meta.version') or {}

    @property
    def audits(self):
        """
        An audit log of user-initiated changes to this `.Trace` in the form of
        a sequence of `dict`s, ordered by the audit's creation timestamp. The
        audit log can be empty, but never `None`.
        """
        if self._audits is None:
            # default to empty sequence
            self._audits = self.get('system.processed.audit') or ()
            # createdOn should always be set, but don't crash when it's not available
            self._audits = sorted(self._audits, key=lambda audit: audit.get('createdOn') or INVALID_DATE)

        return self._audits

    def tracelets(self, tracelet_type, query=None, sort=None):
        """
        Provides or retrieves tracelets of type *tracelet_type*.

        The exact return type of a call to `.tracelets` depends on the
        tracelet type being requested. If the remote defines *tracelet_type*
        to be 'few', the result will be a `list` of `.Tracelet` objects. If
        the remote defines *tracelet_type* to be 'many', the result will be a
        `.SearchResult` of `.Tracelet` objects. Note that *query* can only be
        used with the latter.

        :param tracelet_type: the tracelet type to request
        :param query: query to match tracelets to
        :param sort: ordering of tracelets
        :return: a sized iterable of `.Tracelet` s (iterable once)
        """
        trace_uid_query = Term('traceUid', self.uid)
        # ensure we're searching for tracelets belonging to this trace
        query = to_query(query) & trace_uid_query if query else trace_uid_query

        return self.context.search_tracelets(tracelet_type, query=query, sort=sort)

    @property
    def children(self):
        """
        A `SearchResult <hansken.remote.SearchResult>` instance containing the
        child traces of this `.Trace`, if any.
        """
        return self.context.children(self.uid)

    @property
    def data_types(self):
        """
        A set of data type names available for this `.Trace`. These names can
        be used with calls to `open <.Trace.open>` or attribute access like

        .. code-block:: python

            if 'raw' in trace.data_types:
                # trace has a raw data stream, attribute access to data.raw.size will be safe
                print('raw data size:', trace.data.raw.size)

            for data_type in trace.data_types:
                # format a file name as the trace's name, using the data type name as the extension
                # (e.g. "some-file.raw" or "another-file.text")
                out_file = '{}.{}'.format(trace.name, data_type)
                print('writing first 64 bytes to', out_file)
                with open(out_file, 'wb') as out_file:
                    # out_file now opened for writing in binary mode
                    # write the first 64 bytes of trace's stream of type data_type to the file
                    out_file.write(trace.open(data_type, size=64).read())

        :return: data type names available for this `.Trace` (possibly empty,
            but never `None`)
        :rtype: `set`
        """
        if hasattr(self, 'data'):
            # likely a modeled trace
            return set(self.data.keys())

        # get a view on the extracted data type (possibly empty or None) (…)
        data = self.get('system.extracted.data')
        # (…) and always return a set
        return set(data.keys() if data else [])

    def open(self, stream='raw', offset=0, size=None, key=fetch):
        """
        Open a data stream of a named stream (default ``raw``) for this
        `.Trace`.

        .. note::
            Multiple calls to `read(num_bytes)` on the stream resulting from
            this call works fine in Python 3, but will fail in Python 2.

        :param stream: stream to read
        :param offset: byte offset to start the stream on
        :param size: the number of bytes to make available
        :param key: key for the image of this trace (default is to fetch the
            key automatically, if it's available)
        :return: a file-like object to read bytes from the named stream
        :rtype: `io.BufferedReader`
        """
        return self.context.data(self.uid, stream, offset, size, key)

    def descriptor(self, stream='raw', key=fetch):
        """
        Retrieve the data descriptor for a named stream (default ``raw``) for
        this `.Trace`.

        :param stream: stream to get the descriptor for
        :param key: key for the image of this trace (default is to fetch the
            key automatically, if it's available)
        :return: the stream's data descriptor (as-is)
        """
        return self.context.descriptor(self.uid, stream, key)

    @property
    def preview_types(self):
        """
        A set of preview type names (mime types) available for this `.Trace`.
        These names can be used with calls to `preview <.Trace.preview>`.

        :return: preview type names available for this `.Trace` (possibly
            empty, but never `None`)
        :rtype: `set`
        """
        previews = self.get('previews')
        return set(previews.keys() if previews else [])

    def preview(self, mime_type):
        """
        Gets a preview of a particular mime type, e.g. 'text/plain' or
        'image/png'.

        :param mime_type: the preview type to get
        :return: `bytes` or `None`
        """
        previews = self.get('previews')
        if previews and mime_type in previews:
            return b64decode(previews.get(mime_type))
        else:
            # no previews or no such mime type in previews
            return None

    def snippets(self, query, num=100, before=200, after=200):
        """
        Generate snippets surrounding term hits from *query* in any of the
        data streams of this trace.

        :param query: the query to generate snippets for (should contain term
            queries, or no snippets will be generated)
        :param num: maximum number of snippets to return
        :param before: number of bytes to include before the term hits
        :param after: number of bytes to include after the term hits
        :return: `list` of `.Snippet` instances
        """
        with self.context.search(query=Term('uid', self.uid) & to_query(query), count=1, snippets=num) as result:
            snippets = result.takeone(include='snippets')
            if snippets:
                # takeone() is safe, unpacking a potential None is not
                _, snippets = snippets

        if not snippets:
            # either no results or no snippets for the only result
            return []

        # gather unique terms from all of the generated snippets
        terms = list({snippet['term'] for snippet in snippets})

        image_key = self.context.key(self.image_id)
        image_key = b64encode(image_key) if image_key else None

        if hasattr(self, 'data'):
            # likely a modeled trace, let get() deal with data origins and categories
            data_sizes = {data_type: self.get('data.{}.size'.format(data_type))
                          for data_type in self.data_types}
        else:
            # plain trace, assume data property path to be in origin system
            data_sizes = {data_type: self.get('system.extracted.data.{}.size'.format(data_type))
                          for data_type in self.data_types}

        # transform all the snippets into snippet requests, using all the returned terms for each request
        snippets = [
            {'uid': self.uid,
             'imageKey': image_key,
             'dataType': snippet['dataType'],
             'highlights': terms,
             # request data from *around* the term hit
             # offset parameters are validated, clip to stream bounds
             'start': max(0, snippet['start'] - before),
             'end': min(snippet['end'] + after, data_sizes[snippet['dataType']])}
            for snippet in snippets
        ]

        return self.context.snippets(*snippets)

    def update(self, key_or_updates=None, value=None, data=None, overwrite=False):
        """
        Requests the remote to update or add metadata properties for this
        `.Trace`.

        .. note::
            Calls to `update` will *not* update the source of the `.Trace`
            it's being called on. To get a `.Trace` instance including the
            changes made after a successful call to `update`, use
            ``trace.context.trace(trace.uid)`` to request a new instance of a
            trace with this `.Trace`'s identifier.

            Please note that, for performance reasons, all changes are
            buffered and not directly effective in subsequent search, update
            and import requests. As a consequence, successive changes to a
            single trace might be ignored. Instead, all changes to an
            individual trace should be bundled in a single update or import
            request. The project index is refreshed automatically (by default
            every 30 seconds), so changes will become visible eventually.

        :param key_or_updates: either a `str` (the metadata property to be
            updated) or a mapping supplying both keys and values to be updated
            (or `None` if only data is supplied)
        :param value: the value to update metadata property *key* to (used
            only when *key_or_updates* is a `str`)
        :param data: a `dict` mapping data type / stream name to bytes to be
            imported
        :param overwrite: whether properties to be imported should be
            overwritten if already present
        :return: processing information from remote
        """
        # count invocations of the update method
        self._update_count += 1
        if self._update_count > 1:
            log.warn("This trace ({}) is being updated more than once. This could result in lost updates, please "
                     "supply a mapping with all batched updates instead of single keys and values.", self.id)

        updates = key_or_updates
        if isinstance(key_or_updates, str):
            updates = {key_or_updates: value}

        return self.context.update_trace(self, updates, data=data, overwrite=overwrite)

    def child_builder(self, name=None):
        """
        Create a `.TraceBuilder` to build a trace to be saved as a child of
        this `.Trace`. Note that ``name`` is a mandatory property for a trace,
        even though it is optional here. A ``name`` can be added later using
        the `.TraceBuilder.update` method. Furthermore, a new trace will only
        be added to the index once explicitly saved (e.g. through
        `.TraceBuilder.build`).

        :param name: the name for the trace being built
        :return: a `.TraceBuilder` set up to create a child trace of this
            `.Trace`
        """
        builder = self.context.child_builder(self.uid)
        if name:
            # optionally add a name for the new child trace
            builder.update('name', name)

        return builder

    def __repr__(self):
        return '<{0.__class__.__module__}.{0.__class__.__name__} {0.uid} ({0.name})>'.format(self)
class IncompleteTracePropertyError(ValueError):
    """
    Signals that a trace property was recognised, but lacks one or more parts
    that are required to address it. Being a `ValueError`, it can be handled
    together with other invalid property names.
    """
class TraceModel(DictView):
    """
    Utility to deal with intricacies surrounding the trace / data model used
    by Hansken. Used by ``hansken.py`` to translate and validate
    user-specified metadata properties to their corresponding place in the
    data structure for a trace in Hansken.
    """

    def __init__(self, source):
        """
        Create a new `.TraceModel`, indexing *source* for later lookups.

        :param source: the trace model as a nested mapping
        """
        super().__init__(source)

        self._intrinsics = set(self.get('properties', default=[]))
        self._origins = tuple(sorted(self.get('origins.keys', default={}).keys()))

        # look up the categories once, tolerating a model that doesn't define any
        categories = self.get('origins.categories', default={})

        # create a mapping tracking the category of a type, for efficiency later
        self._categorized_types = {type_name: category_name
                                   for category_name, category in categories.items()
                                   for type_name in category.get('types', default={}).keys()}
        self._categorized_properties = {property_name: category_name
                                        for category_name, category in categories.items()
                                        for property_name in category.get('properties', default={}).keys()}

        # NB: it's possible that there's categories without defined types, _categorized_types.values() would skip those
        self._categories = set(categories.keys())
        self._types = set(self._categorized_types.keys())

        # create a mapping tracking the data types for mapped types
        self._mapped_types = {type_name: set(type_details.get('keys'))
                              for category_name, category in categories.items()
                              for type_name, type_details in category.get('types', default={}).items()
                              # type is mapped if its 'keys' property is non-empty
                              if type_details.get('keys')}
        self._data_types = set(self.get('origins.categories.extracted.types.data.keys', []))

    @property
    def intrinsics(self):
        """
        The intrinsic properties (properties that any trace can have,
        regardless of its type(s)) defined by the trace model.
        """
        return self._intrinsics

    def is_intrinsic(self, steps):
        """
        Checks whether the property defined by *steps* is an intrinsic
        property.

        :param steps: steps through a `.Trace`' data structure
        :return: whether the property defined by *steps* is an intrinsic
            property
        """
        return steps[0] in self.intrinsics

    @property
    def origins(self):
        """
        The origins defined by the trace model, typically *system* and *user*.
        """
        return self._origins

    @property
    def categories(self):
        """
        The categories of types and properties defined by the trace model,
        e.g. *extracted* or *annotated*.
        """
        return self._categories

    @property
    def types(self):
        """
        The trace types defined by the trace model, e.g. *file* or
        *classification*.
        """
        return self._types

    @property
    def data_types(self):
        """
        The named data types defined by the trace model for the "data" trace
        type.
        """
        return self._data_types

    def _expand_from_type(self, category, type_name, rest):
        """
        Generate the steps for a typed property, starting at its category.

        :param category: the category *type_name* belongs to
        :param type_name: the name of the trace type
        :param rest: iterator over the steps following the type name
        :return: generator of steps, starting with *category* and *type_name*
        :raise KeyError: when a data type or property name is not defined for
            the type
        :raise IncompleteTracePropertyError: when *rest* runs out of steps
        """
        # category and type name are already provided, lead with those
        yield category
        yield type_name

        try:
            data_types = self._mapped_types.get(type_name, set())
            if data_types:
                # mapped type, verify the data type is valid for the type (e.g. data.raw)
                data_type = next(rest)
                if data_type in data_types:
                    yield data_type
                else:
                    raise KeyError('unknown data type "{}" for trace type "{}"'.format(data_type, type_name))

            # verify the property name is valid for the type
            type_properties = self.get('origins.categories.{}.types.{}.properties'.format(category, type_name),
                                       default={})
            property_name = next(rest)
            # retrieve property details or trigger a KeyError
            property_details = type_properties[property_name]
            yield property_name

            if property_details.get('isMap', default=False):
                # provide map key for mapped properties (e.g. email.headers.transfer-encoding)
                # glue all remaining steps for the map property together (but require at least one)
                yield self._required_map_key(rest)
        except StopIteration:
            # expected another element from next(rest)
            raise IncompleteTracePropertyError()

    @staticmethod
    def _required_map_key(steps):
        """
        Join the remaining *steps* into a single map key, requiring at least
        one step to be available.

        :param steps: the remaining steps
        :return: the remaining steps, joined on a dot
        :raise IncompleteTracePropertyError: when there are no remaining steps
        """
        # join the remaining / available steps on a dot
        # this results in 'transfer-encoding' if that's the only remaining elements
        # this results in 'nsrl.os.product' if there's three remaining elements 'nsrl', 'os' and 'product'
        map_key = '.'.join(steps)
        if not map_key:
            # empty string, signal an 'incomplete' trace property
            raise IncompleteTracePropertyError()

        return map_key

    def expand(self, name):
        """
        Expands a trace property to 'steps' through a nested data structure.
        Inserts a properties category if unspecified, does *not* include an
        origin.

        :param name: the property name to expand, excluding an origin
        :return: a `tuple` of 'steps'
        :raise ValueError: when a provided *name* is not defined by the trace
            model or is missing required parts
        """
        try:
            if '.' not in name:
                # either an intrinsic property or categorized property without its category
                if name in self.intrinsics:
                    if self.get('properties.{}.isMap'.format(name), default=False):
                        # no dot, missing map key for mapped property (e.g. previews)
                        raise IncompleteTracePropertyError()

                    # return intrinsic property as a one-tuple (e.g. name)
                    return name,

                # categorized property without the specified category (e.g. tags)
                return self._categorized_properties[name], name

            # split the provided name into steps through the model
            steps = iter(name.split('.'))
            step = next(steps)

            if step in self.intrinsics:
                # mapped intrinsic property (e.g. previews.text/plain)
                return step, self._required_map_key(steps)

            if step in self.categories:
                category = step
                step = next(steps)
                if step in self._categorized_properties:
                    if self._categorized_properties[step] != category:
                        raise ValueError('unknown trace property: "{}"'.format(name))

                    # mapped categorized property (e.g. category.misc.something)
                    if self.get('origins.categories.{}.properties.{}.isMap'.format(category, step), default=False):
                        # glue all remaining steps for the map property together (but require at least one)
                        return category, step, self._required_map_key(steps)

                    # regular categorized property (e.g. annotated.tags)
                    return category, step

                # categorized but not a property, must be a categorized type property
                return tuple(self._expand_from_type(category=category, type_name=step, rest=steps))

            if step in self.types:
                # type without category, find corresponding category and expand from there
                category = self._categorized_types[step]
                return tuple(self._expand_from_type(category=category, type_name=step, rest=steps))
        except (StopIteration, IncompleteTracePropertyError):
            # expected another step, raise new error to include full property name in error message
            raise IncompleteTracePropertyError('incomplete trace property: "{}"'.format(name))
        except KeyError:
            raise ValueError('unknown trace property: "{}"'.format(name))
        else:
            # none of the branches above returned: name doesn't start with anything the model knows
            raise ValueError('unknown trace property: "{}"'.format(name))

    def get_serializer(self, steps):
        """
        Finds the serializer for the model type of the property defined by
        *steps*, looked up in `CONVERTERS`.

        :param steps: steps through a `.Trace`' data structure
        :return: a callable to serialize values for the property, or `None`
            if the property's model type has no converter
        :raise ValueError: when no model type can be found for *steps*
        """
        try:
            if self.is_intrinsic(steps):
                converter = CONVERTERS.get(self.get('properties.{}.type'.format(steps[0])))
                return converter.serialize if converter else None

            model_type = self.get('origins.categories.{}.properties.{}.type'.format(*steps[:2]))
            if model_type:
                converter = CONVERTERS.get(model_type)
                return converter.serialize if converter else None

            if steps[1] in self._mapped_types and steps[2] in self._mapped_types[steps[1]]:
                # steps models a mapped type property, steps[2] is the data type name, omit it to retrieve model type
                model_steps = steps[0], steps[1], steps[3]
            else:
                # steps models a regular type property, model type to be retrieved with the first 3 steps
                model_steps = steps[:3]

            model_type = self.get('origins.categories.{}.types.{}.properties.{}.type'.format(*model_steps))
            if model_type:
                converter = CONVERTERS.get(model_type)
                return converter.serialize if converter else None
        except IndexError:
            # too few steps to index or to fill the lookup paths above
            raise ValueError('unknown trace property: "{}"'.format('.'.join(steps)))
        else:
            # no model type found for steps
            raise ValueError('unknown trace property: "{}"'.format('.'.join(steps)))
class TraceBuilder(DictView):
    """
    Utility class to aid in creating user-defined traces or updating existing
    ones. A `.TraceBuilder` is a trace model aware view on a nested mapping,
    using the trace model to both validate requested updates and finding the
    correct spot for values in the nested mapping.

    This class is not intended for direct user instantiation, see

    - `.Trace.child_builder`
    - `.ProjectContext.child_builder`
    - `.TraceBuilder.child_builder`
    """

    def __init__(self, model, source=None, target=None, context=None, debug=False):
        """
        Create a new `.TraceBuilder`.

        :param model: the `.TraceModel` used to expand and serialize updates
        :param source: mapping to build on (defaults to a new, empty `dict`)
        :param target: either a *(project id, parent trace uid)* pair or the
            `.TraceBuilder` of the parent trace
        :param context: the project context used to save the trace
        :param debug: whether `build` should log the trace instead of saving it
        """
        super().__init__(source or {})

        self._model = model
        self._updates = set()
        self._data = {}
        self._context = context
        # mark debug as a 'public' value, enable user to change it later
        self.debug = debug
        self._uid = None

        if isinstance(target, TraceBuilder):
            # target 'parent' is also a TraceBuilder, defer resolving target arguments to a later time
            self._project_id = self._parent_uid = None
            self._parent_builder = target
        elif target:
            # explicit target, no parent builder
            self._project_id, self._parent_uid = target
            self._parent_builder = None
        else:
            # no known target, no issue yet, maybe user will provide target later
            self._project_id = self._parent_uid = self._parent_builder = None

    def update(self, key_or_updates, value=None):
        """
        Add or overwrite new metadata properties to this builder.
        *key_or_updates* can mix dotted properties and nested structures, all
        keys and values are merged before applying updates. A `.TraceModel`
        is used to find the proper fully qualified property names if needed,
        allowing both e.g. ``update('file.name', 'File Name')`` and
        ``update({'extracted': {'file': {'name': 'file name'}}})``.

        :param key_or_updates: either a `str` (the metadata property to be
            updated) or a mapping supplying both keys and values to be updated
        :param value: the value to update metadata property *key* to (used
            only when *key_or_updates* is a `str`)
        :return: this `.TraceBuilder`
        """
        # merge two ways of calling this method into a single data structure
        updates = key_or_updates
        if isinstance(key_or_updates, str):
            updates = {key_or_updates: value}

        # collect all updates in flattened form
        for name, new_value in flatten_mapping(updates).items():
            # let the trace model expand the key to be updated (will crash for unknown properties)
            steps = self._model.expand(name)
            path, last_step = steps[:-1], steps[-1]

            # enter the user-specified realm for non-intrinsic properties
            current = self._source
            if not self._model.is_intrinsic(steps):
                current = current.setdefault('user', {})

            # walk through the source to arrive at the last stop
            for step in path:
                current = current.setdefault(step, {})

            # find a serializer and set the actual value where it's supposed to go
            serializer = self._model.get_serializer(steps)
            current[last_step] = serializer(new_value) if serializer else new_value
            # track that we've applied an update for the steps just taken
            self._updates.add('.'.join(steps))

        return self

    def add_data(self, stream, data):
        """
        Add data to this trace as a named stream.

        :param stream: name of the data stream to be added
        :param data: data to be attached
        :return: this `.TraceBuilder`
        """
        if stream not in self._model.data_types:
            # not fatal, the data is attached regardless
            log.warn('data type stream name "{}" not defined by model, '
                     'trace being constructed might be invalid', stream)

        self._data[stream] = data

        return self

    @property
    def updates(self):
        """
        A collection of updates tracked by this `.TraceBuilder`.
        """
        # create a deterministic view on the updates this builder has applied to its source
        return tuple(sorted(self._updates))

    @property
    def context(self):
        """
        The `ProjectContext <hansken.remote.ProjectContext>` instance that
        created this `.TraceBuilder`.

        :raise ValueError: when no context was set for this builder
        """
        if self._context:
            return self._context
        else:
            raise ValueError('project context for trace builder not set')

    @property
    def target(self):
        """
        The combination of *(project id, parent trace uid)* this
        `.TraceBuilder` applies to.

        :raise ValueError: when the target is unknown or the parent builder
            has not been built yet
        """
        if self._project_id and self._parent_uid:
            return self._project_id, self._parent_uid
        elif self._parent_builder is not None:  # NB: an empty parent builder is falsy
            if not self._parent_builder._uid:
                raise ValueError('parent builder has no uid (yet), missing builder target parameters')

            # a parent that is itself a child builder has no project id of its own,
            # resolve it through the parent's target in that case
            project_id = self._parent_builder._project_id or self._parent_builder.target[0]
            # use parent's project id and parent's uid as the target for this builder
            return project_id, self._parent_builder._uid
        else:
            raise ValueError('builder target missing project id and/or parent trace uid')

    def child_builder(self, name=None):
        """
        Creates a new `.TraceBuilder` to build a child trace to the trace to
        be represented by this builder.

        .. note::
            Parent `.TraceBuilder`s should be built using the `.build()` call
            *before* their child builders as the unique trace identifier (uid)
            for the parent is needed to build a child trace.

        :param name: name of the new child trace
        :return: a `.TraceBuilder` set up to save a new trace as the child
            trace of this builder
        """
        # create an instance of our own type, supply all the arguments the child builder needs
        builder = type(self)(model=self._model, target=self, context=self._context, debug=self.debug)
        if name:
            # optionally add a name for the new child trace
            builder.update('name', name)

        return builder

    def build(self):
        """
        Save the trace being built by this builder to remote.

        .. note::
            If this `.TraceBuilder` was put in debug mode, the trace is *not*
            sent to remote but is instead logged at warning level.

        :return: the new trace' uid (or `None` in debug mode)
        :raise ValueError: when this builder was built before (not in debug
            mode) or its target cannot be determined
        """
        if self.debug:
            def value_path(key):
                if self._model.is_intrinsic(self._model.expand(key)):
                    return key
                else:
                    return 'user.{}'.format(key)

            log.warning(
                '{builder.__class__.__module__}.{builder.__class__.__name__} in debug mode, not creating with with '
                'parent {parent}; properties: {updates}; data: {data}',
                builder=self,
                parent=self.target[1],
                updates=', '.join(self.updates),
                data=', '.join(
                    '{}: {} bytes'.format(name, len(stream)) for name, stream in sorted(self._data.items())
                ) or '(no data)'
            )

            # collect update values only, apply the inverse of update's handling of intrinsic / user values
            # (updates are collected without the origin prefix)
            update_info = {key: self.get(key) for key in map(value_path, self.updates)}
            # mimic a REST trace, log the trace in JSON format (value serialization is already done by update)
            update_info = json.dumps(update_info, ensure_ascii=False, indent=2)
            log.debug('trace not created: \n{trace}', trace=update_info)

            return None
        else:
            if self._uid:
                # avoid creating duplicates from the same builder
                raise ValueError('builder was already built (resulting uid: {})'.format(self._uid))

            log.debug(
                'submitting {builder.__class__.__module__}.{builder.__class__.__name__} as a new trace with '
                'parent {parent}; properties: {updates}; data: {data}',
                builder=self,
                parent=self.target[1],
                updates=', '.join(self.updates),
                data=', '.join(
                    '{}: {} bytes'.format(name, len(stream)) for name, stream in sorted(self._data.items())
                ) or '(no data)'
            )

            self._uid = self.context.connection.create_trace(*self.target, self, data=self._data)
            return self._uid
def value_converter_for(property_details):
    """
    Creates a trace value converter for a provided property value definition. Key "type" in *property_details*
    primarily defines the conversion, looked up in `CONVERTERS`. The following trace value types are converted:

    - *binary*: a `bytes` object (decoded from base64)
    - *date*: a `datetime.datetime` object with timezone
    - *latLong*: a `GeographicLocation`
    - *vector*: a `Vector`

    If *property_details* states that the value will be a sequence (``isList``), mapping (``isMap``) or mapping
    to sequences (both), the created converter will take this into account.

    :param property_details: a property definition dict
    :return: a callable that converts a wire format to a python type, or ``None`` (no conversion needed or
        supported)
    """
    def sequence_converter(converter):
        # transform the sequence of raw values to a sequence of transformed values
        return lambda values: [converter(value) for value in values]

    def mapping_converter(converter):
        # transform the mapped values to an equivalent dict of converted values with the same mapping
        return lambda values: {key: converter(value) for key, value in values.items()}

    # converter for value type defined in the trace model
    converter = CONVERTERS.get(property_details.get('type'))
    converter = converter.deserialize if converter else None

    if converter:
        # the value has a converter, check to see if we'll be getting sequences or mappings
        # NB: this means that a mapping of lists is possible ({'key': ['value', 'value', …]})!
        if property_details.get('isList'):
            converter = sequence_converter(converter)
        if property_details.get('isMap'):
            converter = mapping_converter(converter)

    # whether still None or constructed to be something fancy, converter is now what it needs to be
    return converter


def trace_types(model):
    """
    Reduces a trace model dict to the bare essentials used to define the properties of a trace, e.g.:

    .. code-block:: python

        {('extracted', 'file'): {'name': None,  # name is a str, no conversion needed
                                 'createdOn': parse_datetime}}

    :param model: the full trace model received from remote
    :return: a mapping that maps a trace type's category and name to its properties defined in the model along
        with a converter for the property's type (or ``None``), again as a mapping
    """
    essentials = {}

    for category, types in model['origins']['categories'].items():
        # (some categories can be empty)
        if types:
            # get the name of the trace type and whatever the trace model defines for that type
            for type_name, trace_type in types['types'].items():
                # reduce the details for all the trace type's properties to just its converter
                properties = {prop: value_converter_for(details)
                              for prop, details in trace_type['properties'].items()}
                essentials[(category, type_name)] = properties

    return essentials


def expand_types(model, *type_names):
    """
    Creates a `list` of property names for the requested trace types, obtained from the provided model.
    Resulting property names are formatted as ``<trace_type>.<property_name>``, e.g. ``'file.createdOn'``.
    Only types in the ``extracted`` category of the model are considered.

    :param model: the model to read types and properties from
    :param type_names: the names of the types to be expanded
    :return: a `list` of typed property names
    :raise KeyError: when a requested type name is not an extracted type in *model*
    """
    names = []
    model_types = model['origins']['categories']['extracted']['types']

    for type_name in type_names:
        # generate property names including their type names (sorted to force deterministic behaviour)
        names.extend(sorted('{}.{}'.format(type_name, prop)
                            for prop in model_types[type_name]['properties']))

    return names


class TraceTypeView(DictView):
    """
    Utility class to turn a trace's type into a prefixed view on the trace's
    source dict.
    """

    def __init__(self, source, category, name, origins=('system', 'user'), converters=None):
        """
        Create a new trace type view.

        :param source: trace source
        :param category: category where the named type resides
        :param name: name of the trace type
        :param origins: origins where the trace type could be sourced from,
            ordered by priority
        :param converters: mapping of property names to optional type
            converter (a `callable`)
        """
        super().__init__(source)

        self._category = category
        self._name = name
        self._origins = origins
        self._converters = converters or {}

    def get(self, key, default=None):
        """
        Retrieves the value for *key* from the first origin that has one, applying the converter registered
        for *key* (if any).

        :param key: property name within this trace type
        :param default: value to return when no origin has a value for *key*
        :return: the (converted) value, or *default*
        """
        for origin in self._origins:
            # see if there's a value for <origin>.<category>.<name>.<key>
            value = super().get(self._separator.join((origin, self._category, self._name, key)),
                                default=self._no_default)
            if value is not self._no_default:
                # found a valid value
                converter = self._converters.get(key)
                return converter(value) if converter else value

        # no origin has a value for key in source
        return default

    def keys(self):
        """
        Collects the property names available for this trace type, in any origin.

        :return: a `set` of property names
        """
        keys = set()
        for origin in self._origins:
            # retrieve trace type for each origin
            origin_type = super().get(self._separator.join((origin, self._category, self._name)))
            if isinstance(origin_type, Mapping):
                # create a union of keys in each origin, if available
                keys.update(origin_type.keys())

        return keys

    def __iter__(self):
        # self._source is not rooted at the trace type, override to use keys at the view point
        return iter(self.keys())

    def __len__(self):
        # self._source is not rooted at the trace type, override to use keys at the view point
        return len(self.keys())

    def __str__(self):
        # NB: comprehensions create their own scope, causing zero-argument super() to fail inside them, create a
        #     super-proxy inside the __str__ def to be able to use it inside the comprehension
        parent = super()
        # super's __str__ would return a str() of the entire source
        # create a str of all the chunks of data that would be available through get(), keyed by the
        # chunk/origin's name
        return str({origin: parent.get('.'.join((origin, self._category, self._name)))
                    for origin in self._origins})

    def __repr__(self):
        return '<{0.__class__.__module__}.{0.__class__.__name__} ({0._name})>'.format(self)


class TraceletTypeView(DictView):
    """
    Utility class to wrap a tracelet value that takes type conversions into
    account.
    """

    def __init__(self, source, category, name, converters=None):
        """
        Create a new tracelet type view.

        :param source: tracelet source
        :param category: category where the named type resides
        :param name: name of the trace type
        :param converters: mapping of property names to optional type
            converter (a `callable`)
        """
        super().__init__(source)

        self._category = category
        self._name = name
        self._converters = converters or {}

    def get(self, key, default=None):
        """
        Retrieves the value for *key* from the tracelet, applying the converter registered for *key* (if any).

        :param key: property name within this tracelet
        :param default: value to return when the tracelet has no value for *key*
        :return: the (converted) value, or *default*
        """
        # no origins or path trickery to apply, _source is the tracelet itself
        value = super().get(key, default=self._no_default)
        if value is not self._no_default:
            converter = self._converters.get(key)
            return converter(value) if converter else value
        else:
            return default


def trace_class_from_model(model):
    """
    Creates a Trace class definition from the provided model. Both intrinsic properties and the properties
    defined for each type are automatically defined for the class and will be None should they not be present on
    the actual trace instance. The constructor for the resulting class accepts a single argument, being a dict as
    received from remote, with properties defined by model (and optionally a context). See `.DictView` for access
    patterns for instances of the resulting class.

    :param model: the full trace model received from remote
    :return: a class definition for a Trace defined by model
    """
    # read intrinsic properties from model
    intrinsics = {prop: value_converter_for(details) for prop, details in model['properties'].items()}
    # create an unnamed view class from the intrinsic properties (used as a super for TraceView, setting intrinsics)
    intrinsic_view = view_with_attrs(attrs=intrinsics)

    # determine the origins defined by the model
    # NB: current client and server implementations don't care about the order defined here, this may change in
    #     the future™, sorted here to ensure deterministic behaviour
    origins = sorted(model['origins']['keys'].keys())
    categories = model['origins']['categories'].keys()
    # create {(category, name) → {prop_name → converter}} mapping from the model
    model_types = trace_types(model)
    # create a reverse lookup to find the category of a type
    types = {name: category for category, name in model_types.keys()}

    # determine plural and mapped trace types
    plural_types = set()
    mapped_types = set()
    for category, type_name in model_types.keys():
        trace_type = model['origins']['categories'][category]['types'][type_name]
        # plural-type defined by few/many cardinality or (legacy) isList: true
        if trace_type.get('cardinality') in ('few', 'many') or trace_type.get('isList'):
            plural_types.add((category, type_name))
        # map-typed defined by "keys" being non-empty
        if trace_type.get('keys'):
            mapped_types.add((category, type_name))

    # create a trace type class for each singular / simple or mapped trace type defined by the model using type:
    # - new type is named alike extracted.text_message (builtin type's module "abc" is prefixed in __repr__)
    # - new type inherits from
    #   - TraceTypeView, able to get() values from a trace's deeply nested source dict
    #   - an AttrView, making attribute access easier, calling get() (defined by TraceTypeView) to get the
    #     respective values
    # - define no additional class variables
    type_classes = {(category, type_name): type('.'.join(map(to_attr_name, (category, type_name))),
                                                (TraceTypeView, view_with_attrs(type_name, properties.keys())),
                                                {})
                    for (category, type_name), properties in model_types.items()
                    if (category, type_name) not in plural_types}
    # analogous to the singular trace types, create Python types for the plural trace types, only difference being
    # the TraceletTypeView over the TraceTypeView, that doesn't insert multi-origin trickery into the get() method
    # (as tracelet values don't support deep paths anyway, there's a list index in the way somewhere)
    # these types don't get the full trace source as their source, but just the tracelet value (see get() below)
    type_classes.update({(category, type_name): type('.'.join(map(to_attr_name, (category, type_name))),
                                                     (TraceletTypeView,
                                                      view_with_attrs(type_name, properties.keys())),
                                                     {})
                         for (category, type_name), properties in model_types.items()
                         if (category, type_name) in plural_types})

    # NB: the order of super classes is significant here, Trace needs the context argument, intrinsic_view (an
    #     AttrView) won't like it, method resolution order will make Trace.__init__ call intrinsic_view.__init__
    #     instead of DictView.__init__
    class TraceView(Trace, intrinsic_view):
        def __init__(self, source, context=None):
            super().__init__(source, context)

            # collect categorized types defined for this trace, regardless of origin
            # (named to not shadow the {name → category} lookup "types" that get() closes over)
            present_types = set()
            for origin in origins:
                origin_source = self.get(origin) or {}
                for category_name, category in origin_source.items():
                    present_types.update({(category_name, type_name)
                                          for type_name in category.keys()
                                          # drop any non-defined (category, type) combo's (like (annotated, tags))
                                          if (category_name, type_name) in model_types})

            # set trace type attributes to self
            for category_name, type_name in present_types:
                type_class = type_classes[(category_name, type_name)]
                if (category_name, type_name) in mapped_types:
                    # gather the keys (typically data types) defined for this trace type in any origin
                    keys = set(chain.from_iterable(
                        (self.get('.'.join((origin, category_name, type_name))) or {}).keys()
                        for origin in origins
                    ))
                    type_attr = view_with_attrs(to_attr_name(type_name), keys)
                    setattr(self, to_attr_name(type_name), type_attr({
                        # include key name to type name to make TraceTypeView construct the right selection key
                        key: type_class(source, category_name, '.'.join((type_name, key)), origins,
                                        converters=model_types.get((category_name, type_name)))
                        for key in keys
                    }))
                elif (category_name, type_name) in plural_types:
                    # plural types don't automatically get an attribute
                    pass
                else:
                    setattr(self, to_attr_name(type_name),
                            type_class(source, category_name, type_name, origins,
                                       converters=model_types.get((category_name, type_name))))

            # assign a set of type names to self
            self.types = {type_name for _, type_name in present_types}

        def get(self, key, default=None):
            # override DictView.get with a model-aware variant, allowing
            # 1. get('origin.category.type.property')
            # 2. get('category.type.property')
            # 3. get('type.property')
            # while not breaking
            # get('origin.category.type'), get('category.type'), get('category'), get('type'),
            # get('origin.category'), get('origin'), get('origin.type')
            steps = key.split(self._separator)
            candidate_origins = origins

            if steps[0] in origins and len(steps) >= 4:
                # explicit option 1, strip the origin from steps to hit conditions below while restricting the
                # origins that need to be checked
                candidate_origins = [steps.pop(0)]

            if steps[0] in types:
                # first key is a known type, insert its category (types is {name → category}) and move on
                steps.insert(0, types.get(steps[0]))

            if steps[0] in categories:
                # (new) first step is a category, return first available <origin>.<category>.<type>.<rest>
                # (depending on code above, there may be only one origin that needs to be checked)
                for origin in candidate_origins:
                    value = super().get(self._separator.join([origin] + steps), default=self._no_default)
                    if value is not self._no_default:
                        # value is available, figure out if it needs a type conversion
                        # a bare category (get('category')) carries no type step, nothing is modeled for it
                        type_key = tuple(steps[:2])
                        modeled = model_types.get(type_key) if len(steps) >= 2 else None
                        if modeled and len(steps) >= 3:
                            # determine the step that contains the actual property name
                            # (1 index further along for mapped types)
                            property_index = 3 if type_key in mapped_types else 2
                            # a mapped type requested up to its key only (get('type.key')) has no property
                            # step, there's nothing to convert in that case
                            converter = modeled.get(steps[property_index]) if property_index < len(steps) else None
                            if converter:
                                # converter defined for value, apply
                                return converter(value)
                        if modeled and len(steps) == 2 and type_key in plural_types:
                            # explicitly getting a plural type (on-trace / few), wrap with a view that does not
                            # take origins into account, accessing the local dict directly
                            type_class = type_classes[type_key]
                            return [type_class(element, steps[0], steps[1], modeled) for element in value]

                        return value

            # either option 1 or non-understandable or non-model key, use default implementation
            return super().get(key, default)

    type_doc = (
        'Trace type attributes added from model: {}. '
        'Trace type attributes are defined when a trace has that particular trace type. '
        "Check for this with `'type_name' in trace.types`."
    ).format(
        ', '.join(to_attr_name(name) for _, name in model_types.keys())
    )
    TraceView.__doc__ = '{}\n\n{}'.format(Trace.__doc__, type_doc)

    return TraceView