Source code for linux_utils.luks

# linux-utils: Linux system administration tools for Python.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: February 9, 2020
# URL: https://linux-utils.readthedocs.io

"""
Python API for cryptsetup to control LUKS_ full disk encryption.

The functions in this module serve two distinct purposes:

**Low level Python API for cryptsetup**
 The following functions and class provide a low level Python API for the basic
 functionality of :man:`cryptsetup`:

 - :func:`create_image_file()`
 - :func:`generate_key_file()`
 - :func:`create_encrypted_filesystem()`
 - :func:`unlock_filesystem()`
 - :func:`lock_filesystem()`
 - :class:`TemporaryKeyFile()`

 This functionality make it easier for me to write test suites for Python
 projects involving full disk encryption, for example crypto-drive-manager_
 and rsync-system-backup_.

**Python implementation of cryptdisks_start and cryptdisks_stop**
 The command line programs :man:`cryptdisks_start` and :man:`cryptdisks_stop`
 are easy to use wrappers for :man:`cryptsetup` that parse `/etc/crypttab`_ to
 find the information they need.

 The nice thing about `/etc/crypttab`_ is that it provides a central place to
 configure the names of encrypted filesystems, so that you can refer to a
 symbolic name instead of having to constantly repeat all of the necessary
 information (the target name, source device, key file and encryption
 options).

 A not so nice thing about cryptdisks_start_ and cryptdisks_stop_ is that these
 programs (and the whole `/etc/crypttab`_ convention) appear to be specific to
 the Debian_ ecosystem.

 The functions :func:`cryptdisks_start()` and :func:`cryptdisks_stop()` emulate
 the behavior of the command line programs when needed so that Linux
 distributions that don't offer these programs can still be supported by
 projects like crypto-drive-manager_ and rsync-system-backup_.

.. _LUKS: https://en.wikipedia.org/wiki/Linux_Unified_Key_Setup
.. _crypto-drive-manager: https://pypi.python.org/pypi/crypto-drive-manager
.. _rsync-system-backup: https://pypi.python.org/pypi/rsync-system-backup
.. _/etc/crypttab: https://manpages.debian.org/crypttab
.. _Debian: https://en.wikipedia.org/wiki/Debian
"""

# Standard library modules.
import logging

# External dependencies.
from executor import ExternalCommandFailed, quote
from humanfriendly.prompts import retry_limit

# Modules included in our package.
from linux_utils import coerce_context, coerce_size
from linux_utils.crypttab import parse_crypttab

# Public identifiers that require documentation.
__all__ = (
    'DEFAULT_KEY_SIZE',
    'TemporaryKeyFile',
    'create_encrypted_filesystem',
    'create_image_file',
    'cryptdisks_start',
    'cryptdisks_stop',
    'generate_key_file',
    'lock_filesystem',
    'logger',
    'unlock_filesystem',
)

DEFAULT_KEY_SIZE = 2048
"""The default size (in bytes) of key files generated by :func:`generate_key_file()` (a number)."""

# Initialize a logger for this module.
logger = logging.getLogger(__name__)


[docs]def create_image_file(filename, size, context=None): r""" Create an image file filled with bytes containing zero (``\0``). :param filename: The pathname of the image file (a string). :param size: How large the image file should be (see :func:`.coerce_size()`). :param context: See :func:`.coerce_context()` for details. :raises: :exc:`~exceptions.ValueError` when `size` is invalid, :exc:`~executor.ExternalCommandFailed` when the command fails. """ context = coerce_context(context) size = coerce_size(size) logger.debug("Creating image file of %i bytes: %s", size, filename) head_command = 'head --bytes=%i /dev/zero > %s' context.execute(head_command % (size, quote(filename)), shell=True, tty=False)
[docs]def generate_key_file(filename, size=DEFAULT_KEY_SIZE, context=None): """ Generate a file with random contents that can be used as a key file. :param filename: The pathname of the key file (a string). :param size: How large the key file should be (see :func:`.coerce_size()`, defaults to :data:`DEFAULT_KEY_SIZE`). :param context: See :func:`.coerce_context()` for details. :raises: :exc:`~executor.ExternalCommandFailed` when the command fails. """ context = coerce_context(context) size = coerce_size(size) logger.debug("Creating key file of %i bytes: %s", size, filename) context.execute( 'dd', 'if=/dev/urandom', 'of=%s' % filename, 'bs=%i' % size, 'count=1', # I'd rather use `status=none' then silent=True, however the # `status=none' flag isn't supported on Ubuntu 12.04 which # currently runs on Travis CI, so there you go :-p. silent=True, sudo=True, ) context.execute('chown', 'root:root', filename, sudo=True) context.execute('chmod', '400', filename, sudo=True)
[docs]def create_encrypted_filesystem(device_file, key_file=None, context=None): """ Create an encrypted LUKS filesystem. :param device_file: The pathname of the block special device or file (a string). :param key_file: The pathname of the key file used to encrypt the filesystem (a string or :data:`None`). :param context: See :func:`.coerce_context()` for details. :raises: :exc:`~executor.ExternalCommandFailed` when the command fails. If no `key_file` is given the operator is prompted to choose a password. """ context = coerce_context(context) logger.debug("Creating encrypted filesystem on %s ..", device_file) format_command = ['cryptsetup'] if key_file: format_command.append('--batch-mode') format_command.append('luksFormat') format_command.append(device_file) if key_file: format_command.append(key_file) context.execute(*format_command, sudo=True, tty=(key_file is None))
[docs]def unlock_filesystem(device_file, target, key_file=None, options=None, context=None): """ Unlock an encrypted LUKS filesystem. :param device_file: The pathname of the block special device or file (a string). :param target: The mapped device name (a string). :param key_file: The pathname of the key file used to encrypt the filesystem (a string or :data:`None`). :param options: An iterable of strings with encryption options or :data:`None` (in which case the default options are used). Currently 'discard', 'readonly' and 'tries' are the only supported options (other options are silently ignored). :param context: See :func:`.coerce_context()` for details. :raises: :exc:`~executor.ExternalCommandFailed` when the command fails. If no `key_file` is given the operator is prompted to enter a password. """ context = coerce_context(context) logger.debug("Unlocking filesystem %s ..", device_file) tries = 3 open_command = ['cryptsetup'] open_options = [] if key_file: open_options.append('--key-file=%s' % key_file) if options: for opt in options: if opt == 'discard': open_options.append('--allow-discards') elif opt == 'readonly': open_options.append('--readonly') elif opt.startswith('tries='): name, _, value = opt.partition('=') tries = int(value) open_command.extend(sorted(open_options)) open_command.extend(['luksOpen', device_file, target]) for attempt in retry_limit(tries): try: context.execute(*open_command, sudo=True, tty=(key_file is None)) except ExternalCommandFailed: if attempt < tries and not key_file: logger.warning("Failed to unlock, retrying ..") else: raise else: break
[docs]def lock_filesystem(target, context=None): """ Lock a currently unlocked LUKS filesystem. :param target: The mapped device name (a string). :param context: See :func:`.coerce_context()` for details. :raises: :exc:`~executor.ExternalCommandFailed` when the command fails. """ context = coerce_context(context) logger.debug("Locking filesystem %s ..", target) close_command = ['cryptsetup', 'luksClose', target] context.execute(*close_command, sudo=True, tty=False)
[docs]def cryptdisks_start(target, context=None): """ Execute :man:`cryptdisks_start` or emulate its functionality. :param target: The mapped device name (a string). :param context: See :func:`.coerce_context()` for details. :raises: :exc:`~executor.ExternalCommandFailed` when a command fails, :exc:`~exceptions.ValueError` when no entry in `/etc/crypttab`_ matches `target`. """ context = coerce_context(context) logger.debug("Checking if `cryptdisks_start' program is installed ..") if context.find_program('cryptdisks_start'): logger.debug("Using the real `cryptdisks_start' program ..") context.execute('cryptdisks_start', target, sudo=True) else: logger.debug("Emulating `cryptdisks_start' functionality (program not installed) ..") for entry in parse_crypttab(context=context): if entry.target == target and 'luks' in entry.options: logger.debug("Matched /etc/crypttab entry: %s", entry) if entry.is_unlocked: logger.debug("Encrypted filesystem is already unlocked, doing nothing ..") else: unlock_filesystem(context=context, device_file=entry.source_device, key_file=entry.key_file, options=entry.options, target=entry.target) break else: msg = "Encrypted filesystem not listed in /etc/crypttab! (%r)" raise ValueError(msg % target)
[docs]def cryptdisks_stop(target, context=None): """ Execute :man:`cryptdisks_stop` or emulate its functionality. :param target: The mapped device name (a string). :param context: See :func:`.coerce_context()` for details. :raises: :exc:`~executor.ExternalCommandFailed` when a command fails, :exc:`~exceptions.ValueError` when no entry in `/etc/crypttab`_ matches `target`. """ context = coerce_context(context) logger.debug("Checking if `cryptdisks_stop' program is installed ..") if context.find_program('cryptdisks_stop'): logger.debug("Using the real `cryptdisks_stop' program ..") context.execute('cryptdisks_stop', target, sudo=True) else: logger.debug("Emulating `cryptdisks_stop' functionality (program not installed) ..") for entry in parse_crypttab(context=context): if entry.target == target and 'luks' in entry.options: logger.debug("Matched /etc/crypttab entry: %s", entry) if entry.is_unlocked: lock_filesystem(context=context, target=target) else: logger.debug("Encrypted filesystem is already locked, doing nothing ..") break else: msg = "Encrypted filesystem not listed in /etc/crypttab! (%r)" raise ValueError(msg % target)
[docs]class TemporaryKeyFile(object): """Context manager that makes it easier to work with temporary key files."""
[docs] def __init__(self, filename, size=DEFAULT_KEY_SIZE, context=None): """ Initialize a :class:`TemporaryKeyFile` object. Refer to :func:`generate_key_file()` for details about argument handling. """ self.context = coerce_context(context) self.filename = filename self.size = size
[docs] def __enter__(self): """Generate the temporary key file.""" generate_key_file( context=self.context, filename=self.filename, size=self.size, )
[docs] def __exit__(self, exc_type=None, exc_value=None, traceback=None): """Delete the temporary key file.""" self.context.execute('rm', '--force', self.filename, sudo=True)