-
Notifications
You must be signed in to change notification settings - Fork 36
Description
Is your feature request related to a problem? Please describe.
Ability to handle OAuth flows with secret id/key exchange for token, perhaps with refresh.
Describe the solution you'd like
a new auth class like HeaderAuthentication/CookieAuthentication but can take id, key, url and work some magic before the request is done to get an actual token (and then use the normal HeaderAuthentication
Describe alternatives you've considered
I've made my own adapter, but it's pretty common auth method so a one liner would be nice.
Additional context
I wrote this which seems to do the trick, maybe could be adapted a bit. It does nothing in the perform_initial_auth, instead gets the token if needed in the get_headers call so that if the token is expired it can be refreshed first.
An alternative would be to assume one request per instance and then just have some kind of BodyAuthentication that can send id/key in the body and collect the token from the body which is then put into the existing HeaderAuthentication.
from time import time
from apiclient.authentication_methods import BaseAuthenticationMethod
class OAuthAuthentication(BaseAuthenticationMethod):
"""Authentication using secret_id and secret_key."""
_access_token = None
_token_expiration = None
_refresh_token = None
_refresh_expiration = None
def __init__(
self,
token_url,
refresh_url,
body,
expiry_margin=10,
):
"""Initialize OAuthAuthentication."""
self._token_url = token_url
self._refresh_url = refresh_url
self._body = body
self._expiry_margin = expiry_margin
def get_headers(self):
self.refresh_token()
return {
'Authorization': f'Bearer {self._access_token}',
}
def refresh_token(self):
if self._token_expiration and self._token_expiration >= int(time()):
return True
if self._refresh_expiration and self._refresh_expiration >= int(time()):
ret = self._client.post(
self._refresh_url,
data={
'refresh': self._refresh_token,
},
)
self._access_token = ret.get('access')
self._token_expiration = int(time()) + int(ret.get('access_expires')) - self._expiry_margin
return True
ret = self._client.post(self._token_url, data=self._body)
self._access_token = ret.get('access')
self._refresh_token = ret.get('refresh')
self._token_expiration = int(time()) + int(ret.get('access_expires')) - self._expiry_margin
self._refresh_expiration = int(time()) + int(ret.get('refresh_expires')) - self._expiry_margin
def perform_initial_auth(self, client):
self._client = client