Project

General

Profile

Bug #24744 » ceph_repro.py

Joseph Victor, 06/21/2021 08:35 PM

 
import boto3
import time
import multiprocessing
import random
import uuid
import datetime

# S3 endpoint and credentials passed to boto3 when a client is created.
ENDPOINT_URL = "http://127.0.0.1:8000"
ACCESS_KEY = "accesskey"
SECRET_KEY = "secretkey"
# Bucket targeted by every upload, copy, list and get request in this script.
BUCKET = "bottomless"

# When True, each object is copied onto itself (MetadataDirective="REPLACE")
# right after it has been uploaded.
COPY_MODE = False

# Number of directories (iterations) that are uploaded and then verified.
NUM_ITERS = 5000
# Number of uploader and lister worker processes started by the main block.
NUM_UPLOAD_THREADS = 10
NUM_LIST_THREADS = 3

# Number of objects a directory must contain to count as verified.
FILES_PER_DIR = 1000

# Shared state for the worker processes.
# NOTE(review): children only see these same objects when they are forked;
# under the spawn/forkserver start methods each child re-imports this module
# and gets its own copies -- confirm the start method in use.
#
# Counter bumped once by every uploader each time it finishes a directory.
g_num_finished = multiprocessing.Value('i', 0)
# Held while g_num_finished is incremented.
g_value_lock = multiprocessing.Lock()
# Failure flag: non-zero once the test has been declared failed; workers check it and stop.
g_failed = multiprocessing.Value('i', 0)

# Sorry if my python isn't the best!
#
# The idea of this repro is to have two sets of threads, one running uploads and
# the other running lists. Each iteration gets a directory, d_<iteration_num>, and
# the upload threads and list threads more or less try to pound the same subdirectory at the same time.
# The lists also verify that there are the right number of files in each iteration after all the
# uploaders are done.
#
# When I run this test (against Ceph Nano Octopus and a production deployment at Singlestore on Nautilus),
# it eventually hits the assert and prints out the name of a missing file. That file doesn't seem to show
# up in any list queries after that, even though the uploader must have received a 200 back from ceph's S3
# endpoint. Curiously, it DOES show up in get_object as existing.

def timestamp():
    """Return the current local time as a string for log lines.

    Returns:
        A string formatted as ``YYYY-MM-DD HH:MM:SS.ffffff`` (microsecond
        resolution), taken from ``datetime.datetime.now()``.
    """
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")

def ConnectToCeph():
    """Create a boto3 S3 client for the endpoint under test.

    The client is configured from the module-level ENDPOINT_URL, ACCESS_KEY
    and SECRET_KEY settings.

    Returns:
        The object returned by ``boto3.client('s3', ...)``.
    """
    return boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
        endpoint_url=ENDPOINT_URL,
    )

def ListThread(thread_id, test_subdir):
    """Repeatedly list test directories and verify that each one ends up complete.

    On every pass the worker lists either the directory the uploaders are
    currently writing or the lowest directory it has not verified yet. A
    directory counts as verified once a listing returns FILES_PER_DIR distinct
    file ids. If a directory that should be complete is still short, every
    missing key is probed with get_object and the pass is counted as a
    failure; after more than 10 failures in a row the shared g_failed flag is
    set and the worker aborts.

    Args:
        thread_id: Identifier of this lister; only used in log output.
        test_subdir: Key prefix under which this test run's directories live.

    Raises:
        AssertionError: if g_failed is already set, if a listing is truncated
            or contains an unexpected key, or if verification of a directory
            failed more than 10 times in a row.
    """
    ceph_conn = ConnectToCeph()
    iter_verified = 0
    times_failed = 0

    # This thread is done after verifying all directories.
    #
    while iter_verified < NUM_ITERS:
        # Explicit raise rather than `assert`, so the stop flag is still
        # honoured when Python runs with -O.
        if g_failed.value:
            raise AssertionError("test already flagged as failed")

        # Randomly pick between listing the directory currently being uploaded to or listing
        # the lowest un-verified directory. Once every upload has finished there is no
        # "current" directory any more, so clamp to the last real one instead of polling
        # d_<NUM_ITERS>, which is never written.
        #
        num_finished = g_num_finished.value
        current_upload_dir = min(NUM_ITERS - 1, num_finished // NUM_UPLOAD_THREADS)
        dir_id = random.choice([current_upload_dir, iter_verified])

        # If every uploader has finished this directory (or a later one), all files
        # will have been uploaded.
        #
        expect_all_found = (dir_id + 1) * NUM_UPLOAD_THREADS <= num_finished

        dir_prefix = "%s/d_%d/" % (test_subdir, dir_id)
        file_prefix = dir_prefix + "f_"
        seen = set()
        resp = ceph_conn.list_objects(Bucket=BUCKET, Prefix=dir_prefix)
        # A single un-paginated listing is expected to hold the whole directory.
        if resp['IsTruncated']:
            raise AssertionError(resp)
        for obj in resp.get('Contents', []):
            key = obj['Key']
            if not key.startswith(file_prefix):
                raise AssertionError(key)
            seen.add(int(key[len(file_prefix):]))

        # Only the next directory to verify is checked; listings of other
        # directories just add load.
        #
        if dir_id != iter_verified:
            continue

        if len(seen) == FILES_PER_DIR:
            print("%s %d verified directory %d" % (timestamp(), thread_id, dir_id))
            iter_verified += 1
            times_failed = 0
        elif expect_all_found:
            print("%s %d only saw %d files in directory %d" % (timestamp(), thread_id, len(seen), dir_id))
            for fid in range(FILES_PER_DIR):
                if fid in seen:
                    continue
                missing_file = dir_prefix + "f_" + str(fid)
                print("%s %d Is this file '%s' missing? Check if missing?" % (timestamp(), thread_id, missing_file))
                # Best-effort diagnostic: report whatever get_object says, never abort here.
                try:
                    get_resp = ceph_conn.get_object(Key=missing_file, Bucket=BUCKET)
                    print("%s %d Seems like its here! %s" % (timestamp(), thread_id, str(get_resp)))
                except Exception as e:
                    print("%s %d Seems like an error: %s" % (timestamp(), thread_id, str(e)))
            # NOTE(review): the source's indentation was lost; counting one failure per
            # failed verification pass (not per missing file) is the assumed intent -- confirm.
            times_failed += 1
            if times_failed > 10:
                g_failed.value = 1
                raise AssertionError(
                    "directory %d still incomplete after %d listings" % (dir_id, times_failed))


def UploadThread(thread_id, test_subdir):
    """Upload this worker's share of files into every test directory.

    For each iteration ``i`` the worker uploads the empty file
    ``/tmp/empty_file`` as ``<test_subdir>/d_<i>/f_<j>`` for every file id
    ``j`` with ``j % NUM_UPLOAD_THREADS == thread_id``, optionally copying each
    object onto itself when COPY_MODE is set. It then bumps g_num_finished and
    waits until all uploaders have finished the same directory before moving on.

    Args:
        thread_id: Index of this uploader; expected to be in
            ``range(NUM_UPLOAD_THREADS)``, as passed by the main block.
        test_subdir: Key prefix under which this test run's directories live.

    Raises:
        AssertionError: if g_failed is set, either at the start of an
            iteration or while waiting at the barrier.
        Exception: any error raised by the S3 client is re-raised after
            g_failed has been set.
    """
    ceph_conn = ConnectToCeph()

    # Upload empty files d_<dir_id>/f_<file_id> where the file_id mod the number of threads is
    # this thread's id.
    #
    for i in range(NUM_ITERS):
        # Explicit raise rather than `assert`, so the stop flag is still
        # honoured when Python runs with -O.
        if g_failed.value:
            raise AssertionError("test already flagged as failed")

        try:
            for j in range(thread_id, FILES_PER_DIR, NUM_UPLOAD_THREADS):
                filename = "%s/d_%d/f_%d" % (test_subdir, i, j)
                ceph_conn.upload_file("/tmp/empty_file", BUCKET, filename)
                if COPY_MODE:
                    ceph_conn.copy_object(
                        Bucket=BUCKET,
                        Key=filename,
                        CopySource=BUCKET + "/" + filename,
                        MetadataDirective="REPLACE")
        except Exception:
            # This uploader will never reach the barrier below, so tell the
            # other workers to stop instead of letting them wait forever.
            g_failed.value = 1
            raise

        print("%s %d uploaded directory %d" % (timestamp(), thread_id, i))

        # Wait till all threads move on to the next directory. The wait must
        # happen outside the lock, otherwise no other uploader could increment.
        #
        with g_value_lock:
            g_num_finished.value += 1
        while g_num_finished.value < (i + 1) * NUM_UPLOAD_THREADS:
            if g_failed.value:
                raise AssertionError("test flagged as failed while waiting for other uploaders")
            time.sleep(0.001)


if __name__ == "__main__":
    # Every run writes under its own unique prefix so runs never collide.
    test_subdir = str(uuid.uuid1())
    print("Test subdir is %s" % test_subdir)

    # Create an empty file to upload
    #
    with open("/tmp/empty_file", "w"):
        pass

    # The workers communicate through module-level multiprocessing objects,
    # which child processes only share when they are forked. Ask for the fork
    # start method explicitly and fall back to the platform default where
    # fork is not available.
    try:
        mp_ctx = multiprocessing.get_context("fork")
    except ValueError:
        mp_ctx = multiprocessing

    workers = [
        mp_ctx.Process(target=ListThread, args=(i, test_subdir))
        for i in range(NUM_LIST_THREADS)
    ]
    workers += [
        mp_ctx.Process(target=UploadThread, args=(i, test_subdir))
        for i in range(NUM_UPLOAD_THREADS)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # A worker that died on an assertion or unhandled exception has a non-zero
    # exit code; surface that as the script's own exit status.
    if any(worker.exitcode != 0 for worker in workers):
        raise SystemExit(1)


    (1-1/1)