Skip to content

Commit c3b81b6

Browse files
author
Owen Smith
committed
Set timeout in produceBatch too
1 parent f854506 commit c3b81b6

File tree

3 files changed

+23
-4
lines changed

3 files changed

+23
-4
lines changed

kafka.c

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,8 +468,9 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le
468468
return 0;
469469
}
470470

471-
int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report)
471+
int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report, long timeout)
472472
{
473+
char errstr[512];
473474
rd_kafka_topic_t *rkt;
474475
struct produce_cb_params pcb = {msg_cnt, 0, 0, 0, 0, NULL};
475476
void *opaque;
@@ -496,6 +497,24 @@ int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, in
496497
/* Topic configuration */
497498
topic_conf = rd_kafka_topic_conf_new();
498499

500+
char timeoutStr[64];
501+
snprintf(timeoutStr, 64, "%lu", timeout);
502+
if (rd_kafka_topic_conf_set(topic_conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
503+
{
504+
if (log_level)
505+
{
506+
openlog("phpkafka", 0, LOG_USER);
507+
syslog(
508+
LOG_ERR,
509+
"Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s",
510+
timeout,
511+
errstr
512+
);
513+
}
514+
rd_kafka_topic_conf_destroy(topic_conf);
515+
return -3;
516+
}
517+
499518
/* Create topic */
500519
rkt = rd_kafka_topic_new(r, topic, topic_conf);
501520

kafka.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ void kafka_set_log_level(int ll);
3838
void kafka_set_partition(int partition);
3939
int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report, long timeout);
4040
int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout);
41-
int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report);
41+
int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report, long timeout);
4242
rd_kafka_t *kafka_set_connection(rd_kafka_type_t type, const char *b, int report_level, const char *compression);
4343
rd_kafka_t *kafka_get_connection(kafka_connection_params params, const char *brokers);
4444
int kafka_consume(rd_kafka_t *r, zval* return_value, char* topic, char* offset, int item_count, int partition);

php_kafka.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,7 +1140,7 @@ PHP_METHOD(Kafka, produceBatch)
11401140
++current_idx;
11411141
if (current_idx == 50)
11421142
{
1143-
status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode);
1143+
status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode, timeout);
11441144
if (status)
11451145
{
11461146
if (status < 0)
@@ -1160,7 +1160,7 @@ PHP_METHOD(Kafka, produceBatch)
11601160
}
11611161
if (current_idx)
11621162
{//we still have some messages to produce...
1163-
status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode);
1163+
status = kafka_produce_batch(connection->producer, topic, msg_batch, msg_batch_len, current_idx, connection->delivery_confirm_mode, timeout);
11641164
if (status)
11651165
{
11661166
if (status < 0)

0 commit comments

Comments
 (0)