实现一个基于 Server-Sent Events 的 Zipkin 动态追踪流中继服务


在处理线上微服务故障时,分布式追踪系统是定位问题的关键。但传统的 Zipkin 使用模式是“事后分析”:请求完成,Trace 数据被收集、存储、索引,然后我们才能在 UI 上查询。这个流程存在一个无法忽视的延迟。当需要对特定用户或特定请求进行实时观测时,这种延迟会严重影响故障排查的效率。我们反复刷新页面,等待着那个关键的 Trace 出现,整个过程非常被动。

我们需要的不是一个数据仓库,而是一个实时的数据流。一个能够让我们像盯着 tail -f 日志一样,实时观察符合特定条件的 Trace 流的工具。这个需求驱动我们构建了一个轻量级的中间件:一个 Zipkin Trace Relay(追踪中继服务)。它的核心任务是拦截所有发送到 Zipkin Collector 的追踪数据,一方面无损地将它们转发给原始的 Collector,另一方面,根据客户端的实时订阅,通过 Server-Sent Events (SSE) 将符合过滤条件的 Trace 推送给开发者。

架构选型与决策

构建这个 Relay 服务,首先要确定几个关键的技术选点。

  1. 开发语言:Go
    对于这种网络密集型、需要高并发处理能力且自身逻辑简单的中间件,Go 是一个近乎完美的选项。它的 Goroutine 和 Channel 模型能极其优雅地处理并发的客户端连接与数据流转,同时编译后的二进制文件体积小、无依赖,部署极为方便。在真实项目中,一个稳定、低资源占用的组件至关重要。

  2. 实时通信协议:Server-Sent Events (SSE)
    提到实时推送,很多人会立刻想到 WebSocket。但在这个场景下,WebSocket 是一种过度设计。我们只需要从服务器到客户端的单向数据流,而 SSE 正是为此而生。它基于标准的 HTTP 协议,客户端实现简单(浏览器原生支持 EventSource API),支持自动重连,并且相比 WebSocket 协议握手更轻量。对于一个纯粹的数据推送场景,选择 SSE 是一个务实且高效的决策。

  3. 数据流模型
    Relay 内部必须维护一个状态,即当前所有 SSE 客户端的订阅关系。一个典型的订阅可能长这样:“我需要所有 tag 中包含 userId=12345 的 Trace”。当新的 Trace 数据进入 Relay 时,它需要遍历所有订阅,进行匹配,然后将数据分发给对应的客户端。这是一个典型的发布-订阅模型。

下面是整个系统的架构图:

graph TD
    subgraph "微服务集群"
        A[Service A]
        B[Service B]
        C[Service C]
    end

    subgraph "Trace Relay (Go Service)"
        Ingest(HTTP Ingestion /api/v2/spans)
        Hub(Subscription Hub)
        SSE_Endpoint(SSE Endpoint /stream)
        Forwarder(Async Forwarder)
    end

    subgraph "开发者"
        Client[Browser / Terminal]
    end

    subgraph "原生 Zipkin"
        Collector[Zipkin Collector]
        Storage[Storage: ES/MySQL]
        UI[Zipkin UI]
    end

    A -- Trace Spans --> Ingest
    B -- Trace Spans --> Ingest
    C -- Trace Spans --> Ingest

    Ingest -- All Spans --> Hub
    Ingest -- All Spans --> Forwarder

    Forwarder -- Unmodified Spans --> Collector

    Client -- HTTP GET /stream?tag=userId:123 --> SSE_Endpoint
    SSE_Endpoint -- Registers Client & Filter --> Hub
    Hub -- Matched Spans --> SSE_Endpoint
    SSE_Endpoint -- SSE Stream --> Client

    Collector --> Storage --> UI

这个架构的核心在于 Relay 的非阻塞性。它接收数据后,立即在两个独立的 Goroutine 中处理:一个负责异步转发给 Zipkin Collector,确保不影响主链路;另一个负责将其广播给内部的订阅中心(Hub)进行实时处理。

核心实现:一步步构建 Relay 服务

我们来用 Go 从零开始构建这个服务。整个项目可以放在一个 main.go 文件中,因为它足够小巧。

1. 定义数据结构

首先,我们需要定义 Zipkin V2 Span 的数据结构。我们不需要实现所有字段,只需要关心用于过滤的关键字段即可,比如 traceId, name, tags 等。

// main.go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"
)

// Span 是 Zipkin v2 Span 模型的简化表示。
// 我们只定义了过滤和展示所必需的字段。
type Span struct {
	TraceID string            `json:"traceId"`
	ID      string            `json:"id"`
	Name    string            `json:"name,omitempty"`
	Tags    map[string]string `json:"tags,omitempty"`
}

// Client 代表一个通过 SSE 连接的客户端。
// 每个客户端都有一个独立的 channel 用于接收匹配的 trace 数据。
// filter 是客户端定义的过滤条件。
type Client struct {
	channel chan []byte
	filter  Filter
}

// Filter 定义了客户端的过滤规则。
// 目前只支持一个简单的 tag 键值对。
type Filter struct {
	TagKey   string
	TagValue string
}

// match 检查传入的 Span 是否满足该过滤器的条件。
// 这是一个核心的业务逻辑。
func (f *Filter) match(span *Span) bool {
	// 如果没有设置过滤器,则不匹配任何内容。
	// 避免客户端在没有过滤器的情况下收到所有 trace。
	if f.TagKey == "" {
		return false
	}
	if val, ok := span.Tags[f.TagKey]; ok && val == f.TagValue {
		return true
	}
	return false
}

这里的 Filter 结构非常简单,在真实项目中,这里可以扩展成一个更复杂的规则引擎。

2. 实现订阅中心 (Hub)

Hub 是整个并发模型的核心。它通过 channel 来安全地管理客户端的注册、注销和消息广播,避免了使用锁带来的复杂性。

// main.go (continued)

// Hub 维护了所有活跃的客户端和订阅。
// 它是整个发布-订阅模式的中心协调者。
type Hub struct {
	clients      map[*Client]bool
	register     chan *Client
	unregister   chan *Client
	broadcast    chan []byte // 从 ingestion endpoint 接收原始的 spans 数据
	mu           sync.RWMutex // 用于保护 clients map
	zipkinTarget string       // 真实的 Zipkin Collector 地址
	httpClient   *http.Client
}

func newHub(zipkinTarget string) *Hub {
	return &Hub{
		clients:      make(map[*Client]bool),
		register:     make(chan *Client),
		unregister:   make(chan *Client),
		broadcast:    make(chan []byte, 256), // 使用带缓冲的 channel 避免阻塞 ingestion
		zipkinTarget: zipkinTarget,
		httpClient: &http.Client{
			Timeout: 5 * time.Second, // 设置超时是生产级代码的必要条件
		},
	}
}

// run 是 Hub 的主循环。它必须在一个单独的 goroutine 中运行。
func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client] = true
			h.mu.Unlock()
			log.Printf("Client registered with filter: %+v", client.filter)

		case client := <-h.unregister:
			h.mu.Lock()
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.channel)
			}
			h.mu.Unlock()
			log.Printf("Client unregistered with filter: %+v", client.filter)

		case message := <-h.broadcast:
			// 异步转发给真实的 Zipkin Collector
			// 这是一个 "fire and forget" 的操作,确保不阻塞 trace 的实时分发。
			go h.forwardToZipkin(message)

			// 将接收到的 spans JSON 数组解析为单个 Span 对象
			var spans []Span
			if err := json.Unmarshal(message, &spans); err != nil {
				log.Printf("Error unmarshalling spans: %v", err)
				continue
			}

			// 遍历所有订阅的客户端,进行过滤和分发
			h.mu.RLock()
			clients := make([]*Client, 0, len(h.clients))
			for c := range h.clients {
				clients = append(clients, c)
			}
			h.mu.RUnlock()

			for _, span := range spans {
				for _, client := range clients {
					if client.filter.match(&span) {
						// 重新序列化匹配的单个 span
						spanBytes, err := json.Marshal(span)
						if err != nil {
							log.Printf("Error marshalling matched span: %v", err)
							continue
						}
						
						// 非阻塞发送
						select {
						case client.channel <- spanBytes:
						default:
							log.Printf("Client channel is full, dropping message for filter: %+v", client.filter)
						}
					}
				}
			}
		}
	}
}

// forwardToZipkin 将原始的 trace 数据转发给配置的 Zipkin Collector。
func (h *Hub) forwardToZipkin(data []byte) {
	if h.zipkinTarget == "" {
		return // 如果没有配置目标地址,则不转发
	}
	req, err := http.NewRequest("POST", h.zipkinTarget, bytes.NewReader(data))
	if err != nil {
		log.Printf("Error creating forward request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := h.httpClient.Do(req)
	if err != nil {
		log.Printf("Error forwarding to zipkin: %v", err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		log.Printf("Zipkin collector returned error status: %d", resp.StatusCode)
	}
}

这里的 run 方法是 Relay 的“心脏”。它使用一个 select 语句来监听三个不同的 channel,实现了客户端管理和消息分发的解耦。注意 forwardToZipkin 是在一个新的 goroutine 中调用的,这保证了向上游 Zipkin 的转发延迟或失败不会影响到实时 SSE 推送。

3. 实现 HTTP Endpoints

我们需要两个 HTTP 端点:

  1. /api/v2/spans: 用于接收来自微服务的 trace 数据。
  2. /stream: 用于客户端建立 SSE 连接和订阅。
// main.go (continued)

// handleSpans 接收来自 instrumented application 的 trace 数据。
func (h *Hub) handleSpans(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Error reading request body", http.StatusInternalServerError)
		return
	}

	// 将数据推送到 hub 的广播 channel,非阻塞。
	select {
	case h.broadcast <- body:
	default:
		// 如果 broadcast channel 满了,我们选择丢弃。
		// 在生产环境中,这里应该有更完善的策略,例如记录 metric。
		log.Println("Broadcast channel is full, dropping incoming spans.")
	}

	// 立即返回 202 Accepted,表示我们已接收但尚未处理。
	// 这是标准的异步 API 设计模式。
	w.WriteHeader(http.StatusAccepted)
}

// handleStream 处理 SSE 连接请求。
func (h *Hub) handleStream(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	// 设置 SSE 必需的 headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	// 从 query 参数解析 filter
	// 例如: /stream?tag=userId:123
	queryTag := r.URL.Query().Get("tag")
	parts := strings.SplitN(queryTag, ":", 2)
	filter := Filter{}
	if len(parts) == 2 {
		filter.TagKey = parts[0]
		filter.TagValue = parts[1]
	} else {
		http.Error(w, "Invalid filter format. Use ?tag=key:value", http.StatusBadRequest)
		return
	}

	client := &Client{
		channel: make(chan []byte, 64), // 为每个 client 设置一个缓冲 channel
		filter:  filter,
	}

	h.register <- client
	defer func() {
		h.unregister <- client
	}()
	
	ctx := r.Context()

	// 监听连接关闭事件
	go func() {
		<-ctx.Done()
		h.unregister <- client
	}()

	// 循环从 client channel 读取数据并发送给客户端
	for {
		select {
		case <-ctx.Done(): // 客户端断开连接
			return
		case message, ok := <-client.channel:
			if !ok { // channel 被关闭
				return
			}
			// 格式化为 SSE message
			// data: {"traceId":"...","id":"...","name":"...","tags":{...}}
			//
			//
			_, err := fmt.Fprintf(w, "data: %s\n\n", message)
			if err != nil {
				log.Printf("Error writing to client: %v", err)
				return
			}
			flusher.Flush() // 立即将数据发送到客户端
		}
	}
}

handleStream 中,正确处理客户端断开连接至关重要。我们通过 r.Context().Done() channel 来捕获这个事件,并及时从 Hub 中注销客户端,避免资源泄漏。http.Flusher 的调用确保了数据被立即推送到客户端,而不是在缓冲区中等待。

4. 组装并启动服务

最后,我们在 main 函数中把所有部分串联起来。

// main.go (continued)
import "fmt"
import "os"

func main() {
	// 从环境变量获取真实的 Zipkin Collector 地址
	// 这是一个好的实践,避免硬编码配置。
	zipkinTarget := os.Getenv("ZIPKIN_TARGET")
	if zipkinTarget == "" {
		log.Println("WARNING: ZIPKIN_TARGET environment variable not set. Traces will not be forwarded.")
	} else {
		log.Printf("Forwarding traces to: %s", zipkinTarget)
	}

	hub := newHub(zipkinTarget)
	go hub.run()

	http.HandleFunc("/api/v2/spans", hub.handleSpans)
	http.HandleFunc("/stream", hub.handleStream)

	port := os.Getenv("PORT")
	if port == "" {
		port = "9412" // Relay 服务监听的端口
	}

	log.Printf("Starting Zipkin Trace Relay on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

现在,一个功能完整的 Zipkin Trace Relay 就完成了。你可以通过以下命令运行它:

# 设置上游 Zipkin Collector 的地址
export ZIPKIN_TARGET="http://localhost:9411/api/v2/spans"

# 运行 Relay 服务,它将在 9412 端口监听
go run main.go

你需要将你的微服务的 Zipkin Reporter 地址指向这个 Relay 服务(例如 http://<relay-host>:9412/api/v2/spans)。然后,你可以通过 curl 或浏览器来订阅实时 trace 流:

# 订阅所有 tag 中包含 userId: 'user-alpha' 的 trace
curl -N "http://localhost:9412/stream?tag=userId:user-alpha"

当一个符合条件的请求在微服务集群中发生时,你将立即在终端看到对应的 Span JSON 数据被推送过来。

遗留问题与未来迭代

这个实现虽然解决了核心问题,但在生产环境中部署之前,还有几个方面需要考虑和优化:

  1. 水平扩展性:当前的设计是一个单体服务,所有的客户端订阅信息都保存在内存中。这成为了一个单点瓶颈。如果需要部署多个 Relay 实例来处理高流量,就需要一个外部的协调机制,例如使用 Redis Pub/Sub 来广播 Spans 和管理订阅关系,而不是在进程内通过 channel。

  2. 过滤能力的增强:目前只支持单个 tag 的精确匹配。一个更强大的系统应该支持更复杂的查询语言,比如 serviceName=auth-service AND durationMs>100 或者 error=true。这需要引入一个简单的查询解析器。

  3. 安全性:当前的 /stream 端点是完全开放的,任何人都可以连接并订阅。在生产环境中,必须增加认证和授权机制,例如通过 OAuth2 token 来验证用户身份,并根据其权限限制可以订阅的 tag 或服务。

  4. 性能与资源控制:当 trace 流量非常巨大时,即使是 Go 服务也可能成为瓶颈。可以考虑引入采样机制,或者对每个客户端的连接数、订阅复杂度进行限制,防止恶意或错误的客户端消耗过多服务器资源。同时,对于广播给 Hub 的数据,目前是全量反序列化再进行匹配,对于超高吞吐量的场景,可以考虑更高效的匹配方式,比如在原始 JSON 字节流上进行操作,避免不必要的内存分配。


  目录