Examples
To show how hansken.py can be used to automate various tasks, the code samples below implement a few example tasks of varying complexity.
Execute simple query
Using a particular project as a starting point for a script enables an investigator to submit queries to the Hansken REST API and interact with the resulting traces as regular python objects.
By using hansken.tool.run, we can make hansken.py worry about command line arguments, authentication and creating a ProjectContext for us.
hansken.tool.run() makes it very easy to run a particular script in the context of a project.
The function to be used as ‘the code to run’ is required to be callable using only a single keyword argument named context:
# this won't work, an_arg is required, hansken.tool.run won't supply it
def my_code(an_arg, context):
    print('i got an arg', an_arg)

# this will work, an_arg has a default, hansken.tool.run can call this
# using only argument context
def my_code(context, an_arg='default'):
    print('i got an arg', an_arg)
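The calling rule above can be checked without Hansken at all. The sketch below is only an illustration using Python's standard inspect module; the helper name callable_with_only_context is hypothetical and is not part of hansken.py, and it says nothing about how hansken.tool.run() performs the call internally.

```python
import inspect


def callable_with_only_context(func):
    """Return True if func can be called as func(context=...) and nothing else."""
    try:
        # bind() applies the normal argument-binding rules without calling func
        inspect.signature(func).bind(context=None)
    except TypeError:
        # raised when a required argument (other than context) is missing
        return False
    return True


def needs_extra_arg(an_arg, context):
    print('i got an arg', an_arg)


def has_default(context, an_arg='default'):
    print('i got an arg', an_arg)


print(callable_with_only_context(needs_extra_arg))  # False
print(callable_with_only_context(has_default))  # True
```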
An added bonus of using hansken.tool.run() is that the script will automatically get a ‘help page’, available by adding --help to the command line when executing the script.
Manually creating a ProjectContext is possible, but more work (and lazy is better, of course).
# import hansken.py's command line entry point to take care of command line
# parsing and creating a ProjectContext object for us
from hansken.tool import run


def biggest_files(context, num_files=5):
    """
    Queries the Hansken REST API for all files, sorted by size and
    subsequently prints information about the biggest files in the project.

    :param context: a ProjectContext instance to interact with
    :param num_files: the max number of files to print
    """
    # use our ProjectContext object as a context manager, automatically
    # closing the context (and logging us out) when we're done
    with context:
        # search for files in the specified project (the project id for the
        # search request is filled in by context), sort by property
        # data.raw.size in descending order
        files = context.search('type:file', sort='data.raw.size-')
        # retrieve metadata for num_files results, the num_files biggest files
        # note that iterating files would work as well, though it would iterate
        # over all the file traces in the project
        big_files = files.take(num_files)

        # enumerate the big files to print a ranking
        for spot, trace in enumerate(big_files, start=1):
            print('{}. {}: {} bytes'.format(
                spot,
                # trace objects have attributes defined by the Hansken data
                # model, we know we're dealing with files, so trace.file.path
                # will surely exist
                trace.file.path,
                # note that it is possible to get a file without a raw data
                # stream; the line below will raise an AttributeError when
                # that happens (we'll leave it up to the user whether to
                # explicitly check for the existence of a stream of type
                # raw or not (using 'raw' in trace.data_types))
                trace.data.raw.size
            ))


if __name__ == '__main__':
    # this is all we need, run() takes care of reading the command line,
    # taking out all the bits we'll need and creating a ProjectContext to
    # be used with biggest_files
    run(with_context=biggest_files)
Note that the query used here, type:file, could also be submitted as a Term query object, using Term('type', 'file').
List available projects
The regular starting point for a script (ProjectContext) requires one to know the id of a project.
Under normal circumstances, an investigator will know what projects he or she is working on.
While a bit more cumbersome, a Connection object can be used to list all available projects.
The following example uses manual command line argument ‘parsing’, bypassing hansken.tool.run().
While perfectly valid Python code, this will often be more complicated in the long run.
import sys

# import the Connection object, a simple REST client providing JSON-decoded
# values from the Hansken gatekeeper
from hansken.remote import Connection


def list_projects(base_url, properties=('id', 'name')):
    """
    Queries the Hansken gatekeeper at base_url for all available projects and
    prints the requested properties of those projects to standard out.

    :param base_url: the location of the Hansken gatekeeper
    :param properties: the properties to print
    """
    # use a new Connection object as a context manager, automatically closing
    # the connection when we're done (username and password are ignored by
    # Connection if either of them are None, the default for both here)
    with Connection(base_url) as connection:
        # retrieve projects from the gatekeeper
        projects = connection.projects()
        # loop over all available projects (note that this means there is no
        # output if there are no available projects)
        for project in projects:
            # project is a simple dict, Connection doesn't do any conversion
            # other than deserializing the JSON response
            # create a list of the values corresponding to the requested
            # properties (use .get(key) to not crash on properties that don't
            # exist)
            values = ['{}={}'.format(key, project.get(key)) for key in properties]
            # print the values we've just formatted from project
            print(', '.join(values))


if __name__ == '__main__':
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit('usage: ENDPOINT')

    list_projects(args[0])
The above script could be adapted to make use of hansken.tool.run(), although not by passing a with_context argument; that won’t work without a project id.
See the with_admin argument to hansken.tool.run() for this, as an Admin is also a Connection object.
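The formatting step of the project listing can be tried in isolation. The following is a small sketch that assumes projects arrive as plain dicts, as the comments in the script describe; the sample dicts and the helper name format_project are made up for illustration and do not come from a real Hansken gatekeeper.

```python
def format_project(project, properties=('id', 'name')):
    """Format the requested properties of a project dict as 'key=value' pairs."""
    # .get(key) yields None for properties that are missing, instead of raising
    return ', '.join('{}={}'.format(key, project.get(key)) for key in properties)


# hypothetical sample data, standing in for a decoded JSON response
projects = [
    {'id': 'abc', 'name': 'demo'},
    {'id': 'def'},
]

for project in projects:
    print(format_project(project))
# id=abc, name=demo
# id=def, name=None
```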
Read PNG details from trace data
While Hansken will extract a lot of information from a lot of different file types, some details might be omitted or not supported. A PNG file, for example, contains information on the bit depth and color type used for the file in its header block. The script below extracts this information from the PNG header block by reading raw data from Hansken and retrieving the corresponding bytes.
from hansken.tool import run


def png_bit_depth_from_trace(trace):
    """
    Reads data from trace and reads the PNG bit depth and color type from the
    IHDR chunk.

    :param trace: the trace to read data from
    :return: a two-tuple (bit depth, color type)
    """
    # read only the first 64 bytes, we're assuming the IHDR chunk is contained in that
    # use stream as a context manager to automatically close it, even in the
    # case of an (assertion) error
    with trace.open(size=64) as stream:
        buf = stream.read(8 + 13 + 12)
        # first 8 bytes are header, check for ascii PNG
        header = buf[:8]
        assert header[1:4] == b'PNG'
        # next chunk must be of type IHDR and have 13 bytes of data + 12 bytes of chunk junk
        ihdr_chunk = buf[8:8 + 13 + 12]
        assert ihdr_chunk[4:8] == b'IHDR'
        ihdr_data = ihdr_chunk[8:21]
        # bit depth is the 9th byte, color type is the 10th byte
        return ihdr_data[8], ihdr_data[9]


def png_info(context):
    """
    Retrieves the first 100 PNG files from a project through a search request
    and prints the color type and bit depth of those pictures as specified in
    their IHDR chunk.

    :param context: a ProjectContext instance to interact with
    """
    # use our ProjectContext object as a context manager, automatically
    # closing the context (and logging us out) when we're done
    with context:
        # query the REST API for all pictures with a file name ending in .png
        pngs = context.search('type:picture file.name:*.png')
        # search result is a one-off iterable, so we can't use [:100] indexing
        # use take(100) to get (at most) the first 100 to avoid iterating over
        # *all* the pngs in the project
        pngs = pngs.take(100)

        for trace in pngs:
            try:
                # get the bit depth and color type from the trace (function
                # will open a data stream for the trace and read the bytes it
                # needs)
                bit_depth, color_type = png_bit_depth_from_trace(trace)
            except Exception:
                # we choose to not care about the actual error, we won't know
                # the bit depth and color type (catching Exception rather than
                # everything keeps Ctrl-C working)
                bit_depth = color_type = 'unknown'

            print('{}: color type={}, bit depth={}'.format(
                trace.file.path,
                color_type,
                bit_depth
            ))


if __name__ == '__main__':
    # again, run() takes care of creating a ProjectContext for us, calling
    # png_info with it
    run(with_context=png_info)
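The byte offsets used above follow from the PNG layout: an 8-byte signature, then an IHDR chunk made of a 4-byte length, the 4-byte type, 13 bytes of data and a 4-byte CRC. As a sketch that runs without Hansken, the same header can be built and parsed in memory with the standard struct module; the helper names make_ihdr and parse_ihdr are illustrative only.

```python
import struct
import zlib

PNG_SIGNATURE = b'\x89PNG\r\n\x1a\n'


def make_ihdr(width, height, bit_depth, color_type):
    """Build an IHDR chunk: length, type, 13 bytes of data, CRC."""
    # width, height (4 bytes each), then bit depth, color type, compression,
    # filter and interlace (1 byte each), all big-endian
    data = struct.pack('>IIBBBBB', width, height, bit_depth, color_type, 0, 0, 0)
    crc = zlib.crc32(b'IHDR' + data)
    return struct.pack('>I', len(data)) + b'IHDR' + data + struct.pack('>I', crc)


def parse_ihdr(buf):
    """Return (width, height, bit depth, color type) from the start of a PNG."""
    if buf[:8] != PNG_SIGNATURE:
        raise ValueError('not a PNG signature')
    length, chunk_type = struct.unpack('>I4s', buf[8:16])
    if chunk_type != b'IHDR' or length != 13:
        raise ValueError('first chunk is not a 13-byte IHDR chunk')
    return struct.unpack('>IIBB', buf[16:26])


sample = PNG_SIGNATURE + make_ihdr(640, 480, 8, 6)
print(len(sample))  # 33, the 8 + 13 + 12 bytes read in the script above
print(parse_ihdr(sample))  # (640, 480, 8, 6)
```

Raising ValueError instead of using assert keeps the checks active when Python runs with optimizations enabled.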
Calculate the size of a folder
Hansken won’t be able to answer any and all questions an investigator might have.
It might however be able to deliver a lot of the information needed to come up with an answer to the question.
One such question might be the total size of a particular folder on a file system; how much data is contained within that folder?
The script below ‘walks’ the tree of traces to sum up data sizes of traces contained in other traces up to a certain depth.
Note that the ‘and’ query is constructed using the binary and operator (&) with two Term instances.
It also uses hansken.tool.run() in another way, passing in an ArgumentParser that parses an additional argument.
This typically requires the use of hansken.tool.set_command(), overriding the default behaviour of hansken.py’s command line.
hansken.tool.create_argument_parser() returns an ArgumentParser to which we can add the additional arguments we’ll need.
See documentation on the argparse module (in Python’s standard library) for more information.
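The additional arguments themselves are plain argparse. As a minimal sketch, the snippet below uses a bare argparse.ArgumentParser as a stand-in for the parser that create_argument_parser() would return; the positional project argument is an assumption added here only to mimic a project id on the command line.

```python
import argparse

# stand-in for the parser hansken.py would create for us
parser = argparse.ArgumentParser(description='recursively calculate the total size of named folders')
# assumption: a project id as the first positional argument
parser.add_argument('project', help='id of the project to use')
# a positional argument for the name of the folder to search for
parser.add_argument('folder_name', help='name of the folders to search for')
# an optional argument, converted to int by argparse
parser.add_argument('--max-depth', type=int, default=4, help='maximum recursion depth')

# parse an explicit list instead of sys.argv to keep the example repeatable
args = parser.parse_args(['my-project', 'Documents', '--max-depth', '2'])
print(args.folder_name, args.max_depth)  # Documents 2
```

Note that argparse turns --max-depth into the attribute max_depth on the resulting namespace.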
from hansken.query import Term
from hansken.tool import create_argument_parser, run
from hansken.util import format_byte_size


def calc_size(trace, max_depth):
    """
    Recursively calculate the 'total size' of a trace. Defined here as either
    the size of trace or the sum of the sizes of its children (recursively).
    Note that this is by no means meaningful or forensically correct; this is
    simply a demonstration of the ability to walk a 'trace tree'.

    :param trace: a starting point, a trace with either data or children
    :param max_depth: the maximum recursion depth
    :return: the 'total size' of trace
    """
    if not max_depth:
        # stop recursing, we're only proving a point, not trying to break things
        return 0

    if 'data' in trace.types:
        # trace has data, assume it doesn't have any children and return the raw size
        return trace.data.raw.size if 'raw' in trace.data_types else 0
    else:
        # trace has no data, assume it's a folder and sum the sizes of its children
        # (and take care to decrement the max depth)
        return sum(calc_size(child, max_depth - 1) for child in trace.children)


def total_folder_size(context, args):
    """
    Queries the Hansken REST API for folders named *folder_name* (or, more
    correctly: folders whose name contains the term *folder_name*) and
    calculates their 'total size' by walking the trace tree and summing the
    sizes of all the files contained in that folder.
    (Yes, that can be done more efficiently by using the folders' ids, but
    that's hardly the point ;))

    :param context: a ProjectContext instance to interact with
    :param args: arguments parsed from the commandline (an
        `argparse.Namespace` object)
    """
    # use our ProjectContext object as a context manager, automatically
    # closing the context (and logging us out) when we're done
    with context:
        # construct a query object, a boolean conjunction of the queries
        # type:folder and folder.name:folder_name (as only folders will have a
        # property folder.name, the left operand is effectively useless,
        # though)
        query = Term('type', 'folder') & Term('folder.name', args.folder_name,
                                              # add full=True to make sure we're matching
                                              # the name as-is, without tokenization
                                              # see API doc on hansken.query for more information
                                              full=True)

        # submit the search query, loop over the resulting traces
        result = context.search(query)
        for trace in result:
            print('{}: {}'.format(
                # we've queried for folders, so we can assume all results will
                # have an attribute folder.path
                trace.folder.path,
                # calculate the total size, format it in a human-readable way
                format_byte_size(calc_size(trace, args.max_depth))
            ))


if __name__ == '__main__':
    # create a new argparse.ArgumentParser, let hansken.py add a project id
    # argument as the first positional argument
    parser = create_argument_parser(requires_project=True,
                                    description='recursively calculate the total size of named folders, '
                                                'up to a maximum depth')
    # a positional argument for the name of the folder we want to search for
    parser.add_argument('folder_name', help='name of the folders to search for')
    # an optional argument to set the max recursion depth, needs to be an int
    parser.add_argument('--max-depth', type=int, default=4, help='maximum recursion depth')

    # run our command, using the parser we were given (and added bits to)
    run(with_context=total_folder_size, using_parser=parser)
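The effect of max_depth is easiest to see on a tiny tree. The sketch below mirrors the recursion of calc_size on nested dicts rather than Hansken traces; the 'size' and 'children' keys are an illustrative stand-in, not the Hansken trace model.

```python
def calc_size(node, max_depth):
    """Sum the sizes found in a nested dict tree, up to max_depth levels deep."""
    if not max_depth:
        # depth budget used up, anything below this point is not counted
        return 0
    if 'size' in node:
        # a leaf with data, return its size
        return node['size']
    # a folder-like node, sum its children with one level less to spend
    return sum(calc_size(child, max_depth - 1) for child in node.get('children', ()))


tree = {'children': [
    {'size': 100},
    {'children': [
        {'size': 20},
        {'children': [{'size': 3}]},
    ]},
]}

print(calc_size(tree, 4))  # 123, the whole tree fits within four levels
print(calc_size(tree, 3))  # 120, the deepest file is cut off
print(calc_size(tree, 2))  # 100, only the top-level file is counted
```

A low maximum depth therefore silently undercounts; the reported number is only a total when the folder is shallower than the limit.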
Create a new trace from an existing one
Hansken can’t do everything a digital expert could; sometimes the end user knows of something that Hansken doesn’t. Because of this, Hansken allows end users to add traces to the collection of traces produced by Hansken. A common example of this would be the extraction of a database file containing chat messages from an application Hansken doesn’t (yet) support. Armed with a way to select this database, let’s add these traces to the project.
import sqlite3

from hansken.query import Term
from hansken.recipes import export
from hansken.tool import run


def read_messages(database):
    """
    Retrieves chat messages from a database.

    :param database: a database object to query
    :return: a list of (sender, receiver, message) tuples
    """
    cursor = database.cursor()
    # select sender, receiver and message content from our imaginary database schema
    rows = cursor.execute("""SELECT sender, receiver, message FROM messages""").fetchall()
    # return all rows from the database
    return rows


def extract_chat_databases(context):
    """
    Queries the Hansken REST interface for database files named
    *my-new-chats.db*, reading chat messages from them and attaching them
    to the databases as new child traces in Hansken.

    :param context: a ProjectContext instance to interact with
    """
    # use our ProjectContext object as a context manager, automatically
    # closing the context (and logging us out) when we're done
    with context:
        # find all files named 'my-new-chats.db'
        results = context.search(Term('file.name', 'my-new-chats.db'))
        for trace in results:
            # export the trace to a temporary file (using a builtin module like tempfile would
            # be better, this is left as an exercise to the reader)
            export.to_file(trace, '/tmp/temporary-file.db')
            # create a new SQLITE database object from our (now local) database file
            database = sqlite3.connect('/tmp/temporary-file.db')
            # read messages from the database (this returns the 3-tuples we need)
            for sender, receiver, message in read_messages(database):
                # create a new TraceBuilder to attach a new child trace to database trace
                builder = trace.child_builder()
                # add properties to our new trace to be, be sure to set the name (which is required)
                builder.update({
                    'name': 'My New Chat Message',
                    # set a number of chatMessage properties, these need to be known to Hansken
                    # see Hansken's trace model documentation or be inspired by existing traces
                    'chatMessage.application': 'My New Chat',
                    'chatMessage.from': sender,
                    'chatMessage.to': receiver,
                    'chatMessage.message': message
                # .update() will return the builder, allowing a chained .build() call
                # to actually save the new trace
                }).build()


if __name__ == '__main__':
    # let run() create a ProjectContext for us, call extract_chat_databases with it
    run(with_context=extract_chat_databases)
The code above creates chat message traces as child traces of a database.
As TraceBuilder.build() will return the uid of the newly created trace object, creating a tree-like structure becomes possible.
Using ProjectContext.child_builder, we can create builders for child traces of any other trace’s uid, including ones that were only just created.
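The script's comments leave replacing the fixed /tmp path with the tempfile module as an exercise. One possible shape of that, sketched without Hansken: the database is created locally as a stand-in for the export step, so the table contents here are invented sample data and the path name chats.db is arbitrary.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def read_messages(path):
    """Read (sender, receiver, message) tuples from the database at path."""
    # closing() makes sure the connection is closed, even on errors
    with closing(sqlite3.connect(path)) as database:
        return database.execute('SELECT sender, receiver, message FROM messages').fetchall()


# the directory and everything in it is removed when the block exits
with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, 'chats.db')

    # stand-in for exporting a trace to path: create a small sample database
    with closing(sqlite3.connect(path)) as database:
        database.execute('CREATE TABLE messages (sender TEXT, receiver TEXT, message TEXT)')
        database.execute("INSERT INTO messages VALUES ('alice', 'bob', 'hello')")
        database.commit()

    messages = read_messages(path)

print(messages)  # [('alice', 'bob', 'hello')]
```

Using a fresh temporary directory per database avoids two results overwriting the same file and cleans up after itself.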
Create a report from a selection of traces
Having come up with an interesting selection of traces, it’s sometimes needed to create an overview of this selection for use in another place.
Generating a simple (or complex) report of the selection can be complex, but hansken.py provides a reporting recipe that makes parts of this simpler.
This recipe is available if hansken.py was installed with the “report” or “all” extras.
Firstly, we’ll need to create a template for our report.
Extending from the templates that are provided with hansken.py makes it easy to create only the things we want to include in the report.
The reporting recipe uses Jinja2 templates, producing HTML output (for additional information, see the Jinja2 documentation).
By extending the template hansken/base.html, we need only include an introductory text and call a macro that turns a selection of traces into a table:
{# extend a provided hansken.py template #}
{% extends 'hansken/base.html' %}
{# base template defines a block "preamble", fill this with an introduction of the report #}
{% block preamble %}
<h1>An example report</h1>
<p>This is an example report rendering a table of interesting traces.</p>
{% endblock %}
{# base template defines a block "content", fill this with the actual tables
(macros are imported by base template by default) #}
{% block content %}
{{ hansken.traces_table(traces, fields) }}
{% endblock %}
Just a template is not going to magically render this report.
hansken.py has utilities to do most of the magic for us.
from hansken.recipes import report
from hansken.tool import run


def read_template():
    """
    Reads the report template from file.
    """
    with open('template.html') as template:
        # let's say we saved our template to a file named "template.html"
        return template.read()


def create_report(context):
    """
    Renders a report containing a table of traces to a file named "report.html".

    :param context: a ProjectContext instance to interact with
    """
    # use render_string to turn a search result into an HTML string
    content = report.render_string(
        # the string to be rendered is the template defined above
        read_template(),
        # the template uses a variable "traces" as the list of traces to be turned into a table
        # pass a SearchResult to the template as a keyword argument
        traces=context.search('type:picture'),
        # tell the traces_table macro three trace properties as the table columns
        fields=('name', 'picture.width', 'picture.height')
    )

    with open('report.html', 'w') as out_file:
        # write HTML string to a file "report.html"
        out_file.write(content)


if __name__ == '__main__':
    # let run() take care of creating a context for us, call create_report with that context
    run(with_context=create_report)
Jinja2 can do more for us here; ‘manually’ reading a template from file is rarely needed.
Of course, we can create a template in a multiline string or let Jinja2 take care of reading templates from files.
Additionally, hansken.py ships with a template that turns a collection of traces into a table.
As that template already extends the base template, the implementation above could also be done as such:
from hansken.recipes import report
from hansken.tool import run


def create_report(context):
    """
    Renders a report containing a table of traces to a file named "report.html".

    :param context: a ProjectContext instance to interact with
    """
    template = """
        {% extends 'hansken/table.html' %}

        {% block preamble %}
            <h1>An example report</h1>
            <p>This is an example report rendering a table of interesting traces.</p>
        {% endblock %}
    """

    # use render_string to turn a search result into an HTML string
    content = report.render_string(
        template,
        # the template uses a variable "traces" as the list of traces to be turned into a table
        # pass a SearchResult to the template as a keyword argument
        traces=context.search('type:picture'),
        # tell the traces_table macro three trace properties as the table columns
        fields=('name', 'picture.width', 'picture.height')
    )

    with open('report.html', 'w') as out_file:
        # write HTML string to a file "report.html"
        out_file.write(content)


if __name__ == '__main__':
    # let run() take care of creating a context for us, call create_report with that context
    run(with_context=create_report)
If the preamble in the report generated above isn’t needed, there’s even a to_html_table shortcut that just turns a collection of traces into a table.
Search with deduplication
Sometimes a search query returns a lot of duplicated results. Currently you can solve this by adding the deduplicate_field parameter to your search request.
You pass deduplicate_field the field that you want to deduplicate on; the example below uses email.messageId.
We are searching for emails that contain ‘hello’, deduplicated on the field email.messageId, and count=10 will give the first 10 results (just to keep it simple).
The result shows the number of duplicates found per email.messageId, the trace name and the trace uid. Duplicates are sorted on the trace id, which means that the trace name would be the name of the trace with the lowest trace id.
from hansken.tool import run


def deduplicated_search(context):
    with context:
        # search request for emails that contain 'hello', deduplicate on email.messageId and only show first 10 results
        results = context.search('type:email hello', deduplicate_field='email.messageId', count=10)
        # return a search result per trace and include for each trace the total number of duplicates
        for trace, total_duplicates in results.including('totalDuplicates'):
            # print the total number of duplicates, the trace name and the trace uid
            print(total_duplicates, trace.name, trace.uid)


if __name__ == '__main__':
    run(with_context=deduplicated_search)
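To make the described behaviour concrete, here is a local sketch of deduplicating on a field: one representative per value, chosen as the trace with the lowest id, plus a count per group. This is an illustration on plain dicts only and is not how Hansken implements it; in particular, whether Hansken's totalDuplicates counts the representative itself is not stated here, so the count below (the group size) is an assumption.

```python
def deduplicate(traces, field):
    """Return (representative, group size) pairs, one per distinct field value."""
    groups = {}
    # visit traces by ascending id, so the first one seen per value is the lowest
    for trace in sorted(traces, key=lambda t: t['id']):
        key = trace[field]
        if key not in groups:
            groups[key] = [trace, 0]
        groups[key][1] += 1
    return [(trace, count) for trace, count in groups.values()]


# hypothetical sample data, not real Hansken traces
emails = [
    {'id': 3, 'name': 'copy of hello', 'email.messageId': 'm-1'},
    {'id': 1, 'name': 'hello', 'email.messageId': 'm-1'},
    {'id': 2, 'name': 'hello again', 'email.messageId': 'm-2'},
]

for trace, count in deduplicate(emails, 'email.messageId'):
    print(count, trace['name'], trace['id'])
# 2 hello 1
# 1 hello again 2
```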
Add a large datastream to a single Trace
In some cases, we need to add a large datastream (larger than 31MB) to an existing trace. A common use-case for this is downloading encrypted traces from Hansken, and uploading the decrypted data back to the trace.
In this example, we upload an 80MB file to the decrypted datastream of an existing trace.
from hansken.tool import run


def add_trace_data(context):
    # open the local file in binary mode, it's closed again along with the context
    with context, open("file.decrypted", "rb") as file:
        context.add_trace_data(trace_uid='b0b6f4d6-1119-4957-a9f3-d1d4b691a53c:0-0',
                               data_type='decrypted',
                               data=file.read()
                               )


if __name__ == '__main__':
    run(with_context=add_trace_data)
Cleaning trace data with an HQL query
In some cases, we want to clean data belonging to specific traces after an extraction is finished. In some investigations, only specific types of traces are important, so cleaning the other types at extraction time isn’t needed, and skipping it can save storage.
In this example, we clean all traces of type document with a size larger than 1KB, with priority high.
from hansken.remote import ProjectContext
from hansken.tool import run


def clean_traces(context: ProjectContext):
    with context:
        context.queue_cleaning(query='type:document AND data.raw.size>1000',
                               clean_priority='high'
                               )


if __name__ == '__main__':
    run(with_context=clean_traces)
Deleting cleaned data
In some cases it might be necessary to delete cleaned data from the cleaner cache, to force re-cleaning. This may be useful, for example, when upgrading to a new cleaners image that improves cleaning of some formats, or when clearing up some space. Each data stream belonging to a trace can have corresponding cleaned data and there are different ways to specify which cleaned data streams to remove.
By default delete_cleaned_data deletes all the cleaned data belonging to a project. Providing an image_id deletes all cleaned data for a specific image within a project. The scope can be further narrowed by providing a trace_uid, which removes all cleaned data belonging to a trace. Providing the data_type as well only removes a specific cleaned data stream.
from hansken.remote import ProjectContext
from hansken.tool import run


def delete_cleaned_data_type(context: ProjectContext):
    # narrowest scope: a single cleaned data stream of a single trace
    with context:
        context.delete_cleaned_data(trace_uid='cf3bde90-df1b-4c9f-96c8-3d043d202c59:0-0', data_type='raw')


def delete_cleaned_trace(context: ProjectContext):
    # all cleaned data belonging to a single trace
    with context:
        context.delete_cleaned_data(trace_uid='cf3bde90-df1b-4c9f-96c8-3d043d202c59:0-0')


def delete_cleaned_image(context: ProjectContext):
    # all cleaned data for a single image within the project
    with context:
        context.delete_cleaned_data(image_id='cf3bde90-df1b-4c9f-96c8-3d043d202c59')


def delete_cleaned_project(context: ProjectContext):
    # widest scope: all cleaned data belonging to the project
    with context:
        context.delete_cleaned_data()


if __name__ == '__main__':
    # pass the function matching the scope to delete; the narrowest one is used here
    run(with_context=delete_cleaned_data_type)