|
import boto3
|
|
import time
|
|
import multiprocessing
|
|
import random
|
|
import uuid
|
|
import datetime
|
|
|
|
# --- Target S3 endpoint (local Ceph RGW) and credentials. ---
ENDPOINT_URL = "http://127.0.0.1:8000"
ACCESS_KEY = "accesskey"
SECRET_KEY = "secretkey"
# Bucket the repro writes into; presumably created ahead of time -- this
# script never creates it. TODO confirm against the test environment.
BUCKET = "bottomless"

# When True, every upload is immediately followed by a self copy_object with
# MetadataDirective="REPLACE", exercising the copy path as well.
COPY_MODE = False

# Number of d_<n> directories each worker walks through.
NUM_ITERS = 5000
NUM_UPLOAD_THREADS = 10   # uploader processes; file ids are sharded across them
NUM_LIST_THREADS = 3      # lister/verifier processes

# Objects f_0 .. f_<FILES_PER_DIR-1> are uploaded into each directory.
FILES_PER_DIR = 1000

# Cross-process shared state (the "threads" here are multiprocessing.Process
# workers, so plain globals would not be shared).
g_num_finished = multiprocessing.Value('i', 0)   # total per-directory completions across uploaders
g_value_lock = multiprocessing.Lock()            # guards the += on g_num_finished
g_failed = multiprocessing.Value('i', 0)         # set to 1 when a lister gives up; all workers abort
|
|
|
|
# Sorry if my python isn't the best!
|
|
#
|
|
# The idea of this repro is to have two sets of threads, one running uploads and
|
|
# the other running lists. Each iteration gets a directory, d_<iteration_num>, and
|
|
# the upload threads and lists more or less try to pound the same subdirectory at the same time.
|
|
# The lists also verify that there are the right number of files in each iteration after all the
|
|
# uploaders are done.
|
|
#
|
|
# When I run this test (against Ceph Nano Octopus and a production deployment at Singlestore on Nautilus),
|
|
# it eventually hits the assert and prints out the name of a missing file. That file doesn't seem to show
|
|
# up in any list queries after that, even though the uploader must have received a 200 back from ceph's S3
|
|
# endpoint. Curiously, it DOES show up in get_object as existing.
|
|
|
|
def timestamp():
    """Current wall-clock time as 'YYYY-MM-DD HH:MM:SS.ffffff' for log lines."""
    now = datetime.datetime.now()
    return "{:%Y-%m-%d %H:%M:%S.%f}".format(now)
|
|
|
|
def ConnectToCeph():
    """Build a boto3 S3 client aimed at the endpoint configured above."""
    client = boto3.client(
        's3',
        endpoint_url=ENDPOINT_URL,
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )
    return client
|
|
|
|
def ListThread(thread_id, test_subdir):
    """List-side worker: repeatedly lists d_<id>/ prefixes and verifies that each
    directory eventually contains all FILES_PER_DIR objects.

    Runs as its own process; it coordinates with the uploaders only through the
    shared counters g_num_finished / g_failed. When a fully-uploaded directory
    stays short for >10 listings, it flags global failure and asserts.
    """
    ceph_conn = ConnectToCeph()
    iter_verified = 0                  # next directory id awaiting verification
    times_failed = 0                   # consecutive short listings of the current dir
    time_of_last_fail = time.time()    # NOTE(review): written but never read

    # This thread is done after verifying all directories.
    #
    while iter_verified < NUM_ITERS:
        # Abort promptly if any worker has already flagged a failure.
        assert not g_failed.value

        # Randomly pick between listing the directory currently being uploaded to or listing
        # the lowest un-verified directory.
        #
        num_finished = g_num_finished.value
        # NOTE(review): this bound allows current_upload_dir == NUM_ITERS (one
        # past the last directory) once all uploads finish; listing that empty
        # prefix is harmless but never verifies anything.
        current_upload_dir = min(NUM_ITERS, num_finished // NUM_UPLOAD_THREADS)
        dir_id = random.choice([current_upload_dir, iter_verified])

        # If num_finished > than the dir id times the number of upload threads, all files
        # will have been uploaded.
        #
        expect_all_found = (dir_id + 1) * NUM_UPLOAD_THREADS <= num_finished

        # Prefix of the directory under test (shadows the builtin `dir`; kept as-is).
        dir = "%s/d_%d/" % (test_subdir, dir_id)
        seen = set([])   # integer file ids observed in this listing
        resp = ceph_conn.list_objects(Bucket=BUCKET, Prefix=dir)
        # A truncated listing would invalidate the completeness check below.
        assert not resp['IsTruncated'], resp
        if 'Contents' in resp:
            for obj in resp['Contents']:
                # Every key under the prefix must look like <dir>f_<int>.
                assert obj['Key'][:len(dir) + 2] == (dir + "f_"), obj['Key']
                seen.add(int(obj['Key'][len(dir) + 2:]))

        # If dir_id is the same as the next iteration to verify, try verifying it.
        #
        if dir_id == iter_verified:
            if len(seen) == FILES_PER_DIR:
                print ("%s %d verified directory %d" % (timestamp(), thread_id, dir_id))
                iter_verified += 1
                times_failed = 0
                time_of_last_fail = time.time()
            elif expect_all_found:
                # All uploads for this directory are done, yet the listing is
                # short: this is the anomaly the repro hunts. Probe each missing
                # key with get_object to show whether it exists even though the
                # listing cannot see it.
                print ("%s %d only saw %d files in directory %d" % (timestamp(), thread_id, len(seen), dir_id))
                for fid in range(FILES_PER_DIR):
                    if fid not in seen:
                        missing_file = dir + "f_" + str(fid)
                        print ("%s %d Is this file '%s' missing? Check if missing?" % (timestamp(), thread_id, missing_file))
                        try:
                            get_resp = ceph_conn.get_object(Key=missing_file, Bucket=BUCKET)
                            print ("%s %d Seems like its here! %s" % (timestamp(), thread_id, str(get_resp)))
                        except Exception as e:
                            print ("%s %d Seems like an error: %s" % (timestamp(), thread_id, str(e)))
                times_failed += 1
                time_of_last_fail = time.time()
                # Allow ~10 re-listings for the cluster to converge before
                # declaring the test failed.
                if times_failed > 10:
                    g_failed.value = 1   # tell every other worker to abort
                    assert False
|
|
|
|
def UploadThread(thread_id, test_subdir):
    """Upload-side worker: uploads the (empty) objects d_<dir>/f_<id> whose id
    modulo NUM_UPLOAD_THREADS equals this worker's thread_id, one directory at
    a time, rendezvousing with the other uploaders after every directory.
    """
    ceph_conn = ConnectToCeph()

    # Upload empty files d_<dir_id>/f_<file_id> where the file_id mod the number
    # of threads is this thread's id.
    #
    for dir_id in range(NUM_ITERS):
        # Bail out promptly if a list worker has reported failure.
        assert not g_failed.value

        for file_id in range(FILES_PER_DIR):
            # Skip file ids that belong to other uploader workers.
            if file_id % NUM_UPLOAD_THREADS != thread_id:
                continue
            filename = "%s/d_%d/f_%d" % (test_subdir, dir_id, file_id)
            ceph_conn.upload_file("/tmp/empty_file", BUCKET, filename)
            if COPY_MODE:
                ceph_conn.copy_object(Bucket=BUCKET, Key=filename, CopySource=BUCKET + "/" + filename, MetadataDirective="REPLACE")

        print ("%s %d uploaded directory %d" % (timestamp(), thread_id, dir_id))

        # Barrier: bump the shared completion counter under the lock, then spin
        # (outside the lock) until every uploader has finished this directory
        # before anyone moves on to the next one.
        #
        with g_value_lock:
            g_num_finished.value += 1
        while g_num_finished.value < (dir_id + 1) * NUM_UPLOAD_THREADS:
            time.sleep(0.001)
|
|
|
|
if __name__ == "__main__":
    # Unique per-run prefix so repeated runs never collide in the bucket.
    test_subdir = str(uuid.uuid1())
    print ("Test subdir is %s" % test_subdir)

    # Create an empty file to upload
    #
    with open("/tmp/empty_file", "w"):
        pass

    # Spawn the listers first, then the uploaders, and run them all to completion.
    threads = [
        multiprocessing.Process(target=ListThread, args=(idx, test_subdir))
        for idx in range(NUM_LIST_THREADS)
    ]
    threads += [
        multiprocessing.Process(target=UploadThread, args=(idx, test_subdir))
        for idx in range(NUM_UPLOAD_THREADS)
    ]
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()
|
|
|
|
|
|
|
|
|