diff --git a/requirements.txt b/requirements.txt index e732cff..b19e214 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ Flask==2.2.2 pytest==7.2.1 -PyJWT==2.6.0 +sqlalchemy==2.0.3 \ No newline at end of file diff --git a/src/generate_api_token.py b/src/generate_api_token.py index a99f80f..35f00af 100644 --- a/src/generate_api_token.py +++ b/src/generate_api_token.py @@ -1,40 +1,45 @@ -import jwt -import json - - -def generate_api_token( payload, secret): - header = {"alg": "HS256", "typ": "JWT"} - encoded_jwt = jwt.encode(payload, secret, algorithm='HS256', headers=header) - return json.dumps({"token": encoded_jwt}) - - - -def verify_api_token(token, secret): - try: - decoded_jwt = jwt.decode(token, secret, algorithms=["HS256"]) - return json.dumps({"payload": decoded_jwt}) - except: - return {"error": "Invalid signature:"} - - - - - -# example - -payload = {"sub": "12364567890", "user": "John Doe"} -secret = "Ravipassword" #securely store secret in production - -jwt_json = generate_api_token( payload, secret) -print(jwt_json) - -jwt_dict = json.loads(jwt_json) -token = jwt_dict["token"] - -decoded_jwt = verify_api_token(token, secret) -print(decoded_jwt) - - - - - +import random +import string +from sqlalchemy import create_engine, Column, String +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + +# db configuration +engine = create_engine('sqlite:///Tokens.db', echo=True) +Session = sessionmaker(bind=engine) + +# Create a base class for the ORM model +Base = declarative_base() + +class ApiToken(): + def __init__(self): + self.token = self.Token() + + class Token(Base): + __tablename__ = 'Tokens' + token_text = Column(String, primary_key=True) + + def generate_token(self): + ''' + generate random alpha-numeric string with length 50 and check whether it already exist or not + ''' + token = ''.join(random.choices(string.ascii_letters+string.digits, k=40)) + with Session() as session: + result = session.query(self.Token).filter_by(token_text=token).first() + while result: + token = ''.join(random.choices(string.ascii_letters+string.digits, k=50)) + result = session.query(self.Token).filter_by(token_text=token).first() + new_token = self.Token(token_text=token) + session.add(new_token) + session.commit() + return token + + def is_valid_token(self, token_to_check): + with Session() as session: + result = session.query(self.Token).filter_by(token_text=token_to_check).first() + if result: + return True + return False + +# create the table (if it doesn't already exist) +Base.metadata.create_all(engine)