Bug #5876 » rbd.patch

Olivier Bonvalet, 10/28/2013 04:42 AM


drivers/block/rbd.c
RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
};
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
static LIST_HEAD(rbd_dev_list); /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);
......
if (removing)
return -ENOENT;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
(void) get_device(&rbd_dev->dev);
set_device_ro(bdev, rbd_dev->mapping.read_only);
mutex_unlock(&ctl_mutex);
return 0;
}
......
spin_unlock_irq(&rbd_dev->lock);
rbd_assert(open_count_before > 0);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
put_device(&rbd_dev->dev);
mutex_unlock(&ctl_mutex);
}
static const struct block_device_operations rbd_bd_ops = {
......
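For context, the open/release hunks above reduce to a standard removal guard: open fails once a REMOVING flag is latched, otherwise it takes a reference and bumps an open count; release drops both. A minimal standalone sketch of that pattern, with hypothetical names (struct mapping, mapping_open/mapping_release are illustrations, not rbd code):

#include <linux/spinlock.h>
#include <linux/bitops.h>
#include <linux/errno.h>

#define MAPPING_FLAG_REMOVING	0	/* teardown in progress */

struct mapping {
	spinlock_t lock;
	unsigned long open_count;
	unsigned long flags;
};

static int mapping_open(struct mapping *m)
{
	int ret = 0;

	spin_lock_irq(&m->lock);
	if (test_bit(MAPPING_FLAG_REMOVING, &m->flags))
		ret = -ENOENT;		/* lost the race with removal */
	else
		m->open_count++;	/* remove path sees this and -EBUSYs */
	spin_unlock_irq(&m->lock);

	return ret;
}

static void mapping_release(struct mapping *m)
{
	spin_lock_irq(&m->lock);
	m->open_count--;
	spin_unlock_irq(&m->lock);
}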
/*
* Initialize an rbd client instance. Success or not, this function
* consumes ceph_opts.
* consumes ceph_opts. Caller holds client_mutex.
*/
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
......
kref_init(&rbdc->kref);
INIT_LIST_HEAD(&rbdc->node);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
if (IS_ERR(rbdc->client))
goto out_mutex;
goto out_rbdc;
ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
ret = ceph_open_session(rbdc->client);
if (ret < 0)
goto out_err;
goto out_client;
spin_lock(&rbd_client_list_lock);
list_add_tail(&rbdc->node, &rbd_client_list);
spin_unlock(&rbd_client_list_lock);
mutex_unlock(&ctl_mutex);
dout("%s: rbdc %p\n", __func__, rbdc);
return rbdc;
out_err:
out_client:
ceph_destroy_client(rbdc->client);
out_mutex:
mutex_unlock(&ctl_mutex);
out_rbdc:
kfree(rbdc);
out_opt:
if (ceph_opts)
......
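The relabeled gotos above (out_mutex becomes out_rbdc, out_err becomes out_client) keep the usual kernel unwind ladder intact once the ctl_mutex lock/unlock is gone from this path: each label undoes exactly the steps that had succeeded. A condensed, hedged reconstruction of the resulting flow (not the literal patch text; dout calls and kref/list setup elided):

static struct rbd_client *client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	rbdc = kzalloc(sizeof(*rbdc), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;		/* only the allocation to undo */
	ceph_opts = NULL;		/* the client owns the options now */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;	/* undo the client, then the allocation */

	return rbdc;

out_client:
	ceph_destroy_client(rbdc->client);	/* also drops the options */
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)			/* still ours: nothing consumed them */
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}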
{
struct rbd_client *rbdc;
mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
rbdc = rbd_client_find(ceph_opts);
if (rbdc) /* using an existing client */
ceph_destroy_options(ceph_opts);
else
rbdc = rbd_client_create(ceph_opts);
mutex_unlock(&client_mutex);
return rbdc;
}
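The new client_mutex makes this find-or-create atomic: two concurrent maps with identical options can no longer both miss the lookup and create duplicate clients. A hypothetical usage sketch (opts1/opts2 are stand-ins for two identically parsed option sets):

/* Two mappings against the same cluster with the same options: the
 * second call finds the client the first one registered and merely
 * takes a reference, freeing its own now-redundant options. */
rbdc1 = rbd_get_client(opts1);	/* creates, adds to rbd_client_list */
rbdc2 = rbd_get_client(opts2);	/* matches: reuses rbdc1, frees opts2 */
rbd_assert(rbdc1 == rbdc2);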
......
/* We won't fail any more, fill in the header */
down_write(&rbd_dev->header_rwsem);
if (first_time) {
header->object_prefix = object_prefix;
header->obj_order = ondisk->options.order;
......
if (rbd_dev->mapping.size != header->image_size)
rbd_dev->mapping.size = header->image_size;
up_write(&rbd_dev->header_rwsem);
return 0;
out_2big:
ret = -EIO;
......
buf = bvec_kmap_irq(bv, &flags);
memset(buf + remainder, 0,
bv->bv_len - remainder);
flush_dcache_page(bv->bv_page);
bvec_kunmap_irq(buf, &flags);
}
pos += bv->bv_len;
......
local_irq_save(flags);
kaddr = kmap_atomic(*page);
memset(kaddr + page_offset, 0, length);
flush_dcache_page(*page);
kunmap_atomic(kaddr);
local_irq_restore(flags);
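Both zeroing hunks above use the same sequence: map the page briefly, memset() the sub-range, then flush_dcache_page() so that other mappings on cache-aliasing architectures observe the zeroes. Factored into a helper it would look roughly like this (zero_page_range is a hypothetical name, not a function in the patch):

#include <linux/highmem.h>
#include <linux/string.h>

static void zero_page_range(struct page *page, unsigned int off,
			    unsigned int len)
{
	void *kaddr = kmap_atomic(page);	/* short-lived mapping */

	memset(kaddr + off, 0, len);
	flush_dcache_page(page);	/* keep aliased views coherent */
	kunmap_atomic(kaddr);
}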
......
rbd_segment_name_free(object_name);
if (!obj_request)
goto out_unwind;
/*
* set obj_request->img_request before creating the
* osd_request so that it gets the right snapc
*/
rbd_img_obj_request_add(img_request, obj_request);
if (type == OBJ_REQUEST_BIO) {
unsigned int clone_size;
......
obj_request->pages, length,
offset & ~PAGE_MASK, false, false);
/*
* set obj_request->img_request before formatting
* the osd_request so that it gets the right snapc
*/
rbd_img_obj_request_add(img_request, obj_request);
if (write_request)
rbd_osd_req_format_write(obj_request);
else
......
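Both hunks above move rbd_img_obj_request_add() ahead of OSD request creation and formatting. The order matters because the snapshot context is reached through obj_request->img_request; format the OSD request before the object request is linked to its image request and it is built without the right snapc. A condensed sketch of the required order (assuming this era's rbd_osd_req_create() signature):

/* 1. Link the object request into the image request first... */
rbd_img_obj_request_add(img_request, obj_request);

/* 2. ...so creation/formatting can see img_request's snapc. */
obj_request->osd_req = rbd_osd_req_create(rbd_dev, write_request,
					  obj_request);
if (write_request)
	rbd_osd_req_format_write(obj_request);	/* records the snapc */
else
	rbd_osd_req_format_read(obj_request);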
*/
orig_request = obj_request->obj_request;
obj_request->obj_request = NULL;
rbd_obj_request_put(orig_request);
rbd_assert(orig_request);
rbd_assert(orig_request->img_request);
......
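The hunk above drops, as early as possible, the reference that obj_request held on orig_request while the two were linked: the back-pointer is cleared first, then the reference is put. The asserts that follow still dereference orig_request safely, presumably because the image request keeps its own reference to each of its object requests until it completes. Annotated:

orig_request = obj_request->obj_request;	/* stashed at submit time */
obj_request->obj_request = NULL;		/* break the back-pointer */
rbd_obj_request_put(orig_request);		/* drop the link's reference */
rbd_assert(orig_request);			/* img_request still holds one */
rbd_assert(orig_request->img_request);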
if (!rbd_dev->parent_overlap) {
struct ceph_osd_client *osdc;
rbd_obj_request_put(orig_request);
osdc = &rbd_dev->rbd_client->client->osdc;
result = rbd_obj_request_submit(osdc, orig_request);
if (!result)
......
out:
if (orig_request->result)
rbd_obj_request_complete(orig_request);
rbd_obj_request_put(orig_request);
}
static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
......
(unsigned int)opcode);
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
rbd_obj_notify_ack(rbd_dev, notify_id);
}
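One detail worth noting in the watch-callback hunk above: the notification is acknowledged even when the header refresh fails. Skipping the ack would leave the notify pending against this watcher until it times out, stalling the peer that sent it. The shape, with the unconditional ack made explicit:

ret = rbd_dev_refresh(rbd_dev);
if (ret)
	rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);

rbd_obj_notify_ack(rbd_dev, notify_id);	/* ack regardless of ret */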
......
int ret;
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
down_write(&rbd_dev->header_rwsem);
mapping_size = rbd_dev->mapping.size;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
if (rbd_dev->image_format == 1)
ret = rbd_dev_v1_header_info(rbd_dev);
else
......
/* If it's a mapped snapshot, validate its EXISTS flag */
rbd_exists_validate(rbd_dev);
mutex_unlock(&ctl_mutex);
up_write(&rbd_dev->header_rwsem);
if (mapping_size != rbd_dev->mapping.size) {
sector_t size;
......
if (ret < sizeof (size_buf))
return -ERANGE;
if (order)
if (order) {
*order = size_buf.order;
dout(" order %u", (unsigned int)*order);
}
*snap_size = le64_to_cpu(size_buf.size);
dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
(unsigned long long)snap_id, (unsigned int)*order,
dout(" snap_id 0x%016llx snap_size = %llu\n",
(unsigned long long)snap_id,
(unsigned long long)*snap_size);
return 0;
......
void *end;
u64 pool_id;
char *image_id;
u64 snap_id;
u64 overlap;
int ret;
......
(unsigned long long)pool_id, U32_MAX);
goto out_err;
}
parent_spec->pool_id = pool_id;
image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
if (IS_ERR(image_id)) {
ret = PTR_ERR(image_id);
goto out_err;
}
parent_spec->image_id = image_id;
ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
ceph_decode_64_safe(&p, end, snap_id, out_err);
ceph_decode_64_safe(&p, end, overlap, out_err);
if (overlap) {
rbd_spec_put(rbd_dev->parent_spec);
/*
* The parent won't change (except when the clone is
* flattened, which is already handled). So we only need to
* record the parent spec if we have not already done so.
*/
if (!rbd_dev->parent_spec) {
parent_spec->pool_id = pool_id;
parent_spec->image_id = image_id;
parent_spec->snap_id = snap_id;
rbd_dev->parent_spec = parent_spec;
parent_spec = NULL; /* rbd_dev now owns this */
rbd_dev->parent_overlap = overlap;
} else {
rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
}
/*
* We always update the parent overlap. If it's zero we
* treat it specially.
*/
rbd_dev->parent_overlap = overlap;
smp_mb();
if (!overlap) {
/* A null parent_spec indicates it's the initial probe */
if (parent_spec) {
/*
* The overlap has become zero, so the clone
* must have been resized down to 0 at some
* point. Treat this the same as a flatten.
*/
rbd_dev_parent_put(rbd_dev);
pr_info("%s: clone image now standalone\n",
rbd_dev->disk->disk_name);
} else {
/*
* For the initial probe, if we find the
* overlap is zero we just pretend there was
* no parent image.
*/
rbd_warn(rbd_dev, "ignoring parent of "
"clone with overlap 0\n");
}
}
out:
ret = 0;
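The rewritten overlap handling above separates two concerns: the parent spec is recorded at most once (ownership of the decoded spec transfers to rbd_dev and the local pointer is NULLed), while the overlap is refreshed on every call. The NULLed local pointer then doubles as the probe-versus-refresh discriminator when the overlap turns out to be zero. A condensed sketch (simplified; smp_mb(), error paths, and the release of an untransferred spec elided):

if (!rbd_dev->parent_spec) {		/* first time we learn of a parent */
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;		/* rbd_dev owns it now */
}

rbd_dev->parent_overlap = overlap;	/* always updated */

if (!overlap) {
	if (parent_spec)		/* refresh: clone was flattened */
		rbd_dev_parent_put(rbd_dev);
	else				/* initial probe: pretend no parent */
		rbd_warn(rbd_dev, "ignoring parent with overlap 0\n");
}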
......
bool first_time = rbd_dev->header.object_prefix == NULL;
int ret;
down_write(&rbd_dev->header_rwsem);
ret = rbd_dev_v2_image_size(rbd_dev);
if (ret)
goto out;
return ret;
if (first_time) {
ret = rbd_dev_v2_header_onetime(rbd_dev);
if (ret)
goto out;
return ret;
}
/*
......
ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
goto out;
return ret;
/*
* Print a warning if this is the initial probe and
......
ret = rbd_dev_v2_snap_context(rbd_dev);
dout("rbd_dev_v2_snap_context returned %d\n", ret);
out:
up_write(&rbd_dev->header_rwsem);
return ret;
}
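Together with the rbd_dev_refresh() hunk earlier, these changes hoist header_rwsem out of the v1/v2 helpers and into the caller, so a single writer lock covers fetching the new header, validating the mapped snapshot, and sampling the old mapping size. A hypothetical condensation of the resulting flow:

static int refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	down_write(&rbd_dev->header_rwsem);	/* one lock for the update */
	mapping_size = rbd_dev->mapping.size;
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);
	rbd_exists_validate(rbd_dev);	/* mapped snapshot still exists? */
	up_write(&rbd_dev->header_rwsem);

	if (mapping_size != rbd_dev->mapping.size) {
		/* propagate the new size to the block layer */
		set_capacity(rbd_dev->disk, rbd_dev->mapping.size / 512);
		revalidate_disk(rbd_dev->disk);
	}

	return ret;
}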
......
struct device *dev;
int ret;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dev = &rbd_dev->dev;
dev->bus = &rbd_bus_type;
dev->type = &rbd_device_type;
......
dev_set_name(dev, "%d", rbd_dev->dev_id);
ret = device_register(dev);
mutex_unlock(&ctl_mutex);
return ret;
}
......
return (ssize_t)rc;
}
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
struct list_head *tmp;
struct rbd_device *rbd_dev;
spin_lock(&rbd_dev_list_lock);
list_for_each(tmp, &rbd_dev_list) {
rbd_dev = list_entry(tmp, struct rbd_device, node);
if (rbd_dev->dev_id == dev_id) {
spin_unlock(&rbd_dev_list_lock);
return rbd_dev;
}
}
spin_unlock(&rbd_dev_list_lock);
return NULL;
}
static void rbd_dev_device_release(struct device *dev)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
......
size_t count)
{
struct rbd_device *rbd_dev = NULL;
int target_id;
struct list_head *tmp;
int dev_id;
unsigned long ul;
bool already = false;
int ret;
ret = strict_strtoul(buf, 10, &ul);
......
return ret;
/* convert to int; abort if we lost anything in the conversion */
target_id = (int) ul;
if (target_id != ul)
dev_id = (int)ul;
if (dev_id != ul)
return -EINVAL;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rbd_dev = __rbd_get_dev(target_id);
if (!rbd_dev) {
ret = -ENOENT;
goto done;
ret = -ENOENT;
spin_lock(&rbd_dev_list_lock);
list_for_each(tmp, &rbd_dev_list) {
rbd_dev = list_entry(tmp, struct rbd_device, node);
if (rbd_dev->dev_id == dev_id) {
ret = 0;
break;
}
}
if (!ret) {
spin_lock_irq(&rbd_dev->lock);
if (rbd_dev->open_count)
ret = -EBUSY;
else
already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
&rbd_dev->flags);
spin_unlock_irq(&rbd_dev->lock);
}
spin_unlock(&rbd_dev_list_lock);
if (ret < 0 || already)
return ret;
spin_lock_irq(&rbd_dev->lock);
if (rbd_dev->open_count)
ret = -EBUSY;
else
set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
spin_unlock_irq(&rbd_dev->lock);
if (ret < 0)
goto done;
rbd_bus_del_dev(rbd_dev);
ret = rbd_dev_header_watch_sync(rbd_dev, false);
if (ret)
rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
rbd_dev_image_release(rbd_dev);
module_put(THIS_MODULE);
ret = count;
done:
mutex_unlock(&ctl_mutex);
return ret;
return count;
}
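Old and new removal flows are interleaved above; the new one converges on a single guard. The lookup and the flag latch both happen under rbd_dev_list_lock, so the device cannot be unregistered between being found and being marked; -EBUSY is returned while open_count is nonzero; and test_and_set_bit() arbitrates two racing removals (the loser sees `already` set and returns without tearing down twice). A condensed sketch of the new flow only:

ret = -ENOENT;
spin_lock(&rbd_dev_list_lock);		/* pins the device list */
list_for_each(tmp, &rbd_dev_list) {
	rbd_dev = list_entry(tmp, struct rbd_device, node);
	if (rbd_dev->dev_id == dev_id) {
		ret = 0;		/* found the mapping to remove */
		break;
	}
}
if (!ret) {
	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;		/* still open somewhere */
	else
		already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
					   &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
}
spin_unlock(&rbd_dev_list_lock);
if (ret < 0 || already)
	return ret;
/* from here on, rbd_open() on this device returns -ENOENT */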
/*