Skip to content

Commit 5b99a6b

Browse files
hzxa21, xiowu0
authored and committed
[LI-HOTFIX] LIKAFKA-18349 - Reuse the same UpdateMetadataRequest object and struct to reduce controller memory usage
TICKET = KAFKA-7186 LI_DESCRIPTION = During controller failover and broker changes, the controller sends out an UpdateMetadataRequest to all brokers in the cluster containing the states for all partitions and live brokers. The current implementation will instantiate the UpdateMetadataRequest object and its serialized form (Struct) once per broker, which causes OOM if the memory used exceeds the configured JVM heap size before GC kicks in. We have seen this issue in the production environment multiple times. For example, if we have 100 brokers in the cluster and each broker is the leader of 2k partitions, the extra memory usage introduced by the controller trying to send out UpdateMetadataRequest is around: ``` <memory used by UpdateMetadataRequest Structs> * <# of brokers> * <total # of leader partitions> = 250B * 100 * 200k = 5GB ``` This patch avoids the recreation of UpdateMetadataRequest and struct objects if the same builder object is used to build the request. This can significantly reduce the memory footprint in the Controller based on the above calculation. RB=1364716 BUG=LIKAFKA-18349 G=Kafka-Code-Reviews R=luwang,okaraman,mgharat A=mgharat EXIT_CRITERIA = TICKET [KAFKA-7186]
1 parent 32b1b3e commit 5b99a6b

File tree

1 file changed

+78
-66
lines changed

1 file changed

+78
-66
lines changed

clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java

Lines changed: 78 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ public static class Builder extends AbstractControlRequest.Builder<UpdateMetadat
185185
private final Map<TopicPartition, PartitionState> partitionStates;
186186
private final Set<Broker> liveBrokers;
187187

188+
// LIKAFKA-18349 - Cache the UpdateMetadataRequest Objects to reduce memory usage
189+
private final Map<Short, UpdateMetadataRequest> requestCache = new HashMap<>();
190+
188191
public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
189192
Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) {
190193
super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch);
@@ -194,14 +197,20 @@ public Builder(short version, int controllerId, int controllerEpoch, long broker
194197

195198
@Override
196199
public UpdateMetadataRequest build(short version) {
197-
if (version == 0) {
198-
for (Broker broker : liveBrokers) {
199-
if (broker.endPoints.size() != 1 || broker.endPoints.get(0).securityProtocol != SecurityProtocol.PLAINTEXT) {
200-
throw new UnsupportedVersionException("UpdateMetadataRequest v0 only handles PLAINTEXT endpoints");
200+
UpdateMetadataRequest updateMetadataRequest = requestCache.get(version);
201+
if (updateMetadataRequest == null) {
202+
if (version == 0) {
203+
for (Broker broker : liveBrokers) {
204+
if (broker.endPoints.size() != 1 || broker.endPoints.get(0).securityProtocol != SecurityProtocol.PLAINTEXT) {
205+
throw new UnsupportedVersionException(
206+
"UpdateMetadataRequest v0 only handles PLAINTEXT endpoints");
207+
}
201208
}
202209
}
210+
updateMetadataRequest = new UpdateMetadataRequest(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, liveBrokers);
211+
requestCache.put(version, updateMetadataRequest);
203212
}
204-
return new UpdateMetadataRequest(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, liveBrokers);
213+
return updateMetadataRequest;
205214
}
206215

207216
@Override
@@ -327,6 +336,9 @@ public String toString() {
327336
private final Map<TopicPartition, PartitionState> partitionStates;
328337
private final Set<Broker> liveBrokers;
329338

339+
// LIKAFKA-18349 - Cache the UpdateMetadataRequest struct to reduce memory usage
340+
private Struct struct = null;
341+
330342
private UpdateMetadataRequest(short version, int controllerId, int controllerEpoch, long brokerEpoch,
331343
Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) {
332344
super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch);
@@ -400,74 +412,74 @@ public UpdateMetadataRequest(Struct struct, short versionId) {
400412

401413
@Override
402414
protected Struct toStruct() {
403-
short version = version();
404-
Struct struct = new Struct(ApiKeys.UPDATE_METADATA.requestSchema(version));
405-
struct.set(CONTROLLER_ID, controllerId);
406-
struct.set(CONTROLLER_EPOCH, controllerEpoch);
407-
struct.setIfExists(BROKER_EPOCH, brokerEpoch);
408-
409-
if (struct.hasField(TOPIC_STATES)) {
410-
Map<String, Map<Integer, PartitionState>> topicStates = CollectionUtils.groupPartitionDataByTopic(partitionStates);
411-
List<Struct> topicStatesData = new ArrayList<>(topicStates.size());
412-
for (Map.Entry<String, Map<Integer, PartitionState>> entry : topicStates.entrySet()) {
413-
Struct topicStateData = struct.instance(TOPIC_STATES);
414-
topicStateData.set(TOPIC_NAME, entry.getKey());
415-
Map<Integer, PartitionState> partitionMap = entry.getValue();
416-
List<Struct> partitionStatesData = new ArrayList<>(partitionMap.size());
417-
for (Map.Entry<Integer, PartitionState> partitionEntry : partitionMap.entrySet()) {
418-
Struct partitionStateData = topicStateData.instance(PARTITION_STATES);
419-
partitionStateData.set(PARTITION_ID, partitionEntry.getKey());
420-
partitionEntry.getValue().setStruct(partitionStateData);
415+
if (struct == null) {
416+
short version = version();
417+
Struct struct = new Struct(ApiKeys.UPDATE_METADATA.requestSchema(version));
418+
struct.set(CONTROLLER_ID, controllerId);
419+
struct.set(CONTROLLER_EPOCH, controllerEpoch);
420+
struct.setIfExists(BROKER_EPOCH, brokerEpoch);
421+
422+
if (struct.hasField(TOPIC_STATES)) {
423+
Map<String, Map<Integer, PartitionState>> topicStates = CollectionUtils.groupPartitionDataByTopic(partitionStates);
424+
List<Struct> topicStatesData = new ArrayList<>(topicStates.size());
425+
for (Map.Entry<String, Map<Integer, PartitionState>> entry : topicStates.entrySet()) {
426+
Struct topicStateData = struct.instance(TOPIC_STATES);
427+
topicStateData.set(TOPIC_NAME, entry.getKey());
428+
Map<Integer, PartitionState> partitionMap = entry.getValue();
429+
List<Struct> partitionStatesData = new ArrayList<>(partitionMap.size());
430+
for (Map.Entry<Integer, PartitionState> partitionEntry : partitionMap.entrySet()) {
431+
Struct partitionStateData = topicStateData.instance(PARTITION_STATES);
432+
partitionStateData.set(PARTITION_ID, partitionEntry.getKey());
433+
partitionEntry.getValue().setStruct(partitionStateData);
434+
partitionStatesData.add(partitionStateData);
435+
}
436+
topicStateData.set(PARTITION_STATES, partitionStatesData.toArray());
437+
topicStatesData.add(topicStateData);
438+
}
439+
struct.set(TOPIC_STATES, topicStatesData.toArray());
440+
} else {
441+
List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size());
442+
for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
443+
Struct partitionStateData = struct.instance(PARTITION_STATES);
444+
TopicPartition topicPartition = entry.getKey();
445+
partitionStateData.set(TOPIC_NAME, topicPartition.topic());
446+
partitionStateData.set(PARTITION_ID, topicPartition.partition());
447+
entry.getValue().setStruct(partitionStateData);
421448
partitionStatesData.add(partitionStateData);
422449
}
423-
topicStateData.set(PARTITION_STATES, partitionStatesData.toArray());
424-
topicStatesData.add(topicStateData);
450+
struct.set(PARTITION_STATES, partitionStatesData.toArray());
425451
}
426-
struct.set(TOPIC_STATES, topicStatesData.toArray());
427-
} else {
428-
List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size());
429-
for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
430-
Struct partitionStateData = struct.instance(PARTITION_STATES);
431-
TopicPartition topicPartition = entry.getKey();
432-
partitionStateData.set(TOPIC_NAME, topicPartition.topic());
433-
partitionStateData.set(PARTITION_ID, topicPartition.partition());
434-
entry.getValue().setStruct(partitionStateData);
435-
partitionStatesData.add(partitionStateData);
436-
}
437-
struct.set(PARTITION_STATES, partitionStatesData.toArray());
438-
}
439452

440-
List<Struct> brokersData = new ArrayList<>(liveBrokers.size());
441-
for (Broker broker : liveBrokers) {
442-
Struct brokerData = struct.instance(LIVE_BROKERS);
443-
brokerData.set(BROKER_ID, broker.id);
444-
445-
if (version == 0) {
446-
EndPoint endPoint = broker.endPoints.get(0);
447-
brokerData.set(HOST, endPoint.host);
448-
brokerData.set(PORT, endPoint.port);
449-
} else {
450-
List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size());
451-
for (EndPoint endPoint : broker.endPoints) {
452-
Struct endPointData = brokerData.instance(ENDPOINTS);
453-
endPointData.set(PORT, endPoint.port);
454-
endPointData.set(HOST, endPoint.host);
455-
endPointData.set(SECURITY_PROTOCOL_TYPE, endPoint.securityProtocol.id);
456-
if (version >= 3)
457-
endPointData.set(LISTENER_NAME, endPoint.listenerName.value());
458-
endPointsData.add(endPointData);
459-
460-
}
461-
brokerData.set(ENDPOINTS, endPointsData.toArray());
462-
if (version >= 2) {
463-
brokerData.set(RACK, broker.rack);
453+
List<Struct> brokersData = new ArrayList<>(liveBrokers.size());
454+
for (Broker broker : liveBrokers) {
455+
Struct brokerData = struct.instance(LIVE_BROKERS);
456+
brokerData.set(BROKER_ID, broker.id);
457+
458+
if (version == 0) {
459+
EndPoint endPoint = broker.endPoints.get(0);
460+
brokerData.set(HOST, endPoint.host);
461+
brokerData.set(PORT, endPoint.port);
462+
} else {
463+
List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size());
464+
for (EndPoint endPoint : broker.endPoints) {
465+
Struct endPointData = brokerData.instance(ENDPOINTS);
466+
endPointData.set(PORT, endPoint.port);
467+
endPointData.set(HOST, endPoint.host);
468+
endPointData.set(SECURITY_PROTOCOL_TYPE, endPoint.securityProtocol.id);
469+
if (version >= 3) endPointData.set(LISTENER_NAME, endPoint.listenerName.value());
470+
endPointsData.add(endPointData);
471+
}
472+
brokerData.set(ENDPOINTS, endPointsData.toArray());
473+
if (version >= 2) {
474+
brokerData.set(RACK, broker.rack);
475+
}
464476
}
465-
}
466477

467-
brokersData.add(brokerData);
478+
brokersData.add(brokerData);
479+
}
480+
struct.set(LIVE_BROKERS, brokersData.toArray());
481+
this.struct = struct;
468482
}
469-
struct.set(LIVE_BROKERS, brokersData.toArray());
470-
471483
return struct;
472484
}
473485

0 commit comments

Comments
 (0)