在构建任何面向企业的SaaS产品时,将简单的单用户技术演示转化为一个健壮、安全、可扩展的多租户服务,是架构设计的核心挑战。一个典型的检索增强生成(RAG)应用,若直接暴露同步API接口进行文档处理和向量化,在生产环境中会迅速遭遇瓶颈:API请求超时、资源争抢、以及最关键的——数据隔离缺失。本文记录了将一个RAG原型重构为生产级异步多租户架构的决策过程与核心实现,该架构围绕四大支柱构建:用于安全认证与租户身份传递的 OAuth 2.0,用于任务解耦与削峰填谷的 AWS SQS,用于租户数据物理隔离的 Milvus Partitions,以及粘合所有组件的 LangChain。
架构决策:同步阻塞 vs. 异步解耦
最初的RAG实现是一个简单的FastAPI同步接口:用户通过REST API上传文档,服务器在同一个HTTP请求生命周期内完成文档加载、切分、向量化,并将其写入Milvus。
方案A:同步架构
# 一个典型的、不适用于生产的同步RAG处理方式
@app.post("/upload/sync")
def upload_document_sync(file: UploadFile, db: Session = Depends(get_db)):
# 1. 保存临时文件
content = file.file.read()
# 2. LangChain处理(耗时操作)
loader = UnstructuredFileLoader("temp_file.tmp")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
docs = text_splitter.split_documents(documents)
# 3. 向量化并存入Milvus(网络IO与CPU密集)
embeddings = OpenAIEmbeddings()
vector_store = Milvus.from_documents(
docs,
embeddings,
connection_args={"host": "localhost", "port": "19530"},
collection_name="articles"
)
return {"status": "success", "message": "Document processed."}
这个方案的弊端在生产环境中是致命的:
- 糟糕的用户体验: 上传一个几十兆的PDF可能需要数分钟的处理时间,HTTP连接会因超时而中断。
- 资源耗尽: 并发请求会同时进行CPU密集的向量化计算,导致服务器资源迅速耗尽,影响其他API的响应能力。
- 无状态与扩展性差: 如果服务实例在处理过程中崩溃,任务就会丢失。横向扩展实例也无法解决长耗时请求的根本问题。
- 无租户隔离: 所有向量数据都存放在同一个
articles集合中,这是多租户场景下绝对无法接受的数据安全漏洞。
方案B:异步解耦架构
为了解决上述所有问题,我们必须转向异步架构。核心思路是将耗时的处理流程与API请求分离。API的角色转变为一个轻量级的任务接收器,它仅负责验证请求、接收数据,然后将处理任务委托给一个可靠的后台系统。
graph TD
subgraph "用户侧"
Client[客户端]
end
subgraph "API 层 (AWS Fargate / EC2)"
API_GW[API Gateway]
Auth[OAuth 2.0 验证中间件]
FastAPI[FastAPI 应用]
end
subgraph "消息队列 (AWS SQS)"
SQS[SQS Queue: IngestionQueue]
end
subgraph "后台处理层 (Worker Fleet)"
Worker[Ingestion Worker]
end
subgraph "数据与模型层"
Milvus[Milvus Vector DB]
EmbeddingModel[Embedding Service]
S3[S3 Bucket: raw-documents]
end
Client -- HTTPS Request (携带JWT) --> API_GW
API_GW -- 转发 --> FastAPI
FastAPI -- 调用 --> Auth
Auth -- 验证JWT, 提取tenant_id --> FastAPI
FastAPI -- 1. 上传文件 --> S3
FastAPI -- 2. 发送消息 (S3_URI, tenant_id) --> SQS
FastAPI -- HTTP 202 Accepted --> Client
Worker -- 轮询拉取消息 --> SQS
Worker -- 1. 从S3下载文件 --> S3
Worker -- 2. 调用LangChain处理 --> EmbeddingModel
Worker -- 3. 写入向量 (指定Partition) --> Milvus
Milvus -- (Partition A for Tenant A) --> Milvus
Milvus -- (Partition B for Tenant B) --> Milvus
这个架构的优势显而易见:
- 高可用性: API服务变得轻量且无状态,易于扩展。SQS作为托管消息队列,保证了任务的持久化,即使worker全部宕机,任务也不会丢失。
- 弹性伸缩: 可以根据SQS队列的长度动态调整后台Worker的数量,实现成本与性能的平衡。
- 关注点分离: API服务只关心认证和任务分发,Worker只关心数据处理。
技术选型决策:
- AWS SQS: 选择标准队列而非FIFO队列。 ingestion 任务通常可以独立处理,对顺序没有强依赖,标准队列提供了更高的吞吐量。它的“至少一次”传递模型要求我们的worker必须具备幂等性。
- OAuth 2.0 (JWT Bearer Token): 这是API安全的标准。我们将租户标识符(
tenant_id)作为JWT的一个自定义claim,这样API层和Worker层就能安全地识别每个请求和任务的归属。 - Milvus Partitions: Milvus的Partition功能允许在同一个Collection内部创建逻辑隔离区。查询可以被限定在特定的Partition内,实现了高效且物理上隔离的多租户数据存储,避免了为每个租户创建一个Collection的管理噩梦。
核心实现:代码中的架构
我们将通过三个关键部分的代码来展示该架构的实现:安全API端点、异步处理Worker、以及租户隔离的查询。
1. 安全的异步任务提交API (FastAPI)
API端点需要完成三件事:通过OAuth 2.0验证请求并提取租户ID,将上传的文件存入S3,然后将包含文件位置和租户ID的消息发送到SQS。
配置文件 config.py
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# AWS
AWS_REGION: str = "us-east-1"
AWS_ACCESS_KEY_ID: str
AWS_SECRET_ACCESS_KEY: str
S3_BUCKET_NAME: str = "my-rag-raw-documents"
SQS_QUEUE_URL: str
# OAuth 2.0 / JWT
AUTH0_DOMAIN: str
AUTH0_API_AUDIENCE: str
JWT_ALGORITHMS: str = "RS256"
class Config:
env_file = ".env"
settings = Settings()
安全依赖 security.py
import httpx
from fastapi import Request, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
# 这是一个简化的Auth0 JWT验证器,生产中需要更完善的JWKS缓存机制
class Auth0JWTBearer(HTTPBearer):
def __init__(self, domain: str, audience: str, auto_error: bool = True):
super().__init__(auto_error=auto_error)
self.domain = domain
self.audience = audience
self.jwks_client = httpx.get(f"https://{domain}/.well-known/jwks.json").json()
async def __call__(self, request: Request):
credentials = await super().__call__(request)
if not credentials:
raise HTTPException(status_code=401, detail="Not authenticated")
token = credentials.credentials
try:
unverified_header = jwt.get_unverified_header(token)
rsa_key = {}
for key in self.jwks_client["keys"]:
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"],
}
if rsa_key:
payload = jwt.decode(
token,
rsa_key,
algorithms=["RS256"],
audience=self.audience,
issuer=f"https://{self.domain}/",
)
return payload
raise HTTPException(status_code=401, detail="Invalid token signature")
except JWTError as e:
raise HTTPException(status_code=401, detail=f"Token validation failed: {e}")
# 创建一个可依赖注入的实例
auth_scheme = Auth0JWTBearer(domain=settings.AUTH0_DOMAIN, audience=settings.AUTH0_API_AUDIENCE)
# 从payload中提取租户ID的依赖项
def get_tenant_id(payload: dict = Depends(auth_scheme)) -> str:
# 假设租户ID存储在自定义claim 'https://myapp.com/tenant_id'
tenant_id = payload.get("https://myapp.com/tenant_id")
if not tenant_id:
# 在真实项目中,应该记录这个异常,这表示JWT配置有问题
raise HTTPException(status_code=403, detail="Tenant ID not found in token.")
return tenant_id
API端点 main.py
import boto3
import uuid
import json
import logging
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from botocore.exceptions import ClientError
from config import settings
from security import get_tenant_id
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
app = FastAPI()
# 初始化AWS客户端
s3_client = boto3.client(
"s3",
region_name=settings.AWS_REGION,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
)
sqs_client = boto3.client(
"sqs",
region_name=settings.AWS_REGION,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
)
@app.post("/v1/documents/ingest")
async def ingest_document(
tenant_id: str = Depends(get_tenant_id),
file: UploadFile = File(...)
):
"""
接收文档上传,验证权限,并将其推送到SQS进行异步处理
"""
file_extension = file.filename.split('.')[-1]
s3_object_key = f"uploads/{tenant_id}/{uuid.uuid4()}.{file_extension}"
# 1. 上传文件到S3
try:
s3_client.upload_fileobj(file.file, settings.S3_BUCKET_NAME, s3_object_key)
logging.info(f"Tenant '{tenant_id}' uploaded file to s3://{settings.S3_BUCKET_NAME}/{s3_object_key}")
except ClientError as e:
logging.error(f"S3 upload failed for tenant '{tenant_id}': {e}")
raise HTTPException(status_code=500, detail="Failed to upload document.")
# 2. 构建并发送SQS消息
message_body = {
"tenant_id": tenant_id,
"s3_bucket": settings.S3_BUCKET_NAME,
"s3_key": s3_object_key,
"original_filename": file.filename,
}
try:
sqs_client.send_message(
QueueUrl=settings.SQS_QUEUE_URL,
MessageBody=json.dumps(message_body),
MessageGroupId=tenant_id # 如果使用FIFO队列,这个很重要
)
logging.info(f"Ingestion task for tenant '{tenant_id}' sent to SQS.")
except ClientError as e:
logging.error(f"SQS send message failed for tenant '{tenant_id}': {e}")
# 这里的错误处理很关键,可能需要一个补偿事务来删除已上传的S3文件
raise HTTPException(status_code=500, detail="Failed to queue ingestion task.")
return {"status": "pending", "message": "Document ingestion has started."}
这里的关键点是,API几乎不执行任何重计算。它的职责是验证、持久化原始数据(S3)和持久化任务(SQS)。返回HTTP 202 Accepted向客户端表明请求已被接受,但处理尚未完成。
2. 幂等性的后台处理Worker
Worker是一个独立的长时间运行的进程,它不断从SQS拉取消息并执行真正的处理逻辑。
worker.py
import boto3
import json
import logging
import tempfile
import time
from langchain.document_loaders import S3FileLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Milvus
# ... 省略config和logging配置 ...
class IngestionWorker:
def __init__(self, milvus_host, milvus_port, collection_name="enterprise_rag"):
self.sqs_client = boto3.client("sqs", region_name=settings.AWS_REGION)
self.s3_client = boto3.client("s3", region_name=settings.AWS_REGION)
self.queue_url = settings.SQS_QUEUE_URL
# 初始化向量存储相关
self.embeddings = OpenAIEmbeddings()
self.milvus_connection_args = {"host": milvus_host, "port": milvus_port}
self.collection_name = collection_name
# 确保主集合存在
self._ensure_collection_exists()
def _ensure_collection_exists(self):
# 使用pymilvus客户端检查和创建集合
from pymilvus import utility, Collection, CollectionSchema, FieldSchema, DataType
if not utility.has_collection(self.collection_name, using='default'):
fields = [
FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=True, max_length=100),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536) # 维度取决于你的 embedding model
]
schema = CollectionSchema(fields, "Enterprise RAG collection")
Collection(self.collection_name, schema, using='default')
logging.info(f"Collection '{self.collection_name}' created.")
def process_message(self, message):
receipt_handle = message['ReceiptHandle']
try:
body = json.loads(message['Body'])
tenant_id = body['tenant_id']
s3_bucket = body['s3_bucket']
s3_key = body['s3_key']
logging.info(f"Processing message for tenant '{tenant_id}', file s3://{s3_bucket}/{s3_key}")
# 确保租户的 Milvus Partition 存在
from pymilvus import utility
if not utility.has_partition(self.collection_name, tenant_id):
utility.create_partition(self.collection_name, tenant_id)
logging.info(f"Created partition '{tenant_id}' in collection '{self.collection_name}'.")
# 使用LangChain加载文档
loader = S3FileLoader(s3_bucket, s3_key)
documents = loader.load()
# 文本分割
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
docs = text_splitter.split_documents(documents)
# 这是一个确保幂等性的简单思路,生产中可能需要更复杂的去重逻辑
# 例如基于文档内容的哈希值检查
logging.info(f"Split document into {len(docs)} chunks.")
# 向量化并存入指定租户的Partition
Milvus.from_documents(
docs,
self.embeddings,
connection_args=self.milvus_connection_args,
collection_name=self.collection_name,
partition_name=tenant_id, # <-- 这是多租户隔离的核心
)
logging.info(f"Successfully ingested {len(docs)} vectors into partition '{tenant_id}'.")
# 处理成功后,从队列中删除消息
self.sqs_client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle)
logging.info(f"Message for tenant '{tenant_id}' processed and deleted from SQS.")
except Exception as e:
# 这里的错误处理非常重要。如果失败,消息会保留在队列中,
# 并在VisibilityTimeout后重新变为可见,从而实现重试。
# 需要配置死信队列(DLQ)来处理反复失败的消息。
logging.error(f"Failed to process message. Error: {e}. Message will be retried.", exc_info=True)
def run(self):
logging.info("Worker started. Polling for messages...")
while True:
response = self.sqs_client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=5,
WaitTimeSeconds=20
)
messages = response.get('Messages', [])
for message in messages:
self.process_message(message)
if not messages:
# 在没有消息时短暂休眠,避免空轮询消耗CPU
time.sleep(5)
if __name__ == "__main__":
worker = IngestionWorker(milvus_host="localhost", milvus_port="19530")
worker.run()
Worker的核心在于partition_name=tenant_id这行代码。它指示LangChain的Milvus集成将所有生成的向量定向到与当前任务租户ID同名的Partition中。这从物理存储层面保证了A租户的数据永远不会与B租户的数据混合。
3. 租户隔离的查询API
查询时,也必须使用同样的分区隔离机制。
# 在 main.py 中增加查询端点
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
@app.post("/v1/query")
async def query_documents(
query: str,
tenant_id: str = Depends(get_tenant_id)
):
"""
接收用户查询,并仅在授权租户的数据分区内进行检索
"""
try:
# 初始化Milvus,准备进行检索
vector_store = Milvus(
embedding_function=OpenAIEmbeddings(),
connection_args={"host": "localhost", "port": "19530"},
collection_name="enterprise_rag"
)
# 创建一个仅搜索特定分区的检索器
retriever = vector_store.as_retriever(
search_type="similarity",
search_kwargs={
"k": 5,
"partition_names": [tenant_id] # <-- 关键的查询隔离
}
)
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
# 构建QA链
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever
)
result = qa_chain.run(query)
return {"answer": result}
except Exception as e:
logging.error(f"Query failed for tenant '{tenant_id}': {e}", exc_info=True)
raise HTTPException(status_code=500, detail="An error occurred during query.")
在as_retriever方法中,我们通过search_kwargs传递partition_names=[tenant_id]。这确保了Milvus在执行向量相似性搜索时,只会扫描属于该租户的Partition。即使恶意用户尝试通过构造查询来访问其他数据,数据库层面已经施加了不可逾越的限制。
架构的局限性与未来迭代方向
当前这套架构解决了核心的多租户、异步处理和安全问题,但它并非完美。在真实的生产环境中,还有几个方面需要深化:
- Worker的幂等性: 当前的Worker实现不完全幂等。如果一个文档被处理成功,但在删除SQS消息之前Worker崩溃,任务会被重传,导致数据重复。一个可行的方案是在数据库中记录已处理文件的哈希值或唯一标识符,在处理前进行检查。
- 死信队列与监控: 对于持续失败的任务,必须配置一个SQS死信队列(DLQ)。同时,需要建立监控告警,当DLQ中有消息堆积时,能及时通知开发人员进行干预。
- 任务状态追踪: 当前架构中,客户端无法查询文档处理的进度。一个完整的解决方案需要引入一个状态数据库(如Redis或DynamoDB),API在提交任务时创建一个带状态(如
PENDING)的记录,Worker在处理中和处理后更新该状态。客户端可以通过另一个API轮询此状态。 - Milvus集群管理: 随着租户数量的增多,需要考虑Milvus集群自身的扩展性、备份和恢复策略。对Partition的管理,如创建、加载/释放,也需要自动化,以优化内存使用。
- 成本考量: SQS和后台Worker的持续运行会产生费用。对于流量波动大的场景,可以基于队列长度配置更复杂的Auto Scaling策略,甚至在没有任务时缩容到零(例如使用AWS Lambda作为Worker)。