Kafka Deep Dive: PartitionLeaderSelector Source Code Analysis

All posts are first published on my personal blog, http://blog.mozhu.org. You're welcome to visit!

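PartitionLeaderSelector elects a leader broker for a partition. The trait defines a single method, selectLeader, which takes a TopicAndPartition and a LeaderAndIsr.
The TopicAndPartition identifies the partition that needs a leader, and the LeaderAndIsr is the partition's current leader and ISR as recorded in ZooKeeper. The method returns a tuple containing the newly selected leader and ISR, plus the replicas that should receive the LeaderAndIsr request.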

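```scala
trait PartitionLeaderSelector {
  /**
   * @param topicAndPartition   the partition that needs a leader
   * @param currentLeaderAndIsr the partition's current leader and ISR as stored in ZooKeeper
   * @return the newly selected leader and ISR, plus the replicas that should receive the LeaderAndIsr request
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
```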

The main implementations of PartitionLeaderSelector are:

  1. NoOpLeaderSelector
  2. OfflinePartitionLeaderSelector
  3. ReassignedPartitionLeaderSelector
  4. PreferredReplicaPartitionLeaderSelector
  5. ControlledShutdownLeaderSelector

[Figure: PartitionLeaderSelector class diagram]

NoOpLeaderSelector

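NoOpLeaderSelector is a selector that does nothing: it logs a warning and returns the current LeaderAndIsr unchanged, together with the partition's assigned replicas.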

```scala
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

  this.logIdent = "[NoOpLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }
}
```
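To make the selectLeader contract concrete without pulling in Kafka, here is a minimal sketch that models the trait and a NoOp-style selector. TopicAndPartition and LeaderAndIsr are simplified stand-in case classes (only the fields this article uses), NoOpSelectorSketch is a hypothetical name, and a plain Map stands in for controllerContext.partitionReplicaAssignment.

```scala
// Simplified stand-ins for Kafka's TopicAndPartition and LeaderAndIsr (assumption: only the fields used here).
final case class TopicAndPartition(topic: String, partition: Int)
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

// Same shape as the real trait: (new leader and ISR, replicas that should receive the LeaderAndIsr request).
trait PartitionLeaderSelector {
  def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}

// Hypothetical NoOp-style selector; `assignment` stands in for controllerContext.partitionReplicaAssignment.
final class NoOpSelectorSketch(assignment: Map[TopicAndPartition, Seq[Int]]) extends PartitionLeaderSelector {
  def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    Console.err.println("[NoOpSelectorSketch] asked to elect a leader; returning the current state")
    // Nothing is bumped: same leader, same ISR, same epoch, same zkVersion
    (current, assignment(tp))
  }
}

val tp = TopicAndPartition("orders", 0)
val current = LeaderAndIsr(leader = 1, leaderEpoch = 5, isr = List(1, 2), zkVersion = 7)
val (unchanged, recipients) = new NoOpSelectorSketch(Map(tp -> Seq(1, 2, 3))).selectLeader(tp, current)
```

Because the NoOp selector hands back the stored state as-is, the leader epoch and zkVersion are not incremented; each of the real elections below increments both on success.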

ControlledShutdownLeaderSelector

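When the controller receives a controlled shutdown request from a broker, it triggers a new leader election for the partitions that broker currently leads.
The selector first looks up the assigned replicas (assignedReplicas) and keeps those on brokers that are alive or shutting down (liveAssignedReplicas). It then removes the shutting-down brokers from the current ISR to form the new ISR, and picks the first replica in liveAssignedReplicas that is also in the new ISR as the partition's leader.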

```scala
class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
  extends PartitionLeaderSelector
  with Logging {

  this.logIdent = "[ControlledShutdownLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion

    val currentLeader = currentLeaderAndIsr.leader

    // Replicas assigned to this partition (AR)
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
    // Assigned replicas on brokers that are alive or shutting down
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

    // Remove the brokers that are shutting down from the current ISR
    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
    // Pick the first live assigned replica that is still in the new ISR as the partition's leader
    liveAssignedReplicas.filter(newIsr.contains).headOption match {
      case Some(newLeader) =>
        // Found one: increment the leader epoch and the ZooKeeper zkVersion by 1
        debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
      case None =>
        // No replica is left besides the shutting-down brokers: throw StateChangeFailedException
        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
    }
  }
}
```
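The selection rule above can be modeled as a pure function. This is a sketch under simplifying assumptions: the controllerContext lookups are passed in as plain parameters, LeaderAndIsr is a stand-in case class, ControlledShutdownSketch is a hypothetical name, and IllegalStateException stands in for Kafka's StateChangeFailedException.

```scala
// Stand-in for Kafka's LeaderAndIsr (assumption: only the fields used here).
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

// Hypothetical pure-function model of ControlledShutdownLeaderSelector.selectLeader.
object ControlledShutdownSketch {
  def select(
      current: LeaderAndIsr,
      assignedReplicas: Seq[Int],   // AR, in assignment order
      liveOrShuttingDown: Set[Int], // brokers still registered, including those shutting down
      shuttingDown: Set[Int]        // brokers currently in controlled shutdown
  ): (LeaderAndIsr, Seq[Int]) = {
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDown.contains(r))
    // Shutting-down brokers leave the ISR
    val newIsr = current.isr.filterNot(b => shuttingDown.contains(b))
    // First replica in AR order that is registered and still in the new ISR
    liveAssignedReplicas.find(r => newIsr.contains(r)) match {
      case Some(newLeader) =>
        (LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr, current.zkVersion + 1), liveAssignedReplicas)
      case None =>
        // Kafka throws StateChangeFailedException here
        throw new IllegalStateException(
          s"No replica in ISR ${current.isr.mkString(",")} besides shutting-down brokers")
    }
  }
}

val current = LeaderAndIsr(leader = 1, leaderEpoch = 3, isr = List(1, 2, 3), zkVersion = 10)
// Broker 1, the current leader, is shutting down
val (next, recipients) = ControlledShutdownSketch.select(current, Seq(1, 2, 3), Set(1, 2, 3), Set(1))
```

In the example, broker 1 drops out of the ISR and broker 2, the next replica in AR order, takes over. Broker 1 still appears in the returned replica list because it is shutting down rather than dead.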

PreferredReplicaPartitionLeaderSelector

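When the controller is asked to run a preferred replica leader election, it tries to move leadership back to the preferred replica, i.e. the first replica in the AR (assigned replicas) list. If that replica is already the leader, the selector throws LeaderElectionNotNeededException; if it is not alive or not in the ISR, it throws StateChangeFailedException.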

```scala
class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
  with Logging {
  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // Replicas assigned to this partition (AR)
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // The first replica in AR is the preferred replica
    val preferredReplica = assignedReplicas.head
    // check if preferred replica is the current leader
    // If it already is, there is nothing to do: throw LeaderElectionNotNeededException
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
    if (currentLeader == preferredReplica) {
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
        .format(preferredReplica, topicAndPartition))
    } else {
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
        " Triggering preferred replica leader election")
      // check if preferred replica is not the current leader and is alive and in the isr
      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
        // The preferred replica is alive and in the ISR: make it the leader,
        // incrementing the leader epoch and the ZooKeeper zkVersion by 1
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
      } else {
        // The preferred replica is down or not in the ISR: throw StateChangeFailedException
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      }
    }
  }
}
```
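A minimal sketch of the preferred-replica rule, assuming a stand-in LeaderAndIsr case class and a hypothetical PreferredOutcome result type in place of the two exceptions. One simplification: the real selector reads the current leader from controllerContext.partitionLeadershipInfo, whereas this sketch uses current.leader.

```scala
// Stand-in for Kafka's LeaderAndIsr (assumption: only the fields used here).
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

// Hypothetical result type replacing the exceptions thrown by the real selector.
sealed trait PreferredOutcome
final case class Elected(newState: LeaderAndIsr, recipients: Seq[Int]) extends PreferredOutcome
case object NotNeeded extends PreferredOutcome            // ~ LeaderElectionNotNeededException
case object PreferredUnavailable extends PreferredOutcome // ~ StateChangeFailedException

object PreferredReplicaSketch {
  def select(current: LeaderAndIsr, assignedReplicas: Seq[Int], liveBrokers: Set[Int]): PreferredOutcome = {
    val preferred = assignedReplicas.head // the first AR entry is the preferred replica
    if (current.leader == preferred) NotNeeded
    else if (liveBrokers.contains(preferred) && current.isr.contains(preferred))
      // Only the leader moves; the ISR is kept, epoch and zkVersion are bumped by 1
      Elected(current.copy(leader = preferred, leaderEpoch = current.leaderEpoch + 1,
        zkVersion = current.zkVersion + 1), assignedReplicas)
    else PreferredUnavailable
  }
}

val current = LeaderAndIsr(leader = 2, leaderEpoch = 7, isr = List(2, 1), zkVersion = 4)
val outcome = PreferredReplicaSketch.select(current, Seq(1, 2, 3), liveBrokers = Set(1, 2, 3))
```

Here broker 1 is first in AR, alive and in the ISR, so it takes leadership back from broker 2. The ISR is carried over unchanged and all assigned replicas receive the request.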

ReassignedPartitionLeaderSelector

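When a topic's partitions are reassigned, a new leader election is triggered: the first replica in the reassigned replica list that is alive and already in the current ISR becomes the leader. The ISR itself is left unchanged.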

```scala
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  this.logIdent = "[ReassignedPartitionLeaderSelector]: "

  /**
   * The reassigned replicas are already in the ISR when selectLeader is called.
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // The new replica set this partition is being reassigned to
    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    // Keep the reassigned replicas that are alive and in the current ISR
    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
      currentLeaderAndIsr.isr.contains(r))
    // The first of them becomes the new leader
    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
    newLeaderOpt match {
      // Found one: keep the ISR, increment the leader epoch and the ZooKeeper zkVersion by 1
      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
      case None =>
        // No eligible replica: the election fails with NoReplicaOnlineException
        reassignedInSyncReplicas.size match {
          case 0 =>
            throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
          case _ =>
            throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
        }
    }
  }
}
```
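The reassignment rule can be sketched the same way. Assumptions: LeaderAndIsr is a stand-in case class, ReassignedSketch is a hypothetical name, newReplicas plays the role of partitionsBeingReassigned(topicAndPartition).newReplicas, and a Left carries the reason the real code would put into a NoReplicaOnlineException.

```scala
// Stand-in for Kafka's LeaderAndIsr (assumption: only the fields used here).
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

// Hypothetical pure-function model of ReassignedPartitionLeaderSelector.selectLeader.
object ReassignedSketch {
  def select(current: LeaderAndIsr, newReplicas: Seq[Int],
             liveBrokers: Set[Int]): Either[String, (LeaderAndIsr, Seq[Int])] = {
    // Reassigned replicas that are alive and already in the current ISR, in reassignment order
    val candidates = newReplicas.filter(r => liveBrokers.contains(r) && current.isr.contains(r))
    candidates.headOption match {
      case Some(newLeader) =>
        // ISR kept as-is; the request goes to the reassigned replicas
        Right((current.copy(leader = newLeader, leaderEpoch = current.leaderEpoch + 1,
          zkVersion = current.zkVersion + 1), newReplicas))
      case None if newReplicas.isEmpty => Left("list of reassigned replicas is empty")
      case None => Left("none of the reassigned replicas are in sync with the leader")
    }
  }
}

// Moving the partition from brokers {1, 2} to {4, 3}; both new replicas have joined the ISR
val current = LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(1, 2, 3, 4), zkVersion = 9)
val result = ReassignedSketch.select(current, Seq(4, 3), liveBrokers = Set(1, 2, 3, 4))
```

Broker 4 wins because it comes first in the reassigned list, even though broker 3 is also eligible: the reassignment order, not the ISR order, decides.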

OfflinePartitionLeaderSelector

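The new leader and ISR are chosen as follows:

  1. If at least one broker in the ISR is alive, the first live assigned replica that is in the ISR becomes the leader, and the live ISR members form the new ISR.
  2. If no broker in the ISR is alive and unclean.leader.election.enable=false, NoReplicaOnlineException is thrown.
  3. If unclean.leader.election.enable=true, a broker outside the ISR may become leader: the first live replica in the AR list is chosen and becomes the only ISR member. This can lose data, because that replica may be missing messages the old leader had already committed.
  4. If no replica in the AR list is alive either, NoReplicaOnlineException is thrown.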

```scala
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
  extends PartitionLeaderSelector with Logging {
  this.logIdent = "[OfflinePartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
      case Some(assignedReplicas) =>
        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
          case true =>
            // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
            // for unclean leader election.
            if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
              ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
            }

            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
            liveAssignedReplicas.isEmpty match {
              case true =>
                // No assigned replica is alive either: throw NoReplicaOnlineException
                throw new NoReplicaOnlineException(("No replica for partition " +
                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                  " Assigned replicas are: [%s]".format(assignedReplicas))
              case false =>
                // unclean.leader.election.enable=true: a broker outside the ISR may be elected,
                // so pick the first live replica from AR; it becomes the only ISR member
                ControllerStats.uncleanLeaderElectionRate.mark()
                val newLeader = liveAssignedReplicas.head
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                  .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
            }
          case false =>
            // At least one ISR member is alive: the first live assigned replica in the ISR becomes leader,
            // and the live ISR members form the new ISR
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
              .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicas)
      case None =>
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
  }
}
```
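The four steps can be collected into one function. This sketch assumes a stand-in LeaderAndIsr case class and a hypothetical OfflineSketch object; the uncleanEnabled flag replaces the unclean.leader.election.enable lookup that the real code performs through LogConfig and AdminUtils, and a Left replaces NoReplicaOnlineException. As in the source, the unclean-election check runs before the check for live assigned replicas.

```scala
// Stand-in for Kafka's LeaderAndIsr (assumption: only the fields used here).
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

// Hypothetical pure-function model of OfflinePartitionLeaderSelector.selectLeader.
object OfflineSketch {
  def select(current: LeaderAndIsr, assignedReplicas: Seq[Int], liveBrokers: Set[Int],
             uncleanEnabled: Boolean): Either[String, (LeaderAndIsr, Seq[Int])] = {
    val liveAssigned = assignedReplicas.filter(r => liveBrokers.contains(r))
    val liveIsr = current.isr.filter(r => liveBrokers.contains(r))
    // Every successful election bumps the leader epoch and the zkVersion by 1
    def bump(leader: Int, isr: List[Int]) =
      LeaderAndIsr(leader, current.leaderEpoch + 1, isr, current.zkVersion + 1)

    if (liveIsr.nonEmpty) {
      // Clean election: first live AR replica that is in the live ISR; the live ISR becomes the new ISR
      val leader = liveAssigned.filter(r => liveIsr.contains(r)).head
      Right((bump(leader, liveIsr), liveAssigned))
    } else if (!uncleanEnabled) {
      Left("no broker in ISR is alive and unclean leader election is disabled")
    } else if (liveAssigned.isEmpty) {
      Left("no assigned replica is alive")
    } else {
      // Unclean election: first live AR replica becomes leader and sole ISR member (possible data loss)
      val leader = liveAssigned.head
      Right((bump(leader, List(leader)), liveAssigned))
    }
  }
}

val current = LeaderAndIsr(leader = 1, leaderEpoch = 4, isr = List(1, 2), zkVersion = 6)
// Broker 1 (the old leader) died, broker 2 is still in the ISR
val clean = OfflineSketch.select(current, Seq(1, 2, 3), liveBrokers = Set(2, 3), uncleanEnabled = false)
// Every ISR member is dead; only broker 3, which was never in the ISR, is alive
val unclean = OfflineSketch.select(current, Seq(1, 2, 3), liveBrokers = Set(3), uncleanEnabled = true)
```

In the clean case broker 2 takes over and the ISR shrinks to the live members. In the unclean case broker 3 becomes leader and sole ISR member, which is exactly where data loss can occur.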

