Skip to content

Commit 32b1b3e

Browse files
hzxa21xiowu0
authored andcommitted
[LI-HOTFIX] Add topic deletion hook in zookeeper to trigger in-memory topic deletion flag changes:
TICKET = LI_DESCRIPTION = - Add TopicDeletionFlagPath `/topic_deletion_flag` to trigger changes in topic deletion enable flag in TopicDeletionManager. - Controller will listen on this path when active and un-listen when resignation. - If the znode is deleted, the topic deletion flag will be set according to the config. RB=1306354 BUG=LIKAFKA-17224 G=Kafka-Code-Reviews R=luwang A=luwang EXIT_CRITERIA = MANUAL ["After upstream makes delete.topic.enable a dynamic broker config"]
1 parent 320b70a commit 32b1b3e

File tree

6 files changed

+178
-5
lines changed

6 files changed

+178
-5
lines changed

core/src/main/scala/kafka/controller/ControllerState.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@ object ControllerState {
9898
def value = 14
9999
}
100100

101+
case object TopicDeletionFlagChange extends ControllerState {
102+
def value = 15
103+
}
104+
101105
val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
102106
PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
103-
LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable)
107+
LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable, TopicDeletionFlagChange)
104108
}

core/src/main/scala/kafka/controller/KafkaController.scala

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class KafkaController(val config: KafkaConfig,
101101
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
102102
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
103103
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)
104+
private val topicDeletionFlagHandler = new TopicDeletionFlagHandler(this, eventManager)
104105

105106
@volatile private var activeControllerId = -1
106107
@volatile private var offlinePartitionCount = 0
@@ -244,7 +245,7 @@ class KafkaController(val config: KafkaConfig,
244245
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
245246
isrChangeNotificationHandler)
246247
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
247-
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
248+
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler, topicDeletionFlagHandler)
248249
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
249250

250251
info("Deleting log dir event notifications")
@@ -325,6 +326,7 @@ class KafkaController(val config: KafkaConfig,
325326
zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
326327
unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
327328
zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
329+
zkClient.unregisterZNodeChangeHandler(topicDeletionFlagHandler.path)
328330
// shutdown replica state machine
329331
replicaStateMachine.shutdown()
330332
zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
@@ -1378,7 +1380,7 @@ class KafkaController(val config: KafkaConfig,
13781380
zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
13791381
}
13801382
topicsToBeDeleted --= nonExistentTopics
1381-
if (config.deleteTopicEnable) {
1383+
if (topicDeletionManager.isDeleteTopicEnabled) {
13821384
if (topicsToBeDeleted.nonEmpty) {
13831385
info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
13841386
// mark topic ineligible for deletion if other state changes are in progress
@@ -1399,6 +1401,24 @@ class KafkaController(val config: KafkaConfig,
13991401
}
14001402
}
14011403

1404+
private def processTopicDeletionFlagChange(reset: Boolean = false): Unit = {
1405+
info("Process TopicDeletionFlagChange event")
1406+
if (!isActive) return
1407+
if (reset)
1408+
topicDeletionManager.resetDeleteTopicEnabled()
1409+
else {
1410+
val topicDeletionFlag = zkClient.getTopicDeletionFlag
1411+
if (!topicDeletionFlag.equalsIgnoreCase("true") && !topicDeletionFlag.equalsIgnoreCase("false")) {
1412+
info(s"Overwrite ${DeleteTopicFlagZNode.path} to ${topicDeletionManager.isDeleteTopicEnabled}")
1413+
zkClient.setTopicDeletionFlag(topicDeletionManager.isDeleteTopicEnabled.toString)
1414+
}
1415+
else {
1416+
info(s"Set isDeleteTopicEnabled flag to $topicDeletionFlag")
1417+
topicDeletionManager.isDeleteTopicEnabled = topicDeletionFlag.toBoolean
1418+
}
1419+
}
1420+
}
1421+
14021422
private def processPartitionReassignment(): Unit = {
14031423
if (!isActive) return
14041424

@@ -1594,6 +1614,8 @@ class KafkaController(val config: KafkaConfig,
15941614
processPartitionModifications(topic)
15951615
case TopicDeletion =>
15961616
processTopicDeletion()
1617+
case TopicDeletionFlagChange(reset) =>
1618+
processTopicDeletionFlagChange(reset)
15971619
case PartitionReassignment =>
15981620
processPartitionReassignment()
15991621
case PartitionReassignmentIsrChange(partition) =>
@@ -1668,6 +1690,19 @@ class TopicDeletionHandler(eventManager: ControllerEventManager) extends ZNodeCh
16681690

16691691
override def handleChildChange(): Unit = eventManager.put(TopicDeletion)
16701692
}
1693+
/**
1694+
* Listener for /topic_deletion_flag znode.
1695+
* If the data of the znode is set to true/false, it will trigger the in memory isDeleteTopicEnabled to be set accordingly.
1696+
* If the znode data cannot be converted to boolean, it will overwrite znode with the previous valid value.
1697+
* If the znode path is deleted, it will reset the in memory isDeleteTopicEnabled to the config value.
1698+
*/
1699+
class TopicDeletionFlagHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
1700+
override val path: String = DeleteTopicFlagZNode.path
1701+
1702+
override def handleDataChange(): Unit = eventManager.put(TopicDeletionFlagChange())
1703+
1704+
override def handleDeletion(): Unit = eventManager.put(TopicDeletionFlagChange(true))
1705+
}
16711706

16721707
class PartitionReassignmentHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
16731708
override val path: String = ReassignPartitionsZNode.path
@@ -1840,6 +1875,10 @@ case object IsrChangeNotification extends ControllerEvent {
18401875
override def state: ControllerState = ControllerState.IsrChange
18411876
}
18421877

1878+
case class TopicDeletionFlagChange(reset: Boolean = false) extends ControllerEvent {
1879+
def state = ControllerState.TopicDeletionFlagChange
1880+
}
1881+
18431882
case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
18441883
electionType: ElectionType = ZkTriggered,
18451884
callback: ElectPreferredLeadersCallback = (_,_) => {}) extends ControllerEvent {

core/src/main/scala/kafka/controller/TopicDeletionManager.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import kafka.server.KafkaConfig
2020
import kafka.utils.Logging
2121
import kafka.zk.KafkaZkClient
2222
import org.apache.kafka.common.TopicPartition
23+
import org.apache.zookeeper.KeeperException.{NoNodeException, NodeExistsException}
2324

2425
import scala.collection.Set
2526

@@ -28,6 +29,8 @@ trait DeletionClient {
2829
def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit
2930
def mutePartitionModifications(topic: String): Unit
3031
def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit
32+
def createDeleteTopicFlagPath(): Unit
33+
def getTopicDeletionFlag(): String
3134
}
3235

3336
class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkClient) extends DeletionClient {
@@ -48,6 +51,14 @@ class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkCli
4851
override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
4952
controller.sendUpdateMetadataRequest(controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
5053
}
54+
55+
override def createDeleteTopicFlagPath(): Unit = {
56+
zkClient.createDeleteTopicFlagPath
57+
}
58+
59+
override def getTopicDeletionFlag(): String = {
60+
zkClient.getTopicDeletionFlag
61+
}
5162
}
5263

5364
/**
@@ -89,12 +100,21 @@ class TopicDeletionManager(config: KafkaConfig,
89100
partitionStateMachine: PartitionStateMachine,
90101
client: DeletionClient) extends Logging {
91102
this.logIdent = s"[Topic Deletion Manager ${config.brokerId}] "
92-
val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
103+
var isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
104+
105+
// Try to create the znode for delete topic flag
106+
try {
107+
client.createDeleteTopicFlagPath()
108+
} catch {
109+
case _: NodeExistsException =>
110+
}
93111

94112
def init(initialTopicsToBeDeleted: Set[String], initialTopicsIneligibleForDeletion: Set[String]): Unit = {
95113
info(s"Initializing manager with initial deletions: $initialTopicsToBeDeleted, " +
96114
s"initial ineligible deletions: $initialTopicsIneligibleForDeletion")
97115

116+
isDeleteTopicEnabled = getDeleteTopicEnabled()
117+
98118
if (isDeleteTopicEnabled) {
99119
controllerContext.queueTopicDeletion(initialTopicsToBeDeleted)
100120
controllerContext.topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & controllerContext.topicsToBeDeleted
@@ -355,4 +375,20 @@ class TopicDeletionManager(config: KafkaConfig,
355375
}
356376
}
357377
}
378+
379+
private def getDeleteTopicEnabled(): Boolean = {
380+
try {
381+
val deleteTopicFlag = client.getTopicDeletionFlag
382+
if (deleteTopicFlag == null || (!deleteTopicFlag.equalsIgnoreCase("true") && !deleteTopicFlag.equalsIgnoreCase("false")))
383+
isDeleteTopicEnabled
384+
else deleteTopicFlag.toBoolean
385+
} catch {
386+
case _: NoNodeException => config.deleteTopicEnable
387+
}
388+
}
389+
390+
def resetDeleteTopicEnabled(): Unit = {
391+
info("Reset isDeleteTopicEnabled flag to %s".format(config.deleteTopicEnable))
392+
isDeleteTopicEnabled = config.deleteTopicEnable
393+
}
358394
}

core/src/main/scala/kafka/zk/KafkaZkClient.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,37 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
773773
}
774774
}
775775

776+
/**
777+
* Creates the delete topic flag znode.
778+
* @throws KeeperException if there is an error while setting or creating the znode
779+
*/
780+
def createDeleteTopicFlagPath(): Unit = {
781+
createRecursive(DeleteTopicFlagZNode.path)
782+
}
783+
784+
/**
785+
* Get topic deletion flag in zookeeper.
786+
* @return topic deletion flag in zookeeper.
787+
*/
788+
def getTopicDeletionFlag: String = {
789+
val getDataResponse = retryRequestUntilConnected(GetDataRequest(DeleteTopicFlagZNode.path))
790+
getDataResponse.resultCode match {
791+
case Code.OK => DeleteTopicFlagZNode.decode(getDataResponse.data)
792+
case _ => throw getDataResponse.resultException.get
793+
}
794+
}
795+
796+
/**
797+
* Set topic deletion flag in zookeeper.
798+
*/
799+
def setTopicDeletionFlag(flag: String): Unit = {
800+
val setDataResponse = retryRequestUntilConnected(SetDataRequest(DeleteTopicFlagZNode.path, DeleteTopicFlagZNode.encode(flag), -1))
801+
setDataResponse.resultCode match {
802+
case Code.OK =>
803+
case _ => throw setDataResponse.resultException.get
804+
}
805+
}
806+
776807
/**
777808
* Remove the given topics from the topics marked for deletion.
778809
* @param topics the topics to remove.

core/src/main/scala/kafka/zk/ZkData.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,12 @@ object DeleteTopicsTopicZNode {
373373
def path(topic: String) = s"${DeleteTopicsZNode.path}/$topic"
374374
}
375375

376+
object DeleteTopicFlagZNode {
377+
def path = "/topic_deletion_flag"
378+
def encode(topicDeletionFlag: String): Array[Byte] = topicDeletionFlag.getBytes(UTF_8)
379+
def decode(bytes: Array[Byte]): String = if (bytes != null) new String(bytes, UTF_8) else ""
380+
}
381+
376382
object ReassignPartitionsZNode {
377383

378384
/**

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package kafka.admin
1818

1919
import kafka.log.Log
20-
import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
20+
import kafka.zk.{DeleteTopicFlagZNode, TopicPartitionZNode, TopicZNode, ZooKeeperTestHarness}
2121
import kafka.utils.TestUtils
2222
import kafka.server.{KafkaConfig, KafkaServer}
2323
import org.junit.Assert._
@@ -411,6 +411,63 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
411411
assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
412412
}
413413

414+
@Test
415+
def testDeleteTopicAfterEnableZkDeleteTopicFlag() {
416+
val topicPartition = new TopicPartition("test", 0)
417+
val topic = topicPartition.topic
418+
servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
419+
// mark the topic for deletion
420+
adminZkClient.deleteTopic("test")
421+
TestUtils.waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
422+
"Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic))
423+
// verify that topic test is untouched
424+
assertTrue(servers.forall(_.getLogManager().getLog(topicPartition).isDefined))
425+
// test the topic path exists
426+
assertTrue("Topic path disappeared even when topic deletion is disabled", zkClient.pathExists(TopicZNode.path(topic)))
427+
// topic test should have a leader
428+
val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
429+
assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
430+
431+
// Set TopicDeletionFlag to true in zk and try delete topic again
432+
zkClient.setTopicDeletionFlag("true")
433+
TestUtils.waitUntilTrue(() =>
434+
try {
435+
zkClient.getTopicDeletionFlag.equalsIgnoreCase("true")
436+
} catch {
437+
case _: Throwable => false
438+
},
439+
"TopicDeletionFlag is not set")
440+
TestUtils.waitUntilTrue( () => getController()._1.kafkaController.topicDeletionManager.isDeleteTopicEnabled,
441+
"Delete topic is not enabled")
442+
// mark the topic for deletion
443+
adminZkClient.deleteTopic("test")
444+
TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
445+
446+
// Set TopicDeletionFlag to invalid value in zk
447+
zkClient.setTopicDeletionFlag("flase")
448+
TestUtils.waitUntilTrue(() =>
449+
try {
450+
zkClient.getTopicDeletionFlag.equalsIgnoreCase("true")
451+
} catch {
452+
case _: Throwable => false
453+
},
454+
"TopicDeletionFlag is not overwritten")
455+
456+
// delete TopicDeletionFlagPath in zk
457+
zkClient.deletePath(DeleteTopicFlagZNode.path)
458+
TestUtils.waitUntilTrue(() =>
459+
try {
460+
!zkClient.pathExists(DeleteTopicFlagZNode.path)
461+
} catch {
462+
case _: Throwable => false
463+
},
464+
"TopicDeletionFlagPath is not deleted")
465+
TestUtils.waitUntilTrue(() =>
466+
getController()._1.kafkaController.topicDeletionManager.isDeleteTopicEnabled == false,
467+
"Topic deletion flag is not rest"
468+
)
469+
}
470+
414471
@Test
415472
def testDeletingPartiallyDeletedTopic() {
416473
/**

0 commit comments

Comments
 (0)