Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
target/
env/
env/
*.lock
8 changes: 8 additions & 0 deletions message-pasing-channels/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[workspace.package]
authors = ["The HugoByte Team <hello@hugobyte.com>"]
edition = "2021"
repository = "https://github.com/HugoByte/PoCs.git"
version = "0.0.1"

[workspace]
members = ["mpmc_channel", "mpsc_channel"]
42 changes: 42 additions & 0 deletions message-pasing-channels/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# **Message Passing Channels in Rust**

### Description

Implementations of message-passing channels in Rust:

- `mpsc_channel`: Utilizes an MPSC (multiple producer, single consumer) channel from the `std::sync::mpsc` module.
- `mpmc_channel`: Employs an MPMC (multiple producer, multiple consumer) channel from the `crossbeam` crate.

Both implementations demonstrate how to send and receive messages between multiple threads, calculate factorials concurrently, and store the results in a shared data structure.

### Dependencies

Both projects share the following dependencies:

- `slog`: For logging purposes.
- `slog-term`: Provides a terminal logger.
- `slog-async`: For building `Drain` for the `slog`.
- `num`: Facilitates working with large numbers using the `BigUint` type.

### Building and Running

1. Clone the repository:
```Bash
git clone https://github.com/your-username/message-passing-channels.git
```
2. Navigate to the project directory:
```Bash
cd message-passing-channels
```
3. Build the project:
```Bash
cargo build --all
```
4. Run the desired project:
```Bash**
cargo run --bin mpsc_channel
```
```Bash**
cargo run --bin mpmc_channel
```

13 changes: 13 additions & 0 deletions message-pasing-channels/mpmc_channel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "mpmc_channel"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
crossbeam = "0.8.4"
slog-term = "2.9.0"
slog-async = "2.8.0"
slog = { version = "2.7.0" }
num = "0.4.1"
82 changes: 82 additions & 0 deletions message-pasing-channels/mpmc_channel/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_term;
use slog::Drain;
extern crate num;
use num::BigUint;

static THREAD_LENGTH: u32 = 10;
static WORKER_LENGTH: u32 = 5;

fn main() {
// logger
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let logger = slog::Logger::root(drain, o!());
let (sender, receiver) = crossbeam::channel::unbounded();
let mut children = Vec::new();
let output = Arc::new(RwLock::new(HashMap::<u32, BigUint>::new()));

// receivers
for n in 0..WORKER_LENGTH {
let logger_cpy = logger.clone();
let receiver_cpy = receiver.clone();
let output_cpy = Arc::clone(&output);

children.push(thread::spawn(move || {
while let Ok(number) = receiver_cpy.recv_timeout(Duration::from_secs(3)) {
info!(logger_cpy, "Received event-{number}");
let mut output = output_cpy.write().expect("RwLock poisoned");
output.insert(number, factorial(number));
info!(logger_cpy, "Processing event-{number} successful!\n");
}
warn!(logger_cpy, "No event emitted in last 5 seconds");
warn!(logger_cpy, "Terminating worker {n}");
}));
}

// senders
for number in 0..THREAD_LENGTH {
let logger_cpy = logger.clone();
let thread_tx = sender.clone();

children.push(thread::spawn(move || {
info!(logger_cpy, "Sending event-{:?}...", number);
thread_tx.send(number).unwrap();
}));
}

for child in children {
child.join().expect("oops! the child thread panicked");
}

info!(logger, "Finished!");
info!(logger, "Factorial output: {:#?}", output.read().unwrap());
}

fn factorial(number: u32) -> BigUint {
let big_1 = 1u32.into();
let big_2 = 2u32.into();

if number < big_2 {
big_1
} else {
let prev_factorial = factorial(number.clone() - 1);
number * prev_factorial
}
}

#[test]
fn test_factorial() {
assert_eq!(factorial(0), 1u32.into());
assert_eq!(factorial(1), 1u32.into());
assert_eq!(factorial(2), 2u32.into());
assert_eq!(factorial(3), 6u32.into());
assert_eq!(factorial(10), 3628800u32.into());
}
12 changes: 12 additions & 0 deletions message-pasing-channels/mpsc_channel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "mpsc_channel"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
slog-term = "2.9.0"
slog-async = "2.8.0"
slog = { version = "2.7.0" }
num = "0.4.1"
74 changes: 74 additions & 0 deletions message-pasing-channels/mpsc_channel/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::collections::HashMap;
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::Duration;
#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_term;
use slog::Drain;
extern crate num;
use num::BigUint;
static THREAD_LENGTH: u32 = 3;


fn main() {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let logger = slog::Logger::root(drain, o!());
let (sender, receiver) = mpsc::channel();
let mut children = Vec::new();
let logger_cpy = logger.clone();
let output = Arc::new(RwLock::new(HashMap::<u32, BigUint>::new()));
let output_cpy = Arc::clone(&output);
// receiver
children.push(thread::spawn(move || {
while let Ok(number) = receiver.recv_timeout(Duration::from_secs(3)) {
info!(logger_cpy, "Received event-{number}");
let mut output = output_cpy.write().expect("RwLock poisoned");
output.insert(number, factorial(number));
info!(logger_cpy, "Processing event-{number} successful!\n");
}
warn!(logger_cpy, "No event emitted in last 10 seconds");
}));

// senders
for number in 0..THREAD_LENGTH {
let logger_cpy = logger.clone();
let thread_tx = sender.clone();

children.push(thread::spawn(move || {
info!(logger_cpy, "Sending event-{number}...");
thread_tx.send(number).unwrap();
}));
}

for child in children {
child.join().expect("oops! the child thread panicked");
}

info!(logger, "Finished!");
info!(logger, "Factorial output: {:#?}", output.read().unwrap());
}

fn factorial(number: u32) -> BigUint {
let big_1 = 1u32.into();
let big_2 = 2u32.into();

if number < big_2 {
big_1
} else {
let prev_factorial = factorial(number.clone() - 1);
number * prev_factorial
}
}

#[test]
fn test_factorial() {
assert_eq!(factorial(0), 1u32.into());
assert_eq!(factorial(1), 1u32.into());
assert_eq!(factorial(2), 2u32.into());
assert_eq!(factorial(3), 6u32.into());
assert_eq!(factorial(10), 3628800u32.into());
}