From b4dc2a5603284d74319ba16e77f4ee6c0d1f0e0d Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 31 Jan 2022 17:12:56 -0500 Subject: [PATCH 01/30] Added SQL queries to generate edgelists Fixes to SQL Commands --- torchbiggraph/examples/e2e/sql_templates.py | 72 +++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 torchbiggraph/examples/e2e/sql_templates.py diff --git a/torchbiggraph/examples/e2e/sql_templates.py b/torchbiggraph/examples/e2e/sql_templates.py new file mode 100644 index 00000000..11cb5b8f --- /dev/null +++ b/torchbiggraph/examples/e2e/sql_templates.py @@ -0,0 +1,72 @@ +type_tmp_table = """ +DROP TABLE IF EXISTS {type}_id2part +; + +create temporary table {type}_id2part as + select id, abs(random()) % {nparts} as part + from ( + select distinct source_id as id from edges where source_type='{type}' + union + select distinct destination_id as id from edges where destination_type='{type}' + ) + +; +""" + +partitioned_mapped_entities = """ +DROP TABLE IF EXISTS {type}_ids_map_{n} +; + +create table {type}_ids_map_{n} as +select + f.id + , f.part + , '{type}' as type + , (ROW_NUMBER() OVER(ORDER BY f.id)) - 1 as graph_id +from {type}_id2part f +where f.part = {n} +order by 2 desc, 1 asc +; +""" + +remap_relns = """ +DROP TABLE IF EXISTS reln_map +; + +create table reln_map as +select f.rel as id, source_type, destination_type, (ROW_NUMBER() OVER(ORDER BY f.rel)) - 1 as graph_id +from ( + select distinct rel, source_type, destination_type + from edges +) f +""" + +edgelist_cte_mapper = """ + select lhs.graph_id as source_id, rel.graph_id as rel_id, rhs.graph_id as destination_id + from edges g + join reln_map rel on (rel.id = g.rel) + join {lhs_type}_ids_map_{i} lhs on ( + lhs.id = g.source_id and + g.source_type = rel.source_type and + lhs.type = g.source_type + ) + join {rhs_type}_ids_map_{j} rhs on ( + rhs.id = g.destination_id and + g.destination_type = rel.destination_type and + rhs.type = g.destination_type + ) + where g.rel = '{rel_name}' +""" + +edges_partitioned = """ +DROP TABLE IF EXISTS edges_{i}_{j} +; + +create table edges_{i}_{j} as +{ctes} +select * +from ( +{tables} +) +; +""" \ No newline at end of file From 5b3676ccc50d76cfdd242dc8c63e1d330008bcb9 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 2 Feb 2022 14:05:36 -0500 Subject: [PATCH 02/30] Added the script to generate all PBG files from a SQL database --- torchbiggraph/examples/e2e/data_prep.py | 304 ++++++++++++++++++++++++ 1 file changed, 304 insertions(+) create mode 100644 torchbiggraph/examples/e2e/data_prep.py diff --git a/torchbiggraph/examples/e2e/data_prep.py b/torchbiggraph/examples/e2e/data_prep.py new file mode 100644 index 00000000..4385b52b --- /dev/null +++ b/torchbiggraph/examples/e2e/data_prep.py @@ -0,0 +1,304 @@ +import itertools +import h5py +import json +import logging +import multiprocessing as mp +import pandas as pd +from pathlib import Path +import sqlite3 + +from sklearn.multiclass import OutputCodeClassifier +from sql_templates import ( + edges_partitioned, + edgelist_cte_mapper, + remap_relns, + partitioned_mapped_entities, + type_tmp_table +) +import sys +import time + +logging.basicConfig( + format='%(process)d-%(levelname)s-%(message)s', + stream = sys.stdout, + level=logging.DEBUG +) + +def remap_relationships(conn): + """ + A function to remap relationships using SQL queries. 
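+
+ As a rough sketch (assuming the edges table has already been loaded by
+ load_edges), the returned DataFrame mirrors the reln_map table:
+
+     rels = remap_relationships(conn)
+     # columns: id (original relation name), source_type,
+     # destination_type, graph_id (0-based ordinal used by PBG)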
+ """ + logging.info("Remapping relationships") + start = time.time() + logging.debug(f"Running query: {remap_relns}\n") + conn.executescript(remap_relns) + + query = """ + select * + from reln_map + """ + logging.debug(f"Running query: {query}\n") + rels = pd.read_sql_query(query, conn) + end = time.time() + logging.info(f"Remapped relationships in {end - start}s") + return rels + + +def remap_entities(conn, entity2partitions): + """ + A function to remap entities with partitioning using SQL queries. + + This function is complicated because the partitions have to be + constructed first, and then we have to generate ordinal mappings of + entity ids. These mappings will be used to generate buckets of edges + for training and then for mapping our edges back to their original + ids for use in downstream tasks. + """ + logging.info("Remapping entities") + start=time.time() + query = "" + for entity, npartitions in entity2partitions.items(): + query = type_tmp_table.format(type=entity, nparts=npartitions) + + for i in range(npartitions): + query += partitioned_mapped_entities.format(type=entity, n=i) + logging.debug(f"Running query: {query}") + conn.executescript(query) + end = time.time() + logging.info(f"Remapped entities in {end - start}s") + + +def generate_ctes(lhs_part, rhs_part, rels, entity2partitions): + """ + This function generates the sub-table CTEs that help us generate + the completed edgelist. + """ + nctes = 0 + ctes = """ + with cte_0 as ( + """ + first = True + for _ , r in rels.iterrows(): + if lhs_part >= entity2partitions[r['source_type']]: + continue + if rhs_part >= entity2partitions[r['destination_type']]: + continue + if not first: + ctes += f", cte_{nctes} as (" + ctes += edgelist_cte_mapper.format( + rel_name=r['id'], + lhs_type=r['source_type'], + rhs_type=r['destination_type'], + i = lhs_part, + j = rhs_part, + ) + ctes += ")" + + nctes += 1 + first = False + return nctes, ctes + + +def generate_unions(nctes): + """ + This function is just a helper function for + generating the final edge list tables. + """ + subquery = "" + first = True + for i in range(nctes): + if not first: + subquery += "\tunion\n" + subquery += f"\tselect * from cte_{i}\n" + first = False + return subquery + + +def remap_edges(conn, rels, entity2partitions): + """ + A function to remap all edges to ordinal IDs + according to their type. 
+ """ + logging.info("Remapping edges") + start = time.time() + + nentities_premap = conn.execute(""" + select count(*) from edges + ; + """).fetchall()[0][0] + + query = "" + NPARTS = max(entity2partitions.values()) + for lhs_part in range(NPARTS): + for rhs_part in range(NPARTS): + nctes, ctes = generate_ctes(lhs_part, rhs_part, rels, entity2partitions) + subquery = generate_unions(nctes) + query += edges_partitioned.format( + i = lhs_part, + j = rhs_part, + ctes=ctes, + tables=subquery + ) + + logging.debug(f"Running query: {query}") + conn.executescript(query) + + logging.debug("Confirming that we didn't drop any edges.") + nentities_postmap = 0 + for lhs_part in range(NPARTS): + for rhs_part in range(NPARTS): + nentities_postmap += conn.execute(f""" + select count(*) from edges_{lhs_part}_{rhs_part} + """).fetchall()[0][0] + + if nentities_postmap != nentities_premap: + logging.warning("DROPPED EDGES DURING REMAPPING.") + logging.warning(f"We started with {nentities_premap} and finished with {nentities_postmap}") + + end = time.time() + logging.info(f"Remapped edges in {end - start}s") + + +def load_edges(fname, conn): + """ + A simple function to load the edges into the SQL table. It is + assumed that we will have a file of the form: + | source_id | source_type | relationship_name | destination_id | destination_type | + """ + logging.info("Loading edges") + start = time.time() + cur = conn.cursor() + cur.executescript(""" + DROP TABLE IF EXISTS edges + ; + + CREATE TABLE edges ( + source_id INTEGER, + source_type TEXT, + destination_id INTEGER, + destination_type TEXT, + rel TEXT + ) + """) + + edges = pd.read_csv(fname) + edges.to_sql('edges', conn, if_exists='append', index=False) + end = time.time() + logging.info(f"Loading edges in {end - start}s") + + +def write_relations(outdir, rels, conn): + """ + A simple function to write the relevant relationship information out + for training. + """ + logging.info("Writing relations for training") + start = time.time() + out = rels.sort_values('graph_id')['id'].to_list() + with open(f'{outdir}/dynamic_rel_names.json', mode='w') as f: + json.dump(out, f, indent=4) + end = time.time() + logging.info(f"Wrote relations in {end - start}s") + + +def write_single_edge(work_packet): + """ + A function to write out a single edge-lists in the format that + PyTorch BigGraph expects. + + The work packet is expected to come contain information about + the lhs and rhs partitions for these edges, the directory + where we should put this information, and the database + connection that we should use. + """ + lhs_part, rhs_part, outdir, conn = work_packet + query = f""" + select * + from edges_{lhs_part}_{rhs_part} + ; + """ + df = pd.read_sql_query(query, conn) + print(query) + out_name = f'{outdir}/edges_{lhs_part}_{rhs_part}.h5' + with h5py.File(out_name, mode='w') as f: + # we need this for https://github.com/facebookresearch/PyTorch-BigGraph/blob/main/torchbiggraph/graph_storages.py#L400 + f.attrs['format_version'] = 1 + for dset, colname in [('lhs', 'source_id'), ('rhs', 'destination_id'), ('rel', 'rel_id')]: + f.create_dataset(dset, dtype='i', shape=(len(df),), maxshape=(None, )) + f[dset][0 : len(df)] = df[colname].tolist() + + +def write_edges(outdir, LHS_PARTS, RHS_PARTS, conn): + """ + A function to write out all edge-lists in the format + that PyTorch BigGraph expects. 
+ """ + logging.info(f"Writing edges, {LHS_PARTS}, {RHS_PARTS}") + start = time.time() + + # I would write these using multiprocessing but SQLite connections + # aren't pickelable, and I'd like to keep this simple + worklist = list(itertools.product(range(LHS_PARTS), range(RHS_PARTS), ['training_data'], [conn])) + for w in worklist: + write_single_edge(w) + + end = time.time() + logging.info(f"Wrote edges in {end - start}s") + + +def write_entities(outdir, entity2partitions, conn): + """ + A function to write out all of the training relevant + entity information that PyTorch BigGraph expects + """ + logging.info("Writing entites for training") + start = time.time() + for entity_type, nparts in entity2partitions.items(): + for i in range(nparts): + query = f""" + select count(*) + from {entity_type}_ids_map_{i} + """ + sz = conn.execute(query).fetchall()[0][0] + with open(f'{outdir}/entity_count_{entity_type}_id_{i}.txt', mode='w') as f: + f.write(f"{sz}\n") + end = time.time() + logging.info(f"Wrote entites in {end - start}s") + + +def write_training_data(outdir, rels, entity2partitions, conn): + """ + A function to write out all of the training relevant + information that PyTorch BigGraph expects + """ + LHS_PARTS = 1 + RHS_PARTS = 1 + for i, r in rels.iterrows(): + if entity2partitions[r['source_type']] > LHS_PARTS: + LHS_PARTS = entity2partitions[r['source_type']] + if entity2partitions[r['destination_type']] > RHS_PARTS: + RHS_PARTS = entity2partitions[r['destination_type']] + + write_relations(outdir, rels, conn) + write_edges(rels, LHS_PARTS, RHS_PARTS, conn) + write_entities(outdir, entity2partitions, conn) + + +def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/'): + conn = sqlite3.connect("citationv2.db") + load_edges(edge_file_name, conn) + + entity2partitions = { + 'paper': NPARTS, + 'year': 1, + } + + rels = remap_relationships(conn) + remap_entities(conn, entity2partitions) + remap_edges(conn, rels, entity2partitions) + out = Path(outdir).mkdir(parents=True, exist_ok=True) + write_training_data(out, rels, entity2partitions, conn) + + +if __name__ == '__main__': + main() \ No newline at end of file From 75d3e5d2c571771efac8ede3ecacc7aaf757ea60 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 2 Feb 2022 14:07:40 -0500 Subject: [PATCH 03/30] Added comment . --- torchbiggraph/examples/e2e/data_prep.py | 29 +++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/torchbiggraph/examples/e2e/data_prep.py b/torchbiggraph/examples/e2e/data_prep.py index 4385b52b..11778caf 100644 --- a/torchbiggraph/examples/e2e/data_prep.py +++ b/torchbiggraph/examples/e2e/data_prep.py @@ -24,6 +24,26 @@ level=logging.DEBUG ) +""" +This is intended as a simple end-to-end example of how to get your data into +the format that PyTorch BigGraph expects using SQL. It's implemented in SQLite +for portability, but similar techniques scale to 100bn edges using cloud +databases such as BigQuery. This pipeline can be split into three different +components: + +1. Data preparation +2. Data verification/checking +3. Training + +To run the pipeline, you'll first need to download the edges.csv file, +available HERE (TODO: INSERT LINK). This graph was constructed by +taking the [ogbl-citation2](https://github.com/snap-stanford/ogb) graph, and +adding edges for both paper-citations and years-published. While this graph +might not make a huge amount of sense, it's intended to largely fulfill a +pedagogical purpose. 
In the data preparation stage, we first load the graph +into a SQLite database, and then we transform and partition it. +""" + def remap_relationships(conn): """ A function to remap relationships using SQL queries. @@ -286,7 +306,7 @@ def write_training_data(outdir, rels, entity2partitions, conn): def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/'): conn = sqlite3.connect("citationv2.db") - load_edges(edge_file_name, conn) + # load_edges(edge_file_name, conn) entity2partitions = { 'paper': NPARTS, @@ -294,9 +314,10 @@ def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/'): } rels = remap_relationships(conn) - remap_entities(conn, entity2partitions) - remap_edges(conn, rels, entity2partitions) - out = Path(outdir).mkdir(parents=True, exist_ok=True) + # remap_entities(conn, entity2partitions) + # remap_edges(conn, rels, entity2partitions) + Path(outdir).mkdir(parents=True, exist_ok=True) + out = Path(outdir) write_training_data(out, rels, entity2partitions, conn) From c69072ade4e10c279adca893e5a3a5cb12ab9665 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 9 Feb 2022 20:36:20 -0500 Subject: [PATCH 04/30] Moved comment to readme --- torchbiggraph/examples/e2e/data_prep.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/torchbiggraph/examples/e2e/data_prep.py b/torchbiggraph/examples/e2e/data_prep.py index 11778caf..f3b09363 100644 --- a/torchbiggraph/examples/e2e/data_prep.py +++ b/torchbiggraph/examples/e2e/data_prep.py @@ -24,26 +24,6 @@ level=logging.DEBUG ) -""" -This is intended as a simple end-to-end example of how to get your data into -the format that PyTorch BigGraph expects using SQL. It's implemented in SQLite -for portability, but similar techniques scale to 100bn edges using cloud -databases such as BigQuery. This pipeline can be split into three different -components: - -1. Data preparation -2. Data verification/checking -3. Training - -To run the pipeline, you'll first need to download the edges.csv file, -available HERE (TODO: INSERT LINK). This graph was constructed by -taking the [ogbl-citation2](https://github.com/snap-stanford/ogb) graph, and -adding edges for both paper-citations and years-published. While this graph -might not make a huge amount of sense, it's intended to largely fulfill a -pedagogical purpose. In the data preparation stage, we first load the graph -into a SQLite database, and then we transform and partition it. -""" - def remap_relationships(conn): """ A function to remap relationships using SQL queries. From 7b0f05a02f391e2605820785dee664db42def564 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 9 Feb 2022 20:36:32 -0500 Subject: [PATCH 05/30] Started to write a readme --- torchbiggraph/examples/e2e/README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 torchbiggraph/examples/e2e/README.md diff --git a/torchbiggraph/examples/e2e/README.md b/torchbiggraph/examples/e2e/README.md new file mode 100644 index 00000000..f02fa2e0 --- /dev/null +++ b/torchbiggraph/examples/e2e/README.md @@ -0,0 +1,21 @@ +This is intended as a simple end-to-end example of how to get your data into +the format that PyTorch BigGraph expects using SQL. It's implemented in SQLite +for portability, but similar techniques scale to billions of edges using cloud +databases such as BigQuery. This pipeline can be split into three different +components: + +1. Data preparation +2. Data verification/checking +3. 
Training + +To run the pipeline, you'll first need to download the edges.csv file, +available HERE (TODO: INSERT LINK). This graph was constructed by +taking the [ogbl-citation2](https://github.com/snap-stanford/ogb) graph, and +adding edges for both paper-citations and years-published. While this graph +might not make a huge amount of sense, it's intended to largely fulfill a +pedagogical purpose. + +In the data preparation stage, we first load the graph +into a SQLite database and then we transform and partition it. The transformation +can be understood as first generating a mapping between the graph-ids and +ordinal ids per-type that PBG will expect. \ No newline at end of file From 5c0adae1296fa5b01740b0a7acfdbbba1e050e47 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 9 Feb 2022 20:36:47 -0500 Subject: [PATCH 06/30] Added blank check file --- torchbiggraph/examples/e2e/check.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 torchbiggraph/examples/e2e/check.py diff --git a/torchbiggraph/examples/e2e/check.py b/torchbiggraph/examples/e2e/check.py new file mode 100644 index 00000000..e69de29b From a2d1e1f9a3203ccef5d3bd938e74e3bb5faa8ad9 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 11:19:15 -0500 Subject: [PATCH 07/30] pr comments --- torchbiggraph/examples/e2e/data_prep.py | 41 +++++++++++---------- torchbiggraph/examples/e2e/sql_templates.py | 26 ++++++------- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/torchbiggraph/examples/e2e/data_prep.py b/torchbiggraph/examples/e2e/data_prep.py index f3b09363..a03c5d76 100644 --- a/torchbiggraph/examples/e2e/data_prep.py +++ b/torchbiggraph/examples/e2e/data_prep.py @@ -7,13 +7,13 @@ from pathlib import Path import sqlite3 -from sklearn.multiclass import OutputCodeClassifier +from config_template import CONFIG_TEMPLATE from sql_templates import ( edges_partitioned, edgelist_cte_mapper, remap_relns, partitioned_mapped_entities, - type_tmp_table + QUERY_MAKE_ID2PART_TBL ) import sys import time @@ -35,7 +35,7 @@ def remap_relationships(conn): query = """ select * - from reln_map + from tmp_reln_map """ logging.debug(f"Running query: {query}\n") rels = pd.read_sql_query(query, conn) @@ -58,7 +58,7 @@ def remap_entities(conn, entity2partitions): start=time.time() query = "" for entity, npartitions in entity2partitions.items(): - query = type_tmp_table.format(type=entity, nparts=npartitions) + query = QUERY_MAKE_ID2PART_TBL.format(type=entity, nparts=npartitions) for i in range(npartitions): query += partitioned_mapped_entities.format(type=entity, n=i) @@ -148,7 +148,7 @@ def remap_edges(conn, rels, entity2partitions): for lhs_part in range(NPARTS): for rhs_part in range(NPARTS): nentities_postmap += conn.execute(f""" - select count(*) from edges_{lhs_part}_{rhs_part} + select count(*) from tmp_edges_{lhs_part}_{rhs_part} """).fetchall()[0][0] if nentities_postmap != nentities_premap: @@ -163,7 +163,9 @@ def load_edges(fname, conn): """ A simple function to load the edges into the SQL table. It is assumed that we will have a file of the form: - | source_id | source_type | relationship_name | destination_id | destination_type | + | source_id | source_type | relationship_name | destination_id | destination_type |. + + For production applications you wouldn't use this step; it's just for our example. 
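+
+ Note that pandas appends by column name here, so the CSV header is assumed
+ to match the edges table schema exactly, i.e.
+ source_id,source_type,destination_id,destination_type,rel.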
""" logging.info("Loading edges") start = time.time() @@ -201,7 +203,7 @@ def write_relations(outdir, rels, conn): logging.info(f"Wrote relations in {end - start}s") -def write_single_edge(work_packet): +def write_single_bucket(work_packet): """ A function to write out a single edge-lists in the format that PyTorch BigGraph expects. @@ -214,7 +216,7 @@ def write_single_edge(work_packet): lhs_part, rhs_part, outdir, conn = work_packet query = f""" select * - from edges_{lhs_part}_{rhs_part} + from tmp_edges_{lhs_part}_{rhs_part} ; """ df = pd.read_sql_query(query, conn) @@ -228,19 +230,19 @@ def write_single_edge(work_packet): f[dset][0 : len(df)] = df[colname].tolist() -def write_edges(outdir, LHS_PARTS, RHS_PARTS, conn): +def write_all_buckets(outdir, lhs_parts, rhs_parts, conn): """ A function to write out all edge-lists in the format that PyTorch BigGraph expects. """ - logging.info(f"Writing edges, {LHS_PARTS}, {RHS_PARTS}") + logging.info(f"Writing edges, {lhs_parts}, {rhs_parts}") start = time.time() # I would write these using multiprocessing but SQLite connections # aren't pickelable, and I'd like to keep this simple - worklist = list(itertools.product(range(LHS_PARTS), range(RHS_PARTS), ['training_data'], [conn])) + worklist = list(itertools.product(range(lhs_parts), range(rhs_parts), ['training_data'], [conn])) for w in worklist: - write_single_edge(w) + write_single_bucket(w) end = time.time() logging.info(f"Wrote edges in {end - start}s") @@ -271,20 +273,19 @@ def write_training_data(outdir, rels, entity2partitions, conn): A function to write out all of the training relevant information that PyTorch BigGraph expects """ - LHS_PARTS = 1 - RHS_PARTS = 1 + lhs_parts = 1 + rhs_parts = 1 for i, r in rels.iterrows(): - if entity2partitions[r['source_type']] > LHS_PARTS: - LHS_PARTS = entity2partitions[r['source_type']] - if entity2partitions[r['destination_type']] > RHS_PARTS: - RHS_PARTS = entity2partitions[r['destination_type']] + if entity2partitions[r['source_type']] > lhs_parts: + lhs_parts = entity2partitions[r['source_type']] + if entity2partitions[r['destination_type']] > rhs_parts: + rhs_parts = entity2partitions[r['destination_type']] write_relations(outdir, rels, conn) - write_edges(rels, LHS_PARTS, RHS_PARTS, conn) + write_all_buckets(rels, lhs_parts, rhs_parts, conn) write_entities(outdir, entity2partitions, conn) -def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/'): conn = sqlite3.connect("citationv2.db") # load_edges(edge_file_name, conn) diff --git a/torchbiggraph/examples/e2e/sql_templates.py b/torchbiggraph/examples/e2e/sql_templates.py index 11cb5b8f..4e9f123b 100644 --- a/torchbiggraph/examples/e2e/sql_templates.py +++ b/torchbiggraph/examples/e2e/sql_templates.py @@ -1,8 +1,8 @@ -type_tmp_table = """ -DROP TABLE IF EXISTS {type}_id2part +QUERY_MAKE_ID2PART_TBL = """ +DROP TABLE IF EXISTS tmp_{type}_id2part ; -create temporary table {type}_id2part as +create temporary table tmp_{type}_id2part as select id, abs(random()) % {nparts} as part from ( select distinct source_id as id from edges where source_type='{type}' @@ -14,26 +14,26 @@ """ partitioned_mapped_entities = """ -DROP TABLE IF EXISTS {type}_ids_map_{n} +DROP TABLE IF EXISTS tmp_{type}_ids_map_{n} ; -create table {type}_ids_map_{n} as +create table tmp_{type}_ids_map_{n} as select f.id , f.part , '{type}' as type , (ROW_NUMBER() OVER(ORDER BY f.id)) - 1 as graph_id -from {type}_id2part f +from tmp_{type}_id2part f where f.part = {n} order by 2 desc, 1 asc ; """ remap_relns = """ -DROP 
TABLE IF EXISTS reln_map +DROP TABLE IF EXISTS tmp_reln_map ; -create table reln_map as +create table tmp_reln_map as select f.rel as id, source_type, destination_type, (ROW_NUMBER() OVER(ORDER BY f.rel)) - 1 as graph_id from ( select distinct rel, source_type, destination_type @@ -44,13 +44,13 @@ edgelist_cte_mapper = """ select lhs.graph_id as source_id, rel.graph_id as rel_id, rhs.graph_id as destination_id from edges g - join reln_map rel on (rel.id = g.rel) - join {lhs_type}_ids_map_{i} lhs on ( + join tmp_reln_map rel on (rel.id = g.rel) + join tmp_{lhs_type}_ids_map_{i} lhs on ( lhs.id = g.source_id and g.source_type = rel.source_type and lhs.type = g.source_type ) - join {rhs_type}_ids_map_{j} rhs on ( + join tmp_{rhs_type}_ids_map_{j} rhs on ( rhs.id = g.destination_id and g.destination_type = rel.destination_type and rhs.type = g.destination_type @@ -59,10 +59,10 @@ """ edges_partitioned = """ -DROP TABLE IF EXISTS edges_{i}_{j} +DROP TABLE IF EXISTS tmp_edges_{i}_{j} ; -create table edges_{i}_{j} as +create table tmp_edges_{i}_{j} as {ctes} select * from ( From 334164f4c84f90e3b6d92ccc27468789f266e839 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 11:19:25 -0500 Subject: [PATCH 08/30] Added the ability to write the config --- torchbiggraph/examples/e2e/data_prep.py | 54 +++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/torchbiggraph/examples/e2e/data_prep.py b/torchbiggraph/examples/e2e/data_prep.py index a03c5d76..a4a1635c 100644 --- a/torchbiggraph/examples/e2e/data_prep.py +++ b/torchbiggraph/examples/e2e/data_prep.py @@ -286,8 +286,55 @@ def write_training_data(outdir, rels, entity2partitions, conn): write_entities(outdir, entity2partitions, conn) +def write_rels_dict(rels): + my_rels = "" + for _, row in rels.sort_values(by="graph_id").iterrows(): + r = "{" + r += f"'name': '{row['id']}', 'lhs': '{row['source_type']}', 'rhs': '{row['destination_type']}', 'operator': op" + r += "},\n" + my_rels += r + return my_rels + + +def write_entities_dict(entity2partitions): + my_entities = "{\n" + for name, part in entity2partitions.items(): + my_entities += '{ "{name}": {"num_partitions": {part}} }'.format(name=name, part=part) + my_entities += "}\n" + return my_entities + + +def write_config(rels, entity2partitions, config_out, train_out, model_out): + with open(config_out, mode='w') as f: + f.write( + CONFIG_TEMPLATE.format( + RELN_DICT=write_rels_dict(rels), + ENTITIES_DICT=write_entities_dict(entity2partitions), + TRAINING_DIR=train_out, + MODEL_PATH=model_out, + ) + ) + + +def compute_memory_usage(entity2partitions, conn, NDIM=200): + nentities = 0 + for _type, parts in entity2partitions.items(): + ntype = 0 + for i in range(parts): + query = f""" + select count(*) as cnt + from `tmp_{_type}_ids_map_{i}` + """ + ntype = max(ntype, conn.executequery(query).fetchall()[0][0]) + nentities += ntype + + mem = 1.5 * nentities * NDIM * 8 / 1024 / 1024 / 1024 + logging.info(f"I need {mem} GBs of ram for embedding table for {NDIM} Dimensions") + + +def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/', modeldir='model/', config_dir='.'): conn = sqlite3.connect("citationv2.db") - # load_edges(edge_file_name, conn) + load_edges(edge_file_name, conn) entity2partitions = { 'paper': NPARTS, @@ -295,11 +342,12 @@ def write_training_data(outdir, rels, entity2partitions, conn): } rels = remap_relationships(conn) - # remap_entities(conn, entity2partitions) - # remap_edges(conn, rels, entity2partitions) + 
remap_entities(conn, entity2partitions) + remap_edges(conn, rels, entity2partitions) Path(outdir).mkdir(parents=True, exist_ok=True) out = Path(outdir) write_training_data(out, rels, entity2partitions, conn) + write_config(rels, entity2partitions, config_dir, out, modeldir) if __name__ == '__main__': From fddb146f6b4a337372e4ca85adf069c1c9ca168f Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 11:20:41 -0500 Subject: [PATCH 09/30] Expanded readme --- torchbiggraph/examples/e2e/README.md | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/torchbiggraph/examples/e2e/README.md b/torchbiggraph/examples/e2e/README.md index f02fa2e0..9fa9504e 100644 --- a/torchbiggraph/examples/e2e/README.md +++ b/torchbiggraph/examples/e2e/README.md @@ -1,8 +1,8 @@ This is intended as a simple end-to-end example of how to get your data into the format that PyTorch BigGraph expects using SQL. It's implemented in SQLite for portability, but similar techniques scale to billions of edges using cloud -databases such as BigQuery. This pipeline can be split into three different -components: +databases such as BigQuery or SnowFlake. This pipeline can be split into three +different components: 1. Data preparation 2. Data verification/checking @@ -17,5 +17,24 @@ pedagogical purpose. In the data preparation stage, we first load the graph into a SQLite database and then we transform and partition it. The transformation -can be understood as first generating a mapping between the graph-ids and -ordinal ids per-type that PBG will expect. \ No newline at end of file +can be understood as first partitioning the entities, then generating a mapping +between the graph-ids and ordinal ids per-type that PBG will expect, and finally +writing out all the files required to train, including the config file. By +keeping track of the vertex types, we're able to specifically verify our mappings +in a fully self consistent fashion. + +Once the data has been prepared and generated, we're ready to embed the graph. We +do this by passing the generated config to `torchbiggraph_train` in the following +way: + +``` +torchbiggraph_train \ + path/to/generated/config.py +``` + +The `data_prep.py` script will also compute the approximate amount of shared memory +that will be needed for training. If the training demands are more than the +available shared memory, you'll need to regenerate your data with more partitions +than what you currently have. If you're seeing either a bus error or a OOM kill +message in the kernel ring buffer but your machine has enough ram, you'll want to +verify that `/dev/shm` is large enough to accomodate your embedding table. \ No newline at end of file From a6156b8375257d46246f426fb9b282f009df72f3 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 11:35:25 -0500 Subject: [PATCH 10/30] Updated readme --- torchbiggraph/examples/e2e/README.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/torchbiggraph/examples/e2e/README.md b/torchbiggraph/examples/e2e/README.md index 9fa9504e..e013d0ea 100644 --- a/torchbiggraph/examples/e2e/README.md +++ b/torchbiggraph/examples/e2e/README.md @@ -1,3 +1,5 @@ +# SQL End to End Example + This is intended as a simple end-to-end example of how to get your data into the format that PyTorch BigGraph expects using SQL. 
It's implemented in SQLite for portability, but similar techniques scale to billions of edges using cloud @@ -37,4 +39,18 @@ that will be needed for training. If the training demands are more than the available shared memory, you'll need to regenerate your data with more partitions than what you currently have. If you're seeing either a bus error or a OOM kill message in the kernel ring buffer but your machine has enough ram, you'll want to -verify that `/dev/shm` is large enough to accomodate your embedding table. \ No newline at end of file +verify that `/dev/shm` is large enough to accomodate your embedding table. + +# Extensions + +A few changes will need to be made to use this at scale in production environment. +First, this pipeline is brittle and simplistic. For production workloads it's +probably better to use a tool like DBT or dataflow to create independent tables +in parallel. It's also important to be careful with our indices to make our joins +performant. + +When it comes time to map your buckets to hdf5 files it's almost certainly more +performant to dump them to chunked parquet/avro files and merge those files together +in parallel. Every company's compute infrastructure is going to be different +enough that this piece of code will have to be custom written. Fortunately, this +code can be written once and reused. \ No newline at end of file From a7ffad6e1a313fc3c98c16257df405a60486c460 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 11:35:45 -0500 Subject: [PATCH 11/30] typo Typos --- torchbiggraph/examples/e2e/data_prep.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/torchbiggraph/examples/e2e/data_prep.py b/torchbiggraph/examples/e2e/data_prep.py index a4a1635c..14018564 100644 --- a/torchbiggraph/examples/e2e/data_prep.py +++ b/torchbiggraph/examples/e2e/data_prep.py @@ -259,7 +259,7 @@ def write_entities(outdir, entity2partitions, conn): for i in range(nparts): query = f""" select count(*) - from {entity_type}_ids_map_{i} + from tmp_{entity_type}_ids_map_{i} """ sz = conn.execute(query).fetchall()[0][0] with open(f'{outdir}/entity_count_{entity_type}_id_{i}.txt', mode='w') as f: @@ -281,8 +281,8 @@ def write_training_data(outdir, rels, entity2partitions, conn): if entity2partitions[r['destination_type']] > rhs_parts: rhs_parts = entity2partitions[r['destination_type']] - write_relations(outdir, rels, conn) - write_all_buckets(rels, lhs_parts, rhs_parts, conn) + # write_relations(outdir, rels, conn) + # write_all_buckets(rels, lhs_parts, rhs_parts, conn) write_entities(outdir, entity2partitions, conn) @@ -290,7 +290,7 @@ def write_rels_dict(rels): my_rels = "" for _, row in rels.sort_values(by="graph_id").iterrows(): r = "{" - r += f"'name': '{row['id']}', 'lhs': '{row['source_type']}', 'rhs': '{row['destination_type']}', 'operator': op" + r += f"'name': '{row['id']}', 'lhs': '{row['source_type']}', 'rhs': '{row['destination_type']}', 'operator': 'translation'" r += "},\n" my_rels += r return my_rels @@ -299,13 +299,14 @@ def write_rels_dict(rels): def write_entities_dict(entity2partitions): my_entities = "{\n" for name, part in entity2partitions.items(): - my_entities += '{ "{name}": {"num_partitions": {part}} }'.format(name=name, part=part) + my_entities += '{{ "{name}": {{"num_partitions": {part} }} }},\n'.format(name=name, part=part) my_entities += "}\n" return my_entities def write_config(rels, entity2partitions, config_out, train_out, model_out): - with open(config_out, mode='w') as f: + outname = 
Path(config_out) / 'config.py' + with open(outname, mode='w') as f: f.write( CONFIG_TEMPLATE.format( RELN_DICT=write_rels_dict(rels), @@ -332,9 +333,9 @@ def compute_memory_usage(entity2partitions, conn, NDIM=200): logging.info(f"I need {mem} GBs of ram for embedding table for {NDIM} Dimensions") -def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/', modeldir='model/', config_dir='.'): +def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/', modeldir='model/', config_dir=''): conn = sqlite3.connect("citationv2.db") - load_edges(edge_file_name, conn) + # load_edges(edge_file_name, conn) entity2partitions = { 'paper': NPARTS, @@ -342,9 +343,9 @@ def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/', modeldir } rels = remap_relationships(conn) - remap_entities(conn, entity2partitions) - remap_edges(conn, rels, entity2partitions) - Path(outdir).mkdir(parents=True, exist_ok=True) + # remap_entities(conn, entity2partitions) + # remap_edges(conn, rels, entity2partitions) + # Path(outdir).mkdir(parents=True, exist_ok=True) out = Path(outdir) write_training_data(out, rels, entity2partitions, conn) write_config(rels, entity2partitions, config_dir, out, modeldir) From 7811f13caea74e0cec8d3dc6d75d2b3761f11f57 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 11:43:28 -0500 Subject: [PATCH 12/30] Renamed e2e dir --- .../examples/{e2e => sql_end2end}/README.md | 0 .../examples/{e2e => sql_end2end}/check.py | 0 .../examples/sql_end2end/config_template.py | 44 +++++++++++++++++++ .../{e2e => sql_end2end}/data_prep.py | 0 .../{e2e => sql_end2end}/sql_templates.py | 0 5 files changed, 44 insertions(+) rename torchbiggraph/examples/{e2e => sql_end2end}/README.md (100%) rename torchbiggraph/examples/{e2e => sql_end2end}/check.py (100%) create mode 100644 torchbiggraph/examples/sql_end2end/config_template.py rename torchbiggraph/examples/{e2e => sql_end2end}/data_prep.py (100%) rename torchbiggraph/examples/{e2e => sql_end2end}/sql_templates.py (100%) diff --git a/torchbiggraph/examples/e2e/README.md b/torchbiggraph/examples/sql_end2end/README.md similarity index 100% rename from torchbiggraph/examples/e2e/README.md rename to torchbiggraph/examples/sql_end2end/README.md diff --git a/torchbiggraph/examples/e2e/check.py b/torchbiggraph/examples/sql_end2end/check.py similarity index 100% rename from torchbiggraph/examples/e2e/check.py rename to torchbiggraph/examples/sql_end2end/check.py diff --git a/torchbiggraph/examples/sql_end2end/config_template.py b/torchbiggraph/examples/sql_end2end/config_template.py new file mode 100644 index 00000000..7a5d517c --- /dev/null +++ b/torchbiggraph/examples/sql_end2end/config_template.py @@ -0,0 +1,44 @@ +CONFIG_TEMPLATE = """ +#!/usr/bin/env python3 + +# Copyright (c) Facebook, Inc. and its affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE.txt file in the root directory of this source tree. 
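+#
+# This config is generated from config_template.py by write_config() in
+# data_prep.py, which fills in the entity/relation dicts and the training
+# and model paths below.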
+import torch + +def get_torchbiggraph_config(): + + config = dict( # noqa + # I/O data + entity_path="{TRAINING_DIR}", + edge_paths=[ + "{TRAINING_DIR}", + ], + checkpoint_path="{MODEL_PATH}", + # Graph structure + entities={{ + {ENTITIES_DICT} + }}, + relations=[ + {RELN_DICT} + ], + # Scoring model + dimension=200, + comparator="dot", + # Training + num_epochs=50, + num_uniform_negs=1000, + num_batch_negs=1000, + batch_size=150_000, + loss_fn="softmax", + lr=0.05, + regularization_coef=1e-3, + num_gpus=torch.cuda.device_count(), + # Evaluation during training + eval_fraction=0, # to reproduce results, we need to use all training data + ) + + return config +""" \ No newline at end of file diff --git a/torchbiggraph/examples/e2e/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py similarity index 100% rename from torchbiggraph/examples/e2e/data_prep.py rename to torchbiggraph/examples/sql_end2end/data_prep.py diff --git a/torchbiggraph/examples/e2e/sql_templates.py b/torchbiggraph/examples/sql_end2end/sql_templates.py similarity index 100% rename from torchbiggraph/examples/e2e/sql_templates.py rename to torchbiggraph/examples/sql_end2end/sql_templates.py From 087f8a29bed714e7b096a68b4ff03b6b84f941db Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 11:57:49 -0500 Subject: [PATCH 13/30] Constants --- .../examples/sql_end2end/data_prep.py | 18 +++++++++--------- .../examples/sql_end2end/sql_templates.py | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py index 14018564..4dfcd035 100644 --- a/torchbiggraph/examples/sql_end2end/data_prep.py +++ b/torchbiggraph/examples/sql_end2end/data_prep.py @@ -9,10 +9,10 @@ from config_template import CONFIG_TEMPLATE from sql_templates import ( - edges_partitioned, - edgelist_cte_mapper, - remap_relns, - partitioned_mapped_entities, + EDGES_PARTITIONED, + EDGELIST_CTE_MAPPER, + REMAP_RELNS, + PARTITIONED_MAPPED_ENTITIES, QUERY_MAKE_ID2PART_TBL ) import sys @@ -30,8 +30,8 @@ def remap_relationships(conn): """ logging.info("Remapping relationships") start = time.time() - logging.debug(f"Running query: {remap_relns}\n") - conn.executescript(remap_relns) + logging.debug(f"Running query: {REMAP_RELNS}\n") + conn.executescript(REMAP_RELNS) query = """ select * @@ -61,7 +61,7 @@ def remap_entities(conn, entity2partitions): query = QUERY_MAKE_ID2PART_TBL.format(type=entity, nparts=npartitions) for i in range(npartitions): - query += partitioned_mapped_entities.format(type=entity, n=i) + query += PARTITIONED_MAPPED_ENTITIES.format(type=entity, n=i) logging.debug(f"Running query: {query}") conn.executescript(query) end = time.time() @@ -85,7 +85,7 @@ def generate_ctes(lhs_part, rhs_part, rels, entity2partitions): continue if not first: ctes += f", cte_{nctes} as (" - ctes += edgelist_cte_mapper.format( + ctes += EDGELIST_CTE_MAPPER.format( rel_name=r['id'], lhs_type=r['source_type'], rhs_type=r['destination_type'], @@ -133,7 +133,7 @@ def remap_edges(conn, rels, entity2partitions): for rhs_part in range(NPARTS): nctes, ctes = generate_ctes(lhs_part, rhs_part, rels, entity2partitions) subquery = generate_unions(nctes) - query += edges_partitioned.format( + query += EDGES_PARTITIONED.format( i = lhs_part, j = rhs_part, ctes=ctes, diff --git a/torchbiggraph/examples/sql_end2end/sql_templates.py b/torchbiggraph/examples/sql_end2end/sql_templates.py index 4e9f123b..092e01c8 100644 --- 
a/torchbiggraph/examples/sql_end2end/sql_templates.py +++ b/torchbiggraph/examples/sql_end2end/sql_templates.py @@ -13,7 +13,7 @@ ; """ -partitioned_mapped_entities = """ +PARTITIONED_MAPPED_ENTITIES = """ DROP TABLE IF EXISTS tmp_{type}_ids_map_{n} ; @@ -29,7 +29,7 @@ ; """ -remap_relns = """ +REMAP_RELNS = """ DROP TABLE IF EXISTS tmp_reln_map ; @@ -41,7 +41,7 @@ ) f """ -edgelist_cte_mapper = """ +EDGELIST_CTE_MAPPER = """ select lhs.graph_id as source_id, rel.graph_id as rel_id, rhs.graph_id as destination_id from edges g join tmp_reln_map rel on (rel.id = g.rel) @@ -58,7 +58,7 @@ where g.rel = '{rel_name}' """ -edges_partitioned = """ +EDGES_PARTITIONED = """ DROP TABLE IF EXISTS tmp_edges_{i}_{j} ; From 658c864e9d39fe856dd40a93e110872f22671f28 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 13:59:55 -0500 Subject: [PATCH 14/30] Fixes --- torchbiggraph/examples/sql_end2end/config_template.py | 4 +--- torchbiggraph/examples/sql_end2end/data_prep.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/torchbiggraph/examples/sql_end2end/config_template.py b/torchbiggraph/examples/sql_end2end/config_template.py index 7a5d517c..28da5ff0 100644 --- a/torchbiggraph/examples/sql_end2end/config_template.py +++ b/torchbiggraph/examples/sql_end2end/config_template.py @@ -18,9 +18,7 @@ def get_torchbiggraph_config(): ], checkpoint_path="{MODEL_PATH}", # Graph structure - entities={{ - {ENTITIES_DICT} - }}, + entities={ENTITIES_DICT}, relations=[ {RELN_DICT} ], diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py index 4dfcd035..ac166e35 100644 --- a/torchbiggraph/examples/sql_end2end/data_prep.py +++ b/torchbiggraph/examples/sql_end2end/data_prep.py @@ -262,7 +262,7 @@ def write_entities(outdir, entity2partitions, conn): from tmp_{entity_type}_ids_map_{i} """ sz = conn.execute(query).fetchall()[0][0] - with open(f'{outdir}/entity_count_{entity_type}_id_{i}.txt', mode='w') as f: + with open(f'{outdir}/entity_count_{entity_type}_{i}.txt', mode='w') as f: f.write(f"{sz}\n") end = time.time() logging.info(f"Wrote entites in {end - start}s") @@ -299,7 +299,7 @@ def write_rels_dict(rels): def write_entities_dict(entity2partitions): my_entities = "{\n" for name, part in entity2partitions.items(): - my_entities += '{{ "{name}": {{"num_partitions": {part} }} }},\n'.format(name=name, part=part) + my_entities += '\t"{name}": {{"num_partitions": {part} }},\n'.format(name=name, part=part) my_entities += "}\n" return my_entities From 22d38d573c6aa6f9d7c1beba730db0f500549763 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Mon, 14 Feb 2022 14:00:06 -0500 Subject: [PATCH 15/30] Added check.py --- torchbiggraph/examples/sql_end2end/check.py | 100 ++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/torchbiggraph/examples/sql_end2end/check.py b/torchbiggraph/examples/sql_end2end/check.py index e69de29b..7d8af7cd 100644 --- a/torchbiggraph/examples/sql_end2end/check.py +++ b/torchbiggraph/examples/sql_end2end/check.py @@ -0,0 +1,100 @@ +import argparse +import logging +import sys +import torch + +from torchbiggraph.train_cpu import ( + IterationManager, + get_num_edge_chunks, +) +from torchbiggraph.graph_storages import EDGE_STORAGES, ENTITY_STORAGES +from torchbiggraph.config import ConfigFileLoader, ConfigSchema +from torchbiggraph.types import Bucket +from torchbiggraph.util import EmbeddingHolder + +logging.basicConfig( + format='%(process)d-%(levelname)s-%(message)s', + stream = sys.stdout, + 
level=logging.DEBUG +) + +class Checker: + def __init__(self, config): + entity_storage = ENTITY_STORAGES.make_instance(config.entity_path) + entity_counts = {} + for entity, econf in config.entities.items(): + entity_counts[entity] = [] + for part in range(econf.num_partitions): + entity_counts[entity].append(entity_storage.load_count(entity, part)) + self.entity_counts = entity_counts + self.config = config + holder = self.holder = EmbeddingHolder(config) + + + def check_all_edges(self): + num_edge_chunks = get_num_edge_chunks(self.config) + + iteration_manager = IterationManager( + 1, + self.config.edge_paths, + num_edge_chunks, + iteration_idx=0, + ) + edge_storage = EDGE_STORAGES.make_instance(iteration_manager.edge_path) + + for _, _, edge_chunk_idx in iteration_manager: + for lhs in range(self.holder.nparts_lhs): + for rhs in range(self.holder.nparts_rhs): + cur_b = Bucket(lhs, rhs) + logging.info("Checking edge chunk: {edge_chunk_idx} for edges_{cur_b.lhs}_{cur_b.rhs}.h5") + edges = edge_storage.load_chunk_of_edges( + cur_b.lhs, + cur_b.rhs, + edge_chunk_idx, + iteration_manager.num_edge_chunks, + shared=True, + ) + + def check_edge_chunk(self, cur_b, edges): + rel_lhs_entity_counts = torch.tensor( + [self.entity_counts[r.lhs][cur_b.lhs] for r in self.config.relations] + ) + #Check LHS + edge_lhs_entity_count = rel_lhs_entity_counts[edges.rel] + + if any(edges.lhs >= edge_lhs_entity_count): + _, worst_edge_idx = (edges.lhs - edge_lhs_entity_count).max(0) + raise RuntimeError(f"edge {worst_edge_idx} has LHS entity of " + f"{edges.lhs[worst_edge_idx]} but rel " + f"{edges.rel[worst_edge_idx]} only has " + f"{edge_lhs_entity_count[worst_edge_idx]} " + "entities " + f" with r.name: {self.config.relations[edges.rel[worst_edge_idx]].name}. " + "Preprocessing bug?") + #Check RHS + rel_rhs_entity_counts = torch.tensor( + [self.entity_counts[r.rhs][cur_b.rhs] for r in self.config.relations] + ) + edge_rhs_entity_count = rel_rhs_entity_counts[edges.rel] + if any(edges.rhs >= edge_rhs_entity_count): + _, worst_edge_idx = (edges.rhs - edge_rhs_entity_count).max(0) + raise RuntimeError(f"edge {worst_edge_idx} has RHS entity of " + f"{edges.rhs[worst_edge_idx]} but rel " + f"{edges.rel[worst_edge_idx]} only has " + f"{edge_rhs_entity_count[worst_edge_idx]} " + "entities " + f" with r.name: {self.config.relations[edges.rel[worst_edge_idx]].name}. 
" + "Preprocessing bug?") + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("config", help="Path to config file") + parser.add_argument("-p", "--param", action="append", nargs="*") + opt = parser.parse_args() + + loader = ConfigFileLoader() + config = loader.load_config(opt.config, opt.param) + + + Checker(config).check_all_edges() + logging.info("Found no errors in the input directory") \ No newline at end of file From 0dc90325874712751b108b9cae5fdbe77b8b248e Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Tue, 15 Feb 2022 10:32:43 -0500 Subject: [PATCH 16/30] Check.py now catches errors --- torchbiggraph/examples/sql_end2end/check.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/torchbiggraph/examples/sql_end2end/check.py b/torchbiggraph/examples/sql_end2end/check.py index 7d8af7cd..2797b45c 100644 --- a/torchbiggraph/examples/sql_end2end/check.py +++ b/torchbiggraph/examples/sql_end2end/check.py @@ -46,7 +46,7 @@ def check_all_edges(self): for lhs in range(self.holder.nparts_lhs): for rhs in range(self.holder.nparts_rhs): cur_b = Bucket(lhs, rhs) - logging.info("Checking edge chunk: {edge_chunk_idx} for edges_{cur_b.lhs}_{cur_b.rhs}.h5") + logging.info(f"Checking edge chunk: {edge_chunk_idx} for edges_{cur_b.lhs}_{cur_b.rhs}.h5") edges = edge_storage.load_chunk_of_edges( cur_b.lhs, cur_b.rhs, @@ -54,18 +54,21 @@ def check_all_edges(self): iteration_manager.num_edge_chunks, shared=True, ) + self.check_edge_chunk(cur_b, edges) def check_edge_chunk(self, cur_b, edges): + rhs = edges.rhs.to_tensor() + lhs = edges.lhs.to_tensor() rel_lhs_entity_counts = torch.tensor( [self.entity_counts[r.lhs][cur_b.lhs] for r in self.config.relations] ) #Check LHS edge_lhs_entity_count = rel_lhs_entity_counts[edges.rel] - if any(edges.lhs >= edge_lhs_entity_count): - _, worst_edge_idx = (edges.lhs - edge_lhs_entity_count).max(0) + if any(lhs >= edge_lhs_entity_count): + _, worst_edge_idx = (lhs - edge_lhs_entity_count).max(0) raise RuntimeError(f"edge {worst_edge_idx} has LHS entity of " - f"{edges.lhs[worst_edge_idx]} but rel " + f"{lhs[worst_edge_idx]} but rel " f"{edges.rel[worst_edge_idx]} only has " f"{edge_lhs_entity_count[worst_edge_idx]} " "entities " @@ -76,10 +79,10 @@ def check_edge_chunk(self, cur_b, edges): [self.entity_counts[r.rhs][cur_b.rhs] for r in self.config.relations] ) edge_rhs_entity_count = rel_rhs_entity_counts[edges.rel] - if any(edges.rhs >= edge_rhs_entity_count): - _, worst_edge_idx = (edges.rhs - edge_rhs_entity_count).max(0) + if any(rhs >= edge_rhs_entity_count): + _, worst_edge_idx = (rhs - edge_rhs_entity_count).max(0) raise RuntimeError(f"edge {worst_edge_idx} has RHS entity of " - f"{edges.rhs[worst_edge_idx]} but rel " + f"{rhs[worst_edge_idx]} but rel " f"{edges.rel[worst_edge_idx]} only has " f"{edge_rhs_entity_count[worst_edge_idx]} " "entities " From 57d26a1a877fa29c6e198d98a9fb4421cd2aa514 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 16 Feb 2022 11:52:30 -0500 Subject: [PATCH 17/30] Added check to be a biggraph script --- setup.cfg | 1 + torchbiggraph/{examples/sql_end2end => }/check.py | 0 2 files changed, 1 insertion(+) rename torchbiggraph/{examples/sql_end2end => }/check.py (100%) diff --git a/setup.cfg b/setup.cfg index f77a285e..9849894b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,6 +53,7 @@ parquet = parquet [options.entry_points] console_scripts = + torchbiggraph_check = torchbiggraph.check:main torchbiggraph_config = torchbiggraph.config:main 
torchbiggraph_eval = torchbiggraph.eval:main torchbiggraph_example_fb15k = torchbiggraph.examples.fb15k:main diff --git a/torchbiggraph/examples/sql_end2end/check.py b/torchbiggraph/check.py similarity index 100% rename from torchbiggraph/examples/sql_end2end/check.py rename to torchbiggraph/check.py From d153a23473f9753fe58c6b4ba3f914297a94e1f1 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 16 Feb 2022 11:54:53 -0500 Subject: [PATCH 18/30] Reverted commented out edges --- torchbiggraph/examples/sql_end2end/data_prep.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py index ac166e35..37caa235 100644 --- a/torchbiggraph/examples/sql_end2end/data_prep.py +++ b/torchbiggraph/examples/sql_end2end/data_prep.py @@ -335,7 +335,7 @@ def compute_memory_usage(entity2partitions, conn, NDIM=200): def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/', modeldir='model/', config_dir=''): conn = sqlite3.connect("citationv2.db") - # load_edges(edge_file_name, conn) + load_edges(edge_file_name, conn) entity2partitions = { 'paper': NPARTS, @@ -343,9 +343,9 @@ def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/', modeldir } rels = remap_relationships(conn) - # remap_entities(conn, entity2partitions) - # remap_edges(conn, rels, entity2partitions) - # Path(outdir).mkdir(parents=True, exist_ok=True) + remap_entities(conn, entity2partitions) + remap_edges(conn, rels, entity2partitions) + Path(outdir).mkdir(parents=True, exist_ok=True) out = Path(outdir) write_training_data(out, rels, entity2partitions, conn) write_config(rels, entity2partitions, config_dir, out, modeldir) From 38c533e6e6fdd47298698d1895794ffcccdedda1 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 23 Feb 2022 21:04:23 -0500 Subject: [PATCH 19/30] Updated logging --- torchbiggraph/check.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/torchbiggraph/check.py b/torchbiggraph/check.py index 2797b45c..73745d1a 100644 --- a/torchbiggraph/check.py +++ b/torchbiggraph/check.py @@ -12,11 +12,7 @@ from torchbiggraph.types import Bucket from torchbiggraph.util import EmbeddingHolder -logging.basicConfig( - format='%(process)d-%(levelname)s-%(message)s', - stream = sys.stdout, - level=logging.DEBUG -) +logger = logging.getLogger("torchbiggraph") class Checker: def __init__(self, config): From dec264e007135dc579e958b602bce666c9155056 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 23 Feb 2022 21:06:40 -0500 Subject: [PATCH 20/30] more logging --- torchbiggraph/check.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/torchbiggraph/check.py b/torchbiggraph/check.py index 73745d1a..1521ff6a 100644 --- a/torchbiggraph/check.py +++ b/torchbiggraph/check.py @@ -11,6 +11,11 @@ from torchbiggraph.config import ConfigFileLoader, ConfigSchema from torchbiggraph.types import Bucket from torchbiggraph.util import EmbeddingHolder +from torchbiggraph.util import ( + set_logging_verbosity, + setup_logging, +) + logger = logging.getLogger("torchbiggraph") @@ -94,6 +99,8 @@ def check_edge_chunk(self, cur_b, edges): loader = ConfigFileLoader() config = loader.load_config(opt.config, opt.param) + set_logging_verbosity(config.verbose) + setup_logging(config.verbose) Checker(config).check_all_edges() logging.info("Found no errors in the input directory") \ No newline at end of file From e33b659a2127a9293b9f1f64626b8fece98968db Mon Sep 17 00:00:00 2001 From: 
Thomas Markovich Date: Wed, 23 Feb 2022 21:37:22 -0500 Subject: [PATCH 21/30] Added the check script to readme --- README.md | 13 +++++++++++++ torchbiggraph/check.py | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4053d107..aed76355 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,19 @@ The outputs will be stored next to the inputs in the `data/FB15k` directory. This simple utility is only suitable for small graphs that fit entirely in memory. To handle larger data one will have to implement their own custom preprocessor. +### Checking the data + +It is advised that you check your using PBG's checking script. It catches common errors that our developers have run into before, +and can be hard to debug. This command is run by invoking the following: + +```bash +torchbiggraph_check \ + --lhs-col=0 --rel-col=1 --rhs-col=2 \ + torchbiggraph/examples/configs/fb15k_config_cpu.py +``` + +This command will throw run-time errors with informative messages if it encounters problems. You will see ''Found no errors in the input'' logged if there are no errors. + ### Training The `torchbiggraph_train` command is used to launch training. The training parameters are tucked away in a configuration file, whose path is given to the command. They can however be overridden from the command line with the `--param` flag. The sample config is used for both training and evaluation, so we will have to use the override to specify the edge set to use. diff --git a/torchbiggraph/check.py b/torchbiggraph/check.py index 1521ff6a..30b18674 100644 --- a/torchbiggraph/check.py +++ b/torchbiggraph/check.py @@ -103,4 +103,4 @@ def check_edge_chunk(self, cur_b, edges): setup_logging(config.verbose) Checker(config).check_all_edges() - logging.info("Found no errors in the input directory") \ No newline at end of file + logging.info("Found no errors in the input") \ No newline at end of file From 1c8c1e51d02f3ae84310ca6c5263aa8523e1ebc8 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Wed, 23 Feb 2022 22:15:11 -0500 Subject: [PATCH 22/30] Added command line args --- .../examples/sql_end2end/data_prep.py | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py index 37caa235..7e00857a 100644 --- a/torchbiggraph/examples/sql_end2end/data_prep.py +++ b/torchbiggraph/examples/sql_end2end/data_prep.py @@ -1,3 +1,4 @@ +import argparse import itertools import h5py import json @@ -128,9 +129,9 @@ def remap_edges(conn, rels, entity2partitions): """).fetchall()[0][0] query = "" - NPARTS = max(entity2partitions.values()) - for lhs_part in range(NPARTS): - for rhs_part in range(NPARTS): + nparts = max(entity2partitions.values()) + for lhs_part in range(nparts): + for rhs_part in range(nparts): nctes, ctes = generate_ctes(lhs_part, rhs_part, rels, entity2partitions) subquery = generate_unions(nctes) query += EDGES_PARTITIONED.format( @@ -145,8 +146,8 @@ def remap_edges(conn, rels, entity2partitions): logging.debug("Confirming that we didn't drop any edges.") nentities_postmap = 0 - for lhs_part in range(NPARTS): - for rhs_part in range(NPARTS): + for lhs_part in range(nparts): + for rhs_part in range(nparts): nentities_postmap += conn.execute(f""" select count(*) from tmp_edges_{lhs_part}_{rhs_part} """).fetchall()[0][0] @@ -333,12 +334,12 @@ def compute_memory_usage(entity2partitions, conn, NDIM=200): logging.info(f"I need {mem} GBs of ram for embedding 
table for {NDIM} Dimensions") -def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/', modeldir='model/', config_dir=''): +def main(nparts=2, edge_file_name='edges.csv', outdir='training_data/', modeldir='model/', config_dir=''): conn = sqlite3.connect("citationv2.db") load_edges(edge_file_name, conn) entity2partitions = { - 'paper': NPARTS, + 'paper': nparts, 'year': 1, } @@ -352,4 +353,18 @@ def main(NPARTS=2, edge_file_name='edges.csv', outdir='training_data/', modeldir if __name__ == '__main__': - main() \ No newline at end of file + parser = argparse.ArgumentParser() + parser.add_argument("npart", help="The number of partitions to split the paper_ids into") + parser.add_argument("e", help="The edges file to load in") + parser.add_argument("o", help="The directory where the training data should be stored") + parser.add_argument("m", help="The directory where the model artifacts should be stored") + parser.add_argument("c", help="The location where the generated config file will be stored") + opt = parser.parse_args() + + main( + nparts=opt.npart, + edge_file_name=opt.e, + outdir=opt.o, + modeldir=opt.m, + config_dir=opt.c + ) \ No newline at end of file From 8ace8bf658466da95a5765b66c7f4eb7f23c7de8 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Fri, 1 Apr 2022 13:51:19 -0400 Subject: [PATCH 23/30] Documentation fix --- README.md | 4 +--- torchbiggraph/examples/sql_end2end/README.md | 10 +++++----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index aed76355..b8da2c03 100644 --- a/README.md +++ b/README.md @@ -113,12 +113,10 @@ This simple utility is only suitable for small graphs that fit entirely in memor ### Checking the data -It is advised that you check your using PBG's checking script. It catches common errors that our developers have run into before, -and can be hard to debug. This command is run by invoking the following: +It is advised that you check the edgelist files using PBG's checking script. It catches common errors that our developers have run into before that can be hard to debug. This command is run by invoking the following: ```bash torchbiggraph_check \ - --lhs-col=0 --rel-col=1 --rhs-col=2 \ torchbiggraph/examples/configs/fb15k_config_cpu.py ``` diff --git a/torchbiggraph/examples/sql_end2end/README.md b/torchbiggraph/examples/sql_end2end/README.md index e013d0ea..75809f3b 100644 --- a/torchbiggraph/examples/sql_end2end/README.md +++ b/torchbiggraph/examples/sql_end2end/README.md @@ -1,10 +1,10 @@ # SQL End to End Example -This is intended as a simple end-to-end example of how to get your data into -the format that PyTorch BigGraph expects using SQL. It's implemented in SQLite -for portability, but similar techniques scale to billions of edges using cloud -databases such as BigQuery or SnowFlake. This pipeline can be split into three -different components: +This is intended as a simple end-to-end example of how to get your a SQL edgelist +table into the format that PyTorch BigGraph expects using SQL queries. It's +implemented in SQLite for portability, but similar techniques scale to billions +of edges using cloud databases such as BigQuery or SnowFlake. This pipeline +can be split into three different components: 1. Data preparation 2. 
Data verification/checking From 31982393112a0c3231d9ec23b84102416c0be2ef Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Fri, 1 Apr 2022 13:51:30 -0400 Subject: [PATCH 24/30] Updated the check script --- torchbiggraph/check.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/torchbiggraph/check.py b/torchbiggraph/check.py index 30b18674..0130513b 100644 --- a/torchbiggraph/check.py +++ b/torchbiggraph/check.py @@ -91,7 +91,11 @@ def check_edge_chunk(self, cur_b, edges): "Preprocessing bug?") if __name__ == '__main__': - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser("""Script to check for user errors in a PBG input config and data. + +This script checks that each entity index is within range for the entity type specified by the config relation. +Preprocessing or config bugs can break this assumption, and may lead to errors or crashes during training. +""") parser.add_argument("config", help="Path to config file") parser.add_argument("-p", "--param", action="append", nargs="*") opt = parser.parse_args() From 339ba94aa0f8fddeba7ec1e1c314e5100051d9eb Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Fri, 1 Apr 2022 13:51:50 -0400 Subject: [PATCH 25/30] PR Comments and typing --- .../examples/sql_end2end/data_prep.py | 187 +++++++++++------- 1 file changed, 117 insertions(+), 70 deletions(-) diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py index 7e00857a..fb96869e 100644 --- a/torchbiggraph/examples/sql_end2end/data_prep.py +++ b/torchbiggraph/examples/sql_end2end/data_prep.py @@ -1,4 +1,5 @@ import argparse +import copy import itertools import h5py import json @@ -7,6 +8,8 @@ import pandas as pd from pathlib import Path import sqlite3 +from sqlite3 import Connection +from typing import Dict, Tuple from config_template import CONFIG_TEMPLATE from sql_templates import ( @@ -19,13 +22,36 @@ import sys import time +DEFAULT_CFG = dict( + # Scoring model + dimension=200, + comparator="dot", + loss_fn="softmax", + # Training + num_epochs=50, + num_uniform_negs=1000, + num_batch_negs=1000, + batch_size=150_000, + lr=0.05, + regularization_coef=1e-3, + num_gpus=2, + eval_fraction=0, + # io + checkpoint_path='model', + entity_path='training_data', + edge_paths=[ + 'training_data' + ], +) + + logging.basicConfig( format='%(process)d-%(levelname)s-%(message)s', stream = sys.stdout, level=logging.DEBUG ) -def remap_relationships(conn): +def remap_relationships(conn: Connection): """ A function to remap relationships using SQL queries. """ @@ -45,7 +71,7 @@ def remap_relationships(conn): return rels -def remap_entities(conn, entity2partitions): +def remap_entities(conn: Connection, entity2partitions: Dict[str, Dict[str, int]]): """ A function to remap entities with partitioning using SQL queries. 
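
For reference, the hunks above and below migrate `entity2partitions` from a flat `{entity_type: nparts}` mapping to a nested dict that can be passed straight through to the generated PBG config. A minimal sketch of the new shape, using the example entity types from `main()` (the counts are illustrative):

```python
# Illustrative sketch of the nested partition mapping introduced here;
# the entity types come from the example pipeline, the counts are arbitrary.
entity2partitions = {
    "paper": {"num_partitions": 2},  # the partitioned entity type
    "year": {"num_partitions": 1},   # small type kept in a single partition
}

# Lookups now go through the inner key, e.g. the largest partition count:
nparts = max(v["num_partitions"] for v in entity2partitions.values())
```
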
@@ -59,9 +85,9 @@ def remap_entities(conn, entity2partitions): start=time.time() query = "" for entity, npartitions in entity2partitions.items(): - query = QUERY_MAKE_ID2PART_TBL.format(type=entity, nparts=npartitions) + query = QUERY_MAKE_ID2PART_TBL.format(type=entity, nparts=npartitions['num_partitions']) - for i in range(npartitions): + for i in range(npartitions['num_partitions']): query += PARTITIONED_MAPPED_ENTITIES.format(type=entity, n=i) logging.debug(f"Running query: {query}") conn.executescript(query) @@ -69,10 +95,10 @@ def remap_entities(conn, entity2partitions): logging.info(f"Remapped entities in {end - start}s") -def generate_ctes(lhs_part, rhs_part, rels, entity2partitions): +def generate_ctes(lhs_part: int, rhs_part: int, rels: int, entity2partitions: Dict[str, Dict[str, int]]): """ - This function generates the sub-table CTEs that help us generate - the completed edgelist. + This function generates the sub-table Common Table Expressions (CTES) + that help us generate the completed edgelist. """ nctes = 0 ctes = """ @@ -80,9 +106,9 @@ def generate_ctes(lhs_part, rhs_part, rels, entity2partitions): """ first = True for _ , r in rels.iterrows(): - if lhs_part >= entity2partitions[r['source_type']]: + if lhs_part >= entity2partitions[r['source_type']]['num_partitions']: continue - if rhs_part >= entity2partitions[r['destination_type']]: + if rhs_part >= entity2partitions[r['destination_type']]['num_partitions']: continue if not first: ctes += f", cte_{nctes} as (" @@ -100,7 +126,7 @@ def generate_ctes(lhs_part, rhs_part, rels, entity2partitions): return nctes, ctes -def generate_unions(nctes): +def generate_unions(nctes: int): """ This function is just a helper function for generating the final edge list tables. @@ -115,7 +141,7 @@ def generate_unions(nctes): return subquery -def remap_edges(conn, rels, entity2partitions): +def remap_edges(conn: Connection, rels: pd.DataFrame, entity2partitions: Dict[str, Dict[str, int]]): """ A function to remap all edges to ordinal IDs according to their type. @@ -129,7 +155,7 @@ def remap_edges(conn, rels, entity2partitions): """).fetchall()[0][0] query = "" - nparts = max(entity2partitions.values()) + nparts = max([n['num_partitions'] for _, n in entity2partitions.items()]) for lhs_part in range(nparts): for rhs_part in range(nparts): nctes, ctes = generate_ctes(lhs_part, rhs_part, rels, entity2partitions) @@ -160,7 +186,7 @@ def remap_edges(conn, rels, entity2partitions): logging.info(f"Remapped edges in {end - start}s") -def load_edges(fname, conn): +def load_edges(fname: str, conn: Connection): """ A simple function to load the edges into the SQL table. It is assumed that we will have a file of the form: @@ -190,13 +216,15 @@ def load_edges(fname, conn): logging.info(f"Loading edges in {end - start}s") -def write_relations(outdir, rels, conn): +def write_relations(outdir: Path, rels: pd.DataFrame, conn: Connection): """ A simple function to write the relevant relationship information out for training. 
""" logging.info("Writing relations for training") start = time.time() + + outdir.mkdir(parents=True, exists_ok=True) out = rels.sort_values('graph_id')['id'].to_list() with open(f'{outdir}/dynamic_rel_names.json', mode='w') as f: json.dump(out, f, indent=4) @@ -204,7 +232,7 @@ def write_relations(outdir, rels, conn): logging.info(f"Wrote relations in {end - start}s") -def write_single_bucket(work_packet): +def write_single_bucket(work_packet: Tuple[int, int, Path, Connection]): """ A function to write out a single edge-lists in the format that PyTorch BigGraph expects. @@ -231,7 +259,7 @@ def write_single_bucket(work_packet): f[dset][0 : len(df)] = df[colname].tolist() -def write_all_buckets(outdir, lhs_parts, rhs_parts, conn): +def write_all_buckets(outdir: Path, lhs_parts: int, rhs_parts: int, conn: Connection): """ A function to write out all edge-lists in the format that PyTorch BigGraph expects. @@ -249,15 +277,19 @@ def write_all_buckets(outdir, lhs_parts, rhs_parts, conn): logging.info(f"Wrote edges in {end - start}s") -def write_entities(outdir, entity2partitions, conn): +def write_entities( + outdir: Path, + entity2partitions: Dict[str, Dict[str, int]], + conn: Connection): """ A function to write out all of the training relevant entity information that PyTorch BigGraph expects """ logging.info("Writing entites for training") start = time.time() + for entity_type, nparts in entity2partitions.items(): - for i in range(nparts): + for i in range(nparts['num_partitions']): query = f""" select count(*) from tmp_{entity_type}_ids_map_{i} @@ -269,60 +301,64 @@ def write_entities(outdir, entity2partitions, conn): logging.info(f"Wrote entites in {end - start}s") -def write_training_data(outdir, rels, entity2partitions, conn): +def write_training_data( + outdir: Path, + rels: pd.DataFrame, + entity2partitions: Dict[str, Dict[str, int]], + conn: Connection + ): """ A function to write out all of the training relevant information that PyTorch BigGraph expects """ lhs_parts = 1 rhs_parts = 1 - for i, r in rels.iterrows(): - if entity2partitions[r['source_type']] > lhs_parts: + for _, r in rels.iterrows(): + if entity2partitions[r['source_type']]['num_partitions'] > lhs_parts: lhs_parts = entity2partitions[r['source_type']] - if entity2partitions[r['destination_type']] > rhs_parts: + if entity2partitions[r['destination_type']]['num_partitions'] > rhs_parts: rhs_parts = entity2partitions[r['destination_type']] - # write_relations(outdir, rels, conn) - # write_all_buckets(rels, lhs_parts, rhs_parts, conn) + write_relations(outdir, rels, conn) + write_all_buckets(rels, lhs_parts, rhs_parts, conn) write_entities(outdir, entity2partitions, conn) -def write_rels_dict(rels): - my_rels = "" - for _, row in rels.sort_values(by="graph_id").iterrows(): - r = "{" - r += f"'name': '{row['id']}', 'lhs': '{row['source_type']}', 'rhs': '{row['destination_type']}', 'operator': 'translation'" - r += "},\n" - my_rels += r - return my_rels - +def write_config( + rels: pd.DataFrame, + entity2partitions: Dict[str, Dict[str, int]], + config_out: Path, + train_out: Path, + model_out: Path, + ndim: int = 200, + ngpus:int = 2 + ): + config_out.mkdir(parents=True, exists_ok=True) + outname = config_out / 'config.py' + rels['operator'] = 'translation' + rels = rels.rename({'id': 'name', 'source_type': 'lhs', 'destination_type': 'rhs'}, axis=1) + + cfg = copy.deepcopy(DEFAULT_CFG) + cfg['edge_paths'] = [ train_out ] + cfg['entity_path'] = train_out + cfg['checkpoint_path'] = model_out + cfg['entites'] = entity2partitions 
+ cfg['relations'] = rels[['name', 'lhs', 'rhs', 'operator']].to_dict(orient='records') -def write_entities_dict(entity2partitions): - my_entities = "{\n" - for name, part in entity2partitions.items(): - my_entities += '\t"{name}": {{"num_partitions": {part} }},\n'.format(name=name, part=part) - my_entities += "}\n" - return my_entities - - -def write_config(rels, entity2partitions, config_out, train_out, model_out): - outname = Path(config_out) / 'config.py' with open(outname, mode='w') as f: f.write( - CONFIG_TEMPLATE.format( - RELN_DICT=write_rels_dict(rels), - ENTITIES_DICT=write_entities_dict(entity2partitions), - TRAINING_DIR=train_out, - MODEL_PATH=model_out, - ) + f"def get_torchbiggraph_config():\n\treturn {json.dumps(cfg, indent=4)}\n" ) -def compute_memory_usage(entity2partitions, conn, NDIM=200): +def compute_memory_usage( + entity2partitions: Dict[str, Dict[str, int]], + conn: Connection, + ndim: int): nentities = 0 for _type, parts in entity2partitions.items(): ntype = 0 - for i in range(parts): + for i in range(parts['num_partitions']): query = f""" select count(*) as cnt from `tmp_{_type}_ids_map_{i}` @@ -330,41 +366,52 @@ def compute_memory_usage(entity2partitions, conn, NDIM=200): ntype = max(ntype, conn.executequery(query).fetchall()[0][0]) nentities += ntype - mem = 1.5 * nentities * NDIM * 8 / 1024 / 1024 / 1024 - logging.info(f"I need {mem} GBs of ram for embedding table for {NDIM} Dimensions") + # 1.2 here is an empirical safety factor. + mem = 1.2 * nentities * ndim * 8 / 1024 / 1024 / 1024 + logging.info(f"I need {mem} GBs of ram for embedding table for {ndim} Dimensions") -def main(nparts=2, edge_file_name='edges.csv', outdir='training_data/', modeldir='model/', config_dir=''): - conn = sqlite3.connect("citationv2.db") - load_edges(edge_file_name, conn) +def main( + nparts: int = 1, + edge_file_name: str = 'edges.csv', + outdir: Path = Path('training_data/'), + modeldir: Path = Path('model/'), + config_dir: Path = Path(''), + dbname: str = 'citationv2.db') -> None: + conn = sqlite3.connect(dbname) + # load_edges(edge_file_name, conn) entity2partitions = { - 'paper': nparts, - 'year': 1, + 'paper': {'num_partitions': nparts}, + 'year': {'num_partitions': 1}, } rels = remap_relationships(conn) remap_entities(conn, entity2partitions) remap_edges(conn, rels, entity2partitions) - Path(outdir).mkdir(parents=True, exist_ok=True) - out = Path(outdir) - write_training_data(out, rels, entity2partitions, conn) - write_config(rels, entity2partitions, config_dir, out, modeldir) + + outdir.mkdir(parents=True, exists_ok=True) + config_dir.mkdir(parents=True, exists_ok=True) + modeldir.mkdir(parents=True, exists_ok=True) + + write_training_data(outdir, rels, entity2partitions, conn) + write_config(rels, entity2partitions, config_dir, outdir, modeldir) + compute_memory_usage(entity2partitions, conn, 200) if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument("npart", help="The number of partitions to split the paper_ids into") - parser.add_argument("e", help="The edges file to load in") - parser.add_argument("o", help="The directory where the training data should be stored") - parser.add_argument("m", help="The directory where the model artifacts should be stored") - parser.add_argument("c", help="The location where the generated config file will be stored") + parser.add_argument("-npart", help="The number of partitions to split the paper_ids into", type=int) + parser.add_argument("-e", help="The edges file to load in") + parser.add_argument("-o", 
help="The directory where the training data should be stored", default='training_data/', required=False) + parser.add_argument("-m", help="The directory where the model artifacts should be stored", default='model', required=False) + parser.add_argument("-c", help="The location where the generated config file will be stored", default='.', required=False) opt = parser.parse_args() main( nparts=opt.npart, edge_file_name=opt.e, - outdir=opt.o, - modeldir=opt.m, - config_dir=opt.c + outdir=Path(opt.o), + modeldir=Path(opt.m), + config_dir=Path(opt.c), ) \ No newline at end of file From cb5c7d0e69a686a051dd1efbd2b1e5bc6f2dd400 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Fri, 1 Apr 2022 13:55:37 -0400 Subject: [PATCH 26/30] Added return types --- .../examples/sql_end2end/data_prep.py | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py index fb96869e..a037fc53 100644 --- a/torchbiggraph/examples/sql_end2end/data_prep.py +++ b/torchbiggraph/examples/sql_end2end/data_prep.py @@ -51,7 +51,7 @@ level=logging.DEBUG ) -def remap_relationships(conn: Connection): +def remap_relationships(conn: Connection) -> pd.DataFrame: """ A function to remap relationships using SQL queries. """ @@ -71,7 +71,7 @@ def remap_relationships(conn: Connection): return rels -def remap_entities(conn: Connection, entity2partitions: Dict[str, Dict[str, int]]): +def remap_entities(conn: Connection, entity2partitions: Dict[str, Dict[str, int]]) -> None: """ A function to remap entities with partitioning using SQL queries. @@ -95,7 +95,7 @@ def remap_entities(conn: Connection, entity2partitions: Dict[str, Dict[str, int logging.info(f"Remapped entities in {end - start}s") -def generate_ctes(lhs_part: int, rhs_part: int, rels: int, entity2partitions: Dict[str, Dict[str, int]]): +def generate_ctes(lhs_part: int, rhs_part: int, rels: int, entity2partitions: Dict[str, Dict[str, int]]) -> Tuple[int, str]: """ This function generates the sub-table Common Table Expressions (CTES) that help us generate the completed edgelist. @@ -126,7 +126,7 @@ def generate_ctes(lhs_part: int, rhs_part: int, rels: int, entity2partitions: D return nctes, ctes -def generate_unions(nctes: int): +def generate_unions(nctes: int) -> str: """ This function is just a helper function for generating the final edge list tables. @@ -141,7 +141,7 @@ def generate_unions(nctes: int): return subquery -def remap_edges(conn: Connection, rels: pd.DataFrame, entity2partitions: Dict[str, Dict[str, int]]): +def remap_edges(conn: Connection, rels: pd.DataFrame, entity2partitions: Dict[str, Dict[str, int]]) -> None: """ A function to remap all edges to ordinal IDs according to their type. @@ -186,7 +186,7 @@ def remap_edges(conn: Connection, rels: pd.DataFrame, entity2partitions: Dict[s logging.info(f"Remapped edges in {end - start}s") -def load_edges(fname: str, conn: Connection): +def load_edges(fname: str, conn: Connection) -> None: """ A simple function to load the edges into the SQL table. It is assumed that we will have a file of the form: @@ -216,7 +216,7 @@ def load_edges(fname: str, conn: Connection): logging.info(f"Loading edges in {end - start}s") -def write_relations(outdir: Path, rels: pd.DataFrame, conn: Connection): +def write_relations(outdir: Path, rels: pd.DataFrame, conn: Connection) -> None: """ A simple function to write the relevant relationship information out for training. 
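
To make the estimate in `compute_memory_usage` above concrete, the same arithmetic can be worked through for assumed entity counts (the figures below are illustrative, not measured from the citation data):

```python
# Worked instance of the RAM estimate used by compute_memory_usage();
# nentities and ndim are assumptions chosen only for illustration.
nentities = 10_000_000  # largest partition of each entity type, summed over types
ndim = 200              # embedding dimension from the generated config
bytes_per_value = 8     # the 8-byte-per-value factor used in the formula
safety_factor = 1.2     # empirical headroom applied in the patch

mem_gib = safety_factor * nentities * ndim * bytes_per_value / 1024 / 1024 / 1024
print(f"~{mem_gib:.1f} GiB of RAM for the embedding table")  # ~17.9 GiB here
```
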
@@ -232,7 +232,7 @@ def write_relations(outdir: Path, rels: pd.DataFrame, conn: Connection): logging.info(f"Wrote relations in {end - start}s") -def write_single_bucket(work_packet: Tuple[int, int, Path, Connection]): +def write_single_bucket(work_packet: Tuple[int, int, Path, Connection]) -> None: """ A function to write out a single edge-lists in the format that PyTorch BigGraph expects. @@ -259,7 +259,7 @@ def write_single_bucket(work_packet: Tuple[int, int, Path, Connection]): f[dset][0 : len(df)] = df[colname].tolist() -def write_all_buckets(outdir: Path, lhs_parts: int, rhs_parts: int, conn: Connection): +def write_all_buckets(outdir: Path, lhs_parts: int, rhs_parts: int, conn: Connection) -> None: """ A function to write out all edge-lists in the format that PyTorch BigGraph expects. @@ -280,7 +280,7 @@ def write_all_buckets(outdir: Path, lhs_parts: int, rhs_parts: int, conn: Connec def write_entities( outdir: Path, entity2partitions: Dict[str, Dict[str, int]], - conn: Connection): + conn: Connection) -> None: """ A function to write out all of the training relevant entity information that PyTorch BigGraph expects @@ -306,7 +306,7 @@ def write_training_data( rels: pd.DataFrame, entity2partitions: Dict[str, Dict[str, int]], conn: Connection - ): + ) -> None: """ A function to write out all of the training relevant information that PyTorch BigGraph expects @@ -332,7 +332,7 @@ def write_config( model_out: Path, ndim: int = 200, ngpus:int = 2 - ): + ) -> None: config_out.mkdir(parents=True, exists_ok=True) outname = config_out / 'config.py' rels['operator'] = 'translation' @@ -354,7 +354,7 @@ def write_config( def compute_memory_usage( entity2partitions: Dict[str, Dict[str, int]], conn: Connection, - ndim: int): + ndim: int) -> None: nentities = 0 for _type, parts in entity2partitions.items(): ntype = 0 @@ -387,8 +387,8 @@ def main( } rels = remap_relationships(conn) - remap_entities(conn, entity2partitions) - remap_edges(conn, rels, entity2partitions) + # remap_entities(conn, entity2partitions) + # remap_edges(conn, rels, entity2partitions) outdir.mkdir(parents=True, exists_ok=True) config_dir.mkdir(parents=True, exists_ok=True) From 1dd8492cd6fe462fe202ff3920b2650daa472a27 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Fri, 1 Apr 2022 14:18:36 -0400 Subject: [PATCH 27/30] Little more cleanup --- .../examples/sql_end2end/data_prep.py | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py index a037fc53..021ef2db 100644 --- a/torchbiggraph/examples/sql_end2end/data_prep.py +++ b/torchbiggraph/examples/sql_end2end/data_prep.py @@ -224,7 +224,6 @@ def write_relations(outdir: Path, rels: pd.DataFrame, conn: Connection) -> None: logging.info("Writing relations for training") start = time.time() - outdir.mkdir(parents=True, exists_ok=True) out = rels.sort_values('graph_id')['id'].to_list() with open(f'{outdir}/dynamic_rel_names.json', mode='w') as f: json.dump(out, f, indent=4) @@ -269,7 +268,7 @@ def write_all_buckets(outdir: Path, lhs_parts: int, rhs_parts: int, conn: Connec # I would write these using multiprocessing but SQLite connections # aren't pickelable, and I'd like to keep this simple - worklist = list(itertools.product(range(lhs_parts), range(rhs_parts), ['training_data'], [conn])) + worklist = list(itertools.product(range(lhs_parts), range(rhs_parts), [outdir], [conn])) for w in worklist: write_single_bucket(w) @@ -315,12 +314,15 @@ def 
write_training_data( rhs_parts = 1 for _, r in rels.iterrows(): if entity2partitions[r['source_type']]['num_partitions'] > lhs_parts: - lhs_parts = entity2partitions[r['source_type']] + lhs_parts = entity2partitions[r['source_type']]['num_partitions'] if entity2partitions[r['destination_type']]['num_partitions'] > rhs_parts: - rhs_parts = entity2partitions[r['destination_type']] + rhs_parts = entity2partitions[r['destination_type']]['num_partitions'] + + print("LHS_PARTS: ", lhs_parts) + print("RHS_PARTS: ", rhs_parts) write_relations(outdir, rels, conn) - write_all_buckets(rels, lhs_parts, rhs_parts, conn) + write_all_buckets(outdir, lhs_parts, rhs_parts, conn) write_entities(outdir, entity2partitions, conn) @@ -333,15 +335,14 @@ def write_config( ndim: int = 200, ngpus:int = 2 ) -> None: - config_out.mkdir(parents=True, exists_ok=True) outname = config_out / 'config.py' rels['operator'] = 'translation' rels = rels.rename({'id': 'name', 'source_type': 'lhs', 'destination_type': 'rhs'}, axis=1) cfg = copy.deepcopy(DEFAULT_CFG) - cfg['edge_paths'] = [ train_out ] - cfg['entity_path'] = train_out - cfg['checkpoint_path'] = model_out + cfg['edge_paths'] = [ train_out.as_posix() ] + cfg['entity_path'] = train_out.as_posix() + cfg['checkpoint_path'] = model_out.as_posix() cfg['entites'] = entity2partitions cfg['relations'] = rels[['name', 'lhs', 'rhs', 'operator']].to_dict(orient='records') @@ -363,7 +364,9 @@ def compute_memory_usage( select count(*) as cnt from `tmp_{_type}_ids_map_{i}` """ - ntype = max(ntype, conn.executequery(query).fetchall()[0][0]) + res = pd.read_sql_query(query, conn) + res = conn.execute(query).fetchall() + ntype = max(ntype, res[0][0]) nentities += ntype # 1.2 here is an empirical safety factor. @@ -390,11 +393,11 @@ def main( # remap_entities(conn, entity2partitions) # remap_edges(conn, rels, entity2partitions) - outdir.mkdir(parents=True, exists_ok=True) - config_dir.mkdir(parents=True, exists_ok=True) - modeldir.mkdir(parents=True, exists_ok=True) + outdir.mkdir(parents=True, exist_ok=True) + config_dir.mkdir(parents=True, exist_ok=True) + modeldir.mkdir(parents=True, exist_ok=True) - write_training_data(outdir, rels, entity2partitions, conn) + # write_training_data(outdir, rels, entity2partitions, conn) write_config(rels, entity2partitions, config_dir, outdir, modeldir) compute_memory_usage(entity2partitions, conn, 200) @@ -403,9 +406,9 @@ def main( parser = argparse.ArgumentParser() parser.add_argument("-npart", help="The number of partitions to split the paper_ids into", type=int) parser.add_argument("-e", help="The edges file to load in") - parser.add_argument("-o", help="The directory where the training data should be stored", default='training_data/', required=False) - parser.add_argument("-m", help="The directory where the model artifacts should be stored", default='model', required=False) - parser.add_argument("-c", help="The location where the generated config file will be stored", default='.', required=False) + parser.add_argument("-o", help="The directory where the training data should be stored", required=False) + parser.add_argument("-m", help="The directory where the model artifacts should be stored", required=False) + parser.add_argument("-c", help="The location where the generated config file will be stored", required=False) opt = parser.parse_args() main( From d76c78c5f7c3f1b769452a8c8d86d588a70a3de6 Mon Sep 17 00:00:00 2001 From: Thomas Markovich Date: Fri, 1 Apr 2022 14:21:53 -0400 Subject: [PATCH 28/30] Uncommented --- 
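
Taken together, the `write_config` changes above drop the old string template in favour of serializing a plain dict with `json.dumps` into a `config.py` module that exposes `get_torchbiggraph_config()`. A minimal sketch of that pattern, with placeholder paths and a stripped-down config:

```python
# Minimal sketch of the json.dumps-based config writer; the paths and the
# contents of DEFAULT_CFG_SKETCH are placeholders, not the real defaults.
import copy
import json
from pathlib import Path

DEFAULT_CFG_SKETCH = {"dimension": 200, "eval_fraction": 0}

def write_config_sketch(config_out: Path, train_out: Path, model_out: Path) -> None:
    cfg = copy.deepcopy(DEFAULT_CFG_SKETCH)
    cfg["entity_path"] = train_out.as_posix()
    cfg["edge_paths"] = [train_out.as_posix()]
    cfg["checkpoint_path"] = model_out.as_posix()
    # For dicts of strings, numbers and lists, json.dumps output is also a
    # valid Python literal, so the emitted module can simply return it.
    (config_out / "config.py").write_text(
        f"def get_torchbiggraph_config():\n\treturn {json.dumps(cfg, indent=4)}\n"
    )

write_config_sketch(Path("."), Path("training_data"), Path("model"))
```
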
 torchbiggraph/examples/sql_end2end/data_prep.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py
index 021ef2db..a5985023 100644
--- a/torchbiggraph/examples/sql_end2end/data_prep.py
+++ b/torchbiggraph/examples/sql_end2end/data_prep.py
@@ -382,7 +382,7 @@ def main(
     config_dir: Path = Path(''),
     dbname: str = 'citationv2.db') -> None:
     conn = sqlite3.connect(dbname)
-    # load_edges(edge_file_name, conn)
+    load_edges(edge_file_name, conn)
 
     entity2partitions = {
         'paper': {'num_partitions': nparts},
@@ -390,14 +390,14 @@ def main(
     }
 
     rels = remap_relationships(conn)
-    # remap_entities(conn, entity2partitions)
-    # remap_edges(conn, rels, entity2partitions)
+    remap_entities(conn, entity2partitions)
+    remap_edges(conn, rels, entity2partitions)
 
     outdir.mkdir(parents=True, exist_ok=True)
     config_dir.mkdir(parents=True, exist_ok=True)
     modeldir.mkdir(parents=True, exist_ok=True)
 
-    # write_training_data(outdir, rels, entity2partitions, conn)
+    write_training_data(outdir, rels, entity2partitions, conn)
     write_config(rels, entity2partitions, config_dir, outdir, modeldir)
 
     compute_memory_usage(entity2partitions, conn, 200)

From 729043c68259500a690cf42983b9feb04d80ce86 Mon Sep 17 00:00:00 2001
From: Thomas Markovich
Date: Fri, 1 Apr 2022 14:25:24 -0400
Subject: [PATCH 29/30] removed the config template

---
 .../examples/sql_end2end/config_template.py | 42 -------------------
 1 file changed, 42 deletions(-)
 delete mode 100644 torchbiggraph/examples/sql_end2end/config_template.py

diff --git a/torchbiggraph/examples/sql_end2end/config_template.py b/torchbiggraph/examples/sql_end2end/config_template.py
deleted file mode 100644
index 28da5ff0..00000000
--- a/torchbiggraph/examples/sql_end2end/config_template.py
+++ /dev/null
@@ -1,42 +0,0 @@
-CONFIG_TEMPLATE = """
-#!/usr/bin/env python3
-
-# Copyright (c) Facebook, Inc. and its affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the BSD-style license found in the
-# LICENSE.txt file in the root directory of this source tree.
-import torch
-
-def get_torchbiggraph_config():
-
-    config = dict(  # noqa
-        # I/O data
-        entity_path="{TRAINING_DIR}",
-        edge_paths=[
-            "{TRAINING_DIR}",
-        ],
-        checkpoint_path="{MODEL_PATH}",
-        # Graph structure
-        entities={ENTITIES_DICT},
-        relations=[
-            {RELN_DICT}
-        ],
-        # Scoring model
-        dimension=200,
-        comparator="dot",
-        # Training
-        num_epochs=50,
-        num_uniform_negs=1000,
-        num_batch_negs=1000,
-        batch_size=150_000,
-        loss_fn="softmax",
-        lr=0.05,
-        regularization_coef=1e-3,
-        num_gpus=torch.cuda.device_count(),
-        # Evaluation during training
-        eval_fraction=0,  # to reproduce results, we need to use all training data
-    )
-
-    return config
-"""
\ No newline at end of file

From 291a0f1cc007ab45815fcf9beb53f7af9fe9fe22 Mon Sep 17 00:00:00 2001
From: Thomas Markovich
Date: Fri, 1 Apr 2022 14:33:13 -0400
Subject: [PATCH 30/30] Switched to cosine + ranking

---
 torchbiggraph/examples/sql_end2end/data_prep.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/torchbiggraph/examples/sql_end2end/data_prep.py b/torchbiggraph/examples/sql_end2end/data_prep.py
index a5985023..d2bce313 100644
--- a/torchbiggraph/examples/sql_end2end/data_prep.py
+++ b/torchbiggraph/examples/sql_end2end/data_prep.py
@@ -25,8 +25,8 @@ DEFAULT_CFG = dict(
     # Scoring model
     dimension=200,
-    comparator="dot",
-    loss_fn="softmax",
+    comparator="cosine",
+    loss_fn="ranking",
     # Training
     num_epochs=50,
     num_uniform_negs=1000,
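
After this final switch, the `config.py` emitted by `write_config` returns roughly the dictionary below; the paths match the example defaults, and the `entities`/`relations` entries (including the relation name) are illustrative placeholders that depend on the loaded edge list. This is the file that the `torchbiggraph_check` and `torchbiggraph_train` commands from the README section above are then pointed at.

```python
# Approximate shape of the generated config after the cosine + ranking change.
# Paths are the example defaults; entity and relation entries are placeholders.
def get_torchbiggraph_config():
    return {
        # Scoring model
        "dimension": 200,
        "comparator": "cosine",
        "loss_fn": "ranking",
        # Training
        "num_epochs": 50,
        "num_uniform_negs": 1000,
        "num_batch_negs": 1000,
        "batch_size": 150_000,
        "lr": 0.05,
        "regularization_coef": 1e-3,
        "num_gpus": 2,
        "eval_fraction": 0,
        # I/O
        "entity_path": "training_data",
        "edge_paths": ["training_data"],
        "checkpoint_path": "model",
        # Graph structure (placeholder entries)
        "entities": {
            "paper": {"num_partitions": 2},
            "year": {"num_partitions": 1},
        },
        "relations": [
            {"name": "cites", "lhs": "paper", "rhs": "paper", "operator": "translation"},
        ],
    }
```
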