diff options
Diffstat (limited to 'lib/fuse_uring.c')
-rw-r--r-- | lib/fuse_uring.c | 60 |
1 files changed, 56 insertions, 4 deletions
diff --git a/lib/fuse_uring.c b/lib/fuse_uring.c index 10d3286..985c817 100644 --- a/lib/fuse_uring.c +++ b/lib/fuse_uring.c @@ -78,6 +78,15 @@ struct fuse_ring_pool { /* size of a single queue */ size_t queue_mem_size; + unsigned int started_threads; + unsigned int failed_threads; + + /* Avoid sending queue entries before FUSE_INIT reply*/ + sem_t init_sem; + + pthread_cond_t thread_start_cond; + pthread_mutex_t thread_start_mutex; + /* pointer to the first queue */ struct fuse_ring_queue *queues; }; @@ -432,7 +441,7 @@ static int fuse_uring_prepare_fetch_sqes(struct fuse_ring_queue *queue) io_uring_prep_poll_add(sqe, queue->eventfd, POLLIN); io_uring_sqe_set_data(sqe, (void *)(uintptr_t)queue->eventfd); - io_uring_submit(&queue->ring); + /* Only preparation until here, no submission yet */ return 0; } @@ -690,12 +699,24 @@ static void *fuse_uring_thread(void *arg) fuse_uring_set_thread_core(queue->qid); err = fuse_uring_init_queue(queue); + + if (err < 0) + ring_pool->failed_threads++; + pthread_mutex_lock(&ring_pool->thread_start_mutex); + if (err < 0) + ring_pool->failed_threads++; + ring_pool->started_threads++; + pthread_cond_broadcast(&ring_pool->thread_start_cond); + pthread_mutex_unlock(&ring_pool->thread_start_mutex); + if (err < 0) { fuse_log(FUSE_LOG_ERR, "qid=%d queue setup failed\n", queue->qid); - goto err; + goto err_non_fatal; } + sem_wait(&ring_pool->init_sem); + /* Not using fuse_session_exited(se), as that cannot be inlined */ while (!atomic_load_explicit(&se->mt_exited, memory_order_relaxed)) { io_uring_submit_and_wait(&queue->ring, 1); @@ -709,6 +730,7 @@ static void *fuse_uring_thread(void *arg) err: fuse_session_exit(se); +err_non_fatal: return NULL; } @@ -751,13 +773,34 @@ int fuse_uring_start(struct fuse_session *se) fuse_ring = fuse_create_ring(se); if (fuse_ring == NULL) { err = -EADDRNOTAVAIL; - goto out; + goto err; } se->uring.pool = fuse_ring; + /* Hold off threads from send fuse ring entries (SQEs) */ + sem_init(&fuse_ring->init_sem, 0, 0); + pthread_cond_init(&fuse_ring->thread_start_cond, NULL); + pthread_mutex_init(&fuse_ring->thread_start_mutex, NULL); + err = fuse_uring_start_ring_threads(fuse_ring); -out: + if (err) + goto err; + + while (fuse_ring->started_threads < fuse_ring->nr_queues) { + /* Wait for all threads to start */ + if (fuse_ring->failed_threads != 0) { + err = -EADDRNOTAVAIL; + goto err; + } + } + + if (fuse_ring->failed_threads != 0) { + err = -EADDRNOTAVAIL; + goto err; + } + +err: return err; } @@ -772,3 +815,12 @@ int fuse_uring_stop(struct fuse_session *se) return 0; } + +void fuse_uring_wake_ring_threads(struct fuse_session *se) +{ + struct fuse_ring_pool *ring = se->uring.pool; + + /* Wake up the threads to let them send SQEs */ + for (size_t qid = 0; qid < ring->nr_queues; qid++) + sem_post(&ring->init_sem); +} |