【SkyWalking】SkyWalking是如何实现跨进程传播链路数据?

文章目录

  • 一、简介
    • 1 为什么写这篇文章
    • 2 跨进程传播协议-简介
  • 二、协议
    • 1 Standard Header项
    • 2 Extension Header项
    • 3 Correlation Header项
  • 三、跨进程传播协议的源码分析
    • 1 OpenTracing规范
    • 2 通过dubbo插件分析跨进程数据传播
    • 3 分析跨进程传播协议的核心源码
  • 四、小结
  • 参考

一、简介

1 为什么写这篇文章

写这篇文章是为了让自己和大家梳理这些内容:

  1. SkyWalking的链路串联依赖跨进程数据传播,他的跨进程传播协议是怎样的?
  2. 如果我想借助SkyWalking的跨进程传播协议实现传递全链路业务数据(如全局userId等),该如何实现?

2 跨进程传播协议-简介

SkyWalking 跨进程传播协议是用于上下文的传播,之前经历过sw3协议、sw6协议,本文介绍是当前(2023年)最新的sw8协议。
该协议适用于不同语言、系统的探针之间传递上下文。

二、协议

Header项分为三类:

  • Standard Header项,Header名称:sw8
  • Extension Header项,Header名称:sw8-x
  • Correlation Header项,Header名称:sw8-correlation

协议的整体设计:
在这里插入图片描述

下面详细讲解协议的Header项:

1 Standard Header项

该Header项是上下文传播必须包含的。

  • Header名称:sw8.
  • Header值:由-分隔的8个字段组成。Header值的长度应该小于2KB。

Header值中具体包含以下8个字段:

  • 采样(Sample),0 或 1,0 表示上下文存在,但是可以(也很可能)被忽略而不做采样;1 表示这个trace需要采样并发送到后端。
  • 追踪ID(Trace Id),是 Base64 编码的字符串,其内容是由 . 分割的三个 long 类型值, 表示此trace的唯一标识。
  • 父追踪片段ID(Parent trace segment Id),是 Base64 编码的字符串,其内容是字符串且全局唯一。
  • 父跨度ID(Parent span Id),是一个从 0 开始的整数,这个跨度ID指向父追踪片段(segment)中的父跨度(span)。
  • 父服务名称(Parent service),是 Base64 编码的字符串,其内容是一个长度小于或等于50个UTF-8编码的字符串。
  • 父服务实例标识(Parent service instance),是 Base64 编码的字符串,其内容是一个长度小于或等于50个UTF-8编码的字符串。
  • 父服务的端点(Parent endpoint),是 Base64 编码的字符串,其内容是父追踪片段(segment)中第一个入口跨度(span)的操作名,由长度小于或等于50个UTF-8编码的字符组成。
  • 本请求的目标地址(Peer),是 Base64 编码的字符串,其内容是客户端用于访问目标服务的网络地址(不一定是 IP + 端口)。

示例值: 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT

2 Extension Header项

该Header项是可选的。扩展Header项是为高级特性设计的,它提供了部署在上游和下游服务中的探针之间的交互功能。

Header名称:sw8-x

Header值:由-分割,字段可扩展。

扩展Header值
当前值包括的字段:

追踪模式(Tracing Mode),空、0或1,默认为空或0。表示在这个上下文中生成的所有跨度(span)应该跳过分析。在默认情况下,这个应该在上下文中传播到服务端,除非它在跟踪过程中被更改。
客户端发送的时间戳:用于异步RPC,如MQ。一旦设置,消费端将计算发送和接收之间的延迟,并使用key transmission.latency自动在span中标记延迟。

示例值:1-1621344125000

3 Correlation Header项

该Header项是是可选的。并非所有语言的探针都支持,已知的是Java的探针是支持该协议。
该Header项用于跨进程传递用户自定义数据,例如userId、orgId。
这个协议跟OpenTracing 的 Baggage很类似,但是Correlation Header项相比,在默认设置下会更有更严格的限制,例如,只能存放3个字段,且有字段长度限制,这个是为了安全、性能等考虑。
数据格式:

Header名称:sw8-correlation

Header值:由,分割一对对key、value,每对key、value逗号分割,key、value的由Base64编码。

示例值:a2V5MQ==:dmFsdWUx,a2V5LTI=:dmFsdWUy

三、跨进程传播协议的源码分析

1 OpenTracing规范

SkyWalking是基于OpenTracing标准的追踪系统,参考吴晟老师翻译的OpenTracing规范的文章opentracing之Inject和Extract,OpenTracing定义了跨进程传播的几个要素:

SpanContext:SpanContext代表跨越进程边界,传递到下级span的状态。在SkyWalking中的实现类是org.apache.skywalking.apm.agent.core.context.TracingContext
Carrier:传递跨进程数据的搬运工,负责将追踪状态从一个进程"carries"(携带,传递)到另一个进程
Inject 和 Extract:SpanContexts可以通过Inject(注入)操作向Carrier增加,或者通过Extract(提取)从Carrier中获取,跨进程通讯数据(例如:HTTP头)。通过这种方式,SpanContexts可以跨越进程边界,并提供足够的信息来建立跨进程的span间关系(因此可以实现跨进程连续追踪)

2 通过dubbo插件分析跨进程数据传播

我们以SkyWalking java agent的dubbo-2.7.x-plugin插件为例,其中跨进程传播数据的核心代码在org.apache.skywalking.apm.plugin.asf.dubbo.DubboInterceptor,下面是该类跨进程传播的核心代码:

public class DubboInterceptor implements InstanceMethodsAroundInterceptor {/*** Consumer: The serialized trace context data will* inject to the {@link RpcContext#attachments} for transport to provider side.* <p>* Provider: The serialized trace context data will extract from* {@link RpcContext#attachments}. current trace segment will ref if the serialization context data is not null.*/@Overridepublic void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,MethodInterceptResult result) throws Throwable {......if (isConsumer) { // 1、consumer端// ContextCarrierfinal ContextCarrier contextCarrier = new ContextCarrier();// 1.1 createExitSpan()内部会调用TracerContext.inject(carrier),将TracerContext中的context数据inject(注入)到ContextCarrier的context中span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);CarrierItem next = contextCarrier.items();// 1.2 遍历ContextCarrier,从ContextCarrier的context获取数据,注入到dubbo的attachment,从consumer端传递到provider端while (next.hasNext()) {next = next.next();rpcContext.setAttachment(next.getHeadKey(), next.getHeadValue());if (invocation.getAttachments().containsKey(next.getHeadKey())) {invocation.getAttachments().remove(next.getHeadKey());}}} else { // 2 provider端// 2.1 从consumer端传递到provider端的attachment中获取跨进程协议数据,然后设置到contextContextCarrier contextCarrier = new ContextCarrier();CarrierItem next = contextCarrier.items();while (next.hasNext()) {next = next.next();next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));}// 2.2 createEntrySpan()内部会调用TracerContext.extract(carrier),将ContextCarrier的context数据extract(提取)到将TracerContext中的context中span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);span.setPeer(rpcContext.getRemoteAddressString());}}
}

从上面的源码可以看出在服务调用方和被调用方,都会用到ContextCarrier,他是临时搬运工,负责两个进程的TracerContext数据的传递。
下面分析ContextCarrier等类的核心源码。

3 分析跨进程传播协议的核心源码

TracingContext
org.apache.skywalking.apm.agent.core.context.TracingContext是OpenTracing的SpanContext的一种实现,里面包含了span的上下文,包含在segment、correlationContext、extensionContext,而inject()、extract()负责跨进程上下文透传。

public class TracingContext implements AbstractTracerContext {/*** The final {@link TraceSegment}, which includes all finished spans.*/private TraceSegment segment;@Getter(AccessLevel.PACKAGE)private final CorrelationContext correlationContext;@Getter(AccessLevel.PACKAGE)private final ExtensionContext extensionContext;/*** Prepare for the cross-process propagation. How to initialize the carrier, depends on the implementation.** @param carrier to carry the context for crossing process.*/void inject(ContextCarrier carrier);/*** Build the reference between this segment and a cross-process segment. How to build, depends on the* implementation.** @param carrier carried the context from a cross-process segment.*/void extract(ContextCarrier carrier);
}

ContextCarrier
ContextCarrier作为传递跨进程数据的搬运工,负责将追踪状态从一个进程"carries"(携带,传递)到另一个进程,其中包含了sw8协议里的Standard Header项、Extension Header项、Correlation Header项相关的上下文数据,具体参考下面的代码:

public class ContextCarrier implements Serializable {/*** extensionContext包含了在某些特定场景中用于增强分析的可选上下文,对应sw8的Extension Header项*/private ExtensionContext extensionContext = new ExtensionContext();/*** 用户的自定义上下文容器。此上下文与主追踪上下文一同传播。对应sw8的Correlation Header项*/private CorrelationContext correlationContext = new CorrelationContext();/*** @return 存在于当前tracing上下文中的item清单*/public CarrierItem items() {SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null);SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(correlationContext, sw8ExtensionCarrierItem);SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem);return new CarrierItemHead(sw8CarrierItem);}/*** Extract the extension context to tracing context*/void extractExtensionTo(TracingContext tracingContext) {tracingContext.getExtensionContext().extract(this);// The extension context could have field not to propagate further, so, must use the this.* to process.this.extensionContext.handle(tracingContext.activeSpan());}/*** Extract the correlation context to tracing context*/void extractCorrelationTo(TracingContext tracingContext) {tracingContext.getCorrelationContext().extract(this);// The correlation context could have field not to propagate further, so, must use the this.* to process.this.correlationContext.handle(tracingContext.activeSpan());}/*** 序列化sw8的Standard Header项,使用 '-' 分割各个字段* Serialize this {@link ContextCarrier} to a {@link String}, with '|' split.* @return the serialization string.*/String serialize(HeaderVersion version) {if (this.isValid(version)) {return StringUtil.join('-',"1",Base64.encode(this.getTraceId()),Base64.encode(this.getTraceSegmentId()),this.getSpanId() + "",Base64.encode(this.getParentService()),Base64.encode(this.getParentServiceInstance()),Base64.encode(this.getParentEndpoint()),Base64.encode(this.getAddressUsedAtClient()));}return "";}/*** 反序列化sw8的Standard Header项* Initialize fields with the given text.* @param text carries {@link #traceSegmentId} and {@link #spanId}, with '|' split.*/ContextCarrier deserialize(String text, HeaderVersion version) {if (text == null) {return this;}if (HeaderVersion.v3.equals(version)) {String[] parts = text.split("-", 8);if (parts.length == 8) {try {// parts[0] is sample flag, always trace if header exists.this.traceId = Base64.decode2UTFString(parts[1]);this.traceSegmentId = Base64.decode2UTFString(parts[2]);this.spanId = Integer.parseInt(parts[3]);this.parentService = Base64.decode2UTFString(parts[4]);this.parentServiceInstance = Base64.decode2UTFString(parts[5]);this.parentEndpoint = Base64.decode2UTFString(parts[6]);this.addressUsedAtClient = Base64.decode2UTFString(parts[7]);} catch (IllegalArgumentException ignored) {}}}return this;}
}

CorrelationContext
ContextCarrier里包含里sw8的Correlation Header项存放于CorrelationContext,这个类非常有用,适合我们去在全链路跨进程传递自定义的数据。
sw8协议里的Standard Header项、Extension Header项是比较固定的协议格式,我们可以扩展这些协议,例如Standard Header项,当前固定是8位的,对应8个字段,我们可以扩展为9位,第九位可以定义为userId。但是如果要这样改造,就得修改ContextCarrier类序列化、反序列的逻辑,要重新发布agent,并考虑好新旧版本兼容性问题、以及不同语言的agent是否兼容。
而sw8的Correlation Header项使用起来就非常方便。先看下对应实现了CorrelationContext的源码:

/*** Correlation context, use to propagation user custom data.* Correlation上下文,用于传播用户自定义数据*/
public class CorrelationContext {private final Map<String, String> data;/*** Add or override the context. 添加或覆盖上下文数据** @param key   to add or locate the existing context* @param value as new value* @return old one if exist.*/public Optional<String> put(String key, String value) {// 可以存放于span的tag中if (AUTO_TAG_KEYS.contains(key) && ContextManager.isActive()) {ContextManager.activeSpan().tag(new StringTag(key), value);}// settingdata.put(key, value);return Optional.empty();}/*** @param key to find the context 获取上下文数据* @return value if exist.*/public Optional<String> get(String key) {return Optional.ofNullable(data.get(key));}/*** Serialize this {@link CorrelationContext} to a {@link String} 序列化** @return the serialization string.*/String serialize() {if (data.isEmpty()) {return "";}return data.entrySet().stream().map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue())).collect(Collectors.joining(","));}/*** Deserialize data from {@link String} 反序列化*/void deserialize(String value) {if (StringUtil.isEmpty(value)) {return;}for (String perData : value.split(",")) {// Only data with limited count of elements can be addedif (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {break;}final String[] parts = perData.split(":");if (parts.length != 2) {continue;}data.put(Base64.decode2UTFString(parts[0]), Base64.decode2UTFString(parts[1]));}}/*** Prepare for the cross-process propagation. Inject the {@link #data} into {@link* ContextCarrier#getCorrelationContext()}*/void inject(ContextCarrier carrier) {carrier.getCorrelationContext().data.putAll(this.data);}/*** Extra the {@link ContextCarrier#getCorrelationContext()} into this context.*/void extract(ContextCarrier carrier) {......}/*** Clone the context data, work for capture to cross-thread. 克隆数据,用于跨线程传递*/@Overridepublic CorrelationContext clone() {final CorrelationContext context = new CorrelationContext();context.data.putAll(this.data);return context;}/*** Continue the correlation context in another thread.传递到另外的线程** @param snapshot holds the context.*/void continued(ContextSnapshot snapshot) {this.data.putAll(snapshot.getCorrelationContext().data);}
}

通过源码可知,CorrelationContext通过Map<String, String>来存放数据,CorrelationContext数据支持跨线程、跨进程透传。

四、小结

分析Dubbo插件的跨进程核心代码,了解了跨进程传播协议的核心实现逻辑。

其实在其他分布式追踪系统(如Zipkin、Jager)、全链路灰度系统等涉及到跨进程数据传播的系统中,也是使用了类似于上面SkyWalking协议的思路。

参考

SkyWalking Cross Process Propagation Headers Protocol
SkyWalking Cross Process Correlation Headers Protocol
详解 Apache SkyWalking 的跨进程传播协议

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/155818.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

亚马逊,速卖通,敦煌产品测评补单攻略:低成本、高安全实操指南

随着电商平台的发展和消费者对产品质量的要求提升&#xff0c;测评补单成为了商家们提升销售和用户口碑的关键环节。然而&#xff0c;如何在保持成本低廉的同时确保操作安全&#xff0c;一直是卖家们面临的挑战。今天林哥分享一些实用的技巧和策略&#xff0c;帮助卖家们产品的…

嵌入式C语言自我修养《内存堆栈管理》学习笔记

目录 一、Linux环境下的内存管理 二、栈的管理 三、堆内存管理 四、mmap映射区 五、内存泄漏与防范 六、常见的内存错误及检测 C程序中定义的函数、全局变量、静态变量经过编译链接后&#xff0c;分别以section的形式存储在可执行文件的代码段、数据段和BSS段中。当程序运…

【Zabbix】Zabbix学习笔记

现在Zabbix Server存在的问题&#xff1a; 问题1&#xff1a; Zabbix server: Utilization of discoverer processes over 75% 问题2&#xff1a; Zabbix server: Utilization of icmp pinger processes over 75% 优化的解决办法是修改配置文件把Discovery和Pinger进程数量调大…

04-RocketMQ源码解读

目录汇总&#xff1a;RocketMQ从入门到精通汇总 上一篇&#xff1a;03-RocketMQ高级原理 这一部分&#xff0c;我们开始深入RocketMQ的源码。源码的解读是个非常困难的过程&#xff0c;每个人的理解程度都会不一样&#xff0c;也不太可能通过讲解把其中的细节全部讲明白。我们今…

panads操作excel

panads简介 pandas是基于Numpy创建的Python包&#xff0c;内置了大量标准函数&#xff0c;能够高效地解决数据分析数据处理和分析任务&#xff0c;pandas支持多种文件的操作&#xff0c;比如Excel&#xff0c;csv&#xff0c;json&#xff0c;txt 文件等&#xff0c;读取文件之…

unity发布微信小游戏,未找到 game.json报错原因

unity发布微信小游戏&#xff0c;未找到 game.json报错原因 同一个问题相隔一年遇到两次&#xff0c;两次原因都不一样&#xff0c;记录一下&#xff0c;以后不要再掉坑里 原因一&#xff1a;申请的appID是小程序不是小游戏 解决方法&#xff1a;需要在程序平台修改服务类目 如…

哈希应用之布隆过滤器

文章目录 1.介绍1.1百度搜索1.2知乎好文1.3自身理解 2.模拟实现2.1文档阅读2.2代码剖析 3.误判率的研究4.布隆过滤器的应用4.1如何找到两个分别有100亿个字符串的文件的交集[只有1G内存].分别给出精确算法和近似算法4.2如何扩展BloomFilter使得它支持删除元素的操作 5.整体代码…

pytorch中nn.DataParallel多次使用

pytorch中nn.DataParallel多次使用 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader# 定义模型 class MyModel(nn.Module):def __init__(self):super(MyModel, self).__init__()self.fc nn.Linear(10, 1)def forwa…

ROS为机器人装配激光雷达

移动机器人在环境中获取障碍物的具体位置、房间的内部轮廓等信息都是非常必要的&#xff0c;这些信息是机器人创建地图、进行导航的基础数据&#xff0c;除上面所讲的Kinect&#xff0c;还可以使用激光雷达作为这种场景应用下的传感器。 激光雷达可用于测量机器人和其他物体之间…

3.简单场景构建

在新建的项目中&#xff0c;默认存在 Main Camera 和 Directional Light两个对象。若是缺失&#xff0c;可通过选择菜单中的 Game Object->Camera 和 Geme Object->Light->Directional Light进行创建。 1.添加地形及底图 通过在Cesium面板中选择 Cesium World Terrai…

批量文件重命名软件 A Better Finder Rename 11汉化for mac

A Better Finder Rename 11是一款功能强大的文件重命名工具&#xff0c;可在Mac操作系统上使用。它提供了简单而直观的界面&#xff0c;帮助用户快速批量重命名文件和文件夹&#xff0c;提高文件管理和组织效率。 以下是A Better Finder Rename 11可能提供的一些主要功能和特点…

设计模式 - 结构型模式考点篇:适配器模式(类适配器、对象适配器、接口适配器)

目录 一、适配器模式 一句话概括结构式模式 1.1、适配器模式概述 1.2、案例 1.2.1、类适配器模式实现案例 1.2.2、对象适配器 1.2.3、接口适配器 1.3、优缺点&#xff08;对象适配器模式&#xff09; 1.4、应用场景 一、适配器模式 一句话概括结构式模式 教你将类和对…

多线程入门

1 创建线程 下面的程序&#xff0c;我们可以用它来创建一个 POSIX 线程&#xff1a; #include <pthread.h> pthread_create (myThread, attr, start_routine, arg) 在这里&#xff0c;pthread_create 创建一个新的线程&#xff0c;并让它可执行。下面是关于参数的说明…

QT自制软键盘 最完美、最简单、跟自带虚拟键盘一样

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 QT自制软键盘 最完美、最简单、跟自带虚拟键盘一样 Chapter1 QT自制软键盘 最完美、最简单、跟自带虚拟键盘一样一、本自制虚拟键盘特点二、windows打开系统自带软键盘三、让…

3、在docker 容器中安装tomcat

&#xff11;、在服务器上查找tomcat镜像,查看前5条 docker search tomcat --limit 5​​​​​​​ 2、拉取镜像到本地 拉取官方的tomcat到本地 docker pull tomcat:9.0.34-jdk8 3、查看本地镜像 docker images |grep tomcat 4、启动tomcat 服务 使用默认配置 docker ru…

如何选择一个向量数据库:Elastic Cloud 和 Zilliz Cloud 面面观

随着以 Milvus 为代表的向量数据库在 AI 产业界越来越受欢迎&#xff0c;诸如 Elasticsearch 之类的传统数据库和检索系统也开始行动起来&#xff0c;纷纷在快速集成专门的向量检索插件方面展开角逐。 例如&#xff0c;在提供类似插件的传统数据库中&#xff0c;Elasticsearch …

VAE模型(详细推导+实例代码)

文章目录 EM算法思路E步M步直观感觉 GMM模型VAEVAE思想从GMM到VAE公式推导重参数VAE神经网络另一个视角的VAE思想为什么引入encoder为什么要重参数噪声与重建 Discrete VAE 本文会从EM算法&#xff0c;GMM模型一步一步的的推导&#xff0c;在过渡到VAE模型&#xff0c;如果有熟…

棱镜七彩参编!开源领域4项团体标准正式发布

近日&#xff0c;中电标2023年第27号团体标准公告正式发布&#xff0c;《T/CESA 1270.2-2023 信息技术 开源治理 第 2 部分&#xff1a;企业治理评估模型》、《T/CESA 1270.3-2023 信息技术 开源治理 第 3 部分&#xff1a;社区治理框架》、《T/CESA 1270.5-2023 信息技术 开源…

信创办公–基于WPS的EXCEL最佳实践系列 (单元格与行列)

信创办公–基于WPS的EXCEL最佳实践系列 &#xff08;单元格与行列&#xff09; 目录 应用背景操作步骤1、插入和删除行和列2、合并单元格3、调整行高与列宽4、隐藏行与列5、修改单元格对齐和缩进6、更改字体7、使用格式刷8、设置单元格内的文本自动换行9、应用单元格样式10、插…

STM32F4X I2C LM75

STM32F4X I2C LM75 I2C协议讲解I2C接线I2C协议波形I2C起始信号I2C停止信号I2C应答信号I2C寻址I2C地址格式 I2C数据传输 LM75ALM75A介绍LM75A引脚说明LM75A地址LM75A寄存器LM75A I2C协议写配置寄存器读配置寄存器写Tos和Thyst寄存器读Tos Thyst Temp寄存器LM75A温度计算 LM75A例…