Back to snippets
fusepy_in_memory_filesystem_with_dict_storage.py
python — A simple in-memory filesystem that tracks files and directories in a Python dictionary.
Agent Votes
0
0
fusepy_in_memory_filesystem_with_dict_storage.py
#!/usr/bin/env python
2
from __future__ import with_statement

import errno
import logging

from collections import defaultdict
from errno import ENOENT, ENOTEMPTY
from stat import S_IFDIR, S_IFLNK, S_IFREG
from time import time

from fuse import FUSE, FuseOSError, Operations, LoggingMixIn
13
14
class Memory(LoggingMixIn, Operations):
    """Example in-memory filesystem backed by plain Python dictionaries.

    ``self.files`` maps an absolute path to its stat dictionary (plus an
    optional ``'attrs'`` dictionary holding extended attributes), and
    ``self.data`` maps a path to the ``bytearray`` holding a file's content
    or a symlink's encoded target.  ``self.fd`` is a counter whose next value
    is handed out by ``open`` and ``create``; handles are never tracked or
    released.
    """

    # errno raised for a missing extended attribute: ENOATTR where the
    # platform's errno module defines it, ENODATA otherwise.
    _ENOATTR = getattr(errno, 'ENOATTR', errno.ENODATA)

    # Mask selecting the file-type bits of st_mode (value of stat.S_IFMT).
    _TYPE_MASK = 0o170000

    def __init__(self):
        """Create an empty filesystem containing only the root directory."""
        self.files = {}
        self.data = defaultdict(bytearray)
        self.fd = 0
        now = time()
        self.files['/'] = dict(st_mode=(S_IFDIR | 0o755), st_ctime=now,
                               st_mtime=now, st_atime=now, st_nlink=2)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parent(path):
        """Return the path of the directory that contains ``path``."""
        return path.rsplit('/', 1)[0] or '/'

    def _is_dir(self, path):
        """Return True when the entry stored at ``path`` is a directory."""
        return (self.files[path]['st_mode'] & self._TYPE_MASK) == S_IFDIR

    def _children(self, path):
        """Return the names of the entries located directly inside ``path``."""
        prefix = path if path.endswith('/') else path + '/'
        names = []
        # Iterate over a snapshot so the dictionary may change meanwhile.
        for name in list(self.files):
            if name.startswith(prefix):
                rest = name[len(prefix):]
                # An empty remainder is the directory itself; a remainder
                # containing '/' lives deeper than one level.
                if rest and '/' not in rest:
                    names.append(rest)
        return names

    def _discard(self, path):
        """Forget ``path``: its metadata, its content and, for a directory,
        the link it contributed to its parent."""
        if self._is_dir(path):
            self.files[self._parent(path)]['st_nlink'] -= 1
        self.files.pop(path)
        self.data.pop(path, None)

    # ------------------------------------------------------------------
    # Filesystem operations
    # ------------------------------------------------------------------

    def chmod(self, path, mode):
        """Replace the permission bits of ``path`` with ``mode``; return 0."""
        # Keep only the high (file-type) bits, then merge in the new mode.
        self.files[path]['st_mode'] &= 0o770000
        self.files[path]['st_mode'] |= mode
        return 0

    def chown(self, path, uid, gid):
        """Record ``uid`` and ``gid`` as the owner and group of ``path``."""
        self.files[path]['st_uid'] = uid
        self.files[path]['st_gid'] = gid

    def create(self, path, mode, fi=None):
        """Create an empty regular file at ``path`` and return a new handle."""
        now = time()
        self.files[path] = dict(st_mode=(S_IFREG | mode), st_nlink=1,
                                st_size=0, st_ctime=now, st_mtime=now,
                                st_atime=now)
        # Start from an empty buffer so the content agrees with st_size=0.
        self.data[path] = bytearray()

        self.fd += 1
        return self.fd

    def getattr(self, path, fh=None):
        """Return the stat dictionary of ``path``.

        Raises FuseOSError(ENOENT) when ``path`` is unknown.
        """
        if path not in self.files:
            raise FuseOSError(ENOENT)

        return self.files[path]

    def getxattr(self, path, name, position=0):
        """Return the value of extended attribute ``name`` on ``path``.

        Raises FuseOSError with the "no such attribute" errno when ``name``
        is not set, instead of returning an empty value.
        """
        attrs = self.files[path].get('attrs', {})

        try:
            return attrs[name]
        except KeyError:
            raise FuseOSError(self._ENOATTR)

    def listxattr(self, path):
        """Return the names of the extended attributes set on ``path``."""
        attrs = self.files[path].get('attrs', {})
        return list(attrs)

    def mkdir(self, path, mode):
        """Create a directory at ``path`` and link it into its parent."""
        now = time()
        self.files[path] = dict(st_mode=(S_IFDIR | mode), st_nlink=2,
                                st_size=0, st_ctime=now, st_mtime=now,
                                st_atime=now)

        # The new directory's '..' entry adds one link to its parent.
        self.files[self._parent(path)]['st_nlink'] += 1

    def open(self, path, flags):
        """Return a fresh file handle number; ``flags`` is ignored."""
        self.fd += 1
        return self.fd

    def read(self, path, size, offset, fh):
        """Return up to ``size`` bytes of ``path`` starting at ``offset``."""
        return bytes(self.data[path][offset:offset + size])

    def readdir(self, path, fh):
        """Return '.', '..' and the names directly inside ``path``."""
        return ['.', '..'] + self._children(path)

    def readlink(self, path):
        """Return the target stored for the symlink at ``path``."""
        return self.data[path].decode('utf-8')

    def removexattr(self, path, name):
        """Delete extended attribute ``name`` from ``path``.

        Raises FuseOSError with the "no such attribute" errno when ``name``
        is not set.
        """
        attrs = self.files[path].get('attrs', {})

        try:
            del attrs[name]
        except KeyError:
            raise FuseOSError(self._ENOATTR)

    def rename(self, old, new):
        """Move ``old`` to ``new`` together with its content and descendants.

        Raises FuseOSError(ENOENT) when ``old`` is unknown and
        FuseOSError(ENOTEMPTY) when ``new`` is a directory that still has
        entries.
        """
        if old not in self.files:
            raise FuseOSError(ENOENT)
        if old == new:
            return

        if new in self.files:
            if self._children(new):
                raise FuseOSError(ENOTEMPTY)
            # The destination is replaced; release what it owned.
            self._discard(new)

        if self._is_dir(old):
            # A moved directory takes its '..' link to the new parent.
            self.files[self._parent(old)]['st_nlink'] -= 1
            self.files[self._parent(new)]['st_nlink'] += 1

        old_prefix = old + '/'
        subtree = [name for name in list(self.files)
                   if name == old or name.startswith(old_prefix)]
        for src in subtree:
            dst = new + src[len(old):]
            self.files[dst] = self.files.pop(src)
            # Content is keyed by path too, so it has to follow the entry.
            if src in self.data:
                self.data[dst] = self.data.pop(src)

    def rmdir(self, path):
        """Remove the empty directory at ``path``.

        Raises FuseOSError(ENOENT) when ``path`` is unknown and
        FuseOSError(ENOTEMPTY) when it still has entries.
        """
        if path not in self.files:
            raise FuseOSError(ENOENT)
        if self._children(path):
            raise FuseOSError(ENOTEMPTY)
        self._discard(path)

    def setxattr(self, path, name, value, options, position=0):
        """Set extended attribute ``name`` on ``path`` to ``value``."""
        # Ignore options
        attrs = self.files[path].setdefault('attrs', {})
        attrs[name] = value

    def statfs(self, path):
        """Return fixed filesystem statistics."""
        return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)

    def symlink(self, target, source):
        """Create a symlink at ``target`` whose content is ``source``."""
        encoded = bytearray(source.encode('utf-8'))
        now = time()
        # st_size counts the stored bytes, which differs from len(source)
        # as soon as the target contains non-ASCII characters.
        self.files[target] = dict(st_mode=(S_IFLNK | 0o777), st_nlink=1,
                                  st_size=len(encoded), st_ctime=now,
                                  st_mtime=now, st_atime=now)

        self.data[target] = encoded

    def truncate(self, path, length, fh=None):
        """Resize ``path`` to ``length`` bytes, zero-filling when growing."""
        buf = self.data[path]
        if length < len(buf):
            del buf[length:]
        else:
            # bytearray(n) is n zero bytes; keeps the buffer equal to st_size.
            buf.extend(bytearray(length - len(buf)))
        self.files[path]['st_size'] = length

    def unlink(self, path):
        """Remove the entry at ``path`` together with its stored content.

        Raises FuseOSError(ENOENT) when ``path`` is unknown.
        """
        if path not in self.files:
            raise FuseOSError(ENOENT)
        self._discard(path)

    def utimens(self, path, times=None):
        """Set access and modification times; default to the current time."""
        now = time()
        atime, mtime = times if times else (now, now)
        self.files[path]['st_atime'] = atime
        self.files[path]['st_mtime'] = mtime

    def write(self, path, data, offset, fh):
        """Write ``data`` into ``path`` at ``offset``; return bytes written."""
        buf = self.data[path]
        if offset > len(buf):
            # A slice assignment past the end would append at the current
            # end, so zero-fill the hole up to ``offset`` first.
            buf.extend(bytearray(offset - len(buf)))
        buf[offset:offset + len(data)] = data
        self.files[path]['st_size'] = len(buf)
        return len(data)
127
128
if __name__ == '__main__':
    import sys

    # Exactly one command-line argument, the mountpoint, is expected.
    if len(sys.argv) == 2:
        mountpoint = sys.argv[1]
    else:
        print('usage: %s <mountpoint>' % sys.argv[0])
        sys.exit(1)

    # Log at DEBUG level, then hand a fresh Memory filesystem to FUSE.
    logging.basicConfig(level=logging.DEBUG)
    fuse = FUSE(Memory(), mountpoint, foreground=True)