Ecommerce Python Code
run steps
sudo apt-get update -y
sudo apt-get install python3-pip -y
pip3 install pytest==5.3.1 urllib3==1.25.7 Flask==1.1.1 Flask-HTTPAuth==3.3.0 flask-marshmallow==0.10.1 Flask-Migrate==2.5.2 Flask-Script==2.0.6 Flask-SQLAlchemy==2.4.1 Flask-Testing==0.8.0 marshmallow==3.2.2
cd /home/labuser/Desktop/Project/t4_ecommerce_flaskh/ecommerceapp;
export FLASK_APP=main.py;
python -m flask seeddata
1. set FLASK_APP=main
2. python -m flask db init (ignore the errormigrations folder not empty)
3. python -m flask db migrate
4. python -m flask db upgrade
5. python -m flask run
$env:FLASK_APP="main.py"
sqlite3’ command will get you into sqlite3 terminal.
2. ‘.open dbname.db’ will get that database as core. All further operations will be made
on that database.
3. ‘.tables’ will list the tables in that database.
4. ‘select * from tablename; ’ will list the rows of table.
virtual env
ctrl shift p
python interpreter`
enter interpreterpath
select scripts python exe
python -m pytest tests.py
Password encode code
import base64
print(str(base64.b64decode("YXBwbGU6cGFzc193b3Jk"))[2:-1])
from ecommerceapp import db
from datetime import datetime
class Product(db.Model):
product_id=db.Column(db.Integer,primary_key=True)
product_name=db.Column(db.String(80),nullable=False)
price=db.Column(db.Float,nullable=False)
seller_id=db.Column(db.Integer, db.ForeignKey('user.user_id'))
category_id=db.Column(db.Integer, db.ForeignKey('category.category_id'))
class Category(db.Model):
category_id=db.Column(db.Integer,primary_key=True)
category_name=db.Column(db.String(80),nullable=False)
products=db.relationship("Product", cascade="all, delete-orphan", backref='category')
class Cart(db.Model):
cart_id=db.Column(db.Integer,primary_key=True)
total_amount=db.Column(db.Float,nullable=False)
user_id=db.Column(db.Integer, db.ForeignKey('user.user_id'))
cartproducts=db.relationship("CartProduct", cascade="all, delete-orphan", backref='cart')
class CartProduct(db.Model):
cp_id=db.Column(db.Integer,primary_key=True)
cart_id=db.Column(db.Integer, db.ForeignKey('cart.cart_id'))
product_id=db.Column(db.Integer, db.ForeignKey('product.product_id'))
quantity=db.Column(db.Integer)
class Role(db.Model):
role_id=db.Column(db.Integer,primary_key=True)
role_name=db.Column(db.String(20),nullable=False)
users=db.relationship("User", cascade="all, delete-orphan", backref='role')
class User(db.Model):
user_id=db.Column(db.Integer,primary_key=True)
user_name=db.Column(db.String(20),nullable=False)
password=db.Column(db.String(150),nullable=False)
user_role=db.Column(db.Integer, db.ForeignKey('role.role_id'))
cart = db.relationship('Cart', backref='user', uselist=False)
__ init __.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from .config import BaseConfig
from flask_migrate import Migrate
app = Flask(__name__)
app.config.from_object(BaseConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
from . import routes, models
@app.cli.command()
def erasedata():
db.drop_all()
db.session.commit()
db.create_all()
@app.cli.command()
def seeddata():
from . import seed
from werkzeug.security import generate_password_hash
from . import db
from .models import Product, Role, Cart, CartProduct, Category,User
db.drop_all()
db.session.commit()
try:
db.drop_all()
db.create_all()
c1=Category(category_name='Fashion')
db.session.add(c1)
db.session.commit()
c2=Category(category_name='Electronics')
db.session.add(c2)
db.session.commit()
c3=Category(category_name='Books')
db.session.add(c3)
db.session.commit()
c4=Category(category_name='Groceries')
db.session.add(c4)
db.session.commit()
c5=Category(category_name='Medicines')
db.session.add(c5)
db.session.commit()
r1=Role(role_name='CONSUMER')
db.session.add(r1)
db.session.commit()
r2=Role(role_name='SELLER')
db.session.add(r2)
db.session.commit()
password=generate_password_hash("pass_word",method='pbkdf2:sha256',salt_length=8)
u1=User(user_name='jack',password=password,user_role=1)
db.session.add(u1)
db.session.commit()
u2=User(user_name='bob',password=password,user_role=1)
db.session.add(u2)
db.session.commit()
u3=User(user_name='apple',password=password,user_role=2)
db.session.add(u3)
db.session.commit()
u4=User(user_name='glaxo',password=password,user_role=2)
db.session.add(u4)
db.session.commit()
cart1=Cart(total_amount=20, user_id=1)
db.session.add(cart1)
db.session.commit()
cart2=Cart(total_amount=0, user_id=2)
db.session.add(cart2)
db.session.commit()
p1=Product(price=29190,product_name='ipad',category_id=2,seller_id=3)
db.session.add(p1)
db.session.commit()
p2=Product(price=10,product_name='crocin',category_id=5,seller_id=4)
db.session.add(p2)
db.session.commit()
cp1=CartProduct(cart_id=1,product_id=2,quantity=2)
db.session.add(cp1)
db.session.commit()
print('database successfully initialized')
except Exception:
db.session.rollback()
print('error in adding data to db')
from flask import (
Flask,
request,
jsonify,
make_response,
session,
app,
Response,
url_for,
)
from ecommerceapp import app, db
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_migrate import Migrate, MigrateCommand
from datetime import datetime, timedelta
import os
from sqlalchemy.orm import sessionmaker
from .models import *
from sqlalchemy.orm import joinedload
import base64
import json
import jwt
from functools import wraps
from flask import abort, current_app, request
"""
NOTE:
Use jsonify function to return the outputs and status code
Example:
with output
return jsonify(output),2000
only status code
return '',404
"""
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
print(request.headers)
if "x-access-token" in request.headers:
token = request.headers["x-access-token"]
print(token)
if not token:
return jsonify({"Message": " Token is missing"}), 401
try:
print("yes")
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms="HS256")
print(data)
current_user = User.query.filter_by(user_id=data["user_id"]).first()
print(current_user.user_id)
print(current_user.user_role)
except:
return jsonify({"Message": "Token is invalid"}), 401
return f(current_user, *args, **kwargs)
return decorated
def role_required(role):
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
user_id = kwargs.pop("user_id", None)
print(user_id)
user_role = User.objects.get(user_id=user_id)
print(user_role)
roleName = user_role.user_role.role_name
print(roleName)
if roleName != role:
abort(403)
return fn(*args, **kwargs)
return decorated_view
return wrapper
@app.route("/")
def index():
return "Index Page"
@app.route("/api/public/login", methods=["GET"])
def login():
uname = request.headers["Authorization"][6:]
print(uname)
pass1 = str(base64.b64decode(uname))[2:-1]
username = pass1.split(":")[0]
password = pass1.split(":")[1]
user = User.query.filter_by(user_name=username).first()
if user and check_password_hash(user.password, password):
token = jwt.encode(
{"user_id": user.user_id, "exp": datetime.utcnow() + timedelta(hours=5)},
app.config["SECRET_KEY"],
algorithm="HS256",
)
return jsonify({"token": token}), 200
else:
return jsonify({"token": "0"}), 401
@app.route("/api/public/product/search", methods=["GET"])
def search_products():
keyword = request.args.get("keyword")
products = (
db.session.query(Product)
.join(Category)
.filter(Product.product_name.contains(keyword))
.all()
)
if not products:
return jsonify("Null"), 400
response = []
for product in products:
category = {
"category_id": product.category.category_id,
"category_name": product.category.category_name,
}
product_data = {
"category": category,
"product_id": product.product_id,
"product_name": product.product_name,
"price": product.price,
"seller_id": product.seller_id,
}
response.append(product_data)
return jsonify(response), 200
@app.route("/api/auth/seller/product", methods=["GET"])
@token_required
def get_seller_products(current_user):
if current_user.user_role != 2:
return {
"message": "Unauthorized Endpoint",
}, 403
products = Product.query.filter_by(seller_id=current_user.user_id).all()
if not products:
return {"message": "Product not found"}, 403
response1 = []
for product in products:
response1.append(
{
"category": {
"category_id": product.category.category_id,
"category_name": product.category.category_name,
},
"price": product.price,
"product_id": product.product_id,
"product_name": product.product_name,
"seller_id": product.seller_id,
}
)
print(jsonify(response1))
return jsonify(response1), 200
@app.route("/api/auth/seller/product/<int:product_id>", methods=["GET"])
@token_required
def get_product(current_user, product_id):
if current_user.user_role != 2:
return {
"message": "Unauthorized Endpoint",
}, 403
product = Product.query.filter_by(
product_id=product_id, seller_id=current_user.user_id
).first()
if not product:
return jsonify({"error": "Product not found"}), 404
response = [
{
"category": {
"category_id": product.category.category_id,
"category_name": product.category.category_name,
},
"price": product.price,
"product_id": product.product_id,
"product_name": product.product_name,
"seller_id": product.seller_id,
}
]
return jsonify(response), 200
@app.route("/api/auth/seller/product", methods=["POST"])
@token_required
def add_product(current_user):
if current_user.user_role != 2:
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
existing_product = Product.query.filter_by(product_id=product_id).first()
if existing_product:
return jsonify({"message": "Product ID already exists"}), 409
product = Product(
product_id=product_id,
product_name=data.get("product_name"),
price=data.get("price"),
seller_id=current_user.user_id,
category_id=data.get("category_id"),
)
db.session.add(product)
db.session.commit()
return jsonify(product.product_id), 201
@app.route("/api/auth/seller/product", methods=["PUT"])
@token_required
def update_product(current_user):
if current_user.user_role != 2:
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
price = data.get("price")
product = Product.query.filter_by(
product_id=product_id, seller_id=current_user.user_id
).first()
if not product:
return jsonify({"message": "Product not found or not owned by the seller"}), 404
if product.price != price:
cartProducts = CartProduct.query.filter_by(product_id=product_id)
for cp in cartProducts:
cp.cart.total_amount += cp.quantity * (price - product.price)
product.price = price
db.session.commit()
return jsonify({"message": "Product price updated successfully"}), 200
@app.route("/api/auth/seller/product/<int:prodid>", methods=["DELETE"])
@token_required
def delete_product(current_user, prodid):
if current_user.user_role != 2:
return jsonify({"message": "Forbidden"}), 403
product = Product.query.filter_by(
product_id=prodid, seller_id=current_user.user_id
).first()
if not product:
return jsonify({"message": "Product not found or not owned by the seller"}), 404
cartProducts = CartProduct.query.filter_by(product_id=prodid)
for cp in cartProducts:
cp.cart.total_amount -= cp.quantity * product.price
db.session.delete(product)
db.session.commit()
return jsonify({"message": "Product deleted successfully"}), 200
@app.route("/api/auth/consumer/cart", methods=["GET"])
@token_required
def get_consumer_cart(current_user):
if current_user.user_role != 1:
return jsonify({"message": "Forbidden"}), 403
cart_data = (
Cart.query.join(CartProduct, Cart.cart_id == CartProduct.cart_id)
.join(Product, CartProduct.product_id == Product.product_id)
.join(Category, Product.category_id == Category.category_id)
.filter(Cart.user_id == current_user.user_id)
.with_entities(
CartProduct.cp_id,
Cart.cart_id,
Cart.total_amount,
Product.product_id,
Product.price,
Product.product_name,
Category.category_id,
Category.category_name,
)
.all()
)
cart_items = []
for cart_product in cart_data:
cart_item = {
"cartproducts": {
"product": {
"product_id": cart_product.product_id,
"price": cart_product.price,
"product_name": cart_product.product_name,
"category": {
"category_name": cart_product.category_name,
"category_id": cart_product.category_id,
},
},
"cp_id": cart_product.cp_id,
},
"cart_id": cart_product.cart_id,
"total_amount": cart_product.total_amount,
}
cart_items.append(cart_item)
return jsonify(cart_items), 200
@app.route("/api/auth/consumer/cart", methods=["POST"])
@token_required
def add_to_consumer_cart(current_user):
if current_user.user_role != 1:
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
quantity = data.get("quantity")
existing_cart_product = (
CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
.filter(
Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
)
.first()
)
if existing_cart_product:
return jsonify({"message": "Product already in cart"}), 409
product = Product.query.filter_by(product_id=product_id).first()
if not product:
return jsonify({"message": "Product not found"}), 404
total_amount = current_user.cart.total_amount + (product.price * quantity)
new_cart_product = CartProduct(
cart_id=current_user.cart.cart_id, product_id=product_id, quantity=quantity
)
db.session.add(new_cart_product)
db.session.commit()
current_user.cart.total_amount = total_amount
db.session.commit()
return jsonify(total_amount), 200
@app.route("/api/auth/consumer/cart", methods=["PUT"])
@token_required
def update_consumer_cart(current_user):
if current_user.user_role != 1:
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
quantity = data.get("quantity")
cart_product = (
CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
.filter(
Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
)
.first()
)
if not cart_product:
return jsonify({"message": "Product not found in cart"}), 404
product = Product.query.filter_by(product_id=product_id).first()
if not product:
return jsonify({"message": "Product not found"}), 404
total_amount = current_user.cart.total_amount + (
quantity - cart_product.quantity
) * (product.price)
cart_product.quantity = quantity
current_user.cart.total_amount = total_amount
db.session.commit()
return jsonify(total_amount), 200
@app.route("/api/auth/consumer/cart", methods=["DELETE"])
@token_required
def remove_from_consumer_cart(current_user):
if current_user.user_role != 1:
return jsonify({"message": "Forbidden"}), 403
data = request.get_json()
product_id = data.get("product_id")
cart_product = (
CartProduct.query.join(Cart, CartProduct.cart_id == Cart.cart_id)
.filter(
Cart.user_id == current_user.user_id, CartProduct.product_id == product_id
)
.first()
)
if not cart_product:
return jsonify({"message": "Product not found in cart"}), 404
product = Product.query.filter_by(product_id=product_id).first()
if not product:
return jsonify({"message": "Product not found"}), 404
total_amount = current_user.cart.total_amount - (
product.price * cart_product.quantity
)
db.session.delete(cart_product)
db.session.commit()
print(total_amount)
current_user.cart.total_amount = total_amount
db.session.commit()
return jsonify(total_amount), 200