⚲
Project
General
Profile
Sign in
Register
Home
Projects
Help
Search
:
rbd
All Projects
Ceph
»
rbd
Overview
Activity
Roadmap
Issues
Spent time
Download (1.38 KB)
Bug #41354
» test_volumes_thread.py
Nikola Ciprich
, 08/20/2019 04:07 PM
try
:
import
rbd
import
rados
except
Exception
as
err
:
print
(
err
)
import
time
import
argparse
import
threading
# Shared state used by tester() and the __main__ block.  A ``global``
# statement at module scope is a no-op and does not create the names, so
# they are given real initial values here; this keeps tester() usable when
# the module is imported rather than run as a script.
counter = 0    # number of images opened successfully, summed over all threads
end = False    # stop flag: tester() keeps looping while this is false
vols = []      # names of the volumes currently being processed by tester()
def
tester
(
r
):
global
counter
global
vols
global
end
while
not
end
:
for
volume
in
volumes
:
vols
.
append
(
volume
)
try
:
with
rados
.
Rados
(
conffile
=
'
/etc/ceph/ceph.conf
'
)
as
cluster
:
with
cluster
.
open_ioctx
(
"
sata
"
)
as
ioctx
:
try
:
with
rbd
.
Image
(
ioctx
,
name
=
volume
,
read_only
=
True
)
as
rbdi
:
counter
+=
1
except
rbd
.
ImageNotFound
:
print
(
"
%s volume not found
"
%
(
volume
))
except
rbd
.
ImageBusy
:
print
(
"
%s volume busy
"
%
(
volume
))
except
Exception
as
err
:
print
(
"
%s %s
"
%
(
volume
,
err
))
except
Exception
as
err
:
print
(
"
%s %s
"
%
(
volume
,
err
))
vols
.
remove
(
volume
)
if __name__ == "__main__":
    # Script entry point: list the images in the "sata" pool once, start ten
    # daemon threads running tester() over that list, and print the shared
    # counter every five seconds until interrupted.
    counter = 0
    vols = []
    end = False

    try:
        r = rbd.RBD()
        with rados.Rados(conffile='/etc/ceph/ceph.conf') as cluster:
            with cluster.open_ioctx("sata") as ioctx:
                volumes = r.list(ioctx)
    except Exception as err:
        print(err)
        # Without ``r`` and ``volumes`` the code below can only fail with
        # NameError, so stop here with a non-zero exit status.
        raise SystemExit(1)

    threads = []
    # range() instead of xrange() so the script runs on Python 2 and 3.
    for _ in range(10):
        worker = threading.Thread(target=tester, args=(r,))
        # The ``daemon`` attribute replaces the deprecated setDaemon().
        worker.daemon = True
        threads.append(worker)
    for worker in threads:
        worker.start()

    while True:
        try:
            time.sleep(5)
            print("Volumes tested: %s" % (counter))
        except (KeyboardInterrupt, Exception) as err:
            # Ctrl-C raises KeyboardInterrupt, which ``except Exception``
            # alone never catches; handle it so the workers are asked to
            # stop and are joined instead of being killed mid-operation.
            print(vols, err)
            end = True
            for worker in threads:
                worker.join()
            break
« Previous
1
2
3
Next »
(3-3/3)
Loading...