Bug #12697 » docker-volume-ceph-rbd.diff
driver.go | ||
---|---|---|
import (
|
||
"fmt"
|
||
"io/ioutil"
|
||
"log"
|
||
"os"
|
||
"os/exec"
|
||
"path/filepath"
|
||
... | ... | |
"github.com/noahdesu/go-ceph/rados"
|
||
"github.com/noahdesu/go-ceph/rbd"
|
||
log "github.com/Sirupsen/logrus"
|
||
)
|
||
type volume struct {
|
||
... | ... | |
m sync.Mutex
|
||
}
|
||
// rbd_pool names the Ceph pool used by the rbd command lines and the rados
// I/O context opened in this file. It starts out as "rbd" and is overwritten
// by newCephDriver with the pool it is given.
var rbd_pool string = "rbd"
|
||
func newCephDriver(root, pool string) cephDriver {
|
||
d := cephDriver{
|
||
root: root,
|
||
pool: pool,
|
||
volumes: map[string]*volume{},
|
||
}
|
||
rbd_pool = pool
|
||
log.Printf("pool %s", pool)
|
||
return d
|
||
}
|
||
... | ... | |
out, err := exec.Command("blkid", device).Output()
|
||
if out != nil {
|
||
sout := strings.TrimRight(string(out), "\n")
|
||
log.Printf("checkFs:%s", string(out))
|
||
return sout, err
|
||
}
|
||
return "", err
|
||
... | ... | |
log.Printf("ensureFs(%q)", name)
|
||
device, err := rbdMap(name)
|
||
if err != nil {
|
||
log.Printf("ensureFs rbdmap:%s", err.Error())
|
||
return dkvolume.Response{Err: err.Error()}
|
||
}
|
||
defer rbdUnmap(*device)
|
||
... | ... | |
log.Printf("t => %q", t)
|
||
if !strings.HasSuffix(t, "TYPE=\"xfs\"") {
|
||
log.Printf("Formatting %s", *device)
|
||
out, _, err := sh(fmt.Sprintf("mkfs.xfs -f %s", *device))
|
||
out, _, err := sh(fmt.Sprintf("mkfs.xfs -f -i size=2048 %s", *device))
|
||
if err != nil {
|
||
log.Printf("Formatting error:%s", err.Error())
|
||
return dkvolume.Response{Err: err.Error()}
|
||
}
|
||
log.Printf(string(out))
|
||
... | ... | |
conn.Connect()
|
||
defer conn.Shutdown()
|
||
ioContext, err := conn.OpenIOContext("rbd")
|
||
ioContext, err := conn.OpenIOContext(rbd_pool)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
return false, err
|
||
... | ... | |
log.Printf("Create(%q)", r.Name)
|
||
d.m.Lock()
|
||
defer d.m.Unlock()
|
||
name := r.Name
|
||
name := r.Name
|
||
size := r.Size
|
||
log.Printf("create %s, size %s, pool %s\n", name, size, rbd_pool)
|
||
mountpoint := d.mountpoint(name)
|
||
if _, ok := d.volumes[mountpoint]; ok {
|
||
... | ... | |
exists, err := rbdExists(name)
|
||
if err != nil {
|
||
log.Printf("rbdExist error:%s", err.Error())
|
||
return dkvolume.Response{Err: err.Error()}
|
||
}
|
||
if !exists {
|
||
if out, _, err := sh(fmt.Sprintf("rbd create --size 128 %s", name)); err != nil {
|
||
if out, _, err := sh(fmt.Sprintf("rbd create --size %s %s -p %s", size, name, rbd_pool)); err != nil {
|
||
log.Print(string(out))
|
||
return dkvolume.Response{Err: err.Error()}
|
||
}
|
||
} else {
|
||
log.Print("rbd exist")
|
||
}
|
||
return ensureFs(name)
|
||
//return ensureFs(name)
|
||
return dkvolume.Response{}
|
||
}
|
||
func (d cephDriver) Remove(r dkvolume.Request) dkvolume.Response {
|
||
... | ... | |
if volume, ok := d.volumes[mountpoint]; ok {
|
||
if volume.connections <= 1 {
|
||
cmd := fmt.Sprintf("rbd rm %s", name)
|
||
log.Printf("rbd rm %s -p %s", name, rbd_pool)
|
||
cmd := fmt.Sprintf("rbd rm %s -p %s", name, rbd_pool)
|
||
if out, _, err := sh(cmd); err != nil {
|
||
log.Print(string(out))
|
||
return dkvolume.Response{Err: err.Error()}
|
||
... | ... | |
}
|
||
func (d cephDriver) Mount(r dkvolume.Request) dkvolume.Response {
|
||
|
||
//log.Printf("ensureFs %s in mount", r.Name)
|
||
//ensureFs(r.Name)
|
||
log.Printf("Mount(%q)", r.Name)
|
||
d.m.Lock()
|
||
defer d.m.Unlock()
|
||
... | ... | |
if os.IsNotExist(err) {
|
||
if err := os.MkdirAll(mountpoint, 0755); err != nil {
|
||
log.Printf("mkdir for %v error %s", mountpoint, err.Error())
|
||
return dkvolume.Response{Err: err.Error()}
|
||
}
|
||
} else if err != nil {
|
||
log.Printf("os.lstat %v error %s", mountpoint, err.Error())
|
||
return dkvolume.Response{Err: err.Error()}
|
||
}
|
||
if fi != nil && !fi.IsDir() {
|
||
log.Printf("%v already exist and it's not a directory", mountpoint)
|
||
return dkvolume.Response{Err: fmt.Sprintf("%v already exist and it's not a directory", mountpoint)}
|
||
}
|
||
device, err := d.mountVolume(r.Name, mountpoint);
|
||
if err != nil {
|
||
log.Printf("mount %v for %s error %s", mountpoint, r.Name, err.Error())
|
||
return dkvolume.Response{Err: err.Error()}
|
||
}
|
||
... | ... | |
}
|
||
volume.connections--
|
||
} else {
|
||
log.Printf("Unable to find volume mounted on %s", mountpoint)
|
||
return dkvolume.Response{Err: fmt.Sprintf("Unable to find volume mounted on %s", mountpoint)}
|
||
}
|
||
... | ... | |
func rbdMap(name string) (*string, error) {
|
||
log.Printf("rbdMap(%q)", name)
|
||
out, _, err := sh(fmt.Sprintf("rbd map %s", name))
|
||
out, _, err := sh(fmt.Sprintf("rbd map %s -p %s", name, rbd_pool))
|
||
if err != nil {
|
||
log.Print(string(out))
|
||
log.Print("rbdmap error %s", string(out))
|
||
return nil, err
|
||
}
|
||
device := strings.TrimRight(string(out), "\n")
|
||
... | ... | |
func rbdUnmap(device string) error {
|
||
log.Printf("rbdUnmap(%q)", device)
|
||
if out, _, err := sh(fmt.Sprintf("rbd unmap %s", device)); err != nil {
|
||
log.Print(string(out))
|
||
log.Print("rbdunmap error %s", string(out))
|
||
return err
|
||
}
|
||
return nil
|
||
... | ... | |
return nil, err
|
||
}
|
||
log.Printf("rbdMap(%q) => %q", name, *device)
|
||
if out, _, err := sh(fmt.Sprintf("mount %s %s", *device, target)); err != nil {
|
||
log.Print(string(out))
|
||
t, err := checkFs(*device)
|
||
log.Printf("t => %q", t)
|
||
if !strings.HasSuffix(t, "TYPE=\"xfs\"") {
|
||
log.Printf("Formatting %s", *device)
|
||
out, _, err := sh(fmt.Sprintf("mkfs.xfs -f -i size=2048 %s", *device))
|
||
if err != nil {
|
||
log.Printf("Formatting error:%s", err.Error())
|
||
return nil, err
|
||
}
|
||
log.Printf(string(out))
|
||
}
|
||
if out, _, err := sh(fmt.Sprintf("mount -o rw,noatime,attr2,inode64 %s %s", *device, target)); err != nil {
|
||
log.Print("mount error %s", string(out))
|
||
return nil, err
|
||
}
|
||
return device, nil
|
main.go | ||
---|---|---|
const (
|
||
cephId = "_ceph"
|
||
socketAddress = "/usr/share/docker/plugins/ceph.sock"
|
||
socketAddress = "/run/docker/plugins/ceph.sock"
|
||
)
|
||
var (
|