diff options
author | Bernd Schubert <bschubert@ddn.com> | 2025-07-20 15:51:55 +0200 |
---|---|---|
committer | Bernd Schubert <bernd@bsbernd.com> | 2025-07-22 14:47:12 +0200 |
commit | e915a28ec44ba0f5345eed9985e862ebe13104cb (patch) | |
tree | ceeefcca95a41fccb911728c3508cbeb553709c1 /lib | |
parent | 2f092ef1084fe72c6cc26a8cde61ee94329c2f34 (diff) | |
download | libfuse-e915a28ec44ba0f5345eed9985e862ebe13104cb.tar.gz |
Split fuse-io-uring startup
Start the ring threads before sending fuse_reply_ok() so
that io-uring startup issues can be non-fatal.
Signed-off-by: Bernd Schubert <bschubert@ddn.com>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/fuse_lowlevel.c | 14 | ||||
-rw-r--r-- | lib/fuse_uring.c | 60 | ||||
-rw-r--r-- | lib/fuse_uring_i.h | 6 |
3 files changed, 71 insertions, 9 deletions
diff --git a/lib/fuse_lowlevel.c b/lib/fuse_lowlevel.c index 9731b44..6afcecd 100644 --- a/lib/fuse_lowlevel.c +++ b/lib/fuse_lowlevel.c @@ -2897,18 +2897,22 @@ _do_init(fuse_req_t req, const fuse_ino_t nodeid, const void *op_in, else if (arg->minor < 23) outargsize = FUSE_COMPAT_22_INIT_OUT_SIZE; - send_reply_ok(req, &outarg, outargsize); - - /* XXX: Split the start, and send SQEs only after send_reply_ok() */ + /* XXX: Add an option to make non-available io-uring fatal */ if (enable_io_uring) { int ring_rc = fuse_uring_start(se); if (ring_rc != 0) { - fuse_log(FUSE_LOG_ERR, "fuse: failed to start io-uring: %s\n", + fuse_log(FUSE_LOG_INFO, + "fuse: failed to start io-uring: %s\n", strerror(ring_rc)); - fuse_session_exit(se); + outargflags &= ~FUSE_OVER_IO_URING; + enable_io_uring = false; } } + + send_reply_ok(req, &outarg, outargsize); + if (enable_io_uring) + fuse_uring_wake_ring_threads(se); } static __attribute__((no_sanitize("thread"))) void diff --git a/lib/fuse_uring.c b/lib/fuse_uring.c index 10d3286..985c817 100644 --- a/lib/fuse_uring.c +++ b/lib/fuse_uring.c @@ -78,6 +78,15 @@ struct fuse_ring_pool { /* size of a single queue */ size_t queue_mem_size; + unsigned int started_threads; + unsigned int failed_threads; + + /* Avoid sending queue entries before FUSE_INIT reply*/ + sem_t init_sem; + + pthread_cond_t thread_start_cond; + pthread_mutex_t thread_start_mutex; + /* pointer to the first queue */ struct fuse_ring_queue *queues; }; @@ -432,7 +441,7 @@ static int fuse_uring_prepare_fetch_sqes(struct fuse_ring_queue *queue) io_uring_prep_poll_add(sqe, queue->eventfd, POLLIN); io_uring_sqe_set_data(sqe, (void *)(uintptr_t)queue->eventfd); - io_uring_submit(&queue->ring); + /* Only preparation until here, no submission yet */ return 0; } @@ -690,12 +699,24 @@ static void *fuse_uring_thread(void *arg) fuse_uring_set_thread_core(queue->qid); err = fuse_uring_init_queue(queue); + + if (err < 0) + ring_pool->failed_threads++; + pthread_mutex_lock(&ring_pool->thread_start_mutex); + if (err < 0) + ring_pool->failed_threads++; + ring_pool->started_threads++; + pthread_cond_broadcast(&ring_pool->thread_start_cond); + pthread_mutex_unlock(&ring_pool->thread_start_mutex); + if (err < 0) { fuse_log(FUSE_LOG_ERR, "qid=%d queue setup failed\n", queue->qid); - goto err; + goto err_non_fatal; } + sem_wait(&ring_pool->init_sem); + /* Not using fuse_session_exited(se), as that cannot be inlined */ while (!atomic_load_explicit(&se->mt_exited, memory_order_relaxed)) { io_uring_submit_and_wait(&queue->ring, 1); @@ -709,6 +730,7 @@ static void *fuse_uring_thread(void *arg) err: fuse_session_exit(se); +err_non_fatal: return NULL; } @@ -751,13 +773,34 @@ int fuse_uring_start(struct fuse_session *se) fuse_ring = fuse_create_ring(se); if (fuse_ring == NULL) { err = -EADDRNOTAVAIL; - goto out; + goto err; } se->uring.pool = fuse_ring; + /* Hold off threads from send fuse ring entries (SQEs) */ + sem_init(&fuse_ring->init_sem, 0, 0); + pthread_cond_init(&fuse_ring->thread_start_cond, NULL); + pthread_mutex_init(&fuse_ring->thread_start_mutex, NULL); + err = fuse_uring_start_ring_threads(fuse_ring); -out: + if (err) + goto err; + + while (fuse_ring->started_threads < fuse_ring->nr_queues) { + /* Wait for all threads to start */ + if (fuse_ring->failed_threads != 0) { + err = -EADDRNOTAVAIL; + goto err; + } + } + + if (fuse_ring->failed_threads != 0) { + err = -EADDRNOTAVAIL; + goto err; + } + +err: return err; } @@ -772,3 +815,12 @@ int fuse_uring_stop(struct fuse_session *se) return 0; } + +void fuse_uring_wake_ring_threads(struct fuse_session *se) +{ + struct fuse_ring_pool *ring = se->uring.pool; + + /* Wake up the threads to let them send SQEs */ + for (size_t qid = 0; qid < ring->nr_queues; qid++) + sem_post(&ring->init_sem); +} diff --git a/lib/fuse_uring_i.h b/lib/fuse_uring_i.h index fc23691..14418ef 100644 --- a/lib/fuse_uring_i.h +++ b/lib/fuse_uring_i.h @@ -30,6 +30,7 @@ void fuse_session_process_uring_cqe(struct fuse_session *se, struct fuse_in_header; int fuse_uring_start(struct fuse_session *se); +void fuse_uring_wake_ring_threads(struct fuse_session *se); int fuse_uring_stop(struct fuse_session *se); int send_reply_uring(fuse_req_t req, int error, const void *arg, size_t argsize); @@ -45,6 +46,11 @@ static inline int fuse_uring_start(struct fuse_session *se FUSE_VAR_UNUSED) return -ENOTSUP; } +static inline void +fuse_uring_wake_ring_threads(struct fuse_session *se FUSE_VAR_UNUSED) +{ +} + static inline int fuse_uring_stop(struct fuse_session *se FUSE_VAR_UNUSED) { return -ENOTSUP; |