利用 Axum 和 Tonic 构建基于分布式锁的高并发 SCSS 编译服务


一个日益庞大的微前端体系中,CI/CD 流水线的效率成了绕不开的瓶颈。数百个前端应用,每次提交都会触发独立的 npm installwebpack/vite 构建,其中 SCSS 编译是不可忽视的耗时环节。大量的重复编译不仅浪费了宝贵的计算资源,也严重拖慢了开发和部署的节奏。为了解决这个问题,我们决定构建一个中心化的、支持高并发的、具备缓存能力的 SCSS 按需编译服务。

这个服务的核心架构必须满足几个关键要求:

  1. 高性能: 核心编译逻辑必须快,这促使我们选择了 Rust。
  2. 双协议接口: 需要一个 gRPC 接口 (Tonic) 供 CI/CD 系统高效调用,以及一个 HTTP 接口 (Axum) 用于管理、监控和手动操作。
  3. 并发安全: 在高并发场景下,对于同一个源码版本(以 Git Commit Hash 标识)的编译请求,必须保证只有一个工作节点实际执行编译,其他节点则等待结果。这直接指向了分布式锁的需求。
  4. 可观测性: 作为一个核心基础设施,必须有强大的错误追踪和性能监控能力,Sentry 是不二之选。

最终的系统拓扑如下:

graph TD
    subgraph CI/CD Pipeline
        A[Git Push] --> B{Trigger Build};
        B --> C[gRPC Client];
    end

    subgraph On-Demand Compiler Service (Rust)
        C -- CompileRequest(commit_hash, scss_source) --> D[Tonic gRPC Server];
        D --> E{Acquire Distributed Lock};
        E -- Lock Key: compile:{commit_hash} --> F[Redis];
        E -- Success --> G[Compile SCSS];
        G -- grass crate --> H[Compiled CSS];
        H --> I[Upload to Artifact Storage];
        I --> J{Release Distributed Lock};
        J --> F;
        E -- Failure/Waiting --> K[Fetch from Artifact Storage];
        
        subgraph Observability
            D -- Sentry Event --> S[Sentry];
            G -- Sentry Event --> S;
            L -- Sentry Event --> S;
        end

        subgraph Management API
            M[Admin/Dev] -- HTTP Request --> L[Axum HTTP Server];
            L -- Get Status / Health Check --> D;
        end
    end

    subgraph Shared Infrastructure
        F;
        ArtifactStorage[Artifact Storage (e.g., S3)];
        I --> ArtifactStorage;
        K --> ArtifactStorage;
    end

项目依赖与配置

在真实项目中,配置管理是第一步。我们不会将配置硬编码在代码里。

Cargo.toml 中包含了所有核心组件:

[package]
name = "dist-compiler"
version = "0.1.0"
edition = "2021"

[dependencies]
# Web & gRPC
axum = "0.6"
tonic = "0.9"
prost = "0.11"
tokio = { version = "1", features = ["full"] }

# Data & Config
serde = { version = "1.0", features = ["derive"] }
config = "0.13"
lazy_static = "1.4"

# SCSS Compilation
grass = "0.13"

# Distributed Lock & State
redis = { version = "0.23", features = ["tokio-comp"] }

# Observability
sentry = "0.31"
sentry-tracing = "0.31"
tracing = "0.1"
tracing-subscriber = "0.3"

# Utilities
sha2 = "0.10"
hex = "0.4"
anyhow = "1.0"
thiserror = "1.0"

相应的,我们需要一个配置文件 config/default.toml 来管理服务端口、Redis 连接信息和 Sentry DSN。

[server]
http_port = 8080
grpc_port = 50051

[redis]
url = "redis://127.0.0.1/"

[sentry]
dsn = "YOUR_SENTRY_DSN_HERE"
environment = "development"
release = "[email protected]"

通过 config crate 加载这些配置,并用 lazy_static 将其设为全局可访问,是生产环境中常见的做法。

核心组件实现:分布式锁

分布式锁是防止“惊群效应”的关键。当多个 CI 任务同时请求编译同一个 commit hash 的代码时,如果没有锁,所有节点都会开始重复且昂贵的编译工作。

我们的实现基于 Redis 的 SET key value NX PX milliseconds 原子操作。NX 确保只有在 key 不存在时才设置,PX 为其设置一个过期时间,防止因服务崩溃导致死锁。

src/distributed_lock.rs:

use redis::AsyncCommands;
use std::time::Duration;
use anyhow::{Result, Context};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum LockError {
    #[error("Failed to acquire lock for key: {0}")]
    AcquisitionFailed(String),
    #[error("Redis command failed: {0}")]
    RedisError(#[from] redis::RedisError),
}

/// 一个简单的基于 Redis 的分布式锁实现
pub struct RedisLock {
    key: String,
    // 锁的唯一标识,防止误释放其他进程的锁
    token: String, 
    ttl: Duration,
    client: redis::Client,
}

impl RedisLock {
    pub fn new(client: redis::Client, key: impl Into<String>, ttl: Duration) -> Self {
        Self {
            key: key.into(),
            token: uuid::Uuid::new_v4().to_string(),
            ttl,
            client,
        }
    }

    /// 尝试获取锁。
    /// 这是一个原子操作。
    ///
    /// # Returns
    /// - `Ok(true)`: 成功获取锁
    /// - `Ok(false)`: 锁已被其他进程持有
    /// - `Err(LockError)`: Redis 操作出错
    pub async fn acquire(&self) -> Result<bool, LockError> {
        let mut conn = self.client.get_async_connection().await
            .context("Failed to get Redis connection for acquiring lock")?;
            
        let result: Option<String> = conn.set_options(
            &self.key,
            &self.token,
            redis::SetOptions::default()
                .nx() // NX: Only set if not exists
                .with_expiration(redis::Expiration::PX(self.ttl.as_millis() as u64)),
        ).await?;

        Ok(result.is_some())
    }

    /// 释放锁。
    /// 使用 Lua 脚本确保原子性:只有当 key 存在且 value 与 token 匹配时才删除。
    /// 这是为了防止一个进程释放了另一个进程因延迟而持有的同一个锁。
    pub async fn release(&self) -> Result<(), LockError> {
        let mut conn = self.client.get_async_connection().await
            .context("Failed to get Redis connection for releasing lock")?;
        
        let script = redis::Script::new(r"
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
        ");

        script
            .key(&self.key)
            .arg(&self.token)
            .invoke_async(&mut conn)
            .await?;
            
        Ok(())
    }
}

这里的坑在于 release 操作。一个常见的错误是直接 GET key 然后 DEL key,这不是原子操作。在 GETDEL 之间,锁可能已经过期并被另一个进程获取,此时错误的 DEL 会释放掉不属于自己的锁。使用 Lua 脚本是解决这个问题的标准模式。

gRPC 服务层 (Tonic)

这是 CI 系统直接交互的入口。我们定义一个简单的 .proto 文件。

proto/compiler.proto:

syntax = "proto3";

package compiler;

service Compiler {
  rpc Compile(CompileRequest) returns (CompileResponse);
}

message CompileRequest {
  // 通常是 Git Commit Hash,用于缓存和锁定
  string version_hash = 1;
  string source_content = 2;
}

message CompileResponse {
  enum Status {
    COMPILED = 0;
    FETCHED_FROM_CACHE = 1;
    FAILED = 2;
  }
  Status status = 1;
  // 编译成功后,返回存储的 URL 或 CSS 内容
  string output = 2; 
  string error_message = 3;
}

Tonic 的服务实现将整合所有逻辑:锁、编译、Sentry 监控。

src/grpc_server.rs:

use tonic::{Request, Response, Status};
use crate::compiler::{compiler_server::Compiler, CompileRequest, CompileResponse};
use crate::distributed_lock::{RedisLock, LockError};
use std::time::Duration;
use anyhow::Context;

// 假设我们有一个S3客户端和Redis客户端的共享状态
pub struct AppState {
    pub redis_client: redis::Client,
    // pub s3_client: S3Client, // 生产环境中应是真实的 S3 客户端
}

pub struct CompilerService {
    pub state: std::sync::Arc<AppState>,
}

#[tonic::async_trait]
impl Compiler for CompilerService {
    async fn compile(
        &self,
        request: Request<CompileRequest>,
    ) -> Result<Response<CompileResponse>, Status> {
        // 使用 Sentry 的 span 来包裹整个编译流程,以便于追踪性能
        let span = sentry::start_transaction(sentry::TransactionContext::new(
            "gRPC Compile",
            "compile",
        ));

        let inner_request = request.into_inner();
        let version_hash = inner_request.version_hash.clone();
        
        // 模拟检查缓存
        // 在真实场景中,这里会查询 S3 或其他存储
        if check_cache_exists(&version_hash).await {
             return Ok(Response::new(CompileResponse {
                status: crate::compiler::compile_response::Status::FetchedFromCache as i32,
                output: format!("s3://artifacts/{}", version_hash),
                error_message: "".to_string(),
            }));
        }

        let lock_key = format!("compiler_lock:{}", version_hash);
        // 编译锁的 TTL 应该大于预期的最大编译时间
        let lock = RedisLock::new(self.state.redis_client.clone(), lock_key, Duration::from_secs(120));

        let acquired_span = span.start_child("lock.acquire", "Acquiring distributed lock");
        let acquired = lock.acquire().await.map_err(|e| {
            sentry::capture_error(&e);
            Status::internal("Failed to communicate with Redis for locking")
        })?;
        acquired_span.finish();

        if acquired {
            // 成功获取锁,我们是“天选之子”,负责执行编译
            let compile_span = span.start_child("scss.compile", "Compiling SCSS source");
            let compile_result = perform_compilation(&inner_request.source_content).await;
            compile_span.finish();
            
            let release_span = span.start_child("lock.release", "Releasing distributed lock");
            if let Err(e) = lock.release().await {
                // 释放锁失败是个严重问题,必须记录
                sentry::capture_message(&format!("FATAL: Failed to release lock for {}: {:?}", version_hash, e), sentry::Level::Error);
            }
            release_span.finish();

            match compile_result {
                Ok(css) => {
                    // 编译成功,上传到存储并返回结果
                    let upload_span = span.start_child("artifact.upload", "Uploading compiled CSS");
                    // 模拟上传
                    let artifact_url = format!("s3://artifacts/{}", version_hash);
                    upload_span.finish();

                    Ok(Response::new(CompileResponse {
                        status: crate::compiler::compile_response::Status::Compiled as i32,
                        output: artifact_url,
                        error_message: "".to_string(),
                    }))
                },
                Err(e) => {
                    // 编译失败,记录错误并返回
                    sentry::with_scope(|scope| {
                        scope.set_tag("version_hash", &version_hash);
                        sentry::capture_error(&e);
                    }, ||{});
                    Ok(Response::new(CompileResponse {
                        status: crate::compiler::compile_response::Status::Failed as i32,
                        output: "".to_string(),
                        error_message: e.to_string(),
                    }))
                }
            }
        } else {
            // 没有获取到锁,说明有其他节点正在编译。我们需要轮询等待结果。
            // 生产环境中,使用 Redis Pub/Sub 或类似机制会更高效。
            let wait_span = span.start_child("cache.wait", "Waiting for artifact to appear in cache");
            let artifact_url = wait_for_artifact(&version_hash, Duration::from_secs(120)).await;
            wait_span.finish();
            
            match artifact_url {
                Some(url) => Ok(Response::new(CompileResponse {
                    status: crate::compiler::compile_response::Status::FetchedFromCache as i32,
                    output: url,
                    error_message: "".to_string(),
                })),
                None => {
                    sentry::capture_message(&format!("Timeout waiting for artifact {}", version_hash), sentry::Level::Warning);
                    Err(Status::deadline_exceeded(format!("Timed out waiting for compilation of {}", version_hash)))
                }
            }
        }
    }
}

// 模拟函数
async fn perform_compilation(source: &str) -> anyhow::Result<String> {
    let opts = grass::Options::default();
    grass::from_string(source.to_string(), &opts).context("SCSS compilation failed")
}

async fn check_cache_exists(hash: &str) -> bool {
    // 模拟查询 S3
    false 
}

async fn wait_for_artifact(hash: &str, timeout: Duration) -> Option<String> {
    // 模拟轮询 S3
    let start = std::time::Instant::now();
    while start.elapsed() < timeout {
        // if artifact_exists_in_s3(hash).await {
        //     return Some(format!("s3://artifacts/{}", hash));
        // }
        tokio::time::sleep(Duration::from_secs(2)).await;
    }
    None
}

在这段代码中,Sentry 的作用被明确体现出来。我们不仅捕获了编译和 Redis 的硬错误,还通过创建 TransactionSpan 来监控每个阶段的耗时,如获取锁、编译、上传。当 CI 流水线变慢时,我们可以直接在 Sentry 的 Performance 面板看到是哪个环节出了问题。

HTTP 管理接口 (Axum)

与底层、高性能的 gRPC 接口不同,HTTP 接口主要服务于人。我们需要一个简单的健康检查端点。Axum 的集成非常直观。

src/http_server.rs:

use axum::{routing::get, Router};
use std::net::SocketAddr;
use std::sync::Arc;
use crate::grpc_server::AppState;

pub async fn run_http_server(state: Arc<AppState>, port: u16) {
    let app = Router::new()
        .route("/health", get(health_check))
        .with_state(state);

    let addr = SocketAddr::from(([0, 0, 0, 0], port));
    tracing::info!("HTTP server listening on {}", addr);
    
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn health_check(axum::extract::State(state): axum::extract::State<Arc<AppState>>) -> &'static str {
    // 简单的健康检查可以尝试 ping Redis
    let mut conn = state.redis_client.get_async_connection().await;
    match conn {
        Ok(mut c) => {
            let res: redis::RedisResult<String> = redis::cmd("PING").query_async(&mut c).await;
            if res.is_ok() {
                "OK"
            } else {
                sentry::capture_message("Health check failed: Redis PING error", sentry::Level::Error);
                "Error: Redis connection failed"
            }
        },
        Err(_) => {
            sentry::capture_message("Health check failed: Cannot get Redis connection", sentry::Level::Error);
            "Error: Redis connection failed"
        }
    }
}

整合与启动

main.rs 是所有组件的粘合剂。它负责初始化配置、Sentry、客户端连接池,然后同时启动 gRPC 和 HTTP 服务。

src/main.rs:

mod distributed_lock;
mod grpc_server;
mod http_server;

// build.rs 会生成这部分代码
pub mod compiler {
    tonic::include_proto!("compiler");
}

use grpc_server::{CompilerService, AppState};
use std::sync::Arc;
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // 1. 初始化配置 (此处简化)
    let http_port = 8080;
    let grpc_port = 50051;
    let redis_url = "redis://127.0.0.1/";
    let sentry_dsn = "YOUR_SENTRY_DSN_HERE";

    // 2. 初始化 Sentry
    let _guard = sentry::init((sentry_dsn, sentry::ClientOptions {
        release: sentry::release_name!(),
        environment: Some("production".into()),
        ..Default::default()
    }));
    
    // 3. 初始化共享状态
    let redis_client = redis::Client::open(redis_url)?;
    let app_state = Arc::new(AppState { redis_client });

    // 4. 启动 gRPC 和 HTTP 服务器
    let grpc_server = {
        let state_clone = app_state.clone();
        tokio::spawn(async move {
            let addr = format!("[::1]:{}", grpc_port).parse().unwrap();
            let compiler_service = CompilerService { state: state_clone };
            
            tonic::transport::Server::builder()
                .add_service(compiler::compiler_server::CompilerServer::new(compiler_service))
                .serve(addr)
                .await
                .unwrap();
        })
    };

    let http_server = {
        let state_clone = app_state.clone();
        tokio::spawn(async move {
            http_server::run_http_server(state_clone, http_port).await;
        })
    };

    println!("Server running with HTTP on port {} and gRPC on port {}", http_port, grpc_port);
    
    // 等待两个服务器任务完成
    let _ = tokio::try_join!(grpc_server, http_server)?;

    Ok(())
}

这个架构虽然解决了核心问题,但它并非没有局限性。当前等待锁的节点采用的是简单的轮询策略,这在请求量巨大时会给缓存系统带来不必要的压力,一个更优的方案是使用 Redis 的 Pub/Sub 模式,让等待者订阅一个与 version_hash 相关的 channel,编译完成后由持有锁的节点发布一条消息,唤醒所有等待者。此外,我们实现的分布式锁没有考虑“锁续期”的问题,如果一个编译任务耗时超过了锁的 TTL,锁会被自动释放,可能导致另一个节点开始重复编译,生产级的分布式锁需要一个心跳机制来维持锁的持有状态。这些都是未来迭代中需要进一步优化的方向。


  目录