-
Notifications
You must be signed in to change notification settings - Fork 18
Signed url upload #1099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: generic-storage
Are you sure you want to change the base?
Signed url upload #1099
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,8 @@ | |
| import uuid | ||
| import zipfile | ||
|
|
||
| import fs.path | ||
|
|
||
| from ..web import base | ||
| from .. import config, files, upload, util, validators | ||
| from ..auth import listauth, always_ok | ||
|
|
@@ -348,6 +350,25 @@ class FileListHandler(ListHandler): | |
| def __init__(self, request=None, response=None): | ||
| super(FileListHandler, self).__init__(request, response) | ||
|
|
||
| def _create_ticket(self): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please rename to |
||
| payload = self.request.json_body | ||
| metadata = payload.get('metadata', None) | ||
| filename = payload.get('filename', None) | ||
|
|
||
| if not (metadata or filename): | ||
| self.abort(404, 'metadata and filename are required') | ||
|
|
||
| tempdir = str(uuid.uuid4()) | ||
| # Upload into a temp folder, so we will be able to cleanup | ||
| signed_url = files.get_signed_url(fs.path.join('tmp', tempdir, filename), config.fs, purpose='upload') | ||
|
|
||
| if not signed_url: | ||
| self.abort(405, 'Signed URLs are not supported with the current storage backend') | ||
|
|
||
| ticket = util.upload_ticket(self.request.client_addr, self.origin, tempdir, filename, metadata) | ||
| return {'ticket': config.db.uploads.insert_one(ticket).inserted_id, | ||
| 'upload_url': signed_url} | ||
|
|
||
| def _check_ticket(self, ticket_id, _id, filename): | ||
| ticket = config.db.downloads.find_one({'_id': ticket_id}) | ||
| if not ticket: | ||
|
|
@@ -356,6 +377,14 @@ def _check_ticket(self, ticket_id, _id, filename): | |
| self.abort(400, 'ticket not for this resource or source IP') | ||
| return ticket | ||
|
|
||
| def _check_upload_ticket(self, ticket_id): | ||
| ticket = config.db.uploads.find_one({'_id': ticket_id}) | ||
| if not ticket: | ||
| self.abort(404, 'no such ticket') | ||
| if ticket['ip'] != self.request.client_addr: | ||
| self.abort(400, 'ticket not for this resource or source IP') | ||
| return ticket | ||
|
|
||
| @staticmethod | ||
| def build_zip_info(file_path, file_system): | ||
| """ | ||
|
|
@@ -552,7 +581,29 @@ def post(self, cont_name, list_name, **kwargs): | |
| permchecker, _, _, _, _ = self._initialize_request(containerutil.pluralize(cont_name), list_name, _id) | ||
| permchecker(noop)('POST', _id=_id) | ||
|
|
||
| return upload.process_upload(self.request, upload.Strategy.targeted, container_type=containerutil.singularize(cont_name), id_=_id, origin=self.origin) | ||
| # Request for upload ticket | ||
| if self.get_param('ticket') == '': | ||
| return self._create_ticket() | ||
|
|
||
| # Check ticket id and skip permissions check if it clears | ||
| ticket_id = self.get_param('ticket') | ||
| if ticket_id: | ||
| ticket = self._check_upload_ticket(ticket_id) | ||
| if not self.origin.get('id'): | ||
| # If we don't have an origin with this request, use the ticket's origin | ||
| self.origin = ticket.get('origin') | ||
|
|
||
| file_fields = [ | ||
| util.dotdict({ | ||
| 'filename': ticket['filename'] | ||
| }) | ||
| ] | ||
|
|
||
| return upload.process_upload(self.request, upload.Strategy.targeted, metadata=ticket['metadata'], origin=self.origin, | ||
| container_type=containerutil.singularize(cont_name), | ||
| id_=_id, file_fields=file_fields, tempdir=ticket['tempdir']) | ||
| else: | ||
| return upload.process_upload(self.request, upload.Strategy.targeted, container_type=containerutil.singularize(cont_name), id_=_id, origin=self.origin) | ||
|
|
||
| @validators.verify_payload_exists | ||
| def put(self, cont_name, list_name, **kwargs): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -547,7 +547,7 @@ def finalize(self): | |
| }) | ||
|
|
||
| # Similarly, create the attributes map that is consumed by helper funcs. Clear duplication :( | ||
| # This could be coalesced into a single map thrown on file fields, for example. | ||
| # This could be coalesced into a single map thrown on file fields, for example. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix comment. |
||
| # Used in the API return. | ||
| cgi_attrs = { | ||
| '_id': cgi_field.uuid, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,20 +16,22 @@ | |
| log = config.log | ||
|
|
||
| Strategy = util.Enum('Strategy', { | ||
| 'targeted' : pl.TargetedPlacer, # Upload N files to a container. | ||
| 'engine' : pl.EnginePlacer, # Upload N files from the result of a successful job. | ||
| 'token' : pl.TokenPlacer, # Upload N files to a saved folder based on a token. | ||
| 'packfile' : pl.PackfilePlacer, # Upload N files as a new packfile to a container. | ||
| 'labelupload' : pl.LabelPlacer, | ||
| 'uidupload' : pl.UIDPlacer, | ||
| 'uidmatch' : pl.UIDMatchPlacer, | ||
| 'reaper' : pl.UIDReaperPlacer, | ||
| 'analysis' : pl.AnalysisPlacer, # Upload N files to an analysis as input and output (no db updates) | ||
| 'analysis_job': pl.AnalysisJobPlacer, # Upload N files to an analysis as output from job results | ||
| 'gear' : pl.GearPlacer | ||
| 'targeted': pl.TargetedPlacer, # Upload N files to a container. | ||
| 'engine': pl.EnginePlacer, # Upload N files from the result of a successful job. | ||
| 'token': pl.TokenPlacer, # Upload N files to a saved folder based on a token. | ||
| 'packfile': pl.PackfilePlacer, # Upload N files as a new packfile to a container. | ||
| 'label': pl.LabelPlacer, | ||
| 'uid': pl.UIDPlacer, | ||
| 'uidmatch': pl.UIDMatchPlacer, | ||
| 'reaper': pl.UIDReaperPlacer, | ||
| 'analysis': pl.AnalysisPlacer, # Upload N files to an analysis as input and output (no db updates) | ||
| 'analysis_job': pl.AnalysisJobPlacer, # Upload N files to an analysis as output from job results | ||
| 'gear': pl.GearPlacer | ||
| }) | ||
|
|
||
| def process_upload(request, strategy, container_type=None, id_=None, origin=None, context=None, response=None, metadata=None): | ||
|
|
||
| def process_upload(request, strategy, container_type=None, id_=None, origin=None, context=None, response=None, | ||
| metadata=None, file_fields=None, tempdir=None): | ||
| """ | ||
| Universal file upload entrypoint. | ||
|
|
||
|
|
@@ -64,7 +66,8 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None | |
| if id_ is not None and container_type == None: | ||
| raise Exception('Unspecified container type') | ||
|
|
||
| if container_type is not None and container_type not in ('acquisition', 'session', 'project', 'collection', 'analysis', 'gear'): | ||
| if container_type is not None and container_type not in ( | ||
| 'acquisition', 'session', 'project', 'collection', 'analysis', 'gear'): | ||
| raise Exception('Unknown container type') | ||
|
|
||
| timestamp = datetime.datetime.utcnow() | ||
|
|
@@ -75,14 +78,17 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None | |
|
|
||
| # The vast majority of this function's wall-clock time is spent here. | ||
| # Tempdir is deleted off disk once out of scope, so let's hold onto this reference. | ||
| file_processor = files.FileProcessor(config.get_item('persistent', 'data_path'), config.fs) | ||
| form = file_processor.process_form(request) | ||
| file_processor = files.FileProcessor(config.get_item('persistent', 'data_path'), config.fs, tempdir_name=tempdir) | ||
| if not file_fields: | ||
| form = file_processor.process_form(request) | ||
| # Non-file form fields may have an empty string as filename, check for 'falsy' values | ||
| file_fields = extract_file_fields(form) | ||
|
|
||
| if 'metadata' in form: | ||
| try: | ||
| metadata = json.loads(form['metadata'].value) | ||
| except Exception: | ||
| raise FileStoreException('wrong format for field "metadata"') | ||
| if 'metadata' in form: | ||
| try: | ||
| metadata = json.loads(form['metadata'].value) | ||
| except Exception: | ||
| raise FileStoreException('wrong format for field "metadata"') | ||
|
|
||
| placer_class = strategy.value | ||
| placer = placer_class(container_type, container, id_, metadata, timestamp, origin, context, file_processor) | ||
|
|
@@ -92,27 +98,29 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None | |
| # or "file1", "file2", etc (if multiple). Following this convention is probably a good idea. | ||
| # Here, we accept any | ||
|
|
||
| # Non-file form fields may have an empty string as filename, check for 'falsy' values | ||
| file_fields = extract_file_fields(form) | ||
| # TODO: Change schemas to enabled targeted uploads of more than one file. | ||
| # Ref docs from placer.TargetedPlacer for details. | ||
| if strategy == Strategy.targeted and len(file_fields) > 1: | ||
| raise FileFormException("Targeted uploads can only send one file") | ||
|
|
||
| for field in file_fields: | ||
| field.file.close() | ||
| if hasattr(field, 'file'): | ||
| field.file.close() | ||
| field.hash = util.format_hash(files.DEFAULT_HASH_ALG, field.hasher.hexdigest()) | ||
|
|
||
| if not hasattr(field, 'hash'): | ||
| field.hash = '' | ||
| # Augment the cgi.FieldStorage with a variety of custom fields. | ||
| # Not the best practice. Open to improvements. | ||
| # These are presumbed to be required by every function later called with field as a parameter. | ||
| field.path = field.filename | ||
| field.path = field.filename | ||
| if not file_processor.temp_fs.exists(field.path): | ||
| #tempdir_exists = os.path.exists(tempdir.name) | ||
| # tempdir_exists = os.path.exists(tempdir.name) | ||
| raise Exception("file {} does not exist, files in tmpdir: {}".format( | ||
| field.path, | ||
| file_processor.temp_fs.listdir('/'), | ||
| )) | ||
| field.size = file_processor.temp_fs.getsize(field.path) | ||
| field.hash = util.format_hash(files.DEFAULT_HASH_ALG, field.hasher.hexdigest()) | ||
| field.uuid = str(uuid.uuid4()) | ||
| field.mimetype = util.guess_mimetype(field.filename) # TODO: does not honor metadata's mime type if any | ||
| field.modified = timestamp | ||
|
|
@@ -121,9 +129,9 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None | |
| # Stands in for a dedicated object... for now. | ||
| file_attrs = { | ||
| '_id': field.uuid, | ||
| 'name': field.filename, | ||
| 'name': field.filename, | ||
| 'modified': field.modified, | ||
| 'size': field.size, | ||
| 'size': field.size, | ||
| 'mimetype': field.mimetype, | ||
| 'hash': field.hash, | ||
| 'origin': origin, | ||
|
|
@@ -143,7 +151,7 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None | |
| raise Exception("Programmer error: response required") | ||
| elif placer.sse: | ||
| response.headers['Content-Type'] = 'text/event-stream; charset=utf-8' | ||
| response.headers['Connection'] = 'keep-alive' | ||
| response.headers['Connection'] = 'keep-alive' | ||
|
|
||
| # Instead of handing the iterator off to response.app_iter, send it ourselves. | ||
| # This prevents disconnections from leaving the API in a partially-complete state. | ||
|
|
@@ -156,7 +164,7 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None | |
| for item in placer.finalize(): | ||
| try: | ||
| response.write(item) | ||
| except Exception: # pylint: disable=broad-except | ||
| except Exception: # pylint: disable=broad-except | ||
| log.info('SSE upload progress failed to send; continuing') | ||
|
|
||
| return | ||
|
|
@@ -166,6 +174,33 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None | |
|
|
||
| class Upload(base.RequestHandler): | ||
|
|
||
| def _create_ticket(self): | ||
| payload = self.request.json_body | ||
| metadata = payload.get('metadata', None) | ||
| filename = payload.get('filename', None) | ||
|
|
||
| if not (metadata or filename): | ||
| self.abort(404, 'metadata and filename are required') | ||
|
|
||
| tempdir = str(uuid.uuid4()) | ||
| # Upload into a temp folder, so we will be able to cleanup | ||
| signed_url = files.get_signed_url(fs.path.join('tmp', tempdir, filename), config.fs, purpose='upload') | ||
|
|
||
| if not signed_url: | ||
| self.abort(405, 'Signed URLs are not supported with the current storage backend') | ||
|
|
||
| ticket = util.upload_ticket(self.request.client_addr, self.origin, tempdir, filename, metadata) | ||
| return {'ticket': config.db.uploads.insert_one(ticket).inserted_id, | ||
| 'upload_url': signed_url} | ||
|
|
||
| def _check_ticket(self, ticket_id): | ||
| ticket = config.db.uploads.find_one({'_id': ticket_id}) | ||
| if not ticket: | ||
| self.abort(404, 'no such ticket') | ||
| if ticket['ip'] != self.request.client_addr: | ||
| self.abort(400, 'ticket not for this resource or source IP') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could argue this is a 403 because ip is our rough permission check (other than ownership of the ticket). |
||
| return ticket | ||
|
|
||
| def upload(self, strategy): | ||
| """Receive a sortable reaper upload.""" | ||
|
|
||
|
|
@@ -174,20 +209,36 @@ def upload(self, strategy): | |
| if not user: | ||
| self.abort(403, 'Uploading requires login') | ||
|
|
||
| if strategy in ['label', 'uid', 'uid-match', 'reaper']: | ||
| strategy = strategy.replace('-', '') | ||
| strategy = getattr(Strategy, strategy) | ||
| else: | ||
| self.abort(500, 'strategy {} not implemented'.format(strategy)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need the 500 here because something not accounted for won't make it past the routing logic in |
||
|
|
||
| context = {'uid': self.uid if not self.superuser_request else None} | ||
|
|
||
| # TODO: what enum | ||
| if strategy == 'label': | ||
| strategy = Strategy.labelupload | ||
| elif strategy == 'uid': | ||
| strategy = Strategy.uidupload | ||
| elif strategy == 'uid-match': | ||
| strategy = Strategy.uidmatch | ||
| elif strategy == 'reaper': | ||
| strategy = Strategy.reaper | ||
| # Request for upload ticket | ||
| if self.get_param('ticket') == '': | ||
| return self._create_ticket() | ||
|
|
||
| # Check ticket id and skip permissions check if it clears | ||
| ticket_id = self.get_param('ticket') | ||
| if ticket_id: | ||
| ticket = self._check_ticket(ticket_id) | ||
| if not self.origin.get('id'): | ||
| # If we don't have an origin with this request, use the ticket's origin | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question as before - I don't think it's possible to have an empty origin with an authenticated request. |
||
| self.origin = ticket.get('origin') | ||
|
|
||
| file_fields = [ | ||
| util.dotdict({ | ||
| 'filename': ticket['filename'] | ||
| }) | ||
| ] | ||
|
|
||
| return process_upload(self.request, strategy, metadata=ticket['metadata'], origin=self.origin, | ||
| context=context, file_fields=file_fields, tempdir=ticket['tempdir']) | ||
| else: | ||
| self.abort(500, 'strategy {} not implemented'.format(strategy)) | ||
| return process_upload(self.request, strategy, origin=self.origin, context=context) | ||
| return process_upload(self.request, strategy, origin=self.origin, context=context) | ||
|
|
||
| def engine(self): | ||
| """Handles file uploads from the engine""" | ||
|
|
@@ -204,14 +255,42 @@ def engine(self): | |
| self.abort(400, 'container id is required') | ||
| else: | ||
| cid = bson.ObjectId(cid) | ||
|
|
||
| context = { | ||
| 'job_id': self.get_param('job'), | ||
| 'job_ticket_id': self.get_param('job_ticket'), | ||
| } | ||
| if level == 'analysis': | ||
| return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, id_=cid, context=context) | ||
|
|
||
| # Request for upload ticket | ||
| if self.get_param('ticket') == '': | ||
| return self._create_ticket() | ||
|
|
||
| # Check ticket id and skip permissions check if it clears | ||
| ticket_id = self.get_param('ticket') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having URL params |
||
| if ticket_id: | ||
| ticket = self._check_ticket(ticket_id) | ||
| if not self.origin.get('id'): | ||
| # If we don't have an origin with this request, use the ticket's origin | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what situation would you expect us to not have an origin? Above we check to make sure the request has admin rights, so we only allow authenticated requests for this endpoint. |
||
| self.origin = ticket.get('origin') | ||
|
|
||
| file_fields = [ | ||
| util.dotdict({ | ||
| 'filename': ticket['filename'] | ||
| }) | ||
| ] | ||
|
|
||
| if level is not 'analysis': | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How can analysis job results be uploaded directly to storage (using a ticket)? |
||
| return process_upload(self.request, Strategy.engine, metadata=ticket['metadata'], origin=self.origin, | ||
| context=context, file_fields=file_fields, tempdir=ticket['tempdir'], | ||
| container_type=level, id_=cid) | ||
|
|
||
| else: | ||
| return process_upload(self.request, Strategy.engine, container_type=level, id_=cid, origin=self.origin, context=context) | ||
| if level == 'analysis': | ||
| return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, | ||
| id_=cid, context=context) | ||
| else: | ||
| return process_upload(self.request, Strategy.engine, container_type=level, id_=cid, origin=self.origin, | ||
| context=context) | ||
|
|
||
| def clean_packfile_tokens(self): | ||
| """Clean up expired upload tokens and invalid token directories. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of disabling, I think this needs an appropriate timeout. It feels like it should be roughly in sync with the
`job_tickets` TTL, as the most important user of this feature would be the engine.