From 030626530cb9b41954c985d1d1bc0afb3038e74d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bult=C3=A9?= Date: Wed, 6 Jun 2018 00:04:56 +0200 Subject: [PATCH] WIP: preparse csv plugin --- csvapi/cli.py | 5 ++++- csvapi/parser.py | 29 ++++++++++++++++++++++++----- csvapi/parseview.py | 3 ++- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/csvapi/cli.py b/csvapi/cli.py index e2b35ad..3e7fe0f 100644 --- a/csvapi/cli.py +++ b/csvapi/cli.py @@ -29,8 +29,10 @@ def cli(): help='Do not parse CSV again if DB already exists') @click.option('-w', '--max-workers', default=3, help='Max number of ThreadPoolExecutor workers') +@click.option('-m', '--parse-module', default=None, + help='CSV pre-parser module to use') @cli.command() -def serve(dbs, host, port, debug, reload, cache, max_workers): +def serve(dbs, host, port, debug, reload, cache, max_workers, parse_module): if reload: import hupper hupper.start_reloader('csvapi.cli.serve') @@ -38,4 +40,5 @@ def serve(dbs, host, port, debug, reload, cache, max_workers): app.config.CSV_CACHE_ENABLED = cache app.config.MAX_WORKERS = max_workers app.config.RESPONSE_TIMEOUT = RESPONSE_TIMEOUT + app.config.PARSE_MODULE = parse_module app.run(host=host, port=port, debug=debug) diff --git a/csvapi/parser.py b/csvapi/parser.py index b7af4a2..78e29c7 100644 --- a/csvapi/parser.py +++ b/csvapi/parser.py @@ -1,6 +1,7 @@ import os import logging +from itertools import islice import agate import agatesql # noqa @@ -10,6 +11,7 @@ log = logging.getLogger('__name__') SNIFF_LIMIT = 2048 +MAX_PREPARSE_LINES = 50 def is_binary(filepath): @@ -19,11 +21,11 @@ def is_binary(filepath): def detect_encoding(filepath): with os.popen('file {} -b --mime-encoding'.format(filepath)) as proc: - return proc.read() + return proc.read().replace('\n', '') -def from_csv(filepath, encoding='utf-8'): - return agate.Table.from_csv(filepath, sniff_limit=SNIFF_LIMIT, encoding=encoding) +def from_csv(filepath, **agate_params): + return agate.Table.from_csv(filepath, **agate_params) def from_excel(filepath): @@ -36,10 +38,27 @@ def to_sql(table, _hash, storage): table.to_sql(db_info['dsn'], db_info['db_name'], overwrite=True) -def parse(filepath, _hash, storage='.'): +def parse(filepath, _hash, storage='.', parse_module=None): if is_binary(filepath): table = from_excel(filepath) else: encoding = detect_encoding(filepath) - table = from_csv(filepath, encoding=encoding) + agate_params = { + 'encoding': encoding, + 'sniff_limit': SNIFF_LIMIT, + } + # TODO exception here do not bubble up to parseview.py :thinking: + if parse_module: + with open(filepath, encoding=encoding) as f: + try: + pm = __import__(parse_module) + except ModuleNotFoundError: + log.warning('Pre-parse module "{}" not found'.format(parse_module)) + else: + delimiter, skip_lines = pm.parse_csv(list(islice(f, MAX_PREPARSE_LINES))) + agate_params.update({ + 'delimiter': delimiter, + 'skip_lines': skip_lines, + }) + table = from_csv(filepath, **agate_params) return to_sql(table, _hash, storage) diff --git a/csvapi/parseview.py b/csvapi/parseview.py index 79fc9c6..cbcb7de 100644 --- a/csvapi/parseview.py +++ b/csvapi/parseview.py @@ -44,7 +44,8 @@ def do_parse_in_thread(): tmp.write(chunk) tmp.close() try: - parse(tmp.name, _hash, storage=request.app.config.DB_ROOT_DIR) + parse(tmp.name, _hash, storage=request.app.config.DB_ROOT_DIR, + parse_module=request.app.config.PARSE_MODULE) except Exception as e: return api_error('Error parsing CSV', details=str(e)) finally: