Watching Linux Processes in Rust/Tokio async

preface

While playing around with Rust on Linux, I decided to port the [async Python process reader](https://sebsch.dev/posts/2021-watch-linux-processes-async/) to a Rust/Tokio codebase. I changed the architecture a bit to make the pattern clearer.

differences to the previous architecture

no pub/sub sync channel

Although it is often a good idea to decouple parts of the code, the producer/consumer split in the previous example did not make much sense. Since we just consume the messages and immediately dispatch the futures fire-and-forget, it only adds complexity, including a sync channel.

logging as part of the object “Process”

This time I decided to add the logic for reading the logs directly into the struct Process.

The code

The full example can be found here

Process

Each Process object holds its PID, the path to the file descriptor for stdout and the path to the executable. To be created, it takes the PID as a usize. The path to fd/1 and the target of the exe link are derived from it. This way it is enough to write let p = Process::new($pid) to create a process object.

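// Imports used by the snippets in this post.
use std::fs;
use std::os::linux::fs::MetadataExt; // provides st_uid()
use std::path::{Path, PathBuf};
use std::time::Duration;

use log::info; // assumption: the log crate; tracing's info! is used the same way
use tokio::fs::File;
use tokio::io::AsyncReadExt; // provides read_to_string()
use tokio::time::sleep;

#[derive(Debug, Clone)]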
struct Process {
    pid: usize,
    std_out_path: PathBuf,
    exe: String,
}

impl Process {
    fn new(pid: usize) -> Self {
        let std_out_path = Path::new("/proc")
            .join(format!("{}", &pid))
            .join("fd/1");

        fn get_binary_link(pid: &usize) -> String {
            let bin_link = Path::new("/proc")
                .join(format!("{}", &pid))
                .join("exe");
            match fs::read_link(bin_link) {
                Err(_) => String::from("None"),
                // Lossy conversion avoids a panic on non-UTF-8 paths.
                Ok(x) => x.to_string_lossy().into_owned()
            }
        }

        let exe: String = get_binary_link(&pid);

        Process { pid, std_out_path, exe }
    }

    async fn follow_logs(&self) {
        let mut pos: usize = 0;
        while self.std_out_path.exists() {
            sleep(Duration::from_millis(300)).await;
            let mut contents = String::new();
            let mut file = match File::open(&self.std_out_path).await {
                Ok(f) => f,
                _ => continue,
            };
            match file.read_to_string(&mut contents).await {
                Ok(_) => {}
                _ => continue,
            };
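            for (i, line) in contents.lines().enumerate() {
                // Skip the first pos lines, they were already logged.
                // (i <= pos would also drop the very first line.)
                if i < pos { continue; }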

                info!("[{}] {}: {}", self.pid, self.exe, line);
                pos += 1;
            }
        }
    }
}

The function follow_logs polls fd/1 every 300 ms and reads all of its lines into a string buffer for logging. To remember how far it has already logged, it keeps pos: usize, which is incremented for every printed log line. If something goes south while reading from fd/1, we just silently continue.
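The bookkeeping around pos can be looked at in isolation. The following is a minimal sketch, pulled out into a pure function so it can be tried without touching /proc. The name unseen_lines is hypothetical and not part of the example, and it uses skip instead of the index comparison in follow_logs.

```rust
/// Hypothetical helper: returns the lines of `contents` that have not been
/// reported yet and advances `pos` past them.
fn unseen_lines<'a>(contents: &'a str, pos: &mut usize) -> Vec<&'a str> {
    // Skip the first `pos` lines, which were already reported earlier.
    let fresh: Vec<&str> = contents.lines().skip(*pos).collect();
    // Remember how many lines have been handed out in total.
    *pos += fresh.len();
    fresh
}
```

Calling it first with "a\nb\n" and then with "a\nb\nc\n" on the same pos returns a and b on the first call and only c on the second.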

watching the processes

The function extract_processes does almost exactly the same as in the previous example. It crawls all directories in /proc, checks whether the name is a number (a PID), checks the owning user and uses the parameter known to check whether the PID has already been seen.
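Reduced to its filtering rules, the crawl can be sketched synchronously with nothing but std. The root directory is a parameter here so the rules can be tried on any directory, not only /proc. The name scan_pids and its parameters are made up for illustration; the async version used in the example follows after it.

```rust
use std::fs;
use std::io;
use std::os::unix::fs::MetadataExt;
use std::path::Path;

/// Hypothetical, synchronous stand-in for the crawl: returns the numeric
/// entry names below `root` that belong to `uid` and are not in `known`.
fn scan_pids(root: &Path, uid: u32, known: &[usize]) -> io::Result<Vec<usize>> {
    let mut pids = Vec::new();
    for entry in fs::read_dir(root)? {
        let entry = entry?;
        // Rule 1: the entry name must parse as a number (a PID).
        let name = entry.file_name();
        let pid = match name.to_str().and_then(|n| n.parse::<usize>().ok()) {
            Some(pid) => pid,
            None => continue,
        };
        // Rule 2: skip PIDs that were reported before.
        if known.contains(&pid) {
            continue;
        }
        // Rule 3: the entry must be owned by the given user. Entries that
        // vanish in the meantime are skipped rather than treated as errors.
        match entry.metadata() {
            Ok(meta) if meta.uid() == uid => pids.push(pid),
            _ => continue,
        }
    }
    // read_dir gives no ordering guarantee, so sort for a stable result.
    pids.sort_unstable();
    Ok(pids)
}
```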

async fn extract_processes(known: Vec<usize>) -> Vec<Process> {
    let mut procs = Vec::new();
    let mut dir = tokio::fs::read_dir("/proc").await.unwrap();
    let uid = fs::metadata("/proc/self").unwrap().st_uid();

    while let Some(proc_dir) = dir.next_entry().await.unwrap() {
        let pid = proc_dir.file_name().into_string().unwrap();
        let pid = match pid.parse::<usize>() {
            Ok(pid) => pid,
            _ => continue,
        };
        if known.contains(&pid) {
            continue;
        }
        match fs::metadata(proc_dir.path()) {
            Ok(m) if m.st_uid() == uid => m,
            _ => continue
        };
        procs.push(Process::new(pid))
    }
    procs
}


async fn watch_processes() {
    let mut hist: Vec<usize> = Vec::new();
    loop {
        for process in extract_processes(hist.clone()).await {
            let pid = process.pid; // usize is Copy, no clone needed
            tokio::spawn(async move {
                process.follow_logs().await;
            });
            hist.push(pid);
        }
        sleep(Duration::from_millis(5000)).await;
    }
}

The function watch_processes is slightly different from the previous example. As described above, we got rid of the pub/sub model. Everything is now handled by this one function.

We first create a vector for the history; this becomes the parameter known in the extractor. We loop forever and use the function extract_processes() to get a list of newly spawned processes. For each new one, we copy the PID and use tokio::spawn to create a concurrent async task. Inside it we just call process.follow_logs() to keep printing the logs.
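The history only has to answer one question: was this PID seen before? As a small sketch of that bookkeeping, here is a variant with a HashSet instead of a Vec, which avoids the linear contains scan on every entry. The helper take_new is hypothetical and not part of the example.

```rust
use std::collections::HashSet;

/// Hypothetical helper: returns only the PIDs that were not seen before
/// and records them in `seen`.
fn take_new(seen: &mut HashSet<usize>, current: &[usize]) -> Vec<usize> {
    current
        .iter()
        .copied()
        // `insert` returns true only if the value was not present yet,
        // so it filters and updates the history in one step.
        .filter(|pid| seen.insert(*pid))
        .collect()
}
```

Called with [1, 2, 3] and then with [2, 3, 4] on the same set, the first call yields all three PIDs and the second call yields only 4.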

conclusion

Even though it is a bit silly, the example was really fun to work on. The Python example had no problems holding all these open files and was really fast, but having everything in Rust makes the program (as expected) a lot quicker and less resource-hungry. Tokio does a fantastic job of handling the futures as if they were threads, so they can easily be distributed across several threads and cores by the OS. When the sleep calls are removed, it becomes clear that the load is (at least with my configuration) evenly distributed across all my cores.

Getting rid of the producer/consumer totally makes sense. As long as there is no error handling on the futures, there is no need to separate out the dispatcher.