From 1d3e540d287850439a88b15fe7c4bd7f6a4d3bc0 Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 09:43:04 -0600 Subject: [PATCH 01/11] tz --- csv2es.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/csv2es.py b/csv2es.py index 642e3ba..38770f4 100644 --- a/csv2es.py +++ b/csv2es.py @@ -15,6 +15,8 @@ import csv import json import sys +import calendar + from threading import local import click @@ -24,7 +26,7 @@ from pyelasticsearch import ElasticHttpNotFoundError from pyelasticsearch import IndexAlreadyExistsError from retrying import retry - +from dateutil import parser __version__ = '1.0.1' thread_local = local() @@ -41,7 +43,7 @@ def echo(message, quiet): click.echo(message) -def documents_from_file(es, filename, delimiter, quiet): +def documents_from_file(es, filename, delimiter, quiet, csv_date_field, csv_date_field_gmt_offset): """ Return a generator for pulling rows from a given delimited file. @@ -62,6 +64,16 @@ def all_docs(): reader = csv.DictReader(doc_file, delimiter=delimiter, fieldnames=fieldnames) count = 0 for row in reader: + + # parse csv_date_field into elasticsearch compatible epoch_millis + if csv_date_field + date_val_str = row[csv_date_field] + if date_val_str + date_obj = parser.parse(date_val_str.strip()) + if not csv_date_field_gmt_offset + csv_date_field_gmt_offset = 0 + row[csv_date_field] = int(calendar.timegm(date_obj) - csv_date_field_gmt_offset) * 1000 + count += 1 if count % 10000 == 0: echo('Sent documents: ' + str(count), quiet) @@ -164,9 +176,14 @@ def sanitize_delimiter(delimiter, is_tab): help='Delete existing index if it exists') @click.option('--quiet', is_flag=True, required=False, help='Minimize console output') +@click.option('--csv-date-field', required=False, + help='The CSV header name that represents a date string to parsed (via python-dateutil) into an ElasticSearch epoch_millis') +@click.option('--csv-date-field-gmt-offset', required=False, + help='The GMT offset for the csv-date-field (i.e. +/- N hours)') @click.version_option(version=__version__, ) def cli(index_name, delete_index, mapping_file, doc_type, import_file, - delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, quiet): + delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, quiet, + csv_date_field, csv_date_field_gmt_offset): """ Bulk import a delimited file into a target Elasticsearch instance. Common delimited files include things like CSV and TSV. @@ -207,7 +224,7 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file, es.put_mapping(index_name, doc_type, mapping) target_delimiter = sanitize_delimiter(delimiter, tab) - documents = documents_from_file(es, import_file, target_delimiter, quiet) + documents = documents_from_file(es, import_file, target_delimiter, quiet, csv_date_field, csv_date_field_gmt_offset) perform_bulk_index(host, index_name, doc_type, documents, docs_per_chunk, bytes_per_chunk, parallel) From 48390fd30abd6f77244a9f8d4809132875b437da Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 09:48:34 -0600 Subject: [PATCH 02/11] tz --- csv2es.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/csv2es.py b/csv2es.py index 38770f4..69c62a3 100644 --- a/csv2es.py +++ b/csv2es.py @@ -66,11 +66,11 @@ def all_docs(): for row in reader: # parse csv_date_field into elasticsearch compatible epoch_millis - if csv_date_field + if csv_date_field: date_val_str = row[csv_date_field] - if date_val_str + if date_val_str: date_obj = parser.parse(date_val_str.strip()) - if not csv_date_field_gmt_offset + if not csv_date_field_gmt_offset: csv_date_field_gmt_offset = 0 row[csv_date_field] = int(calendar.timegm(date_obj) - csv_date_field_gmt_offset) * 1000 From 6a164e72feb89096d12fcc7a0bc79f263f9b796f Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 10:02:56 -0600 Subject: [PATCH 03/11] tz --- csv2es.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/csv2es.py b/csv2es.py index 69c62a3..9343ff0 100644 --- a/csv2es.py +++ b/csv2es.py @@ -16,6 +16,7 @@ import json import sys import calendar +from pprint import pprint from threading import local @@ -67,6 +68,7 @@ def all_docs(): # parse csv_date_field into elasticsearch compatible epoch_millis if csv_date_field: + pprint(row) date_val_str = row[csv_date_field] if date_val_str: date_obj = parser.parse(date_val_str.strip()) From c8e6bae38a8f8204e39e50f1fd9d6441dc82c550 Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 11:08:07 -0600 Subject: [PATCH 04/11] tz --- csv2es.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/csv2es.py b/csv2es.py index 9343ff0..f2130a6 100644 --- a/csv2es.py +++ b/csv2es.py @@ -44,7 +44,7 @@ def echo(message, quiet): click.echo(message) -def documents_from_file(es, filename, delimiter, quiet, csv_date_field, csv_date_field_gmt_offset): +def documents_from_file(es, filename, delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset): """ Return a generator for pulling rows from a given delimited file. @@ -66,15 +66,23 @@ def all_docs(): count = 0 for row in reader: + # the row from DictReader needs to be cleaned + if csv_clean_fieldnames: + cleaned_row = {} + + # strip all double quotes from keys and lower-cast + for k, v in row.iteritems(): + cleaned_row[k.replace('"',"").lower()] = v + + row = cleaned_row + # parse csv_date_field into elasticsearch compatible epoch_millis if csv_date_field: - pprint(row) date_val_str = row[csv_date_field] + if date_val_str: date_obj = parser.parse(date_val_str.strip()) - if not csv_date_field_gmt_offset: - csv_date_field_gmt_offset = 0 - row[csv_date_field] = int(calendar.timegm(date_obj) - csv_date_field_gmt_offset) * 1000 + row[csv_date_field] = int(calendar.timegm(date_obj.timetuple()) + (csv_date_field_gmt_offset * (3600))) * 1000 count += 1 if count % 10000 == 0: @@ -178,14 +186,17 @@ def sanitize_delimiter(delimiter, is_tab): help='Delete existing index if it exists') @click.option('--quiet', is_flag=True, required=False, help='Minimize console output') +@click.option('--csv-clean-fieldnames', is_flag=True, required=False, + help='Strips double quotes and lower-cases all CSV header names for proper ElasticSearch fieldnames') @click.option('--csv-date-field', required=False, help='The CSV header name that represents a date string to parsed (via python-dateutil) into an ElasticSearch epoch_millis') -@click.option('--csv-date-field-gmt-offset', required=False, +@click.option('--csv-date-field-gmt-offset', required=False, type=int, help='The GMT offset for the csv-date-field (i.e. +/- N hours)') @click.version_option(version=__version__, ) def cli(index_name, delete_index, mapping_file, doc_type, import_file, delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, quiet, - csv_date_field, csv_date_field_gmt_offset): + csv_clean_fieldnames,csv_date_field, csv_date_field_gmt_offset): + """ Bulk import a delimited file into a target Elasticsearch instance. Common delimited files include things like CSV and TSV. @@ -226,7 +237,7 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file, es.put_mapping(index_name, doc_type, mapping) target_delimiter = sanitize_delimiter(delimiter, tab) - documents = documents_from_file(es, import_file, target_delimiter, quiet, csv_date_field, csv_date_field_gmt_offset) + documents = documents_from_file(es, import_file, target_delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset) perform_bulk_index(host, index_name, doc_type, documents, docs_per_chunk, bytes_per_chunk, parallel) From a930665f4e1f56a651c1345e4d141c8d4b710a7e Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 11:09:08 -0600 Subject: [PATCH 05/11] tz --- csv2es.py | 1 - 1 file changed, 1 deletion(-) diff --git a/csv2es.py b/csv2es.py index f2130a6..7dced81 100644 --- a/csv2es.py +++ b/csv2es.py @@ -16,7 +16,6 @@ import json import sys import calendar -from pprint import pprint from threading import local From 281fe3ac3c9b6480d99549d597f5d533a557f8ad Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 11:15:15 -0600 Subject: [PATCH 06/11] tz --- .gitignore | 1 + csv2es.py | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 3010f49..aaeaa84 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ dist *.pyc *.egg-info +*.iml diff --git a/csv2es.py b/csv2es.py index 7dced81..779a8de 100644 --- a/csv2es.py +++ b/csv2es.py @@ -54,9 +54,9 @@ def documents_from_file(es, filename, delimiter, quiet, csv_clean_fieldnames, cs :return: generator returning document-indexing operations """ def all_docs(): - with open(filename, 'rb') if filename != '-' else sys.stdin as doc_file: + with open(filename, 'r') if filename != '-' else sys.stdin as doc_file: # delimited file should include the field names as the first row - fieldnames = doc_file.next().strip().split(delimiter) + fieldnames = doc_file.readline().strip().split(delimiter) echo('Using the following ' + str(len(fieldnames)) + ' fields:', quiet) for fieldname in fieldnames: echo(fieldname, quiet) @@ -183,6 +183,8 @@ def sanitize_delimiter(delimiter, is_tab): help='Parallel uploads to send at once, defaults to 1') @click.option('--delete-index', is_flag=True, required=False, help='Delete existing index if it exists') +@click.option('--existing-index', is_flag=True, required=False, + help='Don\'t create index.') @click.option('--quiet', is_flag=True, required=False, help='Minimize console output') @click.option('--csv-clean-fieldnames', is_flag=True, required=False, @@ -193,7 +195,7 @@ def sanitize_delimiter(delimiter, is_tab): help='The GMT offset for the csv-date-field (i.e. +/- N hours)') @click.version_option(version=__version__, ) def cli(index_name, delete_index, mapping_file, doc_type, import_file, - delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, quiet, + delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, existing_index, quiet, csv_clean_fieldnames,csv_date_field, csv_date_field_gmt_offset): """ @@ -222,11 +224,12 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file, except ElasticHttpNotFoundError: echo('Index ' + index_name + ' not found, nothing to delete', quiet) - try: - es.create_index(index_name) - echo('Created new index: ' + index_name, quiet) - except IndexAlreadyExistsError: - echo('Index ' + index_name + ' already exists', quiet) + if not existing_index: + try: + es.create_index(index_name) + echo('Created new index: ' + index_name, quiet) + except IndexAlreadyExistsError: + echo('Index ' + index_name + ' already exists', quiet) echo('Using document type: ' + doc_type, quiet) if mapping_file: From 4d07f7d18d64238af41dc8f27ee4c522e2466d55 Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 11:45:24 -0600 Subject: [PATCH 07/11] tz --- csv2es.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/csv2es.py b/csv2es.py index 779a8de..11ff654 100644 --- a/csv2es.py +++ b/csv2es.py @@ -27,6 +27,8 @@ from pyelasticsearch import IndexAlreadyExistsError from retrying import retry from dateutil import parser +from datetime import timedelta +from pprint import pprint __version__ = '1.0.1' thread_local = local() @@ -81,7 +83,12 @@ def all_docs(): if date_val_str: date_obj = parser.parse(date_val_str.strip()) - row[csv_date_field] = int(calendar.timegm(date_obj.timetuple()) + (csv_date_field_gmt_offset * (3600))) * 1000 + pprint(date_obj) + corrected_offset = (csv_date_field_gmt_offset * -1) + date_obj = date_obj + timedelta(hours=corrected_offset) + pprint(date_obj) + row[csv_date_field] = int(calendar.timegm(date_obj.timetuple())) * 1000 + pprint(row) count += 1 if count % 10000 == 0: From d17f4dbb08a92dac3f166e52c39f5a6ce4dc9a7f Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 11:45:49 -0600 Subject: [PATCH 08/11] tz --- csv2es.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/csv2es.py b/csv2es.py index 11ff654..e8d3b33 100644 --- a/csv2es.py +++ b/csv2es.py @@ -83,12 +83,9 @@ def all_docs(): if date_val_str: date_obj = parser.parse(date_val_str.strip()) - pprint(date_obj) corrected_offset = (csv_date_field_gmt_offset * -1) date_obj = date_obj + timedelta(hours=corrected_offset) - pprint(date_obj) row[csv_date_field] = int(calendar.timegm(date_obj.timetuple())) * 1000 - pprint(row) count += 1 if count % 10000 == 0: From b8b8c82f9cb8d7a053483131e1faa0c90ad95d69 Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 11:59:08 -0600 Subject: [PATCH 09/11] tz --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index ef07c4d..6c6e42d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,5 +12,6 @@ python: install: - pip install . + - pip install python-dateutil script: python setup.py test From d83de524962ae7ffaa5f9edfb450ab8ce29dc08a Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Tue, 12 Sep 2017 20:02:25 -0600 Subject: [PATCH 10/11] tags --- csv2es.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/csv2es.py b/csv2es.py index e8d3b33..3d11eaf 100644 --- a/csv2es.py +++ b/csv2es.py @@ -45,7 +45,7 @@ def echo(message, quiet): click.echo(message) -def documents_from_file(es, filename, delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset): +def documents_from_file(es, filename, delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset, tags): """ Return a generator for pulling rows from a given delimited file. @@ -77,6 +77,13 @@ def all_docs(): row = cleaned_row + # tags? + if tags: + kv_pairs = tags.split(",") + for kv_pair in kv_pairs: + kv = kv_pair.split("=") + row[kv[0]] = kv[1] + # parse csv_date_field into elasticsearch compatible epoch_millis if csv_date_field: date_val_str = row[csv_date_field] @@ -197,10 +204,12 @@ def sanitize_delimiter(delimiter, is_tab): help='The CSV header name that represents a date string to parsed (via python-dateutil) into an ElasticSearch epoch_millis') @click.option('--csv-date-field-gmt-offset', required=False, type=int, help='The GMT offset for the csv-date-field (i.e. +/- N hours)') +@click.option('--tags', required=False, + help='Custom static key1=val1,key2=val2 pairs to tag all entries with') @click.version_option(version=__version__, ) def cli(index_name, delete_index, mapping_file, doc_type, import_file, delimiter, tab, host, docs_per_chunk, bytes_per_chunk, parallel, existing_index, quiet, - csv_clean_fieldnames,csv_date_field, csv_date_field_gmt_offset): + csv_clean_fieldnames,csv_date_field, csv_date_field_gmt_offset, tags): """ Bulk import a delimited file into a target Elasticsearch instance. Common @@ -243,7 +252,7 @@ def cli(index_name, delete_index, mapping_file, doc_type, import_file, es.put_mapping(index_name, doc_type, mapping) target_delimiter = sanitize_delimiter(delimiter, tab) - documents = documents_from_file(es, import_file, target_delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset) + documents = documents_from_file(es, import_file, target_delimiter, quiet, csv_clean_fieldnames, csv_date_field, csv_date_field_gmt_offset, tags) perform_bulk_index(host, index_name, doc_type, documents, docs_per_chunk, bytes_per_chunk, parallel) From db5ca3721b58199cc4b262f36c87399dbfedfc9c Mon Sep 17 00:00:00 2001 From: bitsofinfo Date: Sun, 3 Nov 2019 07:07:06 -0700 Subject: [PATCH 11/11] Update README.rst --- README.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.rst b/README.rst index 5a747bc..92e5c43 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,10 @@ csv2es ========================= +-------- +IMPORTANT: This is a FORK of the original project. See this link for issues this fork addresses: https://github.com/rholder/csv2es/pulls/bitsofinfo +-------- + .. image:: https://img.shields.io/pypi/v/csv2es.svg :target: https://pypi.python.org/pypi/csv2es