2626import ldap
2727from cachetools .func import lru_cache
2828from flask import current_app
29- from sqlalchemy .orm .exc import NoResultFound
30- from werkzeug .local import LocalProxy
31-
29+ from flask_login import current_user
30+ from invenio_access .permissions import Permission , superuser_access
3231from invenio_accounts .models import Role , User
3332from invenio_oauthclient .models import RemoteAccount
33+ from sqlalchemy .orm .exc import NoResultFound
34+ from werkzeug .local import LocalProxy
3435
3536from .errors import DoesNotExistInLDAP
3637
3738_datastore = LocalProxy (lambda : current_app .extensions ['security' ].datastore )
3839
3940
41+ def is_super_user ():
42+ return Permission (superuser_access ).can ()
43+
44+
45+ def get_admin_roles_for_user (latest = True ):
46+ """Return list of schemas user can admin."""
47+ from cap .modules .schemas .imp import (
48+ get_cached_indexed_schemas_for_user_admin ,
49+ )
50+
51+ roles = []
52+ schemas = get_cached_indexed_schemas_for_user_admin (
53+ latest = latest , user_id = current_user .id
54+ )
55+
56+ for schema in schemas :
57+ roles .append (f"{ schema .name } " )
58+
59+ return roles
60+
61+
62+ USER_UI_ROLES = {
63+ 'superuser' : is_super_user ,
64+ 'schema-admin' : get_admin_roles_for_user ,
65+ }
66+
67+
68+ def get_user_ui_roles ():
69+ roles = []
70+ for method_name , method in USER_UI_ROLES .items ():
71+ result = method ()
72+ if isinstance (result , bool ) and result :
73+ roles .append (method_name )
74+ elif isinstance (result , list ):
75+ for role in result :
76+ roles .append (f"{ method_name } :{ role } " )
77+ return roles
78+
79+
4080def _query_ldap (base , query , fields ):
4181 lc = ldap .initialize ('ldap://xldap.cern.ch' )
42- lc .search_ext (base ,
43- ldap . SCOPE_ONELEVEL ,
44- query ,
45- fields ,
46- serverctrls = [
47- ldap . controls . SimplePagedResultsControl ( True ,
48- size = 7 ,
49- cookie = '' )
50- ] )
82+ lc .search_ext (
83+ base ,
84+ ldap . SCOPE_ONELEVEL ,
85+ query ,
86+ fields ,
87+ serverctrls = [
88+ ldap . controls . SimplePagedResultsControl ( True , size = 7 , cookie = '' )
89+ ],
90+ )
5191
5292 return lc .result ()[1 ]
5393
@@ -60,16 +100,20 @@ def get_user_mail_from_ldap(display_name):
60100 res = _query_ldap (
61101 base = 'OU=Users,OU=Organic Units,DC=cern,DC=ch' ,
62102 query = '(&(cernAccountType=Primary)(displayName={}))' .format (
63- display_name ),
64- fields = ['mail' ])
103+ display_name
104+ ),
105+ fields = ['mail' ],
106+ )
65107
66108 if not res :
67109 parts = display_name .split (' ' )
68110 res = _query_ldap (
69111 base = 'OU=Users,OU=Organic Units,DC=cern,DC=ch' ,
70112 query = '(&(cernAccountType=Primary)(givenName={}*)(sn=*{}))' .format (
71- parts [0 ], parts [- 1 ]),
72- fields = ['mail' ])
113+ parts [0 ], parts [- 1 ]
114+ ),
115+ fields = ['mail' ],
116+ )
73117
74118 if len (res ) != 1 :
75119 raise DoesNotExistInLDAP
@@ -85,16 +129,19 @@ def does_user_exist_in_ldap(mail):
85129 res = _query_ldap (
86130 base = 'OU=Users,OU=Organic Units,DC=cern,DC=ch' ,
87131 query = '(&(cernAccountType=Primary)(mail={}))' .format (mail ),
88- fields = ['mail' ])
132+ fields = ['mail' ],
133+ )
89134
90135 return True if res else False
91136
92137
93138def does_egroup_exist_in_ldap (mail ):
94139 """Query ldap to check if user exists."""
95- res = _query_ldap (base = 'OU=e-groups,OU=Workgroups,DC=cern,DC=ch' ,
96- query = 'mail={}' .format (mail ),
97- fields = ['mail' ])
140+ res = _query_ldap (
141+ base = 'OU=e-groups,OU=Workgroups,DC=cern,DC=ch' ,
142+ query = 'mail={}' .format (mail ),
143+ fields = ['mail' ],
144+ )
98145
99146 return True if res else False
100147
0 commit comments