网站运营内容建设方案,网页制作与网站开发,pc网站建设怎么样,在原域名给公司建立网站在当今高度互联的软件生态系统中#xff0c;API#xff08;应用程序编程接口#xff09;扮演着至关重要的角色#xff0c;它们是不同服务和应用之间进行通信的桥梁。然而#xff0c;随着API的广泛应用#xff0c;确保数据和用户身份的安全变得至关重要。本文将深入探讨AP…在当今高度互联的软件生态系统中API应用程序编程接口扮演着至关重要的角色它们是不同服务和应用之间进行通信的桥梁。然而随着API的广泛应用确保数据和用户身份的安全变得至关重要。本文将深入探讨API认证的核心概念特别是如何在FastAPI中利用JWTJSON Web Tokens结合SQLModel实现安全、高效的用户认证。
1. API 认证的重要性及认证类型
API认证是验证请求来源身份的过程它确保只有经过授权的用户或系统才能访问受保护的资源并执行特定操作。设想一个社交媒体应用用户需要登录才能创建帖子、删除帖子或投票。如果没有适当的认证机制任何人都可能冒充他人或执行未经授权的操作这将导致严重的安全漏洞。
API认证主要有两种主流方式 Session-based Authentication (基于会话的认证) 这种方式在传统的Web应用中非常常见。当用户登录时服务器会创建一个“会话”Session并在其后端服务器或API上存储该会话的信息以跟踪用户是否已登录。服务器通常会向客户端如浏览器发送一个包含Session ID的Cookie。客户端在后续的每个请求中都会携带这个Cookie服务器通过查找Session ID来验证用户身份。优点 相对简单服务器端可以轻松管理用户状态如登录/登出。缺点 有状态性 服务器需要维护每个用户的会话状态这在分布式系统或微服务架构中会增加复杂性因为会话数据可能需要跨多个服务器共享。可扩展性 增加了后端服务器的负担难以水平扩展。CSRF攻击 容易受到跨站请求伪造CSRF攻击。 Token-based Authentication (基于令牌的认证) Token-based认证特别是JWT是API认证的现代趋势。与Session-based认证不同它是一种“无状态”Stateless认证机制。这意味着后端API服务器本身不存储任何关于用户登录状态的信息。相反当用户成功登录后API会生成一个令牌Token并将其发送给客户端。客户端负责存储这个令牌通常在本地存储或Cookie中并在后续的每个需要认证的请求中将这个令牌包含在请求头如 Authorization 头中发送给API。API收到请求后会验证令牌的有效性如果有效则处理请求。优点 无状态性 API服务器无需维护会话状态大大提高了可扩展性和性能。跨域 更容易实现跨域Cross-Origin认证。安全性 通过数字签名保证令牌的完整性防止篡改。移动友好 更适合移动应用程序。
2. JWTJSON Web Tokens机制深度解析
JWT不仅仅是一个随机字符串它是一个紧凑且自包含的令牌用于在各方之间安全地传输信息。它由三部分组成用点号.分隔Header头部、Payload载荷和Signature签名。
2.1 JWT 的结构 Header (头部) Header通常包含两部分信息令牌的类型typ通常是JWT以及所使用的签名算法alg例如HS256或RS256。 示例 {alg: HS256,typ: JWT
}这部分信息经过Base64URL编码后构成JWT的第一部分。 Payload (载荷) Payload包含令牌的“声明”Claims即关于实体通常是用户和其他数据的陈述。这些声明可以是注册的如iss发行者、exp过期时间也可以是公共的或私有的。 重要提示JWT的Payload是经过Base64URL编码的但它不是加密的 这意味着任何获取到JWT的人都可以解码并查看其中的内容。 因此Payload中绝不应包含任何敏感信息例如用户的原始密码或银行卡号。通常Payload会包含用户IDuser_id、用户角色role等非敏感信息这些信息在API处理请求时可能需要用到。 示例 {user_id: 123,role: user,exp: 1678886400 // 过期时间戳 (示例)
}这部分信息经过Base64URL编码后构成JWT的第二部分。 Signature (签名) Signature是JWT最重要的部分它用于验证令牌的完整性确保令牌在传输过程中未被篡改。签名是通过将Header和Payload的Base64URL编码字符串与一个只有服务器知道的“秘密密钥”Secret Key一起使用Header中指定的算法如HS256进行哈希或HMAC计算而生成的。 签名计算公式大致如下 Signature HASH_Algorithm(Base64URL(Header) . Base64URL(Payload), Secret_Key) 这个签名构成JWT的第三部分。客户端不需要知道Secret Key因为它只用于服务器验证令牌的有效性。
2.2 JWT 的工作原理 (无状态认证)
JWT实现无状态认证的流程如下
用户登录 客户端向API的/login端点发送用户的凭据例如电子邮件和密码。验证凭据 API接收凭据后会查询数据库验证用户的身份。如果凭据正确API会使用一个秘密密钥生成一个JWT。返回令牌 API将生成的JWT以及令牌类型如Bearer作为响应的一部分返回给客户端。客户端存储令牌 客户端接收到JWT后将其存储在本地例如浏览器端的localStorage或SessionStorage或移动应用的内存中。访问受保护资源 在后续需要认证的请求中客户端会将此JWT放置在HTTP请求的Authorization头中格式通常为 Bearer token发送给API。API验证令牌 API接收到请求后会从请求头中提取JWT。它会使用相同的秘密密钥和签名算法重新计算JWT的签名并与接收到的签名进行比较。如果签名匹配并且令牌未过期API就认为该令牌是有效的并提取Payload中的信息如user_id来识别用户身份。处理请求 验证通过后API会处理请求并返回相应数据。
由于JWT包含了验证所需的所有信息通过签名API服务器无需在自己的数据库中存储任何会话信息从而实现了真正的无状态。
2.3 JWT 的安全性考虑
尽管JWT提供了强大的认证能力但理解其安全特性至关重要
JWT 未加密 如前所述JWT的Payload是可读的即使经过Base64URL编码也并非加密。因此永远不要在Payload中放置敏感信息如用户密码、敏感个人数据。签名保证完整性 签名机制的主要目的是确保令牌的完整性Integrity而非机密性。它能防止令牌在传输过程中被篡改。如果攻击者试图更改Payload中的任何数据例如将用户ID从123改为456由于他们不知道秘密密钥将无法生成一个有效的签名API在验证时会立即发现令牌被篡改并拒绝请求。秘密密钥的保密性 秘密密钥是JWT认证安全的核心。它必须严格保密只存储在服务器端绝不能暴露给客户端或存储在版本控制系统中。一旦秘密密钥泄露攻击者就可以伪造有效的JWT从而完全绕过认证机制。过期时间 JWT应设置合理的过期时间exp声明。这可以限制令牌的有效性即使令牌被盗其有效时间也有限。一旦令牌过期API将拒绝该令牌用户需要重新登录获取新的令牌。
3. 密码哈希使用 passlib 和 bcrypt
在用户注册时绝不能将用户的原始密码以纯文本形式存储在数据库中。FastAPI生态推荐使用passlib库和bcrypt哈希算法来实现密码的安全存储。 安装依赖 pip install passlib[bcrypt]
# 或者分开安装
# pip install passlib bcrypt(通常 pip install fastapi[all] 会包含 bcrypt) 配置 CryptContext 在您的 utils.py (或其他工具模块) 中 # app/utils.py
from passlib.context import CryptContext# 定义密码上下文指定使用bcrypt算法
pwd_context CryptContext(schemes[bcrypt], deprecatedauto)def hash_password(password: str): # 建议函数名更明确return pwd_context.hash(password)def verify_password(plain_password: str, hashed_password: str):return pwd_context.verify(plain_password, hashed_password)hash_password() 用于生成密码哈希verify_password() 用于验证明文密码与哈希是否匹配这是一个单向过程。
4. FastAPI与SQLModel用户认证实战
现在我们将这些概念整合到FastAPI应用中使用SQLModel作为ORM来实现用户注册和登录。
4.1 用户注册创建用户记录与哈希密码
用户注册流程接收用户凭据 - 哈希密码 - 存储到数据库。 定义Pydantic API Schema (schemas.py): 用于验证请求体和塑造响应。 # app/schemas.py
from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import Optionalclass UserCreate(BaseModel):email: EmailStrpassword: strclass UserOut(BaseModel): # 用于API响应不包含密码id: intemail: EmailStrcreated_at: datetime # SQLModel会自动处理datetime对象class Config:from_attributes True # 允许从ORM/SQLModel对象属性填充定义SQLModel数据库模型 (models.py): # app/models.py
from typing import Optional
from sqlmodel import SQLModel, Field
from datetime import datetime
from sqlalchemy import text # 用于服务器端默认值class User(SQLModel, tableTrue):__tablename__ users # 显式指定表名好习惯id: Optional[int] Field(defaultNone, primary_keyTrue)email: str Field(uniqueTrue, indexTrue, nullableFalse) # 邮箱唯一且建立索引password: str Field(nullableFalse) # 存储哈希后的密码created_at: datetime Field(default_factorydatetime.utcnow, # Pydantic层面默认值sa_column_kwargs{server_default: text(now())}, # 数据库层面默认值nullableFalse)【注】: default_factorydatetime.utcnow 是Pydantic层面的默认值而sa_column_kwargs{server_default: text(now())}是数据库服务器端的默认值。两者可以并存通常数据库默认值更可靠。 实现用户注册路由 (routers/users.py): # app/routers/users.py
from fastapi import APIRouter, Depends, status, HTTPException
from sqlmodel import Session # 使用SQLModel的Session# 假设 get_db 依赖已在 database.py
from ..database import get_db
from .. import models, schemas, utils # utils 包含密码哈希函数router APIRouter(prefix/users,tags[Users]
)router.post(/, status_codestatus.HTTP_201_CREATED, response_modelschemas.UserOut)
def register_user(user_payload: schemas.UserCreate, db: Session Depends(get_db)):# 1. 检查用户是否已存在 (可选但推荐)# existing_user db.exec(select(models.User).where(models.User.email user_payload.email)).first()# if existing_user:# raise HTTPException(status_codestatus.HTTP_400_BAD_REQUEST, detailEmail already registered)# 2. 密码哈希hashed_pwd utils.hash_password(user_payload.password) # 使用工具函数# 3. 创建新的用户记录 (SQLModel实例)user_payload.password hashed_pwddb_user models.User(**user_data_for_db)db.add(db_user)db.commit()db.refresh(db_user) # 刷新以获取数据库生成的id和created_atreturn db_user # FastAPI会用schemas.UserOut进行响应塑形此路由在创建用户前哈希密码。若Pydantic验证失败如邮件格式错误会自动返回422。成功后schemas.UserOut确保密码不被返回。
4.2 用户登录验证凭据并生成JWT
用户登录流程验证凭据 - 凭据正确 - 生成JWT - 返回JWT。 定义Pydantic API Schema (schemas.py): # app/schemas.py (继续添加)
# ... (UserCreate, UserOut 已定义) ...class Token(BaseModel): # 用于包装JWT响应access_token: strtoken_type: strclass TokenData(BaseModel): # 用于JWT Payload内部结构定义 (可选但有助于类型安全)id: Optional[int] None # JWT Payload中的用户ID【注】TokenData中的id类型与models.User.id (通常是int)保持一致更佳。 配置JWT生成与验证逻辑 (oauth2.py): # app/oauth2.py
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone # 确保导入timezone
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlmodel import Session # 使用SQLModel的Sessionfrom . import schemas, models # schemas.TokenData, models.User
from .database import get_db # 假设get_db返回SQLModel Session
from .core.config import settings # 假设配置在 core.configoauth2_scheme OAuth2PasswordBearer(tokenUrllogin) # login 是登录端点的相对路径SECRET_KEY settings.SECRET_KEY # 从配置中获取
ALGORITHM settings.ALGORITHM
ACCESS_TOKEN_EXPIRE_MINUTES settings.ACCESS_TOKEN_EXPIRE_MINUTESdef create_access_token(data: dict) - str:to_encode data.copy()# 使用 timezone.utc 确保是时区感知的UTC时间expire datetime.now(timezone.utc) timedelta(minutesACCESS_TOKEN_EXPIRE_MINUTES)to_encode.update({exp: expire.timestamp()}) # JWT标准通常用Unix时间戳# 或 to_encode.update({exp: expire}) # python-jose也能处理datetime对象encoded_jwt jwt.encode(to_encode, SECRET_KEY, algorithmALGORITHM)return encoded_jwtdef verify_access_token(token: str, credentials_exception: HTTPException) - schemas.TokenData:try:payload jwt.decode(token, SECRET_KEY, algorithms[ALGORITHM])user_id: Optional[int] payload.get(user_id) # 假设payload中存储的是user_idif user_id is None:raise credentials_exception# 将str类型的id如果payload存的是str转为intreturn schemas.TokenData(idint(user_id))except JWTError: # 包括过期、签名错误等raise credentials_exceptionexcept ValueError: # 处理int转换失败raise credentials_exceptiondef get_current_active_user(token: str Depends(oauth2_scheme), db: Session Depends(get_db)
) - models.User: # 返回SQLModel的User实例credentials_exception HTTPException(status_codestatus.HTTP_401_UNAUTHORIZED,detailCould not validate credentials,headers{WWW-Authenticate: Bearer},)token_data verify_access_token(token, credentials_exception)# 使用SQLModel的 session.get() 通过主键获取用户user db.get(models.User, token_data.id)if user is None:raise credentials_exception# 你可以在这里添加用户是否激活的检查 (if not user.is_active: ...)return user实现用户登录路由 (routers/auth.py): # app/routers/auth.py
from fastapi import APIRouter, Depends, status, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import Session, select # 导入selectfrom .. import schemas, models, utils, oauth2
from ..database import get_dbrouter APIRouter(tags[Authentication])router.post(/login, response_modelschemas.Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm Depends(), db: Session Depends(get_db)
):# OAuth2PasswordRequestForm 将email存储在username字段statement select(models.User).where(models.User.email form_data.username)user db.exec(statement).first()if not user or not utils.verify_password(form_data.password, user.password):raise HTTPException(status_codestatus.HTTP_401_UNAUTHORIZED, # 用401更合适detailIncorrect email or password,headers{WWW-Authenticate: Bearer},)access_token oauth2.create_access_token(data{user_id: user.id})return {access_token: access_token, token_type: bearer}在 main.py 中引入路由: # app/main.py
from fastapi import FastAPI
# 假设你的路由组织在 app.routers 包下
from .routers import posts_router, users_router, auth_router # 使用更明确的导入名
from .database import create_db_and_tables # 假设SQLModel表创建函数app FastAPI()app.on_event(startup)
def on_startup():create_db_and_tables() # 创建SQLModel定义的表app.include_router(posts_router.router) # 假设路由实例名为 router
app.include_router(users_router.router)
app.include_router(auth_router.router)app.get(/)
def read_root():return {message: Welcome to my API!}4.3 保护 API 端点
任何需要用户登录才能访问的API端点只需在路径操作函数中添加oauth2.get_current_active_user作为依赖项。
# app/routers/posts_router.py (假设帖子路由文件)
from fastapi import APIRouter, Depends, status, HTTPException, Response
from sqlmodel import Session, select
from typing import Listfrom .. import models, schemas, oauth2
from ..database import get_dbrouter APIRouter(prefix/posts,tags[Posts]
)router.post(/, status_codestatus.HTTP_201_CREATED, response_modelschemas.Post) # 假设schemas.Post是响应模型
def create_new_post(post_payload: schemas.PostCreate,db: Session Depends(get_db),current_user: models.User Depends(oauth2.get_current_active_user) # 注入当前用户
):# current_user 现在是经过认证的 models.User SQLModel实例# 你需要确保 models.Post 有 owner_id 字段来关联用户post_data_for_db post_payload.model_dump()# 假设 models.Post 有 owner_id 字段# post_data_for_db[owner_id] current_user.idnew_post models.Post(**post_data_for_db)db.add(new_post)db.commit()db.refresh(new_post)return new_postrouter.delete(/{post_id}, status_codestatus.HTTP_204_NO_CONTENT)
def delete_existing_post(post_id: int,db: Session Depends(get_db),current_user: models.User Depends(oauth2.get_current_active_user)
):db_post db.get(models.Post, post_id) # 使用SQLModel的get方法if not db_post:raise HTTPException(status_codestatus.HTTP_404_NOT_FOUND,detailfPost with id: {post_id} does not exist)# 权限检查确保当前用户是帖子的所有者# if db_post.owner_id ! current_user.id: # 假设 Post 模型有 owner_id# raise HTTPException(status_codestatus.HTTP_403_FORBIDDEN,# detailNot authorized to perform requested action)db.delete(db_post)db.commit()return Response(status_codestatus.HTTP_204_NO_CONTENT)总结
FastAPI结合SQLModel为构建安全的API提供了一套现代且高效的工具。通过深入理解JWT的无状态工作原理、其三部分结构Header、Payload、Signature以及其并非加密而是签名以保证数据完整性的特性开发者可以有效地利用它实现API认证。结合passlib和bcrypt进行密码哈希存储以及FastAPI强大的依赖注入系统和SQLModel简洁的ORM操作我们可以构建出安全、高效、可扩展的用户认证系统。这对于任何需要确保用户数据和操作安全的API项目都至关重要。