Changeset 418:1c54f87016fb in raven-j
- Timestamp:
- 10/11/2011 05:35:34 PM (7 months ago)
- Branch:
- recap
- Location:
- heron_wsgi/admin_lib
- Files:
-
- 2 edited
-
medcenter.py (modified) (8 diffs)
-
sealing.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
heron_wsgi/admin_lib/medcenter.py
r375 r418 1 1 '''medcenter.py -- academic medical center directory/policy 2 2 3 >>> depgraph = injector.Injector([Mock()]) 4 >>> m = depgraph.get(MedCenter) 5 6 Look someone up in the enterprise directory:: 7 8 >>> a1 = m.affiliate('john.smith') 9 >>> a1 10 John Smith <john.smith@js.example> 11 >>> a1.title 3 # logging.basicConfig(level=logging.DEBUG) 4 >>> m, mock, mods, depgraph = Mock.make_stuff() 5 >>> app_secret = depgraph.get(KAppSecret) 6 7 Assuming CAS login asked us to issue a login capability:: 8 9 >>> box, req = mock.login_info('john.smith') 10 >>> login_caps = m.issue(box, req) 11 12 We can exercise an enterprise directory capability:: 13 14 >>> a1 = req.idvault_entry 15 >>> m.withId(a1, lambda a: '%s %s <%s>' % ( 16 ... a['givenname'], a['sn'], a['mail'])) 17 'John Smith <john.smith@js.example>' 18 >>> m.withId(a1, lambda a: a['title']) 12 19 'Chair of Department of Neurology' 20 21 Note: KUMC uses ou for department. @@test 13 22 14 23 We use an outboard service to check human subjects "chalk" training:: … … 19 28 url=http://localhost:8080/chalk-checker 20 29 21 >>> m.train edThru(a1)30 >>> m.training(a1) 22 31 '2012-01-01' 23 32 24 33 ''' 25 34 35 import logging 26 36 import os 27 37 import sys … … 30 40 31 41 import injector 32 from injector import inject, provides 42 from injector import inject, provides, singleton 33 43 34 44 import config 35 45 import ldaplib 46 import sealing 47 48 log = logging.getLogger(__name__) 36 49 37 50 KTrainingFunction = injector.Key('TrainingFunction') 51 KAppSecret = injector.Key('AppSecret') 38 52 39 53 CHALK_CONFIG_SECTION='chalk' 40 54 PERM_ID=__file__ + '.idvault' 55 56 @singleton 41 57 class MedCenter(object): 42 58 excluded_jobcode = "24600" 59 permissions=(PERM_ID,) 43 60 44 61 @inject(searchsvc=ldaplib.LDAPService, 45 trainingfn=KTrainingFunction) 46 def __init__(self, searchsvc, trainingfn): 62 trainingfn=KTrainingFunction, 63 app_secret=KAppSecret) 64 def __init__(self, searchsvc, trainingfn, app_secret): 65 log.debug('MedCenter.__init__ again?') 47 66 self._svc = searchsvc 48 67 self._training = trainingfn 68 self._app_secret = app_secret 69 self.sealer, self._unsealer = sealing.makeBrandPair('MedCenter') 49 70 50 71 def __repr__(self): 51 72 return "MedCenter(s, t)" 52 73 53 def affiliate(self, name):74 def _lookup(self, name): 54 75 matches = self._svc.search('(cn=%s)' % name, AccountHolder.attributes) 55 76 if len(matches) != 1: … … 59 80 raise ValueError, name # ambiguous 60 81 61 dn, attrs = matches[0] 62 63 return AccountHolder(extract_values(attrs)) 64 65 def trainedThru(self, who): 66 return self._training(who.userid()) 67 68 69 def checkFaculty(self, who): 70 if (who.kumcPersonJobcode != self.excluded_jobcode 71 and who.kumcPersonFaculty == 'Y'): 72 return 73 raise NotFaculty() 74 75 def affiliateSearch(self, max_qty, cn, sn, givenname): 82 dn, ldapattrs = matches[0] 83 return AccountHolder.rezip(ldapattrs) 84 85 def issue(self, loginbox, req): 86 u, s = self._unsealer.unseal(loginbox) 87 if s != self._app_secret: 88 log.warn('expected app_secret [%s] got [%s]', 89 self._app_secret, s) 90 return [] 91 cap = self.sealer.seal(self._lookup(u)) 92 req.idvault_entry = cap 93 return [cap] 94 95 def audit(self, cap, p): 96 log.info('MedCenter.audit(%s, %s)' % (cap, p)) 97 if not isinstance(cap, object): 98 raise TypeError 99 self._unsealer.unseal(cap) 100 101 def lookup(self, namebox): 102 name = self._unsealer.unseal(namebox) 103 return self.sealer.seal(self._lookup(name)) 104 105 def withId(self, attrsbox, thunk): 106 '''Exercise thunk(attrs) provided attrsbox unseals. 107 ''' 108 attrs = self._unsealer.unseal(attrsbox) 109 return thunk(attrs) 110 111 def withFaculty(self, attrsbox, thunk): 112 '''Exercise thunk(attrs) provided attrsbox unseals as faculty. 113 @raises: TypeError on unsealing failure 114 @raises: NotFaculty 115 ''' 116 attrs = self._unsealer.unseal(attrsbox) 117 log.debug('withFaculty: %s', attrs) 118 if (attrs['kumcPersonJobcode'] == self.excluded_jobcode 119 or attrs['kumcPersonFaculty'] != 'Y'): 120 raise NotFaculty 121 122 return thunk(attrs) 123 124 def training(self, attrbox): 125 return self._training(self._unsealer.unseal(attrbox)['cn']) 126 127 def search(self, max_qty, cn, sn, givenname): 76 128 clauses = ['(%s=%s*)' % (n, v) 77 for (n, v) in (('cn', cn), ('sn', sn), ('givenname', givenname)) 129 for (n, v) in (('cn', cn), 130 ('sn', sn), 131 ('givenname', givenname)) 78 132 if v] 79 133 if len(clauses) == 0: … … 86 140 87 141 results = self._svc.search(q, AccountHolder.attributes)[:max_qty] 88 return [AccountHolder (extract_values(attrs))89 for dn, attrs in results]90 91 92 def extract_values(attrs):142 return [AccountHolder.rezip(ldapattrs) 143 for dn, ldapattrs in results] 144 145 146 def _extract_values(attrs): 93 147 return [attrs.get(n, [None])[0] 94 148 for n in AccountHolder.attributes] … … 106 160 "kumcPersonFaculty", "kumcPersonJobcode"] 107 161 108 def __init__(self, values): 109 self._attrs = dict(zip(self.attributes, values)) 162 @classmethod 163 def rezip(cls, ldapattrs): 164 return dict(zip(cls.attributes, _extract_values(ldapattrs))) 165 166 def __init__(self, cap, attrs): 167 self.cap = cap 168 self._attrs = attrs 110 169 111 170 def __str__(self): … … 120 179 121 180 def __getattr__(self, n): 122 if n not in self.attributes:181 if n.startswith('_') or n not in self.attributes: 123 182 raise AttributeError 124 183 return self._attrs[n] … … 154 213 binder.bind(KTrainingFunction, 155 214 injector.InstanceProvider(d.trainedThru)) 156 157 @classmethod 158 def make(cls): 159 depgraph = injector.Injector(Mock()) 160 return depgraph.get(MedCenter) 215 self._app_secret = 'sekrit' 216 binder.bind(KAppSecret, 217 injector.InstanceProvider(self._app_secret)) 218 219 def set_medcenter(self, mc): 220 self._mc = mc 221 222 def login_info(self, cn): 223 box = self._mc.sealer.seal((cn, self._app_secret)) 224 req = MockRequest() 225 return box, req 226 227 @classmethod 228 def mods(cls): 229 return [Mock()] 230 231 @classmethod 232 def make_stuff(cls): 233 mods = cls.mods() 234 mod = mods[-1] 235 depgraph = injector.Injector(mods) 236 mc = depgraph.get(MedCenter) 237 mod.set_medcenter(mc) 238 return mc, mod, mods, depgraph 239 240 241 class MockRequest(dict): 242 def __init__(self, *args, **kwargs): 243 dict.__init__(self, *args, **kwargs) 244 self.__dict__ = self 161 245 162 246 class IntegrationTest(injector.Module): -
heron_wsgi/admin_lib/sealing.py
r417 r418 50 50 Traceback (most recent call last): 51 51 ... 52 UnsealingException52 TypeError 53 53 54 54 ''' … … 76 76 shared[0] = noObject 77 77 box.shareContent() 78 if (shared[0] is noObject): raise UnsealingException78 if (shared[0] is noObject): raise TypeError 79 79 contents = shared[0] 80 80 shared[0] = noObject … … 87 87 return (sealer, unsealer) 88 88 89 90 class UnsealingException(Exception):91 pass92 89 93 90 class EDef:
Note: See TracChangeset
for help on using the changeset viewer.
