diff --git a/kafka_consumer.c b/kafka_consumer.c index 89c56018..b32a4743 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -756,7 +756,7 @@ PHP_METHOD(RdKafka__KafkaConsumer, offsetsForTimes) /* {{{ proto void RdKafka\KafkaConsumer::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms) Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */ -ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_query_watermark_offsets, 0, 0, 1) +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_query_watermark_offsets, 0, 0, 5) ZEND_ARG_INFO(0, topic) ZEND_ARG_INFO(0, partition) ZEND_ARG_INFO(1, low) diff --git a/message.c b/message.c index 221c202d..c906cb41 100644 --- a/message.c +++ b/message.c @@ -41,13 +41,13 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze timestamp = rd_kafka_message_timestamp(message, &tstype); + zval headers_array; #ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS rd_kafka_headers_t *message_headers = NULL; rd_kafka_resp_err_t header_response; const char *header_name = NULL; const void *header_value = NULL; size_t header_size = 0; - zval headers_array; size_t i; #endif /* HAVE_RD_KAFKA_MESSAGE_HEADERS */ @@ -67,11 +67,11 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze } zend_update_property_long(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("offset"), message->offset); + array_init(&headers_array); #ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS if (message->err == RD_KAFKA_RESP_ERR_NO_ERROR) { rd_kafka_message_headers(message, &message_headers); if (message_headers != NULL) { - array_init(&headers_array); for (i = 0; i < rd_kafka_header_cnt(message_headers); i++) { header_response = rd_kafka_header_get_all(message_headers, i, &header_name, &header_value, &header_size); if (header_response != RD_KAFKA_RESP_ERR_NO_ERROR) { @@ -79,11 +79,11 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze } add_assoc_stringl(&headers_array, header_name, (const char*)header_value, header_size); } - zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array); - zval_ptr_dtor(&headers_array); } } #endif + zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array); + zval_ptr_dtor(&headers_array); if (msg_opaque != NULL) { zend_update_property_str(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("opaque"), msg_opaque); diff --git a/package.xml b/package.xml index c442fdf4..fa4a78c6 100644 --- a/package.xml +++ b/package.xml @@ -10,10 +10,10 @@ arnaud.lb@gmail.com yes - 2021-11-19 - + 2021-11-27 + - 5.0.1 + 5.0.2 4.0.0 @@ -22,13 +22,8 @@ MIT License - ## Enhancements - - Add pausePartitions(), resumePartitions() on RdKfaka, RdKafka\KafkaConsumer (#438, @arnaud-lb) - - Clarify error when KafkaConsumer is closed (@zoonru) - ## Bugfixes - - Fix windows build (#440, @nick-zh) - - Fix crash in RdKafka\Metadata\Topic::getTopic() (#465, @arnaud-lb) + - Fix arginfo on getMetadata(), queryWatermarkOffsets() (#494) @@ -71,6 +66,7 @@ + @@ -117,6 +113,28 @@ rdkafka + + 2021-11-19 + + + 5.0.1 + 4.0.0 + + + stable + stable + + MIT License + + ## Enhancements + - Add pausePartitions(), resumePartitions() on RdKfaka, RdKafka\KafkaConsumer (#438, @arnaud-lb) + - Clarify error when KafkaConsumer is closed (@zoonru) + + ## Bugfixes + - Fix windows build (#440, @nick-zh) + - Fix crash in RdKafka\Metadata\Topic::getTopic() (#465, @arnaud-lb) + + 2021-01-14 diff --git a/php_rdkafka.h b/php_rdkafka.h index 8dd71b7a..e0b5007e 100644 --- a/php_rdkafka.h +++ b/php_rdkafka.h @@ -43,7 +43,7 @@ PHP_METHOD(RdKafka, __construct); extern zend_module_entry rdkafka_module_entry; #define phpext_rdkafka_ptr &rdkafka_module_entry -#define PHP_RDKAFKA_VERSION "5.0.1" +#define PHP_RDKAFKA_VERSION "5.0.2" extern zend_object_handlers kafka_default_object_handlers; extern zend_class_entry * ce_kafka_exception; diff --git a/rdkafka.c b/rdkafka.c index bd16e09a..3ce5a9ec 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -366,7 +366,7 @@ PHP_METHOD(RdKafka__Kafka, addBrokers) /* {{{ proto RdKafka\Metadata::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms) Request Metadata from broker */ -ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_get_metadata, 0, 0, 1) +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_get_metadata, 0, 0, 3) ZEND_ARG_INFO(0, all_topics) ZEND_ARG_INFO(0, only_topic) ZEND_ARG_INFO(0, timeout_ms) @@ -608,7 +608,7 @@ PHP_METHOD(RdKafka__Kafka, purge) /* {{{ proto void RdKafka\Kafka::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms) Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */ -ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_query_watermark_offsets, 0, 0, 1) +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_query_watermark_offsets, 0, 0, 5) ZEND_ARG_INFO(0, topic) ZEND_ARG_INFO(0, partition) ZEND_ARG_INFO(1, low) diff --git a/tests/bug508.phpt b/tests/bug508.phpt new file mode 100644 index 00000000..c36d9943 --- /dev/null +++ b/tests/bug508.phpt @@ -0,0 +1,118 @@ +--TEST-- +Bug 508 +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) { + if ($msg->err) { + throw new Exception("Message delivery failed: " . $msg->errstr()); + } + $delivered++; +}); + +$producer = new RdKafka\Producer($conf); +$topic = $producer->newTopic($topicName); + +if (!$producer->getMetadata(false, $topic, 10*1000)) { + echo "Failed to get metadata, is broker down?\n"; +} + +$topic->produce(0, 0, "message"); + +while ($producer->getOutQLen()) { + $producer->poll(50); +} + +printf("%d messages delivered\n", $delivered); + +$conf = new RdKafka\Conf(); +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('enable.partition.eof', 'true'); + +$consumer = new RdKafka\Consumer($conf); +$topic = $consumer->newTopic($topicName); +$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); + +while (true) { + $msg = $topic->consume(0, 1000); + if (!$msg) { + continue; + } + // All props are initialized and readable in all cases + var_dump([ + 'err' => $msg->err, + 'topic_name' => $msg->topic_name, + 'timestamp' => $msg->timestamp, + 'partition' => $msg->partition, + 'payload' => $msg->payload, + 'len' => $msg->len, + 'key' => $msg->key, + 'offset' => $msg->offset, + 'headers' => $msg->headers, + 'opaque' => $msg->opaque, + ]); + echo "--------------\n"; + if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) { + echo "EOF\n"; + break; + } +} +--EXPECTF-- +1 messages delivered +array(10) { + ["err"]=> + int(0) + ["topic_name"]=> + string(%d) "test_rdkafka_%s" + ["timestamp"]=> + int(%d) + ["partition"]=> + int(0) + ["payload"]=> + string(7) "message" + ["len"]=> + int(7) + ["key"]=> + NULL + ["offset"]=> + int(0) + ["headers"]=> + array(0) { + } + ["opaque"]=> + NULL +} +-------------- +array(10) { + ["err"]=> + int(-%d) + ["topic_name"]=> + string(%d) "test_rdkafka_%s" + ["timestamp"]=> + int(-1) + ["partition"]=> + int(0) + ["payload"]=> + string(%d) "%s" + ["len"]=> + int(%d) + ["key"]=> + NULL + ["offset"]=> + int(1) + ["headers"]=> + array(0) { + } + ["opaque"]=> + NULL +} +-------------- +EOF diff --git a/tests/bug74.phpt b/tests/bug74.phpt index 04f48572..025baf35 100644 --- a/tests/bug74.phpt +++ b/tests/bug74.phpt @@ -16,6 +16,6 @@ $topic = $consumer->newTopic("batman", null); $producer = new RdKafka\Producer($conf); if (class_exists('RdKafka\TopicPartition')) { - $tp = new RdKafka\TopicPartition("batman", 0, null); + $tp = new RdKafka\TopicPartition("batman", 0, 0); } --EXPECT--