• Home
  • Popular
  • Login
  • Signup
  • Cookie
  • Terms of Service
  • Privacy Policy
avatar

Posted by John Dev


01 Dec, 2024

Updated at 14 Dec, 2024

Tokio and Rayon. Asynchrony and parallelism

Hello everyone, I’m a beginner in Rust. In my code, I launch asynchronous functions concurrently using Tokio. In each asynchronous function, I offload complex computations to the Rayon thread pool and wait for the result. Everything works fine. However, I don’t understand the key difference between rayon::scope() and oneshot::channel(). Which one is theoretically faster: creating a rayon::scope() or creating a oneshot::channel()?

To make the question clearer, I will provide two almost identical examples:

  • Using oneshot::channel()
// [dependencies] # Cargo.toml
// tokio = { version = "1.41.1", features = ["full"] }
// futures = "0.3.31"
// rayon = "1.10.0"

use std::time::{ Duration, Instant };
use tokio::sync::oneshot;
use futures::future;
use rayon;

/// Benchmark driver for the `oneshot::channel()` variant.
///
/// Spawns one Tokio task per request ID in `0..=10000`. Each task awaits the
/// simulated request, hands the CPU-bound step to Rayon's global pool via
/// `rayon::spawn`, and awaits the result on a `oneshot` channel, so the Tokio
/// worker is free to run other tasks while Rayon computes. Prints the total
/// elapsed wall-clock time at the end.
#[tokio::main]
async fn main() {
    let start = Instant::now(); // DEBUG: Start timer

    println!("Starting async tasks...");

    // Spawn asynchronous tasks for each request; the range is consumed
    // directly, so no intermediate Vec of IDs is allocated.
    let handles: Vec<_> = (0..=10000_usize)
        .map(|req_id| {
            tokio::spawn(async move {
                // Step 1: Perform asynchronous request (simulated delay)
                let request_result = async_request(req_id).await;

                // Step 2: Perform computation in the standard Rayon thread pool;
                // the oneshot channel carries the result back to this task.
                let (send, recv) = oneshot::channel();

                rayon::spawn(move || {
                    let result = complex_branching_computation(request_result);
                    // `send` fails only when the receiver has been dropped, i.e.
                    // nobody is waiting for this result any more. Discard it
                    // instead of panicking: a panic inside `rayon::spawn` is
                    // routed to Rayon's panic handler, which aborts the whole
                    // process by default.
                    let _ = send.send(result);
                });

                // Wait for the result from the Rayon thread pool. An error here
                // means the Rayon job ended without sending (it panicked).
                let computation_result = recv.await.expect("Failed to receive computation result");

                // Output the result
                println!(
                    "Computation completed for request ID: {} with result: {}",
                    req_id,
                    computation_result
                );

                computation_result
            })
        })
        .collect();

    // Wait for all tasks to complete, propagating a panic from any of them.
    // The per-task values were already printed, so they are not collected.
    for joined in future::join_all(handles).await {
        joined.expect("Task panicked");
    }

    let elapsed = start.elapsed();
    println!("Elapsed time: {:?}", elapsed); // DEBUG: Print the elapsed time
}

/// Stand-in for a real asynchronous operation (e.g. an HTTP request).
///
/// Logs the request ID, awaits a 500 ms Tokio timer, then resolves to
/// `req_id * 2` as the "processed" payload.
async fn async_request(req_id: usize) -> usize {
    use tokio::time::{ sleep, Duration };

    println!("Processing request {}", req_id);

    // Simulated network latency.
    let simulated_latency = Duration::from_millis(500);
    sleep(simulated_latency).await;

    // The fake response is simply the request ID doubled.
    let processed = req_id * 2;
    processed
}

/// Simulated CPU-bound workload whose algorithm depends on the parity of `input`.
///
/// * Even `input`: returns `input!` modulo 1_000_000_007 (`0` yields `1`).
/// * Odd `input`: returns the sum of all odd integers in `1..=input`.
///
/// Logs the input to stdout before computing. Both branches are deliberately
/// O(`input`) loops, since the function exists to generate load.
fn complex_branching_computation(input: usize) -> usize {
    const MODULUS: u64 = 1_000_000_007;

    println!("Performing branching computation for {}", input);
    if input % 2 == 0 {
        // Multiply in u64 with both factors already reduced below MODULUS
        // (< 2^30), so the product stays below 2^60 and cannot overflow. The
        // plain `usize` product overflows on 32-bit targets from 13! onward.
        let factorial = (1..=input)
            .fold(1_u64, |acc, x| (acc * (x as u64 % MODULUS)) % MODULUS);
        // Always < MODULUS, so the narrowing cast is lossless even for a 32-bit usize.
        factorial as usize
    } else {
        (1..=input).filter(|x| x % 2 == 1).sum() // Sum odd numbers for odd inputs
    }
}
  • Using rayon::scope()
// [dependencies] # Cargo.toml
// tokio = { version = "1.41.1", features = ["full"] }
// futures = "0.3.31"
// rayon = "1.10.0"

use std::time::{ Duration, Instant };
use tokio::time::sleep;
use futures::future;
use rayon;

/// Benchmark driver for the `rayon::scope()` variant.
///
/// Spawns one Tokio task per request ID in `0..=10000`. Each task awaits the
/// simulated request and then runs the CPU-bound step inside a `rayon::scope`,
/// printing the result from within the Rayon job. Prints the total elapsed
/// wall-clock time at the end.
#[tokio::main]
async fn main() {
    let start = Instant::now(); // DEBUG: Start timer

    println!("Starting async tasks...");

    // Spawn asynchronous tasks for each request; the range is consumed
    // directly, so no intermediate Vec of IDs is allocated.
    let handles: Vec<_> = (0..=10000_usize)
        .map(|req_id| {
            tokio::spawn(async move {
                // Step 1: Perform asynchronous request (simulated delay)
                let request_result = async_request(req_id).await;

                // Step 2: Perform computation using the default Rayon thread pool.
                //
                // `rayon::scope` is a synchronous call: it does not return until
                // every job spawned inside it has finished, so calling it
                // directly from async code stalls this Tokio worker thread.
                // `block_in_place` tells the runtime the current thread is about
                // to block so it can hand other tasks to a different worker.
                // It requires the multi-threaded runtime, which `#[tokio::main]`
                // uses by default.
                tokio::task::block_in_place(|| {
                    rayon::scope(|s| {
                        // Spawn a task in the Rayon thread pool
                        s.spawn(move |_| {
                            let computation_result = complex_branching_computation(request_result);
                            // The result is consumed here, inside the Rayon job.
                            println!(
                                "Computation completed for request ID: {} with result: {}",
                                req_id,
                                computation_result
                            );
                        });
                    });
                });

                // Nothing is returned: the result was handled inside the Rayon task.
            })
        })
        .collect();

    // Wait for all tasks to complete, propagating a panic from any of them.
    // The tasks yield `()`, so there is nothing to collect.
    for joined in future::join_all(handles).await {
        joined.expect("Task panicked");
    }

    let elapsed = start.elapsed();
    println!("Elapsed time: {:?}", elapsed); // DEBUG: Print the elapsed time
}

/// Stand-in for a real asynchronous operation (e.g. an HTTP request).
///
/// Logs the request ID, awaits a 500 ms timer via `sleep`, then resolves to
/// `req_id * 2` as the "processed" payload.
async fn async_request(req_id: usize) -> usize {
    println!("Processing request {}", req_id);

    // Simulated network latency.
    let simulated_latency = Duration::from_millis(500);
    sleep(simulated_latency).await;

    // The fake response is simply the request ID doubled.
    let processed = req_id * 2;
    processed
}

/// Simulated CPU-bound workload whose algorithm depends on the parity of `input`.
///
/// * Even `input`: returns `input!` modulo 1_000_000_007 (`0` yields `1`).
/// * Odd `input`: returns the sum of all odd integers in `1..=input`.
///
/// Logs the input to stdout before computing. Both branches are deliberately
/// O(`input`) loops, since the function exists to generate load.
fn complex_branching_computation(input: usize) -> usize {
    const MODULUS: u64 = 1_000_000_007;

    println!("Performing branching computation for {}", input);
    if input % 2 == 0 {
        // Multiply in u64 with both factors already reduced below MODULUS
        // (< 2^30), so the product stays below 2^60 and cannot overflow. The
        // plain `usize` product overflows on 32-bit targets from 13! onward.
        let factorial = (1..=input)
            .fold(1_u64, |acc, x| (acc * (x as u64 % MODULUS)) % MODULUS);
        // Always < MODULUS, so the narrowing cast is lossless even for a 32-bit usize.
        factorial as usize
    } else {
        (1..=input).filter(|x| x % 2 == 1).sum() // Sum odd numbers for odd inputs
    }
}

2 posts - 2 participants

Read full topic