diff --git a/copy-openalex-csv copy.sql b/copy-openalex-csv copy.sql new file mode 100644 index 0000000..086ab00 --- /dev/null +++ b/copy-openalex-csv copy.sql @@ -0,0 +1,53 @@ +--authors + +\copy openalex.authors (id, orcid, display_name, display_name_alternatives, works_count, cited_by_count, last_known_institution, works_api_url, updated_date) from program 'gunzip -c /app/csv-files/authors.csv.gz' csv header +\copy openalex.authors_ids (author_id, openalex, orcid, scopus, twitter, wikipedia, mag) from program 'gunzip -c /app/csv-files/authors_ids.csv.gz' csv header +\copy openalex.authors_counts_by_year (author_id, year, works_count, cited_by_count, oa_works_count) from program 'gunzip -c /app/csv-files/authors_counts_by_year.csv.gz' csv header + +-- topics + +\copy openalex.topics (id, display_name, subfield_id, subfield_display_name, field_id, field_display_name, domain_id, domain_display_name, description, keywords, works_api_url, wikipedia_id, works_count, cited_by_count, updated_date, siblings) from program 'gunzip -c /app/csv-files/topics.csv.gz' csv header + +--concepts + +\copy openalex.concepts (id, wikidata, display_name, level, description, works_count, cited_by_count, image_url, image_thumbnail_url, works_api_url, updated_date) from program 'gunzip -c /app/csv-files/concepts.csv.gz' csv header +\copy openalex.concepts_ancestors (concept_id, ancestor_id) from program 'gunzip -c /app/csv-files/concepts_ancestors.csv.gz' csv header +\copy openalex.concepts_counts_by_year (concept_id, year, works_count, cited_by_count, oa_works_count) from program 'gunzip -c /app/csv-files/concepts_counts_by_year.csv.gz' csv header +\copy openalex.concepts_ids (concept_id, openalex, wikidata, wikipedia, umls_aui, umls_cui, mag) from program 'gunzip -c /app/csv-files/concepts_ids.csv.gz' csv header +\copy openalex.concepts_related_concepts (concept_id, related_concept_id, score) from program 'gunzip -c /app/csv-files/concepts_related_concepts.csv.gz' csv header + +--institutions + 
+\copy openalex.institutions (id, ror, display_name, country_code, type, homepage_url, image_url, image_thumbnail_url, display_name_acronyms, display_name_alternatives, works_count, cited_by_count, works_api_url, updated_date) from program 'gunzip -c /app/csv-files/institutions.csv.gz' csv header +\copy openalex.institutions_ids (institution_id, openalex, ror, grid, wikipedia, wikidata, mag) from program 'gunzip -c /app/csv-files/institutions_ids.csv.gz' csv header +\copy openalex.institutions_geo (institution_id, city, geonames_city_id, region, country_code, country, latitude, longitude) from program 'gunzip -c /app/csv-files/institutions_geo.csv.gz' csv header +\copy openalex.institutions_associated_institutions (institution_id, associated_institution_id, relationship) from program 'gunzip -c /app/csv-files/institutions_associated_institutions.csv.gz' csv header +\copy openalex.institutions_counts_by_year (institution_id, year, works_count, cited_by_count, oa_works_count) from program 'gunzip -c /app/csv-files/institutions_counts_by_year.csv.gz' csv header + +--publishers + +\copy openalex.publishers (id, display_name, alternate_titles, country_codes, hierarchy_level, parent_publisher, works_count, cited_by_count, sources_api_url, updated_date) from program 'gunzip -c /app/csv-files/publishers.csv.gz' csv header +\copy openalex.publishers_ids (publisher_id, openalex, ror, wikidata) from program 'gunzip -c /app/csv-files/publishers_ids.csv.gz' csv header +\copy openalex.publishers_counts_by_year (publisher_id, year, works_count, cited_by_count, oa_works_count) from program 'gunzip -c /app/csv-files/publishers_counts_by_year.csv.gz' csv header + +--sources + +\copy openalex.sources (id, issn_l, issn, display_name, publisher, works_count, cited_by_count, is_oa, is_in_doaj, homepage_url, works_api_url, updated_date) from program 'gunzip -c /app/csv-files/sources.csv.gz' csv header +\copy openalex.sources_ids (source_id, openalex, issn_l, issn, mag, wikidata, fatcat) 
from program 'gunzip -c /app/csv-files/sources_ids.csv.gz' csv header +\copy openalex.sources_counts_by_year (source_id, year, works_count, cited_by_count, oa_works_count) from program 'gunzip -c /app/csv-files/sources_counts_by_year.csv.gz' csv header + +--works + +\copy openalex.works (id, doi, title, display_name, publication_year, publication_date, type, cited_by_count, is_retracted, is_paratext, cited_by_api_url, abstract_inverted_index, language) from program 'gunzip -c /app/csv-files/works.csv.gz' csv header +\copy openalex.works_primary_locations (work_id, source_id, landing_page_url, pdf_url, is_oa, version, license) from program 'gunzip -c /app/csv-files/works_primary_locations.csv.gz' csv header +\copy openalex.works_locations (work_id, source_id, landing_page_url, pdf_url, is_oa, version, license) from program 'gunzip -c /app/csv-files/works_locations.csv.gz' csv header +\copy openalex.works_best_oa_locations (work_id, source_id, landing_page_url, pdf_url, is_oa, version, license) from program 'gunzip -c /app/csv-files/works_best_oa_locations.csv.gz' csv header +\copy openalex.works_authorships (work_id, author_position, author_id, institution_id, raw_affiliation_string) from program 'gunzip -c /app/csv-files/works_authorships.csv.gz' csv header +\copy openalex.works_biblio (work_id, volume, issue, first_page, last_page) from program 'gunzip -c /app/csv-files/works_biblio.csv.gz' csv header +\copy openalex.works_topics (work_id, topic_id, score) from program 'gunzip -c /app/csv-files/works_topics.csv.gz' csv header +\copy openalex.works_concepts (work_id, concept_id, score) from program 'gunzip -c /app/csv-files/works_concepts.csv.gz' csv header +\copy openalex.works_ids (work_id, openalex, doi, mag, pmid, pmcid) from program 'gunzip -c /app/csv-files/works_ids.csv.gz' csv header +\copy openalex.works_mesh (work_id, descriptor_ui, descriptor_name, qualifier_ui, qualifier_name, is_major_topic) from program 'gunzip -c /app/csv-files/works_mesh.csv.gz' csv 
header +\copy openalex.works_open_access (work_id, is_oa, oa_status, oa_url, any_repository_has_fulltext) from program 'gunzip -c /app/csv-files/works_open_access.csv.gz' csv header +\copy openalex.works_referenced_works (work_id, referenced_work_id) from program 'gunzip -c /app/csv-files/works_referenced_works.csv.gz' csv header +\copy openalex.works_related_works (work_id, related_work_id) from program 'gunzip -c /app/csv-files/works_related_works.csv.gz' csv header \ No newline at end of file diff --git a/copy-openalex-csv.sql b/copy-openalex-csv.sql index bb372a0..395bba3 100644 --- a/copy-openalex-csv.sql +++ b/copy-openalex-csv.sql @@ -50,5 +50,4 @@ \copy openalex.works_mesh (work_id, descriptor_ui, descriptor_name, qualifier_ui, qualifier_name, is_major_topic) from program 'gunzip -c csv-files/works_mesh.csv.gz' csv header \copy openalex.works_open_access (work_id, is_oa, oa_status, oa_url, any_repository_has_fulltext) from program 'gunzip -c csv-files/works_open_access.csv.gz' csv header \copy openalex.works_referenced_works (work_id, referenced_work_id) from program 'gunzip -c csv-files/works_referenced_works.csv.gz' csv header -\copy openalex.works_related_works (work_id, related_work_id) from program 'gunzip -c csv-files/works_related_works.csv.gz' csv header - +\copy openalex.works_related_works (work_id, related_work_id) from program 'gunzip -c csv-files/works_related_works.csv.gz' csv header \ No newline at end of file diff --git a/flatten-openalex-jsonl-parallel.py b/flatten-openalex-jsonl-parallel.py new file mode 100644 index 0000000..3cab913 --- /dev/null +++ b/flatten-openalex-jsonl-parallel.py @@ -0,0 +1,594 @@ +import csv +import glob +import gzip +import json +import os +import shutil +import multiprocessing +from concurrent.futures import ProcessPoolExecutor +from functools import partial + +SNAPSHOT_DIR = 'openalex-snapshot' +CSV_DIR = 'csv-files' +TEMP_DIR = 'temp-shards' + +if not os.path.exists(CSV_DIR): + os.mkdir(CSV_DIR) + +if not 
os.path.exists(TEMP_DIR): + os.mkdir(TEMP_DIR) + +FILES_PER_ENTITY = int(os.environ.get('OPENALEX_DEMO_FILES_PER_ENTITY', '0')) + +csv_files = { + 'authors': { + 'authors': { + 'name': os.path.join(CSV_DIR, 'authors.csv.gz'), + 'columns': [ + 'id', 'orcid', 'display_name', 'display_name_alternatives', + 'works_count', 'cited_by_count', + 'last_known_institution', 'works_api_url', 'updated_date', + ] + }, + 'ids': { + 'name': os.path.join(CSV_DIR, 'authors_ids.csv.gz'), + 'columns': [ + 'author_id', 'openalex', 'orcid', 'scopus', 'twitter', + 'wikipedia', 'mag' + ] + }, + 'counts_by_year': { + 'name': os.path.join(CSV_DIR, 'authors_counts_by_year.csv.gz'), + 'columns': [ + 'author_id', 'year', 'works_count', 'cited_by_count', + 'oa_works_count' + ] + } + }, + 'concepts': { + 'concepts': { + 'name': os.path.join(CSV_DIR, 'concepts.csv.gz'), + 'columns': [ + 'id', 'wikidata', 'display_name', 'level', 'description', + 'works_count', 'cited_by_count', 'image_url', + 'image_thumbnail_url', 'works_api_url', 'updated_date' + ] + }, + 'ancestors': { + 'name': os.path.join(CSV_DIR, 'concepts_ancestors.csv.gz'), + 'columns': ['concept_id', 'ancestor_id'] + }, + 'counts_by_year': { + 'name': os.path.join(CSV_DIR, 'concepts_counts_by_year.csv.gz'), + 'columns': ['concept_id', 'year', 'works_count', 'cited_by_count', + 'oa_works_count'] + }, + 'ids': { + 'name': os.path.join(CSV_DIR, 'concepts_ids.csv.gz'), + 'columns': ['concept_id', 'openalex', 'wikidata', 'wikipedia', + 'umls_aui', 'umls_cui', 'mag'] + }, + 'related_concepts': { + 'name': os.path.join(CSV_DIR, 'concepts_related_concepts.csv.gz'), + 'columns': ['concept_id', 'related_concept_id', 'score'] + } + }, + 'topics': { + 'topics': { + 'name': os.path.join(CSV_DIR, 'topics.csv.gz'), + 'columns': ['id', 'display_name', 'subfield_id', + 'subfield_display_name', 'field_id', + 'field_display_name', + 'domain_id', 'domain_display_name', 'description', + 'keywords', 'works_api_url', 'wikipedia_id', + 'works_count', 
'cited_by_count', 'updated_date', 'siblings'] + } + }, + 'institutions': { + 'institutions': { + 'name': os.path.join(CSV_DIR, 'institutions.csv.gz'), + 'columns': [ + 'id', 'ror', 'display_name', 'country_code', 'type', + 'homepage_url', 'image_url', 'image_thumbnail_url', + 'display_name_acronyms', 'display_name_alternatives', + 'works_count', 'cited_by_count', 'works_api_url', + 'updated_date' + ] + }, + 'ids': { + 'name': os.path.join(CSV_DIR, 'institutions_ids.csv.gz'), + 'columns': [ + 'institution_id', 'openalex', 'ror', 'grid', 'wikipedia', + 'wikidata', 'mag' + ] + }, + 'geo': { + 'name': os.path.join(CSV_DIR, 'institutions_geo.csv.gz'), + 'columns': [ + 'institution_id', 'city', 'geonames_city_id', 'region', + 'country_code', 'country', 'latitude', + 'longitude' + ] + }, + 'associated_institutions': { + 'name': os.path.join(CSV_DIR, + 'institutions_associated_institutions.csv.gz'), + 'columns': [ + 'institution_id', 'associated_institution_id', 'relationship' + ] + }, + 'counts_by_year': { + 'name': os.path.join(CSV_DIR, 'institutions_counts_by_year.csv.gz'), + 'columns': [ + 'institution_id', 'year', 'works_count', 'cited_by_count', + 'oa_works_count' + ] + } + }, + 'publishers': { + 'publishers': { + 'name': os.path.join(CSV_DIR, 'publishers.csv.gz'), + 'columns': [ + 'id', 'display_name', 'alternate_titles', 'country_codes', + 'hierarchy_level', 'parent_publisher', + 'works_count', 'cited_by_count', 'sources_api_url', + 'updated_date' + ] + }, + 'counts_by_year': { + 'name': os.path.join(CSV_DIR, 'publishers_counts_by_year.csv.gz'), + 'columns': ['publisher_id', 'year', 'works_count', 'cited_by_count', + 'oa_works_count'] + }, + 'ids': { + 'name': os.path.join(CSV_DIR, 'publishers_ids.csv.gz'), + 'columns': ['publisher_id', 'openalex', 'ror', 'wikidata'] + }, + }, + 'sources': { + 'sources': { + 'name': os.path.join(CSV_DIR, 'sources.csv.gz'), + 'columns': [ + 'id', 'issn_l', 'issn', 'display_name', 'publisher', + 'works_count', 'cited_by_count', 
'is_oa', + 'is_in_doaj', 'homepage_url', 'works_api_url', 'updated_date' + ] + }, + 'ids': { + 'name': os.path.join(CSV_DIR, 'sources_ids.csv.gz'), + 'columns': ['source_id', 'openalex', 'issn_l', 'issn', 'mag', + 'wikidata', 'fatcat'] + }, + 'counts_by_year': { + 'name': os.path.join(CSV_DIR, 'sources_counts_by_year.csv.gz'), + 'columns': ['source_id', 'year', 'works_count', 'cited_by_count', + 'oa_works_count'] + }, + }, + 'works': { + 'works': { + 'name': os.path.join(CSV_DIR, 'works.csv.gz'), + 'columns': [ + 'id', 'doi', 'title', 'display_name', 'publication_year', + 'publication_date', 'type', 'cited_by_count', + 'is_retracted', 'is_paratext', 'cited_by_api_url', + 'abstract_inverted_index', 'language' + ] + }, + 'primary_locations': { + 'name': os.path.join(CSV_DIR, 'works_primary_locations.csv.gz'), + 'columns': [ + 'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa', + 'version', 'license' + ] + }, + 'locations': { + 'name': os.path.join(CSV_DIR, 'works_locations.csv.gz'), + 'columns': [ + 'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa', + 'version', 'license' + ] + }, + 'best_oa_locations': { + 'name': os.path.join(CSV_DIR, 'works_best_oa_locations.csv.gz'), + 'columns': [ + 'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa', + 'version', 'license' + ] + }, + 'authorships': { + 'name': os.path.join(CSV_DIR, 'works_authorships.csv.gz'), + 'columns': [ + 'work_id', 'author_position', 'author_id', 'institution_id', + 'raw_affiliation_string' + ] + }, + 'biblio': { + 'name': os.path.join(CSV_DIR, 'works_biblio.csv.gz'), + 'columns': [ + 'work_id', 'volume', 'issue', 'first_page', 'last_page' + ] + }, + 'topics': { + 'name': os.path.join(CSV_DIR, 'works_topics.csv.gz'), + 'columns': [ + 'work_id', 'topic_id', 'score' + ] + }, + 'concepts': { + 'name': os.path.join(CSV_DIR, 'works_concepts.csv.gz'), + 'columns': [ + 'work_id', 'concept_id', 'score' + ] + }, + 'ids': { + 'name': os.path.join(CSV_DIR, 'works_ids.csv.gz'), + 
'columns': [ + 'work_id', 'openalex', 'doi', 'mag', 'pmid', 'pmcid' + ] + }, + 'mesh': { + 'name': os.path.join(CSV_DIR, 'works_mesh.csv.gz'), + 'columns': [ + 'work_id', 'descriptor_ui', 'descriptor_name', 'qualifier_ui', + 'qualifier_name', 'is_major_topic' + ] + }, + 'open_access': { + 'name': os.path.join(CSV_DIR, 'works_open_access.csv.gz'), + 'columns': [ + 'work_id', 'is_oa', 'oa_status', 'oa_url', + 'any_repository_has_fulltext' + ] + }, + 'referenced_works': { + 'name': os.path.join(CSV_DIR, 'works_referenced_works.csv.gz'), + 'columns': [ + 'work_id', 'referenced_work_id' + ] + }, + 'related_works': { + 'name': os.path.join(CSV_DIR, 'works_related_works.csv.gz'), + 'columns': [ + 'work_id', 'related_work_id' + ] + }, + }, +} + +def init_dict_writer(csv_file, file_spec, **kwargs): + writer = csv.DictWriter( + csv_file, fieldnames=file_spec['columns'], **kwargs + ) + writer.writeheader() + return writer + +def merge_shards(entity_type, shard_paths): + """Merges gzipped shards into final files using binary concatenation.""" + spec = csv_files[entity_type] + for table_name, table_spec in spec.items(): + final_path = table_spec['name'] + table_shards = [s[table_name] for s in shard_paths if table_name in s] + + if not table_shards: + continue + + print(f"Merging {len(table_shards)} shards into {final_path}...") + + # Write header to a new file + with gzip.open(final_path, 'wt', encoding='utf-8') as f_out: + writer = csv.DictWriter(f_out, fieldnames=table_spec['columns']) + writer.writeheader() + + # Append gzipped shards at binary level + with open(final_path, 'ab') as f_out: + for shard_path in table_shards: + with open(shard_path, 'rb') as f_in: + shutil.copyfileobj(f_in, f_out) + os.remove(shard_path) + +def process_single_file(entity_type, jsonl_file_name): + """Worker function to process a single JSONL file into shards.""" + # Create a unique shard prefix by using the relative path from SNAPSHOT_DIR + rel_path = os.path.relpath(jsonl_file_name, 
SNAPSHOT_DIR) + # Replace slashes and other potentially problematic characters + unique_name = rel_path.replace(os.sep, '_').replace('.', '_') + shard_prefix = f"{entity_type}_{unique_name}_" + file_spec = csv_files[entity_type] + + shard_files = {} + shard_writers = {} + shard_paths = {table_name: os.path.join(TEMP_DIR, f"{shard_prefix}{table_name}.csv.gz") + for table_name in file_spec} + + # Check if this file is already done (resumability) + if all(os.path.exists(p) and os.path.getsize(p) > 0 for p in shard_paths.values()): + return shard_paths + + # Open all needed CSV shards for this entity type + for table_name, table_spec in file_spec.items(): + shard_path = shard_paths[table_name] + f = gzip.open(shard_path, 'wt', encoding='utf-8') + shard_files[table_name] = f + # We ignore extra fields to be robust against schema additions + shard_writers[table_name] = csv.DictWriter(f, fieldnames=table_spec['columns'], extrasaction='ignore') + + try: + print(f"Processing {jsonl_file_name}...") + with gzip.open(jsonl_file_name, 'r') as jsonl_file: + for line in jsonl_file: + if not line.strip(): + continue + item = json.loads(line) + + if entity_type == 'authors': + flatten_author_item(item, shard_writers) + elif entity_type == 'topics': + flatten_topic_item(item, shard_writers) + elif entity_type == 'concepts': + flatten_concept_item(item, shard_writers) + elif entity_type == 'institutions': + flatten_institution_item(item, shard_writers) + elif entity_type == 'publishers': + flatten_publisher_item(item, shard_writers) + elif entity_type == 'sources': + flatten_source_item(item, shard_writers) + elif entity_type == 'works': + flatten_work_item(item, shard_writers) + finally: + for f in shard_files.values(): + if f: f.close() + + return shard_paths + +# --- Entity-specific flattening logic (extracted from original script) --- + +def flatten_author_item(author, writers): + if not (author_id := author.get('id')): + return + + author['display_name_alternatives'] = 
json.dumps(author.get('display_name_alternatives'), ensure_ascii=False) + author['last_known_institution'] = (author.get('last_known_institution') or {}).get('id') + writers['authors'].writerow(author) + + if author_ids := author.get('ids'): + author_ids['author_id'] = author_id + writers['ids'].writerow(author_ids) + + if counts_by_year := author.get('counts_by_year'): + for count_by_year in counts_by_year: + count_by_year['author_id'] = author_id + writers['counts_by_year'].writerow(count_by_year) + +def flatten_topic_item(topic, writers): + if not (topic_id := topic.get('id')): + return + topic['keywords'] = '; '.join(topic.get('keywords') or []) + for key in ('subfield', 'field', 'domain'): + if topic.get(key): + topic[f'{key}_id'] = topic[key].get('id') + topic[f'{key}_display_name'] = topic[key].get('display_name') + del topic[key] + + # Handle older vs newer OpenAlex schema for 'updated' / 'updated_date' + if 'updated' in topic: + topic['updated_date'] = topic['updated'] + del topic['updated'] + + if topic.get('ids'): + topic['wikipedia_id'] = topic['ids'].get('wikipedia') + del topic['ids'] + if 'created_date' in topic: del topic['created_date'] + writers['topics'].writerow(topic) + +def flatten_concept_item(concept, writers): + if not (concept_id := concept.get('id')): + return + writers['concepts'].writerow(concept) + + if concept_ids := concept.get('ids'): + concept_ids['concept_id'] = concept_id + concept_ids['umls_aui'] = json.dumps(concept_ids.get('umls_aui'), ensure_ascii=False) + concept_ids['umls_cui'] = json.dumps(concept_ids.get('umls_cui'), ensure_ascii=False) + writers['ids'].writerow(concept_ids) + + if ancestors := concept.get('ancestors'): + for ancestor in ancestors: + if ancestor_id := ancestor.get('id'): + writers['ancestors'].writerow({'concept_id': concept_id, 'ancestor_id': ancestor_id}) + + if counts_by_year := concept.get('counts_by_year'): + for count_by_year in counts_by_year: + count_by_year['concept_id'] = concept_id + 
writers['counts_by_year'].writerow(count_by_year) + + if related_concepts := concept.get('related_concepts'): + for related_concept in related_concepts: + if related_concept_id := related_concept.get('id'): + writers['related_concepts'].writerow({ + 'concept_id': concept_id, + 'related_concept_id': related_concept_id, + 'score': related_concept.get('score') + }) + +def flatten_institution_item(institution, writers): + if not (institution_id := institution.get('id')): + return + institution['display_name_acronyms'] = json.dumps(institution.get('display_name_acronyms'), ensure_ascii=False) + institution['display_name_alternatives'] = json.dumps(institution.get('display_name_alternatives'), ensure_ascii=False) + writers['institutions'].writerow(institution) + + if institution_ids := institution.get('ids'): + institution_ids['institution_id'] = institution_id + writers['ids'].writerow(institution_ids) + + if institution_geo := institution.get('geo'): + institution_geo['institution_id'] = institution_id + writers['geo'].writerow(institution_geo) + + associated = institution.get('associated_institutions', institution.get('associated_insitutions')) + if associated: + for assoc in associated: + if assoc_id := assoc.get('id'): + writers['associated_institutions'].writerow({ + 'institution_id': institution_id, + 'associated_institution_id': assoc_id, + 'relationship': assoc.get('relationship') + }) + + if counts_by_year := institution.get('counts_by_year'): + for count_by_year in counts_by_year: + count_by_year['institution_id'] = institution_id + writers['counts_by_year'].writerow(count_by_year) + +def flatten_publisher_item(publisher, writers): + if not (publisher_id := publisher.get('id')): + return + publisher['alternate_titles'] = json.dumps(publisher.get('alternate_titles'), ensure_ascii=False) + publisher['country_codes'] = json.dumps(publisher.get('country_codes'), ensure_ascii=False) + writers['publishers'].writerow(publisher) + + if publisher_ids := 
publisher.get('ids'): + publisher_ids['publisher_id'] = publisher_id + writers['ids'].writerow(publisher_ids) + + if counts_by_year := publisher.get('counts_by_year'): + for count_by_year in counts_by_year: + count_by_year['publisher_id'] = publisher_id + writers['counts_by_year'].writerow(count_by_year) + +def flatten_source_item(source, writers): + if not (source_id := source.get('id')): + return + source['issn'] = json.dumps(source.get('issn')) + writers['sources'].writerow(source) + + if source_ids := source.get('ids'): + source_ids['source_id'] = source_id + source_ids['issn'] = json.dumps(source_ids.get('issn')) + writers['ids'].writerow(source_ids) + + if counts_by_year := source.get('counts_by_year'): + for count_by_year in counts_by_year: + count_by_year['source_id'] = source_id + writers['counts_by_year'].writerow(count_by_year) + +def flatten_work_item(work, writers): + if not (work_id := work.get('id')): + return + + if (abstract := work.get('abstract_inverted_index')) is not None: + work['abstract_inverted_index'] = json.dumps(abstract, ensure_ascii=False) + writers['works'].writerow(work) + + for loc_key in ['primary_location', 'best_oa_location']: + loc = work.get(loc_key) or {} + if (loc.get('source') or {}).get('id'): + writers[f"{loc_key}s"].writerow({ + 'work_id': work_id, + 'source_id': loc['source']['id'], + 'landing_page_url': loc.get('landing_page_url'), + 'pdf_url': loc.get('pdf_url'), + 'is_oa': loc.get('is_oa'), + 'version': loc.get('version'), + 'license': loc.get('license'), + }) + + for loc in work.get('locations', []): + if (loc.get('source') or {}).get('id'): + writers['locations'].writerow({ + 'work_id': work_id, + 'source_id': loc['source']['id'], + 'landing_page_url': loc.get('landing_page_url'), + 'pdf_url': loc.get('pdf_url'), + 'is_oa': loc.get('is_oa'), + 'version': loc.get('version'), + 'license': loc.get('license'), + }) + + for authorship in work.get('authorships', []): + if author_id := (authorship.get('author') or {}).get('id'): + 
insts = authorship.get('institutions') or [None] + inst_ids = [i.get('id') if i else None for i in insts] + for inst_id in inst_ids: + if inst_id or len(inst_ids) == 1: + writers['authorships'].writerow({ + 'work_id': work_id, + 'author_position': authorship.get('author_position'), + 'author_id': author_id, + 'institution_id': inst_id, + 'raw_affiliation_string': authorship.get('raw_affiliation_string'), + }) + + if biblio := work.get('biblio'): + biblio['work_id'] = work_id + writers['biblio'].writerow(biblio) + + for topic in work.get('topics', []): + if topic_id := topic.get('id'): + writers['topics'].writerow({'work_id': work_id, 'topic_id': topic_id, 'score': topic.get('score')}) + + for concept in work.get('concepts', []): + if concept_id := concept.get('id'): + writers['concepts'].writerow({'work_id': work_id, 'concept_id': concept_id, 'score': concept.get('score')}) + + if ids := work.get('ids'): + ids['work_id'] = work_id + writers['ids'].writerow(ids) + + for mesh in work.get('mesh', []): + mesh['work_id'] = work_id + writers['mesh'].writerow(mesh) + + if oa := work.get('open_access'): + oa['work_id'] = work_id + writers['open_access'].writerow(oa) + + for ref in work.get('referenced_works', []): + if ref: writers['referenced_works'].writerow({'work_id': work_id, 'referenced_work_id': ref}) + + for rel in work.get('related_works', []): + if rel: writers['related_works'].writerow({'work_id': work_id, 'related_work_id': rel}) + +# --- Coordination logic --- + +def flatten_entity(entity_type): + """Level 2 Parallelism: Processes JSONL files for an entity type in parallel.""" + spec = csv_files[entity_type] + # Check if all final files for this entity already exist (Skip finished entities) + # We exclude 'works' from this auto-skip because it often exists as a partial file + if entity_type != 'works' and all(os.path.exists(table['name']) for table in spec.values()): + print(f"Final files for {entity_type} already exist. 
Skipping...") + return + + files = glob.glob(os.path.join(SNAPSHOT_DIR, 'data', entity_type, '*', '*.gz')) + if FILES_PER_ENTITY: + files = files[:FILES_PER_ENTITY] + + if not files: + print(f"No files found for {entity_type}") + return + + # Use a process pool for the files of this entity + # Use as many workers as possible while leaving a margin of 30 cores + num_workers = max(1, (os.cpu_count() or 1) - 30) + print(f"Starting pool with {num_workers} workers for {entity_type}...") + with ProcessPoolExecutor(max_workers=num_workers) as executor: + shard_paths = list(executor.map(partial(process_single_file, entity_type), files)) + + # Merge all shards created for this entity + merge_shards(entity_type, shard_paths) + print(f"Finished flattening for {entity_type}.") + +if __name__ == '__main__': + entities = ['authors', 'concepts', 'topics', 'institutions', 'publishers', 'sources', 'works'] + + for entity in entities: + print(f"Starting to process entity: {entity}") + flatten_entity(entity) + + # Clean up temp directory + if os.path.exists(TEMP_DIR) and not os.listdir(TEMP_DIR): + os.rmdir(TEMP_DIR) + + print("All entities processed successfully.") diff --git a/flattener-verification.md b/flattener-verification.md new file mode 100644 index 0000000..38d1497 --- /dev/null +++ b/flattener-verification.md @@ -0,0 +1,32 @@ +# Parallel Flattener Verification Report + +**Date:** 2026-02-05 +**Log File Analysis:** `/home/ywu47/rag_prototype/flatten.log` + +## Status Summary +The parallel flattener script (`flatten-openalex-jsonl-parallel.py`) was verified for correctness and completeness based on the current execution logs. + +### 1. Error Check +- **Result:** No errors found. +- **Details:** A full scan of `flatten.log` for "Error", "Traceback", and "Exception" returned zero matches. + +### 2. 
Entity Completion Status +The following entities have successfully finished both the parallel processing of JSONL files and the binary merging of shards: + +| Entity | Shards Processed | Shards Merged | Status | +| :--- | :--- | :--- | :--- | +| **authors** | 438 | 438 | ✅ Complete | +| **concepts** | 3 | 3 | ✅ Complete | +| **topics** | 1 | 1 | ✅ Complete | +| **institutions** | 61 | 61 | ✅ Complete | +| **publishers** | 50 | 50 | ✅ Complete | +| **sources** | 42 | 42 | ✅ Complete | +| **works** | Pending | Pending | 🔄 In Progress | + +### 3. Verification Details +- **Shard Consistency:** The number of "Processing" entries in the log matches the "Merging" count for every completed entity. +- **Cleanup Check:** The `temp-shards/` directory was inspected. No orphaned shards from the completed entities remain, confirming that the merge-then-delete logic executed successfully. +- **Ongoing Work:** The script is currently processing the `works` entity, which is the final and largest component of the dataset. + +--- +*Report generated by Antigravity Assistant.* diff --git a/load_resilient.sh b/load_resilient.sh new file mode 100755 index 0000000..0b7e94e --- /dev/null +++ b/load_resilient.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +# Usage with nohup: +# nohup ./openalex-documentation-scripts/load_resilient.sh > load_resilient.log 2>&1 & +# +# To monitor progress: +# tail -f load_resilient.log + +set -u  # deliberately not -e: per-chunk load failures are handled inside the chunk loop + +# Configuration +DB_HOST="10.230.100.200" +DB_PORT="5432" +DB_NAME="agentic_fs_research" +DB_USER="ywu47" +export PGPASSWORD="${PGPASSWORD:?PGPASSWORD must be supplied via the environment; do not commit credentials}" 
+ +# Table list to load +TABLES=("topics" "works" "works_authorships" "works_best_oa_locations" "works_biblio" "works_concepts" "works_locations" "works_mesh" "works_open_access" "works_primary_locations" "works_referenced_works" "works_topics") + +load_table_resilient() { + local table=$1 + local csv="csv-files/${table}.csv.gz" + + echo "---------------------------------------------------" + echo "Processing table: $table" + echo "---------------------------------------------------" + + # 1. Clear existing data in the table to avoid duplicates/partials + podman run --rm -e PGPASSWORD="$PGPASSWORD" postgres:15-alpine psql -h "$DB_HOST" -p "$DB_PORT" -d "$DB_NAME" -U "$DB_USER" -c "TRUNCATE openalex.$table;" + + # 2. Split and load in chunks of 1 million rows + # We use temporary files for chunks to ensure speed and reliability + mkdir -p tmp_chunks + + echo "Splitting $csv into chunks..." + gunzip -c "$csv" | split -l 1000000 --additional-suffix=.csv - tmp_chunks/${table}_chunk_ + + # Get the header from the first chunk + header=$(head -n 1 tmp_chunks/${table}_chunk_aa.csv) + + for chunk in tmp_chunks/${table}_chunk_*; do + echo "Loading chunk: $chunk" + + # If it's not the first chunk, it won't have the header. + # But split includes header only in the first chunk. + # So we use psql's COPY for each chunk. + + if [ "$chunk" == "tmp_chunks/${table}_chunk_aa.csv" ]; then + # First chunk has the header + podman run --rm -v $(pwd):/app:Z -e PGPASSWORD="$PGPASSWORD" postgres:15-alpine psql -h "$DB_HOST" -p "$DB_PORT" -d "$DB_NAME" -U "$DB_USER" -c "\copy openalex.$table from '/app/$chunk' with (format csv, header true);" + else + # Other chunks don't have the header, prepend it or use psql without header + podman run --rm -v $(pwd):/app:Z -e PGPASSWORD="$PGPASSWORD" postgres:15-alpine psql -h "$DB_HOST" -p "$DB_PORT" -d "$DB_NAME" -U "$DB_USER" -c "\copy openalex.$table from '/app/$chunk' with (format csv, header false);" + fi + + if [ $? 
-ne 0 ]; then + echo "ERROR: Failed to load chunk $chunk. Skipping to next chunk..." + else + rm "$chunk" + fi + done + + rm -rf tmp_chunks +} + +# Main execution +for table in "${TABLES[@]}"; do + load_table_resilient "$table" +done + +echo "All specified tables processed." diff --git a/openalex-pg-schema.sql b/openalex-pg-schema.sql index b081e54..f0f84cd 100644 --- a/openalex-pg-schema.sql +++ b/openalex-pg-schema.sql @@ -1,3 +1,4 @@ +-- Active: 1770169352525@@10.230.100.200@5432@agentic_fs_research -- -- PostgreSQL database dump -- @@ -6,14 +7,23 @@ -- Dumped by pg_dump version 14.1 SET statement_timeout = 0; + SET lock_timeout = 0; + SET idle_in_transaction_session_timeout = 0; + SET client_encoding = 'UTF8'; + SET standard_conforming_strings = on; -SELECT pg_catalog.set_config('search_path', '', false); + +SELECT pg_catalog.set_config ('search_path', '', false); + SET check_function_bodies = false; + SET xmloption = content; + SET client_min_messages = warning; + SET row_security = off; -- @@ -22,7 +32,6 @@ SET row_security = off; CREATE SCHEMA openalex; - SET default_tablespace = ''; SET default_table_access_method = heap; @@ -43,7 +52,6 @@ CREATE TABLE openalex.authors ( updated_date timestamp without time zone ); - -- -- Name: authors_counts_by_year; Type: TABLE; Schema: openalex; Owner: - -- @@ -56,7 +64,6 @@ CREATE TABLE openalex.authors_counts_by_year ( oa_works_count integer ); - -- -- Name: authors_ids; Type: TABLE; Schema: openalex; Owner: - -- @@ -71,7 +78,6 @@ CREATE TABLE openalex.authors_ids ( mag bigint ); - CREATE TABLE openalex.topics ( id text NOT NULL, display_name text, @@ -87,7 +93,8 @@ CREATE TABLE openalex.topics ( wikipedia_id text, works_count integer, cited_by_count integer, - updated_date timestamp without time zone + updated_date timestamp without time zone, + siblings text ); -- @@ -108,7 +115,6 @@ CREATE TABLE openalex.concepts ( updated_date timestamp without time zone ); - -- -- Name: concepts_ancestors; Type: TABLE; Schema: 
openalex; Owner: - -- @@ -118,7 +124,6 @@ CREATE TABLE openalex.concepts_ancestors ( ancestor_id text ); - -- -- Name: concepts_counts_by_year; Type: TABLE; Schema: openalex; Owner: - -- @@ -131,7 +136,6 @@ CREATE TABLE openalex.concepts_counts_by_year ( oa_works_count integer ); - -- -- Name: concepts_ids; Type: TABLE; Schema: openalex; Owner: - -- @@ -146,7 +150,6 @@ CREATE TABLE openalex.concepts_ids ( mag bigint ); - -- -- Name: concepts_related_concepts; Type: TABLE; Schema: openalex; Owner: - -- @@ -157,7 +160,6 @@ CREATE TABLE openalex.concepts_related_concepts ( score real ); - -- -- Name: institutions; Type: TABLE; Schema: openalex; Owner: - -- @@ -179,7 +181,6 @@ CREATE TABLE openalex.institutions ( updated_date timestamp without time zone ); - -- -- Name: institutions_associated_institutions; Type: TABLE; Schema: openalex; Owner: - -- @@ -190,7 +191,6 @@ CREATE TABLE openalex.institutions_associated_institutions ( relationship text ); - -- -- Name: institutions_counts_by_year; Type: TABLE; Schema: openalex; Owner: - -- @@ -203,7 +203,6 @@ CREATE TABLE openalex.institutions_counts_by_year ( oa_works_count integer ); - -- -- Name: institutions_geo; Type: TABLE; Schema: openalex; Owner: - -- @@ -219,7 +218,6 @@ CREATE TABLE openalex.institutions_geo ( longitude real ); - -- -- Name: institutions_ids; Type: TABLE; Schema: openalex; Owner: - -- @@ -234,7 +232,6 @@ CREATE TABLE openalex.institutions_ids ( mag bigint ); - -- -- Name: publishers; Type: TABLE; Schema: openalex; Owner: - -- @@ -252,7 +249,6 @@ CREATE TABLE openalex.publishers ( updated_date timestamp without time zone ); - -- -- Name: publishers_counts_by_year; Type: TABLE; Schema: openalex; Owner: - -- @@ -265,7 +261,6 @@ CREATE TABLE openalex.publishers_counts_by_year ( oa_works_count integer ); - -- -- Name: publishers_ids; Type: TABLE; Schema: openalex; Owner: - -- @@ -277,7 +272,6 @@ CREATE TABLE openalex.publishers_ids ( wikidata text ); - -- -- Name: sources; Type: TABLE; Schema: openalex; 
Owner: - -- @@ -297,7 +291,6 @@ CREATE TABLE openalex.sources ( updated_date timestamp without time zone ); - -- -- Name: sources_counts_by_year; Type: TABLE; Schema: openalex; Owner: - -- @@ -310,7 +303,6 @@ CREATE TABLE openalex.sources_counts_by_year ( oa_works_count integer ); - -- -- Name: sources_ids; Type: TABLE; Schema: openalex; Owner: - -- @@ -325,7 +317,6 @@ CREATE TABLE openalex.sources_ids ( fatcat text ); - -- -- Name: works; Type: TABLE; Schema: openalex; Owner: - -- @@ -360,7 +351,6 @@ CREATE TABLE openalex.works_primary_locations ( license text ); - -- -- Name: works_locations; Type: TABLE; Schema: openalex; Owner: - -- @@ -375,7 +365,6 @@ CREATE TABLE openalex.works_locations ( license text ); - -- -- Name: works_best_oa_locations; Type: TABLE; Schema: openalex; Owner: - -- @@ -390,7 +379,6 @@ CREATE TABLE openalex.works_best_oa_locations ( license text ); - -- -- Name: works_authorships; Type: TABLE; Schema: openalex; Owner: - -- @@ -403,7 +391,6 @@ CREATE TABLE openalex.works_authorships ( raw_affiliation_string text ); - -- -- Name: works_biblio; Type: TABLE; Schema: openalex; Owner: - -- @@ -436,7 +423,6 @@ CREATE TABLE openalex.works_concepts ( score real ); - -- -- Name: works_ids; Type: TABLE; Schema: openalex; Owner: - -- @@ -450,7 +436,6 @@ CREATE TABLE openalex.works_ids ( pmcid text ); - -- -- Name: works_mesh; Type: TABLE; Schema: openalex; Owner: - -- @@ -464,7 +449,6 @@ CREATE TABLE openalex.works_mesh ( is_major_topic boolean ); - -- -- Name: works_open_access; Type: TABLE; Schema: openalex; Owner: - -- @@ -477,7 +461,6 @@ CREATE TABLE openalex.works_open_access ( any_repository_has_fulltext boolean ); - -- -- Name: works_referenced_works; Type: TABLE; Schema: openalex; Owner: - -- @@ -487,7 +470,6 @@ CREATE TABLE openalex.works_referenced_works ( referenced_work_id text ); - -- -- Name: works_related_works; Type: TABLE; Schema: openalex; Owner: - -- @@ -497,7 +479,6 @@ CREATE TABLE openalex.works_related_works ( related_work_id 
text ); - ---- ---- Name: authors_counts_by_year authors_counts_by_year_pkey; Type: CONSTRAINT; Schema: openalex; Owner: - ---- @@ -632,14 +613,12 @@ CREATE TABLE openalex.works_related_works ( CREATE INDEX concepts_ancestors_concept_id_idx ON openalex.concepts_ancestors USING btree (concept_id); - -- -- Name: concepts_related_concepts_concept_id_idx; Type: INDEX; Schema: openalex; Owner: - -- CREATE INDEX concepts_related_concepts_concept_id_idx ON openalex.concepts_related_concepts USING btree (concept_id); - -- -- Name: concepts_related_concepts_related_concept_id_idx; Type: INDEX; Schema: openalex; Owner: - -- @@ -652,21 +631,18 @@ CREATE INDEX concepts_related_concepts_related_concept_id_idx ON openalex.concep CREATE INDEX works_primary_locations_work_id_idx ON openalex.works_primary_locations USING btree (work_id); - -- -- Name: works_locations_work_id_idx; Type: INDEX; Schema: openalex; Owner: - -- CREATE INDEX works_locations_work_id_idx ON openalex.works_locations USING btree (work_id); - -- -- Name: works_best_oa_locations_work_id_idx; Type: INDEX; Schema: openalex; Owner: - -- CREATE INDEX works_best_oa_locations_work_id_idx ON openalex.works_best_oa_locations USING btree (work_id); - -- -- PostgreSQL database dump complete --- +-- \ No newline at end of file diff --git a/optimization-plan.md b/optimization-plan.md new file mode 100644 index 0000000..277b228 --- /dev/null +++ b/optimization-plan.md @@ -0,0 +1,42 @@ +# Optimization Plan - Parallelize OpenAlex Flattening + +Create a new standalone script `flatten-openalex-jsonl-parallel.py` that parallelizes the flattening process to significantly reduce processing time. + +## Why Multiprocessing? + +Python has a Global Interpreter Lock (GIL) that prevents multiple threads from executing Python bytecodes at once. This means that for CPU-intensive tasks like parsing JSON and writing CSVs, **threading** wouldn't provide a significant speedup on a single machine. 
+ +**Multiprocessing** bypasses the GIL by creating entirely separate memory spaces and Python interpreters for each task, allowing us to truly use all available CPU cores. + +## Proposed Changes + +### Parallel script creation + +#### [NEW] [flatten-openalex-jsonl-parallel.py](file:///home/ywu47/rag_prototype/openalex-documentation-scripts/flatten-openalex-jsonl-parallel.py) + +- **Introduction of Two-Level Parallelism**: + - **Level 1 (Inter-Entity)**: Use `multiprocessing.Process` to run `flatten_authors`, `flatten_works`, etc., simultaneously. + - **Level 2 (Intra-Entity)**: Within each entity flattener (e.g., `flatten_authors`), use a `ProcessPoolExecutor`. + - It creates a "pool" of worker processes (usually matching your CPU core count). + - It takes the 100+ `.gz` files for an entity and hands them out to workers as they become free. + - This ensures we are always crunching data on all cores until the entire entity is done. +- **Efficient Disk I/O (Binary Concatenation)**: + - **The Problem**: We want a single output file (e.g., `authors.csv.gz`) to feed into Postgres. If we use 8 processes to process 100 JSONL files in parallel, and all 8 processes try to write their results into that single `authors.csv.gz` at the same time, the file would get corrupted and performance would tank due to "wait times" (file locking). + - **The Solution**: + 1. We give each process its own private, **temporary shard** to write to (e.g., `authors_part1.csv.gz`). Since no two processes share a shard, they can all write at maximum speed without interference. + 2. Once all shards are done, the main process "glues" them together at the binary level. This is nearly instantaneous and results in the final, single file we need. +- **Maintain Compatibility**: + - The final output file structure and location will remain the same (`csv-files/*.csv.gz`), ensuring that `copy-openalex-csv.sql` continues to work without modifications. 
+ +## Verification Plan + +### Automated Tests +- Since there are no existing unit tests, I will perform a functional verification: + 1. Run the script in "demo mode" (`OPENALEX_DEMO_FILES_PER_ENTITY=1`). + 2. Check that the `csv-files/` directory is populated with `.csv.gz` files. + 3. Use `gunzip -c csv-files/authors.csv.gz | head -n 5` to verify the header and data format. + 4. Verify that multiple entities are processed (check that both `authors.csv.gz` and `works.csv.gz` are created). + +### Manual Verification +- Monitor the CPU usage during execution to ensure multiple cores are being utilized. +- Compare the number of rows in the generated CSVs with a sequential run (on a small subset) if possible. diff --git a/resume-copy.sql b/resume-copy.sql new file mode 100644 index 0000000..e8ae574 --- /dev/null +++ b/resume-copy.sql @@ -0,0 +1,20 @@ +-- Resume loading from the works table +-- This script clears the works table first to ensure no partial data remains from previous aborted attempts + +TRUNCATE openalex.works; + +--works + +\copy openalex.works (id, doi, title, display_name, publication_year, publication_date, type, cited_by_count, is_retracted, is_paratext, cited_by_api_url, abstract_inverted_index, language) from program 'gunzip -c /app/csv-files/works.csv.gz' csv header +\copy openalex.works_primary_locations (work_id, source_id, landing_page_url, pdf_url, is_oa, version, license) from program 'gunzip -c /app/csv-files/works_primary_locations.csv.gz' csv header +\copy openalex.works_locations (work_id, source_id, landing_page_url, pdf_url, is_oa, version, license) from program 'gunzip -c /app/csv-files/works_locations.csv.gz' csv header +\copy openalex.works_best_oa_locations (work_id, source_id, landing_page_url, pdf_url, is_oa, version, license) from program 'gunzip -c /app/csv-files/works_best_oa_locations.csv.gz' csv header +\copy openalex.works_authorships (work_id, author_position, author_id, institution_id, raw_affiliation_string) from program 
'gunzip -c /app/csv-files/works_authorships.csv.gz' csv header +\copy openalex.works_biblio (work_id, volume, issue, first_page, last_page) from program 'gunzip -c /app/csv-files/works_biblio.csv.gz' csv header +\copy openalex.works_topics (work_id, topic_id, score) from program 'gunzip -c /app/csv-files/works_topics.csv.gz' csv header +\copy openalex.works_concepts (work_id, concept_id, score) from program 'gunzip -c /app/csv-files/works_concepts.csv.gz' csv header +\copy openalex.works_ids (work_id, openalex, doi, mag, pmid, pmcid) from program 'gunzip -c /app/csv-files/works_ids.csv.gz' csv header +\copy openalex.works_mesh (work_id, descriptor_ui, descriptor_name, qualifier_ui, qualifier_name, is_major_topic) from program 'gunzip -c /app/csv-files/works_mesh.csv.gz' csv header +\copy openalex.works_open_access (work_id, is_oa, oa_status, oa_url, any_repository_has_fulltext) from program 'gunzip -c /app/csv-files/works_open_access.csv.gz' csv header +\copy openalex.works_referenced_works (work_id, referenced_work_id) from program 'gunzip -c /app/csv-files/works_referenced_works.csv.gz' csv header +\copy openalex.works_related_works (work_id, related_work_id) from program 'gunzip -c /app/csv-files/works_related_works.csv.gz' csv header \ No newline at end of file