-rw-r--r--   lib/fuse_i.h         18
-rw-r--r--   lib/fuse_loop.c       6
-rw-r--r--   lib/fuse_uring.c    548
-rw-r--r--   lib/fuse_uring_i.h   18
-rw-r--r--   lib/meson.build       8
-rw-r--r--   meson.build          26
-rw-r--r--   meson_options.txt     3
-rwxr-xr-x   test/ci-build.sh      6
8 files changed, 630 insertions, 3 deletions
diff --git a/lib/fuse_i.h b/lib/fuse_i.h
index 89a5c6f..b643e90 100644
--- a/lib/fuse_i.h
+++ b/lib/fuse_i.h
@@ -6,6 +6,9 @@
See the file COPYING.LIB
*/
+#ifndef LIB_FUSE_I_H_
+#define LIB_FUSE_I_H_
+
#include "fuse.h"
#include "fuse_lowlevel.h"
#include "util.h"
@@ -24,6 +27,7 @@
})
struct mount_opts;
+struct fuse_ring_pool;
struct fuse_req {
struct fuse_session *se;
@@ -34,6 +38,7 @@ struct fuse_req {
struct fuse_chan *ch;
int interrupted;
unsigned int ioctl_64bit : 1;
+ unsigned int is_uring : 1;
union {
struct {
uint64_t unique;
@@ -55,6 +60,11 @@ struct fuse_notify_req {
struct fuse_notify_req *prev;
};
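+/* per-session state for fuse-over-io-uring */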
+struct fuse_session_uring {
+ unsigned int q_depth;
+ struct fuse_ring_pool *pool;
+};
+
struct fuse_session {
char *mountpoint;
int fd;
@@ -79,7 +89,8 @@ struct fuse_session {
_Atomic size_t bufsize;
int error;
- /* This is useful if any kind of ABI incompatibility is found at
+ /*
+ * This is useful if any kind of ABI incompatibility is found at
* a later version, to 'fix' it at run time.
*/
struct libfuse_version version;
@@ -91,6 +102,9 @@ struct fuse_session {
/* true if reading requests from /dev/fuse are handled internally */
bool buf_reallocable;
+
+ /* io_uring */
+ struct fuse_session_uring uring;
};
struct fuse_chan {
@@ -269,3 +283,5 @@ static inline int convert_to_conn_want_ext(struct fuse_conn_info *conn,
return 0;
}
+
+#endif /* LIB_FUSE_I_H_ */
diff --git a/lib/fuse_loop.c b/lib/fuse_loop.c
index 410f43f..a27111c 100644
--- a/lib/fuse_loop.c
+++ b/lib/fuse_loop.c
@@ -11,7 +11,7 @@
#include "fuse_config.h"
#include "fuse_lowlevel.h"
#include "fuse_i.h"
-
+#include "fuse_uring_i.h"
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
@@ -41,6 +41,8 @@ int fuse_session_loop(struct fuse_session *se)
res = 0;
if(se->error != 0)
res = se->error;
- fuse_session_reset(se);
+
+ if (se->uring.pool)
+ fuse_uring_stop(se);
return res;
}
diff --git a/lib/fuse_uring.c b/lib/fuse_uring.c
new file mode 100644
index 0000000..ce56716
--- /dev/null
+++ b/lib/fuse_uring.c
@@ -0,0 +1,548 @@
+/*
+ * FUSE: Filesystem in Userspace
+ * Copyright (C) 2025 Bernd Schubert <bschubert@ddn.com>
+ *
+ * fuse-over-io-uring support: setup of the ring queues and the
+ * per-queue ring threads.
+ *
+ * This program can be distributed under the terms of the GNU LGPLv2.
+ * See the file COPYING.LIB
+ */
+
+#define _GNU_SOURCE
+
+#include "fuse_i.h"
+#include "fuse_kernel.h"
+#include "fuse_uring_i.h"
+
+#include <stdlib.h>
+#include <liburing.h>
+#include <sys/sysinfo.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <string.h>
+#include <unistd.h>
+#include <numa.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <linux/sched.h>
+#include <poll.h>
+#include <sys/eventfd.h>
+
+/* Size of command data area in SQE when IORING_SETUP_SQE128 is used */
+#define FUSE_URING_MAX_SQE128_CMD_DATA 80
+
+struct fuse_ring_ent {
+ struct fuse_ring_queue *ring_queue; /* back pointer */
+ struct fuse_req req;
+
+ struct fuse_uring_req_header *req_header;
+ void *op_payload;
+ size_t req_payload_sz;
+
+ /* commit id of a fuse request */
+ uint64_t req_commit_id;
+
+ /* header and payload */
+ struct iovec iov[2];
+};
+
+struct fuse_ring_queue {
+ /* back pointer */
+ struct fuse_ring_pool *ring_pool;
+ int qid;
+ int numa_node;
+ pthread_t tid;
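+	/*
+	 * used on teardown: a write to the eventfd completes the poll
+	 * SQE and wakes the queue thread out of its io_uring wait
+	 */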
+ int eventfd;
+ size_t req_header_sz;
+ struct io_uring ring;
+
+ /* size depends on queue depth */
+ struct fuse_ring_ent ent[];
+};
+
+/**
+ * Main fuse_ring structure, holds all fuse-ring data
+ */
+struct fuse_ring_pool {
+ struct fuse_session *se;
+
+ /* number of queues */
+ size_t nr_queues;
+
+ /* number of per queue entries */
+ size_t queue_depth;
+
+	/* max payload size for fuse requests */
+ size_t max_req_payload_sz;
+
+ /* size of a single queue */
+ size_t queue_mem_size;
+
+ /* pointer to the first queue */
+ struct fuse_ring_queue *queues;
+};
+
+static size_t
+fuse_ring_queue_size(const size_t q_depth)
+{
+ const size_t req_size = sizeof(struct fuse_ring_ent) * q_depth;
+
+ return sizeof(struct fuse_ring_queue) + req_size;
+}
+
+static struct fuse_ring_queue *
+fuse_uring_get_queue(struct fuse_ring_pool *fuse_ring, int qid)
+{
+ void *ptr =
+ ((char *)fuse_ring->queues) + (qid * fuse_ring->queue_mem_size);
+
+ return ptr;
+}
+
+/**
+ * Return a pointer to the 80-byte command data area of an SQE128 SQE
+ */
+static void *fuse_uring_get_sqe_cmd(struct io_uring_sqe *sqe)
+{
+ return (void *)&sqe->cmd[0];
+}
+
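+/*
+ * Fill the struct fuse_uring_cmd_req placed in the SQE128 command area;
+ * it carries the queue id and (for commits) the commit id to the kernel.
+ */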
+static void fuse_uring_sqe_set_req_data(struct fuse_uring_cmd_req *req,
+ const unsigned int qid,
+ const unsigned int commit_id)
+{
+ req->qid = qid;
+ req->commit_id = commit_id;
+ req->flags = 0;
+}
+
+static void
+fuse_uring_sqe_prepare(struct io_uring_sqe *sqe, struct fuse_ring_ent *req,
+ __u32 cmd_op)
+{
+ /* These fields should be written once, never change */
+ sqe->opcode = IORING_OP_URING_CMD;
+
+ /*
+ * IOSQE_FIXED_FILE: fd is the index to the fd *array*
+ * given to io_uring_register_files()
+ */
+ sqe->flags = IOSQE_FIXED_FILE;
+ sqe->fd = 0;
+
+ sqe->rw_flags = 0;
+ sqe->ioprio = 0;
+ sqe->off = 0;
+
+ io_uring_sqe_set_data(sqe, req);
+
+ sqe->cmd_op = cmd_op;
+ sqe->__pad1 = 0;
+}
+
+static int fuse_queue_setup_io_uring(struct io_uring *ring, size_t qid,
+ size_t depth, int fd, int evfd)
+{
+ int rc;
+ struct io_uring_params params = {0};
+ int files[2] = { fd, evfd };
+
+ depth += 1; /* for the eventfd poll SQE */
+
+ params.flags = IORING_SETUP_SQE128;
+
+ /* Avoid cq overflow */
+ params.flags |= IORING_SETUP_CQSIZE;
+ params.cq_entries = depth * 2;
+
+ /* These flags should help to increase performance, but actually
+	 * make it a bit slower - the reason still needs to be investigated.
+ */
+ if (0) {
+ /* Has the main slow down effect */
+ params.flags |= IORING_SETUP_SINGLE_ISSUER;
+
+ // params.flags |= IORING_SETUP_DEFER_TASKRUN;
+ params.flags |= IORING_SETUP_TASKRUN_FLAG;
+
+ /* Second main effect to make it slower */
+ params.flags |= IORING_SETUP_COOP_TASKRUN;
+ }
+
+ rc = io_uring_queue_init_params(depth, ring, &params);
+ if (rc != 0) {
+ fuse_log(FUSE_LOG_ERR, "Failed to setup qid %zu: %d (%s)\n",
+ qid, rc, strerror(-rc));
+ return rc;
+ }
+
+	rc = io_uring_register_files(ring, files, 2);
+ if (rc != 0) {
+		fuse_log(FUSE_LOG_ERR,
+			 "Failed to register files for ring idx %zu: %s\n",
+			 qid, strerror(-rc));
+ return rc;
+ }
+
+ return 0;
+}
+
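+/*
+ * Tear down all queues: wake each queue thread through its eventfd, join
+ * it, then close the ring and free the per-entry buffers.
+ */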
+static void fuse_session_destruct_uring(struct fuse_ring_pool *fuse_ring)
+{
+ for (size_t qid = 0; qid < fuse_ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue =
+ fuse_uring_get_queue(fuse_ring, qid);
+
+ if (queue->tid != 0) {
+			/* eventfd requires an 8-byte write */
+			uint64_t value = 1;
+
+ write(queue->eventfd, &value, sizeof(value));
+ pthread_join(queue->tid, NULL);
+ queue->tid = 0;
+ }
+
+ if (queue->eventfd >= 0) {
+ close(queue->eventfd);
+ queue->eventfd = -1;
+ }
+
+ if (queue->ring.ring_fd != -1)
+ io_uring_queue_exit(&queue->ring);
+
+ for (size_t idx = 0; idx < fuse_ring->queue_depth; idx++) {
+ struct fuse_ring_ent *ent = &queue->ent[idx];
+
+ numa_free(ent->op_payload, ent->req_payload_sz);
+ numa_free(ent->req_header, queue->req_header_sz);
+ }
+ }
+
+ free(fuse_ring->queues);
+ free(fuse_ring);
+}
+
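+/*
+ * Queue one FUSE_IO_URING_CMD_REGISTER SQE per ring entry so that the
+ * kernel can fill the entry's header/payload buffers with an incoming
+ * fuse request, plus one poll SQE on the eventfd for teardown wake-up.
+ */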
+static int fuse_uring_prepare_fetch_sqes(struct fuse_ring_queue *queue)
+{
+ struct fuse_ring_pool *ring_pool = queue->ring_pool;
+ unsigned int sq_ready;
+ struct io_uring_sqe *sqe;
+
+ for (size_t idx = 0; idx < ring_pool->queue_depth; idx++) {
+ struct fuse_ring_ent *ent = &queue->ent[idx];
+
+ sqe = io_uring_get_sqe(&queue->ring);
+ if (sqe == NULL) {
+ /* All SQEs are idle here - no good reason this
+ * could fail
+ */
+
+ fuse_log(FUSE_LOG_ERR, "Failed to get all ring SQEs");
+ return -EIO;
+ }
+
+ fuse_uring_sqe_prepare(sqe, ent, FUSE_IO_URING_CMD_REGISTER);
+
+ /* only needed for fetch */
+ ent->iov[0].iov_base = ent->req_header;
+ ent->iov[0].iov_len = queue->req_header_sz;
+
+ ent->iov[1].iov_base = ent->op_payload;
+ ent->iov[1].iov_len = ent->req_payload_sz;
+
+ sqe->addr = (uint64_t)(ent->iov);
+ sqe->len = 2;
+
+ /* this is a fetch, kernel does not read commit id */
+ fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe),
+ queue->qid, 0);
+ }
+
+ sq_ready = io_uring_sq_ready(&queue->ring);
+ if (sq_ready != ring_pool->queue_depth) {
+ fuse_log(FUSE_LOG_ERR,
+			 "SQE ready mismatch, expected %zu got %u\n",
+ ring_pool->queue_depth, sq_ready);
+ return -EINVAL;
+ }
+
+ // Add the poll SQE for the eventfd to wake up on teardown
+ sqe = io_uring_get_sqe(&queue->ring);
+ if (sqe == NULL) {
+ fuse_log(FUSE_LOG_ERR, "Failed to get eventfd SQE");
+ return -EIO;
+ }
+
+ io_uring_prep_poll_add(sqe, queue->eventfd, POLLIN);
+ io_uring_sqe_set_data(sqe, (void *)(uintptr_t)queue->eventfd);
+
+ io_uring_submit(&queue->ring);
+
+ return 0;
+}
+
+static struct fuse_ring_pool *fuse_create_ring(struct fuse_session *se)
+{
+ struct fuse_ring_pool *fuse_ring = NULL;
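+	/* one ring queue per configured CPU core */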
+ const size_t nr_queues = get_nprocs_conf();
+ size_t payload_sz = se->bufsize - FUSE_BUFFER_HEADER_SIZE;
+ size_t queue_sz;
+
+ if (se->debug)
+ fuse_log(FUSE_LOG_DEBUG, "starting io-uring q-depth=%d\n",
+ se->uring.q_depth);
+
+ fuse_ring = calloc(1, sizeof(*fuse_ring));
+ if (fuse_ring == NULL) {
+ fuse_log(FUSE_LOG_ERR, "Allocating the ring failed\n");
+ goto err;
+ }
+
+ queue_sz = fuse_ring_queue_size(se->uring.q_depth);
+ fuse_ring->queues = calloc(1, queue_sz * nr_queues);
+ if (fuse_ring->queues == NULL) {
+ fuse_log(FUSE_LOG_ERR, "Allocating the queues failed\n");
+ goto err;
+ }
+
+ fuse_ring->se = se;
+ fuse_ring->nr_queues = nr_queues;
+ fuse_ring->queue_depth = se->uring.q_depth;
+ fuse_ring->max_req_payload_sz = payload_sz;
+ fuse_ring->queue_mem_size = queue_sz;
+
+ /*
+	 * Very basic queue initialization that cannot fail and allows
+	 * easy cleanup if something (like mmap) fails further below.
+ */
+ for (size_t qid = 0; qid < nr_queues; qid++) {
+ struct fuse_ring_queue *queue =
+ fuse_uring_get_queue(fuse_ring, qid);
+
+		queue->ring.ring_fd = -1;
+		queue->eventfd = -1;
+ queue->numa_node = numa_node_of_cpu(qid);
+ queue->qid = qid;
+ queue->ring_pool = fuse_ring;
+ }
+
+ return fuse_ring;
+
+err:
+ if (fuse_ring)
+ fuse_session_destruct_uring(fuse_ring);
+
+ return NULL;
+}
+
+/* stub - CQE handling is not implemented yet */
+static int fuse_uring_queue_handle_cqes(struct fuse_ring_queue *queue)
+{
+ (void)queue;
+
+ return 0;
+}
+
+/**
+ * In the per-core-queue configuration there is one thread per core -
+ * bind the thread to its core.
+ */
+static void fuse_uring_set_thread_core(int qid)
+{
+ cpu_set_t mask;
+ int rc;
+
+ CPU_ZERO(&mask);
+ CPU_SET(qid, &mask);
+ rc = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
+ if (rc != 0)
+ fuse_log(FUSE_LOG_ERR, "Failed to bind qid=%d to its core: %s\n",
+ qid, strerror(errno));
+
+ if (0) {
+ const int policy = SCHED_IDLE;
+ const struct sched_param param = {
+ .sched_priority = sched_get_priority_min(policy),
+ };
+
+ /* Set the lowest possible priority, so that the application
+ * submitting requests is not moved away from the current core.
+ */
+ rc = sched_setscheduler(0, policy, &param);
+ if (rc != 0)
+ fuse_log(FUSE_LOG_ERR, "Failed to set scheduler: %s\n",
+ strerror(errno));
+ }
+}
+
+/*
+ * @return negative error code or io-uring file descriptor
+ */
+static int fuse_uring_init_queue(struct fuse_ring_queue *queue)
+{
+ struct fuse_ring_pool *ring = queue->ring_pool;
+ struct fuse_session *se = ring->se;
+ int res;
+ size_t page_sz = sysconf(_SC_PAGESIZE);
+
+ queue->eventfd = eventfd(0, EFD_CLOEXEC);
+ if (queue->eventfd < 0) {
+ res = -errno;
+ fuse_log(FUSE_LOG_ERR,
+ "Failed to create eventfd for qid %d: %s\n",
+ queue->qid, strerror(errno));
+ return res;
+ }
+
+ res = fuse_queue_setup_io_uring(&queue->ring, queue->qid,
+ ring->queue_depth, se->fd,
+ queue->eventfd);
+ if (res != 0) {
+ fuse_log(FUSE_LOG_ERR, "qid=%d io_uring init failed\n",
+ queue->qid);
+ goto err;
+ }
+
+	queue->req_header_sz = ROUND_UP(sizeof(struct fuse_uring_req_header),
+ page_sz);
+
+ for (size_t idx = 0; idx < ring->queue_depth; idx++) {
+ struct fuse_ring_ent *ring_ent = &queue->ent[idx];
+ struct fuse_req *req = &ring_ent->req;
+
+ ring_ent->ring_queue = queue;
+
+ /*
+ * Also allocate the header to have it page aligned, which
+ * is a requirement for page pinning
+ */
+ ring_ent->req_header =
+ numa_alloc_local(queue->req_header_sz);
+ ring_ent->req_payload_sz = ring->max_req_payload_sz;
+
+ ring_ent->op_payload =
+ numa_alloc_local(ring_ent->req_payload_sz);
+
+ req->se = se;
+ pthread_mutex_init(&req->lock, NULL);
+ req->is_uring = true;
+ req->ref_cnt = 1;
+ }
+
+ res = fuse_uring_prepare_fetch_sqes(queue);
+ if (res != 0) {
+ fuse_log(
+ FUSE_LOG_ERR,
+ "Grave fuse-uring error on preparing SQEs, aborting\n");
+ se->error = -EIO;
+ fuse_session_exit(se);
+ }
+
+ return queue->ring.ring_fd;
+
+err:
+	close(queue->eventfd);
+	queue->eventfd = -1;
+ return res;
+}
+
+static void *fuse_uring_thread(void *arg)
+{
+ struct fuse_ring_queue *queue = arg;
+ struct fuse_ring_pool *ring_pool = queue->ring_pool;
+ struct fuse_session *se = ring_pool->se;
+ int err;
+ char thread_name[16] = { 0 };
+
+ snprintf(thread_name, 16, "fuse-ring-%d", queue->qid);
+ thread_name[15] = '\0';
+ fuse_set_thread_name(pthread_self(), thread_name);
+
+ fuse_uring_set_thread_core(queue->qid);
+
+ err = fuse_uring_init_queue(queue);
+ if (err < 0) {
+ fuse_log(FUSE_LOG_ERR, "qid=%d queue setup failed\n",
+ queue->qid);
+ goto err;
+ }
+
+ /* Not using fuse_session_exited(se), as that cannot be inlined */
+ while (!atomic_load_explicit(&se->mt_exited, memory_order_relaxed)) {
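+		/*
+		 * blocks until at least one CQE is available - either the
+		 * completion of a registered ring entry or the eventfd
+		 * wake-up on teardown
+		 */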
+ io_uring_submit_and_wait(&queue->ring, 1);
+
+ err = fuse_uring_queue_handle_cqes(queue);
+ if (err < 0) {
+ /*
+ * fuse-over-io-uring is not supported, operation can
+ * continue over /dev/fuse
+ */
+ if (err == -EOPNOTSUPP)
+ goto ret;
+ goto err;
+ }
+ }
+
+ return NULL;
+
+err:
+ fuse_session_exit(se);
+ret:
+ return NULL;
+}
+
+static int fuse_uring_start_ring_threads(struct fuse_ring_pool *ring)
+{
+ int rc = 0;
+
+ for (size_t qid = 0; qid < ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+ rc = pthread_create(&queue->tid, NULL, fuse_uring_thread, queue);
+ if (rc != 0)
+ break;
+ }
+
+ return rc;
+}
+
+static int fuse_uring_sanity_check(void)
+{
+ _Static_assert(sizeof(struct fuse_uring_cmd_req) <=
+ FUSE_URING_MAX_SQE128_CMD_DATA,
+ "SQE128_CMD_DATA has 80B cmd data");
+
+ return 0;
+}
+
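+/*
+ * Create the ring pool and spawn one ring thread per queue; each thread
+ * sets up its own io_uring, allocates buffers and registers its entries
+ * with the kernel.
+ */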
+int fuse_uring_start(struct fuse_session *se)
+{
+ int err = 0;
+ struct fuse_ring_pool *fuse_ring;
+
+ fuse_uring_sanity_check();
+
+ fuse_ring = fuse_create_ring(se);
+ if (fuse_ring == NULL) {
+ err = -EADDRNOTAVAIL;
+ goto out;
+ }
+
+ se->uring.pool = fuse_ring;
+
+ err = fuse_uring_start_ring_threads(fuse_ring);
+out:
+ return err;
+}
+
+int fuse_uring_stop(struct fuse_session *se)
+{
+ struct fuse_ring_pool *ring = se->uring.pool;
+
+ if (ring == NULL)
+ return 0;
+
+ fuse_session_destruct_uring(ring);
+
+ return 0;
+}
diff --git a/lib/fuse_uring_i.h b/lib/fuse_uring_i.h
new file mode 100644
index 0000000..fefb8a0
--- /dev/null
+++ b/lib/fuse_uring_i.h
@@ -0,0 +1,18 @@
+/*
+ * FUSE: Filesystem in Userspace
+ * Copyright (C) 2025 Bernd Schubert <bschubert@ddn.com>
+ * This program can be distributed under the terms of the GNU LGPLv2.
+ * See the file COPYING.LIB
+ */
+
+#ifndef FUSE_URING_I_H_
+#define FUSE_URING_I_H_
+
+#include "fuse_lowlevel.h"
+
+struct fuse_in_header;
+
+int fuse_uring_start(struct fuse_session *se);
+int fuse_uring_stop(struct fuse_session *se);
+
+#endif // FUSE_URING_I_H_
diff --git a/lib/meson.build b/lib/meson.build
index 6a52d06..fcd9574 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -19,6 +19,14 @@ if private_cfg.get('HAVE_ICONV')
endif
endif
+if private_cfg.get('HAVE_URING', false)
+ libfuse_sources += [ 'fuse_uring.c' ]
+ deps += [ dependency('liburing') ]
+ deps += [ dependency('numa') ]
+endif
+
libdl = cc.find_library('dl', required: false)
if libdl.found()
deps += [ libdl ]
diff --git a/meson.build b/meson.build
index 3615a5a..b663109 100644
--- a/meson.build
+++ b/meson.build
@@ -132,6 +132,32 @@ private_cfg.set('HAVE_STRUCT_STAT_ST_ATIMESPEC',
private_cfg.set('USDT_ENABLED', get_option('enable-usdt'))
+# Check for liburing with SQE128 support
+code = '''
+#include <liburing.h>
+#include <stdio.h>
+int main(void) {
+ struct io_uring ring;
+ int ret = io_uring_queue_init(1, &ring, 0);
+#ifndef IORING_SETUP_SQE128
+#error "No SQE128 support"
+#endif
+ return ret;
+}'''
+
+liburing = get_option('enable-io-uring') ? \
+	dependency('liburing', required: false) : \
+ dependency('', required: false)
+libnuma = dependency('numa', required: false)
+
+if liburing.found() and libnuma.found()
+ if cc.links(code,
+ name: 'liburing linking and SQE128 support',
+ dependencies: [liburing])
+ private_cfg.set('HAVE_URING', true)
+ endif
+endif
+
#
# Compiler configuration
#
diff --git a/meson_options.txt b/meson_options.txt
index 957c5fc..c1f8fe6 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -24,3 +24,6 @@ option('disable-libc-symbol-version', type : 'boolean', value : false,
option('enable-usdt', type : 'boolean', value : false,
description: 'Enable user statically defined tracepoints for extra observability')
+
+option('enable-io-uring', type: 'boolean', value: true,
+ description: 'Enable fuse-over-io-uring support')
diff --git a/test/ci-build.sh b/test/ci-build.sh
index 40bb79e..86230bd 100755
--- a/test/ci-build.sh
+++ b/test/ci-build.sh
@@ -146,6 +146,12 @@ export CC=clang
export CXX=clang++
sanitized_build "-Ddisable-libc-symbol-version=true"
+# Sanitized build without fuse-io-uring
+export CC=clang
+export CXX=clang++
+sanitized_build "-Denable-io-uring=false"
+
+# Build without any sanitizer
non_sanitized_build
# Documentation.