Revision 30e26cab
cthulhu: Issue multiple smaller PG create commands
This is in order to avoid hitting the mon_osd_max_split_count
limit introduced in Firefly.
Fixes: #7290
cthulhu/cthulhu/manager/pool_request_factory.py | ||
---|---|---|
1 | 1 |
from cthulhu.log import log |
2 | 2 |
from cthulhu.manager.request_factory import RequestFactory |
3 |
from cthulhu.manager.types import OsdMap |
|
3 |
from cthulhu.manager.types import OsdMap, Config
|
|
4 | 4 |
from cthulhu.manager.user_request import OsdMapModifyingRequest, PgCreatingRequest |
5 | 5 |
|
6 | 6 |
# Valid values for the 'var' argument to 'ceph osd pool set' |
7 | 7 |
POOL_PROPERTIES = ["size", "min_size", "crash_replay_interval", "pg_num", "pgp_num", "crush_ruleset", "hashpspool"] |
8 | 8 |
|
9 |
# In Ceph versions before mon_osd_max_split_count, assume it is set to this |
|
10 |
LEGACY_MON_OSD_MAX_SPLIT_COUNT = "32" |
|
11 |
|
|
9 | 12 |
|
10 | 13 |
class PoolRequestFactory(RequestFactory): |
11 | 14 |
def _resolve_pool(self, pool_id): |
12 |
for pool in self._cluster_monitor.get_sync_object_data(OsdMap)['pools']: |
|
13 |
if pool['pool'] == pool_id: |
|
14 |
return pool |
|
15 |
else: |
|
16 |
raise ValueError("Pool %s not found" % pool_id) |
|
15 |
osd_map = self._cluster_monitor.get_sync_object(OsdMap) |
|
16 |
return osd_map.pools_by_id[pool_id] |
|
17 | 17 |
|
18 | 18 |
def _pool_attribute_commands(self, pool_name, attributes): |
19 | 19 |
commands = [] |
... | ... | |
64 | 64 |
self._cluster_monitor.fsid, self._cluster_monitor.name, commands) |
65 | 65 |
|
66 | 66 |
def update(self, pool_id, attributes): |
67 |
# TODO: this is a primitive form of adding PGs, not yet sufficient for
|
|
68 |
# real use because it leaves pgp_num unset.
|
|
69 |
pool_name = self._resolve_pool(pool_id)['pool_name']
|
|
67 |
osd_map = self._cluster_monitor.get_sync_object(OsdMap)
|
|
68 |
pool = self._resolve_pool(pool_id)
|
|
69 |
pool_name = pool['pool_name']
|
|
70 | 70 |
|
71 |
if 'pg_num' in attributes and 'pgp_num' in attributes:
|
|
72 |
# Special case when setting pgp_num and pg_num: have to do some extra work
|
|
71 |
if 'pg_num' in attributes: |
|
72 |
# Special case when setting pg_num: have to do some extra work |
|
73 | 73 |
# to wait for PG creation between setting these two fields. |
74 |
pgp_num = attributes['pgp_num'] |
|
75 |
del attributes['pgp_num'] |
|
74 |
final_pg_count = attributes['pg_num'] |
|
75 |
|
|
76 |
if 'pgp_num' in attributes: |
|
77 |
pgp_num = attributes['pgp_num'] |
|
78 |
del attributes['pgp_num'] |
|
79 |
else: |
|
80 |
pgp_num = attributes['pg_num'] |
|
81 |
del attributes['pg_num'] |
|
82 |
|
|
76 | 83 |
pre_create_commands = self._pool_attribute_commands(pool_name, attributes) |
77 |
post_create_commands = [("osd pool set", {'pool': pool_name, 'var': 'pgp_num', 'val': pgp_num})] |
|
78 |
expected_pgs = attributes['pg_num'] |
|
84 |
|
|
85 |
# This setting is new in Ceph Firefly, where it defaults to 32. For older revisions, we simply |
|
86 |
# pretend that the setting exists with a default setting. |
|
87 |
mon_osd_max_split_count = int(self._cluster_monitor.get_sync_object_data(Config).get( |
|
88 |
'mon_osd_max_split_count', LEGACY_MON_OSD_MAX_SPLIT_COUNT)) |
|
89 |
initial_pg_count = pool['pg_num'] |
|
90 |
n_osds = min(initial_pg_count, len(osd_map.osds_by_id)) |
|
91 |
# The rules about creating PGs: |
|
92 |
# where N_osds = min(old_pg_count, osd_count) |
|
93 |
# the number of new PGs divided by N_osds may not be greater than mon_osd_max_split_count |
|
94 |
block_size = mon_osd_max_split_count * n_osds |
|
95 |
|
|
79 | 96 |
return PgCreatingRequest( |
80 |
"Growing pool '{name}' to {size} PGs".format(name=pool_name, size=expected_pgs),
|
|
97 |
"Growing pool '{name}' to {size} PGs".format(name=pool_name, size=final_pg_count),
|
|
81 | 98 |
self._cluster_monitor.fsid, self._cluster_monitor.name, |
82 |
pre_create_commands, post_create_commands, pool_id, expected_pgs) |
|
99 |
pre_create_commands, |
|
100 |
pool_id, pool_name, pgp_num, |
|
101 |
initial_pg_count, final_pg_count, block_size) |
|
83 | 102 |
else: |
84 | 103 |
commands = self._pool_attribute_commands(pool_name, attributes) |
85 | 104 |
if not commands: |
86 | 105 |
raise NotImplementedError(attributes) |
87 | 106 |
|
88 |
# TOOD: provide some machine-readable indication of which objects are affected
|
|
107 |
# TODO: provide some machine-readable indication of which objects are affected
|
|
89 | 108 |
# by a particular request. |
90 | 109 |
# Perhaps subclass Request for each type of object, and have that subclass provide |
91 | 110 |
# both the patches->commands mapping and the human readable and machine readable |
cthulhu/cthulhu/manager/user_request.py | ||
---|---|---|
255 | 255 |
Specialization of OsdMapModifyingRequest to issue a request |
256 | 256 |
to issue a second set of commands after PGs created by an |
257 | 257 |
initial set of commands have left the 'creating' state. |
258 |
|
|
259 |
This handles issuing multiple smaller "osd pool set pg_num" calls when |
|
260 |
the number of new PGs requested is greater than mon_osd_max_split_count, |
|
261 |
caller is responsible for telling us how many we may create at once. |
|
258 | 262 |
""" |
259 | 263 |
PRE_CREATE = 'pre_create' |
260 | 264 |
CREATING = 'creating' |
261 | 265 |
POST_CREATE = 'post_create' |
262 | 266 |
|
263 |
def __init__(self, headline, fsid, cluster_name, commands, post_create_commands, pool_id, pg_count): |
|
264 |
super(PgCreatingRequest, self).__init__(headline, fsid, cluster_name, commands) |
|
265 |
self._post_create_commands = post_create_commands |
|
267 |
# I need to know: |
|
268 |
# - starting number of PGs |
|
269 |
# - goal number of PGs |
|
270 |
# - how many PGs I may create in one go. |
|
271 |
|
|
272 |
def __init__(self, headline, fsid, cluster_name, commands, |
|
273 |
pool_id, pool_name, pgp_num, |
|
274 |
initial_pg_count, final_pg_count, block_size): |
|
275 |
""" |
|
276 |
:param commands: Commands to execute before creating PGs |
|
277 |
:param initial_pg_count: How many PGs the pool has before we change anything |
|
278 |
:param final_pg_count: How many PGs the pool should have when we are done |
|
279 |
:param block_size: How many PGs we may create in one "osd pool set" command |
|
280 |
""" |
|
266 | 281 |
|
267 | 282 |
self._phase = self.PRE_CREATE |
268 | 283 |
self._await_osd_version = None |
269 | 284 |
|
270 | 285 |
self._pool_id = pool_id |
271 |
self._pg_count = pg_count |
|
286 |
self._pool_name = pool_name |
|
287 |
self._final_count = final_pg_count |
|
288 |
self._initial_count = initial_pg_count |
|
289 |
self._block_size = block_size |
|
272 | 290 |
|
273 | 291 |
self._headline = headline |
274 | 292 |
|
293 |
self._intermediate_goal = min(self._final_count, self._initial_count + self._block_size) |
|
294 |
commands.append(('osd pool set', { |
|
295 |
'pool': self._pool_name, |
|
296 |
'var': 'pg_num', |
|
297 |
'val': self._intermediate_goal |
|
298 |
})) |
|
299 |
self._still_to_create = self._final_count - self._initial_count |
|
300 |
|
|
301 |
self._post_create_commands = [("osd pool set", {'pool': pool_name, 'var': 'pgp_num', 'val': pgp_num})] |
|
302 |
|
|
303 |
super(PgCreatingRequest, self).__init__(headline, fsid, cluster_name, commands) |
|
304 |
|
|
275 | 305 |
@property |
276 | 306 |
def status(self): |
277 | 307 |
if self._phase == self.CREATING: |
278 |
return "Waiting for PGs to be created" |
|
308 |
total_creating = (self._final_count - self._initial_count) |
|
309 |
created = total_creating - self._still_to_create |
|
310 |
|
|
311 |
if self._intermediate_goal != self._final_count: |
|
312 |
currently_creating_min = max(self._intermediate_goal - self._block_size, self._initial_count) |
|
313 |
currently_creating_max = self._intermediate_goal |
|
314 |
return "Waiting for PG creation (%s/%s), currently creating PGs %s-%s" % ( |
|
315 |
created, total_creating, currently_creating_min, currently_creating_max) |
|
316 |
else: |
|
317 |
return "Waiting for PG creation (%s/%s)" % (created, total_creating) |
|
279 | 318 |
else: |
280 | 319 |
return super(PgCreatingRequest, self).status |
281 | 320 |
|
282 | 321 |
def complete_jid(self, result): |
283 | 322 |
if self._phase == self.PRE_CREATE: |
323 |
self.log.debug("PgCreatingRequest.complete_jid PRE_CREATE->CREATING") |
|
284 | 324 |
# The initial tranche of jobs has completed, start waiting |
285 | 325 |
# for PG creation to complete |
286 | 326 |
self.jid = None |
287 | 327 |
self._await_osd_version = result['versions']['osd_map'] |
288 | 328 |
self._phase = self.CREATING |
289 |
self.log.debug("PgCreatingRequest PRE_CREATE->CREATING") |
|
290 | 329 |
elif self._phase == self.POST_CREATE: |
330 |
self.log.debug("PgCreatingRequest.complete_jid POST_CREATE->complete") |
|
291 | 331 |
# Act just like an OSD map modification |
292 | 332 |
super(PgCreatingRequest, self).complete_jid(result) |
333 |
elif self._phase == self.CREATING: |
|
334 |
self.jid = None |
|
335 |
self.log.debug( |
|
336 |
"PgCreatingRequest.complete_jid: successfully issued request for %s" % self._intermediate_goal) |
|
293 | 337 |
|
294 | 338 |
def on_map(self, sync_type, sync_objects): |
295 | 339 |
self.log.debug("PgCreatingRequest %s %s" % (sync_type.str, self._phase)) |
... | ... | |
312 | 356 |
if 'creating' not in states: |
313 | 357 |
pg_counter += 1 |
314 | 358 |
|
315 |
self.log.debug("PgCreatingRequest.on_map: pg_counter=%s/%s" % (pg_counter, self._pg_count)) |
|
316 |
if pg_counter >= self._pg_count: |
|
317 |
self._phase = self.POST_CREATE |
|
318 |
self.log.debug("PgCreatingRequest CREATING->POST_CREATE") |
|
319 |
self._submit(self._post_create_commands) |
|
359 |
self._still_to_create = max(self._final_count - pg_counter, 0) |
|
360 |
self.log.debug("PgCreatingRequest.on_map: pg_counter=%s/%s (final %s)" % ( |
|
361 |
pg_counter, self._intermediate_goal, self._final_count)) |
|
362 |
if pg_counter >= self._intermediate_goal: |
|
363 |
if self._intermediate_goal == self._final_count: |
|
364 |
self._phase = self.POST_CREATE |
|
365 |
self.log.debug("PgCreatingRequest.on_map CREATING->POST_CREATE") |
|
366 |
self._submit(self._post_create_commands) |
|
367 |
else: |
|
368 |
self.log.debug("PgCreatingREQUEST.on_map CREATING->CREATING") |
|
369 |
self._intermediate_goal = min(self._final_count, self._intermediate_goal + self._block_size) |
|
370 |
# Request another tranche of PGs up to _block_size |
|
371 |
self._submit([('osd pool set', { |
|
372 |
'pool': self._pool_name, |
|
373 |
'var': 'pg_num', |
|
374 |
'val': self._intermediate_goal |
|
375 |
})]) |
|
376 |
|
|
320 | 377 |
elif self._phase == self.POST_CREATE: |
321 | 378 |
super(PgCreatingRequest, self).on_map(sync_type, sync_objects) |
minion-sim/minion_sim/ceph_cluster.py | ||
---|---|---|
637 | 637 |
"auth_supported": "", |
638 | 638 |
"rgw_thread_pool_size": "100", |
639 | 639 |
"mon_globalid_prealloc": "100", |
640 |
"filestore_fiemap": "false" |
|
640 |
"filestore_fiemap": "false", |
|
641 |
"mon_osd_max_split_count": "32" |
|
641 | 642 |
} |
642 | 643 |
""") |
643 | 644 |
|
... | ... | |
1237 | 1238 |
pool['pg_num'], val |
1238 | 1239 |
)) |
1239 | 1240 |
# Growing a pool, creating PGs |
1241 |
new_pg_count = val - pool['pg_num'] |
|
1242 |
osd_count = min(pool['pg_num'], len(self._objects['osd_map']['osds'])) |
|
1243 |
if new_pg_count > osd_count * int(self._objects['config']['mon_osd_max_split_count']): |
|
1244 |
raise RuntimeError("Exceeded mon_osd_max_split_count") |
|
1240 | 1245 |
self._create_pgs(pool['pool'], range(pool['pg_num'], val)) |
1241 | 1246 |
|
1242 | 1247 |
if var == 'pgp_num': |
Also available in: Unified diff