Project

General

Profile

Bug #6072 » rbd_test.bash

Edward Hope-Morley, 08/21/2013 02:06 AM

 
#!/usr/bin/env python
import rados
import rbd
import uuid


class RADOSClient(object):
    """Context manager wrapping a RADOS connection supplied by a driver.

    The constructor asks ``driver._connect_to_rados(pool)`` for a
    ``(cluster, ioctx)`` pair and stores both.  Leaving the ``with`` block
    hands the pair back to ``driver._disconnect_from_rados``.
    """

    def __init__(self, driver, pool=None):
        """Open the connection through *driver*, forwarding *pool* as-is."""
        self.driver = driver
        handles = driver._connect_to_rados(pool)
        self.cluster, self.ioctx = handles

    def __enter__(self):
        """Return this client so callers can reach ``cluster``/``ioctx``."""
        return self

    def __exit__(self, type_, value, traceback):
        """Release the connection; exceptions from the body propagate."""
        cluster, ioctx = self.cluster, self.ioctx
        self.driver._disconnect_from_rados(cluster, ioctx)


class RBDCloneRenameTest(object):
    """Reproduce renaming an RBD image while a clone of it still exists.

    ``run`` creates an image, snapshots and clones it, prints the clone's
    parent info, renames the source image, prints the parent info again and
    finally removes everything it created.

    An instance also serves as the ``driver`` handed to ``RADOSClient``: it
    supplies ``_connect_to_rados`` and ``_disconnect_from_rados``.

    All ``print`` calls take a single argument so that they behave the same
    as a Python 2 print statement and as the Python 3 print function.
    """

    # Connection settings used by _connect_to_rados.
    RADOS_USER = 'cinder'
    CEPH_CONF = '/etc/ceph/ceph.conf'
    DEFAULT_POOL = 'volumes'

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments; the test keeps no state."""
        pass

    def _connect_to_rados(self, pool=None):
        """Connect to the cluster and open an I/O context.

        :param pool: name of the pool to open; ``DEFAULT_POOL`` is used
                     when this is None or empty.
        :returns: ``(client, ioctx)`` tuple.
        :raises rados.Error: if connecting or opening the pool fails; the
                             client is shut down before re-raising.
        """
        ascii_user = str(self.RADOS_USER)
        ascii_conf = str(self.CEPH_CONF)
        client = rados.Rados(rados_id=ascii_user, conffile=ascii_conf)
        try:
            client.connect()
            # Honour the caller's pool instead of always opening the
            # default one.  str() mirrors the coercion applied to the user
            # and conf path above (presumably for the bindings' benefit).
            pool_to_open = str(pool or self.DEFAULT_POOL)
            ioctx = client.open_ioctx(pool_to_open)
            return client, ioctx
        except rados.Error:
            # shutdown cannot raise an exception
            client.shutdown()
            raise

    def _disconnect_from_rados(self, client, ioctx):
        """Close *ioctx*, then shut down *client*."""
        # closing an ioctx cannot raise an exception
        ioctx.close()
        client.shutdown()

    def _supports_layering(self):
        """Return True if the rbd module exposes RBD_FEATURE_LAYERING."""
        return hasattr(rbd, 'RBD_FEATURE_LAYERING')

    def _snapshot_and_clone(self, ioctx, src_vol, snap, dest_vol):
        """Snapshot *src_vol*, protect the snapshot and clone it.

        If cloning fails the snapshot is unprotected and removed again and
        the error is re-raised, so the caller never continues with a clone
        that does not exist.  The source image is closed on every path.
        """
        rbd_image = rbd.Image(ioctx, src_vol)
        try:
            print("create snap")
            rbd_image.create_snap(snap)
            print("protect snap")
            rbd_image.protect_snap(snap)
            try:
                rbd.RBD().clone(ioctx, src_vol,
                                snap,
                                ioctx, dest_vol,
                                features=rbd.RBD_FEATURE_LAYERING)
            except Exception:
                # Roll back the snapshot, then let the failure surface.
                print("unprotect snap")
                rbd_image.unprotect_snap(snap)
                print("delete snap")
                rbd_image.remove_snap(snap)
                raise
        finally:
            rbd_image.close()

    def _print_parent_info(self, ioctx, image_name):
        """Open *image_name*, print its parent info and close it again.

        The sequence of calls (list_snaps, set_snap(None) before and after
        parent_info) is kept exactly as in the original script, since it is
        part of what is being reproduced.
        """
        rbd_image = rbd.Image(ioctx, image_name)
        try:
            rbd_image.list_snaps()
            rbd_image.set_snap(None)
            print(rbd_image.parent_info())
            rbd_image.set_snap(None)
        finally:
            rbd_image.close()

    def run(self):
        """Execute the create / clone / rename / cleanup scenario."""
        volume_name = "volume-%s" % (uuid.uuid4())
        clone_name = "%s.clone" % (volume_name)
        snap = "%s.snap" % (volume_name)
        new_volume_name = ("%s-0123456789ABCDEF10111213141516171819"
                           % (volume_name))

        with RADOSClient(self) as client:
            print("create vol")
            rbd.RBD().create(client.ioctx,
                             volume_name,
                             1024**3,
                             old_format=False,
                             features=rbd.RBD_FEATURE_LAYERING)

        with RADOSClient(self) as client:
            print("creating snap and clone")
            self._snapshot_and_clone(client.ioctx, volume_name, snap,
                                     clone_name)

        # Inspect the clone's parent before the rename ...
        with RADOSClient(self) as client:
            self._print_parent_info(client.ioctx, clone_name)
            print("renaming volume")
            rbd.RBD().rename(client.ioctx, volume_name, new_volume_name)

        # ... and again afterwards, then tear everything down.
        with RADOSClient(self) as client:
            self._print_parent_info(client.ioctx, clone_name)

            print("deleting clone")
            rbd.RBD().remove(client.ioctx, clone_name)

            rbd_image = rbd.Image(client.ioctx, new_volume_name)
            try:
                print("unprotect snap")
                rbd_image.unprotect_snap(snap)
                print("delete snap")
                rbd_image.remove_snap(snap)
            finally:
                rbd_image.close()

            rbd.RBD().remove(client.ioctx, new_volume_name)

if __name__ == '__main__':
    # Script entry point: build the reproducer and run the whole scenario.
    reproducer = RBDCloneRenameTest()
    reproducer.run()

    (1-1/1)