3.使用数据库示例
3.1 使用SQLAlchemy orm框架
FastAPI 官方示例代码使用的是 SQLAlchemy,异步场景下同样可以使用它
3.1.1 准备工作
安装 SQLAlchemy 以及 MySQL 驱动 PyMySQL(下文的连接串使用 mysql+pymysql)
pip install sqlalchemy pymysql
创建数据库 fastapi_db
create database fastapi_db;
3.1.2 配置数据库连接
创建文件database.py
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# Database connection URL, in the form
#   dialect+driver://user:password@host/database?params
# The DATABASE_URL environment variable, when set, takes precedence so that
# real credentials do not have to live in source control; the literal below
# is the local-development fallback.
SQLALCHEMY_DATABASE_URI = os.environ.get(
    "DATABASE_URL",
    "mysql+pymysql://root:123456@localhost/fastapi_db?charset=utf8mb4",
)

# Database engine bound to the URL above.
engine = create_engine(SQLALCHEMY_DATABASE_URI)

# Session factory: sessions neither autocommit nor autoflush and use `engine`.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
3.1.3 创建数据模型
创建文件models.py
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
# ORM model for the "users" table.
class User(Base):
    """User account row; related to the ``Item`` model through ``items``."""

    __tablename__ = "users"  # table name

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True)
    hashed_password = Column(String(255))
    is_active = Column(Boolean, default=True)

    # Relationship to the Item model; paired with Item.owner.
    items = relationship("Item", back_populates="owner")
# ORM model for the "items" table.
class Item(Base):
    """Item row; ``owner_id`` is a foreign key to ``users.id``."""

    __tablename__ = "items"  # table name

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(255), index=True)
    description = Column(String(255), index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    # Relationship to the User model; paired with User.items.
    owner = relationship("User", back_populates="items")
3.1.4 创建 Pydantic 模型
创建文件schemas.py
from typing import List, Optional

from pydantic import BaseModel
class ItemBase(BaseModel):
    """Fields shared by the item schemas."""

    title: str
    # Declared Optional so the annotation agrees with the None default.
    description: Optional[str] = None
class ItemCreate(ItemBase):
    """Payload for creating an item; adds no fields beyond ``ItemBase``."""
class Item(ItemBase):
    """Item as returned by the API: the base fields plus ``id`` and ``owner_id``."""

    id: int
    owner_id: int

    class Config:
        # Enable pydantic's ORM mode for this schema.
        orm_mode = True
class UserBase(BaseModel):
    """Fields shared by the user schemas."""

    email: str
class UserCreate(UserBase):
    """Payload for creating a user: the base fields plus the raw password."""

    password: str
class User(UserBase):
    """User as returned by the API; carries no password field."""

    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        # Enable pydantic's ORM mode for this schema.
        orm_mode = True
复制
3.1.5 crud 工具
创建文件crud.py
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
    """
    Look up a single user by id.

    :param db: database session
    :param user_id: id of the user to fetch
    :return: result of ``.first()`` on the query filtered by id
    """
    by_id = db.query(models.User).filter(models.User.id == user_id)
    return by_id.first()
def get_user_by_email(db: Session, email: str):
    """
    Look up a single user by email address.

    :param db: database session
    :param email: email address to search for
    :return: result of ``.first()`` on the query filtered by email
    """
    by_email = db.query(models.User).filter(models.User.email == email)
    return by_email.first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """
    Fetch one page of users.

    :param db: database session
    :param skip: offset applied to the query
    :param limit: maximum number of rows requested
    :return: list of users
    """
    page = db.query(models.User).offset(skip).limit(limit)
    return page.all()
def _hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 digest of *password* as text."""
    import hashlib
    import secrets

    iterations = 600000
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations
    )
    # Self-describing format (algorithm$iterations$salt$digest) so a verifier
    # can recompute the digest; the result is 118 characters long.
    return "pbkdf2_sha256${}${}${}".format(iterations, salt.hex(), digest.hex())


def create_user(db: Session, user: schemas.UserCreate):
    """
    Create a user and persist it.

    The plaintext password is never stored; only a salted PBKDF2 digest is
    written to ``hashed_password``.

    :param db: database session
    :param user: user-creation schema carrying ``email`` and ``password``
    :return: the newly created ``models.User``, refreshed after the commit
    """
    db_user = models.User(
        email=user.email,
        hashed_password=_hash_password(user.password),
    )
    db.add(db_user)  # stage the new row in the session
    db.commit()  # write it to the database
    db.refresh(db_user)  # reload the instance from the database
    return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
    """
    Fetch one page of items.

    :param db: database session
    :param skip: offset applied to the query
    :param limit: maximum number of rows requested
    :return: list of items
    """
    page = db.query(models.Item).offset(skip).limit(limit)
    return page.all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    """
    Create an item owned by the given user.

    :param db: database session
    :param item: item-creation schema
    :param user_id: id stored in the new item's ``owner_id``
    :return: the newly created ``models.Item``, refreshed after the commit
    """
    fields = item.dict()
    new_item = models.Item(owner_id=user_id, **fields)
    db.add(new_item)
    db.commit()
    db.refresh(new_item)
    return new_item
3.1.6 修改入口文件 main.py
from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
# Create the tables declared on the models' metadata, using the configured engine.
models.Base.metadata.create_all(bind=engine)
# FastAPI application instance; the route decorators below register handlers on it.
app = FastAPI()
# Dependency
def get_db():
    """Yield a database session and close it once the consumer is done.

    Used by the route handlers through ``Depends(get_db)``.
    """
    # Create the session before entering ``try``: if SessionLocal() itself
    # raises there is nothing to close, and ``finally`` must not touch an
    # unbound ``db`` (which would mask the real error).
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Register a new user; respond with 400 if the email is already taken."""
    # Reject the request when a user with this email already exists.
    if crud.get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")
    # Otherwise create the user and return it.
    return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of users, controlled by ``skip`` and ``limit``."""
    return crud.get_users(db, skip=skip, limit=limit)
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user with the given id; respond with 404 if there is none."""
    found = crud.get_user(db, user_id=user_id)
    if found is not None:
        return found
    # No user matched the id.
    raise HTTPException(status_code=404, detail="User not found")
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    """Create an item owned by ``user_id``; respond with 404 if the user is unknown."""
    # Check the owner first so an unknown id produces a clean 404 (matching
    # read_user) instead of a foreign-key failure when the item is inserted.
    if crud.get_user(db, user_id=user_id) is None:
        raise HTTPException(status_code=404, detail="User not found")
    return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of items, controlled by ``skip`` and ``limit``."""
    return crud.get_items(db, skip=skip, limit=limit)
(pt19) D:\web_python_dev>uvicorn fastapi_mysql.main:app --reload
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [6988] using watchgod
INFO: Started server process [2112]
INFO: Waiting for application startup.
INFO: Application startup complete.
mysql> use fastapi_db
Database changed
mysql> show tables;
+----------------------+
| Tables_in_fastapi_db |
+----------------------+
| items |
| users |
+----------------------+
2 rows in set (0.00 sec)
3.2 其他orm框架
除了 SQLAlchemy 之外,还有 fastapi-async-sqlalchemy、tortoise-orm 等其他 ORM 框架可供选择,具体用法可参考各自的官方文档
评论 (0)