import datetime
import json
import os
from typing import Any, Dict, Optional

from cryptography.fernet import Fernet, InvalidToken

# In-memory registry of issued tokens, keyed by the token string and mapping
# to the client_id it was issued for. Entries are added by TokenModel.get()
# and removed by TokenModel.validate() when a token turns out to be unusable.
TOKENS: Dict[str, Any] = {}


class TokenModel:
    """Issue and validate Fernet-encrypted, time-limited client tokens.

    The encryption secret is read from the environment variable named by
    ``env_secret``; when it is unset, a fresh key is generated and stored in
    ``os.environ`` for the current process. The token lifetime (in hours) is
    read from the environment variable named by ``env_lifetime`` and defaults
    to 24.
    """

    clients = None
    schema = None
    key = None
    fernet = None
    token_lifetime_hours = None
    env_lifetime = "SOAR_TOKEN_LIFETIME"
    env_secret = "SOAR_TOKEN_SECRET"

    def __init__(self) -> None:
        """Build the Fernet instance and read the configured token lifetime.

        Raises:
            ValueError: if the lifetime environment variable is set to a
                value that ``int()`` cannot parse.
        """
        self.getFernet()
        self.token_lifetime_hours = int(os.getenv(self.env_lifetime, 24))

    @staticmethod
    def _utcnow() -> datetime.datetime:
        """Return the current UTC time as a naive ``datetime``.

        Equivalent to the deprecated ``datetime.utcnow()``; the result is kept
        naive so the ISO strings stored inside tokens keep their format.
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def getFernet(self) -> None:
        """Create the ``Fernet`` cipher from the current key and store it."""
        self.fernet = Fernet(self.getKey().encode())

    def getKey(self) -> str:
        """Return the Fernet key, generating and exporting one if missing.

        The key is read from the ``env_secret`` environment variable. If it is
        unset or empty, a new key is generated and written to ``os.environ``.
        The key is also cached on ``self.key``.
        """
        # The key is a secret: it must never be printed or logged.
        key = os.getenv(self.env_secret)
        if not key:
            key = Fernet.generate_key().decode()
            os.environ[self.env_secret] = key
        self.key = key
        return self.key

    def setSecret(self) -> None:
        """Ensure the secret environment variable is populated."""
        if not os.getenv(self.env_secret):
            os.environ[self.env_secret] = self.getKey()

    def getExpiry(self) -> str:
        """Return the ISO-8601 expiry timestamp for a token issued now."""
        lifetime = datetime.timedelta(hours=self.token_lifetime_hours)
        return (self._utcnow() + lifetime).isoformat()

    def encrypt(self, client_id: Any) -> Optional[Dict[str, str]]:
        """Create an encrypted token for ``client_id``.

        Returns:
            ``{"token": <str>, "expiry": <ISO timestamp>}``, or ``None`` when
            the payload cannot be serialized to JSON.
        """
        expiry = self.getExpiry()
        payload = {"client_id": client_id, "expiry": expiry}
        try:
            serialized = json.dumps(payload).encode()
        except (KeyError, TypeError, ValueError):
            # json.dumps raises TypeError/ValueError for unserializable data.
            return None
        token = self.fernet.encrypt(serialized).decode()
        return {"token": token, "expiry": expiry}

    def decrypt(self, token: str) -> Optional[Any]:
        """Decrypt ``token`` and return its decoded JSON payload.

        Returns:
            The decoded payload, or ``None`` when the token is not a valid
            Fernet token for the current key or does not contain UTF-8 JSON.
        """
        try:
            return json.loads(self.fernet.decrypt(token.encode()).decode())
        except (InvalidToken, KeyError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return None

    def get(self, client_id: Any) -> Optional[Dict[str, str]]:
        """Issue a token for ``client_id`` and register it in ``TOKENS``.

        Returns:
            The ``encrypt()`` result, or ``None`` if no token could be created
            (in which case nothing is registered).
        """
        response = self.encrypt(client_id)
        if response is not None:
            TOKENS[response["token"]] = client_id
        return response

    def validate(self, token: str) -> Dict[str, Any]:
        """Check whether ``token`` was issued here and has not expired.

        Returns:
            ``{"valid": False}`` for unknown, undecryptable, malformed or
            expired tokens; otherwise ``{"valid": True}`` merged with the
            decrypted token payload. Known tokens that fail validation are
            removed from ``TOKENS``.
        """
        response: Dict[str, Any] = {"valid": False}
        if token not in TOKENS:
            return response

        content = self.decrypt(token)
        still_valid = False
        if content:
            try:
                expires = datetime.datetime.fromisoformat(content["expiry"])
                still_valid = expires > self._utcnow()
            except (KeyError, TypeError, ValueError):
                # Missing or malformed expiry: treat the token as invalid
                # instead of propagating the error to the caller.
                still_valid = False

        if still_valid:
            response["valid"] = True
            response.update(content)
        else:
            # pop() rather than del: removal must not fail if the entry has
            # already been dropped elsewhere.
            TOKENS.pop(token, None)
        return response