一个生产级的可观测性平台,其核心诉求是在海量事件流中提供近乎实时的洞察。我们面临的挑战是构建一个系统,它不仅要处理高吞吐量的异构事件,还要支持对这些事件状态的快速更新与查询。数据模型并非简单的追加,而是需要频繁进行 UPSERT 操作,例如,一个任务的状态从 PENDING 变为 RUNNING,最终变为 COMPLETED 或 FAILED。这就要求我们的存储层能高效处理行级别更新,同时查询层能快速响应前端的数据可视化请求。
问题的核心落在了两点:一是如何选择一个可靠、可扩展的消息中间件来解耦事件的生产与消费;二是如何选择一个能够高效处理 UPSERT 的存储格式,并将其与前端展示层无缝对接。
消息队列的十字路口:AWS SQS vs. Azure Service Bus
事件管道的入口是消息队列,这是整个系统稳定性的基石。在评估中,AWS SQS 和 Azure Service Bus 成为了我们的主要候选方案。
方案 A: AWS SQS (Simple Queue Service)
SQS 的最大优点在于其极致的简单性和与 AWS 生态的无缝集成。对于我们的场景,SQS FIFO (First-In-First-Out) 队列是必须的,因为它能保证与同一任务相关的事件(由 MessageGroupId 标识)按顺序处理,这对于追踪状态变更至关重要。
优势分析:
- 运维简单: 完全托管,几乎没有运维心智负担。
- 弹性伸缩: 自动扩展以应对流量洪峰,无需预先配置容量。
- 生态集成: 与 Lambda、Fargate、EC2 等计算服务以及 S3 等存储服务天然集成。
劣势分析:
- 功能相对基础: FIFO 队列的吞吐量有上限(默认 300 TPS,高吞吐量模式下为 3000 TPS)。没有内置的发布/订阅或高级消息过滤功能。如果需要将同一事件分发给多个不同类型的消费者,需要借助 SNS (Simple Notification Service) 来实现 Fan-out 模式,增加了架构复杂度。
- 强依赖
MessageGroupId: 顺序保证完全依赖生产者正确设置MessageGroupId。业务逻辑的任何疏忽都可能破坏顺序性。
一个典型的 SQS 生产者实现(使用 AWS SDK for Java v2)可能如下:
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;
import java.util.UUID;
public class SqsEventProducer {
private final SqsClient sqsClient;
private final String queueUrl;
public SqsEventProducer(String queueUrl, Region region) {
this.sqsClient = SqsClient.builder().region(region).build();
this.queueUrl = queueUrl;
}
/**
* Sends an observability event to the SQS FIFO queue.
* @param taskId The unique identifier for the task, used as the MessageGroupId.
* @param eventPayload The JSON payload of the event.
* @return The message ID of the sent message.
*/
public String sendEvent(String taskId, String eventPayload) {
// 在真实项目中,这里应该有更完善的日志和异常封装
try {
SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
.queueUrl(this.queueUrl)
.messageBody(eventPayload)
// MessageGroupId 是保证 FIFO 队列中消息分组有序的关键
.messageGroupId(taskId)
// MessageDeduplicationId 用于防止消息重复,这里使用 UUID 保证唯一性
.messageDeduplicationId(UUID.randomUUID().toString())
.build();
return sqsClient.sendMessage(sendMsgRequest).messageId();
} catch (SqsException e) {
// 生产环境中必须有重试逻辑和告警机制
System.err.println("Failed to send message to SQS: " + e.awsErrorDetails().errorMessage());
throw e; // Rethrow or handle appropriately
}
}
public void close() {
sqsClient.close();
}
}
这里的核心是 messageGroupId。所有与 taskId 相同的消息都将被路由到同一个消息分组,并按发送顺序被消费,这恰好满足了我们追踪任务状态的需求。
方案 B: Azure Service Bus
Azure Service Bus 提供了比 SQS 更丰富的功能集。它不仅仅是一个队列,更是一个功能完备的消息代理。
优势分析:
- 功能强大: 支持 Topics 和 Subscriptions,天然实现了发布/订阅模式。消费者可以通过规则过滤自己感兴趣的消息,这在未来系统扩展时非常有用。
- 会话 (Sessions): 提供了与 SQS FIFO 队列类似的顺序保证,通过
SessionId来实现。 - 事务与批处理: 支持跨多个消息的原子操作,增强了数据一致性。
劣势分析:
- 配置复杂: 命名空间、定价层(Standard/Premium)、Topics/Subscriptions 的管理比 SQS 更复杂。
- 成本考量: Premium 层的性能和隔离性更好,但成本也更高。Standard 层基于共享资源,性能可能有抖动。
- 跨云开销: 如果我们的主要计算和存储资源在 AWS,使用 Azure Service Bus 会引入跨云网络延迟和数据传输成本。
以下是使用 Azure SDK for Java 的一个简单生产者示例:
import com.azure.messaging.servicebus.*;
import com.azure.messaging.servicebus.models.CreateMessageBatchOptions;
import java.util.Arrays;
import java.util.List;
public class ServiceBusEventProducer {
private final ServiceBusSenderClient senderClient;
public ServiceBusEventProducer(String connectionString, String queueOrTopicName) {
this.senderClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sender()
.queueName(queueOrTopicName) // Or .topicName()
.buildClient();
}
/**
* Sends an observability event with a session ID for ordering.
* @param sessionId The session ID, equivalent to SQS's MessageGroupId.
* @param eventPayload The event data.
*/
public void sendSessionEnabledEvent(String sessionId, String eventPayload) {
ServiceBusMessage message = new ServiceBusMessage(eventPayload);
// 设置 SessionId 来保证消息的顺序处理
message.setSessionId(sessionId);
try {
senderClient.sendMessage(message);
} catch (Exception e) {
// 同样,需要健壮的错误处理
System.err.println("Failed to send message to Azure Service Bus: " + e.getMessage());
}
}
public void close() {
senderClient.close();
}
}
这里的 setSessionId 扮演了与 SQS MessageGroupId 类似的角色。
最终决策与架构概览
经过权衡,我们最终选择了 AWS SQS FIFO 队列。决策依据是:
- 最小化复杂度: 当前阶段,我们不需要 Service Bus 提供的复杂发布/订阅和过滤功能。单一消费者模型足以满足需求。保持技术栈的精简和统一(AWS-native)能显著降低运维成本。
- 性能满足预期: SQS FIFO 的吞吐量上限对于我们的初期和中期业务量是足够的。当业务增长到需要更高吞吐量时,可以通过增加分区(使用多个
MessageGroupId前缀)或引入 Kinesis 等方案进行扩展。 - 成本可控: SQS 的按量付费模型更具成本效益,尤其是在业务初期流量不稳定的情况下。
最终确定的数据流架构如下:
graph TD
subgraph Frontend
A[MUI DataGrid / Chart] -- GraphQL Query (Relay) --> B{GraphQL API}
end
subgraph Backend
B -- Serves Data From --> C[Query Engine e.g., Trino/Presto]
E[Event Producer] -- Task Events --> F[AWS SQS FIFO Queue]
G[Spark Streaming Consumer] -- Polls Events --> F
end
subgraph Data Lake
C -- SQL Query --> D[Apache Hudi Table on S3]
G -- Writes/Upserts Data --> D
end
style A fill:#cde4ff
style B fill:#cde4ff
style F fill:#ffb3ba
style D fill:#baffc9
核心实现:从 SQS 到 Hudi 的数据落地
这一环节是整个系统的核心。我们使用 Apache Spark Streaming 从 SQS 读取数据,并将其写入 Apache Hudi 表。Hudi 的 Copy-on-Write (COW) 存储类型非常适合我们的读多写少场景,它能提供更好的查询性能。
Spark Consumer 应用配置与代码
以下是一个生产级的 Spark Streaming 应用,用于消费 SQS 消息并写入 Hudi。
build.sbt 依赖 (Scala):
// ... other dependencies
val sparkVersion = "3.3.1"
val hudiVersion = "0.12.1"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
// Hudi Spark Bundle
"org.apache.hudi" %% "hudi-spark3.3-bundle" % hudiVersion,
// A community-maintained Spark-SQS connector
"io.github.spark-packages" %% "spark-sqs" % "3.0.0-s_2.12"
)
主应用代码 SqsToHudiProcessor.scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
object SqsToHudiProcessor {
def main(args: Array[String]): Unit = {
// 在真实项目中,这些配置应该来自配置文件或环境变量
val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/observability-events.fifo"
val region = "us-east-1"
val hudiTablePath = "s3a://your-datalake-bucket/observability/tasks"
val tableName = "tasks_cow"
val checkpointLocation = "s3a://your-datalake-bucket/checkpoints/sqs_to_hudi"
val spark = SparkSession.builder()
.appName("SQS to Hudi Processor")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.getOrCreate()
import spark.implicits._
// 1. 从 SQS 读取原始消息流
val rawMessages = spark.readStream
.format("sqs")
.option("queueUrl", queueUrl)
.option("region", region)
.option("sqsEndpoint", s"https://sqs.$region.amazonaws.com")
// 一次拉取多少消息,需要根据流量和延迟要求调优
.option("maxMessagesPerFetch", 10)
.load()
// 2. 解析 JSON 数据并进行结构化转换
// 假设消息体是 {"taskId": "...", "status": "...", "timestamp": 167..., "details": "{...}"}
val eventSchema = new org.apache.spark.sql.types.StructType()
.add("taskId", "string")
.add("status", "string")
.add("timestamp", "long")
.add("details", "string")
val structuredEvents = rawMessages
.select(from_json($"body", eventSchema).as("data"))
.select("data.*")
.withColumn("ts", ($"timestamp" / 1000).cast("timestamp")) // 转换为 timestamp 类型
.withColumn("partition_path", date_format($"ts", "yyyy-MM-dd")) // 按天分区
// 3. 将数据流写入 Hudi 表
val hudiWriteOptions = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "taskId", // 记录的唯一主键
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", // 解决并发更新的合并字段,选择时间戳最大的记录
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path", // 分区字段
DataSourceWriteOptions.TABLE_NAME.key -> tableName,
DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.HIVE_SYNC_ENABLED.key -> "true",
DataSourceWriteOptions.HIVE_TABLE.key -> tableName,
DataSourceWriteOptions.HIVE_DATABASE.key -> "observability",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key -> "partition_path",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
HoodieWriteConfig.TBL_NAME.key -> tableName
)
val query = structuredEvents.writeStream
.format("hudi")
.options(hudiWriteOptions)
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("1 minute")) // 每分钟触发一次 micro-batch
.option("checkpointLocation", checkpointLocation)
.start(hudiTablePath)
// 单元测试思路:
// 1. Mock SQS source: 使用 MemoryStream[String] 模拟 SQS 消息输入。
// 2. 构造包含新增、更新操作的 JSON 字符串序列。
// 3. 在本地文件系统上运行 Hudi 写入。
// 4. 断言:使用 SparkSession.read.format("hudi").load(localPath) 读取结果。
// 5. 验证最终的记录是否符合预期(例如,taskId 的状态是否为最新的,记录总数是否正确)。
query.awaitTermination()
}
}
这段代码的核心在于 Hudi 的写入配置。RECORDKEY_FIELD (taskId) 告诉 Hudi 如何识别一条唯一的记录。PRECOMBINE_FIELD (timestamp) 则是 UPSERT 的关键:当多条消息具有相同的 taskId 时,Hudi 会保留 timestamp 值最大的那条记录,这完美地解决了状态更新的问题。
前端呈现:Relay 与 Material-UI 的协同
前端的目标是提供一个高性能、响应式的仪表盘来展示 Hudi 表中的数据。我们选择 Relay 是因为它与 GraphQL 的深度集成,能够通过声明式的数据获取方式,精确地获取组件所需的数据,避免过度获取和不足获取。
GraphQL Schema (由后端 API 提供):
type Task {
taskId: ID!
status: String!
lastUpdatedAt: DateTime!
details: String
}
type Query {
tasks(first: Int, after: String, statusFilter: String): TaskConnection
}
Relay Fragment 与 React 组件:
我们使用 Material-UI (MUI) 的 DataGrid 组件来展示任务列表,并用 Relay 的 useFragment 来管理数据。
// TaskRow.tsx
import React from 'react';
import { useFragment, graphql } from 'react-relay';
import { TaskRow_task$key } from './__generated__/TaskRow_task.graphql';
import { TableCell, TableRow } from '@mui/material';
const taskFragment = graphql`
fragment TaskRow_task on Task {
taskId
status
lastUpdatedAt
}
`;
interface Props {
task: TaskRow_task$key;
}
export const TaskRow: React.FC<Props> = ({ task }) => {
const data = useFragment(taskFragment, task);
// 根据 status 渲染不同颜色的 Chip 等 UI 逻辑
return (
<TableRow>
<TableCell>{data.taskId}</TableCell>
<TableCell>{data.status}</TableCell>
<TableCell>{new Date(data.lastUpdatedAt).toLocaleString()}</TableCell>
</TableRow>
);
};
// TaskList.tsx
import React from 'react';
import { usePaginationFragment, graphql } from 'react-relay';
import { TaskList_query$key } from './__generated__/TaskList_query.graphql';
import { TaskRow } from './TaskRow';
import { Table, TableBody, TableHead, TableContainer, Paper } from '@mui/material';
const taskListFragment = graphql`
fragment TaskList_query on Query
@refetchable(queryName: "TaskListPaginationQuery")
@argumentDefinitions(
count: { type: "Int", defaultValue: 10 }
cursor: { type: "String" }
statusFilter: { type: "String" }
) {
tasks(first: $count, after: $cursor, statusFilter: $statusFilter)
@connection(key: "TaskList_tasks") {
edges {
node {
id
...TaskRow_task
}
}
}
}
`;
interface Props {
query: TaskList_query$key;
}
export const TaskList: React.FC<Props> = ({ query }) => {
const { data, loadNext, hasNext } = usePaginationFragment(taskListFragment, query);
// MUI DataGrid 或者自定义 Table 的渲染逻辑
// 结合 Intersection Observer 和 loadNext 实现无限滚动
return (
<TableContainer component={Paper}>
<Table>
<TableHead>
{/* ... Table Headers ... */}
</TableHead>
<TableBody>
{data.tasks?.edges?.map(edge =>
edge?.node ? <TaskRow key={edge.node.id} task={edge.node} /> : null
)}
</TableBody>
</Table>
</TableContainer>
);
};
通过这种方式,数据获取的逻辑被封装在 Relay 的 Fragment 中,与 UI 组件紧密耦合。当用户进行筛选或翻页时,Relay 会自动生成最高效的 GraphQL 查询,后端 API 则将这些查询转换为对 Trino/Presto 的 SQL 查询,最终从 Hudi 表中获取数据。
架构的局限性与未来迭代方向
当前这套架构并非没有缺点。首先,端到端延迟受限于 Spark Streaming 的微批处理间隔(Trigger.ProcessingTime)。虽然可以缩短到秒级,但无法做到毫秒级的真流式处理。如果业务对延迟有更苛刻的要求,需要考虑使用 Flink 配合 Hudi 的流式写入器。
其次,查询性能严重依赖于底层的查询引擎(如 Trino)和 Hudi 表的维护情况。Hudi 表需要定期进行 compaction (对于 MOR 表) 和 clustering 来优化文件大小和布局,以保证查询性能不会随着数据量的增长而衰减。这些运维任务需要被自动化。
最后,虽然前端通过 Relay 实现了高效的数据获取,但如果查询并发量非常高,后端的查询引擎可能会成为瓶颈。针对高频访问的聚合结果,可以引入一个缓存层(如 Redis)来减轻数据湖的查询压力。这条路径允许我们从一个健壮、可扩展的基线出发,逐步演进以应对未来的性能挑战。