Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
3.0.x.
Issue Description
I stopped a bookie that was running on a faulty disk, and another machine subsequently shut down due to a mechanical fault. That machine hosted one broker and more than 10 bookies.
Inbound and outbound throughput then dropped dramatically. The topic policy module could not initialize, so most topics failed to initialize as well.

Although most of the throughput was lost, a small portion of inbound and outbound traffic remained.
We disabled the topic policy module and restarted the broker cluster as a temporary workaround.
We also noticed a large number of read requests to the bookies, with read latency soaring to the order of minutes. We suspect these were reads for the __change_events topic.

The direct cause of the problem is clearly the failure of topic policy initialization, but I don't know why initialization failed.
After the cluster became available again, I created readers to simulate the topic policy read logic:
    // Runs inside a method that has a PulsarClient `client` in scope (method header omitted).
    Reader<PulsarEvent> reader = client.newReader(Schema.AVRO(PulsarEvent.class))
            .topic("persistent://XX/XX/__change_events-partition-0")
            .readCompacted(true)
            .startMessageId(MessageId.earliest)
            .create();
    recursiveRead(reader);
}

// Reads messages one at a time until the reader reports that none are left.
private static void recursiveRead(Reader<PulsarEvent> reader) {
    reader.hasMessageAvailableAsync().whenComplete((hasMore, ex) -> {
        if (ex != null) {
            // hasMore is null when the future fails, so check the exception first.
            ex.printStackTrace();
            return;
        }
        if (hasMore) {
            reader.readNextAsync().thenAccept(msg -> {
                System.out.println("Time:" + translateTime(msg.getPublishTime())
                        + " ,Message Key: " + msg.getKey()
                        + " ,Message received: " + msg.getValue());
                recursiveRead(reader);
            }).exceptionally(e -> {
                e.printStackTrace();
                return null;
            });
        } else {
            System.out.println("No more messages available.");
        }
    });
}
I could read all the messages stored in the compacted topic, which rules out data corruption.
Metrics also show that the change events were read successfully.
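To exercise the failure path of this read loop without a cluster, the same recursive pattern can be sketched against an in-memory stand-in. This is only a minimal sketch: `AsyncReader`, `readAll`, `inMemory`, and `failing` are hypothetical names introduced for illustration and are not part of the Pulsar client API; the interface only mirrors the two `Reader` methods used in the snippet above. The point it shows is that a failure from either future is propagated to the caller instead of being silently dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ReadLoopSketch {
    /** Hypothetical stand-in for the two Reader methods used by the read loop. */
    interface AsyncReader<T> {
        CompletableFuture<Boolean> hasMessageAvailableAsync();
        CompletableFuture<T> readNextAsync();
    }

    /** Reads until no message is available; fails on the first error from the reader. */
    static <T> CompletableFuture<List<T>> readAll(AsyncReader<T> reader) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        readStep(reader, new ArrayList<>(), result);
        return result;
    }

    private static <T> void readStep(AsyncReader<T> reader, List<T> acc,
                                     CompletableFuture<List<T>> result) {
        reader.hasMessageAvailableAsync().whenComplete((hasMore, ex) -> {
            if (ex != null) {              // check the error before unboxing hasMore
                result.completeExceptionally(ex);
                return;
            }
            if (!hasMore) {                // end of topic: hand back what was read
                result.complete(acc);
                return;
            }
            reader.readNextAsync().whenComplete((msg, readEx) -> {
                if (readEx != null) {
                    result.completeExceptionally(readEx);
                    return;
                }
                acc.add(msg);
                // Recursion is synchronous for already-completed futures,
                // so this sketch is only suitable for small inputs.
                readStep(reader, acc, result);
            });
        });
    }

    /** In-memory reader over a fixed list (assumption: single-threaded use). */
    static <T> AsyncReader<T> inMemory(List<T> items) {
        Iterator<T> it = items.iterator();
        return new AsyncReader<T>() {
            @Override
            public CompletableFuture<Boolean> hasMessageAvailableAsync() {
                return CompletableFuture.completedFuture(it.hasNext());
            }
            @Override
            public CompletableFuture<T> readNextAsync() {
                return CompletableFuture.completedFuture(it.next());
            }
        };
    }

    /** Reader whose calls always fail, e.g. to imitate an already-closed consumer. */
    static <T> AsyncReader<T> failing(Throwable cause) {
        return new AsyncReader<T>() {
            @Override
            public CompletableFuture<Boolean> hasMessageAvailableAsync() {
                return CompletableFuture.failedFuture(cause);
            }
            @Override
            public CompletableFuture<T> readNextAsync() {
                return CompletableFuture.failedFuture(cause);
            }
        };
    }

    public static void main(String[] args) {
        List<String> events = readAll(inMemory(List.of("policy-1", "policy-2"))).join();
        System.out.println("read " + events.size() + " events");
    }
}
```

With `failing(...)` in place of `inMemory(...)`, the future returned by `readAll` completes exceptionally, which makes it possible to assert on the failure in a unit test rather than only seeing a stack trace.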

Error messages
ERROR org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService - [XX/XX] Failed to create reader on __change_events topic
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Consumer already closed
Reproducing the issue
We can't reproduce the issue for now.
Additional information
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!