Coverage for drivers/plugins/keymanagerutil.py : 61%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/python3
3"""
4Helper module used for XenRT testing of the VDI encryption feature (REQ-718).
5This module implements the key lookup plugin interface, so if it is
6installed, SM will use it to retrieve keys based on their hashes.
7This key store is backed by a file stored on disk in dom0, and helper
8functions are provided to manipulate it.
9"""
10import base64
11import os
12import os.path
13import hashlib
14import json
15import argparse
16import string
17import sys
18from random import SystemRandom
20sys.path.append('/opt/xensource/sm/')
21import util
23import XenAPI
25PROGRAM_NAME = 'keymanagerutil'
def load_key(key_hash, vdi_uuid):
    """
    Entry point called by the SM plugin to look up an encryption key.

    The request is first validated against the sm_config of the VDI
    identified by vdi_uuid (see _check_key); any exception raised by that
    validation propagates to the caller. The key stored under key_hash is
    then read from the key store.

    Returns the key, or None when the key store has no entry for key_hash.
    """
    _check_key(key_hash, vdi_uuid)
    manager = KeyManager(key_hash=key_hash)
    try:
        # Do not echo the key details to standard output for SM lookups
        return manager.get_key(log_key_info=False)
    except KeyLookUpError:
        return None
def _check_key(key_hash, vdi_uuid):
    """
    Check that the requested key hash is the one recorded for the VDI.

    Opens a local XAPI session, reads the sm_config of the VDI identified
    by vdi_uuid and compares its 'key_hash' entry with key_hash.

    Raises Exception when sm_config has no 'key_hash' entry, or when the
    entry differs from key_hash. The session is logged out in all cases.
    """
    session = XenAPI.xapi_local()
    session.xenapi.login_with_password('root', '', '', PROGRAM_NAME)
    try:
        vdi_ref = session.xenapi.VDI.get_by_uuid(vdi_uuid)
        sm_config = session.xenapi.VDI.get_sm_config(vdi_ref)

        if 'key_hash' not in sm_config:
            raise Exception('Encryption key requested for VDI {}'
                            ' whose sm_config does not contain the key_hash'
                            ' entry. Its sm_config is {}'.format(vdi_uuid, sm_config))

        recorded_hash = sm_config['key_hash']
        if key_hash != recorded_hash:
            raise Exception('A key was requested with key hash {}'
                            ' for VDI {}, but it has a different'
                            ' key_hash in its sm_config:'
                            ' {}'.format(key_hash, vdi_uuid, recorded_hash))
    finally:
        session.xenapi.logout()
class InputError(Exception):
    """Raised when an operation is requested with missing or unusable input"""

    def __init__(self, message):
        super().__init__(message)
class KeyLookUpError(Exception):
    """Raised when the requested key or key hash is not present in the keystore"""

    def __init__(self, message):
        super().__init__(message)
def _print_key_info(key=None, key_hash=None):
    """
    Print the given key details to standard output as one JSON object.

    A truthy key is emitted base64-encoded under 'key_base64' and a truthy
    key_hash is emitted unchanged under 'key_hash'; anything else is left
    out of the object. The output is meant to be interpreted by XenRT.
    """
    candidates = [
        ('key_base64', base64.b64encode(key).decode() if key else None),
        ('key_hash', key_hash if key_hash else None),
    ]
    payload = {name: value for name, value in candidates if value is not None}
    print(json.dumps(payload))
86KEYSTORE_PATH = '/tmp/keystore.json'
def _read_keystore():
    """
    Load the key store from KEYSTORE_PATH.

    Returns a dictionary mapping each key hash to its key, base64-decoded
    from the JSON file. When the file does not exist this is logged and an
    empty dictionary is returned.
    """
    if not os.path.isfile(KEYSTORE_PATH):
        util.SMlog(f'Keystore path {KEYSTORE_PATH} not found')
        return {}

    with open(KEYSTORE_PATH, "r") as keystore_file:
        encoded_store = json.load(keystore_file)

    # Keys are stored base64-encoded; hand the raw bytes back to callers
    return {
        stored_hash: base64.b64decode(encoded_key)
        for stored_hash, encoded_key in encoded_store.items()
    }
def _write_keystore(key_store):
    """
    Write the given key store contents to the key store file, which will be
    created if it does not exist.

    key_store maps key hashes to keys (bytes). Every key is base64-encoded
    so that it can be stored as JSON. The encoding is done on a separate
    dictionary: the mapping passed in is not modified, so the caller can
    keep using it with the raw keys after this call.
    """
    encoded_store = {
        key_hash: base64.b64encode(key).decode()
        for key_hash, key in key_store.items()
    }
    with open(KEYSTORE_PATH, "w") as key_store_file:
        json.dump(encoded_store, key_store_file)
        key_store_file.write("\n")
class KeyManager(object):
    """
    KeyManager is a python utility tool for generating and managing the keys in the key store.
    One can request KeyManager to generate the keys, passing just the type of
    the key - either strong or weak or even the length of the key.
    One can request KeyManager to get the key from the key store by passing key_hash.
    One can request KeyManager to get the key_hash from the key store by passing encryption key.
    KeyManager maintains the keystore(json record) under /tmp/keystore.json.
    """

    def __init__(self, key_type=None, key_length=None, key=None, key_hash=None):
        # Only the attributes needed by the requested operation have to be
        # supplied; each method validates the ones it relies on.
        self.key_type = key_type
        self.key_length = key_length
        self.key = key
        self.key_hash = key_hash

    def __add_to_keystore(self):
        """
        Update the key and key hash in the key store - requires both hash and
        key.

        Raises InputError when either of them is not set.
        """
        if not self.key_hash or not self.key:
            raise InputError("Need both key_hash and key to update into key store")
        key_store = _read_keystore()
        key_store[self.key_hash] = self.key
        _write_keystore(key_store)

    def __hash_key(self):
        """
        Hash the current key, store the result in self.key_hash and return it.

        The hash is the SHA-256 hex digest of 32 zero bytes followed by the
        key. Raises InputError when no key is set.
        """
        if not self.key:
            raise InputError("Need key to hash")

        hash_it = hashlib.new('sha256')
        hash_it.update(b'\0' * 32)
        hash_it.update(self.key)
        self.key_hash = hash_it.hexdigest()
        return self.key_hash

    def generate(self):
        """
        generate the encryption key
        Hash the generated key
        Update the key store with key and hash

        The key and its hash are also printed as JSON on standard output.
        """
        util.SMlog('Generating key')
        generator = _get_key_generator(key_length=self.key_length, key_type=self.key_type)
        self.key = generator.generate()
        # __hash_key stores the digest in self.key_hash as well as returning it
        self.__hash_key()
        _print_key_info(key=self.key, key_hash=self.key_hash)
        util.SMlog(f'Generated key, hash {self.key_hash}')
        self.__add_to_keystore()

    def get_key(self, log_key_info=True):
        """
        Fetch the key from the key store based on the key_hash - requires key hash

        When log_key_info is true the key found is also printed as JSON on
        standard output. Returns the key.

        Raises InputError when no key hash is set, and KeyLookUpError when
        the key store has no key for the hash.
        """
        if not self.key_hash:
            util.SMlog('No key hash set, cannot retrieve unknown key')
            raise InputError("Need key hash to get the key from the key store")

        key_store = _read_keystore()
        key = key_store.get(self.key_hash, None)
        if not key:
            util.SMlog(f'No key found in keystore for hash {self.key_hash}, known hashes {key_store.keys()}')
            raise KeyLookUpError("No keys in the keystore which matches the given key hash")

        if log_key_info:
            _print_key_info(key=key)
        return key

    def get_keyhash(self):
        """
        Fetch the key hash from the key store based on the key - requires key

        The first hash whose stored key equals self.key is printed as JSON
        on standard output.

        Raises InputError when no key is set, and KeyLookUpError when no
        entry of the key store holds the given key.
        """
        if not self.key:
            raise InputError("Need key to get the key hash from the key store")
        key_store = _read_keystore()
        matching_hashes = [stored_hash for stored_hash, stored_key in key_store.items()
                           if stored_key == self.key]
        if not matching_hashes:
            # An unknown key is a lookup failure, not an empty result
            raise KeyLookUpError("No key hash in the keystore which matches the given key")
        _print_key_info(key_hash=matching_hashes[0])

    def update_keystore(self):
        """
        If this key hash is already in the key store, update its corresponding key

        Raises InputError when the key hash or the key is not set, or when
        the key store has no entry for the key hash.
        """
        if not (self.key_hash and self.key):
            raise InputError("Need key hash and key to update the key store")

        key_store = _read_keystore()
        if self.key_hash not in key_store:
            raise InputError("No existing key in the keystore"
                             " with key hash {}".format(self.key_hash))
        key_store[self.key_hash] = self.key
        _write_keystore(key_store)
def _get_key_generator(key_length=None, key_type=None):
    """
    Pick the key generator matching the requested key type and length.

    The checks are made in this order: the "alphanumeric" key type (which
    is given key_length), then any truthy key_length (random key of that
    length), then the "weak" and "strong" key types.

    Raises InputError when none of these applies.
    """
    if key_type == "alphanumeric":
        return AlphaNumericKeyGenerator(key_length=key_length)

    if key_length:
        return RandomKeyGenerator(key_length=key_length)

    fixed_length_generators = (
        ("weak", WeakKeyGenerator),
        ("strong", StrongKeyGenerator),
    )
    for type_name, generator_class in fixed_length_generators:
        if key_type == type_name:
            return generator_class()

    raise InputError("Either key_length in byte or key_type(\"strong OR weak\")"
                     " should be specified to generate the key")
class RandomKeyGenerator:
    """Key generator producing random keys of a caller-chosen length in bytes"""

    def __init__(self, key_length):
        self.key_length = key_length

    def generate(self):
        """Return key_length random bytes obtained from os.urandom"""
        num_bytes = self.key_length
        return os.urandom(num_bytes)
class StrongKeyGenerator(RandomKeyGenerator):
    """Random key generator fixed at 64 bytes, i.e. a 512-bit key"""

    def __init__(self):
        super().__init__(key_length=64)
class WeakKeyGenerator(RandomKeyGenerator):
    """Random key generator fixed at 32 bytes, i.e. a 256-bit key"""

    def __init__(self):
        super().__init__(key_length=32)
class AlphaNumericKeyGenerator(object):
    """Generates alphanumeric keys"""

    def __init__(self, key_length=None):
        # Number of characters in the key; 64 when no length is requested
        self.key_length = 64 if key_length is None else key_length

    def generate(self):
        """
        Generate a random alphanumeric sequence of key_length characters.

        Characters are drawn from the ASCII letters and digits using
        SystemRandom, and the result is returned UTF-8 encoded as bytes.
        """
        keys_from = string.ascii_letters + string.digits
        # One generator instance serves the whole key instead of one per character
        rng = SystemRandom()
        return "".join(rng.choice(keys_from) for _ in range(self.key_length)).encode("utf-8")
if __name__ == '__main__':
    # Command line interface to the key store. Exactly one of the action
    # flags (--generatekey, --getkey, --getkeyhash, --updatekeystore) is
    # expected; when several are given the first one in that order wins,
    # and when none is given nothing is done.
    parser = argparse.ArgumentParser()

    # Actions
    parser.add_argument('--generatekey', action='store_true', dest='generate',
                        default=False,
                        help="Generates the encryption key based on the given either keytype or keylength")

    parser.add_argument('--getkey', action='store_true', dest='get_key',
                        default=False, help="To get the key from the keystore based on the given key hash")

    parser.add_argument('--getkeyhash', action='store_true', dest='get_key_hash',
                        default=False, help="To get the key hash from the keystore based on the given key")

    parser.add_argument('--updatekeystore', action='store_true', dest='update_keystore',
                        default=False,
                        help="If needs to update the already existing key in the keystore pass the keyHash and new key")

    # Inputs for the actions
    parser.add_argument('--keytype', action='store', dest='key_type', default=None,
                        help='Type of the key: values expected weak, strong or alphanumeric')

    parser.add_argument('--keylength', action='store', default=None, type=int,
                        dest='key_length',
                        help='length of the encryption key in byte')

    parser.add_argument('--keyhash', action='store', dest='key_hash', default=None,
                        help='Hash of the encryption key')

    parser.add_argument('--key', action='store', dest='key', default=None,
                        help='Base64-encoded encryption key')

    parser_input = parser.parse_args()

    # Keys are passed base64-encoded on the command line; work on raw bytes
    if parser_input.key:
        parser_input.key = base64.b64decode(parser_input.key)

    if parser_input.generate:
        KeyManager(key_type=parser_input.key_type, key_length=parser_input.key_length).generate()
    elif parser_input.get_key:
        KeyManager(key_hash=parser_input.key_hash).get_key()
    elif parser_input.get_key_hash:
        KeyManager(key=parser_input.key).get_keyhash()
    elif parser_input.update_keystore:
        KeyManager(key_hash=parser_input.key_hash, key=parser_input.key).update_keystore()