Project

General

Profile

Revision 30e26cab

ID 30e26cab90e77c648fd72eeff1bed01c0eb0c49b
Parent 9a496335
Child 8105bb2b

Added by John Spray about 10 years ago

cthulhu: Issue multiple smaller PG create commands

This is in order to avoid hitting the mon_osd_max_split_count
limit introduced in Firefly.

Fixes: #7290

View differences:

cthulhu/cthulhu/manager/pool_request_factory.py
1 1
from cthulhu.log import log
2 2
from cthulhu.manager.request_factory import RequestFactory
3
from cthulhu.manager.types import OsdMap
3
from cthulhu.manager.types import OsdMap, Config
4 4
from cthulhu.manager.user_request import OsdMapModifyingRequest, PgCreatingRequest
5 5

  
6 6
# Valid values for the 'var' argument to 'ceph osd pool set'
7 7
POOL_PROPERTIES = ["size", "min_size", "crash_replay_interval", "pg_num", "pgp_num", "crush_ruleset", "hashpspool"]
8 8

  
9
# In Ceph versions before mon_osd_max_split_count, assume it is set to this
10
LEGACY_MON_OSD_MAX_SPLIT_COUNT = "32"
11

  
9 12

  
10 13
class PoolRequestFactory(RequestFactory):
11 14
    def _resolve_pool(self, pool_id):
12
        for pool in self._cluster_monitor.get_sync_object_data(OsdMap)['pools']:
13
            if pool['pool'] == pool_id:
14
                return pool
15
        else:
16
            raise ValueError("Pool %s not found" % pool_id)
15
        osd_map = self._cluster_monitor.get_sync_object(OsdMap)
16
        return osd_map.pools_by_id[pool_id]
17 17

  
18 18
    def _pool_attribute_commands(self, pool_name, attributes):
19 19
        commands = []
......
64 64
                                      self._cluster_monitor.fsid, self._cluster_monitor.name, commands)
65 65

  
66 66
    def update(self, pool_id, attributes):
67
        # TODO: this is a primitive form of adding PGs, not yet sufficient for
68
        # real use because it leaves pgp_num unset.
69
        pool_name = self._resolve_pool(pool_id)['pool_name']
67
        osd_map = self._cluster_monitor.get_sync_object(OsdMap)
68
        pool = self._resolve_pool(pool_id)
69
        pool_name = pool['pool_name']
70 70

  
71
        if 'pg_num' in attributes and 'pgp_num' in attributes:
72
            # Special case when setting pgp_num and pg_num: have to do some extra work
71
        if 'pg_num' in attributes:
72
            # Special case when setting pg_num: have to do some extra work
73 73
            # to wait for PG creation between setting these two fields.
74
            pgp_num = attributes['pgp_num']
75
            del attributes['pgp_num']
74
            final_pg_count = attributes['pg_num']
75

  
76
            if 'pgp_num' in attributes:
77
                pgp_num = attributes['pgp_num']
78
                del attributes['pgp_num']
79
            else:
80
                pgp_num = attributes['pg_num']
81
            del attributes['pg_num']
82

  
76 83
            pre_create_commands = self._pool_attribute_commands(pool_name, attributes)
77
            post_create_commands = [("osd pool set", {'pool': pool_name, 'var': 'pgp_num', 'val': pgp_num})]
78
            expected_pgs = attributes['pg_num']
84

  
85
            # This setting is new in Ceph Firefly, where it defaults to 32.  For older revisions, we simply
86
            # pretend that the setting exists with a default setting.
87
            mon_osd_max_split_count = int(self._cluster_monitor.get_sync_object_data(Config).get(
88
                'mon_osd_max_split_count', LEGACY_MON_OSD_MAX_SPLIT_COUNT))
89
            initial_pg_count = pool['pg_num']
90
            n_osds = min(initial_pg_count, len(osd_map.osds_by_id))
91
            # The rules about creating PGs:
92
            #  where N_osds = min(old_pg_count, osd_count)
93
            #    the number of new PGs divided by N_osds may not be greater than mon_osd_max_split_count
94
            block_size = mon_osd_max_split_count * n_osds
95

  
79 96
            return PgCreatingRequest(
80
                "Growing pool '{name}' to {size} PGs".format(name=pool_name, size=expected_pgs),
97
                "Growing pool '{name}' to {size} PGs".format(name=pool_name, size=final_pg_count),
81 98
                self._cluster_monitor.fsid, self._cluster_monitor.name,
82
                pre_create_commands, post_create_commands, pool_id, expected_pgs)
99
                pre_create_commands,
100
                pool_id, pool_name, pgp_num,
101
                initial_pg_count, final_pg_count, block_size)
83 102
        else:
84 103
            commands = self._pool_attribute_commands(pool_name, attributes)
85 104
            if not commands:
86 105
                raise NotImplementedError(attributes)
87 106

  
88
            # TOOD: provide some machine-readable indication of which objects are affected
107
            # TODO: provide some machine-readable indication of which objects are affected
89 108
            # by a particular request.
90 109
            # Perhaps subclass Request for each type of object, and have that subclass provide
91 110
            # both the patches->commands mapping and the human readable and machine readable
cthulhu/cthulhu/manager/user_request.py
255 255
    Specialization of OsdMapModifyingRequest to issue a request
256 256
    to issue a second set of commands after PGs created by an
257 257
    initial set of commands have left the 'creating' state.
258

  
259
    This handles issuing multiple smaller "osd pool set pg_num" calls when
260
    the number of new PGs requested is greater than mon_osd_max_split_count,
261
    caller is responsible for telling us how many we may create at once.
258 262
    """
259 263
    PRE_CREATE = 'pre_create'
260 264
    CREATING = 'creating'
261 265
    POST_CREATE = 'post_create'
262 266

  
263
    def __init__(self, headline, fsid, cluster_name, commands, post_create_commands, pool_id, pg_count):
264
        super(PgCreatingRequest, self).__init__(headline, fsid, cluster_name, commands)
265
        self._post_create_commands = post_create_commands
267
    # I need to know:
268
    # - starting number of PGs
269
    # - goal number of PGs
270
    # - how many PGs I may create in one go.
271

  
272
    def __init__(self, headline, fsid, cluster_name, commands,
273
                 pool_id, pool_name, pgp_num,
274
                 initial_pg_count, final_pg_count, block_size):
275
        """
276
        :param commands: Commands to execute before creating PGs
277
        :param initial_pg_count: How many PGs the pool has before we change anything
278
        :param final_pg_count: How many PGs the pool should have when we are done
279
        :param block_size: How many PGs we may create in one "osd pool set" command
280
        """
266 281

  
267 282
        self._phase = self.PRE_CREATE
268 283
        self._await_osd_version = None
269 284

  
270 285
        self._pool_id = pool_id
271
        self._pg_count = pg_count
286
        self._pool_name = pool_name
287
        self._final_count = final_pg_count
288
        self._initial_count = initial_pg_count
289
        self._block_size = block_size
272 290

  
273 291
        self._headline = headline
274 292

  
293
        self._intermediate_goal = min(self._final_count, self._initial_count + self._block_size)
294
        commands.append(('osd pool set', {
295
            'pool': self._pool_name,
296
            'var': 'pg_num',
297
            'val': self._intermediate_goal
298
        }))
299
        self._still_to_create = self._final_count - self._initial_count
300

  
301
        self._post_create_commands = [("osd pool set", {'pool': pool_name, 'var': 'pgp_num', 'val': pgp_num})]
302

  
303
        super(PgCreatingRequest, self).__init__(headline, fsid, cluster_name, commands)
304

  
275 305
    @property
276 306
    def status(self):
277 307
        if self._phase == self.CREATING:
278
            return "Waiting for PGs to be created"
308
            total_creating = (self._final_count - self._initial_count)
309
            created = total_creating - self._still_to_create
310

  
311
            if self._intermediate_goal != self._final_count:
312
                currently_creating_min = max(self._intermediate_goal - self._block_size, self._initial_count)
313
                currently_creating_max = self._intermediate_goal
314
                return "Waiting for PG creation (%s/%s), currently creating PGs %s-%s" % (
315
                    created, total_creating, currently_creating_min, currently_creating_max)
316
            else:
317
                return "Waiting for PG creation (%s/%s)" % (created, total_creating)
279 318
        else:
280 319
            return super(PgCreatingRequest, self).status
281 320

  
282 321
    def complete_jid(self, result):
283 322
        if self._phase == self.PRE_CREATE:
323
            self.log.debug("PgCreatingRequest.complete_jid PRE_CREATE->CREATING")
284 324
            # The initial tranche of jobs has completed, start waiting
285 325
            # for PG creation to complete
286 326
            self.jid = None
287 327
            self._await_osd_version = result['versions']['osd_map']
288 328
            self._phase = self.CREATING
289
            self.log.debug("PgCreatingRequest PRE_CREATE->CREATING")
290 329
        elif self._phase == self.POST_CREATE:
330
            self.log.debug("PgCreatingRequest.complete_jid POST_CREATE->complete")
291 331
            # Act just like an OSD map modification
292 332
            super(PgCreatingRequest, self).complete_jid(result)
333
        elif self._phase == self.CREATING:
334
            self.jid = None
335
            self.log.debug(
336
                "PgCreatingRequest.complete_jid: successfully issued request for %s" % self._intermediate_goal)
293 337

  
294 338
    def on_map(self, sync_type, sync_objects):
295 339
        self.log.debug("PgCreatingRequest %s %s" % (sync_type.str, self._phase))
......
312 356
                        if 'creating' not in states:
313 357
                            pg_counter += 1
314 358

  
315
                self.log.debug("PgCreatingRequest.on_map: pg_counter=%s/%s" % (pg_counter, self._pg_count))
316
                if pg_counter >= self._pg_count:
317
                    self._phase = self.POST_CREATE
318
                    self.log.debug("PgCreatingRequest CREATING->POST_CREATE")
319
                    self._submit(self._post_create_commands)
359
                self._still_to_create = max(self._final_count - pg_counter, 0)
360
                self.log.debug("PgCreatingRequest.on_map: pg_counter=%s/%s (final %s)" % (
361
                    pg_counter, self._intermediate_goal, self._final_count))
362
                if pg_counter >= self._intermediate_goal:
363
                    if self._intermediate_goal == self._final_count:
364
                        self._phase = self.POST_CREATE
365
                        self.log.debug("PgCreatingRequest.on_map CREATING->POST_CREATE")
366
                        self._submit(self._post_create_commands)
367
                    else:
368
                        self.log.debug("PgCreatingREQUEST.on_map CREATING->CREATING")
369
                        self._intermediate_goal = min(self._final_count, self._intermediate_goal + self._block_size)
370
                        # Request another tranche of PGs up to _block_size
371
                        self._submit([('osd pool set', {
372
                            'pool': self._pool_name,
373
                            'var': 'pg_num',
374
                            'val': self._intermediate_goal
375
                        })])
376

  
320 377
        elif self._phase == self.POST_CREATE:
321 378
            super(PgCreatingRequest, self).on_map(sync_type, sync_objects)
minion-sim/minion_sim/ceph_cluster.py
637 637
    "auth_supported": "",
638 638
    "rgw_thread_pool_size": "100",
639 639
    "mon_globalid_prealloc": "100",
640
    "filestore_fiemap": "false"
640
    "filestore_fiemap": "false",
641
    "mon_osd_max_split_count": "32"
641 642
}
642 643
""")
643 644

  
......
1237 1238
                pool['pg_num'], val
1238 1239
            ))
1239 1240
            # Growing a pool, creating PGs
1241
            new_pg_count = val - pool['pg_num']
1242
            osd_count = min(pool['pg_num'], len(self._objects['osd_map']['osds']))
1243
            if new_pg_count > osd_count * int(self._objects['config']['mon_osd_max_split_count']):
1244
                raise RuntimeError("Exceeded mon_osd_max_split_count")
1240 1245
            self._create_pgs(pool['pool'], range(pool['pg_num'], val))
1241 1246

  
1242 1247
        if var == 'pgp_num':

Also available in: Unified diff