153 changes: 127 additions & 26 deletions conductor/lib/uploader.py
@@ -1,4 +1,5 @@
import datetime
import hashlib
import time
import json
import logging
@@ -14,6 +15,11 @@

LOG_FORMATTER = logging.Formatter('%(asctime)s %(name)s%(levelname)9s %(threadName)s: %(message)s')

# Files larger than 5GB will be uploaded in parts of this size
# This value must be in sync with the File API
# TODO: Query File API for this value
MULTIPART_SIZE = 1 * 1024 * 1024 * 1024
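# A minimal sketch (illustrative helper, not used elsewhere in this module)
# of how part byte ranges follow from MULTIPART_SIZE: part N of a file
# covers offsets [(N - 1) * MULTIPART_SIZE, min(N * MULTIPART_SIZE, size)).
def example_part_ranges(size_bytes, part_size=MULTIPART_SIZE):
    '''Yield (part_number, start, end) byte offsets covering size_bytes.'''
    part_number = 1
    for start in range(0, size_bytes, part_size):
        yield (part_number, start, min(start + part_size, size_bytes))
        part_number += 1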

logger = logging.getLogger(__name__)


@@ -46,7 +52,8 @@ def do_work(self, job, thread_int):
logger.error(message)
raise Exception(message)
self.metric_store.set_dict('file_md5s', filename, current_md5)
return (filename, current_md5)
size_bytes = os.path.getsize(filename)
return (filename, current_md5, size_bytes)

def get_md5(self, filepath):
'''
@@ -93,7 +100,7 @@ def __init__(self, *args, **kwargs):
super(MD5OutputWorker, self).__init__(*args, **kwargs)
self.batch_size = 20 # the controlls the batch size for http get_signed_urls
self.wait_time = 1
self.batch = {}
self.batch = []

def check_for_poison_pill(self, job):
''' we need to make sure we ship the last batch before we terminate '''
@@ -108,7 +115,7 @@ def ship_batch(self):
if self.batch:
logger.debug('sending batch: %s', self.batch)
self.put_job(self.batch)
self.batch = {}
self.batch = []

def target(self, thread_int):

@@ -119,8 +126,12 @@ def target(self, thread_int):

self.check_for_poison_pill(file_md5_tuple)

# add (filepath: md5) to the batch dict
self.batch[file_md5_tuple[0]] = file_md5_tuple[1]
# add file info to the batch list
self.batch.append({
'path': file_md5_tuple[0],
'hash': file_md5_tuple[1],
'size_bytes': file_md5_tuple[2],
})

# if the batch has reached self.batch_size, ship it
if len(self.batch) == self.batch_size:
@@ -142,9 +153,28 @@ def target(self, thread_int):

class HttpBatchWorker(worker.ThreadWorker):
'''
This worker receives a batched dict of (filename: md5) pairs and makes a
batched http api call which returns a list of (filename: signed_upload_url)
of files that need to be uploaded.
This worker receives a batched list of file info and makes a
batched http api call, which returns upload info for each file
that needs to be uploaded.

in_queue: {
"path": file_path,
"hash": md5,
"size_bytes": 123
}

out_queue: [
{
"hash": md5,
"url": signed_upload_url,
"multipart": True,
"path": file_path,
"upload_id": upload_id,
"parts": [
signed_multipart_url,
]
}
]

Each item in the return list is added to the out_queue.
'''
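# A hypothetical out_queue item for a 5.5GB file, using the field names from
# the docstring above; every value is illustrative, not real File API output.
# With MULTIPART_SIZE of 1GB, a 5.5GB file needs 6 part URLs (5 full parts
# plus a 0.5GB remainder).
EXAMPLE_MULTIPART_ITEM = {
    "path": "/tmp/example_render.exr",                       # hypothetical path
    "hash": "example-md5-of-file",                            # hypothetical md5
    "multipart": True,
    "upload_id": "example-upload-id",                         # hypothetical id
    "url": "https://example-bucket.s3.amazonaws.com/signed",  # hypothetical signed URL
    "parts": ["https://example-bucket.s3.amazonaws.com/signed?partNumber=%d" % n
              for n in range(1, 7)],
}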
@@ -155,7 +185,7 @@ def __init__(self, *args, **kwargs):
self.project = kwargs.get('project')

def make_request(self, job):
uri_path = '/api/files/get_upload_urls'
uri_path = '/api/v2/files/get_upload_urls'
headers = {'Content-Type': 'application/json'}
data = {"upload_files": job,
"project": self.project}
@@ -180,12 +210,12 @@ def do_work(self, job, thread_int):


'''
This worker subscribes to a queue of (path,signed_upload_url) pairs.
This worker subscribes to a queue of file upload info.

For each item on the queue, it determines the size (in bytes) of the files to be
uploaded, and aggregates the total size for all uploads.

It then places the triplet (filepath, upload_url, byte_size) onto the out_queue
It then passes the input file upload info on to the out_queue.

The bytes_to_upload arg is used to hold the aggregated size of all files that need
to be uploaded. Note: This is stored as an [int] in order to pass it by
@@ -199,14 +229,15 @@ def __init__(self, *args, **kwargs):

def do_work(self, job, thread_int):
'''
Job is a dict of filepath: signed_upload_url pairs.
The FileStatWorker iterates through the dict.
For each item, it aggregates the filesize in bytes, and passes each
pair as a tuple to the UploadWorker queue.
Job is a list of file upload info returned from the File API.
The FileStatWorker iterates through the list.
For each item, it aggregates the file size in bytes and passes
the upload info on to the UploadWorker queue.
'''

# iterate through a dict of (filepath: upload_url) pairs
for path, upload_url in job.iteritems():
# iterate through a list of file upload info dicts
for upload in job:
path = upload["path"]
if not os.path.isfile(path):
return None
# logger.debug('stat: %s', path)
@@ -215,15 +246,15 @@
self.metric_store.increment('bytes_to_upload', byte_count)
self.metric_store.increment('num_files_to_upload')

self.put_job((path, upload_url))
self.put_job(upload)

# make sure we return None, so no message is automatically added to the out_queue
return None


class UploadWorker(worker.ThreadWorker):
'''
This worker receives a (filepath: signed_upload_url) pair and performs an upload
This worker receives file upload info and performs an upload
of the specified file to the provided url.
'''

@@ -232,6 +263,7 @@ def __init__(self, *args, **kwargs):
self.chunk_size = 1048576 # 1M
self.report_size = 10485760 # 10M
self.api_client = api_client.ApiClient()
self.project = kwargs.get('project')

def chunked_reader(self, filename):
with open(filename, 'rb') as fp:
@@ -247,11 +279,16 @@ def chunked_reader(self, filename):
self.metric_store.increment('bytes_uploaded', len(data), filename)

def do_work(self, job, thread_int):
filename = job[0]
upload_url = job[1]
'''
If the file is larger than 5GB (the S3 single-request limit), the
File API will return multiple urls for a multipart upload.
'''
filename = job["path"]
md5 = self.metric_store.get_dict('file_md5s', filename)
try:
return self.do_upload(upload_url, filename, md5)
if job["multipart"]:
return self.do_multipart_upload(job, filename, md5)
return [self.do_upload(job["url"], filename, md5)]
except:
logger.exception("Failed to upload file: %s because of:\n", filename)
real_md5 = common.get_base64_md5(filename)
@@ -269,7 +306,6 @@ def do_upload(self, upload_url, filename, md5):

We cannot reuse the make_request method for S3 because it adds auth headers that
S3 does not accept. S3 also requires additional signatures to support chunked data.
For now S3 uploads will read all data in memory resulting in a smaller supported size.
'''

if "amazonaws" in upload_url:
@@ -295,9 +331,75 @@ def do_upload(self, upload_url, filename, md5):
tries=1,
use_api_key=True)
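# A minimal sketch, assuming the `requests` library, of the bare PUT that a
# signed S3 url expects; the real transfer goes through api_client, and
# make_request is avoided because it adds auth headers S3 rejects. Whether
# Content-MD5 must be sent depends on how the url was signed.
def example_signed_put(upload_url, filename, md5):
    import requests  # assumption: not necessarily a dependency of this module
    with open(filename, 'rb') as fh:
        return requests.put(upload_url, data=fh, headers={'Content-MD5': md5})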

@common.DecRetry(retry_exceptions=api_client.CONNECTION_EXCEPTIONS, tries=5)
def do_multipart_upload(self, job, filename, md5):
'''
Files larger than 5GB will be split into 1GB parts and hydrated in S3
once all parts are uploaded.

On a successful part upload, S3 returns an ETag. This value must be
tracked along with the part number in order to complete and hydrate the file in S3.

The multipart completion call can take a long time on larger files; tests
on files of roughly 100GB took between 1 and 2 seconds to complete.
'''
uploads = []
complete_payload = {
"upload_id": job["upload_id"],
"hash": md5,
"completed_parts": [],
"project": self.project
}

# iterate over parts and upload
for part_number, part_url in enumerate(job["parts"], 1):
resp = self.do_part_upload(part_url, filename, part_number=part_number)
uploads.append(resp)
complete_payload["completed_parts"].append(resp.headers['ETag'].strip('"'))

# Complete multipart upload in order to hydrate file in S3 for availability
uri_path = '/api/v2/files/multipart/complete'
headers = {'Content-Type': 'application/json'}
self.api_client.make_request(uri_path=uri_path,
verb='POST',
headers=headers,
data=json.dumps(complete_payload),
raise_on_error=True,
use_api_key=True)

return uploads

def do_part_upload(self, upload_url, filename, part_number):
with open(filename, 'rb') as fh:
# seek to the correct part position
start = (part_number - 1) * MULTIPART_SIZE
fh.seek(start)

# read up to MULTIPART_SIZE
data = fh.read(MULTIPART_SIZE)

# calculate md5 in order to compare with returned ETag later
file_hash = hashlib.md5()
file_hash.update(data)
digest = file_hash.hexdigest()

# upload part
resp = self.api_client._make_request(verb="PUT",
conductor_url=upload_url,
headers={},
params=None,
data=data)

# verify data
etag = resp.headers['ETag']
if etag.strip('"') != digest:
error_message = "ETag for multipart upload part does not match expected md5\n"
error_message += "expected md5 is %s, returned md5 is %s" % (digest, etag)
raise Exception(error_message)
return resp


class Uploader(object):
sleep_time = 10

def __init__(self, args=None):
@@ -333,7 +435,7 @@ def create_manager(self, project, md5_only=False):
(HttpBatchWorker, [], {'thread_count': self.args['thread_count'],
"project": project}),
(FileStatWorker, [], {'thread_count': 1}),
(UploadWorker, [], {'thread_count': self.args['thread_count']}),
(UploadWorker, [], {'thread_count': self.args['thread_count'], "project": project}),
]

manager = worker.JobManager(job_description)
@@ -736,7 +838,6 @@ def resolve_arg(arg_name, args, config):
# Otherwise use the value in the config if it's there, otherwise default to None
return config.get(arg_name)


# @common.dec_timer_exitlog_level=logging.DEBUG
# def test_md5_system(dirpath):
# '''