diff --git a/Include/fuse_common.pxd b/Include/fuse_common.pxd index cddbc33..f315fa8 100644 --- a/Include/fuse_common.pxd +++ b/Include/fuse_common.pxd @@ -42,6 +42,11 @@ cdef extern from * nogil: # fuse_common.h should not be included struct fuse_chan: pass + struct fuse_pollhandle: + pass + + void fuse_pollhandle_destroy(fuse_pollhandle *ph) + struct fuse_loop_config: int clone_fd unsigned max_idle_threads diff --git a/Include/fuse_lowlevel.pxd b/Include/fuse_lowlevel.pxd index fe21193..099d6f3 100644 --- a/Include/fuse_lowlevel.pxd +++ b/Include/fuse_lowlevel.pxd @@ -119,6 +119,8 @@ cdef extern from "" nogil: off_t offset, off_t length, fuse_file_info *fi) except * void (*readdirplus) (fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, fuse_file_info *fi) except * + void (*poll) (fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi, + fuse_pollhandle *ph) except * # Reply functions @@ -137,6 +139,7 @@ cdef extern from "" nogil: fuse_buf_copy_flags flags) int fuse_reply_statfs(fuse_req_t req, statvfs *stbuf) int fuse_reply_xattr(fuse_req_t req, size_t count) + int fuse_reply_poll(fuse_req_t req, unsigned revents) size_t fuse_add_direntry(fuse_req_t req, const_char *buf, size_t bufsize, const_char *name, struct_stat *stbuf, @@ -157,6 +160,7 @@ cdef extern from "" nogil: fuse_buf_copy_flags flags) int fuse_lowlevel_notify_retrieve(fuse_session *se, fuse_ino_t ino, size_t size, off_t offset, void *cookie) + int fuse_lowlevel_notify_poll(fuse_pollhandle *ph) # Utility functions void *fuse_req_userdata(fuse_req_t req) diff --git a/src/pyfuse3/__init__.pyi b/src/pyfuse3/__init__.pyi index dfefd2b..5c4b7ae 100644 --- a/src/pyfuse3/__init__.pyi +++ b/src/pyfuse3/__init__.pyi @@ -129,6 +129,10 @@ class FUSEError(Exception): def __init__(self, errno: int) -> None: ... def __str__(self) -> str: ... +class PollHandle: + def __getstate__(self) -> None: ... + def notify(self) -> None: ... + def listdir(path: str) -> List[str]: ... def syncfs(path: str) -> str: ... def setxattr(path: str, name: str, value: bytes, namespace: NamespaceT = ...) -> None: ... diff --git a/src/pyfuse3/__init__.pyx b/src/pyfuse3/__init__.pyx index 2a1e193..e4a233c 100644 --- a/src/pyfuse3/__init__.pyx +++ b/src/pyfuse3/__init__.pyx @@ -504,6 +504,66 @@ cdef class FUSEError(Exception): return strerror(self.errno_) +@cython.freelist(10) +cdef class PollHandle: + ''' + Opaque handle for delivering poll(2) readiness notifications. + + Instances of this class are created by pyfuse3 and passed to + `Operations.poll`. The filesystem may keep a reference and later + call `PollHandle.notify` on the handle to wake up any process currently + blocked in :manpage:`poll(2)`, :manpage:`select(2)` or + :manpage:`epoll_wait(2)` for the corresponding file descriptor. + + A single notification is sufficient to clear all pending waiters; + further notifications on the same handle are harmless but redundant. + + The underlying ``fuse_pollhandle`` is automatically destroyed when + the Python object is garbage collected, so filesystems should simply + drop the reference when the notification is no longer needed. + ''' + + cdef fuse_pollhandle *_ph + + def __cinit__(self): + self._ph = NULL + + def __init__(self): + raise TypeError('PollHandle cannot be instantiated directly') + + def __dealloc__(self): + if self._ph is not NULL: + fuse_pollhandle_destroy(self._ph) + self._ph = NULL + + def __getstate__(self): + raise PicklingError("PollHandle instances can't be pickled") + + def notify(self): + ''' + Notify IO readiness for this poll handle. + + After this returns, any process waiting in :manpage:`poll(2)`, + :manpage:`select(2)` or :manpage:`epoll_wait(2)` on the + corresponding file descriptor will be woken so it can re-poll + the filesystem for the current readiness mask. + + A single notification is enough to clear all pending waiters; + calling this method again on the same handle is harmless but + redundant. The handle remains valid until its Python reference is + dropped, at which point the underlying ``fuse_pollhandle`` is + destroyed. + ''' + + cdef int ret + + with nogil: + ret = fuse_lowlevel_notify_poll(self._ph) + + if ret != 0: + raise OSError(-ret, 'fuse_lowlevel_notify_poll returned: ' + strerror(-ret)) + + def listdir(path): '''Like `os.listdir`, but releases the GIL. diff --git a/src/pyfuse3/_pyfuse3.py b/src/pyfuse3/_pyfuse3.py index 48cb1e5..24de0b0 100644 --- a/src/pyfuse3/_pyfuse3.py +++ b/src/pyfuse3/_pyfuse3.py @@ -35,6 +35,7 @@ EntryAttributes, FileInfo, FUSEError, + PollHandle, ReaddirToken, RequestContext, SetattrFields, @@ -451,6 +452,44 @@ async def fsync(self, fh: FileHandleT, datasync: bool) -> None: raise FUSEError(errno.ENOSYS) + async def poll( + self, + inode: InodeT, + fh: FileHandleT, + poll_handle: Optional["PollHandle"], + ctx: "RequestContext", + ) -> int: + '''Check IO readiness on an open file. + + This method is called when a process performs :manpage:`poll(2)`, + :manpage:`select(2)` or :manpage:`epoll_wait(2)` on a file descriptor + backed by *fh* (returned by a prior `open` or `create` call). *inode* + identifies the inode that *fh* refers to. + + The method must return the bitwise-or of the currently active poll + events (e.g. ``select.POLLIN``, ``select.POLLOUT``, ``select.POLLPRI``). + If no events are currently ready, return ``0``. + + If *poll_handle* is ``None``, the kernel is only asking for the + current readiness mask -- no process is queued waiting for a + notification. The filesystem should just return the current event + bitmask without storing anything. + + If *poll_handle* is not ``None``, the kernel is requesting to be + notified the next time readiness changes. The filesystem should + store the handle and later call `PollHandle.notify` exactly once + when a relevant event becomes available. Each `~Operations.poll` + call produces a fresh handle; storing a new handle implicitly + drops any previously held one (which destroys the underlying + libfuse object). + + If this method raises ``FUSEError(errno.ENOSYS)`` (the default), + the kernel will fall back to a default poll implementation and + will not call this handler again for the lifetime of the mount. + ''' + + raise FUSEError(errno.ENOSYS) + async def opendir(self, inode: InodeT, ctx: "RequestContext") -> FileHandleT: '''Open the directory with inode *inode*. diff --git a/src/pyfuse3/handlers.pxi b/src/pyfuse3/handlers.pxi index a424be2..aa7d8c4 100644 --- a/src/pyfuse3/handlers.pxi +++ b/src/pyfuse3/handlers.pxi @@ -836,6 +836,40 @@ async def fuse_access_async (_Container c): +cdef void fuse_poll (fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi, + fuse_pollhandle *ph): + cdef _Container c = _Container() + cdef PollHandle py_ph + c.req = req + c.ino = ino + if fi is NULL: + c.fh = 0 + else: + c.fh = fi.fh + if ph is NULL: + py_ph = None + else: + py_ph = PollHandle.__new__(PollHandle) + py_ph._ph = ph + save_retval(fuse_poll_async(c, py_ph)) + +async def fuse_poll_async (_Container c, PollHandle py_ph): + cdef int ret + cdef unsigned revents + + ctx = get_request_context(c.req) + try: + result = await operations.poll(c.ino, c.fh, py_ph, ctx) + except FUSEError as e: + ret = fuse_reply_err(c.req, e.errno) + else: + revents = (result if result is not None else 0) + ret = fuse_reply_poll(c.req, revents) + + if ret != 0: + log.error('fuse_poll(): fuse_reply_* failed with %s', strerror(-ret)) + + cdef void fuse_create (fuse_req_t req, fuse_ino_t parent, const_char *name, mode_t mode, fuse_file_info *fi): cdef _Container c = _Container() diff --git a/src/pyfuse3/internal.pxi b/src/pyfuse3/internal.pxi index 336c0b1..08a7dd5 100644 --- a/src/pyfuse3/internal.pxi +++ b/src/pyfuse3/internal.pxi @@ -69,6 +69,7 @@ cdef void init_fuse_ops(): fuse_ops.create = fuse_create fuse_ops.forget_multi = fuse_forget_multi fuse_ops.write_buf = fuse_write_buf + fuse_ops.poll = fuse_poll cdef make_fuse_args(args, fuse_args* f_args): cdef char* arg diff --git a/test/test_fs.py b/test/test_fs.py index ed1bbe0..0fab05a 100755 --- a/test/test_fs.py +++ b/test/test_fs.py @@ -21,6 +21,7 @@ import logging import multiprocessing import os +import select import stat import threading import time @@ -34,6 +35,7 @@ FileInfo, FUSEError, InodeT, + PollHandle, ReaddirToken, RequestContext, ) @@ -59,11 +61,22 @@ def get_mp(): @pytest.fixture() def testfs(tmpdir): + yield from _mount_fs(tmpdir, Fs) + + +@pytest.fixture() +def pollfs(tmpdir): + yield from _mount_fs(tmpdir, PollTestFs) + + +def _mount_fs(tmpdir, fs_class): mnt_dir = str(tmpdir) mp = get_mp() with mp.Manager() as mgr: cross_process = mgr.Namespace() - mount_process = mp.Process(target=run_fs, args=(mnt_dir, cross_process)) + mount_process = mp.Process( + target=run_fs, args=(mnt_dir, cross_process, fs_class) + ) mount_process.start() try: @@ -118,6 +131,38 @@ def test_notify_store(testfs): assert not fs_state.read_called +def test_notify_poll(pollfs): + (mnt_dir, fs_state) = pollfs + path = os.path.join(mnt_dir, 'pollable') + + with open(path, 'rb', buffering=0) as fh: + poller = select.poll() + poller.register(fh.fileno(), select.POLLPRI) + + events = [] + + def poll_wait(): + events.extend(poller.poll(5000)) + + thread = threading.Thread(target=poll_wait) + thread.start() + + deadline = time.monotonic() + 5 + while time.monotonic() < deadline and not fs_state.poll_handle_received: + time.sleep(0.01) + + assert fs_state.poll_called + assert fs_state.poll_handle_received + assert not events + + pyfuse3.setxattr(mnt_dir, 'command', b'poll_ready') + thread.join(5) + assert not thread.is_alive() + assert events + assert events[0][0] == fh.fileno() + assert events[0][1] & select.POLLPRI + + def test_entry_timeout(testfs): (mnt_dir, fs_state) = testfs fs_state.entry_timeout = 1 @@ -267,11 +312,105 @@ async def setxattr(self, inode, name, value, ctx): elif value == b'terminate': pyfuse3.terminate() + else: raise FUSEError(errno.EINVAL) -def run_fs(mountpoint, cross_process): +class PollTestFs(Fs): + def __init__(self, cross_process): + super().__init__(cross_process) + self.poll_name = b"pollable" + self.poll_inode = cast(InodeT, pyfuse3.ROOT_INODE + 2) + self.poll_handle: PollHandle | None = None + self.status.poll_called = False + self.status.poll_handle_received = False + self.status.poll_ready = False + + async def getattr(self, inode, ctx=None): + if inode != self.poll_inode: + return await super().getattr(inode, ctx) + + entry = EntryAttributes() + entry.st_mode = stat.S_IFREG | 0o644 + entry.st_size = 0 + stamp = int(1438467123.985654 * 1e9) + entry.st_atime_ns = stamp + entry.st_ctime_ns = stamp + entry.st_mtime_ns = stamp + entry.st_gid = os.getgid() + entry.st_uid = os.getuid() + entry.st_ino = inode + entry.entry_timeout = self.status.entry_timeout + entry.attr_timeout = self.status.attr_timeout + self.status.getattr_called = True + return entry + + async def lookup(self, parent_inode, name, ctx): + if name != self.poll_name: + return await super().lookup(parent_inode, name, ctx) + if parent_inode != pyfuse3.ROOT_INODE: + raise pyfuse3.FUSEError(errno.ENOENT) + self.status.lookup_called = True + return await self.getattr(self.poll_inode, ctx) + + async def readdir(self, fh, start_id, token): + assert fh == pyfuse3.ROOT_INODE + entries = ( + (self.hello_name, self.hello_inode), + (self.poll_name, self.poll_inode), + ) + for idx, (name, inode) in enumerate(entries): + if idx < start_id: + continue + if not pyfuse3.readdir_reply(token, name, await self.getattr(inode), idx + 1): + break + + async def open(self, inode, flags, ctx): + if inode != self.poll_inode: + return await super().open(inode, flags, ctx) + if flags & os.O_RDWR or flags & os.O_WRONLY: + raise pyfuse3.FUSEError(errno.EACCES) + return FileInfo(fh=FileHandleT(inode)) + + async def read(self, fh, off, size): + if fh != self.poll_inode: + return await super().read(fh, off, size) + return b'' + + async def poll( + self, + inode: InodeT, + fh: FileHandleT, + poll_handle: PollHandle | None, + ctx: RequestContext, + ) -> int: + assert inode == self.poll_inode + assert fh == self.poll_inode + + self.status.poll_called = True + if poll_handle is not None: + self.poll_handle = poll_handle + self.status.poll_handle_received = True + + if self.status.poll_ready: + return select.POLLPRI + + return 0 + + async def setxattr(self, inode, name, value, ctx): + if value != b'poll_ready': + return await super().setxattr(inode, name, value, ctx) + if inode != pyfuse3.ROOT_INODE or name != b'command': + raise FUSEError(errno.ENOTSUP) + self.status.poll_ready = True + if self.poll_handle is None: + raise FUSEError(errno.EINVAL) + self.poll_handle.notify() + self.poll_handle = None + + +def run_fs(mountpoint, cross_process, fs_class=Fs): # Logging (note that we run in a new process, so we can't # rely on direct log capture and instead print to stdout) root_logger = logging.getLogger() @@ -285,7 +424,7 @@ def run_fs(mountpoint, cross_process): root_logger.addHandler(handler) root_logger.setLevel(logging.DEBUG) - testfs = Fs(cross_process) + testfs = fs_class(cross_process) fuse_options = set(pyfuse3.default_options) fuse_options.add('fsname=pyfuse3_testfs') pyfuse3.init(testfs, mountpoint, fuse_options)