diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/fuse_i.h | 18 | ||||
-rw-r--r-- | lib/fuse_loop.c | 6 | ||||
-rw-r--r-- | lib/fuse_uring.c | 548 | ||||
-rw-r--r-- | lib/fuse_uring_i.h | 18 | ||||
-rw-r--r-- | lib/meson.build | 8 |
5 files changed, 595 insertions, 3 deletions
diff --git a/lib/fuse_i.h b/lib/fuse_i.h index 89a5c6f..b643e90 100644 --- a/lib/fuse_i.h +++ b/lib/fuse_i.h @@ -6,6 +6,9 @@ See the file COPYING.LIB */ +#ifndef LIB_FUSE_I_H_ +#define LIB_FUSE_I_H_ + #include "fuse.h" #include "fuse_lowlevel.h" #include "util.h" @@ -24,6 +27,7 @@ }) struct mount_opts; +struct fuse_ring_pool; struct fuse_req { struct fuse_session *se; @@ -34,6 +38,7 @@ struct fuse_req { struct fuse_chan *ch; int interrupted; unsigned int ioctl_64bit : 1; + unsigned int is_uring : 1; union { struct { uint64_t unique; @@ -55,6 +60,11 @@ struct fuse_notify_req { struct fuse_notify_req *prev; }; +struct fuse_session_uring { + unsigned int q_depth; + struct fuse_ring_pool *pool; +}; + struct fuse_session { char *mountpoint; int fd; @@ -79,7 +89,8 @@ struct fuse_session { _Atomic size_t bufsize; int error; - /* This is useful if any kind of ABI incompatibility is found at + /* + * This is useful if any kind of ABI incompatibility is found at * a later version, to 'fix' it at run time. 
*/ struct libfuse_version version; @@ -91,6 +102,9 @@ struct fuse_session { /* true if reading requests from /dev/fuse are handled internally */ bool buf_reallocable; + + /* io_uring */ + struct fuse_session_uring uring; }; struct fuse_chan { @@ -269,3 +283,5 @@ static inline int convert_to_conn_want_ext(struct fuse_conn_info *conn, return 0; } + +#endif /* LIB_FUSE_I_H_*/ diff --git a/lib/fuse_loop.c b/lib/fuse_loop.c index 410f43f..a27111c 100644 --- a/lib/fuse_loop.c +++ b/lib/fuse_loop.c @@ -11,7 +11,7 @@ #include "fuse_config.h" #include "fuse_lowlevel.h" #include "fuse_i.h" - +#include "fuse_uring_i.h" #include <stdio.h> #include <stdlib.h> #include <errno.h> @@ -41,6 +41,8 @@ int fuse_session_loop(struct fuse_session *se) res = 0; if(se->error != 0) res = se->error; - fuse_session_reset(se); + + if (se->uring.pool) + fuse_uring_stop(se); return res; } diff --git a/lib/fuse_uring.c b/lib/fuse_uring.c new file mode 100644 index 0000000..ce56716 --- /dev/null +++ b/lib/fuse_uring.c @@ -0,0 +1,548 @@ +/* + * FUSE: Filesystem in Userspace + * Copyright (C) 2025 Bernd Schubert <bschubert@ddn.com> + * + * Implementation of (most of) the low-level FUSE API. The session loop + * functions are implemented in separate files. + * + * This program can be distributed under the terms of the GNU LGPLv2. 
+ * See the file COPYING.LIB
+ */
+
+#define _GNU_SOURCE
+
+#include "fuse_i.h"
+#include "fuse_kernel.h"
+#include "fuse_uring_i.h"
+
+#include <stdlib.h>
+#include <liburing.h>
+#include <sys/sysinfo.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <string.h>
+#include <unistd.h>
+#include <numa.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <linux/sched.h>
+#include <poll.h>
+#include <sys/eventfd.h>
+
+/* Size of command data area in SQE when IORING_SETUP_SQE128 is used */
+#define FUSE_URING_MAX_SQE128_CMD_DATA 80
+
+/**
+ * A single ring entry: one fuse request together with the header and
+ * payload buffers that are passed to the kernel through an iovec pair.
+ */
+struct fuse_ring_ent {
+	struct fuse_ring_queue *ring_queue; /* back pointer */
+	struct fuse_req req;
+
+	struct fuse_uring_req_header *req_header;
+	void *op_payload;
+	size_t req_payload_sz;
+
+	/* commit id of a fuse request */
+	uint64_t req_commit_id;
+
+	/* header and payload */
+	struct iovec iov[2];
+};
+
+/**
+ * Per-queue state: the io_uring instance, the thread serving it, the
+ * eventfd used to wake that thread and the trailing array of entries.
+ */
+struct fuse_ring_queue {
+	/* back pointer */
+	struct fuse_ring_pool *ring_pool;
+	int qid;
+	int numa_node;
+	pthread_t tid;
+	int eventfd;
+	size_t req_header_sz;
+	struct io_uring ring;
+
+	/* size depends on queue depth */
+	struct fuse_ring_ent ent[];
+};
+
+/**
+ * Main fuse_ring structure, holds all fuse-ring data
+ */
+struct fuse_ring_pool {
+	struct fuse_session *se;
+
+	/* number of queues */
+	size_t nr_queues;
+
+	/* number of per queue entries */
+	size_t queue_depth;
+
+	/* max payload size for fuse requests */
+	size_t max_req_payload_sz;
+
+	/* size of a single queue */
+	size_t queue_mem_size;
+
+	/* pointer to the first queue */
+	struct fuse_ring_queue *queues;
+};
+
+/**
+ * Number of bytes one queue occupies: the queue structure itself plus
+ * its flexible array of @q_depth ring entries.
+ */
+static size_t
+fuse_ring_queue_size(const size_t q_depth)
+{
+	const size_t req_size = sizeof(struct fuse_ring_ent) * q_depth;
+
+	return sizeof(struct fuse_ring_queue) + req_size;
+}
+
+/**
+ * Return the queue with index @qid.
+ *
+ * Queues have a variable size (flexible array member), so they cannot be
+ * indexed as a C array; they are addressed as byte offsets of
+ * queue_mem_size from the first queue. @qid is a size_t because every
+ * caller iterates with a size_t index.
+ */
+static struct fuse_ring_queue *
+fuse_uring_get_queue(struct fuse_ring_pool *fuse_ring, size_t qid)
+{
+	void *ptr =
+		((char *)fuse_ring->queues) + (qid * fuse_ring->queue_mem_size);
+
+	return ptr;
+}
+
+/**
+ * return a pointer to the 80B area
+ */
+static void *fuse_uring_get_sqe_cmd(struct io_uring_sqe *sqe)
+{
+	return (void *)&sqe->cmd[0];
+}
+
+/**
+ * Fill the fuse command data that travels inside the SQE.
+ *
+ * @param req        command area inside the SQE
+ * @param qid        queue the entry belongs to
+ * @param commit_id  id of the request being committed; 64 bit wide so
+ *                   that a ring entry's uint64_t req_commit_id is not
+ *                   truncated when it is passed in
+ */
+static void fuse_uring_sqe_set_req_data(struct fuse_uring_cmd_req *req,
+					const unsigned int qid,
+					const uint64_t commit_id)
+{
+	req->qid = qid;
+	req->commit_id = commit_id;
+	req->flags = 0;
+}
+
+/**
+ * Initialize an SQE as IORING_OP_URING_CMD with sub-command @cmd_op and
+ * attach the ring entry @req as user data of the SQE.
+ */
+static void
+fuse_uring_sqe_prepare(struct io_uring_sqe *sqe, struct fuse_ring_ent *req,
+		       __u32 cmd_op)
+{
+	/* These fields should be written once, never change */
+	sqe->opcode = IORING_OP_URING_CMD;
+
+	/*
+	 * IOSQE_FIXED_FILE: fd is the index to the fd *array*
+	 * given to io_uring_register_files()
+	 */
+	sqe->flags = IOSQE_FIXED_FILE;
+	sqe->fd = 0;
+
+	sqe->rw_flags = 0;
+	sqe->ioprio = 0;
+	sqe->off = 0;
+
+	io_uring_sqe_set_data(sqe, req);
+
+	sqe->cmd_op = cmd_op;
+	sqe->__pad1 = 0;
+}
+
+/**
+ * Create the io_uring of queue @qid.
+ *
+ * @param ring   ring to initialize
+ * @param qid    queue index, only used for log messages
+ * @param depth  number of fuse entries; one more SQE is reserved for the
+ *               eventfd poll
+ * @param fd     fuse device file descriptor
+ * @param evfd   eventfd of the queue
+ * @return 0 on success, a negative error code otherwise
+ */
+static int fuse_queue_setup_io_uring(struct io_uring *ring, size_t qid,
+				     size_t depth, int fd, int evfd)
+{
+	int rc;
+	struct io_uring_params params = {0};
+	int files[2] = { fd, evfd };
+
+	depth += 1; /* for the eventfd poll SQE */
+
+	params.flags = IORING_SETUP_SQE128;
+
+	/* Avoid cq overflow */
+	params.flags |= IORING_SETUP_CQSIZE;
+	params.cq_entries = depth * 2;
+
+	/* These flags should help to increase performance, but actually
+	 * make it a bit slower - reason should get investigated.
+	 */
+	if (0) {
+		/* Has the main slow down effect */
+		params.flags |= IORING_SETUP_SINGLE_ISSUER;
+
+		// params.flags |= IORING_SETUP_DEFER_TASKRUN;
+		params.flags |= IORING_SETUP_TASKRUN_FLAG;
+
+		/* Second main effect to make it slower */
+		params.flags |= IORING_SETUP_COOP_TASKRUN;
+	}
+
+	rc = io_uring_queue_init_params(depth, ring, &params);
+	if (rc != 0) {
+		fuse_log(FUSE_LOG_ERR, "Failed to setup qid %zu: %d (%s)\n",
+			 qid, rc, strerror(-rc));
+		return rc;
+	}
+
+	/*
+	 * Only the first entry of files[] is registered. liburing reports
+	 * failures as a negative errno return value and does not set errno,
+	 * so the error has to be taken from rc.
+	 */
+	rc = io_uring_register_files(ring, files, 1);
+	if (rc != 0) {
+		fuse_log(FUSE_LOG_ERR,
+			 "Failed to register files for ring idx %zu: %s\n",
+			 qid, strerror(-rc));
+		return rc;
+	}
+
+	return 0;
+}
+
+/**
+ * Tear down all queues of @fuse_ring and free the pool.
+ *
+ * For every queue: wake up and join the queue thread (if one was
+ * started), close the eventfd, release the io_uring and free the
+ * per-entry buffers. @fuse_ring must not be used afterwards.
+ */
+static void fuse_session_destruct_uring(struct fuse_ring_pool *fuse_ring)
+{
+	for (size_t qid = 0; qid < fuse_ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue =
+			fuse_uring_get_queue(fuse_ring, qid);
+
+		if (queue->tid != 0) {
+			/*
+			 * An eventfd only accepts writes of 8 bytes; a
+			 * shorter buffer fails with EINVAL and the thread
+			 * would never be woken up.
+			 */
+			uint64_t value = 1;
+
+			if (write(queue->eventfd, &value, sizeof(value)) !=
+			    (ssize_t)sizeof(value))
+				fuse_log(FUSE_LOG_ERR,
+					 "qid=%d failed to wake ring thread: %s\n",
+					 queue->qid, strerror(errno));
+			pthread_join(queue->tid, NULL);
+			queue->tid = 0;
+		}
+
+		if (queue->eventfd >= 0) {
+			close(queue->eventfd);
+			queue->eventfd = -1;
+		}
+
+		if (queue->ring.ring_fd != -1) {
+			io_uring_queue_exit(&queue->ring);
+			queue->ring.ring_fd = -1;
+		}
+
+		for (size_t idx = 0; idx < fuse_ring->queue_depth; idx++) {
+			struct fuse_ring_ent *ent = &queue->ent[idx];
+
+			/* buffers are missing if queue setup never ran */
+			if (ent->op_payload != NULL)
+				numa_free(ent->op_payload, ent->req_payload_sz);
+			if (ent->req_header != NULL)
+				numa_free(ent->req_header, queue->req_header_sz);
+		}
+	}
+
+	free(fuse_ring->queues);
+	free(fuse_ring);
+}
+
+/**
+ * Queue one FUSE_IO_URING_CMD_REGISTER SQE per ring entry plus a poll
+ * SQE on the queue's eventfd and submit them.
+ *
+ * @return 0 on success, a negative error code otherwise
+ */
+static int fuse_uring_prepare_fetch_sqes(struct fuse_ring_queue *queue)
+{
+	struct fuse_ring_pool *ring_pool = queue->ring_pool;
+	unsigned int sq_ready;
+	struct io_uring_sqe *sqe;
+
+	for (size_t idx = 0; idx < ring_pool->queue_depth; idx++) {
+		struct fuse_ring_ent *ent = &queue->ent[idx];
+
+		sqe = io_uring_get_sqe(&queue->ring);
+		if (sqe == NULL) {
+			/* All SQEs are idle here - no good reason this
+			 * could fail
+			 */
+
+			fuse_log(FUSE_LOG_ERR, "Failed to get all ring SQEs");
+			return -EIO;
+		}
+
+ 
fuse_uring_sqe_prepare(sqe, ent, FUSE_IO_URING_CMD_REGISTER);
+
+		/* only needed for fetch */
+		ent->iov[0].iov_base = ent->req_header;
+		ent->iov[0].iov_len = queue->req_header_sz;
+
+		ent->iov[1].iov_base = ent->op_payload;
+		ent->iov[1].iov_len = ent->req_payload_sz;
+
+		/* go through uintptr_t, pointers may be narrower than 64 bit */
+		sqe->addr = (uint64_t)(uintptr_t)(ent->iov);
+		sqe->len = 2;
+
+		/* this is a fetch, kernel does not read commit id */
+		fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe),
+					    queue->qid, 0);
+	}
+
+	sq_ready = io_uring_sq_ready(&queue->ring);
+	if (sq_ready != ring_pool->queue_depth) {
+		/* queue_depth is a size_t, sq_ready an unsigned int */
+		fuse_log(FUSE_LOG_ERR,
+			 "SQE ready mismatch, expected %zu got %u\n",
+			 ring_pool->queue_depth, sq_ready);
+		return -EINVAL;
+	}
+
+	// Add the poll SQE for the eventfd to wake up on teardown
+	sqe = io_uring_get_sqe(&queue->ring);
+	if (sqe == NULL) {
+		fuse_log(FUSE_LOG_ERR, "Failed to get eventfd SQE");
+		return -EIO;
+	}
+
+	io_uring_prep_poll_add(sqe, queue->eventfd, POLLIN);
+	io_uring_sqe_set_data(sqe, (void *)(uintptr_t)queue->eventfd);
+
+	/* io_uring_submit() returns the number of SQEs or a negative errno */
+	int submitted = io_uring_submit(&queue->ring);
+
+	if (submitted < 0) {
+		fuse_log(FUSE_LOG_ERR, "qid=%d failed to submit SQEs: %s\n",
+			 queue->qid, strerror(-submitted));
+		return submitted;
+	}
+
+	return 0;
+}
+
+/**
+ * Allocate the ring pool of session @se with one queue per configured
+ * CPU and se->uring.q_depth entries per queue.
+ *
+ * Only memory is allocated and the basic queue fields are set here; the
+ * io_uring instances and buffers are not created by this function.
+ *
+ * @return the new pool or NULL on allocation failure
+ */
+static struct fuse_ring_pool *fuse_create_ring(struct fuse_session *se)
+{
+	struct fuse_ring_pool *fuse_ring = NULL;
+	const size_t nr_queues = get_nprocs_conf();
+	size_t payload_sz = se->bufsize - FUSE_BUFFER_HEADER_SIZE;
+	size_t queue_sz;
+
+	if (se->debug)
+		fuse_log(FUSE_LOG_DEBUG, "starting io-uring q-depth=%u\n",
+			 se->uring.q_depth);
+
+	fuse_ring = calloc(1, sizeof(*fuse_ring));
+	if (fuse_ring == NULL) {
+		fuse_log(FUSE_LOG_ERR, "Allocating the ring failed\n");
+		goto err;
+	}
+
+	queue_sz = fuse_ring_queue_size(se->uring.q_depth);
+	/* let calloc() check nr_queues * queue_sz for overflow */
+	fuse_ring->queues = calloc(nr_queues, queue_sz);
+	if (fuse_ring->queues == NULL) {
+		fuse_log(FUSE_LOG_ERR, "Allocating the queues failed\n");
+		goto err;
+	}
+
+	fuse_ring->se = se;
+	fuse_ring->nr_queues = nr_queues;
+	fuse_ring->queue_depth = se->uring.q_depth;
+	fuse_ring->max_req_payload_sz = payload_sz;
+	fuse_ring->queue_mem_size = queue_sz;
+
+	/*
+	 * very basic queue 
initialization, that cannot fail and will
+	 * allow easy cleanup if something (like mmap) fails in the middle
+	 * below
+	 */
+	for (size_t qid = 0; qid < nr_queues; qid++) {
+		struct fuse_ring_queue *queue =
+			fuse_uring_get_queue(fuse_ring, qid);
+
+		queue->ring.ring_fd = -1;
+		/*
+		 * The queue memory is zeroed and teardown closes every
+		 * eventfd >= 0; without this a queue whose thread never
+		 * created its eventfd would get fd 0 closed.
+		 */
+		queue->eventfd = -1;
+		queue->numa_node = numa_node_of_cpu(qid);
+		queue->qid = qid;
+		queue->ring_pool = fuse_ring;
+	}
+
+	return fuse_ring;
+
+err:
+	if (fuse_ring)
+		fuse_session_destruct_uring(fuse_ring);
+
+	return NULL;
+}
+
+/* stub function: does not look at any completion yet, always returns 0 */
+static int fuse_uring_queue_handle_cqes(struct fuse_ring_queue *queue)
+{
+	(void)queue;
+
+	return 0;
+}
+
+/**
+ * In per-core-queue configuration we have one thread per core - pin the
+ * calling thread to the core with the same index as @qid. A failure is
+ * only logged.
+ */
+static void fuse_uring_set_thread_core(int qid)
+{
+	cpu_set_t mask;
+	int rc;
+
+	CPU_ZERO(&mask);
+	CPU_SET(qid, &mask);
+	rc = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
+	if (rc != 0)
+		fuse_log(FUSE_LOG_ERR, "Failed to bind qid=%d to its core: %s\n",
+			 qid, strerror(errno));
+
+	if (0) {
+		const int policy = SCHED_IDLE;
+		const struct sched_param param = {
+			.sched_priority = sched_get_priority_min(policy),
+		};
+
+		/* Set the lowest possible priority, so that the application
+		 * submitting requests is not moved away from the current core.
+		 */
+		rc = sched_setscheduler(0, policy, &param);
+		if (rc != 0)
+			fuse_log(FUSE_LOG_ERR, "Failed to set scheduler: %s\n",
+				 strerror(errno));
+	}
+}
+
+/*
+ * Set up one queue: create its eventfd and io_uring, allocate the header
+ * and payload buffer of every ring entry and submit the initial SQEs.
+ *
+ * If only submitting the initial SQEs fails, the session is flagged with
+ * an error and asked to exit, but the ring fd is still returned.
+ *
+ * @return negative error code or io-uring file descriptor
+ */
+static int fuse_uring_init_queue(struct fuse_ring_queue *queue)
+{
+	struct fuse_ring_pool *ring = queue->ring_pool;
+	struct fuse_session *se = ring->se;
+	int res;
+	size_t page_sz = sysconf(_SC_PAGESIZE);
+
+	queue->eventfd = eventfd(0, EFD_CLOEXEC);
+	if (queue->eventfd < 0) {
+		res = -errno;
+		fuse_log(FUSE_LOG_ERR,
+			 "Failed to create eventfd for qid %d: %s\n",
+			 queue->qid, strerror(-res));
+		return res;
+	}
+
+	res = fuse_queue_setup_io_uring(&queue->ring, queue->qid,
+					ring->queue_depth, se->fd,
+					queue->eventfd);
+	if (res != 0) {
+		fuse_log(FUSE_LOG_ERR, "qid=%d io_uring init failed\n",
+			 queue->qid);
+		goto err;
+	}
+
+	/* size of the buffer req_header points to, rounded to full pages */
+	queue->req_header_sz = ROUND_UP(sizeof(struct fuse_uring_req_header),
+					page_sz);
+
+	for (size_t idx = 0; idx < ring->queue_depth; idx++) {
+		struct fuse_ring_ent *ring_ent = &queue->ent[idx];
+		struct fuse_req *req = &ring_ent->req;
+
+		ring_ent->ring_queue = queue;
+
+		/*
+		 * Also allocate the header to have it page aligned, which
+		 * is a requirement for page pinning
+		 */
+		ring_ent->req_header =
+			numa_alloc_local(queue->req_header_sz);
+		ring_ent->req_payload_sz = ring->max_req_payload_sz;
+
+		ring_ent->op_payload =
+			numa_alloc_local(ring_ent->req_payload_sz);
+
+		/* whatever was allocated so far is freed on ring teardown */
+		if (ring_ent->req_header == NULL ||
+		    ring_ent->op_payload == NULL) {
+			fuse_log(FUSE_LOG_ERR,
+				 "qid=%d buffer allocation failed\n",
+				 queue->qid);
+			res = -ENOMEM;
+			goto err;
+		}
+
+		req->se = se;
+		pthread_mutex_init(&req->lock, NULL);
+		req->is_uring = true;
+		req->ref_cnt = 1;
+	}
+
+	res = fuse_uring_prepare_fetch_sqes(queue);
+	if (res != 0) {
+		fuse_log(
+			FUSE_LOG_ERR,
+			"Grave fuse-uring error on preparing SQEs, aborting\n");
+		se->error = -EIO;
+		fuse_session_exit(se);
+	}
+
+	return queue->ring.ring_fd;
+
+err:
+	close(queue->eventfd);
+	/* ring teardown closes every eventfd >= 0, avoid a second close */
+	queue->eventfd = -1;
+	return res;
+}
+
+static void *fuse_uring_thread(void *arg)
+{
+	struct fuse_ring_queue *queue = arg;
+	struct fuse_ring_pool *ring_pool = queue->ring_pool;
+	struct fuse_session *se = 
ring_pool->se;
+	int err;
+	char thread_name[16] = { 0 };
+
+	snprintf(thread_name, sizeof(thread_name), "fuse-ring-%d", queue->qid);
+	thread_name[15] = '\0';
+	fuse_set_thread_name(pthread_self(), thread_name);
+
+	fuse_uring_set_thread_core(queue->qid);
+
+	err = fuse_uring_init_queue(queue);
+	if (err < 0) {
+		fuse_log(FUSE_LOG_ERR, "qid=%d queue setup failed\n",
+			 queue->qid);
+		goto err;
+	}
+
+	/* Not using fuse_session_exited(se), as that cannot be inlined */
+	while (!atomic_load_explicit(&se->mt_exited, memory_order_relaxed)) {
+		io_uring_submit_and_wait(&queue->ring, 1);
+
+		err = fuse_uring_queue_handle_cqes(queue);
+		if (err < 0) {
+			/*
+			 * fuse-over-io-uring is not supported, operation can
+			 * continue over /dev/fuse
+			 */
+			if (err == -EOPNOTSUPP)
+				goto ret;
+			goto err;
+		}
+	}
+
+	return NULL;
+
+err:
+	fuse_session_exit(se);
+ret:
+	return NULL;
+}
+
+/**
+ * Start one thread per queue, each running fuse_uring_thread().
+ *
+ * Stops at the first failure; threads that were already started keep
+ * running and are joined when the ring is torn down.
+ *
+ * @return 0 on success, otherwise the error number returned by
+ *         pthread_create()
+ */
+static int fuse_uring_start_ring_threads(struct fuse_ring_pool *ring)
+{
+	int rc = 0;
+
+	for (size_t qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		rc = pthread_create(&queue->tid, NULL, fuse_uring_thread, queue);
+		if (rc != 0) {
+			/*
+			 * The thread id is undefined after a failure, but
+			 * teardown joins every queue with tid != 0.
+			 */
+			queue->tid = 0;
+			fuse_log(FUSE_LOG_ERR,
+				 "qid=%zu failed to start ring thread: %s\n",
+				 qid, strerror(rc));
+			break;
+		}
+	}
+
+	return rc;
+}
+
+/* Compile time check that the fuse command data fits into the SQE */
+static int fuse_uring_sanity_check(void)
+{
+	_Static_assert(sizeof(struct fuse_uring_cmd_req) <=
+		       FUSE_URING_MAX_SQE128_CMD_DATA,
+		       "SQE128_CMD_DATA has 80B cmd data");
+
+	return 0;
+}
+
+/**
+ * Create the ring pool of session @se and start the queue threads.
+ *
+ * The pool is stored in se->uring.pool before the threads are started
+ * and stays there if starting them fails; fuse_uring_stop() releases it.
+ *
+ * @return 0 on success, -EADDRNOTAVAIL if the pool could not be created,
+ *         otherwise the result of starting the threads
+ */
+int fuse_uring_start(struct fuse_session *se)
+{
+	int err = 0;
+	struct fuse_ring_pool *fuse_ring;
+
+	fuse_uring_sanity_check();
+
+	fuse_ring = fuse_create_ring(se);
+	if (fuse_ring == NULL) {
+		err = -EADDRNOTAVAIL;
+		goto out;
+	}
+
+	se->uring.pool = fuse_ring;
+
+	err = fuse_uring_start_ring_threads(fuse_ring);
+out:
+	return err;
+}
+
+/**
+ * Stop all queue threads of session @se and free its ring pool.
+ * Does nothing if no pool exists.
+ *
+ * @return always 0
+ */
+int fuse_uring_stop(struct fuse_session *se)
+{
+	struct fuse_ring_pool *ring = se->uring.pool;
+
+	if (ring == NULL)
+		return 0;
+
+	/*
+	 * Clear the session pointer first: the pool is freed below and a
+	 * stale pointer would be destructed again by the next stop call.
+	 */
+	se->uring.pool = NULL;
+	fuse_session_destruct_uring(ring);
+
+	return 0;
+}
diff --git a/lib/fuse_uring_i.h b/lib/fuse_uring_i.h new file mode 100644 index 
0000000..fefb8a0 --- /dev/null +++ b/lib/fuse_uring_i.h @@ -0,0 +1,18 @@ +/* + * FUSE: Filesystem in Userspace + * Copyright (C) 2025 Bernd Schubert <bschubert@ddn.com> + * This program can be distributed under the terms of the GNU LGPLv2. + * See the file COPYING.LIB + */ + +#ifndef FUSE_URING_I_H_ +#define FUSE_URING_I_H_ + +#include "fuse_lowlevel.h" + +struct fuse_in_header; + +int fuse_uring_start(struct fuse_session *se); +int fuse_uring_stop(struct fuse_session *se); + +#endif // FUSE_URING_I_H_ diff --git a/lib/meson.build b/lib/meson.build index 6a52d06..fcd9574 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -19,6 +19,14 @@ if private_cfg.get('HAVE_ICONV') endif endif +if private_cfg.get('HAVE_URING', false) + libfuse_sources += [ 'fuse_uring.c' ] + deps += [ dependency('liburing') ] + deps += [ dependency('numa') ] +endif + + + libdl = cc.find_library('dl', required: false) if libdl.found() deps += [ libdl ] |