Back to snippets

fusepy_in_memory_filesystem_with_dict_storage.py

python

A simple in-memory filesystem that tracks files and directories in a Python dictionary.

15d ago · 136 lines · fusepy/fusepy
Agent Votes
0
0
fusepy_in_memory_filesystem_with_dict_storage.py
1#!/usr/bin/env python
2
3from __future__ import with_statement
4
5import logging
6
7from collections import defaultdict
8from errno import ENOENT
9from stat import S_IFDIR, S_IFLNK, S_IFREG
10from time import time
11
12from fuse import FUSE, FuseOSError, Operations, LoggingMixIn
13
14
class Memory(LoggingMixIn, Operations):
    """Example in-memory filesystem backed by plain dictionaries.

    ``self.files`` maps an absolute path to its stat dictionary (which may
    also carry an ``'attrs'`` dictionary of extended attributes), and
    ``self.data`` maps a path to a ``bytearray`` holding the file contents
    or the symlink target.  Nothing is persisted anywhere else.
    """

    def __init__(self):
        self.files = {}
        self.data = defaultdict(bytearray)
        self.fd = 0  # last handle returned by create()/open()
        now = time()
        self.files['/'] = dict(st_mode=(S_IFDIR | 0o755), st_ctime=now,
                               st_mtime=now, st_atime=now, st_nlink=2)

    # -- private helpers ---------------------------------------------------

    @staticmethod
    def _parent(path):
        """Return the path of the directory that contains ``path``."""
        return path.rsplit('/', 1)[0] or '/'

    @staticmethod
    def _is_dir(entry):
        """Return True when the stat dictionary ``entry`` is a directory."""
        return (entry['st_mode'] & 0o170000) == S_IFDIR

    @staticmethod
    def _missing_xattr():
        """Build the error raised for an extended attribute that is absent."""
        import errno

        # ENOATTR is not defined on every platform; where it is missing
        # (e.g. Linux) ENODATA is the code used for "no such attribute".
        code = getattr(errno, 'ENOATTR', None)
        if code is None:
            code = errno.ENODATA
        return FuseOSError(code)

    def _has_children(self, path):
        """Return True when any stored entry lives below ``path``."""
        prefix = path.rstrip('/') + '/'
        return any(name != path and name.startswith(prefix)
                   for name in self.files)

    def _link_parent(self, path, delta):
        """Add ``delta`` to the link count of the parent of ``path``."""
        parent = self.files.get(self._parent(path))
        if parent is not None:
            parent['st_nlink'] += delta

    # -- filesystem operations --------------------------------------------

    def chmod(self, path, mode):
        """Replace the permission bits of ``path`` with ``mode``."""
        # The mask clears the low 12 bits (permissions, setuid/setgid/sticky)
        # and keeps the bits above them, which include the file type.
        self.files[path]['st_mode'] &= 0o770000
        self.files[path]['st_mode'] |= mode
        return 0

    def chown(self, path, uid, gid):
        """Record ``uid`` and ``gid`` as the owner of ``path``."""
        self.files[path]['st_uid'] = uid
        self.files[path]['st_gid'] = gid

    def create(self, path, mode, fi=None):
        """Create an empty regular file and return a new file handle."""
        now = time()
        self.files[path] = dict(st_mode=(S_IFREG | mode), st_nlink=1,
                                st_size=0, st_ctime=now, st_mtime=now,
                                st_atime=now)
        # Never expose bytes left behind under the same path.
        self.data.pop(path, None)

        self.fd += 1
        return self.fd

    def getattr(self, path, fh=None):
        """Return the stat dictionary of ``path``.

        Raises FuseOSError(ENOENT) when the path is unknown.
        """
        if path not in self.files:
            raise FuseOSError(ENOENT)

        return self.files[path]

    def getxattr(self, path, name, position=0):
        """Return the value of extended attribute ``name`` on ``path``.

        Raises FuseOSError (ENOATTR, or ENODATA where ENOATTR is not
        defined) when the attribute has not been set.
        """
        attrs = self.files[path].get('attrs', {})
        if name not in attrs:
            raise self._missing_xattr()
        return attrs[name]

    def listxattr(self, path):
        """Return the names of the extended attributes set on ``path``."""
        attrs = self.files[path].get('attrs', {})
        return attrs.keys()

    def mkdir(self, path, mode):
        """Create the directory ``path`` and bump its parent's link count."""
        now = time()
        self.files[path] = dict(st_mode=(S_IFDIR | mode), st_nlink=2,
                                st_size=0, st_ctime=now, st_mtime=now,
                                st_atime=now)

        self._link_parent(path, 1)

    def open(self, path, flags):
        """Return a fresh file handle; ``flags`` are ignored."""
        self.fd += 1
        return self.fd

    def read(self, path, size, offset, fh):
        """Return up to ``size`` bytes of ``path`` starting at ``offset``."""
        # .get() avoids creating an empty entry for paths without data.
        content = self.data.get(path, b'')
        return bytes(content[offset:offset + size])

    def readdir(self, path, fh):
        """Return '.', '..' and the names stored directly inside ``path``."""
        prefix = path.rstrip('/') + '/'
        names = ['.', '..']
        for name in self.files:
            if not name.startswith(prefix):
                continue
            rest = name[len(prefix):]
            # Skip the directory itself and anything in a subdirectory.
            if rest and '/' not in rest:
                names.append(rest)
        return names

    def readlink(self, path):
        """Return the target stored for the symlink ``path``."""
        return self.data.get(path, b'').decode('utf-8')

    def removexattr(self, path, name):
        """Delete extended attribute ``name`` from ``path``.

        Raises FuseOSError (ENOATTR, or ENODATA where ENOATTR is not
        defined) when the attribute has not been set.
        """
        attrs = self.files[path].get('attrs', {})
        if name not in attrs:
            raise self._missing_xattr()
        del attrs[name]

    def rename(self, old, new):
        """Move ``old`` to ``new`` together with its data and descendants.

        An existing entry at ``new`` is replaced.  Raises
        FuseOSError(ENOTEMPTY) when ``new`` still has entries below it.
        """
        from errno import ENOTEMPTY

        if old == new:
            return

        replaced = self.files.get(new)
        if replaced is not None and self._has_children(new):
            raise FuseOSError(ENOTEMPTY)

        entry = self.files.pop(old)
        if replaced is not None and self._is_dir(replaced):
            self._link_parent(new, -1)
        self.files[new] = entry

        # Drop the contents of an overwritten target, then carry over ours.
        self.data.pop(new, None)
        if old in self.data:
            self.data[new] = self.data.pop(old)

        if self._is_dir(entry):
            self._link_parent(old, -1)
            self._link_parent(new, 1)

        # Re-key everything that lived below the old path.
        prefix = old + '/'
        for name in [n for n in self.files if n.startswith(prefix)]:
            moved = new + name[len(old):]
            self.files[moved] = self.files.pop(name)
            if name in self.data:
                self.data[moved] = self.data.pop(name)

    def rmdir(self, path):
        """Remove the empty directory ``path``.

        Raises FuseOSError(ENOTEMPTY) when entries still exist below it.
        """
        from errno import ENOTEMPTY

        if self._has_children(path):
            raise FuseOSError(ENOTEMPTY)

        self.files.pop(path)
        self._link_parent(path, -1)

    def setxattr(self, path, name, value, options, position=0):
        """Store ``value`` as extended attribute ``name`` on ``path``."""
        # Ignore options
        attrs = self.files[path].setdefault('attrs', {})
        attrs[name] = value

    def statfs(self, path):
        """Return fixed block counts describing the filesystem."""
        return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)

    def symlink(self, target, source):
        """Create the symlink ``target`` whose content is ``source``."""
        encoded = bytearray(source.encode('utf-8'))
        now = time()
        # st_size is the byte length of the stored target.
        self.files[target] = dict(st_mode=(S_IFLNK | 0o777), st_nlink=1,
                                  st_size=len(encoded), st_ctime=now,
                                  st_mtime=now, st_atime=now)

        self.data[target] = encoded

    def truncate(self, path, length, fh=None):
        """Cut or zero-extend the contents of ``path`` to ``length`` bytes."""
        # ljust() pads with NUL bytes when the file grows, so the stored
        # data always matches st_size.
        self.data[path] = self.data[path][:length].ljust(length, b'\x00')
        self.files[path]['st_size'] = length

    def unlink(self, path):
        """Remove ``path`` together with its stored contents."""
        self.files.pop(path)
        self.data.pop(path, None)

    def utimens(self, path, times=None):
        """Set access/modification times; default to the current time."""
        now = time()
        atime, mtime = times if times else (now, now)
        self.files[path]['st_atime'] = atime
        self.files[path]['st_mtime'] = mtime

    def write(self, path, data, offset, fh):
        """Write ``data`` at ``offset`` and return the number of bytes."""
        buf = self.data[path]
        # Slice assignment past the end would append at the current end, so
        # fill the gap with NUL bytes first to keep the offset honest.
        gap = offset - len(buf)
        if gap > 0:
            buf.extend(b'\x00' * gap)
        buf[offset:offset + len(data)] = data
        self.files[path]['st_size'] = len(buf)
        return len(data)
127
128
if __name__ == '__main__':
    # Script entry point: mount the in-memory filesystem at the directory
    # given as the only command-line argument and serve it in the foreground.
    import sys

    argv = sys.argv
    if len(argv) != 2:
        print('usage: %s <mountpoint>' % argv[0])
        sys.exit(1)

    # Debug-level logging so every operation logged by the mixin is visible.
    logging.basicConfig(level=logging.DEBUG)
    mountpoint = argv[1]
    fuse = FUSE(Memory(), mountpoint, foreground=True)