Watching Linux Processes in Async Python

preface

Lately I have been tinkering a bit with asyncio and Linux processes. Along the way I tried to combine the two by writing a program that watches all processes of the current user and logs all of their output into one shell.

Note: The code opens a lot of files and keeps logs in memory. I didn't optimize it much, so keep an eye on your memory usage if you decide to run the examples!

Reading Linux processes

Since on Unix everything is a file, processes can also be found in the file tree. To see what the process with PID 62122 is doing, we can look into the following directory:

❯ ll /proc/62122/
.r--r--r-- 0 sebastian 28 Mar 16:34 arch_status
dr-xr-xr-x - sebastian 28 Mar 16:34 attr
.rw-r--r-- 0 sebastian 28 Mar 16:34 autogroup
.r-------- 0 sebastian 28 Mar 16:34 auxv
.r--r--r-- 0 sebastian 28 Mar 16:34 cgroup
.-w------- 0 sebastian 28 Mar 16:34 clear_refs
.r--r--r-- 0 sebastian 28 Mar 16:34 cmdline
.rw-r--r-- 0 sebastian 28 Mar 16:34 comm
.rw-r--r-- 0 sebastian 28 Mar 16:34 coredump_filter
.r--r--r-- 0 sebastian 28 Mar 16:34 cpu_resctrl_groups
.r--r--r-- 0 sebastian 28 Mar 16:34 cpuset
lrwxrwxrwx 0 sebastian 28 Mar 16:34 cwd -> /home/sebastian
.r-------- 0 sebastian 28 Mar 16:34 environ
lrwxrwxrwx 0 sebastian 28 Mar 16:24 exe -> /usr/bin/kdeinit5
dr-x------ - sebastian 28 Mar 16:34 fd
...

Interesting for us are the binary behind this process and its output. To find out which binary the process belongs to, we can just resolve the exe link:

❯ readlink /proc/62122/exe
/usr/bin/kdeinit5
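
The same lookup can be done from Python with os.readlink (a minimal sketch, using the example PID from above; substitute any PID owned by you):

import os

pid = '62122'  # example PID from the listing above
exe_link = os.path.join('/proc', pid, 'exe')
print(os.readlink(exe_link))  # e.g. /usr/bin/kdeinit5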

To find the output of the process we can have a look at the file descriptors at /proc/PID/fd. The man page describes the directory as follows:

/proc/[pid]/fd/ This is a subdirectory containing one entry for each file which the process has open, named by its file descriptor, and which is a symbolic link to the actual file. Thus, 0 is standard input, 1 standard output, 2 standard error, and so on.

For file descriptors for pipes and sockets, the entries will be symbolic links whose content is the file type with the inode. A readlink(2) call on this file returns a string in the format: type:[inode]

❯ ll /proc/62122/fd/
lr-x------  64 sebastian 28 Mar 16:46 0 -> pipe:[36636]
l-wx------  64 sebastian 28 Mar 16:43 1 -> /home/sebastian/.local/share/sddm/xorg-session.log
l-wx------  64 sebastian 28 Mar 16:46 2 -> /home/sebastian/.local/share/sddm/xorg-session.log
lrwx------  64 sebastian 28 Mar 16:46 3 -> anon_inode:[eventfd]
lrwx------@ 64 sebastian 28 Mar 16:46 4 -> socket:[760078]
lr-x------  64 sebastian 28 Mar 16:46 5 -> /home/sebastian/.local/share/baloo/index
lrwx------  64 sebastian 28 Mar 16:46 6 -> /home/sebastian/.local/share/baloo/index-lock
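
In Python, the same listing can be produced by iterating over the fd directory and resolving each link (a small sketch; descriptors can disappear while we iterate, hence the OSError handling):

import os

pid = '62122'  # example PID from above
fd_dir = os.path.join('/proc', pid, 'fd')
for fd in sorted(os.listdir(fd_dir), key=int):
    try:
        target = os.readlink(os.path.join(fd_dir, fd))
    except OSError:
        continue  # the descriptor was closed in the meantime
    print(f'{fd} -> {target}')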

Since we want to have a look at the direct output of the process, we can just read /proc/PID/fd/1 and ignore the rest.

❯ sudo tail /proc/62122/fd/1
2021-03-28 16:21:22,323 [ 141147]   WARN - com.intellij.ide.IdeEventQueue - Too many mouse clicks (15)!!! 
kdeinit5: Got EXEC_NEW '/usr/lib/qt/plugins/kf5/kio/tags.so' from launcher.
kdeinit5: preparing to launch '/usr/lib/qt/plugins/kf5/kio/tags.so'
kdeinit5: Got EXEC_NEW '/usr/lib/qt/plugins/kf5/kio/tags.so' from launcher.
kdeinit5: preparing to launch '/usr/lib/qt/plugins/kf5/kio/tags.so'
2021-03-28 16:33:40,428 [ 879252]   WARN - com.intellij.util.xmlb.Binding - no accessors for org.jetbrains.idea.perforce.perforce.ConnectionId 
2021-03-28 16:43:21,181 [1460005]  ERROR - terminal.emulator.JediEmulator - Error processing OSC 1;..ocess-watcher 
2021-03-28 16:43:21,204 [1460028]  ERROR - terminal.emulator.JediEmulator - Unhandled Control sequence
parsed                        :ESC[?2004h
bytes read                    :ESC[

Tadaa! We captured the output of the kdeinit5 process with PID 62122. Now the only thing left to do is to perform these steps for every process of the current user in async Python.

The code

The full example can be found here

Async model

The basic model of this piece of software is a simple producer/consumer pipeline:

[watch processes]  -- Queue(new process) --> [operate on this process]  -- (create task) --> [log lines]
  1. we are constantly checking if we have new processes
    • if yes, we send the information to a waiting consumer via asyncio.Queue
  2. the consumer awaits processes and creates one background task for every
  3. the task constantly reads all lines from fd/1 and logs them to the current shell; if the directory disappears, the process was killed, which means the task terminates too

Collect process data

To collect the data we create a dataclass. Here we store the PID, some paths (to reduce the usage of the noisy os.path.join()) and a field for the position of the already processed log lines. There is also a method to check if the process is still alive; it simply checks whether the directory can still be found in the file tree, and if not, the process is gone.

import os
from dataclasses import dataclass
from typing import Generator


@dataclass
class Process:
    pid: str
    proc_path: str       # resolved binary behind the process
    pid_path: str        # /proc/<pid>
    fd_stdout_path: str  # /proc/<pid>/fd/1
    out_pos: int = 0     # number of log lines already processed

    def alive(self) -> bool:
        # the /proc/<pid> directory disappears as soon as the process dies
        return os.path.exists(self.pid_path)


def get_processes(known_processes: list[str]) -> Generator[Process, None, None]:
    for pid in os.listdir('/proc'):
        if (
                pid.isdigit()
                and pid not in known_processes
                and os.stat(pid_path := os.path.join('/proc', pid)).st_uid == os.getuid()
                and os.path.exists(exec_path := os.path.join('/proc', pid, 'exe'))
        ):
            yield Process(
                pid=pid,
                proc_path=os.readlink(exec_path),
                pid_path=pid_path,
                fd_stdout_path=os.path.join(pid_path, 'fd', '1'),
            )

To extract the information we just iterate through all directories in /proc, check whether the name is a digit (so it is a PID), check the owner, and check whether there is a binary bound to the process. The function also receives a list of all already known PIDs; those are skipped.

When all of these conditions are fulfilled, we create a Process object and yield it to the caller.
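
As a quick sanity check, the generator can also be run on its own, outside of the async machinery (a minimal sketch):

for proc in get_processes(known_processes=[]):
    print(proc.pid, proc.proc_path)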

Producer-Consumer

This follows the producer/consumer example from the asyncio documentation and uses an asyncio.Queue.

import asyncio


async def create_watcher_tasks(queue: asyncio.Queue[Process]):
    """consumer"""
    background_tasks = set()  # keep references so the tasks are not garbage collected
    while process := await queue.get():
        task = asyncio.create_task(stdout_logging(process))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)


async def monitor_proc(queue: asyncio.Queue[Process]):
    """producer"""
    known_processes: list[str] = []
    while True:
        for process in get_processes(known_processes):
            known_processes.append(process.pid)
            await queue.put(process)
        await asyncio.sleep(1)

monitor_proc constantly checks for new processes. If there is a new one, its PID is stored in the list known_processes and passed to get_processes to avoid duplicated work. All new processes are produced into the queue.

On the other side of the queue, the function create_watcher_tasks is consuming. Whenever a message arrives, a new background task running stdout_logging is created to operate on the stdout logs. This means every process gets its own task.
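
Not shown above is how the two coroutines are wired together. A minimal entry point could look roughly like this (a sketch; the name main is my own choice, the linked full example has its own wiring):

import asyncio
import logging


async def main():
    queue: asyncio.Queue[Process] = asyncio.Queue()
    # run producer and consumer concurrently until interrupted
    await asyncio.gather(
        monitor_proc(queue),
        create_watcher_tasks(queue),
    )


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())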

File handling / logging

import asyncio
import logging

from aiofile import AIOFile


async def open_file(filepath: str) -> list[str]:
    try:
        async with AIOFile(filepath, mode='r') as file_handler:
            lines = await file_handler.read()
            return lines.split('\n')
    except OSError:
        # the descriptor may not be readable or the process is already gone
        return []


async def stdout_logging(process: Process):
    logging.info(f"reading output for {process}")
    while process.alive():
        # skip the lines we have already logged
        lines_std = (await open_file(process.fd_stdout_path))[process.out_pos:]
        for line in lines_std:
            process.out_pos += 1  # remember the position for the next iteration
            logging.info(f"{process.proc_path}: {line}")
        await asyncio.sleep(0.3)
    logging.info(f"stop reading output for {process}")

The function stdout_logging receives a Process object and constantly checks for changes in the fd/1 file. To remember which lines have already been processed it uses the field process.out_pos, which is incremented on every log line. To get at the logs it uses the async function open_file to extract the content of the process output.

conclusion

Linux makes it really easy to capture a process's output: since everything is a file, we just have to open fd/1 to get the content of the process's stdout.

Building a simple producer/consumer model in async Python seems to be a perfect fit for working on a large number of operations like this. Even without any (really needed) optimisations the program runs stably and doesn't use too many resources.