| Rust - A low-level echo server using io_uring
Part five of creating a web-based game in Rust. Digging into io_uring.
Aug

Introduction

This is part of a series where I’m working towards building a game from scratch using Rust. By scratch, I mean using only the standard library as much as possible. So far, the only thing that has been somewhat of a cheat was creating io_uring bindings.

If you don’t want to listen to me prattle on for pages, you can find the repo for everything here.

Today’s goal

In this article we build off what we’ve already learned about io_uring and extend that to build an async echo server. This server is:

  • Single threaded. Well, mostly, as we do use a thread for a sleep hack.
  • Asynchronous, using io_uring, an async API for Linux.
  • An educational toy, not at all production quality.

With that in mind, I’d like you to come away at the end with:

  • An understanding of how io_uring queues work.
  • One way async can function at a low level (see note below).
  • An understanding of how to build an echo server on top of io_uring.

Note: There are many ways to implement async on different operating systems and with different APIs. The io_uring API is relatively new when compared to something like epoll, which is what the tokio async runtime uses.

io_uring, again

Before we dive into code, we’re going to look at the how io_uring works in more depth than we did in the last article. Remember, like its name implies, io_uring is concerned with input and output (I/O), which means that it handles things like:

  • Reading and writing from a file
  • Reading, writing, opening and closing a socket
  • Timeout operations
  • File system operations

What io_uring is made out of, at the surface level, is relatively simple. In fact, it can be defined by three primary components:

  • Entry: A task that io_uring will execute for us.
  • Submission queue (SQ): Where we submit entries.
  • Completion queue (CQ): Where we find completed entries.

Here is the diagram that we used in the last article, though with slightly modified wording:

Overview
Submission and completion queues (remade from here)

You’ll note that we have two queues that sit between user space and kernel space. We’ll talk more about these below, but for now let’s go through the three terms from above.

Entries

There are two types of entries, submission queue entries (SQEs) and completion queue entries (CQEs). Each of these is defined by different data structures, as can be seen here, listed as the structs io_uring_sqe and io_uring_cqe.

These two structs are how we add tasks to io_uring. With submission entries we modify the struct parameters to deal with different I/O operations. After the operating system’s kernel has handled a submission, it will add a completion queue entry to the completion queue. We can then get it and check the result.

We will get a closer look at how these entries are made in the upcoming sections, but for now, just know that for the most part you’ll be learning how to submit entries and how to read completed entries.

Submission queue (SQ)

Submitting an entry to the submission queue is done in shared memory between the user (application space) and the kernel. While you can create the entry in the application space and then copy into the shared space, a more efficient approach is to get a direct pointer to the existing shared memory that will hold the entry and write the data directly to it.

Overview
Writing data directly into the shared memory space is more efficient.

When we create one of these entries we do it in two steps:

  1. Use io_uring_get_sqe to get the next submission queue entry (SQE).
  2. Use a function corresponding to the type of entry we want to create.

These two steps correspond to gettng a pointer for an SQE in shared memory, and then filling that entry. To do the latter, we can do use helper functions, or prep functions, included as part of io_uring. For example, a function like io_uring_prep_accept waits for a connection request. There are many different prep functions, each matching a corresponding I/O operation, such as reading or writing to a file.

Submission queue entries can be quite complex, as shown below, but we’ll only be focusing on a few fields in this article. For example, this is part of the original C struct for submission queue entries:

 
struct io_uring_sqe {
    __u8    opcode;         /* type of operation for this sqe */
    __u8    flags;          /* IOSQE_ flags */
    __u16   ioprio;         /* ioprio for the request */
    __s32   fd;             /* file descriptor to do IO on */
   
    // Additional unions
 	__u32	len;		/* buffer size or number of iovecs */
   
    // Additional unions
    __u64	user_data;	/* data to be passed back at completion time */ 

    // Additional unions
    __u16	personality;
   
};

With all entries, you’ll also find a number of unions listed in the docs. These unions allow a uniform structure with different fields to be used for different I/O types. You don’t need to concern yourself with too many of these for now, as we’ll only be using a few in this article.

Once we’ve added our entry to the queue, we need to call io_uring_submit in order for it to start processing our entries. The reason things are not automatically processed is that it requires a syscall to start, and those can be expensive, so you may want to queue up multiple entries and then submit them all at once.

Completion queue (CQ)

The completion queue is also in shared memory and has a structure similar to that of the submission queue. An io_uring_cqe, per the liburing C source, looks like this:

 
struct io_uring_cqe {
	__u64	user_data;	/* sqe->user_data submission passed back */
	__s32	res;		/* result code for this event */
	__u32	flags;

	/*
	 * If the ring is initialized with IORING_SETUP_CQE32, then this field
	 * contains 16-bytes of padding, doubling the size of the CQE.
	 */
	__u64 big_cqe[];


    // Additional union structs
};

You’ll note this is different from the link above, as it’s missing the big_cqe, likely due to being out of date. I pulled the code snippet above from the io_uring repo, so it should be up to date.

In this struct, res is our result, and tells us if the operation succeeded or failed. As for the others, we are really only concerned with user_data, which as you will see, will become an important part of tracking entries as they are added and removed from the queues.

Creating the server

We’re now going to dive into some code, and actually build our server. I recommend that you refer to the repo as we go through, as I will only be pulling specific code snippets. If you’re like me, you’ll want to have a more bird’s eye view on things.

We’ll start in the echo_server.rs file, where we define our server as a struct with four fields:

 
pub struct EchoServer {
    ring: IoUring,
    listener: TcpListener,
    operations: HashMap<u64, OperationData>,
    next_id: u64,
}

This is relatively simple, with each of these fields doing the following:

  • ring is composed of our two queues, submission and completion.
  • listener is the initial listening socket for the server.
  • operations matches an id to information on a submitted operation.
  • next_id is a counter, aka id used in operations.

All these are initialized in the new function of our EchoServer struct:

 
pub fn new(port: u16) -> io::Result<Self> {
    let ring = IoUring::new(QUEUE_DEPTH)?;
    let listener = TcpListener::bind(("0.0.0.0", port))?;
    listener.set_nonblocking(true)?;

    Ok(Self {
        ring,
        listener,
        operations: HashMap::new(),
        next_id: 0,
    })
}

To set up the io_uring queues With our IoUring instance, we call its new function and perform the following to setup:

 
pub fn new(entries: u32) -> io::Result<Self> {
    let mut ring: io_uring = unsafe { zeroed() };
    let ret = unsafe { io_uring_queue_init(entries, &mut ring, 0) }; // This will return and -errno upon failure

    if ret < 0 {
        return Err(io::Error::from_raw_os_error(-ret));
    }
    Ok(Self { ring })
}

We create a zeroed out io_uring instance called ring, which is then passed to the io_uring_queue_init function where it’s properly initialized in shared memory. Our ring field in EchoServer is is how we’ll access both the submission queue and completion queue.

Jumping back to the EchoServer struct, the next things we have is listener, which is using TcpListener. We could create the socket ourselves via syscalls, but here we access to as_raw_fd, which lets us get at the file descriptor without much hassle. We then set this socket to be non-blocking:

 
let listener = TcpListener::bind(("127.0.0.1", port))?;
listener.set_nonblocking(true)?;

I’m not entirely sure what this non-blocking setting does, but if you check the source for TcpListener, you’ll see if mentions this for Unix:

On Unix platforms, calling this method corresponds to calling fcntl FIONBIO.

Here is a little more on what exactly this means, though I haven’t dug too much further. It could be interesting to look into how this works.

Lastly, there are two items, operations and next_id, which are key to making sure that the operations io_uring handles for us are connected to the correct users. We’re going to talk about these more in the following sections.

That gives us the basis for our server, and we can summarize what we’ve done as follows:

  1. Defined our echo server.
  2. Set up our io_uring queues.
  3. Listening to connections via tcp.

Next we want to look at the overall process of how we work with the queues, after which we’ll go into more detail on how to submit and complete operations.

The starting point

After you create the server it will first hit the run function, which contains the primary application loop:

 
    pub fn run(&mut self) -> io::Result<()> {
        self.add_accept()?;
        self.ring.submit()?;

        loop {
            match self.ring.peek_completion() {
                Some(cqe) => self.handle_completion(cqe)?,
                None => {
                    self.ring.submit()?;
                    std::thread::sleep(Duration::from_millis(1));
                }
            }
        }
    }

Before we loop, though, we add a single accept operation to the queue. This gets us starting by listening for an incoming connection. We haven’t discussed what an accept operation is, yet, but just know that without this our server will ignore all incoming connections and just loop, checking an empty completion queue.

Now, the loop itself does the following:

  • Submit the entries to io_uring for processing.
  • Peek the queue for completions.
  • Either handle completion or submit new entries.
  • Pause for 1ms to prevent excessive polling.

There are ways to modify this such that we aren’t polling the queue, but that will have to wait until we fully implement async. For now, we’ll keep it simple, though a nice challenge might be using [io_uring_queue_init_params](https://man7.org/linux/man-pages/man3/io_uring_queue_init_params.3.html] and setting the parameters so that it polls internally.

Let’s first look at what we do in peek_completion, which is part of the iouring.rs file:

 
pub fn peek_completion(&mut self) -> Option<io_uring_cqe> {
    let mut cqe: *mut io_uring_cqe = ptr::null_mut();
    let ret = unsafe { io_uring_peek_cqe(&mut self.ring, &mut cqe) };

    if ret < 0 || cqe.is_null() {
        None
    } else {
        let result = unsafe { ptr::read(cqe) };
        unsafe { io_uring_cqe_seen(&mut self.ring, cqe) };
        Some(result)
    }
}

What happens here is that we either find a completed entry and return it, or return None. To get an entry we follow these steps:

  • Get a null pointer of type completion queue entry (CQE).
  • Peek the queue to get the next entry.
  • Read that entry into space defined by the null pointer if it exists, or return None.

We then pass this entry to our handle_completion function, which we’re going to discuss in a few sections. First, though, we’ll take a look at what sort of operations we can create and how we’ll submit them to the submission queue.

Operations

We will need to submit and handle every operation we decide to support. If you look in the echo_server.rs file, you’ll find that every add function has a corresponding handle. We’ll discuss more how these work in the following section, but for now let’s look at the three operations we’ll support:

  • Accept: Accepts a new connection
  • Receive: Receive data from a connection
  • Send: Send data to a connection

There are other operations we could support, such as Close, which allows the server to disconnect a user. Instead, we’ll just keep the connection open until user disconnects. This is not ideal, but we’ll fix it later on when we build a more full-featured server.

These operations are defined by an enum and a struct:

 
enum Operation {
    Accept,
    Receive(*mut u8),
    Send(*mut u8),
}

struct OperationData {
    op: Operation,
    fd: RawFd,
}

The fd, or file descriptor, is what holds the socket, and every socket will be associated with one connection, aka a single user. In our EchoServer struct, you’ll remember that we had operations. This is the aforementioned hashmap which associates a u64 (our unique generated id) with OperationData.

 
pub struct EchoServer {
    ring: IoUring,
    listener: TcpListener,
    operations: HashMap<u64, OperationData>,
    next_id: u64,
}

The unique id we generate for each connection will be created by this function which makes use of next_id:

 
fn generate_entry_id(&mut self, op: Operation, fd: RawFd) -> u64 {
    let user_data = self.next_id;
    self.next_id = self.next_id.wrapping_add(1);
    self.operations.insert(user_data, OperationData { op, fd });
    user_data
}

When we initialize the server, next_id is set to 0 and all we’re doing here in the generate_entry_id function is incrementing it. Yes, our id is a super advanced counter that makes use of a wrapping_add. In our operations hashmap, this counter is associated with a single operation, which in turn holds the op/operation we’re doing (Accept, Read, Write) and the fd/file descriptor.

Overview
An example of an operations hashmap for with simplified values.

But, we still have a problem with our entries, and to understand what that problem is, we’ll look at the Rust version of the SQE and CQE structs:

 
// Submission
pub struct io_uring_sqe {
    pub opcode: __u8,
    pub flags: __u8,
    pub ioprio: __u16,
    pub fd: __s32,
    pub len: __u32,
    pub user_data: __u64,
    pub personality: __u16,
    
}

// Completion
pub struct io_uring_cqe {
    pub user_data: __u64,
    pub res: __s32,
    pub flags: __u32,
    pub big_cqe: __IncompleteArrayField<__u64>,
}

Remember that we’ll be handling multiple users performing concurrent operations, and each user will have their own socket, or aka file descriptor. Also keep in mind that when we add an SQE to our submission queue we lose ownership of it. The operating system takes our request and does whatever I/O operation we asked it to do, eventually giving us back a CQE in the completion queue. Given all that, how do we know that the entry we submitted for, say, user Ferris, is the same entry we pull from the completion queue?

Overview
We need a way to track which entry belongs to which user.

Entries are not automatically tagged with some sort of id, so we need a way to track these entries such that when we pull them from the completion queue we can associate them with the proper user. Thankfully, there is one field which is the same in both the SQE and CQE, and that is user_data. along with the id we already created using generate_entry_id and stored in our operations hashmap, to track users and their entries.

This is how it will work:

  1. A user connects via a socket, aka file descriptor (fd).
  2. We accept that connection.
  3. Operations are given a unique id via generate_entry_id.
  4. These operations are stored in a hashmap with id -> operation data.
  5. An SQE is made for the operation and user_data is assigned the id.
  6. When we retrieve a CQE we look at user_data to find the corresponding operation data from the hashmap.

To summarize, we have the hashmap operations in our program that is used to track ids stored in the user_data field of SQEs and CQEs. This hashmap is what allows us to link operations back to specific sockets, and thus return messages to their associated user.

Creating submissions

At this point we’ve created a server that has access to io_uring and we’ve defined how we’ll track the operations that we submit to the queue, but how do we submit these operations? Below is an example of adding a Receive operation:

 
fn add_receive(&mut self, fd: RawFd) -> io::Result<()> {
    let buffer = Box::into_raw(Box::new([0u8; BUFFER_SIZE])) as *mut u8;
    let user_data = self.generate_entry_id(Operation::Receive(buffer), fd);

    self.ring
        .create_entry()
        .set_receive(fd, buffer as *mut u8, BUFFER_SIZE, 0, user_data);

    Ok(())
}

When we do this, we’re telling io_uring that we’re expecting information to come through our file descriptor fd (the socket) and that we want it to put whatever information is received into buffer. Buffer is defined as a Box::into_raw. The into_raw function says that we don’t want Rust to manage the memory for us, making this essentially unsafe. Don’t worry, later when we pull the corresponding CQE off the completion queue we do the following in order to release the memory:

 
unsafe {
    let _ = Box::from_raw(buffer);
}    

The above code takes that same pointer and wraps it such that Rust will mark it to be cleaned up once it goes out of scope.

The last thing in the add_receive function is the creation of the entry itself:

 
fn add_receive(&mut self, fd: RawFd) -> io::Result<()> {
    
    self.ring
        .create_entry()
        .set_receive(fd, buffer as *mut u8, BUFFER_SIZE, 0, user_data);
}

Here, we first create an empty entry, and then we pass everything that is needed for a receive on to set_receive, which looks like this:

 
pub fn set_receive(
    &mut self,
    fd: RawFd,
    buf: *mut u8,
    len: usize,
    flags: i32,
    user_data: u64,
) {
    let sqe = unsafe { io_uring_get_sqe(self.ring) };
    if !sqe.is_null() {
        unsafe {
            io_uring_prep_recv(sqe, fd, buf as *mut _, len, flags);
            (*sqe).user_data = user_data;
        }
    }
}    

This is the unsafe code that is assigning our information to the shared memory which sits between the user and kernel space. What we’re doing here is:

  • Getting a pointer to a submission queue entry.
  • Using a prep function to fill that entry.
  • Writing our to user_data, aka the unique id.

For each of our operations we follow same pattern, though with different parameters. Sometimes we exclude things like the buffer in the case of using an accept. As mentioned earlier, there are many prep functions which help us create SQEs.

Handling completions

After an operation has been completed by the OS, we’ll be able to get it off the completion queue. When that happens it will be our handle_completion function which determines what we do with it:

 
fn handle_completion(&mut self, cqe: io_uring_cqe) -> io::Result<()> {
    let user_data = cqe.user_data;
    let res = cqe.res; // This indicates the succces or failure or the operation.

    if let Some(op_data) = self.operations.remove(&user_data) {
        match op_data.op {
            Operation::Accept => self.handle_accept(res)?,
            Operation::Receive(buffer) => self.handle_receive(res, buffer, op_data.fd)?,
            Operation::Send(buffer) => self.handle_send(res, buffer, op_data.fd)?,
        }
    }

    Ok(())
}   

You’ll notice that for Send and Receive we pull out the buffer pointer that we created upon submission. This gets passed to the respective handler along with the file descriptor (fd) saved in the hashmap.

Because we have three operations (Accept, Send, Receive), we have three respective handlers. In order for the echo server to work as expected, these do what their namesake suggests as well as the following:

  • Every accept queues another receive.
  • Every send will queue another receive.
  • Every receive will queue another send using the received data.

The first two points are how we keep the file descriptor open and hence the connection open. Queueing a receive on accepting the original connection is saying, “hold the file descriptor open, we’re expecting more information.” Equally, upon the server sending a message, it queues another receive so that the user can continue to send more messages. The last point is only there because this is an echo server. Whenver it receives a message it has to send that back, hence the send message.

So, what do these handlers look like? Let’s look at the code for the handle_accept function:

 
fn handle_accept(&mut self, res: i32) -> io::Result<()> {
    if res >= 0 {
        println!("Accepted new connection: {}", res);
        self.add_receive(res)?;
    } else if res == -(EAGAIN as i32) {
        println!("No new connection available");
    } else {
        eprintln!("Accept failed with error: {}", -res);
    }

    self.add_accept()
}    

Note that it takes in the result we pulled off the completion queue, and based on whether it’s successful or not, we’ll add a receive. In all cases, though, we add another accept. This is saying that we’re ready to accept more incoming connections from other users, as mentioned above. The other handlers are similar, though some will deal with the buffer, such as our handle_receive, which which looks like:

 
fn handle_receive(&mut self, res: i32, buffer: *mut u8, fd: RawFd) -> io::Result<()> {
    if res > 0 {
        let slice = unsafe { std::slice::from_raw_parts(buffer, res as usize) };
        let text = String::from_utf8_lossy(slice);
        println!("Read {} bytes: {}", res, text);

        self.add_send(fd, buffer, res as usize)?;
    } else if res == 0 {
        println!("Connection closed");
        unsafe {
            let _ = Box::from_raw(buffer);
        }
    } else {
        eprintln!("Read failed with error: {}", -res);
        unsafe {
            let _ = Box::from_raw(buffer);
        }
    }

    Ok(())
}

You’ll notice here the Box::from_raw(buffer) code that was mentioned earlier, and which is used to release the pointer upon going out of scope (ie. at the end of this function). Additionallly, you can see if the completion is successful (res > 0), we pull out the data from the buffer that io_uring read in from the file descriptor. The code that follows is just to make it readable.

Wrapping it up

A big picture view of all this is a polling loop which checks an async queue for I/O operations run by the operating system. The server itself doesn’t not block waiting for messages, though there’s a slight pause to queue or dequeue something, which is normal.

While the code is far more than what one would do when working with high-level async, it’s actually not too complex once you break it down. The flow of information is relatively straight forward. Where we go from here, though, is towards that high-level view, and to do that we need to build an async runtime that can handle our WebSocket connections.