Diffstat (limited to 'lib/fuse_uring.c')
-rw-r--r--	lib/fuse_uring.c	548
1 file changed, 548 insertions(+), 0 deletions(-)
diff --git a/lib/fuse_uring.c b/lib/fuse_uring.c
new file mode 100644
index 0000000..ce56716
--- /dev/null
+++ b/lib/fuse_uring.c
@@ -0,0 +1,548 @@
+/*
+ * FUSE: Filesystem in Userspace
+ * Copyright (C) 2025 Bernd Schubert <bschubert@ddn.com>
+ *
+ * Implementation of fuse-over-io-uring (FUSE uring) request handling.
+ *
+ * This program can be distributed under the terms of the GNU LGPLv2.
+ * See the file COPYING.LIB
+ */
+
+#define _GNU_SOURCE
+
+#include "fuse_i.h"
+#include "fuse_kernel.h"
+#include "fuse_uring_i.h"
+
+#include <stdlib.h>
+#include <liburing.h>
+#include <sys/sysinfo.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <string.h>
+#include <unistd.h>
+#include <numa.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <linux/sched.h>
+#include <poll.h>
+#include <sys/eventfd.h>
+
+/* Size of command data area in SQE when IORING_SETUP_SQE128 is used */
+#define FUSE_URING_MAX_SQE128_CMD_DATA 80
+
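+/*
+ * One ring entry, i.e. one request slot registered with the kernel. It owns
+ * page-aligned header and payload buffers that the kernel fills with an
+ * incoming fuse request; the same buffers carry the reply on commit.
+ */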
+struct fuse_ring_ent {
+ struct fuse_ring_queue *ring_queue; /* back pointer */
+ struct fuse_req req;
+
+ struct fuse_uring_req_header *req_header;
+ void *op_payload;
+ size_t req_payload_sz;
+
+ /* commit id of a fuse request */
+ uint64_t req_commit_id;
+
+ /* header and payload */
+ struct iovec iov[2];
+};
+
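+/*
+ * Per-queue state: an io_uring instance, the queue eventfd used to wake the
+ * queue thread on teardown, and the array of ring entries.
+ */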
+struct fuse_ring_queue {
+ /* back pointer */
+ struct fuse_ring_pool *ring_pool;
+ int qid;
+ int numa_node;
+ pthread_t tid;
+ int eventfd;
+ size_t req_header_sz;
+ struct io_uring ring;
+
+ /* size depends on queue depth */
+ struct fuse_ring_ent ent[];
+};
+
+/**
+ * Main fuse_ring structure, holds all fuse-ring data
+ */
+struct fuse_ring_pool {
+ struct fuse_session *se;
+
+ /* number of queues */
+ size_t nr_queues;
+
+ /* number of per queue entries */
+ size_t queue_depth;
+
+	/* max payload size for fuse requests */
+ size_t max_req_payload_sz;
+
+ /* size of a single queue */
+ size_t queue_mem_size;
+
+ /* pointer to the first queue */
+ struct fuse_ring_queue *queues;
+};
+
+static size_t
+fuse_ring_queue_size(const size_t q_depth)
+{
+ const size_t req_size = sizeof(struct fuse_ring_ent) * q_depth;
+
+ return sizeof(struct fuse_ring_queue) + req_size;
+}
+
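+/*
+ * Queues are laid out contiguously and each queue ends in a flexible array
+ * of entries, so indexing uses the per-queue memory size rather than
+ * sizeof(struct fuse_ring_queue).
+ */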
+static struct fuse_ring_queue *
+fuse_uring_get_queue(struct fuse_ring_pool *fuse_ring, int qid)
+{
+ void *ptr =
+ ((char *)fuse_ring->queues) + (qid * fuse_ring->queue_mem_size);
+
+ return ptr;
+}
+
+/**
+ * Return a pointer to the 80-byte command data area of an SQE128 SQE
+ */
+static void *fuse_uring_get_sqe_cmd(struct io_uring_sqe *sqe)
+{
+ return (void *)&sqe->cmd[0];
+}
+
+static void fuse_uring_sqe_set_req_data(struct fuse_uring_cmd_req *req,
+ const unsigned int qid,
+ const unsigned int commit_id)
+{
+ req->qid = qid;
+ req->commit_id = commit_id;
+ req->flags = 0;
+}
+
+static void
+fuse_uring_sqe_prepare(struct io_uring_sqe *sqe, struct fuse_ring_ent *req,
+ __u32 cmd_op)
+{
+ /* These fields should be written once, never change */
+ sqe->opcode = IORING_OP_URING_CMD;
+
+ /*
+ * IOSQE_FIXED_FILE: fd is the index to the fd *array*
+ * given to io_uring_register_files()
+ */
+ sqe->flags = IOSQE_FIXED_FILE;
+ sqe->fd = 0;
+
+ sqe->rw_flags = 0;
+ sqe->ioprio = 0;
+ sqe->off = 0;
+
+ io_uring_sqe_set_data(sqe, req);
+
+ sqe->cmd_op = cmd_op;
+ sqe->__pad1 = 0;
+}
+
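+/*
+ * Set up the io_uring instance of one queue. IORING_SETUP_SQE128 is needed
+ * because the fuse uring command data is passed in the extended SQE area.
+ * The /dev/fuse fd is registered as fixed file index 0, which the command
+ * SQEs reference via IOSQE_FIXED_FILE.
+ */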
+static int fuse_queue_setup_io_uring(struct io_uring *ring, size_t qid,
+ size_t depth, int fd, int evfd)
+{
+ int rc;
+ struct io_uring_params params = {0};
+ int files[2] = { fd, evfd };
+
+ depth += 1; /* for the eventfd poll SQE */
+
+ params.flags = IORING_SETUP_SQE128;
+
+ /* Avoid cq overflow */
+ params.flags |= IORING_SETUP_CQSIZE;
+ params.cq_entries = depth * 2;
+
+	/* These flags should improve performance, but in practice they make
+	 * things slightly slower; the reason still needs to be investigated.
+	 */
+ if (0) {
+ /* Has the main slow down effect */
+ params.flags |= IORING_SETUP_SINGLE_ISSUER;
+
+ // params.flags |= IORING_SETUP_DEFER_TASKRUN;
+ params.flags |= IORING_SETUP_TASKRUN_FLAG;
+
+ /* Second main effect to make it slower */
+ params.flags |= IORING_SETUP_COOP_TASKRUN;
+ }
+
+ rc = io_uring_queue_init_params(depth, ring, &params);
+ if (rc != 0) {
+ fuse_log(FUSE_LOG_ERR, "Failed to setup qid %zu: %d (%s)\n",
+ qid, rc, strerror(-rc));
+ return rc;
+ }
+
+	rc = io_uring_register_files(ring, files, 2);
+	if (rc != 0) {
+		fuse_log(FUSE_LOG_ERR,
+			 "Failed to register files for ring idx %zu: %s\n",
+			 qid, strerror(-rc));
+		return rc;
+	}
+
+ return 0;
+}
+
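+/*
+ * Tear down all queues: wake each queue thread through its eventfd and join
+ * it, then release the io_uring instance and the per-entry header and
+ * payload buffers.
+ */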
+static void fuse_session_destruct_uring(struct fuse_ring_pool *fuse_ring)
+{
+ for (size_t qid = 0; qid < fuse_ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue =
+ fuse_uring_get_queue(fuse_ring, qid);
+
+ if (queue->tid != 0) {
+			/* eventfd writes must be 8 bytes */
+			uint64_t value = 1;
+
+			write(queue->eventfd, &value, sizeof(value));
+ pthread_join(queue->tid, NULL);
+ queue->tid = 0;
+ }
+
+ if (queue->eventfd >= 0) {
+ close(queue->eventfd);
+ queue->eventfd = -1;
+ }
+
+ if (queue->ring.ring_fd != -1)
+ io_uring_queue_exit(&queue->ring);
+
+ for (size_t idx = 0; idx < fuse_ring->queue_depth; idx++) {
+ struct fuse_ring_ent *ent = &queue->ent[idx];
+
+ numa_free(ent->op_payload, ent->req_payload_sz);
+ numa_free(ent->req_header, queue->req_header_sz);
+ }
+ }
+
+ free(fuse_ring->queues);
+ free(fuse_ring);
+}
+
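+/*
+ * Queue one FUSE_IO_URING_CMD_REGISTER SQE per ring entry, handing the
+ * entry's header and payload buffers to the kernel, plus one poll SQE on
+ * the eventfd that only completes when the queue is torn down.
+ */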
+static int fuse_uring_prepare_fetch_sqes(struct fuse_ring_queue *queue)
+{
+ struct fuse_ring_pool *ring_pool = queue->ring_pool;
+ unsigned int sq_ready;
+ struct io_uring_sqe *sqe;
+
+ for (size_t idx = 0; idx < ring_pool->queue_depth; idx++) {
+ struct fuse_ring_ent *ent = &queue->ent[idx];
+
+ sqe = io_uring_get_sqe(&queue->ring);
+ if (sqe == NULL) {
+ /* All SQEs are idle here - no good reason this
+ * could fail
+ */
+
+ fuse_log(FUSE_LOG_ERR, "Failed to get all ring SQEs");
+ return -EIO;
+ }
+
+ fuse_uring_sqe_prepare(sqe, ent, FUSE_IO_URING_CMD_REGISTER);
+
+ /* only needed for fetch */
+ ent->iov[0].iov_base = ent->req_header;
+ ent->iov[0].iov_len = queue->req_header_sz;
+
+ ent->iov[1].iov_base = ent->op_payload;
+ ent->iov[1].iov_len = ent->req_payload_sz;
+
+ sqe->addr = (uint64_t)(ent->iov);
+ sqe->len = 2;
+
+ /* this is a fetch, kernel does not read commit id */
+ fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe),
+ queue->qid, 0);
+ }
+
+ sq_ready = io_uring_sq_ready(&queue->ring);
+ if (sq_ready != ring_pool->queue_depth) {
+		fuse_log(FUSE_LOG_ERR,
+			 "SQE ready mismatch, expected %zu got %u\n",
+			 ring_pool->queue_depth, sq_ready);
+ return -EINVAL;
+ }
+
+	/* Add the poll SQE for the eventfd to wake up on teardown */
+ sqe = io_uring_get_sqe(&queue->ring);
+ if (sqe == NULL) {
+ fuse_log(FUSE_LOG_ERR, "Failed to get eventfd SQE");
+ return -EIO;
+ }
+
+ io_uring_prep_poll_add(sqe, queue->eventfd, POLLIN);
+ io_uring_sqe_set_data(sqe, (void *)(uintptr_t)queue->eventfd);
+
+ io_uring_submit(&queue->ring);
+
+ return 0;
+}
+
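+/*
+ * Allocate the ring pool with one queue per CPU (get_nprocs_conf()) and do
+ * the basic initialization that cannot fail; the io_uring setup itself is
+ * done later by the per-queue threads.
+ */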
+static struct fuse_ring_pool *fuse_create_ring(struct fuse_session *se)
+{
+ struct fuse_ring_pool *fuse_ring = NULL;
+ const size_t nr_queues = get_nprocs_conf();
+ size_t payload_sz = se->bufsize - FUSE_BUFFER_HEADER_SIZE;
+ size_t queue_sz;
+
+ if (se->debug)
+ fuse_log(FUSE_LOG_DEBUG, "starting io-uring q-depth=%d\n",
+ se->uring.q_depth);
+
+ fuse_ring = calloc(1, sizeof(*fuse_ring));
+ if (fuse_ring == NULL) {
+ fuse_log(FUSE_LOG_ERR, "Allocating the ring failed\n");
+ goto err;
+ }
+
+ queue_sz = fuse_ring_queue_size(se->uring.q_depth);
+ fuse_ring->queues = calloc(1, queue_sz * nr_queues);
+ if (fuse_ring->queues == NULL) {
+ fuse_log(FUSE_LOG_ERR, "Allocating the queues failed\n");
+ goto err;
+ }
+
+ fuse_ring->se = se;
+ fuse_ring->nr_queues = nr_queues;
+ fuse_ring->queue_depth = se->uring.q_depth;
+ fuse_ring->max_req_payload_sz = payload_sz;
+ fuse_ring->queue_mem_size = queue_sz;
+
+	/*
+	 * Very basic queue initialization that cannot fail; it allows easy
+	 * cleanup if something fails later during queue setup.
+	 */
+ for (size_t qid = 0; qid < nr_queues; qid++) {
+ struct fuse_ring_queue *queue =
+ fuse_uring_get_queue(fuse_ring, qid);
+
+ queue->ring.ring_fd = -1;
+ queue->numa_node = numa_node_of_cpu(qid);
+ queue->qid = qid;
+ queue->ring_pool = fuse_ring;
+ }
+
+ return fuse_ring;
+
+err:
+ if (fuse_ring)
+ fuse_session_destruct_uring(fuse_ring);
+
+ return NULL;
+}
+
+/* stub function */
+static int fuse_uring_queue_handle_cqes(struct fuse_ring_queue *queue)
+{
+ (void)queue;
+
+ return 0;
+}
+
+/**
+ * In the per-core-queue configuration there is one thread per core; bind
+ * each thread to its core.
+ */
+static void fuse_uring_set_thread_core(int qid)
+{
+ cpu_set_t mask;
+ int rc;
+
+ CPU_ZERO(&mask);
+ CPU_SET(qid, &mask);
+ rc = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
+ if (rc != 0)
+ fuse_log(FUSE_LOG_ERR, "Failed to bind qid=%d to its core: %s\n",
+ qid, strerror(errno));
+
+ if (0) {
+ const int policy = SCHED_IDLE;
+ const struct sched_param param = {
+ .sched_priority = sched_get_priority_min(policy),
+ };
+
+ /* Set the lowest possible priority, so that the application
+ * submitting requests is not moved away from the current core.
+ */
+ rc = sched_setscheduler(0, policy, &param);
+ if (rc != 0)
+ fuse_log(FUSE_LOG_ERR, "Failed to set scheduler: %s\n",
+ strerror(errno));
+ }
+}
+
+/*
+ * @return negative error code or io-uring file descriptor
+ */
+static int fuse_uring_init_queue(struct fuse_ring_queue *queue)
+{
+ struct fuse_ring_pool *ring = queue->ring_pool;
+ struct fuse_session *se = ring->se;
+ int res;
+ size_t page_sz = sysconf(_SC_PAGESIZE);
+
+ queue->eventfd = eventfd(0, EFD_CLOEXEC);
+ if (queue->eventfd < 0) {
+ res = -errno;
+ fuse_log(FUSE_LOG_ERR,
+ "Failed to create eventfd for qid %d: %s\n",
+ queue->qid, strerror(errno));
+ return res;
+ }
+
+ res = fuse_queue_setup_io_uring(&queue->ring, queue->qid,
+ ring->queue_depth, se->fd,
+ queue->eventfd);
+ if (res != 0) {
+ fuse_log(FUSE_LOG_ERR, "qid=%d io_uring init failed\n",
+ queue->qid);
+ goto err;
+ }
+
+	queue->req_header_sz = ROUND_UP(sizeof(struct fuse_uring_req_header),
+					page_sz);
+
+ for (size_t idx = 0; idx < ring->queue_depth; idx++) {
+ struct fuse_ring_ent *ring_ent = &queue->ent[idx];
+ struct fuse_req *req = &ring_ent->req;
+
+ ring_ent->ring_queue = queue;
+
+		/*
+		 * The header is also allocated separately so that it is page
+		 * aligned, which is a requirement for page pinning
+		 */
+ ring_ent->req_header =
+ numa_alloc_local(queue->req_header_sz);
+ ring_ent->req_payload_sz = ring->max_req_payload_sz;
+
+ ring_ent->op_payload =
+ numa_alloc_local(ring_ent->req_payload_sz);
+
+ req->se = se;
+ pthread_mutex_init(&req->lock, NULL);
+ req->is_uring = true;
+ req->ref_cnt = 1;
+ }
+
+ res = fuse_uring_prepare_fetch_sqes(queue);
+ if (res != 0) {
+ fuse_log(
+ FUSE_LOG_ERR,
+ "Grave fuse-uring error on preparing SQEs, aborting\n");
+ se->error = -EIO;
+ fuse_session_exit(se);
+ }
+
+ return queue->ring.ring_fd;
+
+err:
+ close(queue->eventfd);
+ return res;
+}
+
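+/*
+ * Per-queue thread: binds itself to its CPU, initializes the queue's
+ * io_uring and registers all entries, then loops on submit-and-wait and
+ * handles CQEs until the session exits.
+ */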
+static void *fuse_uring_thread(void *arg)
+{
+ struct fuse_ring_queue *queue = arg;
+ struct fuse_ring_pool *ring_pool = queue->ring_pool;
+ struct fuse_session *se = ring_pool->se;
+ int err;
+ char thread_name[16] = { 0 };
+
+ snprintf(thread_name, 16, "fuse-ring-%d", queue->qid);
+ thread_name[15] = '\0';
+ fuse_set_thread_name(pthread_self(), thread_name);
+
+ fuse_uring_set_thread_core(queue->qid);
+
+ err = fuse_uring_init_queue(queue);
+ if (err < 0) {
+ fuse_log(FUSE_LOG_ERR, "qid=%d queue setup failed\n",
+ queue->qid);
+ goto err;
+ }
+
+ /* Not using fuse_session_exited(se), as that cannot be inlined */
+ while (!atomic_load_explicit(&se->mt_exited, memory_order_relaxed)) {
+ io_uring_submit_and_wait(&queue->ring, 1);
+
+ err = fuse_uring_queue_handle_cqes(queue);
+ if (err < 0) {
+ /*
+ * fuse-over-io-uring is not supported, operation can
+ * continue over /dev/fuse
+ */
+ if (err == -EOPNOTSUPP)
+ goto ret;
+ goto err;
+ }
+ }
+
+ return NULL;
+
+err:
+ fuse_session_exit(se);
+ret:
+ return NULL;
+}
+
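+/* Start one handler thread per queue */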
+static int fuse_uring_start_ring_threads(struct fuse_ring_pool *ring)
+{
+ int rc = 0;
+
+ for (size_t qid = 0; qid < ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+ rc = pthread_create(&queue->tid, NULL, fuse_uring_thread, queue);
+ if (rc != 0)
+ break;
+ }
+
+ return rc;
+}
+
+static int fuse_uring_sanity_check(void)
+{
+	_Static_assert(sizeof(struct fuse_uring_cmd_req) <=
+		       FUSE_URING_MAX_SQE128_CMD_DATA,
+		       "struct fuse_uring_cmd_req must fit into the 80-byte SQE128 cmd data area");
+
+ return 0;
+}
+
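+/*
+ * Enable fuse-over-io-uring for a session: create the ring pool and start
+ * the per-queue threads.
+ */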
+int fuse_uring_start(struct fuse_session *se)
+{
+ int err = 0;
+ struct fuse_ring_pool *fuse_ring;
+
+ fuse_uring_sanity_check();
+
+ fuse_ring = fuse_create_ring(se);
+ if (fuse_ring == NULL) {
+ err = -EADDRNOTAVAIL;
+ goto out;
+ }
+
+ se->uring.pool = fuse_ring;
+
+ err = fuse_uring_start_ring_threads(fuse_ring);
+out:
+ return err;
+}
+
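+/* Stop the queue threads and release all fuse-over-io-uring resources */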
+int fuse_uring_stop(struct fuse_session *se)
+{
+ struct fuse_ring_pool *ring = se->uring.pool;
+
+ if (ring == NULL)
+ return 0;
+
+ fuse_session_destruct_uring(ring);
+
+ return 0;
+}