aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBernd Schubert <bschubert@ddn.com>2025-07-20 15:51:55 +0200
committerBernd Schubert <bernd@bsbernd.com>2025-07-22 14:47:12 +0200
commite915a28ec44ba0f5345eed9985e862ebe13104cb (patch)
treeceeefcca95a41fccb911728c3508cbeb553709c1
parent2f092ef1084fe72c6cc26a8cde61ee94329c2f34 (diff)
downloadlibfuse-e915a28ec44ba0f5345eed9985e862ebe13104cb.tar.gz
Split fuse-io-uring startup
Start the ring threads before sending fuse_reply_ok() so that io-uring startup issues can be non-fatal. Signed-off-by: Bernd Schubert <bschubert@ddn.com>
-rw-r--r--lib/fuse_lowlevel.c14
-rw-r--r--lib/fuse_uring.c60
-rw-r--r--lib/fuse_uring_i.h6
3 files changed, 71 insertions, 9 deletions
diff --git a/lib/fuse_lowlevel.c b/lib/fuse_lowlevel.c
index 9731b44..6afcecd 100644
--- a/lib/fuse_lowlevel.c
+++ b/lib/fuse_lowlevel.c
@@ -2897,18 +2897,22 @@ _do_init(fuse_req_t req, const fuse_ino_t nodeid, const void *op_in,
else if (arg->minor < 23)
outargsize = FUSE_COMPAT_22_INIT_OUT_SIZE;
- send_reply_ok(req, &outarg, outargsize);
-
- /* XXX: Split the start, and send SQEs only after send_reply_ok() */
+ /* XXX: Add an option to make non-available io-uring fatal */
if (enable_io_uring) {
int ring_rc = fuse_uring_start(se);
if (ring_rc != 0) {
- fuse_log(FUSE_LOG_ERR, "fuse: failed to start io-uring: %s\n",
+ fuse_log(FUSE_LOG_INFO,
+ "fuse: failed to start io-uring: %s\n",
strerror(ring_rc));
- fuse_session_exit(se);
+ outargflags &= ~FUSE_OVER_IO_URING;
+ enable_io_uring = false;
}
}
+
+ send_reply_ok(req, &outarg, outargsize);
+ if (enable_io_uring)
+ fuse_uring_wake_ring_threads(se);
}
static __attribute__((no_sanitize("thread"))) void
diff --git a/lib/fuse_uring.c b/lib/fuse_uring.c
index 10d3286..985c817 100644
--- a/lib/fuse_uring.c
+++ b/lib/fuse_uring.c
@@ -78,6 +78,15 @@ struct fuse_ring_pool {
/* size of a single queue */
size_t queue_mem_size;
+ unsigned int started_threads;
+ unsigned int failed_threads;
+
+ /* Avoid sending queue entries before FUSE_INIT reply*/
+ sem_t init_sem;
+
+ pthread_cond_t thread_start_cond;
+ pthread_mutex_t thread_start_mutex;
+
/* pointer to the first queue */
struct fuse_ring_queue *queues;
};
@@ -432,7 +441,7 @@ static int fuse_uring_prepare_fetch_sqes(struct fuse_ring_queue *queue)
io_uring_prep_poll_add(sqe, queue->eventfd, POLLIN);
io_uring_sqe_set_data(sqe, (void *)(uintptr_t)queue->eventfd);
- io_uring_submit(&queue->ring);
+ /* Only preparation until here, no submission yet */
return 0;
}
@@ -690,12 +699,24 @@ static void *fuse_uring_thread(void *arg)
fuse_uring_set_thread_core(queue->qid);
err = fuse_uring_init_queue(queue);
+
+ if (err < 0)
+ ring_pool->failed_threads++;
+ pthread_mutex_lock(&ring_pool->thread_start_mutex);
+ if (err < 0)
+ ring_pool->failed_threads++;
+ ring_pool->started_threads++;
+ pthread_cond_broadcast(&ring_pool->thread_start_cond);
+ pthread_mutex_unlock(&ring_pool->thread_start_mutex);
+
if (err < 0) {
fuse_log(FUSE_LOG_ERR, "qid=%d queue setup failed\n",
queue->qid);
- goto err;
+ goto err_non_fatal;
}
+ sem_wait(&ring_pool->init_sem);
+
/* Not using fuse_session_exited(se), as that cannot be inlined */
while (!atomic_load_explicit(&se->mt_exited, memory_order_relaxed)) {
io_uring_submit_and_wait(&queue->ring, 1);
@@ -709,6 +730,7 @@ static void *fuse_uring_thread(void *arg)
err:
fuse_session_exit(se);
+err_non_fatal:
return NULL;
}
@@ -751,13 +773,34 @@ int fuse_uring_start(struct fuse_session *se)
fuse_ring = fuse_create_ring(se);
if (fuse_ring == NULL) {
err = -EADDRNOTAVAIL;
- goto out;
+ goto err;
}
se->uring.pool = fuse_ring;
+ /* Hold off threads from send fuse ring entries (SQEs) */
+ sem_init(&fuse_ring->init_sem, 0, 0);
+ pthread_cond_init(&fuse_ring->thread_start_cond, NULL);
+ pthread_mutex_init(&fuse_ring->thread_start_mutex, NULL);
+
err = fuse_uring_start_ring_threads(fuse_ring);
-out:
+ if (err)
+ goto err;
+
+ while (fuse_ring->started_threads < fuse_ring->nr_queues) {
+ /* Wait for all threads to start */
+ if (fuse_ring->failed_threads != 0) {
+ err = -EADDRNOTAVAIL;
+ goto err;
+ }
+ }
+
+ if (fuse_ring->failed_threads != 0) {
+ err = -EADDRNOTAVAIL;
+ goto err;
+ }
+
+err:
return err;
}
@@ -772,3 +815,12 @@ int fuse_uring_stop(struct fuse_session *se)
return 0;
}
+
+void fuse_uring_wake_ring_threads(struct fuse_session *se)
+{
+ struct fuse_ring_pool *ring = se->uring.pool;
+
+ /* Wake up the threads to let them send SQEs */
+ for (size_t qid = 0; qid < ring->nr_queues; qid++)
+ sem_post(&ring->init_sem);
+}
diff --git a/lib/fuse_uring_i.h b/lib/fuse_uring_i.h
index fc23691..14418ef 100644
--- a/lib/fuse_uring_i.h
+++ b/lib/fuse_uring_i.h
@@ -30,6 +30,7 @@ void fuse_session_process_uring_cqe(struct fuse_session *se,
struct fuse_in_header;
int fuse_uring_start(struct fuse_session *se);
+void fuse_uring_wake_ring_threads(struct fuse_session *se);
int fuse_uring_stop(struct fuse_session *se);
int send_reply_uring(fuse_req_t req, int error, const void *arg,
size_t argsize);
@@ -45,6 +46,11 @@ static inline int fuse_uring_start(struct fuse_session *se FUSE_VAR_UNUSED)
return -ENOTSUP;
}
+static inline void
+fuse_uring_wake_ring_threads(struct fuse_session *se FUSE_VAR_UNUSED)
+{
+}
+
static inline int fuse_uring_stop(struct fuse_session *se FUSE_VAR_UNUSED)
{
return -ENOTSUP;