From 25d7a475f306695874b9920df60383524745454e Mon Sep 17 00:00:00 2001
From: mhdali
Date: Mon, 13 Feb 2023 23:25:01 +0530
Subject: [PATCH 1/3] fixed generate_api_token
---
 requirements.txt | 1 -
 src/generate_api_token.py | 63 ++++++++++++++++++++-------------------
 2 files changed, 32 insertions(+), 32 deletions(-)
diff --git a/requirements.txt b/requirements.txt
index e732cff..fb6804a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,2 @@
 Flask==2.2.2
 pytest==7.2.1
-PyJWT==2.6.0
diff --git a/src/generate_api_token.py b/src/generate_api_token.py
index a99f80f..cc4410a 100644
--- a/src/generate_api_token.py
+++ b/src/generate_api_token.py
@@ -1,40 +1,41 @@
-import jwt
-import json
+import random
+import string
+import sqlite3
+# Connect to the database (or create it if it doesn't exist)
+conn = sqlite3.connect('Tokens.db', check_same_thread=False)
+cursor = conn.cursor()
-def generate_api_token( payload, secret):
- header = {"alg": "HS256", "typ": "JWT"}
- encoded_jwt = jwt.encode(payload, secret, algorithm='HS256', headers=header)
- return json.dumps({"token": encoded_jwt})
+# Create the table to store the tokens (if it doesn't already exist)
+cursor.execute('''
+CREATE TABLE IF NOT EXISTS Tokens (
+ token_text text
+)
+''')
+conn.commit()
-
-
-def verify_api_token(token, secret):
- try:
- decoded_jwt = jwt.decode(token, secret, algorithms=["HS256"])
- return json.dumps({"payload": decoded_jwt})
- except:
- return {"error": "Invalid signature:"}
-
+class ApiToken:
+ def generate_token():
+ # Generate random alpha-numeric string with length 50
+ token = ''.join(random.choices(string.ascii_letters+string.digits, k=40))
+ result = cursor.execute("SELECT * FROM Tokens WHERE token_text=?", (token,)).fetchone()
+
+ # Check whether the token already exist or not
+ while result:
+ token = ''.join(random.choices(string.ascii_letters+string.digits, k=50))
+ result = cursor.execute("SELECT * FROM Tokens WHERE token_text=?", (token,)).fetchone()
+
+ cursor.execute("INSERT INTO Tokens (token_text) VALUES (?)", (token,))
+ conn.commit()
+
+ return token
+ def is_valid_token(token_to_check):
+ cursor.execute("SELECT * FROM Tokens WHERE token_text=?", (token_to_check,))
+ result = cursor.fetchone()
-# example
-
-payload = {"sub": "12364567890", "user": "John Doe"}
-secret = "Ravipassword" #securely store secret in production
-
-jwt_json = generate_api_token( payload, secret)
-print(jwt_json)
-
-jwt_dict = json.loads(jwt_json)
-token = jwt_dict["token"]
-
-decoded_jwt = verify_api_token(token, secret)
-print(decoded_jwt)
-
-
-
-
+ return True if result else False
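Review bot: `generate_token` builds the bearer token with `random.choices`, which draws from the Mersenne Twister generator in `random` rather than a CSPRNG, so the issued tokens should not be treated as unguessable secrets. Switch to the `secrets` module: add `import secrets` and use `token = secrets.token_urlsafe(32)`, or `''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(50))` if the alphanumeric format has to stay. The first draw also uses `k=40` while the comment and the retry loop say 50, so tokens are normally 40 characters long; pick one length and make the comment match. The same `random.choices` calls are carried unchanged through patches 2/3 and 3/3.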
From ce0c3f2cb5acd6bb4f3903e7c586fc01fc5a4823 Mon Sep 17 00:00:00 2001
From: mhdali
Date: Sun, 19 Feb 2023 14:09:15 +0530
Subject: [PATCH 2/3] changed sqlite3 to sqlalchemy
---
 requirements.txt | 1 +
 src/generate_api_token.py | 65 ++++++++++++++++++++-------------------
 2 files changed, 34 insertions(+), 32 deletions(-)
diff --git a/requirements.txt b/requirements.txt
index fb6804a..b19e214 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 Flask==2.2.2
 pytest==7.2.1
+sqlalchemy==2.0.3
\ No newline at end of file
diff --git a/src/generate_api_token.py b/src/generate_api_token.py
index cc4410a..c36f1ce 100644
--- a/src/generate_api_token.py
+++ b/src/generate_api_token.py
@@ -1,41 +1,42 @@
 import random
 import string
-import sqlite3
+from sqlalchemy import create_engine, Column, String
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker
-# Connect to the database (or create it if it doesn't exist)
-conn = sqlite3.connect('Tokens.db', check_same_thread=False)
-cursor = conn.cursor()
+# db configuration
+engine = create_engine('sqlite:///Tokens.db', echo=True)
+Session = sessionmaker(bind=engine)
-# Create the table to store the tokens (if it doesn't already exist)
-cursor.execute('''
-CREATE TABLE IF NOT EXISTS Tokens (
- token_text text
-)
-''')
-conn.commit()
+# Create a base class for the ORM model
+Base = declarative_base()
-class ApiToken:
-
- def generate_token():
- # Generate random alpha-numeric string with length 50
- token = ''.join(random.choices(string.ascii_letters+string.digits, k=40))
- result = cursor.execute("SELECT * FROM Tokens WHERE token_text=?", (token,)).fetchone()
-
- # Check whether the token already exist or not
- while result:
- token = ''.join(random.choices(string.ascii_letters+string.digits, k=50))
- result = cursor.execute("SELECT * FROM Tokens WHERE token_text=?", (token,)).fetchone()
-
- cursor.execute("INSERT INTO Tokens (token_text) VALUES (?)", (token,))
- conn.commit()
+class ApiToken(Base):
+ __tablename__ = 'Tokens'
+ token_text = Column(String, primary_key=True)
- return token
-
-
- def is_valid_token(token_to_check):
- cursor.execute("SELECT * FROM Tokens WHERE token_text=?", (token_to_check,))
- result = cursor.fetchone()
+ def generate_token(cls):
+ '''
+ generate random alpha-numeric string with length 50 and check whether it already exist or not
+ '''
+ token = ''.join(random.choices(string.ascii_letters+string.digits, k=40))
+ with Session() as session:
+ result = session.query(cls).filter_by(token_text=token).first()
+ while result:
+ token = ''.join(random.choices(string.ascii_letters+string.digits, k=50))
+ result = session.query(cls).filter_by(token_text=token).first()
+ new_token = cls(token_text=token)
+ session.add(new_token)
+ session.commit()
+ return token
- return True if result else False
+ def is_valid_token(cls, token_to_check):
+ with Session() as session:
+ result = session.query(cls).filter_by(token_text=token_to_check).first()
+ if result:
+ return True
+ return False
+# create the table (if it doesn't already exist)
+Base.metadata.create_all(engine)
From d11af31abe405197b5ed5908de11412a7dfd29b0 Mon Sep 17 00:00:00 2001
From: FandF Brothers
Date: Thu, 23 Feb 2023 00:15:05 +0530
Subject: [PATCH 3/3] nested class Token added
---
 src/generate_api_token.py | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)
diff --git a/src/generate_api_token.py b/src/generate_api_token.py
index c36f1ce..35f00af 100644
--- a/src/generate_api_token.py
+++ b/src/generate_api_token.py
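Review bot: In patch 2/3, `create_engine('sqlite:///Tokens.db', echo=True)` makes SQLAlchemy log every statement together with its bound parameters, so each newly issued token and each token passed to `is_valid_token` is written to the log output in clear text. Drop the flag, `engine = create_engine('sqlite:///Tokens.db')`, or pass `hide_parameters=True` if statement logging is still wanted while debugging. Patch 3/3 does not touch this line, so the setting persists through the end of the series. Separately, `generate_token(cls)` and `is_valid_token(cls, token_to_check)` take `cls` but the hunk shows no `@classmethod` decorator, so `ApiToken.generate_token()` would raise a `TypeError` for the missing argument; add `@classmethod` above each.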
@@ -11,29 +11,32 @@
 # Create a base class for the ORM model
 Base = declarative_base()
+class ApiToken():
+ def __init__(self):
+ self.token = self.Token()
-class ApiToken(Base):
- __tablename__ = 'Tokens'
- token_text = Column(String, primary_key=True)
+ class Token(Base):
+ __tablename__ = 'Tokens'
+ token_text = Column(String, primary_key=True)
- def generate_token(cls):
+ def generate_token(self):
 '''
 generate random alpha-numeric string with length 50 and check whether it already exist or not
 '''
 token = ''.join(random.choices(string.ascii_letters+string.digits, k=40))
 with Session() as session:
- result = session.query(cls).filter_by(token_text=token).first()
+ result = session.query(self.Token).filter_by(token_text=token).first()
 while result:
 token = ''.join(random.choices(string.ascii_letters+string.digits, k=50))
- result = session.query(cls).filter_by(token_text=token).first()
- new_token = cls(token_text=token)
+ result = session.query(self.Token).filter_by(token_text=token).first()
+ new_token = self.Token(token_text=token)
 session.add(new_token)
 session.commit()
 return token
- def is_valid_token(cls, token_to_check):
+ def is_valid_token(self, token_to_check):
 with Session() as session:
- result = session.query(cls).filter_by(token_text=token_to_check).first()
+ result = session.query(self.Token).filter_by(token_text=token_to_check).first()
 if result:
 return True
 return False
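Review bot: The nested `Token` model still stores every token verbatim in `token_text`, and `is_valid_token` looks it up by that raw value, so anyone who can read `Tokens.db` (a backup, a copied file) obtains every valid credential. Store a digest instead and query on it in both methods, e.g. `digest = hashlib.sha256(token.encode()).hexdigest()` with `self.Token(token_text=digest)` on insert and `filter_by(token_text=digest)` on lookup, handing the raw token to the caller only at creation; this needs `import hashlib`, and the import block lies outside this hunk. Also, `self.token = self.Token()` in `__init__` builds a mapped object with no primary key that nothing in the hunk uses, so it looks removable.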