diff --git a/common/inc/common.h b/common/inc/common.h index 637403a..e5bbde5 100644 --- a/common/inc/common.h +++ b/common/inc/common.h @@ -36,13 +36,14 @@ typedef uint32_t delta_info_t; * @brief Coordonates of the read that matched in the reference genome. */ typedef struct { - union { - uint64_t coord; + //union { + //uint64_t coord; struct { uint32_t seed_nr; - uint32_t seq_nr; + uint32_t seq_nr:31; + uint32_t nodp:1; }; - }; + //}; } dpu_result_coord_t; /** @@ -119,4 +120,20 @@ typedef struct { uint8_t nbr[ALIGN_DPU(SIZE_NEIGHBOUR_IN_BYTES)]; } coords_and_nbr_t; + +// configuration for variant calling using frequency table + +// to activate use of reads with indels +#define USE_INDEL +// to activate use of mapq score +//#define USE_MAPQ_SCORE + +// various parameters/thresholds +#define DIST_PAIR_THRESHOLD 1 +#define DIST_SINGLE_THRESHOLD 0 +#define MAPQ_SCALING_FACTOR 2 +#define READ_DIST_LOWER_BOUND 50 +#define READ_DIST_UPPER_BOUND 2000 +#define depth_filter depth_filter3 + #endif /* __COMMON_H__ */ diff --git a/dpu/inc/dout.h b/dpu/inc/dout.h index 490cd0d..0095d3a 100644 --- a/dpu/inc/dout.h +++ b/dpu/inc/dout.h @@ -68,8 +68,9 @@ void dout_init(unsigned int tid, dout_t *dout); * @param seed_nr Recorded seed number. * @param seq_nr Recorded sequence number. * @param stats To update statistical report. + * @param nodp True if the result was from nodp, false if from odpd. */ -void dout_add(dout_t *dout, uint32_t num, unsigned int score, uint32_t seed_nr, uint32_t seq_nr, dpu_tasklet_stats_t *stats); +void dout_add(dout_t *dout, uint32_t num, unsigned int score, uint32_t seed_nr, uint32_t seq_nr, dpu_tasklet_stats_t *stats, uint8_t nodp); /** * @brief locates a swap page for a given data out structure. diff --git a/dpu/src/dout.c b/dpu/src/dout.c index 97cdcc7..18cafaa 100644 --- a/dpu/src/dout.c +++ b/dpu/src/dout.c @@ -25,7 +25,7 @@ void dout_init(unsigned int tid, dout_t *dout) dout_clear(dout); } -void dout_add(dout_t *dout, uint32_t num, unsigned int score, uint32_t seed_nr, uint32_t seq_nr, dpu_tasklet_stats_t *stats) +void dout_add(dout_t *dout, uint32_t num, unsigned int score, uint32_t seed_nr, uint32_t seq_nr, dpu_tasklet_stats_t *stats, uint8_t nodp) { dpu_result_out_t *new_out; if (dout->nb_cached_out == MAX_LOCAL_RESULTS_PER_READ) { @@ -49,6 +49,7 @@ void dout_add(dout_t *dout, uint32_t num, unsigned int score, uint32_t seed_nr, new_out->score = score; new_out->coord.seed_nr = seed_nr; new_out->coord.seq_nr = seq_nr; + new_out->coord.nodp = nodp; dout->nb_cached_out++; dout->nb_results++; diff --git a/dpu/src/odpd_opt.S b/dpu/src/odpd_opt.S index 3b5a4c2..feb5760 100644 --- a/dpu/src/odpd_opt.S +++ b/dpu/src/odpd_opt.S @@ -6,7 +6,7 @@ #define COST_SUB 10 #define COST_GAPO 11 #define COST_GAPE 1 -#define COST_INIT 99 +#define COST_INIT 999 #define LINE_SIZE ( 6*4 ) #define d0off ( 0*4 ) diff --git a/dpu/src/task.c b/dpu/src/task.c index cae848e..949bfe9 100644 --- a/dpu/src/task.c +++ b/dpu/src/task.c @@ -55,7 +55,11 @@ __host dpu_compute_time_t DPU_COMPUTE_TIME_VAR; /** * @brief Maximum score allowed. */ -#define MAX_SCORE (40) +#if SIZE_READ>120 +#define MAX_SCORE 40 +#else +#define MAX_SCORE 40 +#endif /** * @brief Number of reference read to be fetch per mram read @@ -121,6 +125,10 @@ static void compare_neighbours(sysname_t tasklet_id, uint32_t *mini, coords_and_ STATS_STORE_NODP_TIME(tasklet_stats, (end + acc - start)); STATS_INCR_NB_NODP_CALLS(*tasklet_stats); + bool nodp = true; + + //TODO uncomment for indel +#ifdef USE_INDEL if (score_nodp == UINT_MAX) { STATS_GET_START_TIME(start, acc, end); @@ -129,7 +137,9 @@ static void compare_neighbours(sysname_t tasklet_id, uint32_t *mini, coords_and_ STATS_GET_END_TIME(end, acc); STATS_STORE_ODPD_TIME(tasklet_stats, (end + acc - start)); STATS_INCR_NB_ODPD_CALLS(*tasklet_stats); + nodp = false; } +#endif if (score > *mini) { return; @@ -148,7 +158,7 @@ static void compare_neighbours(sysname_t tasklet_id, uint32_t *mini, coords_and_ } dout_add(dout, request->num, (unsigned int)score, cached_coords_and_nbr->coord.seed_nr, cached_coords_and_nbr->coord.seq_nr, - tasklet_stats); + tasklet_stats, nodp); } static void compute_request(sysname_t tasklet_id, coords_and_nbr_t *cached_coords_and_nbr, uint8_t *current_read_nbr, diff --git a/host/CMakeLists.txt b/host/CMakeLists.txt index c083595..1bdbfb6 100644 --- a/host/CMakeLists.txt +++ b/host/CMakeLists.txt @@ -21,7 +21,7 @@ file(GLOB_RECURSE SOURCES src/*.c) add_executable(upvc ${SOURCES}) target_include_directories(upvc PUBLIC "${DPU_HOST_INCLUDE_DIRECTORIES}" inc/ ../common/inc/) -target_link_libraries(upvc ${DPU_HOST_LIBRARIES} pthread) +target_link_libraries(upvc ${DPU_HOST_LIBRARIES} pthread m) set(NB_DPU_MARK) if (NB_DPU) diff --git a/host/inc/accumulateread.h b/host/inc/accumulateread.h index 74f5c59..8be55a4 100644 --- a/host/inc/accumulateread.h +++ b/host/inc/accumulateread.h @@ -6,6 +6,7 @@ #define __ACCUMULATEREAD_H__ #include "common.h" +#include typedef struct { nb_result_t nb_res; @@ -13,7 +14,7 @@ typedef struct { } acc_results_t; acc_results_t *accumulate_get_buffer(unsigned int dpu_id, unsigned int pass_id); -acc_results_t accumulate_get_result(unsigned int pass_id); +acc_results_t accumulate_get_result(unsigned int pass_id, bool free_results); void accumulate_read(unsigned int pass_id, unsigned int dpu_offset); diff --git a/host/inc/debug.h b/host/inc/debug.h new file mode 100644 index 0000000..06dc925 --- /dev/null +++ b/host/inc/debug.h @@ -0,0 +1,84 @@ +#ifndef __DEBUG_H__ +#define __DEBUG_H__ +#include + +#define V_QUIET 0 +#define V_FATAL 1 +#define V_ERROR 2 +#define V_WARN 3 +#define V_INFO 4 +#define V_DEBUG 5 +#define V_TRACE 6 + +#define VERBOSE V_WARN +#define VERBOSE_COLORS true +#define VERBOSE_LOG_LEVEL true +#define VERBOSE_TIMESTAMP true + +#if VERBOSE_COLORS +#define VERBOSE_COLOR_START_FATAL "\033[41m" +#define VERBOSE_COLOR_START_ERROR "\033[31m" +#define VERBOSE_COLOR_START_WARN "\033[33m" +#define VERBOSE_COLOR_START_INFO "\033[32m" +#define VERBOSE_COLOR_START_DEBUG "\033[34m" +#define VERBOSE_COLOR_START_TRACE "\033[36m" +#define VERBOSE_COLOR_END "\033[0m" +#else +#define VERBOSE_COLOR_START_FATAL +#define VERBOSE_COLOR_START_ERROR +#define VERBOSE_COLOR_START_WARN +#define VERBOSE_COLOR_START_INFO +#define VERBOSE_COLOR_START_DEBUG +#define VERBOSE_COLOR_START_TRACE +#define VERBOSE_COLOR_END +#endif + +#if VERBOSE_LOG_LEVEL +#define VERBOSE_PRINT_PREFIX(level) VERBOSE_COLOR_START_##level #level "\t" VERBOSE_COLOR_END +#else +#define VERBOSE_PRINT_PREFIX(level) +#endif + +#if VERBOSE_TIMESTAMP +#define VERBOSE_PRINT_TIMESTAMP() float t= (float) clock()/CLOCKS_PER_SEC; fprintf(stderr, "(%02d:%02d:%02.3f)", (int) (t/3600), (int) (t/60)%60, (float) ((int) (t*1000)%60000)/1000.); +#else +#define VERBOSE_PRINT_TIMESTAMP() +#endif + +#if VERBOSE>=V_TRACE +#define LOG_TRACE(string, ...) {VERBOSE_PRINT_TIMESTAMP() fprintf(stderr, VERBOSE_PRINT_PREFIX(TRACE) string, ## __VA_ARGS__);} +#else +#define LOG_TRACE(...) +#endif + +#if VERBOSE>=V_DEBUG +#define LOG_DEBUG(string, ...) {VERBOSE_PRINT_TIMESTAMP() fprintf(stderr, VERBOSE_PRINT_PREFIX(DEBUG) string, ## __VA_ARGS__);} +#else +#define LOG_DEBUG(...) +#endif + +#if VERBOSE>=V_INFO +#define LOG_INFO(string, ...) {VERBOSE_PRINT_TIMESTAMP() fprintf(stderr, VERBOSE_PRINT_PREFIX(INFO) string, ## __VA_ARGS__);} +#else +#define LOG_INFO(...) +#endif + +#if VERBOSE>=V_WARN +#define LOG_WARN(string, ...) {VERBOSE_PRINT_TIMESTAMP() fprintf(stderr, VERBOSE_PRINT_PREFIX(WARN) string, ## __VA_ARGS__);} +#else +#define LOG_WARN(...) +#endif + +#if VERBOSE>=V_ERROR +#define LOG_ERROR(string, ...) {VERBOSE_PRINT_TIMESTAMP() fprintf(stderr, VERBOSE_PRINT_PREFIX(ERROR) string, ## __VA_ARGS__);} +#else +#define LOG_ERROR(...) +#endif + +#if VERBOSE>=V_FATAL +#define LOG_FATAL(string, ...) {VERBOSE_PRINT_TIMESTAMP() fprintf(stderr, VERBOSE_PRINT_PREFIX(FATAL) string, ## __VA_ARGS__);} +#else +#define LOG_FATAL(...) +#endif + +#endif // __DEBUG_H__ diff --git a/host/inc/genome.h b/host/inc/genome.h index eb5bbe6..7fe6b04 100644 --- a/host/inc/genome.h +++ b/host/inc/genome.h @@ -6,6 +6,7 @@ #define __GENOME_H__ #include +#include #define MAX_SEQ_GEN (24) // max number of chromosomes #define MAX_SEQ_NAME_SIZE (8) @@ -13,12 +14,12 @@ typedef struct { uint32_t magic; uint32_t version; - uint32_t nb_seq; - uint64_t pt_seq[MAX_SEQ_GEN]; - uint64_t len_seq[MAX_SEQ_GEN]; + uint32_t nb_seq; // nb of chromosomes (24) + uint64_t pt_seq[MAX_SEQ_GEN]; // offset of each chromosome in data + uint64_t len_seq[MAX_SEQ_GEN]; // length of chromosome in data uint64_t fasta_file_size; char seq_name[MAX_SEQ_GEN][MAX_SEQ_NAME_SIZE]; - int8_t *data; + int8_t *data; //genome de reference 1B = 1 nucleotide int32_t *mapping_coverage; } genome_t; @@ -30,4 +31,31 @@ void genome_free(); genome_t *genome_get(); +struct frequency_info { + + float freq; + unsigned int score; + unsigned int unsure_score; +}; + +#pragma pack(push,1) +struct variants_codependence_info { + int16_t key; + uint8_t codependence_count; +}; + +#define COD_LIST_SIZE 4 +struct variants_codependence_info_list { + struct variants_codependence_info_list* next_list; + struct variants_codependence_info content[COD_LIST_SIZE]; +}; +#pragma pack(pop) + +void add_codependence_info(struct variants_codependence_info_list** next_variants_info_list, int16_t other_index_delta, uint8_t current_letter, uint8_t other_letter, unsigned int genome_size, pthread_mutex_t* mutex); + +struct frequency_info** get_frequency_table(); +struct variants_codependence_info_list** get_codependence_table(pthread_mutex_t* mutex); +void free_frequency_table(); +void free_codependence_chunks(); + #endif /* __GENOME_H__ */ diff --git a/host/inc/getread.h b/host/inc/getread.h index e2654ea..58b8253 100644 --- a/host/inc/getread.h +++ b/host/inc/getread.h @@ -11,6 +11,7 @@ int get_reads_in_buffer(unsigned int pass_id); int8_t *get_reads_buffer(unsigned int pass_id); +float *get_reads_quality_buffer(unsigned int pass_id); void get_reads(FILE *fpe1, FILE *fpe2, unsigned int pass_id); diff --git a/host/inc/mapping_file.h b/host/inc/mapping_file.h new file mode 100644 index 0000000..715369a --- /dev/null +++ b/host/inc/mapping_file.h @@ -0,0 +1,16 @@ +/** + * Copyright 2021 - A Moisson-Franckhauser & UPMEM + */ +#ifndef __SAM_H__ +#define __SAM_H__ + +#include "processread.h" + +void open_mapping_file(); +//TODO : either reuse this code or delete it +//void write_mapping_read(uint64_t genome_pos, uint8_t *code, int8_t *read); +void write_read_mapping_from_backtrack(char *chromosome_name, uint64_t genome_pos, backtrack_t *backtrack_end, int8_t *read, int read_id); +void write_read_mapping(char *chromosome_name, uint64_t genome_pos, uint8_t *code, uint8_t *read); +void close_mapping_file(); + +#endif /* __SAM_H__ */ diff --git a/host/inc/parse_args.h b/host/inc/parse_args.h index 2514826..3c95fe3 100644 --- a/host/inc/parse_args.h +++ b/host/inc/parse_args.h @@ -36,6 +36,8 @@ unsigned int get_nb_thread_for_simu(); bool get_index_with_dpus(); +bool get_use_frequency_table(); + /** * @brief Parse and validate the argument of the application. */ diff --git a/host/inc/processread.h b/host/inc/processread.h index acb3cb3..dae9cc3 100644 --- a/host/inc/processread.h +++ b/host/inc/processread.h @@ -7,6 +7,20 @@ #include +typedef struct { + int type; + int ix; + int jx; +} backtrack_t; + + +#define CODE_MATCH 0 +#define CODE_SUB 10 +#define CODE_DEL 11 +#define CODE_INS 12 +#define CODE_END 13 +#define CODE_ERR 14 + void process_read(FILE *fpe1, FILE *fpe2, int round, unsigned int pass_id); void process_read_init(); diff --git a/host/inc/profiling.h b/host/inc/profiling.h new file mode 100644 index 0000000..8028c6a --- /dev/null +++ b/host/inc/profiling.h @@ -0,0 +1,109 @@ +#ifndef __PROFILING_H__ +#define __PROFILING_H__ + +#include + +#define PROFILE_STAT false +#define STAT_MAX_SUBSTEPS 10 + +struct time_stat_t { + clock_t total_time; + unsigned int number_calls; + clock_t substep_total_times[STAT_MAX_SUBSTEPS]; +}; + +struct time_stat_t profiling[25]; + +#define STAT_SUB_ONLY_PATH 0 +#define STAT_DPD 1 +#define STAT_CODE_ALIGNMENT 2 +#define STAT_ADD_TO_NON_MAPPED_READ 3 +#define STAT_GET_READ_UPDATE_POSITIONS 4 +#define STAT_UPDATE_FREQUENCY_TABLE 5 +#define STAT_DO_PROCESS_READ 6 +#define STAT_PROCESS_READ 7 +#define STAT_EXEC_ROUND 8 +#define STAT_EXEC_DPUS 9 +#define STAT_THREAD_GET_READS 10 +#define STAT_THREAD_DISPATCH 11 +#define STAT_THREAD_ACC 12 +#define STAT_THREAD_PROCESS 13 +#define STAT_DO_MAPPING 14 +#define STAT_ADD_CODEPENDENCE_INFO 15 +#define STAT_GET_NEW_CODEPENDENCE_INFO 16 +#define STAT_ALLOCATE_NEW_CHUNK 17 + +#define PRINT_MICROSECONDS(t) \ + if (t 0) { \ + printf("\t\t%d:\t", i); \ + PRINT_MICROSECONDS(profiling[FUNCTION].substep_total_times[i]) \ + printf("\n"); \ + } \ + } + +#define PRINT_ALL_FUNCTION_STAT() \ + printf("\nprofiling:\n\n"); \ + PRINT_FUNCTION_STAT(STAT_DO_MAPPING); \ + PRINT_FUNCTION_STAT(STAT_THREAD_PROCESS); \ + PRINT_FUNCTION_STAT(STAT_THREAD_ACC); \ + PRINT_FUNCTION_STAT(STAT_THREAD_DISPATCH); \ + PRINT_FUNCTION_STAT(STAT_THREAD_GET_READS); \ + PRINT_FUNCTION_STAT(STAT_EXEC_DPUS); \ + PRINT_FUNCTION_STAT(STAT_EXEC_ROUND); \ + PRINT_FUNCTION_STAT(STAT_PROCESS_READ); \ + PRINT_FUNCTION_STAT(STAT_DO_PROCESS_READ); \ + PRINT_FUNCTION_STAT(STAT_ADD_TO_NON_MAPPED_READ); \ + PRINT_FUNCTION_STAT(STAT_UPDATE_FREQUENCY_TABLE); \ + PRINT_FUNCTION_STAT(STAT_GET_READ_UPDATE_POSITIONS); \ + PRINT_FUNCTION_STAT(STAT_CODE_ALIGNMENT); \ + PRINT_FUNCTION_STAT(STAT_SUB_ONLY_PATH); \ + PRINT_FUNCTION_STAT(STAT_DPD); \ + PRINT_FUNCTION_STAT(STAT_ADD_CODEPENDENCE_INFO); \ + PRINT_FUNCTION_STAT(STAT_GET_NEW_CODEPENDENCE_INFO); \ + PRINT_FUNCTION_STAT(STAT_ALLOCATE_NEW_CHUNK); + +#else + +#define STAT_RECORD_START(FUNCTION) +#define STAT_RECORD_STEP(FUNCTION, STEP_N) +#define STAT_RECORD_LAST_STEP(FUNCTION, STEP_N) +#define PRINT_FUNCTION_STAT(FUNCTION) +#define PRINT_ALL_FUNCTION_STAT() + +#endif + + +#endif /* __PROFILING_H__ */ diff --git a/host/inc/upvc.h b/host/inc/upvc.h index 544a6b6..1dfa9f2 100644 --- a/host/inc/upvc.h +++ b/host/inc/upvc.h @@ -7,8 +7,8 @@ #define VERSION "VERSION 1.8" #define MAX_READS_BUFFER (512 * 1024) /* Maximum number of read by round */ -#define NB_READS_BUFFER (16) /* To be increase if enough legacy memory available */ -#define NB_DISPATCH_AND_ACC_BUFFER (4) /* To be increase if enough legacy memory available */ +#define NB_READS_BUFFER (128) /* To be increase if enough legacy memory available */ +#define NB_DISPATCH_AND_ACC_BUFFER (32) /* To be increase if enough legacy memory available */ #define NB_ROUND (1) #define COST_SUB 10 diff --git a/host/inc/vartree.h b/host/inc/vartree.h index efc9c20..bb4feaa 100644 --- a/host/inc/vartree.h +++ b/host/inc/vartree.h @@ -5,15 +5,15 @@ #ifndef __VARTREE_H__ #define __VARTREE_H__ -#define MAX_SIZE_ALLELE 8 +#define MAX_SIZE_ALLELE 21 #include typedef struct variant { uint32_t score; uint32_t depth; - char ref[MAX_SIZE_ALLELE]; - char alt[MAX_SIZE_ALLELE]; + char ref[MAX_SIZE_ALLELE+1]; + char alt[MAX_SIZE_ALLELE+1]; struct variant *next; } variant_t; diff --git a/host/src/accumulateread.c b/host/src/accumulateread.c index 6b81477..95b401c 100644 --- a/host/src/accumulateread.c +++ b/host/src/accumulateread.c @@ -6,6 +6,8 @@ #include "common.h" #include "index.h" #include "upvc.h" +#include "profiling.h" +#include "debug.h" #include #include @@ -158,7 +160,7 @@ static void merge_bucket_and_acc_list( } } -acc_results_t accumulate_get_result(unsigned int pass_id) +acc_results_t accumulate_get_result(unsigned int pass_id, bool free_results) { if (result_file[pass_id] == NULL) { static const dpu_result_out_t dummy_res = { .num = -1 }; @@ -175,17 +177,23 @@ acc_results_t accumulate_get_result(unsigned int pass_id) size_t size = ftell(result_file[pass_id]); rewind(result_file[pass_id]); + LOG_INFO("allocating %lu to accumulate results\n", size); dpu_result_out_t *results = (dpu_result_out_t *)malloc(size); assert(results != NULL); size_t size_read = fread(results, size, 1, result_file[pass_id]); assert(size_read == 1); + if (free_results) { + fclose(result_file[pass_id]); + result_file[pass_id] = NULL; + } return (acc_results_t) { .nb_res = (size / sizeof(dpu_result_out_t)) - 1, .results = results }; } void accumulate_read(unsigned int pass_id, unsigned int dpu_offset) { printf("DPU_OFFSET: %u - PASS_ID: %u\n", dpu_offset, pass_id); + PRINT_ALL_FUNCTION_STAT(); nb_dpus_used_current_run = MIN(index_get_nb_dpu() - dpu_offset, nb_dpus_per_run); acc_res = RESULTS_BUFFERS(pass_id); @@ -206,6 +214,7 @@ void accumulate_read(unsigned int pass_id, unsigned int dpu_offset) return; } + LOG_INFO("allocating %lu for bucket_elems\n", sizeof(bucket_elem_t) * total_nb_res); bucket_elems = (bucket_elem_t *)malloc(sizeof(bucket_elem_t) * total_nb_res); assert(bucket_elems != NULL); @@ -232,11 +241,12 @@ void accumulate_read(unsigned int pass_id, unsigned int dpu_offset) } // Get data from FILE * - acc_results_t acc_res_from_file = accumulate_get_result(pass_id); + acc_results_t acc_res_from_file = accumulate_get_result(pass_id, false); unsigned int nb_read = acc_res_from_file.nb_res + total_nb_res; size_t size = sizeof(dpu_result_out_t) * (nb_read + 1); // Alloc the merged and sorted tab of result for this pass + LOG_INFO("allocating %lu for dpu results\n", size); dpu_result_out_t *merged_result_tab = malloc(size); assert(merged_result_tab != NULL); @@ -290,6 +300,7 @@ void accumulate_init(unsigned int max_nb_pass) result_file = (FILE **)calloc(nb_pass, sizeof(FILE *)); assert(result_file != NULL); + LOG_INFO("allocating %lu for results_buffers\n", sizeof(acc_results_t) * nb_dpus_per_run * NB_DISPATCH_AND_ACC_BUFFER); for (unsigned int each_pass = 0; each_pass < NB_DISPATCH_AND_ACC_BUFFER; each_pass++) { results_buffers[each_pass] = (acc_results_t *)malloc(sizeof(acc_results_t) * nb_dpus_per_run); assert(results_buffers[each_pass] != NULL); diff --git a/host/src/dpu_backend.c b/host/src/dpu_backend.c index 4db32e2..c1176e1 100644 --- a/host/src/dpu_backend.c +++ b/host/src/dpu_backend.c @@ -251,7 +251,7 @@ void run_on_dpu(unsigned int dpu_offset, unsigned int pass_id, sem_t *dispatch_f void init_backend_dpu(unsigned int *nb_dpus_per_run) { - const char *profile = "cycleAccurate=true,nrJobsPerRank=64"; + const char *profile = "cycleAccurate=true,nrJobsPerRank=64,dispatchOnAllRanks=true"; DPU_ASSERT(dpu_alloc(get_nb_dpu(), profile, &devices.all_ranks)); DPU_ASSERT(dpu_load_from_incbin(devices.all_ranks, &upvc_dpu_program, NULL)); diff --git a/host/src/genome.c b/host/src/genome.c index 92b561a..157f915 100644 --- a/host/src/genome.c +++ b/host/src/genome.c @@ -7,10 +7,13 @@ #include #include #include +#include #include "genome.h" #include "parse_args.h" #include "upvc.h" +#include "debug.h" +#include "profiling.h" #define MAX_BUF_SIZE (1024) #define GENOME_BINARY "genome.bin" @@ -39,6 +42,7 @@ void genome_load() && "Wrong header, make sure you have generated your MRAMs with the same version of UPVC that you are using."); assert(genome.version == GENOME_VERSION && "Could not load a genome generated with a different version of UPVC."); + LOG_INFO("allocating genome data (%luMB + %luMB)\n", sizeof(int8_t) * genome.fasta_file_size/1000000, sizeof(int32_t) * genome.fasta_file_size/1000000); genome.data = (int8_t *)malloc(sizeof(int8_t) * genome.fasta_file_size); genome.mapping_coverage = (int32_t *)calloc(sizeof(int32_t), genome.fasta_file_size); assert(genome.data != NULL && genome.mapping_coverage != NULL); @@ -69,6 +73,7 @@ void genome_create() genome.fasta_file_size = ftell(genome_file); rewind(genome_file); + LOG_INFO("allocating genome data (%luMB)\n", sizeof(int8_t) * genome.fasta_file_size/1000000); genome.data = (int8_t *)malloc(sizeof(int8_t) * genome.fasta_file_size); assert(genome.data != NULL); genome.nb_seq = 0; @@ -114,3 +119,140 @@ void genome_free() free(genome.data); free(genome.mapping_coverage); } + +/** + * Frequency table + * 5 entries (A, C, T, G, -) + * Each entry is a table of the size of the reference genome + **/ +static struct frequency_info* frequency_table[5]; +static struct variants_codependence_info_list** variant_codependences; +#define NB_CODEPENDENCE_CHUNK_DIV (1<<10) +struct codependence_chunk* last_allocated_codependence_chunk[NB_CODEPENDENCE_CHUNK_DIV]; +static bool init_frequency_table = false; +static bool init_codependence_table = false; + +#define ALLOC_CHUNK_SIZE 100 +struct codependence_chunk { + struct codependence_chunk* previous_chunk; + struct codependence_chunk* next_chunk; + unsigned int next_slot_free; + struct variants_codependence_info_list codependence_info_lists[ALLOC_CHUNK_SIZE]; +}; + +static void allocate_new_codependence_chunk(int i) { + STAT_RECORD_START(STAT_ALLOCATE_NEW_CHUNK); + LOG_INFO("allocating codependence_chunk (%lu)\n", sizeof(struct codependence_chunk)); + struct codependence_chunk* new_chunk = calloc(1, sizeof(struct codependence_chunk)); + assert(new_chunk != NULL); + new_chunk->previous_chunk = last_allocated_codependence_chunk[i]; + new_chunk->next_slot_free = 0; + last_allocated_codependence_chunk[i] = new_chunk; + STAT_RECORD_LAST_STEP(STAT_ALLOCATE_NEW_CHUNK, 0); +} + +static struct variants_codependence_info_list* get_new_codependence_info_list(int i) { + STAT_RECORD_START(STAT_GET_NEW_CODEPENDENCE_INFO); + if (last_allocated_codependence_chunk[i]->next_slot_free >= ALLOC_CHUNK_SIZE) { + allocate_new_codependence_chunk(i); + } + STAT_RECORD_LAST_STEP(STAT_GET_NEW_CODEPENDENCE_INFO, 0); + return &(last_allocated_codependence_chunk[i]->codependence_info_lists[(last_allocated_codependence_chunk[i]->next_slot_free)++]); +} + +void add_codependence_info(struct variants_codependence_info_list** next_variants_info_list, int16_t other_index_delta, uint8_t current_letter, uint8_t other_letter, unsigned int genome_size, pthread_mutex_t* mutex) +{ + STAT_RECORD_START(STAT_ADD_CODEPENDENCE_INFO); + //struct variants_codependence_info_list** next_variants_info_list = &(freq_info->variants_codependence_list); + struct variants_codependence_info_list* last_variants_info_list = NULL; + int16_t key = (int16_t) ((other_index_delta<<4) | ((other_letter&0x3)<<2) | (current_letter&0x3)); + for (;*next_variants_info_list != NULL; next_variants_info_list = &((*next_variants_info_list)->next_list)) { + for (int i=0; icontent[i].key == key) { + if ((*next_variants_info_list)->content[i].codependence_count < UINT8_MAX) + (*next_variants_info_list)->content[i].codependence_count++; + STAT_RECORD_LAST_STEP(STAT_ADD_CODEPENDENCE_INFO, 0); + return; + } + } + last_variants_info_list = *next_variants_info_list; + } + if (last_variants_info_list != NULL) { + for (int i=0; icontent[i].key == 0) { + last_variants_info_list->content[i].key = key; + last_variants_info_list->content[i].codependence_count = 1; + STAT_RECORD_LAST_STEP(STAT_ADD_CODEPENDENCE_INFO, 1); + return; + } + } + } + int chunk_div_index = (other_index_delta*NB_CODEPENDENCE_CHUNK_DIV)/genome_size; + STAT_RECORD_STEP(STAT_ADD_CODEPENDENCE_INFO, 2); + pthread_mutex_lock(mutex); + STAT_RECORD_STEP(STAT_ADD_CODEPENDENCE_INFO, 3); + struct variants_codependence_info_list* new_info_list = get_new_codependence_info_list(chunk_div_index); + pthread_mutex_unlock(mutex); + new_info_list->next_list = NULL; + new_info_list->content[0].key = key; + new_info_list->content[0].codependence_count = 1; + *next_variants_info_list = new_info_list; + STAT_RECORD_LAST_STEP(STAT_ADD_CODEPENDENCE_INFO, 4); +} + +struct frequency_info** get_frequency_table() { + + if(!init_frequency_table) { + init_frequency_table = true; + LOG_INFO("allocating frequency_table (5x%luMB)\n", sizeof(struct frequency_info)*genome.fasta_file_size/1000000); + // allocate frequency_table on first call + for(int i = 0; i < 5; ++i) { + frequency_table[i] = (struct frequency_info*)calloc(genome.fasta_file_size, sizeof(struct frequency_info)); + assert(frequency_table[i] != NULL); + } + } + return frequency_table; +} + +struct variants_codependence_info_list** get_codependence_table(pthread_mutex_t* mutex) { + if(!init_codependence_table) { + pthread_mutex_lock(mutex); + if (!init_codependence_table) { + LOG_INFO("allocating variant codependences lists (%luMB)\n", genome.fasta_file_size*sizeof(void*)/1000000); + variant_codependences = (struct variants_codependence_info_list**)calloc(genome.fasta_file_size, sizeof(void*)); + assert(variant_codependences != NULL); + for (int i=0; iprevious_chunk; + free(last_allocated_codependence_chunk[i]); + } + } + init_codependence_table = false; + } +} + diff --git a/host/src/getread.c b/host/src/getread.c index e3a8197..6e06645 100644 --- a/host/src/getread.c +++ b/host/src/getread.c @@ -4,11 +4,14 @@ #include #include +#include #include #include #include +#include #include "common.h" +#include "debug.h" #include "getread.h" #include "upvc.h" @@ -17,18 +20,24 @@ static int nb_reads[NB_READS_BUFFER]; static int8_t *reads_buffers[NB_READS_BUFFER]; +static float *reads_quality_buffers[NB_READS_BUFFER]; +static float quality_lookup_table[43]; +static bool lookup = false; #define PASS(pass_id) (pass_id % NB_READS_BUFFER) +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) + /** * @brief Parse the file "f" to get the next read in the file and its pair. * - * @param f File to parse. - * @param read1 Output the next read in the file. - * @param read2 Output the pair of the next read in the file. + * @param f File to parse. + * @param read1 Output the next read in the file. + * @param read2 Output the pair of the next read in the file. + * @param read_quality_factor Output the quality factor of the read. * * @return The size of the read. */ -static int get_seq_fast_AQ(FILE *f, int8_t *read1, int8_t *read2) +static int get_seq_fast_AQ(FILE *f, int8_t *read1, int8_t *read2, float *read_quality_factor) { static const int invnt[4] = { 2, 3, 0, 1 }; int offset = 0; @@ -46,10 +55,10 @@ static int get_seq_fast_AQ(FILE *f, int8_t *read1, int8_t *read2) * it means that we need the skip the first 14 characters of the read. */ if (comment[1] == '>') { - sscanf(&comment[2], "%d", &offset); - } - int i; - for (i = 0; i < SIZE_READ - offset; i++) { + sscanf(&comment[2], "%d", &offset); + } + int i; + for (i = 0; i < SIZE_READ - offset; i++) { read1[i] = (((int)sequence_buffer[i]) >> 1) & 3; read2[SIZE_READ - i - 1 - offset] = invnt[read1[i]]; } @@ -64,9 +73,18 @@ static int get_seq_fast_AQ(FILE *f, int8_t *read1, int8_t *read2) if (fgets(comment, MAX_BUF_SIZE, f) == NULL) { /* Commentary */ return -1; } - if (fgets(sequence_buffer, MAX_SEQ_SIZE, f) == NULL) { /* Line with sequence quality information (unused) */ + if (fgets(sequence_buffer, MAX_SEQ_SIZE, f) == NULL) { /* Line with sequence quality information */ return -1; } + // store quality information + for (i = 0; i < SIZE_READ - offset; i++) { + int Q = sequence_buffer[i]; + read_quality_factor[i] = quality_lookup_table[Q-33]; + } + for (; i < SIZE_READ; i++) { + read_quality_factor[i] = 1.0f; + } + return SIZE_READ; } @@ -75,16 +93,36 @@ void get_reads(FILE *fpe1, FILE *fpe2, unsigned int pass_id) int nb_read = 0; pass_id = PASS(pass_id); + if(!lookup) { + + for(uint8_t i = 33; i < 76; ++i) { + quality_lookup_table[i - 33] = MAX(0.001f, 1.0f - pow(10.0f, -(i-33)/10.0f)); + /*printf("%d %f\n", i, quality_lookup_table[i - 33]);*/ + } + lookup = true; + } + int8_t *reads_buffer = reads_buffers[pass_id]; + float* reads_quality_buffer = reads_quality_buffers[pass_id]; if (reads_buffer == NULL) { + LOG_INFO("allocating %lu for reads_buffer\n", MAX_READS_BUFFER*SIZE_READ*(1+sizeof(float))); reads_buffer = (int8_t *)malloc(MAX_READS_BUFFER * SIZE_READ); + reads_quality_buffer = (float *)malloc(MAX_READS_BUFFER/2 * SIZE_READ * sizeof(float)); assert(reads_buffer != NULL); + assert(reads_quality_buffer != NULL); reads_buffers[pass_id] = reads_buffer; + reads_quality_buffers[pass_id] = reads_quality_buffer; } while (nb_read < MAX_READS_BUFFER) { - if ((get_seq_fast_AQ(fpe1, &reads_buffer[(nb_read + 0) * SIZE_READ], &reads_buffer[(nb_read + 1) * SIZE_READ]) <= 0) - || (get_seq_fast_AQ(fpe2, &reads_buffer[(nb_read + 2) * SIZE_READ], &reads_buffer[(nb_read + 3) * SIZE_READ]) <= 0)) + /* + if (pass_id>=20) // FIXME : remove this condition used for debugging + break; + */ + if ((get_seq_fast_AQ(fpe1, &reads_buffer[(nb_read + 0) * SIZE_READ], &reads_buffer[(nb_read + 1) * SIZE_READ], + &reads_quality_buffer[nb_read/2 * SIZE_READ]) <= 0) + || (get_seq_fast_AQ(fpe2, &reads_buffer[(nb_read + 2) * SIZE_READ], &reads_buffer[(nb_read + 3) * SIZE_READ], + &reads_quality_buffer[(nb_read/2 + 1) * SIZE_READ]) <= 0)) break; nb_read += 4; } @@ -95,6 +133,7 @@ void get_reads(FILE *fpe1, FILE *fpe2, unsigned int pass_id) int get_reads_in_buffer(unsigned int pass_id) { return nb_reads[PASS(pass_id)]; } int8_t *get_reads_buffer(unsigned int pass_id) { return reads_buffers[PASS(pass_id)]; } +float *get_reads_quality_buffer(unsigned int pass_id) { return reads_quality_buffers[PASS(pass_id)]; } int get_input_info(FILE *f, size_t *read_size, size_t *nb_read) { diff --git a/host/src/mapping_file.c b/host/src/mapping_file.c new file mode 100644 index 0000000..8e33737 --- /dev/null +++ b/host/src/mapping_file.c @@ -0,0 +1,183 @@ +/** + * Copyright 2021 - A. Moisson-Franckhauser & UPMEM + */ + +#include +#include + +#include "index.h" +#include "mapping_file.h" +#include "genome.h" +#include "common.h" +#include "parse_args.h" +#include "processread.h" +#include "debug.h" + +#define MAP_FILENAME "read_alignments.map" +#define MAP_VERSION "1.6" +#define PROGRAM_NAME "upvc" +#define MAX_CIGAR_LENGTH (3*SIZE_READ) +#define MAX_PATCH_LENGTH (3*SIZE_READ) + +#define CIGAR_MATCH '=' +#define CIGAR_MISMATCH 'X' +#define CIGAR_INSERT 'I' +#define CIGAR_DELETE 'D' + +FILE *mapping_file; + +static char *get_mapping_filename() +{ + static char filename[FILENAME_MAX]; + sprintf(filename, "%s", MAP_FILENAME); + LOG_TRACE("mapping filename : \"%s\"\n", filename); + return filename; +} + +void open_mapping_file() +{ + LOG_DEBUG("opening mapping file\n"); + char *filename = get_mapping_filename(); + mapping_file = fopen(filename, "w"); + if (mapping_file == NULL) + { + LOG_FATAL("couldn't open mapping file; errno : %u\n", errno); + } + LOG_DEBUG("openned mapping file : %p\n", mapping_file); +} + +void write_read_mapping_from_backtrack(char *chromosome_name, uint64_t genome_pos, backtrack_t *backtrack_end, int8_t *read, int read_id) +{ + char patch[MAX_PATCH_LENGTH]; + int patch_idx=MAX_PATCH_LENGTH; + patch[--patch_idx] = 0; + + char nucleotide[4] = {'A', 'C', 'T', 'G'}; + + uint8_t read_letter; + for (;backtrack_end->type != CODE_END; backtrack_end--) { + read_letter = read[backtrack_end->jx]; + if (read_letter < 4) { + read_letter = nucleotide[read_letter]; + } else { + read_letter = read_letter & (!0x20); + } + switch (backtrack_end->type) { + case 0: + patch[--patch_idx] = '-'; + break; + case CODE_SUB: + patch[--patch_idx] = read_letter | 0x20; + break; + case CODE_INS: + patch[--patch_idx] = read_letter; + break; + case CODE_DEL: + patch[--patch_idx] = '/'; + break; + } + } + fprintf(mapping_file, "%s\t%lu\t%s\t%d\n", chromosome_name, genome_pos, &patch[patch_idx], read_id); +} + +void write_read_mapping(char *chromosome_name, uint64_t genome_pos, uint8_t *code, uint8_t *read) { + char patch[MAX_PATCH_LENGTH]; + int patch_idx=0; + int read_idx=0; + + uint32_t code_idx; + char nucleotide[4]= {'A', 'C', 'T', 'G'}; + uint8_t last_action = CODE_INS; + for (code_idx=0; code[code_idx] < CODE_END;) + { + uint8_t action = code[code_idx++]; + uint8_t position = read_idx; + uint8_t letter='E'; + if (action>3) + { + position = code[code_idx++]; + if (code[code_idx] < 5) + { + letter = nucleotide[code[code_idx++]]; + } else { + letter = nucleotide[(code[code_idx++] && 0x6)>>1]; + } + } + for (;read_idx>1]|0x20;//lowercase + } + // patch[patch_idx++] = '='; + } + switch (action) + { + case CODE_SUB: + patch[patch_idx++] = letter|0x20;//lowercase + read_idx++; + break; + case CODE_DEL: + patch[patch_idx++] = '/'; + break; + case CODE_INS: + patch[patch_idx++] = letter; + read_idx++; + break; + default: + //Consider the code read is not an action but a letter associated with the previous action + if (action < 5) + { + letter = nucleotide[action]; + } else { + letter = nucleotide[(action && 0x6) >> 1]; + } + switch (last_action) + { + case CODE_SUB: + patch[patch_idx++] = letter|0x20;//lowercase + read_idx++; + break; + case CODE_DEL: + patch[patch_idx++] = '/'; + break; + case CODE_INS: + patch[patch_idx++] = letter; + read_idx++; + break; + } + } + } + if (code[code_idx] == CODE_ERR) + { + LOG_ERROR("found CODE_ERR in read code\n"); + return; + } + if (code[code_idx] != CODE_END) + { + LOG_ERROR("found unsuspected code : %u\n", code[code_idx]); + for (uint32_t i = 0; i>1]|0x20;//lowercase + } + // patch[patch_idx++] = '='; + } + patch[patch_idx++] = 0; + fprintf(mapping_file, "%s\t%lu\t%s\n", chromosome_name, genome_pos, patch); +} + +void close_mapping_file() +{ + LOG_TRACE("closing mapping file\n"); + fclose(mapping_file); +} diff --git a/host/src/parse_args.c b/host/src/parse_args.c index 5748041..fef1b51 100644 --- a/host/src/parse_args.c +++ b/host/src/parse_args.c @@ -25,6 +25,7 @@ static char *input_path = NULL; static bool simulation_mode = false; static bool no_filter = false; static bool index_with_dpus = false; +static bool use_freq_table = false; static goal_t goal = goal_unknown; static unsigned int nb_dpu = DPU_ALLOCATE_ALL; static unsigned int nb_thread_for_simu = UINT_MAX; @@ -35,14 +36,15 @@ static void usage() { ERROR_EXIT(ERR_USAGE, "\nusage: %s -i -g [ -s [ -t ] | -n ] [ -d " - "]\n" + "] [ -q ]\n" "options:\n" "\t-i\tInput prefix that will be used to find the inputs files\n" "\t-g\tGoal of the run - values=index|map\n" "\t-d\tTry to use Hardware DPU to help indexing\n" "\t-s\tSimulation mode (not compatible with -n)\n" "\t-t\tNumber of thread to use to simulate DPUs (only in simulation mode) (default: 1/2 of the threads of the system)\n" - "\t-n\tNumber of DPUs to use when not in simulation mode (default: use all available DPUs)\n", + "\t-n\tNumber of DPUs to use when not in simulation mode (default: use all available DPUs)\n" + "\t-q\tUse a frequency table for variant calling (more precise but doesn't call indels)", prog_name); } @@ -162,6 +164,14 @@ static void validate_nb_thread_for_simu(const char *nb_thread_for_simu_str) unsigned int get_nb_thread_for_simu() { return nb_thread_for_simu; } +/**************************************************************************************/ +/**************************************************************************************/ +static void validate_use_frequency_table() { + use_freq_table = true; +} + +bool get_use_frequency_table() { return use_freq_table; } + /**************************************************************************************/ /**************************************************************************************/ void validate_args(int argc, char **argv) @@ -172,7 +182,7 @@ void validate_args(int argc, char **argv) prog_name = strdup(argv[0]); check_permission(); - while ((opt = getopt(argc, argv, "dfsi:g:n:t:")) != -1) { + while ((opt = getopt(argc, argv, "dfsqi:g:n:t:")) != -1) { switch (opt) { case 'd': validate_index_with_dpus_mode(); @@ -195,6 +205,9 @@ void validate_args(int argc, char **argv) case 'f': validate_no_filter(); break; + case 'q': + validate_use_frequency_table(); + break; default: ERROR("unknown option"); usage(); diff --git a/host/src/processread.c b/host/src/processread.c index 5f3b555..f781e0f 100644 --- a/host/src/processread.c +++ b/host/src/processread.c @@ -7,51 +7,79 @@ #include #include #include +#include #include "accumulateread.h" +#include "common.h" +#include "debug.h" #include "genome.h" #include "getread.h" #include "processread.h" +#include "mapping_file.h" #include "upvc.h" #include "vartree.h" +#include "parse_args.h" +#include "profiling.h" + +#define DEBUG_READ_MAPPING false #define SIZE_INSERT_MEAN (400) #define SIZE_INSERT_STD (3 * 50) -#define CODE_SUB 10 -#define CODE_DEL 11 -#define CODE_INS 12 -#define CODE_END 13 -#define CODE_ERR 14 - #define CODE_A 0 /* ('A'>>1)&3 41H 0100 0001 */ #define CODE_C 1 /* ('C'>>1)&3 43H 0100 0011 */ #define CODE_T 2 /* ('T'>>1)&3 54H 0101 0100 */ #define CODE_G 3 /* ('G'>>1)&3 47H 0100 0111 */ -#define PQD_INIT_VAL (99) +#define PQD_INIT_VAL (999) -#define PATH_SUBSTITUTION (0) -#define PATH_INSERTION (1) -#define PATH_DELETION (2) +#if SIZE_READ>120 +#define MAX_SUBSTITUTION 20 +#else +#define MAX_SUBSTITUTION 20 +#endif + +#define MAX_SCORE_DIFFERENCE_WITH_BEST 40 +#define MAX_CONSIDERED_MAPPINGS 4000 -typedef struct { - int type; - int ix; - int jx; -} backtrack_t; static int min(int a, int b) { return a < b ? a : b; } -static void DPD_compute(int s1, int s2, int *Dij, int Dijm, int Dimj, int Dimjm, int *Pij, int Pijm, int *Qij, int Qimj, int *xij) +static unsigned int reads_not_correct_cost = 0; +static unsigned int reads_correct_cost_sub_only = 0; +static unsigned int reads_correct_cost_DPD = 0; + +static void DPD_compute( + int s1, int s2, int *Dij, int Dijm, int Dimj, int Dimjm, int *Pij, int Pijm, int *Qij, int Qimj, int *xij) { + /* Compute values for D,P,Q and x at (i,j) from values at (i-1,j), (i,j-1), (i-1,j-1). + * i represents relative index in reference genome (starting from where the read is mapped). + * j represents index in read. + * Dij is the minimum cost to align the first i nucleotides from genome with the first j nucleotides from read. + * Pij is the same minimum cost but assuming the last operation was an insertion. + * Qij is the same minimum cost but assuming the last operation was a deletion. + * xij is the chosen operation to reach the cost in Dij : + * 0 means read and reference genome match + * 1 is for a substitution + * 2 is for an insertion + * 3 is for a deletion + */ int min_QP, d; + // Compute cost in case of insertion : + // D_i_j-1 + COST_GAPO in case of first insertion + // D_i_j-1 + COST_GAPE in case insertion directly follows another insertion (In this case, D_i_j-1 = P_i_j-1) *Pij = min(Dijm + COST_GAPO, Pijm + COST_GAPE); + // Similar for deletion : *Qij = min(Dimj + COST_GAPO, Qimj + COST_GAPE); *xij = 0; + //x_i_j is the backtracking + *xij = 0; + int x; + // Get minimum between Pij (which assumes insertion) and Qij (which assumes deletion) + // and store the associated operation in x (2 for insertion and 3 for deletion). if (*Pij < *Qij) { min_QP = *Pij; x = 2; @@ -59,11 +87,18 @@ static void DPD_compute(int s1, int s2, int *Dij, int Dijm, int Dimj, int Dimjm, min_QP = *Qij; x = 3; } + + // Compute the diagonal score in d : + // Dimjm (D[i-1][j-1]) if genome and read match (genome[read_map_address+SIZE_SEED+i]==read[j]) + // Dimjm + COST_SUB if genome and read do not match (genome[read_map_address+SIZE_SEED+i]!=read[j]) d = Dimjm; if ((s1 & 3) != (s2 & 3)) { d += COST_SUB; *xij = 1; } + + // If diagonal score is best, store it in Dij (in this case, xij has already been set to the correct value) + // Otherwise, set Dij to the best score between insertion and deletion (and set xij correspondingly). if (d < min_QP) { *Dij = d; } else { @@ -72,24 +107,63 @@ static void DPD_compute(int s1, int s2, int *Dij, int Dijm, int Dimj, int Dimjm, } } -int DPD(int8_t *s1, int8_t *s2, backtrack_t *backtrack, int size_neighbour_in_symbols) +int subOnlyPath(int8_t *s1, int8_t *s2, backtrack_t* backtrack, backtrack_t ** backtrack_end) +{ + STAT_RECORD_START(STAT_SUB_ONLY_PATH); + int score = 0; + backtrack[0].type = CODE_END; + (*backtrack_end) = &backtrack[1]; + for (int i=0; itype = CODE_SUB; + (*backtrack_end)->ix = i; + (*backtrack_end)->jx = i; + (*backtrack_end)++; + score += COST_SUB; + } else { + (*backtrack_end)->type = 0; + (*backtrack_end)->ix = i; + (*backtrack_end)->jx = i; + (*backtrack_end)++; + } + } + (*backtrack_end)--; + STAT_RECORD_LAST_STEP(STAT_SUB_ONLY_PATH, 0); + return score; +} + +int DPD(int8_t *s1, int8_t *s2, backtrack_t *backtrack, backtrack_t ** backtrack_end) { - int matrix_size = size_neighbour_in_symbols + 1; + STAT_RECORD_START(STAT_DPD); + /* s1 is a pointer to the genome where the end of the seed was mapped + * s2 is a pointer to the end of the seed in the read + * backtrack is a data structure that will be populated by this function with a sequence of operations + * (insertions, deletions, substitutions or match) paired with their corresponding indices in genome and read + */ + // Matrices are only computed for neighbours (ie: nucleotides not in the seed). + int matrix_size = SIZE_READ + 1; + // Only NB_DIAG (odd) diagonals are computed for each matrix. Values outside this diagonal aren't considered. + // ie: we only consider indices where j-diagonal < i < j+diagonal int diagonal = (NB_DIAG / 2) + 1; + // D is the matrix of scores. + // P is the matrix of scores assuming last operation is an insertion. + // Q is the matrix of scores assuming last operation is a deletion. + // X is the matrix of actual operations used for backtracking. int D[matrix_size][matrix_size]; int P[matrix_size][matrix_size]; int Q[matrix_size][matrix_size]; int X[matrix_size][matrix_size]; + // Best score found at the end of computation (upper row/right-most column of D matrix) int min_score = PQD_INIT_VAL; + // Indices of best score found in the upper-right row/column of D matrix int min_score_i_idx = 0; int min_score_j_idx = 0; - int align_distance = 1; - for (int i = 0; i < matrix_size; i++) { - for (int j = 0; j < matrix_size; j++) { - D[i][j] = 0; - } - } + + // Set first row and column of P and Q to high values + // Set first row and column of D to correct values + // FIXME : It would probably make more sense to set D[i][0] and D[0][i] to i*COST_GAPO + // but this should also be changed accordingly in DPU then. for (int i = 0; i <= diagonal; i++) { P[i][0] = PQD_INIT_VAL; P[0][i] = PQD_INIT_VAL; @@ -99,6 +173,22 @@ int DPD(int8_t *s1, int8_t *s2, backtrack_t *backtrack, int size_neighbour_in_sy D[0][i] = i * COST_SUB; } + STAT_RECORD_STEP(STAT_DPD, 0); + /* D matrix at this point : (assuming COST_SUB=1 and diagonal = 3 ie: NB_DIAG=5) + * (0,0) in bottom left + * + * | + * | + * |3 + * ^|2 + * ||1 + * j|0 1 2 3 + * +-------------- + * i-> + */ + + + // Compute first trapezoid (bottom left part) for (int i = 1; i < diagonal; i++) { for (int j = 1; j < i + diagonal; j++) { DPD_compute(s1[i - 1], s2[j - 1], &D[i][j], D[i][j - 1], D[i - 1][j], D[i - 1][j - 1], &P[i][j], P[i][j - 1], @@ -107,6 +197,26 @@ int DPD(int8_t *s1, int8_t *s2, backtrack_t *backtrack, int size_neighbour_in_sy Q[i][i + diagonal] = PQD_INIT_VAL; D[i][i + diagonal] = PQD_INIT_VAL; } + STAT_RECORD_STEP(STAT_DPD, 1); + + /* D matrix at this point : (assuming COST_SUB=1 and diagonal = 3 ie: NB_DIAG=5) + * (0,0) in bottom left + * An x corresponds to a value that has been computed. + * An M corresponds to PQD_INIT_VAL + * + * | + * | + * | M + * | M x + * |3 x x + * ^|2 x x + * ||1 x x + * j|0 1 2 3 + * +-------------- + * i-> + */ + + // Compute most of the diagonal for (int i = diagonal; i < matrix_size - diagonal; i++) { P[i][i - diagonal] = PQD_INIT_VAL; D[i][i - diagonal] = PQD_INIT_VAL; @@ -117,7 +227,32 @@ int DPD(int8_t *s1, int8_t *s2, backtrack_t *backtrack, int size_neighbour_in_sy Q[i][i + diagonal] = PQD_INIT_VAL; D[i][i + diagonal] = PQD_INIT_VAL; } + STAT_RECORD_STEP(STAT_DPD, 2); + /* D matrix at this point : (assuming COST_SUB=1, diagonal = 3 ie: NB_DIAG=5, and matrix_size=12) + * (0,0) in bottom left + * An x corresponds to a value that has been computed. + * An M corresponds to PQD_INIT_VAL + * + * +-----------------------+ + * | M | + * | M x | + * | M x x | + * | M x x x | + * | M x x x x | + * | M x x x x x | + * | M x x x x x M | + * | M x x x x x M | + * |3 x x x x x M | + * ^|2 x x x x M | + * ||1 x x x M | + * j|0 1 2 M | + * +-----------------------+ + * i-> + */ + + // Compute last trapezoid (top right part) + // (And check for best score in top-most row of D matrix) for (int i = matrix_size - diagonal; i < matrix_size; i++) { P[i][i - diagonal] = PQD_INIT_VAL; D[i][i - diagonal] = PQD_INIT_VAL; @@ -131,6 +266,31 @@ int DPD(int8_t *s1, int8_t *s2, backtrack_t *backtrack, int size_neighbour_in_sy min_score_j_idx = matrix_size - 1; } } + STAT_RECORD_STEP(STAT_DPD, 3); + + /* D matrix at this point : (assuming COST_SUB=1, diagonal = 3 ie: NB_DIAG=5, and matrix_size=12) + * (0,0) in bottom left + * An x corresponds to a value that has been computed. + * An M corresponds to PQD_INIT_VAL + * + * +-----------------------+ + * | M x x x| + * | M x x x x| + * | M x x x x x| + * | M x x x x x M| + * | M x x x x x M | + * | M x x x x x M | + * | M x x x x x M | + * | M x x x x x M | + * |3 x x x x x M | + * ^|2 x x x x M | + * ||1 x x x M | + * j|0 1 2 M | + * +-----------------------+ + * i-> + */ + + // Find the best score in right-most column of D matrix (if it is better than the best from the top row) for (int j = matrix_size - diagonal; j < matrix_size; j++) { if (D[matrix_size - 1][j] < min_score) { min_score = D[matrix_size - 1][j]; @@ -138,305 +298,395 @@ int DPD(int8_t *s1, int8_t *s2, backtrack_t *backtrack, int size_neighbour_in_sy min_score_j_idx = j; } } + STAT_RECORD_STEP(STAT_DPD, 4); + // backtrack step { int i = min_score_i_idx; int j = min_score_j_idx; backtrack[0].type = CODE_END; - while ((i > 0) && (j > 0)) { - if (X[i][j] == 0) { - i--; - j--; - } else { - if (X[i][j] == 1) { - i--; - j--; - backtrack[align_distance].type = CODE_SUB; - backtrack[align_distance].ix = i; - backtrack[align_distance].jx = j; - align_distance++; - } else { - if (X[i][j] == 2) { - j--; - backtrack[align_distance].type = CODE_INS; - backtrack[align_distance].ix = i; - backtrack[align_distance].jx = j; - align_distance++; - } else { - i--; - backtrack[align_distance].type = CODE_DEL; - backtrack[align_distance].ix = i; - backtrack[align_distance].jx = j; - align_distance++; - } - } - } - } + (*backtrack_end) = &backtrack[1]; + while ((i > 0) || (j > 0)) { + if(X[i][j] == 0) { + // Operation 0 : sequences match and nothing was done : decrease both indices. + i--; + j--; + (*backtrack_end)->type = CODE_MATCH; + (*backtrack_end)->ix = i; + (*backtrack_end)->jx = j; + (*backtrack_end)++; + } else if(X[i][j] == 1) { + // Operation 1 : substitution : decrease both indices and store substitution code. + i--; + j--; + (*backtrack_end)->type = CODE_SUB; + (*backtrack_end)->ix = i; + (*backtrack_end)->jx = j; + (*backtrack_end)++; + } else if(X[i][j] == 2) { + // Operation 2 : insertion : decrease read index but not genome index and store insertion code. + j--; + (*backtrack_end)->type = CODE_INS; + (*backtrack_end)->ix = i; + (*backtrack_end)->jx = j; + (*backtrack_end)++; + } else { + // Operation 3 : deletion : decrease genome index but not read index and store deletion code. + i--; + (*backtrack_end)->type = CODE_DEL; + (*backtrack_end)->ix = i; + (*backtrack_end)->jx = j; + (*backtrack_end)++; + } + } + (*backtrack_end)--; } - return align_distance; + STAT_RECORD_LAST_STEP(STAT_DPD, 5); + return min_score; } -/* - * encoding of differences between read and sequence of the reference genome. - * coding: substitution CODE_SUB pos x - * deletion CODE_DEL pos x+ - * insertion CODE_INS pos x+ - * end CODE_END - * - * x = A | C | G | T - * x+ = a sequence of at least 1 element (i.e. A, C, G ou T) - * pos = integer (8 bits) : give the offset of the variant from the start of the read - * - * example S 12 A D 56 A T G I 87 T C X ==> substitution (A) position 12, deletion (ATG) position 56, insertion (TC) position 87 - * The code is return in "code" as a table of int8_t - */ +#define NB_FREQ_TABLE_MUTEXES (1<<10) +static pthread_mutex_t freq_table_mutexes[NB_FREQ_TABLE_MUTEXES+1]; +//static pthread_mutex_t print_mutex; +static pthread_mutex_t codependence_list_mutex; -static int code_alignment(uint8_t *code, int score, int8_t *gen, int8_t *read, unsigned size_neighbour_in_symbols) +bool update_frequency_table( + genome_t *ref_genome, + dpu_result_out_t *result_tab, + int8_t *reads_buffer, + float *reads_quality_buffer, + int pos, + float mapq, + bool unsure + ) { - int code_idx, computed_score, backtrack_idx; - int size_read = SIZE_READ; - int size_neighbour = size_neighbour_in_symbols; - backtrack_t backtrak[size_read]; - - if (score == 0) { - code[0] = CODE_END; - return 1; - } + STAT_RECORD_START(STAT_UPDATE_FREQUENCY_TABLE); + struct frequency_info ** frequency_table = get_frequency_table(); + struct variants_codependence_info_list** codependence_table = get_codependence_table(&codependence_list_mutex); + uint64_t genome_pos = ref_genome->pt_seq[result_tab[pos].coord.seq_nr] + result_tab[pos].coord.seed_nr; + int num = result_tab[pos].num; + int8_t *read = reads_buffer + (num * SIZE_READ); + float *read_quality = reads_quality_buffer + (num>>1)*SIZE_READ; + bool invert_read = num & 1; + + backtrack_t backtrack[SIZE_READ<<1]; + backtrack_t * backtrack_end = backtrack; - /* First, looking for subsititution only */ - code_idx = 0; - computed_score = 0; - for (int i = SIZE_SEED; i < size_neighbour + SIZE_SEED; i++) { - if ((gen[i] & 3) != read[i]) { - computed_score += COST_SUB; - code[code_idx++] = CODE_SUB; - code[code_idx++] = i; - code[code_idx++] = read[i]; - if (computed_score > score) { - break; + /* First, try substitutions only */ + unsigned int computed_score = subOnlyPath(&ref_genome->data[genome_pos], read, backtrack, &backtrack_end); + STAT_RECORD_STEP(STAT_UPDATE_FREQUENCY_TABLE, 0); + + if (computed_score > result_tab[pos].score) { + computed_score = DPD(&ref_genome->data[genome_pos], read, backtrack, &backtrack_end); + if (computed_score == result_tab[pos].score) { + reads_correct_cost_DPD++; + } else { + reads_not_correct_cost++; } - } - } - code[code_idx++] = CODE_END; - if (computed_score == score) - return code_idx; - - /* Otherwise, re-compute the matrix (only some diagonals) and put in backtrack the path */ - backtrack_idx = DPD(gen, read, backtrak, size_neighbour_in_symbols + SIZE_SEED); - if (backtrack_idx == -1) { - code[0] = CODE_ERR; - return 1; + } else { + reads_correct_cost_sub_only++; } + STAT_RECORD_STEP(STAT_UPDATE_FREQUENCY_TABLE, 1); - backtrack_idx--; - code_idx = 0; - while (backtrack_idx > 0) { - if (backtrak[backtrack_idx].type == CODE_SUB) { - code[code_idx++] = CODE_SUB; - code[code_idx++] = backtrak[backtrack_idx].jx - 1; - code[code_idx++] = read[backtrak[backtrack_idx].jx - 1]; - backtrack_idx--; - } else { - if (backtrak[backtrack_idx].type == CODE_DEL) { - int backtrack_jx = backtrak[backtrack_idx].jx; - code[code_idx++] = CODE_DEL; - code[code_idx++] = backtrak[backtrack_idx].ix; - code[code_idx++] = gen[backtrak[backtrack_idx].ix] & 3; - backtrack_idx--; - while ((backtrak[backtrack_idx].type == CODE_DEL) && (backtrack_jx == backtrak[backtrack_idx].jx)) { - code[code_idx++] = gen[backtrak[backtrack_idx].ix] & 3; - backtrack_idx--; - } - } else { - int backtrack_ix = backtrak[backtrack_idx].ix; - code[code_idx++] = CODE_INS; - code[code_idx++] = backtrak[backtrack_idx].jx - 1; - code[code_idx++] = read[backtrak[backtrack_idx].jx]; - backtrack_idx--; - while ((backtrak[backtrack_idx].type == CODE_INS) && (backtrack_ix == backtrak[backtrack_idx].ix)) { - code[code_idx++] = read[backtrak[backtrack_idx].jx]; - backtrack_idx--; - } + #if DEBUG_READ_MAPPING + write_read_mapping_from_backtrack(ref_genome->seq_name[result_tab[pos].coord.seq_nr], result_tab[pos].coord.seed_nr, backtrack_end, read, num); + #endif + + STAT_RECORD_STEP(STAT_UPDATE_FREQUENCY_TABLE, 2); + + // /!\ Pointer arithmetics + unsigned int mutex_index = (genome_pos*NB_FREQ_TABLE_MUTEXES)/ref_genome->fasta_file_size; + unsigned int end_mutex_index = ((genome_pos+backtrack_end->ix)*NB_FREQ_TABLE_MUTEXES)/ref_genome->fasta_file_size; + pthread_mutex_lock(&freq_table_mutexes[mutex_index]); + if (end_mutex_index != mutex_index) { + pthread_mutex_lock(&freq_table_mutexes[end_mutex_index]); + } + STAT_RECORD_STEP(STAT_UPDATE_FREQUENCY_TABLE, 3); + bool has_indel = false; + uint8_t read_letter; + //printf("genome_pos=%lu\n", genome_pos); + unsigned int variants_found = 0; + unsigned int variants_found_positions[SIZE_READ]; + uint8_t variants_found_letters[SIZE_READ]; + for (; backtrack_end > backtrack; backtrack_end--) { + unsigned int current_position = genome_pos+backtrack_end->ix; + //printf("backtrack_end->ix=%u; current_position=%u\n", backtrack_end->ix, current_position); + if (current_position > ref_genome->fasta_file_size) { + continue; } - } + switch (backtrack_end->type) { + case CODE_SUB: + read_letter = read[backtrack_end->jx]; + if (read_letter > 3) { + read_letter = read_letter>>1 & 0x3; + } + + // update codependence info + for (unsigned int i=0; ifasta_file_size, &codependence_list_mutex); + add_codependence_info(&codependence_table[variants_found_positions[i]], (int16_t) (current_position-variants_found_positions[i]), variants_found_letters[i], read_letter, ref_genome->fasta_file_size, &codependence_list_mutex); + //pthread_mutex_unlock(&codependence_list_mutex); + } + variants_found_positions[variants_found] = current_position; + variants_found_letters[variants_found] = read_letter; + variants_found++; + + frequency_table[read_letter][current_position].freq += mapq * read_quality[invert_read ? SIZE_READ-backtrack_end->jx-1 : backtrack_end->jx]; + frequency_table[read_letter][current_position].score++; + + + if (unsure) { + frequency_table[read_letter][current_position].unsure_score++; + } + break; + case CODE_MATCH: + read_letter = read[backtrack_end->jx]; + if (read_letter > 3) { + read_letter = read_letter>>1 & 0x3; + } + frequency_table[read_letter][current_position].freq += mapq * read_quality[invert_read ? SIZE_READ-backtrack_end->jx-1 : backtrack_end->jx]; + frequency_table[read_letter][current_position].score++; + + + if (unsure) { + frequency_table[read_letter][current_position].unsure_score++; + } + break; + case CODE_INS: + case CODE_DEL: + has_indel = true; + //LOG_WARN("unhandled indel\n"); + // TODO: handle indels + break; + } + } + STAT_RECORD_STEP(STAT_UPDATE_FREQUENCY_TABLE, 4); + pthread_mutex_unlock(&freq_table_mutexes[mutex_index]); + if (end_mutex_index != mutex_index) { + pthread_mutex_unlock(&freq_table_mutexes[end_mutex_index]); + } + STAT_RECORD_LAST_STEP(STAT_UPDATE_FREQUENCY_TABLE, 5); + return has_indel; +} + +static volatile unsigned int curr_match; +static pthread_mutex_t curr_match_mutex; +unsigned int acquire_curr_match() +{ + pthread_mutex_lock(&curr_match_mutex); + return curr_match; +} +void release_curr_match(unsigned int new_curr_match) +{ + curr_match = new_curr_match; + pthread_mutex_unlock(&curr_match_mutex); + return; +} + +static pthread_barrier_t barrier; + +typedef struct { + unsigned int nb_match; + dpu_result_out_t *result_tab; + int round; + int8_t *reads_buffer; + float *reads_quality_buffer; + genome_t *ref_genome; + FILE *fpe1; + FILE *fpe2; +} process_read_arg_t; + +static uint64_t nr_reads_total = 0ULL; +static uint64_t nr_reads_total_from_dpus = 0ULL; +static uint64_t nr_reads_non_mapped = 0ULL; +static uint64_t nr_reads_with_indels = 0ULL; +static pthread_mutex_t nr_reads_mutex = PTHREAD_MUTEX_INITIALIZER; + +#define MISMATCH_COUNT(X) (X.score / 10) +#define INVALID_SCORE 1000 + +static void keep_best_2_scores(unsigned score, unsigned* P1, unsigned *P2, unsigned x1, unsigned x2, unsigned* best_score) { + + if(score < best_score[0]) { + + // move current to next position + best_score[1] = best_score[0]; + P1[1] = P1[0]; + P2[1] = P2[0]; + // update first position + best_score[0] = score; + P1[0] = x1; + P2[0] = x2; + } + else if (score < best_score[1]) { + + // update second position + best_score[1] = score; + P1[1] = x1; + P2[1] = x2; + } +} + +static unsigned get_nb_scores(unsigned int * best_score) { + + unsigned np = 0; + if(best_score[0] < INVALID_SCORE) { + np++; + if(best_score[1] < INVALID_SCORE) np++; } - code[code_idx++] = CODE_END; - return code_idx; + return np; } -static void set_variant( - dpu_result_out_t result_match, genome_t *ref_genome, int8_t *reads_buffer, unsigned int size_neighbour_in_symbols) +static void set_variant(dpu_result_out_t result_match, genome_t *ref_genome, int8_t *reads_buffer) { - uint32_t code_result_idx; - uint8_t code_result_tab[256]; - int8_t *read; + int8_t *read = &reads_buffer[result_match.num * SIZE_READ]; char nucleotide[4] = { 'A', 'C', 'T', 'G' }; uint64_t genome_pos = ref_genome->pt_seq[result_match.coord.seq_nr] + result_match.coord.seed_nr; - int size_read = SIZE_READ; /* Get the differences betweend the read and the sequence of the reference genome that match */ - read = &reads_buffer[result_match.num * size_read]; - code_alignment(code_result_tab, result_match.score, &ref_genome->data[genome_pos], read, size_neighbour_in_symbols); - if (code_result_tab[0] == CODE_ERR) - return; + read = &reads_buffer[result_match.num * SIZE_READ]; - /* Update "mapping_coverage" with the number of reads that match at this position of the genome */ - for (int i = 0; i < size_read; i++) { - ref_genome->mapping_coverage[genome_pos + i] += 1; - } + // code_alignment(code_result_tab, result_match.score, &ref_genome->data[genome_pos], read, size_neighbour_in_symbols); + backtrack_t backtrack[SIZE_READ<<1]; + backtrack_t * backtrack_end = backtrack; - code_result_idx = 0; - while (code_result_tab[code_result_idx] != CODE_END) { - int code_result = code_result_tab[code_result_idx]; - int64_t pos_variant_read = code_result_tab[code_result_idx + 1]; - int64_t pos_variant_genome = genome_pos + pos_variant_read; - int ref_pos = 0; - int alt_pos = 0; - variant_t *newvar = (variant_t *)malloc(sizeof(variant_t)); - newvar->depth = 1; - newvar->score = result_match.score; - newvar->next = NULL; - if (code_result == CODE_SUB) { - /* SNP = 0,1,2,3 (code A,C,T,G) */ - int snp = code_result_tab[code_result_idx + 2]; - newvar->ref[ref_pos++] = nucleotide[ref_genome->data[pos_variant_genome] & 3]; - newvar->alt[alt_pos++] = nucleotide[snp & 3]; - - code_result_idx += 3; - } else if (code_result == CODE_INS) { - int64_t ps_var_genome = pos_variant_genome; - int64_t ps_var_read = pos_variant_read; - code_result_idx += 2; - - while (code_result_tab[code_result_idx] < 4) { - ps_var_read++; - code_result_idx++; - } + /* First, try substitutions only */ + unsigned int computed_score = subOnlyPath(&ref_genome->data[genome_pos], read, backtrack, &backtrack_end); - while (ref_genome->data[ps_var_genome] == read[ps_var_read]) { - ps_var_genome--; - ps_var_read--; - pos_variant_genome--; - pos_variant_read--; + if (computed_score > result_match.score) { + computed_score = DPD(&ref_genome->data[genome_pos], read, backtrack, &backtrack_end); + if (computed_score == result_match.score) { + reads_correct_cost_DPD++; + } else { + reads_not_correct_cost++; } + } else { + reads_correct_cost_sub_only++; + } - newvar->ref[ref_pos++] = nucleotide[ref_genome->data[pos_variant_genome] & 3]; + if (backtrack_end->type == CODE_ERR) + return; - while (pos_variant_read <= ps_var_read) { - newvar->alt[alt_pos++] = nucleotide[read[pos_variant_read] & 3]; - if (alt_pos >= MAX_SIZE_ALLELE - 1) { - free(newvar); - return; - } - pos_variant_read++; - } + #if DEBUG_READ_MAPPING + write_read_mapping_from_backtrack(ref_genome->seq_name[result_match.coord.seq_nr], result_match.coord.seed_nr, backtrack_end, read, result_match.num); + #endif - } else if (code_result == CODE_DEL) { - int64_t ps_var_genome = pos_variant_genome; - int64_t ps_var_read = pos_variant_read; - code_result_idx += 2; + /* Update "mapping_coverage" with the number of reads that match at this position of the genome */ + for (int i = 0; i < SIZE_READ; i++) { + ref_genome->mapping_coverage[genome_pos + i] += 1; + } + if (result_match.coord.seq_nr > ref_genome->nb_seq) { + LOG_WARN("skipping a read on a bogus sequence: %d\n", result_match.coord.seq_nr); + return; + } - while (code_result_tab[code_result_idx] < 4) { - ps_var_genome++; - code_result_idx++; + for (; backtrack_end > backtrack; backtrack_end--) { + unsigned int current_position = genome_pos+backtrack_end->ix; + //printf("backtrack_end->ix=%u; current_position=%u\n", backtrack_end->ix, current_position); + if (current_position > ref_genome->fasta_file_size) { + continue; } - while (ref_genome->data[ps_var_genome] == read[ps_var_read]) { - ps_var_read--; - ps_var_genome--; - pos_variant_genome--; - pos_variant_read--; - } + if (backtrack_end->type != 0) { + variant_t *newvar = (variant_t *)malloc(sizeof(variant_t)); + newvar->depth = 1; + newvar->score = result_match.score; + newvar->next = NULL; + int alt_idx = 0; + int ref_idx = 0; + int64_t pos_variant_genome = genome_pos + backtrack_end->ix; - newvar->alt[alt_pos++] = nucleotide[ref_genome->data[pos_variant_genome] & 3]; + switch (backtrack_end->type) { + case CODE_INS: + case CODE_DEL: + pos_variant_genome--; + newvar->ref[ref_idx++] = nucleotide[ref_genome->data[genome_pos+(backtrack_end+1)->ix] & 0x3]; + newvar->alt[alt_idx++] = nucleotide[read[(backtrack_end+1)->jx] & 0x3]; + break; + } - while (pos_variant_genome <= ps_var_genome) { - newvar->ref[ref_pos++] = nucleotide[ref_genome->data[pos_variant_genome] & 3]; - if (ref_pos >= MAX_SIZE_ALLELE - 1) { - free(newvar); - return; + for (;backtrack_end->type != 0 && backtrack_end->type != CODE_END; backtrack_end--) { + switch (backtrack_end->type) { + case CODE_SUB: + if (alt_idx >= MAX_SIZE_ALLELE || ref_idx >= MAX_SIZE_ALLELE) { + LOG_WARN("clipped a variant because it was too complex") + goto insert_variant; + } + newvar->ref[ref_idx++] = nucleotide[ref_genome->data[genome_pos+backtrack_end->ix] & 0x3]; + newvar->alt[alt_idx++] = nucleotide[read[backtrack_end->jx] & 0x3]; + break; + case CODE_INS: + if (alt_idx >= MAX_SIZE_ALLELE) { + LOG_WARN("clipped a variant because it was too complex") + goto insert_variant; + } + newvar->alt[alt_idx++] = nucleotide[read[backtrack_end->jx] & 0x3]; + break; + case CODE_DEL: + if (ref_idx >= MAX_SIZE_ALLELE) { + LOG_WARN("clipped a variant because it was too complex") + goto insert_variant; + } + newvar->ref[ref_idx++] = nucleotide[ref_genome->data[genome_pos+backtrack_end->ix] & 0x3]; + break; + } } - pos_variant_genome++; + insert_variant: + newvar->ref[ref_idx] = '\0'; + newvar->alt[alt_idx] = '\0'; + variant_tree_insert( + newvar, result_match.coord.seq_nr, pos_variant_genome - ref_genome->pt_seq[result_match.coord.seq_nr]); + } - pos_variant_genome -= ref_pos; - } - newvar->ref[ref_pos] = '\0'; - newvar->alt[alt_pos] = '\0'; - variant_tree_insert( - newvar, result_match.coord.seq_nr, pos_variant_genome + 1 - ref_genome->pt_seq[result_match.coord.seq_nr]); } } static pthread_mutex_t non_mapped_mutex; static void add_to_non_mapped_read(int numread, int round, FILE *fpe1, FILE *fpe2, int8_t *reads_buffer) { + STAT_RECORD_START(STAT_ADD_TO_NON_MAPPED_READ); if (fpe1 == NULL || fpe2 == NULL) return; pthread_mutex_lock(&non_mapped_mutex); + STAT_RECORD_STEP(STAT_ADD_TO_NON_MAPPED_READ, 0); char nucleotide[4] = { 'A', 'C', 'T', 'G' }; - int size_read = SIZE_READ; - int8_t *read = &reads_buffer[numread * size_read]; + int8_t *read = &reads_buffer[numread * SIZE_READ]; fprintf(fpe1, ">>%d\n", SIZE_SEED * (round + 1)); - for (int j = SIZE_SEED; j < size_read; j++) { + for (int j = SIZE_SEED; j < SIZE_READ; j++) { fprintf(fpe1, "%c", nucleotide[read[j] & 3]); } for (int j = 0; j < SIZE_SEED; j++) { fprintf(fpe1, "A"); } fprintf(fpe1, "\n"); - read = &reads_buffer[(numread + 2) * size_read]; + read = &reads_buffer[(numread + 2) * SIZE_READ]; fprintf(fpe2, ">>%d\n", SIZE_SEED * (round + 1)); - for (int j = SIZE_SEED; j < size_read; j++) { + for (int j = SIZE_SEED; j < SIZE_READ; j++) { fprintf(fpe2, "%c", nucleotide[read[j] & 3]); } for (int j = 0; j < SIZE_SEED; j++) { fprintf(fpe2, "A"); } fprintf(fpe2, "\n"); + STAT_RECORD_STEP(STAT_ADD_TO_NON_MAPPED_READ, 1); pthread_mutex_unlock(&non_mapped_mutex); + STAT_RECORD_LAST_STEP(STAT_ADD_TO_NON_MAPPED_READ, 2); } -static volatile unsigned int curr_match; -static pthread_mutex_t curr_match_mutex; -unsigned int acquire_curr_match() -{ - pthread_mutex_lock(&curr_match_mutex); - return curr_match; -} -void release_curr_match(unsigned int new_curr_match) -{ - curr_match = new_curr_match; - pthread_mutex_unlock(&curr_match_mutex); - return; -} - -static pthread_barrier_t barrier; - -typedef struct { - unsigned int nb_match; - dpu_result_out_t *result_tab; - int round; - int8_t *reads_buffer; - genome_t *ref_genome; - FILE *fpe1; - FILE *fpe2; -} process_read_arg_t; - -static uint64_t nr_reads_total = 0ULL; -static uint64_t nr_reads_non_mapped = 0ULL; -static pthread_mutex_t nr_reads_mutex = PTHREAD_MUTEX_INITIALIZER; - +/*#define USE_MAPQ_SCORE*/ static void do_process_read(process_read_arg_t *arg) { + STAT_RECORD_START(STAT_DO_PROCESS_READ); const unsigned int nb_match = arg->nb_match; dpu_result_out_t *result_tab = arg->result_tab; int round = arg->round; int8_t *reads_buffer = arg->reads_buffer; + float *reads_quality_buffer = arg->reads_quality_buffer; genome_t *ref_genome = arg->ref_genome; FILE *fpe1 = arg->fpe1; FILE *fpe2 = arg->fpe2; - unsigned int size_neighbour_in_symbols = (SIZE_NEIGHBOUR_IN_BYTES - DELTA_NEIGHBOUR(round)) * 4; /* * The number of a pair is given by "num_read / 4 " (see dispatch_read function) @@ -453,84 +703,290 @@ static void do_process_read(process_read_arg_t *arg) */ while (true) { - unsigned int i; - if ((i = acquire_curr_match()) >= nb_match) { - release_curr_match(i); - return; + STAT_RECORD_STEP(STAT_DO_PROCESS_READ, 0); + unsigned int i; + if ((i = acquire_curr_match()) >= nb_match) { + release_curr_match(i); + STAT_RECORD_LAST_STEP(STAT_DO_PROCESS_READ, 6); + return; + } + int numpair = result_tab[i].num / 4; + unsigned int j = i; + while ((j < nb_match) && (numpair == result_tab[j].num / 4)) { + j++; + } + release_curr_match(j); + STAT_RECORD_STEP(STAT_DO_PROCESS_READ, 1); + + // find best mapping scores for each of the four reads considered + unsigned int best_individual_scores[4] = {UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX}; + for (unsigned int x=i; x result_tab[x].score) + best_individual_scores[t] = result_tab[x].score; + } + + unsigned int nb_considered_mappings[4] = {0,0,0,0}; + unsigned int considered_mappings[4][MAX_CONSIDERED_MAPPINGS]; + for (unsigned int x=i; x result_tab[x].score && nb_considered_mappings[t] 4000) { + LOG_WARN("nb_considered_mappings[%d] = %u\n", l, nb_considered_mappings[l]); + } + } + + // i = start index in result_tab + // j = stop index in result_tab + // select best couples of paired reads + STAT_RECORD_STEP(STAT_DO_PROCESS_READ, 2); + unsigned int P1[2] = {i,i}; + unsigned int P2[2] = {j-1,j-1}; + unsigned int pos1, pos2; + unsigned int best_score[2] = { INVALID_SCORE, INVALID_SCORE }; + + for (unsigned int x1 = 0; x1 < nb_considered_mappings[0]; x1++) { + pos1 = result_tab[considered_mappings[0][x1]].coord.seed_nr; + int score1 = result_tab[considered_mappings[0][x1]].score; + for (unsigned int x2 = 0; x2 < nb_considered_mappings[3]; x2++) { + pos2 = result_tab[considered_mappings[3][x2]].coord.seed_nr; + int score2 = result_tab[considered_mappings[3][x2]].score; + if ((abs((int)pos2 - (int)pos1) > READ_DIST_LOWER_BOUND && (abs((int)pos2 - (int)pos1) < READ_DIST_UPPER_BOUND))) { + // update if this is one of the two best scores + keep_best_2_scores(score1 + score2, P1, P2, considered_mappings[0][x1], considered_mappings[3][x2], best_score); + } } - int numpair = result_tab[i].num / 4; - unsigned int j = i; - while ((j < nb_match) && (numpair == result_tab[j].num / 4)) { - j++; + } + for (unsigned int x1 = 0; x1 < nb_considered_mappings[1]; x1++) { + pos1 = result_tab[considered_mappings[1][x1]].coord.seed_nr; + int score1 = result_tab[considered_mappings[1][x1]].score; + for (unsigned int x2 = 0; x2 < nb_considered_mappings[2]; x2++) { + pos2 = result_tab[considered_mappings[2][x2]].coord.seed_nr; + int score2 = result_tab[considered_mappings[2][x2]].score; + if ((abs((int)pos2 - (int)pos1) > READ_DIST_LOWER_BOUND && (abs((int)pos2 - (int)pos1) < READ_DIST_UPPER_BOUND))) { + // update if this is one of the two best scores + keep_best_2_scores(score1 + score2, P1, P2, considered_mappings[1][x1], considered_mappings[2][x2], best_score); + } } - release_curr_match(j); - - // i = start index in result_tab - // j = stop index in result_tab - // select best couples of paired reads - unsigned int P1[1000]; - unsigned int P2[1000]; - unsigned int np = 0; - unsigned int pos1, pos2, t1, t2; - unsigned int best_score = 1000; - // test all significant pairs of reads (0,3) & (1,2) - for (unsigned int x1 = i; x1 < j; x1++) { - t1 = result_tab[x1].num % 4; - pos1 = result_tab[x1].coord.seed_nr; - for (unsigned int x2 = i + 1; x2 < j; x2++) { - pos2 = result_tab[x2].coord.seed_nr; - t2 = result_tab[x2].num % 4; - if (t1 + t2 == 3) // select significant pair - { - if ((abs((int)pos2 - (int)pos1) > 130 && (abs((int)pos2 - (int)pos1) < 430))) { - if (result_tab[x1].score + result_tab[x2].score < best_score) { - np = 0; - best_score = result_tab[x1].score + result_tab[x2].score; - P1[np] = x1; - P2[np] = x2; - np++; - } else { - if (result_tab[x1].score + result_tab[x2].score == best_score) { - P1[np] = x1; - P2[np] = x2; - if (np < 999) - np++; - } - } - } - } + } + STAT_RECORD_STEP(STAT_DO_PROCESS_READ, 3); + + bool update = false; + bool hasIndel = false; + + unsigned np = get_nb_scores(best_score); + if (np > 0) { + LOG_DEBUG("found at least a pair (%u)\n", np); + + if(np == 2) {//if(false) { + + // found at least 2 matching pairs of positions. Check the delta between the two pairs to + // decide whether we should keep the best pair + int delta = abs((int)(MISMATCH_COUNT(result_tab[P1[0]]) + MISMATCH_COUNT(result_tab[P2[0]])) + - (int)(MISMATCH_COUNT(result_tab[P1[1]]) + MISMATCH_COUNT(result_tab[P2[1]]))); + + float mapq = 1.0f; +#ifdef USE_MAPQ_SCORE + int delta_corrected = MISMATCH_COUNT(result_tab[P1[0]]) + + MISMATCH_COUNT(result_tab[P2[0]]) + MAPQ_SCALING_FACTOR * ((2 * (MAX_SUBSTITUTION + 1)) - delta); + if(delta_corrected < 0) { + LOG_WARN("negative delta for square root %d\n", delta_corrected); + } + else if(delta >= DIST_PAIR_THRESHOLD) { + mapq = 1.0 - sqrt((double)delta_corrected / SIZE_READ); +#else + if(delta >= DIST_PAIR_THRESHOLD) { +#endif + if (get_use_frequency_table()) { + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P1[0], mapq, false); + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P2[0], mapq, false); + } else { + set_variant(result_tab[P1[0]], ref_genome, reads_buffer); + set_variant(result_tab[P2[0]], ref_genome, reads_buffer); + } + update = true; + } else { + if (get_use_frequency_table()) { + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P1[0], mapq, true); + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P2[0], mapq, true); + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P1[1], mapq, true); + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P2[1], mapq, true); + } else { + set_variant(result_tab[P1[0]], ref_genome, reads_buffer); + set_variant(result_tab[P2[0]], ref_genome, reads_buffer); + set_variant(result_tab[P1[1]], ref_genome, reads_buffer); + set_variant(result_tab[P2[1]], ref_genome, reads_buffer); } + // LOG_WARN("unusable pair (%u)\n", result_tab[i].num/4); + } } - if (np > 0) { - // choose the less covered zone on the genome - int x = 0; - uint64_t genome_pos; - int cov1, cov2; - int min_cov = 1000; - ; - for (unsigned int kk = 0; kk < np; kk++) { - genome_pos = ref_genome->pt_seq[result_tab[P1[kk]].coord.seq_nr] + result_tab[P1[kk]].coord.seed_nr; - cov1 = ref_genome->mapping_coverage[genome_pos]; - genome_pos = ref_genome->pt_seq[result_tab[P2[kk]].coord.seq_nr] + result_tab[P2[kk]].coord.seed_nr; - cov2 = ref_genome->mapping_coverage[genome_pos]; - if (cov1 + cov2 < min_cov) { - x = kk; - min_cov = cov1 + cov2; - } + else if(np) { // only one result, take it + int delta = abs((int)(MISMATCH_COUNT(result_tab[P1[0]]) + MISMATCH_COUNT(result_tab[P2[0]])) - (2 * (MAX_SUBSTITUTION + 1))); + float mapq = 1.0f; +#ifdef USE_MAPQ_SCORE + int delta_corrected = MISMATCH_COUNT(result_tab[P1[0]]) + MISMATCH_COUNT(result_tab[P2[0]]) + MAPQ_SCALING_FACTOR * ((2 * (MAX_SUBSTITUTION + 1)) - delta); + if(delta_corrected < 0) { + LOG_WARN("negative delta (np == 1) for square root %d\n", delta_corrected); + } + else if(delta >= DIST_PAIR_THRESHOLD) { + mapq = 1.0 - sqrt((double)delta_corrected / SIZE_READ); +#else + if(delta >= DIST_PAIR_THRESHOLD) { +#endif + if (get_use_frequency_table()) { + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P1[0], mapq, false); + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P2[0], mapq, false); + } else { + set_variant(result_tab[P1[0]], ref_genome, reads_buffer); + set_variant(result_tab[P2[0]], ref_genome, reads_buffer); } + update = true; + }/* else { + LOG_WARN("unusable pair (%u)\n", result_tab[i].num/4); + }*/ + } + }// else { + if (true) { + + // check mapping of R1 and R2 independently + unsigned int best_score_R1[2] = { INVALID_SCORE, INVALID_SCORE }; + unsigned int best_score_R2[2] = { INVALID_SCORE, INVALID_SCORE }; + unsigned int throwaway[2]; + P1[0] = 0; + P2[0] = 0; + P1[1] = 0; + P2[1] = 0; + for (unsigned int read = i; read < j; read++) { + unsigned t1 = result_tab[read].num % 4; + if(t1 < 2) { // PE1 or RPE1 + keep_best_2_scores(result_tab[read].score, P1, throwaway, read, 0, best_score_R1); + } + else { // PE2 or RPE2 + keep_best_2_scores(result_tab[read].score, throwaway, P2, 0, read, best_score_R2); + } + } + + unsigned np1 = get_nb_scores(best_score_R1), np2 = get_nb_scores(best_score_R2); + if(np1 == 2) { + + int delta = abs((int)MISMATCH_COUNT(result_tab[P1[0]]) - (int)MISMATCH_COUNT(result_tab[P1[1]])); + + float mapq = 1.0f; +#ifdef USE_MAPQ_SCORE + int delta_corrected = MISMATCH_COUNT(result_tab[P1[0]]) + MAPQ_SCALING_FACTOR * ((MAX_SUBSTITUTION + 1) - delta); - set_variant(result_tab[P1[x]], ref_genome, reads_buffer, size_neighbour_in_symbols); - set_variant(result_tab[P2[x]], ref_genome, reads_buffer, size_neighbour_in_symbols); - } else { - pthread_mutex_lock(&nr_reads_mutex); + if(delta_corrected < 0) { + LOG_WARN("negative delta (np1 == 2) for square root %d\n", delta_corrected); + } + else if(delta >= DIST_SINGLE_THRESHOLD) { + mapq = 1.0 - sqrt((double)delta_corrected / SIZE_READ); +#else + if(delta >= DIST_SINGLE_THRESHOLD) { +#endif + if (get_use_frequency_table()) { + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P1[0], mapq, false); + } else { + set_variant(result_tab[P1[0]], ref_genome, reads_buffer); + } + update = true; + } + } + else if(np1) { + int delta = abs((int)MISMATCH_COUNT(result_tab[P1[0]]) - (MAX_SUBSTITUTION + 1)); + + float mapq = 1.0f; +#ifdef USE_MAPQ_SCORE + int delta_corrected = MISMATCH_COUNT(result_tab[P1[0]]) + MAPQ_SCALING_FACTOR * ((MAX_SUBSTITUTION + 1) - delta); + + if(delta_corrected < 0) { + LOG_WARN("negative delta (np1 == 1) for square root %d\n", delta_corrected); + } + else if(delta >= DIST_SINGLE_THRESHOLD) { + mapq = 1.0 - sqrt((double)delta_corrected / SIZE_READ); +#else + if(delta >= DIST_SINGLE_THRESHOLD) { +#endif + if (get_use_frequency_table()) { + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P1[0], mapq, false); + } else { + set_variant(result_tab[P1[0]], ref_genome, reads_buffer); + } + update = true; + } + } + + if(np2 == 2) { + + int delta = abs((int)MISMATCH_COUNT(result_tab[P2[0]]) - (int)MISMATCH_COUNT(result_tab[P2[1]])); + + float mapq = 1.0f; +#ifdef USE_MAPQ_SCORE + int delta_corrected = MISMATCH_COUNT(result_tab[P2[0]]) + MAPQ_SCALING_FACTOR * ((MAX_SUBSTITUTION + 1) - delta); + + if(delta_corrected < 0) { + LOG_WARN("negative delta (np2 == 2) for square root %d, %d %d %d %d\n", + delta_corrected, MISMATCH_COUNT(result_tab[P2[0]]), MISMATCH_COUNT(result_tab[P2[1]]), MAX_SUBSTITUTION + 1, delta); + } + else if(delta >= DIST_SINGLE_THRESHOLD) { + mapq = 1.0 - sqrt((double)delta_corrected / SIZE_READ); +#else + if(delta >= DIST_SINGLE_THRESHOLD) { +#endif + + if (get_use_frequency_table()) { + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P2[0], mapq, false); + } else { + set_variant(result_tab[P2[0]], ref_genome, reads_buffer); + } + update = true; + } + } + else if(np2) { + int delta = abs((int)MISMATCH_COUNT(result_tab[P2[0]]) - (MAX_SUBSTITUTION + 1)); + + float mapq = 1.0f; +#ifdef USE_MAPQ_SCORE + int delta_corrected = MISMATCH_COUNT(result_tab[P2[0]]) + MAPQ_SCALING_FACTOR * ((MAX_SUBSTITUTION + 1) - delta); + + if(delta_corrected < 0) { + LOG_WARN("negative delta (np2 == 1) for square root %d\n", delta_corrected); + } + else if (delta >= DIST_SINGLE_THRESHOLD) { + mapq = 1.0 - sqrt((double)delta_corrected / SIZE_READ); +#else + if (delta >= DIST_SINGLE_THRESHOLD) { +#endif + if (get_use_frequency_table()) { + hasIndel |= update_frequency_table(ref_genome, result_tab, reads_buffer, reads_quality_buffer, P2[0], mapq, false); + } else { + set_variant(result_tab[P2[0]], ref_genome, reads_buffer); + } + update = true; + } + } + STAT_RECORD_STEP(STAT_DO_PROCESS_READ, 4); + if(!update) { + //pthread_mutex_lock(&nr_reads_mutex); nr_reads_non_mapped++; - pthread_mutex_unlock(&nr_reads_mutex); + //pthread_mutex_unlock(&nr_reads_mutex); add_to_non_mapped_read(numpair * 4, round, fpe1, fpe2, reads_buffer); } + if(hasIndel) + nr_reads_with_indels++; pthread_mutex_lock(&nr_reads_mutex); - nr_reads_total++; + nr_reads_total_from_dpus++; pthread_mutex_unlock(&nr_reads_mutex); + } + STAT_RECORD_STEP(STAT_DO_PROCESS_READ, 5); } + STAT_RECORD_LAST_STEP(STAT_DO_PROCESS_READ, 6); } #define PROCESS_READ_THREAD (8) @@ -541,23 +997,31 @@ static bool stop_threads = false; void process_read(FILE *fpe1, FILE *fpe2, int round, unsigned int pass_id) { + STAT_RECORD_START(STAT_PROCESS_READ); int8_t *reads_buffer = get_reads_buffer(pass_id); - acc_results_t acc_res = accumulate_get_result(pass_id); - + float *reads_quality_buffer = get_reads_quality_buffer(pass_id); + acc_results_t acc_res = accumulate_get_result(pass_id, true); + nr_reads_total += get_reads_in_buffer(pass_id) / 4; curr_match = 0; args.nb_match = acc_res.nb_res; args.result_tab = acc_res.results; args.round = round; args.reads_buffer = reads_buffer; + args.reads_quality_buffer = reads_quality_buffer; args.fpe1 = fpe1; args.fpe2 = fpe2; + STAT_RECORD_STEP(STAT_PROCESS_READ, 0); pthread_barrier_wait(&barrier); + STAT_RECORD_STEP(STAT_PROCESS_READ, 1); do_process_read(&args); + STAT_RECORD_STEP(STAT_PROCESS_READ, 2); pthread_barrier_wait(&barrier); + STAT_RECORD_STEP(STAT_PROCESS_READ, 3); free(acc_res.results); + STAT_RECORD_LAST_STEP(STAT_PROCESS_READ, 4); } static void *process_read_thread_fct(void *arg) @@ -573,11 +1037,17 @@ static void *process_read_thread_fct(void *arg) void process_read_init() { +#if DEBUG_READ_MAPPING + open_mapping_file(); +#endif genome_t *ref_genome = genome_get(); args.ref_genome = ref_genome; assert(pthread_mutex_init(&curr_match_mutex, NULL) == 0); assert(pthread_mutex_init(&non_mapped_mutex, NULL) == 0); + for (int i=0; i #include "accumulateread.h" +#include "debug.h" #include "dispatch.h" #include "dpu_backend.h" #include "genome.h" @@ -21,6 +22,7 @@ #include "simu_backend.h" #include "upvc.h" #include "vartree.h" +#include "profiling.h" #include "backends_functions.h" @@ -39,6 +41,7 @@ static sem_t getreads_to_dispatch_sem, dispatch_to_exec_sem, exec_to_dispatch_se void *thread_get_reads(__attribute__((unused)) void *arg) { + STAT_RECORD_START(STAT_THREAD_GET_READS); FOREACH_RUN(dpu_offset) { FOR(NB_READS_BUFFER) { sem_post(&accprocess_to_getreads_sem); } @@ -55,32 +58,44 @@ void *thread_get_reads(__attribute__((unused)) void *arg) FOR(NB_READS_BUFFER) { sem_wait(&accprocess_to_getreads_sem); } } + STAT_RECORD_LAST_STEP(STAT_THREAD_GET_READS, 0); return NULL; } void exec_dpus() { + STAT_RECORD_START(STAT_EXEC_DPUS); FOREACH_RUN(dpu_offset) { backends_functions.load_mram(dpu_offset, 0); + STAT_RECORD_STEP(STAT_EXEC_DPUS, 0); + sem_wait(&dispatch_to_exec_sem); + STAT_RECORD_STEP(STAT_EXEC_DPUS, 1); + FOREACH_PASS(each_pass) { backends_functions.run_dpu( dpu_offset, each_pass, &exec_to_dispatch_sem, &acc_to_exec_sem, &exec_to_acc_sem, &dispatch_to_exec_sem); } + STAT_RECORD_STEP(STAT_EXEC_DPUS, 2); + backends_functions.wait_dpu(); + STAT_RECORD_STEP(STAT_EXEC_DPUS, 3); + sem_post(&exec_to_acc_sem); } + STAT_RECORD_LAST_STEP(STAT_EXEC_DPUS, 4); } void *thread_dispatch(__attribute__((unused)) void *arg) { + STAT_RECORD_START(STAT_THREAD_DISPATCH); FOREACH_RUN(dpu_offset) { FOR(NB_DISPATCH_AND_ACC_BUFFER) @@ -104,11 +119,13 @@ void *thread_dispatch(__attribute__((unused)) void *arg) sem_wait(&exec_to_dispatch_sem); } } + STAT_RECORD_LAST_STEP(STAT_THREAD_DISPATCH, 0); return NULL; } void *thread_acc(__attribute__((unused)) void *arg) { + STAT_RECORD_START(STAT_THREAD_ACC); FOREACH_RUN(dpu_offset) { FOR(NB_DISPATCH_AND_ACC_BUFFER) @@ -140,27 +157,38 @@ void *thread_acc(__attribute__((unused)) void *arg) sem_wait(&acc_to_exec_sem); } } + STAT_RECORD_LAST_STEP(STAT_THREAD_ACC, 0); return NULL; } void *thread_process(__attribute__((unused)) void *arg) { + struct timespec start_time, current_time; + STAT_RECORD_START(STAT_THREAD_PROCESS); sem_wait(&acc_to_process_sem); + clock_gettime(CLOCK_MONOTONIC_RAW, &start_time); + STAT_RECORD_STEP(STAT_THREAD_PROCESS, 0); FOREACH_PASS(each_pass) { process_read(fope1, fope2, round, each_pass); + clock_gettime(CLOCK_MONOTONIC_RAW, ¤t_time); + int spent = current_time.tv_sec-start_time.tv_sec; + printf("time spent: %02dh%02dm%02ds (%ds), id:%d\n", spent/3600, spent/60%60, spent%60, spent, each_pass); sem_post(&accprocess_to_getreads_sem); sem_wait(&acc_to_process_sem); } + STAT_RECORD_STEP(STAT_THREAD_PROCESS, 1); sem_post(&accprocess_to_getreads_sem); + STAT_RECORD_LAST_STEP(STAT_THREAD_PROCESS, 2); return NULL; } static void exec_round() { + STAT_RECORD_START(STAT_EXEC_ROUND); char filename[FILENAME_MAX]; char *input_prefix = get_input_path(); static unsigned int max_nb_pass; @@ -172,6 +200,7 @@ static void exec_round() fipe1 = fopen(filename, "r"); CHECK_FILE(fipe1, filename); assert(get_input_info(fipe1, &read_size1, &nb_read1) == 0); + printf("read_size1 %zu SIZE_READ %u\n", read_size1, SIZE_READ); assert(read_size1 == SIZE_READ); sprintf(filename, "%s_PE2.fastq", input_prefix); @@ -201,7 +230,9 @@ static void exec_round() assert(unlink(filename) == 0); } + STAT_RECORD_STEP(STAT_EXEC_ROUND, 0); accumulate_init(max_nb_pass); + STAT_RECORD_STEP(STAT_EXEC_ROUND, 1); pthread_t tid_get_reads; pthread_t tid_dispatch; @@ -225,6 +256,7 @@ static void exec_round() assert(ret == 0); ret = sem_init(&accprocess_to_getreads_sem, 0, 0); assert(ret == 0); + STAT_RECORD_STEP(STAT_EXEC_ROUND, 2); // CREATE ret = pthread_create(&tid_get_reads, NULL, thread_get_reads, NULL); @@ -235,9 +267,11 @@ static void exec_round() assert(ret == 0); ret = pthread_create(&tid_process, NULL, thread_process, NULL); assert(ret == 0); + STAT_RECORD_STEP(STAT_EXEC_ROUND, 3); // EXECUTE exec_dpus(); + STAT_RECORD_STEP(STAT_EXEC_ROUND, 4); // JOIN ret = pthread_join(tid_get_reads, NULL); @@ -248,6 +282,7 @@ static void exec_round() assert(ret == 0); ret = pthread_join(tid_process, NULL); assert(ret == 0); + STAT_RECORD_STEP(STAT_EXEC_ROUND, 5); // DESTROY ret = sem_destroy(&getreads_to_dispatch_sem); @@ -264,19 +299,26 @@ static void exec_round() assert(ret == 0); ret = sem_destroy(&accprocess_to_getreads_sem); assert(ret == 0); + STAT_RECORD_STEP(STAT_EXEC_ROUND, 6); accumulate_free(); + STAT_RECORD_STEP(STAT_EXEC_ROUND, 7); fclose(fipe1); fclose(fipe2); + STAT_RECORD_LAST_STEP(STAT_EXEC_ROUND, 8); } static void do_mapping() { + STAT_RECORD_START(STAT_DO_MAPPING); backends_functions.init_backend(&nb_dpus_per_run); - variant_tree_init(); + if (!get_use_frequency_table()) { + variant_tree_init(); + } dispatch_init(); process_read_init(); + STAT_RECORD_STEP(STAT_DO_MAPPING, 0); for (round = 0; round < NB_ROUND; round++) { printf("#################\n" @@ -285,12 +327,17 @@ static void do_mapping() round); exec_round(); } + STAT_RECORD_STEP(STAT_DO_MAPPING, 1); create_vcf(); + STAT_RECORD_STEP(STAT_DO_MAPPING, 2); process_read_free(); dispatch_free(); - variant_tree_free(); + if (!get_use_frequency_table()) { + variant_tree_free(); + } backends_functions.free_backend(); + STAT_RECORD_LAST_STEP(STAT_DO_MAPPING, 3); } static void print_time() @@ -309,9 +356,12 @@ static void print_time() int main(int argc, char *argv[]) { validate_args(argc, argv); + memset(profiling, 0, sizeof(profiling)); printf("%s\n", VERSION); print_time(); + for (int i=0; i<15; i++) + profiling[i] = (struct time_stat_t){0}; printf("Information:\n"); printf("\tread size: %d\n", SIZE_READ); @@ -360,7 +410,9 @@ int main(int argc, char *argv[]) printf("time: %f\n" "process time: %f\n" "ratio: %f\n", - time, process_time, process_time / time * 100.0); + time, process_time, process_time / time); + + PRINT_ALL_FUNCTION_STAT(); index_free(); genome_free(); diff --git a/host/src/vartree.c b/host/src/vartree.c index 0fd3d25..5ba08c6 100644 --- a/host/src/vartree.c +++ b/host/src/vartree.c @@ -2,17 +2,94 @@ * Copyright 2016-2019 - Dominique Lavenier & UPMEM */ +#include #include #include #include #include #include +#include #include "common.h" #include "genome.h" #include "parse_args.h" #include "upvc.h" #include "vartree.h" +#include "debug.h" + + +#define DUMP_FREQUENCY_TABLE false +#define FREQ_TABLE_DUMP_FILE_NAME "frequency_table.bin" +#if DUMP_FREQUENCY_TABLE +FILE *freq_table_dump_file; + +void open_freq_table() +{ + static char filename[FILENAME_MAX]; + sprintf(filename, "%s", FREQ_TABLE_DUMP_FILE_NAME); + freq_table_dump_file = fopen(filename, "w"); + if (freq_table_dump_file == NULL) { + LOG_FATAL("couldn't open frequency table dumping file; errno: %u\n", errno); + } +} + +void close_freq_table() +{ + fclose(freq_table_dump_file); +} + + + +//The pragma pack shouldn't have any effect here as of now but it might have an effect if struct content is modified +//In which case it should probably be kept. +#pragma pack(push,1) +struct freq_table_dump_entry_t{ + float freqs[5]; + uint16_t scores[5]; +}; +#pragma pack(pop) + +// freq_table_dump format: +// 0: uint8_t number of sequences = {n} +// 1 - n*4: uint16_t[n] sequence start addresses +// n*4+1 - end: struct freq_table_dump_entry_t[...] table entries +void write_freq_table_header(genome_t *ref_genome) +{ + uint8_t number_sequences = ref_genome->nb_seq; + uint64_t sequence_offsets[256]; + uint64_t header_size = 1 + sizeof(uint64_t)* (uint64_t) number_sequences; + uint64_t total_seq_length=0; + for(uint8_t seq_number=0; seq_numbernb_seq; seq_number++) { + sequence_offsets[seq_number] = header_size + total_seq_length*sizeof(struct freq_table_dump_entry_t); + total_seq_length += ref_genome->len_seq[seq_number]; + } + fwrite(&number_sequences, 1, 1, freq_table_dump_file); + fwrite(sequence_offsets, sizeof(uint64_t), number_sequences, freq_table_dump_file); +} + +void dump_freq_table_entry(int address, struct frequency_info **frequency_table) +{ + struct freq_table_dump_entry_t entry; + for (int i=0; i<5; i++) { + entry.freqs[i] = frequency_table[i][address].freq; + entry.scores[i] = frequency_table[i][address].score; + } + fwrite(&entry, sizeof(struct freq_table_dump_entry_t), 1, freq_table_dump_file); +} + +void dump_freq_table(genome_t *ref_genome, struct frequency_info **frequency_table) +{ + open_freq_table(); + write_freq_table_header(ref_genome); + for (uint32_t seq_number = 0; seq_number < ref_genome->nb_seq; seq_number++) { + for (uint64_t seq_position = 0; seq_position < ref_genome->len_seq[seq_number]; seq_position++) { + dump_freq_table_entry(ref_genome->pt_seq[seq_number] + seq_position, frequency_table); + } + } + close_freq_table(); +} + +#endif // DUMP_FREQUENCY_TABLE static variant_t **variant_list[MAX_SEQ_GEN] = { NULL }; static pthread_mutex_t mutex; @@ -22,6 +99,7 @@ void variant_tree_init() genome_t *genome = genome_get(); pthread_mutex_init(&mutex, NULL); for (unsigned int each_seq = 0; each_seq < genome->nb_seq; each_seq++) { + LOG_INFO("allocating variant_list (%luMB)\n", sizeof(variant_t*) * genome->len_seq[each_seq]/1000000); variant_list[each_seq] = (variant_t **)calloc(genome->len_seq[each_seq], sizeof(variant_t *)); } } @@ -36,14 +114,13 @@ void variant_tree_insert(variant_t *var, uint32_t seq_nr, uint32_t offset_in_chr vars->depth++; vars->score += var->score; free(var); - goto end; + pthread_mutex_unlock(&mutex); + return; } vars = vars->next; } var->next = *entry; *entry = var; - -end: pthread_mutex_unlock(&mutex); } @@ -103,7 +180,7 @@ depth_filter_t indel_filter[] = { [10] = { 1, 30 }, [11] = { 1, 40 }, }; -#elif (SIZE_READ == 150) +#elif (SIZE_READ <= 160) && (SIZE_READ>=140) depth_filter_t sub_filter[] = { [3] = { 15, 16 }, [4] = { 17, 20 }, @@ -151,6 +228,38 @@ static bool homopolymer(int8_t *seq, int offset) return true; } +static bool print_var_from_freq_table(variant_t *var, uint32_t seq_nr, uint64_t seq_pos, genome_t *ref_genome, FILE *vcf_file) +{ + char *chr = ref_genome->seq_name[seq_nr]; + uint64_t genome_pos = ref_genome->pt_seq[seq_nr] + seq_pos; + uint32_t cov = ref_genome->mapping_coverage[genome_pos]; + uint32_t depth = var->depth; + uint32_t score = var->score / depth; + + uint32_t ref_len = strlen(var->ref); + uint32_t alt_len = strlen(var->alt); + + if (!get_no_filter()) { + + if (ref_len == alt_len) { /* SUBSTITUTION */ + if (depth > 20) { + depth = 20; + } + } else { /* INSERTION OR DELETION */ + if (depth < 2) { + return false; + } else if (depth > 11) { + depth = 11; + } + } + } + + fprintf(vcf_file, "%s\t%lu\t.\t%s\t%s\t.\t.\tDEPTH=%d;COV=%d;SCORE=%d\n", chr, seq_pos+1, var->ref, var->alt, var->depth, cov, + score); + + return true; +} + static bool print_variant_tree(variant_t *var, uint32_t seq_nr, uint64_t seq_pos, genome_t *ref_genome, FILE *vcf_file) { char *chr = ref_genome->seq_name[seq_nr]; @@ -169,36 +278,178 @@ static bool print_variant_tree(variant_t *var, uint32_t seq_nr, uint64_t seq_pos return false; } - if (get_no_filter()) - goto print; + if (!get_no_filter()) { - if (ref_len == alt_len) { /* SUBSTITUTION */ - if (depth < 3) { - return false; - } else if (depth > 20) { - depth = 20; - } - if (!(score <= sub_filter[depth].score && percentage >= sub_filter[depth].percentage)) { - return false; - } - } else { /* INSERTION OR DELETION */ - if (depth < 2) { - return false; - } else if (depth > 11) { - depth = 11; - } - if (!(score <= indel_filter[depth].score && percentage >= indel_filter[depth].percentage)) { - return false; + if (ref_len == alt_len) { /* SUBSTITUTION */ + if (depth < 3) { + return false; + } else if (depth > 20) { + depth = 20; + } + if (!(score <= sub_filter[depth].score && percentage >= sub_filter[depth].percentage)) { + return false; + } + } else { /* INSERTION OR DELETION */ + if (depth < 2) { + return false; + } else if (depth > 11) { + depth = 11; + } + if (!(score <= indel_filter[depth].score && percentage >= indel_filter[depth].percentage)) { + return false; + } } } -print: - fprintf(vcf_file, "%s\t%lu\t.\t%s\t%s\t.\t.\tDEPTH=%d;COV=%d;SCORE=%d\n", chr, seq_pos, var->ref, var->alt, var->depth, cov, - score); + fprintf(vcf_file, "%s\t%lu\t.\t%s\t%s\t.\t.\tDEPTH=%d;COV=%d;SCORE=%d\n", chr, seq_pos+1, var->ref, var->alt, var->depth, cov, + score); return true; } +/** + Few configurations suggested by Bertil + (i) D=3: 25%, D=4: 20%, D=5: 15%; D>=6: 10% + (ii) D=2: 30%, D=3: 25%, D=4: 20%, D=5: 15%; D>=6: 10% + (iii) D=3: 20%, D=4: 15%, D>=5: 10%; + **/ + +__attribute__((unused)) uint32_t depth_filter1(float freq) { + if(freq < 10.0f) + return UINT_MAX; + if(freq < 15.0f) + return 6; + if(freq < 20.0f) + return 5; + if(freq < 25.0f) + return 4; + return 3; +} + +__attribute__((unused)) uint32_t depth_filter2(float freq) { + if(freq < 10.0f) + return UINT_MAX; + if(freq < 15.0f) + return 6; + if(freq < 20.0f) + return 5; + if(freq < 25.0f) + return 4; + if(freq < 30.0f) + return 3; + return 2; +} + +__attribute__((unused)) uint32_t depth_filter3(float freq) { + if(freq < 10.0f) + return UINT_MAX; + if(freq < 15.0f) + return 5; + if(freq < 20.0f) + return 4; + return 3; +} + +__attribute__((unused)) uint32_t depth_filter_a(float freq) { + if(freq >= 20) + return 3; + if(freq >= 15) + return 5; + return UINT_MAX; +} + +__attribute__((unused)) uint32_t depth_filter_permissive(float freq) { + if (freq< 10.0f) { + return 3; + } + if (freq<20.0f) { + return 2; + } + return 1; +} + +__attribute__((unused)) uint32_t depth_filter_fixed_3(float freq) { + + if(freq < 20.0f) + return UINT_MAX; + return 3; +} + +__attribute__((unused)) uint32_t depth_filter_fixed_3_f15(float freq) { + + if(freq < 15.0f) + return UINT_MAX; + return 3; +} + +#define AFFINE_B 2.9795 +#define AFFINE_A 0.152036 + +float reverse_filter(uint32_t score) { + return AFFINE_A*(float)score + AFFINE_B; +} + +FILE * dbg_file = NULL; +FILE * sub_file = NULL; + +static void get_most_frequent_variant(genome_t * ref_genome, struct frequency_info ** frequency_table, uint32_t seq_number, uint64_t seq_position, variant_t * results) { + + const char nucleotide[4] = { 'A', 'C', 'T', 'G' }; + + uint64_t genome_pos = ref_genome->pt_seq[seq_number] + seq_position; + + float total = 0; + for(int i = 0; i < 5; ++i) { + total += frequency_table[i][genome_pos].freq; + results[i].depth = 0; + results[i].score = 0; + } + if(total == 0) + return ;//results; + + for(int i = 0; i < 5; ++i) { + float freq = frequency_table[i][genome_pos].freq; + //uint32_t score = frequency_table[i][genome_pos].score; + if(i == ref_genome->data[genome_pos]) { + continue; // not a variant if the same nucleotide as in reference genome + } + + // if frequency and depth pass the threshold, consider it a variant + if(freq > reverse_filter(total)) { + + // this is a substitution, create variant + results[i].score = frequency_table[i][genome_pos].score; + results[i].depth = frequency_table[i][genome_pos].score; + results[i].ref[0] = nucleotide[ref_genome->data[genome_pos]]; + results[i].ref[1] = '\0'; + results[i].alt[0] = nucleotide[i]; + results[i].alt[1] = '\0'; + } else { + results[i].score = 0; + results[i].depth = 0; + } + } + //printf("get_most_frequent_variant: genome_pos %lu, nucleotide max freq %d %f %c\n", genome_pos, nucId, max, nucId >= 0 ? nucleotide[nucId] : '-'); + + // return results; +} + +static void add_codependence_to_freq_table(struct frequency_info** frequency_table, struct variants_codependence_info_list* codependence_list, uint64_t position, uint8_t letter, float freq_update) { + for (;codependence_list != NULL; codependence_list=codependence_list->next_list) { + for (int i = 0; icontent[i].key != 0; i++) { + uint8_t current_letter = codependence_list->content[i].key & 0x3; + if (current_letter == letter) { + uint8_t other_letter = (codependence_list->content[i].key >> 2) & 0x3; + int64_t position_delta = codependence_list->content[i].key >> 4; + frequency_table[other_letter][position + position_delta].freq += (codependence_list->content[i].codependence_count) * freq_update; + } + } + } +} + +#define POSITIVE_COD_INFLUENCE 0.0103517 +#define NEGATIVE_COD_INFLUENCE -0.80034 + void create_vcf() { double start_time = my_clock(); @@ -235,20 +486,176 @@ void create_vcf() /* ####### END OF HEADER ####### */ - /* for each sequence in the genome */ - for (uint32_t seq_number = 0; seq_number < ref_genome->nb_seq; seq_number++) { - /* for each position in the sequence */ - for (uint64_t seq_position = 0; seq_position < ref_genome->len_seq[seq_number]; seq_position++) { - variant_t *var = variant_list[seq_number][seq_position]; - while (var != NULL) { - nb_variant += print_variant_tree(var, seq_number, seq_position, ref_genome, vcf_file) ? 1 : 0; - var = var->next; + uint32_t nb_pos_multiple_var = 0; + + /** + * Dump debugging information: frequency table for a given set of positions + **/ +#if 0 + dbg_file = fopen("freq_debug.txt", "w"); + sub_file = fopen("subst.txt", "r"); + assert(sub_file); + unsigned seq = 0; + uint64_t pos = 0; + static char nucleotide[4] = { 'A', 'C', 'T', 'G' }; + fprintf(dbg_file, "# seq pos ref-nucleotide frequencies:A C T G\n"); + while (EOF != fscanf(sub_file, "%u %lu\n", &seq, &pos)) + { + assert(seq > 0); + seq--; + assert(pos > 0); + pos--; + for (int inc = -2; inc <= 2; inc++) { + if(inc > 0 && pos + inc >= ref_genome->len_seq[seq]) continue; + if(inc < 0 && pos < abs(inc)) continue; + uint64_t genome_pos = ref_genome->pt_seq[seq] + pos + inc; + if(genome_pos > genome_get()->fasta_file_size) { + printf("WARNING: wrong genome position %lu. seq %u pos %lu inc %d\n", genome_pos, seq, pos, inc); + continue; + } + fprintf(dbg_file, "%u %lu %c ", seq + 1, pos + inc + 1, nucleotide[ref_genome->data[genome_pos]]); + float total = 0.0f; + for(int m = 0; m < 5; ++m) { + total += frequency_table[m][genome_pos].freq; + } + for(int m = 0; m < 4; ++m) { + fprintf(dbg_file, "(%f %u %f %u) ", + frequency_table[m][genome_pos].freq, frequency_table[m][genome_pos].score, + frequency_table[m][genome_pos].freq * 100.0 / total, depth_filter(frequency_table[m][genome_pos].freq * 100.0 / total)); + } + fprintf(dbg_file, "\n"); + } + } + fclose(dbg_file); + fclose(sub_file); +#endif + + + unsigned int uncovered_nucleotides = 0; + unsigned int badly_covered_nucleotides = 0; + unsigned int well_covered_nucleotides = 0; + unsigned int overly_covered_nucleotides = 0; + unsigned int max_coverage = 0; + unsigned int position_most_coverage = 0; + unsigned int chromosome_most_coverage = 99999; + uint64_t total_coverage = 0; + uint64_t total_cov_squared = 0; + variant_t variants_to_call[5]; + if (get_use_frequency_table()) { // Using freq-table and not var-tree + struct frequency_info **frequency_table = get_frequency_table(); + struct variants_codependence_info_list** codependence_table = get_codependence_table(NULL);// giving a NULL pointer should only work if the table is already allocated; which should be the case +#if DUMP_FREQUENCY_TABLE + printf("dumping frequency table...\n"); + dump_freq_table(ref_genome, frequency_table); + printf("table done dumping; starting variant calling...\n"); +#endif + // First pass on the frequency table to take into account variant codependence + LOG_INFO("doing first pass of vc\n"); + /* for each sequence in the genome */ + for (uint32_t seq_number = 0; seq_number < ref_genome->nb_seq; seq_number++) { + /* for each position in the sequence */ + LOG_INFO("sequence %u\n", seq_number); + for (uint64_t seq_position = 0; seq_position < ref_genome->len_seq[seq_number]; seq_position++) { + get_most_frequent_variant(ref_genome, frequency_table, seq_number, seq_position, variants_to_call); + for (uint8_t i=0; i<4; i++) { + uint64_t genome_pos = ref_genome->pt_seq[seq_number] + seq_position; + if (variants_to_call[i].depth) { + add_codependence_to_freq_table(frequency_table, codependence_table[genome_pos], genome_pos, i, POSITIVE_COD_INFLUENCE); + } else if (frequency_table[i][genome_pos].score>0) { + add_codependence_to_freq_table(frequency_table, codependence_table[genome_pos], genome_pos, i, NEGATIVE_COD_INFLUENCE); + } + } + } + } + + free_codependence_chunks(); + + LOG_INFO("doing second and final pass of vc\n"); + /* for each sequence in the genome */ + for (uint32_t seq_number = 0; seq_number < ref_genome->nb_seq; seq_number++) { + /* for each position in the sequence */ + LOG_INFO("sequence %u\n", seq_number); + for (uint64_t seq_position = 0; seq_position < ref_genome->len_seq[seq_number]; seq_position++) { + + get_most_frequent_variant(ref_genome, frequency_table, seq_number, seq_position, variants_to_call); + unsigned int total_score = 0; + total_score += frequency_table[0][ref_genome->pt_seq[seq_number] + seq_position].score; + total_score += frequency_table[1][ref_genome->pt_seq[seq_number] + seq_position].score; + total_score += frequency_table[2][ref_genome->pt_seq[seq_number] + seq_position].score; + total_score += frequency_table[3][ref_genome->pt_seq[seq_number] + seq_position].score; + total_coverage += total_score; + total_cov_squared += total_score*total_score; + //total_score += frequency_table[4][ref_genome->pt_seq[seq_number] + seq_position].score; + if (total_score == 0) { + uncovered_nucleotides++; + } else if (total_score < 10) { + badly_covered_nucleotides++; + } else if (total_score < 90) { + well_covered_nucleotides++; + } else { + overly_covered_nucleotides++; + if (total_score > max_coverage) { + max_coverage = total_score; + chromosome_most_coverage = seq_number; + position_most_coverage = seq_position; + } + } + int nb_var = 0; + for(int i = 0; i < 5; ++i) { + variant_t * var = &variants_to_call[i]; + if(var->depth) { + // LOG_DEBUG("calling variant %d at %u:%lu, freq:%f\n", i, seq_number, seq_position, frequency_table[i][ref_genome->pt_seq[seq_number] + seq_position].freq); + nb_variant += print_var_from_freq_table(var, seq_number, seq_position, ref_genome, vcf_file) ? 1 : 0; + nb_var++; + } + if(nb_var > 1) + nb_pos_multiple_var++; + } + } + } + + unsigned long total_nucleotides = overly_covered_nucleotides+well_covered_nucleotides+badly_covered_nucleotides+uncovered_nucleotides; + printf("\tuncovered nucleotides: %lu (%lu.%lu%%)\n", + (long)uncovered_nucleotides, + (long)uncovered_nucleotides*100/total_nucleotides, + (long)uncovered_nucleotides*10000/total_nucleotides%100); + printf("\tbadly covered nucleotides (less than 10 reads): %lu (%lu.%lu%%)\n", + (long)badly_covered_nucleotides, + (long)badly_covered_nucleotides*100/total_nucleotides, + (long)badly_covered_nucleotides*10000/total_nucleotides%100); + printf("\twell covered nucleotides (10 to 90 reads): %lu (%lu.%lu%%)\n", + (long)well_covered_nucleotides, + (long)well_covered_nucleotides*100/total_nucleotides, + (long)well_covered_nucleotides*10000/total_nucleotides%100); + printf("\toverly covered nucleotides (more than 90 reads): %lu (%lu.%lu%%)\n", + (long)overly_covered_nucleotides, + (long)overly_covered_nucleotides*100/total_nucleotides, + (long)overly_covered_nucleotides*10000/total_nucleotides%100); + printf("\tmax coverage: %u reads\n", max_coverage); + printf("\tmax coverage position: chr%u:%u\n", chromosome_most_coverage, position_most_coverage); + printf("\ttotal coverage: %lu (eq %lu reads; or %lux coverage)\n", total_coverage, (long)total_coverage/SIZE_READ, (long)total_coverage/total_nucleotides); + double mean = ((double)total_coverage) / (double) total_nucleotides; + printf("\tmean cov: %f (std dev: %f)\n", mean, sqrt((double)total_cov_squared/(double)total_nucleotides - mean*mean)); + + } else { // Using var-tree and not freq-table + LOG_INFO("doing first and only pass of vc\n"); + for (uint32_t seq_number = 0; seq_number < ref_genome->nb_seq; seq_number++) { + /* for each position in the sequence */ + LOG_INFO("sequence %u\n", seq_number); + for (uint64_t seq_position = 0; seq_position < ref_genome->len_seq[seq_number]; seq_position++) { + variant_t *var = variant_list[seq_number][seq_position]; + while (var != NULL) { + nb_variant += print_variant_tree(var, seq_number, seq_position, ref_genome, vcf_file) ? 1 : 0; + var = var->next; + } } } } + free_frequency_table(); fclose(vcf_file); - printf("\tnumber of variants: %d\n", nb_variant); + printf("\tnumber of variants: %d (multiple %d)\n", nb_variant, nb_pos_multiple_var); printf("\ttime: %lf s\n", my_clock() - start_time); + fflush(stdout); } diff --git a/tests/compareVCF.py b/tests/compareVCF.py index 0678c85..779526f 100755 --- a/tests/compareVCF.py +++ b/tests/compareVCF.py @@ -241,7 +241,7 @@ def print_stat(tp_stats, fp_stats, nb_tp, nb_fp, nb_cm): ############################################################################### -def compute_for_pos(v1_infos, v2_infos, tp, fp, tp_stat, fp_stat): +def compute_for_pos(v1_infos, v2_infos, tp, fp, tp_stat, fp_stat, chr, pos, dump): for (ref, alt), v1_info in v1_infos.items(): if (ref, alt) in v2_infos: tp += 1 @@ -249,19 +249,24 @@ def compute_for_pos(v1_infos, v2_infos, tp, fp, tp_stat, fp_stat): else: fp += 1 update_stat(fp_stat, v1_info) + if(dump): + print(chr, pos, ref, "->", alt) return tp, fp -def compute(V1, V2, tp_stat, fp_stat): +def compute(V1, V2, tp_stat, fp_stat, dump): tp = 0 fp = 0 for (chr, pos), v1_infos in V1.items(): if (chr, pos) in V2: tp, fp = compute_for_pos( - v1_infos, V2[(chr, pos)], tp, fp, tp_stat, fp_stat) + v1_infos, V2[(chr, pos)], tp, fp, tp_stat, fp_stat, chr, pos, dump) else: fp += len(v1_infos) update_stat_for_pos(fp_stat, v1_infos) + if(dump): + for (ref, alt), v1_info in v1_infos.items(): + print(chr, pos, ref, "->", alt) return tp, fp @@ -272,13 +277,13 @@ def print_VCF_quality(tp, fp, fn, cm, len_upvc, len_ref): print("cm:\t%.2f%%\t(%d/%d)" % (per(cm, len_ref), cm, len_ref)) -def compute_data(V_ref, V_upvc, len_ref, len_upvc): +def compute_data(V_ref, V_upvc, len_ref, len_upvc, dump=False): stats = {"tp": {}, "fp": {}} if args.enable_stat else None tp_stat = stats["tp"] if args.enable_stat else None fp_stat = stats["fp"] if args.enable_stat else None - tp, fp = compute(V_upvc, V_ref, tp_stat, fp_stat) - cm, fn = compute(V_ref, V_upvc, None, None) + tp, fp = compute(V_upvc, V_ref, tp_stat, fp_stat, False) + cm, fn = compute(V_ref, V_upvc, None, None, dump) # print_stat(tp_stat, fp_stat, tp, fp, cm) @@ -438,7 +443,7 @@ def get_data(filename, extract): args.upvc_file, True) print("\nsubstitution") -compute_data(SUB_ref, SUB_upvc, len_ref_sub, len_upvc_sub) +compute_data(SUB_ref, SUB_upvc, len_ref_sub, len_upvc_sub, True) print("\ninsertions") compute_data(INS_ref, INS_upvc, len_ref_ins, len_upvc_ins) diff --git a/tests/igvlike-focus.py b/tests/igvlike-focus.py new file mode 100755 index 0000000..a4887fb --- /dev/null +++ b/tests/igvlike-focus.py @@ -0,0 +1,190 @@ +#! /usr/bin/python3 +import argparse +import itertools +import mmap +import sys + +verbose = False + +def main(arguments): + global verbose + if arguments.verbosity: + verbose = len(arguments.verbosity) + chromosome = "chr"+str(arguments.chr) + start = arguments.index-arguments.context + end = arguments.index+arguments.context+1 + if arguments.before != None: + start = arguments.index-arguments.before + if arguments.after != None: + end = arguments.index+arguments.after + fasta_file_name = None + mapping_file_names = [] + for f in arguments.files: + extension = f.split(".")[-1] + if extension == "fasta": + if fasta_file_name == None: + fasta_file_name = f + else: + print("Can't handle more than one fasta file") + return 1 + elif extension == "map": + mapping_file_names.append(f) + else: + print("unknown file extension :", extension) + print("for :", f) + print("please only use fasta and map files"); + + insertions = [0]*(end-start) + block = [] + + if fasta_file_name != None: + block.append(list(zip(range(start, end), get_genome_part(fasta_file_name, chromosome, start, end, arguments.index)))) + + if verbose: + print("parsing intersecting mappings") + for mapping_start_address, mapping_string in get_intersecting_mappings(mapping_file_names, chromosome, start, end, arguments.max_read_size): + if verbose>1: + print("found mapping at :", mapping_start_address) + this_line_insertions = [0]*(end-start+60) + block.append([]) + address = max(start-1, mapping_start_address-1) + i = address-mapping_start_address+1 + while address": + current_chr = line[1:].strip() + continue + endline_address = address+line_length + if current_chr == chromosome and endline_address>start: + for i in range(max(start-address, 0), min(end-address, line_length)): + if address+i == index-1: + yield line[i].upper() + else: + yield line[i].lower() + if endline_address>end: + return + address = endline_address + +def get_intersecting_mappings(file_names, chromosome, start, end, max_read_size): + for file_name in file_names: + with get_index(file_name) as index_file: + with open(file_name, mode="r", encoding="utf-8") as map_file: + with mmap.mmap(map_file.fileno(), length=0, access=mmap.ACCESS_READ) as mmap_map_file: + last_address=0 + last_chr = None + current_chr = None + address = 0 + for line in index_file.readlines(): + last_address = address + last_chr = current_chr + current_chr, index, address = line.split() + index = int(index) + address = int(address) + if current_chr == chromosome and index >= start or current_chr != chromosome and last_chr == chromosome: + if verbose>1: + print("starting reading ("+current_chr+":"+str(index)+") at "+str(address)) + break + mmap_map_file.seek(int(last_address)) + yielded_line = False + for line in mmap_readlines(mmap_map_file): + if line == b'': + continue + current_chr = line.split()[0].decode() + index = int(line.split()[1]) + if not(yielded_line) and index+max_read_sizeend-1: + break + if current_chr!=chromosome and yielded_line: + break + +def mmap_readlines(mmap_obj): + while True: + try: + yield mmap_obj.readline() + except: + return + +def get_index(file_name): + try: + return open(file_name+".index", mode="r") + except IOError: + create_index(file_name) + return open(file_name+".index", mode="r") + +def create_index(file_name): + print("creating index for", file_name) + with open(file_name+".index", mode="w") as index_file: + with open(file_name, mode="r", encoding="utf-8") as map_file: + with mmap.mmap(map_file.fileno(), length=0, access=mmap.ACCESS_READ) as mmap_map_file: + if verbose: + print("file openned for indexing") + for i in itertools.count(): + if verbose>1: + print("indexing :", i*10000000) + try: + mmap_map_file.seek(i*10000000) + except ValueError: + return + if i>0: + mmap_map_file.readline() + line_address = mmap_map_file.tell() + line = mmap_map_file.readline().split() + chromosome = line[0] + index = line[1] + index_file.write(chromosome.decode()+"\t"+index.decode()+"\t"+str(line_address)+"\n") + + +if __name__=="__main__": + parser = argparse.ArgumentParser(description="show read mappings around a specific part of the genome.") + parser.add_argument("files", nargs="+", type=str, help="Fasta and map files to read from") + parser.add_argument("-c", "--chr", type=int, dest="chr", help="The chromosome in which to show the read mappings") + parser.add_argument("-i", "--index", type=int, dest="index", help="The address around which to show read mappings") + parser.add_argument("-A", "--after", default=None, type=int, dest='after', help="how many bases to show after address") + parser.add_argument("-B", "--before", default=None, type=int, dest='before', help="how many bases to show before address") + parser.add_argument("-C", "--context", default=40, type=int, dest='context', help="how many bases to show before and after address") + parser.add_argument("-s", "--max-read-size", default=150, type=int, dest="max_read_size", help="the maximum length of a read; preferably exact") + parser.add_argument("-v", action="append_const", dest="verbosity", const=True, help="add verbosity; flag may be set multiple times for more verbosity") + parser.add_argument("-p", "--progressive", action='store_true', dest="progressive", help="print results everytime a new read is found") + main(parser.parse_args()) diff --git a/tests/read_freq_dump.py b/tests/read_freq_dump.py new file mode 100755 index 0000000..eeddd9f --- /dev/null +++ b/tests/read_freq_dump.py @@ -0,0 +1,81 @@ +#! /usr/bin/python3 +import argparse +import itertools +import mmap +import sys +import struct + +class freq_dump: + def __init__(self, file_name): + self.file_object = open(file_name, mode="rb") + self.mmap_freq_dump = mmap.mmap(self.file_object.fileno(), length=0, access=mmap.ACCESS_READ) + self.n_seq = self.mmap_freq_dump.read_byte() + self.seq_starts = [int.from_bytes(self.mmap_freq_dump.read(8), byteorder='little') for _ in range(self.n_seq)] + + def read_address(self, sequence_id, index): + self.mmap_freq_dump.seek(self.seq_starts[sequence_id] + index*30) + freqs = [struct.unpack('f', self.mmap_freq_dump.read(4))[0] for _ in range(5)] + scores = [int.from_bytes(self.mmap_freq_dump.read(2), byteorder='little') for _ in range(5)] + return freqs, scores + + def get_string_from_address(self, seq_n, index): + freqs, scores = self.read_address(seq_n-1, index-1) + return " ".join(str(v) for v in [f'{seq_n: >2}', f'{index: >8}', "", " ".join(f'{v:7.2f}' for v in freqs), "\t", " ".join(f'{v: >4}' for v in scores)]) + + def __del__(self): + self.close() + + def close(self): + self.mmap_freq_dump.close() + self.file_object.close() + + +def main(args): + context = args.context + dump_file_name = args.dump_file + freq_dump_obj = freq_dump(dump_file_name) + for address_file_name in args.address_files: + with open(address_file_name, "r") as address_file: + focuses = [] + last_dump = (0,0) + for _ in range(context): + line = address_file.readline() + if line=="": + break + split_line = line.split(" ") + seq_n = int(split_line[0]) + index = int(split_line[1]) + focuses.append((seq_n, index)) + while focuses!=[]: + line = address_file.readline() + if line!="": + split_line = line.split(" ") + seq_n = int(split_line[0]) + index = int(split_line[1]) + focuses.append((seq_n, index)) + seq_n, index = focuses[0] + start = index-context + end = index+context + if last_dump[0] == seq_n and last_dump[1]+1 >= start: + start = max(start, last_dump[1]+1) + else: + print() + for i in range(start, end+1): + if (seq_n, i) in focuses: + print("* "+freq_dump_obj.get_string_from_address(seq_n, i)) + else: + print(" "+freq_dump_obj.get_string_from_address(seq_n, i)) + last_dump = (seq_n, end) + focuses.pop(0) + freq_dump_obj.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="show frequency table dump in human readable format around specific indices in the genome") + parser.add_argument("dump_file", type=str, help="Frequency table bin file to read from") + parser.add_argument("address_files", nargs="+", type=str, help="files containing addresses around which to show frequency table") + parser.add_argument("-C", "--context", default=2, type=int, dest="context", help="how many addresses to show before and after the focused addresses") + try: + main(parser.parse_args()) + except BrokenPipeError: + pass