Hello everyone), I’m a beginner in Rust. In my code, I launch asynchronous functions simultaneously using Tokio. In each asynchronous function, I offload complex computations to the Rayon thread pool and wait for the result. Everything works fine. However, I don’t understand the key difference between rayon::scope()
and oneshot::channel()
. Which one theoretically works faster: creating a rayon::scope()
or creating a oneshot::channel()
?
To make the question clearer, I will provide two almost identical examples:
oneshot::channel()
// [dependencies] # Cargo.toml
// tokio = { version = "1.41.1", features = ["full"] }
// futures = "0.3.31"
// rayon = "1.10.0"
use std::time::{ Duration, Instant };
use tokio::sync::oneshot;
use futures::future;
use rayon;
#[tokio::main]
async fn main() {
let start = Instant::now(); // DEBUG: Start timer
let requests: Vec<usize> = (0..=10000).collect(); // Generate requests from 0 to 10000
println!("Starting async tasks...");
// Spawn asynchronous tasks for each request
let handles: Vec<_> = requests
.into_iter()
.map(|req_id| {
tokio::spawn(async move {
// Step 1: Perform asynchronous request (simulated delay)
let request_result = async_request(req_id).await;
// Step 2: Perform computation in the standard Rayon thread pool
let (send, recv) = oneshot::channel(); // Create a oneshot channel to send the result
// Perform the complex computation using the standard Rayon pool
rayon::spawn(move || {
let result = complex_branching_computation(request_result); // Perform the computation
send.send(result).expect("Failed to send computation result"); // Send the result via the oneshot channel
});
// Wait for the result from the Rayon thread pool
let computation_result = recv.await.expect("Failed to receive computation result");
// Output the result
println!(
"Computation completed for request ID: {} with result: {}",
req_id,
computation_result
);
computation_result
})
})
.collect();
// Wait for all tasks to complete
let results: Vec<_> = future
::join_all(handles).await
.into_iter()
.map(|res| res.expect("Task panicked"))
.collect();
let elapsed = start.elapsed();
println!("Elapsed time: {:?}", elapsed); // DEBUG: Print the elapsed time
}
// Simulate an async request (e.g., an HTTP request or some other async operation)
async fn async_request(req_id: usize) -> usize {
use tokio::time::{ sleep, Duration };
println!("Processing request {}", req_id);
sleep(Duration::from_millis(500)).await; // Simulate async sleep
req_id * 2 // Return some processed data based on req_id
}
// Complex computation function, for example, factorial or sum of odd numbers
fn complex_branching_computation(input: usize) -> usize {
println!("Performing branching computation for {}", input);
if input % 2 == 0 {
(1..=input).fold(1, |acc, x| (acc * x) % 1_000_000_007) // Compute factorial modulo a large prime number for even inputs
} else {
(1..=input).filter(|x| x % 2 == 1).sum() // Sum odd numbers for odd inputs
}
}
rayon::scope()
// [dependencies] # Cargo.toml
// tokio = { version = "1.41.1", features = ["full"] }
// futures = "0.3.31"
// rayon = "1.10.0"
use std::time::{ Duration, Instant };
use tokio::time::sleep;
use futures::future;
use rayon;
#[tokio::main]
async fn main() {
let start = Instant::now(); // DEBUG: Start timer
let requests: Vec<usize> = (0..=10000).collect(); // Generate requests from 0 to 10000
println!("Starting async tasks...");
// Spawn asynchronous tasks for each request
let handles: Vec<_> = requests
.into_iter()
.map(|req_id| {
tokio::spawn(async move {
// Step 1: Perform asynchronous request (simulated delay)
let request_result = async_request(req_id).await;
// Step 2: Perform computation using the default Rayon thread pool
// Use Rayon’s scope for parallelism
rayon::scope(|s| {
// Spawn a task in the Rayon thread pool
s.spawn(move |_| {
let computation_result = complex_branching_computation(request_result);
// We can handle the result here or do something else
println!(
"Computation completed for request ID: {} with result: {}",
req_id,
computation_result
);
});
});
// Since the computation was done within Rayon scope, we don't need to return anything here.
// The result was handled inside the Rayon task.
})
})
.collect();
// Wait for all tasks to complete
let results: Vec<_> = future
::join_all(handles).await
.into_iter()
.map(|res| res.expect("Task panicked"))
.collect();
let elapsed = start.elapsed();
println!("Elapsed time: {:?}", elapsed); // DEBUG: Print the elapsed time
}
// Simulate an async request (e.g., an HTTP request or some other async operation)
async fn async_request(req_id: usize) -> usize {
println!("Processing request {}", req_id);
sleep(Duration::from_millis(500)).await; // Simulate async sleep
req_id * 2 // Return some processed data based on req_id
}
// Complex computation function, for example, factorial or sum of odd numbers
fn complex_branching_computation(input: usize) -> usize {
println!("Performing branching computation for {}", input);
if input % 2 == 0 {
(1..=input).fold(1, |acc, x| (acc * x) % 1_000_000_007) // Compute factorial modulo a large prime number for even inputs
} else {
(1..=input).filter(|x| x % 2 == 1).sum() // Sum odd numbers for odd inputs
}
}
2 posts - 2 participants