Kafka in Depth: An Analysis of the Topic Deletion Process

All posts are first published on my personal blog, http://blog.mozhu.org. You are welcome to visit!

To delete a topic, run the following command:

.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test

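This assumes ZooKeeper is running on localhost and the topic to delete is test. All the command actually does is create a child node named test under the ZooKeeper node /admin/delete_topics; the node name is the topic name. (Many blog posts claim this node is ephemeral. It is not: it is a persistent node, and the controller removes it only once the topic has really been deleted.)
After running the command, the console prints: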
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
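In other words, running the delete command does not delete the topic right away. It only marks the topic for deletion by adding the node /admin/delete_topics/test in ZooKeeper. The output also reminds us that the delete.topic.enable switch has to be turned on beforehand.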
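To make the "mark, don't delete" behaviour concrete, here is a minimal sketch that models ZooKeeper as a plain in-memory set of node paths. MarkForDeletionSketch and its methods are hypothetical names used only for illustration; the sketch does not talk to a real ZooKeeper.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ZooKeeper: just a set of persistent node paths.
object MarkForDeletionSketch {
  val DeleteTopicsPath = "/admin/delete_topics"

  def deleteTopicPath(topic: String): String = s"$DeleteTopicsPath/$topic"

  // Marks a topic for deletion; returns false if it was already marked.
  def markForDeletion(nodes: mutable.Set[String], topic: String): Boolean =
    nodes.add(deleteTopicPath(topic))

  def main(args: Array[String]): Unit = {
    val nodes = mutable.Set("/brokers/topics/test")
    val newlyMarked = markForDeletion(nodes, "test")
    // Marking only adds the marker node; the topic node itself is untouched.
    val topicStillThere = nodes.contains("/brokers/topics/test")
    println(s"newly marked: $newlyMarked, topic node still present: $topicStillThere")
  }
}
```

The topic's own node stays in place after marking, which mirrors the point above: actual removal happens later, on the controller.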

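Source code analysis of topic deletion in Kafka

In Kafka, topic deletion is carried out by the TopicDeletionManager class, which KafkaController holds in its deleteTopicManager field.
When a broker is elected as the cluster's controller, onControllerFailover in KafkaController is called, and that method calls deleteTopicManager.start() to start the topic deletion thread.
When the broker stops being the controller, onControllerResignation in KafkaController is called, and that method calls deleteTopicManager.shutdown() to stop the topic deletion thread.

The onControllerFailover method of KafkaController initializes the partitionStateMachine state machine and registers its event listeners, chiefly the one that watches for changes to the children of the ZooKeeper node /admin/delete_topics.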

def onControllerFailover() {
  if(isRunning) {
    // ...
    partitionStateMachine.registerListeners()
    replicaStateMachine.registerListeners()
    // ...
    deleteTopicManager.start()
  }
  else
    info("Controller has been shut down, aborting startup/failover")
}

class PartitionStateMachine{
  def registerListeners() {
    registerTopicChangeListener()
    if(controller.config.deleteTopicEnable)
      // register a listener that watches for changes to the children of /admin/delete_topics
      registerDeleteTopicListener()
  }

  private def registerDeleteTopicListener() = {
    zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
  }

  private def deregisterDeleteTopicListener() = {
    zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
  }
}

kafka.controller.PartitionStateMachine.DeleteTopicsListener

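DeleteTopicsListener watches for changes to the children of the ZooKeeper node /admin/delete_topics. When a child change occurs, that is, when a new topic needs to be deleted, handleChildChange is triggered and the topic is added to deleteTopicManager's queue.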

class DeleteTopicsListener() extends IZkChildListener with Logging {
  this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
  val zkUtils = controllerContext.zkUtils

  /**
   * Invoked when a topic is being deleted
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]) {
    // fired when the children of /admin/delete_topics change, i.e. when a new topic needs to be deleted
    inLock(controllerContext.controllerLock) {
      var topicsToBeDeleted = {
        import JavaConversions._
        (children: Buffer[String]).toSet
      }
      debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
      // check whether each topic exists; if it no longer does, delete /admin/delete_topics/< topic_name > right away
      val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
      if(nonExistentTopics.size > 0) {
        warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
        nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
      }
      topicsToBeDeleted --= nonExistentTopics
      if(topicsToBeDeleted.size > 0) {
        info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
        // mark topic ineligible for deletion if other state changes are in progress
        // i.e. if the topic is undergoing preferred replica election or partition reassignment,
        // mark it as temporarily ineligible for deletion
        topicsToBeDeleted.foreach { topic =>
          val preferredReplicaElectionInProgress =
            controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
          val partitionReassignmentInProgress =
            controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
          if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
            controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
        }
        // add topic to deletion list (the queue of topics waiting to be deleted)
        controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    }
  }

  /**
   *
   * @throws Exception
   * On any error.
   */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}
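The triage that handleChildChange performs can be restated as a pure function over sets, which makes the three outcomes easier to see. This is a simplified sketch, not Kafka code: DeleteListenerSketch, Triage and triage are hypothetical names, and the inputs stand in for the controller context.

```scala
object DeleteListenerSketch {
  // Outcome of triaging the children of /admin/delete_topics (hypothetical type).
  final case class Triage(nonExistent: Set[String], ineligible: Set[String], enqueued: Set[String])

  def triage(children: Set[String],
             allTopics: Set[String],
             electionInProgress: Set[String],
             reassignmentInProgress: Set[String]): Triage = {
    // Topics that no longer exist: their marker node is simply removed.
    val nonExistent = children.filterNot(allTopics.contains)
    // Everything else is queued for deletion.
    val enqueued = children -- nonExistent
    // Queued topics with other state changes in progress are flagged as ineligible for now.
    val ineligible = enqueued.filter(t => electionInProgress(t) || reassignmentInProgress(t))
    Triage(nonExistent, ineligible, enqueued)
  }

  def main(args: Array[String]): Unit = {
    val result = triage(
      children = Set("test", "ghost", "busy"),
      allTopics = Set("test", "busy"),
      electionInProgress = Set.empty,
      reassignmentInProgress = Set("busy"))
    println(result)
  }
}
```

Note that a topic flagged as ineligible is still enqueued, as in the listener above; it is only held back until the flag is cleared.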

TopicDeletionManager

class TopicDeletionManager(controller: KafkaController,
                           initialTopicsToBeDeleted: Set[String] = Set.empty,
                           initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
  this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
  val controllerContext = controller.controllerContext
  // partition state machine
  val partitionStateMachine = controller.partitionStateMachine
  // replica state machine
  val replicaStateMachine = controller.replicaStateMachine
  val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
  val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
  val deleteLock = new ReentrantLock()
  val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
    (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
  val deleteTopicsCond = deleteLock.newCondition()
  val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
  var deleteTopicsThread: DeleteTopicsThread = null
  val isDeleteTopicEnabled = controller.config.deleteTopicEnable

  /**
   * Invoked at the end of new controller initiation
   */
  def start() {
    if (isDeleteTopicEnabled) {
      // start the deletion thread only if delete.topic.enable=true
      deleteTopicsThread = new DeleteTopicsThread()
      if (topicsToBeDeleted.size > 0)
        deleteTopicStateChanged.set(true)
      deleteTopicsThread.start()
    }
  }

  /**
   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
   */
  def shutdown() {
    // Only allow one shutdown to go through
    if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
      // Resume the topic deletion so it doesn't block on the condition
      // (the deletion thread may be blocked in awaitTopicDeletionNotification; this wakes it up)
      resumeTopicDeletionThread()
      // Await delete topic thread to exit
      // (wait for the deletion thread's doWork to finish)
      deleteTopicsThread.awaitShutdown()
      // clear the in-memory state
      topicsToBeDeleted.clear()
      partitionsToBeDeleted.clear()
      topicsIneligibleForDeletion.clear()
    }
  }
}
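The excerpt above declares deleteLock, deleteTopicsCond and deleteTopicStateChanged but does not show how awaitTopicDeletionNotification and resumeTopicDeletionThread use them. The following is a stand-alone sketch of how such a lock, condition and flag can implement that signalling; DeletionSignal is a hypothetical class written for this article, and Kafka's real methods may differ in detail.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Hypothetical model of the signalling between the listener and the deletion thread.
class DeletionSignal {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  private val stateChanged = new AtomicBoolean(false)

  // Blocks until resume() has been called since the last wake-up.
  def await(): Unit = {
    lock.lock()
    try {
      // The loop also guards against spurious wake-ups.
      while (!stateChanged.compareAndSet(true, false)) cond.await()
    } finally lock.unlock()
  }

  // Records that there is work to do and wakes the waiting thread.
  def resume(): Unit = {
    stateChanged.set(true)
    lock.lock()
    try cond.signal() finally lock.unlock()
  }
}

object DeletionSignalDemo {
  def main(args: Array[String]): Unit = {
    val signal = new DeletionSignal
    val worker = new Thread(new Runnable {
      def run(): Unit = { signal.await(); println("woken up, handling deletion") }
    })
    worker.start()
    signal.resume()
    worker.join()
  }
}
```

Because the flag is set before signalling and checked under the lock, a resume() that arrives before the worker starts waiting is not lost: the worker sees the flag and returns without blocking.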

DeleteTopicsThread

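DeleteTopicsThread extends ShutdownableThread. ShutdownableThread is a thread that repeatedly runs one method (doWork), and it also provides a shutdown method that waits synchronously until the thread has really finished. The code is fairly simple: it uses a CountDownLatch to block the thread that calls shutdown, and once doWork has really finished, the blocked threads are woken up.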

ShutdownableThread

abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
        extends Thread(name) with Logging {
  this.setDaemon(false)
  this.logIdent = "[" + name + "], "
  val isRunning: AtomicBoolean = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  def shutdown() = {
    // set the running flag to false
    initiateShutdown()
    // wait for the work in progress to finish
    awaitShutdown()
  }

  def initiateShutdown(): Boolean = {
    if(isRunning.compareAndSet(true, false)) {
      info("Shutting down")
      isRunning.set(false)
      if (isInterruptible)
        interrupt()
      true
    } else
      false
  }

  /**
   * After calling initiateShutdown(), use this API to wait until the shutdown is complete
   */
  def awaitShutdown(): Unit = {
    // wait for the thread to finish running
    shutdownLatch.await()
    info("Shutdown completed")
  }

  /**
   * This method is repeatedly invoked until the thread shuts down or this method throws an exception
   */
  def doWork(): Unit

  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    // count the latch down, waking up the threads waiting in awaitShutdown
    shutdownLatch.countDown()
    info("Stopped ")
  }
}
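For readers who want to run the pattern in isolation, here is a stripped-down sketch of the same idea without Kafka's Logging trait or the interrupt handling. LoopingThread is a hypothetical, simplified re-implementation, not Kafka's class.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Simplified, hypothetical version of the loop-until-shutdown pattern.
abstract class LoopingThread(threadName: String) extends Thread(threadName) {
  private val running = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  def doWork(): Unit

  // Must be called after start(); otherwise the latch is never counted down.
  def shutdown(): Unit = {
    running.set(false)     // ask the loop to stop
    shutdownLatch.await()  // block until run() has left the loop
  }

  override def run(): Unit = {
    try {
      while (running.get()) doWork()
    } finally {
      shutdownLatch.countDown() // release every thread blocked in shutdown()
    }
  }
}

object LoopingThreadDemo {
  def main(args: Array[String]): Unit = {
    val ticks = new AtomicInteger(0)
    val t = new LoopingThread("demo") {
      def doWork(): Unit = { ticks.incrementAndGet(); Thread.sleep(10) }
    }
    t.start()
    Thread.sleep(50)
    t.shutdown()
    println(s"stopped after ${ticks.get()} iterations")
  }
}
```

Once shutdown() returns, doWork is guaranteed not to run again, which is the property TopicDeletionManager.shutdown() relies on before it clears its state.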

DeleteTopicsThread

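Once a topic deletion notification arrives, doWork continues past the wait and does the following:
If all replicas have finished deleting the topic, it calls completeDeleteTopic for the final cleanup, which covers deleting the nodes in ZooKeeper and clearing the controller's in-memory state.
If any replica has the topic marked as ineligible for deletion, that is, the replica is in the ReplicaDeletionIneligible state (possibly because it was in the middle of a preferred replica election or a partition reassignment earlier), it marks the topic for a deletion retry.
If the topic is eligible for deletion and its deletion has not already started, it calls onTopicDeletion to run the actual deletion logic.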

class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
  val zkUtils = controllerContext.zkUtils
  override def doWork() {
    // wait for a topic deletion notification
    awaitTopicDeletionNotification()

    if (!isRunning.get)
      return

    inLock(controllerContext.controllerLock) {
      val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

      if(!topicsQueuedForDeletion.isEmpty)
        info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

      topicsQueuedForDeletion.foreach { topic =>
        // if all replicas are marked as deleted successfully, then topic deletion is done
        if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
          // clear up all state for this topic from controller cache and zookeeper
          completeDeleteTopic(topic)
          info("Deletion of topic %s successfully completed".format(topic))
        } else {
          // at least one replica is in the deletion-started state
          if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
            // ignore since topic deletion is in progress
            val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
            val replicaIds = replicasInDeletionStartedState.map(_.replica)
            val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
            info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
              partitions.mkString(","), topic))
          } else {
            // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
            // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
            // or there is at least one failed replica (which means topic deletion should be retried).
            // In that case, check whether any replica is in ReplicaDeletionIneligible and, if so,
            // mark the topic for a deletion retry
            if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
              // mark topic for deletion retry
              markTopicForDeletionRetry(topic)
            }
          }
        }
        // Try delete topic if it is eligible for deletion.
        // i.e. the topic may be deleted and its deletion has not already started
        if(isTopicEligibleForDeletion(topic)) {
          info("Deletion of topic %s (re)started".format(topic))
          // topic deletion will be kicked off
          onTopicDeletion(Set(topic))
        } else if(isTopicIneligibleForDeletion(topic)) {
          info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
        }
      }
    }
  }
}
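The first half of the per-topic branch in doWork depends only on the states of the topic's replicas, so it can be summarised as a pure function. The sketch below uses hypothetical names (DoWorkSketch, Step, firstStep) and models only the four replica states that matter here; the follow-up check with isTopicEligibleForDeletion is not modelled.

```scala
object DoWorkSketch {
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // What doWork decides for one topic (hypothetical labels).
  sealed trait Step
  case object CompleteDeleteTopic extends Step
  case object WaitForInProgressReplicas extends Step
  case object MarkForDeletionRetry extends Step
  case object NothingToReset extends Step

  def firstStep(states: Seq[ReplicaState]): Step =
    if (states.forall(_ == ReplicaDeletionSuccessful)) CompleteDeleteTopic
    else if (states.contains(ReplicaDeletionStarted)) WaitForInProgressReplicas
    else if (states.contains(ReplicaDeletionIneligible)) MarkForDeletionRetry
    else NothingToReset

  def main(args: Array[String]): Unit = {
    println(firstStep(Seq(ReplicaDeletionSuccessful, ReplicaDeletionSuccessful)))
    println(firstStep(Seq(ReplicaDeletionSuccessful, ReplicaDeletionStarted)))
    println(firstStep(Seq(ReplicaDeletionSuccessful, ReplicaDeletionIneligible)))
    println(firstStep(Seq(OfflineReplica, OfflineReplica)))
  }
}
```

The order of the checks matters: a topic with one replica still in ReplicaDeletionStarted is left alone even if another replica has already failed, and the retry is only considered once nothing is in flight.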

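The completeDeleteTopic method

Once the topic has been deleted, completeDeleteTopic is called to do some cleanup, namely:
delete the ZooKeeper node /brokers/topics/< topic_name >
delete the ZooKeeper node /config/topics/< topic_name >
delete the ZooKeeper node /admin/delete_topics/< topic_name >
and remove the topic's information from memory.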

private def completeDeleteTopic(topic: String) {
  // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
  // firing before the new topic listener when a deleted topic gets auto created
  partitionStateMachine.deregisterPartitionChangeListener(topic)
  val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
  // controller will remove this replica from the state machine as well as its partition assignment cache
  replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
  val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
  // move respective partition to OfflinePartition and NonExistentPartition state
  partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
  partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
  topicsToBeDeleted -= topic
  partitionsToBeDeleted.retain(_.topic != topic)
  val zkUtils = controllerContext.zkUtils
  // delete the ZooKeeper node /brokers/topics/< topic_name >
  zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
  // delete the ZooKeeper node /config/topics/< topic_name >
  zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
  // delete the ZooKeeper node /admin/delete_topics/< topic_name >
  zkUtils.zkClient.delete(getDeleteTopicPath(topic))
  // finally, remove the topic's information from memory
  controllerContext.removeTopic(topic)
}
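The ZooKeeper side of this cleanup can be illustrated on an in-memory set of node paths. CompleteDeleteSketch is a hypothetical helper written for this article; it only mimics the effect of the two recursive deletes and the single delete above and does not use a real ZooKeeper client.

```scala
object CompleteDeleteSketch {
  // Removes a node and everything below it from an in-memory set of paths.
  def deleteRecursive(nodes: Set[String], path: String): Set[String] =
    nodes.filterNot(n => n == path || n.startsWith(path + "/"))

  // Applies the three deletions in the same order as completeDeleteTopic.
  def completeDelete(nodes: Set[String], topic: String): Set[String] = {
    val afterTopic = deleteRecursive(nodes, s"/brokers/topics/$topic")
    val afterConfig = deleteRecursive(afterTopic, s"/config/topics/$topic")
    afterConfig - s"/admin/delete_topics/$topic"
  }

  def main(args: Array[String]): Unit = {
    val nodes = Set(
      "/brokers/topics/test",
      "/brokers/topics/test/partitions/0",
      "/brokers/topics/test2",
      "/config/topics/test",
      "/admin/delete_topics/test")
    println(completeDelete(nodes, "test"))
  }
}
```

The deletion marker is removed last, so if the controller fails over part-way through, the marker is still there for the next controller to pick up.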

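The markTopicForDeletionRetry method

Resets the topic's replicas that are in ReplicaDeletionIneligible back to the OfflineReplica state so that their deletion can be retried.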

private def markTopicForDeletionRetry(topic: String) {
  // reset replica states from ReplicaDeletionIneligible to OfflineReplica
  val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
  info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
    .format(topic, failedReplicas.mkString(",")))
  controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
}

The onTopicDeletion method

onTopicDeletion eventually calls startReplicaDeletion to start deleting all partitions of the topic.

private def onTopicDeletion(topics: Set[String]) {
  info("Topic deletion callback for %s".format(topics.mkString(",")))
  // send update metadata so that brokers stop serving data for topics to be deleted
  val partitions = topics.flatMap(controllerContext.partitionsForTopic)
  // i.e. push the new metadata to every broker so it stops serving these partitions and data deletion can begin
  controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
  val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
  topics.foreach { topic =>
    onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
  }
}

private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
  info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
  val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
  startReplicaDeletion(replicasPerPartition)
}

private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
  replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
    var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
    val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
    val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
    // move dead replicas directly to failed state
    // (every replica on a dead broker is marked ReplicaDeletionIneligible, i.e. it cannot be deleted)
    replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
    // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
    // (every live replica is moved to OfflineReplica and its broker is told, so these brokers
    // no longer send fetch requests (FetchRequest) for this topic to the leader)
    replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
    debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
    // send the stop replica request to all these replicas; once it completes, deleteTopicStopReplicaCallback is called back
    controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
      new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
    if(deadReplicasForTopic.size > 0) {
      debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
      markTopicIneligibleForDeletion(Set(topic))
    }
  }
}
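The set arithmetic at the top of startReplicaDeletion decides which replicas are written off as dead and which are asked to delete their data. Below is a small sketch of that split for a single topic; Replica and split are hypothetical stand-ins for PartitionAndReplica and the controller context.

```scala
object ReplicaPartitionSketch {
  final case class Replica(topic: String, partition: Int, broker: Int)

  // Returns (dead, toDelete): replicas on dead brokers, and live replicas not yet deleted.
  def split(all: Set[Replica],
            liveBrokers: Set[Int],
            alreadyDeleted: Set[Replica]): (Set[Replica], Set[Replica]) = {
    val alive = all.filter(r => liveBrokers(r.broker))
    (all -- alive, alive -- alreadyDeleted)
  }

  def main(args: Array[String]): Unit = {
    // Two partitions replicated on brokers 1, 2 and 3, with broker 2 shut down.
    val all = (for (p <- 0 to 1; b <- 1 to 3) yield Replica("test", p, b)).toSet
    val (dead, toDelete) = split(all, liveBrokers = Set(1, 3), alreadyDeleted = Set.empty)
    println(s"dead replicas: ${dead.size}, replicas to delete: ${toDelete.size}")
  }
}
```

As soon as the dead set is non-empty, the method above also marks the whole topic as ineligible for deletion, so the topic cannot complete until the dead broker returns.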

// when topic deletion starts, a stop replica request is sent to the live brokers;
// this method is called back once that request completes
private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
  val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
  debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
  val responseMap = stopReplicaResponse.responses.asScala
  val partitionsInError =
    if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
    else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
  val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
  inLock(controllerContext.controllerLock) {
    // move all the failed replicas to ReplicaDeletionIneligible
    // (replicas that reported an error are taken out of the set of deletable replicas)
    failReplicaDeletion(replicasInError)
    if (replicasInError.size != responseMap.size) {
      // some replicas could have been successfully deleted
      val deletedReplicas = responseMap.keySet -- partitionsInError
      completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
    }
  }
}
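The way the callback separates failed partitions from successfully deleted ones can be shown with plain maps. In this sketch, error code 0 stands for "no error" (the log excerpt later in the article shows error_code=0 for successful responses), partitions are identified by an Int, and classify is a hypothetical helper rather than Kafka code.

```scala
object StopReplicaCallbackSketch {
  // Returns (failed, deleted) partition ids for one broker's stop replica response.
  def classify(topLevelError: Int, perPartition: Map[Int, Int]): (Set[Int], Set[Int]) = {
    val failed: Set[Int] =
      if (topLevelError != 0) perPartition.keySet // the whole response failed
      else perPartition.collect { case (p, err) if err != 0 => p }.toSet
    (failed, perPartition.keySet -- failed)
  }

  def main(args: Array[String]): Unit = {
    println(classify(0, Map(0 -> 0, 1 -> 5, 2 -> 0)))
    println(classify(3, Map(0 -> 0, 1 -> 0)))
  }
}
```

A top-level error fails every partition in the response regardless of the per-partition codes, which matches the first branch of partitionsInError above.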

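The topic deletion process in Kafka

Having gone through the source code, let us summarize how Kafka deletes a topic.

  1. After a Kafka broker is elected controller, it performs the following steps:
    1.1 It registers DeleteTopicsListener to watch for changes to the children of the ZooKeeper node /admin/delete_topics. The delete command simply creates a child node under that node, named after the topic to delete, which marks the topic as pending deletion.
    1.2 It creates a separate thread, DeleteTopicsThread, to carry out topic deletion.
  2. When DeleteTopicsThread starts, it first blocks in awaitTopicDeletionNotification, waiting for a deletion notification, that is, for a new topic to be added to the queue of topics to delete.
  3. We run the delete command, which creates the child node < topic_name > under the ZooKeeper node /admin/delete_topics.
  4. DeleteTopicsListener receives the ChildChange event and goes through the following checks in order:
    4.1 It checks whether the topic exists. If it no longer exists, the node /admin/delete_topics/< topic_name > is deleted right away.
    4.2 It checks whether the topic is currently undergoing a preferred replica election or a partition reassignment. If so, the topic is marked as temporarily ineligible for deletion.
    4.3 It adds the topic to the queue. This wakes up the thread blocked in awaitTopicDeletionNotification inside DeleteTopicsThread's doWork method, so the deletion thread can continue.

The deletion thread's actual logic is then:

  1. It first sends updated metadata to every broker so that the brokers stop serving data for the topic and data deletion can begin.
  2. It starts deleting all partitions of the topic:
    2.1 It sends a request to all brokers telling them that these partitions are going to be deleted. Once a broker receives it, the broker no longer accepts any client requests on these partitions.
    2.2 It moves all replicas of every partition to the OfflineReplica state. The ISR keeps shrinking, and when the leader replica is finally moved to OfflineReplica as well, the leader is updated to -1.
    2.3 It moves all replicas to the ReplicaDeletionStarted state.
    2.4 The replica state machine picks up the state change and sends a StopReplicaRequest to the brokers. On receiving it, a broker stops all fetcher threads, clears its caches, and then deletes the underlying log files.
    2.5 It shuts down all idle fetcher threads.
  3. It deletes the ZooKeeper node /brokers/topics/< topic_name >
  4. It deletes the ZooKeeper node /config/topics/< topic_name >
  5. It deletes the ZooKeeper node /admin/delete_topics/< topic_name >
  6. It removes the topic's information from memory.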
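The replica state changes that the code above drives for a replica on a live broker can be collected into a small transition table. This sketch lists only the transitions visible in this article; it is not Kafka's complete ReplicaStateMachine, and the object and method names are hypothetical.

```scala
object ReplicaLifecycleSketch {
  // Transitions read off the code in this article for a replica on a live broker.
  val next: Map[String, Set[String]] = Map(
    "OnlineReplica" -> Set("OfflineReplica"),
    "OfflineReplica" -> Set("ReplicaDeletionStarted"),
    "ReplicaDeletionStarted" -> Set("ReplicaDeletionSuccessful", "ReplicaDeletionIneligible"),
    "ReplicaDeletionIneligible" -> Set("OfflineReplica"), // markTopicForDeletionRetry
    "ReplicaDeletionSuccessful" -> Set("NonExistentReplica")) // completeDeleteTopic

  // True if every consecutive pair in the path is an allowed transition.
  def isValidPath(path: Seq[String]): Boolean =
    path.zip(path.drop(1)).forall { case (from, to) =>
      next.getOrElse(from, Set.empty[String]).contains(to)
    }

  def main(args: Array[String]): Unit = {
    val happyPath = Seq("OnlineReplica", "OfflineReplica", "ReplicaDeletionStarted",
      "ReplicaDeletionSuccessful", "NonExistentReplica")
    val retryPath = Seq("ReplicaDeletionStarted", "ReplicaDeletionIneligible",
      "OfflineReplica", "ReplicaDeletionStarted")
    println(isValidPath(happyPath))
    println(isValidPath(retryPath))
  }
}
```

Both the happy path and the retry loop pass through OfflineReplica before ReplicaDeletionStarted, which is why markTopicForDeletionRetry resets failed replicas to OfflineReplica rather than restarting deletion directly.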

Flowchart of topic deletion in Kafka

[Figure: flowchart of topic deletion in Kafka]

Q&A

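So far we have analyzed the source code for topic deletion in Kafka and summarized the deletion process. Let us now look at a few related questions to deepen our understanding of it.

Q1: Can a topic still be deleted normally when some of its partition replicas are down?

Edit server.properties on each of the three brokers and set delete.topic.enable=true.
Start ZooKeeper and the three brokers (broker1, broker2, broker3), and also start kafka-manager, where:
ZooKeeper listens on port 2181,
broker1 listens on port 9091, with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_1
broker2 listens on port 9092, with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_2
broker3 listens on port 9093, with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_3
kafka-manager listens on port 9000; open http://localhost:9000 to inspect the Kafka cluster.

Start the experiment by creating the topic test:
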
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic test

Then write a few test messages:

.\kafka-console-producer.bat --broker-list localhost:9092 --topic test
111
222
333
444
555
666

Check the partitions path in ZooKeeper:

ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4, 5]

Shut down broker2 and run the delete command for the topic:

.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test

Check the path /admin/delete_topics in ZooKeeper:

[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[test]

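A few seconds later, only broker2's log directory still contains the folders for topic test listed below; the test logs under the log directories of broker1 and broker3 have already been deleted.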
test-0,test-1,test-2,test-3,test-4,test-5

Check controller.log on broker1:

[2017-12-06 12:14:39,181] DEBUG [DeleteTopicsListener on 1]: Delete topics listener fired for topics test to be deleted (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-12-06 12:14:39,182] INFO [DeleteTopicsListener on 1]: Starting topic deletion for topics test (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-12-06 12:14:39,184] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,186] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> OnlineReplica, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=3] -> OnlineReplica, [Topic=test,Partition=3,Replica=3] -> OnlineReplica, [Topic=test,Partition=5,Replica=1] -> OnlineReplica, [Topic=test,Partition=0,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=1] -> OnlineReplica, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> OnlineReplica, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,187] INFO [delete-topics-thread-1], Deletion of topic test (re)started (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,188] INFO [Topic Deletion Manager 1], Topic deletion callback for test (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,191] INFO [Topic Deletion Manager 1], Partition deletion callback for [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,194] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionIneligible for replicas [Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,195] INFO [Replica state machine on controller 1]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,195] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,5]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,200] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":1,"leader_epoch":5,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,200] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,2]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,204] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,204] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,3]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,207] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,208] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,4]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,211] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,211] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,1]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,215] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,215] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,2]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,219] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,219] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,0]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,223] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":3,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,223] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,3]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,227] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,227] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,5]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,230] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,231] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,0]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,235] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":4,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,235] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,4]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,239] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,240] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,1]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,243] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 1 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 3 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 3 is [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,245] DEBUG [Topic Deletion Manager 1], Deletion started for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,245] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionStarted for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,246] DEBUG The stop replica request (delete = true) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = false) sent to broker 1 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = true) sent to broker 3 is [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,247] DEBUG The stop replica request (delete = false) sent to broker 3 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,247] DEBUG [Topic Deletion Manager 1], Dead Replicas ([Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2]) found for topic test (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,248] INFO [Topic Deletion Manager 1], Halted deletion of topics test (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,248] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,275] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=2,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,275] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=5,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,279] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,279] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,279] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,279] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,281] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,281] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,282] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,282] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,282] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,282] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,283] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,283] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=4,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,295] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,296] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,296] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=3,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,296] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,297] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,297] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,297] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=3,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,297] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,298] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,298] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,298] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,298] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=1,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,313] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=1,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,313] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,313] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=5,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,313] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,314] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,314] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,314] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=5,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,314] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,315] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,315] INFO [delete-topics-thread-1], Deletion for replicas 3,1 for partition [test,4],[test,0],[test,3],[test,2],[test,1] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,315] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,315] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,329] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=2,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,329] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,329] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=2,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,330] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=0,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,0],[test,3],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,330] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,330] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,330] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=0,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,332] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,332] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,332] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3],[test,0],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,333] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,333] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,345] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=0,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,345] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,345] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=0,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,345] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,346] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,346] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3],[test,4] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,346] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,346] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,347] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=4,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,347] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,347] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=4,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,348] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Deletion for replicas 1,3 for partition [test,1],[test,3] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,348] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,348] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,360] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=3,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,361] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=3] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,361] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=3,Replica=3] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,361] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,361] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionStarted, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Delete topic callback invoked for {error_code=0,partitions=[{topic=test,partition=1,error_code=0}]} (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Deletion for replicas 1 for partition [test,1] of topic test in progress (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,362] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,362] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionSuccessful for replicas [Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,362] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=2,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=3] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=0,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=4,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> ReplicaDeletionSuccessful, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,362] INFO [delete-topics-thread-1], Not retrying deletion of topic test at this time since it is marked ineligible for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,363] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
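
The final "Are all replicas for topic test deleted" line above explains why the controller stops here: every replica on brokers 1 and 3 is in ReplicaDeletionSuccessful, but the six replicas on the offline broker 2 are still in OfflineReplica. The following is a minimal sketch of that check, assuming a simplified model of the replica states. The names `DeletionCheck`, `allReplicasDeleted`, `blockingReplicas` and `finalStates` are hypothetical and are not Kafka's own classes or methods.

```scala
// Simplified stand-ins for the replica states that appear in the controller log.
sealed trait ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState

// Mirrors the [Topic=...,Partition=...,Replica=...] triple printed in the log.
final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

object DeletionCheck {
  // Assumption: the topic counts as deleted only when every replica reports success.
  def allReplicasDeleted(states: Map[PartitionAndReplica, ReplicaState]): Boolean =
    states.values.forall(_ == ReplicaDeletionSuccessful)

  // Replicas that still block completion, e.g. those hosted on an offline broker.
  def blockingReplicas(states: Map[PartitionAndReplica, ReplicaState]): Set[PartitionAndReplica] =
    states.collect { case (replica, state) if state != ReplicaDeletionSuccessful => replica }.toSet

  // State map rebuilt by hand from the last DEBUG line of the log:
  // 6 partitions x 3 replicas, with broker 2 offline.
  val finalStates: Map[PartitionAndReplica, ReplicaState] =
    (for (p <- 0 to 5; r <- 1 to 3) yield {
      val state: ReplicaState = if (r == 2) OfflineReplica else ReplicaDeletionSuccessful
      PartitionAndReplica("test", p, r) -> state
    }).toMap

  def main(args: Array[String]): Unit = {
    println(s"all replicas deleted: ${allReplicasDeleted(finalStates)}")
    println(s"brokers still blocking: ${blockingReplicas(finalStates).map(_.replica)}")
  }
}
```

In this model the topic can only be considered fully deleted once broker 2 comes back and its replicas also reach ReplicaDeletionSuccessful.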

The log shows that all partition replicas of topic test on broker1 and broker3 have been deleted:

Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=0,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=1]
Deletion successfully completed for replicas [Topic=test,Partition=3,Replica=3]
Deletion successfully completed for replicas [Topic=test,Partition=1,Replica=1]
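
The summary above was picked out of controller.log by hand. As a rough sketch, the same extraction could be scripted as follows, assuming the log message format shown in this post. `CompletedReplicas`, `byBroker` and `sample` are hypothetical names introduced only for illustration.

```scala
import scala.util.matching.Regex

object CompletedReplicas {
  // Phrase that marks a completed replica deletion in the controller log.
  private val Marker = "Deletion successfully completed for replicas"

  // Matches one [Topic=...,Partition=...,Replica=...] triple.
  private val Triple: Regex =
    """\[Topic=([^,\]]+),Partition=(\d+),Replica=(\d+)\]""".r

  // Groups completed deletions by broker id; partitions are sorted per broker.
  def byBroker(lines: Iterable[String], topic: String): Map[Int, List[Int]] =
    lines.toList
      .filter(_.contains(Marker))
      .flatMap(line => Triple.findAllMatchIn(line))
      .collect { case m if m.group(1) == topic => (m.group(3).toInt, m.group(2).toInt) }
      .groupBy(_._1)
      .map { case (broker, pairs) => broker -> pairs.map(_._2).sorted }

  // A few lines copied from the controller log in this post.
  val sample: List[String] = List(
    "[2017-12-06 12:14:39,279] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=5,Replica=3] (kafka.controller.TopicDeletionManager)",
    "[2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=2,Replica=1] (kafka.controller.TopicDeletionManager)",
    "[2017-12-06 12:14:39,281] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)",
    "[2017-12-06 12:14:39,295] DEBUG [Topic Deletion Manager 1], Deletion successfully completed for replicas [Topic=test,Partition=4,Replica=3] (kafka.controller.TopicDeletionManager)"
  )

  def main(args: Array[String]): Unit =
    byBroker(sample, "test").toList.sortBy(_._1).foreach { case (broker, partitions) =>
      println(s"broker $broker: partitions ${partitions.mkString(",")}")
    }
}
```

Run against the full controller.log, a helper like this should list partitions 0 to 5 for brokers 1 and 3 and nothing for broker 2, which matches the twelve lines above.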

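Looking at the server.log of broker1 and broker3, we can see that the data files of topic test have been removed successfully (the excerpt below is from broker 3):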

[2017-12-06 12:14:39,245] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,248] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,249] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,250] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,251] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,252] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,253] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,268] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-5\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,272] INFO Deleted log for partition [test,5] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-5. (kafka.log.LogManager)
[2017-12-06 12:14:39,279] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,290] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-4\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,294] INFO Deleted log for partition [test,4] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-4. (kafka.log.LogManager)
[2017-12-06 12:14:39,296] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,309] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-1\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,312] INFO Deleted log for partition [test,1] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-1. (kafka.log.LogManager)
[2017-12-06 12:14:39,313] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,324] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-2\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,328] INFO Deleted log for partition [test,2] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-2. (kafka.log.LogManager)
[2017-12-06 12:14:39,330] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,341] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-0\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,344] INFO Deleted log for partition [test,0] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-0. (kafka.log.LogManager)
[2017-12-06 12:14:39,346] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:14:39,356] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-3\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:14:39,360] INFO Deleted log for partition [test,3] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_3\test-3. (kafka.log.LogManager)

Next, start broker2.
Looking at broker2's server.log, we can see that the data files of topic test have been removed successfully:

[2017-12-06 12:24:05,687] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,691] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,695] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,700] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,706] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,711] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,715] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,1] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,742] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-1\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,747] INFO Deleted log for partition [test,1] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-1. (kafka.log.LogManager)
[2017-12-06 12:24:05,750] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,4] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,764] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-4\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,769] INFO Deleted log for partition [test,4] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-4. (kafka.log.LogManager)
[2017-12-06 12:24:05,772] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,2] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,785] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-2\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,789] INFO Deleted log for partition [test,2] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-2. (kafka.log.LogManager)
[2017-12-06 12:24:05,791] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,803] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-0\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,805] INFO Deleted log for partition [test,0] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-0. (kafka.log.LogManager)
[2017-12-06 12:24:05,808] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,3] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,818] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-3\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,821] INFO Deleted log for partition [test,3] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-3. (kafka.log.LogManager)
[2017-12-06 12:24:05,823] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test,5] (kafka.server.ReplicaFetcherManager)
[2017-12-06 12:24:05,833] INFO Deleting index D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-5\00000000000000000000.index (kafka.log.OffsetIndex)
[2017-12-06 12:24:05,836] INFO Deleted log for partition [test,5] in D:\Workspaces\git\others\kafka\kafkaconifg\broker_2\test-5. (kafka.log.LogManager)

Finally, check the path /admin/delete_topics in zookeeper. The test child node is gone:

[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[]

Also check the path /brokers/topics/test/partitions in zookeeper. It no longer exists:

[zk: localhost:2181(CONNECTED) 26] ls /brokers/topics/test/partitions
Node does not exist: /brokers/topics/test/partitions

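Q2: Do we really need to set delete.topic.enable to true on every broker?

In Q1, the topic is still deleted normally even if delete.topic.enable is set to true only on broker1, as long as broker1 is the controller of the whole cluster. To guarantee that, start broker1 first, then broker2 and broker3.
In production, deletion is usually disabled to guard against accidental deletes, and unused topics are cleaned up periodically. For such a cleanup it is enough to change the configuration of a single broker and make sure that this broker is the cluster's controller.
Unfortunately, changing broker1's configuration means restarting it, and during the restart another broker becomes the new controller. The change on broker1 then has no effect, because broker1 is no longer the controller. The fix is to shut down broker2 and broker3 in turn after broker1 has come back up, so that broker1 regains the controller role. That way there is no need to edit the configuration of every broker one by one.
After the topics have been cleaned up, remember to set delete.topic.enable back to false.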
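
The reasoning in Q2 follows from the registerListeners code shown earlier: the listener on /admin/delete_topics is registered only when the controller's own deleteTopicEnable is true. Below is a minimal toy model of that rule, not Kafka code. The names Broker, ControllerDeleteCheck and deletionWillRun are hypothetical and exist only for this illustration.

```scala
// Toy model, not Kafka source: it only captures the rule that the
// controller's own delete.topic.enable decides whether a marked topic
// is actually processed. All names here are hypothetical.
final case class Broker(id: Int, deleteTopicEnable: Boolean)

object ControllerDeleteCheck {
  /** True when the broker currently acting as controller has deletion enabled. */
  def deletionWillRun(brokers: Seq[Broker], controllerId: Int): Boolean =
    brokers.find(_.id == controllerId).exists(_.deleteTopicEnable)

  def main(args: Array[String]): Unit = {
    // Only broker1 has delete.topic.enable=true, as in the Q2 scenario.
    val cluster = Seq(
      Broker(1, deleteTopicEnable = true),
      Broker(2, deleteTopicEnable = false),
      Broker(3, deleteTopicEnable = false)
    )
    println(deletionWillRun(cluster, controllerId = 1)) // prints true
    println(deletionWillRun(cluster, controllerId = 2)) // prints false
  }
}
```

The second call corresponds to the situation right after broker1 is restarted: another broker has taken over as controller, so the setting on broker1 is ignored until broker1 becomes controller again. In a ZooKeeper-based cluster, the id of the current controller is recorded in the /controller node, which is one way to confirm which broker holds the role.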

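Q3: What happens if a producer is still connected to the topic when it is deleted, and auto.create.topics.enable=true?

While working through the Q1 example, I started a client that produced messages to topic test and then forgot to stop it. Since auto.create.topics.enable is on by default, every time I tried to delete the topic I found that test-0 on broker1 could not be removed.
I later realized that this was because the client was still connected to the topic: right after the deletion succeeded, the topic was created again automatically, with the defaults of 1 replica and 1 partition.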
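
The effect described in Q3 can be reproduced with a small toy model. This is a sketch, not Kafka code: ToyCluster, TopicSpec and producerSend are hypothetical names, the constructor defaults stand in for the broker settings num.partitions and default.replication.factor, and the replication factor of 3 for the original topic is an assumption made for the example.

```scala
import scala.collection.mutable

// Toy model, not Kafka source. All names are hypothetical.
final case class TopicSpec(partitions: Int, replicationFactor: Int)

final class ToyCluster(autoCreateTopicsEnable: Boolean,
                       numPartitions: Int = 1,
                       defaultReplicationFactor: Int = 1) {
  private val topics = mutable.Map.empty[String, TopicSpec]

  def createTopic(name: String, spec: TopicSpec): Unit = topics.update(name, spec)

  def deleteTopic(name: String): Unit = { topics.remove(name); () }

  def describe(name: String): Option[TopicSpec] = topics.get(name)

  /** A producer that writes to a missing topic triggers auto-creation
    * with the broker defaults when auto-creation is enabled.
    * Returns whether the topic exists after the call. */
  def producerSend(name: String): Boolean = {
    if (!topics.contains(name) && autoCreateTopicsEnable)
      topics.update(name, TopicSpec(numPartitions, defaultReplicationFactor))
    topics.contains(name)
  }
}

object AutoCreateDemo {
  def main(args: Array[String]): Unit = {
    val cluster = new ToyCluster(autoCreateTopicsEnable = true)
    cluster.createTopic("test", TopicSpec(partitions = 6, replicationFactor = 3))
    cluster.deleteTopic("test")      // the deletion itself succeeds
    cluster.producerSend("test")     // the forgotten producer keeps sending
    println(cluster.describe("test")) // prints Some(TopicSpec(1,1))
  }
}
```

With autoCreateTopicsEnable = false the same sequence leaves the topic deleted, which is why stopping the producers, or disabling auto-creation, before the delete avoids the leftover test-0.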

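Q4: How to delete a topic manually, without going through Kafka

Taking topic test as an example, the steps are:

  1. In the Kafka storage directory (the log.dirs setting in server.properties, "/tmp/kafka-logs" by default), delete the directories that belong to the topic, such as test-0, test-1 and so on.
  2. Delete the topic's nodes in zookeeper:
    rmr /brokers/topics/test
    rmr /config/topics/test
    rmr /admin/delete_topics/test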
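
The two steps above can be prepared with a small helper. This is a sketch under stated assumptions: TopicCleanup and its methods are hypothetical names, partition directories are assumed to be named <topic>-<partition number> as in the logs above, and the helper only lists what would be removed rather than deleting anything.

```scala
import java.io.File
import java.util.regex.Pattern

// Hypothetical helper for the manual procedure above. It lists, it does not delete.
object TopicCleanup {
  /** True if dirName is a partition directory of exactly this topic, e.g. "test-0".
    * The topic name is quoted so that characters like '.' are matched literally. */
  def isPartitionDirOf(topic: String, dirName: String): Boolean =
    dirName.matches(Pattern.quote(topic) + "-\\d+")

  /** Partition directories of the topic directly under one log.dirs entry. */
  def partitionDirs(logDir: File, topic: String): List[File] =
    Option(logDir.listFiles()).map(_.toList).getOrElse(Nil)
      .filter(f => f.isDirectory && isPartitionDirOf(topic, f.getName))
      .sortBy(_.getName)

  /** The zookeeper shell commands from step 2, for the given topic. */
  def zkCleanupCommands(topic: String): List[String] = List(
    s"rmr /brokers/topics/$topic",
    s"rmr /config/topics/$topic",
    s"rmr /admin/delete_topics/$topic"
  )

  def main(args: Array[String]): Unit = {
    val logDir = new File(args.headOption.getOrElse("/tmp/kafka-logs"))
    partitionDirs(logDir, "test").foreach(d => println(s"would delete: ${d.getPath}"))
    zkCleanupCommands("test").foreach(println)
  }
}
```

Matching on the full pattern instead of a plain prefix matters when topic names share a prefix: for topic test, a directory named test-foo-0 belongs to a different topic and is left alone.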

[References]

http://www.cnblogs.com/huxi2b/p/4842695.html
