Source code for pyUSIrest.auth

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu May 24 15:46:37 2018

@author: Paolo Cozzi <cozzi@ibba.cnr.it>

.. _`python_jwt.process_jwt`: https://rawgit.now.sh/davedoesdev/python-jwt/master/docs/_build/html/index.html#python_jwt.process_jwt

"""  # noqa

import requests
import datetime
import logging

import python_jwt

from . import settings
from .exceptions import USIConnectionError

# Module-level logger, named after this module through ``__name__``; the
# code below emits its debug/warning/error messages through it.
logger = logging.getLogger(__name__)


class Auth():
    """
    Deal with EBI AAP tokens. Starts from a token string or by providing
    user credentials. It parses the token and provides methods like checking
    expiration times.

    Attributes:
        auth_url (str): Default url for EBI AAP.
        expire (datetime.datetime): when token expires
        issued (datetime.datetime): when token was requested
        header (dict): token header read by `python_jwt.process_jwt`_
        claims (dict): token claims read by `python_jwt.process_jwt`_
    """

    auth_url = None

    def __init__(self, user=None, password=None, token=None):
        """
        Instantiate a new python EBI AAP Object. You can generate a new
        object providing both user and password, or by passing a valid
        token string

        Args:
            user (str): your aap username
            password (str): your password
            token (str): a valid EBI AAP jwt token

        Raises:
            USIConnectionError: if the authentication request does not
                return status 200
            ValueError: if neither user/password nor token are provided
        """

        self.expire = None
        self.issued = None
        self.header = None
        self.claims = None
        self._token = None

        # build the auth endpoint from the package settings
        self.auth_url = settings.AUTH_URL + "/auth"

        if password and user:
            logger.debug("Authenticating user %s", user)

            # NOTE(review): no timeout is passed here, so this call can
            # block for as long as the server keeps the connection open.
            self.response = requests.get(
                self.auth_url,
                auth=requests.auth.HTTPBasicAuth(user, password))

            # keep the status code for later inspection
            self.status_code = self.response.status_code

            if self.status_code != 200:
                logger.error("Got status %s", self.status_code)
                raise USIConnectionError(
                    "Got status %s: '%s'" % (
                        self.status_code, self.response.text))

            logger.debug("Got status %s", self.status_code)

            # the response body is the token: assigning through the
            # property setter also decodes it
            self.token = self.response.text

        elif token:
            # assigning through the property setter also decodes it
            self.token = token

        else:
            raise ValueError(
                "You need to provide user/password or a valid token")

    def _decode(self, token=None):
        """Decode JWT token using python_jwt and record header, claims,
        issue time and expiration time on this object"""

        self.header, self.claims = python_jwt.process_jwt(token)

        logger.debug("Decoded tocken with %s", self.header['alg'])

        # 'iat' and 'exp' are converted with fromtimestamp(), so both
        # values are naive datetimes expressed in local time
        self.issued = datetime.datetime.fromtimestamp(self.claims['iat'])
        self.expire = datetime.datetime.fromtimestamp(self.claims['exp'])

    @property
    def token(self):
        """Get/Set token as a string"""

        return self._token

    @token.setter
    def token(self, token):
        # decode first: if decoding raises, the stored token is unchanged
        self._decode(token)
        self._token = token

    def get_duration(self):
        """Get token remaining time before expiration

        A message is logged on every call: a warning when less than 300
        seconds are left, an error when the token is already expired and a
        debug message otherwise.

        Returns:
            datetime.timedelta: remaining time as
            :py:class:`timedelta <datetime.timedelta>` object (negative
            when the token is expired)
        """

        # naive local time, comparable with the naive self.expire
        now = datetime.datetime.now()
        duration = self.expire - now

        seconds = duration.total_seconds()
        user = self.claims['name']

        if 0 < seconds < 300:
            logger.warning(
                "Token for %s will expire in %s seconds", user, seconds)

        elif seconds < 0:
            logger.error("Token for %s is expired", user)

        else:
            logger.debug(
                "Token for %s will expire in %s seconds", user, seconds)

        return duration

    def is_expired(self):
        """Return True if token is expired, False otherwise

        Returns:
            bool: True if token is expired
        """

        return self.get_duration().total_seconds() < 0

    @staticmethod
    def _format_remaining(duration):
        """Format a non-negative timedelta as HH:MM:SS

        Whole days are folded into the hours field (25 hours are rendered
        as ``25:00:00``) and fractions of a second are dropped.
        """

        # integer arithmetic on the normalized fields; using only
        # ``duration.seconds`` would silently discard ``duration.days``
        whole_seconds = duration.days * 86400 + duration.seconds
        hours, remainder = divmod(whole_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        return "%.2d:%.2d:%.2d" % (hours, minutes, seconds)

    def __str__(self):
        duration = self.get_duration()

        if duration.total_seconds() < 0:
            return "Token for {user} is expired".format(
                user=self.claims['name'])

        return "Token for {user} will expire in {duration}".format(
            user=self.claims['name'],
            duration=self._format_remaining(duration))

    def get_domains(self):
        """Returns a list of domain managed by this object

        Returns:
            list: a list of managed domains, as read from the ``domains``
            token claim
        """

        return self.claims['domains']