Why isn't this Python boto S3 multipart upload code working?

I'm trying to upload a 10 GB file to AWS S3, and someone suggested using S3 multipart upload, so I came across this gist on GitHub:
import os
import sys
import glob
import subprocess
import contextlib
import functools
import multiprocessing
from multiprocessing.pool import IMapIterator
from optparse import OptionParser
from boto.s3.connection import S3Connection
#import rfc822
import boto

AWS_ACCESS_KEY_ID = 'KEY ID HERE'
AWS_SECRET_ACCESS_KEY = 'ACCESS KEY HERE'


def main(transfer_file, bucket_name, s3_key_name=None, use_rr=True,
         make_public=True, cores=None):
    if s3_key_name is None:
        s3_key_name = os.path.basename(transfer_file)
    conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    bucket = conn.lookup(bucket_name)
    if bucket is None:
        bucket = conn.create_bucket(bucket_name)
    mb_size = os.path.getsize(transfer_file) / 1e6
    if mb_size < 10:
        _standard_transfer(bucket, s3_key_name, transfer_file, use_rr)
    else:
        _multipart_upload(bucket, s3_key_name, transfer_file, mb_size, use_rr,
                          cores)
    s3_key = bucket.get_key(s3_key_name)
    if make_public:
        s3_key.set_acl("public-read")


def upload_cb(complete, total):
    sys.stdout.write(".")
    sys.stdout.flush()


def _standard_transfer(bucket, s3_key_name, transfer_file, use_rr):
    print(" Upload with standard transfer, not multipart", end=' ')
    new_s3_item = bucket.new_key(s3_key_name)
    new_s3_item.set_contents_from_filename(transfer_file, reduced_redundancy=use_rr,
                                           cb=upload_cb, num_cb=10)
    print()


def map_wrap(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return f(*args, **kwargs)
    return wrapper


def mp_from_ids(mp_id, mp_keyname, mp_bucketname):
    """Get the multipart upload from the bucket and multipart IDs.

    This allows us to reconstitute a connection to the upload
    from within multiprocessing functions.
    """
    conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    bucket = conn.lookup(mp_bucketname)
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = mp_keyname
    mp.id = mp_id
    return mp


@map_wrap
def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part):
    """Transfer a part of a multipart upload. Designed to be run in parallel.
    """
    mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname)
    print(" Transferring", i, part)
    with open(part) as t_handle:
        mp.upload_part_from_file(t_handle, i + 1)
    os.remove(part)


def _multipart_upload(bucket, s3_key_name, tarball, mb_size, use_rr=True,
                      cores=None):
    """Upload large files using Amazon's multipart upload functionality.
    """
    def split_file(in_file, mb_size, split_num=5):
        prefix = os.path.join(os.path.dirname(in_file),
                              "%sS3PART" % (os.path.basename(s3_key_name)))
        # require a split size between 5Mb (AWS minimum) and 250Mb
        split_size = int(max(min(mb_size / (split_num * 2.0), 250), 5))
        if not os.path.exists("%saa" % prefix):
            cl = ["split", "-b%sm" % split_size, in_file, prefix]
            subprocess.check_call(cl)
        return sorted(glob.glob("%s*" % prefix))

    mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
    print(mp.id)
    print(mp.key_name)
    with multimap(cores) as pmap:
        for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part) for (i, part) in enumerate(split_file(tarball, mb_size, cores)))):
            pass
    mp.complete_upload()


@contextlib.contextmanager
def multimap(cores=None):
    """Provide multiprocessing imap like function.

    The context manager handles setting up the pool, worked around interrupt issues
    and terminating the pool on completion.
    """
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)

    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    IMapIterator.next = wrapper(IMapIterator.next)
    pool = multiprocessing.Pool(cores)
    yield pool.imap
    pool.terminate()


if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-r", "--norr", dest="use_rr",
                      action="store_false", default=True)
    parser.add_option("-p", "--public", dest="make_public",
                      action="store_true", default=False)
    parser.add_option("-c", "--cores", dest="cores",
                      default=multiprocessing.cpu_count())
    (options, args) = parser.parse_args()
    if len(args) < 2:
        print("No Args")
        sys.exit()
    kwargs = dict(use_rr=options.use_rr, make_public=options.make_public,
                  cores=int(options.cores))
    main(*args, **kwargs)
It doesn't work, though, and I don't know how to fix this error: "TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'".
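From what I understand, multiprocessing.Pool.imap hands each item of the iterable to the worker as a single positional argument, so a 5-tuple would arrive as one argument unless something unpacks it. Here is a minimal sketch of that behavior (toy functions, no boto involved), which produces the same shape of error; I'm not sure whether this is exactly what happens inside map_wrap here:

import multiprocessing

def work(a, b, c):
    # Expects three separate positional arguments.
    return a + b + c

def work_packed(args):
    # imap delivers each item of the iterable as ONE argument,
    # so the tuple has to be unpacked explicitly.
    return work(*args)

if __name__ == "__main__":
    items = [(1, 2, 3), (4, 5, 6)]
    with multiprocessing.Pool(2) as pool:
        # pool.imap(work, items) would fail with:
        #   TypeError: work() missing 2 required positional arguments: 'b' and 'c'
        print(list(pool.imap(work_packed, items)))  # [6, 15]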
EDIT:
Full error traceback, as requested:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "test.py", line 53, in wrapper
    return f(*args, **kwargs)
TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test.py", line 132, in <module>
    main(*args, **kwargs)
  File "test.py", line 34, in main
    cores)
  File "test.py", line 96, in _multipart_upload
    for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part) for (i, part) in enumerate(split_file(tarball, mb_size, cores)))):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 689, in next
    raise value
TypeError: transfer_part() missing 4 required positional arguments: 'mp_keyname', 'mp_bucketname', 'i', and 'part'
Please show the full error. – boardrider
Added the full error. –