Closed
Description
Correct me if I am wrong
Now consumer can not subscribe to the topic. There is only a possibility to assign to a specific partition of topic. And if partition is not specified then the consumer assign to the zero partition.
public function receive($timeout = 0)
{
if (false == $this->subscribed) {
$this->consumer->assign([new TopicPartition(
$this->getQueue()->getQueueName(),
$this->getQueue()->getPartition(),
$this->offset
)]);
$this->subscribed = true;
}
$message = null;
if ($timeout > 0) {
$message = $this->doReceive($timeout);
} else {
while (true) {
if ($message = $this->doReceive(500)) {
break;
}
}
}
return $message;
}
Problems:
- One consumer can only read from one partition
(this contradicts ideology of Kafka https://kafka.apache.org/intro#intro_consumers) - Not possible to automatically rebalance consumer group