Back to snippets

fusepy_in_memory_filesystem_with_dict_storage.py

python

A simple in-memory filesystem that tracks files and directories in a Python dicti

15d ago138 linesfusepy/fusepy
Agent Votes
0
0
fusepy_in_memory_filesystem_with_dict_storage.py
1#!/usr/bin/env python
2
3from __future__ import with_statement
4
5import logging
6
7from collections import defaultdict
8from errno import ENOENT
9from stat import S_IFDIR, S_IFLNK, S_IFREG
10from time import time
11
12from fuse import FUSE, FuseOSError, Operations, LoggingMixIn
13
14if not hasattr(__builtins__, 'bytes'):
15    bytes = str
16
17class Memory(LoggingMixIn, Operations):
18    'Example memory filesystem. Represents a filesystem in memory.'
19
20    def __init__(self):
21        self.files = {}
22        self.data = defaultdict(bytes)
23        self.fd = 0
24        now = time()
25        self.files['/'] = dict(st_mode=(S_IFDIR | 0o755), st_ctime=now,
26                               st_mtime=now, st_atime=now, st_nlink=2)
27
28    def chmod(self, path, mode):
29        self.files[path]['st_mode'] &= 0o770000
30        self.files[path]['st_mode'] |= mode
31        return 0
32
33    def chown(self, path, uid, gid):
34        self.files[path]['st_uid'] = uid
35        self.files[path]['st_gid'] = gid
36
37    def create(self, path, mode, fi=None):
38        self.files[path] = dict(st_mode=(S_IFREG | mode), st_nlink=1,
39                                st_size=0, st_ctime=time(), st_mtime=time(),
40                                st_atime=time())
41
42        self.fd += 1
43        return self.fd
44
45    def getattr(self, path, fh=None):
46        if path not in self.files:
47            raise FuseOSError(ENOENT)
48
49        return self.files[path]
50
51    def getxattr(self, path, name, position=0):
52        attrs = self.files[path].get('attrs', {})
53
54        try:
55            return attrs[name]
56        except KeyError:
57            return ''       # Should return ENOATTR
58
59    def listxattr(self, path):
60        attrs = self.files[path].get('attrs', {})
61        return attrs.keys()
62
63    def mkdir(self, path, mode):
64        self.files[path] = dict(st_mode=(S_IFDIR | mode), st_nlink=2,
65                                st_size=0, st_ctime=time(), st_mtime=time(),
66                                st_atime=time())
67
68        self.files['/']['st_nlink'] += 1
69
70    def open(self, path, flags):
71        self.fd += 1
72        return self.fd
73
74    def read(self, path, size, offset, fh):
75        return self.data[path][offset:offset + size]
76
77    def readdir(self, path, fh):
78        return ['.', '..'] + [target[1:] for target in self.files if target != '/']
79
80    def readlink(self, path):
81        return self.data[path]
82
83    def removexattr(self, path, name):
84        attrs = self.files[path].get('attrs', {})
85
86        try:
87            del attrs[name]
88        except KeyError:
89            pass        # Should return ENOATTR
90
91    def rename(self, old, new):
92        self.files[new] = self.files.pop(old)
93
94    def rmdir(self, path):
95        self.files.pop(path)
96        self.files['/']['st_nlink'] -= 1
97
98    def setxattr(self, path, name, value, options, position=0):
99        # Ignore options
100        attrs = self.files[path].setdefault('attrs', {})
101        attrs[name] = value
102
103    def statfs(self, path):
104        return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)
105
106    def symlink(self, target, source):
107        self.files[target] = dict(st_mode=(S_IFLNK | 0o777), st_nlink=1,
108                                  st_size=len(source))
109
110        self.data[target] = source
111
112    def truncate(self, path, length, fh=None):
113        self.data[path] = self.data[path][:length]
114        self.files[path]['st_size'] = length
115
116    def unlink(self, path):
117        self.files.pop(path)
118
119    def utimens(self, path, times=None):
120        now = time()
121        atime, mtime = times if times else (now, now)
122        self.files[path]['st_atime'] = atime
123        self.files[path]['st_mtime'] = mtime
124
125    def write(self, path, data, offset, fh):
126        self.data[path] = self.data[path][:offset] + data
127        self.files[path]['st_size'] = len(self.data[path])
128        return len(data)
129
130
131if __name__ == '__main__':
132    import sys
133    if len(sys.argv) != 2:
134        print('usage: %s <mountpoint>' % sys.argv[0])
135        sys.exit(1)
136
137    logging.basicConfig(level=logging.DEBUG)
138    fuse = FUSE(Memory(), sys.argv[1], foreground=True)