


Bug #24744 »

Joseph Victor, 06/21/2021 08:35 PM

import boto3
import time
import multiprocessing
import random
import uuid
import datetime

# Credentials and bucket for the S3 (Ceph RGW) endpoint under test.
ACCESS_KEY = "accesskey"
SECRET_KEY = "secretkey"
BUCKET = "bottomless"


# Number of d_<i>/ directories written by the uploaders and verified by listers.
NUM_ITERS = 5000

# NOTE(review): ListThread, UploadThread and the __main__ block reference the
# three names below, but their definitions are missing from this attachment
# (the script raises NameError as posted). These values are placeholders --
# confirm them against the reporter's original script.
NUM_UPLOAD_THREADS = 10  # uploader processes; each owns file ids j % N == its id
NUM_LIST_THREADS = 10    # lister/verifier processes
FILES_PER_DIR = 100      # must fit in one list_objects page (<= 1000 keys)

# Cross-process state. These are module globals, so they are only shared with
# the workers when those workers are forked.
g_num_finished = multiprocessing.Value('i', 0)  # per-uploader directory completions
g_value_lock = multiprocessing.Lock()           # guards increments of g_num_finished
g_failed = multiprocessing.Value('i', 0)        # set to 1 when any worker fails

# Sorry if my python isn't the best!
# The idea of this repro is to have two sets of threads (implemented as processes), one running
# uploads and the other running lists. Each iteration gets a directory, d_<iteration_num>, and
# the upload threads and list threads more or less try to pound the same subdirectory at the same time.
# The list threads also verify that each iteration's directory contains the right number of files
# after all the uploaders are done with it.
# When I run this test (against Ceph Nano Octopus and a production deployment at SingleStore on Nautilus),
# it eventually hits the assert and prints out the name of a missing file. That file doesn't seem to show
# up in any list queries after that, even though the uploader must have received a 200 back from Ceph's S3
# endpoint. Curiously, it DOES show up in get_object as existing.

def timestamp():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS.ffffff' for log lines."""
    # The datetime call was lost from the posted line (`return"%Y-..."`);
    # `datetime` is imported but otherwise unused, so this is the evident intent.
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")

def ConnectToCeph(endpoint_url=None):
    """Create a boto3 S3 client authenticated with ACCESS_KEY / SECRET_KEY.

    Args:
        endpoint_url: URL of the Ceph RGW S3 endpoint (for example
            "http://localhost:8000"). The endpoint used in the original
            report was lost from this attachment. When None, boto3 falls back
            to its own configuration. Recent botocore releases honour the
            AWS_ENDPOINT_URL_S3 / AWS_ENDPOINT_URL environment variables;
            otherwise requests go to AWS -- TODO set this for your cluster.

    Returns:
        A new boto3 S3 client. Each worker process creates its own.
    """
    return boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )

def ListThread(thread_id, test_subdir):
    """Repeatedly list directories and verify that every uploaded file is visible.

    Each pass lists either the directory the uploaders are currently working
    on (to race them) or the lowest directory not yet verified. A directory
    is verified once a single listing shows all FILES_PER_DIR files.

    Suppose the lowest unverified directory has been finished by every
    uploader but is listed short. The missing keys are then probed with
    get_object and the results are printed. After more than 10 such short
    listings, g_failed is set so the other workers stop, and AssertionError
    is raised.

    Args:
        thread_id: Index used only to tag log lines.
        test_subdir: Per-run key prefix under BUCKET.
    """
    ceph_conn = ConnectToCeph()
    iter_verified = 0  # lowest directory id not yet verified
    times_failed = 0   # short listings of a completed directory since the last verification

    # This thread is done after verifying all directories.
    while iter_verified < NUM_ITERS:
        assert not g_failed.value
        num_finished = g_num_finished.value
        # Directory the uploaders are working on. Clamp to the last real
        # directory (d_0 .. d_<NUM_ITERS-1>) once every upload has finished.
        current_upload_dir = min(NUM_ITERS - 1, num_finished // NUM_UPLOAD_THREADS)
        # Randomly pick between the directory being uploaded and the lowest
        # un-verified directory.
        dir_id = random.choice([current_upload_dir, iter_verified])

        # Uploaders bump g_num_finished only after finishing a whole directory,
        # so once it reaches (dir_id + 1) * NUM_UPLOAD_THREADS every file in
        # d_<dir_id> has been acknowledged by the server.
        expect_all_found = (dir_id + 1) * NUM_UPLOAD_THREADS <= num_finished

        prefix = "%s/d_%d/" % (test_subdir, dir_id)
        file_prefix = prefix + "f_"
        seen = set()
        resp = ceph_conn.list_objects(Bucket=BUCKET, Prefix=prefix)
        # Only one page is read, so FILES_PER_DIR must fit in one response.
        assert not resp['IsTruncated'], resp
        for obj in resp.get('Contents', []):
            key = obj['Key']
            assert key.startswith(file_prefix), key
            seen.add(int(key[len(file_prefix):]))

        # Only a listing of the next directory to verify can advance progress.
        if dir_id != iter_verified:
            continue

        if len(seen) == FILES_PER_DIR:
            print("%s %d verified directory %d" % (timestamp(), thread_id, dir_id))
            iter_verified += 1
            times_failed = 0
        elif expect_all_found:
            print("%s %d only saw %d files in directory %d" % (timestamp(), thread_id, len(seen), dir_id))
            for fid in range(FILES_PER_DIR):
                if fid in seen:
                    continue
                missing_file = file_prefix + str(fid)
                print("%s %d Is this file '%s' missing? Check if missing?" % (timestamp(), thread_id, missing_file))
                try:
                    get_resp = ceph_conn.get_object(Key=missing_file, Bucket=BUCKET)
                    print("%s %d Seems like its here! %s" % (timestamp(), thread_id, str(get_resp)))
                    # Release the unread response stream back to the connection pool.
                    get_resp['Body'].close()
                except Exception as e:
                    # Diagnostic probe only: report and keep checking the rest.
                    print("%s %d Seems like an error: %s" % (timestamp(), thread_id, str(e)))
            times_failed += 1
            if times_failed > 10:
                # Signal the other workers to stop before failing this one.
                g_failed.value = 1
                raise AssertionError(
                    "directory %d still listed with %d/%d files after %d attempts"
                    % (dir_id, len(seen), FILES_PER_DIR, times_failed))


def UploadThread(thread_id, test_subdir):
    """Upload this worker's share of empty files into every d_<i>/ directory.

    For each directory i, the worker:
      * uploads /tmp/empty_file as f_<j> for every j in [0, FILES_PER_DIR)
        with j % NUM_UPLOAD_THREADS == thread_id;
      * copies each of those objects onto itself with
        MetadataDirective="REPLACE";
      * increments g_num_finished;
      * waits until all NUM_UPLOAD_THREADS workers have finished directory i.

    Any exception sets g_failed before it propagates, so peers waiting at the
    barrier stop instead of hanging.

    Args:
        thread_id: Index in [0, NUM_UPLOAD_THREADS); selects which file ids this
            worker owns.
        test_subdir: Per-run key prefix under BUCKET.
    """
    ceph_conn = ConnectToCeph()

    try:
        for i in range(NUM_ITERS):
            assert not g_failed.value
            # Same ids, same order as filtering range(FILES_PER_DIR) by
            # j % NUM_UPLOAD_THREADS == thread_id.
            for j in range(thread_id, FILES_PER_DIR, NUM_UPLOAD_THREADS):
                filename = "%s/d_%d/f_%d" % (test_subdir, i, j)
                ceph_conn.upload_file("/tmp/empty_file", BUCKET, filename)
                # Copy the object onto itself, replacing its metadata.
                ceph_conn.copy_object(Bucket=BUCKET, Key=filename, CopySource=BUCKET + "/" + filename, MetadataDirective="REPLACE")

            print("%s %d uploaded directory %d" % (timestamp(), thread_id, i))
            with g_value_lock:
                g_num_finished.value += 1
            # Barrier: wait till all upload workers move on to the next directory.
            # Also poll g_failed. A worker that failed never increments the
            # counter, so without this check the loop would spin forever.
            while g_num_finished.value < (i + 1) * NUM_UPLOAD_THREADS:
                assert not g_failed.value
                time.sleep(0.01)
    except Exception:
        # Let the other workers stop instead of waiting on this one.
        g_failed.value = 1
        raise


if __name__ == "__main__":
    # The workers share g_num_finished / g_failed / g_value_lock as module
    # globals, which only works when children are forked. Under spawn or
    # forkserver (the default on macOS, and on Linux from Python 3.14) each
    # child re-imports this module and gets its own, unshared copies.
    multiprocessing.set_start_method("fork")

    test_subdir = str(uuid.uuid1())
    print("Test subdir is %s" % test_subdir)

    # Create an empty file to upload
    with open("/tmp/empty_file", "w"):
        pass

    list_workers = [
        multiprocessing.Process(target=ListThread, args=(i, test_subdir))
        for i in range(NUM_LIST_THREADS)
    ]
    upload_workers = [
        multiprocessing.Process(target=UploadThread, args=(i, test_subdir))
        for i in range(NUM_UPLOAD_THREADS)
    ]
    threads = list_workers + upload_workers
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # A worker that raised (e.g. the missing-file assertion) exits non-zero;
    # surface that in this script's exit status.
    if any(t.exitcode != 0 for t in threads):
        raise SystemExit(1)
