Coverage for drivers/plugins/keymanagerutil : 0%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
#!/usr/bin/python
""" Helper module used for XenRT testing of the VDI encryption feature (REQ-718). This module implements the key lookup plugin interface, so if it is installed, SM will use it to retrieve keys based on their hashes. This key store is backed by a file stored on disk in dom0, and helper functions are provided to manipulate it. """ from __future__ import print_function
import base64 import os import os.path import hashlib import json import argparse import string from random import SystemRandom
import XenAPI
PROGRAM_NAME = 'keymanagerutil'
def load_key(key_hash, vdi_uuid): """ load_key is called by SM plugin when it needs to find the key for specified key_hash from the key store """ _check_key(key_hash, vdi_uuid) try: key = KeyManager(key_hash=key_hash).get_key(log_key_info=False) return key except KeyLookUpError: return None
def _check_key(key_hash, vdi_uuid): session = XenAPI.xapi_local() session.xenapi.login_with_password('root', '', '', PROGRAM_NAME) try: vdi = session.xenapi.VDI.get_by_uuid(vdi_uuid) sm_config = session.xenapi.VDI.get_sm_config(vdi) if 'key_hash' in sm_config: if key_hash != sm_config['key_hash']: raise Exception('A key was requested with key hash {}' ' for VDI {}, but it has a different' ' key_hash in its sm_config:' ' {}'.format(key_hash, vdi_uuid, sm_config['key_hash'])) else: raise Exception('Encryption key requested for VDI {}' ' whose sm_config does not contain the key_hash' ' entry. Its sm_config is {}'.format(vdi_uuid, sm_config)) finally: session.xenapi.logout()
class InputError(Exception): def __init__(self, message): super(InputError, self).__init__(message)
class KeyLookUpError(Exception): """Raised when the key / key hash we've requested is not in the keystore""" def __init__(self, message): super(KeyLookUpError, self).__init__(message)
def _print_key_info(key=None, key_hash=None): """ Output the key details as JSON to the standard output. This output will be interpreted by XenRT. """ data = {} if key: data['key_base64'] = base64.b64encode(key) if key_hash: data['key_hash'] = key_hash print(json.dumps(data))
KEYSTORE_PATH = '/tmp/keystore.json'
def _read_keystore(): """If the keystore file exists, returns its contents, otherwise returns an empty dictionary.""" if os.path.isfile(KEYSTORE_PATH): with open(KEYSTORE_PATH, "r") as key_store_file: key_store = json.load(key_store_file) for key_hash in key_store: key_base64 = key_store[key_hash] key = base64.b64decode(key_base64) key_store[key_hash] = key return key_store else: return {}
def _write_keystore(key_store): """ Write the given key store contents to the key store file, which will be created if it does not exist. """ for key_hash in key_store: key = key_store[key_hash] key_base64 = base64.b64encode(key) key_store[key_hash] = key_base64 with open(KEYSTORE_PATH, "w+") as key_store_file: json.dump(key_store, key_store_file) key_store_file.write("\n")
class KeyManager(object):
""" KeyManager is a python utility tool for generating and managing the keys in the jey store. One can request KeyManager to generate the keys, passing just the type of the key - either strong or weak or even the length of the key. One can request KeyManger to get the key from the key store by passing key_hash. One can request KeyManager to get the key_hash from the key store by passing encryption key. KeyManager maintains the keystore(json record) under /tmp/keystore.json.
""" def __init__(self, key_type=None, key_length=None, key=None, key_hash=None): self.key_type = key_type self.key_length = key_length self.key = key self.key_hash = key_hash
def __add_to_keystore(self): """ Update the key and key hash in the key store - requires both hash and key. """ if not self.key_hash or not self.key: raise InputError("Need both key_hash and key to update into key store") key_store = _read_keystore() key_store[self.key_hash] = self.key _write_keystore(key_store)
def __hash_key(self):
#hash the given key - requires key if not self.key: raise InputError("Need key to hash")
hash_it = hashlib.new('sha256') hash_it.update(b'\0' * 32) hash_it.update(self.key) self.key_hash = hash_it.hexdigest() return self.key_hash
def generate(self): """ generate the encryption key Hash the generated key Update the key store with key and hash """ self.key = _get_key_generator(key_length=self.key_length, key_type=self.key_type).generate() self.key_hash = self.__hash_key() _print_key_info(key=self.key, key_hash=self.key_hash) self.__add_to_keystore()
def get_key(self, log_key_info=True): """Fetch the key from the key store based on the key_hash - requires key hash""" if not self.key_hash: raise InputError("Need key hash to get the key from the key store")
key_store = _read_keystore() key = key_store.get(self.key_hash, None) if key and log_key_info: _print_key_info(key=key) if not key: raise KeyLookUpError("No keys in the keystore which matches the given key hash")
return key
def get_keyhash(self): """Fetch the key hash from the key store based on the key - requires key""" if not self.key: raise InputError("Need key to get the key hash from the key store") key_store = _read_keystore() try: key_hash = key_store.keys()[key_store.values().index(self.key)] _print_key_info(key_hash=key_hash) except ValueError: raise KeyLookUpError("No key hash in the keystore which matches the given key")
def update_keystore(self): """If this key hash is already in the key store, update its corresponding key"""
if not (self.key_hash and self.key): raise InputError("Need key hash and key to update the key store")
key_store = _read_keystore() if self.key_hash in key_store: key_store[self.key_hash] = self.key else: raise InputError("No existing key in the keystore" "with key hash {}".format(self.key_hash)) _write_keystore(key_store)
def _get_key_generator(key_length=None, key_type=None): if key_type == "alphanumeric": return AlphaNumericKeyGenerator(key_length=key_length) elif key_length: return RandomKeyGenerator(key_length=key_length) elif key_type == "weak": return WeakKeyGenerator() elif key_type == "strong": return StrongKeyGenerator() else: raise InputError("Either key_length in byte or key_type(\"strong OR weak\")" " should be specified to generate the key")
class RandomKeyGenerator(object): """Generates a completely random key of the specified length"""
def __init__(self, key_length): self.key_length = key_length
def generate(self): """Generate a completely random byte sequence""" return os.urandom(self.key_length)
class StrongKeyGenerator(RandomKeyGenerator): """Generates a completely random 512-bit key"""
def __init__(self): super(StrongKeyGenerator, self).__init__(key_length=64)
class WeakKeyGenerator(RandomKeyGenerator): """Generates a completely random 256-bit key"""
def __init__(self): super(WeakKeyGenerator, self).__init__(key_length=32)
class AlphaNumericKeyGenerator(object): """Generates alphanumeric keys"""
def __init__(self, key_length=None): self.key_length = 64 if key_length is None else key_length
def generate(self): """Generate a completely random alphanumeric sequence""" keys_from = string.ascii_letters + string.digits return "".join([SystemRandom().choice(keys_from) for _ in range(self.key_length)])
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--generatekey', action='store_true', dest='generate', default=False, help="Generates the encryption key based on the given either keytype or keylength")
parser.add_argument('--getkey', action='store_true', dest='get_key', default=False, help="To get the key from the keystore based on the given key hash")
parser.add_argument('--getkeyhash', action='store_true', dest='get_key_hash', default=False, help="To get the key hash from the keystore based on the given key")
parser.add_argument('--updatekeystore', action='store_true', dest='update_keystore', default=False, help="If needs to update the already existing key in the keystore pass the keyHash and new key")
parser.add_argument('--keytype', action='store', dest='key_type', default=None, help='Type of the key: values expected weak or strong')
parser.add_argument('--keylength', action='store', default=None, type=int, dest='key_length', help='length of the encryption key in byte')
parser.add_argument('--keyhash', action='store', dest='key_hash', default=None, help='Encryption key')
parser.add_argument('--key', action='store', dest='key', default=None, help='Base64-encoded encryption key')
parser_input = parser.parse_args()
if parser_input.key: parser_input.key = base64.b64decode(parser_input.key)
if parser_input.generate: KeyManager(key_type=parser_input.key_type, key_length=parser_input.key_length).generate() elif parser_input.get_key: KeyManager(key_hash=parser_input.key_hash).get_key() elif parser_input.get_key_hash: KeyManager(key=parser_input.key).get_keyhash() elif parser_input.update_keystore: KeyManager(key_hash=parser_input.key_hash, key=parser_input.key).update_keystore()
|