diff --git a/.gitignore b/.gitignore index a147089..0d41899 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target/ -env/ \ No newline at end of file +env/ +*.lock \ No newline at end of file diff --git a/message-pasing-channels/Cargo.toml b/message-pasing-channels/Cargo.toml new file mode 100644 index 0000000..8245d0a --- /dev/null +++ b/message-pasing-channels/Cargo.toml @@ -0,0 +1,8 @@ +[workspace.package] +authors = ["The HugoByte Team "] +edition = "2021" +repository = "https://github.com/HugoByte/PoCs.git" +version = "0.0.1" + +[workspace] +members = ["mpmc_channel", "mpsc_channel"] \ No newline at end of file diff --git a/message-pasing-channels/README.md b/message-pasing-channels/README.md new file mode 100644 index 0000000..3ffc444 --- /dev/null +++ b/message-pasing-channels/README.md @@ -0,0 +1,42 @@ +# **Message Passing Channels in Rust** + +### Description + +Implementations of message-passing channels in Rust: + +- `mpsc_channel`: Utilizes an MPSC (multiple producer, single consumer) channel from the `std::sync::mpsc` module. +- `mpmc_channel`: Employs an MPMC (multiple producer, multiple consumer) channel from the `crossbeam` crate. + +Both implementations demonstrate how to send and receive messages between multiple threads, calculate factorials concurrently, and store the results in a shared data structure. + +### Dependencies + +Both projects share the following dependencies: + +- `slog`: For logging purposes. +- `slog-term`: Provides a terminal logger. +- `slog-async`: For building `Drain` for the `slog`. +- `num`: Facilitates working with large numbers using the `BigUint` type. + +### Building and Running + +1. Clone the repository: + ```Bash + git clone https://github.com/your-username/message-passing-channels.git + ``` +2. Navigate to the project directory: + ```Bash + cd message-passing-channels + ``` +3. Build the project: + ```Bash + cargo build --all + ``` +4. Run the desired project: + ```Bash** + cargo run --bin mpsc_channel + ``` + ```Bash** + cargo run --bin mpmc_channel + ``` + diff --git a/message-pasing-channels/mpmc_channel/Cargo.toml b/message-pasing-channels/mpmc_channel/Cargo.toml new file mode 100644 index 0000000..fedf443 --- /dev/null +++ b/message-pasing-channels/mpmc_channel/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "mpmc_channel" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +crossbeam = "0.8.4" +slog-term = "2.9.0" +slog-async = "2.8.0" +slog = { version = "2.7.0" } +num = "0.4.1" \ No newline at end of file diff --git a/message-pasing-channels/mpmc_channel/src/main.rs b/message-pasing-channels/mpmc_channel/src/main.rs new file mode 100644 index 0000000..a092b83 --- /dev/null +++ b/message-pasing-channels/mpmc_channel/src/main.rs @@ -0,0 +1,82 @@ +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; +#[macro_use] +extern crate slog; +extern crate slog_async; +extern crate slog_term; +use slog::Drain; +extern crate num; +use num::BigUint; + +static THREAD_LENGTH: u32 = 10; +static WORKER_LENGTH: u32 = 5; + +fn main() { + // logger + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let logger = slog::Logger::root(drain, o!()); + let (sender, receiver) = crossbeam::channel::unbounded(); + let mut children = Vec::new(); + let output = Arc::new(RwLock::new(HashMap::::new())); + + // receivers + for n in 0..WORKER_LENGTH { + let logger_cpy = logger.clone(); + let receiver_cpy = receiver.clone(); + let output_cpy = Arc::clone(&output); + + children.push(thread::spawn(move || { + while let Ok(number) = receiver_cpy.recv_timeout(Duration::from_secs(3)) { + info!(logger_cpy, "Received event-{number}"); + let mut output = output_cpy.write().expect("RwLock poisoned"); + output.insert(number, factorial(number)); + info!(logger_cpy, "Processing event-{number} successful!\n"); + } + warn!(logger_cpy, "No event emitted in last 5 seconds"); + warn!(logger_cpy, "Terminating worker {n}"); + })); + } + + // senders + for number in 0..THREAD_LENGTH { + let logger_cpy = logger.clone(); + let thread_tx = sender.clone(); + + children.push(thread::spawn(move || { + info!(logger_cpy, "Sending event-{:?}...", number); + thread_tx.send(number).unwrap(); + })); + } + + for child in children { + child.join().expect("oops! the child thread panicked"); + } + + info!(logger, "Finished!"); + info!(logger, "Factorial output: {:#?}", output.read().unwrap()); +} + +fn factorial(number: u32) -> BigUint { + let big_1 = 1u32.into(); + let big_2 = 2u32.into(); + + if number < big_2 { + big_1 + } else { + let prev_factorial = factorial(number.clone() - 1); + number * prev_factorial + } +} + +#[test] +fn test_factorial() { + assert_eq!(factorial(0), 1u32.into()); + assert_eq!(factorial(1), 1u32.into()); + assert_eq!(factorial(2), 2u32.into()); + assert_eq!(factorial(3), 6u32.into()); + assert_eq!(factorial(10), 3628800u32.into()); +} diff --git a/message-pasing-channels/mpsc_channel/Cargo.toml b/message-pasing-channels/mpsc_channel/Cargo.toml new file mode 100644 index 0000000..f82cef1 --- /dev/null +++ b/message-pasing-channels/mpsc_channel/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "mpsc_channel" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +slog-term = "2.9.0" +slog-async = "2.8.0" +slog = { version = "2.7.0" } +num = "0.4.1" diff --git a/message-pasing-channels/mpsc_channel/src/main.rs b/message-pasing-channels/mpsc_channel/src/main.rs new file mode 100644 index 0000000..cb32bd6 --- /dev/null +++ b/message-pasing-channels/mpsc_channel/src/main.rs @@ -0,0 +1,74 @@ +use std::collections::HashMap; +use std::sync::{mpsc, Arc, RwLock}; +use std::thread; +use std::time::Duration; +#[macro_use] +extern crate slog; +extern crate slog_async; +extern crate slog_term; +use slog::Drain; +extern crate num; +use num::BigUint; +static THREAD_LENGTH: u32 = 3; + + +fn main() { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let logger = slog::Logger::root(drain, o!()); + let (sender, receiver) = mpsc::channel(); + let mut children = Vec::new(); + let logger_cpy = logger.clone(); + let output = Arc::new(RwLock::new(HashMap::::new())); + let output_cpy = Arc::clone(&output); + // receiver + children.push(thread::spawn(move || { + while let Ok(number) = receiver.recv_timeout(Duration::from_secs(3)) { + info!(logger_cpy, "Received event-{number}"); + let mut output = output_cpy.write().expect("RwLock poisoned"); + output.insert(number, factorial(number)); + info!(logger_cpy, "Processing event-{number} successful!\n"); + } + warn!(logger_cpy, "No event emitted in last 10 seconds"); + })); + + // senders + for number in 0..THREAD_LENGTH { + let logger_cpy = logger.clone(); + let thread_tx = sender.clone(); + + children.push(thread::spawn(move || { + info!(logger_cpy, "Sending event-{number}..."); + thread_tx.send(number).unwrap(); + })); + } + + for child in children { + child.join().expect("oops! the child thread panicked"); + } + + info!(logger, "Finished!"); + info!(logger, "Factorial output: {:#?}", output.read().unwrap()); +} + +fn factorial(number: u32) -> BigUint { + let big_1 = 1u32.into(); + let big_2 = 2u32.into(); + + if number < big_2 { + big_1 + } else { + let prev_factorial = factorial(number.clone() - 1); + number * prev_factorial + } +} + +#[test] +fn test_factorial() { + assert_eq!(factorial(0), 1u32.into()); + assert_eq!(factorial(1), 1u32.into()); + assert_eq!(factorial(2), 2u32.into()); + assert_eq!(factorial(3), 6u32.into()); + assert_eq!(factorial(10), 3628800u32.into()); +}