diff --git a/kafka_consumer.c b/kafka_consumer.c
index 89c56018..b32a4743 100644
--- a/kafka_consumer.c
+++ b/kafka_consumer.c
@@ -756,7 +756,7 @@ PHP_METHOD(RdKafka__KafkaConsumer, offsetsForTimes)
/* {{{ proto void RdKafka\KafkaConsumer::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms)
Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */
-ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_query_watermark_offsets, 0, 0, 1)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_query_watermark_offsets, 0, 0, 5)
ZEND_ARG_INFO(0, topic)
ZEND_ARG_INFO(0, partition)
ZEND_ARG_INFO(1, low)
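
Note: the fourth argument to ZEND_BEGIN_ARG_INFO_EX is the number of required parameters. The old arginfo claimed only 1 of queryWatermarkOffsets()'s 5 parameters was required, which disagreed with the C implementation's parameter parsing and misreported the signature through reflection; the same correction lands in rdkafka.c below. A minimal userland sketch of the observable change, assuming the patched extension is loaded (not part of the patch):

<?php
// Reflection reports the required-parameter count declared in the arginfo:
// int(5) with this patch, int(1) before it.
$method = new ReflectionMethod('RdKafka\KafkaConsumer', 'queryWatermarkOffsets');
var_dump($method->getNumberOfRequiredParameters());
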
diff --git a/message.c b/message.c
index 221c202d..c906cb41 100644
--- a/message.c
+++ b/message.c
@@ -41,13 +41,13 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze
timestamp = rd_kafka_message_timestamp(message, &tstype);
+ zval headers_array;
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
rd_kafka_headers_t *message_headers = NULL;
rd_kafka_resp_err_t header_response;
const char *header_name = NULL;
const void *header_value = NULL;
size_t header_size = 0;
- zval headers_array;
size_t i;
#endif /* HAVE_RD_KAFKA_MESSAGE_HEADERS */
@@ -67,11 +67,11 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze
}
zend_update_property_long(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("offset"), message->offset);
+ array_init(&headers_array);
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
if (message->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_message_headers(message, &message_headers);
if (message_headers != NULL) {
- array_init(&headers_array);
for (i = 0; i < rd_kafka_header_cnt(message_headers); i++) {
header_response = rd_kafka_header_get_all(message_headers, i, &header_name, &header_value, &header_size);
if (header_response != RD_KAFKA_RESP_ERR_NO_ERROR) {
@@ -79,11 +79,11 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze
}
add_assoc_stringl(&headers_array, header_name, (const char*)header_value, header_size);
}
- zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array);
- zval_ptr_dtor(&headers_array);
}
}
#endif
+ zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array);
+ zval_ptr_dtor(&headers_array);
if (msg_opaque != NULL) {
zend_update_property_str(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("opaque"), msg_opaque);
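
Note: this change hoists headers_array out of the HAVE_RD_KAFKA_MESSAGE_HEADERS guard and assigns the property unconditionally, so Message::$headers is always an initialized array, even for error events and for builds of librdkafka without header support; previously the property could be left unset. A minimal sketch of the observable behavior, assuming $topic is an RdKafka\ConsumerTopic with consumption started (not part of the patch):

<?php
$msg = $topic->consume(0, 1000);
if ($msg !== null) {
    // With this patch, headers is array(0) { } when no headers are present,
    // instead of a NULL/uninitialized property.
    var_dump($msg->headers);
}
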
diff --git a/package.xml b/package.xml
index c442fdf4..fa4a78c6 100644
--- a/package.xml
+++ b/package.xml
@@ -10,10 +10,10 @@
   <email>arnaud.lb@gmail.com</email>
   <active>yes</active>
  </lead>
- <date>2021-11-19</date>
+ <date>2021-11-27</date>
  <version>
-  <release>5.0.1</release>
+  <release>5.0.2</release>
   <api>4.0.0</api>
@@ -22,13 +22,8 @@
  <license>MIT License</license>
  <notes>
- ## Enhancements
- - Add pausePartitions(), resumePartitions() on RdKfaka, RdKafka\KafkaConsumer (#438, @arnaud-lb)
- - Clarify error when KafkaConsumer is closed (@zoonru)
-
## Bugfixes
- - Fix windows build (#440, @nick-zh)
- - Fix crash in RdKafka\Metadata\Topic::getTopic() (#465, @arnaud-lb)
+ - Fix arginfo on getMetadata(), queryWatermarkOffsets() (#494)
  </notes>
@@ -71,6 +66,7 @@
+   <file name="tests/bug508.phpt" role="test" />
@@ -117,6 +113,28 @@
  <providesextension>rdkafka</providesextension>
  <changelog>
+  <release>
+   <date>2021-11-19</date>
+   <version>
+    <release>5.0.1</release>
+    <api>4.0.0</api>
+   </version>
+   <stability>
+    <release>stable</release>
+    <api>stable</api>
+   </stability>
+   <license>MIT License</license>
+   <notes>
+ ## Enhancements
+ - Add pausePartitions(), resumePartitions() on RdKafka, RdKafka\KafkaConsumer (#438, @arnaud-lb)
+ - Clarify error when KafkaConsumer is closed (@zoonru)
+
+ ## Bugfixes
+ - Fix windows build (#440, @nick-zh)
+ - Fix crash in RdKafka\Metadata\Topic::getTopic() (#465, @arnaud-lb)
+   </notes>
+  </release>
   <release>
    <date>2021-01-14</date>
diff --git a/php_rdkafka.h b/php_rdkafka.h
index 8dd71b7a..e0b5007e 100644
--- a/php_rdkafka.h
+++ b/php_rdkafka.h
@@ -43,7 +43,7 @@ PHP_METHOD(RdKafka, __construct);
extern zend_module_entry rdkafka_module_entry;
#define phpext_rdkafka_ptr &rdkafka_module_entry
-#define PHP_RDKAFKA_VERSION "5.0.1"
+#define PHP_RDKAFKA_VERSION "5.0.2"
extern zend_object_handlers kafka_default_object_handlers;
extern zend_class_entry * ce_kafka_exception;
diff --git a/rdkafka.c b/rdkafka.c
index bd16e09a..3ce5a9ec 100644
--- a/rdkafka.c
+++ b/rdkafka.c
@@ -366,7 +366,7 @@ PHP_METHOD(RdKafka__Kafka, addBrokers)
/* {{{ proto RdKafka\Metadata RdKafka\Kafka::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
Request Metadata from broker */
-ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_get_metadata, 0, 0, 1)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_get_metadata, 0, 0, 3)
ZEND_ARG_INFO(0, all_topics)
ZEND_ARG_INFO(0, only_topic)
ZEND_ARG_INFO(0, timeout_ms)
@@ -608,7 +608,7 @@ PHP_METHOD(RdKafka__Kafka, purge)
/* {{{ proto void RdKafka\Kafka::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms)
Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */
-ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_query_watermark_offsets, 0, 0, 1)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_query_watermark_offsets, 0, 0, 5)
ZEND_ARG_INFO(0, topic)
ZEND_ARG_INFO(0, partition)
ZEND_ARG_INFO(1, low)
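
Note: as in kafka_consumer.c, getMetadata() now declares all 3 parameters required and queryWatermarkOffsets() all 5. The `1` flag on low and high marks them as by-reference out-parameters, as in this usage sketch, assuming $kafka is a connected RdKafka\Consumer and the topic and partition exist (not part of the patch):

<?php
$low = 0;
$high = 0;
// $low and $high are filled by reference with the partition's watermark offsets.
$kafka->queryWatermarkOffsets('test-topic', 0, $low, $high, 10 * 1000);
printf("low=%d high=%d\n", $low, $high);
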
diff --git a/tests/bug508.phpt b/tests/bug508.phpt
new file mode 100644
index 00000000..c36d9943
--- /dev/null
+++ b/tests/bug508.phpt
@@ -0,0 +1,118 @@
+--TEST--
+Bug 508
+--SKIPIF--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+--FILE--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+
+$delivered = 0;
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$conf = new RdKafka\Conf();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
+ if ($msg->err) {
+ throw new Exception("Message delivery failed: " . $msg->errstr());
+ }
+ $delivered++;
+});
+
+$producer = new RdKafka\Producer($conf);
+$topic = $producer->newTopic($topicName);
+
+if (!$producer->getMetadata(false, $topic, 10*1000)) {
+ echo "Failed to get metadata, is broker down?\n";
+}
+
+$topic->produce(0, 0, "message");
+
+while ($producer->getOutQLen()) {
+ $producer->poll(50);
+}
+
+printf("%d messages delivered\n", $delivered);
+
+$conf = new RdKafka\Conf();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+$conf->set('enable.partition.eof', 'true');
+
+$consumer = new RdKafka\Consumer($conf);
+$topic = $consumer->newTopic($topicName);
+$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
+
+while (true) {
+ $msg = $topic->consume(0, 1000);
+ if (!$msg) {
+ continue;
+ }
+ // All props are initialized and readable in all cases
+ var_dump([
+ 'err' => $msg->err,
+ 'topic_name' => $msg->topic_name,
+ 'timestamp' => $msg->timestamp,
+ 'partition' => $msg->partition,
+ 'payload' => $msg->payload,
+ 'len' => $msg->len,
+ 'key' => $msg->key,
+ 'offset' => $msg->offset,
+ 'headers' => $msg->headers,
+ 'opaque' => $msg->opaque,
+ ]);
+ echo "--------------\n";
+ if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+ echo "EOF\n";
+ break;
+ }
+}
+--EXPECTF--
+1 messages delivered
+array(10) {
+ ["err"]=>
+ int(0)
+ ["topic_name"]=>
+ string(%d) "test_rdkafka_%s"
+ ["timestamp"]=>
+ int(%d)
+ ["partition"]=>
+ int(0)
+ ["payload"]=>
+ string(7) "message"
+ ["len"]=>
+ int(7)
+ ["key"]=>
+ NULL
+ ["offset"]=>
+ int(0)
+ ["headers"]=>
+ array(0) {
+ }
+ ["opaque"]=>
+ NULL
+}
+--------------
+array(10) {
+ ["err"]=>
+ int(-%d)
+ ["topic_name"]=>
+ string(%d) "test_rdkafka_%s"
+ ["timestamp"]=>
+ int(-1)
+ ["partition"]=>
+ int(0)
+ ["payload"]=>
+ string(%d) "%s"
+ ["len"]=>
+ int(%d)
+ ["key"]=>
+ NULL
+ ["offset"]=>
+ int(1)
+ ["headers"]=>
+ array(0) {
+ }
+ ["opaque"]=>
+ NULL
+}
+--------------
+EOF
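
Note: the two expected dumps above verify the message.c change end to end: both the delivered message and the RD_KAFKA_RESP_ERR__PARTITION_EOF event expose headers as array(0) { }, so every property is initialized and readable regardless of the message's error state.
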
diff --git a/tests/bug74.phpt b/tests/bug74.phpt
index 04f48572..025baf35 100644
--- a/tests/bug74.phpt
+++ b/tests/bug74.phpt
@@ -16,6 +16,6 @@ $topic = $consumer->newTopic("batman", null);
$producer = new RdKafka\Producer($conf);
if (class_exists('RdKafka\TopicPartition')) {
- $tp = new RdKafka\TopicPartition("batman", 0, null);
+ $tp = new RdKafka\TopicPartition("batman", 0, 0);
}
--EXPECT--