基于C++ CDC与Hudi-HBase的金融交易数据分层存储架构实现


我们面临一个棘手的工程问题:一个核心的、高频交易后处理系统,其底层依赖于一个庞大的关系型数据库集群。该系统每天产生数十亿条交易流水、委托变更和清算记录。业务需求被清晰地划分为两个极端:一方面,交易运营团队需要对过去24小时内的任意订单进行亚秒级(目标P99 < 50ms)的点查,以应对实时风控和结算质询;另一方面,量化分析与合规审计团队需要对数年的历史数据执行复杂的聚合与关联分析(Ad-hoc SQL),为模型训练和监管报告提供支持。

现有的架构,即在关系型数据库上通过增加索引和读写分离来硬扛,已经走到了尽头。点查性能在数据量超过一个阈值后急剧下降,索引维护成本高昂;分析型查询则常常导致主库负载飙升,甚至引发线上故障。将所有数据放入一个单一的技术栈,无论是纯粹的NoSQL还是纯粹的数据仓库,都无法同时满足这两个看似矛盾的需求。

方案A:纯HBase架构的局限

初步设想是使用HBase,它天生为低延迟、高并发的键值查询而生。通过精心设计的Rowkey,将交易ID、用户ID等关键信息编码进去,可以轻松实现毫秒级的点查。

Rowkey设计示例:[userId_reversed]_[timestamp_ms]_[tradeId]

这种设计利用HBase的有序存储特性,能快速定位到某个用户在特定时间范围内的交易。

但它的问题同样致命。对于分析场景,例如“统计上个季度所有交易额超过100万的VIP客户的交易品种分布”,这种查询需要进行全表扫描(Full Table Scan)。在HBase上执行这种操作是对集群的毁灭性打击,效率极低,且缺乏成熟的SQL优化器。虽然Apache Phoenix项目提供了SQL层,但在复杂JOIN和聚合操作上,其性能和灵活性远不及专门的分析引擎。

方案B:纯数据湖(Apache Hudi)架构的挑战

另一个方向是直接拥抱数据湖。将所有数据以Apache Hudi的格式存储在对象存储上,利用Spark SQL或Trino/Presto提供强大的分析能力。Hudi的Copy-on-Write (CoW) 或 Merge-on-Read (MoR) 模式可以优雅地处理数据更新和删除,同时提供ACID事务保证和时间旅行能力,非常适合分析和合规场景。

但它的短板在于点查。即便Hudi通过索引机制(如Bloom Index, Simple Index)优化了文件的定位,一次点查仍然需要启动一个分布式计算任务(如Spark Job)。这个过程包含任务调度、资源申请、代码分发等一系列开销,其端到端延迟通常在秒级甚至更高,完全无法满足50ms的SLA要求。

最终选择:Hudi-HBase混合分层架构

权衡之下,我们决定采用一种混合架构,将数据按访问模式进行分层:

  • 热数据层 (Hot Layer): 使用HBase存储最近24-72小时的交易数据。这一层专为高性能点查服务,保证交易运营的实时性需求。
  • 温/冷数据层 (Warm/Cold Layer): 使用Apache Hudi存储全量的历史数据。这一层面向分析师和数据科学家,通过Spark SQL提供灵活、高效的批量查询能力。

这种架构的关键在于如何构建一个统一、可靠、低延迟的数据采集与分发管道,确保数据能准确无误地流入两个不同的存储系统。

graph TD
    A[Legacy RDBMS] -- C++ CDC Agent --> B[Kafka: raw_log_topic];
    B -- Spark Structured Streaming --> C{Data Dispatcher};
    C -- Hot Path --> D[HBase Sink];
    C -- Warm Path --> E[Hudi Sink];
    D -- P99 < 50ms Point Queries --> F[Trading Ops Platform];
    E -- Ad-hoc SQL Queries --> G[Analytics & BI Platform];

核心挑战转移到了数据入口:如何从老旧的关系型数据库中高效、准实时地捕获变更数据(Change Data Capture, CDC)。通用的CDC工具如Debezium,虽然功能强大,但在我们的场景中遇到了性能瓶颈。源端数据库的日志格式非标准,且QPS极高,基于JVM的CDC工具在序列化/反序列化和GC上产生了不可忽视的延迟。因此,我们决定自研一个轻量级的C++ CDC代理。

C++ CDC代理:性能的基石

这个C++代理直接部署在数据库服务器旁,伪装成一个从库,实时解析物理复制日志(Binary Log)。选择C++的核心原因是追求极致的性能和资源控制,避免GC pause,并通过手动内存管理和零拷贝技术最大化吞吐量。

以下是该CDC代理核心逻辑的简化实现。它负责解析一个假设的、紧凑的二进制交易日志格式,并将其转换为Protobuf格式,最终发送到Kafka。

include/log_parser.h

#pragma once

#include <iostream>
#include <vector>
#include <cstdint>
#include <string>
#include <optional>
#include "trade.pb.h" // Protobuf generated header

// A hypothetical binary log record structure
// [4 bytes: record_len] [1 byte: record_type] [payload...]
// Payload for TRADE_RECORD_TYPE (0x01):
// [8 bytes: trade_id] [8 bytes: user_id] [8 bytes: timestamp_ms] 
// [1 byte: instrument_len] [instrument_len bytes: instrument_symbol]
// [8 bytes: price (double)] [4 bytes: quantity (int32)]
enum RecordType : uint8_t {
    UNKNOWN = 0x00,
    TRADE = 0x01,
    ORDER_STATUS = 0x02,
};

class LogParser {
public:
    LogParser() = default;
    ~LogParser() = default;

    // Parses a chunk of binary data and returns a vector of serialized protobuf messages.
    // Returns an empty optional on parsing failure.
    std::optional<std::vector<std::string>> parse(const std::vector<uint8_t>& buffer);

private:
    // Helper to safely read from buffer
    template<typename T>
    bool read_from_buffer(const std::vector<uint8_t>& buffer, size_t& offset, T& value);
};

src/log_parser.cpp

#include "log_parser.h"
#include <arpa/inet.h> // For ntohl, ntohll (if needed, here we assume same endianness)
#include <cstring>

// Custom ntohll for 64-bit integers if not available
uint64_t ntohll_custom(uint64_t val) {
    if (__BYTE_ORDER == __LITTLE_ENDIAN) {
        return __builtin_bswap64(val);
    }
    return val;
}

template<typename T>
bool LogParser::read_from_buffer(const std::vector<uint8_t>& buffer, size_t& offset, T& value) {
    if (offset + sizeof(T) > buffer.size()) {
        std::cerr << "Error: Buffer underflow while reading type " << typeid(T).name() << std::endl;
        return false;
    }
    memcpy(&value, buffer.data() + offset, sizeof(T));
    offset += sizeof(T);
    return true;
}

std::optional<std::vector<std::string>> LogParser::parse(const std::vector<uint8_t>& buffer) {
    std::vector<std::string> messages;
    size_t offset = 0;

    while (offset < buffer.size()) {
        uint32_t record_len;
        if (!read_from_buffer(buffer, offset, record_len)) return std::nullopt;
        // Assuming network byte order for length
        record_len = ntohl(record_len);
        
        if (offset + record_len > buffer.size()) {
            std::cerr << "Error: Incomplete record. Expected " << record_len << " bytes, but only " 
                      << (buffer.size() - offset) << " remain." << std::endl;
            // This indicates a partial read, should handle it by buffering
            return std::nullopt;
        }

        uint8_t record_type;
        if (!read_from_buffer(buffer, offset, record_type)) return std::nullopt;

        switch (static_cast<RecordType>(record_type)) {
            case RecordType::TRADE: {
                trading::Trade trade_event;
                uint64_t trade_id, user_id, timestamp_ms;
                double price;
                int32_t quantity;

                if (!read_from_buffer(buffer, offset, trade_id) ||
                    !read_from_buffer(buffer, offset, user_id) ||
                    !read_from_buffer(buffer, offset, timestamp_ms)) {
                    return std::nullopt;
                }
                
                uint8_t symbol_len;
                if (!read_from_buffer(buffer, offset, symbol_len)) return std::nullopt;
                
                if (offset + symbol_len > buffer.size()) return std::nullopt;
                std::string symbol(reinterpret_cast<const char*>(buffer.data() + offset), symbol_len);
                offset += symbol_len;

                if (!read_from_buffer(buffer, offset, price) ||
                    !read_from_buffer(buffer, offset, quantity)) {
                    return std::nullopt;
                }

                trade_event.set_trade_id(ntohll_custom(trade_id));
                trade_event.set_user_id(ntohll_custom(user_id));
                trade_event.set_timestamp_ms(ntohll_custom(timestamp_ms));
                trade_event.set_instrument_symbol(symbol);
                trade_event.set_price(price);
                trade_event.set_quantity(ntohl(quantity));
                
                std::string serialized_message;
                if (!trade_event.SerializeToString(&serialized_message)) {
                    std::cerr << "Failed to serialize protobuf message." << std::endl;
                    return std::nullopt;
                }
                messages.push_back(std::move(serialized_message));
                break;
            }
            // Other record types can be handled here...
            default:
                // Skip unknown record types
                std::cout << "Warning: Skipping unknown record type 0x" << std::hex << (int)record_type << std::endl;
                offset += record_len - 1; // -1 because we already read the type byte
                break;
        }
    }
    return messages;
}

这个解析器没有使用任何动态库依赖,纯粹基于标准库和Protobuf生成的代码,保证了极高的可移植性和低部署开销。在生产中,我们会结合librdkafka C++客户端,将解析出的serialized_message高效地异步发送到Kafka集群。错误处理是关键:任何解析失败或网络问题都必须记录详尽的日志,并触发告警,同时实现持久化的缓冲队列以防Kafka短时不可用。

Spark Structured Streaming:双路分发器

数据进入Kafka后,下游由一个Spark Structured Streaming作业负责消费。这个作业是整个架构的核心枢纽,它将单一的数据源分发到HBase和Hudi两个目的地。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import com.google.protobuf.InvalidProtocolBufferException
import com.your_company.trading.Trade // Your Protobuf generated class

object DataDispatcher {

  // HBase sink logic
  def writeToHBase(df: DataFrame, tableName: String): Unit = {
    df.foreachPartition { partition =>
      if (partition.nonEmpty) {
        val hbaseConf = HBaseConfiguration.create()
        // Production-grade configuration should come from a secure source
        hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
        
        val connection = ConnectionFactory.createConnection(hbaseConf)
        val table = connection.getTable(TableName.valueOf(tableName))
        
        try {
          partition.foreach { row =>
            // Rowkey: reverse(userId) + (Long.MaxValue - timestamp) + tradeId
            val userId = row.getAs[Long]("userId")
            val timestamp = row.getAs[Long]("timestampMs")
            val tradeId = row.getAs[Long]("tradeId")
            
            val reversedUserId = userId.toString.reverse
            val invertedTs = Long.MaxValue - timestamp
            
            val rowkey = Bytes.toBytes(s"${reversedUserId}_${invertedTs}_${tradeId}")
            val put = new Put(rowkey)

            // Adding columns to 'cf1' column family
            put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("symbol"), Bytes.toBytes(row.getAs[String]("instrumentSymbol")))
            put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("price"), Bytes.toBytes(row.getAs[Double]("price")))
            put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("quantity"), Bytes.toBytes(row.getAs[Int]("quantity")))
            
            table.put(put)
          }
        } finally {
          table.close()
          connection.close()
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("HudiHBaseDispatcher")
      // Add necessary Spark and Hudi configurations
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    import spark.implicits._

    val kafkaBootstrapServers = "kafka1:9092,kafka2:9092"
    val kafkaTopic = "raw_log_topic"
    val hudiBasePath = "s3a://data-lake/hudi/trades"
    val hbaseTableName = "trades_hot"
    
    // UDF to parse Protobuf byte array
    val parseTrade = udf((data: Array[Byte]) => {
      try {
        val trade = Trade.parseFrom(data)
        Some((
          trade.getTradeId, 
          trade.getUserId, 
          trade.getTimestampMs, 
          trade.getInstrumentSymbol,
          trade.getPrice,
          trade.getQuantity,
          // Add partition path column for Hudi
          new java.sql.Date(trade.getTimestampMs).toLocalDate.toString
        ))
      } catch {
        case e: InvalidProtocolBufferException =>
          // In production, send bad records to a dead-letter queue
          None 
      }
    })

    val kafkaStreamDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("subscribe", kafkaTopic)
      .option("failOnDataLoss", "false") // Important for production
      .load()

    val tradesDF = kafkaStreamDF
      .select(parseTrade($"value").as("trade_data"))
      .filter($"trade_data".isNotNull)
      .select(
        $"trade_data._1".as("tradeId"),
        $"trade_data._2".as("userId"),
        $"trade_data._3".as("timestampMs"),
        $"trade_data._4".as("instrumentSymbol"),
        $"trade_data._5".as("price"),
        $"trade_data._6".as("quantity"),
        $"trade_data._7".as("partition_path") // YYYY-MM-DD
      )

    // The core logic: write to two sinks in one stream query
    val streamQuery = tradesDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println(s"Processing batch $batchId")
        
        // Cache the batch DF as it's used twice
        batchDF.persist()

        // --- Hudi Sink (Warm/Cold Layer) ---
        val hudiOptions = Map(
          DataSourceWriteOptions.RECORDKEY_FIELD.key -> "tradeId",
          DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestampMs", // Use timestamp for latest record
          DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path",
          KeyGeneratorOptions.PARTITIONPATH_URL_ENCODE.key -> "true",
          DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, // Merge-on-Read for faster ingestion
          HoodieWriteConfig.TABLE_NAME.key -> "trades_warm",
          DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
          DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL + ".compact.strategy" -> "org.apache.hudi.io.compact.strategy.UnboundedCompactionStrategy"
        )
        
        batchDF.write
          .format("hudi")
          .options(hudiOptions)
          .mode("append")
          .save(hudiBasePath)

        // --- HBase Sink (Hot Layer) ---
        // A real project would implement more robust connection pooling and error handling
        writeToHBase(batchDF, hbaseTableName)
        
        // Unpersist the cached DataFrame
        batchDF.unpersist()
      }
      .option("checkpointLocation", "/tmp/spark/checkpoints/data_dispatcher")
      .start()

    streamQuery.awaitTermination()
  }
}

foreachBatch是这里的关键。它为我们提供了一个强大的模式:在每个微批次(micro-batch)内,我们可以对DataFrame执行任意操作。我们将数据persist()到内存,然后分别触发对Hudi和HBase的写操作。这里的坑在于事务性:这个双写操作并非原子性的。如果Hudi写入成功但HBase写入失败,就会导致数据不一致。在我们的场景中,业务可以容忍分钟级别的最终一致性。我们通过引入一个独立的对账作业来解决这个问题,该作业定期扫描Hudi和HBase,找出差异并进行修复。对于一致性要求更高的场景,需要考虑更复杂的事务控制机制。

架构的扩展性与局限性

这个架构成功地解决了最初的问题,将读写负载分离到了最适合它们的系统上。交易运营平台通过Java HBase客户端直连HBase,获得了稳定的低延迟点查能力。分析平台则通过Trino连接到Hudi表,能够自由地执行大规模SQL分析,而不会影响线上交易。

然而,这个方案并非银弹。它的主要局限性在于运维复杂度的增加。我们现在需要维护两个分布式存储系统(HBase, Hudi/S3)、一个消息队列(Kafka)和一个计算集群(Spark),以及一个自研的C++组件。这对团队的技能要求和监控体系的建设提出了很高的挑战。数据一致性问题,如前所述,是一个需要持续关注和优化的点。

未来的一个演进方向是探索查询联邦。通过部署一个像Presto或Trino这样的查询引擎,并为其配置HBase连接器和Hive Metastore连接器(用于Hudi),理论上可以实现通过单一SQL端点透明地查询HBase中的热数据和Hudi中的冷数据。但这会引入新的性能挑战,特别是跨系统JOIN的优化,需要对联邦查询引擎有非常深入的理解和调优经验。


  目录