diff --git a/.gitignore b/.gitignore
index e2fd78c..10c0437 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,4 +30,7 @@ mkinstalldirs
 run-tests.php
 autom4te.cache
 nbproject/
-.project
\ No newline at end of file
+.project
+test2.php
+recompile.sh
+
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..273d90c
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,11 @@
+language: c
+sudo: required
+compiler:
+  - gcc
+os:
+  - linux
+
+before_install:
+  - sudo apt-get update -qq
+  - sudo apt-get install -y php5-dev php5-cli
+script: ./travis.sh
diff --git a/CREDITS b/CREDITS
index 938ef40..d4051e5 100644
--- a/CREDITS
+++ b/CREDITS
@@ -1,3 +1,4 @@
 phpkafka
 Aleksandar Babic
-Patrick Reilly
\ No newline at end of file
+Patrick Reilly
+Elias Van Ootegem
diff --git a/README.md b/README.md
index 6f4bc46..8c515ff 100644
--- a/README.md
+++ b/README.md
@@ -1,34 +1,159 @@
-phpkafka
-========
-
-**Note: The library is not supported by author anymore. Please check other forks or make your own.**
-
-PHP extension for **Apache Kafka 0.8**. It's built on top of kafka C driver ([librdkafka](https://github.com/edenhill/librdkafka/)).
-It makes persistent connection to kafka broker with non-blocking calls, so it should be very fast.
-
-IMPORTANT: Library is in heavy development and some features are not implemented yet.
-
-Requirements:
--------------
-Download and install [librdkafka](https://github.com/edenhill/librdkafka/). Run `sudo ldconfig` to update shared libraries.
-
-Installing PHP extension:
-----------
-```bash
-phpize
-./configure --enable-kafka
-make
-sudo make install
-sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/conf.d/kafka.ini'
-#For CLI mode:
-sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/cli/conf.d/20-kafka.ini'
-```
-
-Examples:
---------
-```php
-// Produce a message
-$kafka = new Kafka("localhost:9092");
-$kafka->produce("topic_name", "message content");
-$kafka->consume("topic_name", 1172556);
-```
+Master build: [![Build Status](https://travis-ci.org/EVODelavega/phpkafka.svg?branch=master)](https://travis-ci.org/EVODelavega/phpkafka)
+
+Dev build: [![Build Status](https://travis-ci.org/EVODelavega/phpkafka.svg?branch=consume-with-meta)](https://travis-ci.org/EVODelavega/phpkafka)
+
+SRP build: [![Build Status](https://travis-ci.org/EVODelavega/phpkafka.svg?branch=feature%2FSRP)](https://travis-ci.org/EVODelavega/phpkafka)
+
+## Common issues:
+
+Here's a short list of common issues people run into when installing this extension (only one issue so far):
+
+#### _"Unable to load dynamic library '/usr/lib64/php/modules/kafka.so' - librdkafka.so.1"_
+
+What this basically means is that PHP can't find the shared object (librdkafka). Thankfully, the fix is trivial.
+First, make sure you've actually compiled and installed librdkafka. Then run these commands:
+
+```bash
+sudo updatedb
+locate librdkafka.so.1 # locate might not exist on some systems, like Slackware, which uses slocate
+```
+
+The output should show the full path to the `librdkafka.so.1` file, probably _"/usr/local/lib/librdkafka.so.1"_. Edit `/etc/ld.so.conf` to make sure _"/usr/local/lib"_ is included when searching for libraries.
+Either add it directly to the aforementioned file, or, if your system uses a /etc/ld.so.conf.d/ directory, create a new .conf file there:
+
+```bash
+sudo touch /etc/ld.so.conf.d/librd.conf
+echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf
+```
+
+Or simply type `vim /etc/ld.so.conf.d/librd.conf`; when the editor opens, tap _":"_ (colon) and run the command `read !locate librdkafka.so.1`, then delete the filename from the path (move your cursor to the last `/` of the line that just appeared in the file and type `d$` to delete until the end of the line). Save and close the file (`:wq`).
+
+Once updated, run the following:
+
+```bash
+sudo ldconfig
+```
+
+
+_Note:_
+
+Whatever gets merged into the master branch should work just fine. The main dev build is where small tweaks, bugfixes and minor improvements are tested (i.e. a sort-of beta branch).
+
+The SRP build is a long-term dev branch, where I'm currently in the process of separating the monolithic `Kafka` class into various logical sub-classes (a `KafkaTopic` class, perhaps a `KafkaMeta` object, `KafkaConfig` is another candidate...) to make this extension as intuitive as I can.
+
+# This fork is still being actively developed
+
+Given that the original repo is no longer supported by its author, I've decided to keep working on this PHP-Kafka extension instead.
+The branch where most of the work is being done is the `consume-with-meta` branch.
+
+Changes that have happened thus far:
+
+* Timeout when disconnecting is reduced (significantly)
+* Connections can be closed as you go
+* The librdkafka meta API is used
+* New methods added (`Kafka::getTopics` and `Kafka::getPartitionsForTopic($topic)` being the most notable additions)
+* `Kafka::set_partition` is deprecated, in favour of the more PSR-compliant `Kafka::setPartition` method
+* A PHP stub was added for IDE code-completion
+* Argument checks were added, and exceptions are thrown in some places
+* Class constants for an easier API (`Kafka::OFFSET_*`)
+* The extension logged everything via syslog; this is still the default behaviour (as this extension is under development), but it can be turned off (`Kafka::setLogLevel(Kafka::LOG_OFF)`)
+* Exceptions (`KafkaException`) in case of errors (still a work in progress, though)
+* Thread-safe Kafka connections
+* Easy configuration: pass an array of options to the constructor, `setBrokers` or `setOptions` method, like you would with `PDO` (a quick sketch follows below)
+* Compression support added when producing messages (on consume, a compressed message is returned _as-is_)
+* Each instance holds two distinct connections (at most): a producer and a consumer
+* CI (Travis), though there is a lot of work to be done putting together useful tests
+
+Changes that are on the _TODO_ list include:
+
+* Separating kafka meta information out into a separate class (`KafkaTopic` and `KafkaMessage` classes)
+* Allow PHP to determine what the timeouts should be (mainly when disconnecting, or producing messages) (do we still need this?)
+* Add custom exceptions (partially done)
+* Overall API improvements (!!)
+* Performance - it's what you make of it (test results varied from 2 messages/sec to 2.5 million messages per second - see the examples below)
+* Adding tests to the build (very much a work in progress)
+* PHP7 support
+
+All help is welcome, of course...
+
+
+A PHP extension for **Apache Kafka 0.8**, built on top of the Kafka C driver ([librdkafka](https://github.com/edenhill/librdkafka/)).
+This extension requires librdkafka version 0.8.6 (Ubuntu's librdkafka packages won't do: they do not implement the meta API yet).
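+
+To illustrate the PDO-style configuration and the new meta methods listed above, here's a quick sketch. The broker address (`localhost:9092`) and topic name (`topic_name`) are placeholders; the method and constant names are the ones listed above (see the bundled PHP stub for the exact signatures):
+
+```php
+$kafka = new Kafka('localhost:9092');
+//equivalent to passing the options array to the constructor
+$kafka->setOptions([
+    Kafka::LOGLEVEL         => Kafka::LOG_OFF,
+    Kafka::COMPRESSION_MODE => Kafka::COMPRESSION_GZIP,
+]);
+//meta API helpers added in this fork:
+$topics = $kafka->getTopics();//array of topic name => partition count
+$partitions = $kafka->getPartitionsForTopic('topic_name');
+//pin the consumer to the first available partition
+$kafka->setPartition($partitions[0], Kafka::MODE_CONSUMER);
+//check whether a consumer connection is currently open
+var_dump($kafka->isConnected(Kafka::MODE_CONSUMER));
+$kafka->disconnect();//close any open connections
+```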
+
+IMPORTANT: Library is in heavy development and some features are not implemented yet.
+
+Requirements:
+-------------
+Download and install [librdkafka](https://github.com/edenhill/librdkafka/). Run `sudo ldconfig` to update shared libraries.
+
+Installing PHP extension:
+----------
+```bash
+phpize
+./configure --enable-kafka
+make
+sudo make install
+sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/conf.d/kafka.ini'
+#For CLI mode:
+sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/cli/conf.d/20-kafka.ini'
+```
+
+Examples:
+--------
+```php
+// Produce a message
+$kafka = new Kafka("localhost:9092");
+$kafka->produce("topic_name", "message content");
+//get all the available partitions
+$partitions = $kafka->getPartitionsForTopic('topic_name');
+//use it to OPTIONALLY specify a partition to consume from
+//if you don't, consuming IS slower. To set the partition:
+$kafka->setPartition($partitions[0]);//set to first partition
+//then consume, for example, starting with the first offset, consume 20 messages
+$msg = $kafka->consume("topic_name", Kafka::OFFSET_BEGIN, 20);
+var_dump($msg);//dumps array of messages
+```
+
+A more complete example of how to use this extension if performance is what you're after:
+
+```php
+$kafka = new Kafka(
+    'broker-1:9092,broker-2:9092',
+    [
+        Kafka::LOGLEVEL => Kafka::LOG_OFF,//while in dev, default is Kafka::LOG_ON
+        Kafka::CONFIRM_DELIVERY => Kafka::CONFIRM_OFF,//default is Kafka::CONFIRM_BASIC
+        Kafka::RETRY_COUNT => 1,//default is 3
+        Kafka::RETRY_INTERVAL => 25,//default is 100
+    ]
+);
+$fh = fopen('big_data_file.csv', 'r');
+if (!$fh)
+    exit(1);
+$count = 0;
+$lines = [];
+while ($line = fgets($fh, 2048))
+{
+    $lines[] = trim($line);
+    ++$count;
+    if ($count >= 200)
+    {
+        $kafka->produceBatch('my_topic', $lines);
+        $lines = [];
+        $count = 0;
+        //in theory, the next bit is optional, but Kafka::disconnect
+        //waits for the out queue to be empty before closing connections
+        //it's a way to sort-of ensure messages are delivered, even though Kafka::CONFIRM_DELIVERY
+        //was set to Kafka::CONFIRM_OFF... This approach can be used to speed up your code
+        $kafka->disconnect(Kafka::MODE_PRODUCER);//disconnect the producer
+    }
+}
+if ($count)
+{
+    $kafka->produceBatch('my_topic', $lines);
+}
+$kafka->disconnect();//disconnects all opened connections; in this case only a producer connection will exist, though
+```
+
+I've used code very similar to the example above to produce ~3 million messages, and got an average throughput of 2,000 messages/second.
+Removing the disconnect call, or increasing the batch size, will change the rate at which messages get produced.
+
+Not disconnecting at all yielded the best performance (by far): 2.5 million messages in just over 1 second (though depending on the output buffer, and on how Kafka is set up to handle full produce queues, this is not to be recommended!).
diff --git a/kafka.c b/kafka.c
index 69f8b5c..4b7860e 100644
--- a/kafka.c
+++ b/kafka.c
@@ -1,5 +1,5 @@
 /**
- * Copyright 2013-2014 Patrick Reilly.
+ * Copyright 2015 Elias Van Ootegem.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -12,29 +12,50 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * Special thanks to Patrick Reilly and Aleksandar Babic for their work,
+ * on which this extension was actually built.
*/ #include #include "kafka.h" #include - +#include #include -#include #include #include #include #include #include #include -#include #include #include "kafka.h" #include "librdkafka/rdkafka.h" -static int run = 1; -static rd_kafka_t *rk; -static int exit_eof = 1; //Exit consumer when last message +struct consume_cb_params { + int read_count; + zval *return_value; + union { + int *partition_ends; + long *partition_offset; + }; + int error_count; + int eop; + int auto_commit; +}; + +struct produce_cb_params { + int msg_count; + int err_count; + int offset; + int partition; + int errmsg_len; + char *err_msg; +}; + +static int log_level = 1; +static rd_kafka_t *rk = NULL; +static rd_kafka_type_t rk_type; char *brokers = "localhost:9092"; -int64_t start_offset = 0; int partition = RD_KAFKA_PARTITION_UA; void kafka_connect(char *brokers) @@ -42,35 +63,269 @@ void kafka_connect(char *brokers) kafka_setup(brokers); } -void kafka_set_partition(int partition_selected) +void kafka_set_log_level( int ll ) { - partition = partition_selected; + log_level = ll; } -void kafka_stop(int sig) { - run = 0; - fclose(stdin); /* abort fgets() */ - rd_kafka_destroy(rk); - rk = NULL; +void kafka_msg_delivered (rd_kafka_t *rk, + void *payload, size_t len, + int error_code, + void *opaque, void *msg_opaque) +{ + if (error_code && log_level) { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - Message delivery failed: %s", + rd_kafka_err2str(error_code)); + } } -void kafka_err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) { - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - ERROR CALLBACK: %s: %s: %s\n", +void kafka_err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) +{ + if (log_level) { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - ERROR CALLBACK: %s: %s: %s\n", rd_kafka_name(rk), rd_kafka_err2str(err), reason); + } + if (rk) + rd_kafka_destroy(rk); +} - kafka_stop(err); +void kafka_produce_cb_simple(rd_kafka_t *rk, void *payload, size_t len, int err_code, void *opaque, void *msg_opaque) +{ + struct produce_cb_params *params = msg_opaque; + if (params) + { + params->msg_count -=1; + } + if (log_level) + { + if (params) + params->err_count += 1; + openlog("phpkafka", 0, LOG_USER); + if (err_code) + syslog(LOG_ERR, "Failed to deliver message %s: %s", (char *) payload, rd_kafka_err2str(err_code)); + else + syslog(LOG_DEBUG, "Successfuly delevired message (%zd bytes)", len); + } } -void kafka_msg_delivered (rd_kafka_t *rk, - void *payload, size_t len, - int error_code, - void *opaque, void *msg_opaque) { - if (error_code) { +void kafka_produce_detailed_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) +{ + struct produce_cb_params *params = opaque; + if (params) + { + params->msg_count -= 1; + } + if (msg->err) + { + int offset = params->errmsg_len, + err_len = 0; + const char *errstr = rd_kafka_message_errstr(msg); + err_len = strlen(errstr); + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "Failed to deliver message: %s", errstr); + } + if (params) + { + params->err_count += 1; + params->err_msg = realloc( + params->err_msg, + (offset + err_len + 2) * sizeof params->err_msg + ); + if (params->err_msg == NULL) + { + params->errmsg_len = 0; + } + else + { + strcpy( + params->err_msg + offset, + errstr + ); + offset += err_len;//get new strlen + params->err_msg[offset] = '\n';//add new line + ++offset; + params->err_msg[offset] = '\0';//ensure zero terminated string + } + } + return; + } + if 
(params) + { + params->offset = msg->offset; + params->partition = msg->partition; + } +} + +rd_kafka_t *kafka_get_connection(kafka_connection_params params, const char *brokers) +{ + rd_kafka_t *r = NULL; + char errstr[512]; + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + //set error callback + rd_kafka_conf_set_error_cb(conf, kafka_err_cb); + if (params.type == RD_KAFKA_CONSUMER) + { + if (params.queue_buffer) + rd_kafka_conf_set(conf, "queued.min.messages", params.queue_buffer, NULL, 0); + r = rd_kafka_new(params.type, conf, errstr, sizeof errstr); + if (!r) + { + if (params.log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "Failed to connect to kafka: %s", errstr); + } + //destroy config, no connection to use it... + rd_kafka_conf_destroy(conf); + return NULL; + } + if (!rd_kafka_brokers_add(r, brokers)) + { + if (params.log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "Failed to connect to brokers %s", brokers); + } + rd_kafka_destroy(r); + return NULL; + } + return r; + } + if (params.compression) + { + rd_kafka_conf_res_t result = rd_kafka_conf_set( + conf, "compression.codec",params.compression, errstr, sizeof errstr + ); + if (result != RD_KAFKA_CONF_OK) + { + if (params.log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ALERT, "Failed to set compression %s: %s", params.compression, errstr); + } + rd_kafka_conf_destroy(conf); + return NULL; + } + } + if (params.retry_count) + { + rd_kafka_conf_res_t result = rd_kafka_conf_set( + conf, "message.send.max.retries",params.retry_count, errstr, sizeof errstr + ); + if (result != RD_KAFKA_CONF_OK) + { + if (params.log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ALERT, "Failed to set compression %s: %s", params.compression, errstr); + } + rd_kafka_conf_destroy(conf); + return NULL; + } + } + if (params.retry_interval) + { + rd_kafka_conf_res_t result = rd_kafka_conf_set( + conf, "retry.backoff.ms",params.retry_interval, errstr, sizeof errstr + ); + if (result != RD_KAFKA_CONF_OK) + { + if (params.log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ALERT, "Failed to set compression %s: %s", params.compression, errstr); + } + rd_kafka_conf_destroy(conf); + return NULL; + } + } + if (params.reporting == 1) + rd_kafka_conf_set_dr_cb(conf, kafka_produce_cb_simple); + else if (params.reporting == 2) + rd_kafka_conf_set_dr_msg_cb(conf, kafka_produce_detailed_cb); + r = rd_kafka_new(params.type, conf, errstr, sizeof errstr); + if (!r) + { + if (params.log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "Failed to connect to kafka: %s", errstr); + } + //destroy config, no connection to use it... 
+ rd_kafka_conf_destroy(conf); + return NULL; + } + if (!rd_kafka_brokers_add(r, brokers)) + { + if (params.log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "Failed to connect to brokers %s", brokers); + } + rd_kafka_destroy(r); + return NULL; + } + return r; +} + +rd_kafka_t *kafka_set_connection(rd_kafka_type_t type, const char *b, int report_level, const char *compression) +{ + rd_kafka_t *r = NULL; + char *tmp = brokers; + char errstr[512]; + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + if (!(r = rd_kafka_new(type, conf, errstr, sizeof(errstr)))) { + if (log_level) { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - failed to create new producer: %s", errstr); + } + exit(1); + } + /* Add brokers */ + if (rd_kafka_brokers_add(r, b) == 0) { + if (log_level) { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "php kafka - No valid brokers specified"); + } + exit(1); + } + /* Set up a message delivery report callback. + * It will be called once for each message, either on successful + * delivery to broker, or upon failure to deliver to broker. */ + if (type == RD_KAFKA_PRODUCER) + { + if (compression && !strcmp(compression, "none")) + {//silently fail on error ATM... + if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf, "compression.codec", compression, errstr, sizeof errstr)) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "Failed to set compression to %s", compression); + } + } + } + if (report_level == 1) + rd_kafka_conf_set_dr_cb(conf, kafka_produce_cb_simple); + else if (report_level == 2) + rd_kafka_conf_set_dr_msg_cb(conf, kafka_produce_detailed_cb); + } + rd_kafka_conf_set_error_cb(conf, kafka_err_cb); + + if (log_level) { openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - Message delivery failed: %s", - rd_kafka_err2str(error_code)); + syslog(LOG_INFO, "phpkafka - using: %s", brokers); } + return r; +} + +void kafka_set_partition(int partition_selected) +{ + partition = partition_selected; } void kafka_setup(char* brokers_list) @@ -78,61 +333,295 @@ void kafka_setup(char* brokers_list) brokers = brokers_list; } -void kafka_destroy() +void kafka_destroy(rd_kafka_t *r, int timeout) { - if(rk != NULL) { + if(r != NULL) + { + //poll handle status + rd_kafka_poll(r, 0); + if (rd_kafka_outq_len(r) > 0) + {//wait for out-queue to clear + while(rd_kafka_outq_len(r) > 0) + rd_kafka_poll(r, timeout); + timeout = 1; + } + rd_kafka_destroy(r); + //this wait is blocking PHP + //not calling it will yield segfault, though + rd_kafka_wait_destroyed(timeout); + r = NULL; + } +} + +//We're no longer relying on the global rk variable (not thread-safe) +static void kafka_init( rd_kafka_type_t type ) +{ + if (rk && type != rk_type) + { rd_kafka_destroy(rk); - rd_kafka_wait_destroyed(1000); rk = NULL; } + if (rk == NULL) + { + char errstr[512]; + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + if (!(rk = rd_kafka_new(type, conf, errstr, sizeof(errstr)))) { + if (log_level) { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - failed to create new producer: %s", errstr); + } + exit(1); + } + /* Add brokers */ + if (rd_kafka_brokers_add(rk, brokers) == 0) { + if (log_level) { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "php kafka - No valid brokers specified"); + } + exit(1); + } + /* Set up a message delivery report callback. + * It will be called once for each message, either on successful + * delivery to broker, or upon failure to deliver to broker. 
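+         * NB: conf has already been handed to rd_kafka_new() above, and
+         * librdkafka takes ownership of the conf object there, so the
+         * rd_kafka_conf_set_*_cb() calls below come too late to affect
+         * this handle (kafka_get_connection() configures conf first).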
*/ + if (type == RD_KAFKA_PRODUCER) + rd_kafka_conf_set_dr_cb(conf, kafka_produce_cb_simple); + rd_kafka_conf_set_error_cb(conf, kafka_err_cb); + + if (log_level) { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - using: %s", brokers); + } + } } -void kafka_produce(char* topic, char* msg, int msg_len) +int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout) { + char errstr[512]; + rd_kafka_topic_t *rkt = NULL; + int partition = RD_KAFKA_PARTITION_UA; + rd_kafka_topic_conf_t *conf = NULL; + struct produce_cb_params pcb = {1, 0, 0, 0, 0, NULL}; - signal(SIGINT, kafka_stop); - signal(SIGPIPE, kafka_stop); + if (r == NULL) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "No connection provided to produce to topic %s", topic); + } + return -2; + } + + /* Topic configuration */ + conf = rd_kafka_topic_conf_new(); + + rd_kafka_topic_conf_set(conf,"produce.offset.report", "true", errstr, sizeof errstr ); + + char timeoutStr[64]; + snprintf(timeoutStr, 64, "%lu", timeout); + if (rd_kafka_topic_conf_set(conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog( + LOG_ERR, + "Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s", + timeout, + errstr + ); + } + rd_kafka_topic_conf_destroy(conf); + return -3; + } + //callback already set in kafka_set_connection + rkt = rd_kafka_topic_new(r, topic, conf); + if (!rkt) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "Failed to open topic %s", topic); + } + rd_kafka_topic_conf_destroy(conf); + return -1; + } + + //begin producing: + if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, msg, msg_len,NULL, 0,&pcb) == -1) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "Failed to produce message: %s", rd_kafka_err2str(rd_kafka_errno2err(errno))); + } + //handle delivery response (callback) + rd_kafka_poll(rk, 0); + rd_kafka_topic_destroy(rkt); + return -1; + } + rd_kafka_poll(rk, 0); + while(pcb.msg_count && rd_kafka_outq_len(r) > 0) + rd_kafka_poll(r, 10); + rd_kafka_topic_destroy(rkt); + return 0; +} + +int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report, long timeout) +{ + char errstr[512]; rd_kafka_topic_t *rkt; + struct produce_cb_params pcb = {msg_cnt, 0, 0, 0, 0, NULL}; + void *opaque; int partition = RD_KAFKA_PARTITION_UA; + int i, + err_cnt = 0; + if (report) + opaque = &pcb; + else + opaque = NULL; rd_kafka_topic_conf_t *topic_conf; - if(rk == NULL) { - char errstr[512]; - rd_kafka_conf_t *conf; + if (r == NULL) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "phpkafka - no connection to produce to topic: %s", topic); + } + return -2; + } - /* Kafka configuration */ - conf = rd_kafka_conf_new(); + /* Topic configuration */ + topic_conf = rd_kafka_topic_conf_new(); - if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) { - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - failed to create new producer: %s", errstr); - exit(1); + char timeoutStr[64]; + snprintf(timeoutStr, 64, "%lu", timeout); + if (rd_kafka_topic_conf_set(topic_conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog( + LOG_ERR, + "Failed to configure topic param 
'message.timeout.ms' to %lu before producing; config err was: %s", + timeout, + errstr + ); } + rd_kafka_topic_conf_destroy(topic_conf); + return -3; + } - /* Add brokers */ - if (rd_kafka_brokers_add(rk, brokers) == 0) { + /* Create topic */ + rkt = rd_kafka_topic_new(r, topic, topic_conf); + + //do we have VLA? + rd_kafka_message_t *messages = calloc(sizeof *messages, msg_cnt); + if (messages == NULL) + {//fallback to individual produce calls + for (i=0;i 0) + rd_kafka_poll(r, 10); - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - using: %s", brokers); + //set global to NULL again + rd_kafka_topic_destroy(rkt); + if (report) + err_cnt = pcb.err_count; + return err_cnt; +} + +int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report, long timeout) +{ + + char errstr[512]; + rd_kafka_topic_t *rkt; + struct produce_cb_params pcb = {1, 0, 0, 0, 0, NULL}; + void *opaque; + int partition = RD_KAFKA_PARTITION_UA; + + //decide whether to pass callback params or not... + if (report) + opaque = &pcb; + else + opaque = NULL; + + rd_kafka_topic_conf_t *topic_conf; + + if (r == NULL) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "phpkafka - no connection to produce to topic: %s", topic); + } + return -2; } /* Topic configuration */ topic_conf = rd_kafka_topic_conf_new(); + char timeoutStr[64]; + snprintf(timeoutStr, 64, "%lu", timeout); + if (rd_kafka_topic_conf_set(topic_conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog( + LOG_ERR, + "Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s", + timeout, + errstr + ); + } + rd_kafka_topic_conf_destroy(topic_conf); + return -3; + } + /* Create topic */ - rkt = rd_kafka_topic_new(rk, topic, topic_conf); + rkt = rd_kafka_topic_new(r, topic, topic_conf); if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, @@ -143,91 +632,501 @@ void kafka_produce(char* topic, char* msg, int msg_len) /* Message opaque, provided in * delivery report callback as * msg_opaque. */ - NULL) == -1) { - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - %% Failed to produce to topic %s " - "partition %i: %s", - rd_kafka_topic_name(rkt), partition, - rd_kafka_err2str( - rd_kafka_errno2err(errno))); - rd_kafka_poll(rk, 0); + opaque) == -1) { + if (log_level) { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - %% Failed to produce to topic %s " + "partition %i: %s", + rd_kafka_topic_name(rkt), partition, + rd_kafka_err2str( + rd_kafka_errno2err(errno))); + } + rd_kafka_topic_destroy(rkt); + return -1; } /* Poll to handle delivery reports */ - rd_kafka_poll(rk, 0); + rd_kafka_poll(r, 0); /* Wait for messages to be delivered */ - while (run && rd_kafka_outq_len(rk) > 0) - rd_kafka_poll(rk, 100); + while (report && pcb.msg_count && rd_kafka_outq_len(r) > 0) + rd_kafka_poll(r, 10); + //set global to NULL again rd_kafka_topic_destroy(rkt); + return 0; +} + +static +void offset_queue_consume(rd_kafka_message_t *message, void *opaque) +{ + struct consume_cb_params *params = opaque; + if (params->eop == 0) + return; + if (message->err) + { + params->error_count += 1; + if (params->auto_commit == 0) + rd_kafka_offset_store( + message->rkt, + message->partition, + message->offset == 0 ? 
0 : message->offset -1 + ); + if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) + { + if (params->partition_offset[message->partition] == -2) + {//no previous message read from this partition + //set offset value to last possible value (-1 or last existing) + //reduce eop count + params->eop -= 1; + params->read_count += 1; + params->partition_offset[message->partition] = message->offset -1; + } + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, + "phpkafka - %% Consumer reached end of %s [%"PRId32"] " + "message queue at offset %"PRId64"\n", + rd_kafka_topic_name(message->rkt), + message->partition, message->offset); + } + } + return; + } + if (params->partition_offset[message->partition] == -1) + params->eop -= 1; + //we have an offset, save it + params->partition_offset[message->partition] = message->offset; + //tally read_count + params->read_count += 1; + if (params->auto_commit == 0) + rd_kafka_offset_store( + message->rkt, + message->partition, + message->offset == 0 ? 0 : message->offset -1 + ); +} + +static +void queue_consume(rd_kafka_message_t *message, void *opaque) +{ + struct consume_cb_params *params = opaque; + zval *return_value = params->return_value; + //all partitions EOF + if (params->eop < 1) + return; + //nothing more to read... + if (params->read_count == 0) + return; + if (message->err) + { + params->error_count += 1; + //if auto-commit is disabled: + if (params->auto_commit == 0) + //store offset + rd_kafka_offset_store( + message->rkt, + message->partition, + message->offset == 0 ? 0 : message->offset -1 + ); + if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) + { + if (params->partition_ends[message->partition] == 0) + { + params->eop -= 1; + params->partition_ends[message->partition] = 1; + } + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, + "phpkafka - %% Consumer reached end of %s [%"PRId32"] " + "message queue at offset %"PRId64"\n", + rd_kafka_topic_name(message->rkt), + message->partition, message->offset); + } + return; + } + //add_next_index_string(return_value, rd_kafka_message_errstr(message), 1); + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - %% Consume error for topic \"%s\" [%"PRId32"] " + "offset %"PRId64": %s\n", + rd_kafka_topic_name(message->rkt), + message->partition, + message->offset, + rd_kafka_message_errstr(message) + ); + } + return; + } + //only count successful reads! + //-1 means read all from offset until end + if (params->read_count != -1) + params->read_count -= 1; + //add message to return value (perhaps add as array -> offset + msg? 
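+    //payloads are appended as plain strings; a zero-length payload is
+    //still added (as an empty string), so it still counts as a read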
+ if (message->len > 0) { + add_next_index_stringl( + return_value, + (char *) message->payload, + (int) message->len, + 1 + ); + } else { + add_next_index_string(return_value, "", 1); + } + + //store offset if autocommit is disabled + if (params->auto_commit == 0) + rd_kafka_offset_store( + message->rkt, + message->partition, + message->offset + ); } static rd_kafka_message_t *msg_consume(rd_kafka_message_t *rkmessage, - void *opaque) { - if (rkmessage->err) { - if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, - "phpkafka - %% Consumer reached end of %s [%"PRId32"] " - "message queue at offset %"PRId64"\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, rkmessage->offset); - if (exit_eof) - run = 0; - return; - } - - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - %% Consume error for topic \"%s\" [%"PRId32"] " - "offset %"PRId64": %s\n", - rd_kafka_topic_name(rkmessage->rkt), - rkmessage->partition, - rkmessage->offset, - rd_kafka_message_errstr(rkmessage)); - return; - } - - //php_printf("%.*s\n", (int)rkmessage->len, (char *)rkmessage->payload); - return rkmessage; + void *opaque) +{ + int *run = opaque; + if (rkmessage->err) + { + *run = 0; + if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, + "phpkafka - %% Consumer reached end of %s [%"PRId32"] " + "message queue at offset %"PRId64"\n", + rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition, rkmessage->offset); + } + return NULL; + } + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - %% Consume error for topic \"%s\" [%"PRId32"] " + "offset %"PRId64": %s\n", + rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition, + rkmessage->offset, + rd_kafka_message_errstr(rkmessage) + ); + } + return NULL; + } + + return rkmessage; } -void kafka_consume(zval* return_value, char* topic, char* offset, int item_count) +//get topics + partition count +void kafka_get_topics(rd_kafka_t *r, zval *return_value) { + int i; + const struct rd_kafka_metadata *meta = NULL; + if (r == NULL) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "phpkafka - no connection to get topics"); + } + return; + } + if (RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_metadata(r, 1, NULL, &meta, 200)) { + for (i=0;itopic_cnt;++i) { + add_assoc_long( + return_value, + meta->topics[i].topic, + (long) meta->topics[i].partition_cnt + ); + } + } + if (meta) { + rd_kafka_metadata_destroy(meta); + } +} - int read_counter = 0; +static +int kafka_partition_count(rd_kafka_t *r, const char *topic) +{ + rd_kafka_topic_t *rkt; + rd_kafka_topic_conf_t *conf; + int i;//C89 compliant + //connect as consumer if required + if (r == NULL) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, "phpkafka - no connection to get partition count for topic: %s", topic); + } + return -1; + } + /* Topic configuration */ + conf = rd_kafka_topic_conf_new(); + + /* Create topic */ + rkt = rd_kafka_topic_new(r, topic, conf); + //metadata API required rd_kafka_metadata_t** to be passed + const struct rd_kafka_metadata *meta = NULL; + if (RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_metadata(r, 0, rkt, &meta, 200)) + i = (int) meta->topics->partition_cnt; + else + i = 0; + if (meta) { + rd_kafka_metadata_destroy(meta); + } + rd_kafka_topic_destroy(rkt); + return i; +} + +//get the available partitions for a given topic +void 
kafka_get_partitions(rd_kafka_t *r, zval *return_value, char *topic) +{ + //we need a connection! + if (r == NULL) + return; + int i, count = kafka_partition_count(r, topic); + for (i=0;itopics->partition_cnt * sizeof *values); + if (values == NULL) + { + *partitions = values;//possible corrupted pointer now + //free metadata, return error + rd_kafka_metadata_destroy(meta); + return -1; + } + //we need eop to reach 0, if there are 4 partitions, start at 3 (0, 1, 2, 3) + cb_params.eop = meta->topics->partition_cnt -1; + cb_params.partition_offset = values; + for (i=0;itopics->partition_cnt;++i) + { + //initialize: set to -2 for callback + values[i] = -2; + if (rd_kafka_consume_start_queue(rkt, meta->topics->partitions[i].id, RD_KAFKA_OFFSET_BEGINNING, rkqu)) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, + "Failed to start consuming topic %s [%"PRId32"]", + topic, meta->topics->partitions[i].id + ); + } + continue; + } + } + //eiter eop reached 0, or the read errors >= nr of partitions + //either way, we've consumed a message from each partition, and therefore, we're done + while(cb_params.eop && cb_params.error_count < meta->topics->partition_cnt) + rd_kafka_consume_callback_queue(rkqu, 100, offset_queue_consume, &cb_params); + //stop consuming for all partitions + for (i=0;itopics->partition_cnt;++i) + rd_kafka_consume_stop(rkt, meta->topics[0].partitions[i].id); + rd_kafka_queue_destroy(rkqu); + //do we need this poll here? + while(rd_kafka_outq_len(r) > 0) + rd_kafka_poll(r, 5); + + //let's be sure to pass along the correct values here... + *partitions = values; + i = meta->topics->partition_cnt; + } + if (meta) + rd_kafka_metadata_destroy(meta); + rd_kafka_topic_destroy(rkt); + return i; +} + +void kafka_consume_all(rd_kafka_t *rk, zval *return_value, const char *topic, const char *offset, int item_count) +{ + char errstr[512]; + rd_kafka_topic_t *rkt; + rd_kafka_topic_conf_t *conf; + const struct rd_kafka_metadata *meta = NULL; + rd_kafka_queue_t *rkqu = NULL; + int current, p, i = 0; + int32_t partition = 0; + int64_t start; + struct consume_cb_params cb_params = {item_count, return_value, NULL, 0, 0, 0}; + //check for NULL pointers, all arguments are required! 
+ if (rk == NULL || return_value == NULL || topic == NULL || offset == NULL || strlen(offset) == 0) + return; - if (strlen(offset) != 0) { if (!strcmp(offset, "end")) - start_offset = RD_KAFKA_OFFSET_END; + start = RD_KAFKA_OFFSET_END; else if (!strcmp(offset, "beginning")) - start_offset = RD_KAFKA_OFFSET_BEGINNING; + start = RD_KAFKA_OFFSET_BEGINNING; else if (!strcmp(offset, "stored")) - start_offset = RD_KAFKA_OFFSET_STORED; + start = RD_KAFKA_OFFSET_STORED; else - start_offset = strtoll(offset, NULL, 10); - } + start = strtoll(offset, NULL, 10); - rd_kafka_topic_t *rkt; + /* Topic configuration */ + conf = rd_kafka_topic_conf_new(); - char errstr[512]; - rd_kafka_conf_t *conf; + /* Disable autocommit, queue_consume sets offsets automatically */ + if (rd_kafka_topic_conf_set(conf, "auto.commit.enable", "false", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog( + LOG_WARNING, + "failed to turn autocommit off consuming %d messages (start offset %"PRId64") from topic %s: %s", + item_count, + start, + topic, + errstr + ); + } + cb_params.auto_commit = 1; + } + /* Create topic */ + rkt = rd_kafka_topic_new(rk, topic, conf); + if (!rkt) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - Failed to read %s from %"PRId64" (%s)", topic, start, offset); + } + return; + } + rkqu = rd_kafka_queue_new(rk); + if (RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_metadata(rk, 0, rkt, &meta, 5)) + { + p = meta->topics->partition_cnt; + cb_params.partition_ends = calloc(sizeof *cb_params.partition_ends, p); + if (cb_params.partition_ends == NULL) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - Failed to read %s from %"PRId64" (%s)", topic, start, offset); + } + rd_kafka_metadata_destroy(meta); + meta = NULL; + rd_kafka_queue_destroy(rkqu); + rd_kafka_topic_destroy(rkt); + return; + } + cb_params.eop = p; + for (i=0;itopics[0].partitions[i].id; + if (rd_kafka_consume_start_queue(rkt, partition, start, rkqu)) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_ERR, + "Failed to start consuming topic %s [%"PRId32"]: %s", + topic, partition, offset + ); + } + continue; + } + } + while(cb_params.read_count && cb_params.eop) + rd_kafka_consume_callback_queue(rkqu, 200, queue_consume, &cb_params); + free(cb_params.partition_ends); + cb_params.partition_ends = NULL; + for (i=0;itopics[0].partitions[i].id; + rd_kafka_consume_stop(rkt, partition); + } + rd_kafka_metadata_destroy(meta); + meta = NULL; + rd_kafka_queue_destroy(rkqu); + while(rd_kafka_outq_len(rk) > 0) + rd_kafka_poll(rk, 50); + rd_kafka_topic_destroy(rkt); + } + if (meta) + rd_kafka_metadata_destroy(meta); +} - /* Kafka configuration */ - conf = rd_kafka_conf_new(); +int kafka_consume(rd_kafka_t *r, zval* return_value, char* topic, char* offset, int item_count, int partition) +{ + int64_t start_offset = 0; + int read_counter = 0, + run = 1; + //nothing to consume? 
+ if (item_count == 0) + return 0; + if (strlen(offset) != 0) + { + if (!strcmp(offset, "end")) + start_offset = RD_KAFKA_OFFSET_END; + else if (!strcmp(offset, "beginning")) + start_offset = RD_KAFKA_OFFSET_BEGINNING; + else if (!strcmp(offset, "stored")) + start_offset = RD_KAFKA_OFFSET_STORED; + else + { + start_offset = strtoll(offset, NULL, 10); + if (start_offset < 1) + return -1; + } - /* Create Kafka handle */ - if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) { - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - failed to create new consumer: %s", errstr); - exit(1); } + rd_kafka_topic_t *rkt; - /* Add brokers */ - if (rd_kafka_brokers_add(rk, brokers) == 0) { + if (r == NULL) + { + if (log_level) + { openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "php kafka - No valid brokers specified"); - exit(1); + syslog(LOG_ERR, "phpkafka - no connection to consume from topic: %s", topic); + } + return -2; } rd_kafka_topic_conf_t *topic_conf; @@ -236,55 +1135,108 @@ void kafka_consume(zval* return_value, char* topic, char* offset, int item_count topic_conf = rd_kafka_topic_conf_new(); /* Create topic */ - rkt = rd_kafka_topic_new(rk, topic, topic_conf); - - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - start_offset: %d and offset passed: %d", start_offset, offset); - - /* Start consuming */ - if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) { - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - %% Failed to start consuming: %s", - rd_kafka_err2str(rd_kafka_errno2err(errno))); - exit(1); + rkt = rd_kafka_topic_new(r, topic, topic_conf); + if (rkt == NULL) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog( + LOG_ERR, + "Failed to consume from topic %s: %s", + topic, + rd_kafka_err2str( + rd_kafka_errno2err(errno) + ) + ); + } + return -3; } + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - start_offset: %"PRId64" and offset passed: %s", start_offset, offset); - if (item_count != 0) { - read_counter = item_count; } - - while (run) { - if (item_count != 0 && read_counter >= 0) { - read_counter--; - openlog("phpkafka", 0, LOG_USER); - syslog(LOG_INFO, "phpkafka - read_counter: %d", read_counter); - if (read_counter == -1) { - run = 0; - continue; + /* Start consuming */ + if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "phpkafka - %% Failed to start consuming: %s", + rd_kafka_err2str(rd_kafka_errno2err(errno))); } - } - - rd_kafka_message_t *rkmessage; + return -4; + } - /* Consume single message. - * See rdkafka_performance.c for high speed - * consuming of messages. 
*/ - rkmessage = rd_kafka_consume(rkt, partition, 1000); - if (!rkmessage) /* timeout */ - continue; + /** + * Keep reading until run == 0, or read_counter == item_count + */ + for (read_counter=0;read_counter!=item_count;++read_counter) + { + if (run == 0) + break; + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "Consuming, count at %d (of %d - run: %d)", + read_counter, + item_count, + run + ); + } + rd_kafka_message_t *rkmessage = NULL, + *rkmessage_return = NULL; - rd_kafka_message_t *rkmessage_return; - rkmessage_return = msg_consume(rkmessage, NULL); - char payload[(int)rkmessage_return->len]; - sprintf(payload, "%.*s", (int)rkmessage_return->len, (char *)rkmessage_return->payload); - add_index_string(return_value, (int)rkmessage_return->offset, payload, 1); + /* Consume single message. + * See rdkafka_performance.c for high speed + * consuming of messages. */ + rkmessage = rd_kafka_consume(rkt, partition, 1000); + //timeout ONLY if error didn't cause run to be 0 + if (!rkmessage) + { + //break on timeout, makes second call redundant + if (errno == ETIMEDOUT) + { + if (log_level) + { + openlog("phpkafka", 0, LOG_USER); + syslog(LOG_INFO, "Consumer timed out, count at %d (of %d) stop consuming after %d messages", + read_counter, + item_count, + read_counter +1 + ); + } + break; + } + continue; + } - /* Return message to rdkafka */ - rd_kafka_message_destroy(rkmessage); + rkmessage_return = msg_consume(rkmessage, &run); + if (rkmessage_return != NULL) + { + if ((int) rkmessage_return->len > 0) + { + add_index_stringl( + return_value, + (int) rkmessage_return->offset, + (char *) rkmessage_return->payload, + (int) rkmessage_return->len, + 1 + ); + } + else + { + add_index_string(return_value, (int) rkmessage_return->offset, "", 1); + } + } + /* Return message to rdkafka */ + rd_kafka_message_destroy(rkmessage); } /* Stop consuming */ rd_kafka_consume_stop(rkt, partition); rd_kafka_topic_destroy(rkt); - rd_kafka_destroy(rk); -} \ No newline at end of file + return 0; +} diff --git a/kafka.h b/kafka.h index 703e92d..abdf57c 100644 --- a/kafka.h +++ b/kafka.h @@ -1,5 +1,5 @@ /** - * Copyright 2013-2014 Patrick Reilly + * Copyright 2015 Elias Van Ootegem * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,15 +12,40 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Special thanks to Patrick Reilly and Aleksandar Babic for their work + * On which this extension was actually built. 
*/ #ifndef __KAFKA_H__ #define __KAFKA_H__ +#include "librdkafka/rdkafka.h" + +typedef struct connection_params_s { + rd_kafka_type_t type; + int log_level; + int reporting; + char *compression; + union { + char *retry_count; + char *queue_buffer; + }; + char *retry_interval; +} kafka_connection_params; void kafka_setup(char *brokers); +void kafka_set_log_level(int ll); void kafka_set_partition(int partition); -void kafka_produce(char* topic, char* msg, int msg_len); -void kafka_consume(zval* return_value, char* topic, char* offset, int item_count); -void kafka_destroy(); +int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report, long timeout); +int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout); +int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report, long timeout); +rd_kafka_t *kafka_set_connection(rd_kafka_type_t type, const char *b, int report_level, const char *compression); +rd_kafka_t *kafka_get_connection(kafka_connection_params params, const char *brokers); +int kafka_consume(rd_kafka_t *r, zval* return_value, char* topic, char* offset, int item_count, int partition); +void kafka_get_partitions(rd_kafka_t *r, zval *return_value, char *topic); +int kafka_partition_offsets(rd_kafka_t *r, long **partitions, const char *topic); +void kafka_get_topics(rd_kafka_t *r,zval *return_value); +void kafka_consume_all(rd_kafka_t *rk, zval *return_value, const char *topic, const char *offset, int item_count); +void kafka_destroy(rd_kafka_t *r, int timeout); -#endif \ No newline at end of file +#endif diff --git a/php_kafka.c b/php_kafka.c index 8942b67..6950921 100644 --- a/php_kafka.c +++ b/php_kafka.c @@ -1,5 +1,5 @@ /** - * Copyright 2013-2014 Patrick Reilly. + * Copyright 2015 Elias Van Ootegem. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,6 +12,9 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Special thanks to Patrick Reilly and Aleksandar Babic for their work + * On which this extension was actually built. 
*/ #ifdef HAVE_CONFIG_H @@ -21,17 +24,120 @@ #include #include #include "kafka.h" +#include "zend_exceptions.h" +#include "zend_hash.h" +#include +#include + +#ifdef COMPILE_DL_KAFKA +ZEND_GET_MODULE(kafka) +#endif +#define REGISTER_KAFKA_CLASS_CONST_STRING(ce, name, value) \ + zend_declare_class_constant_stringl(ce, name, sizeof(name)-1, value, sizeof(value)-1) +#define REGISTER_KAFKA_CLASS_CONST_LONG(ce, name, value) \ + zend_declare_class_constant_long(ce, name, sizeof(name)-1, value) +#define REGISTER_KAFKA_CLASS_CONST(ce, c_name, type) \ + REGISTER_KAFKA_CLASS_CONST_ ## type(ce, #c_name, PHP_KAFKA_ ## c_name) +#ifndef BASE_EXCEPTION +#if (PHP_MAJOR_VERSION < 5) || ( ( PHP_MAJOR_VERSION == 5 ) && (PHP_MINOR_VERSION < 2) ) +#define BASE_EXCEPTION zend_exception_get_default() +#else +#define BASE_EXCEPTION zend_exception_get_default(TSRMLS_C) +#endif +#endif + +#define GET_KAFKA_CONNECTION(varname, thisObj) \ + kafka_connection *varname = (kafka_connection *) zend_object_store_get_object( \ + thisObj TSRMLS_CC \ + ) + +/* {{{ arginfo */ +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka__constr, 0, 0, 1) + ZEND_ARG_INFO(0, brokers) + ZEND_ARG_INFO(0, options) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO(arginf_kafka_set_options, 0) + ZEND_ARG_INFO(0, options) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_set_partition, 0, 0, 1) + ZEND_ARG_INFO(0, partition) + ZEND_ARG_INFO(0, mode) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO(arginf_kafka_set_compression, 0) + ZEND_ARG_INFO(0, compression) +ZEND_END_ARG_INFO() -/* decalre the class entry */ +ZEND_BEGIN_ARG_INFO(arginf_kafka_set_log_level, 0) + ZEND_ARG_INFO(0, logLevel) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO(arginf_kafka_get_partitions_for_topic, 0) + ZEND_ARG_INFO(0, topic) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO(arginf_kafka_set_get_partition, 0) + ZEND_ARG_INFO(0, mode) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_produce, 0, 0, 2) + ZEND_ARG_INFO(0, topic) + ZEND_ARG_INFO(0, message) + ZEND_ARG_INFO(0, timeout) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_produce_batch, 0, 0, 2) + ZEND_ARG_INFO(0, topic) + ZEND_ARG_INFO(0, messages) + ZEND_ARG_INFO(0, timeout) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_consume, 0, 0, 2) + ZEND_ARG_INFO(0, topic) + ZEND_ARG_INFO(0, offset) + ZEND_ARG_INFO(0, messageCount) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_is_conn, 0, 0, 0) + ZEND_ARG_INFO(0, mode) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO(arginf_kafka_void, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginf_kafka_disconnect, 0, 0, 0) + ZEND_ARG_INFO(0, mode) +ZEND_END_ARG_INFO() + +/* }}} end arginfo */ + +/* decalre the class entries */ zend_class_entry *kafka_ce; +zend_class_entry *kafka_exception; /* the method table */ /* each method can have its own parameters and visibility */ static zend_function_entry kafka_functions[] = { - PHP_ME(Kafka, __construct, NULL, ZEND_ACC_CTOR | ZEND_ACC_PUBLIC) - PHP_ME(Kafka, set_partition, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Kafka, produce, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Kafka, consume, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, __construct, arginf_kafka__constr, ZEND_ACC_CTOR | ZEND_ACC_PUBLIC) + PHP_ME(Kafka, __destruct, arginf_kafka_void, ZEND_ACC_DTOR | ZEND_ACC_PUBLIC) + PHP_ME(Kafka, setCompression, arginf_kafka_set_compression, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, getCompression, arginf_kafka_void, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, set_partition, arginf_kafka_set_partition, ZEND_ACC_PUBLIC|ZEND_ACC_DEPRECATED) + PHP_ME(Kafka, 
setPartition, arginf_kafka_set_partition, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, getPartition, arginf_kafka_set_get_partition, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, setLogLevel, arginf_kafka_set_log_level, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, getPartitionsForTopic, arginf_kafka_get_partitions_for_topic, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, getPartitionOffsets, arginf_kafka_get_partitions_for_topic, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, setBrokers, arginf_kafka__constr, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, setOptions, arginf_kafka_set_options, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, getTopics, arginf_kafka_void, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, disconnect, arginf_kafka_disconnect, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, isConnected, arginf_kafka_is_conn, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, produce, arginf_kafka_produce, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, produceBatch, arginf_kafka_produce_batch, ZEND_ACC_PUBLIC) + PHP_ME(Kafka, consume, arginf_kafka_consume, ZEND_ACC_PUBLIC) {NULL,NULL,NULL} /* Marks the end of function entries */ }; @@ -48,91 +154,1148 @@ zend_module_entry kafka_module_entry = { STANDARD_MODULE_PROPERTIES }; -#ifdef COMPILE_DL_KAFKA -ZEND_GET_MODULE(kafka) -#endif - - PHP_MINIT_FUNCTION(kafka) { - zend_class_entry ce; + zend_class_entry ce, + ce_ex; INIT_CLASS_ENTRY(ce, "Kafka", kafka_functions); kafka_ce = zend_register_internal_class(&ce TSRMLS_CC); + INIT_CLASS_ENTRY(ce_ex, "KafkaException", NULL); + kafka_exception = zend_register_internal_class_ex( + &ce_ex, + BASE_EXCEPTION, + NULL TSRMLS_CC + ); + //do not allow people to extend this class, make it final + kafka_ce->create_object = create_kafka_connection; + kafka_ce->ce_flags |= ZEND_ACC_FINAL_CLASS; + //offset constants (consume) + REGISTER_KAFKA_CLASS_CONST(kafka_ce, OFFSET_BEGIN, STRING); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, OFFSET_END, STRING); + //compression mode constants + REGISTER_KAFKA_CLASS_CONST(kafka_ce, COMPRESSION_NONE, STRING); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, COMPRESSION_GZIP, STRING); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, COMPRESSION_SNAPPY, STRING); + //global log-mode constants TODO: refactor to ERRMODE constants + REGISTER_KAFKA_CLASS_CONST(kafka_ce, LOG_ON, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, LOG_OFF, LONG); + //connection mode constants + REGISTER_KAFKA_CLASS_CONST(kafka_ce, MODE_CONSUMER, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, MODE_PRODUCER, LONG); + //random partition constant + REGISTER_KAFKA_CLASS_CONST(kafka_ce, PARTITION_RANDOM, LONG); + //config constants: + REGISTER_KAFKA_CLASS_CONST(kafka_ce, RETRY_COUNT, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, RETRY_INTERVAL, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, CONFIRM_DELIVERY, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, QUEUE_BUFFER_SIZE, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, COMPRESSION_MODE, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, LOGLEVEL, LONG); + //confirmation value constants + REGISTER_KAFKA_CLASS_CONST(kafka_ce, CONFIRM_OFF, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, CONFIRM_BASIC, LONG); + REGISTER_KAFKA_CLASS_CONST(kafka_ce, CONFIRM_EXTENDED, LONG); + return SUCCESS; +} + +PHP_RSHUTDOWN_FUNCTION(kafka) +{ return SUCCESS; } -PHP_RSHUTDOWN_FUNCTION(kafka) { return SUCCESS; } -PHP_RINIT_FUNCTION(kafka) { return SUCCESS; } -PHP_MSHUTDOWN_FUNCTION(kafka) { - kafka_destroy(); + +PHP_RINIT_FUNCTION(kafka) +{ return SUCCESS; } +PHP_MSHUTDOWN_FUNCTION(kafka) +{ + return SUCCESS; +} + +zend_object_value create_kafka_connection(zend_class_entry *class_type TSRMLS_DC) +{ + zend_object_value retval; + kafka_connection *intern; + zval 
*tmp; + + // allocate the struct we're going to use + intern = emalloc(sizeof *intern); + memset(intern, 0, sizeof *intern); + //init partitions to random partitions + intern->consumer_partition = PHP_KAFKA_PARTITION_RANDOM; + intern->producer_partition = PHP_KAFKA_PARTITION_RANDOM; + //set default values + //basic confirmation (wait for success callback) + intern->delivery_confirm_mode = PHP_KAFKA_CONFIRM_BASIC; + //logging = default on (while in development, at least) + intern->log_level = PHP_KAFKA_LOG_ON; + + zend_object_std_init(&intern->std, class_type TSRMLS_CC); + //add properties table +#if PHP_VERSION_ID < 50399 + zend_hash_copy( + intern->std.properties, &class_type->default_properties, + (copy_ctor_func_t)zval_add_ref, + (void *)&tmp, + sizeof tmp + ); +#else + object_properties_init(&intern->std, class_type); +#endif + + // create a destructor for this struct + retval.handle = zend_objects_store_put( + intern, + (zend_objects_store_dtor_t) zend_objects_destroy_object, + free_kafka_connection, + NULL TSRMLS_CC + ); + retval.handlers = zend_get_std_object_handlers(); + + return retval; +} + +//clean current connections +void free_kafka_connection(void *object TSRMLS_DC) +{ + int interval = 1; + kafka_connection *connection = ((kafka_connection *) object); + //no confirmation, wait to close connection a bit longer, for what it's worth + if (connection->delivery_confirm_mode == PHP_KAFKA_CONFIRM_OFF) + interval = 50; + + if (connection->brokers) + efree(connection->brokers); + if (connection->compression) + efree(connection->compression); + if (connection->queue_buffer) + efree(connection->queue_buffer); + if (connection->retry_count) + efree(connection->retry_count); + if (connection->retry_interval) + efree(connection->retry_interval); + if (connection->consumer != NULL) + kafka_destroy( + connection->consumer, + 1 + ); + if (connection->producer != NULL) + kafka_destroy( + connection->producer, + interval + ); + efree(connection); +} + +static +int is_number(const char *str) +{ + while (*str != '\0') + { + if (!isdigit(*str)) + return 0; + ++str; + } + return 1; +} + +//parse connection config array, and update connection struct +static int parse_options_array(zval *arr, kafka_connection **conn) +{ + zval **entry; + char *assoc_key; + int key_len; + long idx; + HashPosition pos; + //make life easier, dereference struct + kafka_connection *connection = *conn; + zend_hash_internal_pointer_reset_ex(Z_ARRVAL_P(arr), &pos); + while (zend_hash_get_current_data_ex(Z_ARRVAL_P(arr), (void **)&entry, &pos) == SUCCESS) + { + if (zend_hash_get_current_key_ex(Z_ARRVAL_P(arr), &assoc_key, &key_len, &idx, 0, &pos) == HASH_KEY_IS_STRING) + { + zend_throw_exception(kafka_exception, "Invalid option key, use class constants", 0 TSRMLS_CC); + return -1; + } + else + { + char tmp[128]; + switch (idx) + { + case PHP_KAFKA_RETRY_COUNT: + if (Z_TYPE_PP(entry) == IS_STRING && is_number(Z_STRVAL_PP(entry))) + { + if (connection->retry_count) + efree(connection->retry_count); + connection->retry_count = estrdup(Z_STRVAL_PP(entry)); + } + else if (Z_TYPE_PP(entry) == IS_LONG) + { + if (connection->retry_count) + efree(connection->retry_count); + snprintf(tmp, 128, "%d", Z_LVAL_PP(entry)); + connection->retry_count = estrdup(tmp); + } + else + { + zend_throw_exception(kafka_exception, "Invalid value for Kafka::RETRY_COUNT option, expected numeric value", 0 TSRMLS_CC); + return -1; + } + break; + case PHP_KAFKA_RETRY_INTERVAL: + if (Z_TYPE_PP(entry) == IS_STRING && is_number(Z_STRVAL_PP(entry))) + { + if 
(connection->retry_interval)
+                        efree(connection->retry_interval);
+                    connection->retry_interval = estrdup(Z_STRVAL_PP(entry));
+                }
+                else if (Z_TYPE_PP(entry) == IS_LONG)
+                {
+                    if (connection->retry_interval)
+                        efree(connection->retry_interval);
+                    snprintf(tmp, 128, "%ld", Z_LVAL_PP(entry));
+                    connection->retry_interval = estrdup(tmp);
+                }
+                else
+                {
+                    zend_throw_exception(kafka_exception, "Invalid value for Kafka::RETRY_INTERVAL option, expected numeric value", 0 TSRMLS_CC);
+                    return -1;
+                }
+                break;
+            case PHP_KAFKA_CONFIRM_DELIVERY:
+                if (
+                    Z_TYPE_PP(entry) != IS_LONG
+                    ||
+                    (
+                        Z_LVAL_PP(entry) != PHP_KAFKA_CONFIRM_OFF
+                        &&
+                        Z_LVAL_PP(entry) != PHP_KAFKA_CONFIRM_BASIC
+                        &&
+                        Z_LVAL_PP(entry) != PHP_KAFKA_CONFIRM_EXTENDED
+                    )
+                )
+                {
+                    zend_throw_exception(kafka_exception, "Invalid value for Kafka::CONFIRM_DELIVERY, use Kafka::CONFIRM_* constants", 0 TSRMLS_CC);
+                    return -1;
+                }
+                connection->delivery_confirm_mode = Z_LVAL_PP(entry);
+                break;
+            case PHP_KAFKA_QUEUE_BUFFER_SIZE:
+                if (Z_TYPE_PP(entry) == IS_STRING && is_number(Z_STRVAL_PP(entry)))
+                {
+                    if (connection->queue_buffer)
+                        efree(connection->queue_buffer);
+                    connection->queue_buffer = estrdup(Z_STRVAL_PP(entry));
+                }
+                else if (Z_TYPE_PP(entry) == IS_LONG)
+                {
+                    if (connection->queue_buffer)
+                        efree(connection->queue_buffer);
+                    snprintf(tmp, 128, "%ld", Z_LVAL_PP(entry));
+                    connection->queue_buffer = estrdup(tmp);
+                }
+                else
+                {
+                    zend_throw_exception(kafka_exception, "Invalid value for Kafka::QUEUE_BUFFER_SIZE, expected numeric value", 0 TSRMLS_CC);
+                    return -1;
+                }
+                break;
+            case PHP_KAFKA_COMPRESSION_MODE:
+                if (Z_TYPE_PP(entry) != IS_STRING)
+                {
+                    zend_throw_exception(kafka_exception, "Invalid type for Kafka::COMPRESSION_MODE option, use Kafka::COMPRESSION_* constants", 0 TSRMLS_CC);
+                    return -1;
+                }
+                //reject the value unless it matches one of the known codecs
+                if (
+                    strcmp(Z_STRVAL_PP(entry), PHP_KAFKA_COMPRESSION_GZIP)
+                    &&
+                    strcmp(Z_STRVAL_PP(entry), PHP_KAFKA_COMPRESSION_NONE)
+                    &&
+                    strcmp(Z_STRVAL_PP(entry), PHP_KAFKA_COMPRESSION_SNAPPY)
+                ) {
+                    zend_throw_exception(kafka_exception, "Invalid value for Kafka::COMPRESSION_MODE, use Kafka::COMPRESSION_* constants", 0 TSRMLS_CC);
+                    return -1;
+                }
+                if (connection->compression)
+                    efree(connection->compression);
+                connection->compression = estrdup(Z_STRVAL_PP(entry));
+                break;
+            case PHP_KAFKA_LOGLEVEL:
+                if (Z_TYPE_PP(entry) != IS_LONG ||
+                    (Z_LVAL_PP(entry) != PHP_KAFKA_LOG_OFF && Z_LVAL_PP(entry) != PHP_KAFKA_LOG_ON))
+                {
+                    zend_throw_exception(kafka_exception, "Invalid value for Kafka::LOGLEVEL option, use Kafka::LOG_* constants", 0 TSRMLS_CC);
+                    return -1;
+                }
+                connection->log_level = Z_LVAL_PP(entry);
+                break;
+            }
+        }
+        zend_hash_move_forward_ex(Z_ARRVAL_P(arr), &pos);
+    }
+    return 0;
+}
+
+/** {{{ proto void Kafka::__construct( string $brokers [, array $options = null]);
+   Constructor, expects a comma-separated list of brokers to connect to
+*/
 PHP_METHOD(Kafka, __construct)
 {
-    char *brokers = "localhost:9092";
-    int brokers_len;
+    zval *arr = NULL;
+    char *brokers = NULL;
+    int brokers_len = 0;
+    kafka_connection *connection = (kafka_connection *) zend_object_store_get_object(
+        getThis() TSRMLS_CC
+    );
 
-    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|s",
-        &brokers, &brokers_len) == FAILURE) {
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a",
+        &brokers, &brokers_len, &arr) == FAILURE) {
         return;
     }
-
+    if (arr)
+    {
+        if (parse_options_array(arr, &connection))
+            return;//we've thrown an exception
+    }
+    connection->brokers = estrdup(brokers);
+    kafka_set_log_level(connection->log_level);
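+    //kafka_connect()/kafka_setup() only store the broker list in a legacy
+    //global; the actual handles are created elsewhere (kafka_get_connection)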
+
+/* {{{ proto bool Kafka::isConnected( [ int $mode ] )
+   Returns true if the kafka connection is active, false if not
+   Mode defaults to the current active mode
+*/
+PHP_METHOD(Kafka, isConnected)
+{
+    zval *mode = NULL,
+         *obj = getThis();
+    long tmp_val = -1;
+    rd_kafka_type_t type;
+    GET_KAFKA_CONNECTION(k, obj);
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|z", &mode) == FAILURE)
+        return;
+    if (mode)
+    {
+        if (Z_TYPE_P(mode) == IS_LONG)
+            tmp_val = Z_LVAL_P(mode);
+        if (tmp_val != PHP_KAFKA_MODE_CONSUMER && tmp_val != PHP_KAFKA_MODE_PRODUCER)
+        {
+            zend_throw_exception(
+                kafka_exception,
+                "invalid argument passed to Kafka::isConnected, use Kafka::MODE_* constants",
+                0 TSRMLS_CC
+            );
+            return;
+        }
+        if (tmp_val == PHP_KAFKA_MODE_CONSUMER)
+            type = RD_KAFKA_CONSUMER;
+        else
+            type = RD_KAFKA_PRODUCER;
+    }
+    else
+        type = k->rk_type;
+    if (type == RD_KAFKA_CONSUMER)
+    {
+        if (k->consumer != NULL)
+        {
+            RETURN_TRUE;
+        }
+        RETURN_FALSE;
+    }
+    if (k->producer != NULL)
+    {
+        RETURN_TRUE;
+    }
+    RETURN_FALSE;
+}
+/* }}} end bool Kafka::isConnected */
+
+/* {{{ proto void Kafka::__destruct( void )
+   Destructor, disconnects kafka and frees the connection resources
+*/
+PHP_METHOD(Kafka, __destruct)
+{
+    int interval = 1;
+    kafka_connection *connection = (kafka_connection *) zend_object_store_get_object(
+        getThis() TSRMLS_CC
+    );
+    if (connection->delivery_confirm_mode == PHP_KAFKA_CONFIRM_OFF)
+        interval = 25;
+    if (connection->brokers)
+        efree(connection->brokers);
+    if (connection->queue_buffer)
+        efree(connection->queue_buffer);
+    if (connection->retry_count)
+        efree(connection->retry_count);
+    if (connection->retry_interval)
+        efree(connection->retry_interval);
+    if (connection->compression)
+        efree(connection->compression);
+    if (connection->consumer != NULL)
+        kafka_destroy(
+            connection->consumer,
+            1
+        );
+    if (connection->producer != NULL)
+        kafka_destroy(
+            connection->producer,
+            interval
+        );
+    connection->producer = NULL;
+    connection->brokers = NULL;
+    connection->compression = NULL;
+    connection->consumer = NULL;
+    connection->queue_buffer = connection->retry_count = connection->retry_interval = NULL;
+    connection->delivery_confirm_mode = 0;
+    connection->consumer_partition = connection->producer_partition = PHP_KAFKA_PARTITION_RANDOM;
+}
+/* }}} end Kafka::__destruct */
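Since both connections are opened lazily, `isConnected` reports on connection pointers that may not exist yet; a short sketch (assuming the `Kafka::MODE_*` constants):

```php
$kafka = new Kafka('localhost:9092');
var_dump($kafka->isConnected(Kafka::MODE_PRODUCER)); // false: nothing produced yet
$kafka->produce('topic_name', 'hello');              // opens the producer connection
var_dump($kafka->isConnected(Kafka::MODE_PRODUCER)); // true
var_dump($kafka->isConnected());                     // no argument: last used mode
```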
+/* {{{ proto Kafka Kafka::set_partition( int $partition [, int $mode ] )
+   Set partition (used by the consume method)
+   This method is deprecated, in favour of the more PSR-compliant
+   Kafka::setPartition
+*/
 PHP_METHOD(Kafka, set_partition)
 {
-    zval *partition;
+    zval *partition,
+         *mode = NULL,
+         *obj = getThis();
+    long p_value;
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|z", &partition, &mode) == FAILURE)
+        return;
+    if (Z_TYPE_P(partition) != IS_LONG || (mode && Z_TYPE_P(mode) != IS_LONG)) {
+        zend_throw_exception(kafka_exception, "Partition and/or mode is expected to be an int", 0 TSRMLS_CC);
+        return;
+    }
+    if (mode)
+    {
+        if (Z_LVAL_P(mode) != PHP_KAFKA_MODE_CONSUMER && Z_LVAL_P(mode) != PHP_KAFKA_MODE_PRODUCER)
+        {
+            zend_throw_exception(
+                kafka_exception,
+                "invalid mode argument passed to Kafka::setPartition, use Kafka::MODE_* constants",
+                0 TSRMLS_CC
+            );
+            return;
+        }
+    }
+    p_value = Z_LVAL_P(partition);
+    if (p_value < -1)
+    {
+        zend_throw_exception(
+            kafka_exception,
+            "invalid partition passed to Kafka::setPartition, partition value should be >= 0 or Kafka::PARTITION_RANDOM",
+            0 TSRMLS_CC
+        );
+        return;
+    }
+    p_value = p_value == -1 ? PHP_KAFKA_PARTITION_RANDOM : p_value;
+    if (!mode)
+    {
+        connection->consumer_partition = p_value;
+        connection->producer_partition = p_value;
+        kafka_set_partition(p_value);
+    }
+    else
+    {
+        if (Z_LVAL_P(mode) != PHP_KAFKA_MODE_CONSUMER)
+            connection->producer_partition = p_value;
+        else
+            connection->consumer_partition = p_value;
+    }
+    //return $this
+    RETURN_ZVAL(getThis(), 1, 0);
+}
+/* }}} end Kafka::set_partition */
 
-    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|z",
-        &partition) == FAILURE) {
+/* {{{ proto Kafka Kafka::setLogLevel( mixed $logLevel )
+   Toggle syslogging on or off, use the Kafka::LOG_* constants
+*/
+PHP_METHOD(Kafka, setLogLevel)
+{
+    zval *log_level,
+         *obj = getThis();
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z", &log_level) == FAILURE)
+    {
+        return;
+    }
+    if (Z_TYPE_P(log_level) != IS_LONG) {
+        zend_throw_exception(kafka_exception, "Kafka::setLogLevel expects argument to be an int", 0 TSRMLS_CC);
+        return;
+    }
+    if (
+        Z_LVAL_P(log_level) != PHP_KAFKA_LOG_ON
+        &&
+        Z_LVAL_P(log_level) != PHP_KAFKA_LOG_OFF
+    ) {
+        zend_throw_exception(kafka_exception, "Invalid argument, use Kafka::LOG_* constants", 0 TSRMLS_CC);
+        return;
+    }
+    connection->log_level = Z_LVAL_P(log_level);
+    kafka_set_log_level(connection->log_level);
+    RETURN_ZVAL(getThis(), 1, 0);
+}
+/* }}} end Kafka::setLogLevel */
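A usage sketch for the partition and log-level setters (both return `$this`, so they chain; the `Kafka::PARTITION_RANDOM` and `Kafka::MODE_*` constants are assumed):

```php
$kafka->setPartition(0, Kafka::MODE_CONSUMER) // pin the consumer to partition 0
      ->setPartition(Kafka::PARTITION_RANDOM, Kafka::MODE_PRODUCER)
      ->setLogLevel(Kafka::LOG_OFF);          // silence the default syslog output
```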
+/* {{{ proto Kafka Kafka::setCompression( string $compression )
+ * Enable compression for produced messages
+ */
+PHP_METHOD(Kafka, setCompression)
+{
+    zval *obj = getThis();
+    char *arg;
+    int arg_len;
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &arg, &arg_len) == FAILURE)
+    {
+        return;
+    }
+    //if a valid compression constant was used...
+    if (
+        !strcmp(arg, PHP_KAFKA_COMPRESSION_GZIP)
+        ||
+        !strcmp(arg, PHP_KAFKA_COMPRESSION_NONE)
+        ||
+        !strcmp(arg, PHP_KAFKA_COMPRESSION_SNAPPY)
+    )
+    {
+        //only act if no compression was set yet, or the mode actually changes
+        if (!connection->compression || strcmp(connection->compression, arg))
+        {
+            //close connections, if any, currently only use compression for producers
+            if (connection->producer)
+                kafka_destroy(connection->producer, 1);
+            connection->producer = NULL;
+            connection->producer_partition = PHP_KAFKA_PARTITION_RANDOM;
+            if (connection->compression)
+                efree(connection->compression);
+            connection->compression = estrdup(arg);
+        }
+    }
+    else
+    {
+        zend_throw_exception(kafka_exception, "Invalid argument, use Kafka::COMPRESSION_* constants", 0 TSRMLS_CC);
+    }
+    RETURN_ZVAL(obj, 1, 0);
+}
+/* }}} end proto Kafka::setCompression */
+
+/* {{{ proto string Kafka::getCompression( void )
+ * Get the type of compression that is currently used
+ */
+PHP_METHOD(Kafka, getCompression)
+{
+    zval *obj = getThis();
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (!connection->compression)
+        RETURN_STRING(PHP_KAFKA_COMPRESSION_NONE, 1);
+    RETURN_STRING(connection->compression, 1);
+}
+/* }}} end proto Kafka::getCompression */
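Because switching compression tears down an open producer connection, it is best set before the first produce call; a sketch:

```php
$kafka->setCompression(Kafka::COMPRESSION_GZIP); // closes an open producer, if any
$kafka->produce('topic_name', $payload);         // reconnects with gzip enabled
var_dump($kafka->getCompression());              // string(4) "gzip"
```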
+/* {{{ proto Kafka Kafka::setPartition( int $partition [, int $mode ] )
+   Set partition to use for Kafka::consume calls
+*/
+PHP_METHOD(Kafka, setPartition)
+{
+    zval *partition,
+         *mode = NULL,
+         *obj = getThis();
+    long p_value;
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|z", &partition, &mode) == FAILURE)
+        return;
+    if (Z_TYPE_P(partition) != IS_LONG || (mode && Z_TYPE_P(mode) != IS_LONG)) {
+        zend_throw_exception(kafka_exception, "Partition and/or mode is expected to be an int", 0 TSRMLS_CC);
+        return;
+    }
+    if (mode)
+    {
+        if (Z_LVAL_P(mode) != PHP_KAFKA_MODE_CONSUMER && Z_LVAL_P(mode) != PHP_KAFKA_MODE_PRODUCER)
+        {
+            zend_throw_exception(
+                kafka_exception,
+                "invalid mode argument passed to Kafka::setPartition, use Kafka::MODE_* constants",
+                0 TSRMLS_CC
+            );
+            return;
+        }
+    }
+    p_value = Z_LVAL_P(partition);
+    if (p_value < -1)
+    {
+        zend_throw_exception(
+            kafka_exception,
+            "invalid partition passed to Kafka::setPartition, partition value should be >= 0 or Kafka::PARTITION_RANDOM",
+            0 TSRMLS_CC
+        );
         return;
     }
+    p_value = p_value == -1 ? PHP_KAFKA_PARTITION_RANDOM : p_value;
+    if (!mode)
+    {
+        connection->consumer_partition = p_value;
+        connection->producer_partition = p_value;
+        kafka_set_partition(p_value);
+    }
+    else
+    {
+        if (Z_LVAL_P(mode) != PHP_KAFKA_MODE_CONSUMER)
+            connection->producer_partition = p_value;
+        else
+            connection->consumer_partition = p_value;
+    }
+    //return $this
+    RETURN_ZVAL(getThis(), 1, 0);
+}
+/* }}} end Kafka::setPartition */
 
-    if (Z_TYPE_P(partition) == IS_LONG) {
-        kafka_set_partition(Z_LVAL_P(partition));
+/* {{{ proto int Kafka::getPartition( int $mode )
+   Get partition for connection (consumer/producer)
+*/
+PHP_METHOD(Kafka, getPartition)
+{
+    zval *obj = getThis(),
+         *arg = NULL;
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z", &arg) == FAILURE)
+        return;
+    if (Z_TYPE_P(arg) != IS_LONG || (Z_LVAL_P(arg) != PHP_KAFKA_MODE_CONSUMER && Z_LVAL_P(arg) != PHP_KAFKA_MODE_PRODUCER))
+    {
+        zend_throw_exception(kafka_exception, "Invalid argument passed to Kafka::getPartition, use Kafka::MODE_* constants", 0 TSRMLS_CC);
+        return;
     }
+    if (Z_LVAL_P(arg) == PHP_KAFKA_MODE_CONSUMER)
+        RETURN_LONG(connection->consumer_partition);
+    RETURN_LONG(connection->producer_partition);
 }
+/* }}} end proto Kafka::getPartition */
 
+/* {{{ proto array Kafka::getTopics( void )
+   Get all existing topics
+*/
+PHP_METHOD(Kafka, getTopics)
+{
+    zval *obj = getThis();
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (connection->brokers == NULL && connection->consumer == NULL)
+    {
+        zend_throw_exception(kafka_exception, "No brokers to get topics from", 0 TSRMLS_CC);
+        return;
+    }
+    if (connection->consumer == NULL)
+    {
+        kafka_connection_params config;
+        config.type = RD_KAFKA_CONSUMER;
+        config.log_level = connection->log_level;
+        config.queue_buffer = connection->queue_buffer;
+        config.compression = NULL;
+        connection->consumer = kafka_get_connection(config, connection->brokers);
+        if (connection->consumer == NULL)
+        {
+            zend_throw_exception(kafka_exception, "Failed to connect to kafka", 0 TSRMLS_CC);
+            return;
+        }
+        connection->rk_type = RD_KAFKA_CONSUMER;
+    }
+    array_init(return_value);
+    kafka_get_topics(connection->consumer, return_value);
+}
+/* }}} end Kafka::getTopics */
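The meta API in action, sketched (assuming, per the stub class below, that `getTopics` returns topic names as keys and partition counts as values):

```php
foreach ($kafka->getTopics() as $topic => $partitionCount) {
    printf("%s has %d partition(s)\n", $topic, $partitionCount);
}
var_dump($kafka->getPartition(Kafka::MODE_CONSUMER)); // int(-1) while still random
```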
+
+/* {{{ proto Kafka Kafka::setBrokers( string $brokers [, array $options = null ] )
+   Set brokers on-the-fly
+*/
+PHP_METHOD(Kafka, setBrokers)
+{
+    zval *arr = NULL,
+         *obj = getThis();
+    char *brokers;
+    int brokers_len;
+    GET_KAFKA_CONNECTION(connection, obj);
+
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a",
+        &brokers, &brokers_len, &arr) == FAILURE) {
+        return;
+    }
+    //if an options array was passed, parse it; return if an exception was thrown...
+    if (arr && parse_options_array(arr, &connection))
+        return;
+    if (connection->consumer)
+        kafka_destroy(connection->consumer, 1);
+    if (connection->producer)
+        kafka_destroy(connection->producer, 1);
+    //free previous brokers value, if any
+    if (connection->brokers)
+        efree(connection->brokers);
+    if (connection->compression)
+        efree(connection->compression);
+    //set brokers
+    connection->brokers = estrdup(
+        brokers
+    );
+    //reinit to NULL
+    connection->producer = connection->consumer = NULL;
+    connection->compression = NULL;
+    //restore partitions back to random...
+    connection->consumer_partition = connection->producer_partition = PHP_KAFKA_PARTITION_RANDOM;
+    //set brokers member to correct value
+    //we can ditch this call, I think...
+    kafka_connect(
+        connection->brokers
+    );
+    //return
+    RETURN_ZVAL(obj, 1, 0);
+}
+/* }}} end Kafka::setBrokers */
+
+/* {{{ proto Kafka Kafka::setOptions( array $options )
+ * Set connection options on the fly
+ */
+PHP_METHOD(Kafka, setOptions)
+{
+    zval *arr = NULL,
+         *obj = getThis();
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &arr) == FAILURE)
+    {
+        return;
+    }
+    if (parse_options_array(arr, &connection))
+        return;
+    RETURN_ZVAL(obj, 1, 0);
+}
+/* }}} end proto Kafka::setOptions */
+
+/* {{{ proto array Kafka::getPartitionsForTopic( string $topic )
+   Get an array of available partitions for a given topic
+*/
+PHP_METHOD(Kafka, getPartitionsForTopic)
+{
+    zval *obj = getThis();
+    char *topic = NULL;
+    int topic_len = 0;
+    GET_KAFKA_CONNECTION(connection, obj);
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s",
+        &topic, &topic_len) == FAILURE) {
+        return;
+    }
+    if (!connection->consumer)
+    {
+        kafka_connection_params config;
+        config.type = RD_KAFKA_CONSUMER;
+        config.log_level = connection->log_level;
+        config.queue_buffer = connection->queue_buffer;
+        config.compression = NULL;
+        connection->consumer = kafka_get_connection(config, connection->brokers);
+        if (connection->consumer == NULL)
+        {
+            zend_throw_exception(kafka_exception, "Failed to connect to kafka", 0 TSRMLS_CC);
+            return;
+        }
+        connection->rk_type = RD_KAFKA_CONSUMER;
+    }
+    array_init(return_value);
+    kafka_get_partitions(connection->consumer, return_value, topic);
+}
+/* }}} end Kafka::getPartitionsForTopic */
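Re-pointing an instance at another cluster closes both connections and resets compression and partition settings; a sketch:

```php
$kafka->setBrokers('broker3:9092', [Kafka::CONFIRM_DELIVERY => Kafka::CONFIRM_OFF]);
$kafka->setOptions([Kafka::QUEUE_BUFFER_SIZE => 1000]);    // tweak options later on
$partitions = $kafka->getPartitionsForTopic('topic_name'); // array of ints
```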
"No kafka connection" : "Allocation error"; + else + msg = "Failed to get metadata for topic"; + zend_throw_exception( + kafka_exception, + msg, + 0 TSRMLS_CC + ); + return; + } + array_init(return_value); + for (i=0;iconsumer) + kafka_destroy(connection->consumer, 1); + connection->consumer = NULL; + } + else + { + if (connection->producer) + kafka_destroy(connection->producer, 1); + connection->producer = NULL; + } + RETURN_TRUE; + } + if (connection->consumer) + kafka_destroy(connection->consumer, 1); + if (connection->producer) + kafka_destroy(connection->producer, 1); + connection->producer = connection->consumer = NULL; + connection->consumer_partition = connection->producer_partition = PHP_KAFKA_PARTITION_RANDOM; + RETURN_TRUE; +} +/* }}} end Kafka::disconnect */ + +/* {{{ proto Kafka Kafka::produce( string $topic, string $message [, int $timeout = 60000]); + Produce a message, returns instance + or throws KafkaException in case something went wrong +*/ PHP_METHOD(Kafka, produce) { zval *object = getThis(); + GET_KAFKA_CONNECTION(connection, object); char *topic; char *msg; - int topic_len; - int msg_len; + long reporting = connection->delivery_confirm_mode; + long timeout = 60000; + int topic_len, + msg_len, + status = 0; - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|l", &topic, &topic_len, - &msg, &msg_len) == FAILURE) { + &msg, &msg_len, + &timeout) == FAILURE) { return; } + if (!connection->producer) + { + kafka_connection_params config; + config.type = RD_KAFKA_PRODUCER; + config.log_level = connection->log_level; + config.reporting = connection->delivery_confirm_mode; + config.retry_count = connection->retry_count; + config.retry_interval = connection->retry_interval; + config.compression = connection->compression; + connection->producer = kafka_get_connection(config, connection->brokers); + if (connection->producer == NULL) + { + zend_throw_exception(kafka_exception, "Failed to connect to kafka", 0 TSRMLS_CC); + return; + } + connection->rk_type = RD_KAFKA_PRODUCER; + } + //this does nothing at this stage... 
+
+/* {{{ proto Kafka Kafka::produce( string $topic, string $message [, int $timeout = 60000 ] )
+   Produce a message, returns instance
+   or throws KafkaException in case something went wrong
+*/
 PHP_METHOD(Kafka, produce)
 {
     zval *object = getThis();
+    GET_KAFKA_CONNECTION(connection, object);
     char *topic;
     char *msg;
-    int topic_len;
-    int msg_len;
+    long timeout = 60000;
+    int topic_len,
+        msg_len,
+        status = 0;
 
-    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss",
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|l",
         &topic, &topic_len,
-        &msg, &msg_len) == FAILURE) {
+        &msg, &msg_len,
+        &timeout) == FAILURE) {
         return;
     }
+    if (!connection->producer)
+    {
+        kafka_connection_params config;
+        config.type = RD_KAFKA_PRODUCER;
+        config.log_level = connection->log_level;
+        config.reporting = connection->delivery_confirm_mode;
+        config.retry_count = connection->retry_count;
+        config.retry_interval = connection->retry_interval;
+        config.compression = connection->compression;
+        connection->producer = kafka_get_connection(config, connection->brokers);
+        if (connection->producer == NULL)
+        {
+            zend_throw_exception(kafka_exception, "Failed to connect to kafka", 0 TSRMLS_CC);
+            return;
+        }
+        connection->rk_type = RD_KAFKA_PRODUCER;
+    }
+    //this does nothing at this stage...
+    kafka_set_partition(
+        (int) connection->producer_partition
+    );
+    if (connection->delivery_confirm_mode == PHP_KAFKA_CONFIRM_EXTENDED)
+        status = kafka_produce_report(connection->producer, topic, msg, msg_len, timeout);
+    else
+        status = kafka_produce(connection->producer, topic, msg, msg_len, connection->delivery_confirm_mode, timeout);
+    switch (status)
+    {
+        case -1:
+            zend_throw_exception(kafka_exception, "Failed to produce message", 0 TSRMLS_CC);
+            return;
+        case -2:
+            zend_throw_exception(kafka_exception, "Connection failure, cannot produce message", 0 TSRMLS_CC);
+            return;
+        case -3:
+            zend_throw_exception(kafka_exception, "Topic configuration error", 0 TSRMLS_CC);
+            return;
+    }
+    RETURN_ZVAL(object, 1, 0);
+}
+/* }}} end Kafka::produce */
 
-    kafka_produce(topic, msg, msg_len);
-
-    RETURN_TRUE;
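The produce call, sketched (the timeout is only relevant when delivery confirmation is on):

```php
$kafka->produce('topic_name', 'message content', 10000); // 10 second timeout
$kafka->produce('topic_name', 'first')                   // returns $this, so
      ->produce('topic_name', 'second');                 // calls can be chained
```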
+/* {{{ proto Kafka Kafka::produceBatch( string $topic, array $messages [, int $timeout = 60000 ] )
+   Produce a batch of messages, returns instance
+   or throws exceptions in case of error
+*/
+PHP_METHOD(Kafka, produceBatch)
+{
+    zval *arr,
+         *object = getThis(),
+         **entry;
+    GET_KAFKA_CONNECTION(connection, object);
+    char *topic;
+    char *msg_batch[50];
+    int msg_batch_len[50] = {0};
+    long timeout = 60000;
+    int topic_len,
+        current_idx = 0,
+        status = 0;
+    HashPosition pos;
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sa|l",
+        &topic, &topic_len,
+        &arr,
+        &timeout) == FAILURE) {
+        return;
+    }
+    //get producer up and running
+    if (!connection->producer)
+    {
+        kafka_connection_params config;
+        config.type = RD_KAFKA_PRODUCER;
+        config.log_level = connection->log_level;
+        config.reporting = connection->delivery_confirm_mode;
+        config.retry_count = connection->retry_count;
+        config.compression = connection->compression;
+        config.retry_interval = connection->retry_interval;
+        connection->producer = kafka_get_connection(config, connection->brokers);
+        if (connection->producer == NULL)
+        {
+            zend_throw_exception(kafka_exception, "Failed to connect to kafka", 0 TSRMLS_CC);
+            return;
+        }
+        connection->rk_type = RD_KAFKA_PRODUCER;
+    }
+    //this does nothing at this stage...
+    kafka_set_partition(
+        (int) connection->producer_partition
+    );
+    //iterate array of messages, start producing them
+    //todo: change individual produce calls to a more performant
+    //produce queue...
+    zend_hash_internal_pointer_reset_ex(Z_ARRVAL_P(arr), &pos);
+    while (zend_hash_get_current_data_ex(Z_ARRVAL_P(arr), (void **)&entry, &pos) == SUCCESS)
+    {
+        if (Z_TYPE_PP(entry) == IS_STRING)
+        {
+            msg_batch[current_idx] = Z_STRVAL_PP(entry);
+            msg_batch_len[current_idx] = Z_STRLEN_PP(entry);
+            ++current_idx;
+            if (current_idx == 50)
+            {
+                status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode, timeout);
+                if (status)
+                {
+                    if (status < 0)
+                        zend_throw_exception(kafka_exception, "Failed to produce messages", 0 TSRMLS_CC);
+                    else if (status > 0)
+                    {
+                        char err_msg[200];
+                        snprintf(err_msg, 200, "Produced messages with %d errors", status);
+                        zend_throw_exception(kafka_exception, err_msg, 0 TSRMLS_CC);
+                    }
+                    return;
+                }
+                current_idx = 0;//reset batch counter
+            }
+        }
+        zend_hash_move_forward_ex(Z_ARRVAL_P(arr), &pos);
+    }
+    if (current_idx)
+    {//we still have some messages to produce...
+        status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode, timeout);
+        if (status)
+        {
+            if (status < 0)
+                zend_throw_exception(kafka_exception, "Failed to produce messages", 0 TSRMLS_CC);
+            else if (status > 0)
+            {
+                char err_msg[200];
+                snprintf(err_msg, 200, "Produced messages with %d errors", status);
+                zend_throw_exception(kafka_exception, err_msg, 0 TSRMLS_CC);
+            }
+            return;
+        }
+    }
+    RETURN_ZVAL(object, 1, 0);
 }
+/* end proto Kafka::produceBatch */
 
+/* {{{ proto array Kafka::consume( string $topic, string $offset [, mixed $messageCount = 1 ] )
+   Consume 1 or more ($messageCount) messages, starting at $offset
+*/
 PHP_METHOD(Kafka, consume)
 {
     zval *object = getThis();
+    GET_KAFKA_CONNECTION(connection, object);
     char *topic;
     int topic_len;
     char *offset;
-    int offset_len;
-    long item_count = 0;
+    int offset_len, status = 0;
+    long count = 0;
+    zval *item_count = NULL;
 
-    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|sl",
+    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|z",
         &topic, &topic_len,
         &offset, &offset_len,
         &item_count) == FAILURE) {
         return;
     }
+    if (item_count == NULL || Z_TYPE_P(item_count) == IS_NULL)
+    {//default
+        count = 1;
+    }
+    else
+    {
+        if (Z_TYPE_P(item_count) == IS_STRING && strcmp(Z_STRVAL_P(item_count), PHP_KAFKA_OFFSET_END) == 0) {
+            count = -1;
+        } else if (Z_TYPE_P(item_count) == IS_LONG) {
+            count = Z_LVAL_P(item_count);
+        } else {
+            zend_throw_exception(
+                kafka_exception,
+                "Invalid messageCount value passed to Kafka::consume, should be int or OFFSET constant",
+                0 TSRMLS_CC
+            );
+            return;
+        }
+    }
+    if (count < -1 || count == 0)
+    {
+        zend_throw_exception(
+            kafka_exception,
+            "Invalid messageCount value passed to Kafka::consume",
+            0 TSRMLS_CC
+        );
+        return;
+    }
+    if (!connection->consumer)
+    {
+        kafka_connection_params config;
+        config.type = RD_KAFKA_CONSUMER;
+        config.log_level = connection->log_level;
+        config.queue_buffer = connection->queue_buffer;
+        config.compression = NULL;
+        connection->consumer = kafka_get_connection(config, connection->brokers);
+        if (connection->consumer == NULL)
+        {
+            zend_throw_exception(kafka_exception, "Failed to connect to kafka", 0 TSRMLS_CC);
+            return;
+        }
+        connection->rk_type = RD_KAFKA_CONSUMER;
+    }
     array_init(return_value);
-    kafka_consume(return_value, topic, offset, item_count);
-
-    if(return_value == NULL) {
-        RETURN_FALSE;
+    if (connection->consumer_partition == PHP_KAFKA_PARTITION_RANDOM)
+    {
+        kafka_consume_all(
+            connection->consumer,
+            return_value,
+            topic,
+            offset,
+            count
+        );
     }
-}
\ No newline at end of file
+    else
+    {
+        status = kafka_consume(
+            connection->consumer,
+            return_value,
+            topic,
+            offset,
+            count,
+            connection->consumer_partition
+        );
+        if (status)
+        {
+            switch (status)
+            {
+                case -1:
+                    zend_throw_exception(
+                        kafka_exception,
+                        "Invalid offset passed, use Kafka::OFFSET_* constants, or positive integer!",
+                        0 TSRMLS_CC
+                    );
+                    return;
+                case -2:
+                    zend_throw_exception(
+                        kafka_exception,
+                        "No kafka connection available",
+                        0 TSRMLS_CC
+                    );
+                    return;
+                case -3:
+                    zend_throw_exception(
+                        kafka_exception,
+                        "Unable to access topic",
+                        0 TSRMLS_CC
+                    );
+                    return;
+                case -4:
+                default:
+                    zend_throw_exception(
+                        kafka_exception,
+                        "Consuming from topic failed",
+                        0 TSRMLS_CC
+                    );
+                    return;
+            }
+        }
+    }
+}
+/* }}} end Kafka::consume */
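And the batch/consume counterparts, sketched (messages are handed to librdkafka in chunks of 50; `Kafka::OFFSET_*` are the string constants defined in php_kafka.h below):

```php
$kafka->produceBatch('topic_name', ['msg 1', 'msg 2', 'msg 3']);
// consume 100 messages, starting at the beginning of the topic
$messages = $kafka->consume('topic_name', Kafka::OFFSET_BEGIN, 100);
// or read everything up to the last available message
$messages = $kafka->consume('topic_name', Kafka::OFFSET_BEGIN, Kafka::OFFSET_END);
```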
diff --git a/php_kafka.h b/php_kafka.h
index 3c535c6..fdfdd44 100644
--- a/php_kafka.h
+++ b/php_kafka.h
@@ -1,5 +1,5 @@
 /**
- * Copyright 2013-2014 Patrick Reilly.
+ * Copyright 2015 Elias Van Ootegem.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -12,13 +12,34 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * Special thanks to Patrick Reilly and Aleksandar Babic for their work,
+ * on which this extension was actually built.
  */
 #ifndef PHP_KAFKA_H
 #define PHP_KAFKA_H 1
-#define PHP_KAFKA_VERSION "0.1.0-dev"
+#define PHP_KAFKA_VERSION "0.2.0-dev"
 #define PHP_KAFKA_EXTNAME "kafka"
-
+#define PHP_KAFKA_OFFSET_BEGIN "beginning"
+#define PHP_KAFKA_OFFSET_END "end"
+#define PHP_KAFKA_LOG_ON 1
+#define PHP_KAFKA_LOG_OFF 0
+#define PHP_KAFKA_MODE_CONSUMER 0
+#define PHP_KAFKA_MODE_PRODUCER 1
+#define PHP_KAFKA_COMPRESSION_NONE "none"
+#define PHP_KAFKA_COMPRESSION_GZIP "gzip"
+#define PHP_KAFKA_COMPRESSION_SNAPPY "snappy"
+//option constants...
+#define PHP_KAFKA_RETRY_COUNT 1
+#define PHP_KAFKA_RETRY_INTERVAL 2
+#define PHP_KAFKA_CONFIRM_DELIVERY 4
+#define PHP_KAFKA_QUEUE_BUFFER_SIZE 8
+#define PHP_KAFKA_COMPRESSION_MODE 16
+#define PHP_KAFKA_LOGLEVEL 32
+#define PHP_KAFKA_CONFIRM_OFF 0
+#define PHP_KAFKA_CONFIRM_BASIC 1
+#define PHP_KAFKA_CONFIRM_EXTENDED 2
 
 extern zend_module_entry kafka_module_entry;
 PHP_MSHUTDOWN_FUNCTION(kafka);
@@ -29,12 +50,49 @@ PHP_RSHUTDOWN_FUNCTION(kafka);
 #ifdef ZTS
+#include <TSRM.h>
 #endif
+#include "librdkafka/rdkafka.h"
+
+#define PHP_KAFKA_PARTITION_RANDOM RD_KAFKA_PARTITION_UA
+
+typedef struct _kafka_r {
+    zend_object std;
+    rd_kafka_t *consumer;
+    rd_kafka_t *producer;
+    char *brokers;
+    char *compression;
+    char *retry_count;
+    char *retry_interval;
+    int delivery_confirm_mode;
+    char *queue_buffer;
+    long consumer_partition;
+    long producer_partition;
+    int log_level;
+    rd_kafka_type_t rk_type;
+} kafka_connection;
+
+//attach kafka connection to module
+zend_object_value create_kafka_connection(zend_class_entry *class_type TSRMLS_DC);
+void free_kafka_connection(void *object TSRMLS_DC);
 
 /* Kafka class */
 static PHP_METHOD(Kafka, __construct);
+static PHP_METHOD(Kafka, __destruct);
+static PHP_METHOD(Kafka, setCompression);
+static PHP_METHOD(Kafka, getCompression);
 static PHP_METHOD(Kafka, set_partition);
+static PHP_METHOD(Kafka, setPartition);
+static PHP_METHOD(Kafka, getPartition);
+static PHP_METHOD(Kafka, setLogLevel);
+static PHP_METHOD(Kafka, getPartitionsForTopic);
+static PHP_METHOD(Kafka, getPartitionOffsets);
+static PHP_METHOD(Kafka, isConnected);
+static PHP_METHOD(Kafka, setBrokers);
+static PHP_METHOD(Kafka, setOptions);
+static PHP_METHOD(Kafka, getTopics);
+static PHP_METHOD(Kafka, disconnect);
+static PHP_METHOD(Kafka, produceBatch);
 static PHP_METHOD(Kafka, produce);
 static PHP_METHOD(Kafka, consume);
 
 PHPAPI void kafka_connect(char *brokers);
-#endif
\ No newline at end of file
+#endif
diff --git a/stub/Kafka.class.php b/stub/Kafka.class.php
new file mode 100644
index 0000000..3e32ebd
--- /dev/null
+++ b/stub/Kafka.class.php
@@ -0,0 +1,355 @@
+<?php
+/**
+ * Stub class for IDE code-completion
+ * The actual class is defined by the kafka extension itself
+ */
+class Kafka
+{
+    const OFFSET_BEGIN = 'beginning';
+    const OFFSET_END = 'end';
+    const LOG_ON = 1;
+    const LOG_OFF = 0;
+    const MODE_CONSUMER = 0;
+    const MODE_PRODUCER = 1;
+    const PARTITION_RANDOM = -1;
+    const COMPRESSION_NONE = 'none';
+    const COMPRESSION_GZIP = 'gzip';
+    const COMPRESSION_SNAPPY = 'snappy';
+    const RETRY_COUNT = 1;
+    const RETRY_INTERVAL = 2;
+    const CONFIRM_DELIVERY = 4;
+    const QUEUE_BUFFER_SIZE = 8;
+    const COMPRESSION_MODE = 16;
+    const LOGLEVEL = 32;
+    const CONFIRM_OFF = 0;
+    const CONFIRM_BASIC = 1;
+    const CONFIRM_EXTENDED = 2;
+
+    protected $brokers;
+    protected $compression = self::COMPRESSION_NONE;
+    protected $partition = self::PARTITION_RANDOM;
+    protected $connected = false;
+    protected $lastMode = self::MODE_CONSUMER;
+
+    /**
+     * @param string $brokers
+     * @param array $options = null
+     */
+    public function __construct($brokers, array $options = null)
+    {
+        $this->brokers = $brokers;
+    }
+
+    /**
+     * @param int $partition
+     * @return $this
+     * @deprecated use Kafka::setPartition instead
+     */
+    public function set_partition($partition)
+    {
+        $this->partition = $partition;
+        return $this;
+    }
+
+    /**
+     * @param int $partition
+     * @param null|int $mode
+     * @return $this
+     * @throws \KafkaException
+     */
+    public function setPartition($partition, $mode = null)
+    {
+        if (!is_int($partition) || ($mode !== null && !is_int($mode))) {
+            throw new \KafkaException('Invalid arguments passed to Kafka::setPartition');
+        }
+        if ($mode && $mode != self::MODE_CONSUMER && $mode != self::MODE_PRODUCER) {
+            throw new
\KafkaException(
+                sprintf(
+                    'Invalid mode passed to %s, use Kafka::MODE_* constants',
+                    __METHOD__
+                )
+            );
+        }
+        if ($partition < self::PARTITION_RANDOM) {
+            throw new \KafkaException('Invalid partition');
+        }
+        $this->partition = $partition;
+        return $this;
+    }
+
+    /**
+     * @param array $options
+     * @return $this
+     * @throws \KafkaException on invalid config
+     */
+    public function setOptions(array $options)
+    {}
+
+    /**
+     * Note, this disconnects a previously opened producer connection!
+     * @param string $compression
+     * @return $this
+     * @throws \KafkaException
+     */
+    public function setCompression($compression)
+    {
+        if ($compression !== self::COMPRESSION_NONE && $compression !== self::COMPRESSION_GZIP && $compression !== self::COMPRESSION_SNAPPY) {
+            throw new \KafkaException(
+                sprintf('Invalid argument, use %s::COMPRESSION_* constants', __CLASS__)
+            );
+        }
+        $this->compression = $compression;
+        return $this;
+    }
+
+    /**
+     * @return string
+     */
+    public function getCompression()
+    {
+        return $this->compression;
+    }
+
+    /**
+     * @param int $mode
+     * @return int
+     * @throws \KafkaException
+     */
+    public function getPartition($mode)
+    {
+        if ($mode != self::MODE_CONSUMER && $mode != self::MODE_PRODUCER) {
+            throw new \KafkaException(
+                sprintf(
+                    'Invalid argument passed to %s, use %s::MODE_* constants',
+                    __METHOD__,
+                    __CLASS__
+                )
+            );
+        }
+        return $this->partition;
+    }
+
+    /**
+     * @param int $level
+     * @return $this
+     * @throws \KafkaException (invalid argument)
+     */
+    public function setLogLevel($level)
+    {
+        if (!is_int($level)) {
+            throw new \KafkaException(
+                sprintf(
+                    '%s expects argument to be an int',
+                    __METHOD__
+                )
+            );
+        }
+        if ($level != self::LOG_ON && $level != self::LOG_OFF) {
+            throw new \KafkaException(
+                sprintf(
+                    '%s argument invalid, use %s::LOG_* constants',
+                    __METHOD__,
+                    __CLASS__
+                )
+            );
+        }
+        //level is passed to the kafka backend
+        return $this;
+    }
+
+    /**
+     * @param string $brokers
+     * @param array $options = null
+     * @return $this
+     * @throws \KafkaException
+     */
+    public function setBrokers($brokers, array $options = null)
+    {
+        if (!is_string($brokers)) {
+            throw new \KafkaException(
+                sprintf(
+                    '%s expects argument to be a string',
+                    __METHOD__
+                )
+            );
+        }
+        $this->brokers = $brokers;
+        return $this;
+    }
+
+    /**
+     * @param int|null $mode
+     * @return bool
+     * @throws \KafkaException
+     */
+    public function isConnected($mode = null)
+    {
+        if ($mode === null) {
+            $mode = $this->lastMode;
+        }
+        if ($mode != self::MODE_CONSUMER && $mode != self::MODE_PRODUCER) {
+            throw new \KafkaException(
+                sprintf(
+                    'invalid argument passed to %s, use Kafka::MODE_* constants',
+                    __METHOD__
+                )
+            );
+        }
+        //connection pointers determine connected status
+        return $this->connected;
+    }
+
+    /**
+     * Produce a message on a topic
+     * @param string $topic
+     * @param string $message
+     * @param int $timeout
+     * @return $this
+     * @throws \KafkaException
+     */
+    public function produce($topic, $message, $timeout = 5000)
+    {
+        $this->connected = true;
+        //internal call, produce message on topic
+        //or throw exception
+        return $this;
+    }
+
+    /**
+     * Produce a batch of messages without the overhead of a PHP method call
+     * per message (internally, the array is iterated and each message is produced)
+     * @param string $topic
+     * @param array $messages
+     * @param int $timeout
+     * @return $this
+     * @throws \KafkaException
+     */
+    public function produceBatch($topic, array $messages, $timeout = 5000)
+    {
+        foreach ($messages as $msg) {
+            //non-string messages are skipped silently ATM
+            if (is_string($msg)) {
+                //internally, the method call overhead is not there
+                $this->produce($topic, $msg);
+            }
+        }
+        return $this;
+    }
+
+    /**
+     * @param string $topic
+     * @param string|int $offset
+     * @param string|int $count
+     * @return array
+     */
+    public function consume($topic, $offset = self::OFFSET_BEGIN, $count = self::OFFSET_END)
+    {
+        $this->connected = true;
+        if (!is_numeric($offset)) {
+            //0 or last message (whatever its offset might be)
+            $start = $offset == self::OFFSET_BEGIN ? 0 : 100;
+        } else {
+            $start = $offset;
+        }
+        if (!is_numeric($count)) {
+            //depending on amount of messages in topic
+            $count = 100;
+        }
+        return array_fill_keys(
+            range($start, $start + $count),
+            'the message at the offset $key'
+        );
+    }
+
+    /**
+     * Returns an assoc array of topic names
+     * The value is the partition count
+     * @return array
+     */
+    public function getTopics()
+    {
+        return [
+            'topicName' => 1
+        ];
+    }
+
+    /**
+     * Disconnect a specific connection (producer/consumer) or both
+     * @param int|null $mode
+     * @return bool
+     * @throws \KafkaException
+     */
+    public function disconnect($mode = null)
+    {
+        if ($mode !== null && $mode != self::MODE_PRODUCER && $mode != self::MODE_CONSUMER) {
+            throw new \KafkaException(
+                sprintf(
+                    'invalid argument passed to %s, use Kafka::MODE_* constants',
+                    __METHOD__
+                )
+            );
+        }
+        $this->connected = false;
+        return true;
+    }
+
+    /**
+     * Returns an array of ints (available partitions for topic)
+     * @param string $topic
+     * @return array
+     */
+    public function getPartitionsForTopic($topic)
+    {
+        return [];
+    }
+
+    /**
+     * Returns an array where the keys are partitions
+     * and the values are their respective beginning offsets
+     * If a partition has offset -1, the consume call failed
+     * @param string $topic
+     * @return array
+     * @throws \KafkaException when the meta call failed, or no partitions are available
+     */
+    public function getPartitionOffsets($topic)
+    {
+        return [];
+    }
+
+    public function __destruct()
+    {
+        $this->connected = false;
+    }
+}
diff --git a/stub/KafkaException.class.php b/stub/KafkaException.class.php
new file mode 100644
index 0000000..5560c84
--- /dev/null
+++ b/stub/KafkaException.class.php
@@ -0,0 +1,4 @@
+<?php
+
+class KafkaException extends \Exception
+{}
diff --git a/tests/constant_begin.phpt b/tests/constant_begin.phpt
new file mode 100644
--- /dev/null
+++ b/tests/constant_begin.phpt
@@ -0,0 +1,8 @@
+--TEST--
+Basic test for Kafka::OFFSET_BEGIN constant
+--FILE--
+<?php
+var_dump(Kafka::OFFSET_BEGIN);
+?>
+--EXPECT--
+string(9) "beginning"
diff --git a/tests/constant_end.phpt b/tests/constant_end.phpt
new file mode 100644
index 0000000..7906c7d
--- /dev/null
+++ b/tests/constant_end.phpt
@@ -0,0 +1,8 @@
+--TEST--
+Basic test for Kafka::OFFSET_END constant
+--FILE--
+<?php
+var_dump(Kafka::OFFSET_END);
+?>
+--EXPECT--
+string(3) "end"
diff --git a/tests/constant_mode.phpt b/tests/constant_mode.phpt
new file mode 100644
index 0000000..5b88b3d
--- /dev/null
+++ b/tests/constant_mode.phpt
@@ -0,0 +1,10 @@
+--TEST--
+Basic test for Kafka::MODE_* constants
+--FILE--
+<?php
+var_dump(Kafka::MODE_PRODUCER);
+var_dump(Kafka::MODE_CONSUMER);
+?>
+--EXPECT--
+int(1)
+int(0)
diff --git a/tests/exceptiontest1.phpt b/tests/exceptiontest1.phpt
new file mode 100644
index 0000000..8ca10fd
--- /dev/null
+++ b/tests/exceptiontest1.phpt
@@ -0,0 +1,13 @@
+--TEST--
+Test custom exception class
+--FILE--
+<?php
+try {
+    $kafka = new Kafka('localhost:9092');
+    $kafka->isConnected('InvalidParam');
+} catch (Exception $e) {
+    var_dump(get_class($e));
+}
+?>
+--EXPECT--
+string(14) "KafkaException"
diff --git a/travis.sh b/travis.sh
new file mode 100755
index 0000000..8bc3258
--- /dev/null
+++ b/travis.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+echo "....fetching librdkafka dependency...."
+mkdir tmp_build
+cd tmp_build
+## clone fork, we know this version of librdkafka works
+git clone https://github.com/EVODelavega/librdkafka.git
+echo ".....done....."
+cd librdkafka
+echo "....compiling librdkafka...."
+./configure && make
+sudo make install
+echo "....done...."
+cd ../../
+echo ".... ensure librdkafka is available....."
+sudo ldconfig
+rm -Rf tmp_build
+echo ".... start building extension....."
+phpize
+./configure --enable-kafka
+make
+NO_INTERACTION=1 make test