Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Original work copyright (C) Citrix systems 

4# Modified work copyright (C) Vates SAS and XCP-ng community 

5# 

6# This program is free software; you can redistribute it and/or modify 

7# it under the terms of the GNU Lesser General Public License as published 

8# by the Free Software Foundation; version 2.1 only. 

9# 

10# This program is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU Lesser General Public License for more details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with this program; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 

18# 

19# CEPHFSSR: Based on FileSR, mounts ceph fs share 

20 

21import errno 

22import os 

23import syslog as _syslog 

24import xmlrpc.client 

25from syslog import syslog 

26 

27# careful with the import order here 

28# FileSR has a circular dependency: 

29# FileSR -> blktap2 -> lvutil -> EXTSR -> FileSR 

30# importing in this order seems to avoid triggering the issue. 

31import SR 

32import SRCommand 

33import FileSR 

34# end of careful 

35import cleanup 

36import util 

37import vhdutil 

38import xs_errors 

39from lock import Lock 

40 

# Operations this driver advertises to xapi.
CAPABILITIES = [
    "SR_PROBE",
    "SR_UPDATE",
    "VDI_CREATE",
    "VDI_DELETE",
    "VDI_ATTACH",
    "VDI_DETACH",
    "VDI_UPDATE",
    "VDI_CLONE",
    "VDI_SNAPSHOT",
    "VDI_RESIZE",
    "VDI_MIRROR",
    "VDI_GENERATE_CONFIG",
    "VDI_RESET_ON_BOOT/2",
    "ATOMIC_PAUSE",
]

# Device-config keys understood by the driver, as [key, description] pairs.
CONFIGURATION = [
    [
        'server',
        'Ceph server(s) (required, ex: "192.168.0.12" or "10.10.10.10,10.10.10.26")',
    ],
    [
        'serverpath',
        'Ceph FS path (required, ex: "/")',
    ],
    [
        'serverport',
        'ex: 6789',
    ],
    [
        'options',
        'Ceph FS client name, and secretfile (required, ex: "name=admin,secretfile=/etc/ceph/admin.secret")',
    ],
]

# Driver description handed to SRCommand.run() when executed as a script.
DRIVER_INFO = dict(
    name='CephFS VHD',
    description='SR plugin which stores disks as VHD files on a CephFS storage',
    vendor='Vates SAS',
    copyright='(C) 2020 Vates SAS',
    driver_version='1.0',
    required_api_version='1.0',
    capabilities=CAPABILITIES,
    configuration=CONFIGURATION,
)

# Stored on the SR as ``driver_config`` by CephFSSR.load().
DRIVER_CONFIG = {"ATTACH_FROM_CONFIG_WITH_TAPDISK": True}

66 

# Scratch mountpoint used while running an sr_probe. xapi guarantees that
# probes are serialised, so one shared mountpoint is sufficient.
PROBE_MOUNTPOINT = os.path.join(SR.MOUNT_BASE, "probe")

70 

71 

class CephFSException(Exception):
    """Error raised by the CephFS mount/unmount helpers.

    The human-readable message is kept in ``errstr`` so callers can embed it
    in the SR error they raise.
    """

    def __init__(self, errstr):
        self.errstr = errstr

75 

76 

77# mountpoint = /var/run/sr-mount/CephFS/uuid 

78# linkpath = mountpoint/uuid - path to SR directory on share 

79# path = /var/run/sr-mount/uuid - symlink to SR directory on share 

class CephFSSR(FileSR.FileSR):
    """Ceph file-based storage repository.

    Mounts a CephFS export with ``mount -t ceph`` and exposes a per-SR
    directory on it through a symlink:

    * ``mountpoint`` - SR.MOUNT_BASE/CephFS/<sr_uuid>, where the share is mounted
    * ``linkpath``   - <mountpoint>/<sr_uuid>, the SR directory on the share
    * ``path``       - SR.MOUNT_BASE/<sr_uuid>, symlink pointing at ``linkpath``
    """

    DRIVER_TYPE = 'cephfs'

    @staticmethod
    def handles(sr_type):
        """Return True when ``sr_type`` should be served by this driver."""
        # fudge, because the parent class (FileSR) checks for smb to alter its behavior
        return sr_type == CephFSSR.DRIVER_TYPE or sr_type == 'smb'

    def load(self, sr_uuid):
        """Validate the device config and compute the paths used by the SR.

        Raises:
            xs_errors.XenError: 'SRUnavailable' when no ``ceph`` executable is
                found, 'ConfigServerMissing' when ``server`` is absent from
                the device config, 'ConfigServerPathMissing' when
                ``serverpath`` is absent.
        """
        if not self._is_ceph_available():
            raise xs_errors.XenError(
                'SRUnavailable',
                opterr='ceph is not installed'
            )

        self.ops_exclusive = FileSR.OPS_EXCLUSIVE
        self.lock = Lock(vhdutil.LOCK_TYPE_SR, self.uuid)
        self.sr_vditype = SR.DEFAULT_TAP
        self.driver_config = DRIVER_CONFIG

        if 'server' not in self.dconf:
            raise xs_errors.XenError('ConfigServerMissing')
        # 'serverpath' is documented as required; report it as a proper SR
        # error instead of letting a KeyError escape.
        # NOTE(review): error name taken from the NFS driver's equivalent
        # check - confirm it is present in the error code table.
        if 'serverpath' not in self.dconf:
            raise xs_errors.XenError('ConfigServerPathMissing')
        self.remoteserver = self.dconf['server']
        self.remotepath = self.dconf['serverpath']

        # if serverport is not specified, use default 6789
        if 'serverport' in self.dconf:
            self.remoteport = self.dconf['serverport']
        else:
            self.remoteport = "6789"

        if self.sr_ref and self.session is not None:
            self.sm_config = self.session.xenapi.SR.get_sm_config(self.sr_ref)
        else:
            self.sm_config = self.srcmd.params.get('sr_sm_config') or {}

        self.mountpoint = os.path.join(SR.MOUNT_BASE, 'CephFS', sr_uuid)
        self.linkpath = os.path.join(self.mountpoint, sr_uuid or "")
        self.path = os.path.join(SR.MOUNT_BASE, sr_uuid)
        self._check_o_direct()

    def checkmount(self):
        """Return a truthy value when the share is mounted and ``path`` exists."""
        return util.ioretry(lambda: ((util.pathexists(self.mountpoint) and
                                      util.ismount(self.mountpoint)) and
                                     util.pathexists(self.path)))

    def mount(self, mountpoint=None):
        """Mount the remote ceph export at 'mountpoint'.

        Args:
            mountpoint: directory to mount on; defaults to ``self.mountpoint``.
                The directory is created when it does not exist.

        Raises:
            CephFSException: when ``mountpoint`` is not a non-empty string,
                when the directory cannot be created, when the mount command
                fails, or when the mounted share cannot be listed.
        """
        if mountpoint is None:
            mountpoint = self.mountpoint
        elif not util.is_string(mountpoint) or mountpoint == "":
            raise CephFSException("mountpoint not a string object")

        try:
            if not util.ioretry(lambda: util.isdir(mountpoint)):
                util.ioretry(lambda: util.makedirs(mountpoint))
        except util.CommandException as inst:
            raise CephFSException("Failed to make directory: code is %d" % inst.code)

        try:
            options = []
            if 'options' in self.dconf:
                options.append(self.dconf['options'])
            if options:
                options = ['-o', ','.join(options)]
            # Device string has the form <server>:<port>:<path>
            command = ["mount", '-t', 'ceph', self.remoteserver+":"+self.remoteport+":"+self.remotepath, mountpoint] + options
            util.ioretry(lambda: util.pread(command), errlist=[errno.EPIPE, errno.EIO], maxretry=2, nofail=True)
        except util.CommandException as inst:
            syslog(_syslog.LOG_ERR, 'CephFS mount failed ' + str(inst))
            raise CephFSException("mount failed with return code %d" % inst.code)

        # Sanity check to ensure that the user has at least RO access to the
        # mounted share; if listing it fails, undo the mount before reporting.
        try:
            util.listdir(mountpoint)
        except util.CommandException:
            try:
                self.unmount(mountpoint, True)
            except CephFSException:
                util.logException('CephFSSR.unmount()')
            raise CephFSException("Permission denied. Please check user privileges.")

    def unmount(self, mountpoint, rmmountpoint):
        """Unmount ``mountpoint`` and optionally remove the directory.

        Args:
            mountpoint: directory to pass to ``umount``.
            rmmountpoint: when true, ``os.rmdir`` the directory afterwards.

        Raises:
            CephFSException: when ``umount`` or the directory removal fails.
        """
        try:
            util.pread(["umount", mountpoint])
        except util.CommandException as inst:
            raise CephFSException("umount failed with return code %d" % inst.code)
        if rmmountpoint:
            try:
                os.rmdir(mountpoint)
            except OSError as inst:
                raise CephFSException("rmdir failed with error '%s'" % inst.strerror)

    def attach(self, sr_uuid):
        """Mount the share and create the ``path`` symlink if not already done.

        Raises:
            SR.SROSError: code 12 when mounting fails.
        """
        if not self.checkmount():
            try:
                self.mount()
                os.symlink(self.linkpath, self.path)
            except CephFSException as exc:
                raise SR.SROSError(12, exc.errstr)
        self.attached = True

    def probe(self):
        """List the SR uuids found at the top level of the share.

        The share is mounted on PROBE_MOUNTPOINT, its entries matching
        ``util.match_uuid`` are collected, and the share is unmounted again.

        Returns:
            The result of ``util.SRtoXML`` for the uuids found.
        """
        self.mount(PROBE_MOUNTPOINT)
        try:
            entries = util.listdir(PROBE_MOUNTPOINT)
        except util.CommandException:
            # Do not leave the probe mountpoint mounted when listing fails;
            # the listing error is the one worth reporting.
            try:
                self.unmount(PROBE_MOUNTPOINT, True)
            except CephFSException:
                util.logException('CephFSSR.unmount()')
            raise
        self.unmount(PROBE_MOUNTPOINT, True)

        # Create a dictionary from the SR uuids to feed SRtoXML()
        sr_dict = {entry: {} for entry in entries if util.match_uuid(entry)}
        return util.SRtoXML(sr_dict)

    def detach(self, sr_uuid):
        """Abort GC/coalesce, unmount the share and remove the ``path`` symlink.

        Does nothing when ``checkmount()`` reports the SR as not mounted.
        """
        if not self.checkmount():
            return
        util.SMlog("Aborting GC/coalesce")
        cleanup.abort(self.uuid)
        # Change directory to avoid unmount conflicts
        os.chdir(SR.MOUNT_BASE)
        self.unmount(self.mountpoint, True)
        os.unlink(self.path)
        self.attached = False

    def create(self, sr_uuid, size):
        """Create the SR directory on the share, then detach again.

        Raises:
            SR.SROSError: code 113 when the SR is already mounted, 111 when
                the mount fails, 116 when the SR directory cannot be created.
            xs_errors.XenError: 'SRExists' when the SR directory already
                exists on the share and is not empty.
        """
        if self.checkmount():
            raise SR.SROSError(113, 'CephFS mount point already attached')

        try:
            self.mount()
        except CephFSException as exc:
            try:
                os.rmdir(self.mountpoint)
            except OSError:
                # best effort only: we have no recovery strategy
                pass
            raise SR.SROSError(111, "CephFS mount error [opterr=%s]" % exc.errstr)

        if util.ioretry(lambda: util.pathexists(self.linkpath)):
            if len(util.ioretry(lambda: util.listdir(self.linkpath))) != 0:
                self.detach(sr_uuid)
                raise xs_errors.XenError('SRExists')
        else:
            try:
                util.ioretry(lambda: util.makedirs(self.linkpath))
                os.symlink(self.linkpath, self.path)
            except util.CommandException as inst:
                if inst.code != errno.EEXIST:
                    try:
                        self.unmount(self.mountpoint, True)
                    except CephFSException:
                        util.logException('CephFSSR.unmount()')
                    raise SR.SROSError(116,
                                       "Failed to create CephFS SR. remote directory creation error: {}".format(
                                           os.strerror(inst.code)))
        self.detach(sr_uuid)

    def delete(self, sr_uuid):
        """Delete the SR contents and remove its directory from the share.

        Raises:
            SR.SROSError: code 114 when a command fails with anything other
                than ENOENT.
        """
        # try to remove/delete non VDI contents first
        super().delete(sr_uuid)
        try:
            if self.checkmount():
                self.detach(sr_uuid)
            # Remount without the symlink so the SR directory can be removed.
            self.mount()
            if util.ioretry(lambda: util.pathexists(self.linkpath)):
                util.ioretry(lambda: os.rmdir(self.linkpath))
            # NOTE(review): unmount() has no return value, so this logs "None".
            util.SMlog(str(self.unmount(self.mountpoint, True)))
        except util.CommandException as inst:
            self.detach(sr_uuid)
            if inst.code != errno.ENOENT:
                raise SR.SROSError(114, "Failed to remove CephFS mount point")

    def vdi(self, uuid, loadLocked=False):
        """Return the CephFSFileVDI object for ``uuid`` (``loadLocked`` is unused)."""
        return CephFSFileVDI(self, uuid)

    @staticmethod
    def _is_ceph_available():
        """Return the path of the ``ceph`` executable, or None when not found."""
        # distutils was removed from the standard library in Python 3.12;
        # shutil.which is the supported replacement for find_executable.
        import shutil
        return shutil.which('ceph')

259 

class CephFSFileVDI(FileSR.FileVDI):
    """File-based VDI stored on a CephFSSR."""

    def attach(self, sr_uuid, vdi_uuid):
        """Tag the VDI's xenstore data with the driver type, then attach it."""
        if not hasattr(self, 'xenstore_data'):
            self.xenstore_data = {}

        self.xenstore_data['storage-type'] = CephFSSR.DRIVER_TYPE

        return super().attach(sr_uuid, vdi_uuid)

    def generate_config(self, sr_uuid, vdi_uuid):
        """Build the XML-RPC payload for a later ``vdi_attach_from_config`` call.

        Returns:
            An XML-RPC method response whose single value is the encoded
            ``vdi_attach_from_config`` method call.

        Raises:
            xs_errors.XenError: 'VDIUnavailable' when the VDI path does not exist.
        """
        util.SMlog("CephFSFileVDI.generate_config")
        if not util.pathexists(self.path):
            raise xs_errors.XenError('VDIUnavailable')
        resp = {'device_config': self.sr.dconf,
                'sr_uuid': sr_uuid,
                'vdi_uuid': vdi_uuid,
                'sr_sm_config': self.sr.sm_config,
                'command': 'vdi_attach_from_config'}
        # Return the 'config' encoded within a normal XMLRPC response so that
        # we can use the regular response/error parsing code.
        config = xmlrpc.client.dumps((resp,), "vdi_attach_from_config")
        return xmlrpc.client.dumps((config,), "", True)

    def attach_from_config(self, sr_uuid, vdi_uuid):
        """Attach the parent SR when its path is not present yet.

        Raises:
            xs_errors.XenError: 'SRUnavailable' when the SR cannot be attached.
        """
        try:
            if not util.pathexists(self.sr.path):
                self.sr.attach(sr_uuid)
        except Exception:
            util.logException("CephFSFileVDI.attach_from_config")
            raise xs_errors.XenError('SRUnavailable',
                                     opterr='Unable to attach from config')

291 

292 

if __name__ == '__main__':
    # Executed directly as the SM driver: dispatch the requested command.
    SRCommand.run(CephFSSR, DRIVER_INFO)
else:
    # Imported as a module: make the driver known to the SR registry.
    SR.registerSR(CephFSSR)