|
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import random
|
|
import sys
|
|
|
|
import boto
|
|
import boto.s3.connection
|
|
from boto.exception import S3ResponseError
|
|
|
|
|
|
# Endpoint of the S3-compatible service the script talks to.
# 'XXX' is a placeholder and must be replaced before running.
HOST = 'XXX'
PORT = 80

# Access/secret key pair handed to boto when the connection is created.
# Both values are placeholders.
CREDENTIALS = {
    'access_key': 'XXX',
    'secret_key': 'XXX',
}
|
|
|
|
|
|
def create_connection():
    """Open a boto S3 connection to the endpoint configured in this module.

    The host, port and key pair come from the module-level ``HOST``,
    ``PORT`` and ``CREDENTIALS`` values.  The connection is requested with
    ``is_secure=False`` and boto's ``OrdinaryCallingFormat``.

    Returns:
        Whatever ``boto.connect_s3`` returns for these settings.
    """
    settings = {
        'host': HOST,
        'port': PORT,
        'aws_access_key_id': CREDENTIALS['access_key'],
        'aws_secret_access_key': CREDENTIALS['secret_key'],
        'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
        'is_secure': False,
    }
    return boto.connect_s3(**settings)
|
|
|
|
def gen_block(size_a, size_b, i):
    """Return a reproducible pseudo-random byte block.

    The global ``random`` generator is re-seeded with ``i``, ``size_a``
    random byte values (0-255) are drawn, and that chunk is repeated
    ``size_b`` times.

    Args:
        size_a: Number of random bytes in the base chunk.
        size_b: Number of times the base chunk is repeated.
        i: Seed passed to ``random.seed``; the same seed gives the same
            block on a given interpreter.

    Returns:
        A byte string of length ``size_a * size_b`` (``str`` on Python 2,
        ``bytes`` on Python 3).
    """
    # NOTE: this reseeds the module-wide generator, as the original did.
    random.seed(i)
    # Go through bytearray so the result is raw bytes on both Python 2 and 3;
    # chr()-joined text would not be a valid binary payload on Python 3.
    chunk = bytes(bytearray(random.randint(0, 255) for _ in range(size_a)))
    return chunk * size_b
|
|
|
|
|
|
|
|
def main(argv0):
    """Check whether a key's Content-Type survives a copy-based move.

    Steps, all against the service returned by ``create_connection()``:

    1. Create the bucket ``test-bucket``.
    2. Upload ``test-key`` with Content-Type ``application/x-test``.
    3. Print the content type reported for the original key.
    4. "Move" the key: copy it to ``test-key.copy`` and delete the original.
    5. Print the content type reported for the copy.
    6. Delete every key in the bucket, then the bucket itself.

    Args:
        argv0: Program name (``sys.argv[0]``); accepted but not used.
    """
    bucket_name = 'test-bucket'
    key_name = 'test-key'
    copy_name = key_name + '.copy'

    # Connect to S3
    s3c = create_connection()

    # Bucket
    b = s3c.create_bucket(bucket_name)

    # Create a test file with custom content-type
    k = b.new_key(key_name)
    k.set_contents_from_string(
        gen_block(1024, 1024, 0),
        headers={'Content-Type': 'application/x-test'},
    )
    k.close()

    # Check content-type of original
    k = b.get_key(key_name)
    # Parenthesised single-argument print: same output on Python 2,
    # and still valid syntax on Python 3.
    print("Original content-type: %s" % k.content_type)
    k.close()

    # Move it (copy within the same bucket, then delete the source)
    k = b.get_key(key_name)
    k.copy(k.bucket.name, copy_name)
    k.delete()
    k.close()

    # Check content type of copy.
    # Use get_key() here, like for the original: new_key() only builds a
    # local Key object and would report boto's default content type instead
    # of what the server stored for the copy.
    k = b.get_key(copy_name)
    print("Copy content-type: %s" % k.content_type)
    k.close()

    # Purge bucket
    for k in b.list():
        k.delete()
    s3c.delete_bucket(bucket_name)
|
|
|
|
|
|
if __name__ == '__main__':
    # main() takes exactly one argument (the program name).  Pass only
    # argv[0] instead of unpacking all of sys.argv, so that extra
    # command-line arguments do not make the call fail with a TypeError.
    main(sys.argv[0])
|