Kafka Deep Dive: How the Controller Communicates with Brokers in the Cluster - ControllerChannelManager

All posts are first published on my personal blog at http://blog.mozhu.org - you are welcome to visit!

In a Kafka cluster, one broker is first elected as the controller; the controller then coordinates with the other brokers on tasks such as topic creation, partition leader election, and topic deletion.
Below we analyze the communication mechanism between the controller and the other brokers.
The controller sends three kinds of requests to the other brokers:

  • LeaderAndIsrRequest - For a topic, the KafkaController elects partition leaders and produces the latest ISR, then sends a LeaderAndIsrRequest to each replica of the topic, i.e. to the other brokers.
  • StopReplicaRequest - When a broker goes down or a user deletes a replica, a StopReplicaRequest is sent to the other brokers.
  • UpdateMetadataRequest - Whenever a broker changes (crashes or restarts), a topic is created, partitions are added, a partition leader is elected, and so on, the metadata must be updated so that Kafka clients know the latest partition information of each topic, including which broker is the leader of each partition and therefore which broker to read from and write to.

To make communication between the KafkaController (the controller) and the other brokers in the cluster more efficient, ControllerBrokerRequestBatch implements batched request sending.
ControllerBrokerRequestBatch first collects the information to be sent to the other brokers, pushes it onto per-broker queues, and ControllerChannelManager then takes the data from those queues and sends it to the brokers.
leaderAndIsrRequestMap: holds the information for LeaderAndIsrRequests destined for a given broker
stopReplicaRequestMap: holds the information for StopReplicaRequests destined for a given broker
updateMetadataRequestMap: holds the information for UpdateMetadataRequests destined for a given broker
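
As a rough illustration of this batch-then-enqueue pattern, here is a simplified sketch with made-up types such as Req and BatchSketch (not the real Kafka classes): requests are first grouped per broker and only pushed onto the per-broker blocking queues when the batch is flushed.

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Illustrative placeholder for the three controller request types.
sealed trait Req
case class LeaderAndIsrReq(partition: String) extends Req
case class StopReplicaReq(partition: String) extends Req
case class UpdateMetadataReq(partition: String) extends Req

object BatchSketch {
  // One blocking queue per destination broker, as ControllerChannelManager maintains.
  val queues = mutable.Map.empty[Int, LinkedBlockingQueue[Req]]
  // Batch stage: requests are first accumulated per broker ...
  private val batch = mutable.Map.empty[Int, mutable.Buffer[Req]]

  def add(brokerId: Int, req: Req): Unit =
    batch.getOrElseUpdate(brokerId, mutable.Buffer.empty[Req]) += req

  // ... and only handed to the per-broker queues when the batch is flushed,
  // mirroring what sendRequestsToBrokers does via controller.sendRequest.
  def flush(): Unit = {
    batch.foreach { case (brokerId, reqs) =>
      val queue = queues.getOrElseUpdate(brokerId, new LinkedBlockingQueue[Req]())
      reqs.foreach(r => queue.put(r))
    }
    batch.clear()
  }
}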

ControllerChannelManager

When a ControllerChannelManager is constructed, it initializes brokerStateInfo by calling addNewBroker for every live broker, so that one ControllerBrokerStateInfo object is maintained per broker. A ControllerBrokerStateInfo holds a networkClient, a brokerNode, a messageQueue and a requestThread, where:

  • networkClient - the client handling the low-level network communication
  • brokerNode - the broker's IP, port and rack information
  • messageQueue - the BlockingQueue of messages to be sent
  • requestThread - the RequestSendThread that sends the requests

Note that addNewBroker initializes the RequestSendThread but does not start it immediately; the thread is only started when the startup method runs.
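
Judging from how brokerStateInfo is populated and read later in this post, ControllerBrokerStateInfo is essentially a plain holder for these four components. A reconstructed sketch (field names inferred from the surrounding code, not copied from the Kafka source):

// Reconstructed from its usage: created in addNewBroker and read in
// removeExistingBroker and startRequestSendThread below.
case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                     brokerNode: Node,
                                     messageQueue: BlockingQueue[QueueItem],
                                     requestSendThread: RequestSendThread)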

private def addNewBroker(broker: Broker) {
//初始化阻塞队列,用于存放发给broker的消息
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
//初始化底层的通信客户端
val networkClient = {
val channelBuilder = ChannelBuilders.create(
config.interBrokerSecurityProtocol,
Mode.CLIENT,
LoginType.SERVER,
config.values,
config.saslMechanismInterBrokerProtocol,
config.saslInterBrokerHandshakeRequestEnable
)
val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
metrics,
time,
"controller-channel",
Map("broker-id" -> broker.id.toString).asJava,
false,
channelBuilder
)
new NetworkClient(
selector,
new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,
1,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
time
)
}
val threadName = threadNamePrefix match {
case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
}

//初始化发送请求的线程RequestSendThread,但是不开始
val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
brokerNode, config, time, threadName)
requestThread.setDaemon(false)
brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}

class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "

controllerContext.liveBrokers.foreach(addNewBroker(_))

def startup() = {
brokerLock synchronized {
brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
}
}

def shutdown() = {
brokerLock synchronized {
brokerStateInfo.values.foreach(removeExistingBroker)
}
}

def addBroker(broker: Broker) {
// be careful here. Maybe the startup() API has already started the request send thread
//有可能startup中已经启动了thread
brokerLock synchronized {
if(!brokerStateInfo.contains(broker.id)) {
addNewBroker(broker)
startRequestSendThread(broker.id)
}
}
}

def removeBroker(brokerId: Int) {
brokerLock synchronized {
removeExistingBroker(brokerStateInfo(brokerId))
}
}

//移除已经存在的broker
private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
try {
brokerState.networkClient.close()
brokerState.messageQueue.clear()
brokerState.requestSendThread.shutdown()
brokerStateInfo.remove(brokerState.brokerNode.id)
} catch {
case e: Throwable => error("Error while removing broker by the controller", e)
}
}

//开始请求发送线程
protected def startRequestSendThread(brokerId: Int) {
val requestThread = brokerStateInfo(brokerId).requestSendThread
if(requestThread.getState == Thread.State.NEW)
requestThread.start()
}
}

The sendRequest method does not send anything itself; it merely pushes the request onto the queue of the corresponding broker.

def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {
brokerLock synchronized {
val stateInfoOpt = brokerStateInfo.get(brokerId)
stateInfoOpt match {
case Some(stateInfo) =>
stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))
case None =>
warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
}
}
}
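
For reference, the QueueItem placed on the queue simply bundles everything the send thread needs later. A sketch inferred from how it is constructed here and destructured in RequestSendThread.doWork below (not copied from the Kafka source):

// Inferred from QueueItem(apiKey, apiVersion, request, callback) as used above and below.
case class QueueItem(apiKey: ApiKeys,
                     apiVersion: Option[Short],
                     request: AbstractRequest,
                     callback: AbstractRequestResponse => Unit)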

RequestSendThread

When a broker is elected controller, KafkaController.onControllerFailover is invoked, which calls ControllerChannelManager.startup.
startup starts the request-send thread of each broker in turn. In RequestSendThread.doWork, items to be sent are taken from the queue and sent to the corresponding broker one at a time; if the broker is not ready, or the send fails, the thread waits 300 ms and retries until a valid response is received, after which the callback (if any) is invoked.

class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
val queue: BlockingQueue[QueueItem],
val networkClient: NetworkClient,
val brokerNode: Node,
val config: KafkaConfig,
val time: Time,
name: String)
extends ShutdownableThread(name = name) {

private val lock = new Object()
private val stateChangeLogger = KafkaController.stateChangeLogger
private val socketTimeoutMs = config.controllerSocketTimeoutMs

override def doWork(): Unit = {

def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))

//从queue中取出QueueItem
val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
import NetworkClientBlockingOps._
var clientResponse: ClientResponse = null
try {
lock synchronized {
var isSendSuccessful = false
while (isRunning.get() && !isSendSuccessful) {
// if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
// removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
//如果某个broker宕机了,controller会收到zookeeper事件,会触发removeBroker,将会关闭该broker相应的thread
try {
if (!brokerReady()) {
//如果broker没有就绪,则等待300ms后继续尝试
isSendSuccessful = false
backoff()
}
else {
val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
//发送请求
clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
isSendSuccessful = true
}
} catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
"Reconnecting to broker.").format(controllerId, controllerContext.epoch,
request.toString, brokerNode.toString()), e)
networkClient.close(brokerNode.idString)
//如果发送失败,则等待300ms后继续尝试
isSendSuccessful = false
backoff()
}
}
if (clientResponse != null) {
val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
}
stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
.format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))

if (callback != null) {
callback(response)
}
}
}
} catch {
case e: Throwable =>
error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
// If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
networkClient.close(brokerNode.idString)
}
}

}

ControllerBrokerRequestBatch

In KafkaController, PartitionStateMachine, ReplicaStateMachine and similar classes, whenever broker or partition state changes must be propagated to other brokers, ControllerBrokerRequestBatch is used to send the requests in batches.
Let us look at an example first: newBatch is called, then methods such as addStopReplicaRequestForBrokers add the requests to be sent to the batch, and finally sendRequestsToBrokers completes the send.

def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
callbacks: Callbacks = (new CallbackBuilder).build) {
if(replicas.size > 0) {
info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
try {
brokerRequestBatch.newBatch()
replicas.foreach(r => handleStateChange(r, targetState, callbacks))
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
}catch {
case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
}
}
}

ControllerBrokerRequestBatch maintains a separate map for each of the three request types (LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest). The methods addLeaderAndIsrRequestForBrokers, addStopReplicaRequestForBrokers and addUpdateMetadataRequestForBrokers add the corresponding request to its map, and sendRequestsToBrokers finally pushes the different requests onto the queues of the corresponding RequestSendThread threads.

class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging {
val controllerContext = controller.controllerContext
val controllerId: Int = controller.config.brokerId
val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
private val stateChangeLogger = KafkaController.stateChangeLogger

def newBatch() {
// raise error if the previous batch is not empty
if (leaderAndIsrRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
"a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
if (stopReplicaRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
if (updateMetadataRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
}

def clear() {
leaderAndIsrRequestMap.clear()
stopReplicaRequestMap.clear()
updateMetadataRequestMap.clear()
}

//添加LeaderAndIsrRequest
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
val topicPartition = new TopicPartition(topic, partition)

brokerIds.filter(_ >= 0).foreach { brokerId =>
val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
}

addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
Set(TopicAndPartition(topic, partition)))
}

//添加StopReplicaRequest
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
callback: (AbstractRequestResponse, Int) => Unit = null) {
brokerIds.filter(b => b >= 0).foreach { brokerId =>
stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
val v = stopReplicaRequestMap(brokerId)
//添加停止副本fetch的request到map中
if(callback != null)
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
else
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition)
}
}

/** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
//添加UpdateMetadataRequest
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
callback: AbstractRequestResponse => Unit = null) {
def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
val partitionStateInfo = if (beingDeleted) {
val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
} else {
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
}
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
//将最新的metadata信息放入updateMetadataRequestMap中
updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
}
case None =>
info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
}
}

val filteredPartitions = {
val givenPartitions = if (partitions.isEmpty)
controllerContext.partitionLeadershipInfo.keySet
else
partitions
if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
givenPartitions
else
givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
}
if (filteredPartitions.isEmpty)
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
}
else
filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))

controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
}

//依次取出三个map的内容,分别放入对应的queue中
def sendRequestsToBrokers(controllerEpoch: Int) {
try {
//将leaderAndIsrRequest放入queue
leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
partitionStateInfos.foreach { case (topicPartition, state) =>
val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
"for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
state.leaderIsrAndControllerEpoch, broker,
topicPartition.topic, topicPartition.partition))
}
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
_.getNode(controller.config.interBrokerSecurityProtocol)
}
val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
)
topicPartition -> partitionState
}
val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
//将leaderAndIsrRequest放入queue,而非真正发送,此处并没有给callback,不需要回调
controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
}
leaderAndIsrRequestMap.clear()
updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>

partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
"to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
broker, p._1)))
val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
)
topicPartition -> partitionState
}

val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
else 0: Short

val updateMetadataRequest =
if (version == 0) {
val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
}
else {
val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
}
new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
}
new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
}
//将updateMetadataRequest放入queue,而非真正发送,此处并没有给callback,不需要回调
controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
}
updateMetadataRequestMap.clear()
stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
debug("The stop replica request (delete = true) sent to broker %d is %s"
.format(broker, stopReplicaWithDelete.mkString(",")))
debug("The stop replica request (delete = false) sent to broker %d is %s"
.format(broker, stopReplicaWithoutDelete.mkString(",")))
replicaInfoList.foreach { r =>
val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
//将stopReplicaRequest放入queue,而非真正发送
controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
}
}
stopReplicaRequestMap.clear()
} catch {
case e : Throwable => {
if (leaderAndIsrRequestMap.size > 0) {
error("Haven't been able to send leader and isr requests, current state of " +
s"the map is $leaderAndIsrRequestMap. Exception message: $e")
}
if (updateMetadataRequestMap.size > 0) {
error("Haven't been able to send metadata update requests, current state of " +
s"the map is $updateMetadataRequestMap. Exception message: $e")
}
if (stopReplicaRequestMap.size > 0) {
error("Haven't been able to send stop replica requests, current state of " +
s"the map is $stopReplicaRequestMap. Exception message: $e")
}
throw new IllegalStateException(e)
}
}
}
}

This post has not gone into the underlying wire protocol; a dedicated post will cover the protocol details later.

[References]
http://blog.csdn.net/zhanglh046/article/details/72821930


Kafka Deep Dive: PartitionLeaderSelector Source Code Analysis

All posts are first published on my personal blog at http://blog.mozhu.org - you are welcome to visit!

PartitionLeaderSelector elects the leader broker of a partition. The trait defines a single method, selectLeader, which takes a TopicAndPartition object and a LeaderAndIsr object.
TopicAndPartition identifies the partition whose leader is to be elected, and the second argument is the partition's current leader and ISR as recorded in zookeeper. The method returns a tuple containing the newly elected leader and ISR, together with the set of replicas that need to receive the LeaderAndIsr request.

trait PartitionLeaderSelector {
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
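
For reference, the LeaderAndIsr value passed in and returned here carries the leader id, the leader epoch, the ISR and the zookeeper version. A sketch inferred from the constructor calls used by the selectors below (the real class lives in kafka.api and has a few extra helpers):

// Inferred from calls such as
//   LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1)
// in the selectors below.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)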

The main implementations of PartitionLeaderSelector are:

  1. NoOpLeaderSelector
  2. OfflinePartitionLeaderSelector
  3. ReassignedPartitionLeaderSelector
  4. PreferredReplicaPartitionLeaderSelector
  5. ControlledShutdownLeaderSelector

PartitionLeaderSelector class diagram

NoOpLeaderSelector

NoOpLeaderSelector is simply a LeaderSelector that does nothing.

class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

this.logIdent = "[NoOpLeaderSelector]: "

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
(currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
}
}

ControlledShutdownLeaderSelector

When the controller receives a controlled-shutdown request, it triggers a new leader election for the affected partitions.
It first looks up the assigned replicas (assignedReplicas), then filters out the replicas that are still alive (liveAssignedReplicas), and selects, among those, the first broker that is still in the new ISR as the partition leader.

class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
extends PartitionLeaderSelector
with Logging {

this.logIdent = "[ControlledShutdownLeaderSelector]: "

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion

val currentLeader = currentLeaderAndIsr.leader

//已分配的Replica
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
//仍存活的Replica
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

//当前的ISR列表中滤掉挂掉的broker
val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
//存活的ISR列表中,选出第一个broker作为该分区的Leader(主副本)
liveAssignedReplicas.filter(newIsr.contains).headOption match {
case Some(newLeader) =>
//如果存在,则将当前LeaderEpoch计数器加1,对应的Zookeeper节点的版本号也加1
debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
case None =>
//不存在则报错StateChangeFailedException
throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}
}
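
To make the rule concrete, here is a tiny standalone example with made-up broker ids (plain Scala; it only mimics the filtering done above, not the real class):

object ControlledShutdownExample extends App {
  val assignedReplicas    = Seq(2, 1, 3)   // AR, in assignment order
  val liveOrShuttingDown  = Set(1, 2, 3)   // brokers still registered in zookeeper
  val shuttingDownBrokers = Set(2)         // broker 2 is doing a controlled shutdown
  val currentIsr          = List(2, 1, 3)

  val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDown.contains(r))
  // Drop the shutting-down broker from the ISR ...
  val newIsr = currentIsr.filter(b => !shuttingDownBrokers.contains(b))      // List(1, 3)
  // ... and pick the first live assigned replica that is still in the new ISR.
  val newLeader = liveAssignedReplicas.find(r => newIsr.contains(r))         // Some(1)

  println(s"newIsr = $newIsr, newLeader = $newLeader")
}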

PreferredReplicaPartitionLeaderSelector

When the controller receives a preferred-replica leader election command, it re-balances the partition leadership: the first replica in the AR list is taken as the preferred leader.

class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
//已分配的Replica
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
//已分配的Replica列表中第一个即为最优的副本
val preferredReplica = assignedReplicas.head
// check if preferred replica is the current leader
//检查是否当前分区主副本已经是最优的副本,则报错LeaderElectionNotNeededException
val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
if (currentLeader == preferredReplica) {
throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
.format(preferredReplica, topicAndPartition))
} else {
info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
" Trigerring preferred replica leader election")
// check if preferred replica is not the current leader and is alive and in the isr
if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
//如果当前的最优主副本存活,返回将其设为最优主副本,则将当前LeaderEpoch计数器加1,对应的Zookeeper节点的版本号也加1
(new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
} else {
//如果当前的最优主副本挂掉了,则报错StateChangeFailedException
throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
"%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
}

ReassignedPartitionLeaderSelector

When the partitions of a topic are being reassigned, a new leader election is triggered and the first reassigned replica that is alive and in the ISR is elected leader.

class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[ReassignedPartitionLeaderSelector]: "

/**
* The reassigned replicas are already in the ISR when selectLeader is called.
*/
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
//重新分配的ISR副本集
val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
//过滤出仍存活的重新分配的ISR副本集
val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
currentLeaderAndIsr.isr.contains(r))
//选取ISR中的第一个为主副本
val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
newLeaderOpt match {
//返回ISR中的第一个为主副本,则将当前LeaderEpoch计数器加1,对应的Zookeeper节点的版本号也加1
case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
case None =>
//如果没有存活的ISR,则报错NoReplicaOnlineException,选举失败
reassignedInSyncReplicas.size match {
case 0 =>
throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
case _ =>
throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
"%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
}

OfflinePartitionLeaderSelector

It elects a new leader and a new ISR as follows:

  1. If at least one broker in the ISR is alive, it becomes the leader and the live members of the ISR form the new ISR.
  2. If no ISR member is alive and unclean.leader.election.enable=false, a NoReplicaOnlineException is raised.
  3. If unclean.leader.election.enable=true, a broker outside the ISR may be elected leader, i.e. the leader is chosen from the AR list, at the risk of data inconsistency (see the sketch after this list).
  4. If the AR list is also empty, a NoReplicaOnlineException is raised.
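
The standalone sketch below (plain Scala with made-up broker ids; it only mimics the decision logic of the class that follows) walks through these branches:

object OfflineElectionExample extends App {
  val assignedReplicas = Seq(3, 1, 2)    // AR for the partition
  val liveBrokers      = Set(1, 2)       // broker 3 (the old leader) is down
  val currentIsr       = List(3)         // the ISR had already shrunk to the dead leader
  val uncleanLeaderElectionEnable = true // corresponds to unclean.leader.election.enable

  val liveAssignedReplicas = assignedReplicas.filter(r => liveBrokers.contains(r))
  val liveBrokersInIsr     = currentIsr.filter(r => liveBrokers.contains(r))

  if (liveBrokersInIsr.nonEmpty)
    // Case 1: a live ISR member exists, pick the first live assigned replica in the ISR.
    println(s"clean election, leader = ${liveAssignedReplicas.find(r => liveBrokersInIsr.contains(r))}")
  else if (uncleanLeaderElectionEnable && liveAssignedReplicas.nonEmpty)
    // Case 3: no live ISR member, fall back to the AR list (possible data loss).
    println(s"unclean election, leader = ${liveAssignedReplicas.head}, new ISR = List(${liveAssignedReplicas.head})")
  else
    // Cases 2 and 4: unclean election disabled, or no assigned replica alive at all.
    println("NoReplicaOnlineException")
}
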
    class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
    extends PartitionLeaderSelector with Logging {
    this.logIdent = "[OfflinePartitionLeaderSelector]: "

    def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
    case Some(assignedReplicas) =>
    val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
    val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
    case true =>
    // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
    // for unclean leader election.
    if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
    ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
    throw new NoReplicaOnlineException(("No broker in ISR for partition " +
    "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
    " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
    }

    debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
    .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
    liveAssignedReplicas.isEmpty match {
    case true =>
    //若AR列表也为空,则报错NoReplicaOnlineException
    throw new NoReplicaOnlineException(("No replica for partition " +
    "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
    " Assigned replicas are: [%s]".format(assignedReplicas))
    case false =>
    //如果unclean.leader.election.enable=true,即意味着可以选举不在ISR列表中的broker为Leader,即在AR列表中,选出Leader
    ControllerStats.uncleanLeaderElectionRate.mark()
    val newLeader = liveAssignedReplicas.head
    warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
    .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
    new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
    }
    case false =>
    //如果至少有一个broker在ISR列表中,并且存活,则将其选为leader,ISR中存活的为新的ISR
    val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
    val newLeader = liveReplicasInIsr.head
    debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
    .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
    new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
    }
    info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
    (newLeaderAndIsr, liveAssignedReplicas)
    case None =>
    throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
    }
    }

[References]
https://yq.aliyun.com/articles/15283
http://blog.csdn.net/zhanglh046/article/details/72822066


Kafka Deep Dive: How Topics Are Deleted

All posts are first published on my personal blog at http://blog.mozhu.org - you are welcome to visit!

To delete a topic, run the following command:

.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test

Here the zookeeper address is localhost and the topic to delete is test. The command actually just creates a node named after the topic under the zookeeper path /admin/delete_topics, i.e. /admin/delete_topics/test. (Many posts claim this node is ephemeral; it is not - it is a persistent node, and it is only removed by the controller once the topic has really been deleted.)
After running the command, the console prints:
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
In other words, the delete command does not delete anything by itself; it only marks the topic for deletion by adding the node /admin/delete_topics/test in zookeeper, and it also reminds us that delete.topic.enable must be set to true beforehand.
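
As a rough sketch of that admin-side step (assuming the ZkUtils helpers createPersistentPath and getDeleteTopicPath, which may differ in your Kafka version), the command-line tool essentially boils down to:

// Assumed sketch: mark the topic for deletion by creating the persistent node
// /admin/delete_topics/test; everything afterwards is driven by the controller.
val topic = "test"
zkUtils.createPersistentPath(getDeleteTopicPath(topic))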

Source Code Analysis of Kafka Topic Deletion

In Kafka, topic deletion is carried out by the TopicDeletionManager class (the deleteTopicManager field of KafkaController).
When a broker is elected controller of the cluster, KafkaController.onControllerFailover is invoked and calls deleteTopicManager.start() to start the topic-deletion thread.
When the broker ceases to be the controller, KafkaController.onControllerResignation is invoked and calls deleteTopicManager.shutdown() to stop that thread.

In onControllerFailover, the KafkaController also initializes the partitionStateMachine and registers the corresponding listeners, most importantly a listener on changes of the children of the zookeeper node /admin/delete_topics.

def onControllerFailover() {
if(isRunning) {
// ...
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
// ...
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}

class PartitionStateMachine{
def registerListeners() {
registerTopicChangeListener()
if(controller.config.deleteTopicEnable)
//注册事件监听,关注节点/admin/delete_topics下子节点的变化
registerDeleteTopicListener()
}

private def registerDeleteTopicListener() = {
zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}

private def deregisterDeleteTopicListener() = {
zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}
}

kafka.controller.PartitionStateMachine.DeleteTopicsListener

DeleteTopicsListener watches the children of the zookeeper node /admin/delete_topics. When a child changes, i.e. a new topic needs to be deleted, handleChildChange fires and the topic is added to the deleteTopicManager queue.

class DeleteTopicsListener() extends IZkChildListener with Logging {
this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
val zkUtils = controllerContext.zkUtils

/**
* Invoked when a topic is being deleted
* @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleChildChange(parentPath : String, children : java.util.List[String]) {
//监听zookeeper节点/admin/delete_topics下子节点的变化,当有childChange,即有新的topic需要被删除时,该handleChildChange会被触发
inLock(controllerContext.controllerLock) {
var topicsToBeDeleted = {
import JavaConversions._
(children: Buffer[String]).toSet
}
debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
//查询Topic是否存在,若topic已经不存在了,则直接删除/admin/delete_topics/< topic_name >节点
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
if(nonExistentTopics.size > 0) {
warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
}
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
// mark topic ineligible for deletion if other state changes are in progress
// 查询topic是否为当前正在执行Preferred副本选举或分区重分配,若果是,则标记为暂时不适合被删除。
topicsToBeDeleted.foreach { topic =>
val preferredReplicaElectionInProgress =
controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
}
// add topic to deletion list
//添加topic到待删除的queue中
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
}
}

/**
*
* @throws Exception
* On any error.
*/
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}

TopicDeletionManager

class TopicDeletionManager(controller: KafkaController,
initialTopicsToBeDeleted: Set[String] = Set.empty,
initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
val controllerContext = controller.controllerContext
//partition状态机
val partitionStateMachine = controller.partitionStateMachine
//replica状态机
val replicaStateMachine = controller.replicaStateMachine
val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
val deleteLock = new ReentrantLock()
val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
(initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
val deleteTopicsCond = deleteLock.newCondition()
val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
var deleteTopicsThread: DeleteTopicsThread = null
val isDeleteTopicEnabled = controller.config.deleteTopicEnable

/**
* Invoked at the end of new controller initiation
*/
def start() {
if (isDeleteTopicEnabled) {
//如果topic.delete.enable=true,则启动删除线程
deleteTopicsThread = new DeleteTopicsThread()
if (topicsToBeDeleted.size > 0)
deleteTopicStateChanged.set(true)
deleteTopicsThread.start()
}
}

/**
* Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
*/
def shutdown() {
// Only allow one shutdown to go through
if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
// Resume the topic deletion so it doesn't block on the condition
//此时删除线程有可能处于等待状态,即awaitTopicDeletionNotification方法处于阻塞等待状态,则唤醒该删除线程
resumeTopicDeletionThread()
// Await delete topic thread to exit
//等待删除线程doWork执行结束
deleteTopicsThread.awaitShutdown()
//清除资源
topicsToBeDeleted.clear()
partitionsToBeDeleted.clear()
topicsIneligibleForDeletion.clear()
}
}
}

DeleteTopicsThread

DeleteTopicsThread extends ShutdownableThread. ShutdownableThread is a thread that repeatedly executes a method (doWork) in a loop and provides a shutdown method that waits synchronously until the thread has really finished. The code is fairly simple: it uses a CountDownLatch to block callers of shutdown and releases them once doWork has finished for good.

ShutdownableThread

abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
extends Thread(name) with Logging {
this.setDaemon(false)
this.logIdent = "[" + name + "], "
val isRunning: AtomicBoolean = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)

def shutdown() = {
//设置running状态为false
initiateShutdown()
//等待在运行的任务执行完毕
awaitShutdown()
}

def initiateShutdown(): Boolean = {
if(isRunning.compareAndSet(true, false)) {
info("Shutting down")
isRunning.set(false)
if (isInterruptible)
interrupt()
true
} else
false
}

/**
* After calling initiateShutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = {
//等待线程运行结束
shutdownLatch.await()
info("Shutdown completed")
}

/**
* This method is repeatedly invoked until the thread shuts down or this method throws an exception
*/
def doWork(): Unit

override def run(): Unit = {
info("Starting ")
try{
while(isRunning.get()){
doWork()
}
} catch{
case e: Throwable =>
if(isRunning.get())
error("Error due to ", e)
}
//计数器减一,唤醒在awaitShutdown方法上等待的线程
shutdownLatch.countDown()
info("Stopped ")
}
}
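
As a usage illustration (a hypothetical subclass, not taken from the Kafka source), a concrete thread only overrides doWork; the base class supplies the run loop, the isRunning flag and the shutdown latch:

// Hypothetical example: doWork is invoked repeatedly until shutdown() is called.
class HeartbeatThread extends ShutdownableThread(name = "heartbeat-thread") {
  override def doWork(): Unit = {
    println("tick")
    Thread.sleep(1000)  // interrupted by initiateShutdown(), since isInterruptible defaults to true
  }
}

val t = new HeartbeatThread()
t.start()      // starts the doWork loop
// ... later ...
t.shutdown()   // initiateShutdown() + awaitShutdown(): returns only after run() has exited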

DeleteTopicsThread

When a topic-deletion notification arrives, doWork continues as follows:
If every replica has completed deletion of the topic, completeDeleteTopic is called to do the final cleanup, which includes removing the zookeeper nodes and clearing the controller's in-memory state.
If some replica has marked the topic as ineligible for deletion (for example because the topic was previously in the middle of a preferred-replica election or a partition reassignment), the topic is marked for deletion again so it can be retried.
If the topic is eligible for deletion and deletion has not already started, onTopicDeletion is called to perform the actual deletion.

class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
val zkUtils = controllerContext.zkUtils
override def doWork() {
//等待删除Topic的事件通知
awaitTopicDeletionNotification()

if (!isRunning.get)
return

inLock(controllerContext.controllerLock) {
val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

if(!topicsQueuedForDeletion.isEmpty)
info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

topicsQueuedForDeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
//如果所有的replica都完成了topic的删除
if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
// clear up all state for this topic from controller cache and zookeeper
completeDeleteTopic(topic)
info("Deletion of topic %s successfully completed".format(topic))
} else {
//至少一个replica在开始删除状态
if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
// ignore since topic deletion is in progress
val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
val replicaIds = replicasInDeletionStartedState.map(_.replica)
val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
partitions.mkString(","), topic))
} else {
// if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
// TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
// or there is at least one failed replica (which means topic deletion should be retried).
//如果没有replica处于开始删除状态(TopicDeletionStarted),并且也不是所有replica都删除了该topic
//则判断是否有replica将该topic标记为不可删除,如果有,则重试将topic标记成删除状态
if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
// mark topic for deletion retry
markTopicForDeletionRetry(topic)
}
}
}
// Try delete topic if it is eligible for deletion.
//如果该topic可以被删除,且还没有处于已经开始删除的状态
if(isTopicEligibleForDeletion(topic)) {
info("Deletion of topic %s (re)started".format(topic))
// topic deletion will be kicked off
//触发topic删除事件
onTopicDeletion(Set(topic))
} else if(isTopicIneligibleForDeletion(topic)) {
info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
}
}
}
}
}

The completeDeleteTopic method

After a topic has been fully deleted, completeDeleteTopic performs the cleanup:
delete the zookeeper node /brokers/topics/< topic_name >
delete the zookeeper node /config/topics/< topic_name >
delete the zookeeper node /admin/delete_topics/< topic_name >
and finally remove the topic-related information from memory.

private def completeDeleteTopic(topic: String) {
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
val zkUtils = controllerContext.zkUtils
//删除zookeeper上节点/brokers/topics/< topic_name >
zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
//删除zookeeper上节点/config/topics/< topic_name >
zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
//删除zookeeper上节点/admin/delete_topics/< topic_name >
zkUtils.zkClient.delete(getDeleteTopicPath(topic))
//最后移除内存中的topic相关信息
controllerContext.removeTopic(topic)
}

The markTopicForDeletionRetry method

It resets the topic's failed replicas to the OfflineReplica state so that deletion can be retried.

private def markTopicForDeletionRetry(topic: String) {
// reset replica states from ReplicaDeletionIneligible to OfflineReplica
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
.format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
}

The onTopicDeletion method

onTopicDeletion eventually calls startReplicaDeletion to start deleting all partitions of the topic.

private def onTopicDeletion(topics: Set[String]) {
info("Topic deletion callback for %s".format(topics.mkString(",")))
// send update metadata so that brokers stop serving data for topics to be deleted
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
// 向各broker更新原信息,使得他们不再向外提供数据服务,准备开始删除数据
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
topics.foreach { topic =>
onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
}
}

private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
startReplicaDeletion(replicasPerPartition)
}

private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
// move dead replicas directly to failed state
//将所有已经挂掉的replica标记成ReplicaDeletionIneligible(无法删除的Replica)
replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
// send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
//将所有未挂掉的replica标记成OfflineReplica(下线的Replica),并发送给相应的broker,这样这些broker就不会再向Leader发送该topic的同步请求(FetchRequest)
replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
//给所有replica发送停止fetch请求,请求完成后,回调deleteTopicStopReplicaCallback方法
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
if(deadReplicasForTopic.size > 0) {
debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
markTopicIneligibleForDeletion(Set(topic))
}
}
}
//开始删除topic开始时,会给存活的broker发送停止fetch的请求,请求完成后,会回调该方法
private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
val responseMap = stopReplicaResponse.responses.asScala
val partitionsInError =
if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
inLock(controllerContext.controllerLock) {
// move all the failed replicas to ReplicaDeletionIneligible
//若有replica出现错误,则将它踢出可删除的Replica列表
failReplicaDeletion(replicasInError)
if (replicasInError.size != responseMap.size) {
//有些Replica已经成功删除了数据
// some replicas could have been successfully deleted
val deletedReplicas = responseMap.keySet -- partitionsInError
completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
}
}
}

The Kafka Topic Deletion Process

Having analyzed the source code, let us now summarize how Kafka deletes a topic.

  1. After a Kafka broker is elected controller, it performs the following steps:
    1.1 It registers DeleteTopicsListener to watch the children of the zookeeper node /admin/delete_topics. The delete command simply creates a child node under this path, named after the topic to be deleted, marking that topic as pending deletion.
    1.2 It creates a dedicated thread, DeleteTopicsThread, to carry out the actual topic deletion.
  2. When DeleteTopicsThread starts, it first blocks at awaitTopicDeletionNotification, waiting for a deletion notification, i.e. for a new topic to be added to the queue of topics awaiting deletion.
  3. Running the delete command creates the child node < topic_name > under the zookeeper node /admin/delete_topics.
  4. DeleteTopicsListener receives the ChildChange event and applies the following logic in order:
    4.1 Check whether the topic exists; if it no longer exists, delete the node /admin/delete_topics/< topic_name > directly.
    4.2 Check whether the topic is currently undergoing a preferred-replica election or a partition reassignment; if so, mark it as temporarily ineligible for deletion.
    4.3 Add the topic to the queue; this wakes up the thread blocked at awaitTopicDeletionNotification inside DeleteTopicsThread.doWork, so the deletion thread can proceed.

The actual deletion logic executed by the deletion thread is:

  1. It first sends an UpdateMetadataRequest to all brokers so that they stop serving data for the topic and get ready for deletion (see the excerpt after this list).
  2. It then starts deleting all partitions of the topic:
    2.1 It sends requests to all brokers telling them these partitions are to be deleted; after receiving them, the brokers no longer accept any client requests on those partitions.
    2.2 It moves every replica of each partition to the OfflineReplica state, so the ISR keeps shrinking; when the leader replica is finally moved to OfflineReplica as well, the leader information in zookeeper is updated to -1.
    2.3 It moves all replicas to the ReplicaDeletionStarted state.
    2.4 The replica state machine picks up the state change and sends StopReplicaRequests to the brokers; on receiving them, a broker stops all fetcher threads, removes caches, and then deletes the underlying log files.
    2.5 It shuts down all idle fetcher threads.
  3. It deletes the zookeeper node /brokers/topics/< topic_name >.
  4. It deletes the zookeeper node /config/topics/< topic_name >.
  5. It deletes the zookeeper node /admin/delete_topics/< topic_name >.
  6. Finally, it removes the topic-related information from the controller's memory.
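
Step 1 above maps to addUpdateMetadataRequestForBrokers from the first part of this series: because the partitions are already in partitionsToBeDeleted at this point, the beingDeleted = true branch applies and the cached leader is replaced with the LeaderDuringDelete sentinel before the metadata is pushed out. Condensed from that code (leaderIsrAndControllerEpoch is the controller's cached state for the partition):

// Condensed from addUpdateMetadataRequestForBrokers: for a partition being deleted,
// the cached leader is swapped for the LeaderDuringDelete sentinel while the ISR is kept,
// so brokers learn that the partition is going away.
val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete,
                                    leaderIsrAndControllerEpoch.leaderAndIsr.isr)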

Flowchart of Kafka Topic Deletion

(The original post includes a flowchart of the topic deletion process here.)

Q&A

We have analyzed the source code of Kafka topic deletion and summarized the process; now let us look at the following questions to deepen our understanding of it.

Q1: Can a topic still be deleted normally when some of its partition replicas are on a broker that is down?

Edit server.properties of each of the three brokers and enable delete.topic.enable=true.
Start zookeeper and the three brokers broker1, broker2 and broker3, and also start kafka-manager.
zookeeper listens on port 2181,
broker1 on port 9091 with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_1
broker2 on port 9092 with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_2
broker3 on port 9093 with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_3
kafka-manager on port 9000; open http://localhost:9000 to inspect the state of the cluster.

Let us start the experiment by creating topic test:

.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic test

and write a few test messages:

.\kafka-console-producer.bat --broker-list localhost:9092 --topic test
111
222
333
444
555
666

Inspect the following path in zookeeper:

ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4, 5]

Shut down broker2 and run the delete-topic command:

.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test

Inspect the zookeeper path /admin/delete_topics:

[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[test]

A few seconds later, only broker2's log directory still contains the folders for topic test, while broker1 and broker3 have already removed the test logs:
test-0,test-1,test-2,test-3,test-4,test-5

Inspect broker1's controller.log:

[2017-12-06 12:14:39,181] DEBUG [DeleteTopicsListener on 1]: Delete topics listener fired for topics test to be deleted (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-12-06 12:14:39,182] INFO [DeleteTopicsListener on 1]: Starting topic deletion for topics test (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-12-06 12:14:39,184] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,186] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> OnlineReplica, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=3] -> OnlineReplica, [Topic=test,Partition=3,Replica=3] -> OnlineReplica, [Topic=test,Partition=5,Replica=1] -> OnlineReplica, [Topic=test,Partition=0,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=1] -> OnlineReplica, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> OnlineReplica, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,187] INFO [delete-topics-thread-1], Deletion of topic test (re)started (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,188] INFO [Topic Deletion Manager 1], Topic deletion callback for test (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,191] INFO [Topic Deletion Manager 1], Partition deletion callback for [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,194] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionIneligible for replicas [Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,195] INFO [Replica state machine on controller 1]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,195] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,5]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,200] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":1,"leader_epoch":5,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,200] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,2]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,204] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,204] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,3]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,207] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,208] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,4]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,211] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,211] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,1]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,215] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,215] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,2]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,219] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,219] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,0]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,223] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":3,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,223] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,3]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,227] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,227] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,5]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,230] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,231] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,0]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,235] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":4,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,235] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,4]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,239] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,240] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,1]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,243] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 1 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 3 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 3 is [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,245] DEBUG [Topic Deletion Manager 1], Deletion started for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,245] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionStarted for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,246] DEBUG The stop replica request (delete = true) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = false) sent to broker 1 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = true) sent to broker 3 is [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = false) sent to broker 3 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,247] DEBUG [Topic Deletion Manager 1], Dead Replicas ([Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2]) found for topic test (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,248] INFO [Topic Deletion Manager 1], Halted deletion of topics test (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,248] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,275] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=2,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,275] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=5,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,279] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,279] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,279] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,279] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,281] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,281] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,282] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,282] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,282] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,282] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,283] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,283] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=4,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,295] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,296] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,296] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=3,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,296] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,297] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,297] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,297] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=3,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,298] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,298] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,298] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,298] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=1,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,313] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=1,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,313] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=5,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,313] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,314] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,314] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,314] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=5,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,315] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,315] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,315] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,315] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,329] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=2,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,329] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,329] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=2,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,330] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=0,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,0],[test,3],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,330] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=0,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,332] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,332] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,332] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3],[test,0],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,333] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,333] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,345] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=0,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,345] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,345] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=0,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,345] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,346] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,346] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,346] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,346] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,347] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=4,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,347] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,347] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=4,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,348] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,348] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,360] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=3,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,361] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,361] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=3,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,361] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,361] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=1,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Deletion for replicas 1 for partition [test,1] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,362] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,362] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,363] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)

We can see that all of broker1's and broker3's replicas of topic test have been deleted:

Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=1]

Looking at the server.log of broker1 and broker3, we can see that the data files of topic test have been removed successfully:

[2017-12-06 12:14:39,245] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,248] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,249] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,250] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,251] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,252] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,253] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,268] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-5\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,272] INFO Deleted log for partition [test,5] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-5. (kafka.log.LogManager)
[2017-12-06 12:14:39,279] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,290] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-4\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,294] INFO Deleted log for partition [test,4] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-4. (kafka.log.LogManager)
[2017-12-06 12:14:39,296] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,309] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-1\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,312] INFO Deleted log for partition [test,1] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-1. (kafka.log.LogManager)
[2017-12-06 12:14:39,313] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,324] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-2\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,328] INFO Deleted log for partition [test,2] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-2. (kafka.log.LogManager)
[2017-12-06 12:14:39,330] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,341] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-0\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,344] INFO Deleted log for partition [test,0] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-0. (kafka.log.LogManager)
[2017-12-06 12:14:39,346] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,356] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-3\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,360] INFO Deleted log for partition [test,3] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-3. (kafka.log.LogManager)

Next, start broker2.
Its server.log shows that the data files of topic test are removed there as well:

[2017-12-06 12:24:05,687] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,691] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,695] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,700] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,706] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,711] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,715] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,742] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-1\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,747] INFO Deleted log for partition [test,1] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-1. (kafka.log.LogManager)
[2017-12-06 12:24:05,750] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,764] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-4\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,769] INFO Deleted log for partition [test,4] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-4. (kafka.log.LogManager)
[2017-12-06 12:24:05,772] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,785] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-2\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,789] INFO Deleted log for partition [test,2] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-2. (kafka.log.LogManager)
[2017-12-06 12:24:05,791] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,803] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-0\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,805] INFO Deleted log for partition [test,0] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-0. (kafka.log.LogManager)
[2017-12-06 12:24:05,808] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,818] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-3\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,821] INFO Deleted log for partition [test,3] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-3. (kafka.log.LogManager)
[2017-12-06 12:24:05,823] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,833] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-5\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,836] INFO Deleted log for partition [test,5] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-5. (kafka.log.LogManager)

Finally, check the path /admin/delete_topics in ZooKeeper: the test child node is gone.

[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[]

The topic's metadata under /brokers/topics/test has also been removed from ZooKeeper:

[zk: localhost:2181(CONNECTED) 26] ls /brokers/topics/test/partitions
Node does not exist: /brokers/topics/test/partitions
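
The same result can be cross-checked from the Kafka side. A minimal sketch (kafka-topics.sh ships with Kafka; the ZooKeeper address localhost:2181 is an assumption matching the zkCli session above):

# The deleted topic should no longer appear in the topic list.
bin/kafka-topics.sh --zookeeper localhost:2181 --list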

Q2: Is it really necessary to set delete.topic.enable=true on every broker?

In Q1, if we set delete.topic.enable=true only on broker1, the topic can still be deleted, as long as broker1 is the controller of the cluster; we can ensure that by starting broker1 first and then broker2 and broker3.
In production, topic deletion is usually disabled to guard against accidental deletes, and unused topics are cleaned up periodically instead. For such a cleanup it is enough to change the configuration of a single broker and make sure that broker is the cluster's controller.
The catch is that changing broker1's configuration requires restarting it, at which point another broker takes over as the new controller, so the setting you just changed on broker1 no longer takes effect. A simple workaround is to restart broker1 and then briefly shut down broker2 and broker3 so that broker1 regains the controller role; that way you do not have to modify every broker's configuration one by one.
After the cleanup is done, remember to set delete.topic.enable back to false.
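
Before triggering a deletion it is worth confirming which broker currently holds the controller role. A minimal sketch (the ZooKeeper address and the config file path are assumptions):

# The /controller znode records the broker id of the current controller,
# e.g. {"version":1,"brokerid":1,"timestamp":"..."}
bin/zkCli.sh -server localhost:2181 get /controller

# Check that the deletion switch is enabled on that broker.
grep delete.topic.enable config/server.properties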

Q3: What happens if a producer is still connected to the topic while it is being deleted and auto.create.topics.enable=true?

While working through the Q1 example I had started a client producing messages to topic test and forgot to stop it; since auto.create.topics.enable is enabled by default, I kept finding a test-0 directory on broker1 that could not be deleted.
It turned out that a client was still connected to the topic, so right after the deletion succeeded the topic was automatically created again, with the defaults of 1 replica and 1 partition.
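
A quick way to recognize this situation: if the topic reappears right after a successful deletion, it was almost certainly auto-created by a still-connected client. A minimal sketch (the ZooKeeper address is an assumption):

# A freshly auto-created topic comes back with the broker defaults,
# typically PartitionCount:1 and ReplicationFactor:1.
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

# Stop the clients first, or disable auto creation in server.properties before deleting:
# auto.create.topics.enable=false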

Q4: How to delete a topic manually, without going through Kafka

Taking the deletion of topic test as an example, the steps are as follows (a combined sketch is given after the list):

  1. Delete the topic's directories, such as test-0, test-1 and so on, from the Kafka data directory on every broker (the log.dirs setting in server.properties, which defaults to "/tmp/kafka-logs")
  2. Delete the topic's nodes in ZooKeeper
    rmr /brokers/topics/test
    rmr /config/topics/test
    rmr /admin/delete_topics/test
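
Putting the steps together, a minimal sketch (the log.dirs path /tmp/kafka-logs and the ZooKeeper address localhost:2181 are assumptions; stop all clients of the topic, and ideally the brokers hosting its replicas, before running this):

# 1. On every broker that hosts a replica of the topic, remove the partition directories.
rm -rf /tmp/kafka-logs/test-*

# 2. Remove the topic's nodes from ZooKeeper.
bin/zkCli.sh -server localhost:2181 rmr /brokers/topics/test
bin/zkCli.sh -server localhost:2181 rmr /config/topics/test
bin/zkCli.sh -server localhost:2181 rmr /admin/delete_topics/test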

[References]

http://www.cnblogs.com/huxi2b/p/4842695.html


In-Depth Kafka: Leader Election - An Analysis of the Kafka Cluster Leader Election Process

All posts are first published on my personal blog http://blog.mozhu.org. Welcome to visit!

The "leader" discussed in this article is the cluster controller, not the leader of each individual partition.

Why do we need a leader?

In early Kafka versions, the state of partitions and replicas was managed through ZooKeeper watchers and queues: every broker registered watchers in ZooKeeper, which led to a very large number of watchers. If a failed broker hosted many partitions, many watchers fired at once and triggered large-scale adjustments across the cluster; in addition, every replica had to register its own watcher, so with a large cluster ZooKeeper carried a heavy load. This design was prone to split brain, the herd effect, and ZooKeeper overload.

Newer versions changed this design and introduced the KafkaController: only the KafkaController leader registers watchers in ZooKeeper, and the other brokers hardly need to listen for ZooKeeper state changes at all.

Among the brokers of a Kafka cluster, one is elected as the controller leader, which is responsible for managing the state of all partitions and replicas in the cluster. For example, when the leader replica of a partition fails, the controller elects a new leader replica for that partition; when it detects a change in an ISR list, the controller notifies every broker in the cluster to update its MetadataCache; and when partitions are added to a topic, the controller also manages the reassignment of those partitions.

How Kafka cluster leader election works

We know that a ZooKeeper ensemble also has its own election mechanism: its nodes exchange votes using the ZAB protocol (a Paxos-like atomic broadcast algorithm) to elect a leader. Kafka's leader election is much simpler than that.
Kafka elects its leader by creating the ephemeral node /controller in ZooKeeper and writing the current broker's information into it:
{"version":1,"brokerid":1,"timestamp":"1512018424988"}
Thanks to ZooKeeper's strong consistency, only one client can create the node successfully; the broker that succeeds becomes the leader, on a first-come, first-served basis. That leader is the cluster's controller and is responsible for all cluster-wide housekeeping.
When the leader loses its connection to ZooKeeper, the ephemeral node is removed; the other brokers watch this node, receive the deletion event, and start a new round of leader election.
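
This is easy to observe with ZooKeeper's command-line client. A minimal sketch (the ZooKeeper address localhost:2181 is an assumption):

# Show the current controller and the JSON it wrote into the ephemeral node.
bin/zkCli.sh -server localhost:2181 get /controller

# /controller_epoch holds the election counter, bumped every time a new controller is elected.
bin/zkCli.sh -server localhost:2181 get /controller_epoch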

KafkaController

KafkaController creates a ZookeeperLeaderElector and registers two callbacks on it, onControllerFailover and onControllerResignation.
onControllerFailover is invoked once this broker wins the election; it initializes the modules the leader depends on, including writing the number of leader elections so far into the /controller_epoch node. This epoch value is very useful when dealing with split-brain scenarios.
onControllerResignation is invoked when the current broker is no longer the leader, i.e. after the current leader has stepped down.
After starting up, KafkaController registers a ZooKeeper session-expiration listener and tries to elect itself leader.

class KafkaController {
  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId)

  def startup() = {
    inLock(controllerContext.controllerLock) {
      info("Controller starting up")
      // register the session expiration listener
      registerSessionExpirationListener()
      isRunning = true
      // try to elect the leader on every startup
      controllerElector.startup
      info("Controller startup complete")
    }
  }

  private def registerSessionExpirationListener() = {
    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
  }
}

SessionExpirationListener

When the broker re-establishes its connection to ZooKeeper, handleNewSession in SessionExpirationListener is invoked; it first shuts down the modules tied to the previous leadership and then tries to get elected leader again.

class SessionExpirationListener() extends IZkStateListener with Logging {
  this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
  @throws(classOf[Exception])
  def handleStateChanged(state: KeeperState) {
    // do nothing, since zkclient will do reconnect for us.
  }

  /**
   * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
   * any ephemeral nodes here.
   *
   * @throws Exception
   * On any error.
   */
  @throws(classOf[Exception])
  def handleNewSession() {
    info("ZK expired; shut down all controller components and try to re-elect")
    // called after the connection to ZooKeeper has been re-established
    inLock(controllerContext.controllerLock) {
      // first unregister the listeners registered by the old leadership and release its resources
      onControllerResignation()
      // then try to get elected as controller again
      controllerElector.elect
    }
  }

  override def handleSessionEstablishmentError(error: Throwable): Unit = {
    // no-op; handleSessionEstablishmentError in KafkaHealthCheck should handle this error
  }
}

ZookeeperLeaderElector

The ZookeeperLeaderElector class implements the leader election itself. It does not handle session timeouts (connection loss) between the broker and ZooKeeper; instead, the caller is expected to re-run the election once the session is restored (the connection is re-established).

class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)
  extends LeaderElector with Logging {
  var leaderId = -1
  // create the election path in ZK, if one does not exist
  val index = electionPath.lastIndexOf("/")
  if (index > 0)
    controllerContext.zkUtils.makeSurePersistentPathExists(electionPath.substring(0, index))
  val leaderChangeListener = new LeaderChangeListener

  def startup {
    inLock(controllerContext.controllerLock) {
      // register an IZkDataListener on the /controller node
      controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
      // run the election
      elect
    }
  }
}

The startup method of ZookeeperLeaderElector calls elect to elect the leader.

elect is invoked in the following situations:

  1. The first call, when the broker starts up
  2. The previous node creation succeeded, but the connection dropped while waiting for ZooKeeper's response; after resign deleted the /controller node, leaderChangeListener's handleDataDeleted was triggered
  3. The previous node creation did not succeed because the connection dropped while waiting for ZooKeeper's response, and by the time elect runs again another broker has already created the controller node and become the leader
  4. The previous node creation succeeded, but onBecomingLeader threw an exception, so elect is entered again
    That is why elect first reads the /controller node and checks whether it already exists before attempting the election
private def getControllerID(): Int = {
  controllerContext.zkUtils.readDataMaybeNull(electionPath)._1 match {
    case Some(controller) => KafkaController.parseControllerId(controller)
    case None => -1
  }
}

def elect: Boolean = {
  val timestamp = SystemTime.milliseconds.toString
  val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

  // first read the existing /controller node, if any
  leaderId = getControllerID
  /*
   * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
   * it's possible that the controller has already been elected when we get here. This check will prevent the following
   * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
   */
  // elect is invoked in the following situations:
  // 1. the first call when the broker starts up
  // 2. the previous create succeeded but the connection dropped while waiting for ZooKeeper's response;
  //    resign deleted the /controller node, which triggered leaderChangeListener.handleDataDeleted
  // 3. the previous create failed because the connection dropped, and by the time elect runs again
  //    another broker has already created the controller node and become the leader
  // 4. the previous create succeeded but onBecomingLeader threw an exception, so elect is entered again
  // hence we read the node first and check whether a leader already exists
  if(leaderId != -1) {
    debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
    return amILeader
  }

  try {
    val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                    electString,
                                                    controllerContext.zkUtils.zkConnection.getZookeeper,
                                                    JaasUtils.isZkSecurityEnabled())
    // create the /controller node and write the controller info: brokerid, version, timestamp
    zkCheckedEphemeral.create()
    info(brokerId + " successfully elected as leader")
    leaderId = brokerId
    // the write succeeded, this broker is now the leader; invoke the callback
    onBecomingLeader()
  } catch {
    case e: ZkNodeExistsException =>
      // If someone else has written the path, then
      leaderId = getControllerID
      // the write failed because the node already exists: another broker won the election
      if (leaderId != -1)
        debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
      else
        warn("A leader has been elected but just resigned, this will result in another round of election")

    case e2: Throwable =>
      error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
      // either the connection to ZooKeeper dropped while creating the node, or the onBecomingLeader callback threw.
      // onBecomingLeader normally initializes the leader's modules; if that fails, resign deletes the /controller node.
      // deleting /controller triggers leaderChangeListener.handleDataDeleted, which retries the election and,
      // just as importantly, gives the other brokers a chance to become leader, so a broker whose
      // onBecomingLeader keeps failing cannot leave the whole cluster without a leader
      resign()
  }
  amILeader
}

def close = {
  leaderId = -1
}

def amILeader : Boolean = leaderId == brokerId

def resign() = {
  leaderId = -1
  // delete the /controller node
  controllerContext.zkUtils.deletePath(electionPath)
}

If the exception raised while creating the /controller node is a ZkNodeExistsException, another broker has already become the leader.
If instead the exception came from the onBecomingLeader callback, one of the modules the leader depends on usually failed to initialize, and resign is called to delete the /controller node.
When the /controller node is deleted, leaderChangeListener's handleDataDeleted fires and the broker retries the election.
More importantly, this also gives the other brokers a chance to become leader, so a single broker whose onBecomingLeader keeps failing cannot leave the whole cluster without a leader.

LeaderChangeListener

The startup method registers LeaderChangeListener as the IZkDataListener on the /controller node.
When the node's data changes, another broker may have become the leader; if this broker was the leader and no longer is, onResigningAsLeader is called.
When the node is deleted, the leader has failed and gone offline; if this broker was previously the leader, onResigningAsLeader is called and then the broker tries to get elected leader again.

class LeaderChangeListener extends IZkDataListener with Logging {
  /**
   * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    inLock(controllerContext.controllerLock) {
      val amILeaderBeforeDataChange = amILeader
      leaderId = KafkaController.parseControllerId(data.toString)
      info("New leader is %d".format(leaderId))
      // The old leader needs to resign leadership if it is no longer the leader
      if (amILeaderBeforeDataChange && !amILeader)
        // this broker was the leader before the change but is not any more
        onResigningAsLeader()
    }
  }

  /**
   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
   * @throws Exception
   * On any error.
   */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
    inLock(controllerContext.controllerLock) {
      debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
        .format(brokerId, dataPath))
      if(amILeader)
        // this broker was the leader
        onResigningAsLeader()
      // try to get elected as leader again
      elect
    }
  }
}

onBecomingLeader corresponds to onControllerFailover in KafkaController: when a broker becomes the new leader, it initializes the modules the leader depends on.
onResigningAsLeader corresponds to onControllerResignation in KafkaController: when the leader steps down, it shuts those modules down.

Leader election flow diagram

The whole leader election process is summarized in the following flow diagram:
(Figure: Kafka leader election flow diagram)

[References]
http://blog.csdn.net/zhanglh046/article/details/72821995


Keepalived+Nginx+Tomcat: Building a Highly Available Web Service (Part 1): Master-Backup Mode

All posts are first published on my personal blog http://blog.mozhu.org. Welcome to visit!

Preface

Both Nginx and Apache can act as reverse proxy servers and provide load balancing, which lets our web servers scale out horizontally and handle more user requests. But the reverse proxy then becomes a single point of failure: when it goes down, the whole web service can no longer be reached from outside, so we have to make the reverse proxy itself highly available.
(Figure: Nginx+Tomcat cluster with a reverse proxy in front)

Below we demonstrate how to use Keepalived to build a highly available web service, with Nginx as the reverse proxy.

A brief introduction to Keepalived

Keepalived is a high-performance solution for server high availability and hot standby. It can be used to prevent single points of failure and, combined with Nginx, makes the web front end highly available.

An Nginx+Keepalived HA architecture generally comes in two modes:
1. Master-backup mode
A single VIP (virtual IP) is used with two front-end machines, one master and one backup, but only one machine serves traffic at a time. The backup stays idle as long as the master is healthy, which is not very economical for sites with only a few servers.

2. Dual-master mode
Two VIPs (virtual IPs) are used with two front-end machines that back each other up, so both machines serve traffic at the same time. When one of them fails, its requests move over to the other machine. This fits the current architecture very well.

Keepalived is built on the VRRP protocol and uses it to achieve high availability (HA). VRRP (Virtual Router Redundancy Protocol) bundles two or more routers into one virtual device that exposes one or more virtual router IPs. Inside the group, the router that actually owns the external IP, as long as it works normally or is chosen by election, is the MASTER; it performs all network functions for the virtual router IP, such as answering ARP requests and ICMP and forwarding traffic. The other devices do not own the virtual IP and stay in BACKUP state; apart from receiving the MASTER's VRRP advertisements they perform no external network functions. When the MASTER fails, a BACKUP takes over its role.
VRRP carries its data in multicast packets, sent from a special virtual source MAC address rather than the NIC's own MAC. While running, only the MASTER periodically sends VRRP advertisements announcing that it is working and which virtual router IP(s) it serves; the BACKUPs only receive VRRP data and do not send. If no advertisement arrives from the MASTER within a certain time, each BACKUP declares itself MASTER, starts sending advertisements, and a new MASTER election takes place.
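
Once Keepalived is running, this behaviour is easy to observe. A minimal sketch (the interface name ens33 and the VIP 192.168.224.150 are taken from the configuration shown later in this post):

# On the current MASTER the VIP is bound as an additional address on the NIC; on the BACKUP it is not.
ip addr show ens33 | grep 192.168.224.150

# VRRP advertisements are IP protocol 112 packets, sent only by the MASTER.
sudo tcpdump -i ens33 ip proto 112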

This post focuses on the master-backup mode of Keepalived+Nginx.

Keepalived+Nginx+Tomcat in master-backup mode

Environment

Ubuntu 16.04 LTS
Keepalived v1.2.19 (03/13,2017)
Nginx nginx/1.10.3
Tomcat v8.0

Prepare four virtual machines, two for Nginx and two for Tomcat, plus one VIP (virtual IP): 192.168.224.150.

Virtual machine IP Description
Nginx1+Keepalived (Master) 192.168.224.101 Nginx Server 1
Nginx2+Keepalived (Backup) 192.168.224.102 Nginx Server 2
Tomcat1 192.168.224.103 Tomcat Web Server 1
Tomcat2 192.168.224.104 Tomcat Web Server 2

安装tomcat

在192.168.224.103,192.168.224.104上安装tomcat,并修改ROOT/index.jsp页面,在页面中加入tomcat的IP地址,并打印出request header中的X-NGINX值,这个值,我们将在后面配置NGINX时传入,如192.168.224.103上我们修改为:

<div id="asf-box">
<h1>${pageContext.servletContext.serverInfo}(192.168.224.103)<%=request.getHeader("X-NGINX")%></h1>
</div>

启动103和104两台机器的tomcat,确认浏览器能正确访问http://192.168.224.103:8080和http://192.168.224.104:8080
并且页面上能正确显示各自的IP地址,此时request header里没有X-NGINX,所以显示null。
tomcat

安装nginx

安装gcc g++的依赖库

apt-get install build-essential
apt-get install libtool

安装 pcre依赖库

sudo apt-get update
sudo apt-get install libpcre3 libpcre3-dev

安装 zlib依赖库

apt-get install zlib1g-dev

安装 ssl依赖库

apt-get install openssl

安装nginx,并启动

apt-get install nginx
nginx

浏览器中访问http://192.168.224.101
看到如下界面,则nginx配置成功
nignx

配置nginx反向代理

cat /etc/nginx/nginx.conf
查看nginx配置,默认会加载/etc/nginx/conf.d下所有*.conf文件,以及/etc/nginx/sites-enabled下的所有文件

include /etc/nginx/conf.d/*.conf;
include /etc/nginx/sites-enabled/*;

移除/etc/nginx/sites-enabled目录下的default配置,并在目录下创建tomcat.conf

cd /etc/nginx/sites-enabled
rm default
touch tomcat.conf
vi tomcat.conf

在tomcat.conf中,配置nginx的反向代理,将请求转发到103和104两台tomcat上。
为了能在tomcat页面上看出是哪台nginx转发过来的请求,我们这里给两台nginx分别配置两个自定义的header传给tomcat,tomcat将其取出打印在页面上。
在Master(101)中设置proxy_set_header X-NGINX “NGINX-1”

upstream tomcat {
server 192.168.224.103:8080 weight=1;
server 192.168.224.104:8080 weight=1;
}
server{
location / {
proxy_pass http://tomcat;
proxy_set_header X-NGINX "NGINX-1";
}
#......其他省略
}

在Backup(102)中设置proxy_set_header X-NGINX “NGINX-2”

upstream tomcat {
server 192.168.224.103:8080 weight=1;
server 192.168.224.104:8080 weight=1;
}
server{
location / {
proxy_pass http://tomcat;
proxy_set_header X-NGINX "NGINX-2";
}
#......其他省略
}

执行reload命令,使配置生效

nginx -s reload

按照上面方法在Master(101)上安装和配置好nginx,浏览器地址栏输入http://192.168.224.101
多次刷新页面,能看到页面上显示的IP地址在192.168.224.103和192.168.224.104之间交替,说明nginx已经将用户请求负载均衡到了2台tomcat上
并且能看到NGINX-1也显示在页面上,如下图
nignx-1 Master

同理在Backup(102)上安装和配置好nginx,浏览器地址栏输入http://192.168.224.102
多次刷新页面,能看到页面上显示的IP地址在192.168.224.103和192.168.224.104之间交替,说明nginx已经将用户请求负载均衡到了2台tomcat上
并且能看到NGINX-2也显示在页面上,如下图
nignx-2 Backup

安装Keepalived

apt-get install keepalived

在/etc/keepalived目录下,添加check_nginx.sh(检查nginx存活的shell脚本)和keepalived.conf(keepalived配置文件)

check_nginx.sh

#!/bin/bash
#代码一定注意空格,逻辑就是:如果nginx进程不存在则启动nginx,如果nginx无法启动则kill掉keepalived所有进程
A=`ps -C nginx --no-header | wc -l`
if [ $A -eq 0 ]; then
    /etc/init.d/nginx start
    sleep 3
    if [ `ps -C nginx --no-header | wc -l` -eq 0 ]; then
        killall keepalived
    fi
fi

Master(101)中的keepalived.conf配置如下

vrrp_script chk_nginx {  
script "/etc/keepalived/check_nginx.sh" //检测nginx进程的脚本
interval 2
weight -20
}

global_defs {
notification_email {
//可以添加邮件提醒
}
}
vrrp_instance VI_1 {
state MASTER //主服务器
interface ens33
virtual_router_id 51
mcast_src_ip 192.168.224.101
priority 250
advert_int 1

authentication {
auth_type PASS
auth_pass 123456
}
track_script {
chk_nginx
}
virtual_ipaddress {
192.168.224.150
}
}

Backup(102)中的keepalived.conf配置如下

vrrp_script chk_nginx {  
script "/etc/keepalived/check_nginx.sh" //检测nginx进程的脚本
interval 2
weight -20
}

global_defs {
notification_email {
//可以添加邮件提醒
}
}
vrrp_instance VI_1 {
state BACKUP //从服务器
interface ens33
virtual_router_id 51
mcast_src_ip 192.168.224.102
priority 240
advert_int 1

authentication {
auth_type PASS
auth_pass 123456
}
track_script {
chk_nginx
}
virtual_ipaddress {
192.168.224.150
}
}

关于keepalived配置的几点说明

  • state - 主服务器需配成MASTER,从服务器需配成BACKUP
  • interface - 这个是网卡名,我使用的是VM12.0的版本,所以这里网卡名为ens33
  • mcast_src_ip - 配置各自的实际IP地址
  • priority - 主服务器的优先级必须比从服务器的高,这里主服务器配置成250,从服务器配置成240
  • virtual_ipaddress - 配置虚拟IP(192.168.224.150)
  • authentication - auth_pass主从服务器必须一致,keepalived靠这个来通信
  • virtual_router_id - 主从服务器必须保持一致

Step-1 Master,Backup都正常,只有Master对外提供服务

配置完成后,先启动Master(101)机器的keepalived,并查看启动log

root@ubuntu:/etc/keepalived# /etc/init.d/keepalived start
[ ok ] Starting keepalived (via systemctl): keepalived.service.
root@ubuntu:/etc/keepalived# tail /var/log/syslog
Nov 21 00:46:38 ubuntu Keepalived_vrrp[1296]: Configuration is using : 68579 Bytes
Nov 21 00:46:38 ubuntu Keepalived_vrrp[1296]: Using LinkWatch kernel netlink reflector...
Nov 21 00:46:38 ubuntu Keepalived_healthcheckers[1295]: Initializing ipvs 2.6
Nov 21 00:46:38 ubuntu Keepalived_healthcheckers[1295]: Registering Kernel netlink reflector
Nov 21 00:46:38 ubuntu Keepalived_healthcheckers[1295]: Registering Kernel netlink command channel
Nov 21 00:46:38 ubuntu Keepalived_healthcheckers[1295]: Opening file '/etc/keepalived/keepalived.conf'.
Nov 21 00:46:38 ubuntu Keepalived_healthcheckers[1295]: Configuration is using : 11193 Bytes
Nov 21 00:46:38 ubuntu Keepalived_healthcheckers[1295]: Using LinkWatch kernel netlink reflector...
Nov 21 00:46:39 ubuntu Keepalived_vrrp[1296]: VRRP_Instance(VI_1) Transition to MASTER STATE
Nov 21 00:46:40 ubuntu Keepalived_vrrp[1296]: VRRP_Instance(VI_1) Entering MASTER STATE

查看keepalived和nginx进程,确保keepalived和nginx启动正常

root@ubuntu:/etc/keepalived# ps -ef | grep keepalived
root 1293 1 0 00:46 ? 00:00:00 /usr/sbin/keepalived
root 1295 1293 0 00:46 ? 00:00:00 /usr/sbin/keepalived
root 1296 1293 0 00:46 ? 00:00:00 /usr/sbin/keepalived
root 1377 1121 0 00:47 pts/0 00:00:00 grep --color=auto keepalived
root@ubuntu:/etc/keepalived# ps -ef | grep nginx
root 1365 1 0 00:47 ? 00:00:00 nginx: master process nginx
www-data 1366 1365 0 00:47 ? 00:00:00 nginx: worker process
www-data 1367 1365 0 00:47 ? 00:00:00 nginx: worker process
root 1381 1121 0 00:47 pts/0 00:00:00 grep --color=auto nginx

使用ip add命令,查看VIP是否被绑定到101机器上,可以看到192.168.224.150/32 scope global ens33已经被绑定到101机器上了

root@ubuntu:/etc/keepalived# ip add
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:46:80:15 brd ff:ff:ff:ff:ff:ff
inet 192.168.224.101/24 brd 192.168.224.255 scope global ens33
valid_lft forever preferred_lft forever
inet 192.168.224.150/32 scope global ens33
valid_lft forever preferred_lft forever
inet6 fe80::20c:29ff:fe46:8015/64 scope link
valid_lft forever preferred_lft forever

再启动Backup(102)机器的keepalived和nginx,确保其正常启动,查看Backup的IP信息,发现VIP现在还没有绑定到Backup(102)上。

浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-1,则表明是Master(101)在转发web请求

Step-2 Master挂了,Backup接替Master对外提供服务

接着,我们在Master(101)机器上关闭keepalived和nginx进程来模拟Master服务器挂掉,并查看VIP是否还在Master机器上

root@ubuntu:/etc/keepalived# killall keepalived 
root@ubuntu:/etc/keepalived# killall nginx
root@ubuntu:/etc/keepalived# ip add
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:46:80:15 brd ff:ff:ff:ff:ff:ff
inet 192.168.224.101/24 brd 192.168.224.255 scope global ens33
valid_lft forever preferred_lft forever
inet6 fe80::20c:29ff:fe46:8015/64 scope link
valid_lft forever preferred_lft forever

查看Backup(102)的VIP,发现VIP已经绑定到了Backup(102)

root@ubuntu:/etc/keepalived# ip add
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:8c:eb:5c brd ff:ff:ff:ff:ff:ff
inet 192.168.224.102/24 brd 192.168.224.255 scope global ens33
valid_lft forever preferred_lft forever
inet 192.168.224.150/32 scope global ens33
valid_lft forever preferred_lft forever
inet6 fe80::20c:29ff:fe8c:eb5c/64 scope link
valid_lft forever preferred_lft forever

浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-2,则表明是Backup(102)在转发web请求,也就是说Master挂掉后,Backup接管了Master的服务。

Step-3 Master恢复正常后,Master继续提供服务,Backup停止服务,并继续等待Master出现故障

我们再启动Master(101)机器的keepalived和nginx,查看VIP,发现VIP已经被Master“夺回”了使用权限
浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-1,则表明是Master(101)在转发web请求,也就是说Master重新启动后,重新对外提供服务,Backup则停止服务,继续等待Master挂掉。

Keepalived抢占模式和非抢占模式

keepalived的HA分为抢占模式和非抢占模式,抢占模式即MASTER从故障中恢复后,会将VIP从BACKUP节点中抢占过来。非抢占模式即MASTER恢复后不抢占BACKUP升级为MASTER后的VIP。
前面的例子中,我们实际上配置的是抢占模式,下面我们再来看看非抢占模式

Master(101)中的keepalived.conf配置如下

vrrp_script chk_nginx {  
script "/etc/keepalived/check_nginx.sh" //检测nginx进程的脚本
interval 2
weight -20
}

global_defs {
notification_email {
//可以添加邮件提醒
}
}
vrrp_instance VI_1 {
state BACKUP //主服务器(非抢占模式需要配置成BACKUP)
interface ens33
virtual_router_id 51
mcast_src_ip 192.168.224.101
priority 250
advert_int 1
nopreempt //非抢占模式
authentication {
auth_type PASS
auth_pass 123456
}
track_script {
chk_nginx
}
virtual_ipaddress {
192.168.224.150
}
}

Backup(102)中的keepalived.conf配置如下

vrrp_script chk_nginx {  
script "/etc/keepalived/check_nginx.sh" //检测nginx进程的脚本
interval 2
weight -20
}

global_defs {
notification_email {
//可以添加邮件提醒
}
}
vrrp_instance VI_1 {
state BACKUP //从服务器
interface ens33
virtual_router_id 51
mcast_src_ip 192.168.224.102
priority 240
advert_int 1
nopreempt //非抢占模式
authentication {
auth_type PASS
auth_pass 123456
}
track_script {
chk_nginx
}
virtual_ipaddress {
192.168.224.150
}
}

非抢占模式配置说明
和抢占模式的配置相比,只改了两个地方:
1> 在vrrp_instance块下两个节点各增加了nopreempt指令,表示不争抢vip
2> 两个节点的state都配置为BACKUP
两个keepalived节点都启动后,默认都是BACKUP状态,双方在发送组播信息后,会根据优先级选举出一个MASTER。由于两者都配置了nopreempt,所以MASTER从故障中恢复后,不会抢占vip,从而避免VIP切换可能造成的服务延迟。

(非抢占模式)Step-1 Master,Backup都正常,只有Master对外提供服务

配置完成后,先启动Master(101)机器的keepalived和nginx,查看Master的IP信息,可以看到VIP已经绑定到101机器上;再启动Backup(102)机器的keepalived和nginx,查看Backup的IP信息,可以看到VIP没有被绑定到102机器上
浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-1,则表明是Master(101)在转发web请求

(非抢占模式)Step-2 Master挂了,Backup接替Master对外提供服务

接着,我们在Master(101)机器上关闭keepalived和nginx进程来模拟Master服务器挂掉,查看Backup(102)的VIP,发现VIP已经绑定到了Backup(102)
浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-2,则表明是Backup(102)在转发web请求,也就是说Master挂掉后,Backup接管了Master的服务。

(非抢占模式)Step-3 Master恢复正常后,Backup继续对外提供服务,Master不会抢占VIP,而是继续等待Backup出现故障

我们再启动Master(101)机器的keepalived和nginx,查看VIP,发现VIP并没有被Master“夺回”,仍然绑定在Backup(102)上
浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-2,则表明仍然是Backup(102)在转发web请求,也就是说Master恢复后,并未接管Backup的服务。

(非抢占模式)Step-4 Backup挂了,Master重新绑定VIP,接替Backup对外提供服务

我们在Backup(102)机器上关闭keepalived和nginx进程来模拟Backup服务器挂掉,查看Master(101)的VIP,发现VIP已经绑定到了Master(101)
浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-1,则表明是Master(101)在转发web请求

[参考资料]

http://blog.csdn.net/joyo2008/article/details/78495921?locationNum=4&fps=1
http://blog.csdn.net/yougoule/article/details/78484991?locationNum=6&fps=1
http://blog.csdn.net/xyang81/article/details/52556886


Keepalived+Nginx+Tomcat搭建高可用的Web服务(二):双主模式

所有博文均在个人独立博客http://blog.mozhu.org首发,欢迎访问!

前一篇博文中使用keepalived实现nginx的高可用,并且演示的是主备模式,这种方式有一台机器一直作backup使用,有50%的资源被浪费。而双主模式中,两台主机互为主备,同时有两台机器工作,当其中一台机器出现故障,两台机器的请求转移到一台机器负担。

本篇博文将主要演示Keepalived+Nginx的双主模式

Keepalived+Nginx+Tomcat双主模式

环境准备

Ubuntu 16.04 LTS
Keepalived v1.2.19 (03/13,2017)
Nginx nginx/1.10.3
Tomcat v8.0

准备4台虚拟机,两台Nginx和两台Tomcat,另外两个VIP(虚拟IP),192.168.224.150和192.168.224.151

虚拟机 IP 说明
Nginx1+Keepalived (Master) 192.168.224.101 Nginx Server 1 Master for 192.168.224.150,Backup for 192.168.224.151
Nginx2+Keepalived (Master) 192.168.224.102 Nginx Server 2 Master for 192.168.224.151,Backup for 192.168.224.150
Tomcat1 192.168.224.103 Tomcat Web Server 1
Tomcat2 192.168.224.104 Tomcat Web Server 2

其他配置和前篇博文一样,区别只是keepalived配置不一样

101机器中keepalived.conf配置如下

vrrp_script chk_nginx {
script "/etc/keepalived/check_nginx.sh" //检测nginx进程的脚本
interval 2
weight -20
}

global_defs {
notification_email {
//可以添加邮件提醒
}
}
vrrp_instance VI_1 {
state MASTER //主服务器
interface ens33
virtual_router_id 51
mcast_src_ip 192.168.224.101
priority 250
advert_int 1

authentication {
auth_type PASS
auth_pass 123456
}
track_script {
chk_nginx
}
virtual_ipaddress {
192.168.224.150
}
}

vrrp_instance VI_2 {
state BACKUP //从服务器
interface ens33
virtual_router_id 52
mcast_src_ip 192.168.224.101
priority 240
advert_int 1

authentication {
auth_type PASS
auth_pass 123456
}
track_script {
chk_nginx
}
virtual_ipaddress {
192.168.224.151
}
}

102机器中keepalived.conf配置如下

vrrp_script chk_nginx {
script "/etc/keepalived/check_nginx.sh" //检测nginx进程的脚本
interval 2
weight -20
}

global_defs {
notification_email {
//可以添加邮件提醒
}
}
vrrp_instance VI_1 {
state BACKUP //从服务器
interface ens33
virtual_router_id 51
mcast_src_ip 192.168.224.102
priority 240
advert_int 1

authentication {
auth_type PASS
auth_pass 123456
}
track_script {
chk_nginx
}
virtual_ipaddress {
192.168.224.150
}
}

vrrp_instance VI_2 {
state MASTER //主服务器
interface ens33
virtual_router_id 52
mcast_src_ip 192.168.224.102
priority 250
advert_int 1

authentication {
auth_type PASS
auth_pass 123456
}
track_script {
chk_nginx
}
virtual_ipaddress {
192.168.224.151
}
}

注意观察配置,我们添加了一个vrrp_instance VI_2,绑定了新的VIP 192.168.224.151,并将其virtual_router_id设置为52,与VI_1(51)区分开来;VI_2和VI_1的state在两台机器上的配置刚好相反,互为主备。

两台服务器分别启动keepalived和nginx,观察IP信息,可以看到VIP(150)已经绑定到101机器上,VIP(151)已经绑定到102机器上
浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-1,则表明是机器(101)在转发web请求
浏览器多次刷新并访问http://192.168.224.151/
可以看到页面上IP交替显示103和104,并且显示NGINX-2,则表明是机器(102)在转发web请求

关闭101机器上的keepalived和nginx来模拟101宕机,查看102的IP信息,发现150,151都绑定到了102机器上
浏览器多次刷新并访问http://192.168.224.150/和http://192.168.224.151/
可以看到页面上IP交替显示103和104,并且显示NGINX-2,则表明是机器(102)在转发两个VIP的web请求

重新启动101机器上的keepalived和nginx,查看101的IP信息,发现150已经重新绑定到101机器上
浏览器多次刷新并访问http://192.168.224.150/
可以看到页面上IP交替显示103和104,并且显示NGINX-1,则表明是机器(101)在转发web请求

从上面的测试结果可以看出,使用两个VIP,将两台机器互作主备,可以100%利用上两台机器,当其中一台机器挂掉后,两个VIP均能通过另一台机器提供服务,达到服务的高可用性。
我们在用双主模式时,对于每台机器我们也应该给足够的余量,避免一台机器挂之后,另一台机器处理不了所有的请求,导致第二台机器接着挂掉,引起“雪崩效应”。

[参考资料]
http://blog.csdn.net/lexang1/article/details/52386909
https://www.cnblogs.com/kevingrace/p/6146031.html


Java分布式跟踪系统Zipkin(十):Zipkin源码分析-ElasticsearchStorage


Java分布式跟踪系统Zipkin(九):Zipkin源码分析-MySQLStorage


Java分布式跟踪系统Zipkin(八):Zipkin源码分析-KafkaCollector

所有博文均在个人独立博客http://blog.mozhu.org首发,欢迎访问!

前面几篇博文中,都是使用OkHttpSender来上报Trace信息给Zipkin,这在生产环境中,当业务量比较大的时候,可能会成为一个性能瓶颈,这一篇博文我们来使用KafkaSender将Trace信息先写入到Kafka中,然后Zipkin使用KafkaCollector从Kafka中收集Span信息。
在Brave配置中需要将Sender设置为KafkaSender,而zipkin的collector组件配置为KafkaCollector

相关代码在Chapter8/zipkin-kafka中
pom.xml中添加依赖

<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-kafka11</artifactId>
<version>${zipkin-reporter2.version}</version>
</dependency>

TracingConfiguration中,我们修改Sender为KafkaSender,指定Kafka的地址,以及topic

@Bean
Sender sender() {
return KafkaSender.newBuilder().bootstrapServers("localhost:9091,localhost:9092,localhost:9093").topic("zipkin").encoding(Encoding.JSON).build();
}

我们先启动zookeeper(默认端口号为2181),再依次启动一个本地的3个broker的kafka集群(端口号分别为9091、9092、9093),最后启动一个KafkaManager(默认端口号9000),KafkaManager是Kafka的UI管理工具
关于如何搭建本地Kafka伪集群,请自行上网搜索教程,本文使用的Kafka版本为0.10.0.0。

kafka启动完毕后,我们创建名为zipkin的topic,因为我们有3个broker,我这里设置replication-factor=3

bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic zipkin

打开KafkaManager界面
http://localhost:9000/clusters/localhost/topics/zipkin
KafkaManager
可以看到topic zipkin中暂时没有消息。

我们使用如下命令启动zipkin,带上Kafka的Zookeeper地址参数,这样zipkin就会从kafka中消费我们上报的trace信息。

java -jar zipkin-server-2.2.1-exec.jar --KAFKA_ZOOKEEPER=localhost:2181

然后分别运行backend和frontend两个应用。注意我们这里将backend的端口改为9001,目的是为了避免和KafkaManager的端口号冲突。

mvn spring-boot:run -Drun.jvmArguments="-Dserver.port=9001 -Dzipkin.service=backend"

mvn spring-boot:run -Drun.jvmArguments="-Dserver.port=8081 -Dzipkin.service=frontend"

浏览器访问 http://localhost:8081/ 会显示当前时间

我们再次刷新KafkaManager界面
http://localhost:9000/clusters/localhost/topics/zipkin
KafkaManager
可以看到topic zipkin中有两条消息。

为了看到这两条消息的具体内容,我们可以在kafka安装目录使用如下命令

bin/windows/kafka-console-consumer.bat --zookeeper localhost:2181 --topic zipkin --from-beginning

在控制台会打印出最近的两条消息

[{"traceId":"802bd09f480b5faa","parentId":"802bd09f480b5faa","id":"bb3c70909ea3ee3c","kind":"SERVER","name":"get","timestamp":1510891296426607,"duration":10681,"localEndpoint":{"serviceName":"backend","ipv4":"10.200.170.137"},"remoteEndpoint":{"ipv4":"127.0.0.1","port":64421},"tags":{"http.path":"/api"},"shared":true}]
[{"traceId":"802bd09f480b5faa","parentId":"802bd09f480b5faa","id":"bb3c70909ea3ee3c","kind":"CLIENT","name":"get","timestamp":1510891296399882,"duration":27542,"localEndpoint":{"serviceName":"frontend","ipv4":"10.200.170.137"},"tags":{"http.path":"/api"}},{"traceId":"802bd09f480b5faa","id":"802bd09f480b5faa","kind":"SERVER","name":"get","timestamp":1510891296393252,"duration":39514,"localEndpoint":{"serviceName":"frontend","ipv4":"10.200.170.137"},"remoteEndpoint":{"ipv6":"::1","port":64420},"tags":{"http.path":"/"}}]

这说明我们的应用frontend和backend已经将trace信息写入kafka成功了!

在Zipkin的Web界面中,也能查询到这次跟踪信息
在zipkin的控制台,我们也能看到跟Kafka相关的类ConsumerFetcherThread启动的日志,后续专门分析zipkin源代码时我们再来看看这个类。

2017-11-17 11:25:00.477  INFO 9292 --- [49-8e18eab0-0-1] kafka.consumer.ConsumerFetcherThread     : [ConsumerFetcherThread-zipkin_LT290-1510889099649-8e18eab0-0-1], Starting
2017-11-17 11:25:00.482 INFO 9292 --- [r-finder-thread] kafka.consumer.ConsumerFetcherManager : [ConsumerFetcherManager-1510889099800] Added fetcher for partitions ArrayBuffer([[zipkin,0], initOffset 0 to broker id:1,host:10.200.170.137,port:9091] )

KafkaSender

public abstract class KafkaSender extends Sender {
public static Builder newBuilder() {
// Settings below correspond to "Producer Configs"
// http://kafka.apache.org/0102/documentation.html#producerconfigs
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "0");
return new zipkin2.reporter.kafka11.AutoValue_KafkaSender.Builder()
.encoding(Encoding.JSON)
.properties(properties)
.topic("zipkin")
.overrides(Collections.EMPTY_MAP)
.messageMaxBytes(1000000);
}

@Override public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new IllegalStateException("closed");
byte[] message = encoder().encode(encodedSpans);
return new KafkaCall(message);
}

}

KafkaSender通过KafkaProducer客户端来发送消息给Kafka。在newBuilder方法中设置了一些默认值,比如topic默认为zipkin,编码默认用JSON,消息最大字节数为1000000;还可以通过overrides覆盖默认的Producer配置来定制KafkaProducer。
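
例如,若希望调整底层KafkaProducer的行为,可以通过overrides传入一个Map来覆盖默认配置。下面是一个简化的示意(builder上的方法均来自上文代码,acks、batch.size的取值仅为举例):

// 需要java.util.Map和java.util.LinkedHashMap
Map<String, String> producerOverrides = new LinkedHashMap<>();
producerOverrides.put("acks", "1");           // 覆盖newBuilder中默认的acks=0
producerOverrides.put("batch.size", "16384"); // 按需调整批量发送的大小

Sender sender = KafkaSender.newBuilder()
    .bootstrapServers("localhost:9091,localhost:9092,localhost:9093")
    .topic("zipkin")
    .encoding(Encoding.JSON)
    .overrides(producerOverrides)
    .build();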

在sendSpans方法中返回KafkaCall,这个对象的execute方法,在AsyncReporter中的flush方法中会被调用:

void flush(BufferNextMessage bundler) {
// ...
sender.sendSpans(nextMessage).execute();
// ...
}
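
也就是说,KafkaSender一般不会被直接调用,而是包装在AsyncReporter里使用,由后者负责缓冲span并在flush时调用sendSpans。下面是一个简化的装配示意(基于zipkin2.reporter.AsyncReporter.create,省略了异常处理,具体以所用版本的API为准):

Sender sender = KafkaSender.newBuilder()
    .bootstrapServers("localhost:9091,localhost:9092,localhost:9093")
    .topic("zipkin")
    .build();
// AsyncReporter会在内部缓冲span,并周期性地调用sender.sendSpans(...).execute()
AsyncReporter<Span> reporter = AsyncReporter.create(sender); // Span为zipkin2.Span
// 应用退出前应调用reporter.close(),必要时可先调用reporter.flush()把缓冲中的span发送出去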

KafkaCall的父类BaseCall的execute方法会调用doExecute,而doExecute方法中使用AwaitableCallback将KafkaProducer异步发送消息的方式强制转为了同步发送,这里处理得比较优雅。

class KafkaCall extends BaseCall<Void> { // KafkaFuture is not cancelable
private final byte[] message;

KafkaCall(byte[] message) {
this.message = message;
}

@Override protected Void doExecute() throws IOException {
final AwaitableCallback callback = new AwaitableCallback();
get().send(new ProducerRecord<>(topic(), message), (metadata, exception) -> {
if (exception == null) {
callback.onSuccess(null);
} else {
callback.onError(exception);
}
});
callback.await();
return null;
}

@Override protected void doEnqueue(Callback<Void> callback) {
get().send(new ProducerRecord<>(topic(), message), (metadata, exception) -> {
if (exception == null) {
callback.onSuccess(null);
} else {
callback.onError(exception);
}
});
}

@Override public Call<Void> clone() {
return new KafkaCall(message);
}
}

这里还有一个知识点:get方法看上去每次都会返回一个新的KafkaProducer,我第一眼看到这段代码时也曾怀疑,难道这里没有性能问题?
原来这里用到了Google AutoValue的@Memoized注解,结合@AutoValue注解,它会在自动生成的类里添加一些代码。可以看到生成的get方法里做了一层缓存,所以我们的担心是没有必要的。

@Memoized KafkaProducer<byte[], byte[]> get() {
KafkaProducer<byte[], byte[]> result = new KafkaProducer<>(properties());
provisioned = true;
return result;
}

AutoValue_KafkaSender

final class AutoValue_KafkaSender extends $AutoValue_KafkaSender {
private volatile KafkaProducer<byte[], byte[]> get;

AutoValue_KafkaSender(Encoding encoding$, int messageMaxBytes$, BytesMessageEncoder encoder$,
String topic$, Properties properties$) {
super(encoding$, messageMaxBytes$, encoder$, topic$, properties$);
}

@Override
KafkaProducer<byte[], byte[]> get() {
if (get == null) {
synchronized (this) {
if (get == null) {
get = super.get();
if (get == null) {
throw new NullPointerException("get() cannot return null");
}
}
}
}
return get;
}
}

KafkaCollector

我们再来看下Zipkin中的KafkaCollector。打开zipkin-server的源代码,在resources/zipkin-server-shared.yml文件中可以发现关于kafka的配置片段。
而我们在本文前面使用--KAFKA_ZOOKEEPER参数启动了zipkin,将kafka的zookeeper地址传递给了ZipkinServer的main方法,也就是说,我们指定了zipkin.collector.kafka.zookeeper的值为localhost:2181

java -jar zipkin-server-2.2.1-exec.jar --KAFKA_ZOOKEEPER=localhost:2181

zipkin-server-shared.yml

zipkin:
collector:
kafka:
# ZooKeeper host string, comma-separated host:port value.
zookeeper: ${KAFKA_ZOOKEEPER:}
# Name of topic to poll for spans
topic: ${KAFKA_TOPIC:zipkin}
# Consumer group this process is consuming on behalf of.
group-id: ${KAFKA_GROUP_ID:zipkin}
# Count of consumer threads consuming the topic
streams: ${KAFKA_STREAMS:1}
# Maximum size of a message containing spans in bytes
max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576}

在pom.xml中,有如下依赖

<!-- Kafka Collector -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-autoconfigure-collector-kafka</artifactId>
<optional>true</optional>
</dependency>

ZipkinKafkaCollectorAutoConfiguration

我们找到zipkin-autoconfigure/collector-kafka的ZipkinKafkaCollectorAutoConfiguration类,使用了@Conditional注解,当KafkaZooKeeperSetCondition条件满足时,ZipkinKafkaCollectorAutoConfiguration类会被SpringBoot加载。当加载时,会配置KafkaCollector到spring容器中。

@Configuration
@EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
@Conditional(KafkaZooKeeperSetCondition.class)
public class ZipkinKafkaCollectorAutoConfiguration {

/**
* This launches a thread to run start. This prevents a several second hang, or worse crash if
* zookeeper isn't running, yet.
*/
@Bean KafkaCollector kafka(ZipkinKafkaCollectorProperties kafka, CollectorSampler sampler,
CollectorMetrics metrics, StorageComponent storage) {
final KafkaCollector result =
kafka.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build();

// don't use @Bean(initMethod = "start") as it can crash the process if zookeeper is down
Thread start = new Thread("start " + result.getClass().getSimpleName()) {
@Override public void run() {
result.start();
}
};
start.setDaemon(true);
start.start();

return result;
}
}

KafkaZooKeeperSetCondition

KafkaZooKeeperSetCondition继承了SpringBootCondition,实现了getMatchOutcome方法,当上下文的环境变量中有配置zipkin.collector.kafka.zookeeper的时候,则条件满足,即ZipkinKafkaCollectorAutoConfiguration会被加载

final class KafkaZooKeeperSetCondition extends SpringBootCondition {
static final String PROPERTY_NAME = "zipkin.collector.kafka.zookeeper";

@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
String kafkaZookeeper = context.getEnvironment().getProperty(PROPERTY_NAME);
return kafkaZookeeper == null || kafkaZookeeper.isEmpty() ?
ConditionOutcome.noMatch(PROPERTY_NAME + " isn't set") :
ConditionOutcome.match();
}
}

在ZipkinKafkaCollectorAutoConfiguration中,启动了一个守护线程来运行KafkaCollector的start方法,避免zookeeper连不上,阻塞zipkin的启动过程。

public final class KafkaCollector implements CollectorComponent {
final LazyConnector connector;
final LazyStreams streams;

KafkaCollector(Builder builder) {
connector = new LazyConnector(builder);
streams = new LazyStreams(builder, connector);
}

@Override public KafkaCollector start() {
connector.get();
streams.get();
return this;
}
}

KafkaCollector中初始化了两个对象:LazyConnector和LazyStreams,并在start方法中调用了这两个对象的get方法

LazyConnector

LazyConnector继承了Lazy,当get方法被调用的时候,compute方法会被调用

static final class LazyConnector extends LazyCloseable<ZookeeperConsumerConnector> {

final ConsumerConfig config;

LazyConnector(Builder builder) {
this.config = new ConsumerConfig(builder.properties);
}

@Override protected ZookeeperConsumerConnector compute() {
return (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
}

@Override
public void close() {
ZookeeperConsumerConnector maybeNull = maybeNull();
if (maybeNull != null) maybeNull.shutdown();
}
}

Lazy的get方法中,使用了典型的懒汉式单例模式,并使用了double-check,防止多线程环境下构造出多个实例,而真正构造对象的工作则委派给compute方法

public abstract class Lazy<T> {

volatile T instance = null;

/** Remembers the result, if the operation completed unexceptionally. */
protected abstract T compute();

/** Returns the same value, computing as necessary */
public final T get() {
T result = instance;
if (result == null) {
synchronized (this) {
result = instance;
if (result == null) {
instance = result = tryCompute();
}
}
}
return result;
}

/**
* This is called in a synchronized block when the value to memorize hasn't yet been computed.
*
* <p>Extracted only for LazyCloseable, hence package protection.
*/
T tryCompute() {
return compute();
}
}

在LazyConnector的compute方法中,根据ConsumerConfig构造出了ZookeeperConsumerConnector,这是kafka 0.8版本中一个重要的对象,即基于zookeeper的ConsumerConnector。

LazyStreams

在LazyStreams的compute中,新建了一个线程池,线程池大小可以由参数streams(即zipkin.collector.kafka.streams)来指定,默认为一个线程的线程池。
然后通过topicCountMap设置zipkin的kafka消费使用的线程数,再使用ZookeeperConsumerConnector的createMessageStreams方法来创建KafkaStream,然后使用线程池执行KafkaStreamProcessor。

static final class LazyStreams extends LazyCloseable<ExecutorService> {
final int streams;
final String topic;
final Collector collector;
final CollectorMetrics metrics;
final LazyCloseable<ZookeeperConsumerConnector> connector;
final AtomicReference<CheckResult> failure = new AtomicReference<>();

LazyStreams(Builder builder, LazyCloseable<ZookeeperConsumerConnector> connector) {
this.streams = builder.streams;
this.topic = builder.topic;
this.collector = builder.delegate.build();
this.metrics = builder.metrics;
this.connector = connector;
}

@Override protected ExecutorService compute() {
ExecutorService pool = streams == 1
? Executors.newSingleThreadExecutor()
: Executors.newFixedThreadPool(streams);

Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
topicCountMap.put(topic, streams);

for (KafkaStream<byte[], byte[]> stream : connector.get().createMessageStreams(topicCountMap)
.get(topic)) {
pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics)));
}
return pool;
}

Runnable guardFailures(final Runnable delegate) {
return () -> {
try {
delegate.run();
} catch (RuntimeException e) {
failure.set(CheckResult.failed(e));
}
};
}

@Override
public void close() {
ExecutorService maybeNull = maybeNull();
if (maybeNull != null) maybeNull.shutdown();
}
}

KafkaStreamProcessor

在KafkaStreamProcessor的run方法中,迭代stream对象,取出获得的流数据,然后调用Collector的acceptSpans方法,即使用storage组件来接收并存储span数据。

final class KafkaStreamProcessor implements Runnable {
final KafkaStream<byte[], byte[]> stream;
final Collector collector;
final CollectorMetrics metrics;

KafkaStreamProcessor(
KafkaStream<byte[], byte[]> stream, Collector collector, CollectorMetrics metrics) {
this.stream = stream;
this.collector = collector;
this.metrics = metrics;
}

@Override
public void run() {
ConsumerIterator<byte[], byte[]> messages = stream.iterator();
while (messages.hasNext()) {
byte[] bytes = messages.next().message();
metrics.incrementMessages();

if (bytes.length == 0) {
metrics.incrementMessagesDropped();
continue;
}

// If we received legacy single-span encoding, decode it into a singleton list
if (bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
try {
metrics.incrementBytes(bytes.length);
Span span = SpanDecoder.THRIFT_DECODER.readSpan(bytes);
collector.accept(Collections.singletonList(span), NOOP);
} catch (RuntimeException e) {
metrics.incrementMessagesDropped();
}
} else {
collector.acceptSpans(bytes, DETECTING_DECODER, NOOP);
}
}
}
}

这里的kafka消费方式还是kafka0.8版本的,如果你想用kafka0.10+的版本,可以更改zipkin-server的pom,将collector-kafka10加入到依赖中,其原理跟kafka0.8的差不多,此处不再展开分析了。

<!-- Kafka10 Collector -->
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-collector-kafka10</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-collector-kafka10</artifactId>
</dependency>

在生产环境中,我们可以将zipkin的日志收集器改为kafka来提高系统的吞吐量,而且也可以让客户端和zipkin服务端解耦,客户端将不依赖zipkin服务端,只依赖kafka集群。

当然我们也可以将zipkin的collector替换为RabbitMQ来提高日志收集的效率,zipkin对scribe也作了支持,这里就不展开篇幅细说了。


Java分布式跟踪系统Zipkin(七):Zipkin源码分析-Zipkin的源码结构

所有博文均在个人独立博客http://blog.mozhu.org首发,欢迎访问!

前面花了大量篇幅来介绍Brave的使用,一直把Zipkin当黑盒在使用,现在来逐渐拨开Zipkin的神秘面纱。
Zipkin的源代码地址为:https://github.com/openzipkin/zipkin

Zipkin的源码结构
Zipkin的源码结构

  • zipkin - 对应的是zipkin v1
  • zipkin2 - 对应的是zipkin v2
  • zipkin-server - 是zipkin的web工程目录,zipkin.server.ZipkinServer是启动类
  • zipkin-ui - zipkin ui工程目录,zipkin的设计是前后端分离的,zipkin-server提供数据查询接口,zipkin-ui负责数据展现。
  • zipkin-autoconfigure - 是为springboot提供的自动配置相关的类
    collector-kafka
    collector-kafka10
    collector-rabbitmq
    collector-scribe
    metrics-prometheus
    storage-cassandra
    storage-cassandra3
    storage-elasticsearch-aws
    storage-elasticsearch-http
    storage-mysql
    ui
  • zipkin-collector - 是zipkin比较重要的模块,负责收集trace信息,支持从kafka、rabbitmq以及scribe中收集。这个模块是可选的,因为zipkin默认通过http协议收集客户端上报的trace信息
    kafka
    kafka10
    rabbitmq
    scribe
  • zipkin-storage - 也是zipkin比较重要的模块,用于存储收集的trace信息,默认是使用内置的InMemoryStorage,即存储在内存中,重启就会丢失。我们可以根据我们实际的需要更换存储方式,将trace存储在mysql,elasticsearch,cassandra中。
    cassandra
    elasticsearch
    elasticsearch-http
    mysql
    zipkin2_cassandra

ZipkinServer

ZipkinServer是SpringBoot启动类,该类上使用了@EnableZipkinServer注解,加载了相关的Bean,而且在启动方法中添加了监听器RegisterZipkinHealthIndicators类,来初始化健康检查的相关bean。

@SpringBootApplication
@EnableZipkinServer
public class ZipkinServer {

public static void main(String[] args) {
new SpringApplicationBuilder(ZipkinServer.class)
.listeners(new RegisterZipkinHealthIndicators())
.properties("spring.config.name=zipkin-server").run(args);
}
}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({
ZipkinServerConfiguration.class,
BraveConfiguration.class,
ZipkinQueryApiV1.class,
ZipkinHttpCollector.class
})
public @interface EnableZipkinServer {

}

EnableZipkinServer注解导入了ZipkinServerConfiguration、BraveConfiguration、ZipkinQueryApiV1、ZipkinHttpCollector。注意,这里并没有导入ZipkinQueryApiV2,但由于SpringBoot项目默认会加载与启动类在同一个包或其子包下的所有使用Component、Controller、Service等注解的类,所以启动后会发现ZipkinQueryApiV2也被加载了。

  • ZipkinServerConfiguration - Zipkin Server端所有核心配置
  • BraveConfiguration - Zipkin存储trace信息时,还可以将自身的trace信息一起记录,这时就依赖Brave相关的类,都在这个类里配置
  • ZipkinQueryApiV1 - Zipkin V1版本的查询API都在这个Controller中
  • ZipkinQueryApiV2 - Zipkin V2版本的查询API都在这个Controller中
  • ZipkinHttpCollector - Zipkin默认的Collector使用http协议来收集Trace信息,客户端调用/api/v1/spans或/api/v2/spans来上报trace信息

ZipkinServerConfiguration

所有Zipkin服务需要的Bean都在这个类里进行配置

  • ZipkinHealthIndicator - Zipkin健康自检的类
  • CollectorSampler - Collector的采样率,默认100%采样,可以通过zipkin.collector.sample-rate来设置采样率
  • CollectorMetrics - Collector的统计信息,默认实现为ActuateCollectorMetrics
  • BraveTracedStorageComponentEnhancer - Zipkin存储trace时的self-trace类,启用后会将Zipkin的Storage存储模块执行的trace信息也采集进系统中
  • InMemoryConfiguration - 默认的内存Storage存储配置,当zipkin.storage.type属性未指定,或者容器中没有配置StorageComponent时,该配置被激活

ZipkinHealthIndicator

Zipkin健康自检的类,实现了springboot-actuate的CompositeHealthIndicator,提供系统组件的健康信息

final class ZipkinHealthIndicator extends CompositeHealthIndicator {

ZipkinHealthIndicator(HealthAggregator healthAggregator) {
super(healthAggregator);
}

void addComponent(Component component) {
String healthName = component instanceof V2StorageComponent
? ((V2StorageComponent) component).delegate().getClass().getSimpleName()
: component.getClass().getSimpleName();
healthName = healthName.replace("AutoValue_", "");
addHealthIndicator(healthName, new ComponentHealthIndicator(component));
}

static final class ComponentHealthIndicator implements HealthIndicator {
final Component component;

ComponentHealthIndicator(Component component) {
this.component = component;
}

@Override public Health health() {
Component.CheckResult result = component.check();
return result.ok ? Health.up().build() : Health.down(result.exception).build();
}
}
}

RegisterZipkinHealthIndicators

启动时加载的RegisterZipkinHealthIndicators类,在应用启动完毕、收到ApplicationReadyEvent事件后,会将Spring容器中的zipkin.Component添加到ZipkinHealthIndicator中

public final class RegisterZipkinHealthIndicators implements ApplicationListener {

@Override public void onApplicationEvent(ApplicationEvent event) {
if (!(event instanceof ApplicationReadyEvent)) return;
ConfigurableListableBeanFactory beanFactory =
((ApplicationReadyEvent) event).getApplicationContext().getBeanFactory();
ZipkinHealthIndicator healthIndicator = beanFactory.getBean(ZipkinHealthIndicator.class);
for (Component component : beanFactory.getBeansOfType(Component.class).values()) {
healthIndicator.addComponent(component);
}
}
}

启动zipkin,访问下面地址,可以看到输出zipkin的健康检查信息
http://localhost:9411/health.json

{"status":"UP","zipkin":{"status":"UP","InMemoryStorage":{"status":"UP"}},"diskSpace":{"status":"UP","total":429495595008,"free":392936411136,"threshold":10485760}}

ZipkinHttpCollector

Zipkin默认的Collector使用http协议来收集Trace信息,客户端均调用/api/v1/spans或/api/v2/spans来上报trace信息

@Autowired ZipkinHttpCollector(StorageComponent storage, CollectorSampler sampler,
CollectorMetrics metrics) {
this.metrics = metrics.forTransport("http");
this.collector = Collector.builder(getClass())
.storage(storage).sampler(sampler).metrics(this.metrics).build();
}

@RequestMapping(value = "/api/v2/spans", method = POST)
public ListenableFuture<ResponseEntity<?>> uploadSpansJson2(
@RequestHeader(value = "Content-Encoding", required = false) String encoding,
@RequestBody byte[] body
) {
return validateAndStoreSpans(encoding, JSON2_DECODER, body);
}

ListenableFuture<ResponseEntity<?>> validateAndStoreSpans(String encoding, SpanDecoder decoder,
byte[] body) {
SettableListenableFuture<ResponseEntity<?>> result = new SettableListenableFuture<>();
metrics.incrementMessages();
if (encoding != null && encoding.contains("gzip")) {
try {
body = gunzip(body);
} catch (IOException e) {
metrics.incrementMessagesDropped();
result.set(ResponseEntity.badRequest().body("Cannot gunzip spans: " + e.getMessage() + "\n"));
}
}
collector.acceptSpans(body, decoder, new Callback<Void>() {
@Override public void onSuccess(@Nullable Void value) {
result.set(SUCCESS);
}

@Override public void onError(Throwable t) {
String message = t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage();
result.set(t.getMessage() == null || message.startsWith("Cannot store")
? ResponseEntity.status(500).body(message + "\n")
: ResponseEntity.status(400).body(message + "\n"));
}
});
return result;
}

ZipkinHttpCollector中uploadSpansJson2方法接受所有/api/v2/spans请求,然后调用validateAndStoreSpans方法校验并存储Span
在validateAndStoreSpans方法中,当请求数据为gzip格式时,会先解压缩,然后调用collector的acceptSpans方法进行校验和存储。
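
作为对照,客户端上报其实就是一次普通的HTTP POST。下面用JDK自带的HttpURLConnection演示向/api/v2/spans上报一条span(JSON内容取自前文Kafka控制台输出并做了简化,9411为zipkin默认端口,仅为示意):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PostSpanExample {
    public static void main(String[] args) throws Exception {
        String json = "[{\"traceId\":\"802bd09f480b5faa\",\"id\":\"802bd09f480b5faa\","
                + "\"kind\":\"SERVER\",\"name\":\"get\",\"timestamp\":1510891296393252,"
                + "\"duration\":39514,\"localEndpoint\":{\"serviceName\":\"frontend\"}}]";
        HttpURLConnection conn =
                (HttpURLConnection) new URL("http://localhost:9411/api/v2/spans").openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        // 校验并接受成功时,zipkin一般返回202 Accepted
        System.out.println("HTTP " + conn.getResponseCode());
    }
}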

Collector

zipkin.collector.Collector的acceptSpans方法中,对各种格式的Span数据做了兼容处理,我们这里只看下V2版的JSON格式的Span是如何处理的,即会调用storage2(V2Collector)的acceptSpans方法

public class Collector
extends zipkin.internal.Collector<SpanDecoder, zipkin.Span> {
@Override
public void acceptSpans(byte[] serializedSpans, SpanDecoder decoder, Callback<Void> callback) {
try {
if (decoder instanceof DetectingSpanDecoder) decoder = detectFormat(serializedSpans);
} catch (RuntimeException e) {
metrics.incrementBytes(serializedSpans.length);
callback.onError(errorReading(e));
return;
}
if (storage2 != null && decoder instanceof V2JsonSpanDecoder) {
storage2.acceptSpans(serializedSpans, SpanBytesDecoder.JSON_V2, callback);
} else {
super.acceptSpans(serializedSpans, decoder, callback);
}
}
}

V2Collector

zipkin.internal.V2Collector继承了zipkin.internal.Collector。在Collector的acceptSpans方法中,会先调用decodeList将传入的二进制数据转换成Span对象,然后调用accept方法;accept方法中会调用sample方法,将需要采样的Span过滤出来,最后调用record方法将Span信息存入Storage中。

public abstract class Collector<D, S> {
protected void acceptSpans(byte[] serializedSpans, D decoder, Callback<Void> callback) {
metrics.incrementBytes(serializedSpans.length);
List<S> spans;
try {
spans = decodeList(decoder, serializedSpans);
} catch (RuntimeException e) {
callback.onError(errorReading(e));
return;
}
accept(spans, callback);
}

public void accept(List<S> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
metrics.incrementSpans(spans.size());

List<S> sampled = sample(spans);
if (sampled.isEmpty()) {
callback.onSuccess(null);
return;
}

try {
record(sampled, acceptSpansCallback(sampled));
callback.onSuccess(null);
} catch (RuntimeException e) {
callback.onError(errorStoringSpans(sampled, e));
return;
}
}

List<S> sample(List<S> input) {
List<S> sampled = new ArrayList<>(input.size());
for (S s : input) {
if (isSampled(s)) sampled.add(s);
}
int dropped = input.size() - sampled.size();
if (dropped > 0) metrics.incrementSpansDropped(dropped);
return sampled;
}
}

V2Collector中的record方法会调用storage的accept方法,zipkin默认会使用InMemoryStorage来存储

public final class V2Collector extends Collector<BytesDecoder<Span>, Span> {
@Override protected List<Span> decodeList(BytesDecoder<Span> decoder, byte[] serialized) {
List<Span> out = new ArrayList<>();
if (!decoder.decodeList(serialized, out)) return Collections.emptyList();
return out;
}

@Override protected boolean isSampled(Span span) {
return sampler.isSampled(Util.lowerHexToUnsignedLong(span.traceId()), span.debug());
}

@Override protected void record(List<Span> sampled, Callback<Void> callback) {
storage.spanConsumer().accept(sampled).enqueue(new V2CallbackAdapter<>(callback));
}
}

ZipkinQueryApiV1 & ZipkinQueryApiV2

暴露了Zipkin对外的查询API。V1和V2的区别主要是Span里字段的叫法不一样。这里主要看下ZipkinQueryApiV2,它的方法都比较简单,主要是调用storage组件来实现查询功能,对外的查询路径如下(列表后附一个简单的调用示例)。

/dependencies - 查看所有trace的依赖关系
/services - 查看所有的services
/spans - 根据serviceName查询spans信息
/traces - 根据serviceName,spanName,annotationQuery,minDuration,maxDuration等来搜索traces信息
/trace/{traceIdHex} - 根据traceId查询某条trace信息
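
例如,用前文控制台输出中出现过的traceId,直接查询这条trace(9411为zipkin默认端口,仅为示意):

// 需要java.io.BufferedReader、java.io.InputStreamReader、java.net.URL、java.nio.charset.StandardCharsets
URL url = new URL("http://localhost:9411/api/v2/trace/802bd09f480b5faa");
try (BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
    // 返回该trace下所有span组成的JSON数组
    in.lines().forEach(System.out::println);
}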

至此ZipkinServer的代码分析得差不多了,后面的博文中我们再具体分析各种Storage和Collector的源代码。
