Coverage for drivers/plugins/keymanagerutil.py : 0%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/python3
3"""
4Helper module used for XenRT testing of the VDI encryption feature (REQ-718).
5This module implements the key lookup plugin interface, so if it is
6installed, SM will use it to retrieve keys based on their hashes.
7This key store is backed by a file stored on disk in dom0, and helper
8functions are provided to manipulate it.
9"""
10import base64
11import os
12import os.path
13import hashlib
14import json
15import argparse
16import string
17from random import SystemRandom
19import XenAPI
# Passed as the final argument to login_with_password in _check_key
# (presumably the originator string recorded by xapi -- confirm).
PROGRAM_NAME = 'keymanagerutil'
def load_key(key_hash, vdi_uuid):
    """
    Look up the key stored under key_hash on behalf of the SM plugin.

    The request is first validated with _check_key, which raises when the
    VDI's sm_config does not carry the same key_hash; that exception is
    propagated to the caller.

    Returns the key from the key store, or None when the key store has no
    entry for key_hash.
    """
    _check_key(key_hash, vdi_uuid)
    manager = KeyManager(key_hash=key_hash)
    try:
        # log_key_info=False: do not print the key as JSON on this path.
        return manager.get_key(log_key_info=False)
    except KeyLookUpError:
        return None
def _check_key(key_hash, vdi_uuid):
    """
    Verify that the VDI identified by vdi_uuid is recorded with key_hash.

    Opens a local xapi session, reads the VDI's sm_config and raises an
    Exception when it has no 'key_hash' entry or when that entry differs
    from the requested key_hash. The session is always logged out.
    """
    session = XenAPI.xapi_local()
    session.xenapi.login_with_password('root', '', '', PROGRAM_NAME)
    try:
        vdi_ref = session.xenapi.VDI.get_by_uuid(vdi_uuid)
        sm_config = session.xenapi.VDI.get_sm_config(vdi_ref)

        if 'key_hash' not in sm_config:
            message = ('Encryption key requested for VDI {}'
                       ' whose sm_config does not contain the key_hash'
                       ' entry. Its sm_config is {}')
            raise Exception(message.format(vdi_uuid, sm_config))

        if key_hash != sm_config['key_hash']:
            message = ('A key was requested with key hash {}'
                       ' for VDI {}, but it has a different'
                       ' key_hash in its sm_config:'
                       ' {}')
            raise Exception(message.format(key_hash, vdi_uuid, sm_config['key_hash']))
    finally:
        # Log out whether or not the check succeeded.
        session.xenapi.logout()
class InputError(Exception):
    """Raised when the arguments needed for an operation are missing or unusable"""

    def __init__(self, message):
        super().__init__(message)
class KeyLookUpError(Exception):
    """Signals that the requested key or key hash has no entry in the keystore"""

    def __init__(self, message):
        super().__init__(message)
def _print_key_info(key=None, key_hash=None):
    """
    Print the supplied key details to standard output as one JSON object.

    The object holds 'key_base64' (the key, base64-encoded) when key is
    truthy and 'key_hash' when key_hash is truthy. XenRT interprets this
    output.
    """
    encoded_key = base64.b64encode(key).decode() if key else None
    candidates = (('key_base64', encoded_key), ('key_hash', key_hash))
    # Keep only the fields that were actually supplied.
    payload = {name: value for name, value in candidates if value}
    print(json.dumps(payload))
# Location of the JSON key store file; read by _read_keystore and written by
# _write_keystore.
KEYSTORE_PATH = '/tmp/keystore.json'
def _read_keystore():
    """
    Load the key store from KEYSTORE_PATH.

    Returns a dictionary mapping each key hash to its key as bytes (the file
    stores the keys base64-encoded), or an empty dictionary when the file
    does not exist.
    """
    if not os.path.isfile(KEYSTORE_PATH):
        return {}

    with open(KEYSTORE_PATH, "r") as keystore_file:
        encoded_store = json.load(keystore_file)

    return {
        key_hash: base64.b64decode(encoded_store[key_hash])
        for key_hash in encoded_store
    }
def _write_keystore(key_store):
    """
    Write the given key store contents to the key store file at
    KEYSTORE_PATH, which will be created if it does not exist.

    key_store maps key hashes to keys given as bytes; the keys are written
    base64-encoded. The dictionary passed in is left untouched.
    """
    # Encode into a fresh dictionary: overwriting the caller's values with
    # base64 strings would corrupt the caller's data and make a second write
    # of the same dictionary fail.
    encoded_store = {
        key_hash: base64.b64encode(key).decode()
        for key_hash, key in key_store.items()
    }
    with open(KEYSTORE_PATH, "w") as key_store_file:
        json.dump(encoded_store, key_store_file)
        key_store_file.write("\n")
class KeyManager(object):
    """
    KeyManager is a python utility tool for generating and managing the keys
    in the key store.

    One can request KeyManager to generate a key, passing the type of the
    key ("weak", "strong" or "alphanumeric") and/or the length of the key.
    One can request KeyManager to get the key from the key store by passing
    key_hash.
    One can request KeyManager to get the key_hash from the key store by
    passing the encryption key.
    One can request KeyManager to replace the key stored under an existing
    key_hash.

    The key store is the JSON file at KEYSTORE_PATH, accessed through
    _read_keystore() and _write_keystore().
    """

    def __init__(self, key_type=None, key_length=None, key=None, key_hash=None):
        """Record the inputs; which of them are required depends on the operation called."""
        self.key_type = key_type
        self.key_length = key_length
        self.key = key
        self.key_hash = key_hash

    def __add_to_keystore(self):
        """
        Store self.key under self.key_hash in the key store, replacing any
        existing entry.

        Raises InputError unless both key_hash and key are set.
        """
        if not self.key_hash or not self.key:
            raise InputError("Need both key_hash and key to update into key store")
        key_store = _read_keystore()
        key_store[self.key_hash] = self.key
        _write_keystore(key_store)

    def __hash_key(self):
        """
        Compute, store in self.key_hash and return the hash of self.key.

        The hash is the SHA-256 hex digest of 32 zero bytes followed by the
        key. Raises InputError when no key is set.
        """
        if not self.key:
            raise InputError("Need key to hash")

        hash_it = hashlib.new('sha256')
        # NOTE(review): the 32-byte zero prefix presumably mirrors how the
        # consumer of these hashes computes key_hash -- confirm.
        hash_it.update(b'\0' * 32)
        hash_it.update(self.key)
        self.key_hash = hash_it.hexdigest()
        return self.key_hash

    @staticmethod
    def _find_key_hash(key_store, key):
        """Return the first key hash in key_store whose key equals key, or None."""
        for candidate_hash, stored_key in key_store.items():
            if stored_key == key:
                return candidate_hash
        return None

    def generate(self):
        """
        Generate the encryption key
        Hash the generated key
        Print the key and its hash as JSON
        Update the key store with key and hash
        """
        generator = _get_key_generator(key_length=self.key_length, key_type=self.key_type)
        self.key = generator.generate()
        self.key_hash = self.__hash_key()
        _print_key_info(key=self.key, key_hash=self.key_hash)
        self.__add_to_keystore()

    def get_key(self, log_key_info=True):
        """
        Fetch the key from the key store based on the key_hash - requires key hash

        When log_key_info is true the key is also printed as JSON.
        Raises InputError when no key hash is set and KeyLookUpError when the
        key store has no (non-empty) key under that hash.
        """
        if not self.key_hash:
            raise InputError("Need key hash to get the key from the key store")

        key = _read_keystore().get(self.key_hash, None)
        if not key:
            raise KeyLookUpError("No keys in the keystore which matches the given key hash")
        if log_key_info:
            _print_key_info(key=key)
        return key

    def get_keyhash(self):
        """
        Fetch the key hash from the key store based on the key - requires key

        The hash is printed as JSON and returned.
        Raises InputError when no key is set and KeyLookUpError when no entry
        of the key store holds that key.
        """
        if not self.key:
            raise InputError("Need key to get the key hash from the key store")

        # Dictionary views cannot be indexed on Python 3, so search the items
        # instead of indexing keys() by the position found in values().
        key_hash = self._find_key_hash(_read_keystore(), self.key)
        if key_hash is None:
            raise KeyLookUpError("No key hash in the keystore which matches the given key")
        _print_key_info(key_hash=key_hash)
        return key_hash

    def update_keystore(self):
        """
        If this key hash is already in the key store, update its corresponding key

        Raises InputError when key hash or key is missing, or when the key
        store has no entry for the key hash.
        """
        if not (self.key_hash and self.key):
            raise InputError("Need key hash and key to update the key store")

        key_store = _read_keystore()
        if self.key_hash not in key_store:
            raise InputError("No existing key in the keystore"
                             " with key hash {}".format(self.key_hash))
        key_store[self.key_hash] = self.key
        _write_keystore(key_store)
def _get_key_generator(key_length=None, key_type=None):
    """
    Choose the key generator matching the request.

    Precedence: key_type "alphanumeric" (key_length is passed on), then any
    truthy key_length (random bytes of that length), then key_type "weak" or
    "strong". Raises InputError when none of these applies.
    """
    if key_type == "alphanumeric":
        return AlphaNumericKeyGenerator(key_length=key_length)
    if key_length:
        # An explicit length takes precedence over "weak"/"strong".
        return RandomKeyGenerator(key_length=key_length)
    if key_type == "weak":
        return WeakKeyGenerator()
    if key_type == "strong":
        return StrongKeyGenerator()
    raise InputError('Either key_length in byte or key_type("strong OR weak")'
                     ' should be specified to generate the key')
class RandomKeyGenerator:
    """Key generator returning key_length random bytes"""

    def __init__(self, key_length):
        self.key_length = key_length

    def generate(self):
        """Return key_length bytes obtained from os.urandom"""
        num_bytes = self.key_length
        return os.urandom(num_bytes)
class StrongKeyGenerator(RandomKeyGenerator):
    """Random key generator fixed at 64 bytes (512 bits)"""

    def __init__(self):
        super().__init__(key_length=64)
class WeakKeyGenerator(RandomKeyGenerator):
    """Random key generator fixed at 32 bytes (256 bits)"""

    def __init__(self):
        super().__init__(key_length=32)
class AlphaNumericKeyGenerator:
    """Key generator producing keys made of ASCII letters and digits"""

    def __init__(self, key_length=None):
        # Default to 64 characters when no length is requested.
        if key_length is None:
            key_length = 64
        self.key_length = key_length

    def generate(self):
        """Return key_length random alphanumeric characters, UTF-8 encoded"""
        alphabet = string.ascii_letters + string.digits
        rng = SystemRandom()
        characters = [rng.choice(alphabet) for _ in range(self.key_length)]
        return "".join(characters).encode("utf-8")
if __name__ == '__main__':
    # Command-line front end. Exactly one of the action flags (--generatekey,
    # --getkey, --getkeyhash, --updatekeystore) is expected; when several are
    # given the first one in that order wins, and with none nothing is done.

    parser = argparse.ArgumentParser()

    # Actions
    parser.add_argument('--generatekey', action='store_true', dest='generate',
                        default=False,
                        help="Generates the encryption key based on the given either keytype or keylength")

    parser.add_argument('--getkey', action='store_true', dest='get_key',
                        default=False, help="To get the key from the keystore based on the given key hash")

    parser.add_argument('--getkeyhash', action='store_true', dest='get_key_hash',
                        default=False, help="To get the key hash from the keystore based on the given key")

    parser.add_argument('--updatekeystore', action='store_true', dest='update_keystore',
                        default=False,
                        help="If needs to update the already existing key in the keystore pass the keyHash and new key")

    # Inputs for the actions
    parser.add_argument('--keytype', action='store', dest='key_type', default=None,
                        help='Type of the key: values expected weak, strong or alphanumeric')

    parser.add_argument('--keylength', action='store', default=None, type=int,
                        dest='key_length',
                        help='length of the encryption key in byte')

    parser.add_argument('--keyhash', action='store', dest='key_hash', default=None,
                        help='Hash of the encryption key')

    parser.add_argument('--key', action='store', dest='key', default=None,
                        help='Base64-encoded encryption key')

    parser_input = parser.parse_args()

    # The key arrives base64-encoded on the command line; KeyManager works
    # with the raw bytes.
    if parser_input.key:
        parser_input.key = base64.b64decode(parser_input.key)

    if parser_input.generate:
        KeyManager(key_type=parser_input.key_type, key_length=parser_input.key_length).generate()
    elif parser_input.get_key:
        KeyManager(key_hash=parser_input.key_hash).get_key()
    elif parser_input.get_key_hash:
        KeyManager(key=parser_input.key).get_keyhash()
    elif parser_input.update_keystore:
        KeyManager(key_hash=parser_input.key_hash, key=parser_input.key).update_keystore()