我们面临一个棘手的现实:数百个边缘节点上的应用各自将状态写入本地的SQLite文件。这些数据需要近实时地汇集到一个中央数据存储进行分析。直接的文件同步或定期的批量导出都无法满足时效性要求,并且在网络不稳定的环境下极其脆弱。我们需要的是一套针对分布式SQLite文件的变更数据捕获(Change Data Capture, CDC)机制。
传统的CDC方案依赖于数据库的事务日志(如MySQL的binlog或Postgres的WAL),但SQLite作为一个嵌入式文件数据库,并没有一个标准的、可供外部消费的日志流。这意味着我们必须从零开始构建捕获代理。
初步构想是利用SQLite的触发器(Triggers)。当INSERT, UPDATE, DELETE操作发生时,触发器可以将变更的行数据写入一个专门的outbox表。然后,一个独立的代理进程会轮询这个outbox表,将变更事件发送到消息队列。
这个方案的技术栈选型如下:
- 代码组织: Monorepo。捕获代理、消息消费者、共享类型定义和部署脚本都放在一个仓库中,使用
pnpm的workspace进行管理。这对于维护这样一个多组件系统至关重要,能保证接口定义的一致性和原子化的提交。 - 消息中间件: RabbitMQ。它提供了可靠的消息投递(ACK机制)、持久化和灵活的路由,足以应对边缘网络不稳定的情况。
- 目标数据库: Couchbase。它是一个分布式的NoSQL文档数据库,其灵活的JSON格式非常适合存储来自不同版本SQLite的、结构可能略有差异的CDC事件。其内置的缓存层也能提供高性能的读取。
- Schema与配置版本控制: DVC (Data Version Control)。这是整个方案的关键。边缘应用的SQLite schema会随着版本迭代而演进。我们不能硬编码处理逻辑。DVC可以像Git一样管理数据和配置文件,但它更适合大文件和结构化数据。我们将用它来版本化每个SQLite版本的schema定义(
.sql文件)以及关联的转换逻辑,确保整个数据管道的演进是可追溯和可复现的。
Monorepo项目结构搭建
整个项目的根目录结构清晰地反映了各个组件的职责。
# directory-structure
cdc-pipeline/
├── pnpm-workspace.yaml
├── package.json
├── packages/
│ ├── sqlite-capture-agent/ # 运行在边缘节点的捕获代理
│ │ ├── src/
│ │ ├── package.json
│ │ └── tsconfig.json
│ ├── mq-couchbase-consumer/ # 消费消息并写入Couchbase的服务
│ │ ├── src/
│ │ ├── package.json
│ │ └── tsconfig.json
│ └── shared-types/ # 共享的TypeScript类型定义
│ ├── src/
│ ├── package.json
│ └── tsconfig.json
└── schemas/ # DVC管理的核心区域
├── v1/
│ ├── user_schema.sql
│ └── user_schema.sql.dvc # DVC元数据文件
└── v2/
├── user_schema.sql
└── user_schema.sql.dvc
pnpm-workspace.yaml 文件内容很简单,它告诉pnpm去哪里寻找子包:
# pnpm-workspace.yaml
packages:
- 'packages/*'
这样,在根目录运行 pnpm install 就会安装所有子包的依赖。
核心组件一:SQLite变更捕获代理
这是最关键的自研部分。代理需要完成三件事:
- 初始化目标SQLite数据库,确保
outbox表和相关触发器存在。 - 轮询
outbox表,读取未处理的变更。 - 将变更事件序列化后,可靠地发送到RabbitMQ。
数据库初始化与触发器
假设我们的业务表是users。我们需要为它的INSERT, UPDATE, DELETE操作都创建触发器。
// packages/sqlite-capture-agent/src/db-setup.ts
import Database from 'better-sqlite3';
const CREATE_USERS_TABLE = `
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
email TEXT,
version INTEGER DEFAULT 1,
last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
);
`;
const CREATE_OUTBOX_TABLE = `
CREATE TABLE IF NOT EXISTS cdc_outbox (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id TEXT NOT NULL UNIQUE,
table_name TEXT NOT NULL,
operation TEXT NOT NULL,
payload_before TEXT, -- JSON format
payload_after TEXT, -- JSON format
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT 0
);
`;
const CREATE_USER_INSERT_TRIGGER = `
CREATE TRIGGER IF NOT EXISTS t_users_insert
AFTER INSERT ON users
BEGIN
INSERT INTO cdc_outbox (event_id, table_name, operation, payload_after)
VALUES (
lower(hex(randomblob(16))),
'users',
'INSERT',
json_object('id', NEW.id, 'name', NEW.name, 'email', NEW.email, 'version', NEW.version)
);
END;
`;
// UPDATE 和 DELETE 的触发器类似,分别记录 NEW 和 OLD 的值
// ... 省略UPDATE和DELETE触发器代码 ...
export function initializeDatabase(dbPath: string): Database.Database {
const db = new Database(dbPath, { verbose: console.log });
db.pragma('journal_mode = WAL'); // WAL模式对并发读写更友好
// 使用事务确保原子性
const setupTx = db.transaction(() => {
db.exec(CREATE_USERS_TABLE);
db.exec(CREATE_OUTBOX_TABLE);
db.exec(CREATE_USER_INSERT_TRIGGER);
// ... 执行其他触发器创建语句 ...
});
setupTx();
console.log('Database initialized with CDC triggers.');
return db;
}
这里的坑在于,触发器内的json_object函数需要SQLite 3.9.0以上版本。在资源受限的边缘设备上,需要确认SQLite版本。
代理轮询与发送逻辑
代理的核心循环需要健壮,能处理网络中断和程序崩溃。我们将通过processed字段和数据库事务来保证事件至少被处理一次。
// packages/sqlite-capture-agent/src/agent.ts
import Database from 'better-sqlite3';
import amqplib from 'amqplib';
import { v4 as uuidv4 } from 'uuid';
const BATCH_SIZE = 100;
const POLLING_INTERVAL_MS = 2000;
interface OutboxEvent {
id: number;
event_id: string;
table_name: string;
operation: string;
payload_before: string | null;
payload_after: string | null;
}
export class CaptureAgent {
private db: Database.Database;
private mqChannel: amqplib.Channel | null = null;
private timer: NodeJS.Timeout | null = null;
private isProcessing = false;
constructor(private dbPath: string, private mqUrl: string, private queueName: string) {
// 实际项目中,数据库连接的创建应包含重试逻辑
this.db = new Database(dbPath);
}
public async start() {
await this.connectMq();
this.timer = setInterval(() => this.pollAndProcess(), POLLING_INTERVAL_MS);
console.log('Capture agent started.');
}
private async connectMq() {
try {
const connection = await amqplib.connect(this.mqUrl);
this.mqChannel = await connection.createChannel();
await this.mqChannel.assertQueue(this.queueName, { durable: true });
console.log('Connected to RabbitMQ.');
} catch (error) {
console.error('Failed to connect to RabbitMQ, retrying in 5s...', error);
setTimeout(() => this.connectMq(), 5000);
}
}
private async pollAndProcess() {
if (this.isProcessing || !this.mqChannel) {
return;
}
this.isProcessing = true;
// 查询并锁定一批未处理的事件
const selectStmt = this.db.prepare(
`SELECT * FROM cdc_outbox WHERE processed = 0 ORDER BY id ASC LIMIT ?`
);
const updateStmt = this.db.prepare(
`UPDATE cdc_outbox SET processed = 1 WHERE id = ?`
);
const processTx = this.db.transaction((events: OutboxEvent[]) => {
for (const event of events) {
const message = {
// 在消息中加入元数据,例如schema版本
metadata: {
eventId: event.event_id,
sourceNodeId: 'edge-node-001', // 从配置中读取
schemaVersion: 'v1', // 关键!后续会用DVC管理
timestamp: new Date().toISOString()
},
tableName: event.table_name,
operation: event.operation,
before: event.payload_before ? JSON.parse(event.payload_before) : null,
after: event.payload_after ? JSON.parse(event.payload_after) : null,
};
const successfullySent = this.mqChannel!.sendToQueue(
this.queueName,
Buffer.from(JSON.stringify(message)),
{ persistent: true }
);
if (!successfullySent) {
// RabbitMQ的Node.js客户端会在缓冲区满时返回false
// 这是一种背压机制。我们应该停止处理并等待 drain 事件
// 为简化示例,这里我们直接抛出错误回滚事务
console.warn('RabbitMQ buffer is full. Pausing sending.');
throw new Error('MQ_BUFFER_FULL');
}
updateStmt.run(event.id);
}
});
try {
const eventsToProcess = selectStmt.all(BATCH_SIZE) as OutboxEvent[];
if (eventsToProcess.length > 0) {
console.log(`Processing ${eventsToProcess.length} events.`);
processTx(eventsToProcess);
}
} catch (error) {
console.error('Failed to process outbox batch. Will retry.', error);
// 事务回滚,事件的 processed 字段仍为 0,下次轮询会重试
} finally {
this.isProcessing = false;
}
}
}
这个实现的关键点是事务。我们将消息发送和更新processed字段包裹在同一个SQLite事务中。如果消息发送失败(例如,RabbitMQ缓冲区满),事务会回滚,processed字段不变,保证了数据不会丢失。
核心组件二:DVC管理Schema演进
随着业务发展,users表可能需要增加一个department字段。这就是Schema演进。如果不加管理,消费者服务很快就会因为遇到不认识的字段而崩溃。
我们使用DVC来追踪schemas/目录。
初始化DVC
在项目根目录运行dvc init。追踪第一个版本的Schema
schemas/v1/user_schema.sql文件内容:CREATE TABLE users ( id TEXT PRIMARY KEY, name TEXT NOT NULL, email TEXT, version INTEGER, last_updated DATETIME );使用DVC添加追踪:
dvc add schemas/v1/user_schema.sql这会生成一个
.dvc文件,并把user_schema.sql的哈希值记录下来。然后将.dvc文件和.gitignore中自动添加的/schemas/v1/user_schema.sql提交到Git。演进到第二版Schema
创建schemas/v2/user_schema.sql:CREATE TABLE users ( id TEXT PRIMARY KEY, name TEXT NOT NULL, email TEXT, department TEXT, -- 新增字段 version INTEGER, last_updated DATETIME );再次添加追踪:
dvc add schemas/v2/user_schema.sql现在,Git仓库的每次提交都精确对应一个完整的、可复现的Schema版本集合。CI/CD流水线可以利用
dvc pull拉取指定Git版本的Schema文件,用于测试或部署。
捕获代理发送的消息中包含schemaVersion字段。消费端就可以根据这个版本号来决定如何解析和处理消息。
核心组件三:消息消费者与写入Couchbase
消费者的职责是:
- 从RabbitMQ接收消息。
- 根据消息元数据中的
schemaVersion,选择合适的逻辑来解析和转换数据。 - 将转换后的数据写入Couchbase。
// packages/mq-couchbase-consumer/src/consumer.ts
import amqplib from 'amqplib';
import couchbase from 'couchbase';
// 实际项目中,这些配置应来自环境变量或配置文件
const MQ_URL = 'amqp://localhost';
const QUEUE_NAME = 'cdc_queue';
const COUCHBASE_CONN_STR = 'couchbase://localhost';
const COUCHBASE_BUCKET = 'cdc-events';
const COUCHBASE_USER = 'admin';
const COUCHBASE_PASS = 'password';
export class Consumer {
private mqChannel: amqplib.Channel | null = null;
private couchbaseCluster: couchbase.Cluster | null = null;
private couchbaseBucket: couchbase.Bucket | null = null;
async start() {
await this.connectAll();
this.listen();
}
private async connectAll() {
// 省略MQ和Couchbase的连接与错误处理代码...
this.couchbaseCluster = await couchbase.connect(COUCHBASE_CONN_STR, {
username: COUCHBASE_USER,
password: COUCHBASE_PASS,
});
this.couchbaseBucket = this.couchbaseCluster.bucket(COUCHBASE_BUCKET);
}
private listen() {
if (!this.mqChannel) return;
this.mqChannel.consume(QUEUE_NAME, async (msg) => {
if (!msg) {
return;
}
try {
const content = JSON.parse(msg.content.toString());
const { metadata, tableName, operation, after } = content;
// 根据Schema版本进行数据转换
const transformedData = this.transform(after, metadata.schemaVersion);
if (operation === 'INSERT' || operation === 'UPDATE') {
// 使用记录的ID作为Couchbase文档的ID,实现upsert
const docId = `${tableName}::${transformedData.id}`;
await this.couchbaseBucket!.defaultCollection().upsert(docId, transformedData);
} else if (operation === 'DELETE') {
const docId = `${tableName}::${content.before.id}`;
await this.couchbaseBucket!.defaultCollection().remove(docId);
}
// 消息处理成功,进行ACK
this.mqChannel!.ack(msg);
} catch (error) {
console.error('Error processing message, sending to NACK for requeue.', error);
// 处理失败,nack(msg, false, true)表示不处理,并让MQ重新投递
// 在生产环境中,需要有更复杂的死信队列(DLQ)机制
this.mqChannel!.nack(msg, false, true);
}
});
}
// 这里的转换逻辑是核心
private transform(payload: any, schemaVersion: string): any {
if (!payload) return null;
// 单元测试应覆盖所有版本的转换逻辑
switch (schemaVersion) {
case 'v1':
// v1版本的数据已经是我们想要的格式
return {
id: payload.id,
name: payload.name,
email: payload.email,
data_schema_version: 'v1'
};
case 'v2':
// v2版本多了department字段
return {
id: payload.id,
name: payload.name,
email: payload.email,
department: payload.department || 'UNKNOWN', // 提供默认值,增强健壮性
data_schema_version: 'v2'
};
default:
// 未知版本,抛出错误让消息进入重试或死信队列
throw new Error(`Unsupported schema version: ${schemaVersion}`);
}
}
}
架构图
整个系统的流程可以通过下面的图来概括:
graph TD
subgraph Monorepo
direction LR
DVC[DVC: Manages Schemas]
AgentCode[sqlite-capture-agent]
ConsumerCode[mq-couchbase-consumer]
SharedTypes[shared-types]
end
subgraph Edge Node
direction TB
App[Application] -- Writes --> SQLite
SQLite -- Triggers --> OutboxTable[Outbox Table]
Agent[Capture Agent] -- Polls --> OutboxTable
end
subgraph Central Services
direction TB
MQ[RabbitMQ]
Consumer[MQ Consumer]
CB[Couchbase Cluster]
end
DVC --> AgentCode & ConsumerCode
Agent -- Sends Event --> MQ
MQ -- Delivers Event --> Consumer
Consumer -- Writes Document --> CB
这个架构虽然组件多,但职责单一,每个部分都可以独立测试和部署。Monorepo和DVC的引入,解决了在演进过程中最头疼的协同和一致性问题。
方案的局限性与未来展望
此方案并非没有缺点。首先,基于触发器和轮询的捕获机制会给源SQLite数据库带来额外开销,并且存在毫秒到秒级的延迟。对于需要更低延迟的场景,可能需要探索直接解析SQLite的WAL日志文件,但这会带来巨大的复杂性。
其次,消费端的transform函数目前是硬编码的switch语句。当schema版本增多时,这里会变得臃肿。未来的一个优化方向是,将转换逻辑也作为脚本(例如JS或Lua脚本)和schema文件一起用DVC进行版本化管理。消费者在启动时加载所有版本的转换脚本,运行时动态调用,实现真正的配置驱动。
最后,错误处理机制虽然考虑了重试,但一个完善的系统还需要一个死信队列(Dead Letter Queue)和配套的监控告警,以便于开发者能及时发现并处理那些持续失败的“毒丸”消息。