diff --git a/.vscode/launch.json b/.vscode/launch.json index 6426605..5082ee9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,7 +4,6 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ - { "name": "abcdesktop.io", "type": "debugpy", diff --git a/Dockerfile.alpine b/Dockerfile.alpine index 75a835b..90ccf84 100644 --- a/Dockerfile.alpine +++ b/Dockerfile.alpine @@ -14,8 +14,9 @@ RUN apk update --no-cache && \ krb5 \ krb5-libs \ libgsasl \ - "libssl3>3.5.4" \ - "libcrypto3>3.5.4" + bash + +SHELL ["/bin/bash", "-c"] # gss-ntlmssp # krb5-user @@ -40,9 +41,9 @@ RUN apk add --no-cache \ musl-dev \ py3-ldap \ py3-ldap-pyc \ - py3-python-gssapi-pyc \ - py3-python-gssapi \ - krb5-dev \ + py3-python-gssapi-pyc \ + py3-python-gssapi \ + krb5-dev \ geoip-dev \ openldap-dev @@ -67,16 +68,24 @@ COPY . . RUN pip install --upgrade pip && \ pip install --no-cache-dir -r requirements.txt + +# get ASNNumber database +# IPASN data files can be created by downloading MRT/RIB BGP archives from Routeviews (or similar sources), +# and parsing them using provided scripts that tail the BGP AS-Path. +RUN pyasn_util_download.py --latestv46 --filename rib.bz2 && \ + pyasn_util_convert.py --single rib.bz2 ipasn_db.dat && \ + rm -f rib.bz2 + # remove dev package # should better use a builder instance # but I can't find how to grab all new libraries RUN apk del --no-cache \ krb5-dev \ - gcc \ - musl-dev \ - krb5-dev \ - geoip-dev \ - openldap-dev + gcc \ + musl-dev \ + krb5-dev \ + geoip-dev \ + openldap-dev # create log directory RUN mkdir -p /var/pyos/logs diff --git a/Dockerfile.ubuntu b/Dockerfile.ubuntu index e2530c2..1d5e7f1 100644 --- a/Dockerfile.ubuntu +++ b/Dockerfile.ubuntu @@ -8,9 +8,9 @@ RUN apt-get update && apt-get upgrade -y && apt-get clean && rm -rf /var/lib/ap # install python RUN apt-get update && apt-get install -y --no-install-recommends \ - python3 \ + python3 \ python3-pip \ - python3-virtualenv \ + python3-virtualenv \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* @@ -18,15 +18,15 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \ wget \ gcc \ - make \ - python3-dev \ + make \ + python3-dev \ libffi-dev \ libkrb5-dev \ - libsasl2-dev \ - libsasl2-dev \ - libldap2-dev \ + libsasl2-dev \ + libsasl2-dev \ + libldap2-dev \ libgeoip-dev \ - libssl-dev \ + libssl-dev \ rustc \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* @@ -81,6 +81,13 @@ RUN virtualenv /var/pyos && \ source /var/pyos/bin/activate && \ pip install --no-cache-dir -r requirements.txt +# get ASNNumber database +# IPASN data files can be created by downloading MRT/RIB BGP archives from Routeviews (or similar sources), +# and parsing them using provided scripts that tail the BGP AS-Path. +RUN source /var/pyos/bin/activate && \ + python3 bin/pyasn_util_download.py --latestv46 --filename rib.bz2 && \ + python3 bin/pyasn_util_convert.py --single rib.bz2 ipasn_db.dat && \ + rm -f rib.bz2 # remove dev lib RUN apt-get remove -y \ diff --git a/controllers/auth_controller.py b/controllers/auth_controller.py index 38bb650..4041226 100755 --- a/controllers/auth_controller.py +++ b/controllers/auth_controller.py @@ -359,9 +359,8 @@ def buildsecret(self): password = args.get('password') if not isinstance(password, str): raise cherrypy.HTTPError(400, 'Bad request invalid password parameter') - - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() # build a login dict arg object with provider set to AD args_login = { @@ -675,12 +674,12 @@ def login(self): # get params from json request args = cherrypy.request.json # can raise exception - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() # push a start message to database cache info services.messageinfo.start( user.userid, "b.Launching desktop") # launch the user desktop - return self.root.composer._launchdesktop( auth, user, args) + return self.root.composer._launchdesktop( auth, user, roles, args) @cherrypy.expose @@ -691,9 +690,9 @@ def refreshtoken(self): # no params from json request # args = cherrypy.request.json # can raise exception - (auth, user) = self.validate_env() + (auth, user, roles) = self.validate_env() # update token - jwt_user_token = services.auth.update_token( auth=auth, user=user, roles=None ) + jwt_user_token = services.auth.update_token( auth=auth, user=user, roles=roles ) # add no-cache nosniff HTTP headers cherrypy.response.headers[ 'Cache-Control'] = 'no-cache, private' # disable content or MIME sniffing which is used to override response Content-Type headers diff --git a/controllers/composer_controller.py b/controllers/composer_controller.py index bc701af..558b299 100755 --- a/controllers/composer_controller.py +++ b/controllers/composer_controller.py @@ -25,6 +25,8 @@ from oc.cherrypy import Results from oc.od.base_controller import BaseController +from oc.auth.authservice import AuthInfo, AuthUser, AuthRoles # to read AuthInfo, AuthUser, AuthRoles + logger = logging.getLogger(__name__) @@ -46,7 +48,7 @@ def __init__(self, config_controller=None): @cherrypy.tools.json_out() def ocrun(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() args = cherrypy.request.json if not isinstance(args, dict): raise cherrypy.HTTPError( status=400, message='invalid parameters') @@ -64,7 +66,7 @@ def ocrun(self): raise cherrypy.HTTPError( status=400, message='ocrun error') return Results.success(result=result) - def LocaleSettingsLanguage( self, user ): + def LocaleSettingsLanguage( self, user:dict ): # add current locale from http Accept-Language to AuthUser locale = oc.i18n.detectLocale(cherrypy.request.headers.get('Accept-Language'), oc.od.settings.supportedLocales) user['locale'] = locale @@ -76,19 +78,19 @@ def launchdesktop(self): # increase timeout when creating the first user pod cherrypy.response.timeout = 300 self.logger.debug('launchdesktop:validate_env') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() # add lang to user dict self.logger.debug('launchdesktop:LocaleSettingsLanguage') self.LocaleSettingsLanguage( user ) self.logger.debug('launchdesktop:_launchdesktop') - result = self._launchdesktop(auth, user, cherrypy.request.json) + result = self._launchdesktop(auth, user, roles, cherrypy.request.json) return result @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() def list_applications_by_phase(self): - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() args = cherrypy.request.json if type(args) is not dict: return cherrypy.HTTPError( status=400, message='invalid args parameters') @@ -103,7 +105,7 @@ def list_applications_by_phase(self): @cherrypy.tools.json_in() def getlogs(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles ) = self.validate_env() logs = oc.od.composer.logdesktop(auth, user) return Results.success(result=logs) @@ -112,7 +114,7 @@ def getlogs(self): @cherrypy.tools.json_out() def stopcontainer(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() args = cherrypy.request.json if type(args) is not dict: return cherrypy.HTTPError( status=400, message='invalid args parameters') @@ -134,7 +136,7 @@ def stopcontainer(self): @cherrypy.tools.json_out() def logcontainer(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() args = cherrypy.request.json if not isinstance( args, dict): return cherrypy.HTTPError( status=400, message='invalid parameters') @@ -158,7 +160,7 @@ def logcontainer(self): @cherrypy.tools.json_out() def envcontainer(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() args = cherrypy.request.json if not isinstance( args, dict): raise cherrypy.HTTPError( status=400, message='invalid parameters' ) @@ -181,7 +183,7 @@ def envcontainer(self): @cherrypy.tools.json_out() def removecontainer(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() args = cherrypy.request.json if not isinstance( args, dict): return cherrypy.HTTPError( status=400, message='invalid parameters' ) @@ -208,7 +210,7 @@ def removecontainer(self): @cherrypy.tools.json_out() def listcontainer(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() result = oc.od.composer.listContainerApps(auth, user) return Results.success(result=result) @@ -217,7 +219,7 @@ def listcontainer(self): @cherrypy.tools.json_in() def refreshdesktoptoken(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() desktop = oc.od.composer.finddesktop(authinfo=auth, userinfo=user) # check desktop object @@ -256,7 +258,7 @@ def getdesktopdescription(self): # check if request is allowed, raise an exception if deny self.is_permit_request() # check if user is authenticated and identified, raise an exception if not - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() result = oc.od.composer.getdesktopdescription(auth, user) if not isinstance( result, dict ): raise cherrypy.HTTPError( status=400, message='failed to getdesktopdescription') @@ -269,7 +271,7 @@ def getdesktopdescription(self): def getuserapplist(self): self.logger.debug('') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() userappdict = {} # list all applications allowed for this user (auth) appdict = services.apps.user_appdict( auth, filtered_public_attr_list=True) @@ -285,8 +287,9 @@ def getuserapplist(self): userapplist = list( userappdict.values() ) # return succes data return Results.success(result=userapplist) + - def _launchdesktop(self, auth, user, args): + def _launchdesktop(self, auth:AuthInfo, user:AuthUser, roles:AuthRoles, args:dict): self.logger.debug('') # @@ -297,12 +300,7 @@ def _launchdesktop(self, auth, user, args): # raise it again # try: - # read the user ip source address for accounting and log history data - webclient_sourceipaddr = oc.cherrypy.getclientipaddr() - args[ 'WEBCLIENT_SOURCEIPADDR' ] = webclient_sourceipaddr - - # open a new desktop - desktop = oc.od.composer.opendesktop( auth, user, args ) + desktop = oc.od.composer.opendesktop( auth, user, roles, args ) # safe check for desktop type if not isinstance(desktop, oc.od.desktop.ODDesktop): @@ -408,7 +406,7 @@ def get_target_ip_route(self, target, websocketrouting ): @cherrypy.tools.json_out() @cherrypy.tools.json_in() def listsecrets(self): - (auth, user ) = self.validate_env() + (auth, user,roles) = self.validate_env() # list secrets secrets = oc.od.composer.listAllSecretsByUser(auth, user) list_secrets = list( secrets ) diff --git a/controllers/core_controller.py b/controllers/core_controller.py index d475182..bacfd9a 100755 --- a/controllers/core_controller.py +++ b/controllers/core_controller.py @@ -96,7 +96,7 @@ def getmessageinfo(self)->bytes: # route content type to handler routecontenttype = { 'text/plain': self.handler_messageinfo_text, 'application/json': self.handler_messageinfo_json } try: - (_, user ) = self.validate_env() + (_auth, user, _roles) = self.validate_env() message = services.messageinfo.popflush(user.userid) lambdaroute = self.getlambdaroute( routecontenttype, defaultcontenttype='application/json' )( message ) except Exception as e: diff --git a/controllers/manager_controller.py b/controllers/manager_controller.py index dcd1a61..ce64fa7 100755 --- a/controllers/manager_controller.py +++ b/controllers/manager_controller.py @@ -333,7 +333,7 @@ def desktop( self, *args ): @cherrypy.tools.json_out() def images( self )->str: self.is_permit_request() - if cherrypy.request.method == 'GET': + if cherrypy.request.method == 'GET': return self.handle_images_GET() elif cherrypy.request.method == 'DELETE': return self.handle_images_DELETE() @@ -356,7 +356,7 @@ def handle_images_GET( self )->str: @cherrypy.tools.json_out() def image( self, image:str=None, node:str=None ): self.is_permit_request() - if cherrypy.request.method == 'GET': + if cherrypy.request.method == 'GET': return self.handle_image_GET( image=image ) elif cherrypy.request.method == 'PUT': return self.handle_image_PUT( json_images=cherrypy.request.json, node=node ) @@ -645,6 +645,6 @@ def handle_ban_DELETE( self, collection, args ): @cherrypy.tools.json_out() def dry_run_desktop(self): self.logger.debug('validate_env') - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() result = oc.od.composer.sampledesktop(auth, user) return result \ No newline at end of file diff --git a/controllers/store_controller.py b/controllers/store_controller.py index cb90159..fe58d55 100755 --- a/controllers/store_controller.py +++ b/controllers/store_controller.py @@ -36,7 +36,7 @@ def __init__(self, config_controller=None): @cherrypy.tools.json_in() def set(self): # Check auth - (auth, user ) = self.validate_env() + (auth, user, roles ) = self.validate_env() arguments = cherrypy.request.json if not isinstance(arguments,dict) : return Results.error( message='invalid parameters' ) @@ -58,7 +58,7 @@ def set(self): def get(self): # Check auth - (auth, user ) = self.validate_env() + (auth, user, roles ) = self.validate_env() arguments = cherrypy.request.json if not isinstance(arguments,dict) : @@ -93,7 +93,7 @@ def wrapped_get( self, userid, key ): @cherrypy.tools.json_in() @cherrypy.tools.allow(methods=['POST']) def getcollection(self): - (auth, user ) = self.validate_env() + (auth, user, roles) = self.validate_env() userid = user.userid arguments = cherrypy.request.json if not isinstance(arguments,dict) : diff --git a/controllers/user_controller.py b/controllers/user_controller.py index 706b43c..3948e4e 100755 --- a/controllers/user_controller.py +++ b/controllers/user_controller.py @@ -45,7 +45,7 @@ def getinfo(self): @cherrypy.tools.json_in() def getlocation(self): # self.logger.debug('') - (auth, user) = self.validate_env() + (auth, user, roles) = self.validate_env() location = oc.od.user.getlocation( auth ) return Results.success(result=location) @@ -56,10 +56,12 @@ def whoami(self): # self.logger.debug('') auth = None user = None + roles = None # same has super().validate_env # but do not fail or ban ipaddr if services.auth.isauthenticated and services.auth.isidentified: user = services.auth.user auth = services.auth.auth + roles = services.auth.roles userinfo = oc.od.user.whoami( auth, user ) return userinfo \ No newline at end of file diff --git a/oc/auth/authservice.py b/oc/auth/authservice.py index 53a248f..7141461 100755 --- a/oc/auth/authservice.py +++ b/oc/auth/authservice.py @@ -27,7 +27,6 @@ from urllib.parse import urlparse from ldap import filter as ldap_filter import ldap3 -import base64 # # from ldap3.utils.log import set_library_log_detail_level, get_detail_level_name, set_library_log_hide_sensitive_data, EXTENDED @@ -49,6 +48,7 @@ import oc.logging +from oc.od.asnumber import ODASNumber import oc.pyutils as pyutils import oc.od.resolvdns import jwt @@ -450,7 +450,7 @@ def todict( self ): def merge( self, newauthinfo ): # merge only data object if not isinstance( newauthinfo, AuthInfo): - raise ValueError( f"merge error invalid AuthInfo object type {type(newauthinfo)}" ) + raise ValueError( f"merge error invalid AuthInfo object type {type(newauthinfo)}" ) mergedeep.merge(newauthinfo.data, self.data, strategy=mergedeep.Strategy.ADDITIVE) self.data = newauthinfo.data return self @@ -476,7 +476,7 @@ def update( self, manager, result, success, reason='' ): class AuthCache(object): NotSet = object() - def __init__(self, dict_token=None, auth_duration_in_milliseconds=None, origin=None): + def __init__(self, dict_token:dict=None, auth_duration_in_milliseconds:int=None, origin=None): self.reset() if isinstance(dict_token, dict): self.setuser( dict_token.get('user')) @@ -529,8 +529,16 @@ def isValidAuth(self): def roles(self): return self._roles - def setroles( self, valuedict ): - self._roles = AuthRoles( valuedict ) + def setroles( self, rolevalues:dict|list ): + myroles=rolevalues + # we convert a list of role to a dict with role as key and None as value + # to be able to use the same code for list or dict of roles + if isinstance( rolevalues, list): + myroles = {} + for role in rolevalues: + if isinstance( role, str): + myroles[role] = None + self._roles = AuthRoles( myroles ) @property def auth(self): @@ -651,15 +659,12 @@ def current(self): # check if we can use cached request data # to prevent decode twice update the request object # by adding the cherrypy.request.odauthcache attribut - if not hasattr(cherrypy.request, 'odauthcache') : + if not hasattr(cherrypy.request, 'odauthcache'): # attr is not found - # parse_auth_request() will decode the token - # self.logger.debug( "current http request has no odauthcache" ) - cherrypy.request.odauthcache = self.parse_auth_request() - else: - # self.logger.debug( f"current http request has cached odauthcache" ) - pass - return cherrypy.request.odauthcache + # parse_auth_request() will decode the token + # self.logger.debug( "current http request has no odauthcache" ) + cherrypy.request.odauthcache = self.parse_auth_request() + return cherrypy.request.odauthcache @property def user(self): @@ -808,7 +813,7 @@ def getclientdata(self): return { 'managers': list(map(lambda m: m.getclientdata(), self.managers.values())) } - def reduce_auth_data( self, auth ): + def reduce_auth_data( self, auth:AuthInfo )->dict: """reduce_token reduce token data to return only @@ -816,17 +821,21 @@ def reduce_auth_data( self, auth ): auth (_type_): _description_ """ auth_data_reduce = {} # return an empty auth_data_reduce by default - + if isinstance( auth.data, dict ): # filter to this entries - for entry in [ 'domain', 'labels' ] : - if auth.data.get(entry) : - auth_data_reduce[entry] = auth.data.get(entry) - + if isinstance( auth.data.get('domain'), str ): + auth_data_reduce['domain'] = auth.data.get('domain') + if isinstance( auth.data.get('labels'), dict ): + auth_data_reduce['labels'] = {} + for key, value in auth.data.get('labels').items(): + if key.isalnum(): # and isinstance(value, str): + auth_data_reduce['labels'][key] = value + return auth_data_reduce - def update_token( self, auth, user, roles=None ): + def update_token( self, auth:AuthInfo, user:AuthUser, roles:AuthRoles ): """update_token remove unused data @@ -853,33 +862,63 @@ def update_token( self, auth, user, roles=None ): # create jwt_role_reduce (futur usage) # roles=None as default parameter - jwt_role_reduce = {} + jwt_role_reduce = roles # encode new jwt jwt_token = self.jwt.encode( auth=jwt_auth_reduce, user=jwt_user_reduce, roles=jwt_role_reduce ) return jwt_token - def compiledcondition( self, condition, user, roles, provider=None, auth=None ): + def compiledcondition( self, condition:dict, user:dict, roles:list, provider=None, auth=None )->bool: - def isPrimaryGroup(user, primaryGroupID): + def isPrimaryGroup(user:dict, primaryGroupID:str)->bool: # if user is not a dict return False if not isinstance(user, dict): return False - # primary group id is uniqu for if user.get('primaryGroupID') == primaryGroupID: return True return False - def isTimeAfter( timeafter ) : + def isTimeAfter( timeafter ): + return False + + def isTimeBefore( timebefore ): + return False + + def __isASNumber( ipsource:str, asnumber:str )->bool: + bReturn = False + try: + if isinstance( oc.od.services.services.asnumber, ODASNumber ): + bReturn = oc.od.services.services.asnumber.lookup( ipsource, asnumber ) + except Exception as e: + logger.error( e ) + bReturn = False + # self.logger.debug( f"ipsource={ipsource} is in network={network} return {bReturn}") + return bReturn + + def _isASNumber( ipsource:str, asnumber:str )->bool: + # self.logger.debug(locals()) + if isinstance( asnumber, list ): + for n in asnumber: + if __isASNumber( ipsource, n ): + return True + elif isinstance( asnumber, str ): + return __isASNumber( ipsource, asnumber ) return False - def isTimeBefore( timebefore ) : + def isASNumber(ipsource:str, asnumber:str )->bool: + # self.logger.debug(locals()) + if isinstance( ipsource, list ): + for ip in ipsource: + if _isASNumber( ip, asnumber ): + return True + elif isinstance( ipsource, str): + return _isASNumber( ipsource, asnumber ) return False - def isGeoLocation(user, geolocation): + def isGeoLocation(user:dict, geolocation:dict)->bool: # user.get('geolocation'): {accuracy: 14.884, latitude: 48.8555131, longitude: 2.3752174} # haversine.haversine() user_geolocation = user.get('geolocation') @@ -905,7 +944,7 @@ def isGeoLocation(user, geolocation): return True return False - def isHttpHeader( requestheader, rulesheader ): + def isHttpHeader( requestheader:dict, rulesheader:dict )->bool: if not isinstance( rulesheader, dict): logger.error(f"invalid value type http header {type(rulesheader)}, dict is expected in rule" ) return False @@ -915,7 +954,7 @@ def isHttpHeader( requestheader, rulesheader ): return False return True - def existHttpHeader( requestheader, rulesheader ): + def existHttpHeader( requestheader:dict, rulesheader:list )->bool: if not isinstance( rulesheader, list): logger.error(f"invalid value type http header {type(rulesheader)}, list is expected in rule" ) return False @@ -931,7 +970,7 @@ def isBoolean( value ): return False return value - def isMemberOf(roles, groups ) : + def isMemberOf(roles:list, groups:list)->bool: # self.logger.debug(locals()) if not isinstance(roles,list): roles = [roles] @@ -948,7 +987,7 @@ def isMemberOf(roles, groups ) : return True return False - def __isinNetwork( ipsource, network ): + def __isinNetwork( ipsource:str, network:str )->bool: bReturn = False try: if IPAddress(ipsource) in IPNetwork( network ): @@ -956,10 +995,9 @@ def __isinNetwork( ipsource, network ): except Exception as e: logger.error( e ) bReturn = False - # self.logger.debug( f"ipsource={ipsource} is in network={network} return {bReturn}") return bReturn - def _isinNetwork( ipsource, network ): + def _isinNetwork( ipsource:str, network:str|list )->bool: # self.logger.debug(locals()) if isinstance( network, list ): for n in network: @@ -969,7 +1007,7 @@ def _isinNetwork( ipsource, network ): return __isinNetwork( ipsource, network ) return False - def isinNetwork( ipsource, network ): + def isinNetwork( ipsource:str, network:str|list )->bool: # self.logger.debug(locals()) if isinstance( ipsource, list ): for ip in ipsource: @@ -1090,6 +1128,14 @@ def isAttribut(user, attribut, start_with=None, equal=None ): if result == condition.get( 'expected'): compiled_result = True + asnumber = condition.get('asnumber') + if isinstance(asnumber, (str, list) ) : + ipsource = getclientipaddr() + # self.logger.debug( f"asnumber rules ipsource={ipsource}" ) + result = isASNumber( ipsource, asnumber ) + if result == condition.get( 'expected' ): + compiled_result = True + network = condition.get('network') if isinstance(network, (str, list) ) : ipsource = getclientipaddr() @@ -1183,7 +1229,7 @@ def compiledrule( self, name:str, rule:dict, thread_compiled_result, user, roles return result - def compiledrules( self, rules, user, roles, provider=None, auth=None, use_memcache=False, memcache=None ): + def compiledrules( self, rules:dict, user:dict, roles, provider=None, auth=None, use_memcache=False, memcache=None ): # # 'rule-ship': { 'conditions' : { 'memberOf': [ 'cn=ship_crew,ou=people,dc=planetexpress,dc=com'] }, # 'expected' : True, @@ -1205,7 +1251,20 @@ def compiledrules( self, rules, user, roles, provider=None, auth=None, use_memca # self.logger.debug('') + # default values buildcompiledrules = {} + # add builtin additional tags + # always add + # - ipsource tag + # - asnumber tag if not none + # - all user info values with prefix 'user.' to avoid conflict with other tags + ipsource = getclientipaddr() + buildcompiledrules[ 'ipsource' ] = ipsource + asnumber = oc.od.services.services.asnumber.getasn( ipsource ) + if isinstance( asnumber, str ): + buildcompiledrules[ 'asnumber' ] = asnumber + + # add rules if not isinstance( rules, dict ): return buildcompiledrules @@ -1226,6 +1285,7 @@ def compiledrules( self, rules, user, roles, provider=None, auth=None, use_memca except Exception as e: self.logger.error(f"rules {name} compilation failed {e} skipping rule") + """ # same version with thread support compilerule_timeout = 640 # seconds @@ -1267,7 +1327,7 @@ def compiledrules( self, rules, user, roles, provider=None, auth=None, use_memca return buildcompiledrules - def findproviderusingrules(self, manager ): + def findproviderusingrules(self, manager:str ): provider = None # default value # get explicit manager dict @@ -1339,7 +1399,7 @@ def is_default_metalogin_provider( self ): - def metalogin(self, provider, manager=None, **arguments): + def metalogin(self, provider:str, manager=None, **arguments): """ [metalogin] same as login but use meta directory to select user informations like DOMAIN \\ SAMAccountName and Kerberos realm @@ -1538,7 +1598,7 @@ def metalogin(self, provider, manager=None, **arguments): - def findproviderbydomainprefix( self, providers, domain ): + def findproviderbydomainprefix( self, providers:list, domain:str ): """[summary] find a provider using the DOMAIN ActiveDirectory domain name return the provider object for this domain @@ -1584,7 +1644,7 @@ def finddefaultprovider( self, providers): return default_provider - def logintrytofindaprovider( self, manager ): + def logintrytofindaprovider( self, manager:str ): # manager must be explicit if manager != 'explicit': raise AuthenticationFailureError('No authentication provider can be found') @@ -1622,7 +1682,7 @@ def mesuretimeserver_auth_duration( self,server_utctimestamp): auth_duration_in_milliseconds = (server_endoflogin_utctimestamp - server_utctimestamp)/1000 # in float second return auth_duration_in_milliseconds - def update_user_resqueted_executeclassname(self, auth, user_requested_features:dict)->None: + def update_user_resqueted_executeclassname(self, auth:AuthInfo, user_requested_features:dict)->None: # update auth.data['labels'] with user_requested_features entries if not isinstance( user_requested_features ,dict ): return @@ -1640,7 +1700,7 @@ def update_user_resqueted_executeclassname(self, auth, user_requested_features:d auth.data['labels'][feature_name] = feature_value - def login(self, provider, manager=None, **arguments): + def login(self, provider:str, manager=None, **arguments): self.logger.debug('') auth = None # must be define to prevent referenced before assignment exception pdr = None # must be define to prevent referenced before assignment exception @@ -1654,7 +1714,7 @@ def login(self, provider, manager=None, **arguments): # provider is None # can raise exception # do everythings possible to find one provider - self.logger.debug( f"provider is None, login try to find a provider using manager={manager}" ) + self.logger.debug( f"provider is None, login is trying to find a provider using manager={manager}" ) provider = self.logintrytofindaprovider( manager ) # look for an auth manager @@ -1700,14 +1760,12 @@ def login(self, provider, manager=None, **arguments): # if the provider has rules defined then # compile data using rules # runs the rules to get associated labels tag - if pdr.rules: - auth.data['labels'] = self.compiledrules( rules=pdr.rules, user=userinfo, roles=roles, provider=pdr, auth=auth ) + auth.data['labels'] = self.compiledrules( rules=pdr.rules, user=userinfo, roles=roles, provider=pdr, auth=auth ) # update auth.data['labels']['executeclassname'] # if user requests feature executeclassname self.update_user_resqueted_executeclassname( auth, arguments.get('features') ) - # dump labels for debug self.logger.debug( f"labels {auth.data.get('labels')}") # end of auth, mesuretimeserver_auth_duration @@ -1721,10 +1779,7 @@ def login(self, provider, manager=None, **arguments): ) reason = f"a.Authentication on { pdr.getdisplaydescription() } successful in {auth_duration_in_milliseconds:.2f} s" # float two digits after comma - response.update( manager=mgr, - result=myauthcache, - success=True, - reason=reason ) + response.update( manager=mgr, result=myauthcache, success=True, reason=reason ) finally: if isinstance( pdr, ODAuthProviderBase): @@ -2273,7 +2328,7 @@ def getclientdata(self): data['state'] = state return data - def authenticate(self, code=None, **params): + def authenticate(self, code=None, **params)->AuthInfo: oauthsession = OAuth2Session( self.client_id, scope=self.scope, redirect_uri=self.redirect_uri) authorization_response = self.redirect_uri_prefix + '?' + cherrypy.request.query_string token = oauthsession.fetch_token( self.token_url, client_secret=self.client_secret, include_client_id=self.include_client_id, authorization_response=authorization_response ) @@ -2282,7 +2337,7 @@ def authenticate(self, code=None, **params): return authinfo - def getuserinfo(self, authinfo, **params): + def getuserinfo(self, authinfo:AuthInfo, **params): # retrieve the token object from the previous authinfo oauthsession = authinfo.token @@ -2377,7 +2432,7 @@ def getroles(self, authinfo, userinfo, **params): # return empty list self.logger.debug(f"provider {self.name} is a auth_only={self.auth_only}, no roles can be read return {roles}") return roles - + if isinstance( userinfo.get('groups'), list ): roles = userinfo.get('groups') @@ -2986,7 +3041,7 @@ def getuserinfo(self, authinfo, **params): return userinfo - def getroles(self, authinfo, userinfo, **params): + def getroles(self, authinfo:AuthInfo, userinfo:AuthUser, **params): self.logger.debug('') roles = [] @@ -3858,7 +3913,7 @@ def getuserinfo(self, authinfo, **params): - def getroles(self, authinfo, userinfo, **params): + def getroles(self, authinfo:AuthInfo, userinfo:AuthUser, **params): self.logger.debug('') token = authinfo.token if not self.recursive_search: diff --git a/oc/auth/namedlib.py b/oc/auth/namedlib.py index 09c105e..c5153c5 100755 --- a/oc/auth/namedlib.py +++ b/oc/auth/namedlib.py @@ -46,8 +46,50 @@ def normalize_name_volunename(name:str)->str: # volume name max length is 63 chars to lowercase return normalize_name( name )[0:63].lower() -def normalize_name_label(name:str)->str: - return normalize_name(name) + +def normalize_label(data:str)->str: + # must be 63 characters or less (can be empty), + # unless empty, must begin and end with an alphanumeric character ([a-z0-9A-Z]), + # could contain dashes (-), underscores (_), dots (.), and alphanumerics between. + if not isinstance( data, str ): + return None + + if len(data) == 0: + return data + + mydata = str(data) + try: + # must begin with an alphanumeric character ([a-z0-9A-Z]), + while not mydata[0].isalnum(): + mydata=mydata[1::] + except IndexError: + return None + + newdata = '' + for c in mydata: + if c.isalnum() or c == '-' or c == '_' or c == '.': + newdata = newdata + c + else: + newdata = newdata + '-' + + try: + # must end with an alphanumeric character ([a-z0-9A-Z]), + while not newdata[-1].isalnum(): + newdata=newdata[:-1] + except IndexError: + return None + + newdata = newdata[0:63] + + try: + # must end with an alphanumeric character ([a-z0-9A-Z]), + while not newdata[-1].isalnum(): + newdata=newdata[:-1] + except IndexError: + return None + + return newdata + def normalize_networkname(name:str)->str: return normalize_name(name) @@ -77,14 +119,6 @@ def normalize_char( c ): else: return '_' -# (([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?') -def normalize_label( name:str )->str: - # permit only DNS name [a-z][A-Z][0-9]- - newname = '' - for c in name: - newname = newname + normalize_char(c) - return newname - # Take care def normalize_shell_variable(myvar:str)->str: newNormalizedVar = quote( myvar) diff --git a/oc/cherrypy.py b/oc/cherrypy.py index 473f384..2e3796a 100755 --- a/oc/cherrypy.py +++ b/oc/cherrypy.py @@ -56,6 +56,10 @@ def getclientreal_ip(): pass return realip +def getuseragent(): + user_agent = cherrypy.request.headers.get('User-Agent') + return user_agent + def getclientxforwardedfor_listip(): clientiplist = [] xforwardedfor = cherrypy.request.headers.get('X-Forwarded-For') diff --git a/oc/datastore.py b/oc/datastore.py index 04d5e63..72e0324 100755 --- a/oc/datastore.py +++ b/oc/datastore.py @@ -34,10 +34,14 @@ def addtocollection(self, databasename, collectionname, datadict): @oc.logging.with_logger() class ODMongoDatastoreClient(ODDatastoreClient): - def __init__(self, mongodburl, databasename=None): - self.authenticationDatabase = 'admin' + def __init__(self, mongodburl:str, mongodbparam:str=None, databasename:str=None): self.databasename = databasename self.mongodburl = mongodburl + if not isinstance( mongodbparam, str): + self.mongodbparam = '' + else: + self.mongodbparam = mongodbparam + # Defaults to 20000 (20 seconds). # set to 5000 (5 seconds). # self.connectTimeoutMS = 3000 @@ -53,29 +57,20 @@ def createhosturl( self, databasename ): url = None if isinstance(databasename, str ): # url = f"{self.mongodburl}/{databasename}?directConnection=true&replicaSet=rs0&authSource={databasename}" - # url = f"{self.mongodburl}/{databasename}?authSource={databasename}" - url = f"{self.mongodburl}/{databasename}?replicaSet=rs0&authSource={databasename}" + # url = f"{self.mongodburl}/{databasename}?replicaSet=rs0&authSource={databasename}" + url = f"{self.mongodburl}/{databasename}?{self.mongodbparam}&authSource={databasename}" else: url = self.mongodburl return url - def createclient(self, databasename): - # self.logger.debug( f"databasename={databasename}") - # hosturl = self.createhosturl( databasename ) - # self.logger.debug( f"hosturl={hosturl}") + def createclient(self, databasename:str): hosturl = self.createhosturl( databasename ) - # self.logger.debug( f"createclient MongoClient {hosturl}") mongo_client = MongoClient(host=hosturl) - # connectTimeoutMS=self.connectTimeoutMS, - # socketTimeoutMS=self.socketTimeoutMS, - # serverSelectionTimeoutMS=self.serverSelectionTimeoutMS ) - # server_info = mongo_client.server_info() - # self.logger.debug( f"server_info={server_info}") return mongo_client def get_document_value_in_collection(self, databasename, collectionname, key): obj = None - self.logger.debug( f"database={databasename} collectionname={collectionname} key={key}" ) + # self.logger.debug( f"database={databasename} collectionname={collectionname} key={key}" ) try: client = self.createclient(databasename) collection = client[databasename][collectionname] @@ -230,47 +225,4 @@ def list_collections(self, databasename:str): client.close() except Exception as e : self.logger.error( f"list_collections {e}" ) - return collections - - """ - def config_replicaset( self, replicaset_name ): - mongoclientcfg = ODMongoDatastoreClient( self.config.hosturl ) - host = mongoclientcfg.gethost() - config = { '_id': replicaset_name, 'members': [ { '_id':0, 'host': host } ] } - return config - - def create_replicaset( self, replicaset_name ): - self.logger.info(f"create replicaset {replicaset_name}") - c = MongoClient(self.serverfqdn, self.serverport, directConnection=True ) - try: - # set a default configuration - config = self.config_replicaset( replicaset_name ) - repl_status = c.admin.command("replSetInitiate", config, allowable_errors=True) - except pymongo.errors.OperationFailure as e: - if e.code == 23: # already initialized - # another process has done before - self.logger.info( f"{self.serverfqdn} already use replicatset") - return True - else: - self.logger.error( e ) - except Exception as e: - self.logger.error( e ) - return False - - def getstatus_replicaset( self, replicaset_name): - self.logger.info(f"read replicaset {replicaset_name} status") - c = MongoClient(self.serverfqdn, self.serverport, directConnection=True ) - try: - repl_status = c.admin.command("replSetGetStatus") - if isinstance( repl_status, dict ): - if int(repl_status.get('ok')) == 1: - # repl_status.get('set') == replicaset_name - self.logger.info( f"{self.serverfqdn} already uses replicatset {repl_status.get('set')}") - return True - except pymongo.errors.OperationFailure as e: - if e.code == 94: # no replset config has been received - self.logger.info("no replset config has been received") - except Exception as e: - self.logger.error( e ) - return False - """ + return collections \ No newline at end of file diff --git a/oc/lib.py b/oc/lib.py index 694335e..fad188f 100755 --- a/oc/lib.py +++ b/oc/lib.py @@ -35,7 +35,7 @@ def randomStringwithDigitsAndSymbols(stringLength=10): alphabet = string.ascii_letters + string.digits return ''.join(secrets.choice(alphabet) for i in range(stringLength)) -def remove_accents(input_str): +def remove_accents(input_str:str)->str: """[remove_accents] remove accents in string and set to lower case Args: diff --git a/oc/od/apps.py b/oc/od/apps.py index d77f743..7bf3195 100755 --- a/oc/od/apps.py +++ b/oc/od/apps.py @@ -32,7 +32,7 @@ class ODApps: """ ODApps manage application list """ - def __init__(self, mongodburl=None ): + def __init__(self, mongodburl:str=None, mongodbparam:str=None): self.lock = threading.Lock() self.myglobal_list = {} self.build_image_counter = 0 @@ -56,11 +56,11 @@ def __init__(self, mongodburl=None ): self.index_name = 'id' # id is the name of the image repoTags[0] self.image_collection_name = 'image' if isinstance( mongodburl, str) : - self.datastore = oc.datastore.ODMongoDatastoreClient(mongodburl, self.databasename) + self.datastore = oc.datastore.ODMongoDatastoreClient(mongodburl, mongodbparam, self.databasename) self.init_collection( collection_name=self.image_collection_name ) def init_collection( self, collection_name ): - mongo_client = oc.datastore.ODMongoDatastoreClient.createclient(self.datastore,self.databasename) + mongo_client = self.datastore.createclient(self.databasename) db = mongo_client[self.databasename] col = db[collection_name] try: @@ -70,11 +70,11 @@ def init_collection( self, collection_name ): mongo_client.close() def get_collection(self, collection_name ): - mongo_client = oc.datastore.ODMongoDatastoreClient.createclient(self.datastore,self.databasename) + mongo_client = self.datastore.createclient(self.databasename) db = mongo_client[self.databasename] return db[collection_name] - def append_app_to_collection( self, app ): + def append_app_to_collection( self, app:dict )->bool: if not isinstance( app, dict): return False myapp = app.copy() # copy @@ -513,20 +513,21 @@ def json_imagetoapp( self, json_image:dict )->dict: def add_json_image_to_collection( self, json_image:str )->list|None: applist = None + # if json image is a list add each image in list if isinstance( json_image, list ): applist = [] for image in json_image: myapp = self.json_imagetoapp( image ) - if isinstance( myapp, dict ): - if self.append_app_to_collection( myapp ): - applist.append( myapp ) - + if self.append_app_to_collection( myapp ): + applist.append( myapp ) + + # if json image is a dict add it in collection if isinstance( json_image, dict ): myapp = self.json_imagetoapp( json_image ) - if isinstance( myapp, dict ): - if self.append_app_to_collection( myapp ): - applist = myapp + if self.append_app_to_collection( myapp ): + applist = myapp + return applist @staticmethod @@ -788,11 +789,13 @@ def start_mongo_watcher(self): def stop_mongo_watcher(self): - self.thread_event.set() if isinstance(self.watcher_thread, threading.Thread): - if self.watcher_thread.is_alive(): + if hasattr(self.watcher_thread, 'set'): + self.thread_event.set() + if hasattr(self.watcher_thread, 'is_alive') and self.watcher_thread.is_alive(): self.logger.debug("MongoDB watcher_thread.join()...") - self.watcher_thread.join() + # self.watcher_thread.join() + self.watcher_thread = None else: self.logger.debug("MongoDB watcher_thread is not alive.") else: diff --git a/oc/od/asnumber.py b/oc/od/asnumber.py new file mode 100644 index 0000000..b57e5b3 --- /dev/null +++ b/oc/od/asnumber.py @@ -0,0 +1,47 @@ +import logging +import pyasn +import oc.logging + +logger = logging.getLogger(__name__) + +@oc.logging.with_logger() +class ODASNumber: + + def __init__(self, database:str='ipasn_db.dat' ): + self.asndb = None + try: + self.asndb = pyasn.pyasn(database) + except Exception as e: + self.logger.error( f"Error while loading ASNumber database {database} : {e}" ) + + def isinitialized(self): + return isinstance( self.asndb, pyasn.pyasn) + + + def getasn( self, ipaddr:str ) -> str: + asn = None + if not self.isinitialized(): + self.logger.error( "ASNumber database not initialized" ) + return asn + try: + asn, prefix = self.asndb.lookup( ipaddr ) + except Exception as e: + self.logger.error( e ) + return str(asn) if asn is not None else None + + def lookup( self, ipaddr:str, asnumber )->bool: + bReturn = False + if not self.isinitialized(): + self.logger.error( "ASNumber database not initialized" ) + return bReturn + try: + asn, prefix = self.asndb.lookup( ipaddr ) + if isinstance( asnumber, str): + if str(asn) == asnumber: + bReturn = True + elif isinstance( asnumber, list): + if str(asn) in asnumber: + bReturn = True + except Exception as e: + self.logger.error( e ) + return bReturn \ No newline at end of file diff --git a/oc/od/base_controller.py b/oc/od/base_controller.py index 3d8a6bc..aaf3358 100644 --- a/oc/od/base_controller.py +++ b/oc/od/base_controller.py @@ -143,11 +143,12 @@ def validate_env(self): user = services.auth.user auth = services.auth.auth + roles = services.auth.roles if self.isban_login(user.userid): raise cherrypy.HTTPError( status=401, message='user is banned') - return (auth, user) + return (auth, user, roles) def fail_ip( self, ipAddr:str=None ): if not isinstance( ipAddr, str): @@ -212,9 +213,12 @@ def is_permit_request(self): is_ip_filter = self.ipfilter() # Check if the controller has an ip filter # if both filters are set, at least one must match # self.logger.debug( f"is_api_filter={is_api_filter}, is_ip_filter={is_ip_filter}" ) - if not is_api_filter and not is_ip_filter: - self.raise_http_error_message( '403.1 - Execute access forbidden' ) - + if not is_api_filter or not is_ip_filter: + if not is_ip_filter: + self.raise_http_error_message( '403.7 - IP address access denied' ) + if not is_api_filter: + self.raise_http_error_message( '403.1 - Execute access forbidden' ) + if isinstance( self.requestsallowed, dict ): # read the request path path = cherrypy.request.path_info @@ -235,12 +239,25 @@ def is_permit_request(self): self.raise_http_error_message( '403.8 - Site access denied' ) def apifilter(self): + """apifilter + check if the request apikey is in the permitted apikey list + if no apikey list is set, return True + Returns: + bool: True if the request apikey is in the permitted apikey list or no list is set + """ self.logger.debug('') if isinstance(self.apikey, list): return self.is_apikey() return True def ipfilter( self ): + """ipfilter + check if the client ip address is in the permitted network list + if no network list is set, return True + + Returns: + bool: True if the client ip address is in the permitted network list or no list is set + """ self.logger.debug('') if not isinstance(self.ipnetworklistfilter, list) : return True diff --git a/oc/od/composer.py b/oc/od/composer.py index f5932dd..70ab864 100755 --- a/oc/od/composer.py +++ b/oc/od/composer.py @@ -13,10 +13,9 @@ # Author: abcdesktop.io team # Software description: cloud native desktop service # -import os import logging +import ua_parser from typing_extensions import assert_type -import requests from oc.cherrypy import getclientipaddr from oc.od.desktop import ODDesktop @@ -24,12 +23,13 @@ import oc.od.orchestrator from oc.od.services import services -from oc.auth.authservice import AuthInfo, AuthUser # to read AuthInfo and AuthUser +from oc.auth.authservice import AuthInfo, AuthUser, AuthRoles # to read AuthInfo and AuthUser from oc.od.error import ODError import oc.od.appinstancestatus import oc.od.desktop import oc.od.services import oc.od.tracking +import oc.od.settings # type need for garbage collector from kubernetes.client.models.v1_pod_list import V1PodList @@ -89,7 +89,29 @@ def securitypoliciesmatchlabelvalue( desktop:ODDesktop, authinfo:AuthInfo, label return result -def opendesktop(authinfo:AuthInfo, userinfo:AuthUser, args ): +def get_webclient_os_family(): + desktop_theme = oc.od.settings.desktop.get('theme') + if isinstance(desktop_theme, str): + if desktop_theme == 'autodetect': + desktop_theme = parse_user_agent_os_family() + return desktop_theme + +def parse_user_agent_os_family()->str: + os_family = None # default value as fallback + try: + user_agent = oc.cherrypy.getuseragent() + ua_parsed = ua_parser.parse(user_agent) + if isinstance( ua_parsed, ua_parser.core.Result): + os_family = ua_parsed.os.family.replace(' ', '').lower() + # Mac OS/X -> macosx + # Linux -> linux + # Windows -> windows + except Exception as e: + logger.error(e) + return os_family + + +def opendesktop(authinfo:AuthInfo, userinfo:AuthUser, rolesinfo:AuthRoles, args:dict ): """open a new or return a desktop Args: authinfo (AuthInfo): authentification data @@ -167,7 +189,12 @@ def opendesktop(authinfo:AuthInfo, userinfo:AuthUser, args ): # create a new desktop # logger.debug( 'Cold start, creating your new desktop' ) - desktop = createdesktop( authinfo, userinfo, args) + + # read http headers for accounting and log history data + args[ 'ABCDESKTOP_WEBCLIENT_SOURCEIPADDR' ] = oc.cherrypy.getclientipaddr() + args[ 'ABCDESKTOP_WEBCLIENT_USERAGENT_OS_FAMILY' ] = get_webclient_os_family() # parse_user_agent_os_family() + # open a new desktop + desktop = createdesktop( authinfo, userinfo, rolesinfo, args) if isinstance( desktop, ODDesktop) : oc.od.tracking.addstartnewentryindesktophistory(authinfo, userinfo, desktop ) services.accounting.accountex( desktoptype, 'createsuccess') @@ -598,7 +625,8 @@ def createDesktopArguments( authinfo, userinfo, args ): # add environment variables env = createExecuteEnvironment( authinfo, userinfo ) # add source ip addr as WEBCLIENT_SOURCEIPADDR var env - env.update( { 'WEBCLIENT_SOURCEIPADDR': args.get('WEBCLIENT_SOURCEIPADDR') } ) + env.update( { 'ABCDESKTOP_WEBCLIENT_SOURCEIPADDR': args.get('ABCDESKTOP_WEBCLIENT_SOURCEIPADDR') } ) + env.update( { 'ABCDESKTOP_WEBCLIENT_USERAGENT_OS_FAMILY': args.get('ABCDESKTOP_WEBCLIENT_USERAGENT_OS_FAMILY') } ) myCreateDesktopArguments = { 'env' : env } return myCreateDesktopArguments @@ -608,7 +636,7 @@ def resumedesktop( authinfo:AuthInfo, userinfo:AuthUser ) -> ODDesktop: return myDesktop -def createdesktop( authinfo:AuthInfo, userinfo:AuthUser, args ): +def createdesktop( authinfo:AuthInfo, userinfo:AuthUser, rolesinfo:AuthRoles, args ): """create a new desktop Args: @@ -632,6 +660,7 @@ def createdesktop( authinfo:AuthInfo, userinfo:AuthUser, args ): # Create the desktop myDesktop = myOrchestrator.createdesktop( userinfo=userinfo, authinfo=authinfo, + rolesinfo=rolesinfo, **myCreateDesktopArguments ) if isinstance( myDesktop, oc.od.desktop.ODDesktop ): @@ -652,7 +681,7 @@ def createdesktop( authinfo:AuthInfo, userinfo:AuthUser, args ): return myDesktop -def dry_run_desktop(authinfo:AuthInfo, userinfo:AuthUser): +def dry_run_desktop(authinfo:AuthInfo, userinfo:AuthUser, rolesinfo:AuthRoles): """dry_run_desktop create a desktop with dry_run mode, this is used to test the desktop creation without creating a pod or a container @@ -670,7 +699,7 @@ def dry_run_desktop(authinfo:AuthInfo, userinfo:AuthUser): myOrchestrator.desktoplaunchprogress = dry_run_on_desktoplaunchprogress_info # Create the desktop dry_run - jsonDesktop = myOrchestrator.createdesktop( authinfo, userinfo, **myCreateDesktopArguments ) + jsonDesktop = myOrchestrator.createdesktop( authinfo, userinfo, rolesinfo, **myCreateDesktopArguments ) return jsonDesktop diff --git a/oc/od/fail2ban.py b/oc/od/fail2ban.py index 3631e30..a517e6d 100644 --- a/oc/od/fail2ban.py +++ b/oc/od/fail2ban.py @@ -8,7 +8,7 @@ @oc.logging.with_logger() class ODFail2ban: - def __init__(self, mongodburl, fail2banconfig={}): + def __init__(self, mongodburl:str, mongodbparam:str=None, fail2banconfig:dict={}): self.databasename = 'fail2ban' self.ip_collection_name = 'ipaddr' self.login_collection_name = 'login' @@ -16,7 +16,7 @@ def __init__(self, mongodburl, fail2banconfig={}): self.failmaxvaluebeforeban = fail2banconfig.get('failsbeforeban', 5 ) # specify a positive non-zero value self.banexpireAfterSeconds = fail2banconfig.get('banexpireafterseconds', 30*60 ) self.protectedNetworks = fail2banconfig.get('protectednetworks', [] ) - self.datastore = oc.datastore.ODMongoDatastoreClient(mongodburl, self.databasename) + self.datastore = oc.datastore.ODMongoDatastoreClient(mongodburl=mongodburl, mongodbparam=mongodbparam, databasename=self.databasename) self.collections_name = [ self.ip_collection_name, self.login_collection_name ] self.sanity_filter = { self.ip_collection_name:"0123456789.", @@ -50,7 +50,7 @@ def sanity( self, value, filter ): def init_collection( self, collection_name ): self.logger.debug(f"{self.databasename} {collection_name}") - mongo_client = oc.datastore.ODMongoDatastoreClient.createclient(self.datastore, self.databasename ) + mongo_client = self.datastore.createclient(self.databasename) db = mongo_client[self.databasename] col = db[collection_name] try: @@ -80,7 +80,7 @@ def test( self ): self.logger.debug( f"dump list is {list_ban_dummy_ipaddr}") def get_collection(self, collection_name ): - mongo_client = oc.datastore.ODMongoDatastoreClient.createclient(self.datastore, self.databasename) + mongo_client = self.datastore.createclient(self.databasename) db = mongo_client[self.databasename] return db[collection_name] @@ -88,7 +88,7 @@ def fail( self, value, collection_name ): myfail = None collection = self.get_collection( collection_name ) bfind = collection.find_one({ self.index_name: value}) - myfail = self.updateorinsert( collection=collection, bUpdate=bfind, value=value, counter=1 ) + myfail = self.updateorinsert( collection=collection, bUpdate=bfind, value=value, counter=1 ) return myfail def fail_ip( self, value ): @@ -146,7 +146,6 @@ def isban( self, value, collection_name ): # if ban is not enable nothing to do if not self.enable: return False - bReturn = False # sanity check @@ -170,11 +169,9 @@ def updateorinsert( self, collection, bUpdate, value, counter ): q = collection.update_one({ self.index_name: value, self.index_date: utc_timestamp}, {'$inc' : { self.counter : counter } }) else: q = collection.insert_one({ self.index_name: value, self.index_date: utc_timestamp, self.counter : counter }) - q = {"n": 1, "Inserted": 1, "ok": 1.0, "updatedExisting": False } return q - def ban( self, value, collection_name ): - myban = None + def ban( self, value:str, collection_name:str )->dict: if not self.sanity( value, self.sanity_filter.get(collection_name)): error_message = f"bad value sanity check {value} for {collection_name}" self.logger.error(error_message) @@ -182,9 +179,13 @@ def ban( self, value, collection_name ): collection = self.get_collection( collection_name ) bfind = collection.find_one({ self.index_name: value}) myban = self.updateorinsert( collection=collection, bUpdate=bfind, value=value, counter=self.failmaxvaluebeforeban ) + ban_result = {} if isinstance( myban, pymongo.results.UpdateResult ): - myban = myban.raw_result - return myban + ban_result = { 'n': myban.raw_result.get('n'), 'ok': myban.raw_result.get('ok'), 'updatedExisting': myban.raw_result.get('updatedExisting') } + if isinstance( myban, pymongo.results.InsertOneResult ): + ban_ok = '1' if myban.acknowledged else '0' + ban_result = { 'n': 1, 'ok': ban_ok } + return ban_result def drop( self, collection_name ): collection = self.get_collection( collection_name ) @@ -197,10 +198,14 @@ def unban( self, value, collection_name ): self.logger.error("bad parameter sanity check") return myban collection = self.get_collection( collection_name ) + # self.logger.debug( f"collection.delete_one {value} in {collection_name}") delete_one = collection.delete_one({ self.index_name: value}) + # self.logger.debug( f"delete_one result: {delete_one} type={type(delete_one)}" ) + unban_result = {} if isinstance( delete_one, pymongo.results.DeleteResult ): - myban = delete_one.raw_result - return myban + # filter the result to return only the number of deleted document and the status of the operation + unban_result = { 'n': delete_one.raw_result.get('n'), 'ok': delete_one.raw_result.get('ok') } + return unban_result def listban( self, collection_name ): ban_list = [] diff --git a/oc/od/kuberneteswatcher.py b/oc/od/kuberneteswatcher.py index d697511..64b8439 100644 --- a/oc/od/kuberneteswatcher.py +++ b/oc/od/kuberneteswatcher.py @@ -98,10 +98,16 @@ def loopforevent( self ): except client.exceptions.ApiException as e: self.logger.error( f"{type(e)} {e}" ) - if e.status == 401: + if hasattr(e,'status') and e.status == 401 : self.logger.fatal( f"exit loopforevent threading, this error is fatal" ) - # exit(-1) return + + if hasattr(e, 'status') and e.status == 504 and \ + hasattr(e, 'reason') and 'Too large resource version' in e.reason : + self.logger.debug( f"retrying after Timeout: Too large resource version ApiException {e}") + break # break this for loop and retry watch streaming + + self.logger.error( f"{type(e)} {e}" ) time.sleep( 60 ) # wait a minute to prevent log avalanche except Exception as e: diff --git a/oc/od/orchestrator.py b/oc/od/orchestrator.py index ca39006..299dfbf 100755 --- a/oc/od/orchestrator.py +++ b/oc/od/orchestrator.py @@ -17,7 +17,6 @@ from typing_extensions import assert_type import oc.logging from oc.od.apps import ODApps -import oc.od.error import oc.od.settings import oc.lib import oc.auth.namedlib @@ -34,7 +33,6 @@ import requests import copy import threading -import hashlib from kubernetes import client, config, watch from kubernetes.stream import stream @@ -101,7 +99,7 @@ import oc.od.appinstancestatus from oc.od.error import ODAPIError, ODError # import all error classes from oc.od.desktop import ODDesktop -from oc.auth.authservice import AuthInfo, AuthUser # to read AuthInfo and AuthUser +from oc.auth.authservice import AuthInfo, AuthUser, AuthRoles # to read AuthInfo, AuthUser, AuthRoles from oc.od.vnc_password import ODVncPassword logger = logging.getLogger(__name__) @@ -190,7 +188,7 @@ def get_containername( self, authinfo, userinfo, currentcontainertype, myuuid ): # name = oc.auth.namedlib.normalize_name_dnsname( name ) # return name - def get_normalized_username(self, name ): + def get_normalized_username(self, name:str ): """[get_normalized_username] return a username without accent to be use in label and container name Args: @@ -201,22 +199,22 @@ def get_normalized_username(self, name ): """ return oc.lib.remove_accents( name ) - def resumedesktop(self, authinfo, userinfo, **kwargs): + def resumedesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs): raise NotImplementedError(f"{type(self)}.resumedesktop") - def createdesktop(self, authinfo, userinfo, **kwargs): + def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, rolesinfo:AuthRoles, **kwargs): raise NotImplementedError(f"{type(self)}.createdesktop") - def build_volumes( self, authinfo, userinfo, volume_type, secrets_requirement, rules, **kwargs): + def build_volumes( self, authinfo:AuthInfo, userinfo:AuthUser, volume_type, secrets_requirement, rules, **kwargs): raise NotImplementedError(f"{type(self)}.build_volumes") - def findDesktopByUser( self, authinfo, userinfo ): + def findDesktopByUser( self, authinfo:AuthInfo, userinfo:AuthUser ): raise NotImplementedError(f"{type(self)}.findDesktopByUser") - def removedesktop(self, authinfo, userinfo, args={}): + def removedesktop(self, authinfo:AuthInfo, userinfo:AuthUser, args={}): raise NotImplementedError(f"{type(self)}.removedesktop") - def getsecretuserinfo(self, authinfo, userinfo): + def getsecretuserinfo(self, authinfo:AuthInfo, userinfo:AuthUser): raise NotImplementedError(f"{type(self)}.getsecretuserinfo") def execwaitincontainer( self, desktop, command, timeout): @@ -958,10 +956,15 @@ def get_labelvalue( self, label_value:str)->str: Returns: [str]: [return normalized label name] """ - assert isinstance(label_value, str), f"label_value has invalid type {type(label_value)}" - normalize_data = oc.auth.namedlib.normalize_label( label_value ) - no_accent_normalize_data = oc.lib.remove_accents( normalize_data ) - return no_accent_normalize_data + if label_value is None: + return label_value + if not isinstance(label_value, str): + label_value = json.stringify(label_value) + # self.logger.error( f"get_labelvalue invalid type {type(label_value)} for label value {label_value}" ) + # return None + no_accent_normalize_data = oc.lib.remove_accents( label_value ) + normalize_data = oc.auth.namedlib.normalize_label( no_accent_normalize_data ) + return normalize_data def logs( self, authinfo:AuthInfo, userinfo:AuthUser )->str: """logs @@ -1152,7 +1155,7 @@ def build_volumes_additional_by_rules( self, authinfo:AuthInfo, userinfo:AuthUse if fstype=='nfs': volumes_mount[mountvol.name] = { 'name': volume_name, - 'mountPath': mountvol.mountPath + 'mountPath': mountvol.mountPath } volumes[mountvol.name] = { 'name': volume_name, @@ -1167,7 +1170,8 @@ def build_volumes_additional_by_rules( self, authinfo:AuthInfo, userinfo:AuthUse if fstype=='hostpath': volumes_mount[mountvol.name] = { 'name': volume_name, - 'mountPath': mountvol.mountPath + 'mountPath': mountvol.mountPath, + 'mountPropagation': mountvol.mountPropagation } volumes[mountvol.name] = { 'name': volume_name, @@ -1184,7 +1188,8 @@ def build_volumes_additional_by_rules( self, authinfo:AuthInfo, userinfo:AuthUse if isinstance(claimName, str): volumes_mount[mountvol.name] = { 'name': volume_name, - 'mountPath': mountvol.mountPath + 'mountPath': mountvol.mountPath, + 'mountPropagation': mountvol.mountPropagation } volumes[mountvol.name] = { 'name': volume_name, @@ -1241,7 +1246,7 @@ def build_volumes_home( self, authinfo:AuthInfo, userinfo:AuthUser, volume_type: volume_home_name = 'home' # homedirectorytype is by default None homedirectorytype = oc.od.settings.desktop['homedirectorytype'] - self.logger.debug(f"homedirectorytype is {homedirectorytype}") + self.logger.debug(f"homedirectorytype is {homedirectorytype} and volume_type is {volume_type}") subpath_name = oc.auth.namedlib.normalize_name( userinfo.userid ) self.logger.debug(f"subpath_name is {subpath_name}") user_homedirectory = os.path.join( self.get_user_homedirectory(authinfo, userinfo), @@ -1267,7 +1272,9 @@ def build_volumes_home( self, authinfo:AuthInfo, userinfo:AuthUser, volume_type: # now ovewrite home values if homedirectorytype == 'persistentVolumeClaim': + self.logger.debug( f"use homedirectorytype persistentVolumeClaim" ) claimName = None # None is the default value, nothing to do + # self.logger.debug( f"type of oc.od.settings.desktop['persistentvolumeclaim'] is {type(oc.od.settings.desktop['persistentvolumeclaim'])}" ) if isinstance( oc.od.settings.desktop['persistentvolumeclaim'], str): # oc.od.settings.desktop['persistentvolumeclaim'] is the name of the PVC # in this case, there is only one shared PVC for all users @@ -1277,6 +1284,7 @@ def build_volumes_home( self, authinfo:AuthInfo, userinfo:AuthUser, volume_type: elif isinstance( oc.od.settings.desktop['persistentvolumeclaim'], dict): # oc.od.settings.desktop['persistentvolumeclaim'] must be created by pyos + self.logger.debug( f"build home volume with volume_type={volume_type} and persistentvolumeclaim is a dict" ) if volume_type in [ 'pod_desktop', 'pod_application' ] : # create a pvc to store desktop volume persistentvolume = copy.deepcopy( oc.od.settings.desktop['persistentvolume'] ) @@ -1891,7 +1899,6 @@ def removedesktop(self, authinfo:AuthInfo, userinfo:AuthUser, myPod:V1Pod=None, assert isinstance(authinfo, AuthInfo), f"authinfo has invalid type {type(authinfo)}" assert isinstance(userinfo, AuthUser), f"userinfo has invalid type {type(userinfo)}" - # get the user's pod if not isinstance(myPod, V1Pod ): myPod = self.findPodByUser(authinfo, userinfo ) @@ -3244,7 +3251,34 @@ def get_volumemountlistfromcontainertype( self, volumemount:dict, currentcontain self.logger.warning( f"volumeMount {volumeMount_name} not found for container type {currentcontainertype}" ) return volumemountlist - def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs)-> ODDesktop : + + def areAllmyContainerStarted( self, pod_name:str )->bool: + """areAllmyContainerstarted + check if all containers in the pod are started + return True if all containers in the pod are started, False otherwise + + Args: + myPod (V1Pod): pod object + Returns: + bool: True if all containers are started, False otherwise + """ + assert isinstance(pod_name, str), f"pod_name has invalid type {type(pod_name)}, str is expected" + try: + myPod = self.kubeapi.read_namespaced_pod(namespace=self.namespace,name=pod_name) + except ApiException as e: + self.logger.error( f"error in reading pod {pod_name} to check if all containers are started: {e}" ) + return False + assert isinstance(myPod, V1Pod), f"myPod has invalid type {type(myPod)}, V1Pod is expected" + if not isinstance( myPod.status, V1PodStatus ): + return False + if not isinstance( myPod.status.container_statuses, list): + return False + for c in myPod.status.container_statuses: + if c.started is not True: + return False + return True + + def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, rolesinfo:AuthRoles, **kwargs)-> ODDesktop : """createdesktop create the user pod @@ -3286,7 +3320,7 @@ def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs)-> ODDesk env[ 'USER' ] = posixuser.get('uid') # read uid env[ 'USERNAME' ] = posixuser.get('uid') # read uid env[ 'LOGNAME' ] = posixuser.get('uid') # read uid - env[ 'PULSE_SERVER' ] = 'unix:/tmp/.pulse.sock' # for embedded applications + env[ 'PULSE_SERVER' ] = oc.od.settings.desktop['pulseaudiosocketpath'] # set PULSE_SERVER env[ 'ABCDESKTOP_EXECUTE_CLASSNAME' ] = executeclassname env[ 'ABCDESKTOP_EXECUTE_CLASS' ] = json.dumps(executeclasse) env[ 'ABCDESKTOP_RUNTIME_CLASSNAME' ] = executeclasse.get('runtimeClassName','') @@ -3300,7 +3334,7 @@ def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs)-> ODDesk 'access_provider': authinfo.provider, 'access_providertype': authinfo.providertype, 'access_userid': userinfo.userid, - 'access_username': self.get_labelvalue(userinfo.name), + 'access_username': self.get_labelvalue(userinfo.name), # only for human readable label, not for logic use, because userinfo.name can contains special character and is not unique 'netpol/ocuser': 'true', 'xauthkey': env[ 'XAUTH_KEY' ], 'pulseaudio_cookie': env[ 'PULSEAUDIO_COOKIE' ], @@ -3310,9 +3344,14 @@ def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs)-> ODDesk # add authinfo labels and env # could also use downward-api https://kubernetes.io/docs/concepts/workloads/pods/downward-api/ for k,v in authinfo.get_labels().items(): - abcdesktopvarenvname = oc.od.settings.ENV_PREFIX_LABEL_NAME + k.lower() - env[ abcdesktopvarenvname ] = v - labels[k] = v + if k.isalnum(): # only add alpanum label to avoid issue with kubernetes label validation, and only for env var, not for labels because we can use normalize_name_label for labels + abcdesktopvarenvname = oc.od.settings.ENV_PREFIX_LABEL_NAME + k.lower() + env[ abcdesktopvarenvname ] = v + labels[oc.auth.namedlib.normalize_label(k)] = oc.auth.namedlib.normalize_label(v) + + for k,v in rolesinfo.items(): + labels[oc.auth.namedlib.normalize_label(k)] = oc.auth.namedlib.normalize_label(v) + # add enabled services in env dict for currentcontainertype in self.nameprefixdict.keys() : if self.isenablecontainerinpod( authinfo, currentcontainertype ): @@ -3542,22 +3581,6 @@ def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs)-> ODDesk pod_manifest['spec']['containers'].append( new_container ) self.logger.debug(f"container added {currentcontainertype} to pod {pod_name}") - """ - storage - currentcontainertype = 'storage' - if self.isenablecontainerinpod( authinfo, currentcontainertype ): - new_container = self.addcontainertopod( - authinfo=authinfo, - userinfo=userinfo, - currentcontainertype=currentcontainertype, - myuuid=myuuid, - envlist=envlist, - list_volumeMounts=list_pod_allvolumeMounts - ) - pod_manifest['spec']['containers'].append( new_container ) - self.logger.debug(f"container added {currentcontainertype} to pod {pod_name}") - """ - # add snapshot container if enabled # snasphot is a special container # it need some secrets env variables @@ -3619,132 +3642,164 @@ def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs)-> ODDesk self.on_desktoplaunchprogress(f"b.Watching for events" ) self.logger.debug('watch list_namespaced_event pod creating' ) pulled_counter = 0 - started_counter = 0 expected_containers_len = 0 if isinstance( pod.spec.init_containers, list ): expected_containers_len += len( pod.spec.init_containers ) if isinstance( pod.spec.containers, list ): expected_containers_len += len( pod.spec.containers ) - # watch list_namespaced_event + continue_reading_events = True w = watch.Watch() + while continue_reading_events: + try: + # watch list_namespaced_event + for event in w.stream( self.kubeapi.list_namespaced_event, + namespace=self.namespace, + timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], + field_selector=f'involvedObject.name={pod_name}'): + + if not isinstance(event, dict ): continue # safe type test event is a dict + if not isinstance(event.get('object'), CoreV1Event ): continue # safe type test event object is a CoreV1Event + event_object = event.get('object') + # self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") + self.on_desktoplaunchprogress( f"b.{event_object.message}" ) - for event in w.stream( self.kubeapi.list_namespaced_event, - namespace=self.namespace, - timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], - field_selector=f'involvedObject.name={pod_name}'): - - if not isinstance(event, dict ): continue # safe type test event is a dict - if not isinstance(event.get('object'), CoreV1Event ): continue # safe type test event object is a CoreV1Event - event_object = event.get('object') - self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") - self.on_desktoplaunchprogress( f"b.{event_object.message}" ) - - # - # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Event.md - # Type of this event (Normal, Warning), new types could be added in the future - # 'Normal': Information only and will not cause any problems - # 'Warning': These events are to warn that something might go wrong - - if event_object.type == 'Warning': # event Warning - # something might goes wrong - self.logger.error(f"{event_object.type} reason={event_object.reason} message={event_object.message}") - w.stop() - return f"{event_object.type} {event_object.reason} {event_object.message}" - - elif event_object.type == 'Normal': # event Normal - - if event_object.reason in [ 'Created', 'Pulling', 'Scheduled' ]: - continue # nothing to do - - # check reason, read - # https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/events/event.go - # reason should be a short, machine understandable string that gives the reason for the transition - # into the object's current status. - if event_object.reason == 'Pulled': - self.logger.debug( f"Event Pulled received pulled_counter={pulled_counter}") - pulled_counter = pulled_counter + 1 - # if all images are pulled - self.logger.debug( f"counter pulled_counter={pulled_counter} expected_containers_len={expected_containers_len}") - if pulled_counter >= expected_containers_len : - self.logger.debug( f"counter pulled_counter={pulled_counter} >= expected_containers_len={expected_containers_len}") - # pod_IPAddress = self.getPodIPAddress( pod.metadata.name ) - # if isinstance( pod_IPAddress, str ): - # self.logger.debug( f"{pod.metadata.name} has an ip address: {pod_IPAddress}") - # self.on_desktoplaunchprogress(f"b.Your pod {pod.metadata.name} gets ip address {pod_IPAddress} from network plugin") - # self.logger.debug( f"stop watching event list_namespaced_event for pod {pod.metadata.name} ") + # + # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Event.md + # Type of this event (Normal, Warning), new types could be added in the future + # 'Normal': Information only and will not cause any problems + # 'Warning': These events are to warn that something might go wrong + + if event_object.type == 'Warning': # event Warning + # something might goes wrong + self.logger.error(f"{event_object.type} reason={event_object.reason} message={event_object.message}") w.stop() - - elif event_object.reason == 'Started': - self.logger.debug( f"Event Started received started_counter={started_counter}") - started_counter = started_counter + 1 - self.logger.debug( f"counter started_counter={started_counter} expected_containers_len={expected_containers_len}") - #pod_IPAddress = self.getPodIPAddress( pod.metadata.name ) - #if isinstance( pod_IPAddress, str ): - # self.logger.debug( f"{pod.metadata.name} has an ip address: {pod_IPAddress}") - if started_counter >= expected_containers_len : + return f"{event_object.type} {event_object.reason} {event_object.message}" + + elif event_object.type == 'Normal': # event Normal + + if event_object.reason in [ 'Created', 'Pulling', 'Scheduled' ]: + continue # nothing to do + + # check reason, read + # https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/events/event.go + # reason should be a short, machine understandable string that gives the reason for the transition + # into the object's current status. + if event_object.reason == 'Pulled': + self.logger.debug( f"Event Pulled received pulled_counter={pulled_counter}") + pulledmyPod = self.kubeapi.read_namespaced_pod(namespace=self.namespace,name=pod_name) + pulled_counter = pulled_counter + 1 + # if all images are pulled + self.logger.debug( f"counter pulled_counter={pulled_counter} expected_containers_len={expected_containers_len}") + if pulled_counter >= expected_containers_len : + self.logger.debug( f"counter pulled_counter={pulled_counter} >= expected_containers_len={expected_containers_len}") + # pod_IPAddress = self.getPodIPAddress( pod.metadata.name ) + # if isinstance( pod_IPAddress, str ): + # self.logger.debug( f"{pod.metadata.name} has an ip address: {pod_IPAddress}") + # self.on_desktoplaunchprogress(f"b.Your pod {pod.metadata.name} gets ip address {pod_IPAddress} from network plugin") + # self.logger.debug( f"stop watching event list_namespaced_event for pod {pod.metadata.name} ") + continue_reading_events = False + w.stop() + + elif event_object.reason == 'Started': + if self.areAllmyContainerStarted( pod_name=pod_name ) is True: + continue_reading_events = False + w.stop() + else: + self.logger.debug(f"Event Started received but not all containers are started, continue watching {pod_name}") + continue + else: + # log the events + self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") + self.on_desktoplaunchprogress(f"b.Your pod gets event {event_object.message or event_object.reason}") + # fix for https://github.com/abcdesktopio/oc.user/issues/52 + # this is not an error + continue_reading_events = False + w.stop() + + else: + # this event is not 'Normal' or 'Warning', unknow event received + self.logger.error(f"UNMANAGED EVENT pod type {event_object.type}") + continue_reading_events = False w.stop() + + except ApiException as e: + if hasattr(e, 'status') and e.status == 504 and \ + hasattr(e, 'reason') and 'Too large resource version' in e.reason : + self.logger.debug( f"retrying after Timeout: Too large resource version ApiException {e}") else: - # log the events - self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") - self.on_desktoplaunchprogress(f"b.Your pod gets event {event_object.message or event_object.reason}") - # fix for https://github.com/abcdesktopio/oc.user/issues/52 - # this is not an error - w.stop() - # return f"{event_object.reason} {event_object.message}" - - else: - # this event is not 'Normal' or 'Warning', unknow event received - self.logger.error(f"UNMANAGED EVENT pod type {event_object.type}") - w.stop() + continue_reading_events = False + self.logger.error( f"{type(e)} {e}" ) + + except Exception as e: + self.logger.error( f"Exception: {e}" ) + continue_reading_events = False + # # list_namespaced_event done # self.logger.debug('watch list_namespaced_pod creating, waiting for pod quit Pending phase' ) - w = watch.Watch() - for event in w.stream( self.kubeapi.list_namespaced_pod, - namespace=self.namespace, - timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], - field_selector=f"metadata.name={pod_name}" ): - # event must be a dict, else continue - if not isinstance(event,dict): - self.logger.error( f"event type is {type( event )}, and should be a dict, skipping event") - continue - - event_type = event.get('type') # event dict must contain a type - pod_event = event.get('object') # event dict must contain a pod object - if not isinstance( pod_event, V1Pod ): continue # if podevent type must be a V1Pod - if not isinstance( pod_event.status, V1PodStatus ): continue - # - self.on_desktoplaunchprogress( f"b.Your {pod_event.kind.lower()} is {event_type.lower()}") - self.logger.debug(f"The pod {pod_event.metadata.name} is in phase={pod_event.status.phase}" ) - # - # from https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ - # - # possible values for phase - # Pending The Pod has been accepted by the Kubernetes cluster, but one or more of the containers has not been set up and made ready to run. This includes time a Pod spends waiting to be scheduled as well as the time spent downloading container images over the network. - # Running The Pod has been bound to a node, and all of the containers have been created. At least one container is still running, or is in the process of starting or restarting. - # Succeeded All containers in the Pod have terminated in success, and will not be restarted. - # Failed All containers in the Pod have terminated, and at least one container has terminated in failure. - # Unknown For some reason the state of the Pod could not be obtained. This phase typically occurs due to an error in communicating with the node where the Pod should be running. - if pod_event.status.phase == 'Pending' : - self.on_desktoplaunchprogress( f"b.Your pod {pod_event.metadata.name} is {pod_event.status.phase}" ) - continue - elif pod_event.status.phase == 'Running' : - startedmsg = self.getPodStartedMessage(self.graphicalcontainernameprefix, pod_event, myEvent=None) - self.on_desktoplaunchprogress( startedmsg ) - w.stop() - elif pod_event.status.phase == 'Succeeded' or \ - pod_event.status.phase == 'Failed' : - # pod data object is complete, stop reading event - # phase can be 'Running' 'Succeeded' 'Failed' 'Unknown' - self.logger.debug(f"The pod {pod_event.metadata.name} is not in Pending phase, phase={pod_event.status.phase} stop watching" ) - w.stop() - else: - # pod_event.status.phase should be 'Unknow' - self.logger.error(f"UNMANAGED CASE pod {pod_event.metadata.name} is in unmanaged phase {pod_event.status.phase}") - self.logger.error(f"The pod {pod_event.metadata.name} is in phase={pod_event.status.phase} stop watching" ) - w.stop() + continue_reading_events = True + w = watch.Watch() + while continue_reading_events: + try: + for event in w.stream( self.kubeapi.list_namespaced_pod, + namespace=self.namespace, + timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], + field_selector=f"metadata.name={pod_name}" ): + # event must be a dict, else continue + if not isinstance(event,dict): + self.logger.error( f"event type is {type( event )}, and should be a dict, skipping event") + continue + + event_type = event.get('type') # event dict must contain a type + pod_event = event.get('object') # event dict must contain a pod object + if not isinstance( pod_event, V1Pod ): continue # if podevent type must be a V1Pod + if not isinstance( pod_event.status, V1PodStatus ): continue + # + self.on_desktoplaunchprogress( f"b.Your {pod_event.kind.lower()} is {event_type.lower()}") + self.logger.debug(f"The pod {pod_event.metadata.name} is in phase={pod_event.status.phase}" ) + # + # from https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ + # + # possible values for phase + # Pending The Pod has been accepted by the Kubernetes cluster, but one or more of the containers has not been set up and made ready to run. This includes time a Pod spends waiting to be scheduled as well as the time spent downloading container images over the network. + # Running The Pod has been bound to a node, and all of the containers have been created. At least one container is still running, or is in the process of starting or restarting. + # Succeeded All containers in the Pod have terminated in success, and will not be restarted. + # Failed All containers in the Pod have terminated, and at least one container has terminated in failure. + # Unknown For some reason the state of the Pod could not be obtained. This phase typically occurs due to an error in communicating with the node where the Pod should be running. + if pod_event.status.phase == 'Pending' : + self.on_desktoplaunchprogress( f"b.Your pod {pod_event.metadata.name} is {pod_event.status.phase}" ) + continue + elif pod_event.status.phase == 'Running' : + startedmsg = self.getPodStartedMessage(self.graphicalcontainernameprefix, pod_event, myEvent=None) + self.on_desktoplaunchprogress( startedmsg ) + continue_reading_events = False + w.stop() + elif pod_event.status.phase == 'Succeeded' or pod_event.status.phase == 'Failed' : + # pod data object is complete, stop reading event + # phase can be 'Running' 'Succeeded' 'Failed' 'Unknown' + self.logger.debug(f"The pod {pod_event.metadata.name} is not in Pending phase, phase={pod_event.status.phase} stop watching" ) + continue_reading_events = False + w.stop() + else: + # pod_event.status.phase should be 'Unknow' + self.logger.error(f"UNMANAGED CASE pod {pod_event.metadata.name} is in unmanaged phase {pod_event.status.phase}") + self.logger.error(f"The pod {pod_event.metadata.name} is in phase={pod_event.status.phase} stop watching" ) + continue_reading_events = False + w.stop() + + except ApiException as e: + if hasattr(e, 'status') and e.status == 504 and \ + hasattr(e, 'reason') and 'Too large resource version' in e.reason : + self.logger.debug( f"retrying after Timeout: Too large resource version ApiException {e}") + else: + continue_reading_events = False + self.logger.error( f"{e}" ) + + except Exception as e: + self.logger.error( f"{e}" ) + continue_reading_events = False self.logger.debug(f"watch list_namespaced_pod created, the pod is no more in Pending phase" ) @@ -4621,7 +4676,7 @@ def get_DISPLAY( self, desktop_ip_addr:str=None ): return ':0.0' def get_PULSE_SERVER( self, desktop_ip_addr:str='' ): - return 'unix:/tmp/.pulse.sock' + return oc.od.settings.desktop['pulseaudiosocketpath'] def get_CUPS_SERVER( self, desktop_ip_addr:str ): return desktop_ip_addr + ':' + str(DEFAULT_CUPS_TCP_PORT) @@ -5014,11 +5069,11 @@ def watch_for_pulling_event( self, myDesktop:ODDesktop, pod_name:str, app_contai continue_reading_events = True dict_state_exec_only_once = {} + w = watch.Watch() while continue_reading_events: timeout_seconds = 5 # seconds try: # watch list_namespaced_event - w = watch.Watch() for event in w.stream( self.orchestrator.kubeapi.list_namespaced_event, namespace=self.orchestrator.namespace, field_selector=field_selector, @@ -5060,8 +5115,12 @@ def watch_for_pulling_event( self, myDesktop:ODDesktop, pod_name:str, app_contai w.stop() continue except ApiException as e: - continue_reading_events = False - self.logger.error( f"ApiException {e}" ) + if hasattr(e, 'status') and e.status == 504 and hasattr(e, 'reason') and 'Too large resource version' in e.reason : + self.logger.debug( f"retrying after Timeout: Too large resource version ApiException {e}") + continue + else: + continue_reading_events = False + self.logger.error( f"ApiException {e}" ) except Exception as e: continue_reading_events = False self.logger.error( f"Exception {e}" ) @@ -5107,7 +5166,7 @@ def watch_for_pulling_event( self, myDesktop:ODDesktop, pod_name:str, app_contai self.logger.debug('thread_to_watch_for_pulling_event end') - def create(self, myDesktop:ODDesktop, app, authinfo:AuthInfo, userinfo:AuthUser={}, userargs=None, **kwargs ): + def create(self, myDesktop:ODDesktop, app:dict, authinfo:AuthInfo, userinfo:AuthUser={}, userargs=None, **kwargs ): """create create an ephemeral container in a desktop pod Args: @@ -5273,118 +5332,6 @@ def create(self, myDesktop:ODDesktop, app, authinfo:AuthInfo, userinfo:AuthUser= self.logger.debug(f"create done {appinstancestatus}") return appinstancestatus - """ - self.logger.debug(f"starting read_namespaced_pod") - w = watch.Watch() - # read_namespaced_pod - for event in w.stream( self.orchestrator.kubeapi.list_namespaced_pod, - namespace=self.orchestrator.namespace, - field_selector=f"metadata.name={pod_name}" ): - # event must be a dict, else continue - if not isinstance(event,dict): continue - self.logger.debug( f"list_namespaced_pod get event type={event.get('type')}") - # object={type(event.get('object'))}" ) - pod = event.get('object') - # if podevent type must be a V1Pod, we use kubeapi.list_namespaced_pod - if not isinstance( pod, V1Pod ): continue - if not isinstance( pod.status, V1PodStatus ): continue - if not isinstance( pod.status.ephemeral_container_statuses, list): continue - - for c in pod.status.ephemeral_container_statuses: - if isinstance( c, V1ContainerStatus ) and c.name == app_container_name: - self.logger.debug( f"{app_container_name} is found in ephemeral_container_statuses {c}") - if isinstance( c.state, V1ContainerState ): - if isinstance(c.state.terminated, V1ContainerStateTerminated ): - appinstancestatus.message = 'Terminated' - w.stop() - break - elif isinstance(c.state.running, V1ContainerStateRunning ): - appinstancestatus.message = 'Running' - w.stop() - break - elif isinstance(c.state.waiting, V1ContainerStateWaiting): - self.logger.debug( f"V1ContainerStateWaiting reason={c.state.waiting.reason}" ) - data = { 'message': app.get('name'), - 'name': app.get('name'), - 'icondata': app.get('icondata'), - 'icon': app.get('icon'), - 'image': app.get('id'), - 'launch': app.get('launch') - } - if c.state.waiting.reason == 'PodInitializing': - data['message'] = f"{c.state.waiting.reason} {app.get('name')}, please wait" - self.orchestrator.notify_user( myDesktop, 'container', data ) - - if event.get('type') == 'ERROR': - self.logger.error( f"{event.get('type')} object={type(event.get('object'))}") - appinstancestatus.message = 'ERROR' - w.stop() - - return appinstancestatus - """ - - """ - # Valid values for event types (new types could be added in future) - # EventTypeNormal string = "Normal" // Information only and will not cause any problems - # EventTypeWarning string = "Warning" // These events are to warn that something might go wrong - # self.logger.info( f"object_type={event_object.type} reason={event_object.reason}") - # message = f"b.{event_object.reason} {event_object.message.lower()}" - - - send_previous_pulling_message = False - # watch list_namespaced_event - w = watch.Watch() - for event in w.stream( self.orchestrator.kubeapi.list_namespaced_event, - namespace=self.orchestrator.namespace, - timeout_seconds=self.orchestrator.DEFAULT_K8S_CREATE_TIMEOUT_SECONDS, - field_selector=f'involvedObject.name={pod_name}' ): - if not isinstance(event, dict ): continue - if not isinstance(event.get('object'), CoreV1Event ): continue - - # Valid values for event types (new types could be added in future) - # EventTypeNormal string = "Normal" // Information only and will not cause any problems - # EventTypeWarning string = "Warning" // These events are to warn that something might go wrong - - event_object = event.get('object') - - self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") - data = { - 'message': app.get('name'), - 'name': app.get('name'), - 'icondata': app.get('icondata'), - 'icon': app.get('icon'), - 'image': app.get('id'), - 'launch': app.get('launch') - } - if event_object.type == 'Normal': - if event_object.reason == 'Pulling': - send_previous_pulling_message = True - data['message'] = f"Installing {app.get('name')}, please wait" - self.orchestrator.notify_user( myDesktop, 'container', data ) - elif event_object.reason == 'Pulled': - if send_previous_pulling_message is True: - data['message'] = f"{app.get('name')} is installed" - self.orchestrator.notify_user( myDesktop, 'container', data ) - elif event_object.reason in [ 'Created', 'Scheduled' ]: - pass # nothing to do - elif event_object.reason == 'Started': - # w.stop() - pass - else: - data['message'] = f"{app.get('name')} is {event_object.reason}" - self.orchestrator.notify_user( myDesktop, 'container', data ) - self.logger.error(f"{event_object.type} reason={event_object.reason} message={event_object.message}") - w.stop() - - else: # event_object.type == 'Warning': - # an error occurs - data['name'] = event_object.type - data['message'] = event_object.reason - self.orchestrator.notify_user( myDesktop, 'container', data ) - w.stop() - - return appinstancestatus - """ def describe( self, pod_name:str, app_name:str, apps:ODApps ): description = None @@ -5751,7 +5698,7 @@ def create_thread_to_watch_for_end_of_pod_initializing( self, myDesktop:ODDeskto def watch_for_end_of_pod_initializing( self, myDesktop:ODDesktop, app_pod_name:str, app:dict )->None: self.logger.debug('') - # pod data object is complete, stop reading event + # pod data object is complete, stop reading event # phase can be 'Running' 'Succeeded' 'Failed' 'Unknown' data = { 'id': app_pod_name, @@ -5763,63 +5710,65 @@ def watch_for_end_of_pod_initializing( self, myDesktop:ODDesktop, app_pod_name:s 'launch': app.get('launch') } - w = watch.Watch() - for event in w.stream( self.orchestrator.kubeapi.list_namespaced_pod, - namespace=self.orchestrator.namespace, - timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], - field_selector=f"metadata.name={app_pod_name}" ): - # event must be a dict, else continue - if not isinstance(event,dict): - self.logger.error( f"event type is {type( event )}, and should be a dict, skipping event") - continue - - self.logger.debug( f"event type is {event.get('type')}") - # event dict must contain a pod object - pod_event = event.get('object') - # if podevent type must be a V1Pod, we use kubeapi.list_namespaced_pod - if not isinstance( pod_event, V1Pod ): continue - if not isinstance( pod_event.status, V1PodStatus ): continue - - # expected_containers_len = len( pod_event.spec.containers ) + len( pod_event.spec.init_containers ) - - # self.logger.debug( f"pod_event.status.phase={pod_event.status.phase} pod_event.status.reason={pod_event.status.reason}") - # - # from https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ - # possible values for phase - # Pending The Pod has been accepted by the Kubernetes cluster, but one or more of the containers has not been set up and made ready to run. This includes time a Pod spends waiting to be scheduled as well as the time spent downloading container images over the network. - # Running The Pod has been bound to a node, and all of the containers have been created. At least one container is still running, or is in the process of starting or restarting. - # Succeeded All containers in the Pod have terminated in success, and will not be restarted. - # Failed All containers in the Pod have terminated, and at least one container has terminated in failure. - # Unknown For some reason the state of the Pod could not be obtained. This phase typically occurs due to an error in communicating with the node where the Pod should be running. - - if pod_event.status.phase == 'Running': - data['reason'] = pod_event.status.phase - data['message'] = pod_event.status.message or pod_event.status.phase - self.orchestrator.notify_user( myDesktop, 'container', data ) - w.stop() - continue - - if pod_event.status.phase == 'Pending': - if pod_event.status.reason in [ 'Pulling', 'Pulled', 'Started' ]: + # we must catch exception because + # if the pod is deleted while we are watching, it will raise an exception and we want to catch it and stop the thread + # if kubernetes.client.exceptions.ApiException: (504) Reason: Timeout: Timeout: Too large resource version: 135065452, current: 135065439 + try: + w = watch.Watch() + for event in w.stream( self.orchestrator.kubeapi.list_namespaced_pod, + namespace=self.orchestrator.namespace, + timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], + field_selector=f"metadata.name={app_pod_name}" ): + # event must be a dict, else continue + if not isinstance(event,dict): continue + self.logger.debug( f"event type is {event.get('type')}") + # event dict must contain a pod object + pod_event = event.get('object') + # if podevent type must be a V1Pod, we use kubeapi.list_namespaced_pod + if not isinstance( pod_event, V1Pod ): continue + if not isinstance( pod_event.status, V1PodStatus ): continue + + # from https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ + # possible values for phase + # Pending The Pod has been accepted by the Kubernetes cluster, but one or more of the containers has not been set up and made ready to run. This includes time a Pod spends waiting to be scheduled as well as the time spent downloading container images over the network. + # Running The Pod has been bound to a node, and all of the containers have been created. At least one container is still running, or is in the process of starting or restarting. + # Succeeded All containers in the Pod have terminated in success, and will not be restarted. + # Failed All containers in the Pod have terminated, and at least one container has terminated in failure. + # Unknown For some reason the state of the Pod could not be obtained. This phase typically occurs due to an error in communicating with the node where the Pod should be running. + + if pod_event.status.phase == 'Running': data['reason'] = pod_event.status.phase data['message'] = pod_event.status.message or pod_event.status.phase self.orchestrator.notify_user( myDesktop, 'container', data ) - continue + w.stop() + continue - if pod_event.status.phase == 'Warning': - data['reason'] = pod_event.status.phase - data['message'] = pod_event.status.message - self.orchestrator.notify_user( myDesktop, 'container', data ) - w.stop() - elif pod_event.status.phase in [ 'Failed', 'Unknown', 'Warning', 'Succeeded'] : - # pod data object is complete, stop reading event - # phase can be 'Running' 'Succeeded' 'Failed' 'Unknown' - # an error occurs - data['reason'] = pod_event.status.type - data['message'] = pod_event.status.reason - self.orchestrator.notify_user( myDesktop, 'container', data ) - self.logger.debug(f"The pod is not in Pending phase, phase={pod_event.status.phase} stop watching" ) - w.stop() + if pod_event.status.phase == 'Pending': + if pod_event.status.reason in [ 'Pulling', 'Pulled', 'Started' ]: + data['reason'] = pod_event.status.phase + data['message'] = pod_event.status.message or pod_event.status.phase + self.orchestrator.notify_user( myDesktop, 'container', data ) + continue + + if pod_event.status.phase == 'Warning': + data['reason'] = pod_event.status.phase + data['message'] = pod_event.status.message + self.orchestrator.notify_user( myDesktop, 'container', data ) + w.stop() + + elif pod_event.status.phase in [ 'Failed', 'Unknown', 'Warning', 'Succeeded'] : + # pod data object is complete, stop reading event + # phase can be 'Running' 'Succeeded' 'Failed' 'Unknown' + # an error occurs + data['reason'] = pod_event.status.type + data['message'] = pod_event.status.reason + self.orchestrator.notify_user( myDesktop, 'container', data ) + self.logger.debug(f"The pod is not in Pending phase, phase={pod_event.status.phase} stop watching" ) + w.stop() + + except Exception as e: + self.logger.error( f"Exception in watch_for_end_of_pod_initializing: {e}" ) + self.logger.debug('end of watch_for_end_of_pod_initializing') @@ -5855,55 +5804,59 @@ def watch_for_pulling_event( self, myDesktop:ODDesktop, app_pod_name:str, app:di 'launch': app.get('launch') } - w = watch.Watch() - for event in w.stream( self.orchestrator.kubeapi.list_namespaced_event, - namespace=self.orchestrator.namespace, - timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], - field_selector=f'involvedObject.name={app_pod_name}' ): - # safe type check - if not isinstance(event, dict ): continue - if not isinstance(event.get('object'), CoreV1Event ): continue - - # Valid values for event types (new types could be added in future) - # EventTypeNormal string = "Normal" // Information only and will not cause any problems - # EventTypeWarning string = "Warning" // These events are to warn that something might go wrong - - event_object = event.get('object') - self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") - - data['reason'] = event_object.reason - data['message'] = event_object.message - - if event_object.type == 'Normal': - if event_object.reason == 'Pulling': - data['message'] = f"{event_object.reason} {app.get('name')}, please wait" - self.orchestrator.notify_user( myDesktop, 'container', data ) - elif event_object.reason == 'Pulled': - self.logger.debug( f"Event Pulled received") - self.orchestrator.notify_user( myDesktop, 'container', data ) - elif event_object.reason == 'Started': - self.orchestrator.notify_user( myDesktop, 'container', data ) - w.stop() - elif event_object.reason in [ 'Scheduled', 'Created' ]: - self.orchestrator.notify_user( myDesktop, 'container', data ) - else: - data['message'] = f"{event_object.reason} {event_object.message}" + # we must catch exception because + # if the pod is deleted while we are watching, it will raise an exception and we want to catch it and stop the thread + # if kubernetes.client.exceptions.ApiException: (504) Reason: Timeout: Timeout: Too large resource version: 135065452, current: 135065439 + try: + w = watch.Watch() + for event in w.stream( self.orchestrator.kubeapi.list_namespaced_event, + namespace=self.orchestrator.namespace, + timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], + field_selector=f'involvedObject.name={app_pod_name}' ): + # safe type check + if not isinstance(event, dict ): continue + if not isinstance(event.get('object'), CoreV1Event ): continue + + # Valid values for event types (new types could be added in future) + # EventTypeNormal string = "Normal" // Information only and will not cause any problems + # EventTypeWarning string = "Warning" // These events are to warn that something might go wrong + + event_object = event.get('object') + data['reason'] = event_object.reason + data['message'] = event_object.message + + if event_object.type == 'Normal': + if event_object.reason == 'Pulling': + data['message'] = f"{event_object.reason} {app.get('name')}, please wait" + self.orchestrator.notify_user( myDesktop, 'container', data ) + elif event_object.reason == 'Pulled': + self.logger.debug( f"Event Pulled received") + self.orchestrator.notify_user( myDesktop, 'container', data ) + elif event_object.reason == 'Started': + self.orchestrator.notify_user( myDesktop, 'container', data ) + w.stop() + elif event_object.reason in [ 'Scheduled', 'Created' ]: + self.orchestrator.notify_user( myDesktop, 'container', data ) + else: + data['message'] = f"{event_object.reason} {event_object.message}" + self.orchestrator.notify_user( myDesktop, 'container', data ) + self.logger.error(f"{event_object.type} reason={event_object.reason} message={event_object.message}") + w.stop() + + else: # event_object.type == 'Warning': + # an error occurs + data['name'] = event_object.type + data['message'] = event_object.reason self.orchestrator.notify_user( myDesktop, 'container', data ) - self.logger.error(f"{event_object.type} reason={event_object.reason} message={event_object.message}") w.stop() - - else: # event_object.type == 'Warning': - # an error occurs - data['name'] = event_object.type - data['message'] = event_object.reason - self.orchestrator.notify_user( myDesktop, 'container', data ) - w.stop() + except Exception as e: + self.logger.error( f"Exception in watch_for_end_of_pod_initializing: {e}" ) self.logger.debug('end of watch_for_pulling_event') - def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs ): + def create(self, myDesktop:ODDesktop, app:dict, authinfo:AuthInfo, userinfo:AuthUser={}, userargs=None, **kwargs ): self.logger.debug('') rules = app.get('rules', {}) or {} # app['rules] can be set to None @@ -5927,7 +5880,7 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs 'access_providertype': authinfo.providertype, 'access_provider': authinfo.provider, 'access_userid': userinfo.userid, - 'access_username': self.orchestrator.get_labelvalue(userinfo.name), + 'access_username': self.orchestrator.get_labelvalue(userinfo.name), # 'type': self.type, 'uniquerunkey': app.get('uniquerunkey'), 'netpol/ocapplication': 'true' diff --git a/oc/od/persistentvolumeclaim.py b/oc/od/persistentvolumeclaim.py index aae5113..9aa01dd 100644 --- a/oc/od/persistentvolumeclaim.py +++ b/oc/od/persistentvolumeclaim.py @@ -180,47 +180,60 @@ def create( self, authinfo:AuthInfo, userinfo:AuthUser, persistentvolume_request return pvc - def waitforBoundPVC( self, name:str, callback_notify, timeout:int=42 )->tuple: + def waitforBoundPVC( self, name:str, callback_notify )->tuple: self.logger.debug('') assert_type( name, str ) - w = watch.Watch() event_counter = 0 - for event in w.stream( self.kubeapi.list_namespaced_persistent_volume_claim, - namespace=self.namespace, - timeout_seconds=oc.od.settings.desktop['K8S_BOUND_PVC_TIMEOUT_SECONDS'], - field_selector=f'metadata.name={name}' ): - if event_counter > oc.od.settings.desktop['K8S_BOUND_PVC_MAX_EVENT']: - return (False, f"e.Volume {name} has failed {event_counter}/{oc.od.settings.desktop['K8S_BOUND_PVC_MAX_EVENT']}") - - self.logger.debug( f"read event {event_counter} {event}") - # safe type test event is a dict - if not isinstance(event, dict ): continue - pvc = event.get('object') - if not isinstance(pvc, V1PersistentVolumeClaim ): continue + continue_reading_events = True + w = watch.Watch() + while continue_reading_events: + try: + for event in w.stream( self.kubeapi.list_namespaced_persistent_volume_claim, + namespace=self.namespace, + timeout_seconds=oc.od.settings.desktop['K8S_BOUND_PVC_TIMEOUT_SECONDS'], + field_selector=f'metadata.name={name}' ): + + # safe type test event is a dict + if not isinstance(event, dict ): continue + pvc = event.get('object') + if not isinstance(pvc, V1PersistentVolumeClaim ): continue + volume_name = None + storage_class_name = None + if isinstance( pvc.spec, V1PersistentVolumeClaimSpec ): + # volume_mode = pvc.spec.volume_mode + volume_name = pvc.spec.volume_name + storage_class_name = pvc.spec.storage_class_name + + if isinstance( pvc.status, V1PersistentVolumeClaimStatus): + # A volume will be in one of the following phases: + # Available -- a free resource that is not yet bound to a claim + # Bound -- the volume is bound to a claim + # Released -- the claim has been deleted, but the resource is not yet reclaimed by the cluster + # Failed -- the volume has failed its automatic reclamation + if callable(callback_notify): + callback_notify( f"b.Reading your persistent volume claim {name}, status is {pvc.status.phase}, using storage class {storage_class_name} " ) + if pvc.status.phase == 'Bound': + # continue_reading_events = False + return (True, f"b. Your persistent volume claim {name} is {pvc.status.phase} using storage class {storage_class_name} ") + if pvc.status.phase == 'Failed': + # continue_reading_events = False + return (False, f"e.PersistentVolumeClaim {name} has failed its automatic reclamation, claim={name}, volume {volume_name}, storage class {storage_class_name}") + if pvc.status.phase in [ 'Pending', 'Available' ]: + event_counter += 1 - # volume_mode = 'unknowfilesystem' - volume_name = 'unknowvolumename' - storage_class_name = 'unknowstorageclassname' - if isinstance( pvc.spec, V1PersistentVolumeClaimSpec ): - # volume_mode = pvc.spec.volume_mode - volume_name = pvc.spec.volume_name - storage_class_name = pvc.spec.storage_class_name - if isinstance( pvc.status, V1PersistentVolumeClaimStatus): - # A volume will be in one of the following phases: - # Available -- a free resource that is not yet bound to a claim - # Bound -- the volume is bound to a claim - # Released -- the claim has been deleted, but the resource is not yet reclaimed by the cluster - # Failed -- the volume has failed its automatic reclamation - if callable(callback_notify): - callback_notify( f"b.Reading your persistent volume claim {name}, status is {pvc.status.phase}, using storage class {storage_class_name} " ) - if pvc.status.phase == 'Bound': - return (True, f"b. Your persistent volume claim {name} is {pvc.status.phase} using storage class {storage_class_name} ") - if pvc.status.phase == 'Failed': - return (False, f"e.PersistentVolumeClaim {name} has failed its automatic reclamation, claim={name}, volume {volume_name}, storage class {storage_class_name}") - if pvc.status.phase in [ 'Pending', 'Available' ]: - event_counter += 1 - - return (False, f"e.Volume {name} has failed its automatic reclamation") + except ApiException as e: + # + # kubernetes.client.exceptions.ApiException: (504) + # Reason: Timeout: Timeout: Too large resource version: 130030756, current: 130030755 + # pass this exception + # read https://github.com/kubernetes/kubernetes/issues/107133 + # + if hasattr(e, 'status') and e.status == 504 and hasattr(e, 'reason') and 'Too large resource version' in e.reason : + self.logger.debug( f"retrying after Timeout: Too large resource version ApiException {e}") + else: + raise e + + return (False, f"e.Volume {name} has failed its automatic reclamation after {event_counter}/{oc.od.settings.desktop['K8S_BOUND_PVC_MAX_EVENT']} events") ''' def waitforBoundPVC( self, name:str, callback_notify, timeout:int=42 )->tuple: diff --git a/oc/od/services.py b/oc/od/services.py index 50384b6..e59f21a 100755 --- a/oc/od/services.py +++ b/oc/od/services.py @@ -24,6 +24,7 @@ def __init__(self): self.prelogin = None self.logmein = None self.fail2ban = None + self.asnumber = None def init(self): """[init services call all services init() methods] @@ -40,6 +41,7 @@ def init(self): self.init_prelogin() self.init_logmein() self.init_fail2ban() + self.init_asnumber() def start(self): """start @@ -64,6 +66,7 @@ def stop(self): * apps mongo change stream watcher * kuberneteswatcher ''' + self.logger.debug('ODServices stopping services threads...') if isinstance(self.apps, oc.od.apps.ODApps): try: self.apps.stop_mongo_watcher() @@ -76,7 +79,6 @@ def stop(self): if isinstance( self.kuberneteswatcher, oc.od.kuberneteswatcher.ODKubernetesWatcher): # always use try/except try: - self.logger.debug( 'kuberneteswatcher in stopping') self.kuberneteswatcher.stop() self.logger.debug( 'kuberneteswatcher stopped') except Exception as e: @@ -84,16 +86,21 @@ def stop(self): else: self.logger.debug( 'self.kuberneteswatcher is not defined') - self.logger.debug('done, this is the end') + self.logger.debug('done, this is the end') # pom pom pom def init_fail2ban( self ): import oc.od.fail2ban self.fail2ban = oc.od.fail2ban.ODFail2ban( mongodburl=settings.mongodburl, + mongodbparam=settings.mongodbparam, fail2banconfig=settings.fail2banconfig ) + def init_asnumber( self ): + import oc.od.asnumber + self.asnumber = oc.od.asnumber.ODASNumber( database='ipasn_db.dat' ) + def init_keymanager(self): """[decode arg params query string in metappli mode ] @@ -149,24 +156,7 @@ def init_accounting(self): def init_datastore(self): import oc.datastore - self.datastore = oc.datastore.ODMongoDatastoreClient(settings.mongodburl) - - ''' - replicaset_name = 'rs0' - # check if replicaset is configured - if not self.datastore.getstatus_replicaset(replicaset_name): - self.logger.info(f"replicaset {replicaset_name} does not exist") - # create a replicaset - create_replicaset = self.datastore.create_replicaset(replicaset_name) - # if create_replicaset is None or False - # create_replicaset can return None but this is not a failure - if not create_replicaset : - # reread if replicaset is configured - create_replicaset = self.datastore.getstatus_replicaset(replicaset_name) - return create_replicaset - self.logger.info(f"replicaset {replicaset_name} exist") - ''' - return True + self.datastore = oc.datastore.ODMongoDatastoreClient(settings.mongodburl, settings.mongodbparam) def init_datacache(self): import oc.sharecache @@ -191,7 +181,7 @@ def init_auth(self): def init_applist( self ): import oc.od.apps # Build applist cache data - self.apps = oc.od.apps.ODApps(mongodburl=settings.mongodburl) + self.apps = oc.od.apps.ODApps(mongodburl=settings.mongodburl, mongodbparam=settings.mongodbparam ) self.apps.cached_applist(bRefresh=True) def init_kuberneteswatcher( self ): diff --git a/oc/od/settings.py b/oc/od/settings.py index 5c07572..505e74c 100755 --- a/oc/od/settings.py +++ b/oc/od/settings.py @@ -17,7 +17,8 @@ # Default namespace used by kubernetes is abcdesktop namespace = 'abcdesktop' -mongodburl = None # Mongodb config Object Class +mongodburl = None # Mongodb config url +mongodbparam = None # Mongodb config parameters fail2banconfig = None # Fail2ban config mongodblist = [] @@ -338,6 +339,8 @@ def init_desktop(): desktop['dnspolicy'] = gconfig.get('desktop.dnspolicy', 'ClusterFirst') desktop['dnsconfig'] = gconfig.get('desktop.dnsconfig') desktop['nodeselector'] = gconfig.get('desktop.nodeselector', {} ) + desktop['theme'] = gconfig.get('desktop.theme') + desktop['pulseaudiosocketpath'] = gconfig.get('desktop.pulseaudiosocketpath', '/tmp/.pulse.sock' ) desktop['prestopexeccommand'] = gconfig.get('desktop.prestopexeccommand', [ "/bin/bash", "-c", "rm -rf ~/{*,.*}" ] ) desktop['persistentvolumeclaim'] = gconfig.get('desktop.persistentvolumeclaim') or gconfig.get('desktop.persistentvolumeclaimspec') desktop['persistentvolume'] = gconfig.get('desktop.persistentvolume') or gconfig.get('desktop.persistentvolumespec') @@ -409,9 +412,7 @@ def init_desktop(): 'log': { 'name': 'log', 'emptyDir': { 'medium': 'Memory', 'sizeLimit': '8Gi' } }, 'rundbus': { 'name': 'rundbus', 'emptyDir': { 'medium': 'Memory', 'sizeLimit': '8M' } }, 'runuser': { 'name': 'runuser', 'emptyDir': { 'medium': 'Memory', 'sizeLimit': '8M' } }, - 'x11socket': { 'name': 'x11socket', 'emptyDir': { 'medium': 'Memory' } }, - 'pulseaudiosocket' : { 'name': 'pulseaudiosocket', 'emptyDir': { 'medium': 'Memory' } }, - 'cupsdsocket': { 'name': 'cupsdsocket', 'emptyDir': { 'medium': 'Memory' } } + 'x11socket': { 'name': 'x11socket', 'emptyDir': { 'medium': 'Memory' } } } if not isinstance ( desktop_pod.get('default_volumes_mount'), dict ): desktop_pod['default_volumes_mount'] = { @@ -421,16 +422,14 @@ def init_desktop(): 'log': { 'name': 'log', 'mountPath': '/var/log/desktop' }, 'rundbus': { 'name': 'rundbus', 'mountPath': '/var/run/dbus' }, 'runuser': { 'name': 'runuser', 'mountPath': '/run/user/' }, - 'x11socket': { 'name': 'x11socket', 'mountPath': '/tmp/.X11-unix' }, - 'pulseaudiosocket': { 'name': 'pulseaudiosocket', 'mountPath': '/tmp/.pulseaudio' }, - 'cupsdsocket': { 'name': 'cupsdsocket', 'mountPath': '/tmp/.cupsd' } + 'x11socket': { 'name': 'x11socket', 'mountPath': '/tmp/.X11-unix' } } if not isinstance ( desktop_pod.get('graphical', {}).get('volumes') , list ): - desktop_pod['graphical']['volumes'] = [ 'x11socket', 'pulseaudiosocket', 'cupsdsocket', 'tmp', 'run', 'log', 'rundbus', 'runuser' ] + desktop_pod['graphical']['volumes'] = [ 'x11socket', 'tmp', 'run', 'log', 'rundbus', 'runuser' ] logger.debug(f"fixing desktop.pod.graphical.volumes config {desktop_pod['graphical']['volumes']}") if not isinstance ( desktop_pod.get('ephemeral_container', {}).get('volumes') , list ): # ephemeral container use the same volumes as graphical pod - desktop_pod['ephemeral_container']['volumes'] = [ 'x11socket', 'pulseaudiosocket', 'cupsdsocket', 'tmp', 'run', 'log', 'rundbus', 'runuser' ] + desktop_pod['ephemeral_container']['volumes'] = [ 'x11socket', 'tmp', 'run', 'log', 'rundbus', 'runuser' ] logger.debug(f"fixing desktop.pod.ephemeral_container.volumes config {desktop_pod['ephemeral_container']['volumes']}") if not isinstance ( desktop_pod.get('pod_application', {}).get('volumes') , list ): desktop_pod['pod_application']['volumes'] = [ 'tmp', 'run', 'log', 'rundbus', 'runuser' ] @@ -562,8 +561,8 @@ def get_mongodburl(): assert isinstance(parsedmongourl.hostname, str), f"Can not parse mongodburl {mongodburl} result {parsedmongourl}" mongodbhostipaddr = _resolv(parsedmongourl.hostname) logger.debug(f"a simple check for mongodb: host {parsedmongourl.hostname} resolved as {mongodbhostipaddr}") - logger.debug(f"mongodburl is set to {mongodburl}") - return mongodburl + mongodbparam = os.getenv('MONGODB_PARAM') or gconfig.get( 'mongodbparam', 'replicaSet=rs0' ) + return (mongodburl, mongodbparam) def init_controllers(): """Define controlers access @@ -625,8 +624,9 @@ def init_config_mongodb(): """ global mongodburl global mongodblist - mongodburl = get_mongodburl() - logger.debug(f"MongoDB url: {mongodburl}") + global mongodbparam + (mongodburl,mongodbparam) = get_mongodburl() + logger.debug(f"MongoDB url: {mongodburl} param: {mongodbparam}") mongodblist = gconfig.get('mongodblist', ['image','fail2ban','loginHistory','applications','profiles','desktop'] ) logger.debug(f"MongoDB list: {mongodblist}") diff --git a/oc/od/volume.py b/oc/od/volume.py index dbc654e..89d44f3 100644 --- a/oc/od/volume.py +++ b/oc/od/volume.py @@ -13,26 +13,11 @@ import os import logging import oc.logging -import oc.auth.namedlib +from oc.auth.authservice import AuthInfo, AuthUser # to read AuthInfo and AuthUser logger = logging.getLogger(__name__) - -def selectODVolume( authinfo, userinfo ): - - volumes = [] - volumeclassnamelist = [] - - if authinfo.providertype == 'activedirectory': - volumeclassnamelist = [ ODVolumeActiveDirectoryCIFS, ODVolumeActiveDirectoryWebDav ] - - for vclass in volumeclassnamelist: - volumes.append( vclass(authinfo, userinfo ) ) - - return volumes - - -def getODVolumebyRules( authinfo, userinfo, rule ): +def getODVolumebyRules( authinfo:AuthInfo, userinfo:AuthUser, rule:dict ): """getODVolumebyRules Args: @@ -50,45 +35,46 @@ def getODVolumebyRules( authinfo, userinfo, rule ): return vol if rule.get('type') == 'cifs' : - name = rule.get('volumename') - mountOptions = rule.get('mountOptions') + name = rule.get('volumename') + mountOptions = rule.get('mountOptions') if rule.get('name') == 'homedirectory' : - homeDrive = userinfo.get('homeDrive', 'homeDrive') - networkPath = userinfo.get('homeDirectory') + homeDrive = userinfo.get('homeDrive', 'homeDrive') + networkPath = userinfo.get('homeDirectory') vol = ODVolumeActiveDirectoryCIFS( authinfo, userinfo, name, homeDrive, networkPath, mountOptions ) else: - entry = rule.get('name') - unc = rule.get('unc') - vol = ODVolumeActiveDirectoryCIFS( authinfo, userinfo, name, entry, unc, mountOptions ) + entry = rule.get('name') + unc = rule.get('unc') + vol = ODVolumeActiveDirectoryCIFS( authinfo, userinfo, name, entry, unc, mountOptions ) - if rule.get('type') == 'webdav' : - entry = userinfo.get('name') - url = rule.get('url') - vol = ODVolumeActiveDirectoryWebDav( authinfo, userinfo, entry, url ) + # if rule.get('type') == 'webdav' : + # entry = userinfo.get('name') + # url = rule.get('url') + # vol = ODVolumeActiveDirectoryWebDav( authinfo, userinfo, entry, url ) if rule.get('type') == 'nfs' : - vol = ODVolumeNFS( name=rule.get('name'), - server=rule.get('server'), - path=rule.get('path'), - mountPath=rule.get('mountPath'), - readOnly=rule.get('readOnly')) + vol = ODVolumeNFS( name=rule.get('name'), + server=rule.get('server'), + path=rule.get('path'), + mountPath=rule.get('mountPath'), + readOnly=rule.get('readOnly')) if rule.get('type') == 'pvc' : vol = ODVolumePersistentVolumeClaim( name=rule.get('name'), mountPath=rule.get('mountPath'), - claimName=rule.get('claimName')) + claimName=rule.get('claimName'), + mountPropagation=rule.get('mountPropagation',None) ) if rule.get('type') == 'hostPath' : vol = ODVolumeHostPath( name=rule.get('name'), path=rule.get('path'), mountPath=rule.get('mountPath'), hostPathType=rule.get('hostPathType','DirectoryOrCreate'), - readOnly=rule.get('readOnly',False) ) - + readOnly=rule.get('readOnly',False), + mountPropagation=rule.get('mountPropagation',None) ) return vol -def selectODVolumebyRules( authinfo, userinfo, rules ): +def selectODVolumebyRules( authinfo:AuthInfo, userinfo:AuthUser, rules:dict ): """selectODVolumebyRules Args: @@ -132,6 +118,7 @@ def __init__(self): self._type = 'base' self._name = 'volbase' self._fstype = None + self.mountPropagation = None @property def type(self): @@ -156,7 +143,7 @@ def has_options(self): @oc.logging.with_logger() class ODVolumeHostPath(ODVolumeBase): - def __init__(self, name:str, mountPath:str, path:str, hostPathType:str='DirectoryOrCreate', readOnly:bool=False ): + def __init__(self, name:str, mountPath:str, path:str, hostPathType:str='DirectoryOrCreate', readOnly:bool=False, mountPropagation:str=None ): super().__init__() self._fstype = 'hostpath' self._type = 'hostpath' @@ -165,6 +152,7 @@ def __init__(self, name:str, mountPath:str, path:str, hostPathType:str='Director self.hostPathType = hostPathType self.mountPath = mountPath self.readOnly = readOnly + self.mountPropagation = mountPropagation def is_mountable(self): return all( [self.path, self.mountPath] ) @@ -172,7 +160,7 @@ def is_mountable(self): @oc.logging.with_logger() class ODVolumePersistentVolumeClaim(ODVolumeBase): - def __init__(self, name:str, mountPath:str, claimName:str ): + def __init__(self, name:str, mountPath:str, claimName:str, mountPropagation:str=None ): super().__init__() self._fstype = 'pvc' self._type = 'pvc' @@ -180,6 +168,7 @@ def __init__(self, name:str, mountPath:str, claimName:str ): self.mountPath = mountPath self.claimName = claimName self._name = self._name.lower() # Kubernetes volume name must be lowercase + self.mountPropagation = mountPropagation def is_mountable(self): return all( [self.claimName, self.mountPath] ) @@ -203,7 +192,7 @@ def is_mountable(self): @oc.logging.with_logger() class ODVolumeActiveDirectory(ODVolumeBase): - def __init__(self, authinfo, userinfo, name): + def __init__(self, authinfo:AuthInfo, userinfo:AuthUser, name:str): super().__init__() ''' authinfo.claims: {'domain': 'AD', 'password': 'xxxx', 'userid': 'alex'} @@ -244,7 +233,7 @@ def is_mountable(self): @oc.logging.with_logger() class ODVolumeActiveDirectoryCIFS(ODVolumeActiveDirectory): - def __init__(self, authinfo, userinfo, name, homeDrive, networkPath, mountOptions=None ): + def __init__(self, authinfo:AuthInfo, userinfo:AuthUser, name:str, homeDrive:str, networkPath:str, mountOptions:str=None ): self.logger.info(locals()) super().__init__(authinfo, userinfo, name) self._fstype = 'cifs' @@ -272,11 +261,11 @@ def is_mountable( self ): return all( [ super().is_mountable(), self.homeDrive, self.networkPath, self._containertarget ] ) - +""" @oc.logging.with_logger() class ODVolumeActiveDirectoryWebDav(ODVolumeActiveDirectory): - def __init__(self, authinfo, userinfo, name, entry, url, mountOptions=None ): + def __init__(self, authinfo:AuthInfo, userinfo:AuthUser, name, entry, url, mountOptions=None ): super().__init__(authinfo, userinfo) self._fstype = 'webdav' self._type = 'flexvol' @@ -317,4 +306,5 @@ def mount_command(self): self.remotewebdav_url, self._mountpoint] return command - ''' \ No newline at end of file + ''' +""" \ No newline at end of file diff --git a/od.py b/od.py index 4b1b249..c6802f2 100755 --- a/od.py +++ b/od.py @@ -212,16 +212,12 @@ def start(self): oc.od.services.services.start() def stop(self): - logger.debug("ODCherryWatcher is stopping. Stopping runnging thread") + logger.debug("ODCherryWatcher is stopping. Stopping running threads") if isinstance( oc.od.services.services, oc.od.services.ODServices ): - logger.debug("ODCherryWatcher is stopping. Stopping runnging thread") oc.od.services.services.stop() def handler_SIGNAL( signal:str, **signum )->None: logger.warning(f"*** Received signal {signal}, stopping cherrypy engine and services {len(signum)}") - # stop services - oc.od.services.services.stop() - # stop cherrypy engine cherrypy.engine.exit() def handler_SIGQUIT( **signum ): handler_SIGNAL( 'SIGQUIT', **signum ) diff --git a/requirements.txt b/requirements.txt index 9d8f097..1923b22 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ graypy>=2.1.0 iso8601>=1.0.2 ldap3==2.9.1 pyasn1==0.6.3 +pyasn>=1.6.2 jaraco.context==6.1.0 netaddr==0.8.0 requests>=2.32.4 @@ -32,3 +33,4 @@ geoip2>=5.1.0 passlib>=1.7.4 standard-imghdr>=3.13.0 setuptools==80.0.0 +ua-parser==1.0.1