From 4e0aea6a96146115e2fb3b8c4a4c75325ad894d7 Mon Sep 17 00:00:00 2001 From: Miklos Szeredi Date: Mon, 8 Nov 2010 17:11:46 +0100 Subject: libfuse: support zero copy writes in lowlevel interface Add new ->write_buf() method to low level interface. This allows passing a generic buffer, either containing a memory buffer or a file descriptor. This allows implementing zero copy writes. Add fuse_session_receive_buf() and fuse_session_process_buf() which may be used in event loop implementations to replace fuse_chan_recv() and fuse_session_process() respectively. --- ChangeLog | 8 ++ include/fuse_common.h | 1 + include/fuse_lowlevel.h | 53 +++++++++ lib/fuse_i.h | 7 ++ lib/fuse_loop.c | 11 +- lib/fuse_loop_mt.c | 11 +- lib/fuse_lowlevel.c | 292 ++++++++++++++++++++++++++++++++++++++++++++---- lib/fuse_session.c | 28 +++++ lib/fuse_versionscript | 2 + 9 files changed, 388 insertions(+), 25 deletions(-) diff --git a/ChangeLog b/ChangeLog index 6b42706..ab5eca1 100644 --- a/ChangeLog +++ b/ChangeLog @@ -18,6 +18,14 @@ * Fix fuse_buf_copy() if already at the end of the buffers + * Add new ->write_buf() method to low level interface. This + allows passing a generic buffer, either containing a memory buffer + or a file descriptor. This allows implementing zero copy writes. + + * Add fuse_session_receive_buf() and fuse_session_process_buf() + which may be used in event loop implementations to replace + fuse_chan_recv() and fuse_session_process() respectively. + 2010-10-14 Miklos Szeredi * Use LTLIBICONV when linking libfuse. 
This fixes building against diff --git a/include/fuse_common.h b/include/fuse_common.h index 7c651ff..625a536 100644 --- a/include/fuse_common.h +++ b/include/fuse_common.h @@ -101,6 +101,7 @@ struct fuse_file_info { #define FUSE_CAP_DONT_MASK (1 << 6) #define FUSE_CAP_SPLICE_WRITE (1 << 7) #define FUSE_CAP_SPLICE_MOVE (1 << 8) +#define FUSE_CAP_SPLICE_READ (1 << 9) /** * Ioctl flags diff --git a/include/fuse_lowlevel.h b/include/fuse_lowlevel.h index 5592544..9132846 100644 --- a/include/fuse_lowlevel.h +++ b/include/fuse_lowlevel.h @@ -877,6 +877,31 @@ struct fuse_lowlevel_ops { */ void (*poll) (fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi, struct fuse_pollhandle *ph); + + /** + * Write data made available in a buffer + * + * This is a more generic version of the ->write() method. If + * FUSE_CAP_SPLICE_READ is set in fuse_conn_info.want and the + * kernel supports splicing from the fuse device, then the + * data will be made available in a pipe for supporting zero + * copy data transfer. + * + * Introduced in version 2.9 + * + * Valid replies: + * fuse_reply_write + * fuse_reply_err + * + * @param req request handle + * @param ino the inode number + * @param bufv buffer containing the data + * @param off offset to write to + * @param fi file information + */ + void (*write_buf) (fuse_req_t req, fuse_ino_t ino, + struct fuse_bufvec *bufv, off_t off, + struct fuse_file_info *fi); }; /** @@ -1394,6 +1419,34 @@ struct fuse_chan *fuse_session_next_chan(struct fuse_session *se, void fuse_session_process(struct fuse_session *se, const char *buf, size_t len, struct fuse_chan *ch); +/** + * Process a raw request supplied in a generic buffer + * + * This is a more generic version of fuse_session_process(). The + * fuse_buf may contain a memory buffer or a pipe file descriptor. 
+ * + * @param se the session + * @param buf the fuse_buf containing the request + * @param ch channel on which the request was received + */ +void fuse_session_process_buf(struct fuse_session *se, + const struct fuse_buf *buf, struct fuse_chan *ch); + +/** + * Receive a raw request supplied in a generic buffer + * + * This is a more generic version of fuse_chan_recv(). The fuse_buf + * supplied to this function contains a suitably allocated memory + * buffer. This may be overwritten with a file descriptor buffer. + * + * @param se the session + * @param buf the fuse_buf to store the request in + * @param chp pointer to the channel + * @return the actual size of the raw request, or -errno on error + */ +int fuse_session_receive_buf(struct fuse_session *se, struct fuse_buf *buf, + struct fuse_chan **chp); + /** * Destroy a session * diff --git a/lib/fuse_i.h b/lib/fuse_i.h index 0206336..6d10b1c 100644 --- a/lib/fuse_i.h +++ b/lib/fuse_i.h @@ -15,6 +15,12 @@ struct fuse_ll; struct fuse_session { struct fuse_session_ops op; + int (*receive_buf)(struct fuse_session *se, struct fuse_buf *buf, + struct fuse_chan **chp); + + void (*process_buf)(void *data, const struct fuse_buf *buf, + struct fuse_chan *ch); + void *data; volatile int exited; @@ -51,6 +57,7 @@ struct fuse_ll { int big_writes; int no_splice_write; int no_splice_move; + int no_splice_read; struct fuse_lowlevel_ops op; int got_init; struct cuse_data *cuse_data; diff --git a/lib/fuse_loop.c b/lib/fuse_loop.c index 104c5d4..b7b4ca4 100644 --- a/lib/fuse_loop.c +++ b/lib/fuse_loop.c @@ -25,12 +25,19 @@ int fuse_session_loop(struct fuse_session *se) while (!fuse_session_exited(se)) { struct fuse_chan *tmpch = ch; - res = fuse_chan_recv(&tmpch, buf, bufsize); + struct fuse_buf fbuf = { + .mem = buf, + .size = bufsize, + }; + + res = fuse_session_receive_buf(se, &fbuf, &tmpch); + if (res == -EINTR) continue; if (res <= 0) break; - fuse_session_process(se, buf, res, tmpch); + + fuse_session_process_buf(se, &fbuf, 
tmpch); } free(buf); diff --git a/lib/fuse_loop_mt.c b/lib/fuse_loop_mt.c index 05935d5..a76713b 100644 --- a/lib/fuse_loop_mt.c +++ b/lib/fuse_loop_mt.c @@ -70,10 +70,14 @@ static void *fuse_do_work(void *data) while (!fuse_session_exited(mt->se)) { int isforget = 0; struct fuse_chan *ch = mt->prevch; + struct fuse_buf fbuf = { + .mem = w->buf, + .size = w->bufsize, + }; int res; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - res = fuse_chan_recv(&ch, w->buf, w->bufsize); + res = fuse_session_receive_buf(mt->se, &fbuf, &ch); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); if (res == -EINTR) continue; @@ -95,7 +99,8 @@ static void *fuse_do_work(void *data) * This disgusting hack is needed so that zillions of threads * are not created on a burst of FORGET messages */ - if (((struct fuse_in_header *) w->buf)->opcode == FUSE_FORGET) + if (!(fbuf.flags & FUSE_BUF_IS_FD) && + ((struct fuse_in_header *) fbuf.mem)->opcode == FUSE_FORGET) isforget = 1; if (!isforget) @@ -104,7 +109,7 @@ static void *fuse_do_work(void *data) fuse_start_thread(mt); pthread_mutex_unlock(&mt->lock); - fuse_session_process(mt->se, w->buf, res, ch); + fuse_session_process_buf(mt->se, &fbuf, ch); pthread_mutex_lock(&mt->lock); if (!isforget) diff --git a/lib/fuse_lowlevel.c b/lib/fuse_lowlevel.c index 50c4715..9d3fa98 100644 --- a/lib/fuse_lowlevel.c +++ b/lib/fuse_lowlevel.c @@ -22,6 +22,7 @@ #include #include #include +#include #ifndef F_LINUX_SPECIFIC_BASE #define F_LINUX_SPECIFIC_BASE 1024 @@ -40,6 +41,13 @@ struct fuse_pollhandle { struct fuse_ll *f; }; +static size_t pagesize; + +static __attribute__((constructor)) void fuse_ll_init_pagesize(void) +{ + pagesize = getpagesize(); +} + static void convert_stat(const struct stat *stbuf, struct fuse_attr *attr) { attr->ino = stbuf->st_ino; @@ -425,7 +433,7 @@ static struct fuse_ll_pipe *fuse_ll_get_pipe(struct fuse_ll *f) /* *the default size is 16 pages on linux */ - llp->size = getpagesize() * 16; + llp->size = pagesize * 16; 
llp->can_grow = 1; pthread_setspecific(f->pipe_key, llp); @@ -434,6 +442,15 @@ static struct fuse_ll_pipe *fuse_ll_get_pipe(struct fuse_ll *f) return llp; } +static void fuse_ll_clear_pipe(struct fuse_ll *f) +{ + struct fuse_ll_pipe *llp = pthread_getspecific(f->pipe_key); + if (llp) { + pthread_setspecific(f->pipe_key, NULL); + fuse_ll_pipe_free(llp); + } +} + static int send_reply_iov_buf(fuse_req_t req, const struct iovec *iov, int count, const char *buf, size_t len) { @@ -491,10 +508,6 @@ static int fuse_reply_data_iov(fuse_req_t req, struct iovec *iov, int iov_count, .count = 1, }; - static size_t pagesize = 0; - if (!pagesize) - pagesize = getpagesize(); - if (req->f->broken_splice_nonblock) goto fallback; @@ -677,8 +690,7 @@ static int fuse_reply_data_iov(fuse_req_t req, struct iovec *iov, int iov_count, return 0; clear_pipe: - pthread_setspecific(req->f->pipe_key, NULL); - fuse_ll_pipe_free(llp); + fuse_ll_clear_pipe(req->f); return res; fallback: @@ -1106,6 +1118,50 @@ static void do_write(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) fuse_reply_err(req, ENOSYS); } +static void do_write_buf(fuse_req_t req, fuse_ino_t nodeid, const void *inarg, + const struct fuse_buf *ibuf) +{ + struct fuse_buf buf = *ibuf; + struct fuse_bufvec bufv = { + .buf = &buf, + .count = 1, + }; + struct fuse_write_in *arg = (struct fuse_write_in *) inarg; + struct fuse_file_info fi; + + memset(&fi, 0, sizeof(fi)); + fi.fh = arg->fh; + fi.fh_old = fi.fh; + fi.writepage = arg->write_flags & 1; + + if (req->f->conn.proto_minor < 9) { + buf.mem = ((char *) arg) + FUSE_COMPAT_WRITE_IN_SIZE; + buf.size -= sizeof(struct fuse_in_header) + + FUSE_COMPAT_WRITE_IN_SIZE; + assert(!(buf.flags & FUSE_BUF_IS_FD)); + } else { + fi.lock_owner = arg->lock_owner; + fi.flags = arg->flags; + if (!(buf.flags & FUSE_BUF_IS_FD)) + buf.mem = PARAM(arg); + + buf.size -= sizeof(struct fuse_in_header) + + sizeof(struct fuse_write_in); + } + if (buf.size < arg->size) { + fprintf(stderr, "fuse: 
do_write_buf: buffer size too small\n"); + fuse_reply_err(req, EIO); + return; + } + buf.size = arg->size; + + req->f->op.write_buf(req, nodeid, &bufv, arg->offset, &fi); + + /* Need to reset the pipe if ->write_buf() didn't consume all data */ + if ((ibuf->flags & FUSE_BUF_IS_FD) && bufv.idx < bufv.count) + fuse_ll_clear_pipe(req->f); +} + static void do_flush(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) { struct fuse_flush_in *arg = (struct fuse_flush_in *) inarg; @@ -1543,10 +1599,13 @@ static void do_init(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) if (req->f->conn.proto_minor >= 14) { f->conn.capable |= FUSE_CAP_SPLICE_WRITE | FUSE_CAP_SPLICE_MOVE; + f->conn.capable |= FUSE_CAP_SPLICE_READ; if (!f->no_splice_write) f->conn.want |= FUSE_CAP_SPLICE_WRITE; if (!f->no_splice_move) f->conn.want |= FUSE_CAP_SPLICE_MOVE; + if (f->op.write_buf && !f->no_splice_read) + f->conn.want |= FUSE_CAP_SPLICE_READ; } if (f->atomic_o_trunc) @@ -1816,26 +1875,63 @@ static const char *opname(enum fuse_opcode opcode) return fuse_ll_ops[opcode].name; } -static void fuse_ll_process(void *data, const char *buf, size_t len, - struct fuse_chan *ch) +static int fuse_ll_copy_from_pipe(struct fuse_buf *dst, + struct fuse_bufvec *srcv) +{ + int res; + struct fuse_bufvec dstv = { .buf = dst, .count = 1 }; + + res = fuse_buf_copy(&dstv, srcv, 0); + if (res < 0) { + fprintf(stderr, "fuse: copy from pipe: %s\n", strerror(-res)); + return res; + } + if (res < dst->size) { + fprintf(stderr, "fuse: copy from pipe: short read\n"); + return -1; + } + return 0; +} + +static void fuse_ll_process_buf(void *data, const struct fuse_buf *buf, + struct fuse_chan *ch) { struct fuse_ll *f = (struct fuse_ll *) data; - struct fuse_in_header *in = (struct fuse_in_header *) buf; - const void *inarg = buf + sizeof(struct fuse_in_header); + const size_t write_header_size = sizeof(struct fuse_in_header) + + sizeof(struct fuse_write_in); + struct fuse_bufvec bufv = { .buf = buf, .count = 1 }; + 
struct fuse_buf tmpbuf = { .size = write_header_size }; + struct fuse_in_header *in; + const void *inarg; struct fuse_req *req; + void *mbuf = NULL; int err; + int res; - if (f->debug) - fprintf(stderr, - "unique: %llu, opcode: %s (%i), nodeid: %lu, insize: %zu\n", - (unsigned long long) in->unique, - opname((enum fuse_opcode) in->opcode), in->opcode, - (unsigned long) in->nodeid, len); + if (buf->flags & FUSE_BUF_IS_FD) { + if (buf->size < tmpbuf.size) + tmpbuf.size = buf->size; + + mbuf = malloc(tmpbuf.size); + if (mbuf == NULL) { + fprintf(stderr, "fuse: failed to allocate header\n"); + goto clear_pipe; + } + tmpbuf.mem = mbuf; + + res = fuse_ll_copy_from_pipe(&tmpbuf, &bufv); + if (res < 0) + goto clear_pipe; + + in = mbuf; + } else { + in = buf->mem; + } req = (struct fuse_req *) calloc(1, sizeof(struct fuse_req)); if (req == NULL) { fprintf(stderr, "fuse: failed to allocate request\n"); - return; + goto clear_pipe; } req->f = f; @@ -1848,6 +1944,14 @@ static void fuse_ll_process(void *data, const char *buf, size_t len, list_init_req(req); fuse_mutex_init(&req->lock); + if (f->debug) + fprintf(stderr, + "unique: %llu, opcode: %s (%i), nodeid: %lu, insize: %zu\n", + (unsigned long long) in->unique, + opname((enum fuse_opcode) in->opcode), in->opcode, + (unsigned long) in->nodeid, buf->size); + + err = EIO; if (!f->got_init) { enum fuse_opcode expected; @@ -1878,11 +1982,56 @@ static void fuse_ll_process(void *data, const char *buf, size_t len, if (intr) fuse_reply_err(intr, EAGAIN); } - fuse_ll_ops[in->opcode].func(req, in->nodeid, inarg); + + if ((buf->flags & FUSE_BUF_IS_FD) && write_header_size < buf->size && + (in->opcode != FUSE_WRITE || !f->op.write_buf)) { + void *newmbuf; + + err = ENOMEM; + newmbuf = realloc(mbuf, buf->size); + if (newmbuf == NULL) + goto reply_err; + mbuf = newmbuf; + + tmpbuf = (struct fuse_buf) { + .size = buf->size - write_header_size, + .mem = mbuf + write_header_size, + }; + res = fuse_ll_copy_from_pipe(&tmpbuf, &bufv); + err = 
-res; + if (res < 0) + goto reply_err; + + in = mbuf; + } + + inarg = (void *) &in[1]; + if (in->opcode == FUSE_WRITE && f->op.write_buf) + do_write_buf(req, in->nodeid, inarg, buf); + else + fuse_ll_ops[in->opcode].func(req, in->nodeid, inarg); + +out_free: + free(mbuf); return; - reply_err: +reply_err: fuse_reply_err(req, err); +clear_pipe: + if (buf->flags & FUSE_BUF_IS_FD) + fuse_ll_clear_pipe(f); + goto out_free; +} + +static void fuse_ll_process(void *data, const char *buf, size_t len, + struct fuse_chan *ch) +{ + struct fuse_buf fbuf = { + .mem = (void *) buf, + .size = len, + }; + + fuse_ll_process_buf(data, &fbuf, ch); } enum { @@ -1906,6 +2055,7 @@ static struct fuse_opt fuse_ll_opts[] = { { "big_writes", offsetof(struct fuse_ll, big_writes), 1}, { "no_splice_write", offsetof(struct fuse_ll, no_splice_write), 1}, { "no_splice_move", offsetof(struct fuse_ll, no_splice_move), 1}, + { "no_splice_read", offsetof(struct fuse_ll, no_splice_read), 1}, FUSE_OPT_KEY("max_read=", FUSE_OPT_KEY_DISCARD), FUSE_OPT_KEY("-h", KEY_HELP), FUSE_OPT_KEY("--help", KEY_HELP), @@ -1934,6 +2084,7 @@ static void fuse_ll_help(void) " -o no_remote_lock disable remote file locking\n" " -o no_splice_write don't use splice to write to the fuse device\n" " -o no_splice_move don't move data while splicing to the fuse device\n" +" -o no_splice_read don't use splice to read from the fuse device\n" ); } @@ -1987,6 +2138,104 @@ static void fuse_ll_pipe_destructor(void *data) fuse_ll_pipe_free(llp); } +static int fuse_ll_receive_buf(struct fuse_session *se, struct fuse_buf *buf, + struct fuse_chan **chp) +{ + struct fuse_chan *ch = *chp; + struct fuse_ll *f = fuse_session_data(se); + size_t bufsize = buf->size; + struct fuse_ll_pipe *llp; + struct fuse_buf tmpbuf; + int err; + int res; + + if (f->conn.proto_minor < 14 || !(f->conn.want & FUSE_CAP_SPLICE_READ)) + goto fallback; + + llp = fuse_ll_get_pipe(f); + if (llp == NULL) + goto fallback; + + if (llp->size < bufsize) { + if 
(llp->can_grow) { + res = fcntl(llp->pipe[0], F_SETPIPE_SZ, bufsize); + if (res == -1) { + llp->can_grow = 0; + goto fallback; + } + llp->size = res; + } + if (llp->size < bufsize) + goto fallback; + } + + res = splice(fuse_chan_fd(ch), NULL, llp->pipe[1], NULL, bufsize, 0); + err = errno; + + if (fuse_session_exited(se)) + return 0; + + if (res == -1) { + if (err == ENODEV) { + fuse_session_exit(se); + return 0; + } + if (err != EINTR && err != EAGAIN) + perror("fuse: splice from device"); + return -err; + } + + if (res < sizeof(struct fuse_in_header)) { + fprintf(stderr, "short splice from fuse device\n"); + return -EIO; + } + + tmpbuf = (struct fuse_buf) { + .size = res, + .flags = FUSE_BUF_IS_FD, + .fd = llp->pipe[0], + }; + + /* + * Don't bother with zero copy for small requests. + * fuse_loop_mt() needs to check for FORGET so this is more than + * just an optimization. + */ + if (res < sizeof(struct fuse_in_header) + + sizeof(struct fuse_write_in) + pagesize) { + struct fuse_bufvec src = { .buf = &tmpbuf, .count = 1 }; + struct fuse_bufvec dst = { .buf = buf, .count = 1 }; + + res = fuse_buf_copy(&dst, &src, 0); + if (res < 0) { + fprintf(stderr, "fuse: copy from pipe: %s\n", + strerror(-res)); + fuse_ll_clear_pipe(f); + return res; + } + if (res < tmpbuf.size) { + fprintf(stderr, "fuse: copy from pipe: short read\n"); + fuse_ll_clear_pipe(f); + return -EIO; + } + buf->size = tmpbuf.size; + return buf->size; + } + + *buf = tmpbuf; + + return res; + +fallback: + res = fuse_chan_recv(chp, buf->mem, bufsize); + if (res <= 0) + return res; + + buf->size = res; + + return res; +} + /* * always call fuse_lowlevel_new_common() internally, to work around a * misfeature in the FreeBSD runtime linker, which links the old @@ -2044,6 +2293,9 @@ struct fuse_session *fuse_lowlevel_new_common(struct fuse_args *args, if (!se) goto out_key_destroy; + se->receive_buf = fuse_ll_receive_buf; + se->process_buf = fuse_ll_process_buf; + return se; out_key_destroy: diff --git 
a/lib/fuse_session.c b/lib/fuse_session.c index 3758627..c55f250 100644 --- a/lib/fuse_session.c +++ b/lib/fuse_session.c @@ -80,6 +80,34 @@ void fuse_session_process(struct fuse_session *se, const char *buf, size_t len, se->op.process(se->data, buf, len, ch); } +void fuse_session_process_buf(struct fuse_session *se, + const struct fuse_buf *buf, struct fuse_chan *ch) +{ + if (se->process_buf) { + se->process_buf(se->data, buf, ch); + } else { + assert(!(buf->flags & FUSE_BUF_IS_FD)); + fuse_session_process(se->data, buf->mem, buf->size, ch); + } +} + +int fuse_session_receive_buf(struct fuse_session *se, struct fuse_buf *buf, + struct fuse_chan **chp) +{ + int res; + + if (se->receive_buf) { + res = se->receive_buf(se, buf, chp); + } else { + res = fuse_chan_recv(chp, buf->mem, buf->size); + if (res > 0) + buf->size = res; + } + + return res; +} + + void fuse_session_destroy(struct fuse_session *se) { if (se->op.destroy) diff --git a/lib/fuse_versionscript b/lib/fuse_versionscript index 860c403..2f531e1 100644 --- a/lib/fuse_versionscript +++ b/lib/fuse_versionscript @@ -185,6 +185,8 @@ FUSE_2.9 { fuse_buf_copy; fuse_buf_size; fuse_reply_data; + fuse_session_process_buf; + fuse_session_receive_buf; local: *; -- cgit v1.2.3