-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-19183: Replace Pool with ConcurrentHashMap #19535
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Signed-off-by: PoAn Yang <[email protected]>
@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri | |||
metricsGroup.newGauge("MaxLag", () => { | |||
// current max lag across all fetchers/topics/partitions | |||
fetcherThreadMap.values.foldLeft(0L) { (curMaxLagAll, fetcherThread) => | |||
val maxLagThread = fetcherThread.fetcherLagStats.stats.values.foldLeft(0L)((curMaxLagThread, lagMetrics) => | |||
val maxLagThread = fetcherThread.fetcherLagStats.stats.values.asScala.foldLeft(0L)((curMaxLagThread, lagMetrics) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val maxLagThread = fetcherThread.fetcherLagStats.stats.values.stream().mapToLong(v => v.lag).max().orElse(0L)
@@ -916,7 +915,7 @@ class FetcherLagStats(metricId: ClientIdAndBroker) { | |||
} | |||
|
|||
def unregister(): Unit = { | |||
stats.keys.toBuffer.foreach { key: TopicPartition => | |||
stats.keys.asScala.toBuffer.foreach { key: TopicPartition => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stats.forEach((key, _) => unregister(key))
followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition, metadataCache))) | ||
remoteReplicasMap.removeAll(removedReplicas) | ||
followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new Replica(id, topicPartition, metadataCache))) | ||
remoteReplicasMap.keySet.removeAll(removedReplicas.asJavaCollection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remoteReplicasMap.keySet.removeIf(replica => !followers.contains(replica))
|
||
// Due to code paths accessing remoteReplicasMap without a lock, | ||
// first add the new replicas and then remove the old ones. | ||
followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition, metadataCache))) | ||
remoteReplicasMap.removeAll(removedReplicas) | ||
followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new Replica(id, topicPartition, metadataCache))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
k -> _
@@ -2512,7 +2510,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") | |||
|
|||
// Shrink ISRs for non offline partitions | |||
allPartitions.keys.foreach { topicPartition => | |||
allPartitions.keys.asScala.foreach { topicPartition => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
allPartitions.forEach { (topicPartition, _) =>
onlinePartition(topicPartition).foreach(_.maybeShrinkIsr())
}
Pool.scala
withConcurrentHashMap
.PoolTest.scala
.