diff --git a/conductor/lib/uploader.py b/conductor/lib/uploader.py index 7eae929f..d0d3e660 100644 --- a/conductor/lib/uploader.py +++ b/conductor/lib/uploader.py @@ -1,4 +1,5 @@ import datetime +import hashlib import time import json import logging @@ -14,6 +15,11 @@ LOG_FORMATTER = logging.Formatter('%(asctime)s %(name)s%(levelname)9s %(threadName)s: %(message)s') +# Files larger than 5GB will be uploaded in parts of this size +# This value must be in sync with the File API +# TODO: Query File API for this value +MULTIPART_SIZE = 1 * 1024 * 1024 * 1024 + logger = logging.getLogger(__name__) @@ -46,7 +52,8 @@ def do_work(self, job, thread_int): logger.error(message) raise Exception(message) self.metric_store.set_dict('file_md5s', filename, current_md5) - return (filename, current_md5) + size_bytes = os.path.getsize(filename) + return (filename, current_md5, size_bytes) def get_md5(self, filepath): ''' @@ -93,7 +100,7 @@ def __init__(self, *args, **kwargs): super(MD5OutputWorker, self).__init__(*args, **kwargs) self.batch_size = 20 # the controlls the batch size for http get_signed_urls self.wait_time = 1 - self.batch = {} + self.batch = [] def check_for_poison_pill(self, job): ''' we need to make sure we ship the last batch before we terminate ''' @@ -108,7 +115,7 @@ def ship_batch(self): if self.batch: logger.debug('sending batch: %s', self.batch) self.put_job(self.batch) - self.batch = {} + self.batch = [] def target(self, thread_int): @@ -119,8 +126,12 @@ def target(self, thread_int): self.check_for_poison_pill(file_md5_tuple) - # add (filepath: md5) to the batch dict - self.batch[file_md5_tuple[0]] = file_md5_tuple[1] + # add file info to the batch list + self.batch.append({ + 'path': file_md5_tuple[0], + 'hash': file_md5_tuple[1], + 'size_bytes': file_md5_tuple[2], + }) # if the batch is self.batch_size, ship it if len(self.batch) == self.batch_size: @@ -142,9 +153,28 @@ def target(self, thread_int): class HttpBatchWorker(worker.ThreadWorker): ''' - This worker receives a batched dict of (filename: md5) pairs and makes a - batched http api call which returns a list of (filename: signed_upload_url) - of files that need to be uploaded. + This worker receives a batched list of file info and makes a + batched http api call which returns a list of files with upload + info that need to be uploaded. + + in_queue: { + "path": file_path, + "hash": md5, + "size_bytes": 123 + } + + out_queue: [ + { + "hash": md5, + "url": signed_upload_url, + "multipart": True, + "path": file_path, + "upload_id": upload_id, + "parts": [ + signed_multipart_url, + ] + } + ] Each item in the return list is added to the out_queue. ''' @@ -155,7 +185,7 @@ def __init__(self, *args, **kwargs): self.project = kwargs.get('project') def make_request(self, job): - uri_path = '/api/files/get_upload_urls' + uri_path = '/api/v2/files/get_upload_urls' headers = {'Content-Type': 'application/json'} data = {"upload_files": job, "project": self.project} @@ -180,12 +210,12 @@ def do_work(self, job, thread_int): ''' -This worker subscribes to a queue of (path,signed_upload_url) pairs. +This worker subscribes to a queue of file upload info. For each item on the queue, it determines the size (in bytes) of the files to be uploaded, and aggregates the total size for all uploads. -It then places the triplet (filepath, upload_url, byte_size) onto the out_queue +It then passes the input file upload info back into the out_queue The bytes_to_upload arg is used to hold the aggregated size of all files that need to be uploaded. Note: This is stored as an [int] in order to pass it by @@ -199,14 +229,15 @@ def __init__(self, *args, **kwargs): def do_work(self, job, thread_int): ''' - Job is a dict of filepath: signed_upload_url pairs. - The FileStatWorker iterates through the dict. - For each item, it aggregates the filesize in bytes, and passes each - pair as a tuple to the UploadWorker queue. + Job is a list of file upload info returned from File API. + The FileStatWorker iterates through the list. + For each item, it aggregates the filesize in bytes, and passes + the upload info into the UploadWorker queue. ''' - # iterate through a dict of (filepath: upload_url) pairs - for path, upload_url in job.iteritems(): + # iterate through a list of file upload info dicts + for upload in job: + path = upload["path"] if not os.path.isfile(path): return None # logger.debug('stat: %s', path) @@ -215,7 +246,7 @@ def do_work(self, job, thread_int): self.metric_store.increment('bytes_to_upload', byte_count) self.metric_store.increment('num_files_to_upload') - self.put_job((path, upload_url)) + self.put_job(upload) # make sure we return None, so no message is automatically added to the out_queue return None @@ -223,7 +254,7 @@ def do_work(self, job, thread_int): class UploadWorker(worker.ThreadWorker): ''' - This worker receives a (filepath: signed_upload_url) pair and performs an upload + This worker receives file upload info and performs an upload of the specified file to the provided url. ''' @@ -232,6 +263,7 @@ def __init__(self, *args, **kwargs): self.chunk_size = 1048576 # 1M self.report_size = 10485760 # 10M self.api_client = api_client.ApiClient() + self.project = kwargs.get('project') def chunked_reader(self, filename): with open(filename, 'rb') as fp: @@ -247,11 +279,16 @@ def chunked_reader(self, filename): self.metric_store.increment('bytes_uploaded', len(data), filename) def do_work(self, job, thread_int): - filename = job[0] - upload_url = job[1] + ''' + If the file is larger than 5GB on S3, File API will return + multiple urls for a multipart upload. + ''' + filename = job["path"] md5 = self.metric_store.get_dict('file_md5s', filename) try: - return self.do_upload(upload_url, filename, md5) + if job["multipart"]: + return self.do_multipart_upload(job, filename, md5) + return [self.do_upload(job["url"], filename, md5)] except: logger.exception("Failed to upload file: %s because of:\n", filename) real_md5 = common.get_base64_md5(filename) @@ -269,7 +306,6 @@ def do_upload(self, upload_url, filename, md5): We cannot reuse make_request method for S3 because it adds auth headers that S3 does not accept. S3 also requires additional signatures to support chunked data. - For now S3 uploads will read all data in memory resulting in a smaller supported size. ''' if "amazonaws" in upload_url: @@ -295,9 +331,75 @@ def do_upload(self, upload_url, filename, md5): tries=1, use_api_key=True) + @common.DecRetry(retry_exceptions=api_client.CONNECTION_EXCEPTIONS, tries=5) + def do_multipart_upload(self, job, filename, md5): + ''' + Files larger than 5GB will be split into 1GB parts and hydrated in S3 + once all parts are uploaded. + + On successful part upload to S3, it will return an ETag. This value must be + tracked along with the part number in order to complete and hydrate the file in S3. -class Uploader(object): + The multipart completion call can take a long time on larger files. Tests on + larger files (~100GB) took between 1 and 2 seconds to complete. + ''' + uploads = [] + complete_payload = { + "upload_id": job["upload_id"], + "hash": md5, + "completed_parts": [], + "project": self.project + } + + # iterate over parts and upload + for part_number, part_url in enumerate(job["parts"], 1): + resp = self.do_part_upload(part_url, filename, part_number=part_number) + uploads.append(resp) + complete_payload["completed_parts"].append(resp.headers['ETag'].strip('"')) + + # Complete multipart upload in order to hydrate file in S3 for availability + uri_path = '/api/v2/files/multipart/complete' + headers = {'Content-Type': 'application/json'} + self.api_client.make_request(uri_path=uri_path, + verb='POST', + headers=headers, + data=json.dumps(complete_payload), + raise_on_error=True, + use_api_key=True) + + return uploads + + def do_part_upload(self, upload_url, filename, part_number): + with open(filename, 'rb') as fh: + # seek to the correct part position + start = (part_number - 1) * MULTIPART_SIZE + fh.seek(start) + + # read up to MULTIPART_SIZE + data = fh.read(MULTIPART_SIZE) + + # calculate md5 in order to compare with returned ETag later + file_hash = hashlib.md5() + file_hash.update(data) + digest = file_hash.hexdigest() + + # upload part + resp = self.api_client._make_request(verb="PUT", + conductor_url=upload_url, + headers={}, + params=None, + data=data) + + # verify data + etag = resp.headers['ETag'] + if etag.strip('"') != digest: + error_message = "ETag for multipart upload part does not match expected md5\n" + error_message += "expected md5 is %s, returned md5 is %s" % (digest, etag) + raise Exception(error_message) + return resp + +class Uploader(object): sleep_time = 10 def __init__(self, args=None): @@ -333,7 +435,7 @@ def create_manager(self, project, md5_only=False): (HttpBatchWorker, [], {'thread_count': self.args['thread_count'], "project": project}), (FileStatWorker, [], {'thread_count': 1}), - (UploadWorker, [], {'thread_count': self.args['thread_count']}), + (UploadWorker, [], {'thread_count': self.args['thread_count'], "project": project}), ] manager = worker.JobManager(job_description) @@ -736,7 +838,6 @@ def resolve_arg(arg_name, args, config): # Otherwise use the value in the config if it's there, otherwise default to None return config.get(arg_name) - # @common.dec_timer_exitlog_level=logging.DEBUG # def test_md5_system(dirpath): # '''