Skip to content

[Bug] [broker] topic policy fail to initialize result into unavailability of the entire cluster. #25116

@thetumbled

Description

@thetumbled

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

3.0.x.

Issue Description

I stopped a bookie running on a faulty disk, and another machine subsequently shut down due to a mechanical fault. This machine hosts one broker and more than 10 bookies.

Then, the throughput in and out decline dramatically. The topic policy module can't initialize successfully, resulting into most of the topics can't be initialized.
Image

Image

Though most of the throughput has decreased, there is still a small portion of inbound and outbound throughput.

We turn off the topic policy module and restart the broker cluster to avoid the issue temporarily.

It is noticed that there are many read request for bookie and the read latency soars to minutes level, which is suspected to be the read request for change_event topic.
Image

It is clear that the direct cause for the problem is the failure of topic policy initialization. However, I don't know the reason why it fail to initialize.
After the cluster restores to availablity, i create readers to simulate the reading logic of topic policy:

       Reader<PulsarEvent> reader = client.newReader(Schema.AVRO(PulsarEvent.class))
               .topic("persistent://XX/XX/__change_events-partition-0")
               .readCompacted(true)
               .startMessageId(MessageId.earliest)
               .create();
        recursiveRead(reader);
    }
    private static void recursiveRead(Reader<PulsarEvent> reader) {
        reader.hasMessageAvailableAsync().whenComplete((hasMore, ex) -> {
            if (hasMore) {
                reader.readNextAsync().thenAccept(msg -> {
                    System.out.println("Time:" + translateTime(msg.getPublishTime()) + " ,Message Key: " + msg.getKey() +
                            " ,Message received: " + msg.getValue());
                    recursiveRead(reader);
                }).exceptionally(e -> {
                    e.printStackTrace();
                    return null;
                });
            } else {
                System.out.println("No more messages available.");
            }
        });
    }

I can read all the messages stored in the compacted topic, so the possibility of data corruption is precluded.

Image

And there are metrics that demonstrates the successful read of change event.
Image

Error messages

ERROR org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService - [XX/XX] Failed to create reader on __change_events topic
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Consumer already closed

Reproducing the issue

we can't reproduce the issue for now.

Additional information

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

Labels

type/bugThe PR fixed a bug or issue reported a bug

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions