Kafka in Depth: How the Controller and Brokers Communicate in a Cluster - ControllerChannelManager

All posts are first published on my personal blog, http://blog.mozhu.org. You are welcome to visit!

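In a Kafka cluster, one broker is first elected as the controller. The controller then coordinates with the other brokers on tasks such as topic creation, partition leader election, and topic deletion.
This post analyzes how the controller communicates with the other brokers.
The controller sends three kinds of requests to the other brokers: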

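  • LeaderAndIsrRequest - for a topic, KafkaController elects a leader for each partition and produces the latest ISR, then sends a LeaderAndIsrRequest to the brokers that host the topic's replicas.
  • StopReplicaRequest - when a broker goes down or a user deletes a replica, the controller sends a StopReplicaRequest to the affected brokers.
  • UpdateMetadataRequest - metadata must be refreshed whenever brokers change (go down or restart), a topic is created, partitions are added, a partition leader is elected, and so on. This lets Kafka clients learn the latest partition information for each topic, which broker leads each partition, and therefore which broker to read from and write to.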

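To make communication between the KafkaController leader and the other brokers more efficient, ControllerBrokerRequestBatch sends requests in batches.
ControllerBrokerRequestBatch first collects the information destined for each broker in three maps. Its sendRequestsToBrokers method then builds the requests and hands them to ControllerChannelManager, which puts them on a per-broker queue; a dedicated thread takes them off the queue and sends them to the broker.
leaderAndIsrRequestMap: holds the information for LeaderAndIsrRequest requests, keyed by target broker
stopReplicaRequestMap: holds the information for StopReplicaRequest requests, keyed by target broker
updateMetadataRequestMap: holds the information for UpdateMetadataRequest requests, keyed by target broker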

ControllerChannelManager

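When a ControllerChannelManager is constructed, it initializes brokerStateInfo by calling addNewBroker for every live broker, so that each one has a ControllerBrokerStateInfo object. This object holds networkClient, brokerNode, messageQueue, and requestSendThread:

  • networkClient - the client that handles the low-level network communication
  • brokerNode - the broker's id, host, and port (the code below does not pass any rack information to Node)
  • messageQueue - the BlockingQueue of messages waiting to be sent
  • requestSendThread - the RequestSendThread that sends the requests

Note that addNewBroker only creates the RequestSendThread. The thread does not run until startup is called (or, for a broker that joins later, until addBroker starts it).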

private def addNewBroker(broker: Broker) {
  // Blocking queue that holds the messages to be sent to this broker
  val messageQueue = new LinkedBlockingQueue[QueueItem]
  debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
  val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
  val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
  // Low-level network client used to talk to this broker
  val networkClient = {
    val channelBuilder = ChannelBuilders.create(
      config.interBrokerSecurityProtocol,
      Mode.CLIENT,
      LoginType.SERVER,
      config.values,
      config.saslMechanismInterBrokerProtocol,
      config.saslInterBrokerHandshakeRequestEnable
    )
    val selector = new Selector(
      NetworkReceive.UNLIMITED,
      config.connectionsMaxIdleMs,
      metrics,
      time,
      "controller-channel",
      Map("broker-id" -> broker.id.toString).asJava,
      false,
      channelBuilder
    )
    new NetworkClient(
      selector,
      new ManualMetadataUpdater(Seq(brokerNode).asJava),
      config.brokerId.toString,
      1,
      0,
      Selectable.USE_DEFAULT_BUFFER_SIZE,
      Selectable.USE_DEFAULT_BUFFER_SIZE,
      config.requestTimeoutMs,
      time
    )
  }
  val threadName = threadNamePrefix match {
    case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
    case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
  }

  // Create the RequestSendThread that sends the requests, but do not start it yet
  val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
    brokerNode, config, time, threadName)
  requestThread.setDaemon(false)
  brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}

class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
  protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
  private val brokerLock = new Object
  this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "

  controllerContext.liveBrokers.foreach(addNewBroker(_))

  def startup() = {
    brokerLock synchronized {
      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
    }
  }

  def shutdown() = {
    brokerLock synchronized {
      brokerStateInfo.values.foreach(removeExistingBroker)
    }
  }

  def addBroker(broker: Broker) {
    // be careful here. Maybe the startup() API has already started the request send thread
    brokerLock synchronized {
      if(!brokerStateInfo.contains(broker.id)) {
        addNewBroker(broker)
        startRequestSendThread(broker.id)
      }
    }
  }

  def removeBroker(brokerId: Int) {
    brokerLock synchronized {
      removeExistingBroker(brokerStateInfo(brokerId))
    }
  }

  // Remove an existing broker: close its client, clear its queue, and stop its send thread
  private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
    try {
      brokerState.networkClient.close()
      brokerState.messageQueue.clear()
      brokerState.requestSendThread.shutdown()
      brokerStateInfo.remove(brokerState.brokerNode.id)
    } catch {
      case e: Throwable => error("Error while removing broker by the controller", e)
    }
  }

  // Start the request send thread, but only if it has never been started
  protected def startRequestSendThread(brokerId: Int) {
    val requestThread = brokerStateInfo(brokerId).requestSendThread
    if(requestThread.getState == Thread.State.NEW)
      requestThread.start()
  }
}
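
The Thread.State.NEW check in startRequestSendThread is what makes it safe for both startup and addBroker to try to start the same thread. The following is a minimal sketch of that guard in isolation, not Kafka code: DeferredStartSketch and startIfNew are hypothetical names introduced only for illustration.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical illustration, not part of Kafka.
object DeferredStartSketch {
  // Start the thread only if it has never been started. Calling start()
  // on a thread that already ran would throw IllegalThreadStateException.
  def startIfNew(t: Thread): Boolean =
    if (t.getState == Thread.State.NEW) { t.start(); true } else false

  def main(args: Array[String]): Unit = {
    val ran = new CountDownLatch(1)
    // Constructing the thread does not run it, just as in addNewBroker.
    val t = new Thread(new Runnable { def run(): Unit = ran.countDown() }, "send-thread-sketch")
    println(s"after construction: ${t.getState}")
    println(s"first start attempt started it: ${startIfNew(t)}")
    ran.await()
    println(s"second start attempt started it: ${startIfNew(t)}")
  }
}
```

Once a thread has been started its state never returns to NEW, so a repeated call is simply a no-op.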

The sendRequest method only puts the request on the queue of the target broker; it does not send anything itself.

def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
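
The enqueue-only behaviour of sendRequest can be sketched without any Kafka dependency. In the sketch below, EnqueueSketch, Item, addBroker, and pending are hypothetical stand-ins (Item replaces QueueItem and carries a plain string instead of a real request); only the shape of the lookup-then-put logic follows the method above.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical illustration, not part of Kafka.
object EnqueueSketch {
  // Simplified stand-in for QueueItem: a request name and an optional callback.
  final case class Item(request: String, callback: Option[String => Unit])

  private val lock = new Object
  private val queues = mutable.HashMap.empty[Int, LinkedBlockingQueue[Item]]

  // Register a broker with an empty queue, if it is not known yet.
  def addBroker(brokerId: Int): Unit = lock.synchronized {
    if (!queues.contains(brokerId)) queues.put(brokerId, new LinkedBlockingQueue[Item]())
  }

  // Queue the request for the broker. Returns false when the broker is
  // unknown, which the real method treats as "offline" and only logs.
  def sendRequest(brokerId: Int, request: String, callback: Option[String => Unit] = None): Boolean =
    lock.synchronized {
      queues.get(brokerId) match {
        case Some(q) =>
          q.put(Item(request, callback))
          true
        case None =>
          false
      }
    }

  // Number of requests still waiting for the broker's sender thread.
  def pending(brokerId: Int): Int = lock.synchronized {
    queues.get(brokerId).map(_.size()).getOrElse(0)
  }
}
```

Because the caller only touches the queue, it never blocks on network I/O; delivery is left entirely to the per-broker thread.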

RequestSendThread

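After a broker is elected controller, KafkaController's onControllerFailover method is called, and within it the startup method of ControllerChannelManager is invoked.
startup starts the request send thread of every broker in turn. In its doWork method, RequestSendThread takes the next item off the queue and sends it to the corresponding broker, one item at a time. If the broker is not ready or the send fails, the thread waits 300 ms and retries, and it keeps retrying until it receives a response or the thread is shut down. Once a response arrives, it invokes the callback, if one was supplied.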

class RequestSendThread(val controllerId: Int,
                        val controllerContext: ControllerContext,
                        val queue: BlockingQueue[QueueItem],
                        val networkClient: NetworkClient,
                        val brokerNode: Node,
                        val config: KafkaConfig,
                        val time: Time,
                        name: String)
  extends ShutdownableThread(name = name) {

  private val lock = new Object()
  private val stateChangeLogger = KafkaController.stateChangeLogger
  private val socketTimeoutMs = config.controllerSocketTimeoutMs

  override def doWork(): Unit = {

    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))

    // Take the next QueueItem off the queue (blocks while the queue is empty)
    val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
    import NetworkClientBlockingOps._
    var clientResponse: ClientResponse = null
    try {
      lock synchronized {
        var isSendSuccessful = false
        while (isRunning.get() && !isSendSuccessful) {
          // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
          // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
          try {
            if (!brokerReady()) {
              // Broker is not ready: wait 300 ms, then try again
              isSendSuccessful = false
              backoff()
            }
            else {
              val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
              val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
              val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
              // Send the request and block until the response arrives
              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
              isSendSuccessful = true
            }
          } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                  request.toString, brokerNode.toString()), e)
              networkClient.close(brokerNode.idString)
              // Send failed: wait 300 ms, then try again
              isSendSuccessful = false
              backoff()
          }
        }
        if (clientResponse != null) {
          val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
            case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
            case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
            case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
            case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
          }
          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
            .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))

          if (callback != null) {
            callback(response)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
        // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
        networkClient.close(brokerNode.idString)
    }
  }

}
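
The retry behaviour of doWork boils down to "try, and on failure back off and try again while the thread is still running". The sketch below isolates that loop under simplifying assumptions: RetrySketch and sendWithRetry are hypothetical names, the attempt is an arbitrary function rather than a network call, and it catches Exception where the Kafka code catches Throwable and also closes the connection.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical illustration, not part of Kafka.
object RetrySketch {
  // Retry `attempt` until it succeeds or `running` is cleared, sleeping
  // `backoffMs` after each failure. Returns None if stopped before success.
  def sendWithRetry[A](running: AtomicBoolean, backoffMs: Long)(attempt: () => A): Option[A] = {
    var result: Option[A] = None
    while (running.get() && result.isEmpty) {
      try {
        result = Some(attempt())
      } catch {
        case _: Exception =>
          // The real thread closes the broker connection here before backing off.
          Thread.sleep(backoffMs)
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Fails twice, then succeeds on the third attempt.
    val response = sendWithRetry(new AtomicBoolean(true), 10L) { () =>
      calls += 1
      if (calls < 3) throw new java.io.IOException("broker not ready") else "ok"
    }
    println(s"response=$response after $calls attempts")
  }
}
```

Clearing the running flag plays the role of shutdown() on the real thread: it is the only way out of the loop when a broker stays unreachable.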

ControllerBrokerRequestBatch

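Classes such as KafkaController, PartitionStateMachine, and ReplicaStateMachine all use ControllerBrokerRequestBatch to send requests in batches whenever broker or partition state changes and the change has to be propagated to the other brokers.
Consider the following example. It first calls newBatch, then calls methods such as addStopReplicaRequestForBrokers (here indirectly, through handleStateChange) to add the pending requests to the batch, and finally calls sendRequestsToBrokers to hand the batch over for sending.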

def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if(replicas.size > 0) {
    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}

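ControllerBrokerRequestBatch keeps a separate map for each of the three request types (leaderAndIsrRequest, stopReplicaRequest, updateMetadataRequest). The methods addLeaderAndIsrRequestForBrokers, addStopReplicaRequestForBrokers, and addUpdateMetadataRequestForBrokers add the corresponding request information to these maps. Finally, sendRequestsToBrokers builds the requests and puts each one on the queue of the RequestSendThread for its target broker.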

class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {
  val controllerContext = controller.controllerContext
  val controllerId: Int = controller.config.brokerId
  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
  val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
  val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
  private val stateChangeLogger = KafkaController.stateChangeLogger

  def newBatch() {
    // raise error if the previous batch is not empty
    if (leaderAndIsrRequestMap.size > 0)
      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
        "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
    if (stopReplicaRequestMap.size > 0)
      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
        "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
    if (updateMetadataRequestMap.size > 0)
      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
        "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
  }

  def clear() {
    leaderAndIsrRequestMap.clear()
    stopReplicaRequestMap.clear()
    updateMetadataRequestMap.clear()
  }

  // Add a LeaderAndIsrRequest entry for each target broker
  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                       replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
    val topicPartition = new TopicPartition(topic, partition)

    brokerIds.filter(_ >= 0).foreach { brokerId =>
      val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
      result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
    }

    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
                                       Set(TopicAndPartition(topic, partition)))
  }

  // Add a StopReplicaRequest entry for each target broker
  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
                                      callback: (AbstractRequestResponse, Int) => Unit = null) {
    brokerIds.filter(b => b >= 0).foreach { brokerId =>
      stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
      val v = stopReplicaRequestMap(brokerId)
      // Append the request that tells the replica to stop fetching
      if(callback != null)
        stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
          deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
      else
        stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
          deletePartition)
    }
  }

  /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
  // Add an UpdateMetadataRequest entry for each target broker
  def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                         partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
                                         callback: AbstractRequestResponse => Unit = null) {
    def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
      val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
      leaderIsrAndControllerEpochOpt match {
        case Some(leaderIsrAndControllerEpoch) =>
          val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
          val partitionStateInfo = if (beingDeleted) {
            val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
            PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
          } else {
            PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
          }
          brokerIds.filter(b => b >= 0).foreach { brokerId =>
            updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
            // Store the latest metadata for this partition in updateMetadataRequestMap
            updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
          }
        case None =>
          info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
      }
    }

    val filteredPartitions = {
      val givenPartitions = if (partitions.isEmpty)
        controllerContext.partitionLeadershipInfo.keySet
      else
        partitions
      if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
        givenPartitions
      else
        givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
    }
    if (filteredPartitions.isEmpty)
      brokerIds.filter(b => b >= 0).foreach { brokerId =>
        updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
      }
    else
      filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))

    controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
  }

  // Drain the three maps in turn and put each resulting request on the queue of its target broker
  def sendRequestsToBrokers(controllerEpoch: Int) {
    try {
      // Build the LeaderAndIsrRequests and enqueue them
      leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
        partitionStateInfos.foreach { case (topicPartition, state) =>
          val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                   "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                                   state.leaderIsrAndControllerEpoch, broker,
                                                                   topicPartition.topic, topicPartition.partition))
        }
        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
          _.getNode(controller.config.interBrokerSecurityProtocol)
        }
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
          )
          topicPartition -> partitionState
        }
        val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
        // Enqueue the leaderAndIsrRequest rather than sending it here; no callback is passed, so none is invoked
        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
      }
      leaderAndIsrRequestMap.clear()
      updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>

        partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
          "to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
          broker, p._1)))
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
          )
          topicPartition -> partitionState
        }

        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
                      else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
                      else 0: Short

        val updateMetadataRequest =
          if (version == 0) {
            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
            new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
          }
          else {
            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
              val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
                securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
              }
              new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
            }
            new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
          }
        // Enqueue the updateMetadataRequest rather than sending it here; no callback is passed, so none is invoked
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
      }
      updateMetadataRequestMap.clear()
      stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
        debug("The stop replica request (delete = true) sent to broker %d is %s"
          .format(broker, stopReplicaWithDelete.mkString(",")))
        debug("The stop replica request (delete = false) sent to broker %d is %s"
          .format(broker, stopReplicaWithoutDelete.mkString(",")))
        replicaInfoList.foreach { r =>
          val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
            Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
          // Enqueue the stopReplicaRequest rather than sending it here; its callback is passed along
          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
        }
      }
      stopReplicaRequestMap.clear()
    } catch {
      case e : Throwable => {
        if (leaderAndIsrRequestMap.size > 0) {
          error("Haven't been able to send leader and isr requests, current state of " +
              s"the map is $leaderAndIsrRequestMap. Exception message: $e")
        }
        if (updateMetadataRequestMap.size > 0) {
          error("Haven't been able to send metadata update requests, current state of " +
              s"the map is $updateMetadataRequestMap. Exception message: $e")
        }
        if (stopReplicaRequestMap.size > 0) {
          error("Haven't been able to send stop replica requests, current state of " +
              s"the map is $stopReplicaRequestMap. Exception message: $e")
        }
        throw new IllegalStateException(e)
      }
    }
  }
}
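
The newBatch / add / send lifecycle of the class above can be reduced to a few lines. The sketch below is a simplified model and not the Kafka implementation: BatchSketch is a hypothetical class that keeps one map instead of three, identifies partitions by plain strings, and takes the send function as a constructor parameter in place of controller.sendRequest.

```scala
import scala.collection.mutable

// Hypothetical illustration, not part of Kafka.
final class BatchSketch(send: (Int, Set[String]) => Unit) {
  // brokerId -> partitions that must be announced to that broker
  private val pending = mutable.Map.empty[Int, mutable.Set[String]]

  // Refuse to start a new batch while the previous one still holds unsent entries.
  def newBatch(): Unit =
    if (pending.nonEmpty)
      throw new IllegalStateException(s"previous batch not sent: $pending")

  // Record the partition for every valid (non-negative) broker id.
  def add(brokerIds: Seq[Int], partition: String): Unit =
    brokerIds.filter(_ >= 0).foreach { id =>
      pending.getOrElseUpdate(id, mutable.Set.empty[String]) += partition
    }

  // Emit one request per broker covering all of its partitions, then reset the batch.
  def sendRequestsToBrokers(): Unit = {
    pending.foreach { case (id, parts) => send(id, parts.toSet) }
    pending.clear()
  }
}

object BatchSketchDemo {
  def main(args: Array[String]): Unit = {
    val batch = new BatchSketch((id, parts) => println(s"broker $id <- $parts"))
    batch.newBatch()
    batch.add(Seq(1, 2), "topicA-0")
    batch.add(Seq(1), "topicA-1")
    batch.sendRequestsToBrokers()
  }
}
```

Grouping by broker id is what turns many per-partition state changes into a single request per broker, and clearing the map after sending is what lets the next newBatch call succeed.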

This post does not go into the underlying communication protocol; a later post will analyze the protocol in detail.

