|
#!/usr/bin/python
|
|
|
|
import sys
|
|
import boto
|
|
import boto.s3.key
|
|
import boto.s3.connection
|
|
|
|
# Test accounts used by this script: replace the user names, access keys and
# secret keys below with those of your own test users.
user = [
    {
        'username': 'testuser1',
        'access_key': 'BAX3MDFDOP3486G240TW',
        'secret_key': 'te+6j+E7ROa7NDyouUQeZoOzCx7p1Hv95S3TpOkA',
    },
    {
        'username': 'testuser2',
        'access_key': 'ECBRXYBYCBY7J6VD9PHD',
        'secret_key': '2DpVj83Baf3lzMwg0YV7qStTUv7qPGigG2rPIHJg',
    },
]

# Local file whose contents are uploaded as the test object.
uploadfile = '/etc/os-release'

# Bucket and object names are both derived from the first user's name.
bucket_name = user[0]['username'] + '-99999-transfer-bug-bucket'
file_name = user[0]['username'] + '-99999-test-file'

# Host name of the S3 endpoint the script connects to.
hostname = 'localhost'
|
|
|
|
def connect_s3(user_id=0):
    """Open a boto S3 connection using the credentials of one test user.

    Args:
        user_id: Index into the module-level ``user`` list (default 0).

    Returns:
        The connection object returned by ``boto.connect_s3``, pointed at
        ``hostname`` on port 8000 with ``is_secure=False``.
    """
    account = user[user_id]
    return boto.connect_s3(
        aws_access_key_id=account['access_key'],
        aws_secret_access_key=account['secret_key'],
        host=hostname,
        port=8000,
        is_secure=False,
        # Ordinary calling format: the bucket is addressed in the request
        # path instead of as a sub-domain of the host.
        calling_format=boto.s3.connection.OrdinaryCallingFormat(),
    )
|
|
|
|
def pause(next_task):
    """Block until the operator presses <Enter>.

    Args:
        next_task: Text naming the step that follows; it is inserted into
            the prompt shown to the operator.
    """
    prompt = 'Press <Enter> to continue with ' + next_task + '...'
    raw_input(prompt)
|
|
|
|
def print_content(user_id):
    """Print the test bucket's ACL and details of every object in it.

    The bucket named by ``bucket_name`` is opened with the credentials of
    ``user[user_id]``.  After the bucket ACL, two lines are printed per
    listed key: its name, owner display name and the 'meta1'/'meta2'
    metadata values, then its ACL.  A key that cannot be read is reported
    with an "Error reading" line and the loop continues with the next key.

    Args:
        user_id: Index into the module-level ``user`` list.
    """
    print("Content as per " + user[user_id]['username'])
    connection = connect_s3(user_id)
    # validate=False: build the bucket handle without first checking that
    # the bucket exists.
    bucket = connection.get_bucket(bucket_name, validate=False)
    print(bucket.get_acl())
    for k in bucket.list():
        try:
            # Second lookup per key: the metadata printed below is read from
            # this object rather than from the listing entry.
            k2 = bucket.get_key(k.name)
            if k2 is None:
                # get_key() returns None instead of raising when the key does
                # not exist (e.g. it vanished after being listed); report it
                # like any other unreadable key instead of crashing with an
                # AttributeError on k2.get_metadata().
                print("Error reading " + k.name)
                continue
            print("'{name}' by '{owner}' ['meta1': '{meta1}', 'meta2': '{meta2}']".format(
                name=k.name,
                owner=k.owner.display_name,
                meta1=k2.get_metadata('meta1'),
                meta2=k2.get_metadata('meta2'),
            ))
            print("'{name}' => {acl}".format(
                name=k.name,
                acl=k.get_acl(),
            ))
        except boto.exception.S3ResponseError:
            # The service rejected one of the requests for this key.
            print("Error reading " + k.name)
|
|
|
|
def create_and_grant():
    """Create the test bucket and object as user 0 and share both with user 1.

    User 0 creates the bucket ``bucket_name`` and uploads ``uploadfile`` to
    it under the key ``file_name``.  User 1 is granted FULL_CONTROL on the
    bucket and, separately, on the uploaded object.
    """
    # The closing quote around the bucket name was missing in the message.
    print("Creating '{bucket}'".format(bucket=bucket_name))
    conn = connect_s3(0)
    conn2 = connect_s3(1)
    bucket = conn.create_bucket(bucket_name)
    # Look the grantee's canonical id up once and reuse it for both grants
    # instead of asking the service twice.
    grantee_id = conn2.get_canonical_user_id()
    bucket.add_user_grant('FULL_CONTROL', grantee_id, recursive=True)
    k = boto.s3.key.Key(bucket)
    k.key = file_name
    k.set_contents_from_filename(uploadfile)
    # The object is uploaded after the bucket grant was applied, so it gets
    # its own explicit grant.
    k.add_user_grant('FULL_CONTROL', grantee_id)
|
|
|
|
def copy_object():
    """Copy the test object within the test bucket, acting as user 1.

    The object ``file_name`` in ``bucket_name`` is copied to a new key in
    the same bucket whose name is ``file_name`` with "-02" appended.
    """
    print("Copying object")
    # validate=False: build the bucket handle without an existence check.
    target_bucket = connect_s3(1).get_bucket(bucket_name, validate=False)
    copy_name = file_name + "-02"
    target_bucket.copy_key(copy_name, bucket_name, file_name)
|
|
|
|
# Scenario: user 0 creates and shares the bucket and object, user 1 copies
# the object, then the bucket content is printed once per user.  The script
# waits for the operator before the copy and before the final printout.
create_and_grant()
pause("copy object")
copy_object()
pause("print results")
for _viewer_id in (0, 1):
    print_content(_viewer_id)
|