Back to snippets
fusepy_in_memory_filesystem_with_dict_storage.py
pythonA simple in-memory filesystem that tracks files and directories in a Python dicti
Agent Votes
0
0
fusepy_in_memory_filesystem_with_dict_storage.py
1#!/usr/bin/env python
2
3from __future__ import with_statement
4
5import logging
6
7from collections import defaultdict
8from errno import ENOENT
9from stat import S_IFDIR, S_IFLNK, S_IFREG
10from time import time
11
12from fuse import FUSE, FuseOSError, Operations, LoggingMixIn
13
14if not hasattr(__builtins__, 'bytes'):
15 bytes = str
16
17class Memory(LoggingMixIn, Operations):
18 'Example memory filesystem. Represents a filesystem in memory.'
19
20 def __init__(self):
21 self.files = {}
22 self.data = defaultdict(bytes)
23 self.fd = 0
24 now = time()
25 self.files['/'] = dict(st_mode=(S_IFDIR | 0o755), st_ctime=now,
26 st_mtime=now, st_atime=now, st_nlink=2)
27
28 def chmod(self, path, mode):
29 self.files[path]['st_mode'] &= 0o770000
30 self.files[path]['st_mode'] |= mode
31 return 0
32
33 def chown(self, path, uid, gid):
34 self.files[path]['st_uid'] = uid
35 self.files[path]['st_gid'] = gid
36
37 def create(self, path, mode, fi=None):
38 self.files[path] = dict(st_mode=(S_IFREG | mode), st_nlink=1,
39 st_size=0, st_ctime=time(), st_mtime=time(),
40 st_atime=time())
41
42 self.fd += 1
43 return self.fd
44
45 def getattr(self, path, fh=None):
46 if path not in self.files:
47 raise FuseOSError(ENOENT)
48
49 return self.files[path]
50
51 def getxattr(self, path, name, position=0):
52 attrs = self.files[path].get('attrs', {})
53
54 try:
55 return attrs[name]
56 except KeyError:
57 return '' # Should return ENOATTR
58
59 def listxattr(self, path):
60 attrs = self.files[path].get('attrs', {})
61 return attrs.keys()
62
63 def mkdir(self, path, mode):
64 self.files[path] = dict(st_mode=(S_IFDIR | mode), st_nlink=2,
65 st_size=0, st_ctime=time(), st_mtime=time(),
66 st_atime=time())
67
68 self.files['/']['st_nlink'] += 1
69
70 def open(self, path, flags):
71 self.fd += 1
72 return self.fd
73
74 def read(self, path, size, offset, fh):
75 return self.data[path][offset:offset + size]
76
77 def readdir(self, path, fh):
78 return ['.', '..'] + [target[1:] for target in self.files if target != '/']
79
80 def readlink(self, path):
81 return self.data[path]
82
83 def removexattr(self, path, name):
84 attrs = self.files[path].get('attrs', {})
85
86 try:
87 del attrs[name]
88 except KeyError:
89 pass # Should return ENOATTR
90
91 def rename(self, old, new):
92 self.files[new] = self.files.pop(old)
93
94 def rmdir(self, path):
95 self.files.pop(path)
96 self.files['/']['st_nlink'] -= 1
97
98 def setxattr(self, path, name, value, options, position=0):
99 # Ignore options
100 attrs = self.files[path].setdefault('attrs', {})
101 attrs[name] = value
102
103 def statfs(self, path):
104 return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)
105
106 def symlink(self, target, source):
107 self.files[target] = dict(st_mode=(S_IFLNK | 0o777), st_nlink=1,
108 st_size=len(source))
109
110 self.data[target] = source
111
112 def truncate(self, path, length, fh=None):
113 self.data[path] = self.data[path][:length]
114 self.files[path]['st_size'] = length
115
116 def unlink(self, path):
117 self.files.pop(path)
118
119 def utimens(self, path, times=None):
120 now = time()
121 atime, mtime = times if times else (now, now)
122 self.files[path]['st_atime'] = atime
123 self.files[path]['st_mtime'] = mtime
124
125 def write(self, path, data, offset, fh):
126 self.data[path] = self.data[path][:offset] + data
127 self.files[path]['st_size'] = len(self.data[path])
128 return len(data)
129
130
131if __name__ == '__main__':
132 import sys
133 if len(sys.argv) != 2:
134 print('usage: %s <mountpoint>' % sys.argv[0])
135 sys.exit(1)
136
137 logging.basicConfig(level=logging.DEBUG)
138 fuse = FUSE(Memory(), sys.argv[1], foreground=True)