
KAFKA-19183: Replace Pool with ConcurrentHashMap #19535


Open · FrankYang0529 wants to merge 1 commit into trunk
Conversation

FrankYang0529 (Member) commented Apr 22, 2025

  1. Replace Pool.scala with ConcurrentHashMap.
  2. Remove PoolTest.scala.
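
For context, here is a minimal, self-contained sketch of the migration this PR performs, based only on the patterns visible in the diffs below (getAndMaybePut replaced by computeIfAbsent, Scala views obtained via asScala); the map contents are made up for illustration:

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.LongAdder
    import scala.jdk.CollectionConverters._

    object PoolMigrationSketch {
      def main(args: Array[String]): Unit = {
        val stats = new ConcurrentHashMap[String, LongAdder]()

        // Pool.getAndMaybePut(key, value) roughly maps onto computeIfAbsent,
        // which computes the value at most once per key even under concurrent access.
        stats.computeIfAbsent("topic-0", _ => new LongAdder()).increment()

        // Pool's Scala collection views map onto asScala from scala.jdk.CollectionConverters.
        stats.asScala.foreach { case (key, adder) => println(s"$key -> ${adder.sum()}") }
      }
    }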

@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
   metricsGroup.newGauge("MaxLag", () => {
     // current max lag across all fetchers/topics/partitions
     fetcherThreadMap.values.foldLeft(0L) { (curMaxLagAll, fetcherThread) =>
-      val maxLagThread = fetcherThread.fetcherLagStats.stats.values.foldLeft(0L)((curMaxLagThread, lagMetrics) =>
+      val maxLagThread = fetcherThread.fetcherLagStats.stats.values.asScala.foldLeft(0L)((curMaxLagThread, lagMetrics) =>
A reviewer suggested:


val maxLagThread = fetcherThread.fetcherLagStats.stats.values.stream().mapToLong(v => v.lag).max().orElse(0L)
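
As a rough, self-contained illustration of that stream form (LagMetrics and its lag field are placeholders here, not the actual Kafka types):

    import java.util.concurrent.ConcurrentHashMap

    object MaxLagSketch {
      // Hypothetical stand-in for the per-partition lag gauge object.
      final case class LagMetrics(lag: Long)

      def main(args: Array[String]): Unit = {
        val stats = new ConcurrentHashMap[String, LagMetrics]()
        stats.put("t-0", LagMetrics(3L))
        stats.put("t-1", LagMetrics(7L))

        // Same shape as the suggestion: stream over the values view, map to longs,
        // and fall back to 0L when the map is empty.
        val maxLag = stats.values.stream().mapToLong(_.lag).max().orElse(0L)
        println(maxLag) // prints 7
      }
    }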

@@ -916,7 +915,7 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
   }

   def unregister(): Unit = {
-    stats.keys.toBuffer.foreach { key: TopicPartition =>
+    stats.keys.asScala.toBuffer.foreach { key: TopicPartition =>
A reviewer suggested:

stats.forEach((key, _) => unregister(key))
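
A self-contained sketch of that pattern, with String values and a println standing in for the real metric objects and the unregister(key) call:

    import java.util.concurrent.ConcurrentHashMap

    object UnregisterSketch {
      def main(args: Array[String]): Unit = {
        val stats = new ConcurrentHashMap[String, String]()
        stats.put("t-0", "lag-metrics-0")
        stats.put("t-1", "lag-metrics-1")

        // forEach takes a BiConsumer and walks the live map, so no intermediate
        // Scala buffer copy of the key set is needed.
        stats.forEach((key, _) => println(s"unregister $key"))
      }
    }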

-    followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition, metadataCache)))
-    remoteReplicasMap.removeAll(removedReplicas)
+    followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new Replica(id, topicPartition, metadataCache)))
+    remoteReplicasMap.keySet.removeAll(removedReplicas.asJavaCollection)
A reviewer suggested:

remoteReplicasMap.keySet.removeIf(replica => !followers.contains(replica))
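
A minimal sketch of the removeIf form, with made-up broker ids and String placeholders for Replica:

    import java.util.concurrent.ConcurrentHashMap

    object RemoveStaleReplicasSketch {
      def main(args: Array[String]): Unit = {
        val remoteReplicasMap = new ConcurrentHashMap[String, String]()
        remoteReplicasMap.put("broker-1", "replica-1")
        remoteReplicasMap.put("broker-2", "replica-2")
        remoteReplicasMap.put("broker-3", "replica-3")

        val followers = Set("broker-1", "broker-3")

        // The key-set view writes through to the backing map, so removeIf drops
        // every entry whose broker is no longer a follower, without building a
        // separate removedReplicas collection first.
        remoteReplicasMap.keySet.removeIf(id => !followers.contains(id))
        println(remoteReplicasMap) // {broker-1=replica-1, broker-3=replica-3}
      }
    }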


     // Due to code paths accessing remoteReplicasMap without a lock,
     // first add the new replicas and then remove the old ones.
-    followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition, metadataCache)))
-    remoteReplicasMap.removeAll(removedReplicas)
+    followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new Replica(id, topicPartition, metadataCache)))
A reviewer suggested:

k -> _ (the lambda parameter is unused, so it can be written as an underscore: computeIfAbsent(id, _ => new Replica(id, topicPartition, metadataCache)))

@@ -2512,7 +2510,7 @@ class ReplicaManager(val config: KafkaConfig,
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")

     // Shrink ISRs for non offline partitions
-    allPartitions.keys.foreach { topicPartition =>
+    allPartitions.keys.asScala.foreach { topicPartition =>
A reviewer suggested:

    allPartitions.forEach { (topicPartition, _) =>
      onlinePartition(topicPartition).foreach(_.maybeShrinkIsr())
    }
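
A self-contained sketch of this shape, with a placeholder Partition type and a placeholder onlinePartition lookup standing in for the real ReplicaManager members:

    import java.util.concurrent.ConcurrentHashMap

    object ShrinkIsrSketch {
      // Placeholder partition type; only maybeShrinkIsr matters for the sketch.
      final case class Partition(name: String) {
        def maybeShrinkIsr(): Unit = println(s"evaluated ISR of $name")
      }

      def main(args: Array[String]): Unit = {
        val allPartitions = new ConcurrentHashMap[String, Partition]()
        allPartitions.put("t-0", Partition("t-0"))
        allPartitions.put("t-1", Partition("t-1"))

        // Placeholder for onlinePartition: here every present partition counts as online.
        def onlinePartition(tp: String): Option[Partition] = Option(allPartitions.get(tp))

        // forEach iterates the map directly instead of going through keys.asScala.
        allPartitions.forEach((tp, _) => onlinePartition(tp).foreach(_.maybeShrinkIsr()))
      }
    }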
