本人阅读了 Skywalking 的大部分核心代码,也了解了相关的文献,对此深有感悟,特此借助巨人的思想自己手动用JAVA语言实现了一个 “调用链监控APM” 系统。本书采用边讲解实现原理边编写代码的方式,看本书时一定要跟着敲代码。
作者已经将过程写成一部书籍,奈何没有钱发表,如果您知道渠道可以联系本人。一定重谢。
本书涉及到的核心技术与思想
JavaAgent , ByteBuddy,SPI服务,类加载器的命名空间,增强JDK类,kafka,插件思想,切面,链路栈等等。实际上远不止这么多,差不多贯通了整个java体系。
适用人群
自己公司要实现自己的调用链的;写架构的;深入java编程的;阅读Skywalking源码的;
版权
本书是作者呕心沥血亲自编写的代码,不经同意切勿拿出去商用,否则会追究其责任。
原版PDF+源码请见:
本章涉及到的工具类也在这里面:
PDF书籍《手写调用链监控APM系统-Java版》第1章 开篇介绍-CSDN博客
第7章 插件与链路的结合:Tomcat插件实现
通过前面的章节,我们已经把所有基建工程开发完成了,本章就是制作各种插桩插件,通过这些插件的修改原始调用字节码来实现创建链路以及上报链路数据。
7.1 Tomcat插件实现与测试
制作这个插件是为了上报http的get,post请求链路数据,链路数据包含请求的url,请求时间,服务名等,也就是链路的span的一些信息。
拦截这个插件就需要你要了解当一个请求发生时,会必须执行tomcat的哪个类的哪个方法,这里我直接告诉你答案:
类名:org.apache.catalina.core.StandardHostValve
方法:invoke
非JDK类库
首先先将tomcat库的依赖加上,在tomcat-plugin的pom中添加:
<dependency><groupId>com.hadluo.apm</groupId><artifactId>apm-commons</artifactId><version>1.0</version><scope>compile</scope>
</dependency><dependency><groupId>org.apache.tomcat.embed</groupId><artifactId>tomcat-embed-core</artifactId><version>8.5.43</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.tomcat.embed</groupId><artifactId>tomcat-embed-jasper</artifactId><version>8.5.43</version><scope>provided</scope>
</dependency>
根据上面描述的拦截信息,修改我们之前的tomcat-plugin的测试代码:
com.hadluo.apm.plugin.tomcat.TomcatInstrumentation
public class TomcatInstrumentation extends AbstractClassEnhancePluginDefine {@Overridepublic String enhanceClass() {// 要增强的 类return "org.apache.catalina.core.StandardHostValve";}@Overridepublic MethodsInterceptPoint[] configMethodsInterceptPoint() {return new MethodsInterceptPoint[]{new MethodsInterceptPoint() {@Overridepublic ElementMatcher<MethodDescription> getMethodsMatcher() {// 拦截 invoke 方法return ElementMatchers.named("invoke");}@Overridepublic String getMethodsInterceptor() {// 拦截逻辑交给 TomcatInvokeInterceptorreturn "com.hadluo.apm.plugin.tomcat.TomcatInvokeInterceptor";}@Overridepublic boolean isOverrideArgs() {return false;}}};}
}
上述插件定义帮我们声明了要增强的是StandardHostValve类中的invoke方法,然后将增强处理逻辑交给了TomcatInvokeInterceptor拦截器类。如果你忘记了插件的实现,可以回到第4章看看如何实现插件的。
接下来修改TomcatInvokeInterceptor拦截器代码:
com.hadluo.apm.plugin.tomcat.TomcatInvokeInterceptor
public class TomcatInvokeInterceptor implements InstanceMethodsAroundInterceptor {@Overridepublic void beforeMethod(Object objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes) throws Throwable {}@Overridepublic Object afterMethod(Object objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {// 需要结束这个spanTraceContextManager service = ServiceManager.INSTANCE.getService(TraceContextManager.class);service.stopSpan();return ret;}@Overridepublic void handleMethodException(Object objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {// 出异常了,需要记录异常// 取出栈顶的 spanTraceContextManager service = ServiceManager.INSTANCE.getService(TraceContextManager.class);AbstractSpan span = service.activeSpan();span.log(t) ;}
}
方法执行后stopSpan ,出异常了记录log,这两个都很简单,beforeMethod稍微麻烦,单独拿出来。
beforeMethod为StandardHostValve的invoke方法执行之前的调用,这里表示是一个请求进来了,所以我们需要创建的是一个EntrySpan,同时,我们还需要解析http中的header数据,防止不是链路的第一层请求。
beforeMethod方法代码实现:
// 获取invoke方法的第一个参数,也就是tomcat请求的Request
Request request = (Request) allArguments[0];
// 将 header的 值 放到 ContextCarrier
ContextCarrier carrier = new ContextCarrier();
Map<String,String> headers = new HashMap<String,String>();
carrier.keys().forEach(key->{if(request.getHeader(key) != null){headers.put(key,request.getHeader(key));}
});
carrier.deserialize(headers);
// 构造 entry span
TraceContextManager service = ServiceManager.INSTANCE.getService(TraceContextManager.class);
AbstractSpan entrySpan = service.createEntrySpan(request.getRequestURI(), carrier);// 设置参数信息
entrySpan.setTag("url", request.getRequestURI());
entrySpan.setTag("http.method", request.getMethod());
entrySpan.setComponent("Tomcat") ;
entrySpan.setLayer(SpanLayer.HTTP);
跨进程传播ContextCarrier 时,我们将ContextCarrier 里面的字段名称作为http header的key,值就作为http header的key值进行传递。
carrier.keys就是获取里面的字段名称集合。在com.hadluo.apm.commons.trace.ContextCarrier类中新增方法:
public Set<String> keys() {Set<String> keys = new HashSet<String>();for (Field f : this.getClass().getDeclaredFields()) {keys.add(SW_FLAG + f.getName());}return keys;
}
我们对http header的key加了一个前缀,防止ContextCarrier字段名和微服务应用要用的http header的key重名。
在com.hadluo.apm.commons.trace.ContextCarrier类中新增字段:
//key标识
private static final String SW_FLAG = "SW_APM_";
carrier.deserialize 是一个相当于反序列化的方法,刚构造出来的carrier里面的字段是为空的,需要deserialize 方法就是将http的header中的数据拷贝到carrier字段中的。
在com.hadluo.apm.commons.trace.ContextCarrier类中新增方法:
public void deserialize(Map<String, String> param) {// 获取所有字段for (Field f : this.getClass().getDeclaredFields()) {if (!param.containsKey(SW_FLAG + f.getName())) {continue;}// 含有携带的数据String value = param.get(SW_FLAG + f.getName());f.setAccessible(true);try {f.set(this, value);} catch (IllegalAccessException e) {Logs.err(getClass(), "ContextCarrier deserialize错误, field: " + f.getName() + " ,value:" + value, e);}}
}
beforeMethod后面的逻辑就是创建EntrySpan,然后设置标签等信息,此时就会创建一个链路上下文,包含一个TraceSegment,然后开辟一个存储span的栈空间,入栈第一个EntrySpan,这些都是在第5章做过了详细的介绍了。
至于将ContextCarrier的值设置到http header中的逻辑就不是这个插件完成的,而是发起http调用的插件,比如http client, 是创建ExitSpan时要进行的操作。
到此Tomcat插件代码编写完成。还要注意两点:
1. 在hadluo-apm-plugin.def插件定义文件中声明
2. 在agent-core项目的pom中引用到插件的maven坐标。
这两点之前讲插件都已经完成。此时我们就可以监控tomcat的请求了。接下来我们进行测试。
修改我们测试的微服务controller接口:
@GetMapping("/order")
public String order(@RequestParam("shopId")String shopId) throws ClassNotFoundException {System.out.println("下单请求 商品ID:" + shopId);return UUID.randomUUID().toString();
}
打包apm-agent-core, 启动测试,访问接口。后台会打印出kafka发送,发现我们数据已经上报到kafka了。
将数据格式化:
{"msgTypeClass": "com.hadluo.apm.commons.kafka.Segment","sampleTime": 1733280074533,"serviceName": "smartapm-test","serviceInstance": "4a4bbeaf82f045b6b2055046d2e96860@192.168.2.233","traceId": "f9e28159290b443fbe8323fa5919b1d1.45.17332800535210001","traceSegmentId": "f9e28159290b443fbe8323fa5919b1d1.45.17332800535210000","spans": [{"spanId": 0,"parentSpanId": -1,"startTime": 1733280053525,"endTime": 1733280074531,"refs": [],"operationName": "/order","peer": null,"spanType": "Entry","spanLayer": "HTTP","component": "Tomcat","tags": {"http.method": "GET","url": "/order"},"logs": {}}]
}
这就是一条简单的链路信息,我们可以看出来里面就一个span操作,组件是tomcat,类型是EntrySpan,访问的接口为/order 。
这个json数据其实就是对应一个结束的TraceSegment的信息,包含了里面所有的span,字段意思在第5章讲解链路就已经详细阐述过。
接下来我们还需要测试http接口异常的情况,改写测试接口,编写会抛出零除异常,代码如下:
@GetMapping("/order")
public String order(@RequestParam("shopId")String shopId) throws ClassNotFoundException {System.out.println("下单请求 商品ID:" + shopId);int i=0;i = 12/i ;return UUID.randomUUID().toString();
}
测试发现上报的链路数据中并没有log出现。原因是handleMethodException并没有执行,也就是说这种异常不会在StandardHostValve类的invoke方法中被抛出来。其实StandardHostValve类还有一个throwable方法,异常会走这个方法。
在插件定义TomcatInstrumentation中多声明一个执行方法,改写
configMethodsInterceptPoint方法:
@Override
public MethodsInterceptPoint[] configMethodsInterceptPoint() {return new MethodsInterceptPoint[]{new MethodsInterceptPoint() {@Overridepublic ElementMatcher<MethodDescription> getMethodsMatcher() {// 拦截 invoke 方法return ElementMatchers.named("invoke");}@Overridepublic String getMethodsInterceptor() {// 拦截逻辑交给 TomcatInvokeInterceptorreturn "com.hadluo.apm.plugin.tomcat.TomcatInvokeInterceptor";}@Overridepublic boolean isOverrideArgs() {return false;}},new MethodsInterceptPoint() {@Overridepublic ElementMatcher<MethodDescription> getMethodsMatcher() {// 拦截 throwable方法return ElementMatchers.named("throwable");}@Overridepublic String getMethodsInterceptor() {return "com.hadluo.apm.plugin.tomcat.TomcatExceptionInterceptor";}@Overridepublic boolean isOverrideArgs() {return false;}}};
}
增加执行逻辑拦截器TomcatExceptionInterceptor:
public class TomcatExceptionInterceptor implements InstanceMethodsAroundInterceptor {@Overridepublic Object afterMethod(Object objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {return ret;}@Overridepublic void handleMethodException(Object objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {}@Overridepublic void beforeMethod(Object objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes) throws Throwable {TraceContextManager service = ServiceManager.INSTANCE.getService(TraceContextManager.class);AbstractSpan span = service.activeSpan();// 第二个参数为 异常span.log((Throwable) allArguments[2]) ;}
}
在 beforeMethod中,也就是throwable方法执行前将异常记录到栈顶span中。
然后打包agent jar,测试,发现log已经记录在链路数据中:
到此,tomcat插件就编写完成,拿出去就可以监控tomcat容器接口了, 最后我再总结以下新增一个插件的四部曲:
1. 定义XXInstrumentation插件定义类,描述好增强类和方法。
2. 定义hadluo-apm-plugin.def插件定义文件,配置好XXInstrumentation。
3. 写好XXInterceptor方法环绕拦截器 。
4. 在apm-agent-core的pom中依赖插件的maven坐标。
后续插件都是这四部曲,我就不讲解很细致了。