Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Original work copyright (C) Citrix systems 

4# Modified work copyright (C) Vates SAS and XCP-ng community 

5# 

6# This program is free software; you can redistribute it and/or modify 

7# it under the terms of the GNU Lesser General Public License as published 

8# by the Free Software Foundation; version 2.1 only. 

9# 

10# This program is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU Lesser General Public License for more details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with this program; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 

18# 

19# CEPHFSSR: Based on FileSR, mounts ceph fs share 

20 

21import errno 

22import os 

23import socket 

24import syslog as _syslog 

25import xmlrpc.client 

26from syslog import syslog 

27 

28# careful with the import order here 

29# FileSR has a circular dependency: 

30# FileSR -> blktap2 -> lvutil -> EXTSR -> FileSR 

31# importing in this order seems to avoid triggering the issue. 

32import SR 

33import SRCommand 

34import FileSR 

35# end of careful 

36import cleanup 

37import util 

38import vhdutil 

39import xs_errors 

40from lock import Lock 

41 

42CAPABILITIES = ["SR_PROBE", "SR_UPDATE", 

43 "VDI_CREATE", "VDI_DELETE", "VDI_ATTACH", "VDI_DETACH", 

44 "VDI_UPDATE", "VDI_CLONE", "VDI_SNAPSHOT", "VDI_RESIZE", "VDI_MIRROR", 

45 "VDI_GENERATE_CONFIG", 

46 "VDI_RESET_ON_BOOT/2", "ATOMIC_PAUSE"] 

47 

48CONFIGURATION = [ 

49 ['server', 'Ceph server(s) (required, ex: "192.168.0.12" or "10.10.10.10,10.10.10.26")'], 

50 ['serverpath', 'Ceph FS path (required, ex: "/")'], 

51 ['serverport', 'ex: 6789'], 

52 ['options', 'Ceph FS client name, and secretfile (required, ex: "name=admin,secretfile=/etc/ceph/admin.secret")'] 

53] 

54 

55DRIVER_INFO = { 

56 'name': 'CephFS VHD', 

57 'description': 'SR plugin which stores disks as VHD files on a CephFS storage', 

58 'vendor': 'Vates SAS', 

59 'copyright': '(C) 2020 Vates SAS', 

60 'driver_version': '1.0', 

61 'required_api_version': '1.0', 

62 'capabilities': CAPABILITIES, 

63 'configuration': CONFIGURATION 

64} 

65 

66DRIVER_CONFIG = {"ATTACH_FROM_CONFIG_WITH_TAPDISK": True} 

67 

68# The mountpoint for the directory when performing an sr_probe. All probes 

69# are guaranteed to be serialised by xapi, so this single mountpoint is fine. 

70PROBE_MOUNTPOINT = os.path.join(SR.MOUNT_BASE, "probe") 

71 

72 

73class CephFSException(Exception): 

74 def __init__(self, errstr): 

75 self.errstr = errstr 

76 

77 

78# mountpoint = /var/run/sr-mount/CephFS/uuid 

79# linkpath = mountpoint/uuid - path to SR directory on share 

80# path = /var/run/sr-mount/uuid - symlink to SR directory on share 

81class CephFSSR(FileSR.FileSR): 

82 """Ceph file-based storage repository""" 

83 

84 DRIVER_TYPE = 'cephfs' 

85 

86 def handles(sr_type): 

87 # fudge, because the parent class (FileSR) checks for smb to alter its behavior 

88 return sr_type == CephFSSR.DRIVER_TYPE or sr_type == 'smb' 

89 

90 handles = staticmethod(handles) 

91 

92 def load(self, sr_uuid): 

93 if not self._is_ceph_available(): 

94 raise xs_errors.XenError( 

95 'SRUnavailable', 

96 opterr='ceph is not installed' 

97 ) 

98 

99 self.ops_exclusive = FileSR.OPS_EXCLUSIVE 

100 self.lock = Lock(vhdutil.LOCK_TYPE_SR, self.uuid) 

101 self.sr_vditype = SR.DEFAULT_TAP 

102 self.driver_config = DRIVER_CONFIG 

103 if 'server' not in self.dconf: 

104 raise xs_errors.XenError('ConfigServerMissing') 

105 self.remoteserver = self.dconf['server'] 

106 self.remotepath = self.dconf['serverpath'] 

107 # if serverport is not specified, use default 6789 

108 if 'serverport' not in self.dconf: 

109 self.remoteport = "6789" 

110 else: 

111 self.remoteport = self.dconf['serverport'] 

112 if self.sr_ref and self.session is not None: 

113 self.sm_config = self.session.xenapi.SR.get_sm_config(self.sr_ref) 

114 else: 

115 self.sm_config = self.srcmd.params.get('sr_sm_config') or {} 

116 self.mountpoint = os.path.join(SR.MOUNT_BASE, 'CephFS', sr_uuid) 

117 self.linkpath = os.path.join(self.mountpoint, sr_uuid or "") 

118 self.path = os.path.join(SR.MOUNT_BASE, sr_uuid) 

119 self._check_o_direct() 

120 

121 def checkmount(self): 

122 return util.ioretry(lambda: ((util.pathexists(self.mountpoint) and 

123 util.ismount(self.mountpoint)) and 

124 util.pathexists(self.path))) 

125 

126 def mount(self, mountpoint=None): 

127 """Mount the remote ceph export at 'mountpoint'""" 

128 if mountpoint is None: 

129 mountpoint = self.mountpoint 

130 elif not util.is_string(mountpoint) or mountpoint == "": 

131 raise CephFSException("mountpoint not a string object") 

132 

133 try: 

134 if not util.ioretry(lambda: util.isdir(mountpoint)): 

135 util.ioretry(lambda: util.makedirs(mountpoint)) 

136 except util.CommandException as inst: 

137 raise CephFSException("Failed to make directory: code is %d" % inst.code) 

138 

139 try: 

140 options = [] 

141 if 'options' in self.dconf: 

142 options.append(self.dconf['options']) 

143 if options: 

144 options = ['-o', ','.join(options)] 

145 acc = [] 

146 for server in self.remoteserver.split(','): 

147 try: 

148 addr_info = socket.getaddrinfo(server, 0)[0] 

149 except Exception: 

150 continue 

151 

152 acc.append('[' + server + ']' if addr_info[0] == socket.AF_INET6 else server) 

153 

154 remoteserver = ','.join(acc) 

155 command = ["mount", '-t', 'ceph', remoteserver + ":" + self.remoteport + ":" + self.remotepath, mountpoint] + options 

156 util.ioretry(lambda: util.pread(command), errlist=[errno.EPIPE, errno.EIO], maxretry=2, nofail=True) 

157 except util.CommandException as inst: 

158 syslog(_syslog.LOG_ERR, 'CephFS mount failed ' + inst.__str__()) 

159 raise CephFSException("mount failed with return code %d" % inst.code) 

160 

161 # Sanity check to ensure that the user has at least RO access to the 

162 # mounted share. Windows sharing and security settings can be tricky. 

163 try: 

164 util.listdir(mountpoint) 

165 except util.CommandException: 

166 try: 

167 self.unmount(mountpoint, True) 

168 except CephFSException: 

169 util.logException('CephFSSR.unmount()') 

170 raise CephFSException("Permission denied. Please check user privileges.") 

171 

172 def unmount(self, mountpoint, rmmountpoint): 

173 try: 

174 util.pread(["umount", mountpoint]) 

175 except util.CommandException as inst: 

176 raise CephFSException("umount failed with return code %d" % inst.code) 

177 if rmmountpoint: 

178 try: 

179 os.rmdir(mountpoint) 

180 except OSError as inst: 

181 raise CephFSException("rmdir failed with error '%s'" % inst.strerror) 

182 

183 def attach(self, sr_uuid): 

184 if not self.checkmount(): 

185 try: 

186 self.mount() 

187 os.symlink(self.linkpath, self.path) 

188 except CephFSException as exc: 

189 raise SR.SROSError(12, exc.errstr) 

190 self.attached = True 

191 

192 def probe(self): 

193 try: 

194 self.mount(PROBE_MOUNTPOINT) 

195 sr_list = filter(util.match_uuid, util.listdir(PROBE_MOUNTPOINT)) 

196 self.unmount(PROBE_MOUNTPOINT, True) 

197 except (util.CommandException, xs_errors.XenError): 

198 raise 

199 # Create a dictionary from the SR uuids to feed SRtoXML() 

200 sr_dict = {sr_uuid: {} for sr_uuid in sr_list} 

201 return util.SRtoXML(sr_dict) 

202 

203 def detach(self, sr_uuid): 

204 if not self.checkmount(): 

205 return 

206 util.SMlog("Aborting GC/coalesce") 

207 cleanup.abort(self.uuid) 

208 # Change directory to avoid unmount conflicts 

209 os.chdir(SR.MOUNT_BASE) 

210 self.unmount(self.mountpoint, True) 

211 os.unlink(self.path) 

212 self.attached = False 

213 

214 def create(self, sr_uuid, size): 

215 if self.checkmount(): 

216 raise SR.SROSError(113, 'CephFS mount point already attached') 

217 

218 try: 

219 self.mount() 

220 except CephFSException as exc: 

221 # noinspection PyBroadException 

222 try: 

223 os.rmdir(self.mountpoint) 

224 except: 

225 # we have no recovery strategy 

226 pass 

227 raise SR.SROSError(111, "CephFS mount error [opterr=%s]" % exc.errstr) 

228 

229 if util.ioretry(lambda: util.pathexists(self.linkpath)): 

230 if len(util.ioretry(lambda: util.listdir(self.linkpath))) != 0: 

231 self.detach(sr_uuid) 

232 raise xs_errors.XenError('SRExists') 

233 else: 

234 try: 

235 util.ioretry(lambda: util.makedirs(self.linkpath)) 

236 os.symlink(self.linkpath, self.path) 

237 except util.CommandException as inst: 

238 if inst.code != errno.EEXIST: 

239 try: 

240 self.unmount(self.mountpoint, True) 

241 except CephFSException: 

242 util.logException('CephFSSR.unmount()') 

243 raise SR.SROSError(116, 

244 "Failed to create CephFS SR. remote directory creation error: {}".format( 

245 os.strerror(inst.code))) 

246 self.detach(sr_uuid) 

247 

248 def delete(self, sr_uuid): 

249 # try to remove/delete non VDI contents first 

250 super(CephFSSR, self).delete(sr_uuid) 

251 try: 

252 if self.checkmount(): 

253 self.detach(sr_uuid) 

254 self.mount() 

255 if util.ioretry(lambda: util.pathexists(self.linkpath)): 

256 util.ioretry(lambda: os.rmdir(self.linkpath)) 

257 util.SMlog(str(self.unmount(self.mountpoint, True))) 

258 except util.CommandException as inst: 

259 self.detach(sr_uuid) 

260 if inst.code != errno.ENOENT: 

261 raise SR.SROSError(114, "Failed to remove CephFS mount point") 

262 

263 def vdi(self, uuid, loadLocked=False): 

264 return CephFSFileVDI(self, uuid) 

265 

266 @staticmethod 

267 def _is_ceph_available(): 

268 import distutils.spawn 

269 return distutils.spawn.find_executable('ceph') 

270 

271class CephFSFileVDI(FileSR.FileVDI): 

272 def attach(self, sr_uuid, vdi_uuid): 

273 if not hasattr(self, 'xenstore_data'): 

274 self.xenstore_data = {} 

275 

276 self.xenstore_data['storage-type'] = CephFSSR.DRIVER_TYPE 

277 

278 return super(CephFSFileVDI, self).attach(sr_uuid, vdi_uuid) 

279 

280 def generate_config(self, sr_uuid, vdi_uuid): 

281 util.SMlog("SMBFileVDI.generate_config") 

282 if not util.pathexists(self.path): 

283 raise xs_errors.XenError('VDIUnavailable') 

284 resp = {'device_config': self.sr.dconf, 

285 'sr_uuid': sr_uuid, 

286 'vdi_uuid': vdi_uuid, 

287 'sr_sm_config': self.sr.sm_config, 

288 'command': 'vdi_attach_from_config'} 

289 # Return the 'config' encoded within a normal XMLRPC response so that 

290 # we can use the regular response/error parsing code. 

291 config = xmlrpc.client.dumps(tuple([resp]), "vdi_attach_from_config") 

292 return xmlrpc.client.dumps((config,), "", True) 

293 

294 def attach_from_config(self, sr_uuid, vdi_uuid): 

295 try: 

296 if not util.pathexists(self.sr.path): 

297 self.sr.attach(sr_uuid) 

298 except: 

299 util.logException("SMBFileVDI.attach_from_config") 

300 raise xs_errors.XenError('SRUnavailable', 

301 opterr='Unable to attach from config') 

302 

303 

304if __name__ == '__main__': 304 ↛ 305line 304 didn't jump to line 305, because the condition on line 304 was never true

305 SRCommand.run(CephFSSR, DRIVER_INFO) 

306else: 

307 SR.registerSR(CephFSSR)