Note: We no longer publish the latest version of our code here. We primarily use a kumc-bmi github organization. The heron ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: heron_load/ocap_file.py @ 0:42ad7288920a

heron-michigan tip
Last change on this file since 0:42ad7288920a was 0:42ad7288920a, checked in by Matt Hoag <mhoag@…>, 6 years ago

Merge with demo_concepts_3800

File size: 11.2 KB
Line 
1'''ocap_file -- least-privilege interaction with the filesystem, web
2
3Inspired by:
4
5  The Sash file object is quite similar to (though different from) the
6  E file object, which has proven in practice to supply simple,
7  intuitive, pola-disciplined interaction with the file system::
8
9    type readable = {
10         isDir : unit -> bool;
11         exists : unit -> bool;
12         subRdFiles : unit -> readable list;
13         subRdFile : string -> readable;
14         inChannel : unit -> in_channel;
15         getBytes : unit -> string;
16         fullPath : unit -> string;
17    }
18
19
20 * `How Emily Tamed the Caml`__
21   Stiegler, Marc; Miller, Mark
22   HPL-2006-116
23
24__ http://www.hpl.hp.com/techreports/2006/HPL-2006-116.html
25
26'''
27
28from urlparse import urljoin
29
30
class ESuite(object):
    '''Base class for E-style "method suites" built out of closures.

    Subclasses override `__new__`, define their methods as nested
    functions that close over the constructor arguments, and hand those
    functions to `make`.  The object that comes back is an instance of a
    freshly created class (named after the subclass, derived from
    `ESuite`), so the closed-over state is not reachable as attributes.
    '''

    def __repr__(self):
        return '%s(...)' % self.__class__.__name__

    @classmethod
    def lift_doc(cls, suite):
        '''Attach every function in `suite` to `cls` under its own key.

        :param suite: dict mapping attribute name to function
        '''
        for name in suite:
            setattr(cls, name, suite[name])

    @classmethod
    def make(cls, *args, **kwargs):
        '''Build a one-off class from the given functions; instantiate it.

        :param args: functions, each installed under its `__name__`
        :param kwargs: functions installed under explicit names
                       (these win over a positional of the same name)
        :return: an instance of a new class named `cls.__name__`
        '''
        methods = dict((fn.__name__, fn) for fn in args)
        methods.update(kwargs)

        # The instance-level lift_doc copies the suite (without itself)
        # onto the class that make() was called on.
        namespace = dict(methods)
        namespace['lift_doc'] = lambda _: cls.lift_doc(methods)

        suite_class = type(cls.__name__, (ESuite, object), namespace)
        return suite_class()
46
47
class Readable(ESuite):
    '''Wrap the python file API in the Emily/E least-authority API.

    os.path.join might not seem to need any authority,
    but its output depends on platform, so it's not a pure function.

    :param path: the file or directory this object grants read access to
    :param os_path: as from `os.path`; `isdir`, `exists`, `join` and
                    `abspath` are used
    :param os_listdir: as from `os.listdir`
    :param openf: as from `open`; called with the path only

    >>> import os
    >>> Readable('.', os.path, os.listdir, open).isDir()
    True

    >>> x = Readable('/x', os.path, os.listdir, open)
    >>> (x / 'y').fullPath()
    '/x/y'

    Names that climb back out with `..` are refused::

    >>> x.subRdFile('../etc')
    Traceback (most recent call last):
       ...
    LookupError: Path does not lead to a subordinate.
    '''

    def __new__(cls, path, os_path, os_listdir, openf):
        def isDir(_):
            return os_path.isdir(path)

        def exists(_):
            return os_path.exists(path)

        def subRdFiles(self):
            # subRdFile is a method-style function: pass the receiver.
            return (subRdFile(self, n)
                    for n in os_listdir(path))

        def subRdFile(_, n):
            there = os_path.join(path, n)

            # A textual prefix test alone accepts '/x/../etc', so also
            # compare the normalized absolute paths.  join(here, '')
            # yields `here` with exactly one trailing separator, which
            # keeps '/xy' from passing as a child of '/x'.
            here_abs = os_path.abspath(path)
            there_abs = os_path.abspath(there)
            inside = (there_abs == here_abs or
                      there_abs.startswith(os_path.join(here_abs, '')))
            if not (there.startswith(path) and inside):
                raise LookupError('Path does not lead to a subordinate.')

            return Readable(there, os_path, os_listdir, openf)

        def inChannel(_):
            return openf(path)

        def getBytes(_):
            chan = openf(path)
            try:
                return chan.read()
            finally:
                # Don't leave the handle for the garbage collector.
                chan.close()

        def fullPath(_):
            return os_path.abspath(path)

        # `/` is __div__ in Python 2 and __truediv__ under true division.
        return cls.make(isDir, exists, subRdFiles, subRdFile, inChannel,
                        getBytes, fullPath,
                        __div__=subRdFile,
                        __truediv__=subRdFile)
94
95
class ListReadable(ESuite):
    '''Simulate a readable directory using a list of pathnames.

    The object reports itself as an existing directory whose children
    are exactly the given pathnames; each child is a `Readable`.

    :param paths: the pathnames this object may hand out
    :param os_path: as from `os.path`
    :param os_listdir: as from `os.listdir`; passed on to children
    :param openf: as from `open`; passed on to children
    '''

    def __new__(cls, paths, os_path, os_listdir, openf):
        def isDir(_):
            return True

        def exists(_):
            return True

        def subRdFiles(self):
            return (subRdFile(self, n)
                    for n in paths)

        def subRdFile(self, n):
            # Only pathnames listed up front are authorized.
            if n not in paths:
                raise IOError('not an authorized pathname: %s' % n)
            return Readable(n, os_path, os_listdir, openf)

        def inChannel(_):
            raise IOError('cannot read directory')

        def getBytes(_):
            raise IOError('cannot read directory')

        def fullPath(_):
            return os_path.abspath(os_path.curdir)

        # `/` is __div__ in Python 2 and __truediv__ under true division.
        return cls.make(isDir, exists, subRdFiles, subRdFile, inChannel,
                        getBytes, fullPath,
                        __div__=subRdFile,
                        __truediv__=subRdFile)
129
130
def WebReadable(base, urlopener, RequestClass):
    '''Read-only wrapping of urllib2 in the Emily/E least-authority API.

    :param base: base URL
    :param urlopener: as from `urllib2.build_opener()`
    :param RequestClass: e.g. `urllib2.Request`

    >>> urlopener = _MockMostPagesOKButSome404('Z')
    >>> from urllib2 import Request
    >>> rdweb = WebReadable('http://example/stuff/', urlopener, Request)

    A refinement could fetch content, parse links,
    and enumerate those that point "downward", but
    this implementation doesn't supply directory functionality::

    >>> rdweb.isDir()
    False
    >>> len(rdweb.subRdFiles())
    0

    Check whether a HEAD request gives a 2xx response::

    >>> rdweb.exists()
    True
    >>> s = rdweb.subRdFile('Z')
    >>> s.fullPath()
    'http://example/stuff/Z'
    >>> s.exists()
    False

    Get a reader for the content or just the content::

    >>> rdweb.inChannel().read(4)
    'page'
    >>> rdweb.getBytes()[:4]
    'page'

    No authority is granted to URLs that don't start with `base`::

    >>> rdweb.subRdFile('x/../../y')
    Traceback (most recent call last):
       ...
    LookupError: Path does not lead to a subordinate.

    Hence traversing from `/stuff/Z` to `/stuff/x` is not allowed::

    >>> s.subRdFile('x')
    Traceback (most recent call last):
       ...
    LookupError: Path does not lead to a subordinate.

    .. todo:: consider taking a hint/name parameter for printing.
    '''
    def __repr__():
        return 'WebReadable(...)'

    def isDir():
        return False

    def exists():
        class HeadRequest(RequestClass):
            '''
            ack: doshea Jan 15 2010
            How do you send a HEAD HTTP request in Python?
            http://stackoverflow.com/questions/107405/
            '''
            def get_method(self):
                return "HEAD"

        try:
            response = urlopener.open(HeadRequest(base))
        except IOError:
            return False
        # Only the status matters; release the connection right away.
        response.close()
        return True

    def subRdFiles():
        return ()

    def subRdFile(path):
        there = urljoin(base, path)
        # NOTE(review): this is a plain string-prefix test, so a sibling
        # such as base + 'x' is accepted when base lacks a trailing '/'.
        if not there.startswith(base):
            raise LookupError('Path does not lead to a subordinate.')
        return WebReadable(there, urlopener, RequestClass)

    def inChannel():
        '''
        .. todo:: wrap result of open() for strict confinement.
        '''
        return urlopener.open(base)

    def getBytes():
        chan = inChannel()
        try:
            return chan.read()
        finally:
            # The caller only gets the bytes, so close the channel here.
            chan.close()

    def fullPath():
        return base

    return edef(__repr__,
                isDir, exists, subRdFiles, subRdFile, inChannel,
                getBytes, fullPath)
226
227
def WebPostable(base, urlopener, RequestClass):
    '''Extend WebReadable with POST support.

    :param base: URL to read from and post to
    :param urlopener: as from `urllib2.build_opener()`
    :param RequestClass: e.g. `urllib2.Request`

    >>> urlopener = _MockMostPagesOKButSome404('Z')
    >>> from urllib2 import Request
    >>> doweb = WebPostable('http://example/stuff/', urlopener, Request)

    >>> doweb.post('stuff').read()
    'you posted: stuff'

    All the `WebReadable` methods work::

    >>> doweb.subRdFile('rd').fullPath()
    'http://example/stuff/rd'
    '''
    def post(content):
        # Supplying a body is what turns the request into a POST.
        return urlopener.open(base, content)

    def __repr__():
        return 'WebPostable(...)'

    # Anything other than post/__repr__ is answered by the read-only view.
    readable = WebReadable(base, urlopener, RequestClass)
    return edef(__repr__, post, delegate=readable)
252
253
254class _MockMostPagesOKButSome404(object):
255    '''Raise 404 for pages containing given strings; otherwise succeed.
256    '''
257    def __init__(self, bad):
258        self.bad = bad
259
260    def open(self, request_or_address, content=None):
261        from StringIO import StringIO
262
263        try:
264            address = request_or_address.get_full_url()
265        except AttributeError:
266            address = request_or_address
267
268        if [txt for txt in self.bad if txt in address]:
269            raise IOError('404...')
270
271        if content:
272            return StringIO('you posted: ' + content)
273
274        return StringIO('page content...')
275
276
class Editable(ESuite):
    '''Read/write access to one file or directory and what lies below it.

    :param path: the file or directory this object grants access to
    :param os: as from the `os` module; `path`, `listdir`, `mkdir`
               and `remove` are used
    :param openf: as from `open`; called with a path and a mode

    >>> import os
    >>> x = Editable('/x', os, open)
    >>> (x / 'y').ro().fullPath()
    '/x/y'

    '''
    def __new__(cls, path, os, openf):
        def _openrd(p):
            # The read-only facet must not be able to open for writing.
            return openf(p, 'r')
        _ro = Readable(path, os.path, os.listdir, _openrd)

        def ro(_):
            return _ro

        def subEdFiles(self):
            # subEdFile is a method-style function: pass the receiver.
            return (subEdFile(self, n)
                    for n in os.listdir(path))

        def subEdFile(_, n):
            there = os.path.join(path, n)

            # A textual prefix test alone accepts '/x/../etc', so also
            # compare the normalized absolute paths.  join(here, '')
            # yields `here` with exactly one trailing separator, which
            # keeps '/xy' from passing as a child of '/x'.
            here_abs = os.path.abspath(path)
            there_abs = os.path.abspath(there)
            inside = (there_abs == here_abs or
                      there_abs.startswith(os.path.join(here_abs, '')))
            if not (there.startswith(path) and inside):
                raise LookupError('Path does not lead to a subordinate.')

            return Editable(there, os, openf)

        def outChannel(_):
            return openf(path, 'w')

        def setBytes(self, b):
            # outChannel is a function; it has to be called to get a
            # writable channel, which is closed once the bytes are out.
            out = outChannel(self)
            try:
                out.write(b)
            finally:
                out.close()

        def mkDir(_):
            os.mkdir(path)

        def createNewFile(self):
            setBytes(self, '')

        def delete(_):
            os.remove(path)

        # `/` is __div__ in Python 2 and __truediv__ under true division.
        return cls.make(ro, subEdFiles, subEdFile, outChannel,
                        setBytes, mkDir, createNewFile, delete,
                        __div__=subEdFile,
                        __truediv__=subEdFile)
323
324
class ListEditable(ESuite):
    '''Simulate an editable directory using a list of pathnames.

    The editable counterpart of `ListReadable`: children are `Editable`
    objects for exactly the given pathnames, and every operation that
    would write to the "directory" itself raises `IOError`.

    :param paths: the pathnames this object may hand out
    :param os: as from the `os` module; passed on to children
    :param openf: as from `open`; passed on to children
    '''
    def __new__(cls, paths, os, openf):
        def _openrd(p):
            # The read-only facet must not be able to open for writing.
            return openf(p, 'r')
        _ro = ListReadable(paths, os.path, os.listdir, _openrd)

        def ro(_):
            return _ro

        def subEdFiles(self):
            # subEdFile is a method-style function: pass the receiver.
            return (subEdFile(self, n)
                    for n in paths)

        def subEdFile(_, n):
            # Only pathnames listed up front are authorized.
            if n not in paths:
                raise IOError('not an authorized pathname: %s' % n)
            return Editable(n, os, openf)

        def outChannel(_):
            raise IOError('cannot write directory')

        def setBytes(_, b):
            raise IOError('cannot write directory')

        def mkDir(_):
            raise IOError('cannot make list directory')

        def createNewFile(self):
            # Goes through setBytes, so this raises IOError as well.
            setBytes(self, '')

        def delete(_):
            raise IOError('cannot delete list directory')

        # `/` is __div__ in Python 2 and __truediv__ under true division.
        return cls.make(ro, subEdFiles, subEdFile, outChannel,
                        setBytes, mkDir, createNewFile, delete,
                        __div__=subEdFile,
                        __truediv__=subEdFile)
364
365
def walk_ed(top):
    '''ocap analog to os.walk for editables

    :param top: editable directory to start from
    :return: generator of (editable, sub-directories, other children)
    '''
    def children(ed):
        return ed.subEdFiles()

    for entry in _walk(top, children):
        yield entry
371
372
def walk_rd(top):
    '''ocap analog to os.walk for readables

    :param top: readable directory to start from
    :return: generator of (readable, sub-directories, other children)
    '''
    def children(rd):
        return rd.subRdFiles()

    for entry in _walk(top, children):
        yield entry
378
379
380def _walk(top, sub_files):
381    '''ocap analog to os.walk
382    '''
383    subs = [(sub, sub.ro().isDir())
384            for sub in sub_files(top)]
385    dirs = [s for (s, d) in subs if d]
386    nondirs = [s for (s, d) in subs if not d]
387
388    yield top, dirs, nondirs
389
390    for subd in dirs:
391        for x in _walk(subd, sub_files):
392            yield x
393
394
def relName(ed, anc):
    '''Get the name of an Editable relative to an ancestor.

    :param ed: the editable to name
    :param anc: an editable whose full path is a prefix of `ed`'s
    :return: the part of `ed`'s full path below `anc`, without a
             leading separator
    '''
    apath = anc.ro().fullPath()
    epath = ed.ro().fullPath()
    assert(epath.startswith(apath))
    rest = epath[len(apath):]
    # When the ancestor path already ends in a separator (e.g. the
    # root '/'), there is no separator left to skip.
    if apath.endswith(('/', '\\')):
        return rest
    return rest[1:]
402
403
def relName_rd(rd, anc):
    '''Get the name of a Readable relative to an ancestor.

    :param rd: the readable to name
    :param anc: a readable whose full path is a prefix of `rd`'s
    :return: the part of `rd`'s full path below `anc`, without a
             leading separator
    '''
    apath = anc.fullPath()
    path = rd.fullPath()
    assert(path.startswith(apath))
    rest = path[len(apath):]
    # When the ancestor path already ends in a separator (the root '/',
    # or a base URL such as 'http://example/stuff/'), there is no
    # separator left to skip.
    if apath.endswith(('/', '\\')):
        return rest
    return rest[1:]
411
412
def edef(*methods, **kwargs):
    '''Imitate E method suite definition.

    :param methods: zero-argument-receiver functions (closures); each is
                    exposed as an attribute under its `__name__`
    :param kwargs: `delegate=obj` names an object that answers any
                   attribute not found among `methods`
    :return: an object whose attributes are the given functions

    A function named `__repr__` among `methods` determines `repr()` of
    the result.

    .. todo:: factor out overlap with `sealing.EDef`
    .. todo:: consider using a metaclass instead
    ref http://stackoverflow.com/questions/100003/what-is-a-metaclass-in-python
    '''
    lookup = dict((f.__name__, f) for f in methods)
    delegate = kwargs.get('delegate', None)

    class EObj(object):
        def __getattr__(self, n):
            if n in lookup:
                return lookup[n]
            if delegate is not None:
                return getattr(delegate, n)
            raise AttributeError(n)

        def __repr__(self):
            # repr() looks __repr__ up on the type, where object's
            # default always exists, so __getattr__ is never consulted
            # for it; dispatch to the supplied function explicitly.
            custom = lookup.get('__repr__')
            if custom is None:
                return object.__repr__(self)
            return custom()

    return EObj()
Note: See TracBrowser for help on using the repository browser.