Skip to content

Commit 3012e9f

Browse files
committed
Call flush method on shutdown / context closing
This allows compatibility with phprdkafka 4.0
1 parent f63b08d commit 3012e9f

5 files changed

+41
-21
lines changed

RdKafkaConnectionFactory.php

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class RdKafkaConnectionFactory implements ConnectionFactory
2828
* 'partitioner' => null, // https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-topicconf.setpartitioner.html
2929
* 'log_level' => null,
3030
* 'commit_async' => false,
31+
* 'shutdown_timeout' => -1, // https://github.com/arnaud-lb/php-rdkafka#proper-shutdown
3132
* ]
3233
*
3334
* or

RdKafkaContext.php

+23-15
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class RdKafkaContext implements Context
3636
private $conf;
3737

3838
/**
39-
* @var Producer
39+
* @var RdKafkaProducer
4040
*/
4141
private $producer;
4242

@@ -96,7 +96,23 @@ public function createTemporaryQueue(): Queue
9696
*/
9797
public function createProducer(): Producer
9898
{
99-
return new RdKafkaProducer($this->getProducer(), $this->getSerializer());
99+
if (!isset($this->producer)) {
100+
$producer = new VendorProducer($this->getConf());
101+
102+
if (isset($this->config['log_level'])) {
103+
$producer->setLogLevel($this->config['log_level']);
104+
}
105+
106+
$this->producer = new RdKafkaProducer($producer, $this->getSerializer());
107+
108+
// Once created RdKafkaProducer can store messages internally that need to be delivered before PHP shuts
109+
// down. Otherwise, we are bound to lose messages in transit.
110+
// Note that it is generally preferable to call "close" method explicitly before shutdown starts, since
111+
// otherwise we might not have access to some objects, like database connections.
112+
register_shutdown_function([$this->producer, 'flush'], $this->config['shutdown_timeout'] ?? -1);
113+
}
114+
115+
return $this->producer;
100116
}
101117

102118
/**
@@ -139,6 +155,11 @@ public function close(): void
139155
foreach ($kafkaConsumers as $kafkaConsumer) {
140156
$kafkaConsumer->unsubscribe();
141157
}
158+
159+
// Compatibility with phprdkafka 4.0.
160+
if (isset($this->producer)) {
161+
$this->producer->flush($this->config['shutdown_timeout'] ?? -1);
162+
}
142163
}
143164

144165
public function createSubscriptionConsumer(): SubscriptionConsumer
@@ -163,19 +184,6 @@ public static function getLibrdKafkaVersion(): string
163184
return "$major.$minor.$patch";
164185
}
165186

166-
private function getProducer(): VendorProducer
167-
{
168-
if (null === $this->producer) {
169-
$this->producer = new VendorProducer($this->getConf());
170-
171-
if (isset($this->config['log_level'])) {
172-
$this->producer->setLogLevel($this->config['log_level']);
173-
}
174-
}
175-
176-
return $this->producer;
177-
}
178-
179187
private function getConf(): Conf
180188
{
181189
if (null === $this->conf) {

RdKafkaProducer.php

+8
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,12 @@ public function getTimeToLive(): ?int
111111
{
112112
return null;
113113
}
114+
115+
public function flush(int $timeout): void
116+
{
117+
// Flush method is exposed in phprdkafka 4.0
118+
if (method_exists($this->producer, 'flush')) {
119+
$this->producer->flush($timeout);
120+
}
121+
}
114122
}

Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php

+7-4
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@ public function test()
1919

2020
$topic = $this->createTopic($context, uniqid('', true));
2121

22-
$consumer = $context->createConsumer($topic);
23-
2422
$expectedBody = __CLASS__.time();
23+
$producer = $context->createProducer();
24+
$producer->send($topic, $context->createMessage($expectedBody));
25+
26+
// Calling close causes Producer to flush (wait for messages to be delivered to Kafka)
27+
$context->close();
28+
29+
$consumer = $context->createConsumer($topic);
2530

2631
$context->createProducer()->send($topic, $context->createMessage($expectedBody));
2732

@@ -48,8 +53,6 @@ protected function createContext()
4853

4954
$context = (new RdKafkaConnectionFactory($config))->createContext();
5055

51-
sleep(3);
52-
5356
return $context;
5457
}
5558
}

composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
"license": "MIT",
88
"require": {
99
"php": "^7.1.3",
10-
"ext-rdkafka": "^3.0.3",
10+
"ext-rdkafka": "^3.0.3|^4.0",
1111
"queue-interop/queue-interop": "^0.8"
1212
},
1313
"require-dev": {
1414
"phpunit/phpunit": "~7.5",
1515
"enqueue/test": "0.10.x-dev",
1616
"enqueue/null": "0.10.x-dev",
1717
"queue-interop/queue-spec": "^0.6",
18-
"kwn/php-rdkafka-stubs": "^1.0.2"
18+
"kwn/php-rdkafka-stubs": "^1.0.2 | ^2.0"
1919
},
2020
"support": {
2121
"email": "[email protected]",

0 commit comments

Comments
 (0)