Java Distributed Tracing System Zipkin (Part 6): Brave Source Code Analysis - Integrating Brave with Spring Boot

All blog posts are first published on my personal blog at http://blog.mozhu.org - welcome to visit!

Zipkin itself is built with Spring Boot, currently the most popular way to develop Spring applications. Spring Boot greatly simplifies the development of Spring projects: all the mainstream frameworks can be activated automatically just by adding jars and configuration, which is why it is increasingly popular among Java developers.
In the previous post we analyzed how Brave is used in a SpringMVC project; in this post we continue with how Brave integrates with a Spring Boot project and how that integration works.

The relevant code is in Chapter6/springboot.
Add the following dependencies and plugin to pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>${springboot.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>${springboot.version}</version>
</dependency>

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
        <fork>true</fork>
    </configuration>
</plugin>

package org.mozhu.zipkin.springboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableAutoConfiguration
public class DefaultApplication {

    public static void main(String[] args) {
        SpringApplication.run(DefaultApplication.class, args);
    }

}

Start Zipkin, then run the following two commands separately:

mvn spring-boot:run -Drun.jvmArguments="-Dserver.port=9000 -Dzipkin.service=backend"

mvn spring-boot:run -Drun.jvmArguments="-Dserver.port=8081 -Dzipkin.service=frontend"

Open http://localhost:8081/ in a browser and the current time is displayed.
The trace for this request can also be found in Zipkin's web UI.

As you can see, integrating Brave with Spring Boot is even simpler: only the startup class DefaultApplication was added, and all other classes are unchanged. We won't dig into how Spring Boot itself works here; there are plenty of excellent tutorials online.

The brave-instrumentation directory also contains instrumentation for other frameworks; if you are interested, take a look at their source code:
grpc
httpasyncclient
httpclient
jaxrs2
kafka-clients
mysql
mysql6
p6spy
sparkjava

This wraps up our analysis of the Brave source code. In subsequent posts we will gradually cover Zipkin's advanced usage and implementation internals.


Java Distributed Tracing System Zipkin (Part 5): Brave Source Code Analysis - Integrating Brave with SpringMVC


In the previous post we analyzed how Brave is used in a plain web project; in this post we continue with how Brave integrates with a SpringMVC project and how that integration works.
We will cover the SpringMVC integration in two parts: XML-based configuration and annotation-based configuration.

Add the spring-web and spring-webmvc related dependencies to pom.xml:

<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-spring-web</artifactId>
    <version>${brave.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>${spring.version}</version>
</dependency>

<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-spring-webmvc</artifactId>
    <version>${brave.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>${spring.version}</version>
</dependency>

XML-based configuration

Under the Servlet 2.5 specification, web.xml is mandatory; all we need to configure there is DispatcherServlet, SpringMVC's front controller.
The relevant code is in Chapter5/springmvc-servlet25.
web.xml

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
                             http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <display-name>SpringMVC Servlet2.5 Application</display-name>

    <servlet>
        <servlet-name>spring-webmvc</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet-mapping>
        <servlet-name>spring-webmvc</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

Then configure spring-webmvc-servlet.xml under WEB-INF:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.2.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util-3.2.xsd">

    <context:property-placeholder/>

    <bean id="sender" class="zipkin2.reporter.okhttp3.OkHttpSender" factory-method="create">
        <constructor-arg type="String" value="http://localhost:9411/api/v2/spans"/>
    </bean>

    <bean id="tracing" class="brave.spring.beans.TracingFactoryBean">
        <property name="localServiceName" value="${zipkin.service:springmvc-servlet25-example}"/>
        <property name="spanReporter">
            <bean class="brave.spring.beans.AsyncReporterFactoryBean">
                <property name="encoder" value="JSON_V2"/>
                <property name="sender" ref="sender"/>
                <!-- wait up to half a second for any in-flight spans on close -->
                <property name="closeTimeout" value="500"/>
            </bean>
        </property>
        <property name="propagationFactory">
            <bean id="propagationFactory" class="brave.propagation.ExtraFieldPropagation" factory-method="newFactory">
                <constructor-arg index="0">
                    <util:constant static-field="brave.propagation.B3Propagation.FACTORY"/>
                </constructor-arg>
                <constructor-arg index="1">
                    <list>
                        <value>user-name</value>
                    </list>
                </constructor-arg>
            </bean>
        </property>
        <property name="currentTraceContext">
            <bean class="brave.context.log4j2.ThreadContextCurrentTraceContext" factory-method="create"/>
        </property>
    </bean>

    <bean id="httpTracing" class="brave.spring.beans.HttpTracingFactoryBean">
        <property name="tracing" ref="tracing"/>
    </bean>

    <bean id="restTemplate" class="org.springframework.web.client.RestTemplate">
        <property name="interceptors">
            <list>
                <bean class="brave.spring.web.TracingClientHttpRequestInterceptor" factory-method="create">
                    <constructor-arg type="brave.http.HttpTracing" ref="httpTracing"/>
                </bean>
            </list>
        </property>
    </bean>

    <mvc:interceptors>
        <bean class="brave.spring.webmvc.TracingHandlerInterceptor" factory-method="create">
            <constructor-arg type="brave.http.HttpTracing" ref="httpTracing"/>
        </bean>
    </mvc:interceptors>

    <!-- Loads the controller -->
    <context:component-scan base-package="org.mozhu.zipkin.springmvc"/>
    <mvc:annotation-driven/>
</beans>

brave.spring.beans.TracingFactoryBean is used to create the tracing bean,
brave.spring.beans.HttpTracingFactoryBean is used to create the httpTracing bean,
brave.spring.webmvc.TracingHandlerInterceptor is configured as the SpringMVC interceptor,
and org.springframework.web.client.RestTemplate is configured as the client for sending HTTP requests.

Now let's look at the two controllers, Frontend and Backend, which provide the same functionality as the earlier FrontendServlet and BackendServlet.

Frontend

@RestController
public class Frontend {
    private final static Logger LOGGER = LoggerFactory.getLogger(Frontend.class);

    @Autowired
    RestTemplate restTemplate;

    @RequestMapping("/")
    public String callBackend() {
        LOGGER.info("frontend receive request");
        return restTemplate.getForObject("http://localhost:9000/api", String.class);
    }
}

Frontend uses Spring's restTemplate to send a request to Backend.

Backend

@RestController
public class Backend {

    private final static Logger LOGGER = LoggerFactory.getLogger(Backend.class);

    @RequestMapping("/api")
    public String printDate(@RequestHeader(name = "user-name", required = false) String username) {
        LOGGER.info("backend receive request");
        if (username != null) {
            return new Date().toString() + " " + username;
        }
        return new Date().toString();
    }
}

Backend receives the request from Frontend and responds with the current timestamp; if the headers contain user-name, its value is appended to the response string.

As in the previous posts, start Zipkin, then run the following two commands separately:

mvn jetty:run -Pbackend

mvn jetty:run -Pfrontend

Open http://localhost:8081/ in a browser and the current time is displayed.
The trace for this request can also be found in Zipkin's web UI.

Now let's analyze the two Spring-related classes:
brave.spring.webmvc.TracingHandlerInterceptor - the server-side request interceptor, which handles the server-side trace information
brave.spring.web.TracingClientHttpRequestInterceptor - the client-side request interceptor, which handles the client-side trace information

TracingHandlerInterceptor

public final class TracingHandlerInterceptor extends HandlerInterceptorAdapter {

    public static AsyncHandlerInterceptor create(Tracing tracing) {
        return new TracingHandlerInterceptor(HttpTracing.create(tracing));
    }

    public static AsyncHandlerInterceptor create(HttpTracing httpTracing) {
        return new TracingHandlerInterceptor(httpTracing);
    }

    final Tracer tracer;
    final HttpServerHandler<HttpServletRequest, HttpServletResponse> handler;
    final TraceContext.Extractor<HttpServletRequest> extractor;

    @Autowired TracingHandlerInterceptor(HttpTracing httpTracing) { // internal
        tracer = httpTracing.tracing().tracer();
        handler = HttpServerHandler.create(httpTracing, new HttpServletAdapter());
        extractor = httpTracing.tracing().propagation().extractor(HttpServletRequest::getHeader);
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) {
        if (request.getAttribute(SpanInScope.class.getName()) != null) {
            return true; // already handled (possibly due to async request)
        }

        Span span = handler.handleReceive(extractor, request);
        request.setAttribute(SpanInScope.class.getName(), tracer.withSpanInScope(span));
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
            Object o, Exception ex) {
        Span span = tracer.currentSpan();
        if (span == null) return;
        ((SpanInScope) request.getAttribute(SpanInScope.class.getName())).close();
        handler.handleSend(response, ex, span);
    }
}

TracingHandlerInterceptor extends HandlerInterceptorAdapter and overrides its preHandle and afterCompletion methods, which run before the request is handled and after it completes, respectively.
Unlike in the previous posts, there is no way here to use try-with-resources to close the SpanInScope automatically, so preHandle stores the SpanInScope as a request attribute, and afterCompletion retrieves it and closes it manually. The rest of the logic is the same as in TracingFilter.
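The "stash a closeable in the request, close it in a later callback" pattern can be sketched with plain Java. This is a minimal illustration with a Map standing in for HttpServletRequest attributes; all names are illustrative, not Brave's.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the preHandle/afterCompletion pattern: when a single
// try-with-resources block cannot span the whole request, the scope is stored
// as a request attribute and closed in the completion callback.
public class ScopeStash {
    // Stands in for HttpServletRequest attributes.
    static final Map<String, Runnable> attributes = new HashMap<>();
    static int closedCount = 0;

    // Called at the start of request handling, like preHandle.
    static void preHandle() {
        attributes.put("scope", () -> closedCount++); // the "SpanInScope" to close later
    }

    // Called after the request completes, like afterCompletion.
    static void afterCompletion() {
        Runnable scope = attributes.remove("scope");
        if (scope != null) scope.run(); // closes exactly once
    }

    public static void main(String[] args) {
        preHandle();
        afterCompletion();
        afterCompletion(); // second call is a no-op: the scope was already removed
        System.out.println(closedCount);
    }
}
```

Removing the attribute before closing keeps the close idempotent even if the completion callback fires more than once.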

TracingClientHttpRequestInterceptor

public final class TracingClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
    static final Propagation.Setter<HttpHeaders, String> SETTER = HttpHeaders::set;

    public static ClientHttpRequestInterceptor create(Tracing tracing) {
        return create(HttpTracing.create(tracing));
    }

    public static ClientHttpRequestInterceptor create(HttpTracing httpTracing) {
        return new TracingClientHttpRequestInterceptor(httpTracing);
    }

    final Tracer tracer;
    final HttpClientHandler<HttpRequest, ClientHttpResponse> handler;
    final TraceContext.Injector<HttpHeaders> injector;

    @Autowired TracingClientHttpRequestInterceptor(HttpTracing httpTracing) {
        tracer = httpTracing.tracing().tracer();
        handler = HttpClientHandler.create(httpTracing, new HttpAdapter());
        injector = httpTracing.tracing().propagation().injector(SETTER);
    }

    @Override public ClientHttpResponse intercept(HttpRequest request, byte[] body,
            ClientHttpRequestExecution execution) throws IOException {
        Span span = handler.handleSend(injector, request.getHeaders(), request);
        ClientHttpResponse response = null;
        Throwable error = null;
        try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
            return response = execution.execute(request, body);
        } catch (IOException | RuntimeException | Error e) {
            error = e;
            throw e;
        } finally {
            handler.handleReceive(response, error, span);
        }
    }

    static final class HttpAdapter
            extends brave.http.HttpClientAdapter<HttpRequest, ClientHttpResponse> {

        @Override public String method(HttpRequest request) {
            return request.getMethod().name();
        }

        @Override public String url(HttpRequest request) {
            return request.getURI().toString();
        }

        @Override public String requestHeader(HttpRequest request, String name) {
            Object result = request.getHeaders().getFirst(name);
            return result != null ? result.toString() : null;
        }

        @Override public Integer statusCode(ClientHttpResponse response) {
            try {
                return response.getRawStatusCode();
            } catch (IOException e) {
                return null;
            }
        }
    }
}

The logic in TracingClientHttpRequestInterceptor is similar to brave.okhttp3.TracingInterceptor, which was analyzed in an earlier post, so we won't go through it again here.

Next, let's configure SpringMVC and Brave using annotations.

Annotation-based configuration

The relevant code is in Chapter5/springmvc-servlet3.
Since Servlet 3, web.xml is no longer mandatory; org.mozhu.zipkin.springmvc.Initializer bootstraps the whole application:

public class Initializer extends AbstractAnnotationConfigDispatcherServletInitializer {

    @Override protected String[] getServletMappings() {
        return new String[] {"/"};
    }

    @Override protected Class<?>[] getRootConfigClasses() {
        return null;
    }

    @Override protected Class<?>[] getServletConfigClasses() {
        return new Class[] {TracingConfiguration.class};
    }
}

org.mozhu.zipkin.springmvc.Initializer extends AbstractAnnotationConfigDispatcherServletInitializer, which ultimately implements WebApplicationInitializer:
WebApplicationInitializer

public interface WebApplicationInitializer {

    void onStartup(ServletContext servletContext) throws ServletException;

}

To see how a Servlet 3 container bootstraps this, look at SpringServletContainerInitializer: it implements the javax.servlet.ServletContainerInitializer interface and is annotated with javax.servlet.annotation.HandlesTypes.
The Servlet 3 specification requires a compliant container to load every class on the classpath that implements ServletContainerInitializer and call its onStartup method; the first argument passed in is the set of classes specified by the @HandlesTypes annotation on that class, here the set of WebApplicationInitializer implementations.
In its onStartup method, SpringServletContainerInitializer instantiates all the WebApplicationInitializer classes passed in, sorts them, and then calls initializer.onStartup(servletContext) on each in turn.
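The discovery step described above, skipping interfaces and abstract classes and instantiating only concrete implementors, can be mirrored with plain reflection. This is a self-contained sketch with illustrative types, not Spring's actual classes.

```java
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Sketch of the filtering SpringServletContainerInitializer applies to the
// classes the container hands it: skip interfaces and abstract classes, keep
// concrete implementors of the callback interface.
public class InitializerScan {
    public interface AppInitializer { void onStartup(); }

    public static abstract class AbstractInit implements AppInitializer { }

    public static class ConcreteInit extends AbstractInit {
        @Override public void onStartup() { }
    }

    public static List<AppInitializer> instantiate(Class<?>... candidates) {
        List<AppInitializer> result = new ArrayList<>();
        for (Class<?> c : candidates) {
            // be defensive: containers may report classes that don't qualify
            if (!c.isInterface() && !Modifier.isAbstract(c.getModifiers())
                    && AppInitializer.class.isAssignableFrom(c)) {
                try {
                    result.add((AppInitializer) c.getDeclaredConstructor().newInstance());
                } catch (ReflectiveOperationException ex) {
                    throw new RuntimeException("Failed to instantiate " + c, ex);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // the interface and the abstract class are skipped; only the concrete one survives
        System.out.println(instantiate(AppInitializer.class, AbstractInit.class, ConcreteInit.class).size());
    }
}
```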

@HandlesTypes(WebApplicationInitializer.class)
public class SpringServletContainerInitializer implements ServletContainerInitializer {

    @Override
    public void onStartup(Set<Class<?>> webAppInitializerClasses, ServletContext servletContext)
            throws ServletException {

        List<WebApplicationInitializer> initializers = new LinkedList<WebApplicationInitializer>();

        if (webAppInitializerClasses != null) {
            for (Class<?> waiClass : webAppInitializerClasses) {
                // Be defensive: Some servlet containers provide us with invalid classes,
                // no matter what @HandlesTypes says...
                if (!waiClass.isInterface() && !Modifier.isAbstract(waiClass.getModifiers()) &&
                        WebApplicationInitializer.class.isAssignableFrom(waiClass)) {
                    try {
                        initializers.add((WebApplicationInitializer) waiClass.newInstance());
                    }
                    catch (Throwable ex) {
                        throw new ServletException("Failed to instantiate WebApplicationInitializer class", ex);
                    }
                }
            }
        }

        if (initializers.isEmpty()) {
            servletContext.log("No Spring WebApplicationInitializer types detected on classpath");
            return;
        }

        servletContext.log(initializers.size() + " Spring WebApplicationInitializers detected on classpath");
        AnnotationAwareOrderComparator.sort(initializers);
        for (WebApplicationInitializer initializer : initializers) {
            initializer.onStartup(servletContext);
        }
    }

}

Servlet 3 also added the addServlet method to ServletContext, which allows servlets to be registered with the container programmatically:

public ServletRegistration.Dynamic addServlet(String servletName, Servlet servlet);

The registerDispatcherServlet method in AbstractDispatcherServletInitializer is what adds SpringMVC's front controller, DispatcherServlet, to the web container:

protected void registerDispatcherServlet(ServletContext servletContext) {
    String servletName = getServletName();
    Assert.hasLength(servletName, "getServletName() must not return empty or null");

    WebApplicationContext servletAppContext = createServletApplicationContext();
    Assert.notNull(servletAppContext,
            "createServletApplicationContext() did not return an application " +
            "context for servlet [" + servletName + "]");

    FrameworkServlet dispatcherServlet = createDispatcherServlet(servletAppContext);
    dispatcherServlet.setContextInitializers(getServletApplicationContextInitializers());

    ServletRegistration.Dynamic registration = servletContext.addServlet(servletName, dispatcherServlet);
    Assert.notNull(registration,
            "Failed to register servlet with name '" + servletName + "'." +
            "Check if there is another servlet registered under the same name.");

    registration.setLoadOnStartup(1);
    registration.addMapping(getServletMappings());
    registration.setAsyncSupported(isAsyncSupported());

    Filter[] filters = getServletFilters();
    if (!ObjectUtils.isEmpty(filters)) {
        for (Filter filter : filters) {
            registerServletFilter(servletContext, filter);
        }
    }

    customizeRegistration(registration);
}

protected FrameworkServlet createDispatcherServlet(WebApplicationContext servletAppContext) {
    return new DispatcherServlet(servletAppContext);
}

The beans previously configured in XML are now all declared with @Bean in the TracingConfiguration class, and @ComponentScan points the Spring container at the controllers' package so it can discover them:

@Configuration
@EnableWebMvc
@ComponentScan(basePackages = "org.mozhu.zipkin.springmvc")
@Import({TracingClientHttpRequestInterceptor.class, TracingHandlerInterceptor.class})
public class TracingConfiguration extends WebMvcConfigurerAdapter {

    @Bean Sender sender() {
        return OkHttpSender.create("http://127.0.0.1:9411/api/v2/spans");
    }

    @Bean AsyncReporter<Span> spanReporter() {
        return AsyncReporter.create(sender());
    }

    @Bean Tracing tracing(@Value("${zipkin.service:springmvc-servlet3-example}") String serviceName) {
        return Tracing.newBuilder()
                .localServiceName(serviceName)
                .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
                .currentTraceContext(ThreadContextCurrentTraceContext.create()) // puts trace IDs into logs
                .spanReporter(spanReporter()).build();
    }

    @Bean HttpTracing httpTracing(Tracing tracing) {
        return HttpTracing.create(tracing);
    }

    @Autowired
    private TracingHandlerInterceptor serverInterceptor;

    @Autowired
    private TracingClientHttpRequestInterceptor clientInterceptor;

    @Bean RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        List<ClientHttpRequestInterceptor> interceptors =
                new ArrayList<>(restTemplate.getInterceptors());
        interceptors.add(clientInterceptor);
        restTemplate.setInterceptors(interceptors);
        return restTemplate;
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(serverInterceptor);
    }
}

Then getServletConfigClasses specifies TracingConfiguration so the Spring container loads all of this configuration:

@Override protected Class<?>[] getServletConfigClasses() {
    return new Class[] {TracingConfiguration.class};
}

Compared with XML, the annotation-based configuration is considerably simpler, and the Tracing-related classes involved are exactly the same, so there is no need to analyze them again.


Java Distributed Tracing System Zipkin (Part 4): Brave Source Code Analysis - HttpTracing


In the previous post we analyzed the source code of Tracing; in this one we look at how Brave is used in a web project.

Let's first look at how Brave is used in an ordinary servlet project; this will help when we later analyze and understand Brave's integration with frameworks such as SpringMVC.

The Chapter1/servlet25 project configures FrontendServlet and BackendServlet, plus a TracingFilter.

web.xml

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
                             http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <display-name>Servlet2.5 Application</display-name>

    <filter>
        <filter-name>TracingFilter</filter-name>
        <filter-class>org.mozhu.zipkin.filter.BraveTracingFilter</filter-class>
    </filter>
    <filter-mapping>
        <filter-name>TracingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet>
        <servlet-name>BackendServlet</servlet-name>
        <servlet-class>org.mozhu.zipkin.servlet.BackendServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>BackendServlet</servlet-name>
        <url-pattern>/api</url-pattern>
    </servlet-mapping>

    <servlet>
        <servlet-name>FrontendServlet</servlet-name>
        <servlet-class>org.mozhu.zipkin.servlet.FrontendServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>FrontendServlet</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

TracingFilter

We use our own BraveTracingFilter as the entry point. In its init method we initialize Tracing, create an HttpTracing object, and finally call TracingFilter.create(httpTracing) to create tracingFilter.
In doFilter, every request is delegated to tracingFilter.

BraveTracingFilter

package org.mozhu.zipkin.filter;

import brave.Tracing;
import brave.context.log4j2.ThreadContextCurrentTraceContext;
import brave.http.HttpTracing;
import brave.propagation.B3Propagation;
import brave.propagation.ExtraFieldPropagation;
import brave.servlet.TracingFilter;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Sender;
import zipkin2.reporter.okhttp3.OkHttpSender;

import javax.servlet.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class BraveTracingFilter implements Filter {
    Filter tracingFilter;

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
        AsyncReporter asyncReporter = AsyncReporter.builder(sender)
                .closeTimeout(500, TimeUnit.MILLISECONDS)
                .build(SpanBytesEncoder.JSON_V2);

        Tracing tracing = Tracing.newBuilder()
                .localServiceName(System.getProperty("zipkin.service", "servlet25-demo"))
                .spanReporter(asyncReporter)
                .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
                .currentTraceContext(ThreadContextCurrentTraceContext.create())
                .build();

        HttpTracing httpTracing = HttpTracing.create(tracing);
        filterConfig.getServletContext().setAttribute("TRACING", httpTracing);
        tracingFilter = TracingFilter.create(httpTracing);
        tracingFilter.init(filterConfig);
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        tracingFilter.doFilter(servletRequest, servletResponse, filterChain);
    }

    @Override
    public void destroy() {
        tracingFilter.destroy();
    }

}

TracingFilter

TracingFilter lives in the brave-instrumentation-servlet package:

public final class TracingFilter implements Filter {
    public static Filter create(Tracing tracing) {
        return new TracingFilter(HttpTracing.create(tracing));
    }

    public static Filter create(HttpTracing httpTracing) {
        return new TracingFilter(httpTracing);
    }

    final ServletRuntime servlet = ServletRuntime.get();
    final Tracer tracer;
    final HttpServerHandler<HttpServletRequest, HttpServletResponse> handler;
    final TraceContext.Extractor<HttpServletRequest> extractor;

    TracingFilter(HttpTracing httpTracing) {
        tracer = httpTracing.tracing().tracer();
        handler = HttpServerHandler.create(httpTracing, new HttpServletAdapter());
        extractor = httpTracing.tracing().propagation().extractor(HttpServletRequest::getHeader);
    }
}

Several important classes used by TracingFilter:

  • HttpTracing - holds the components for HTTP handling: clientParser, serverParser, clientSampler, serverSampler
  • ServletRuntime - the servlet runtime helper, with methods such as detecting whether the environment supports Servlet 3 async calls
  • HttpServerHandler - the core component for HTTP handling; essentially all trace-related operations happen in this class
  • HttpServletAdapter - the adapter for HttpServlet. Introducing it makes HttpServerHandler much more generic: because the adapter is a generic interface independent of any concrete request and response types, the handler can be integrated with many more frameworks
  • TraceContext.Extractor - the data extractor for TraceContext
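The adapter idea above, writing the handler against a generic adapter so it never touches a concrete request type, can be sketched in a few lines. All names here are illustrative stand-ins, not Brave's actual classes.

```java
// Sketch of the adapter pattern behind HttpServletAdapter: tracing code is
// written against a generic adapter type, so the same handler logic can work
// with any framework's request class.
public class AdapterSketch {
    // Generic adapter: extracts what tracing needs from an arbitrary request type.
    interface HttpAdapter<Req> {
        String method(Req request);
        String path(Req request);
    }

    // A fake framework request, standing in for HttpServletRequest.
    static class FakeRequest {
        final String method;
        final String path;
        FakeRequest(String method, String path) { this.method = method; this.path = path; }
    }

    // "Handler" code depends only on the adapter, never on FakeRequest itself.
    static <Req> String describe(HttpAdapter<Req> adapter, Req request) {
        return adapter.method(request) + " " + adapter.path(request);
    }

    static String demo() {
        HttpAdapter<FakeRequest> adapter = new HttpAdapter<FakeRequest>() {
            @Override public String method(FakeRequest r) { return r.method; }
            @Override public String path(FakeRequest r) { return r.path; }
        };
        return describe(adapter, new FakeRequest("GET", "/api"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // GET /api
    }
}
```

Supporting a new framework then only requires a new adapter implementation, not changes to the handler.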

The doFilter method

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
    HttpServletRequest httpRequest = (HttpServletRequest) request;
    HttpServletResponse httpResponse = servlet.httpResponse(response);

    Span span = handler.handleReceive(extractor, httpRequest);
    Throwable error = null;
    try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
        chain.doFilter(httpRequest, httpResponse); // any downstream filters see Tracer.currentSpan
    } catch (IOException | ServletException | RuntimeException | Error e) {
        error = e;
        throw e;
    } finally {
        if (servlet.isAsync(httpRequest)) { // we don't have the actual response, handle later
            servlet.handleAsync(handler, httpRequest, span);
        } else { // we have a synchronous response, so we can finish the span
            handler.handleSend(httpResponse, error, span);
        }
    }
}

  • First, handler.handleReceive(extractor, httpRequest) extracts the span information from the request
  • Then tracer.withSpanInScope(span) wraps the Span in a Tracer.SpanInScope. Tracer.SpanInScope is much like the CurrentTraceContext.Scope analyzed in an earlier post: both implement the Closeable interface, for the same reason, namely to exploit JDK 7's try-with-resources so the JVM automatically calls close and cleans up the thread-bound objects. The difference is that the latter is an SPI (Service Provider Interface) and is not meant to be exposed to end users.
    This way, code inside chain.doFilter(httpRequest, httpResponse) can use Tracer.currentSpan to get the span information extracted from the request.
  • Finally, handler.handleSend(httpResponse, error, span) is called
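The scoping behavior described above, making a span "current" for the duration of a block and restoring the previous value on close, can be sketched with a ThreadLocal. This is an illustration of the idea, not Brave's implementation.

```java
// Sketch of Tracer.SpanInScope-style scoping: the current "span" is held in a
// ThreadLocal; each scope remembers the previous value and restores it on
// close, so scopes nest correctly with try-with-resources.
public class CurrentSpanScope {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // A close() that does not throw, usable in try-with-resources.
    interface Scope extends AutoCloseable {
        @Override void close();
    }

    static Scope withSpan(String span) {
        String previous = CURRENT.get();
        CURRENT.set(span);
        return () -> CURRENT.set(previous); // restore on close
    }

    static String current() {
        return CURRENT.get();
    }

    public static void main(String[] args) {
        try (Scope outer = withSpan("frontend")) {
            try (Scope inner = withSpan("backend")) {
                System.out.println(current()); // backend
            }
            System.out.println(current()); // frontend restored
        }
        System.out.println(current()); // null again
    }
}
```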

Let's now look closely at the handler's handleReceive and handleSend methods.
The handleReceive method

public Span handleReceive(TraceContext.Extractor<Req> extractor, Req request) {
    return handleReceive(extractor, request, request);
}

public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
    Span span = nextSpan(extractor.extract(carrier), request);
    if (span.isNoop()) return span;

    // all of the parsing here occur before a timestamp is recorded on the span
    span.kind(Span.Kind.SERVER);

    // Ensure user-code can read the current trace context
    Tracer.SpanInScope ws = tracer.withSpanInScope(span);
    try {
        parser.request(adapter, request, span);
    } finally {
        ws.close();
    }

    boolean parsedEndpoint = false;
    if (Platform.get().zipkinV1Present()) {
        zipkin.Endpoint.Builder deprecatedEndpoint = zipkin.Endpoint.builder().serviceName("");
        if ((parsedEndpoint = adapter.parseClientAddress(request, deprecatedEndpoint))) {
            span.remoteEndpoint(deprecatedEndpoint.build());
        }
    }
    if (!parsedEndpoint) {
        Endpoint.Builder remoteEndpoint = Endpoint.newBuilder();
        if (adapter.parseClientAddress(request, remoteEndpoint)) {
            span.remoteEndpoint(remoteEndpoint.build());
        }
    }
    return span.start();
}

  • First, nextSpan(extractor.extract(carrier), request) extracts TraceContextOrSamplingFlags from the request and creates a Span, whose kind is set to SERVER
  • Then parser.request(adapter, request, span) parses the request: the span's name is set to the request method (GET or POST), and the current request path is written into the span as a tag (http.path), so that in the Zipkin UI we can clearly see which request a given span made
  • Finally, the span's Endpoint information is set and start is called to record the start time
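The effect of the parsing step, span named after the HTTP method and the path recorded as an http.path tag, can be illustrated with a toy span. ToySpan and parseRequest are illustrative, not Brave's Span or HttpParser.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of what the default HTTP parser records on a server span:
// the span name becomes the HTTP method, and the request path is stored as
// the "http.path" tag shown in the Zipkin UI.
public class RequestParseSketch {
    static class ToySpan {
        String name;
        final Map<String, String> tags = new HashMap<>();
    }

    static void parseRequest(String method, String path, ToySpan span) {
        span.name = method;                // e.g. GET or POST
        span.tags.put("http.path", path);  // which request this span represents
    }

    public static void main(String[] args) {
        ToySpan span = new ToySpan();
        parseRequest("GET", "/api", span);
        System.out.println(span.name + " " + span.tags.get("http.path")); // GET /api
    }
}
```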

The handleSend method

public void handleSend(@Nullable Resp response, @Nullable Throwable error, Span span) {
    if (span.isNoop()) return;

    // Ensure user-code can read the current trace context
    Tracer.SpanInScope ws = tracer.withSpanInScope(span);
    try {
        parser.response(adapter, response, error, span);
    } finally {
        ws.close();
        span.finish();
    }
}

handleSend is fairly simple: parser.response(adapter, response, error, span) writes the HTTP status code into the span's http.status_code tag, and if an error occurred, the error message goes into the error tag.
Finally, the Span's finish method is called, which in turn calls the Reporter's report method to send the span information to Zipkin.
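The reporting path, finish() hands the span to a reporter that buffers and later flushes a batch to the sender, can be sketched with a queue. This is a minimal illustration of the buffering idea; the real AsyncReporter flushes on a background thread, and all names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the AsyncReporter idea: report() only enqueues (cheap, called from
// Span.finish()), and a separate flush step drains the queue and hands the
// whole batch to a sender.
public class QueuedReporter {
    final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    final List<List<String>> sentBatches = new ArrayList<>();

    // Called from the request thread: must be cheap and non-blocking.
    void report(String span) {
        pending.offer(span);
    }

    // Called periodically or on close: drain and "send" one batch.
    void flush() {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch);
        if (!batch.isEmpty()) sentBatches.add(batch);
    }

    public static void main(String[] args) {
        QueuedReporter reporter = new QueuedReporter();
        reporter.report("span-1");
        reporter.report("span-2");
        reporter.flush();
        System.out.println(reporter.sentBatches.get(0).size()); // 2
    }
}
```

Buffering like this is why the XML configuration earlier sets a closeTimeout: on shutdown, the reporter waits briefly so in-flight spans still get flushed.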

Next, the nextSpan method:

Span nextSpan(TraceContextOrSamplingFlags extracted, Req request) {
    if (extracted.sampled() == null) { // Otherwise, try to make a new decision
        extracted = extracted.sampled(sampler.trySample(adapter, request));
    }
    return extracted.context() != null
            ? tracer.joinSpan(extracted.context())
            : tracer.nextSpan(extracted);
}

If the extracted object (TraceContextOrSamplingFlags) taken from the request carries no sampled flag, HttpSampler's trySample method decides whether to sample.
If extracted contains TraceContext information, the tracer calls joinSpan to join the existing trace. This is typically the case when client code has put the trace information into the headers: the server receiving the request automatically joins the trace the client started. So when the backend's request reaches this code, it joins the existing span.
If extracted contains no TraceContext information, the tracer calls nextSpan. This is typically a request initiated by an end user, for example from a browser, whose headers certainly carry no trace information. So when the frontend's request reaches this code, a new span is created.

The joinSpan method

public final Span joinSpan(TraceContext context) {
    if (context == null) throw new NullPointerException("context == null");
    if (!supportsJoin) return newChild(context);
    // If we are joining a trace, we are sharing IDs with the caller
    // If the sampled flag was left unset, we need to make the decision here
    TraceContext.Builder builder = context.toBuilder();
    if (context.sampled() == null) {
        builder.sampled(sampler.isSampled(context.traceId()));
    } else {
        builder.shared(true);
    }
    return toSpan(builder.build());
}

public Span newChild(TraceContext parent) {
    if (parent == null) throw new NullPointerException("parent == null");
    return nextSpan(TraceContextOrSamplingFlags.create(parent));
}

joinSpan shares the caller's traceId; if the caller did not pass a sampled flag, the server decides for itself whether to sample, via sampler.isSampled(context.traceId()).
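Deciding from the trace ID matters because it keeps the decision consistent: every service that sees the same ID reaches the same verdict. A probability-based sketch of the idea (this is an illustration, not Brave's actual sampler implementation):

```java
// Sketch of a trace-ID-based sampler: the decision is a pure function of the
// trace ID, so every service in the same trace makes the same decision
// without any coordination.
public class TraceIdSampler {
    final long threshold; // out of 10000

    TraceIdSampler(float rate) {
        this.threshold = (long) (rate * 10000);
    }

    boolean isSampled(long traceId) {
        // map the id into [0, 10000) and compare against the rate threshold
        long bucket = Math.abs(traceId % 10000);
        return bucket < threshold;
    }

    public static void main(String[] args) {
        TraceIdSampler sampler = new TraceIdSampler(0.5f);
        // deterministic: the same trace id always yields the same answer
        System.out.println(sampler.isSampled(42L) == sampler.isSampled(42L)); // true
    }
}
```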

The nextSpan method

public Span nextSpan(TraceContextOrSamplingFlags extracted) {
    TraceContext parent = extracted.context();
    if (extracted.samplingFlags() != null) {
        TraceContext implicitParent = currentTraceContext.get();
        if (implicitParent == null) {
            return toSpan(newRootContext(extracted.samplingFlags(), extracted.extra()));
        }
        // fall through, with an implicit parent, not an extracted one
        parent = appendExtra(implicitParent, extracted.extra());
    }
    long nextId = Platform.get().randomLong();
    if (parent != null) {
        return toSpan(parent.toBuilder() // copies "extra" from the parent
                .spanId(nextId)
                .parentId(parent.spanId())
                .shared(false)
                .build());
    }
    TraceIdContext traceIdContext = extracted.traceIdContext();
    if (extracted.traceIdContext() != null) {
        Boolean sampled = traceIdContext.sampled();
        if (sampled == null) sampled = sampler.isSampled(traceIdContext.traceId());
        return toSpan(TraceContext.newBuilder()
                .sampled(sampled)
                .debug(traceIdContext.debug())
                .traceIdHigh(traceIdContext.traceIdHigh()).traceId(traceIdContext.traceId())
                .spanId(nextId)
                .extra(extracted.extra()).build());
    }
    // TraceContextOrSamplingFlags is a union of 3 types, we've checked all three
    throw new AssertionError("should not reach here");
}

In nextSpan, a suitable parent is located first: when a parent exists, a child span is created; otherwise a new root span is returned.
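The parent/child bookkeeping visible in the code above, trace ID inherited, parentId set to the parent's spanId, and a fresh random span ID, can be sketched with a toy context. ToyContext is illustrative, not Brave's TraceContext.

```java
import java.util.Random;

// Sketch of the child-context construction in nextSpan: the trace id is
// inherited, parentId points at the parent's spanId, and the child gets a
// fresh random span id.
public class ChildContextSketch {
    static final Random RANDOM = new Random();

    static class ToyContext {
        final long traceId, parentId, spanId;
        ToyContext(long traceId, long parentId, long spanId) {
            this.traceId = traceId;
            this.parentId = parentId;
            this.spanId = spanId;
        }
    }

    static ToyContext newChild(ToyContext parent) {
        return new ToyContext(parent.traceId, parent.spanId, RANDOM.nextLong());
    }

    public static void main(String[] args) {
        ToyContext root = new ToyContext(1L, 0L, 100L);
        ToyContext child = newChild(root);
        // same trace; the child's parentId is the root's spanId
        System.out.println(child.traceId == root.traceId && child.parentId == root.spanId); // true
    }
}
```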

That completes the analysis of how the server side records span information after receiving a request. Next, let's see how the client side reports span information.

FrontendServlet

In FrontendServlet's init method we initialize the OkHttpClient: the TracingInterceptor is added to OkHttpClient's network interceptor stack, and the Dispatcher's ExecutorService is wrapped using CurrentTraceContext's ExecutorService wrapper before being set on the OkHttpClient.

package org.mozhu.zipkin.servlet;

import brave.http.HttpTracing;
import brave.okhttp3.TracingInterceptor;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

public class FrontendServlet extends HttpServlet {

    private final static Logger LOGGER = LoggerFactory.getLogger(FrontendServlet.class);

    private OkHttpClient client;

    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);
        HttpTracing httpTracing = (HttpTracing) config.getServletContext().getAttribute("TRACING");
        client = new OkHttpClient.Builder()
                .dispatcher(new Dispatcher(
                        httpTracing.tracing().currentTraceContext()
                                .executorService(new Dispatcher().executorService())
                ))
                .addNetworkInterceptor(TracingInterceptor.create(httpTracing))
                .build();
    }

    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        LOGGER.info("frontend receive request");
        Request request = new Request.Builder()
                .url("http://localhost:9000/api")
                .build();

        Response response = client.newCall(request).execute();
        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);

        PrintWriter writer = resp.getWriter();
        writer.write(response.body().string());
        writer.flush();
        writer.close();
    }

}

public final class TracingInterceptor implements Interceptor {
    // ...

    final Tracer tracer;
    final String remoteServiceName;
    final HttpClientHandler<Request, Response> handler;
    final TraceContext.Injector<Request.Builder> injector;

    TracingInterceptor(HttpTracing httpTracing) {
        if (httpTracing == null) throw new NullPointerException("HttpTracing == null");
        tracer = httpTracing.tracing().tracer();
        remoteServiceName = httpTracing.serverName();
        handler = HttpClientHandler.create(httpTracing, new HttpAdapter());
        injector = httpTracing.tracing().propagation().injector(SETTER);
    }
}

TracingInterceptor depends on Tracer, TraceContext.Injector, HttpClientHandler and HttpAdapter.

  • TraceContext.Injector - injects the trace information into the HTTP request, i.e. into the HTTP headers
  • HttpClientHandler - the counterpart of HttpServerHandler, and likewise the core component of HTTP handling; essentially all trace-related operations are done in this class
  • HttpAdapter - reads various data from the HTTP request, such as the method, request path, header values, etc.
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
Request.Builder requestBuilder = request.newBuilder();

Span span = handler.handleSend(injector, requestBuilder, request);
parseServerAddress(chain.connection(), span);
Response response = null;
Throwable error = null;
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
return response = chain.proceed(requestBuilder.build());
} catch (IOException | RuntimeException | Error e) {
error = e;
throw e;
} finally {
handler.handleReceive(response, error, span);
}
}

This code is quite similar to doFilter in TracingFilter, but runs the process in reverse:

  • First, inject the trace information into the request and create a Span object
  • Then call chain.proceed(requestBuilder.build()) to send the HTTP request
  • Finally call handler.handleReceive(response, error, span)

Next, let's look at HttpClientHandler's handleSend and handleReceive methods.
The handleSend method

public Span handleSend(TraceContext.Injector<Req> injector, Req request, Span span) {
return handleSend(injector, request, request, span);
}

public <C> Span handleSend(TraceContext.Injector<C> injector, C carrier, Req request, Span span) {
injector.inject(span.context(), carrier);
if (span.isNoop()) return span;

// all of the parsing here occur before a timestamp is recorded on the span
span.kind(Span.Kind.CLIENT);

// Ensure user-code can read the current trace context
Tracer.SpanInScope ws = tracer.withSpanInScope(span);
try {
parser.request(adapter, request, span);
} finally {
ws.close();
}

boolean parsedEndpoint = false;
if (Platform.get().zipkinV1Present()) {
zipkin.Endpoint.Builder deprecatedEndpoint = zipkin.Endpoint.builder()
.serviceName(serverNameSet ? serverName : "");
if ((parsedEndpoint = adapter.parseServerAddress(request, deprecatedEndpoint))) {
span.remoteEndpoint(deprecatedEndpoint.serviceName(serverName).build());
}
}
if (!parsedEndpoint) {
Endpoint.Builder remoteEndpoint = Endpoint.newBuilder().serviceName(serverName);
if (adapter.parseServerAddress(request, remoteEndpoint) || serverNameSet) {
span.remoteEndpoint(remoteEndpoint.build());
}
}
return span.start();
}

  • First injector.inject(span.context(), carrier) injects the trace information into the request, and the Span's kind is set to CLIENT
  • Then parser.request(adapter, request, span) parses the request: it renames the span to the request method (GET or POST) and records the request path as a tag (http.path) on the Span, so the Zipkin UI can clearly show which request a given Span issued
  • Finally the Span's Endpoint information is set, and start is called to record the start time

The handleReceive method

public void handleReceive(@Nullable Resp response, @Nullable Throwable error, Span span) {
if (span.isNoop()) return;
Tracer.SpanInScope ws = tracer.withSpanInScope(span);
try {
parser.response(adapter, response, error, span);
} finally {
ws.close();
span.finish();
}
}

handleReceive is fairly simple. When the client receives the server's response, handleReceive is invoked; it calls parser.response(adapter, response, error, span), which writes the HTTP status code into the Span's tag (http.status_code) and, if an error occurred, the error message into the tag (error).
Finally the Span's finish method is called, which in turn calls the Reporter's report method to ship the Span to Zipkin.

BackendServlet

Finally, let's look at BackendServlet. On receiving a request, it reads the user-name header, appends its value to the timestamp string, and returns the result.
In the previous post we saw that if the request sent to the frontend carries a user-name header, the frontend passes that value on to the backend, and the backend puts it into the response string to show that the header was received.

package org.mozhu.zipkin.servlet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;

public class BackendServlet extends HttpServlet {

private final static Logger LOGGER = LoggerFactory.getLogger(BackendServlet.class);

@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
LOGGER.info("backend receive request");
String username = req.getHeader("user-name");
String result;
if (username != null) {
result = new Date().toString() + " " + username;
} else {
result = new Date().toString();
}
PrintWriter writer = resp.getWriter();
writer.write(result);
writer.flush();
writer.close();
}

}

At this point we have seen how Brave is used in a plain web project: we analyzed how TracingFilter intercepts and handles requests, and how OkHttpClient puts trace information into the request.
In later posts we will continue with how Brave integrates with Spring Web projects.


Java Distributed Tracing with Zipkin (3): Brave Source Analysis - Tracing

All posts are first published on my personal blog http://blog.mozhu.org - welcome to visit!

In the previous post we covered the basic usage of the Brave framework and analyzed part of the Tracer-related source code. In this post we look at the initialization of Tracing and the source of its related classes.

public class TraceDemo {

public static void main(String[] args) {
Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.builder(sender)
.closeTimeout(500, TimeUnit.MILLISECONDS)
.build(SpanBytesEncoder.JSON_V2);

Tracing tracing = Tracing.newBuilder()
.localServiceName("tracer-demo")
.spanReporter(asyncReporter)
.propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
.currentTraceContext(ThreadContextCurrentTraceContext.create())
.build();
Tracer tracer = tracing.tracer();
// ...
}
}

Components in Brave are created almost exclusively through the builder pattern, and Tracing is no exception. Let's look at Tracing.Builder first.

Tracing.Builder

public static final class Tracing.Builder {
String localServiceName;
Endpoint localEndpoint;
Reporter<zipkin2.Span> reporter;
Clock clock;
Sampler sampler = Sampler.ALWAYS_SAMPLE;
CurrentTraceContext currentTraceContext = CurrentTraceContext.Default.inheritable();
boolean traceId128Bit = false;
boolean supportsJoin = true;
Propagation.Factory propagationFactory = Propagation.Factory.B3;

public Tracing build() {
if (clock == null) clock = Platform.get();
if (localEndpoint == null) {
localEndpoint = Platform.get().localEndpoint();
if (localServiceName != null) {
localEndpoint = localEndpoint.toBuilder().serviceName(localServiceName).build();
}
}
if (reporter == null) reporter = Platform.get();
return new Default(this);
}

Builder() {
}
}

The important classes Tracing depends on:

  • Endpoint - the IP, port, service name, etc.
  • Sampler - the sampler; decides by traceId whether a trace should be sampled, i.e. reported to Zipkin
  • TraceContext - holds the traceId, spanId, sampling decision and other data
  • CurrentTraceContext - a helper class for obtaining the current thread's TraceContext
  • Propagation - an interface that can inject data into, and extract data from, a carrier object
  • Propagation.Factory - the factory class for Propagation

In the earlier TraceDemo example, we set localServiceName, spanReporter, propagationFactory and currentTraceContext when initializing Tracing.
The spanReporter is the AsyncReporter whose source we analyzed in the previous post; as the build method shows, the default reporter is Platform, which merely logs Span information with a logger instead of reporting it to Zipkin:

@Override public void report(zipkin2.Span span) {
if (!logger.isLoggable(Level.INFO)) return;
if (span == null) throw new NullPointerException("span == null");
logger.info(span.toString());
}

Sampler

The sampler decides, based on the traceId, whether a trace should be sampled, i.e. reported to Zipkin.

public abstract class Sampler {

public static final Sampler ALWAYS_SAMPLE = new Sampler() {
@Override public boolean isSampled(long traceId) {
return true;
}

@Override public String toString() {
return "AlwaysSample";
}
};

public static final Sampler NEVER_SAMPLE = new Sampler() {
@Override public boolean isSampled(long traceId) {
return false;
}

@Override public String toString() {
return "NeverSample";
}
};

/** Returns true if the trace ID should be measured. */
public abstract boolean isSampled(long traceId);

/**
* Returns a sampler, given a rate expressed as a percentage.
*
* <p>The sampler returned is good for low volumes of traffic (<100K requests), as it is precise.
* If you have high volumes of traffic, consider {@link BoundarySampler}.
*
* @param rate minimum sample rate is 0.01, or 1% of traces
*/
public static Sampler create(float rate) {
return CountingSampler.create(rate);
}
}

Sampler.ALWAYS_SAMPLE always samples
Sampler.NEVER_SAMPLE never samples

Sampler has one more implementation:
CountingSampler, which accepts a sample rate; e.g. CountingSampler.create(0.5f) samples 50% of requests. It uses a small algorithm internally that we won't dig into here.
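Leaving CountingSampler's exact algorithm aside, the idea of count-based sampling can be sketched in plain Java. This is an illustration only, not Brave's implementation (Brave additionally randomizes which slots sample); the class name is made up:

```java
import java.util.BitSet;

// Illustration of count-based sampling: out of every 100 decisions,
// exactly rate*100 return true. Not Brave's CountingSampler, which
// additionally shuffles the decision slots.
public class SimpleCountingSampler {
  private final BitSet decisions = new BitSet(100);
  private int i = 0;

  public SimpleCountingSampler(float rate) {
    // mark the first rate*100 slots as "sampled"
    decisions.set(0, (int) (rate * 100));
  }

  // traceId is ignored: the decision is purely by call count
  public synchronized boolean isSampled(long traceId) {
    boolean sampled = decisions.get(i);
    i = (i + 1) % 100;
    return sampled;
  }
}
```

Over any window of 100 consecutive decisions this samples exactly rate*100 traces, which is why this style of sampler is precise for low traffic volumes.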

TraceContext

Holds the traceId, spanId, sampling decision and other data.

In Tracer's newRootContext method there is the following code, which builds a TraceContext via newBuilder:

TraceContext newRootContext(SamplingFlags samplingFlags, List<Object> extra) {
long nextId = Platform.get().randomLong();
Boolean sampled = samplingFlags.sampled();
if (sampled == null) sampled = sampler.isSampled(nextId);
return TraceContext.newBuilder()
.sampled(sampled)
.traceIdHigh(traceId128Bit ? Platform.get().nextTraceIdHigh() : 0L).traceId(nextId)
.spanId(nextId)
.debug(samplingFlags.debug())
.extra(extra).build();
}

TraceContext has the following properties:

  • traceIdHigh - the high 8 bytes of a 128-bit trace id (0 when 64-bit ids are used)
  • traceId - the 8-byte id that uniquely identifies the trace
  • parentId - the spanId of the parent Span
  • spanId - the 8-byte id that uniquely identifies the span within a trace
  • shared - if true, the span information is shared with another tracer
  • extra - extra data carried along with the trace

plus two properties inherited from SamplingFlags:

  • sampled - whether to sample
  • debug - debug flag; if true, the trace is sampled even when sampled is false (i.e. it overrides sampled)

TraceContext also defines two interfaces, Injector and Extractor:

public interface Injector<C> {
void inject(TraceContext traceContext, C carrier);
}

public interface Extractor<C> {
TraceContextOrSamplingFlags extract(C carrier);
}

  • Injector - injects the data in a TraceContext into a carrier; in RPC the carrier is typically something like HTTP headers that can carry extra information
  • Extractor - extracts TraceContext data, or sampling-flag data (TraceContextOrSamplingFlags), from a carrier

TraceContextOrSamplingFlags

TraceContextOrSamplingFlags is a union of three types: TraceContext, TraceIdContext and SamplingFlags. The official docs say:

  • when both a traceId and a spanId are present, create it with create(TraceContext)
  • when only a traceId is present, create it with create(TraceIdContext)
  • otherwise, create it with create(SamplingFlags)
    The code of TraceContextOrSamplingFlags is straightforward, so we won't go through it here.

CurrentTraceContext

CurrentTraceContext is a helper class for obtaining the current thread's TraceContext; its default implementation is CurrentTraceContext.Default.

public static final class Default extends CurrentTraceContext {
static final ThreadLocal<TraceContext> DEFAULT = new ThreadLocal<>();
// Inheritable as Brave 3's ThreadLocalServerClientAndLocalSpanState was inheritable
static final InheritableThreadLocal<TraceContext> INHERITABLE = new InheritableThreadLocal<>();

final ThreadLocal<TraceContext> local;

/** @deprecated prefer {@link #create()} as it isn't inheritable, so can't leak contexts. */
@Deprecated
public Default() {
this(INHERITABLE);
}

/** Uses a non-inheritable static thread local */
public static CurrentTraceContext create() {
return new Default(DEFAULT);
}

/**
* Uses an inheritable static thread local which allows arbitrary calls to {@link
* Thread#start()} to automatically inherit this context. This feature is available as it is was
* the default in Brave 3, because some users couldn't control threads in their applications.
*
* <p>This can be a problem in scenarios such as thread pool expansion, leading to data being
* recorded in the wrong span, or spans with the wrong parent. If you are impacted by this,
* switch to {@link #create()}.
*/
public static CurrentTraceContext inheritable() {
return new Default(INHERITABLE);
}

Default(ThreadLocal<TraceContext> local) {
if (local == null) throw new NullPointerException("local == null");
this.local = local;
}

@Override public TraceContext get() {
return local.get();
}
}

CurrentTraceContext.Default provides two static factory methods, create() and inheritable().
With create, the local field is a ThreadLocal.
With inheritable, the local field is an InheritableThreadLocal.
A ThreadLocal can be thought of as per-thread storage provided by the JVM: different method calls on the same thread can read back the objects placed into it.
An InheritableThreadLocal additionally copies the parent thread's value into a child thread when the child is created, so the child can see the parent's context.

The official docs point out that inheritable must be used with caution in thread-pool environments: the wrong TraceContext may be picked up, causing Span data to be recorded against and linked to the wrong traceId.
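The difference between the two thread-local types can be shown with plain JDK classes, independent of Brave; the class and method names here are made up for the demo:

```java
// Shows why inheritable() can leak context to child threads: an
// InheritableThreadLocal's value is copied into a thread at the time
// that thread is constructed, while a plain ThreadLocal's is not.
public class InheritableDemo {
  static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
  static final InheritableThreadLocal<String> INHERITABLE = new InheritableThreadLocal<>();

  public static String[] childSees() throws InterruptedException {
    PLAIN.set("trace-1");
    INHERITABLE.set("trace-1");
    final String[] seen = new String[2];
    Thread child = new Thread(() -> {
      seen[0] = PLAIN.get();       // null: not inherited
      seen[1] = INHERITABLE.get(); // "trace-1": copied at construction
    });
    child.start();
    child.join();
    return seen;
  }
}
```

A pool thread, however, is constructed once and reused, so it keeps whatever context was copied from the thread that happened to create it; this is exactly the thread-pool pitfall mentioned above.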

CurrentTraceContext.Scope

public abstract Scope newScope(@Nullable TraceContext currentSpan);

/** A span remains in the scope it was bound to until close is called. */
public interface Scope extends Closeable {
/** No exceptions are thrown when unbinding a span scope. */
@Override void close();
}

CurrentTraceContext also defines a Scope interface, which extends Closeable.
Since JDK 7, any Closeable declared in a try-with-resources statement has its close method called automatically by the JVM when the block exits, so CurrentTraceContext provides a newScope method that we can use like this:

try (Scope scope = newScope(invocationContext)) {
// do something
}

Now let's see how CurrentTraceContext.Default implements newScope:

@Override public Scope newScope(@Nullable TraceContext currentSpan) {
final TraceContext previous = local.get();
local.set(currentSpan);
class DefaultCurrentTraceContextScope implements Scope {
@Override public void close() {
local.set(previous);
}
}
return new DefaultCurrentTraceContextScope();
}

First the current thread's TraceContext is saved into the previous variable, then the new TraceContext is installed on the current thread; when the Scope's close method is called, previous is restored to the current thread.

Two nested try blocks illustrate what this buys us:

TraceContext traceContext1;
TraceContext traceContext2;
try (Scope scope1 = newScope(traceContext1)) {
// 1. here CurrentTraceContext.get() returns traceContext1
try (Scope scope2 = newScope(traceContext2)) {
// 2. here CurrentTraceContext.get() returns traceContext2
}
// 3. here CurrentTraceContext.get() returns traceContext1 again
}

  1. Before entering the inner try block, CurrentTraceContext.get() returns traceContext1
  2. Inside the inner try block, CurrentTraceContext.get() returns traceContext2
  3. After the inner try block completes, CurrentTraceContext.get() returns traceContext1 again

This approach is flexible and elegant, but also rather implicit for callers; anyone unfamiliar with this JDK 7 feature may be baffled at first sight.
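The save/install/restore behavior of newScope can be reproduced in a self-contained sketch, using a String in place of TraceContext (names made up for the demo):

```java
// Minimal version of the scope pattern: newScope saves the current
// value, installs the new one, and the returned AutoCloseable
// restores the previous value on close.
public class ScopeDemo {
  static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  static AutoCloseable newScope(String context) {
    final String previous = CURRENT.get();
    CURRENT.set(context);
    return () -> CURRENT.set(previous);
  }

  public static String nestedResult() throws Exception {
    StringBuilder sb = new StringBuilder();
    try (AutoCloseable outer = newScope("ctx1")) {
      sb.append(CURRENT.get());                 // ctx1
      try (AutoCloseable inner = newScope("ctx2")) {
        sb.append(",").append(CURRENT.get());   // ctx2
      }
      sb.append(",").append(CURRENT.get());     // ctx1 restored on close
    }
    return sb.toString();
  }
}
```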

It also relies on every caller remembering to create the scope object inside the try statement, which is error-prone, so CurrentTraceContext provides wrap methods for Callable and Runnable:

/** Wraps the input so that it executes with the same context as now. */
public <C> Callable<C> wrap(Callable<C> task) {
final TraceContext invocationContext = get();
class CurrentTraceContextCallable implements Callable<C> {
@Override public C call() throws Exception {
try (Scope scope = newScope(invocationContext)) {
return task.call();
}
}
}
return new CurrentTraceContextCallable();
}

/** Wraps the input so that it executes with the same context as now. */
public Runnable wrap(Runnable task) {
final TraceContext invocationContext = get();
class CurrentTraceContextRunnable implements Runnable {
@Override public void run() {
try (Scope scope = newScope(invocationContext)) {
task.run();
}
}
}
return new CurrentTraceContextRunnable();
}

CurrentTraceContext also provides wrapping methods for Executor and ExecutorService:

/**
* Decorates the input such that the {@link #get() current trace context} at the time a task is
* scheduled is made current when the task is executed.
*/
public Executor executor(Executor delegate) {
class CurrentTraceContextExecutor implements Executor {
@Override public void execute(Runnable task) {
delegate.execute(CurrentTraceContext.this.wrap(task));
}
}
return new CurrentTraceContextExecutor();
}

/**
* Decorates the input such that the {@link #get() current trace context} at the time a task is
* scheduled is made current when the task is executed.
*/
public ExecutorService executorService(ExecutorService delegate) {
class CurrentTraceContextExecutorService extends brave.internal.WrappingExecutorService {

@Override protected ExecutorService delegate() {
return delegate;
}

@Override protected <C> Callable<C> wrap(Callable<C> task) {
return CurrentTraceContext.this.wrap(task);
}

@Override protected Runnable wrap(Runnable task) {
return CurrentTraceContext.this.wrap(task);
}
}
return new CurrentTraceContextExecutorService();
}

These methods all use the decorator pattern, which is common enough that we won't analyze it further here.
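The effect of these decorators can be demonstrated with a plain ThreadLocal standing in for the trace context. This is a sketch of the pattern with made-up names, not Brave's code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// The decorator captures the submitting thread's context when the
// task is scheduled, and reinstalls it when the task actually runs
// on a pool thread.
public class PropagatingExecutorDemo {
  static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

  static Runnable wrap(Runnable task) {
    final String captured = CONTEXT.get(); // context at scheduling time
    return () -> {
      String previous = CONTEXT.get();
      CONTEXT.set(captured);               // context at execution time
      try {
        task.run();
      } finally {
        CONTEXT.set(previous);
      }
    };
  }

  public static String runOnPool() throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      CONTEXT.set("trace-42");
      final String[] seen = new String[1];
      pool.submit(wrap(() -> seen[0] = CONTEXT.get())).get();
      return seen[0]; // the pool thread saw the submitter's context
    } finally {
      pool.shutdown();
    }
  }
}
```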

ThreadContextCurrentTraceContext

In TraceDemo, the CurrentTraceContext we set is ThreadContextCurrentTraceContext.create()

ThreadContextCurrentTraceContext is a log4j2 adapter, a class in the brave-context-log4j2 package. It puts the traceId and spanId properties into log4j2's ThreadContext, so we can configure the log pattern in the log4j2 configuration file with the placeholders %X{traceId} and %X{spanId}, printing the current traceId and spanId on every log line.

zipkin-learning\Chapter1\servlet25\src\main\resources\log4j2.properties

appender.console.layout.pattern = %d{ABSOLUTE} [%X{traceId}/%X{spanId}] %-5p [%t] %C{2} - %m%n

The logging-related jars need to be added to pom.xml:

<brave.version>4.9.1</brave.version>
<log4j.version>2.8.2</log4j.version>

<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-context-log4j2</artifactId>
<version>${brave.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jul</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>

In the Chapter1 example, watching the frontend and backend consoles you will see output like the following, where 0cabad9917e767ab is the traceId, and 0cabad9917e767ab and e96a226ce75d30b4 are the spanIds:

10:11:05,731 [0cabad9917e767ab/0cabad9917e767ab] INFO  [qtp1441410416-17] servlet.FrontendServlet - frontend receive request

10:11:05,820 [0cabad9917e767ab/e96a226ce75d30b4] INFO  [qtp1441410416-15] servlet.BackendServlet - backend receive request
public final class ThreadContextCurrentTraceContext extends CurrentTraceContext {
public static ThreadContextCurrentTraceContext create() {
return create(CurrentTraceContext.Default.inheritable());
}

public static ThreadContextCurrentTraceContext create(CurrentTraceContext delegate) {
return new ThreadContextCurrentTraceContext(delegate);
}

final CurrentTraceContext delegate;

ThreadContextCurrentTraceContext(CurrentTraceContext delegate) {
if (delegate == null) throw new NullPointerException("delegate == null");
this.delegate = delegate;
}

@Override public TraceContext get() {
return delegate.get();
}

@Override public Scope newScope(@Nullable TraceContext currentSpan) {
final String previousTraceId = ThreadContext.get("traceId");
final String previousSpanId = ThreadContext.get("spanId");

if (currentSpan != null) {
ThreadContext.put("traceId", currentSpan.traceIdString());
ThreadContext.put("spanId", HexCodec.toLowerHex(currentSpan.spanId()));
} else {
ThreadContext.remove("traceId");
ThreadContext.remove("spanId");
}

Scope scope = delegate.newScope(currentSpan);
class ThreadContextCurrentTraceContextScope implements Scope {
@Override public void close() {
scope.close();
ThreadContext.put("traceId", previousTraceId);
ThreadContext.put("spanId", previousSpanId);
}
}
return new ThreadContextCurrentTraceContextScope();
}
}

ThreadContextCurrentTraceContext extends CurrentTraceContext and overrides its newScope method, extracting the traceId and spanId from currentSpan into log4j2's context object ThreadContext.

In https://github.com/openzipkin/brave/tree/master/context you can also find support for slf4j and log4j:
brave.context.slf4j.MDCCurrentTraceContext in brave-context-slf4j
brave.context.log4j12.MDCCurrentTraceContext in brave-context-log4j12
The code is very similar, so we won't go through it here.

Propagation

Propagation ("propagator") is an interface for injecting data into, and extracting data from, a carrier object.
For HTTP, the carrier usually means the HTTP request object: its headers can carry trace information. Typically the HTTP client injects trace information into the headers, and the server extracts it from them.
Propagation.Setter and Propagation.Getter set and get values on the carrier.
The injector and extractor methods return a TraceContext.Injector and a TraceContext.Extractor respectively:

interface Setter<C, K> {
void put(C carrier, K key, String value);
}
interface Getter<C, K> {
@Nullable String get(C carrier, K key);
}

<C> TraceContext.Injector<C> injector(Setter<C, K> setter);
<C> TraceContext.Extractor<C> extractor(Getter<C, K> getter);

Propagation also has a factory class, Propagation.Factory, whose factory method create builds a Propagation from a KeyFactory:

abstract class Factory {
public static final Factory B3 = B3Propagation.FACTORY;

public boolean supportsJoin() {
return false;
}

public boolean requires128BitTraceId() {
return false;
}

public abstract <K> Propagation<K> create(KeyFactory<K> keyFactory);
}

interface KeyFactory<K> {
KeyFactory<String> STRING = name -> name;

K create(String name);
}

The default implementation of Propagation is B3Propagation.
B3Propagation propagates trace information with the following HTTP headers:

  • X-B3-TraceId - the 128-bit or 64-bit traceId, encoded as 32 or 16 lower-hex characters
  • X-B3-SpanId - the 64-bit spanId, encoded as 16 lower-hex characters
  • X-B3-ParentSpanId - the 64-bit parent spanId, encoded as 16 lower-hex characters
  • X-B3-Sampled - 1 means sampled, 0 means not sampled; if absent, the decision is left to the receiving side, i.e. the server
  • X-B3-Flags - debug; 1 means the trace must be sampled
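The inject/extract round trip can be illustrated against a Map carrier standing in for HTTP headers, using the B3 header names above. This is a sketch with made-up method names, not B3Propagation's actual code:

```java
import java.util.Map;

// Writes B3-style headers into a Map carrier and reads them back.
public class B3MapDemo {
  public static void inject(long traceId, long spanId, Boolean sampled,
                            Map<String, String> carrier) {
    carrier.put("X-B3-TraceId", String.format("%016x", traceId));
    carrier.put("X-B3-SpanId", String.format("%016x", spanId));
    // an absent X-B3-Sampled leaves the decision to the receiver
    if (sampled != null) carrier.put("X-B3-Sampled", sampled ? "1" : "0");
  }

  public static long extractTraceId(Map<String, String> carrier) {
    return Long.parseUnsignedLong(carrier.get("X-B3-TraceId"), 16);
  }
}
```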
@Override public <C> TraceContext.Injector<C> injector(Setter<C, K> setter) {
if (setter == null) throw new NullPointerException("setter == null");
return new B3Injector<>(this, setter);
}

static final class B3Injector<C, K> implements TraceContext.Injector<C> {
final B3Propagation<K> propagation;
final Setter<C, K> setter;

B3Injector(B3Propagation<K> propagation, Setter<C, K> setter) {
this.propagation = propagation;
this.setter = setter;
}

@Override public void inject(TraceContext traceContext, C carrier) {
setter.put(carrier, propagation.traceIdKey, traceContext.traceIdString());
setter.put(carrier, propagation.spanIdKey, toLowerHex(traceContext.spanId()));
if (traceContext.parentId() != null) {
setter.put(carrier, propagation.parentSpanIdKey, toLowerHex(traceContext.parentId()));
}
if (traceContext.debug()) {
setter.put(carrier, propagation.debugKey, "1");
} else if (traceContext.sampled() != null) {
setter.put(carrier, propagation.sampledKey, traceContext.sampled() ? "1" : "0");
}
}
}

The inject method is simple: it uses the Setter to write the trace information into the carrier.

@Override public <C> TraceContext.Extractor<C> extractor(Getter<C, K> getter) {
if (getter == null) throw new NullPointerException("getter == null");
return new B3Extractor(this, getter);
}

static final class B3Extractor<C, K> implements TraceContext.Extractor<C> {
final B3Propagation<K> propagation;
final Getter<C, K> getter;

B3Extractor(B3Propagation<K> propagation, Getter<C, K> getter) {
this.propagation = propagation;
this.getter = getter;
}

@Override public TraceContextOrSamplingFlags extract(C carrier) {
if (carrier == null) throw new NullPointerException("carrier == null");

String traceId = getter.get(carrier, propagation.traceIdKey);
String sampled = getter.get(carrier, propagation.sampledKey);
String debug = getter.get(carrier, propagation.debugKey);
if (traceId == null && sampled == null && debug == null) {
return TraceContextOrSamplingFlags.EMPTY;
}

// Official sampled value is 1, though some old instrumentation send true
Boolean sampledV = sampled != null
? sampled.equals("1") || sampled.equalsIgnoreCase("true")
: null;
boolean debugV = "1".equals(debug);

String spanId = getter.get(carrier, propagation.spanIdKey);
if (spanId == null) { // return early if there's no span ID
return TraceContextOrSamplingFlags.create(
debugV ? SamplingFlags.DEBUG : SamplingFlags.Builder.build(sampledV)
);
}

TraceContext.Builder result = TraceContext.newBuilder().sampled(sampledV).debug(debugV);
result.traceIdHigh(
traceId.length() == 32 ? lowerHexToUnsignedLong(traceId, 0) : 0
);
result.traceId(lowerHexToUnsignedLong(traceId));
result.spanId(lowerHexToUnsignedLong(spanId));
String parentSpanIdString = getter.get(carrier, propagation.parentSpanIdKey);
if (parentSpanIdString != null) result.parentId(lowerHexToUnsignedLong(parentSpanIdString));
return TraceContextOrSamplingFlags.create(result.build());
}
}

The extract method uses the Getter to read the trace information from the carrier.

In TraceDemo the propagationFactory we set is ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name")

ExtraFieldPropagation

ExtraFieldPropagation can be used to propagate extra fields.

Run the Frontend and Backend services from Chapter1 and enter the following in a console:

curl http://localhost:8081 --header "user-name: zhangsan"

The console prints the user-name value zhangsan:

Wed Nov 15 11:42:02 GMT+08:00 2017 zhangsan

static final class ExtraFieldInjector<C, K> implements Injector<C> {
final Injector<C> delegate;
final Propagation.Setter<C, K> setter;
final Map<String, K> nameToKey;

ExtraFieldInjector(Injector<C> delegate, Setter<C, K> setter, Map<String, K> nameToKey) {
this.delegate = delegate;
this.setter = setter;
this.nameToKey = nameToKey;
}

@Override public void inject(TraceContext traceContext, C carrier) {
for (Object extra : traceContext.extra()) {
if (extra instanceof Extra) {
((Extra) extra).setAll(carrier, setter, nameToKey);
break;
}
}
delegate.inject(traceContext, carrier);
}
}

In ExtraFieldInjector's inject method, the extra data on the traceContext is set onto the carrier. An Extra object is essentially a key-value holder with two variants, One and Many; Many is effectively a Map.
In Extra's setAll method, the extra's name is first looked up in nameToKey; if absent, nothing is set; if found, the setter's put method writes the value into the carrier.

static final class One extends Extra {
String name, value;

@Override void put(String name, String value) {
this.name = name;
this.value = value;
}

@Override String get(String name) {
return name.equals(this.name) ? value : null;
}

@Override <C, K> void setAll(C carrier, Setter<C, K> setter, Map<String, K> nameToKey) {
K key = nameToKey.get(name);
if (key == null) return;
setter.put(carrier, key, value);
}

@Override public String toString() {
return "ExtraFieldPropagation{" + name + "=" + value + "}";
}
}

static final class Many extends Extra {
final LinkedHashMap<String, String> fields = new LinkedHashMap<>();

@Override void put(String name, String value) {
fields.put(name, value);
}

@Override String get(String name) {
return fields.get(name);
}

@Override <C, K> void setAll(C carrier, Setter<C, K> setter, Map<String, K> nameToKey) {
for (Map.Entry<String, String> field : fields.entrySet()) {
K key = nameToKey.get(field.getKey());
if (key == null) continue;
setter.put(carrier, nameToKey.get(field.getKey()), field.getValue());
}
}

@Override public String toString() {
return "ExtraFieldPropagation" + fields;
}
}

In ExtraFieldExtractor's extract method, the configured names are looked up in the carrier one by one, and the resulting Extra data is added to the result of the delegate's extract call:

static final class ExtraFieldExtractor<C, K> implements Extractor<C> {
final Extractor<C> delegate;
final Propagation.Getter<C, K> getter;
final Map<String, K> names;

ExtraFieldExtractor(Extractor<C> delegate, Getter<C, K> getter, Map<String, K> names) {
this.delegate = delegate;
this.getter = getter;
this.names = names;
}

@Override public TraceContextOrSamplingFlags extract(C carrier) {
TraceContextOrSamplingFlags result = delegate.extract(carrier);

Extra extra = null;
for (Map.Entry<String, K> field : names.entrySet()) {
String maybeValue = getter.get(carrier, field.getValue());
if (maybeValue == null) continue;
if (extra == null) {
extra = new One();
} else if (extra instanceof One) {
One one = (One) extra;
extra = new Many();
extra.put(one.name, one.value);
}
extra.put(field.getKey(), maybeValue);
}
if (extra == null) return result;
return result.toBuilder().addExtra(extra).build();
}
}

At this point we have covered most of the Tracing-related source code. In later posts we will continue analyzing how Brave integrates with the major frameworks.


Java Distributed Tracing with Zipkin (2): Brave Source Analysis - Tracer and Span

All posts are first published on my personal blog http://blog.mozhu.org - welcome to visit!

Brave is the Java client for Zipkin; it reports the tracing information it collects to Zipkin in the form of Spans.

(Zipkin is based on a Google paper named Dapper; Dapper means "brave" in Dutch, which is where the name Brave comes from)

Brave is currently at version 4.9.1 and is compatible with both the zipkin v1 and v2 protocols. GitHub: https://github.com/openzipkin/brave

We generally don't write trace-related code by hand. Brave provides ready-to-use libraries that instrument specific libraries such as servlet, springmvc, mysql, okhttp3 and httpclient, all of which can be found at:

https://github.com/openzipkin/brave/tree/master/instrumentation

Let's start with a simple demo of Brave's basic usage; it will help a lot with the later analysis of Brave's internals and of the other instrumentation libraries.

TraceDemo

package tracing;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.context.log4j2.ThreadContextCurrentTraceContext;
import brave.propagation.B3Propagation;
import brave.propagation.ExtraFieldPropagation;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Sender;
import zipkin2.reporter.okhttp3.OkHttpSender;

import java.util.concurrent.TimeUnit;

public class TraceDemo {

public static void main(String[] args) {
Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
AsyncReporter asyncReporter = AsyncReporter.builder(sender)
.closeTimeout(500, TimeUnit.MILLISECONDS)
.build(SpanBytesEncoder.JSON_V2);

Tracing tracing = Tracing.newBuilder()
.localServiceName("tracer-demo")
.spanReporter(asyncReporter)
.propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
.currentTraceContext(ThreadContextCurrentTraceContext.create())
.build();

Tracer tracer = tracing.tracer();
Span span = tracer.newTrace().name("encode").start();
try {
doSomethingExpensive();
} finally {
span.finish();
}


Span twoPhase = tracer.newTrace().name("twoPhase").start();
try {
Span prepare = tracer.newChild(twoPhase.context()).name("prepare").start();
try {
prepare();
} finally {
prepare.finish();
}
Span commit = tracer.newChild(twoPhase.context()).name("commit").start();
try {
commit();
} finally {
commit.finish();
}
} finally {
twoPhase.finish();
}


sleep(1000);

}

private static void doSomethingExpensive() {
sleep(500);
}

private static void commit() {
sleep(500);
}

private static void prepare() {
sleep(500);
}

private static void sleep(long milliseconds) {
try {
TimeUnit.MILLISECONDS.sleep(milliseconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Start Zipkin, then run TraceDemo; two traces can be found in the Zipkin UI.

Clicking the first trace shows a single Span named encode, taking about 500ms
The encode trace

The corresponding code fragment is:

Tracer tracer = tracing.tracer();
Span span = tracer.newTrace().name("encode").start();
try {
doSomethingExpensive();
} finally {
span.finish();
}

The Tracer creates a new Span named encode, the start method starts timing, then the expensive doSomethingExpensive method runs, and finally the finish method stops timing, completing and recording one trace.

The data this code actually reports to Zipkin is:

[
{
"traceId": "16661f6cb5d58903",
"id": "16661f6cb5d58903",
"name": "encode",
"timestamp": 1510043590522358,
"duration": 499867,
"binaryAnnotations": [
{
"key": "lc",
"value": "",
"endpoint": {
"serviceName": "tracer-demo",
"ipv4": "192.168.99.1"
}
}
]
}
]

Now let's look at the second, slightly more complex trace: a Span named twoPhase takes 1000ms in total and has two child Spans, prepare and commit, each taking about 500ms

The twoPhase trace

The corresponding code fragment is:

Span twoPhase = tracer.newTrace().name("twoPhase").start();
try {
Span prepare = tracer.newChild(twoPhase.context()).name("prepare").start();
try {
prepare();
} finally {
prepare.finish();
}
Span commit = tracer.newChild(twoPhase.context()).name("commit").start();
try {
commit();
} finally {
commit.finish();
}
} finally {
twoPhase.finish();
}

The data this code actually reports to Zipkin is:

[
{
"traceId": "89e051d5394b90b1",
"id": "89e051d5394b90b1",
"name": "twophase",
"timestamp": 1510043591038983,
"duration": 1000356,
"binaryAnnotations": [
{
"key": "lc",
"value": "",
"endpoint": {
"serviceName": "tracer-demo",
"ipv4": "192.168.99.1"
}
}
]
},
{
"traceId": "89e051d5394b90b1",
"id": "60568c4903793b8d",
"name": "prepare",
"parentId": "89e051d5394b90b1",
"timestamp": 1510043591039919,
"duration": 499246,
"binaryAnnotations": [
{
"key": "lc",
"value": "",
"endpoint": {
"serviceName": "tracer-demo",
"ipv4": "192.168.99.1"
}
}
]
},
{
"traceId": "89e051d5394b90b1",
"id": "ce14448169d01d2f",
"name": "commit",
"parentId": "89e051d5394b90b1",
"timestamp": 1510043591539304,
"duration": 499943,
"binaryAnnotations": [
{
"key": "lc",
"value": "",
"endpoint": {
"serviceName": "tracer-demo",
"ipv4": "192.168.99.1"
}
}
]
}
]

Span

First, look at RealSpan, the implementation class of Span.

It depends on several core classes:

Recorder - records spans

Reporter - reports spans to Zipkin

MutableSpan - a wrapper around Span, providing the APIs that manipulate it

MutableSpanMap - a map keyed by TraceContext with MutableSpan values, holding all in-flight spans in memory

RealSpan has two core methods, start and finish:

public Span start(long timestamp) {
recorder().start(context(), timestamp);
return this;
}

public void finish(long timestamp) {
recorder().finish(context(), timestamp);
}

Both delegate to the Recorder's start and finish methods, which look up the span bound to the TraceContext and record its start and finish timestamps; on finish, the reporter's report method is called to send the span to Zipkin.

public void start(TraceContext context, long timestamp) {
if (noop.get()) return;
spanMap.getOrCreate(context).start(timestamp);
}

public void finish(TraceContext context, long finishTimestamp) {
MutableSpan span = spanMap.remove(context);
if (span == null || noop.get()) return;
synchronized (span) {
span.finish(finishTimestamp);
reporter.report(span.toSpan());
}
}

BoundedAsyncReporter

Reporter is implemented by the abstract class AsyncReporter, whose concrete implementation is BoundedAsyncReporter:

static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
final AtomicBoolean closed = new AtomicBoolean(false);
final BytesEncoder<S> encoder;
final ByteBoundedQueue pending;
final Sender sender;
final int messageMaxBytes;
final long messageTimeoutNanos;
final long closeTimeoutNanos;
final CountDownLatch close;
final ReporterMetrics metrics;

BoundedAsyncReporter(Builder builder, BytesEncoder<S> encoder) {
this.pending = new ByteBoundedQueue(builder.queuedMaxSpans, builder.queuedMaxBytes);
this.sender = builder.sender;
this.messageMaxBytes = builder.messageMaxBytes;
this.messageTimeoutNanos = builder.messageTimeoutNanos;
this.closeTimeoutNanos = builder.closeTimeoutNanos;
this.close = new CountDownLatch(builder.messageTimeoutNanos > 0 ? 1 : 0);
this.metrics = builder.metrics;
this.encoder = encoder;
}
}

The important classes inside BoundedAsyncReporter:

  • BytesEncoder - encodes spans into bytes so the sender can ship them to Zipkin
  • ByteBoundedQueue - similar to a BlockingQueue: a blocking queue bounded both by element count and by total bytes
  • Sender - sends the encoded bytes to Zipkin
  • ReporterMetrics - statistics about span reporting
  • BufferNextMessage - a Consumer of buffered span data that relies on the Sender to report it
public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
if (encoder == null) throw new NullPointerException("encoder == null");

if (encoder.encoding() != sender.encoding()) {
throw new IllegalArgumentException(String.format(
"Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
}

final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop.
final BufferNextMessage consumer =
new BufferNextMessage(sender, messageMaxBytes, messageTimeoutNanos);
final Thread flushThread = new Thread(() -> {
try {
while (!result.closed.get()) {
result.flush(consumer);
}
} finally {
for (byte[] next : consumer.drain()) result.pending.offer(next);
result.close.countDown();
}
}, "AsyncReporter(" + sender + ")");
flushThread.setDaemon(true);
flushThread.start();
}
return result;
}

When messageTimeoutNanos is greater than 0, a daemon thread, flushThread, is started; it loops, calling BoundedAsyncReporter's flush method to report in-memory spans to Zipkin.
When messageTimeoutNanos is 0, the client has to call flush manually to report spans.

Next, BoundedAsyncReporter's close method:

@Override public void close() {
if (!closed.compareAndSet(false, true)) return; // already closed
try {
// wait for in-flight spans to send
if (!close.await(closeTimeoutNanos, TimeUnit.NANOSECONDS)) {
logger.warning("Timed out waiting for in-flight spans to send");
}
} catch (InterruptedException e) {
logger.warning("Interrupted waiting for in-flight spans to send");
Thread.currentThread().interrupt();
}
int count = pending.clear();
if (count > 0) {
metrics.incrementSpansDropped(count);
logger.warning("Dropped " + count + " spans due to AsyncReporter.close()");
}
}

This close method works hand in hand with the while loop in the flush thread. close first sets the closed flag to true, then calls close.await(), blocking on the CountDownLatch until the flush thread's finally block calls result.close.countDown().
Once closed is true, the flush thread's while loop stops, its finally block runs, and any spans still in memory but not yet reported are pushed back onto the queue (result.pending) before result.close.countDown() is invoked. The code blocked in close then resumes, clears the queue, and records the dropped spans via metrics.incrementSpansDropped(count).
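The handshake between close() and the flush thread is a standard CountDownLatch pattern. A stripped-down, self-contained sketch of just that handshake (class and method names are illustrative, not Brave's):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseHandshakeSketch {
    final AtomicBoolean closed = new AtomicBoolean(false);
    final CountDownLatch done = new CountDownLatch(1);

    // Plays the role of the flush thread: loop until closed, then signal done.
    void startWorker() {
        Thread t = new Thread(() -> {
            try {
                while (!closed.get()) {
                    // ... flush pending spans here ...
                }
            } finally {
                done.countDown(); // releases the waiter blocked in close()
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Plays the role of BoundedAsyncReporter.close(): flip the flag, then wait
    // for the worker to acknowledge. Returns false on timeout or interrupt.
    boolean close(long timeout, TimeUnit unit) {
        closed.set(true);
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CloseHandshakeSketch s = new CloseHandshakeSketch();
        s.startWorker();
        System.out.println(s.close(2, TimeUnit.SECONDS)); // true once the worker exits
    }
}
```

close() returns true only after the worker has observed the closed flag and counted the latch down, which is exactly how BoundedAsyncReporter waits for in-flight spans before clearing the queue.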

@Override public void report(S span) {
if (span == null) throw new NullPointerException("span == null");

metrics.incrementSpans(1);
byte[] next = encoder.encode(span);
int messageSizeOfNextSpan = sender.messageSizeInBytes(Collections.singletonList(next));
metrics.incrementSpanBytes(next.length);
if (closed.get() ||
// don't enqueue something larger than we can drain
messageSizeOfNextSpan > messageMaxBytes ||
!pending.offer(next)) {
metrics.incrementSpansDropped(1);
}
}

As seen earlier, the Recorder's finish method calls the Reporter's report method. report encodes the span into a byte array, computes the resulting message size, adds it to the queue (pending), and updates the metrics; the span is dropped if the reporter is closed, the encoded span is larger than a single message can carry, or the queue rejects it.

Next, the two flush methods: flush() is public, for manual use by callers, while flush(BufferNextMessage bundler) is the one the flush thread calls in a loop.

@Override public final void flush() {
flush(new BufferNextMessage(sender, messageMaxBytes, 0));
}

void flush(BufferNextMessage bundler) {
if (closed.get()) throw new IllegalStateException("closed");

// Drain the queue into BufferNextMessage until the buffer (bundler) is full
pending.drainTo(bundler, bundler.remainingNanos());

// record after flushing reduces the amount of gauge events vs on doing this on report
metrics.updateQueuedSpans(pending.count);
metrics.updateQueuedBytes(pending.sizeInBytes);

// loop around if we are running, and the bundle isn't full
// if we are closed, try to send what's pending
if (!bundler.isReady() && !closed.get()) return;

// Signal that we are about to send a message of a known size in bytes
metrics.incrementMessages();
metrics.incrementMessageBytes(bundler.sizeInBytes());
List<byte[]> nextMessage = bundler.drain();

try {
sender.sendSpans(nextMessage).execute();
} catch (IOException | RuntimeException | Error t) {
// In failure case, we increment messages and spans dropped.
int count = nextMessage.size();
Call.propagateIfFatal(t);
metrics.incrementMessagesDropped(t);
metrics.incrementSpansDropped(count);
if (logger.isLoggable(FINE)) {
logger.log(FINE,
format("Dropped %s spans due to %s(%s)", count, t.getClass().getSimpleName(),
t.getMessage() == null ? "" : t.getMessage()), t);
}
// Raise in case the sender was closed out-of-band.
if (t instanceof IllegalStateException) throw (IllegalStateException) t;
}
}

flush roughly does the following:

  1. Drain the pending queue into the BufferNextMessage (bundler) until the bundler is full
  2. Once the bundler is ready, i.e. isReady() returns true, take out all of its messages
  3. Send all of those messages to Zipkin via the Sender's sendSpans method

ByteBoundedQueue

Similar to a BlockingQueue, this is a blocking queue bounded both by element count and by total bytes. It offers three methods, offer, drainTo, and clear, with which callers store, drain, and clear data:

final class ByteBoundedQueue {

final ReentrantLock lock = new ReentrantLock(false);
final Condition available = lock.newCondition();

final int maxSize;
final int maxBytes;

final byte[][] elements;
int count;
int sizeInBytes;
int writePos;
int readPos;

ByteBoundedQueue(int maxSize, int maxBytes) {
this.elements = new byte[maxSize][];
this.maxSize = maxSize;
this.maxBytes = maxBytes;
}
}

ByteBoundedQueue takes two int parameters: maxSize, the maximum number of elements the queue accepts, and maxBytes, the maximum number of bytes.
It stores messages in a two-dimensional byte array, elements, using two cursors, writePos and readPos, to track the write and read positions.
It uses the classic reentrant lock, ReentrantLock, to make offer, drainTo, and clear thread-safe.

/**
* Returns true if the element could be added or false if it could not due to its size.
*/
boolean offer(byte[] next) {
lock.lock();
try {
if (count == elements.length) return false;
if (sizeInBytes + next.length > maxBytes) return false;

elements[writePos++] = next;

if (writePos == elements.length) writePos = 0; // circle back to the front of the array

count++;
sizeInBytes += next.length;

available.signal(); // alert any drainers
return true;
} finally {
lock.unlock();
}
}

offer adds a message to the queue, using the standard lock/try/finally pattern: acquire the lock, release it in finally. With the lock held:
When count equals elements.length, the queue is full and nothing more can be added.
When sizeInBytes + next.length > maxBytes, adding this message would exceed the queue's byte limit, so it is also rejected.
If neither holds, the message can be added: it is stored at writePos, and writePos is incremented.
When writePos reaches the end of the array, it is reset to 0 so the next write starts from the front.
count is then incremented and the byte total updated.
Finally, available.signal() notifies threads waiting on the lock (those blocked in drainTo) to compete for it again.
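The wrap-around of writePos (and likewise readPos) is plain circular-buffer index arithmetic. A minimal sketch of just that step (the class is illustrative):

```java
public class RingIndexSketch {
    // Mirrors the cursor logic in ByteBoundedQueue: after using position pos,
    // step forward and wrap back to 0 on reaching capacity.
    static int advance(int pos, int capacity) {
        int next = pos + 1;
        return next == capacity ? 0 : next;
    }

    public static void main(String[] args) {
        int pos = 0;
        // With capacity 3, the cursor visits 0 1 2 0 1
        for (int i = 0; i < 5; i++) {
            System.out.print(pos + " ");
            pos = advance(pos, 3);
        }
    }
}
```

Because writes and reads both advance this way and count never exceeds capacity, writePos can never lap readPos.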

/** Blocks for up to nanosTimeout for elements to appear. Then, consume as many as possible. */
int drainTo(Consumer consumer, long nanosTimeout) {
try {
// This may be called by multiple threads. If one is holding a lock, another is waiting. We
// use lockInterruptibly to ensure the one waiting can be interrupted.
lock.lockInterruptibly();
try {
long nanosLeft = nanosTimeout;
while (count == 0) {
if (nanosLeft <= 0) return 0;
nanosLeft = available.awaitNanos(nanosLeft);
}
return doDrain(consumer);
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
return 0;
}
}

drainTo hands messages to a Consumer. If the queue is empty at the time, it waits up to nanosTimeout for a message to arrive.
When the while loop exits, a new message has been enqueued and can be consumed, so doDrain is called.

int doDrain(Consumer consumer) {
int drainedCount = 0;
int drainedSizeInBytes = 0;
while (drainedCount < count) {
byte[] next = elements[readPos];

if (next == null) break;
if (consumer.accept(next)) {
drainedCount++;
drainedSizeInBytes += next.length;

elements[readPos] = null;
if (++readPos == elements.length) readPos = 0; // circle back to the front of the array
} else {
break;
}
}
count -= drainedCount;
sizeInBytes -= drainedSizeInBytes;
return drainedCount;
}

doDrain is again a while loop: while drainedCount is less than count, i.e. fewer messages have been drained than the queue holds, it calls consumer.accept.
If accept returns true, drainedCount is incremented and drainedSizeInBytes grows by the current message's length.
If accept returns false, the loop exits. The queue's count is then reduced by the total number of drained messages, drainedCount, and sizeInBytes by the total drained bytes, drainedSizeInBytes.

int clear() {
lock.lock();
try {
int result = count;
count = sizeInBytes = readPos = writePos = 0;
Arrays.fill(elements, null);
return result;
} finally {
lock.unlock();
}
}

clear empties the queue. It is simple: reset everything to zero. It is used by the Reporter's close method.

BufferNextMessage

BufferNextMessage is the default implementation of ByteBoundedQueue.Consumer:

final class BufferNextMessage implements ByteBoundedQueue.Consumer {
private final Sender sender;
private final int maxBytes;
private final long timeoutNanos;
private final List<byte[]> buffer = new LinkedList<>();

long deadlineNanoTime;
int sizeInBytes;
boolean bufferFull;

BufferNextMessage(Sender sender, int maxBytes, long timeoutNanos) {
this.sender = sender;
this.maxBytes = maxBytes;
this.timeoutNanos = timeoutNanos;
}
}

BufferNextMessage stores the messages it receives in a LinkedList.

@Override
public boolean accept(byte[] next) {
buffer.add(next); // speculatively add to the buffer so we can size it
int x = sender.messageSizeInBytes(buffer);
int y = maxBytes;
int includingNextVsMaxBytes = (x < y) ? -1 : ((x == y) ? 0 : 1);

// If we can fit queued spans and the next into one message...
if (includingNextVsMaxBytes <= 0) {
sizeInBytes = x;

if (includingNextVsMaxBytes == 0) {
bufferFull = true;
}
return true;
} else {
buffer.remove(buffer.size() - 1);
return false; // we couldn't fit the next message into this buffer
}
}

accept first adds the message to the buffer, then calls the sender's messageSizeInBytes to get the total encoded size of everything buffered, comparing it with maxBytes (the comparison result is includingNextVsMaxBytes).
When the total exceeds maxBytes, the message just added is removed from the buffer again and accept returns false.
When the total exactly equals maxBytes, the buffer is marked full, i.e. bufferFull = true.
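The "speculatively add, roll back on overflow" shape of accept can be isolated into a small sketch. Here the message size is simply the sum of element lengths, whereas Brave asks the Sender for the encoded message size; the class is illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class SpeculativeBufferSketch {
    final List<byte[]> buffer = new ArrayList<>();
    final int maxBytes;
    int sizeInBytes;
    boolean full;

    SpeculativeBufferSketch(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Simplified accept(): add first, then roll back if the total no longer fits.
    boolean accept(byte[] next) {
        buffer.add(next);                      // speculatively add
        int size = sizeInBytes + next.length;
        if (size > maxBytes) {                 // does not fit: roll back
            buffer.remove(buffer.size() - 1);
            return false;
        }
        sizeInBytes = size;
        if (size == maxBytes) full = true;     // exactly at the limit: mark full
        return true;
    }

    public static void main(String[] args) {
        SpeculativeBufferSketch b = new SpeculativeBufferSketch(5);
        System.out.println(b.accept(new byte[3])); // true: 3 <= 5
        System.out.println(b.accept(new byte[4])); // false: 7 > 5, rolled back
        System.out.println(b.accept(new byte[2])); // true: exactly 5
        System.out.println(b.full);                // true
    }
}
```

Rejecting instead of splitting keeps each outgoing message under the sender's limit while letting the rejected element stay in the queue for the next batch.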

long remainingNanos() {
if (buffer.isEmpty()) {
deadlineNanoTime = System.nanoTime() + timeoutNanos;
}
return Math.max(deadlineNanoTime - System.nanoTime(), 0);
}

boolean isReady() {
return bufferFull || remainingNanos() <= 0;
}

In remainingNanos, whenever the buffer is empty, deadlineNanoTime is reset to the current system time plus timeoutNanos. Once the system time passes that deadline, or the buffer is full, isReady returns true: the buffer is ready.
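The deadline logic can be reduced to a few lines. This sketch keeps the field names of BufferNextMessage but strips everything else; `empty` and `full` stand in for the real buffer state:

```java
import java.util.concurrent.TimeUnit;

public class DeadlineSketch {
    final long timeoutNanos;
    long deadlineNanoTime;
    boolean empty = true; // stands in for buffer.isEmpty()
    boolean full = false; // stands in for bufferFull

    DeadlineSketch(long timeoutNanos) {
        this.timeoutNanos = timeoutNanos;
    }

    // While the buffer is empty, keep pushing the deadline out; once it holds
    // data, the deadline is fixed and the remaining time counts down.
    long remainingNanos() {
        if (empty) deadlineNanoTime = System.nanoTime() + timeoutNanos;
        return Math.max(deadlineNanoTime - System.nanoTime(), 0);
    }

    boolean isReady() {
        return full || remainingNanos() <= 0;
    }

    public static void main(String[] args) {
        // A zero timeout is always ready.
        System.out.println(new DeadlineSketch(0).isReady()); // true
        // A long timeout on an empty, non-full buffer is not ready yet.
        System.out.println(new DeadlineSketch(TimeUnit.SECONDS.toNanos(5)).isReady()); // false
    }
}
```

Note the zero-timeout case: the deadline equals "now", so remainingNanos() is 0 and isReady() is immediately true.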

List<byte[]> drain() {
if (buffer.isEmpty()) return Collections.emptyList();
ArrayList<byte[]> result = new ArrayList<>(buffer);
buffer.clear();
sizeInBytes = 0;
bufferFull = false;
deadlineNanoTime = 0;
return result;
}

drain returns all of the buffer's data and clears the buffer.

isReady and drain are used in BoundedAsyncReporter's flush method:

void flush(BufferNextMessage bundler) {
// ...
if (!bundler.isReady() && !closed.get()) return;
// ...
List<byte[]> nextMessage = bundler.drain();
// ...
sender.sendSpans(nextMessage).execute();
}

Because flush is called over and over, bundler.isReady() is checked first; only when it returns true are all accumulated messages taken out and sent to Zipkin as one batch, which improves efficiency.

Now back to BoundedAsyncReporter's manual flush method:

@Override public final void flush() {
flush(new BufferNextMessage(sender, messageMaxBytes, 0));
}

Having analyzed BufferNextMessage, the conclusion is immediate: the BufferNextMessage constructed here is given a timeoutNanos of 0, so its isReady() method always returns true.
This means each manual call to flush immediately fills a BufferNextMessage from the queue and sends that batch to Zipkin; whatever remains in the queue is sent the next time the flush thread runs flush.

We have now analyzed the source of Tracer and Span, which will help a great deal when we look at how Brave integrates with other frameworks:
Span/RealSpan
Recorder
Reporter/AsyncReporter/BoundedAsyncReporter
BufferNextMessage
ByteBoundedQueue

The next post continues with the initialization of Tracing and the related source code.


Java Distributed Tracing System Zipkin (1): Getting to Know Zipkin

All posts are first published on my personal blog at http://blog.mozhu.org; you are welcome to visit!

In 2010, Google published a paper on Dapper, its internal distributed tracing system, covering two years of Dapper's evolution inside Google along with its design and operational experience. Based on that paper, Twitter developed its own distributed tracing system, Zipkin, and open-sourced it.
Paper: http://static.googleusercontent.com/media/research.google.com/zh-CN/archive/papers/dapper-2010-1.pdf
Chinese translation: http://bigbully.github.io/Dapper-translation/

There are other fairly mature distributed tracing implementations as well, for example Naver's Pinpoint, Apache HTrace, Alibaba's EagleEye, JD's Hydra, Sina's Watchman, Meituan-Dianping's CAT, and SkyWalking.

This series focuses on Zipkin, covering basic usage, principles, and analysis of parts of its core source code. The Zipkin version used is 2.2.1.

Overview

Zipkin is an open-source distributed real-time data tracing system, designed after the Google Dapper paper and developed and contributed by Twitter. Its main job is to aggregate real-time monitoring data coming from heterogeneous systems.

Architecture diagram

As the diagram shows, when business systems call each other they pass specific tracing information along to Zipkin; Zipkin aggregates, stores, and displays the collected traces, and through the web UI users can conveniently see network latency, call chains, system dependencies, and so on.

Zipkin consists of four main modules:

  • Collector - receives or collects the data each application sends
  • Storage - stores the received data; Memory, MySQL, Cassandra, and ElasticSearch are currently supported, with in-memory storage as the default
  • API (Query) - queries the data held in Storage, exposing a simple JSON API used mainly by the web UI
  • Web - a simple web interface

Instrumented Client and Instrumented Server are the two applications in a distributed architecture that use a tracing library; the client calls the service the server provides, and both report trace information to Zipkin. After they report traces over a Transport, Zipkin's Collector receives them, the Storage module persists them in the configured backend, and Zipkin then provides an API for the UI to query the traces.
A Non-Instrumented Server is a server without tracing instrumentation; naturally, it reports no trace information.

Flow diagram

┌─────────────┐ ┌───────────────────────┐  ┌─────────────┐  ┌──────────────────┐
│ User Code │ │ Trace Instrumentation │ │ Http Client │ │ Zipkin Collector │
└─────────────┘ └───────────────────────┘ └─────────────┘ └──────────────────┘
│ │ │ │
┌─────────┐
│ ──┤GET /foo ├─▶ │ ────┐ │ │
└─────────┘ │ record tags
│ │ ◀───┘ │ │
────┐
│ │ │ add trace headers │ │
◀───┘
│ │ ────┐ │ │
│ record timestamp
│ │ ◀───┘ │ │
┌─────────────────┐
│ │ ──┤GET /foo ├─▶ │ │
│X-B3-TraceId: aa │ ────┐
│ │ │X-B3-SpanId: 6b │ │ │ │
└─────────────────┘ │ invoke
│ │ │ │ request │

│ │ │ │ │
┌────────┐ ◀───┘
│ │ ◀─────┤200 OK ├─────── │ │
────┐ └────────┘
│ │ │ record duration │ │
┌────────┐ ◀───┘
│ ◀──┤200 OK ├── │ │ │
└────────┘ ┌────────────────────────────────┐
│ │ ──┤ asynchronously report span ├────▶ │
│ │
│{ │
│ "traceId": "aa", │
│ "id": "6b", │
│ "name": "get", │
│ "timestamp": 1483945573944000,│
│ "duration": 386000, │
│ "annotations": [ │
│--snip-- │
└────────────────────────────────┘

As the diagram shows, the application code (User Code) issues an HTTP GET to /foo; the tracing framework (Trace Instrumentation) intercepts it and records the trace in Zipkin through these steps:

  1. Record tag information
  2. Record the current call chain's trace information into the HTTP headers
  3. Record the timestamp of the current call
  4. Send the HTTP request carrying the trace headers, e.g. X-B3-TraceId: aa, X-B3-SpanId: 6b
  5. When the call finishes, record its duration
  6. Aggregate steps 1-5 into one Span (the smallest tracing unit) and asynchronously report it to the Zipkin Collector
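Step 4's headers are the standard B3 propagation headers. A sketch of building them by hand (the ids and the helper are illustrative; real instrumentation such as Brave injects them for you):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class B3HeaderSketch {
    // Builds the headers attached to the outgoing request in step 4.
    static Map<String, String> b3Headers(String traceId, String spanId) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("X-B3-TraceId", traceId);
        headers.put("X-B3-SpanId", spanId);
        headers.put("X-B3-Sampled", "1"); // record this trace
        return headers;
    }

    public static void main(String[] args) {
        // The same ids as in the diagram above.
        System.out.println(b3Headers("aa", "6b"));
    }
}
```

The downstream server reads these headers, continues the same trace, and creates a child span whose parentId is the incoming X-B3-SpanId.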

Zipkin's basic concepts

Span: the basic unit of work. One call in a chain (an RPC, a DB call, etc., with no particular restriction) creates one span, identified by a 64-bit ID. A span also carries other data such as a description, timestamps, key-value (BinaryAnnotation) tags, and a parent-id; the parent-id records where in the call chain the span came from. Informally, a span is one request.

Trace: a tree-like collection of spans representing one call chain, identified uniquely by a TraceId.

Annotation: records timing information for specific events in a request; four annotations are typical:

  • cs - Client Send: the client issues the request
  • sr - Server Receive: the server receives the request
  • ss - Server Send: the server finishes processing and sends the result back to the client
  • cr - Client Receive: the client receives the server's response

BinaryAnnotation: extra information, usually as key-value pairs.
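Given the four annotation timestamps, useful durations fall out by subtraction: the client-observed duration is cr - cs, the server-side processing time is ss - sr, and their difference approximates network plus queueing overhead. A sketch, using the microsecond timestamps from the backend span of the demo trace later in this post:

```java
public class AnnotationTimingSketch {
    static long clientDuration(long cs, long cr) {
        return cr - cs;
    }

    static long serverDuration(long sr, long ss) {
        return ss - sr;
    }

    // Time not spent in the server handler: network transfer plus queueing.
    static long networkOverhead(long cs, long sr, long ss, long cr) {
        return clientDuration(cs, cr) - serverDuration(sr, ss);
    }

    public static void main(String[] args) {
        long cs = 1509771706434207L, sr = 1509771706479391L;
        long ss = 1509771706495481L, cr = 1509771706502012L;
        System.out.println(clientDuration(cs, cr)); // 67805, the span's "duration"
        System.out.println(serverDuration(sr, ss)); // 16090
        System.out.println(networkOverhead(cs, sr, ss, cr)); // 51715
    }
}
```

This simple arithmetic is what the Zipkin UI does when it renders a span's client and server segments side by side.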

Installation

This series uses Zipkin 2.2.1, which requires JDK 1.8.

Download the latest Zipkin jar and run it:

wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'
java -jar zipkin.jar

Docker is also an option; see:

https://github.com/openzipkin/docker-zipkin

Once it is up, visit

http://localhost:9411/

in a browser to open Zipkin's web UI.

(screenshot: Zipkin Web UI)

The following simple web application demonstrates reporting trace data to Zipkin.

Code: https://gitee.com/mozhu/zipkin-learning

Chapter1/servlet25 shows how to use the Brave library in a traditional Servlet project to upload trace data to Zipkin.

Run, respectively:

mvn jetty:run -Pbackend

mvn jetty:run -Pfrontend

This starts two services on ports 8081 and 9000; the frontend sends a request to the backend, which returns the current time.

Frontend: http://localhost:8081/

Backend: http://localhost:9000/api

Visiting http://localhost:8081/ in a browser shows the current time:

Fri Nov 03 18:43:00 GMT+08:00 2017

Open the Zipkin web UI and click Find Traces; the following page appears:
(screenshot: Find Traces)

Click through for the details:
(screenshot: Traces)

You can see the trace of the frontend calling the backend: the frontend took 113.839ms in total, of which the call to the backend service took 67.805ms.

Click frontend and backend in the trace stack on the left to open the details of each span:
(screenshot: frontend span details)
(screenshot: backend span details)

Click JSON in the top-right corner to see all of the trace's data:

[
{
"traceId": "f3e648a459e6c685",
"id": "f3e648a459e6c685",
"name": "get",
"timestamp": 1509771706395235,
"duration": 113839,
"annotations": [
{
"timestamp": 1509771706395235,
"value": "sr",
"endpoint": {
"serviceName": "frontend",
"ipv4": "192.168.1.8"
}
},
{
"timestamp": 1509771706509074,
"value": "ss",
"endpoint": {
"serviceName": "frontend",
"ipv4": "192.168.1.8"
}
}
],
"binaryAnnotations": [
{
"key": "ca",
"value": true,
"endpoint": {
"serviceName": "",
"ipv6": "::1",
"port": 55037
}
},
{
"key": "http.path",
"value": "/",
"endpoint": {
"serviceName": "frontend",
"ipv4": "192.168.1.8"
}
}
]
},
{
"traceId": "f3e648a459e6c685",
"id": "2ce51fa654dd0c2f",
"name": "get",
"parentId": "f3e648a459e6c685",
"timestamp": 1509771706434207,
"duration": 67805,
"annotations": [
{
"timestamp": 1509771706434207,
"value": "cs",
"endpoint": {
"serviceName": "frontend",
"ipv4": "192.168.1.8"
}
},
{
"timestamp": 1509771706479391,
"value": "sr",
"endpoint": {
"serviceName": "backend",
"ipv4": "192.168.1.8"
}
},
{
"timestamp": 1509771706495481,
"value": "ss",
"endpoint": {
"serviceName": "backend",
"ipv4": "192.168.1.8"
}
},
{
"timestamp": 1509771706502012,
"value": "cr",
"endpoint": {
"serviceName": "frontend",
"ipv4": "192.168.1.8"
}
}
],
"binaryAnnotations": [
{
"key": "ca",
"value": true,
"endpoint": {
"serviceName": "",
"ipv4": "127.0.0.1",
"port": 55038
}
},
{
"key": "http.path",
"value": "/api",
"endpoint": {
"serviceName": "frontend",
"ipv4": "192.168.1.8"
}
},
{
"key": "http.path",
"value": "/api",
"endpoint": {
"serviceName": "backend",
"ipv4": "192.168.1.8"
}
},
{
"key": "sa",
"value": true,
"endpoint": {
"serviceName": "",
"ipv4": "127.0.0.1",
"port": 9000
}
}
]
}
]

The Dependencies page shows the dependency graph of frontend and backend, as below:

(screenshot: frontend/backend dependency graph)

Suppose one call chain in a complex topology responds slowly: how do you locate the high-latency service? Before distributed tracing, the only option was to work through the log files of each system along the call path one by one; with the web UI Zipkin provides, it is easy to find the slow service in a call chain.

Later posts will cover Zipkin's usage and principles in detail, along with integrating it into existing frameworks.


Apache+Tomcat Load Balancing (2): mod_jk

Environment

Apache version: 2.4.18
Download: http://httpd.apache.org/download.cgi
Download the mod_jk package:
http://apache.fayea.com/tomcat/tomcat-connectors/jk/binaries/windows/

(screenshot: apache tomcat connector download)

Note: mod_jk must match the Apache version. This article uses Apache 2.4 on a 64-bit OS, hence the build below:
http://apache.fayea.com/tomcat/tomcat-connectors/jk/binaries/windows/tomcat-connectors-1.2.40-windows-x86_64-httpd-2.4.x.zip

Tomcat version: 6.0.44
Download:
http://tomcat.apache.org/download-60.cgi

JDK: 1.6
OS: Windows 7 x64

Deploy WebDemo to the three Tomcats and start them; for reference see
Apache+Tomcat Load Balancing (1): mod_proxy

Put mod_jk.so under /modules, then edit /conf/httpd.conf
and add the line:

Include conf/jk.conf

If mod_proxy is configured, comment it out to prevent conflicts or interference:

#Include conf/extra/httpd-proxy.conf

Create the file jk.conf under /conf with the following content:

##
## Configuration for JK
##

# Load mod_jk module
# Update this path to match your modules location
LoadModule jk_module modules/mod_jk.so
# Where to find workers.properties
# Update this path to match your conf directory location (put workers.properties next to httpd.conf)
JkWorkersFile conf/workers.properties
# Where to put jk shared memory
# Update this path to match your local state directory or logs directory
JkShmFile logs/mod_jk.shm
#This directive is only allowed inside VirtualHost (with value "On") and in the global server (with value "All")
#The default is Off, so no mounts will be inherited from the global server to any VirtualHost
JkMountCopy All
#This directive can be used multiple times per virtual server.
#The default value is "ForwardURIProxy" since version 1.2.24. It was "ForwardURICompatUnparsed" in version 1.2.23 and "ForwardURICompat" until version 1.2.22
JkOptions +ForwardURIProxy
# Where to put jk logs
# Update this path to match your logs directory location (put mod_jk.log next to access_log)
JkLogFile logs/mod_jk.log
# Set the jk log level [debug/error/info]
JkLogLevel info
# Select the timestamp log format
JkLogStampFormat "[%a %b %d %H:%M:%S %Y] "

# unmount option
#JkUnMount /*.jpg lb_worker
#JkUnMount /*.gif lb_worker
#JkUnMount /*.png lb_worker
#JkUnMount /*.htm lb_worker

<Location /djkstatus/>
JkMount jkstatus
Require all granted
</Location>

JkMount /WebDemo/* lb_worker

Create the file workers.properties under /conf with the following content:

#
# The workers that jk should create and work with
#
worker.list=lb_worker,jkstatus,worker1,worker2,worker3
worker.lb_worker.type=lb
worker.lb_worker.balance_workers=worker1,worker2,worker3
#worker1
worker.worker1.type=ajp13
worker.worker1.host=127.0.0.1
worker.worker1.port=8019
worker.worker1.lbfactor=10

#worker2
worker.worker2.type=ajp13
worker.worker2.host=127.0.0.1
worker.worker2.port=8029
worker.worker2.lbfactor=10

#worker3
worker.worker3.type=ajp13
worker.worker3.host=127.0.0.1
worker.worker3.port=8039
worker.worker3.lbfactor=10

worker.jkstatus.type=status
worker.jkstatus.mount=/djkstatus

Open a browser and visit:
http://localhost/WebDemo/index.jsp
Keep refreshing; you will see
Current Server: 1
cycle from 1 to 3, which means the load balancing is configured correctly.

Open the jkstatus cluster management page:
http://localhost/djkstatus

(screenshot: djkstatus)


Apache+Tomcat Load Balancing (1): mod_proxy

Environment

Apache version: 2.4.18
Download: http://httpd.apache.org/download.cgi

Tomcat version: 6.0.44
Download:
http://tomcat.apache.org/download-60.cgi

JDK: 1.6
OS: Windows 7 x64

Install Apache and Tomcat

Unzip Apache to D:/Apache (APACHE_HOME below).
Unzip Tomcat to D:/TomcatTest, i.e. D:/TomcatTest/tomcat (TOMCAT_HOME below).

Create setenv.bat under TOMCAT_HOME/bin and add:

set JAVA_OPTS=%JAVA_OPTS% -Dserver=1

Note: this server value is read later by System.getProperty("server") in the JSP page.

Copy the Tomcat directory three times:
D:/TomcatTest/tomcat1
D:/TomcatTest/tomcat2
D:/TomcatTest/tomcat3

Change the HTTP port (default 8080) and AJP13 port (default 8009) in conf/server.xml of each Tomcat to prevent port conflicts:

<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

Tomcat    HTTP port   AJP13 port
tomcat1   9091        8019
tomcat2   9092        8029
tomcat3   9093        8039

Change the server flag in bin/setenv.bat of each Tomcat:
tomcat1:

set JAVA_OPTS=%JAVA_OPTS% -Dserver=1

tomcat2:

set JAVA_OPTS=%JAVA_OPTS% -Dserver=2

tomcat3:

set JAVA_OPTS=%JAVA_OPTS% -Dserver=3

Unzip WebDemo.war under D:/TomcatTest/webapps.
Edit TOMCAT_HOME/conf/server.xml and add, inside the Host node:

<Context path="/WebDemo" docBase="D:\TomcatTest\webapps\WebDemo" reloadable="true"/>

index.jsp source:

<%@page import="java.text.SimpleDateFormat"%>
<%@page import="java.util.Date"%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%
String server = System.getProperty("server");
request.setAttribute("Server", server);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
request.setAttribute("CurrentTime", dateFormat.format(new Date()));
%>
<html>
Current Server: ${Server} <br/>
Current Time: ${CurrentTime} <br/>
</html>

Start the three Tomcats and visit the WebDemo application.
Visiting http://localhost:9091/WebDemo/index.jsp
shows:
Current Server: 1
Current Time: 2015-12-31 15:49:31

mod_proxy

Since Apache 2.2, mod_proxy ships with the official distribution, so no extra module download is needed and configuration is straightforward.
Edit APACHE_HOME/conf/httpd.conf
and add the line:

Include conf/extra/httpd-proxy.conf

httpd-proxy.conf content:

LoadModule proxy_module modules/mod_proxy.so  
LoadModule proxy_balancer_module modules/mod_proxy_balancer.so
LoadModule proxy_http_module modules/mod_proxy_http.so

LoadModule lbmethod_bytraffic_module modules/mod_lbmethod_bytraffic.so
LoadModule slotmem_shm_module modules/mod_slotmem_shm.so

<Proxy balancer://mycluster>
BalancerMember http://127.0.0.1:9091
BalancerMember http://127.0.0.1:9092
BalancerMember http://127.0.0.1:9093
ProxySet lbmethod=bytraffic
</Proxy>
ProxyRequests Off
ProxyPass /test balancer://mycluster/
ProxyPassReverse /test balancer://mycluster/

Start Apache:

httpd -k start

Open a browser and visit:
http://localhost/test/WebDemo/index.jsp
Keep refreshing; you will see
Current Server: 1
cycle from 1 to 3, which means load balancing is configured correctly.

Setting loadfactor:
Requests are balanced by server weight; a larger weight marks a more capable server that should handle more requests.


<Proxy balancer://mycluster>
BalancerMember http://127.0.0.1:9091 loadfactor=1
BalancerMember http://127.0.0.1:9092 loadfactor=2
BalancerMember http://127.0.0.1:9093 loadfactor=3
ProxySet lbmethod=bytraffic
</Proxy>

Apply the configuration above, restart Apache, and refresh the page repeatedly:
refresh 1 shows Current Server: 1
refresh 2 shows Current Server: 2
refresh 3 shows Current Server: 2
refresh 4 shows Current Server: 3
refresh 5 shows Current Server: 3
refresh 6 shows Current Server: 3
Apache now balances the load by server weight: a larger weight marks a more capable server, which receives more requests.

Setting lbmethod
lbmethod selects the load-balancing strategy:
lbmethod=byrequests balances by request count (the default)
lbmethod=bytraffic balances by traffic
lbmethod=bybusyness balances by busyness (always picks the server with the fewest active requests)
To enable each of these strategies, load the corresponding module:
LoadModule lbmethod_bybusyness_module modules/mod_lbmethod_bybusyness.so
LoadModule lbmethod_byrequests_module modules/mod_lbmethod_byrequests.so
LoadModule lbmethod_bytraffic_module modules/mod_lbmethod_bytraffic.so

Hot Standby
Hot standby is simple to set up: add status=+H to designate a server as a backup.

<Proxy balancer://mycluster>
BalancerMember http://127.0.0.1:9091 loadfactor=1 status=+H
BalancerMember http://127.0.0.1:9092 loadfactor=1
BalancerMember http://127.0.0.1:9093 loadfactor=1
ProxySet lbmethod=bytraffic
</Proxy>

With status=+H on tomcat1's BalancerMember, tomcat1 becomes the hot standby: while tomcat2 and tomcat3 run normally, Apache never forwards requests to tomcat1; if tomcat2 and tomcat3 both go down, requests are forwarded to tomcat1; once tomcat2 and tomcat3 recover, requests are routed back to them.

Setting lbset
lbset roughly establishes groups among the BalancerMembers: a lower lbset value means a higher priority. Apache prefers forwarding requests to the BalancerMembers with the lowest lbset, and only when that whole group is down does it distribute requests to the BalancerMembers of another group.

<Proxy balancer://mycluster>
BalancerMember http://127.0.0.1:9091 loadfactor=1 lbset=2
BalancerMember http://127.0.0.1:9092 loadfactor=1 lbset=1
BalancerMember http://127.0.0.1:9093 loadfactor=1 lbset=1
ProxySet lbmethod=bytraffic
</Proxy>

Apply the configuration above and restart Apache.
Open a browser and visit:
http://localhost/test/WebDemo/index.jsp
Refreshing shows that Apache forwards requests only to tomcat2 and tomcat3; only after we stop both by hand does traffic reach tomcat1.
After restarting tomcat2 and tomcat3, new requests return to them after roughly a minute. This configuration clearly behaves the same as making tomcat1 a hot standby (status=+H).

Setting route
For plain load balancing, route need not be set. It is needed when configuring Tomcat for sticky sessions, in which case it must match the jvmRoute value of the Engine node in the backend Tomcat's server.xml.

<Proxy balancer://mycluster>
BalancerMember http://127.0.0.1:9091 loadfactor=1 route=tomcat1
BalancerMember http://127.0.0.1:9092 loadfactor=1 route=tomcat2
BalancerMember http://127.0.0.1:9093 loadfactor=1 route=tomcat3
ProxySet lbmethod=bytraffic
</Proxy>

About ProxyPassReverse
The httpd-proxy.conf configuration contains this reverse-proxy setup:
ProxyPass /test balancer://mycluster/
ProxyPassReverse /test balancer://mycluster/
ProxyPass is easy to understand: every client request to /test is handed to one of the Tomcats defined under balancer://mycluster/ for processing.
ProxyPassReverse is always configured identically to ProxyPass, but its purpose is puzzling; everything seems to work fine with it removed.
Is that really so? Not quite: when the response contains a 302 redirect, ProxyPassReverse earns its keep.
A small test:
redirect.jsp:
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<% response.sendRedirect("welcome.jsp"); %>

welcome.jsp:
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
welcome

Comment out ProxyPassReverse:

#ProxyPassReverse /test balancer://mycluster/
Restart Apache and visit
http://localhost/test/WebDemo/redirect.jsp
The page jumps to
http://127.0.0.1:9092/test/WebDemo/welcome.jsp

(screenshot: mod_proxy_1)

Without the reverse proxy (ProxyPassReverse), the response the client receives is a redirect whose target URL, http://127.0.0.1:9092/test/WebDemo/welcome.jsp, may be reachable only from the proxy server (in this example the backend resource happens to be reachable directly, bypassing Apache); in general the client cannot open it. With the reverse proxy configured, Apache rewrites the HTTP redirect response to http://localhost/test/WebDemo/welcome.jsp before handing it to the client.

Uncomment ProxyPassReverse:
ProxyPassReverse /test balancer://mycluster/
Restart Apache and visit
http://localhost/test/WebDemo/redirect.jsp
The page now jumps to
http://localhost/test/WebDemo/welcome.jsp

The Chrome console shows that Apache rewrites the Location header of the response:

Location: http://127.0.0.1:9092/test/WebDemo/welcome.jsp
becomes:
Location: http://localhost/test/WebDemo/welcome.jsp

Apache can also talk to the backend Tomcats over the AJP13 protocol.
Just modify httpd-proxy.conf (proxy_ajp_module must be loaded):
LoadModule proxy_module modules/mod_proxy.so
LoadModule proxy_balancer_module modules/mod_proxy_balancer.so

LoadModule proxy_ajp_module modules/mod_proxy_ajp.so

LoadModule lbmethod_bytraffic_module modules/mod_lbmethod_bytraffic.so
LoadModule slotmem_shm_module modules/mod_slotmem_shm.so


<Proxy balancer://mycluster>
BalancerMember ajp://127.0.0.1:8019 loadfactor=1 route=tomcat1
BalancerMember ajp://127.0.0.1:8029 loadfactor=1 route=tomcat2
BalancerMember ajp://127.0.0.1:8039 loadfactor=1 route=tomcat3
ProxySet lbmethod=bytraffic
</Proxy>

ProxyRequests Off
ProxyPass /test balancer://mycluster/

Restart Apache, open a browser, and visit:
http://localhost/test/WebDemo/index.jsp
Keep refreshing; Current Server
cycles from 1 to 3, which means Apache is load balancing over AJP connections to Tomcat.

In AJP mode, ProxyPassReverse does not take effect, so pages containing a redirect fail to jump correctly.
For the official explanation, see:
https://httpd.apache.org/docs/2.2/mod/mod_proxy_ajp.html
"However, it is usually better to deploy the application on the backend server at the same path as the proxy rather than to take this approach."
The official advice is that under AJP, the path Apache exposes should match the backend path, otherwise redirects fail; in other words, if the backend application is at /, Apache should expose / rather than /test.
Visiting
http://localhost/test/WebDemo/redirect.jsp
jumps to
http://localhost/WebDemo/welcome.jsp
which obviously fails.

Change the reverse-proxy setting from /test to /:
ProxyPass / balancer://mycluster/
Restart Apache. Visiting
http://localhost/WebDemo/redirect.jsp
now jumps to
http://localhost/WebDemo/welcome.jsp
and succeeds.

Cluster monitoring
Add the following to /conf/extra/httpd-proxy.conf:

<Location /balancer-manager>
SetHandler balancer-manager
Require all granted
</Location>

<Location /server-status>
SetHandler server-status
Require all granted
</Location>

<Location /server-info>
SetHandler server-info
Require all granted
</Location>

After restarting Apache, the cluster's state can be inspected at:
http://localhost/server-status
http://localhost/server-info
http://localhost/balancer-manager
server-status shows the server's runtime status;
server-info shows the server's configuration;
balancer-manager manages the cluster: load-balancing settings can be changed and take effect immediately (the changes are not persisted and are lost on restart).

Notes:
1. Every Require all granted in this article is for testing convenience and grants access to everyone; for production, be sure to change it so that server information is not leaked and the cluster configuration cannot be tampered with.
2. Every Require all granted uses the Apache 2.4 syntax; for Apache 2.2, adjust to the corresponding configuration.

References:
mod_proxy
http://httpd.apache.org/docs/2.2/en/mod/mod_proxy.html

mod_proxy_balancer algorithms
http://httpd.apache.org/docs/2.2/en/mod/mod_proxy_balancer.html

Differences between httpd.conf in Apache 2.2 and 2.4
http://www.upupw.net/server/n75.html
