Zacard's Notes

分布式追踪系统x-apm开发经历分享

感谢@dingdang对开发的强力支持。

背景

在公司微服务化转型后,系统被拆分为多个由不同开发团队维护的分布式微服务。随着业务的发展,分布式服务越来越多,其关系越来越复杂。我们亟需一个工具能够梳理内部服务之间的关系,感知上下游服务的形态,快速定位冗长服务调用间的问题。

需要解决的核心问题:

  • 一次请求的完整调用链
  • 一次请求出现异常,能快速定位那个节点出现问题
  • 一次请求经历的服务的详细信息
  • 一次请求的瓶颈节点,并对节点资源分配提供数据支持
  • 一次请求中出现异常、超时自动报警

简介

为了解决以上问题,自研了一个分布式追踪系统X-APM(APM = Application Performance Management,应用性能管理).其核心就是调用链:通过一个全局的ID将分布在各个服务节点上的同一次请求串联起来,还原原有的调用关系、追踪系统问题、分析调用数据、统计系统指标。
在阅读google 《dapper》的思想,open traceing的理念后,实现参考了skywalking

设计目标

  • 低损耗: 对微服务的影响越低越好,包括cpu、内存、tps、响应时间等指标
  • 低侵入: 作为非业务组件,应当尽可能少侵入或者无侵入其他业务系统,对于使用方透明,减少开发人员的负担
  • 低延迟: 从数据的产生和收集,到数据计算和处理,再到最终展现,都要求尽可能快
  • 可配置: 可以通过配置决定所收集数据的范围和粒度
  • 可视化: 能够以图表方式展示调用链路等信息
  • 可预警: 分析跟踪到的调用链数据,当出现调用耗时时间过长、调用异常、重复调用等情况,及时警告
  • 决策支持: 这些数据最好能在决策支持层面发挥作用

架构

各业务系统使用x-apm的agent探针来处理调用链路信息的采集,当采集的信息达到一定量或每隔一段特定的时间时,agent将会把这些信息传送到Flume,通过flume的channel缓冲区流向到指定的sink。目前项目一期已支持将数据保存至ES,并通过UI项目读取ES数据,方便大家在页面上以图表的形式查看各业务系统的调用链路信息。

领域模型

对于理解分布式追踪系统的领域模型,我强烈建议先阅读open traceing,能够理解在分布式追踪系统中,Trace、Span、Tag等基本概念。

trace

一个trace代表一个潜在的,分布式的,存在并行数据或并行执行轨迹(潜在的分布式、并行)的系统。一个trace可以认为是多个span的有向无环图(DAG)。

span

一个span代表系统中具有开始时间和执行时长的逻辑运行单元。span之间通过嵌套或者顺序排列建立逻辑因果关系。

trace与span的关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
一个tracer过程中,各span的关系
[Span A] ←←←(the root span)
|
+------+------+
| |
[Span B] [Span C] ←←←(Span C 是 Span A 的孩子节点, ChildOf)
| |
[Span D] +---+-------+
| |
[Span E] [Span F] >>> [Span G] >>> [Span H]
(Span G 在 Span F 后被调用, FollowsFrom)
上述tracer与span的时间轴关系
––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–> time
[Span A···················································]
[Span B··············································]
[Span D··········································]
[Span C········································]
[Span E·······] [Span F··] [Span G··] [Span H··]

核心数据结构

简单解释:

  • TraceSegment: 一次分布式Trace中可能会经历多个微服务系统,每个系统都是一个TraceSegment(追踪链的一部分),所有经过的微服务TraceSegment组合在一起,才是一次完整的Trace
  • Span: 正如open traceing中的概念,记录调用链中某个功能、某个组件的详细信息
  • TraceSegmentRef: 各个微服务系统的TraceSegment之间的关系(调用顺序)
  • DistributedTraceIds: 记录各个微服务系统的追踪id
  • DistributedTraceId: 分布式追踪id,全局唯一

原理

这里只讲agent端的原理。因为后面的数据收集使用flume,数据存储使用elasticsearch,实时流分析使用spark,就不细说了

数据埋点

为了无侵入,数据埋点采用javaagent配合bytebuddy完成。

  • javaagent: 主要利用instrument agent,其可以在加载class文件之前做拦截,对字节码做修改
  • bytebuddy: 修改字节码

对所有中间件、框架、类库、本地方法的数据埋点,都采用Plugin的形式实现,做到自动化、可插拔、可配置、低耦合。

延伸:javaagent的what、how、why

数据收集

由于埋点是动态织入到字节码的功能增强,如何快速高效的收集而不阻塞正常业务系统的流程是个关键。

具体设计考虑:

  1. 如何收集数据: 收集的数据在发送前怎么存,收集的速度大于发送的速度的时候,又该怎么办?
  2. 如何发送数据: 异步发送数据到flume,但又不能开启太多线程,以免抢占业务系统线程资源,这个线程数设定多大合适呢?
  3. 如何保证数据完整性: 高并发,大数据量下,如何保证数据的完整性?

解决方案

方案注意点:

  • 埋点数据无需落盘,直接发送到flume.在业务系统节点落盘追踪数据毫无意义,徒耗节点系统资源
  • 埋点数据不能一收到就发送,得批量发送

最终实现:

以数组为基础,实现一个环形结构的无锁数据缓冲区,批量发送埋点数据。数据结构如下:

  • buffer: 缓存区
  • bufferSize: 缓存区大小
  • everySendSize: 每次批量发送的数量
  • currentIndex: 缓存区当前位置索引

实现细节:

  1. 对于追踪系统而言,当收集速度大于发送速度,应该果断丢弃数据。因为埋点的数据是可重复的且不能影响业务系统,而且本身很多追踪系统是有采样率的,即不是每次的埋点数据都会收集。当丢弃数据达到一定程度的时候会输出一条警告日志,spark中的日志实时分析会对该日志预警
  2. 对于保证数据的完整性,我们采用宁可丢弃数据,也不能有重复数据。因为重复的数据将对后续调用链分析,预警,数据统计,流量分析造成比较大的影响
  3. 如何实现环形的缓存区,只需要让缓存区的位置索引currentIndex始终在1~bufferSize内原子变动即可,请看以下示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    // 利用cas操作,获取buffer位置索引
    while (true) {
    int current = currentIndex.get();
    int next = current >= bufferSize ? 1 : current + 1;
    if (currentIndex.compareAndSet(current, next)) {
    return next;
    }
    }
  4. 如何实现无锁,核心还是cas操作,请看以下代码示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 只有数组中index位置上为null的才设置新值
    if (!casObject(index, null, traceSegment)) {
    return;
    }
    // 判断是否需要发送数据
    // a % b == a & (b - 1)
    if ((nextIndex & (everySendSize - 1)) != 0) {
    return;
    }
    // 收集缓冲区数据发送
    collectBufferToSend(index, everySendSize);

ps: 这里有个强制约定,bufferSize和everySendSize的大小必须为2^n.因为计算是否需要批量发送的时候要用到%操作,%操作是个相对比较昂贵的操作。所以这里我们有个取巧,当计算a%b,且b=2^n的时候,a%b==a&(b-1),位移操作将高效的多。

方案benchmark

为了检验环形数据结构的性能,我们专门写了benchmark,使用openjdk.jmh测试,benchmark代码片段如下:

1
2
3
4
5
6
7
8
9
10
@Threads(value = Threads.MAX)
@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 1, time = 10, timeUnit = TimeUnit.MILLISECONDS)
@Fork(value = 1)
@Benchmark
@BenchmarkMode(value = Mode.Throughput)
@OutputTimeUnit(value = TimeUnit.MILLISECONDS)
public void benchMarkCollector() {
collector(mockData.getAndIncrement());
}

可用性

  1. flume挂了怎么办?
  2. agent出异常了怎么办?
  3. agent如何升级?

解决方案

  • flume是个集群,单点故障可以无视,有重试机制。当全部flume都挂了的时候,agent端的数据将全部丢失,并且agent端将进入每隔30s的无限重连flume操作
  • agent的异常被全局catch,输出到日志,后面spark日志实时流分析预警。
  • agent的升级比较简单,由于项目使用docker部署,所以升级的时候,只要替换docker仓库中的基础镜像中的agent包即可。

性能测试

cpu额外消耗保持在7%以内,内存基本无变化

tps和响应时间基本无变化

坚持原创技术分享,您的支持将鼓励我继续创作!

热评文章