From ee5b12adebb32483f6c32169c9af3c2cd36f7664 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Thu, 15 Feb 2024 18:40:21 +0530 Subject: [PATCH 1/2] chore: Add rust implementation of mpsc and mpmc message passing channels --- .gitignore | 3 +- .../mpmc_channel/Cargo.toml | 12 +++++ .../mpmc_channel/src/main.rs | 54 +++++++++++++++++++ .../mpsc_channel/Cargo.toml | 11 ++++ .../mpsc_channel/src/main.rs | 48 +++++++++++++++++ 5 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 message-pasing-channels/mpmc_channel/Cargo.toml create mode 100644 message-pasing-channels/mpmc_channel/src/main.rs create mode 100644 message-pasing-channels/mpsc_channel/Cargo.toml create mode 100644 message-pasing-channels/mpsc_channel/src/main.rs diff --git a/.gitignore b/.gitignore index a147089..0d41899 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target/ -env/ \ No newline at end of file +env/ +*.lock \ No newline at end of file diff --git a/message-pasing-channels/mpmc_channel/Cargo.toml b/message-pasing-channels/mpmc_channel/Cargo.toml new file mode 100644 index 0000000..15dbf7c --- /dev/null +++ b/message-pasing-channels/mpmc_channel/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "mpmc_channel" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +crossbeam = "0.8.4" +slog-term = "2.9.0" +slog-async = "2.8.0" +slog = { version = "2.7.0" } \ No newline at end of file diff --git a/message-pasing-channels/mpmc_channel/src/main.rs b/message-pasing-channels/mpmc_channel/src/main.rs new file mode 100644 index 0000000..abb6d9c --- /dev/null +++ b/message-pasing-channels/mpmc_channel/src/main.rs @@ -0,0 +1,54 @@ +use std::thread::{self, sleep}; +use std::time::Duration; +#[macro_use] +extern crate slog; +extern crate slog_async; +extern crate slog_term; +use slog::Drain; + +static THREAD_LENGTH: i32 = 10; +static WORKER_LENGTH: i32 = 5; + +fn main() { + // logger + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let logger = slog::Logger::root(drain, o!()); + let (sender, receiver) = crossbeam::channel::unbounded(); + let mut children = Vec::new(); + + // receivers + for n in 0..WORKER_LENGTH { + let logger_cpy = logger.clone(); + let receiver_cpy = receiver.clone(); + + children.push(thread::spawn(move || { + while let Ok(event) = receiver_cpy.recv_timeout(Duration::from_secs(5)) { + info!(logger_cpy, "Received event-{event}"); + info!(logger_cpy, "Processing event-{event}"); + sleep(Duration::from_secs(5)); + info!(logger_cpy, "Processing event-{event} successful!\n"); + } + warn!(logger_cpy, "No event emitted in last 10 seconds"); + warn!(logger_cpy, "Terminating worker {n}"); + })); + } + + // senders + for id in 0..THREAD_LENGTH { + let logger_cpy = logger.clone(); + let thread_tx = sender.clone(); + + children.push(thread::spawn(move || { + info!(logger_cpy, "Sending event-{id}..."); + thread_tx.send(id).unwrap(); + })); + } + + for child in children { + child.join().expect("oops! the child thread panicked"); + } + + info!(logger, "Finished!"); +} \ No newline at end of file diff --git a/message-pasing-channels/mpsc_channel/Cargo.toml b/message-pasing-channels/mpsc_channel/Cargo.toml new file mode 100644 index 0000000..252fde1 --- /dev/null +++ b/message-pasing-channels/mpsc_channel/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "mpsc_channel" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +slog-term = "2.9.0" +slog-async = "2.8.0" +slog = { version = "2.7.0" } \ No newline at end of file diff --git a/message-pasing-channels/mpsc_channel/src/main.rs b/message-pasing-channels/mpsc_channel/src/main.rs new file mode 100644 index 0000000..527fb3d --- /dev/null +++ b/message-pasing-channels/mpsc_channel/src/main.rs @@ -0,0 +1,48 @@ +use std::sync::mpsc; +use std::thread::{self, sleep}; +use std::time::Duration; +#[macro_use] +extern crate slog; +extern crate slog_async; +extern crate slog_term; +use slog::Drain; + +static THREAD_LENGTH: i32 = 10; + +fn main() { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let logger = slog::Logger::root(drain, o!()); + let (sender, receiver) = mpsc::channel(); + let mut children = Vec::new(); + let logger_cpy = logger.clone(); + + // receiver + children.push(thread::spawn(move || { + while let Ok(event) = receiver.recv_timeout(Duration::from_secs(5)) { + info!(logger_cpy, "Received event-{event}"); + info!(logger_cpy, "Processing event-{event}"); + sleep(Duration::from_secs(5)); + info!(logger_cpy, "Processing event-{event} successful!\n"); + } + warn!(logger_cpy, "No event emitted in last 10 seconds"); + })); + + // senders + for id in 0..THREAD_LENGTH { + let logger_cpy = logger.clone(); + let thread_tx = sender.clone(); + + children.push(thread::spawn(move || { + info!(logger_cpy, "Sending event-{id}..."); + thread_tx.send(id).unwrap(); + })); + } + + for child in children { + child.join().expect("oops! the child thread panicked"); + } + + info!(logger, "Finished!"); +} From 3a010bc5a4c6c465c70dbdd8d4a49e0b72d4ffd0 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Mon, 19 Feb 2024 16:03:48 +0530 Subject: [PATCH 2/2] chore: process factorial of the number when receiving the event in channels implementation --- message-pasing-channels/Cargo.toml | 8 +++ message-pasing-channels/README.md | 42 +++++++++++++++ .../mpmc_channel/Cargo.toml | 3 +- .../mpmc_channel/src/main.rs | 54 ++++++++++++++----- .../mpsc_channel/Cargo.toml | 3 +- .../mpsc_channel/src/main.rs | 50 ++++++++++++----- 6 files changed, 133 insertions(+), 27 deletions(-) create mode 100644 message-pasing-channels/Cargo.toml create mode 100644 message-pasing-channels/README.md diff --git a/message-pasing-channels/Cargo.toml b/message-pasing-channels/Cargo.toml new file mode 100644 index 0000000..8245d0a --- /dev/null +++ b/message-pasing-channels/Cargo.toml @@ -0,0 +1,8 @@ +[workspace.package] +authors = ["The HugoByte Team "] +edition = "2021" +repository = "https://github.com/HugoByte/PoCs.git" +version = "0.0.1" + +[workspace] +members = ["mpmc_channel", "mpsc_channel"] \ No newline at end of file diff --git a/message-pasing-channels/README.md b/message-pasing-channels/README.md new file mode 100644 index 0000000..3ffc444 --- /dev/null +++ b/message-pasing-channels/README.md @@ -0,0 +1,42 @@ +# **Message Passing Channels in Rust** + +### Description + +Implementations of message-passing channels in Rust: + +- `mpsc_channel`: Utilizes an MPSC (multiple producer, single consumer) channel from the `std::sync::mpsc` module. +- `mpmc_channel`: Employs an MPMC (multiple producer, multiple consumer) channel from the `crossbeam` crate. + +Both implementations demonstrate how to send and receive messages between multiple threads, calculate factorials concurrently, and store the results in a shared data structure. + +### Dependencies + +Both projects share the following dependencies: + +- `slog`: For logging purposes. +- `slog-term`: Provides a terminal logger. +- `slog-async`: For building `Drain` for the `slog`. +- `num`: Facilitates working with large numbers using the `BigUint` type. + +### Building and Running + +1. Clone the repository: + ```Bash + git clone https://github.com/your-username/message-passing-channels.git + ``` +2. Navigate to the project directory: + ```Bash + cd message-passing-channels + ``` +3. Build the project: + ```Bash + cargo build --all + ``` +4. Run the desired project: + ```Bash** + cargo run --bin mpsc_channel + ``` + ```Bash** + cargo run --bin mpmc_channel + ``` + diff --git a/message-pasing-channels/mpmc_channel/Cargo.toml b/message-pasing-channels/mpmc_channel/Cargo.toml index 15dbf7c..fedf443 100644 --- a/message-pasing-channels/mpmc_channel/Cargo.toml +++ b/message-pasing-channels/mpmc_channel/Cargo.toml @@ -9,4 +9,5 @@ edition = "2021" crossbeam = "0.8.4" slog-term = "2.9.0" slog-async = "2.8.0" -slog = { version = "2.7.0" } \ No newline at end of file +slog = { version = "2.7.0" } +num = "0.4.1" \ No newline at end of file diff --git a/message-pasing-channels/mpmc_channel/src/main.rs b/message-pasing-channels/mpmc_channel/src/main.rs index abb6d9c..a092b83 100644 --- a/message-pasing-channels/mpmc_channel/src/main.rs +++ b/message-pasing-channels/mpmc_channel/src/main.rs @@ -1,13 +1,17 @@ -use std::thread::{self, sleep}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::thread; use std::time::Duration; #[macro_use] extern crate slog; extern crate slog_async; extern crate slog_term; use slog::Drain; +extern crate num; +use num::BigUint; -static THREAD_LENGTH: i32 = 10; -static WORKER_LENGTH: i32 = 5; +static THREAD_LENGTH: u32 = 10; +static WORKER_LENGTH: u32 = 5; fn main() { // logger @@ -17,32 +21,34 @@ fn main() { let logger = slog::Logger::root(drain, o!()); let (sender, receiver) = crossbeam::channel::unbounded(); let mut children = Vec::new(); + let output = Arc::new(RwLock::new(HashMap::::new())); // receivers for n in 0..WORKER_LENGTH { let logger_cpy = logger.clone(); let receiver_cpy = receiver.clone(); + let output_cpy = Arc::clone(&output); children.push(thread::spawn(move || { - while let Ok(event) = receiver_cpy.recv_timeout(Duration::from_secs(5)) { - info!(logger_cpy, "Received event-{event}"); - info!(logger_cpy, "Processing event-{event}"); - sleep(Duration::from_secs(5)); - info!(logger_cpy, "Processing event-{event} successful!\n"); + while let Ok(number) = receiver_cpy.recv_timeout(Duration::from_secs(3)) { + info!(logger_cpy, "Received event-{number}"); + let mut output = output_cpy.write().expect("RwLock poisoned"); + output.insert(number, factorial(number)); + info!(logger_cpy, "Processing event-{number} successful!\n"); } - warn!(logger_cpy, "No event emitted in last 10 seconds"); + warn!(logger_cpy, "No event emitted in last 5 seconds"); warn!(logger_cpy, "Terminating worker {n}"); })); } // senders - for id in 0..THREAD_LENGTH { + for number in 0..THREAD_LENGTH { let logger_cpy = logger.clone(); let thread_tx = sender.clone(); children.push(thread::spawn(move || { - info!(logger_cpy, "Sending event-{id}..."); - thread_tx.send(id).unwrap(); + info!(logger_cpy, "Sending event-{:?}...", number); + thread_tx.send(number).unwrap(); })); } @@ -51,4 +57,26 @@ fn main() { } info!(logger, "Finished!"); -} \ No newline at end of file + info!(logger, "Factorial output: {:#?}", output.read().unwrap()); +} + +fn factorial(number: u32) -> BigUint { + let big_1 = 1u32.into(); + let big_2 = 2u32.into(); + + if number < big_2 { + big_1 + } else { + let prev_factorial = factorial(number.clone() - 1); + number * prev_factorial + } +} + +#[test] +fn test_factorial() { + assert_eq!(factorial(0), 1u32.into()); + assert_eq!(factorial(1), 1u32.into()); + assert_eq!(factorial(2), 2u32.into()); + assert_eq!(factorial(3), 6u32.into()); + assert_eq!(factorial(10), 3628800u32.into()); +} diff --git a/message-pasing-channels/mpsc_channel/Cargo.toml b/message-pasing-channels/mpsc_channel/Cargo.toml index 252fde1..f82cef1 100644 --- a/message-pasing-channels/mpsc_channel/Cargo.toml +++ b/message-pasing-channels/mpsc_channel/Cargo.toml @@ -8,4 +8,5 @@ edition = "2021" [dependencies] slog-term = "2.9.0" slog-async = "2.8.0" -slog = { version = "2.7.0" } \ No newline at end of file +slog = { version = "2.7.0" } +num = "0.4.1" diff --git a/message-pasing-channels/mpsc_channel/src/main.rs b/message-pasing-channels/mpsc_channel/src/main.rs index 527fb3d..cb32bd6 100644 --- a/message-pasing-channels/mpsc_channel/src/main.rs +++ b/message-pasing-channels/mpsc_channel/src/main.rs @@ -1,13 +1,16 @@ -use std::sync::mpsc; -use std::thread::{self, sleep}; +use std::collections::HashMap; +use std::sync::{mpsc, Arc, RwLock}; +use std::thread; use std::time::Duration; #[macro_use] extern crate slog; extern crate slog_async; extern crate slog_term; use slog::Drain; +extern crate num; +use num::BigUint; +static THREAD_LENGTH: u32 = 3; -static THREAD_LENGTH: i32 = 10; fn main() { let decorator = slog_term::TermDecorator::new().build(); @@ -17,26 +20,27 @@ fn main() { let (sender, receiver) = mpsc::channel(); let mut children = Vec::new(); let logger_cpy = logger.clone(); - + let output = Arc::new(RwLock::new(HashMap::::new())); + let output_cpy = Arc::clone(&output); // receiver children.push(thread::spawn(move || { - while let Ok(event) = receiver.recv_timeout(Duration::from_secs(5)) { - info!(logger_cpy, "Received event-{event}"); - info!(logger_cpy, "Processing event-{event}"); - sleep(Duration::from_secs(5)); - info!(logger_cpy, "Processing event-{event} successful!\n"); + while let Ok(number) = receiver.recv_timeout(Duration::from_secs(3)) { + info!(logger_cpy, "Received event-{number}"); + let mut output = output_cpy.write().expect("RwLock poisoned"); + output.insert(number, factorial(number)); + info!(logger_cpy, "Processing event-{number} successful!\n"); } warn!(logger_cpy, "No event emitted in last 10 seconds"); })); // senders - for id in 0..THREAD_LENGTH { + for number in 0..THREAD_LENGTH { let logger_cpy = logger.clone(); let thread_tx = sender.clone(); children.push(thread::spawn(move || { - info!(logger_cpy, "Sending event-{id}..."); - thread_tx.send(id).unwrap(); + info!(logger_cpy, "Sending event-{number}..."); + thread_tx.send(number).unwrap(); })); } @@ -45,4 +49,26 @@ fn main() { } info!(logger, "Finished!"); + info!(logger, "Factorial output: {:#?}", output.read().unwrap()); +} + +fn factorial(number: u32) -> BigUint { + let big_1 = 1u32.into(); + let big_2 = 2u32.into(); + + if number < big_2 { + big_1 + } else { + let prev_factorial = factorial(number.clone() - 1); + number * prev_factorial + } +} + +#[test] +fn test_factorial() { + assert_eq!(factorial(0), 1u32.into()); + assert_eq!(factorial(1), 1u32.into()); + assert_eq!(factorial(2), 2u32.into()); + assert_eq!(factorial(3), 6u32.into()); + assert_eq!(factorial(10), 3628800u32.into()); }