"""Issue and validate Fernet-encrypted, time-limited client tokens."""

import datetime
import json
import os

from cryptography.fernet import Fernet, InvalidToken

# Registry of issued tokens (token string -> client id). It lives only in this
# process; `TokenModel.validate` rejects any token that is not registered here.
TOKENS = {}


class TokenModel():
    """Create, decrypt and validate encrypted tokens bound to a client id.

    The Fernet key is read from the environment variable named by
    ``env_secret``; if it is unset, a fresh key is generated and written
    back to ``os.environ``. The token lifetime (in hours) is read from the
    variable named by ``env_lifetime`` and defaults to 24.
    """

    clients = None
    schema = None
    key = None
    fernet = None
    token_lifetime_hours = None
    env_lifetime = 'SOAR_TOKEN_LIFETIME'
    env_secret = 'SOAR_TOKEN_SECRET'

    def __init__(self):
        """Build the Fernet cipher and load the configured token lifetime.

        Raises:
            ValueError: if the lifetime environment variable is set to a
                value that is not a positive number.
        """
        self.getFernet()
        self.token_lifetime_hours = self._read_lifetime_hours()

    def _read_lifetime_hours(self):
        """Return the token lifetime in hours as a number."""
        raw = os.getenv(self.env_lifetime)
        if raw is None:
            return 24
        # Environment values are always strings; timedelta(hours=...) needs
        # a number, so convert here and fail early with a clear message.
        try:
            hours = float(raw)
        except ValueError:
            raise ValueError(
                f'{self.env_lifetime} must be a number of hours, got {raw!r}'
            ) from None
        if not hours > 0:  # also rejects NaN
            raise ValueError(
                f'{self.env_lifetime} must be a positive number of hours, '
                f'got {raw!r}'
            )
        return hours

    @staticmethod
    def _utcnow():
        """Return the current UTC time as a naive ``datetime``.

        Equivalent to the deprecated ``datetime.utcnow()``; the result stays
        naive so that expiry strings keep their offset-free ISO format.
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def getFernet(self):
        """Create the Fernet cipher from the current key and store it."""
        self.fernet = Fernet(self.getKey().encode())

    def getKey(self):
        """Return the Fernet key, generating and exporting one if unset.

        The key is deliberately never printed or logged.
        """
        key = os.getenv(self.env_secret)
        if not key:
            key = Fernet.generate_key().decode()
            os.environ[self.env_secret] = key
        self.key = key
        return self.key

    def setSecret(self):
        """Ensure the secret environment variable is populated."""
        if not os.getenv(self.env_secret):
            os.environ[self.env_secret] = self.getKey()

    def getExpiry(self):
        """Return the ISO-8601 expiry timestamp for a token issued now."""
        expires = self._utcnow() + datetime.timedelta(
            hours=self.token_lifetime_hours
        )
        return expires.isoformat()

    def encrypt(self, client_id):
        """Build an encrypted token for ``client_id``.

        Returns:
            A dict with keys ``'token'`` and ``'expiry'``, or ``None`` if a
            ``KeyError`` occurs while building the token.
        """
        try:
            expiry = self.getExpiry()
            token_content = {
                'client_id': client_id,
                'expiry': expiry,
            }
            token = self.fernet.encrypt(
                json.dumps(token_content).encode()
            ).decode()
            return {
                'token': token,
                'expiry': expiry,
            }
        except KeyError:
            return None

    def decrypt(self, token):
        """Decrypt ``token`` and return its decoded JSON payload.

        Returns:
            The decoded payload, or ``None`` when the token is not a string,
            fails Fernet verification, or does not contain valid JSON.
        """
        if not isinstance(token, str):
            return None
        try:
            return json.loads(self.fernet.decrypt(token.encode()).decode())
        except (InvalidToken, KeyError, TypeError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return None

    def get(self, client_id):
        """Issue a token for ``client_id`` and register it in ``TOKENS``.

        Returns:
            The dict produced by ``encrypt``, or ``None`` if encryption
            failed (nothing is registered in that case).
        """
        response = self.encrypt(client_id)
        if response is None:
            return None
        TOKENS[response['token']] = client_id
        return response

    def _is_unexpired(self, content):
        """Return True if ``content`` carries an expiry that is still ahead."""
        if not isinstance(content, dict):
            return False
        try:
            expires = datetime.datetime.fromisoformat(content['expiry'])
            return expires > self._utcnow()
        except (KeyError, TypeError, ValueError):
            # Missing or malformed expiry: treat the token as invalid.
            return False

    def validate(self, token):
        """Check whether ``token`` was issued here and has not expired.

        Returns:
            ``{'valid': False}`` for unknown, undecryptable, malformed or
            expired tokens (the token is removed from ``TOKENS`` if it was
            registered); otherwise ``{'valid': True}`` merged with the
            decrypted token payload.
        """
        response = {
            'valid': False
        }
        if token not in TOKENS:
            return response

        content = self.decrypt(token)
        if self._is_unexpired(content):
            response['valid'] = True
            response.update(content)
        else:
            # pop() tolerates the entry having been removed already.
            TOKENS.pop(token, None)
        return response