Skip to content

KAFKA-19183: Replace Pool with ConcurrentHashMap #19535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 27, 2025

Conversation

FrankYang0529
Copy link
Member

@FrankYang0529 FrankYang0529 commented Apr 22, 2025

  1. Replace Pool.scala with ConcurrentHashMap.
  2. Remove PoolTest.scala.

Reviewers: Chia-Ping Tsai [email protected]

@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
metricsGroup.newGauge("MaxLag", () => {
// current max lag across all fetchers/topics/partitions
fetcherThreadMap.values.foldLeft(0L) { (curMaxLagAll, fetcherThread) =>
val maxLagThread = fetcherThread.fetcherLagStats.stats.values.foldLeft(0L)((curMaxLagThread, lagMetrics) =>
val maxLagThread = fetcherThread.fetcherLagStats.stats.values.asScala.foldLeft(0L)((curMaxLagThread, lagMetrics) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val maxLagThread = fetcherThread.fetcherLagStats.stats.values.stream().mapToLong(v => v.lag).max().orElse(0L)

@@ -916,7 +915,7 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
}

def unregister(): Unit = {
stats.keys.toBuffer.foreach { key: TopicPartition =>
stats.keys.asScala.toBuffer.foreach { key: TopicPartition =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stats.forEach((key, _) => unregister(key))

followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition, metadataCache)))
remoteReplicasMap.removeAll(removedReplicas)
followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new Replica(id, topicPartition, metadataCache)))
remoteReplicasMap.keySet.removeAll(removedReplicas.asJavaCollection)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remoteReplicasMap.keySet.removeIf(replica => !followers.contains(replica))


// Due to code paths accessing remoteReplicasMap without a lock,
// first add the new replicas and then remove the old ones.
followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition, metadataCache)))
remoteReplicasMap.removeAll(removedReplicas)
followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new Replica(id, topicPartition, metadataCache)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

k -> _

@@ -2512,7 +2510,7 @@ class ReplicaManager(val config: KafkaConfig,
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")

// Shrink ISRs for non offline partitions
allPartitions.keys.foreach { topicPartition =>
allPartitions.keys.asScala.foreach { topicPartition =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    allPartitions.forEach { (topicPartition, _) =>
      onlinePartition(topicPartition).foreach(_.maybeShrinkIsr())
    }

@chia7712 chia7712 merged commit 7293f3a into apache:trunk Apr 27, 2025
23 checks passed
@FrankYang0529 FrankYang0529 deleted the KAFKA-19183 branch April 27, 2025 15:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants