Faculty Management Code

Run Steps

$env:FLASK_APP = "main"  
python -m flask db init (ignore the errormigrations folder not empty)  
python -m flask db migrate  
python -m flask db upgrade  
python -m flask run  

POSTMAN

__init __.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from .config import BaseConfig
from flask_migrate import Migrate

app = Flask(__name__)
app.config.from_object(BaseConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
app.app_context().push()


from . import routes, models


# do not delete these below methods
@app.cli.command()
def erasedata():
    db.drop_all()
    db.session.commit()
    # db.create_all()


@app.cli.command()
def seeddata():
    from . import seed

seed.py

from werkzeug.security import generate_password_hash
from . import db
from .models import *

db.drop_all()
db.session.commit()
try:
    db.create_all()

    # Role creating
    r1 = RoleModel(rolename='ADMIN')
    db.session.add(r1)
    db.session.commit()

    r2 = RoleModel(rolename='USER')
    db.session.add(r2)
    db.session.commit()

    # Creating Users
    u1 = UserModel(username='admin1', email='adminemail@gmail.com', password='admin123$', role=1)
    db.session.add(u1)
    db.session.commit()

    u2 = UserModel(username='admin2', email='adminemail2@gmail.com', password='admin789$', role=1)
    db.session.add(u2)
    db.session.commit()

    u3 = UserModel(username='user', email='useremail@gmail.com', password='user123$', role=2)
    db.session.add(u3)
    db.session.commit()

    print('Database successfully initialized')
except Exception as e:
    db.session.rollback()
    print(f'Error in adding data to db: {e}')

config.py

import os
basedir = os.path.abspath(os.path.dirname(__file__))


class BaseConfig(object):
    DEBUG = True
    TESTING = False
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'ecommerceapp.db')
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'heythisisthesecretkey'
    

class TestingConfig(BaseConfig):
    TESTING = True
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'heythisisthesecretkey'
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'testecommerce.db')
    

models.py

from ecommerceapp import db
from datetime import datetime


class RoleModel(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    rolename = db.Column(db.String(50), unique=True, nullable=False)

class UserModel(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(50), nullable=False)
    email = db.Column(db.String(100), unique=True, nullable=False)
    password = db.Column(db.String(100), nullable=False)
    role = db.Column(db.Integer, db.ForeignKey('role_model.id'), nullable=False)

class FacultyModel(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    faculty_name = db.Column(db.String(100), nullable=False)
    qualification = db.Column(db.String(100), nullable=False)
    email = db.Column(db.String(100), unique=True, nullable=False)
    gender = db.Column(db.String(10), nullable=False)
    department = db.Column(db.String(50), nullable=False)
    experience = db.Column(db.Integer, nullable=False)
    admin_id = db.Column(db.Integer, db.ForeignKey('user_model.id'), nullable=False)

routes.py

from sqlite3 import IntegrityError
from flask import (
    Flask,
    abort,
    request,
    jsonify,
    make_response,
    session,
    app,
    Response,
    url_for,
)
from ecommerceapp import app, db
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_migrate import Migrate

from datetime import datetime, timedelta
import os
from sqlalchemy.orm import sessionmaker

import json
import jwt
from functools import wraps


from .models import *


"""
NOTE:
Use jsonify function to return the outputs and status code

Example:
with output
return jsonify(output),2000

only status code
return '',404


"""


# # Use this token_required method for your routes where ever needed.
# def token_required(f):
#     @wraps(f)
#     def decorated(*args, **kwargs):
#         token = None
#         if "Authorization" in request.headers:
#             token = request.headers["Authorization"]
#             print(token)
#         if not token:
#             return jsonify({"Message": " Token is missing"}), 401
#         try:
#             data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
#             print(data)
#             current_user = UserModel.query.filter_by(email=data["email"]).first()
#             print(current_user)
#             print(current_user.email)
#         except:
#             return jsonify({"Message": "Token is invalid"}), 401
#         return f(current_user, *args, **kwargs)

#     return decorated


def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        if "Authorization" in request.headers:
            token = request.headers["Authorization"].split(" ")[
                1
            ]  # Extract the token without the "Bearer " prefix
            print("Token:", token)

        if not token:
            return jsonify({"Message": "Token is missing"}), 401

        try:
            data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
            print("Decoded Data:", data)

            current_user = UserModel.query.filter_by(email=data["email"]).first()
            print("Current User:", current_user)
            print("Current User Email:", current_user.email)

        except jwt.ExpiredSignatureError:
            return jsonify({"Message": "Token has expired"}), 401
        except jwt.InvalidTokenError:
            return jsonify({"Message": "Token is invalid"}), 401

        return f(current_user, *args, **kwargs)

    return decorated


# #Write your code here for the API end points


# Use this token_required method for your routes where ever needed.
# def token_required(f):
#     @wraps(f)
#     def decorated(*args, **kwargs):
#         token = None
#         print(request.headers)
#         if 'x-access-token' in request.headers:
#             token = request.headers['x-access-token']
#             print(token)
#         if not token:
#             return jsonify({'Message':' Token is missing'}), 401
#         try:
#             print("yes")
#             data = jwt.decode(token, app.config['SECRET_KEY'],  algorithms='HS256')
#             print(data)
#             current_user = UserModel.query.filter_by(email=data['email']).first()
#             print(current_user.email)
#             print(current_user.role)
#         except:
#             return jsonify({'Message': 'Token is invalid'}), 401
#         return f(current_user, *args, **kwargs)
#     return decorated


def role_required(role):
    def wrapper(fn):
        @wraps(fn)
        def decorated_view(*args, **kwargs):
            email = kwargs.pop("email", None)
            print(email)

            role = UserModel.objects.get(email=email)
            print(role)

            roleName = role.role.rolename
            print(roleName)

            if roleName != role:
                abort(403)
            return fn(*args, **kwargs)

        return decorated_view

    return wrapper


@app.route("/")
def index():
    return "Index Page"


@app.route("/user/login", methods=["POST"])
def user_login():
    data = request.get_json()
    user = UserModel.query.filter_by(email=data.get("email")).first()

    if user and user.password == data.get("password"):
        # If authentication is successful, create a JWT token
        token = jwt.encode(
            {"email": user.email}, app.config["SECRET_KEY"], algorithm="HS256"
        )

        print(token)
        return jsonify({"jwt": token, "status": 200})
    else:
        return jsonify({"Message": "Invalid credentials"}), 400


# Constants for roles
ADMIN_ROLE = 1
USER_ROLE = 2


# Endpoint for adding a new faculty
@app.route("/faculty/add", methods=["POST"])
@token_required
def add_faculty(current_user):
    print("134 line", current_user.role)
    if current_user.role != ADMIN_ROLE:
        abort(400)  # Only admins can add faculty

    data = request.get_json()

    new_faculty = FacultyModel(
        faculty_name=data.get("faculty_name"),
        qualification=data.get("qualification"),
        email=data.get("email"),
        gender=data.get("gender"),
        department=data.get("department"),
        experience=data.get("experience"),
        admin_id=current_user.id,
    )

    try:
        db.session.add(new_faculty)
        db.session.commit()
        return (
            jsonify(
                {
                    "id": new_faculty.id,
                    "faculty_name": new_faculty.faculty_name,
                    "qualification": new_faculty.qualification,
                    "email": new_faculty.email,
                    "gender": new_faculty.gender,
                    "department": new_faculty.department,
                    "experience": new_faculty.experience,
                    "admin_id": new_faculty.admin_id,
                }
            ),
            201,
        )
    except IntegrityError as e:
        db.session.rollback()
        return (
            jsonify(
                {"Message": "Duplicate email or other integrity constraint violated"}
            ),
            400,
        )
    except Exception as e:
        db.session.rollback()
        return jsonify({"Message": "Bad Request on invalid credentials"}), 400


# http://127.0.0.1:5000/faculty/list?gender=MALE&department=CE
@app.route("/faculty/list", methods=["GET"])
@token_required
def list_faculty(current_user):
    # Check if the user is authenticated
    if not current_user:
        return jsonify({"Message": "Authentication required"}), 401

    # Get parameters from the URL
    department = request.args.get("department")
    experience = request.args.get("experience")
    gender = request.args.get("gender")

    # Query faculty based on the provided search criteria
    # faculty_query = FacultyModel.query.filter_by(admin_id=current_user.id)
    faculty_query = FacultyModel.query

    if department:
        faculty_query = faculty_query.filter_by(department=department)
    if experience:
        faculty_query = faculty_query.filter_by(experience=experience)
    if gender:
        faculty_query = faculty_query.filter_by(gender=gender)

    faculty_list = faculty_query.all()

    if not faculty_list:
        return jsonify({"Message": "No data available"}), 400

    # Format the response
    formatted_faculty_list = []
    for faculty in faculty_list:
        formatted_faculty_list.append(
            {
                "id": faculty.id,
                "faculty_name": faculty.faculty_name,
                "qualification": faculty.qualification,
                "email": faculty.email,
                "gender": faculty.gender,
                "department": faculty.department,
                "experience": faculty.experience,
                "admin_id": faculty.admin_id,
            }
        )

    return jsonify(formatted_faculty_list), 200


@app.route("/faculty/update/<int:faculty_id>", methods=["PATCH"])
@token_required
def update_faculty(current_user, faculty_id):
    # Check if the user is authenticated
    if not current_user:
        return jsonify({"Message": "Authentication required"}), 401

    # Query the faculty by ID and check if it belongs to the authenticated user
    faculty = FacultyModel.query.get(faculty_id)

    if not faculty or faculty.admin_id != current_user.id:
        return jsonify({"Message": "Faculty not found or unauthorized access"}), 404

    # Get the update data from the request body
    data = request.get_json()

    # Update the faculty details
    faculty.qualification = data.get("qualification", faculty.qualification)
    faculty.experience = data.get("experience", faculty.experience)
    faculty.department = data.get("department", faculty.department)

    try:
        db.session.commit()
        return (
            jsonify(
                {
                    "id": faculty.id,
                    "faculty_name": faculty.faculty_name,
                    "qualification": faculty.qualification,
                    "email": faculty.email,
                    "gender": faculty.gender,
                    "department": faculty.department,
                    "experience": faculty.experience,
                    "admin_id": faculty.admin_id,
                }
            ),
            200,
        )
    except Exception as e:
        db.session.rollback()
        return jsonify({"Message": "Bad Request on invalid credentials"}), 400


# Endpoint for deleting a faculty by ID
# @app.route('/faculty/delete/<string:faculty_id>', methods=['DELETE'])
@app.route("/faculty/delete/<int:faculty_id>", methods=["DELETE"])
@token_required
def delete_faculty(current_user, faculty_id):
    # Check if the user is authenticated
    if not current_user:
        return jsonify({"Message": "Authentication required"}), 401

    # Query the faculty by ID
    faculty = FacultyModel.query.get(faculty_id)

    if not faculty:
        return jsonify({"Message": "Faculty not found"}), 400

    # Check if the authenticated user is an admin or the creator of the faculty
    if current_user.role != ADMIN_ROLE and faculty.admin_id != current_user.id:
        return jsonify({"Message": "You don’t have permission"}), 403

    try:
        # Delete the faculty from the database
        db.session.delete(faculty)
        db.session.commit()
        return jsonify({"Message": "Deleted successfully"}), 200
    except Exception as e:
        db.session.rollback()
        return jsonify({"Message": "Bad Request on invalid credentials"}), 400