ixblog | ixpantia

Getting started with Async Programming in R with Promises and Rust 🦀

Written by Andrés Quintero Moreano | Sep 12, 2024 8:08:32 PM

Writing asynchronous or parallel R can be a challenge for many. Worry no more: in this guide you will go from zero to hero in writing R promises that wrap asynchronous tasks in Rust.

Note: This guide does NOT show the optimal way of doing things; it only demonstrates how to get started.

How do promises work?

A promise in R represents a value that will become available later. The promises we build in this guide wrap a routine that polls some sort of task until it is able to retrieve a value or fails. Polling is like a kid asking his dad on a road trip “Are we there yet?”. This metaphor is really good for getting the fundamentals down.

The most generic version of a promise that I could come up with is this:

run_async_task <- function() { ... } # A function that returns a pollable task
task_is_done <- function(task) { ... } # Polls the task and returns whether it's done
task_get_result <- function(task) { ... } # Returns the result of the task

generic_promise <- function() {
  # We will poll roughly every 50 ms
  POLL_INTERVAL <- 0.05
  task <- run_async_task()

  promises::promise(function(resolve, reject) {

    poll_recursive <- function() { 
      is_done <- task_is_done(task)
      if (is_done) {
        resolve(task_get_result(task))
      } else {
        later::later(poll_recursive, POLL_INTERVAL)
      }
    }
    poll_recursive()
  })

}

We should be able to fit almost any task into this template. Note that this template does not handle rejections. That might be something you want to add, but that is for you to figure out in your specific use case.

Building an async task that waits for a file

Now that we have a basic understanding of how promises work, let's create a promise that only resolves once a file is created. We will copy our template and modify it slightly.

wait_until_file_exists <- function(file_path) {
  # We will poll roughly every 50 ms
  POLL_INTERVAL <- 0.05

  promises::promise(function(resolve, reject) {

    poll_recursive <- function() { 
      is_done <- file.exists(file_path)
      if (is_done) {
        resolve(file_path)
      } else {
        later::later(poll_recursive, POLL_INTERVAL)
      }
    }
    poll_recursive()
  })

}

There we have it. A promise that only resolves once a file exists.

Using the promises::then function

OK, we have a way to create promises, so what? How can we use this to build async R? Well, we can use this promise to create other promises that run other R code once one or many promises get resolved.

In the file example we resolved the file_path. This is very intentional since we can use the path to do something with the file once it actually exists.

read_lines_once_it_exists <- function(file_path) {
  promises::then(
    wait_until_file_exists(file_path),
    onFulfilled = \(file_path) {
      readLines(file_path)
    }
  )
}

This function uses promises::then to create a new promise that reads the file once it exists. We create a function for the onFulfilled case. Since wait_until_file_exists never rejects, we do not need an onRejected callback, but that case could also be handled if we wanted to.

We can then create a promise that prints the contents of the file once it exists and has been read.

promises::then(
  read_lines_once_it_exists("test.csv"),
  onFulfilled = print
)

We should see the contents of the file once it is created!

Using the %...>% pipe

The %...>% pipe is basically just a wrapper for the promises::then function. We can rewrite our last promise to use the pipe instead of then.

promises::then(
  read_lines_once_it_exists("test.csv"),
  onFulfilled = print
)

# Is equivalent to (the pipe operator requires library(promises) to be attached)
read_lines_once_it_exists("test.csv") %...>%
  print()

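Since the promises package relies on an event loop, the callbacks passed to promises::then or the %...>% pipe do not run immediately. They are scheduled, and execution is yielded to the next task until the event loop gets around to them (this is very typical of any event loop runtime).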
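
To make the idea of "scheduling a callback and yielding" more concrete, here is a toy event loop sketched in Rust, the other language used in this guide. It is only an analogy: the EventLoop type and its schedule and run methods are hypothetical names invented for this sketch, and this is not how promises or later are implemented.

```rust
use std::collections::VecDeque;

/// Toy, single-threaded event loop (hypothetical, for illustration only):
/// a queue of callbacks plus a log that records the order things happen in.
pub struct EventLoop {
    queue: VecDeque<Box<dyn FnOnce(&mut Vec<String>)>>,
    pub log: Vec<String>,
}

impl EventLoop {
    pub fn new() -> Self {
        EventLoop {
            queue: VecDeque::new(),
            log: Vec::new(),
        }
    }

    /// Register a callback. It does NOT run now; it only joins the queue.
    pub fn schedule<F>(&mut self, callback: F)
    where
        F: FnOnce(&mut Vec<String>) + 'static,
    {
        self.queue.push_back(Box::new(callback));
    }

    /// Run the queued callbacks in the order they were scheduled.
    pub fn run(&mut self) {
        while let Some(callback) = self.queue.pop_front() {
            callback(&mut self.log);
        }
    }
}
```

If we schedule a callback that writes "callback" to the log, then write "after scheduling" ourselves, and only then call run, the log shows "after scheduling" first. Code placed after a call to then behaves in a similar way: it runs before the callback does.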

True parallel execution using Rust

That is all there is to async programming in R. However, if you want your R code to achieve actual parallelism and run tasks in separate threads, we can call on our good friends Rust and rextendr for help!

This section will assume you know some Rust.

We will start by creating a new R package called asyncFileRead. We will then initialize it using:

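rextendr::use_extendr()

Now let’s modify our lib.rs file and create our parallel task.

// Imports needed by the code in this section
use extendr_api::prelude::*;
use std::fs::File;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

struct AsyncReadTask {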
    // We need a reference counted atomic boolean
    // so we can modify it from a separate thread
    is_done: Arc<AtomicBool>,
    // This Mutex will allow us to get the contents
    // of the file into memory in a thread-safe
    // manner
    file_content: Arc<Mutex<Option<Vec<u8>>>>,
}

Now that we have our task, let’s create the implementation that will allow us to read the file from another thread, poll the task to check if it’s done, and fetch the values once we are done.

#[extendr]
impl AsyncReadTask {
    fn new(path: String) -> Self {
        let is_done = Arc::new(AtomicBool::new(false));
        let file_content = Arc::new(Mutex::new(None));
        {
            // Clone the reference-counted pointers to file_content and is_done
            let file_content = Arc::clone(&file_content);
            let is_done = Arc::clone(&is_done);
            // spawn a thread that reads the file
            std::thread::spawn(move || {
                use std::io::Read;
                let mut file = File::open(path).expect("Unable to open file");
                let mut content = Vec::new();
                file.read_to_end(&mut content).expect("Unable to read file");
                // Save the file contents into the mutex
                let mut file_content = file_content.lock().expect("Unable to lock mutex");
                *file_content = Some(content);
                // Change the is_done to true
                is_done.store(true, Ordering::SeqCst);
            });
        }
        AsyncReadTask {
            file_content,
            is_done,
        }
    }
    fn is_done(&self) -> bool {
        self.is_done.load(Ordering::SeqCst)
    }
    fn get_content(&self) -> Nullable<Vec<u8>> {
        let mut file_content = self.file_content.lock().expect("Unable to lock mutex");
        file_content.take().into()
    }
}
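
If you want to experiment with this pattern outside of an R package, the same shape can be sketched with the standard library alone. The sketch below is a hypothetical generalisation, not part of the package: PollableTask, spawn, take_result and wait_for are names made up for this example, and the blocking wait_for loop merely stands in for the polling that later::later does on the R side.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical generic task: runs any closure on a background thread
/// and lets the caller poll for completion.
pub struct PollableTask<T> {
    is_done: Arc<AtomicBool>,
    result: Arc<Mutex<Option<T>>>,
}

impl<T: Send + 'static> PollableTask<T> {
    /// Start `work` on a new thread and return immediately.
    pub fn spawn<F>(work: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        let is_done = Arc::new(AtomicBool::new(false));
        let result = Arc::new(Mutex::new(None));
        {
            let is_done = Arc::clone(&is_done);
            let result = Arc::clone(&result);
            thread::spawn(move || {
                let value = work();
                // Store the value first, then raise the flag
                *result.lock().expect("Unable to lock mutex") = Some(value);
                is_done.store(true, Ordering::SeqCst);
            });
        }
        PollableTask { is_done, result }
    }

    /// Non-blocking check, the equivalent of "Are we there yet?"
    pub fn is_done(&self) -> bool {
        self.is_done.load(Ordering::SeqCst)
    }

    /// Moves the result out; returns None if it is not ready
    /// or has already been taken.
    pub fn take_result(&self) -> Option<T> {
        self.result.lock().expect("Unable to lock mutex").take()
    }
}

/// Blocking stand-in for the R polling loop: sleep between polls.
pub fn wait_for<T: Send + 'static>(task: &PollableTask<T>, interval: Duration) -> Option<T> {
    while !task.is_done() {
        thread::sleep(interval);
    }
    task.take_result()
}
```

For example, PollableTask::spawn(|| 21 * 2) returns right away, and wait_for(&task, Duration::from_millis(50)) eventually yields Some(42). As with get_content above, the result can only be taken once.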

We now have all we need to build a promise around our asynchronous read task. We can once again use our template to build it.

read_promise_async <- function(path) {
  # We will poll roughly every 50 ms
  POLL_INTERVAL <- 0.05
  task <- AsyncReadTask$new(path)

  promises::promise(function(resolve, reject) {

    poll_recursive <- function() { 
      is_done <- task$is_done()
      if (is_done) {
        resolve(task$get_content())
      } else {
        later::later(poll_recursive, POLL_INTERVAL)
      }
    }
    poll_recursive()
  })
}

That’s it. We have a function that runs a read operation in parallel and a promise that gets fulfilled once it is done reading the file.
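
One caveat worth knowing about: in the Rust task above, a missing or unreadable file makes the background thread panic inside expect, so is_done never becomes true and the promise keeps polling forever. A possible way around this, sketched here with the standard library only, is to store a Result instead of the raw bytes. FallibleReadTask and take_outcome are hypothetical names for this sketch, and how you expose the error to R through extendr is left open.

```rust
use std::fs;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical variant of the read task that records failures
/// instead of panicking in the background thread.
pub struct FallibleReadTask {
    // None while the thread is still running; Some(Ok(bytes)) or
    // Some(Err(message)) once it has finished.
    outcome: Arc<Mutex<Option<Result<Vec<u8>, String>>>>,
}

impl FallibleReadTask {
    pub fn new(path: String) -> Self {
        let outcome = Arc::new(Mutex::new(None));
        let thread_outcome = Arc::clone(&outcome);
        thread::spawn(move || {
            // Turn the I/O error into a message rather than panicking
            let result = fs::read(&path)
                .map_err(|e| format!("Unable to read {}: {}", path, e));
            *thread_outcome.lock().expect("Unable to lock mutex") = Some(result);
        });
        FallibleReadTask { outcome }
    }

    /// True once the thread has stored an outcome (success or failure)
    /// and that outcome has not been taken yet.
    pub fn is_done(&self) -> bool {
        self.outcome.lock().expect("Unable to lock mutex").is_some()
    }

    /// Moves the outcome out; call it once, after is_done() returns true.
    pub fn take_outcome(&self) -> Option<Result<Vec<u8>, String>> {
        self.outcome.lock().expect("Unable to lock mutex").take()
    }
}
```

With something like this in place, the polling function on the R side could call reject when the outcome is an error and resolve otherwise, which covers the rejection case the template leaves out.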

Conclusion

Getting started with promises in R requires a different way of thinking about how to organize our code; it runs very differently from synchronous R code. However, once you understand the basics you can build promises that resolve different tasks and combine them to build fully async systems.