""" Zabbix module for ceph-mgr Collect statistics from Ceph cluster and every X seconds send data to a Zabbix server using the zabbix_sender executable. """ import json import errno from subprocess import Popen, PIPE from threading import Event from mgr_module import MgrModule def avg(data): return sum(data) / float(len(data)) class ZabbixSender(object): def __init__(self, sender, host, port, hostname,log): self.sender = sender self.host = host self.port = port self.hostname = hostname self.log = log def send(self, data): if len(data) == 0: return cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s', self.hostname, '-vv', '-i', '-'] proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) for key, value in data.items(): proc.stdin.write('{0} ceph.{1} {2}\n'.format(self.hostname, key, value)) stdout, stderr = proc.communicate() if proc.returncode != 0: raise RuntimeError('%s exited non-zero: %s' % (self.sender, stderr)) self.log.debug('Zabbix Sender: %s', stdout.rstrip()) class Module(MgrModule): run = False config = dict() ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2} config_keys = { 'zabbix_sender': '/usr/bin/zabbix_sender', 'zabbix_host': None, 'zabbix_port': 10051, 'identifier': None, 'interval': 60 } COMMANDS = [ { "cmd": "zabbix config-set name=key,type=CephString " "name=value,type=CephString", "desc": "Set a configuration value", "perm": "rw" }, { "cmd": "zabbix config-show", "desc": "Show current configuration", "perm": "r" }, { "cmd": "zabbix send", "desc": "Force sending data to Zabbux", "perm": "rw" }, { "cmd": "zabbix self-test", "desc": "Run a self-test on the Zabbix module", "perm": "r" } ] def __init__(self, *args, **kwargs): super(Module, self).__init__(*args, **kwargs) self.event = Event() def init_module_config(self): for key, default in self.config_keys.items(): value = self.get_localized_config(key, default) if value is None: raise RuntimeError('Configuration key {0} not set; "ceph ' 'config-key set mgr/zabbix/{0} ' '"'.format(key)) self.set_config_option(key, value) def set_config_option(self, option, value): if option not in self.config_keys.keys(): raise RuntimeError('{0} is a unknown configuration ' 'option'.format(option)) if option in ['zabbix_port', 'interval']: try: value = int(value) except (ValueError, TypeError): raise RuntimeError('invalid {0} configured. Please specify ' 'a valid integer'.format(option)) if option == 'interval' and value < 10: raise RuntimeError('interval should be set to at least 10 seconds') self.config[option] = value def get_data(self): data = dict() health = json.loads(self.get('health')['json']) # 'status' is luminous+, 'overall_status' is legacy mode. data['overall_status'] = health.get('status', health.get('overall_status')) data['overall_status_int'] = \ self.ceph_health_mapping.get(data['overall_status']) mon_status = json.loads(self.get('mon_status')['json']) data['num_mon'] = len(mon_status['monmap']['mons']) df = self.get('df') data['num_pools'] = len(df['pools']) data['total_objects'] = df['stats']['total_objects'] data['total_used_bytes'] = df['stats']['total_used_bytes'] data['total_bytes'] = df['stats']['total_bytes'] data['total_avail_bytes'] = df['stats']['total_avail_bytes'] wr_ops = 0 rd_ops = 0 wr_bytes = 0 rd_bytes = 0 for pool in df['pools']: wr_ops += pool['stats']['wr'] rd_ops += pool['stats']['rd'] wr_bytes += pool['stats']['wr_bytes'] rd_bytes += pool['stats']['rd_bytes'] data['wr_ops'] = wr_ops data['rd_ops'] = rd_ops data['wr_bytes'] = wr_bytes data['rd_bytes'] = rd_bytes osd_map = self.get('osd_map') data['num_osd'] = len(osd_map['osds']) data['osd_nearfull_ratio'] = osd_map['nearfull_ratio'] data['osd_full_ratio'] = osd_map['full_ratio'] data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio'] data['num_pg_temp'] = len(osd_map['pg_temp']) num_up = 0 num_in = 0 for osd in osd_map['osds']: if osd['up'] == 1: num_up += 1 if osd['in'] == 1: num_in += 1 data['num_osd_up'] = num_up data['num_osd_in'] = num_in osd_fill = list() osd_apply_latency = list() osd_commit_latency = list() osd_stats = self.get('osd_stats') for osd in osd_stats['osd_stats']: osd_fill.append((float(osd['kb_used']) / float(osd['kb'])) * 100) osd_apply_latency.append(osd['perf_stat']['apply_latency_ms']) osd_commit_latency.append(osd['perf_stat']['commit_latency_ms']) try: data['osd_max_fill'] = max(osd_fill) data['osd_min_fill'] = min(osd_fill) data['osd_avg_fill'] = avg(osd_fill) except ValueError: pass try: data['osd_latency_apply_max'] = max(osd_apply_latency) data['osd_latency_apply_min'] = min(osd_apply_latency) data['osd_latency_apply_avg'] = avg(osd_apply_latency) data['osd_latency_commit_max'] = max(osd_commit_latency) data['osd_latency_commit_min'] = min(osd_commit_latency) data['osd_latency_commit_avg'] = avg(osd_commit_latency) except ValueError: pass pg_summary = self.get('pg_summary') num_pg = 0 for state, num in pg_summary['all'].items(): num_pg += num data['num_pg'] = num_pg return data def send(self): data = self.get_data() self.log.debug('Sending data to Zabbix server %s', self.config['zabbix_host']) self.log.debug(data) try: zabbix = ZabbixSender(self.config['zabbix_sender'], self.config['zabbix_host'], self.config['zabbix_port'], self.config['identifier'], self.log) zabbix.send(data) except Exception as exc: self.log.error('Exception when sending: %s', exc) def handle_command(self, command): if command['prefix'] == 'zabbix config-show': return 0, json.dumps(self.config), '' elif command['prefix'] == 'zabbix config-set': key = command['key'] value = command['value'] if not value: return -errno.EINVAL, '', 'Value should not be empty or None' self.log.debug('Setting configuration option %s to %s', key, value) self.set_config_option(key, value) self.set_localized_config(key, value) return 0, 'Configuration option {0} updated'.format(key), '' elif command['prefix'] == 'zabbix send': self.send() return 0, 'Sending data to Zabbix', '' elif command['prefix'] == 'zabbix self-test': self.self_test() return 0, 'Self-test succeeded', '' else: return (-errno.EINVAL, '', "Command not found '{0}'".format(command['prefix'])) def shutdown(self): self.log.info('Stopping zabbix') self.run = False self.event.set() def serve(self): self.log.debug('Zabbix module starting up') self.run = True self.init_module_config() for key, value in self.config.items(): self.log.debug('%s: %s', key, value) while self.run: self.log.debug('Waking up for new iteration') # Sometimes fetching data fails, should be fixed by PR #16020 try: self.send() except Exception as exc: self.log.error(exc) interval = self.config['interval'] self.log.debug('Sleeping for %d seconds', interval) self.event.wait(interval) def self_test(self): data = self.get_data() if data['overall_status'] not in self.ceph_health_mapping: raise RuntimeError('No valid overall_status found in data') int(data['overall_status_int']) if data['num_mon'] < 1: raise RuntimeError('num_mon is smaller than 1')