diff --git a/README.md b/README.md index 988b077..8c515ff 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,36 @@ Dev build: [![Build Status](https://travis-ci.org/EVODelavega/phpkafka.svg?branc SRP build: [![Build Status](https://travis-ci.org/EVODelavega/phpkafka.svg?branch=feature%2FSRP)](https://travis-ci.org/EVODelavega/phpkafka) +## Common issues: + +Here's a short list of common issues people run into when installing this extension (only 1 issue so far) + +#### _"Unable to load dynamic library '/usr/lib64/php/modules/kafka.so' - librdkafka.so.1"_ + +What this, basically means is that PHP can't find the shared object (librdkafka) anywhere. Thankfully, the fix is trivial: +First, make sure you've actually compiled and installed librdkafka. Then run these commands: + +```bash +sudo updatedb +locate librdkafka.so.1 # locate might not exist on some systems, like slackware, which uses slocate +``` + +The output should show a full path to the `librdkafka.so.1` file, probably _"/usr/local/lib/librdkafka.so.1"_. Edit `/etc/ld.so.conf` to make sure _"/usr/local/lib"_ is included when searching for libraries. Either add id directly to the aforementioned file, or if your system uses a /etc/ld.so.conf.d/ directory, create a new .conf file there: + +```bash +sudo touch /etc/ld.so.conf.d/librd.conf +echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf +``` + +Or simply type `vim /etc/ld.so.conf.d/librd.conf`, when the editor opens, tap _":"_ (colon), and run the command `read !locate librdkafka.so.1`, delete the filename from the path (move your cursor to the last `/` of the line that just appeared in the file and type `d$` (delete until end of line). Save and close the file (`:wq`). + +Once updated, run the following: + +```bash +sudo ldconfig +``` + + _Note:_ Whatever gets merged into the master branch should work just fine. The main dev build is where small tweaks, bugfixes and minor improvements are tested (ie sort-of beta branch). diff --git a/kafka.c b/kafka.c index 2406cb5..4b7860e 100644 --- a/kafka.c +++ b/kafka.c @@ -394,7 +394,7 @@ static void kafka_init( rd_kafka_type_t type ) } } -int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len) +int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout) { char errstr[512]; rd_kafka_topic_t *rkt = NULL; @@ -411,7 +411,30 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le } return -2; } + + /* Topic configuration */ + conf = rd_kafka_topic_conf_new(); + rd_kafka_topic_conf_set(conf,"produce.offset.report", "true", errstr, sizeof errstr ); + + char timeoutStr[64]; + snprintf(timeoutStr, 64, "%lu", timeout); + if (rd_kafka_topic_conf_set(conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog( + LOG_ERR, + "Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s", + timeout, + errstr + ); + } + rd_kafka_topic_conf_destroy(conf); + return -3; + } + //callback already set in kafka_set_connection rkt = rd_kafka_topic_new(r, topic, conf); if (!rkt) @@ -424,6 +447,7 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le rd_kafka_topic_conf_destroy(conf); return -1; } + //begin producing: if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, msg, msg_len,NULL, 0,&pcb) == -1) { @@ -444,8 +468,9 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le return 0; } -int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report) +int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report, long timeout) { + char errstr[512]; rd_kafka_topic_t *rkt; struct produce_cb_params pcb = {msg_cnt, 0, 0, 0, 0, NULL}; void *opaque; @@ -472,6 +497,24 @@ int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, in /* Topic configuration */ topic_conf = rd_kafka_topic_conf_new(); + char timeoutStr[64]; + snprintf(timeoutStr, 64, "%lu", timeout); + if (rd_kafka_topic_conf_set(topic_conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog( + LOG_ERR, + "Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s", + timeout, + errstr + ); + } + rd_kafka_topic_conf_destroy(topic_conf); + return -3; + } + /* Create topic */ rkt = rd_kafka_topic_new(r, topic, topic_conf); @@ -529,9 +572,10 @@ int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, in return err_cnt; } -int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report) +int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report, long timeout) { + char errstr[512]; rd_kafka_topic_t *rkt; struct produce_cb_params pcb = {1, 0, 0, 0, 0, NULL}; void *opaque; @@ -558,6 +602,24 @@ int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report /* Topic configuration */ topic_conf = rd_kafka_topic_conf_new(); + char timeoutStr[64]; + snprintf(timeoutStr, 64, "%lu", timeout); + if (rd_kafka_topic_conf_set(topic_conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog( + LOG_ERR, + "Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s", + timeout, + errstr + ); + } + rd_kafka_topic_conf_destroy(topic_conf); + return -3; + } + /* Create topic */ rkt = rd_kafka_topic_new(r, topic, topic_conf); @@ -706,11 +768,12 @@ void queue_consume(rd_kafka_message_t *message, void *opaque) params->read_count -= 1; //add message to return value (perhaps add as array -> offset + msg? if (message->len > 0) { - //ensure there is a payload - char payload[(int) message->len]; - sprintf(payload, "%.*s", (int) message->len, (char *) message->payload); - //add_index_string(return_value, (int) message->offset, payload, 1); - add_next_index_string(return_value, payload, 1); + add_next_index_stringl( + return_value, + (char *) message->payload, + (int) message->len, + 1 + ); } else { add_next_index_string(return_value, "", 1); } @@ -1155,16 +1218,17 @@ int kafka_consume(rd_kafka_t *r, zval* return_value, char* topic, char* offset, { if ((int) rkmessage_return->len > 0) { - //ensure there is a payload - char payload[(int) rkmessage_return->len]; - sprintf(payload, "%.*s", (int) rkmessage_return->len, (char *) rkmessage_return->payload); - add_index_string(return_value, (int) rkmessage_return->offset, payload, 1); + add_index_stringl( + return_value, + (int) rkmessage_return->offset, + (char *) rkmessage_return->payload, + (int) rkmessage_return->len, + 1 + ); } else { - //add empty value - char payload[1] = "";//empty string - add_index_string(return_value, (int) rkmessage_return->offset, payload, 1); + add_index_string(return_value, (int) rkmessage_return->offset, "", 1); } } /* Return message to rdkafka */ diff --git a/kafka.h b/kafka.h index 4f6dca2..abdf57c 100644 --- a/kafka.h +++ b/kafka.h @@ -36,9 +36,9 @@ typedef struct connection_params_s { void kafka_setup(char *brokers); void kafka_set_log_level(int ll); void kafka_set_partition(int partition); -int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report); -int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len); -int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report); +int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report, long timeout); +int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout); +int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report, long timeout); rd_kafka_t *kafka_set_connection(rd_kafka_type_t type, const char *b, int report_level, const char *compression); rd_kafka_t *kafka_get_connection(kafka_connection_params params, const char *brokers); int kafka_consume(rd_kafka_t *r, zval* return_value, char* topic, char* offset, int item_count, int partition); diff --git a/php_kafka.c b/php_kafka.c index 4632312..6950921 100644 --- a/php_kafka.c +++ b/php_kafka.c @@ -82,14 +82,16 @@ ZEND_BEGIN_ARG_INFO(arginf_kafka_set_get_partition, 0) ZEND_ARG_INFO(0, mode) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_INFO(arginf_kafka_produce, 0) +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_produce, 0, 0, 2) ZEND_ARG_INFO(0, topic) ZEND_ARG_INFO(0, message) + ZEND_ARG_INFO(0, timeout) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_INFO(arginf_kafka_produce_batch, 0) +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_produce_batch, 0, 0, 2) ZEND_ARG_INFO(0, topic) ZEND_ARG_INFO(0, messages) + ZEND_ARG_INFO(0, timeout) ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_consume, 0, 0, 2) @@ -1012,7 +1014,7 @@ PHP_METHOD(Kafka, disconnect) } /* }}} end Kafka::disconnect */ -/* {{{ proto Kafka Kafka::produce( string $topic, string $message); +/* {{{ proto Kafka Kafka::produce( string $topic, string $message [, int $timeout = 60000]); Produce a message, returns instance or throws KafkaException in case something went wrong */ @@ -1023,14 +1025,16 @@ PHP_METHOD(Kafka, produce) char *topic; char *msg; long reporting = connection->delivery_confirm_mode; + long timeout = 60000; int topic_len, msg_len, status = 0; - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|l", &topic, &topic_len, - &msg, &msg_len) == FAILURE) { + &msg, &msg_len, + &timeout) == FAILURE) { return; } if (!connection->producer) @@ -1055,9 +1059,9 @@ PHP_METHOD(Kafka, produce) (int) connection->producer_partition ); if (connection->delivery_confirm_mode == PHP_KAFKA_CONFIRM_EXTENDED) - status = kafka_produce_report(connection->producer, topic, msg, msg_len); + status = kafka_produce_report(connection->producer, topic, msg, msg_len, timeout); else - status = kafka_produce(connection->producer, topic, msg, msg_len, connection->delivery_confirm_mode); + status = kafka_produce(connection->producer, topic, msg, msg_len, connection->delivery_confirm_mode, timeout); switch (status) { case -1: @@ -1066,12 +1070,15 @@ PHP_METHOD(Kafka, produce) case -2: zend_throw_exception(kafka_exception, "Connection failure, cannot produce message", 0 TSRMLS_CC); return; + case -3: + zend_throw_exception(kafka_exception, "Topic configuration error", 0 TSRMLS_CC); + return; } RETURN_ZVAL(object, 1, 0); } /* }}} end Kafka::produce */ -/* {{{ proto Kafka Kafka::produceBatch( string $topic, array $messages); +/* {{{ proto Kafka Kafka::produceBatch( string $topic, array $messages [, int $timeout = 60000]); Produce a batch of messages, returns instance or throws exceptions in case of error */ @@ -1086,14 +1093,16 @@ PHP_METHOD(Kafka, produceBatch) char *msg_batch[50]; int msg_batch_len[50] = {0}; long reporting = connection->delivery_confirm_mode; + long timeout = 60000; int topic_len, msg_len, current_idx = 0, status = 0; HashPosition pos; - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sa", + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sa|l", &topic, &topic_len, - &arr) == FAILURE) { + &arr, + &timeout) == FAILURE) { return; } //get producer up and running @@ -1131,7 +1140,7 @@ PHP_METHOD(Kafka, produceBatch) ++current_idx; if (current_idx == 50) { - status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode); + status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode, timeout); if (status) { if (status < 0) @@ -1151,7 +1160,7 @@ PHP_METHOD(Kafka, produceBatch) } if (current_idx) {//we still have some messages to produce... - status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode); + status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode, timeout); if (status) { if (status < 0) diff --git a/stub/Kafka.class.php b/stub/Kafka.class.php index 6572406..3e32ebd 100644 --- a/stub/Kafka.class.php +++ b/stub/Kafka.class.php @@ -235,10 +235,11 @@ public function isConnected($mode = null) * produce message on topic * @param string $topic * @param string $message + * @param int $timeout * @return $this * @throws \KafkaException */ - public function produce($topic, $message) + public function produce($topic, $message, $timeout=5000) { $this->connected = true; //internal call, produce message on topic @@ -251,10 +252,11 @@ public function produce($topic, $message) * Causing any overhead (internally, array is iterated, and produced * @param string $topic * @param array $messages + * @param int $timeout * @return $this * @throws \KafkaException */ - public function produceBatch($topic, array $messages) + public function produceBatch($topic, array $messages, $timeout=5000) { foreach ($messages as $msg) { //non-string messages are skipped silently ATM