$env:FLASK_APP = "main"
python -m flask db init (ignore the error if the migrations folder is not empty)
python -m flask db migrate
python -m flask db upgrade
python -m flask run
Authorization: Select "Bearer Token" and paste the token directly into the Token field; there is no need to write "Bearer" in front of it, it is added by default
ERASE DATA:
python -m flask erasedata
SEED DATA: python -m flask seeddata
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from .config import BaseConfig
from flask_migrate import Migrate
# Create the Flask application and load its settings from BaseConfig.
app = Flask(__name__)
app.config.from_object(BaseConfig)
# Bind the SQLAlchemy handle and the migration support to the application.
db = SQLAlchemy(app)
migrate = Migrate(app, db)
# Push an application context at import time so that one is already active
# for code that runs after this point.
app.app_context().push()
from . import routes, models
# Do not delete the methods below.
@app.cli.command()
def erasedata():
    """Drop every table registered on ``db`` and commit the session.

    Exposed as a Flask CLI command (``python -m flask erasedata``). The
    tables are not recreated afterwards.
    """
    db.drop_all()
    db.session.commit()
@app.cli.command()
def seeddata():
    """Reset the database and load the default roles and users.

    Exposed as a Flask CLI command. All tables are dropped and recreated,
    then the ``ADMIN`` and ``USER`` roles are inserted, followed by two admin
    accounts and one regular account. Every row is committed on its own, in
    the order listed below. If anything fails, the pending transaction is
    rolled back and the error is printed instead of being raised.
    """
    # NOTE(review): ``seed`` is never referenced below; the import is kept in
    # case the module relies on import-time side effects -- confirm.
    from . import seed  # noqa: F401
    # ``from .models import *`` is only legal at module level, so the models
    # that are actually needed are imported by name.
    from .models import RoleModel, UserModel

    db.drop_all()
    db.session.commit()
    try:
        db.create_all()

        # Role creating. The generated ids are remembered so that the users
        # below reference the rows that were really inserted (on freshly
        # created tables these are 1 for ADMIN and 2 for USER).
        role_ids = {}
        for rolename in ('ADMIN', 'USER'):
            role = RoleModel(rolename=rolename)
            db.session.add(role)
            db.session.commit()
            role_ids[rolename] = role.id

        # Creating Users.
        # NOTE(review): passwords are stored exactly as written here.
        default_users = (
            ('admin1', 'adminemail@gmail.com', 'admin123$', 'ADMIN'),
            ('admin2', 'adminemail2@gmail.com', 'admin789$', 'ADMIN'),
            ('user', 'useremail@gmail.com', 'user123$', 'USER'),
        )
        for username, email, password, rolename in default_users:
            db.session.add(
                UserModel(
                    username=username,
                    email=email,
                    password=password,
                    role=role_ids[rolename],
                )
            )
            db.session.commit()

        print('Database successfully initialized')
    except Exception as e:
        db.session.rollback()
        print(f'Error in adding data to db: {e}')
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class BaseConfig:
    """Default application settings."""

    # Flask runtime flags.
    DEBUG = True
    TESTING = False

    # SQLite database file located in ``basedir``.
    SQLALCHEMY_DATABASE_URI = f"sqlite:///{os.path.join(basedir, 'ecommerceapp.db')}"
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    # Read from the environment when set to a non-empty value; otherwise the
    # fixed fallback string is used.
    SECRET_KEY = os.getenv('SECRET_KEY') or 'heythisisthesecretkey'
class TestingConfig(BaseConfig):
    """Settings for test runs: ``TESTING`` enabled and a separate SQLite file.

    ``SQLALCHEMY_TRACK_MODIFICATIONS`` and ``SECRET_KEY`` are inherited from
    ``BaseConfig`` rather than repeated, so the two configurations cannot
    drift apart.
    """

    TESTING = True
    # Dedicated database file so that tests do not touch the main database.
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'testecommerce.db')
from ecommerceapp import db
from datetime import datetime
class RoleModel(db.Model):
    """A role identified by a unique name."""

    # Auto-incrementing surrogate key.
    id = db.Column(db.Integer, autoincrement=True, primary_key=True)
    # Required, unique role name of at most 50 characters.
    rolename = db.Column(db.String(50), nullable=False, unique=True)
class UserModel(db.Model):
    """An account with a unique e-mail address and a reference to a role."""

    # Auto-incrementing surrogate key.
    id = db.Column(db.Integer, autoincrement=True, primary_key=True)
    username = db.Column(db.String(50), nullable=False)
    # E-mail addresses are required and must be unique.
    email = db.Column(db.String(100), nullable=False, unique=True)
    password = db.Column(db.String(100), nullable=False)
    # Integer foreign key to ``role_model.id``; this is the raw id, not a
    # relationship object.
    role = db.Column(db.Integer, db.ForeignKey('role_model.id'), nullable=False)
class FacultyModel(db.Model):
    """A faculty record together with the id of the user it is linked to."""

    # Auto-incrementing surrogate key.
    id = db.Column(db.Integer, autoincrement=True, primary_key=True)

    # Descriptive fields; all of them are required.
    faculty_name = db.Column(db.String(100), nullable=False)
    qualification = db.Column(db.String(100), nullable=False)
    # E-mail addresses must be unique across faculty records.
    email = db.Column(db.String(100), nullable=False, unique=True)
    gender = db.Column(db.String(10), nullable=False)
    department = db.Column(db.String(50), nullable=False)
    experience = db.Column(db.Integer, nullable=False)

    # Integer foreign key to ``user_model.id``.
    admin_id = db.Column(db.Integer, db.ForeignKey('user_model.id'), nullable=False)
from sqlite3 import IntegrityError
from flask import (
Flask,
abort,
request,
jsonify,
make_response,
session,
app,
Response,
url_for,
)
from ecommerceapp import app, db
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_migrate import Migrate
from datetime import datetime, timedelta
import os
from sqlalchemy.orm import sessionmaker
import json
import jwt
from functools import wraps
from .models import *
"""
NOTE:
Use jsonify function to return the outputs and status code
Example:
with output
return jsonify(output), 200
only status code
return '',404
"""
# # Use this token_required method for your routes where ever needed.
# def token_required(f):
# @wraps(f)
# def decorated(*args, **kwargs):
# token = None
# if "Authorization" in request.headers:
# token = request.headers["Authorization"]
# print(token)
# if not token:
# return jsonify({"Message": " Token is missing"}), 401
# try:
# data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
# print(data)
# current_user = UserModel.query.filter_by(email=data["email"]).first()
# print(current_user)
# print(current_user.email)
# except:
# return jsonify({"Message": "Token is invalid"}), 401
# return f(current_user, *args, **kwargs)
# return decorated
def token_required(f):
    """Require a valid JWT and pass the matching user to the wrapped view.

    The token is read from the ``Authorization`` header. Both
    ``"Bearer <token>"`` and a bare token are accepted. The token is decoded
    with the application's ``SECRET_KEY`` (HS256) and the user is looked up by
    the ``email`` claim.

    Args:
        f: View function; it is called as ``f(current_user, *args, **kwargs)``.

    Returns:
        The wrapped view. It answers with a JSON message and status 401 when
        the token is missing, expired, invalid, carries no ``email`` claim, or
        refers to a user that does not exist.
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        header = request.headers.get("Authorization", "")
        # "<scheme> <token>" -> take the second piece; a header without a
        # space is treated as a bare token instead of raising IndexError.
        pieces = header.split(" ")
        token = pieces[1] if len(pieces) > 1 else pieces[0]
        if not token:
            return jsonify({"Message": "Token is missing"}), 401

        try:
            data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
        except jwt.ExpiredSignatureError:
            return jsonify({"Message": "Token has expired"}), 401
        except jwt.InvalidTokenError:
            return jsonify({"Message": "Token is invalid"}), 401

        # A correctly signed token can still lack the claim or point at an
        # account that no longer exists; both cases are rejected here rather
        # than surfacing as an unhandled exception.
        email = data.get("email")
        current_user = (
            UserModel.query.filter_by(email=email).first() if email else None
        )
        if current_user is None:
            return jsonify({"Message": "Token is invalid"}), 401

        return f(current_user, *args, **kwargs)

    return decorated
# #Write your code here for the API end points
# Use this token_required method for your routes where ever needed.
# def token_required(f):
# @wraps(f)
# def decorated(*args, **kwargs):
# token = None
# print(request.headers)
# if 'x-access-token' in request.headers:
# token = request.headers['x-access-token']
# print(token)
# if not token:
# return jsonify({'Message':' Token is missing'}), 401
# try:
# print("yes")
# data = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256')
# print(data)
# current_user = UserModel.query.filter_by(email=data['email']).first()
# print(current_user.email)
# print(current_user.role)
# except:
# return jsonify({'Message': 'Token is invalid'}), 401
# return f(current_user, *args, **kwargs)
# return decorated
def role_required(role):
    """Build a decorator that only lets users with role name ``role`` through.

    The decorated view has to be called with an ``email`` keyword argument.
    That argument is removed before the view itself is invoked. The user is
    looked up by e-mail, the role row is fetched through the user's integer
    ``role`` column, and its ``rolename`` is compared with ``role``.

    Args:
        role: Role name that is required, compared against
            ``RoleModel.rolename``.

    Returns:
        A decorator. The wrapped view aborts with 403 when no e-mail was
        given, the user or role row cannot be found, or the names differ.
    """

    def wrapper(fn):
        @wraps(fn)
        def decorated_view(*args, **kwargs):
            email = kwargs.pop("email", None)
            user = UserModel.query.filter_by(email=email).first() if email else None
            if user is None:
                abort(403)

            # ``user.role`` holds the role id, so the name has to be fetched
            # from the role table. A separate local name keeps the decorator
            # argument ``role`` from being shadowed.
            user_role = RoleModel.query.filter_by(id=user.role).first()
            if user_role is None or user_role.rolename != role:
                abort(403)

            return fn(*args, **kwargs)

        return decorated_view

    return wrapper
@app.route("/")
def index():
    """Answer requests for the root URL with a fixed plain-text body."""
    body = "Index Page"
    return body
@app.route("/user/login", methods=["POST"])
def user_login():
    """Check e-mail and password from the JSON body and issue a JWT.

    Expects a JSON object with ``email`` and ``password``.

    Returns:
        ``{"jwt": <token>, "status": 200}`` on success, otherwise
        ``{"Message": "Invalid credentials"}`` with status 400. This includes
        a missing, malformed or non-object JSON body.
    """
    import hmac

    # ``silent=True`` yields None for a missing/invalid body instead of
    # failing later with an AttributeError on ``data.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"Message": "Invalid credentials"}), 400

    supplied = data.get("password")
    user = UserModel.query.filter_by(email=data.get("email")).first()

    # NOTE(review): the stored password is compared as-is, i.e. it is kept in
    # plain text. The comparison itself is done in constant time.
    credentials_ok = (
        user is not None
        and isinstance(supplied, str)
        and hmac.compare_digest(
            user.password.encode("utf-8"), supplied.encode("utf-8")
        )
    )
    if not credentials_ok:
        return jsonify({"Message": "Invalid credentials"}), 400

    # If authentication is successful, create a JWT token.
    # NOTE(review): the token carries no expiry claim.
    token = jwt.encode(
        {"email": user.email}, app.config["SECRET_KEY"], algorithm="HS256"
    )
    if isinstance(token, bytes):
        # Some PyJWT versions return bytes, which cannot be JSON-encoded.
        token = token.decode("utf-8")
    return jsonify({"jwt": token, "status": 200})
# Constants for roles
# Integer values that the endpoints below compare against ``current_user.role``.
# NOTE(review): presumably these equal the ids of the ADMIN and USER rows in
# the role table -- confirm against the seeded data.
ADMIN_ROLE = 1
USER_ROLE = 2
# Endpoint for adding a new faculty
@app.route("/faculty/add", methods=["POST"])
@token_required
def add_faculty(current_user):
    """Create a faculty record owned by the authenticated admin.

    Expects a JSON object with ``faculty_name``, ``qualification``, ``email``,
    ``gender``, ``department`` and ``experience``.

    Args:
        current_user: User resolved by ``token_required``.

    Returns:
        The stored record as JSON with status 201. A JSON message with status
        400 is returned when the body is missing/not an object, when an
        integrity constraint is violated, or when the insert fails otherwise.
        Callers whose role is not ``ADMIN_ROLE`` are aborted with 400.
    """
    # SQLAlchemy wraps driver errors in its own exception classes, so the
    # module-level ``IntegrityError`` (from ``sqlite3``) does not match what
    # ``commit()`` raises. The SQLAlchemy class is imported here for that.
    from sqlalchemy.exc import IntegrityError as SAIntegrityError

    if current_user.role != ADMIN_ROLE:
        abort(400)  # Only admins can add faculty

    # ``silent=True`` returns None for a missing or malformed body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"Message": "Bad Request on invalid credentials"}), 400

    fields = (
        "faculty_name",
        "qualification",
        "email",
        "gender",
        "department",
        "experience",
    )
    new_faculty = FacultyModel(
        admin_id=current_user.id,
        **{name: data.get(name) for name in fields},
    )

    try:
        db.session.add(new_faculty)
        db.session.commit()

        payload = {name: getattr(new_faculty, name) for name in ("id",) + fields}
        payload["admin_id"] = new_faculty.admin_id
        return jsonify(payload), 201
    except (SAIntegrityError, IntegrityError):
        db.session.rollback()
        return (
            jsonify(
                {"Message": "Duplicate email or other integrity constraint violated"}
            ),
            400,
        )
    except Exception:
        db.session.rollback()
        return jsonify({"Message": "Bad Request on invalid credentials"}), 400
# http://127.0.0.1:5000/faculty/list?gender=MALE&department=CE
@app.route("/faculty/list", methods=["GET"])
@token_required
def list_faculty(current_user):
    """Return faculty records, optionally narrowed by query-string filters.

    Recognised query parameters are ``department``, ``experience`` and
    ``gender``; each one that is present and non-empty adds an equality
    filter.

    Args:
        current_user: User resolved by ``token_required``.

    Returns:
        A JSON list of records with status 200, a 400 message when nothing
        matches, or a 401 message when ``current_user`` is falsy.
    """
    if not current_user:
        return jsonify({"Message": "Authentication required"}), 401

    # Apply the filters one by one, in a fixed order.
    query = FacultyModel.query
    for column in ("department", "experience", "gender"):
        wanted = request.args.get(column)
        if wanted:
            query = query.filter_by(**{column: wanted})

    matches = query.all()
    if not matches:
        return jsonify({"Message": "No data available"}), 400

    # Serialise every matching row into a plain dictionary.
    rows = [
        {
            "id": item.id,
            "faculty_name": item.faculty_name,
            "qualification": item.qualification,
            "email": item.email,
            "gender": item.gender,
            "department": item.department,
            "experience": item.experience,
            "admin_id": item.admin_id,
        }
        for item in matches
    ]
    return jsonify(rows), 200
@app.route("/faculty/update/<int:faculty_id>", methods=["PATCH"])
@token_required
def update_faculty(current_user, faculty_id):
    """Update qualification, experience and/or department of a faculty record.

    Only the user whose id is stored in the record's ``admin_id`` may change
    it. Keys that are absent from the JSON body leave the stored value
    untouched.

    Args:
        current_user: User resolved by ``token_required``.
        faculty_id: Primary key taken from the URL.

    Returns:
        The updated record as JSON with status 200; a 401 message when
        ``current_user`` is falsy; a 404 message when the record does not
        exist or belongs to someone else; a 400 message when the body is
        missing/not a JSON object or the commit fails.
    """
    if not current_user:
        return jsonify({"Message": "Authentication required"}), 401

    # Query the faculty by ID and check that it belongs to the caller.
    faculty = FacultyModel.query.get(faculty_id)
    if not faculty or faculty.admin_id != current_user.id:
        return jsonify({"Message": "Faculty not found or unauthorized access"}), 404

    # ``silent=True`` returns None for a missing or malformed body, which is
    # answered with 400 instead of failing on ``data.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"Message": "Bad Request on invalid credentials"}), 400

    # Only these three columns are editable through this endpoint.
    for field in ("qualification", "experience", "department"):
        if field in data:
            setattr(faculty, field, data[field])

    try:
        db.session.commit()
        return (
            jsonify(
                {
                    "id": faculty.id,
                    "faculty_name": faculty.faculty_name,
                    "qualification": faculty.qualification,
                    "email": faculty.email,
                    "gender": faculty.gender,
                    "department": faculty.department,
                    "experience": faculty.experience,
                    "admin_id": faculty.admin_id,
                }
            ),
            200,
        )
    except Exception:
        db.session.rollback()
        return jsonify({"Message": "Bad Request on invalid credentials"}), 400
# Endpoint for deleting a faculty by ID
# @app.route('/faculty/delete/<string:faculty_id>', methods=['DELETE'])
@app.route("/faculty/delete/<int:faculty_id>", methods=["DELETE"])
@token_required
def delete_faculty(current_user, faculty_id):
    """Delete a faculty record by primary key.

    Deletion is allowed when the caller's role equals ``ADMIN_ROLE`` or when
    the caller's id matches the record's ``admin_id``.

    Args:
        current_user: User resolved by ``token_required``.
        faculty_id: Primary key taken from the URL.

    Returns:
        A JSON message with status 200 on success, 401 when ``current_user``
        is falsy, 400 when the record does not exist or the delete fails, and
        403 when the caller lacks permission.
    """
    if not current_user:
        return jsonify({"Message": "Authentication required"}), 401

    target = FacultyModel.query.get(faculty_id)
    if not target:
        return jsonify({"Message": "Faculty not found"}), 400

    # Permission: either condition is sufficient.
    is_admin = current_user.role == ADMIN_ROLE
    is_owner = target.admin_id == current_user.id
    if not (is_admin or is_owner):
        return jsonify({"Message": "You don’t have permission"}), 403

    try:
        db.session.delete(target)
        db.session.commit()
        return jsonify({"Message": "Deleted successfully"}), 200
    except Exception:
        # Undo the pending delete before reporting the failure.
        db.session.rollback()
        return jsonify({"Message": "Bad Request on invalid credentials"}), 400