aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/.gitignore2
-rw-r--r--test/Makefile7
-rw-r--r--test/Makefile.am2
-rw-r--r--test/conftest.py85
-rw-r--r--test/pytest.ini2
-rw-r--r--test/test.c2
-rwxr-xr-xtest/test_examples.py345
-rwxr-xr-xtest/test_fuse.py32
-rw-r--r--test/util.py38
9 files changed, 506 insertions, 9 deletions
diff --git a/test/.gitignore b/test/.gitignore
index 7b95378..b7041be 100644
--- a/test/.gitignore
+++ b/test/.gitignore
@@ -1,2 +1,2 @@
-!Makefile
test
+__pycache__/
diff --git a/test/Makefile b/test/Makefile
deleted file mode 100644
index 738dd53..0000000
--- a/test/Makefile
+++ /dev/null
@@ -1,7 +0,0 @@
-CC=gcc
-CFLAGS=-Wall -W
-
-all: test
-
-clean:
- rm -f *.o test
diff --git a/test/Makefile.am b/test/Makefile.am
new file mode 100644
index 0000000..55d6950
--- /dev/null
+++ b/test/Makefile.am
@@ -0,0 +1,2 @@
+bin_PROGRAMS = test
+test_SOURCES = test.c
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 0000000..0da2f4b
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,85 @@
+import sys
+import pytest
+import time
+import re
+
+# If a test fails, wait a moment before retrieving the captured
+# stdout/stderr. When using a server process, this makes sure that we capture
+# any potential output of the server that comes *after* a test has failed. For
+# example, if a request handler raises an exception, the server first signals an
+# error to FUSE (causing the test to fail), and then logs the exception. Without
+# the extra delay, the exception will go into nowhere.
+@pytest.mark.hookwrapper
+def pytest_pyfunc_call(pyfuncitem):
+ outcome = yield
+ failed = outcome.excinfo is not None
+ if failed:
+ time.sleep(1)
+
+@pytest.fixture()
+def pass_capfd(request, capfd):
+ '''Provide capfd object to UnitTest instances'''
+ request.instance.capfd = capfd
+
+def check_test_output(capfd):
+ (stdout, stderr) = capfd.readouterr()
+
+ # Write back what we've read (so that it will still be printed.
+ sys.stdout.write(stdout)
+ sys.stderr.write(stderr)
+
+ # Strip out false positives
+ for (pattern, flags, count) in capfd.false_positives:
+ cp = re.compile(pattern, flags)
+ (stdout, cnt) = cp.subn('', stdout, count=count)
+ if count == 0 or count - cnt > 0:
+ stderr = cp.sub('', stderr, count=count - cnt)
+
+ for pattern in ('exception', 'error', 'warning', 'fatal', 'traceback',
+ 'fault', 'crash(?:ed)?', 'abort(?:ed)'):
+ cp = re.compile(r'\b{}\b'.format(pattern), re.IGNORECASE | re.MULTILINE)
+ hit = cp.search(stderr)
+ if hit:
+ raise AssertionError('Suspicious output to stderr (matched "%s")' % hit.group(0))
+ hit = cp.search(stdout)
+ if hit:
+ raise AssertionError('Suspicious output to stdout (matched "%s")' % hit.group(0))
+
+def register_output(self, pattern, count=1, flags=re.MULTILINE):
+ '''Register *pattern* as false positive for output checking
+
+ This prevents the test from failing because the output otherwise
+ appears suspicious.
+ '''
+
+ self.false_positives.append((pattern, flags, count))
+
+# This is a terrible hack that allows us to access the fixtures from the
+# pytest_runtest_call hook. Among a lot of other hidden assumptions, it probably
+# relies on tests running sequential (i.e., don't dare to use e.g. the xdist
+# plugin)
+current_capfd = None
+@pytest.yield_fixture(autouse=True)
+def save_cap_fixtures(request, capfd):
+ global current_capfd
+ capfd.false_positives = []
+
+ # Monkeypatch in a function to register false positives
+ type(capfd).register_output = register_output
+
+ if request.config.getoption('capture') == 'no':
+ capfd = None
+ current_capfd = capfd
+ bak = current_capfd
+ yield
+
+ # Try to catch problems with this hack (e.g. when running tests
+ # simultaneously)
+ assert bak is current_capfd
+ current_capfd = None
+
+@pytest.hookimpl(trylast=True)
+def pytest_runtest_call(item):
+ capfd = current_capfd
+ if capfd is not None:
+ check_test_output(capfd)
diff --git a/test/pytest.ini b/test/pytest.ini
new file mode 100644
index 0000000..bc4af36
--- /dev/null
+++ b/test/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+addopts = --verbose --assert=rewrite --tb=native -x
diff --git a/test/test.c b/test/test.c
index 5d5750d..2b38bb2 100644
--- a/test/test.c
+++ b/test/test.c
@@ -993,7 +993,7 @@ static int do_test_open_acc(int flags, const char *flags_str, int mode, int err)
int res;
int fd;
- start_test("open_acc(%s) mode: 0%03o error: '%s'", flags_str, mode,
+ start_test("open_acc(%s) mode: 0%03o message: '%s'", flags_str, mode,
strerror(err));
unlink(testfile);
res = create_file(testfile, data, datalen);
diff --git a/test/test_examples.py b/test/test_examples.py
new file mode 100755
index 0000000..31d2fae
--- /dev/null
+++ b/test/test_examples.py
@@ -0,0 +1,345 @@
+#!/usr/bin/env python3
+
+if __name__ == '__main__':
+ import pytest
+ import sys
+ sys.exit(pytest.main([__file__] + sys.argv[1:]))
+
+import subprocess
+import os
+import sys
+import pytest
+import stat
+import shutil
+import filecmp
+import errno
+from tempfile import NamedTemporaryFile
+from util import wait_for_mount, umount, cleanup
+
+basename = os.path.join(os.path.dirname(__file__), '..')
+TEST_FILE = __file__
+
+with open(TEST_FILE, 'rb') as fh:
+ TEST_DATA = fh.read()
+
+def name_generator(__ctr=[0]):
+ __ctr[0] += 1
+ return 'testfile_%d' % __ctr[0]
+
+@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
+def test_hello(tmpdir, name):
+ mnt_dir = str(tmpdir)
+ cmdline = [os.path.join(basename, 'example', name),
+ '-f', mnt_dir ]
+ mount_process = subprocess.Popen(cmdline)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ assert os.listdir(mnt_dir) == [ 'hello' ]
+ filename = os.path.join(mnt_dir, 'hello')
+ with open(filename, 'r') as fh:
+ assert fh.read() == 'Hello World!\n'
+ with pytest.raises(IOError) as exc_info:
+ open(filename, 'r+')
+ assert exc_info.value.errno == errno.EACCES
+ with pytest.raises(IOError) as exc_info:
+ open(filename + 'does-not-exist', 'r+')
+ assert exc_info.value.errno == errno.ENOENT
+ except:
+ cleanup(mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+@pytest.mark.parametrize("name", ('fusexmp', 'fusexmp_fh'))
+def test_fusexmp_fh(tmpdir, name):
+ mnt_dir = str(tmpdir.mkdir('mnt'))
+ src_dir = str(tmpdir.mkdir('src'))
+
+ cmdline = [os.path.join(basename, 'example', name),
+ '-f', '-o' , 'use_ino,readdir_ino,kernel_cache',
+ mnt_dir ]
+ mount_process = subprocess.Popen(cmdline)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ work_dir = os.path.join(mnt_dir, src_dir)
+ tst_write(work_dir)
+ tst_mkdir(work_dir)
+ tst_symlink(work_dir)
+ tst_mknod(work_dir)
+ if os.getuid() == 0:
+ tst_chown(work_dir)
+ # Underlying fs may not have full nanosecond resolution
+ tst_utimens(work_dir, ns_tol=1000)
+ tst_link(work_dir)
+ tst_readdir(work_dir)
+ tst_statvfs(work_dir)
+ tst_truncate_path(work_dir)
+ tst_truncate_fd(work_dir)
+ tst_unlink(work_dir)
+ tst_passthrough(src_dir, work_dir)
+ except:
+ cleanup(mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+def test_fioc(tmpdir):
+ mnt_dir = str(tmpdir)
+ testfile = os.path.join(mnt_dir, 'fioc')
+ cmdline = [os.path.join(basename, 'example', 'fioc'),
+ '-f', mnt_dir ]
+ mount_process = subprocess.Popen(cmdline)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+
+ base_cmd = [ os.path.join(basename, 'example', 'fioclient'),
+ testfile ]
+ assert subprocess.check_output(base_cmd) == b'0\n'
+ with open(testfile, 'wb') as fh:
+ fh.write(b'foobar')
+ assert subprocess.check_output(base_cmd) == b'6\n'
+ subprocess.check_call(base_cmd + [ '3' ])
+ with open(testfile, 'rb') as fh:
+ assert fh.read()== b'foo'
+ except:
+ cleanup(mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+def test_fsel(tmpdir):
+ mnt_dir = str(tmpdir)
+ cmdline = [os.path.join(basename, 'example', 'fsel'),
+ '-f', mnt_dir ]
+ mount_process = subprocess.Popen(cmdline)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ cmdline = [ os.path.join(basename, 'example', 'fselclient') ]
+ subprocess.check_call(cmdline, cwd=mnt_dir)
+ except:
+ cleanup(mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+def checked_unlink(filename, path, isdir=False):
+ fullname = os.path.join(path, filename)
+ if isdir:
+ os.rmdir(fullname)
+ else:
+ os.unlink(fullname)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert filename not in os.listdir(path)
+
+def tst_mkdir(mnt_dir):
+ dirname = name_generator()
+ fullname = mnt_dir + "/" + dirname
+ os.mkdir(fullname)
+ fstat = os.stat(fullname)
+ assert stat.S_ISDIR(fstat.st_mode)
+ assert os.listdir(fullname) == []
+ assert fstat.st_nlink in (1,2)
+ assert dirname in os.listdir(mnt_dir)
+ checked_unlink(dirname, mnt_dir, isdir=True)
+
+def tst_symlink(mnt_dir):
+ linkname = name_generator()
+ fullname = mnt_dir + "/" + linkname
+ os.symlink("/imaginary/dest", fullname)
+ fstat = os.lstat(fullname)
+ assert stat.S_ISLNK(fstat.st_mode)
+ assert os.readlink(fullname) == "/imaginary/dest"
+ assert fstat.st_nlink == 1
+ assert linkname in os.listdir(mnt_dir)
+ checked_unlink(linkname, mnt_dir)
+
+def tst_mknod(mnt_dir):
+ filename = os.path.join(mnt_dir, name_generator())
+ shutil.copyfile(TEST_FILE, filename)
+ fstat = os.lstat(filename)
+ assert stat.S_ISREG(fstat.st_mode)
+ assert fstat.st_nlink == 1
+ assert os.path.basename(filename) in os.listdir(mnt_dir)
+ assert filecmp.cmp(TEST_FILE, filename, False)
+ checked_unlink(filename, mnt_dir)
+
+def tst_chown(mnt_dir):
+ filename = os.path.join(mnt_dir, name_generator())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+ uid = fstat.st_uid
+ gid = fstat.st_gid
+
+ uid_new = uid + 1
+ os.chown(filename, uid_new, -1)
+ fstat = os.lstat(filename)
+ assert fstat.st_uid == uid_new
+ assert fstat.st_gid == gid
+
+ gid_new = gid + 1
+ os.chown(filename, -1, gid_new)
+ fstat = os.lstat(filename)
+ assert fstat.st_uid == uid_new
+ assert fstat.st_gid == gid_new
+
+ checked_unlink(filename, mnt_dir, isdir=True)
+
+def tst_write(mnt_dir):
+ name = os.path.join(mnt_dir, name_generator())
+ shutil.copyfile(TEST_FILE, name)
+ assert filecmp.cmp(name, TEST_FILE, False)
+ checked_unlink(name, mnt_dir)
+
+def tst_unlink(mnt_dir):
+ name = os.path.join(mnt_dir, name_generator())
+ data1 = b'foo'
+ data2 = b'bar'
+
+ with open(os.path.join(mnt_dir, name), 'wb+', buffering=0) as fh:
+ fh.write(data1)
+ checked_unlink(name, mnt_dir)
+ fh.write(data2)
+ fh.seek(0)
+ assert fh.read() == data1+data2
+
+def tst_statvfs(mnt_dir):
+ os.statvfs(mnt_dir)
+
+def tst_link(mnt_dir):
+ name1 = os.path.join(mnt_dir, name_generator())
+ name2 = os.path.join(mnt_dir, name_generator())
+ shutil.copyfile(TEST_FILE, name1)
+ assert filecmp.cmp(name1, TEST_FILE, False)
+ os.link(name1, name2)
+
+ fstat1 = os.lstat(name1)
+ fstat2 = os.lstat(name2)
+
+ assert fstat1 == fstat2
+ assert fstat1.st_nlink == 2
+
+ assert os.path.basename(name2) in os.listdir(mnt_dir)
+ assert filecmp.cmp(name1, name2, False)
+ os.unlink(name2)
+ fstat1 = os.lstat(name1)
+ assert fstat1.st_nlink == 1
+ os.unlink(name1)
+
+def tst_readdir(mnt_dir):
+ dir_ = os.path.join(mnt_dir, name_generator())
+ file_ = dir_ + "/" + name_generator()
+ subdir = dir_ + "/" + name_generator()
+ subfile = subdir + "/" + name_generator()
+
+ os.mkdir(dir_)
+ shutil.copyfile(TEST_FILE, file_)
+ os.mkdir(subdir)
+ shutil.copyfile(TEST_FILE, subfile)
+
+ listdir_is = os.listdir(dir_)
+ listdir_is.sort()
+ listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
+ listdir_should.sort()
+ assert listdir_is == listdir_should
+
+ os.unlink(file_)
+ os.unlink(subfile)
+ os.rmdir(subdir)
+ os.rmdir(dir_)
+
+def tst_truncate_path(mnt_dir):
+ assert len(TEST_DATA) > 1024
+
+ filename = os.path.join(mnt_dir, name_generator())
+ with open(filename, 'wb') as fh:
+ fh.write(TEST_DATA)
+
+ fstat = os.stat(filename)
+ size = fstat.st_size
+ assert size == len(TEST_DATA)
+
+ # Add zeros at the end
+ os.truncate(filename, size + 1024)
+ assert os.stat(filename).st_size == size + 1024
+ with open(filename, 'rb') as fh:
+ assert fh.read(size) == TEST_DATA
+ assert fh.read(1025) == b'\0' * 1024
+
+ # Truncate data
+ os.truncate(filename, size - 1024)
+ assert os.stat(filename).st_size == size - 1024
+ with open(filename, 'rb') as fh:
+ assert fh.read(size) == TEST_DATA[:size-1024]
+
+ os.unlink(filename)
+
+def tst_truncate_fd(mnt_dir):
+ assert len(TEST_DATA) > 1024
+ with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
+ fd = fh.fileno()
+ fh.write(TEST_DATA)
+ fstat = os.fstat(fd)
+ size = fstat.st_size
+ assert size == len(TEST_DATA)
+
+ # Add zeros at the end
+ os.ftruncate(fd, size + 1024)
+ assert os.fstat(fd).st_size == size + 1024
+ fh.seek(0)
+ assert fh.read(size) == TEST_DATA
+ assert fh.read(1025) == b'\0' * 1024
+
+ # Truncate data
+ os.ftruncate(fd, size - 1024)
+ assert os.fstat(fd).st_size == size - 1024
+ fh.seek(0)
+ assert fh.read(size) == TEST_DATA[:size-1024]
+
+def tst_utimens(mnt_dir, ns_tol=0):
+ filename = os.path.join(mnt_dir, name_generator())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+
+ atime = fstat.st_atime + 42.28
+ mtime = fstat.st_mtime - 42.23
+ if sys.version_info < (3,3):
+ os.utime(filename, (atime, mtime))
+ else:
+ atime_ns = fstat.st_atime_ns + int(42.28*1e9)
+ mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
+ os.utime(filename, None, ns=(atime_ns, mtime_ns))
+
+ fstat = os.lstat(filename)
+
+ assert abs(fstat.st_atime - atime) < 1e-3
+ assert abs(fstat.st_mtime - mtime) < 1e-3
+ if sys.version_info >= (3,3):
+ assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
+ assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
+
+ checked_unlink(filename, mnt_dir, isdir=True)
+
+def tst_passthrough(src_dir, mnt_dir):
+ name = name_generator()
+ src_name = os.path.join(src_dir, name)
+ mnt_name = os.path.join(src_dir, name)
+ assert name not in os.listdir(src_dir)
+ assert name not in os.listdir(mnt_dir)
+ with open(src_name, 'w') as fh:
+ fh.write('Hello, world')
+ assert name in os.listdir(src_dir)
+ assert name in os.listdir(mnt_dir)
+ assert os.stat(src_name) == os.stat(mnt_name)
+
+ name = name_generator()
+ src_name = os.path.join(src_dir, name)
+ mnt_name = os.path.join(src_dir, name)
+ assert name not in os.listdir(src_dir)
+ assert name not in os.listdir(mnt_dir)
+ with open(mnt_name, 'w') as fh:
+ fh.write('Hello, world')
+ assert name in os.listdir(src_dir)
+ assert name in os.listdir(mnt_dir)
+ assert os.stat(src_name) == os.stat(mnt_name)
diff --git a/test/test_fuse.py b/test/test_fuse.py
new file mode 100755
index 0000000..bbba6e0
--- /dev/null
+++ b/test/test_fuse.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python3
+import pytest
+import sys
+
+if __name__ == '__main__':
+ sys.exit(pytest.main([__file__] + sys.argv[1:]))
+
+import subprocess
+import os
+from util import wait_for_mount, umount, cleanup
+
+basename = os.path.join(os.path.dirname(__file__), '..')
+
+def test_fuse(tmpdir):
+ mnt_dir = str(tmpdir.mkdir('mnt'))
+ src_dir = str(tmpdir.mkdir('src'))
+
+ cmdline = [ os.path.join(basename, 'example', 'fusexmp_fh'),
+ '-f', '-o' , 'use_ino,readdir_ino,kernel_cache',
+ mnt_dir ]
+ mount_process = subprocess.Popen(cmdline)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ cmdline = [ os.path.join(basename, 'test', 'test'),
+ os.path.join(mnt_dir, src_dir),
+ ':' + src_dir ]
+ subprocess.check_call(cmdline)
+ except:
+ cleanup(mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
diff --git a/test/util.py b/test/util.py
new file mode 100644
index 0000000..48ec995
--- /dev/null
+++ b/test/util.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+import subprocess
+import pytest
+import os
+import time
+
+def wait_for_mount(mount_process, mnt_dir):
+ elapsed = 0
+ while elapsed < 30:
+ if os.path.ismount(mnt_dir):
+ return True
+ if mount_process.poll() is not None:
+ pytest.fail('file system process terminated prematurely')
+ time.sleep(0.1)
+ elapsed += 0.1
+ pytest.fail("mountpoint failed to come up")
+
+def cleanup(mnt_dir):
+ subprocess.call(['fusermount', '-z', '-u', mnt_dir],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.STDOUT)
+
+def umount(mount_process, mnt_dir):
+ subprocess.check_call(['fusermount', '-z', '-u', mnt_dir])
+ assert not os.path.ismount(mnt_dir)
+
+ # Give mount process a little while to terminate. Popen.wait(timeout)
+ # was only added in 3.3...
+ elapsed = 0
+ while elapsed < 30:
+ code = mount_process.poll()
+ if code is not None:
+ if code == 0:
+ return
+ pytest.fail('file system process terminated with code %s' % (code,))
+ time.sleep(0.1)
+ elapsed += 0.1
+ pytest.fail('mount process did not terminate')