Project

General

Profile

Bug #9606 » ceph.py

Christina Meno, 09/26/2014 11:18 AM

 

from glob import glob
import hashlib
import os
import re
import socket
import subprocess
import tempfile
import time
import struct
import msgpack
import json

# Note: ceph modules (rados, ceph_argparse) are deliberately imported inside
# functions, never at module scope. Otherwise this module could not even be
# loaded on systems where ceph isn't installed yet, and so could not report
# anything about them.

# Timeout passed to librados commands: without one, a stuck mon would block
# our emission of heartbeat events.
RADOS_TIMEOUT = 20

# FIXME: We probably can't assume that <clustername>.client.admin.keyring is
# always present, although this is the case on a nicely ceph-deploy'd system.
RADOS_NAME = 'client.admin'


def fire_event(data, tag):
    """
    Send ``data`` to the salt master as an event tagged ``tag``.

    ``__salt__`` is not defined in this file; presumably the salt loader
    injects it, hence the noqa marker.
    """
    send_to_master = __salt__['event.fire_master']  # noqa
    send_to_master(data, tag)


class MonitoringError(Exception):
    """
    Base class for the errors this module raises while collecting
    monitoring data from Ceph.
    """


class RadosError(MonitoringError):
    """
    Raised when communicating with Ceph through librados fails.
    """


class AdminSocketError(MonitoringError):
    """
    Raised when communicating with a Ceph daemon through its admin socket
    (under /var/run/ceph) fails.
    """


def rados_command(cluster_handle, prefix, args=None, decode=True):
    """
    Run a mon command through librados, raising on failure instead of
    handing return codes back for the caller to check. This is a safer
    wrapper around ceph_argparse.json_command.

    The command is always issued with format=json. ``args`` is copied and
    never modified.

    :param cluster_handle: connected rados.Rados handle
    :param prefix: command prefix, e.g. "status"
    :param args: optional dict of additional command arguments
    :param decode: if True, parse the output as JSON; otherwise return it as-is
    :return: with decode=True, the decoded object, or None if the command
             produced no output; with decode=False, the raw output string
    :raises rados.Error: the command returned a non-zero status (legitimate
             errors as well as timeouts)
    :raises RadosError: decode=True and the output was not valid JSON
    """
    argdict = {} if args is None else args.copy()
    argdict['format'] = 'json'

    # Deferred imports: ceph modules may be absent on this system.
    import rados
    from ceph_argparse import json_command

    ret, outbuf, outs = json_command(cluster_handle,
                                     prefix=prefix,
                                     argdict=argdict,
                                     timeout=RADOS_TIMEOUT)
    if ret != 0:
        raise rados.Error(outs)

    if not decode:
        return outbuf
    if not outbuf:
        return None

    try:
        return json.loads(outbuf)
    except (ValueError, TypeError):
        raise RadosError("Invalid JSON output for command {0}".format(argdict))


# This function is borrowed from /usr/bin/ceph: we should get ceph's python
# code into site-packages so that we can import things like this instead of
# copying them.
def admin_socket(asok_path, cmd, fmt=''):
    """
    Send a daemon (--admin-daemon) command to a Ceph admin socket.

    The daemon's command descriptions are fetched first, and ``cmd`` is
    validated against them before it is sent.

    :param asok_path: path to the admin socket
    :param cmd: the command as a list of strings (e.g. ['config', 'show']), or
                the string 'get_command_descriptions' to get the raw
                descriptions back
    :param fmt: if non-empty, the requested output format (e.g. 'json')
    :return: the daemon's raw response
    :raises AdminSocketError: talking to the socket failed, or ``cmd`` did not
                              validate
    """

    from ceph_argparse import parse_json_funcsigs, validate_command

    def recv_exactly(sock, count, short_read_msg):
        """ helper: read exactly ``count`` bytes from ``sock`` """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                # The peer closed the connection. recv() would keep returning
                # an empty string, so without this check we would spin forever.
                raise RuntimeError(short_read_msg)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def do_sockio(path, cmd):
        """ helper: do all the actual low-level stream I/O """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(path)
            sock.sendall(cmd + '\0')
            # Response framing: a 4-byte big-endian length, then the payload.
            len_str = recv_exactly(sock, 4, "no data returned from admin socket")
            length, = struct.unpack(">I", len_str)
            return recv_exactly(sock, length,
                                "admin socket closed before the full response was received")
        except Exception as e:
            raise AdminSocketError('exception: ' + str(e))
        finally:
            # Release the descriptor on every path; this is called several
            # times per socket on each heartbeat.
            sock.close()

    try:
        cmd_json = do_sockio(asok_path,
                             json.dumps({"prefix": "get_command_descriptions"}))
    except Exception as e:
        raise AdminSocketError('exception getting command descriptions: ' + str(e))

    if cmd == 'get_command_descriptions':
        return cmd_json

    sigdict = parse_json_funcsigs(cmd_json, 'cli')
    valid_dict = validate_command(sigdict, cmd)
    if not valid_dict:
        raise AdminSocketError('invalid command')

    if fmt:
        valid_dict['format'] = fmt

    try:
        return do_sockio(asok_path, json.dumps(valid_dict))
    except Exception as e:
        raise AdminSocketError('exception: ' + str(e))


# The cluster objects that get_cluster_object() can fetch; cluster_status()
# reports a version for each of them.
SYNC_TYPES = [
    'mon_status',
    'mon_map',
    'osd_map',
    'mds_map',
    'pg_summary',
    'health',
    'config',
]


def md5(raw):
    """
    Return the hexadecimal MD5 digest of ``raw``. It is used here as a
    content-derived version stamp.
    """
    return hashlib.md5(raw).hexdigest()


def pg_summary(pgs_brief):
    """
    Convert an O(pg count) data structure into an O(osd count) digest.

    Each entry of ``pgs_brief`` must provide 'acting' (a list of OSD ids),
    'state' and 'pgid' ('<pool>.<seq>'). The result counts PGs per state:

    - 'by_osd': {osd id: {state: count}} for every OSD in a PG's acting set
    - 'by_pool': {pool id (int): {state: count}}
    - 'all': {state: count}
    """
    per_osd = {}
    per_pool = {}
    overall = {}

    def bump(counts, state):
        counts[state] = counts.get(state, 0) + 1

    for pg in pgs_brief:
        for osd_id in pg['acting']:
            bump(per_osd.setdefault(osd_id, {}), pg['state'])

        pool_id = int(pg['pgid'].split('.')[0])
        bump(per_pool.setdefault(pool_id, {}), pg['state'])

        bump(overall, pg['state'])

    return {
        'by_osd': per_osd,
        'by_pool': per_pool,
        'all': overall
    }


def transform_crushmap(data, operation):
    """
    Use crushtool to compile (operation == 'set') or decompile
    (operation == 'get') a CRUSH map.

    :param data: the CRUSH map: source text for 'set', compiled map for 'get'
    :param operation: 'set' or 'get'
    :return: (returncode, transformed crushmap, errors); returncode is 0 on
             success. An unknown operation, or failure to launch crushtool,
             is reported as returncode 1 with an error message.
    """
    # Validate before touching the filesystem.
    if operation not in ('set', 'get'):
        return 1, '', 'Did not specify get or set'

    # Write data to a tempfile because crushtool can't read it from stdin.
    with tempfile.NamedTemporaryFile(delete=True) as f:
        f.write(data)
        f.flush()

        if operation == 'set':
            args = ["crushtool", "-c", f.name, '-o', '/dev/stdout']
        else:
            args = ["crushtool", "-d", f.name]

        try:
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as e:
            # e.g. crushtool is not installed: report it through the error
            # tuple callers already check, rather than as a stray OSError.
            return 1, '', 'Failed to run crushtool: {0}'.format(e)
        stdout, stderr = p.communicate()
        return p.returncode, stdout, stderr


def rados_commands(fsid, cluster_name, commands):
    """
    Run a sequence of mon commands against a cluster over librados.

    Both fsid and cluster_name are passed in because the caller should
    always know both, and it saves this function the trouble of looking up
    one from the other.

    :param fsid: cluster FSID, echoed back in the result
    :param cluster_name: Ceph cluster name used to open the RADOS session
    :param commands: iterable of (prefix, argdict) pairs. For the prefix
                     'osd setcrushmap', argdict['data'] holds the CRUSH map
                     source text; it is compiled with crushtool and sent as
                     the command's input.
    :return: dict with 'error', 'results' (decoded output of each command
             that succeeded, or None when a command produced no output),
             'error_status', 'versions' (cluster map versions after running)
             and 'fsid'. Execution stops at the first failing command.
    :raises RuntimeError: compiling a CRUSH map with crushtool failed
    """

    import rados
    from ceph_argparse import json_command

    # Open a RADOS session. It is always shut down before returning, so that
    # repeated calls do not leak librados connections.
    cluster_handle = rados.Rados(name=RADOS_NAME, clustername=cluster_name, conffile='')
    try:
        cluster_handle.connect()

        results = []
        for prefix, argdict in commands:
            if prefix == 'osd setcrushmap':
                ret, compiled, outs = transform_crushmap(argdict['data'], 'set')
                if ret != 0:
                    raise RuntimeError(outs)
                ret, outbuf, outs = json_command(cluster_handle, prefix=prefix, argdict={},
                                                 timeout=RADOS_TIMEOUT, inbuf=compiled)
            else:
                ret, outbuf, outs = json_command(cluster_handle, prefix=prefix, argdict=argdict,
                                                 timeout=RADOS_TIMEOUT, verbose=True)
            if ret != 0:
                return {
                    'error': True,
                    'results': results,
                    'error_status': outs,
                    'versions': cluster_status(cluster_handle, cluster_name)['versions'],
                    'fsid': fsid
                }
            results.append(json.loads(outbuf) if outbuf else None)

        # For all RADOS commands, we include the cluster map versions
        # in the response, so that the caller knows which versions to
        # wait for in order to see the consequences of their actions.
        # TODO: not all commands will require version info on completion, consider making
        # this optional.
        # TODO: we should endeavor to return something clean even if we can't talk to RADOS
        # enough to get version info
        versions = cluster_status(cluster_handle, cluster_name)['versions']
    finally:
        cluster_handle.shutdown()

    # Success
    return {
        'error': False,
        'results': results,
        'error_status': '',
        'versions': versions,
        'fsid': fsid
    }


def ceph_command(cluster_name, command_args):
    """
    Run a Ceph CLI command directly. This is a fallback that lets the user
    manually run arbitrary commands, for anything that is absent or broken
    in Calamari proper.

    :param cluster_name: Ceph cluster name, or None to run without a
                         --cluster argument
    :param command_args: list of command-line arguments, excluding the
                         leading 'ceph'
    :return: dict with the captured 'out' and 'err' streams and the exit
             'status'
    """
    cluster_opts = ["--cluster", cluster_name] if cluster_name else []
    proc = subprocess.Popen(["ceph"] + cluster_opts + command_args,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()

    return {'out': out, 'err': err, 'status': proc.returncode}


def _get_config(cluster_name):
    """
    Fetch the configuration of a mon running on this server.

    This assumes a mon for ``cluster_name`` runs locally. Its admin socket
    under /var/run/ceph is queried with 'config show'. If several sockets
    match, the first one glob() returns is used.

    :param cluster_name: Ceph cluster name (the prefix of the socket filename)
    :return: JSON-encoded config object
    :raises AdminSocketError: no mon socket was found, or querying it failed
    """
    pattern = "/var/run/ceph/{cluster_name}-mon.*.asok".format(cluster_name=cluster_name)
    candidates = glob(pattern)
    if not candidates:
        raise AdminSocketError("Cannot find mon socket for %s" % cluster_name)
    return admin_socket(candidates[0], ['config', 'show'], 'json')


def get_cluster_object(cluster_name, sync_type, since):
    """
    Fetch the current copy of one of the cluster objects listed in SYNC_TYPES.

    :param cluster_name: Ceph cluster name
    :param sync_type: which object to fetch; must be one of SYNC_TYPES
    :param since: not used yet (see the TODO below)
    :return: dict with 'type', 'fsid', 'version' and 'data'. For 'osd_map',
             'data' also carries 'tree', 'crush' and 'crush_map_text'.
    :raises ValueError: sync_type is not one of SYNC_TYPES
    :raises RadosError: a mon command returned a non-zero status
    :raises RuntimeError: decompiling the CRUSH map with crushtool failed
    """
    # TODO: for the synced objects that support it, support
    # fetching older-than-present versions to allow the master
    # to backfill its history.

    # Check you're asking me for something I know how to give you. This is an
    # explicit raise rather than an assert, which would vanish under python -O.
    if sync_type not in SYNC_TYPES:
        raise ValueError("Unknown sync_type {0!r}".format(sync_type))

    import rados

    # Open a RADOS session, and always release it when done.
    cluster_handle = rados.Rados(name=RADOS_NAME, clustername=cluster_name, conffile='')
    try:
        cluster_handle.connect()
        return _read_cluster_object(cluster_handle, cluster_name, sync_type)
    finally:
        cluster_handle.shutdown()


def _checked_json_command(cluster_handle, prefix, argdict):
    """Run a mon command via json_command; raise RadosError unless it returns 0."""
    from ceph_argparse import json_command

    ret, outbuf, outs = json_command(cluster_handle, prefix=prefix, argdict=argdict,
                                     timeout=RADOS_TIMEOUT)
    if ret != 0:
        raise RadosError("Command '{0}' failed with status {1}: {2}".format(prefix, ret, outs))
    return outbuf


def _read_cluster_object(cluster_handle, cluster_name, sync_type):
    """Build the get_cluster_object() result using a connected cluster handle."""
    status = json.loads(_checked_json_command(cluster_handle, 'status', {'format': 'json'}))
    fsid = status['fsid']

    if sync_type == 'config':
        # Special case for config: get it via the mon admin socket instead of
        # librados.
        raw = _get_config(cluster_name)
        version = md5(raw)
        data = json.loads(raw)
    else:
        # sync_type -> (command prefix, extra arguments, version function).
        # Each version function receives (decoded data, raw output).
        command, kwargs, version_fn = {
            'mon_status': ('mon_status', {}, lambda d, r: d['election_epoch']),
            'mon_map': ('mon dump', {}, lambda d, r: d['epoch']),
            'osd_map': ('osd dump', {}, lambda d, r: d['epoch']),
            'mds_map': ('mds dump', {}, lambda d, r: d['epoch']),
            'pg_summary': ('pg dump', {'dumpcontents': ['pgs_brief']}, lambda d, r: md5(msgpack.packb(d))),
            'health': ('health', {'detail': ''}, lambda d, r: md5(r))
        }[sync_type]
        kwargs['format'] = 'json'
        raw = _checked_json_command(cluster_handle, command, kwargs)

        if sync_type == 'pg_summary':
            # Ship the per-OSD/per-pool digest rather than the raw PG list.
            data = pg_summary(json.loads(raw))
        else:
            data = json.loads(raw)
        version = version_fn(data, raw)

        # Internally, the OSDMap includes the CRUSH map, and the 'osd tree' output
        # is generated from the OSD map. We synthesize a 'full' OSD map dump to
        # send back to the calamari server.
        if sync_type == 'osd_map':
            raw = _checked_json_command(cluster_handle, "osd tree",
                                        {'format': 'json', 'epoch': version})
            data['tree'] = json.loads(raw)

            # FIXME: crush dump does not support an epoch argument, so this is potentially
            # from a higher-versioned OSD map than the one we've just read
            raw = _checked_json_command(cluster_handle, "osd crush dump", {'format': 'json'})
            data['crush'] = json.loads(raw)

            raw = _checked_json_command(cluster_handle, "osd getcrushmap", {'epoch': version})
            ret, stdout, outs = transform_crushmap(raw, 'get')
            if ret != 0:
                raise RuntimeError(outs)
            data['crush_map_text'] = stdout

    return {
        'type': sync_type,
        'fsid': fsid,
        'version': version,
        'data': data
    }


def get_boot_time(stat_path='/proc/stat'):
    """
    Retrieve the system boot time from the 'btime' line of /proc/stat.

    :param stat_path: file to read; defaults to /proc/stat (can be
                      overridden, e.g. for testing)
    :return: integer, seconds since epoch at which the system booted
    :raises ValueError: the file contains no 'btime' line
    """
    with open(stat_path) as f:
        data = f.read()
    match = re.search(r'^btime (\d+)$', data, re.MULTILINE)
    if match is None:
        raise ValueError("No btime line found in {0}".format(stat_path))
    return int(match.group(1))


def get_heartbeats():
    """
    Gather the terse status this server reports to the master.

    The goal here is *not* to give a helpful summary of the cluster status.
    Rather, it is to give the minimum amount of information that lets an
    informed master decide whether it needs to ask us for anything more,
    such as updated copies of the cluster maps.

    Every admin socket in /var/run/ceph is interrogated to enumerate the Ceph
    services running locally. For each service we report its cluster, FSID,
    type, ID, status and version. For each cluster that has a mon on this
    node that is in quorum, we also report the latest versions of all
    cluster maps.

    :return: 2-tuple (server_heartbeat, cluster_heartbeat).
             server_heartbeat has 'services' (keyed by
             "<cluster>-<type>.<id>"), 'boot_time' and 'ceph_version'.
             cluster_heartbeat maps FSID to the output of cluster_status().
             If ceph isn't installed, no services or clusters are reported.
    """

    try:
        import rados
    except ImportError:
        # Ceph isn't installed, report no services or clusters
        server_heartbeat = {
            'services': {},
            'boot_time': get_boot_time(),
            'ceph_version': None
        }
        return server_heartbeat, {}

    # FSID string -> admin socket path of an in-quorum mon of that cluster
    mon_sockets = {}
    # FSID string -> cluster name string
    fsid_names = {}
    # Service name -> service dict
    services = {}

    # For each admin socket, try to interrogate the service
    for filename in glob("/var/run/ceph/*.asok"):
        try:
            service_data = service_status(filename)
        except (rados.Error, MonitoringError, ValueError):
            # Failed to get info for this service (stale or unresponsive
            # socket, or output that is not valid JSON). Exclude it from the
            # report rather than losing the whole heartbeat.
            continue

        service_name = "%s-%s.%s" % (service_data['cluster'], service_data['type'], service_data['id'])

        services[service_name] = service_data
        fsid_names[service_data['fsid']] = service_data['cluster']

        if service_data['type'] == 'mon' and service_data['status']['rank'] in service_data['status']['quorum']:
            # A mon in quorum is eligible to emit a cluster heartbeat
            mon_sockets[service_data['fsid']] = filename

    # Installed Ceph version (as opposed to the per-service running version);
    # an empty result is reported as None.
    ceph_version = __salt__['pkg.version']('ceph') or None  # noqa

    # For each ceph cluster with an in-quorum mon on this node, interrogate the cluster
    cluster_heartbeat = {}
    for fsid in mon_sockets:
        cluster_name = fsid_names[fsid]
        cluster_handle = None
        try:
            cluster_handle = rados.Rados(name=RADOS_NAME, clustername=cluster_name, conffile='')
            cluster_handle.connect()
            cluster_heartbeat[fsid] = cluster_status(cluster_handle, cluster_name)
        except (rados.Error, MonitoringError):
            # Something went wrong getting data for this cluster, exclude it from our report
            pass
        finally:
            # Heartbeats run repeatedly: release the librados session every time.
            if cluster_handle is not None:
                cluster_handle.shutdown()

    server_heartbeat = {
        'services': services,
        'boot_time': get_boot_time(),
        'ceph_version': ceph_version
    }

    return server_heartbeat, cluster_heartbeat


def service_status(socket_path):
    """
    Given an admin socket path, learn all we can about that service.

    :param socket_path: path to an admin socket named
                        <cluster>-<mon|osd|mds>.<id>.asok
    :return: dict with 'cluster', 'type', 'id', 'fsid', 'status' (the
             mon_status output for mons, None otherwise) and 'version'
    :raises AdminSocketError: the socket name does not describe a mon, osd or
             mds, or talking to the socket failed
    """
    match = re.match(r"^(.+)-(mon|osd|mds)\.(.+)\.asok$", os.path.basename(socket_path))
    if match is None:
        # Not a mon/osd/mds socket (presumably some other kind of Ceph
        # process). Raise a MonitoringError subclass so callers can skip it,
        # instead of crashing with AttributeError on None.groups().
        raise AdminSocketError("Unrecognised admin socket name: %s" % socket_path)
    cluster_name, service_type, service_id = match.groups()

    # Interrogate the service for its FSID
    config = json.loads(admin_socket(socket_path, ['config', 'get', 'fsid'], 'json'))
    fsid = config['fsid']

    status = None
    if service_type == 'mon':
        # For mons, we send some extra info here, because if they're out
        # of quorum we may not find out from the cluster heartbeats, so
        # need to use the service heartbeats to detect that.
        status = json.loads(admin_socket(socket_path, ['mon_status'], 'json'))

    version_response = admin_socket(socket_path, ['version'], 'json')
    if version_response is not None:
        service_version = json.loads(version_response)['version']
    else:
        service_version = None

    return {
        'cluster': cluster_name,
        'type': service_type,
        'id': service_id,
        'fsid': fsid,
        'status': status,
        'version': service_version
    }


def cluster_status(cluster_handle, cluster_name):
    """
    Summarize a ceph cluster by the current version of each synced object.

    Map versions are epochs taken from 'mon_status' and 'status'. Health, the
    PG summary and the config have no epoch, so they are versioned by the MD5
    digest of their content.

    :param cluster_handle: connected rados.Rados handle
    :param cluster_name: Ceph cluster name (also used to find the local mon
                         socket for the config digest)
    :return: dict with 'name', 'fsid' and 'versions' (keyed like SYNC_TYPES)
    """
    mon_status = rados_command(cluster_handle, "mon_status")
    status = rados_command(cluster_handle, "status")

    fsid = status['fsid']
    mon_map_epoch = status['monmap']['epoch']
    osd_map_epoch = status['osdmap']['osdmap']['epoch']
    mds_map_epoch = status['mdsmap']['epoch']

    # FIXME: even on a healthy system, 'health detail' contains some statistics
    # that change on their own, such as 'last_updated' and the mon space usage.
    # FIXME: because it also includes the time skew data, this digest changes
    # all the time; that part should be skipped.
    raw_health = rados_command(cluster_handle, "health", args={'detail': ''}, decode=False)
    health_digest = md5(raw_health)

    # Digest of the brief PG info, reduced to its per-OSD/per-pool summary
    pgs_brief = rados_command(cluster_handle, "pg dump", args={'dumpcontents': ['pgs_brief']})
    pg_summary_digest = md5(msgpack.packb(pg_summary(pgs_brief)))

    config_digest = md5(_get_config(cluster_name))

    versions = {
        'mon_status': mon_status['election_epoch'],
        'mon_map': mon_map_epoch,
        'osd_map': osd_map_epoch,
        'mds_map': mds_map_epoch,
        'pg_summary': pg_summary_digest,
        'health': health_digest,
        'config': config_digest
    }
    return {
        'name': cluster_name,
        'fsid': fsid,
        'versions': versions
    }


def selftest_wait(period):
    """
    For self-test only: sleep for ``period`` and return None.
    """
    time.sleep(period)
    return None


def selftest_block():
    """
    For self-test only: never return, sleeping one second at a time.
    """
    while 1:
        time.sleep(1)


def selftest_exception():
    """
    For self-test only: always raise RuntimeError.
    """
    message = "This is a self-test exception"
    raise RuntimeError(message)


def _heartbeat():
    """
    Send the terse status to the master as events. There is one
    'ceph/server' event, plus one 'ceph/cluster/<fsid>' event for each
    cluster that could be queried.

    :return: (server heartbeat, cluster heartbeats), i.e. exactly the data
             emitted. This is useful when debugging with salt-call.
    """
    server_data, clusters = get_heartbeats()

    fire_event(server_data, 'ceph/server')
    for fsid in clusters:
        fire_event(clusters[fsid], 'ceph/cluster/{0}'.format(fsid))

    return server_data, clusters


def heartbeat():
    """
    Entry point for the scheduled task: emit heartbeats without ever
    raising.

    Exceptions are swallowed to work around saltstack issue #11919 in
    salt 2014.1.1: if we emitted exceptions, our scheduled task could stop
    executing. Remove this behaviour once the issue is fixed upstream and
    calamari uses a more recent salt.
    """
    try:
        _heartbeat()
    except Exception:
        # Only Exception subclasses are swallowed, so SystemExit and
        # KeyboardInterrupt still propagate. Log the failure so that a broken
        # heartbeat is diagnosable rather than silently disappearing.
        import logging
        logging.getLogger(__name__).exception("Ceph heartbeat failed")

if __name__ == '__main__':
    # Manual smoke test against the 'ceph' cluster. First create a host
    # bucket with a random name, then add item id 1 to it with weight 0.0.
    import uuid

    bucket_name = str(uuid.uuid1())

    add_bucket = ['osd crush add-bucket', {'type': 'host', 'name': bucket_name, 'format': 'json'}]
    print(rados_commands(12345, 'ceph', [add_bucket]))

    add_item = ['osd crush add', {
        'args': ['host={bucket_name}'.format(bucket_name=bucket_name)],
        'id': '1',  # this would be fine as an int
        'weight': 0.0,
        'format': 'json'
    }]
    print(rados_commands(12345, 'ceph', [add_item]))
    (1-1/1)