1. 简介 FastAPI 是增长最快的 Python Web 框架之一,其突出特性之一就是对异步编程的支持。但”async”在实践中到底意味着什么?什么时候应该使用它,什么时候应该避免使用它?
在这篇文章中,我们将分解 FastAPI 中异步编程的要点。无论你是 async/await 的新手,还是想知道它如何影响你的 API 性能,这篇文章将帮助你理解这些概念,并编写更好、更快的 Web 应用程序。
2. 什么是异步编程? 在传统的(同步)Python 代码中,每个操作都是按顺序发生的。如果你的代码需要等待某些东西——比如文件加载或网络响应——它会阻塞整个线程,直到该操作完成。FastAPI 异步请求流程
异步编程改变了这一点。异步代码不是等待,而是在等待时暂停 执行,同时让其他任务运行。这对于 I/O 密集型任务特别有用,例如:
发起 HTTP 请求
读取/写入文件
与数据库通信
使用 Python 的 async def 和 await 语法,你可以定义协程,高效地并发处理许多 I/O 操作——而无需创建新线程或进程。
3. Python 中的异步与同步:快速入门 在深入了解 FastAPI 的异步功能之前,理解 Python 中同步和异步代码的区别至关重要。
在同步代码中,所有操作都是按顺序运行的。如果一个函数需要时间(例如,获取网页),整个程序会暂停,直到该任务完成:
1 2 3 4 5 6 7 8 import requestsdef get_data (): response = requests.get("https://httpbin.org/delay/3" ) return response.json() data = get_data()
相比之下,异步代码使用 async def 定义协程,使用 await 暂停协程直到结果准备好——而不会阻塞其他任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import httpximport asynciofrom datetime import datetimeasync def get_data (): print (f"[{datetime.now()} ] get_data start" ) async with httpx.AsyncClient() as client: response = await client.get("https://httpbin.org/delay/3" ) print (f"[{datetime.now()} ] get_data end" ) return response.json()async def quick_task (): print (f"[{datetime.now()} ] quick_task start" ) await asyncio.sleep(1 ) print (f"[{datetime.now()} ] quick_task end" )async def main (): task1 = asyncio.create_task(get_data()) task2 = asyncio.create_task(quick_task()) await asyncio.gather(task1, task2)if __name__ == "__main__" : asyncio.run(main())
所以,你可以看到 get_data 不会阻塞 quick_task 函数。
主要区别:
特性
同步 (def)
异步 (async def)
阻塞性
是
否(使用 await 时)
并发运行
否
是(使用事件循环)
适用于
CPU 密集型任务
I/O 密集型任务
注意:仅仅编写 async def 并不会让你的函数并行运行——你仍然需要在其中 await 异步调用。
4. 为什么异步在 FastAPI 中很重要 FastAPI 构建在 Starlette 之上,这是一个 ASGI(异步服务器网关接口)框架。这使 FastAPI 能够原生支持使用 Python 的 async/await 语法的异步端点。
这意味着 FastAPI 可以处理许多 I/O 密集型操作——比如从数据库读取、调用外部 API 或流式传输数据——而不会阻塞主线程 。这在高负载下带来更高的性能,特别是在实时和高并发环境中。
以下是 FastAPI 中异步路由的样子:
1 2 3 4 5 6 7 8 9 10 from fastapi import FastAPIimport httpx app = FastAPI()@app.get("/quote" ) async def fetch_quote (): async with httpx.AsyncClient() as client: response = await client.get("https://httpbin.org/delay/3" ) return response.json()
因为这个端点是异步的,FastAPI 可以在等待 API 响应的同时继续处理其他请求——从而提高吞吐量和响应性。
🧠 需要了解 :你仍然可以使用普通的 def 定义同步端点,FastAPI 会在线程池中运行它们——但异步函数对于 I/O 密集型操作更高效。
5. 在 FastAPI 中定义异步端点 FastAPI 使定义异步端点变得非常简单。你只需要使用 async def 声明路由处理器,并在其中使用 await 处理任何 I/O 密集型操作。
以下是一个最小示例:
1 2 3 4 5 6 7 from fastapi import FastAPI app = FastAPI()@app.get("/ping" ) async def ping (): return {"message" : "pong" }
在这个例子中,/ping 路由使用 async def 定义,但由于它不执行任何真正的 I/O 操作,所以它像普通函数一样运行。只有当你使用 await 与异步兼容的库时,才能获得性能优势。
让我们比较同一个端点的异步和同步版本异步版本(推荐用于 I/O 密集型任务):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import httpximport requests@app.get("/async-quote" ) async def async_quote (): async with httpx.AsyncClient() as client: response = await client.get("https://httpbin.org/delay/3" ) return response.json()@app.get("/sync-quote" ) def sync_quote (): response = requests.get("https://httpbin.org/delay/3" ) return response.json()
🔥 FastAPI 同时支持 def 和 async def 端点。然而,只有 async def 端点才能充分利用 FastAPI 的非阻塞 ASGI 堆栈来实现高并发。 在内部,FastAPI 会在线程池中运行基于 def 的函数(通过 run_in_threadpool),这会增加一些开销,并且在处理数千个并发连接时扩展性不佳。
6. await 实战:I/O 密集型示例(使用异步数据库) 在 FastAPI 中,await 最常见和最强大的用途之一是处理 I/O 密集型操作,如查询数据库。在同步代码中,此类操作会阻塞主线程。但使用异步驱动程序,FastAPI 可以并发处理许多请求,即使有些请求正在等待数据库响应。
以下是一个使用 FastAPI 与 SQLModel 以及由 SQLAlchemy 1.4+ 和 asyncpg 驱动的异步引擎的实用示例。
本文的代码位于名为 04_fastapi_async 的 GitHub 仓库中。 https://github.com/gzthss/fastapi_tutorial.git
🔧 设置:安装依赖 1 pip install sqlmodel[asyncio] asyncpg
让我们通过一个使用 SQLModel (来自 FastAPI 创建者的异步兼容 ORM)和 Postgresql 的实用示例来了解。
1 2 3 4 5 6 7 8 9 10 from sqlmodel import SQLModel, Fieldfrom typing import Optional class Item (SQLModel, table=True ): id : Optional [int ] = Field(default=None , primary_key=True ) name: str description: Optional [str ] = None
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from sqlmodel import SQLModelfrom sqlmodel.ext.asyncio.session import AsyncSessionfrom sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker database_url = "postgresql+asyncpg://postgres:admin123@192.168.1.11:5432/postgres" engine = create_async_engine(database_url, echo=True ) async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False )async def get_session (): async with async_session() as session: yield sessionasync def init_db (): async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from fastapi import FastAPI, HTTPException, Dependsfrom sqlmodel import selectimport uvicornfrom database import init_db, engine, get_sessionfrom models import Itemfrom sqlmodel.ext.asyncio.session import AsyncSession app = FastAPI()@app.on_event("startup" ) async def startup (): await init_db()@app.get("/items/{item_id}" ) async def read_item (item_id: int , session: AsyncSession = Depends(get_session ) ): query = select(Item).where(Item.id == item_id) result = await session.exec (query) item = result.one_or_none() if item is None : raise HTTPException(status_code=404 , detail="Item not found" ) return itemif __name__ == "__main__" : uvicorn.run(app, host="127.0.0.1" , port=8000 )
下图显示了 API 调用的结果。
希望这篇文章能帮助你更好地理解 FastAPI 中的异步编程,并在实际项目中提升你的 API 性能。如果你觉得有帮助,欢迎与他人分享!