From a285818fb1c1b4e49b9cb3dc489d73eeb2de059d Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Tue, 25 Nov 2025 15:39:30 -0500 Subject: [PATCH 01/15] started hybrid version --- hybrid.cpp | 284 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) create mode 100644 hybrid.cpp diff --git a/hybrid.cpp b/hybrid.cpp new file mode 100644 index 0000000..7f72c90 --- /dev/null +++ b/hybrid.cpp @@ -0,0 +1,284 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +vector readers_q; +vector>> reducer_queues; +unordered_map global_counts; + +size_t total_words; +size_t files_remain; +int num_reducers; + +omp_lock_t readers_lock; +vector reducer_locks; +omp_lock_t global_counts_lock; + +void process_word(string &w) { + // Remove punctuation at beginning + while (!w.empty() && ispunct(w[0])) { + w.erase(0, 1); + } + // Remove punctuation at end + while (!w.empty() && ispunct(w[w.size() - 1])) { + w.pop_back(); + } + // Convert all letters to lowercase + for (size_t i = 0; i < w.length(); ++i) { + if (isupper(w[i])) { + w[i] = tolower(w[i]); + } + } +} + +void read_file (char* fname) { + size_t wc = 0; + ifstream fin(fname); + if (!fin) { + fprintf(stderr, "error: unable to open input file: %s\n", fname); + exit(1); + } + + // Process words in chunks to reduce locking + const int chunk_size = 1024; // select the best chunk size + vector words; + words.reserve(chunk_size); + + string word; + while (fin >> word) { + process_word(word); + if (!word.empty()) { // avoid pushing empty strings + wc++; + words.push_back(word); + } + } + omp_set_lock(&readers_lock); + readers_q.insert(readers_q.end(), make_move_iterator(words.begin()), make_move_iterator(words.end())); + omp_unset_lock(&readers_lock); + + #pragma omp atomic + total_words += wc; + + #pragma omp atomic + files_remain--; +} + +int hash_str(string s, int R) { + int sum = 0; + for (unsigned char c : s) { + sum += c; + } + return sum % R; +} + +void mapping_step() { + unordered_map buckets; + + // Grab elemnts from the work q in chunks + const int chunk_size = 1024; // find which chunk size works the best + vector working_batch; + working_batch.reserve(chunk_size); + + while (true) { + working_batch.clear(); + + // Lock and grab new chunk of elements if queue is not empty + omp_set_lock(&readers_lock); + for (size_t i = 0; i < chunk_size && !readers_q.empty(); ++i) { + working_batch.push_back(readers_q.back()); + readers_q.pop_back(); + } + omp_unset_lock(&readers_lock); + + if (!working_batch.empty()) { + // Queue not empty -- process new elements + for (size_t i = 0; i < working_batch.size(); ++i) { + buckets[working_batch[i]]++; + } + } + else { + int remaining; + // Shared global variable -- must be read atomically + #pragma omp atomic read + remaining = files_remain; + + if (remaining == 0) { + // Queue empty and all files are processed + break; + } + else { + // Mappers are ahead of readers + #pragma omp taskyield + } + } + } + + // Push thread's results into the reducer queues + for (auto el : buckets) { + int index = hash_str(el.first, num_reducers); + omp_set_lock(&reducer_locks[index]); + reducer_queues[index].push(el); + omp_unset_lock(&reducer_locks[index]); + } +} + +void reduce_step(int id) { + // Use local hash table for partial results + unordered_map local_result; + while (!reducer_queues[id].empty()) { + pair cur_entry = reducer_queues[id].front(); + reducer_queues[id].pop(); + local_result[cur_entry.first] += cur_entry.second; + } + // Merge partial results into global results + omp_set_lock(&global_counts_lock); + for (auto &el : local_result) { + global_counts[el.first] += el.second; + } + omp_unset_lock(&global_counts_lock); +} + +int main(int argc, char* argv[]) { + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided); + + int rank, size; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + + if (provided < MPI_THREAD_FUNNELED) { + printf("Error: MPI_THREAD_FUNNELED is not supported.\n"); + MPI_Abort(MPI_COMM_WORLD, 1); + } else { + printf("Rank %d: threading level provided = %d\n", rank, provided); + } + + if (argc < 2) { + fprintf(stderr, "usage: %s \n", argv[0]); + MPI_Abort(MPI_COMM_WORLD, 1); + } + + int n_threads = omp_get_max_threads(); + int num_mappers = n_threads; + num_reducers = n_threads * 2; // Works best on my laptop -- test on ISAAC + files_remain = argc - 1;\ + + if (rank == 0) { + cerr << "Testing " << n_threads << " thread(s), " << size << " processes\n"; + } + + omp_init_lock(&readers_lock); + omp_init_lock(&global_counts_lock); + reducer_locks.resize(num_reducers); + for (int i = 0; i < num_reducers; ++i) { + omp_init_lock(&reducer_locks[i]); + } + reducer_queues.resize(num_reducers); + + double start, end, start_r, start_p; + start = MPI_Wtime(); + + int done = 0; + // File reading step + if (rank == 0) { + size_t f_count = 1; + MPI_Status stat; + int tmp; + + while (f_count < argc) { + // Use tag = 1 for requests + MPI_Recv(&tmp, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &stat); + int requesting_rank = stat.MPI_SOURCE; + // Use tag = 2 for responds + MPI_Send(&f_count, 1, MPI_INT, requesting_rank, 2, MPI_COMM_WORLD); + f_count++; + } + // All files have been distributed here, broadcast "work done" + done = 1; + MPI_Bcast(&done, 1, MPI_INT, 0, MPI_COMM_WORLD); + } + else { + int rec_buff = 0; + while (!done) { + // Send non-blocking request + MPI_Send(&rec_buff, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + MPI_Recv(&rec_buff, 1, MPI_INT, 0, 2, MPI_COMM_WORLD, MPI_STATUSES_IGNORE); + + read_file(argv[rec_buff]); + } + } + + #pragma omp parallel + { + #pragma omp single + { + + + while (argv[f_count]) { + #pragma omp task firstprivate(f_count) + { + read_file(argv[f_count]); + } + f_count++; + } + + // Mapping step + for (int i = 0; i < num_mappers; ++i) { + #pragma omp task + { + mapping_step(); + } + } + } + } + + start_r = omp_get_wtime(); + // Reducing step + #pragma omp parallel for + for (int i = 0; i < num_reducers; ++i) { + reduce_step(i); + } + + start_p = omp_get_wtime(); + vector> counts; + for (auto &el : global_counts) { + counts.emplace_back(el.first, el.second); + } + + // Sort in alphabetical order + sort(counts.begin(), counts.end(), + [](const auto &a, const auto &b) { + return a.first < b.first; + }); + + // Print step + cout << "Filename: " << argv[1] << ", total words: " << total_words << endl; + for (size_t i = 0; i < counts.size(); ++i) { + cout << "[" << i << "] " << counts[i].first << ": " << counts[i].second << endl; + } + + end = omp_get_wtime(); + // Use cerr to always print in terminal + cerr << "OpenMP time: " << (end - start) * 1000 << " ms\n"; + cerr << " File read & Map time: " << (start_r - start) * 1000 << " ms\n"; + cerr << " Reducing time: " << (start_p - start_r) * 1000 << " ms\n"; + cerr << " Sort & Print time: " << (end - start_p) * 1000 << " ms\n"; + + omp_destroy_lock(&readers_lock); + omp_destroy_lock(&global_counts_lock); + for (int i = 0; i < num_reducers; ++i) { + omp_destroy_lock(&reducer_locks[i]); + } + + MPI_Finalize(); + return 0; +} From d5ce516781015d39c26d496411137a84843a5fd6 Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Tue, 25 Nov 2025 17:37:42 -0500 Subject: [PATCH 02/15] added file multithreaded file reading --- hybrid.cpp | 204 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 129 insertions(+), 75 deletions(-) diff --git a/hybrid.cpp b/hybrid.cpp index 7f72c90..8826106 100644 --- a/hybrid.cpp +++ b/hybrid.cpp @@ -18,6 +18,8 @@ unordered_map global_counts; size_t total_words; size_t files_remain; int num_reducers; +int num_readers; +int readers_avail; omp_lock_t readers_lock; vector reducer_locks; @@ -41,6 +43,9 @@ void process_word(string &w) { } void read_file (char* fname) { + #pragma omp atomic + readers_avail--; + size_t wc = 0; ifstream fin(fname); if (!fin) { @@ -70,6 +75,9 @@ void read_file (char* fname) { #pragma omp atomic files_remain--; + + #pragma omp atomic + readers_avail++; } int hash_str(string s, int R) { @@ -170,7 +178,10 @@ int main(int argc, char* argv[]) { int n_threads = omp_get_max_threads(); int num_mappers = n_threads; num_reducers = n_threads * 2; // Works best on my laptop -- test on ISAAC - files_remain = argc - 1;\ + files_remain = argc - 1; + + num_readers = n_threads / 2; + readers_avail = num_readers; if (rank == 0) { cerr << "Testing " << n_threads << " thread(s), " << size << " processes\n"; @@ -187,97 +198,140 @@ int main(int argc, char* argv[]) { double start, end, start_r, start_p; start = MPI_Wtime(); - int done = 0; // File reading step if (rank == 0) { size_t f_count = 1; + size_t active_ranks = size - 1; MPI_Status stat; int tmp; + int flag; - while (f_count < argc) { - // Use tag = 1 for requests - MPI_Recv(&tmp, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &stat); - int requesting_rank = stat.MPI_SOURCE; - // Use tag = 2 for responds - MPI_Send(&f_count, 1, MPI_INT, requesting_rank, 2, MPI_COMM_WORLD); - f_count++; + while (active_ranks > 0) { + MPI_Iprobe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &flag, &stat); + if (!flag && readers_avail > 0) { + #pragma omp parallel + { + #pragma omp single + { + if (f_count < argc) { + #pragma omp task + { + cerr << "rank " << rank << " starts reading a file\n"; + read_file(argv[f_count]); + } + f_count++; + } + } + } + } + else { + // Use tag = 1 for requests + MPI_Recv(&tmp, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &stat); + int requesting_rank = stat.MPI_SOURCE; + + int send_buff = -1; + if (f_count < argc) { + send_buff = f_count; + f_count++; + } + else { + // This rank receives -1 for "work done" + active_ranks--; + } + + // Use tag = 2 for responds + MPI_Send(&send_buff, 1, MPI_INT, requesting_rank, 2, MPI_COMM_WORLD); + } } - // All files have been distributed here, broadcast "work done" - done = 1; - MPI_Bcast(&done, 1, MPI_INT, 0, MPI_COMM_WORLD); } else { int rec_buff = 0; - while (!done) { - // Send non-blocking request - MPI_Send(&rec_buff, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); - MPI_Recv(&rec_buff, 1, MPI_INT, 0, 2, MPI_COMM_WORLD, MPI_STATUSES_IGNORE); - - read_file(argv[rec_buff]); - } - } - - #pragma omp parallel - { - #pragma omp single - { - - - while (argv[f_count]) { - #pragma omp task firstprivate(f_count) - { - read_file(argv[f_count]); + while (true) { + if (readers_avail > 0) { + // Send request + MPI_Send(&rec_buff, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + // Receive file number or -1 for "work done" + MPI_Recv(&rec_buff, 1, MPI_INT, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + if (rec_buff == -1) { + break; } - f_count++; - } - - // Mapping step - for (int i = 0; i < num_mappers; ++i) { - #pragma omp task + #pragma omp parallel { - mapping_step(); + #pragma omp single + { + #pragma omp task + { + cerr << "rank " << rank << " starts reading a file\n"; + read_file(argv[rec_buff]); + } + } } } } } - start_r = omp_get_wtime(); - // Reducing step - #pragma omp parallel for - for (int i = 0; i < num_reducers; ++i) { - reduce_step(i); - } - - start_p = omp_get_wtime(); - vector> counts; - for (auto &el : global_counts) { - counts.emplace_back(el.first, el.second); - } - - // Sort in alphabetical order - sort(counts.begin(), counts.end(), - [](const auto &a, const auto &b) { - return a.first < b.first; - }); - - // Print step - cout << "Filename: " << argv[1] << ", total words: " << total_words << endl; - for (size_t i = 0; i < counts.size(); ++i) { - cout << "[" << i << "] " << counts[i].first << ": " << counts[i].second << endl; - } - - end = omp_get_wtime(); - // Use cerr to always print in terminal - cerr << "OpenMP time: " << (end - start) * 1000 << " ms\n"; - cerr << " File read & Map time: " << (start_r - start) * 1000 << " ms\n"; - cerr << " Reducing time: " << (start_p - start_r) * 1000 << " ms\n"; - cerr << " Sort & Print time: " << (end - start_p) * 1000 << " ms\n"; - - omp_destroy_lock(&readers_lock); - omp_destroy_lock(&global_counts_lock); - for (int i = 0; i < num_reducers; ++i) { - omp_destroy_lock(&reducer_locks[i]); - } + // #pragma omp parallel + // { + // #pragma omp single + // { + + + // while (argv[f_count]) { + // #pragma omp task firstprivate(f_count) + // { + // read_file(argv[f_count]); + // } + // f_count++; + // } + + // // Mapping step + // for (int i = 0; i < num_mappers; ++i) { + // #pragma omp task + // { + // mapping_step(); + // } + // } + // } + // } + + // start_r = omp_get_wtime(); + // // Reducing step + // #pragma omp parallel for + // for (int i = 0; i < num_reducers; ++i) { + // reduce_step(i); + // } + + // start_p = omp_get_wtime(); + // vector> counts; + // for (auto &el : global_counts) { + // counts.emplace_back(el.first, el.second); + // } + + // // Sort in alphabetical order + // sort(counts.begin(), counts.end(), + // [](const auto &a, const auto &b) { + // return a.first < b.first; + // }); + + // // Print step + // cout << "Filename: " << argv[1] << ", total words: " << total_words << endl; + // for (size_t i = 0; i < counts.size(); ++i) { + // cout << "[" << i << "] " << counts[i].first << ": " << counts[i].second << endl; + // } + + // end = omp_get_wtime(); + // // Use cerr to always print in terminal + // cerr << "OpenMP time: " << (end - start) * 1000 << " ms\n"; + // cerr << " File read & Map time: " << (start_r - start) * 1000 << " ms\n"; + // cerr << " Reducing time: " << (start_p - start_r) * 1000 << " ms\n"; + // cerr << " Sort & Print time: " << (end - start_p) * 1000 << " ms\n"; + + // omp_destroy_lock(&readers_lock); + // omp_destroy_lock(&global_counts_lock); + // for (int i = 0; i < num_reducers; ++i) { + // omp_destroy_lock(&reducer_locks[i]); + // } MPI_Finalize(); return 0; From f1b8d239b809e856f4b56404bd6654bf2f5b4a3c Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Tue, 25 Nov 2025 18:13:13 -0500 Subject: [PATCH 03/15] minor updates + debug print --- hybrid.cpp | 101 ++++++++++++++++++++++++++--------------------------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/hybrid.cpp b/hybrid.cpp index 8826106..b727ce1 100644 --- a/hybrid.cpp +++ b/hybrid.cpp @@ -198,72 +198,70 @@ int main(int argc, char* argv[]) { double start, end, start_r, start_p; start = MPI_Wtime(); - // File reading step - if (rank == 0) { - size_t f_count = 1; - size_t active_ranks = size - 1; - MPI_Status stat; - int tmp; - int flag; - - while (active_ranks > 0) { - MPI_Iprobe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &flag, &stat); - if (!flag && readers_avail > 0) { - #pragma omp parallel - { - #pragma omp single - { + #pragma omp parallel + { + #pragma omp master + { + // File reading step + if (rank == 0) { + size_t f_count = 1; + size_t active_ranks = size - 1; + MPI_Status stat; + int tmp; + int flag; + + while (active_ranks > 0) { + // Check if any ranks sent a pending request + MPI_Iprobe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &flag, &stat); + // If not, generate tasks for master rank theads + if (!flag && readers_avail > 0) { if (f_count < argc) { #pragma omp task { - cerr << "rank " << rank << " starts reading a file\n"; + cerr << "rank " << rank << " starts reading a file " << argv[f_count] << "\n"; read_file(argv[f_count]); + cerr << "rank " << rank << " read " << total_words << "words\n"; } f_count++; } } + else { + // Use tag = 1 for requests + MPI_Recv(&tmp, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &stat); + int requesting_rank = stat.MPI_SOURCE; + + int send_buff = -1; + if (f_count < argc) { + send_buff = f_count; + f_count++; + } + else { + // This rank receives -1 for "work done" + active_ranks--; + } + // Use tag = 2 for responds + MPI_Send(&send_buff, 1, MPI_INT, requesting_rank, 2, MPI_COMM_WORLD); + } } } else { - // Use tag = 1 for requests - MPI_Recv(&tmp, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &stat); - int requesting_rank = stat.MPI_SOURCE; - - int send_buff = -1; - if (f_count < argc) { - send_buff = f_count; - f_count++; - } - else { - // This rank receives -1 for "work done" - active_ranks--; - } + int rec_buff = 0; + while (true) { + if (readers_avail > 0) { + // Send request + MPI_Send(&rec_buff, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + // Receive file number or -1 for "work done" + MPI_Recv(&rec_buff, 1, MPI_INT, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + if (rec_buff == -1) { + break; + } - // Use tag = 2 for responds - MPI_Send(&send_buff, 1, MPI_INT, requesting_rank, 2, MPI_COMM_WORLD); - } - } - } - else { - int rec_buff = 0; - while (true) { - if (readers_avail > 0) { - // Send request - MPI_Send(&rec_buff, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); - // Receive file number or -1 for "work done" - MPI_Recv(&rec_buff, 1, MPI_INT, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - if (rec_buff == -1) { - break; - } - #pragma omp parallel - { - #pragma omp single - { #pragma omp task { - cerr << "rank " << rank << " starts reading a file\n"; + cerr << "rank " << rank << " starts reading a file " << argv[rec_buff] << "\n"; read_file(argv[rec_buff]); + cerr << "rank " << rank << " read " << total_words << "words\n"; } } } @@ -271,6 +269,7 @@ int main(int argc, char* argv[]) { } } + // #pragma omp parallel // { // #pragma omp single From 31eabc8693078180ccf0e38e1cf6c7fc52555f89 Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Tue, 25 Nov 2025 18:19:23 -0500 Subject: [PATCH 04/15] added hybrid compile command and run script --- Makefile | 7 +++++-- hybrid_run.sh | 24 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 hybrid_run.sh diff --git a/Makefile b/Makefile index a3892bd..de0f60c 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CC = g++ CFLAGS = -fopenmp -Wall -O3 LDFLAGS = -lm -all: seq omp +all: seq omp hybrid seq: sequential.cpp $(CC) $(CFLAGS) -o $@ $< $(LDFLAGS) @@ -10,5 +10,8 @@ seq: sequential.cpp omp: omp.cpp $(CC) $(CFLAGS) -o $@ $< $(LDFLAGS) +hybrid: + mpicc $(CFLAGS) -o $@ $< $(LDFLAGS) + clean: - rm -f seq omp *.txt + rm -f seq omp hybrid *.txt diff --git a/hybrid_run.sh b/hybrid_run.sh new file mode 100644 index 0000000..c6aa882 --- /dev/null +++ b/hybrid_run.sh @@ -0,0 +1,24 @@ +#!/bin/bash +#SBATCH -J final-hybrid #Job name +#SBATCH -A acf-utk0011 #Write your project account associated to utia condo +#SBATCH -p short +#SBATCH --nodes=1 +#SBATCH --ntasks-per-node=8 #--ntasks-per-node is used when we want to define the number of processes per node +#SBATCH --cpus-per-task=4 +#SBATCH --time=00:20:00 +#SBATCH -o hybrid.o%j +#SBATCH --qos=short + +TEST_DIR="raw_text_input" + +args=() + +for file in "$TEST_DIR"/*; do + args+=("$file") +done + +export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK + +module load openmpi/4.1.5-gcc + +srun -n 8 ./hybrid "${args[@]}" > hybrid_out.txt From 2218112cca47b88b814f914b81fb77bbb6e06c73 Mon Sep 17 00:00:00 2001 From: Holden Roaten Date: Tue, 25 Nov 2025 20:01:35 -0500 Subject: [PATCH 05/15] replaced pair with make_pair for sanity --- sequential.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sequential.cpp b/sequential.cpp index d3d5440..2c2566b 100644 --- a/sequential.cpp +++ b/sequential.cpp @@ -51,7 +51,7 @@ int main(int argc, char* argv[]) { // Map step if (!word.empty()) { // avoid pushing empty strings file_word_count++; - raw_tuples.push_back(pair(word, 1)); + raw_tuples.push_back(make_pair(word, 1)); } } @@ -68,7 +68,7 @@ int main(int argc, char* argv[]) { for (size_t i = 0; i < entry.second.size(); ++i) { sum += entry.second[i]; } - counts.push_back(pair(entry.first, sum)); + counts.push_back(make_pair(entry.first, sum)); } // Sort in alphabetical order From 5be4215c6dd7699059f04354cf4c6b0c2cbafa70 Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Tue, 25 Nov 2025 21:30:31 -0500 Subject: [PATCH 06/15] sequential code takes multiple files --- sequential.cpp | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/sequential.cpp b/sequential.cpp index d3d5440..65cede5 100644 --- a/sequential.cpp +++ b/sequential.cpp @@ -28,31 +28,36 @@ void process_word(string &w) { int main(int argc, char* argv[]) { if (argc < 2) { - fprintf(stderr, "usage: %s \n", argv[0]); + fprintf(stderr, "usage: %s \n", argv[0]); return 1; } + vector> raw_tuples; + size_t file_word_count = 0; + double start, end; start = omp_get_wtime(); - // File reading step - ifstream fin(argv[1]); - if (!fin) { - fprintf(stderr, "error: unable to open input file: %s\n", argv[1]); - return 1; - } - string word; - vector> raw_tuples; - size_t file_word_count = 0; + // File reading step + size_t f_count = 1; + while (argv[f_count]) { + ifstream fin(argv[f_count]); + if (!fin) { + fprintf(stderr, "error: unable to open input file: %s\n", argv[f_count]); + return 1; + } - while (fin >> word) { - process_word(word); - // Map step - if (!word.empty()) { // avoid pushing empty strings - file_word_count++; - raw_tuples.push_back(pair(word, 1)); + string word; + while (fin >> word) { + process_word(word); + // Map step + if (!word.empty()) { // avoid pushing empty strings + file_word_count++; + raw_tuples.push_back(pair(word, 1)); + } } + f_count++; } // Shuffle step From 8ca2947bee3909731c860320b7d679b71101b6a4 Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Tue, 25 Nov 2025 21:32:51 -0500 Subject: [PATCH 07/15] fixed pair issue --- sequential.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sequential.cpp b/sequential.cpp index 65cede5..8ccade1 100644 --- a/sequential.cpp +++ b/sequential.cpp @@ -54,7 +54,7 @@ int main(int argc, char* argv[]) { // Map step if (!word.empty()) { // avoid pushing empty strings file_word_count++; - raw_tuples.push_back(pair(word, 1)); + raw_tuples.push_back({word, 1}); } } f_count++; @@ -73,7 +73,7 @@ int main(int argc, char* argv[]) { for (size_t i = 0; i < entry.second.size(); ++i) { sum += entry.second[i]; } - counts.push_back(pair(entry.first, sum)); + counts.push_back({entry.first, sum}); } // Sort in alphabetical order From ccadb463bb15dbc1459e9befeaad125844e7f4c1 Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Tue, 25 Nov 2025 21:39:29 -0500 Subject: [PATCH 08/15] added fix for non-ascii chars --- omp.cpp | 8 ++++---- sequential.cpp | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/omp.cpp b/omp.cpp index c1e9798..46d2e2d 100644 --- a/omp.cpp +++ b/omp.cpp @@ -23,12 +23,12 @@ vector reducer_locks; omp_lock_t global_counts_lock; void process_word(string &w) { - // Remove punctuation at beginning - while (!w.empty() && ispunct(w[0])) { + // Remove punctuation and non-ascii chars at beginning + while (!w.empty() && w[0] > 0 &&ispunct(w[0])) { w.erase(0, 1); } - // Remove punctuation at end - while (!w.empty() && ispunct(w[w.size() - 1])) { + // Remove punctuation and non-ascii chars at end + while (!w.empty() && w[w.size() - 1] > 0 && ispunct(w[w.size() - 1])) { w.pop_back(); } // Convert all letters to lowercase diff --git a/sequential.cpp b/sequential.cpp index 8ccade1..73ef929 100644 --- a/sequential.cpp +++ b/sequential.cpp @@ -10,12 +10,12 @@ using namespace std; void process_word(string &w) { - // Remove punctuation at beginning - while (!w.empty() && ispunct(w[0])) { + // Remove punctuation and non-ascii chars at beginning + while (!w.empty() && w[0] > 0 &&ispunct(w[0])) { w.erase(0, 1); } - // Remove punctuation at end - while (!w.empty() && ispunct(w[w.size() - 1])) { + // Remove punctuation and non-ascii chars at end + while (!w.empty() && w[w.size() - 1] > 0 && ispunct(w[w.size() - 1])) { w.pop_back(); } // Convert all letters to lowercase From 26201cfe35bb535622d652191985f6c27c484ae6 Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Tue, 25 Nov 2025 21:45:16 -0500 Subject: [PATCH 09/15] added fix for non-ascii chars - 2 --- omp.cpp | 18 ++++++++++++++---- sequential.cpp | 18 ++++++++++++++---- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/omp.cpp b/omp.cpp index 46d2e2d..8ad940f 100644 --- a/omp.cpp +++ b/omp.cpp @@ -24,12 +24,22 @@ omp_lock_t global_counts_lock; void process_word(string &w) { // Remove punctuation and non-ascii chars at beginning - while (!w.empty() && w[0] > 0 &&ispunct(w[0])) { - w.erase(0, 1); + while (!w.empty()) { + signed char c = w.front(); + if (c < 0 || ispunct(c)) { + w.erase(0, 1); + continue; + } + break; } // Remove punctuation and non-ascii chars at end - while (!w.empty() && w[w.size() - 1] > 0 && ispunct(w[w.size() - 1])) { - w.pop_back(); + while (!w.empty()) { + signed char c = w.back(); + if (c < 0 || ispunct(c)) { + w.pop_back(); + continue; + } + break; } // Convert all letters to lowercase for (size_t i = 0; i < w.length(); ++i) { diff --git a/sequential.cpp b/sequential.cpp index 73ef929..5f81d82 100644 --- a/sequential.cpp +++ b/sequential.cpp @@ -11,12 +11,22 @@ using namespace std; void process_word(string &w) { // Remove punctuation and non-ascii chars at beginning - while (!w.empty() && w[0] > 0 &&ispunct(w[0])) { - w.erase(0, 1); + while (!w.empty()) { + signed char c = w.front(); + if (c < 0 || ispunct(c)) { + w.erase(0, 1); + continue; + } + break; } // Remove punctuation and non-ascii chars at end - while (!w.empty() && w[w.size() - 1] > 0 && ispunct(w[w.size() - 1])) { - w.pop_back(); + while (!w.empty()) { + signed char c = w.back(); + if (c < 0 || ispunct(c)) { + w.pop_back(); + continue; + } + break; } // Convert all letters to lowercase for (size_t i = 0; i < w.length(); ++i) { From 3d61270575eb8d31c520e36c0a49749a804919c5 Mon Sep 17 00:00:00 2001 From: tdehoff1 Date: Wed, 26 Nov 2025 12:01:01 -0500 Subject: [PATCH 10/15] updated file read step --- hybrid.cpp | 106 ++++++++++++++++++----------------------------------- 1 file changed, 35 insertions(+), 71 deletions(-) diff --git a/hybrid.cpp b/hybrid.cpp index b727ce1..7885bf0 100644 --- a/hybrid.cpp +++ b/hybrid.cpp @@ -11,7 +11,7 @@ using namespace std; -vector readers_q; +unordered_map readers_map; vector>> reducer_queues; unordered_map global_counts; @@ -26,13 +26,23 @@ vector reducer_locks; omp_lock_t global_counts_lock; void process_word(string &w) { - // Remove punctuation at beginning - while (!w.empty() && ispunct(w[0])) { - w.erase(0, 1); + // Remove punctuation and non-ascii chars at beginning + while (!w.empty()) { + signed char c = w.front(); + if (c < 0 || ispunct(c)) { + w.erase(0, 1); + continue; + } + break; } - // Remove punctuation at end - while (!w.empty() && ispunct(w[w.size() - 1])) { - w.pop_back(); + // Remove punctuation and non-ascii chars at end + while (!w.empty()) { + signed char c = w.back(); + if (c < 0 || ispunct(c)) { + w.pop_back(); + continue; + } + break; } // Convert all letters to lowercase for (size_t i = 0; i < w.length(); ++i) { @@ -42,7 +52,7 @@ void process_word(string &w) { } } -void read_file (char* fname) { +void read_and_map (char* fname) { #pragma omp atomic readers_avail--; @@ -67,7 +77,9 @@ void read_file (char* fname) { } } omp_set_lock(&readers_lock); - readers_q.insert(readers_q.end(), make_move_iterator(words.begin()), make_move_iterator(words.end())); + for (string &s : words) { + readers_map[s]++; + } omp_unset_lock(&readers_lock); #pragma omp atomic @@ -88,57 +100,6 @@ int hash_str(string s, int R) { return sum % R; } -void mapping_step() { - unordered_map buckets; - - // Grab elemnts from the work q in chunks - const int chunk_size = 1024; // find which chunk size works the best - vector working_batch; - working_batch.reserve(chunk_size); - - while (true) { - working_batch.clear(); - - // Lock and grab new chunk of elements if queue is not empty - omp_set_lock(&readers_lock); - for (size_t i = 0; i < chunk_size && !readers_q.empty(); ++i) { - working_batch.push_back(readers_q.back()); - readers_q.pop_back(); - } - omp_unset_lock(&readers_lock); - - if (!working_batch.empty()) { - // Queue not empty -- process new elements - for (size_t i = 0; i < working_batch.size(); ++i) { - buckets[working_batch[i]]++; - } - } - else { - int remaining; - // Shared global variable -- must be read atomically - #pragma omp atomic read - remaining = files_remain; - - if (remaining == 0) { - // Queue empty and all files are processed - break; - } - else { - // Mappers are ahead of readers - #pragma omp taskyield - } - } - } - - // Push thread's results into the reducer queues - for (auto el : buckets) { - int index = hash_str(el.first, num_reducers); - omp_set_lock(&reducer_locks[index]); - reducer_queues[index].push(el); - omp_unset_lock(&reducer_locks[index]); - } -} - void reduce_step(int id) { // Use local hash table for partial results unordered_map local_result; @@ -176,11 +137,10 @@ int main(int argc, char* argv[]) { } int n_threads = omp_get_max_threads(); - int num_mappers = n_threads; num_reducers = n_threads * 2; // Works best on my laptop -- test on ISAAC files_remain = argc - 1; - num_readers = n_threads / 2; + num_readers = n_threads; readers_avail = num_readers; if (rank == 0) { @@ -209,18 +169,20 @@ int main(int argc, char* argv[]) { MPI_Status stat; int tmp; int flag; + int local_avail; while (active_ranks > 0) { // Check if any ranks sent a pending request MPI_Iprobe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &flag, &stat); + // If not, generate tasks for master rank theads - if (!flag && readers_avail > 0) { + #pragma omp atomic read + local_avail = readers_avail; + if (!flag && local_avail > 0) { if (f_count < argc) { #pragma omp task { - cerr << "rank " << rank << " starts reading a file " << argv[f_count] << "\n"; - read_file(argv[f_count]); - cerr << "rank " << rank << " read " << total_words << "words\n"; + read_and_map(argv[f_count]); } f_count++; } @@ -245,9 +207,12 @@ int main(int argc, char* argv[]) { } } else { + int local_avail; int rec_buff = 0; while (true) { - if (readers_avail > 0) { + #pragma omp atomic read + local_avail = readers_avail; + if (local_avail > 0) { // Send request MPI_Send(&rec_buff, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); // Receive file number or -1 for "work done" @@ -259,16 +224,15 @@ int main(int argc, char* argv[]) { #pragma omp task { - cerr << "rank " << rank << " starts reading a file " << argv[rec_buff] << "\n"; - read_file(argv[rec_buff]); - cerr << "rank " << rank << " read " << total_words << "words\n"; + read_and_map(argv[rec_buff]); } } } } } } - + end = MPI_Wtime(); + cerr << "File reading + mapping took " << (end - start) * 1000 << " ms\n"; // #pragma omp parallel // { From 77e29d68f023ad71ad0819fb0c03dfee20da487e Mon Sep 17 00:00:00 2001 From: Tatiana Dehoff Date: Wed, 26 Nov 2025 12:33:21 -0500 Subject: [PATCH 11/15] Removed intermediate q in the reduce step --- omp.cpp | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/omp.cpp b/omp.cpp index 8ad940f..4fc66e7 100644 --- a/omp.cpp +++ b/omp.cpp @@ -11,7 +11,7 @@ using namespace std; vector readers_q; -vector>> reducer_queues; +vector>> reducer_queues; unordered_map global_counts; size_t total_words; @@ -135,22 +135,15 @@ void mapping_step() { for (auto el : buckets) { int index = hash_str(el.first, num_reducers); omp_set_lock(&reducer_locks[index]); - reducer_queues[index].push(el); + reducer_queues[index].push_back(el); omp_unset_lock(&reducer_locks[index]); } } void reduce_step(int id) { - // Use local hash table for partial results - unordered_map local_result; - while (!reducer_queues[id].empty()) { - pair cur_entry = reducer_queues[id].front(); - reducer_queues[id].pop(); - local_result[cur_entry.first] += cur_entry.second; - } // Merge partial results into global results omp_set_lock(&global_counts_lock); - for (auto &el : local_result) { + for (auto &el : reducer_queues[id]) { global_counts[el.first] += el.second; } omp_unset_lock(&global_counts_lock); From 8164ccdf85cc438529b1cb2eaa55bbf29220a881 Mon Sep 17 00:00:00 2001 From: Tatiana Melnichenko Date: Wed, 26 Nov 2025 14:55:55 -0500 Subject: [PATCH 12/15] Intermediate hybrid version --- hybrid.cpp | 276 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 197 insertions(+), 79 deletions(-) diff --git a/hybrid.cpp b/hybrid.cpp index 7885bf0..75bbe44 100644 --- a/hybrid.cpp +++ b/hybrid.cpp @@ -11,8 +11,9 @@ using namespace std; -unordered_map readers_map; -vector>> reducer_queues; +vector readers_q; +vector>> send_buffers; +vector>> reducer_queues; unordered_map global_counts; size_t total_words; @@ -20,8 +21,10 @@ size_t files_remain; int num_reducers; int num_readers; int readers_avail; +int total_ranks; omp_lock_t readers_lock; +vector mappers_locks; vector reducer_locks; omp_lock_t global_counts_lock; @@ -45,14 +48,15 @@ void process_word(string &w) { break; } // Convert all letters to lowercase - for (size_t i = 0; i < w.length(); ++i) { - if (isupper(w[i])) { - w[i] = tolower(w[i]); + for (char &ch : w) { + unsigned char c = static_cast(ch); + if (isupper(c)) { + ch = tolower(c); } } } -void read_and_map (char* fname) { +void read_file (char* fname) { #pragma omp atomic readers_avail--; @@ -77,9 +81,7 @@ void read_and_map (char* fname) { } } omp_set_lock(&readers_lock); - for (string &s : words) { - readers_map[s]++; - } + readers_q.insert(readers_q.end(), make_move_iterator(words.begin()), make_move_iterator(words.end())); omp_unset_lock(&readers_lock); #pragma omp atomic @@ -100,12 +102,122 @@ int hash_str(string s, int R) { return sum % R; } +void mapping_step() { + unordered_map buckets; + + // Grab elemnts from the work q in chunks + const int chunk_size = 1024; // find which chunk size works the best + vector working_batch; + working_batch.reserve(chunk_size); + + while (true) { + working_batch.clear(); + + // Lock and grab new chunk of elements if queue is not empty + omp_set_lock(&readers_lock); + for (size_t i = 0; i < chunk_size && !readers_q.empty(); ++i) { + working_batch.push_back(readers_q.back()); + readers_q.pop_back(); + } + omp_unset_lock(&readers_lock); + + if (!working_batch.empty()) { + // Queue not empty -- process new elements + for (size_t i = 0; i < working_batch.size(); ++i) { + buckets[working_batch[i]]++; + } + } + else { + int remaining; + // Shared global variable -- must be read atomically + #pragma omp atomic read + remaining = files_remain; + + if (remaining == 0) { + // Queue empty and all files are processed + break; + } + // Mappers are ahead of readers + #pragma omp taskyield + } + } + + // Push thread's results into the reducer queues + for (auto el : buckets) { + int dst_rank = hash_str(el.first, total_ranks); + + omp_set_lock(&mappers_locks[dst_rank]); + send_buffers[dst_rank].push_back(el); + omp_unset_lock(&mappers_locks[dst_rank]); + } +} + +void send_data(int my_rank) { + for (int i = 0; i < total_ranks; ++i) { + // Skip sending to yourself, send to reducer queues + if (i == my_rank) { + for (auto &el : send_buffers[i]) { + int ind = hash_str(el.first, num_reducers); + omp_set_lock(&reducer_locks[ind]); + reducer_queues[ind].push_back(el); + omp_unset_lock(&reducer_locks[ind]); + } + } + else { + // Send total number of elements first + int N = send_buffers[i].size(); + MPI_Send(&N, 1, MPI_INT, i, 0, MPI_COMM_WORLD); + // Send each element individually + for (int j = 0; j < N; ++j) { + string &w = send_buffers[i][j].first; + int64_t count = (int64_t)send_buffers[i][j].second; + int w_len = w.length(); + // Send word length separately + MPI_Send(&w_len, 1, MPI_INT, i, 0, MPI_COMM_WORLD); + // Send word and frequency count + MPI_Send(w.data(), w_len, MPI_CHAR, i, 0, MPI_COMM_WORLD); + MPI_Send(&count, 1, MPI_INT64_T, i, 0, MPI_COMM_WORLD); + } + } + } +} + +void receive_data(int my_rank) { + for (int i = 0; i < total_ranks; ++i) { + if (i == my_rank) { + continue; + } + + // Receive total buffer size + int N; + MPI_Recv(&N, 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + for (int j = 0; j < N; ++j) { + // Receive words + int w_len; + string w; + MPI_Recv(&w_len, 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&w[0], 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + // Receive counts + int64_t count; + MPI_Recv(&count, 1, MPI_INT64_T, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + // Push to reducer queue + int ind = hash_str(w, num_reducers); + omp_set_lock(&reducer_locks[ind]); + reducer_queues[ind].push_back({w, (size_t)count}); + omp_unset_lock(&reducer_locks[ind]); + } + } +} + void reduce_step(int id) { // Use local hash table for partial results unordered_map local_result; while (!reducer_queues[id].empty()) { pair cur_entry = reducer_queues[id].front(); - reducer_queues[id].pop(); + reducer_queues[id].pop_back(); local_result[cur_entry.first] += cur_entry.second; } // Merge partial results into global results @@ -127,8 +239,6 @@ int main(int argc, char* argv[]) { if (provided < MPI_THREAD_FUNNELED) { printf("Error: MPI_THREAD_FUNNELED is not supported.\n"); MPI_Abort(MPI_COMM_WORLD, 1); - } else { - printf("Rank %d: threading level provided = %d\n", rank, provided); } if (argc < 2) { @@ -142,6 +252,7 @@ int main(int argc, char* argv[]) { num_readers = n_threads; readers_avail = num_readers; + total_ranks = size; if (rank == 0) { cerr << "Testing " << n_threads << " thread(s), " << size << " processes\n"; @@ -153,9 +264,14 @@ int main(int argc, char* argv[]) { for (int i = 0; i < num_reducers; ++i) { omp_init_lock(&reducer_locks[i]); } + mappers_locks.resize(num_readers); + for (int i = 0; i < num_readers; ++i) { + omp_init_lock(&mappers_locks[i]); + } reducer_queues.resize(num_reducers); + send_buffers.resize(total_ranks); - double start, end, start_r, start_p; + double start, end, start_c, start_r, start_p; start = MPI_Wtime(); #pragma omp parallel @@ -164,7 +280,7 @@ int main(int argc, char* argv[]) { { // File reading step if (rank == 0) { - size_t f_count = 1; + int f_count = 1; size_t active_ranks = size - 1; MPI_Status stat; int tmp; @@ -182,9 +298,14 @@ int main(int argc, char* argv[]) { if (f_count < argc) { #pragma omp task { - read_and_map(argv[f_count]); + read_file(argv[f_count]); } f_count++; + + #pragma omp task + { + mapping_step(); + } } } else { @@ -224,77 +345,74 @@ int main(int argc, char* argv[]) { #pragma omp task { - read_and_map(argv[rec_buff]); + read_file(argv[rec_buff]); + } + + #pragma omp task + { + mapping_step(); } } } } } } + start_c = MPI_Wtime(); + if (rank == 0) { + cerr << "File reading + mapping took " << (start_c - start) * 1000 << " ms\n"; + } + + // Even ranks send first to avoid deadlock + if (rank % 2 == 0) { + send_data(rank); + receive_data(rank); + } + else { + receive_data(rank); + send_data(rank); + } + + start_r = MPI_Wtime(); + // Reducing step + #pragma omp parallel for + for (int i = 0; i < num_reducers; ++i) { + reduce_step(i); + } + + start_p = MPI_Wtime(); + vector> counts; + for (auto &el : global_counts) { + counts.emplace_back(el.first, el.second); + } + + // Sort in alphabetical order + sort(counts.begin(), counts.end(), + [](const auto &a, const auto &b) { + return a.first < b.first; + }); + + // Print step + if (rank == 0) { + cout << "Filename: " << argv[1] << ", total words: " << total_words << endl; + for (size_t i = 0; i < counts.size(); ++i) { + cout << "[" << i << "] " << counts[i].first << ": " << counts[i].second << endl; + } + } + end = MPI_Wtime(); - cerr << "File reading + mapping took " << (end - start) * 1000 << " ms\n"; - - // #pragma omp parallel - // { - // #pragma omp single - // { - - - // while (argv[f_count]) { - // #pragma omp task firstprivate(f_count) - // { - // read_file(argv[f_count]); - // } - // f_count++; - // } - - // // Mapping step - // for (int i = 0; i < num_mappers; ++i) { - // #pragma omp task - // { - // mapping_step(); - // } - // } - // } - // } - - // start_r = omp_get_wtime(); - // // Reducing step - // #pragma omp parallel for - // for (int i = 0; i < num_reducers; ++i) { - // reduce_step(i); - // } - - // start_p = omp_get_wtime(); - // vector> counts; - // for (auto &el : global_counts) { - // counts.emplace_back(el.first, el.second); - // } - - // // Sort in alphabetical order - // sort(counts.begin(), counts.end(), - // [](const auto &a, const auto &b) { - // return a.first < b.first; - // }); - - // // Print step - // cout << "Filename: " << argv[1] << ", total words: " << total_words << endl; - // for (size_t i = 0; i < counts.size(); ++i) { - // cout << "[" << i << "] " << counts[i].first << ": " << counts[i].second << endl; - // } - - // end = omp_get_wtime(); - // // Use cerr to always print in terminal - // cerr << "OpenMP time: " << (end - start) * 1000 << " ms\n"; - // cerr << " File read & Map time: " << (start_r - start) * 1000 << " ms\n"; - // cerr << " Reducing time: " << (start_p - start_r) * 1000 << " ms\n"; - // cerr << " Sort & Print time: " << (end - start_p) * 1000 << " ms\n"; - - // omp_destroy_lock(&readers_lock); - // omp_destroy_lock(&global_counts_lock); - // for (int i = 0; i < num_reducers; ++i) { - // omp_destroy_lock(&reducer_locks[i]); - // } + if (rank == 0) { + // Use cerr to always print in terminal + cerr << "OpenMP time: " << (end - start) * 1000 << " ms\n"; + cerr << " File read & Map time: " << (start_r - start) * 1000 << " ms\n"; + cerr << " Reducing time: " << (start_p - start_r) * 1000 << " ms\n"; + cerr << " Sort & Print time: " << (end - start_p) * 1000 << " ms\n"; + } + + omp_destroy_lock(&readers_lock); + omp_destroy_lock(&global_counts_lock); + for (int i = 0; i < num_reducers; ++i) { + omp_destroy_lock(&reducer_locks[i]); + } MPI_Finalize(); return 0; From e6d6594e57441815b6c412927fe87d80975548f2 Mon Sep 17 00:00:00 2001 From: Tatiana Melnichenko Date: Wed, 26 Nov 2025 18:09:35 -0500 Subject: [PATCH 13/15] working hybrid --- hybrid.cpp | 205 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 140 insertions(+), 65 deletions(-) diff --git a/hybrid.cpp b/hybrid.cpp index 75bbe44..0b4e3e2 100644 --- a/hybrid.cpp +++ b/hybrid.cpp @@ -12,9 +12,9 @@ using namespace std; vector readers_q; -vector>> send_buffers; -vector>> reducer_queues; -unordered_map global_counts; +vector>> send_buffers; +vector>> reducer_queues; +unordered_map global_counts; size_t total_words; size_t files_remain; @@ -22,6 +22,7 @@ int num_reducers; int num_readers; int readers_avail; int total_ranks; +int num_mappers; omp_lock_t readers_lock; vector mappers_locks; @@ -103,7 +104,7 @@ int hash_str(string s, int R) { } void mapping_step() { - unordered_map buckets; + unordered_map buckets; // Grab elemnts from the work q in chunks const int chunk_size = 1024; // find which chunk size works the best @@ -152,31 +153,66 @@ void mapping_step() { } } -void send_data(int my_rank) { +void exchange_data(int my_rank) { for (int i = 0; i < total_ranks; ++i) { // Skip sending to yourself, send to reducer queues if (i == my_rank) { for (auto &el : send_buffers[i]) { int ind = hash_str(el.first, num_reducers); - omp_set_lock(&reducer_locks[ind]); reducer_queues[ind].push_back(el); - omp_unset_lock(&reducer_locks[ind]); } } else { // Send total number of elements first - int N = send_buffers[i].size(); - MPI_Send(&N, 1, MPI_INT, i, 0, MPI_COMM_WORLD); - // Send each element individually - for (int j = 0; j < N; ++j) { - string &w = send_buffers[i][j].first; - int64_t count = (int64_t)send_buffers[i][j].second; - int w_len = w.length(); - // Send word length separately - MPI_Send(&w_len, 1, MPI_INT, i, 0, MPI_COMM_WORLD); - // Send word and frequency count - MPI_Send(w.data(), w_len, MPI_CHAR, i, 0, MPI_COMM_WORLD); - MPI_Send(&count, 1, MPI_INT64_T, i, 0, MPI_COMM_WORLD); + int send_N = send_buffers[i].size(); + int rec_N; + + MPI_Sendrecv(&send_N, 1, MPI_INT, i, 0, + &rec_N, 1, MPI_INT, i, 0, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + for (int j = 0; j < max(send_N, rec_N); ++j) { + int send_len = 0; + int send_count = 0; + string *send_word = nullptr; + + if (j < send_N) { + send_word = &send_buffers[i][j].first; + send_len = send_word->length(); + send_count = send_buffers[i][j].second; + } + + int recv_len = 0; + int recv_count = 0; + string recv_word; + + // Exchange lengths + MPI_Sendrecv( + &send_len, 1, MPI_INT, i, 0, + &recv_len, 1, MPI_INT, i, 0, + MPI_COMM_WORLD, MPI_STATUS_IGNORE + ); + recv_word.resize(recv_len); + + // Exchange words + MPI_Sendrecv( + send_len ? send_word->data() : nullptr, send_len, MPI_CHAR, i, 0, + recv_len ? &recv_word[0] : nullptr, recv_len, MPI_CHAR, i, 0, + MPI_COMM_WORLD, MPI_STATUS_IGNORE + ); + + // Exchange counts + MPI_Sendrecv( + &send_count, 1, MPI_INT, i, 0, + &recv_count, 1, MPI_INT, i, 0, + MPI_COMM_WORLD, MPI_STATUS_IGNORE + ); + + // Store received element + if (recv_len > 0) { + int idx = hash_str(recv_word, num_reducers); + reducer_queues[idx].push_back({recv_word, recv_count}); + } } } } @@ -190,34 +226,31 @@ void receive_data(int my_rank) { // Receive total buffer size int N; - MPI_Recv(&N, 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&N, 1, MPI_INT, i, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); for (int j = 0; j < N; ++j) { // Receive words int w_len; string w; - MPI_Recv(&w_len, 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - MPI_Recv(&w[0], 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&w_len, 1, MPI_INT, i, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + w.resize(w_len); + MPI_Recv(&w[0], w_len, MPI_CHAR, i, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // Receive counts - int64_t count; - MPI_Recv(&count, 1, MPI_INT64_T, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + int count; + MPI_Recv(&count, 1, MPI_INT, i, 4, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // Push to reducer queue int ind = hash_str(w, num_reducers); - omp_set_lock(&reducer_locks[ind]); - reducer_queues[ind].push_back({w, (size_t)count}); - omp_unset_lock(&reducer_locks[ind]); + reducer_queues[ind].push_back({w, count}); } } } void reduce_step(int id) { // Use local hash table for partial results - unordered_map local_result; - while (!reducer_queues[id].empty()) { - pair cur_entry = reducer_queues[id].front(); - reducer_queues[id].pop_back(); + unordered_map local_result; + for (auto &cur_entry : reducer_queues[id]) { local_result[cur_entry.first] += cur_entry.second; } // Merge partial results into global results @@ -228,6 +261,40 @@ void reduce_step(int id) { omp_unset_lock(&global_counts_lock); } +void gather_results(int my_rank) { + if (my_rank == 0) { + for (int i = 1; i < total_ranks; ++i) { + int N; + MPI_Recv(&N, 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + for (int j = 0; j < N; ++j) { + int len; + MPI_Recv(&len, 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + string w; + w.resize(len); + MPI_Recv(&w[0], len, MPI_CHAR, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + int count; + MPI_Recv(&count, 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + global_counts[w] += count; + } + } + } + else { + int N = global_counts.size(); + MPI_Send(&N, 1, MPI_INT, 0, 0, MPI_COMM_WORLD); + + for (auto &el : global_counts) { + string w = el.first; + int len = w.length(); + int count = el.second; + + MPI_Send(&len, 1, MPI_INT, 0, 0, MPI_COMM_WORLD); + MPI_Send(w.data(), len, MPI_CHAR, 0, 0, MPI_COMM_WORLD); + MPI_Send(&count, 1, MPI_INT, 0, 0, MPI_COMM_WORLD); + } + } +} + int main(int argc, char* argv[]) { int provided; MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided); @@ -248,9 +315,10 @@ int main(int argc, char* argv[]) { int n_threads = omp_get_max_threads(); num_reducers = n_threads * 2; // Works best on my laptop -- test on ISAAC - files_remain = argc - 1; + files_remain = 0; num_readers = n_threads; + num_mappers = n_threads; readers_avail = num_readers; total_ranks = size; @@ -264,8 +332,8 @@ int main(int argc, char* argv[]) { for (int i = 0; i < num_reducers; ++i) { omp_init_lock(&reducer_locks[i]); } - mappers_locks.resize(num_readers); - for (int i = 0; i < num_readers; ++i) { + mappers_locks.resize(total_ranks); + for (int i = 0; i < total_ranks; ++i) { omp_init_lock(&mappers_locks[i]); } reducer_queues.resize(num_reducers); @@ -282,33 +350,36 @@ int main(int argc, char* argv[]) { if (rank == 0) { int f_count = 1; size_t active_ranks = size - 1; + bool done = false; MPI_Status stat; int tmp; int flag; int local_avail; - while (active_ranks > 0) { + while (active_ranks > 0 || !done) { // Check if any ranks sent a pending request MPI_Iprobe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &flag, &stat); // If not, generate tasks for master rank theads #pragma omp atomic read local_avail = readers_avail; - if (!flag && local_avail > 0) { + + if (!done && !flag && local_avail > 0) { if (f_count < argc) { + #pragma omp atomic + files_remain++; + #pragma omp task { read_file(argv[f_count]); } f_count++; - - #pragma omp task - { - mapping_step(); - } + } + else { + done = true; } } - else { + else if (size > 1) { // Use tag = 1 for requests MPI_Recv(&tmp, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &stat); int requesting_rank = stat.MPI_SOURCE; @@ -343,34 +414,29 @@ int main(int argc, char* argv[]) { break; } - #pragma omp task - { - read_file(argv[rec_buff]); - } + #pragma omp atomic + files_remain++; #pragma omp task { - mapping_step(); + read_file(argv[rec_buff]); } } } } + + // Mapping step + for (int i = 0; i < num_mappers; ++i) { + #pragma omp task + { + mapping_step(); + } + } } } start_c = MPI_Wtime(); - if (rank == 0) { - cerr << "File reading + mapping took " << (start_c - start) * 1000 << " ms\n"; - } - // Even ranks send first to avoid deadlock - if (rank % 2 == 0) { - send_data(rank); - receive_data(rank); - } - else { - receive_data(rank); - send_data(rank); - } + exchange_data(rank); start_r = MPI_Wtime(); // Reducing step @@ -379,8 +445,13 @@ int main(int argc, char* argv[]) { reduce_step(i); } + // Nothing to gather for single rank + if (total_ranks > 1) { + gather_results(rank); + } + start_p = MPI_Wtime(); - vector> counts; + vector> counts; for (auto &el : global_counts) { counts.emplace_back(el.first, el.second); } @@ -394,16 +465,20 @@ int main(int argc, char* argv[]) { // Print step if (rank == 0) { cout << "Filename: " << argv[1] << ", total words: " << total_words << endl; - for (size_t i = 0; i < counts.size(); ++i) { - cout << "[" << i << "] " << counts[i].first << ": " << counts[i].second << endl; - } + // ISAAC is having issues printing too much output, only print the number of unique words + // Error: srun: error: eio_handle_mainloop: Abandoning IO 60 secs after job shutdown initiated + cout << "Unique words found: " << counts.size() << endl; + // for (size_t i = 0; i < counts.size(); ++i) { + // cout << "[" << i << "] " << counts[i].first << ": " << counts[i].second << endl; + // } } end = MPI_Wtime(); if (rank == 0) { // Use cerr to always print in terminal - cerr << "OpenMP time: " << (end - start) * 1000 << " ms\n"; - cerr << " File read & Map time: " << (start_r - start) * 1000 << " ms\n"; + cerr << "Hybrid time: " << (end - start) * 1000 << " ms\n"; + cerr << " File read & Map time: " << (start_c - start) * 1000 << " ms\n"; + cerr << " Communication time: " << (start_r - start_c) * 1000 << " ms\n"; cerr << " Reducing time: " << (start_p - start_r) * 1000 << " ms\n"; cerr << " Sort & Print time: " << (end - start_p) * 1000 << " ms\n"; } From 7652a4f33f80faea4f9317bed147c4de40d1213f Mon Sep 17 00:00:00 2001 From: Tatiana Melnichenko Date: Wed, 26 Nov 2025 18:13:49 -0500 Subject: [PATCH 14/15] fixed total word count --- hybrid.cpp | 39 +++++++++------------------------------ 1 file changed, 9 insertions(+), 30 deletions(-) diff --git a/hybrid.cpp b/hybrid.cpp index 0b4e3e2..ac937a5 100644 --- a/hybrid.cpp +++ b/hybrid.cpp @@ -218,35 +218,6 @@ void exchange_data(int my_rank) { } } -void receive_data(int my_rank) { - for (int i = 0; i < total_ranks; ++i) { - if (i == my_rank) { - continue; - } - - // Receive total buffer size - int N; - MPI_Recv(&N, 1, MPI_INT, i, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - for (int j = 0; j < N; ++j) { - // Receive words - int w_len; - string w; - MPI_Recv(&w_len, 1, MPI_INT, i, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - w.resize(w_len); - MPI_Recv(&w[0], w_len, MPI_CHAR, i, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - // Receive counts - int count; - MPI_Recv(&count, 1, MPI_INT, i, 4, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - // Push to reducer queue - int ind = hash_str(w, num_reducers); - reducer_queues[ind].push_back({w, count}); - } - } -} - void reduce_step(int id) { // Use local hash table for partial results unordered_map local_result; @@ -450,6 +421,14 @@ int main(int argc, char* argv[]) { gather_results(rank); } + size_t global_total_words = 0; + MPI_Reduce(&total_words, &global_total_words, 1, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + + if (rank == 0) { + total_words = global_total_words; + } + start_p = MPI_Wtime(); vector> counts; for (auto &el : global_counts) { @@ -464,7 +443,7 @@ int main(int argc, char* argv[]) { // Print step if (rank == 0) { - cout << "Filename: " << argv[1] << ", total words: " << total_words << endl; + cout << "Filename: " << argv[1] << ", total words: " << global_total_words << endl; // ISAAC is having issues printing too much output, only print the number of unique words // Error: srun: error: eio_handle_mainloop: Abandoning IO 60 secs after job shutdown initiated cout << "Unique words found: " << counts.size() << endl; From 006c5aa2ece33217d62d0a45049c73e11541573c Mon Sep 17 00:00:00 2001 From: Holden Roaten Date: Fri, 28 Nov 2025 18:07:55 -0500 Subject: [PATCH 15/15] fixed printing errors --- hybrid.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/hybrid.cpp b/hybrid.cpp index ac937a5..6288be7 100644 --- a/hybrid.cpp +++ b/hybrid.cpp @@ -443,13 +443,14 @@ int main(int argc, char* argv[]) { // Print step if (rank == 0) { - cout << "Filename: " << argv[1] << ", total words: " << global_total_words << endl; + ofstream out("hybrid_out.txt"); + out << "Filename: " << argv[1] << ", total words: " << global_total_words << "\n"; // ISAAC is having issues printing too much output, only print the number of unique words // Error: srun: error: eio_handle_mainloop: Abandoning IO 60 secs after job shutdown initiated - cout << "Unique words found: " << counts.size() << endl; - // for (size_t i = 0; i < counts.size(); ++i) { - // cout << "[" << i << "] " << counts[i].first << ": " << counts[i].second << endl; - // } + out << "Unique words found: " << counts.size() << "\n"; + for (size_t i = 0; i < counts.size(); ++i) { + out << "[" << i << "] " << counts[i].first << ": " << counts[i].second << "\n"; + }; } end = MPI_Wtime();