From 389c3eb49131cb885d30297a6a248289d72d6aac Mon Sep 17 00:00:00 2001
From: David Farkas
Date: Fri, 2 Mar 2018 18:18:11 +0100
Subject: [PATCH 1/3] First version of signed url upload for reapers

---
 api/api.py    |   1 +
 api/files.py  |  11 +--
 api/upload.py | 192 ++++++++++++++++++++++++++++++++++++++++++++------
 api/util.py   |  17 +++++
 4 files changed, 196 insertions(+), 25 deletions(-)

diff --git a/api/api.py b/api/api.py
index db7a9dc70..140e74fb6 100644
--- a/api/api.py
+++ b/api/api.py
@@ -87,6 +87,7 @@ def prefix(path, routes):
     route('/download', Download, h='download', m=['GET', 'POST']),
     route('/download/summary', Download, h='summary', m=['POST']),
     route('/upload/', Upload, h='upload', m=['POST']),
+    route('/upload/signed/', Upload, h='upload_signed_url', m=['POST']),
     route('/clean-packfiles', Upload, h='clean_packfile_tokens', m=['POST']),
    route('/engine', Upload, h='engine', m=['POST']),
 
diff --git a/api/files.py b/api/files.py
index 5b2bfaeee..fe46ed692 100644
--- a/api/files.py
+++ b/api/files.py
@@ -15,9 +15,12 @@ DEFAULT_HASH_ALG = 'sha384'
 
 
 class FileProcessor(object):
-    def __init__(self, base, presistent_fs):
+    def __init__(self, base, presistent_fs, tempdir_name=None):
         self.base = base
-        self._tempdir_name = str(uuid.uuid4())
+        if not tempdir_name:
+            self._tempdir_name = str(uuid.uuid4())
+        else:
+            self._tempdir_name = tempdir_name
         self._presistent_fs = presistent_fs
         self._presistent_fs.makedirs(fs.path.join('tmp', self._tempdir_name), recreate=True)
         self._temp_fs = fs.subfs.SubFS(presistent_fs, fs.path.join('tmp', self._tempdir_name))
@@ -196,10 +199,10 @@ def get_valid_file(file_info):
         raise fs.errors.ResourceNotFound('File not found: %s', file_info['name'])
 
 
-def get_signed_url(file_path, file_system, filename=None):
+def get_signed_url(file_path, file_system, filename=None, purpose='download'):
     try:
         if hasattr(file_system, 'get_signed_url'):
-            return file_system.get_signed_url(file_path, filename=filename)
+            return file_system.get_signed_url(file_path, filename=filename, purpose=purpose)
     except fs.errors.NoURL:
         return None
 
diff --git a/api/upload.py b/api/upload.py
index 90c753141..18487c6a6 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -16,20 +16,22 @@
 log = config.log
 
 Strategy = util.Enum('Strategy', {
-    'targeted' : pl.TargetedPlacer, # Upload N files to a container.
-    'engine' : pl.EnginePlacer, # Upload N files from the result of a successful job.
-    'token' : pl.TokenPlacer, # Upload N files to a saved folder based on a token.
-    'packfile' : pl.PackfilePlacer, # Upload N files as a new packfile to a container.
-    'labelupload' : pl.LabelPlacer,
-    'uidupload' : pl.UIDPlacer,
-    'uidmatch' : pl.UIDMatchPlacer,
-    'reaper' : pl.UIDReaperPlacer,
-    'analysis' : pl.AnalysisPlacer, # Upload N files to an analysis as input and output (no db updates)
-    'analysis_job': pl.AnalysisJobPlacer, # Upload N files to an analysis as output from job results
-    'gear' : pl.GearPlacer
+    'targeted': pl.TargetedPlacer,  # Upload N files to a container.
+    'engine': pl.EnginePlacer,  # Upload N files from the result of a successful job.
+    'token': pl.TokenPlacer,  # Upload N files to a saved folder based on a token.
+    'packfile': pl.PackfilePlacer,  # Upload N files as a new packfile to a container.
+    'labelupload': pl.LabelPlacer,
+    'uidupload': pl.UIDPlacer,
+    'uidmatch': pl.UIDMatchPlacer,
+    'reaper': pl.UIDReaperPlacer,
+    'analysis': pl.AnalysisPlacer,  # Upload N files to an analysis as input and output (no db updates)
+    'analysis_job': pl.AnalysisJobPlacer,  # Upload N files to an analysis as output from job results
+    'gear': pl.GearPlacer
 })
 
-def process_upload(request, strategy, container_type=None, id_=None, origin=None, context=None, response=None, metadata=None):
+
+def process_upload(request, strategy, container_type=None, id_=None, origin=None, context=None, response=None,
+                   metadata=None):
     """
     Universal file upload entrypoint.
 
@@ -64,7 +66,8 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None
     if id_ is not None and container_type == None:
         raise Exception('Unspecified container type')
 
-    if container_type is not None and container_type not in ('acquisition', 'session', 'project', 'collection', 'analysis', 'gear'):
+    if container_type is not None and container_type not in (
+            'acquisition', 'session', 'project', 'collection', 'analysis', 'gear'):
         raise Exception('Unknown container type')
 
     timestamp = datetime.datetime.utcnow()
@@ -104,9 +107,9 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None
         # Augment the cgi.FieldStorage with a variety of custom fields.
         # Not the best practice. Open to improvements.
         # These are presumbed to be required by every function later called with field as a parameter.
-        field.path = field.filename
+        field.path = field.filename
         if not file_processor.temp_fs.exists(field.path):
-            #tempdir_exists = os.path.exists(tempdir.name)
+            # tempdir_exists = os.path.exists(tempdir.name)
             raise Exception("file {} does not exist, files in tmpdir: {}".format(
                 field.path,
                 file_processor.temp_fs.listdir('/'),
@@ -121,9 +124,9 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None
        # Stands in for a dedicated object... for now.
        file_attrs = {
            '_id': field.uuid,
-           'name': field.filename,
+           'name': field.filename,
            'modified': field.modified,
-           'size': field.size,
+           'size': field.size,
            'mimetype': field.mimetype,
            'hash': field.hash,
            'origin': origin,
@@ -143,7 +146,7 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None
         raise Exception("Programmer error: response required")
     elif placer.sse:
         response.headers['Content-Type'] = 'text/event-stream; charset=utf-8'
-        response.headers['Connection'] = 'keep-alive'
+        response.headers['Connection'] = 'keep-alive'
 
         # Instead of handing the iterator off to response.app_iter, send it ourselves.
         # This prevents disconnections from leaving the API in a partially-complete state.
@@ -156,7 +159,99 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None for item in placer.finalize(): try: response.write(item) - except Exception: # pylint: disable=broad-except + except Exception: # pylint: disable=broad-except + log.info('SSE upload progress failed to send; continuing') + + return + else: + return placer.finalize() + + +def process_signed_url_upload(request, strategy, tempdir, filename, metadata, container_type=None, id_=None, origin=None, context=None, + response=None): + """ + """ + + if not isinstance(strategy, Strategy): + raise Exception('Unknown upload strategy') + + if id_ is not None and container_type == None: + raise Exception('Unspecified container type') + + if container_type is not None and container_type not in ( + 'acquisition', 'session', 'project', 'collection', 'analysis', 'gear'): + raise Exception('Unknown container type') + + timestamp = datetime.datetime.utcnow() + + container = None + if container_type and id_: + container = hierarchy.get_container(container_type, id_) + + # The vast majority of this function's wall-clock time is spent here. + # Tempdir is deleted off disk once out of scope, so let's hold onto this reference. + file_processor = files.FileProcessor(config.get_item('persistent', 'data_path'), config.fs, tempdir_name=tempdir) + + placer_class = strategy.value + placer = placer_class(container_type, container, id_, metadata, timestamp, origin, context, file_processor) + placer.check() + + field = util.dotdict({}) + field.filename = filename + field.path = field.filename + if not file_processor.temp_fs.exists(field.path): + # tempdir_exists = os.path.exists(tempdir.name) + raise Exception("file {} does not exist, files in tmpdir: {}".format( + field.path, + file_processor.temp_fs.listdir('/'), + )) + field.size = file_processor.temp_fs.getsize(field.path) + # TODO: somehow figure out the hash of the file + # field.hash = util.format_hash(files.DEFAULT_HASH_ALG, field.hasher.hexdigest()) + field.uuid = str(uuid.uuid4()) + field.mimetype = util.guess_mimetype(field.filename) # TODO: does not honor metadata's mime type if any + field.modified = timestamp + + # create a file-attribute map commonly used elsewhere in the codebase. + # Stands in for a dedicated object... for now. + file_attrs = { + '_id': field.uuid, + 'name': field.filename, + 'modified': field.modified, + 'size': field.size, + 'mimetype': field.mimetype, + #'hash': field.hash, + 'origin': origin, + + 'type': None, + 'modality': None, + 'measurements': [], + 'tags': [], + 'info': {} + } + + file_attrs['type'] = files.guess_type_from_filename(file_attrs['name']) + placer.process_file_field(field, file_attrs) + + # Respond either with Server-Sent Events or a standard json map + if placer.sse and not response: + raise Exception("Programmer error: response required") + elif placer.sse: + response.headers['Content-Type'] = 'text/event-stream; charset=utf-8' + response.headers['Connection'] = 'keep-alive' + + # Instead of handing the iterator off to response.app_iter, send it ourselves. + # This prevents disconnections from leaving the API in a partially-complete state. + # + # Timing out between events or throwing an exception will result in undefinied behaviour. + # Right now, in our environment: + # - Timeouts may result in nginx-created 500 Bad Gateway HTML being added to the response. + # - Exceptions add some error json to the response, which is not SSE-sanitized. 
+ + for item in placer.finalize(): + try: + response.write(item) + except Exception: # pylint: disable=broad-except log.info('SSE upload progress failed to send; continuing') return @@ -166,6 +261,14 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None class Upload(base.RequestHandler): + def _check_ticket(self, ticket_id): + ticket = config.db.uploads.find_one({'_id': ticket_id}) + if not ticket: + self.abort(404, 'no such ticket') + if ticket['ip'] != self.request.client_addr: + self.abort(400, 'ticket not for this resource or source IP') + return ticket + def upload(self, strategy): """Receive a sortable reaper upload.""" @@ -189,6 +292,51 @@ def upload(self, strategy): self.abort(500, 'strategy {} not implemented'.format(strategy)) return process_upload(self.request, strategy, origin=self.origin, context=context) + def upload_signed_url(self, strategy): + if not self.superuser_request: + user = self.uid + if not user: + self.abort(403, 'Uploading requires login') + + payload = self.request.json_body + context = {'uid': self.uid if not self.superuser_request else None} + + # TODO: Make it available for the other strategies + if strategy == 'reaper': + strategy = Strategy.reaper + else: + self.abort(500, 'strategy {} not implemented'.format(strategy)) + + # Request for download ticket + if self.get_param('ticket') == '': + metadata = payload.get('metadata', None) + filename = payload.get('filename', None) + + if not (metadata or filename): + self.abort(404, 'metadata and filename are required') + + tempdir = str(uuid.uuid4()) + # Upload into a temp folder, so we will be able to cleanup + signed_url = files.get_signed_url(fs.path.join('tmp', tempdir, filename), config.fs, purpose='upload') + + if not signed_url: + self.abort(405, 'Signed URLs are not supported with the current storage backend') + + ticket = util.upload_ticket(self.request.client_addr, self.origin, tempdir, filename, metadata) + return {'ticket': config.db.uploads.insert_one(ticket).inserted_id, + 'upload_url': signed_url} + + # Check ticket id and skip permissions check if it clears + ticket_id = self.get_param('ticket') + ticket = None + if ticket_id: + ticket = self._check_ticket(ticket_id) + if not self.origin.get('id'): + # If we don't have an origin with this request, use the ticket's origin + self.origin = ticket.get('origin') + + return process_signed_url_upload(self.request, strategy, ticket['tempdir'], ticket['filename'], ticket['metadata'], origin=self.origin, context=context) + def engine(self): """Handles file uploads from the engine""" @@ -209,9 +357,11 @@ def engine(self): 'job_ticket_id': self.get_param('job_ticket'), } if level == 'analysis': - return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, id_=cid, context=context) + return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, + id_=cid, context=context) else: - return process_upload(self.request, Strategy.engine, container_type=level, id_=cid, origin=self.origin, context=context) + return process_upload(self.request, Strategy.engine, container_type=level, id_=cid, origin=self.origin, + context=context) def clean_packfile_tokens(self): """Clean up expired upload tokens and invalid token directories. 
diff --git a/api/util.py b/api/util.py index 87d1a5f7e..be618e83e 100644 --- a/api/util.py +++ b/api/util.py @@ -163,6 +163,18 @@ def download_ticket(ip, origin, type_, target, filename, size, projects = None): } +def upload_ticket(ip, origin, tempdir, filename, metadata): + return { + '_id': str(uuid.uuid4()), + 'timestamp': datetime.datetime.utcnow(), + 'ip': ip, + 'tempdir': tempdir, + 'filename': filename, + 'metadata': metadata, + 'origin': origin + } + + def guess_mimetype(filepath): """Guess MIME type based on filename.""" mime, _ = mimetypes.guess_type(filepath) @@ -355,3 +367,8 @@ def parse_range_header(range_header_val, valid_units=('bytes',)): ranges.append((first, last)) return ranges + + +class dotdict(dict): + def __getattr__(self, name): + return self[name] \ No newline at end of file From eac16f5a6dd6ba6840b9e31441afe0f24539540e Mon Sep 17 00:00:00 2001 From: David Farkas Date: Thu, 8 Mar 2018 12:19:05 +0100 Subject: [PATCH 2/3] Refactoring and add unit test --- api/config.py | 2 +- api/upload.py | 140 ++++++------------------ tests/unit_tests/python/test_uploads.py | 40 +++++++ 3 files changed, 73 insertions(+), 109 deletions(-) create mode 100644 tests/unit_tests/python/test_uploads.py diff --git a/api/config.py b/api/config.py index da24f7ff9..58973b3ed 100644 --- a/api/config.py +++ b/api/config.py @@ -248,7 +248,7 @@ def initialize_db(): log_db.access_log.create_index([('timestamp', pymongo.DESCENDING)]) create_or_recreate_ttl_index('authtokens', 'timestamp', 2592000) - create_or_recreate_ttl_index('uploads', 'timestamp', 60) + #create_or_recreate_ttl_index('uploads', 'timestamp', 60) create_or_recreate_ttl_index('downloads', 'timestamp', 60) create_or_recreate_ttl_index('job_tickets', 'timestamp', 3600) # IMPORTANT: this controls job orphan logic. Ref queue.py diff --git a/api/upload.py b/api/upload.py index 18487c6a6..3f862a41a 100644 --- a/api/upload.py +++ b/api/upload.py @@ -31,7 +31,7 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None, context=None, response=None, - metadata=None): + metadata=None, file_fields=None, tempdir=None): """ Universal file upload entrypoint. @@ -78,14 +78,17 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None # The vast majority of this function's wall-clock time is spent here. # Tempdir is deleted off disk once out of scope, so let's hold onto this reference. - file_processor = files.FileProcessor(config.get_item('persistent', 'data_path'), config.fs) - form = file_processor.process_form(request) + file_processor = files.FileProcessor(config.get_item('persistent', 'data_path'), config.fs, tempdir_name=tempdir) + if not file_fields: + form = file_processor.process_form(request) + # Non-file form fields may have an empty string as filename, check for 'falsy' values + file_fields = extract_file_fields(form) - if 'metadata' in form: - try: - metadata = json.loads(form['metadata'].value) - except Exception: - raise FileStoreException('wrong format for field "metadata"') + if 'metadata' in form: + try: + metadata = json.loads(form['metadata'].value) + except Exception: + raise FileStoreException('wrong format for field "metadata"') placer_class = strategy.value placer = placer_class(container_type, container, id_, metadata, timestamp, origin, context, file_processor) @@ -95,15 +98,18 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None # or "file1", "file2", etc (if multiple). Following this convention is probably a good idea. 
# Here, we accept any - # Non-file form fields may have an empty string as filename, check for 'falsy' values - file_fields = extract_file_fields(form) # TODO: Change schemas to enabled targeted uploads of more than one file. # Ref docs from placer.TargetedPlacer for details. if strategy == Strategy.targeted and len(file_fields) > 1: raise FileFormException("Targeted uploads can only send one file") for field in file_fields: - field.file.close() + if hasattr(field, 'file'): + field.file.close() + field.hash = util.format_hash(files.DEFAULT_HASH_ALG, field.hasher.hexdigest()) + + if not hasattr(field, 'hash'): + field.hash = '' # Augment the cgi.FieldStorage with a variety of custom fields. # Not the best practice. Open to improvements. # These are presumbed to be required by every function later called with field as a parameter. @@ -115,7 +121,6 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None file_processor.temp_fs.listdir('/'), )) field.size = file_processor.temp_fs.getsize(field.path) - field.hash = util.format_hash(files.DEFAULT_HASH_ALG, field.hasher.hexdigest()) field.uuid = str(uuid.uuid4()) field.mimetype = util.guess_mimetype(field.filename) # TODO: does not honor metadata's mime type if any field.modified = timestamp @@ -167,98 +172,6 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None return placer.finalize() -def process_signed_url_upload(request, strategy, tempdir, filename, metadata, container_type=None, id_=None, origin=None, context=None, - response=None): - """ - """ - - if not isinstance(strategy, Strategy): - raise Exception('Unknown upload strategy') - - if id_ is not None and container_type == None: - raise Exception('Unspecified container type') - - if container_type is not None and container_type not in ( - 'acquisition', 'session', 'project', 'collection', 'analysis', 'gear'): - raise Exception('Unknown container type') - - timestamp = datetime.datetime.utcnow() - - container = None - if container_type and id_: - container = hierarchy.get_container(container_type, id_) - - # The vast majority of this function's wall-clock time is spent here. - # Tempdir is deleted off disk once out of scope, so let's hold onto this reference. - file_processor = files.FileProcessor(config.get_item('persistent', 'data_path'), config.fs, tempdir_name=tempdir) - - placer_class = strategy.value - placer = placer_class(container_type, container, id_, metadata, timestamp, origin, context, file_processor) - placer.check() - - field = util.dotdict({}) - field.filename = filename - field.path = field.filename - if not file_processor.temp_fs.exists(field.path): - # tempdir_exists = os.path.exists(tempdir.name) - raise Exception("file {} does not exist, files in tmpdir: {}".format( - field.path, - file_processor.temp_fs.listdir('/'), - )) - field.size = file_processor.temp_fs.getsize(field.path) - # TODO: somehow figure out the hash of the file - # field.hash = util.format_hash(files.DEFAULT_HASH_ALG, field.hasher.hexdigest()) - field.uuid = str(uuid.uuid4()) - field.mimetype = util.guess_mimetype(field.filename) # TODO: does not honor metadata's mime type if any - field.modified = timestamp - - # create a file-attribute map commonly used elsewhere in the codebase. - # Stands in for a dedicated object... for now. 
- file_attrs = { - '_id': field.uuid, - 'name': field.filename, - 'modified': field.modified, - 'size': field.size, - 'mimetype': field.mimetype, - #'hash': field.hash, - 'origin': origin, - - 'type': None, - 'modality': None, - 'measurements': [], - 'tags': [], - 'info': {} - } - - file_attrs['type'] = files.guess_type_from_filename(file_attrs['name']) - placer.process_file_field(field, file_attrs) - - # Respond either with Server-Sent Events or a standard json map - if placer.sse and not response: - raise Exception("Programmer error: response required") - elif placer.sse: - response.headers['Content-Type'] = 'text/event-stream; charset=utf-8' - response.headers['Connection'] = 'keep-alive' - - # Instead of handing the iterator off to response.app_iter, send it ourselves. - # This prevents disconnections from leaving the API in a partially-complete state. - # - # Timing out between events or throwing an exception will result in undefinied behaviour. - # Right now, in our environment: - # - Timeouts may result in nginx-created 500 Bad Gateway HTML being added to the response. - # - Exceptions add some error json to the response, which is not SSE-sanitized. - - for item in placer.finalize(): - try: - response.write(item) - except Exception: # pylint: disable=broad-except - log.info('SSE upload progress failed to send; continuing') - - return - else: - return placer.finalize() - - class Upload(base.RequestHandler): def _check_ticket(self, ticket_id): @@ -298,17 +211,22 @@ def upload_signed_url(self, strategy): if not user: self.abort(403, 'Uploading requires login') - payload = self.request.json_body context = {'uid': self.uid if not self.superuser_request else None} - # TODO: Make it available for the other strategies - if strategy == 'reaper': + if strategy in 'label': + strategy = Strategy.labelupload + elif strategy == 'uid': + strategy = Strategy.uidupload + elif strategy == 'uid-match': + strategy = Strategy.uidmatch + elif strategy == 'reaper': strategy = Strategy.reaper else: self.abort(500, 'strategy {} not implemented'.format(strategy)) # Request for download ticket if self.get_param('ticket') == '': + payload = self.request.json_body metadata = payload.get('metadata', None) filename = payload.get('filename', None) @@ -317,6 +235,7 @@ def upload_signed_url(self, strategy): tempdir = str(uuid.uuid4()) # Upload into a temp folder, so we will be able to cleanup + print(type(config.fs)) signed_url = files.get_signed_url(fs.path.join('tmp', tempdir, filename), config.fs, purpose='upload') if not signed_url: @@ -335,7 +254,12 @@ def upload_signed_url(self, strategy): # If we don't have an origin with this request, use the ticket's origin self.origin = ticket.get('origin') - return process_signed_url_upload(self.request, strategy, ticket['tempdir'], ticket['filename'], ticket['metadata'], origin=self.origin, context=context) + file_fields = [ + util.dotdict({ + 'filename': ticket['filename'] + }) + ] + return process_upload(self.request, strategy, metadata=ticket['metadata'], origin=self.origin, context=context, file_fields=file_fields, tempdir=ticket['tempdir']) def engine(self): """Handles file uploads from the engine""" diff --git a/tests/unit_tests/python/test_uploads.py b/tests/unit_tests/python/test_uploads.py new file mode 100644 index 000000000..0bdebef39 --- /dev/null +++ b/tests/unit_tests/python/test_uploads.py @@ -0,0 +1,40 @@ +import requests_mock + + +def test_signed_url_upload(as_drone, mocker): + + metadata = { + "metadata": { + "group": {"_id": "scitran"}, + "project": 
{"label": ""}, + "session": { + "uid": "session_uid", + "subject": {"code": "bela"} + }, + "acquisition": { + "uid": "acquisition_uid", + "files": [{"name": "test"}] + } + }, + "filename": "test" + } + + r = as_drone.post('/upload/signed/reaper?ticket=', + json=metadata) + + assert r.status_code == 405 + + mock_fs = mocker.patch('api.upload.config.fs') + mock_fs.get_signed_url.return_value = 'url' + r = as_drone.post('/upload/signed/reaper?ticket=', + json=metadata) + + assert r.ok + assert r.json['upload_url'] == 'url' + + ticket_id = r.json['ticket'] + + r = as_drone.post('/upload/signed/reaper?ticket=' + ticket_id) + assert r.ok + + assert mock_fs.move.called \ No newline at end of file From 618201c52cac998dbb1019ab2b820ca8ee61ac29 Mon Sep 17 00:00:00 2001 From: David Farkas Date: Sat, 10 Mar 2018 14:03:24 +0100 Subject: [PATCH 3/3] Add signed url support for engine and listhandler --- api/api.py | 1 - api/handlers/listhandler.py | 53 +++++- api/placer.py | 2 +- api/upload.py | 119 +++++++------ tests/unit_tests/python/conftest.py | 222 +++++++++++++++++++++++- tests/unit_tests/python/test_uploads.py | 124 +++++++++++-- 6 files changed, 448 insertions(+), 73 deletions(-) diff --git a/api/api.py b/api/api.py index 140e74fb6..db7a9dc70 100644 --- a/api/api.py +++ b/api/api.py @@ -87,7 +87,6 @@ def prefix(path, routes): route('/download', Download, h='download', m=['GET', 'POST']), route('/download/summary', Download, h='summary', m=['POST']), route('/upload/', Upload, h='upload', m=['POST']), - route('/upload/signed/', Upload, h='upload_signed_url', m=['POST']), route('/clean-packfiles', Upload, h='clean_packfile_tokens', m=['POST']), route('/engine', Upload, h='engine', m=['POST']), diff --git a/api/handlers/listhandler.py b/api/handlers/listhandler.py index 54d36da41..d6ef7ed2d 100644 --- a/api/handlers/listhandler.py +++ b/api/handlers/listhandler.py @@ -7,6 +7,8 @@ import uuid import zipfile +import fs.path + from ..web import base from .. 
import config, files, upload, util, validators from ..auth import listauth, always_ok @@ -348,6 +350,25 @@ class FileListHandler(ListHandler): def __init__(self, request=None, response=None): super(FileListHandler, self).__init__(request, response) + def _create_ticket(self): + payload = self.request.json_body + metadata = payload.get('metadata', None) + filename = payload.get('filename', None) + + if not (metadata or filename): + self.abort(404, 'metadata and filename are required') + + tempdir = str(uuid.uuid4()) + # Upload into a temp folder, so we will be able to cleanup + signed_url = files.get_signed_url(fs.path.join('tmp', tempdir, filename), config.fs, purpose='upload') + + if not signed_url: + self.abort(405, 'Signed URLs are not supported with the current storage backend') + + ticket = util.upload_ticket(self.request.client_addr, self.origin, tempdir, filename, metadata) + return {'ticket': config.db.uploads.insert_one(ticket).inserted_id, + 'upload_url': signed_url} + def _check_ticket(self, ticket_id, _id, filename): ticket = config.db.downloads.find_one({'_id': ticket_id}) if not ticket: @@ -356,6 +377,14 @@ def _check_ticket(self, ticket_id, _id, filename): self.abort(400, 'ticket not for this resource or source IP') return ticket + def _check_upload_ticket(self, ticket_id): + ticket = config.db.uploads.find_one({'_id': ticket_id}) + if not ticket: + self.abort(404, 'no such ticket') + if ticket['ip'] != self.request.client_addr: + self.abort(400, 'ticket not for this resource or source IP') + return ticket + @staticmethod def build_zip_info(file_path, file_system): """ @@ -552,7 +581,29 @@ def post(self, cont_name, list_name, **kwargs): permchecker, _, _, _, _ = self._initialize_request(containerutil.pluralize(cont_name), list_name, _id) permchecker(noop)('POST', _id=_id) - return upload.process_upload(self.request, upload.Strategy.targeted, container_type=containerutil.singularize(cont_name), id_=_id, origin=self.origin) + # Request for upload ticket + if self.get_param('ticket') == '': + return self._create_ticket() + + # Check ticket id and skip permissions check if it clears + ticket_id = self.get_param('ticket') + if ticket_id: + ticket = self._check_upload_ticket(ticket_id) + if not self.origin.get('id'): + # If we don't have an origin with this request, use the ticket's origin + self.origin = ticket.get('origin') + + file_fields = [ + util.dotdict({ + 'filename': ticket['filename'] + }) + ] + + return upload.process_upload(self.request, upload.Strategy.targeted, metadata=ticket['metadata'], origin=self.origin, + container_type=containerutil.singularize(cont_name), + id_=_id, file_fields=file_fields, tempdir=ticket['tempdir']) + else: + return upload.process_upload(self.request, upload.Strategy.targeted, container_type=containerutil.singularize(cont_name), id_=_id, origin=self.origin) @validators.verify_payload_exists def put(self, cont_name, list_name, **kwargs): diff --git a/api/placer.py b/api/placer.py index 2e1215092..74bfbeecf 100644 --- a/api/placer.py +++ b/api/placer.py @@ -547,7 +547,7 @@ def finalize(self): }) # Similarly, create the attributes map that is consumed by helper funcs. Clear duplication :( - # This could be coalesced into a single map thrown on file fields, for example. + # This could be coalesced into a co single map thrown on file fields, for example. # Used in the API return. 
cgi_attrs = { '_id': cgi_field.uuid, diff --git a/api/upload.py b/api/upload.py index 3f862a41a..2dfd9e301 100644 --- a/api/upload.py +++ b/api/upload.py @@ -20,8 +20,8 @@ 'engine': pl.EnginePlacer, # Upload N files from the result of a successful job. 'token': pl.TokenPlacer, # Upload N files to a saved folder based on a token. 'packfile': pl.PackfilePlacer, # Upload N files as a new packfile to a container. - 'labelupload': pl.LabelPlacer, - 'uidupload': pl.UIDPlacer, + 'label': pl.LabelPlacer, + 'uid': pl.UIDPlacer, 'uidmatch': pl.UIDMatchPlacer, 'reaper': pl.UIDReaperPlacer, 'analysis': pl.AnalysisPlacer, # Upload N files to an analysis as input and output (no db updates) @@ -174,6 +174,25 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None class Upload(base.RequestHandler): + def _create_ticket(self): + payload = self.request.json_body + metadata = payload.get('metadata', None) + filename = payload.get('filename', None) + + if not (metadata or filename): + self.abort(404, 'metadata and filename are required') + + tempdir = str(uuid.uuid4()) + # Upload into a temp folder, so we will be able to cleanup + signed_url = files.get_signed_url(fs.path.join('tmp', tempdir, filename), config.fs, purpose='upload') + + if not signed_url: + self.abort(405, 'Signed URLs are not supported with the current storage backend') + + ticket = util.upload_ticket(self.request.client_addr, self.origin, tempdir, filename, metadata) + return {'ticket': config.db.uploads.insert_one(ticket).inserted_id, + 'upload_url': signed_url} + def _check_ticket(self, ticket_id): ticket = config.db.uploads.find_one({'_id': ticket_id}) if not ticket: @@ -190,64 +209,20 @@ def upload(self, strategy): if not user: self.abort(403, 'Uploading requires login') - context = {'uid': self.uid if not self.superuser_request else None} - - # TODO: what enum - if strategy == 'label': - strategy = Strategy.labelupload - elif strategy == 'uid': - strategy = Strategy.uidupload - elif strategy == 'uid-match': - strategy = Strategy.uidmatch - elif strategy == 'reaper': - strategy = Strategy.reaper + if strategy in ['label', 'uid', 'uid-match', 'reaper']: + strategy = strategy.replace('-', '') + strategy = getattr(Strategy, strategy) else: self.abort(500, 'strategy {} not implemented'.format(strategy)) - return process_upload(self.request, strategy, origin=self.origin, context=context) - - def upload_signed_url(self, strategy): - if not self.superuser_request: - user = self.uid - if not user: - self.abort(403, 'Uploading requires login') context = {'uid': self.uid if not self.superuser_request else None} - if strategy in 'label': - strategy = Strategy.labelupload - elif strategy == 'uid': - strategy = Strategy.uidupload - elif strategy == 'uid-match': - strategy = Strategy.uidmatch - elif strategy == 'reaper': - strategy = Strategy.reaper - else: - self.abort(500, 'strategy {} not implemented'.format(strategy)) - - # Request for download ticket + # Request for upload ticket if self.get_param('ticket') == '': - payload = self.request.json_body - metadata = payload.get('metadata', None) - filename = payload.get('filename', None) - - if not (metadata or filename): - self.abort(404, 'metadata and filename are required') - - tempdir = str(uuid.uuid4()) - # Upload into a temp folder, so we will be able to cleanup - print(type(config.fs)) - signed_url = files.get_signed_url(fs.path.join('tmp', tempdir, filename), config.fs, purpose='upload') - - if not signed_url: - self.abort(405, 'Signed URLs are not supported with 
the current storage backend') - - ticket = util.upload_ticket(self.request.client_addr, self.origin, tempdir, filename, metadata) - return {'ticket': config.db.uploads.insert_one(ticket).inserted_id, - 'upload_url': signed_url} + return self._create_ticket() # Check ticket id and skip permissions check if it clears ticket_id = self.get_param('ticket') - ticket = None if ticket_id: ticket = self._check_ticket(ticket_id) if not self.origin.get('id'): @@ -259,7 +234,11 @@ def upload_signed_url(self, strategy): 'filename': ticket['filename'] }) ] - return process_upload(self.request, strategy, metadata=ticket['metadata'], origin=self.origin, context=context, file_fields=file_fields, tempdir=ticket['tempdir']) + + return process_upload(self.request, strategy, metadata=ticket['metadata'], origin=self.origin, + context=context, file_fields=file_fields, tempdir=ticket['tempdir']) + else: + return process_upload(self.request, strategy, origin=self.origin, context=context) def engine(self): """Handles file uploads from the engine""" @@ -276,16 +255,42 @@ def engine(self): self.abort(400, 'container id is required') else: cid = bson.ObjectId(cid) + context = { 'job_id': self.get_param('job'), 'job_ticket_id': self.get_param('job_ticket'), } - if level == 'analysis': - return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, - id_=cid, context=context) + + # Request for upload ticket + if self.get_param('ticket') == '': + return self._create_ticket() + + # Check ticket id and skip permissions check if it clears + ticket_id = self.get_param('ticket') + if ticket_id: + ticket = self._check_ticket(ticket_id) + if not self.origin.get('id'): + # If we don't have an origin with this request, use the ticket's origin + self.origin = ticket.get('origin') + + file_fields = [ + util.dotdict({ + 'filename': ticket['filename'] + }) + ] + + if level is not 'analysis': + return process_upload(self.request, Strategy.engine, metadata=ticket['metadata'], origin=self.origin, + context=context, file_fields=file_fields, tempdir=ticket['tempdir'], + container_type=level, id_=cid) + else: - return process_upload(self.request, Strategy.engine, container_type=level, id_=cid, origin=self.origin, - context=context) + if level == 'analysis': + return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, + id_=cid, context=context) + else: + return process_upload(self.request, Strategy.engine, container_type=level, id_=cid, origin=self.origin, + context=context) def clean_packfile_tokens(self): """Clean up expired upload tokens and invalid token directories. 
diff --git a/tests/unit_tests/python/conftest.py b/tests/unit_tests/python/conftest.py index 05699de65..e7b9ef9e7 100644 --- a/tests/unit_tests/python/conftest.py +++ b/tests/unit_tests/python/conftest.py @@ -1,3 +1,7 @@ +import binascii +import bson +import copy +import datetime import logging import os @@ -8,8 +12,6 @@ import webapp2 import api.config -import api.web.start - SCITRAN_CORE_DRONE_SECRET = os.environ['SCITRAN_CORE_DRONE_SECRET'] @@ -63,7 +65,13 @@ def app(): mongo_patch.start() # NOTE db and log_db is created at import time in api.config # reloading the module is needed to use the mocked MongoClient + + # Hack because of the containerhandler's import time instantiation + # with this the containerhandler will use the same mock db instance + import api.config reload(api.config) + import api.web.start + yield api.web.start.app_factory() mongo_patch.stop() es_patch.stop() @@ -87,6 +95,216 @@ def log(request): return log +@pytest.fixture(scope='function') +def randstr(request): + + def randstr(): + """Return random string prefixed with test module and function name""" + # NOTE Useful for generating required unique document fields in data_builder + # or in tests directly by using the fixture. Uses hex strings as each of + # those fields (user._id, group._id, gear.gear.name) support [a-z0-9] + + def clean(test_name): + return test_name.lower().replace('test_', '').rstrip('_').replace('_', '-') + + module = clean(request.module.__name__) + function = clean(request.function.__name__) + prefix = module + '-' + function + return prefix[:21] + '-' + binascii.hexlify(os.urandom(5)) + + return randstr + + +@pytest.fixture(scope='function') +def default_payload(): + """Return default test resource creation payloads""" + return attrdict.AttrDict({ + 'user': {'firstname': 'test', 'lastname': 'user'}, + 'group': {}, + 'project': {'public': True}, + 'session': {'public': True}, + 'acquisition': {'public': True}, + 'collection': {}, + 'gear': { + 'exchange': { + 'git-commit': 'aex', + 'rootfs-hash': 'sha384:oy', + 'rootfs-url': 'https://test.test' + }, + 'gear': { + 'author': 'test', + 'config': {}, + 'description': 'test', + 'inputs': { + 'text files (max 100K)': { + 'base': 'file', + 'name': {'pattern': '^.*.txt$'}, + 'size': {'maximum': 100000} + } + }, + 'label': 'test', + 'license': 'BSD-2-Clause', + 'source': 'https://test.test', + 'url': 'https://test.test', + 'version': '0.0.1', + }, + }, + 'job': {'inputs': {}}, + }) + + +@pytest.fixture(scope='session') +def merge_dict(): + + def merge_dict(a, b): + """Merge two dicts into the first recursively""" + for key, value in b.iteritems(): + if key in a and isinstance(a[key], dict) and isinstance(b[key], dict): + merge_dict(a[key], b[key]) + else: + a[key] = b[key] + + return merge_dict + + +@pytest.yield_fixture(scope='function') +def data_builder(as_drone, api_db, randstr): + """Yield DataBuilder instance (per test)""" + # NOTE currently there's only a single data_builder for simplicity which + # uses as_root - every resource is created/owned by the admin user + data_builder = DataBuilder(as_drone, api_db, randstr=randstr) + yield data_builder + data_builder.teardown() + + +class DataBuilder(object): + child_to_parent = { + 'project': 'group', + 'session': 'project', + 'acquisition': 'session', + } + parent_to_child = {parent: child for child, parent in child_to_parent.items()} + + def __init__(self, session, api_db, randstr=lambda: binascii.hexlify(os.urandom(10))): + self.session = session + self.api_db = api_db + self.randstr = randstr + 
self.resources = [] + + def __getattr__(self, name): + """Return resource specific create_* or delete_* method""" + if name.startswith('create_') or name.startswith('delete_'): + method, resource = name.split('_', 1) + if resource not in _default_payload: + raise Exception('Unknown resource type {} (from {})'.format(resource, name)) + def resource_method(*args, **kwargs): + return getattr(self, method)(resource, *args, **kwargs) + return resource_method + raise AttributeError + + def create(self, resource, **kwargs): + """Create resource in api and return it's _id""" + + # merge any kwargs on top of the default payload + payload = copy.deepcopy(_default_payload[resource]) + _merge_dict(payload, kwargs) + + # add missing required unique fields using randstr + # such fields are: [user._id, group._id, gear.gear.name] + if resource == 'user' and '_id' not in payload: + payload['_id'] = self.randstr() + '@user.com' + if resource == 'group' and '_id' not in payload: + payload['_id'] = self.randstr() + if resource == 'gear' and 'name' not in payload['gear']: + payload['gear']['name'] = self.randstr() + if resource == 'collection' and 'label' not in payload: + payload['label'] = self.randstr() + + # add missing label fields using randstr + # such fields are: [project.label, session.label, acquisition.label] + if resource in self.child_to_parent and 'label' not in payload: + payload['label'] = self.randstr() + + # add missing parent container when creating child container + if resource in self.child_to_parent: + parent = self.child_to_parent[resource] + if parent not in payload: + payload[parent] = self.get_or_create(parent) + # add missing gear when creating job + if resource == 'job' and 'gear_id' not in payload: + + # create file inputs for each job input on gear + gear_inputs = {} + for i in payload.get('inputs', {}).keys(): + gear_inputs[i] = {'base': 'file'} + + gear_doc = _default_payload['gear']['gear'] + gear_doc['inputs'] = gear_inputs + payload['gear_id'] = self.create('gear', gear=gear_doc) + + # put together the create url to post to + create_url = '/' + resource + 's' + if resource == 'gear': + create_url += '/' + payload['gear']['name'] + if resource == 'job': + create_url += '/add' + + # handle user api keys (they are set via mongo directly) + if resource == 'user': + user_api_key = payload.pop('api_key', None) + + # create resource + r = self.session.post(create_url, json=payload) + if not r.ok: + raise Exception( + 'DataBuilder failed to create {}: {}\n' + 'Payload was:\n{}'.format(resource, r.json['message'], payload)) + _id = r.json['_id'] + + # inject api key if it was provided + if resource == 'user' and user_api_key: + self.api_db.apikeys.insert_one({ + '_id': user_api_key, + 'created': datetime.datetime.utcnow(), + 'last_seen': None, + 'type': 'user', + 'uid': _id + }) + + self.resources.append((resource, _id)) + return _id + + def get_or_create(self, resource): + """Return first _id from self.resources for type `resource` (Create if not found)""" + for resource_, _id in self.resources: + if resource == resource_: + return _id + return self.create(resource) + + def teardown(self): + """Delete resources created with this DataBuilder from self.resources""" + for resource, _id in reversed(self.resources): + self.delete(resource, _id) + + def delete(self, resource, _id, recursive=False): + """Delete resource from mongodb by _id""" + if bson.ObjectId.is_valid(_id): + _id = bson.ObjectId(_id) + if recursive and resource in self.parent_to_child: + child_cont = 
self.parent_to_child[resource] + for child in self.api_db[child_cont + 's'].find({resource: _id}, {'_id': 1}): + self.delete(child_cont, child['_id'], recursive=recursive) + if resource == 'gear': + self.api_db.jobs.remove({'gear_id': str(_id)}) + self.api_db[resource + 's'].remove({'_id': _id}) + + +# Store return values of pytest fixtures that are also used by DataBuilder +# as "private singletons" in the module. This seemed the least confusing. +_default_payload = default_payload() +_merge_dict = merge_dict() + + class ApiAccessor(object): def __init__(self, app, **defaults): self.app = app diff --git a/tests/unit_tests/python/test_uploads.py b/tests/unit_tests/python/test_uploads.py index 0bdebef39..9b7157c56 100644 --- a/tests/unit_tests/python/test_uploads.py +++ b/tests/unit_tests/python/test_uploads.py @@ -1,13 +1,13 @@ import requests_mock -def test_signed_url_upload(as_drone, mocker): +def test_signed_url_reaper_upload(as_drone, mocker): - metadata = { + payload = { "metadata": { - "group": {"_id": "scitran"}, - "project": {"label": ""}, - "session": { + "group": {"_id": "scitran"}, + "project": {"label": ""}, + "session": { "uid": "session_uid", "subject": {"code": "bela"} }, @@ -19,22 +19,124 @@ def test_signed_url_upload(as_drone, mocker): "filename": "test" } - r = as_drone.post('/upload/signed/reaper?ticket=', - json=metadata) + r = as_drone.post('/upload/reaper?ticket=', + json=payload) assert r.status_code == 405 mock_fs = mocker.patch('api.upload.config.fs') mock_fs.get_signed_url.return_value = 'url' - r = as_drone.post('/upload/signed/reaper?ticket=', - json=metadata) + r = as_drone.post('/upload/reaper?ticket=', + json=payload) assert r.ok assert r.json['upload_url'] == 'url' ticket_id = r.json['ticket'] - r = as_drone.post('/upload/signed/reaper?ticket=' + ticket_id) + r = as_drone.post('/upload/reaper?ticket=' + ticket_id) assert r.ok - assert mock_fs.move.called \ No newline at end of file + assert mock_fs.move.called + + +def test_signed_url_label_upload(as_drone, data_builder, mocker): + group = data_builder.create_group() + + payload = { + "metadata": { + 'group': {'_id': group}, + 'project': { + 'label': 'test_project', + 'files': [{'name': 'project.csv'}] + } + }, + "filename": "project.csv" + } + + r = as_drone.post('/upload/label?ticket=', + json=payload) + + assert r.status_code == 405 + + mock_fs = mocker.patch('api.upload.config.fs') + mock_fs.get_signed_url.return_value = 'url' + r = as_drone.post('/upload/label?ticket=', + json=payload) + + assert r.ok + assert r.json['upload_url'] == 'url' + + ticket_id = r.json['ticket'] + + r = as_drone.post('/upload/label?ticket=' + ticket_id) + assert r.ok + + assert mock_fs.move.called + + +def test_engine_upload(as_drone, data_builder, mocker): + project = data_builder.create_project() + + payload = { + 'metadata': { + 'project': { + 'label': 'engine project', + 'info': {'test': 'p'}, + 'files': [ + { + 'name': 'one.csv', + 'type': 'engine type 0', + 'info': {'test': 'f0'} + } + ] + } + }, + 'filename': 'one.csv' + } + + r = as_drone.post('/engine?ticket=&level=%s&id=%s' % ('project', project), + json=payload) + + assert r.status_code == 405 + + mock_fs = mocker.patch('api.upload.config.fs') + mock_fs.get_signed_url.return_value = 'url' + r = as_drone.post('/engine?ticket=&level=%s&id=%s' % ('project', project), + json=payload) + + assert r.ok + assert r.json['upload_url'] == 'url' + + ticket_id = r.json['ticket'] + + r = as_drone.post('/engine?ticket=%s&level=%s&id=%s' % (ticket_id, 'project', project)) + assert 
r.ok + + assert mock_fs.move.called + + +def test_filelisthandler_signed_url_upload(as_drone, data_builder, mocker): + project = data_builder.create_project() + + payload = { + 'metadata': {}, + 'filename': 'one.csv' + } + + r = as_drone.post('/projects/' + project + '/files?ticket=', json=payload) + assert r.status_code == 405 + + mock_fs = mocker.patch('api.upload.config.fs') + mock_fs.get_signed_url.return_value = 'url' + r = as_drone.post('/projects/' + project + '/files?ticket=', json=payload) + + assert r.ok + assert r.json['upload_url'] == 'url' + + ticket_id = r.json['ticket'] + + r = as_drone.post('/projects/' + project + '/files?ticket=' + ticket_id) + assert r.ok + + assert mock_fs.move.called
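
Usage sketch (illustrative, not part of the patches): after the third patch, signed-url uploads are driven through the existing upload endpoints with a ?ticket= query parameter, as exercised in tests/unit_tests/python/test_uploads.py. The client makes one request to obtain a ticket and a signed URL, pushes the file bytes directly to the storage backend, then repeats the request with the ticket id so the placer finalizes the file sitting under tmp/<tempdir>/. The base URL and Authorization header below are assumptions, the metadata body is abbreviated (each strategy expects the fuller shape shown in the tests), and the PUT verb for the signed URL depends on the storage backend rather than on anything defined in these patches.

    import requests

    API = 'https://core.example.com/api'                   # assumed base URL
    HEADERS = {'Authorization': 'scitran-user <API_KEY>'}  # assumed auth scheme

    # 1) Ask for an upload ticket: empty ?ticket= plus filename/metadata in the JSON body.
    body = {'filename': 'test', 'metadata': {'group': {'_id': 'scitran'}}}
    r = requests.post(API + '/upload/reaper', params={'ticket': ''}, json=body, headers=HEADERS)
    ticket, upload_url = r.json()['ticket'], r.json()['upload_url']

    # 2) Send the file bytes straight to the storage backend via the signed URL.
    #    PUT is assumed here; the required verb/headers come from the backend, not from core.
    with open('test', 'rb') as f:
        requests.put(upload_url, data=f)

    # 3) Repeat the request with the ticket id; core then runs the reaper placer
    #    against the file already uploaded into the temp folder.
    requests.post(API + '/upload/reaper', params={'ticket': ticket}, headers=HEADERS)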