Skip to content

Commit f854506

Browse files
author
Owen Smith
committed
Set timeout in the 'extended report' version too
1 parent 1858296 commit f854506

File tree

3 files changed

+23
-3
lines changed

3 files changed

+23
-3
lines changed

kafka.c

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ static void kafka_init( rd_kafka_type_t type )
394394
}
395395
}
396396

397-
int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len)
397+
int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout)
398398
{
399399
char errstr[512];
400400
rd_kafka_topic_t *rkt = NULL;
@@ -416,6 +416,25 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le
416416
conf = rd_kafka_topic_conf_new();
417417

418418
rd_kafka_topic_conf_set(conf,"produce.offset.report", "true", errstr, sizeof errstr );
419+
420+
char timeoutStr[64];
421+
snprintf(timeoutStr, 64, "%lu", timeout);
422+
if (rd_kafka_topic_conf_set(conf, "message.timeout.ms", timeoutStr, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
423+
{
424+
if (log_level)
425+
{
426+
openlog("phpkafka", 0, LOG_USER);
427+
syslog(
428+
LOG_ERR,
429+
"Failed to configure topic param 'message.timeout.ms' to %lu before producing; config err was: %s",
430+
timeout,
431+
errstr
432+
);
433+
}
434+
rd_kafka_topic_conf_destroy(conf);
435+
return -3;
436+
}
437+
419438
//callback already set in kafka_set_connection
420439
rkt = rd_kafka_topic_new(r, topic, conf);
421440
if (!rkt)
@@ -428,6 +447,7 @@ int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_le
428447
rd_kafka_topic_conf_destroy(conf);
429448
return -1;
430449
}
450+
431451
//begin producing:
432452
if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, msg, msg_len,NULL, 0,&pcb) == -1)
433453
{

kafka.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void kafka_setup(char *brokers);
3737
void kafka_set_log_level(int ll);
3838
void kafka_set_partition(int partition);
3939
int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report, long timeout);
40-
int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len);
40+
int kafka_produce_report(rd_kafka_t *r, const char *topic, char *msg, int msg_len, long timeout);
4141
int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, int msg_cnt, int report);
4242
rd_kafka_t *kafka_set_connection(rd_kafka_type_t type, const char *b, int report_level, const char *compression);
4343
rd_kafka_t *kafka_get_connection(kafka_connection_params params, const char *brokers);

php_kafka.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ PHP_METHOD(Kafka, produce)
10591059
(int) connection->producer_partition
10601060
);
10611061
if (connection->delivery_confirm_mode == PHP_KAFKA_CONFIRM_EXTENDED)
1062-
status = kafka_produce_report(connection->producer, topic, msg, msg_len);
1062+
status = kafka_produce_report(connection->producer, topic, msg, msg_len, timeout);
10631063
else
10641064
status = kafka_produce(connection->producer, topic, msg, msg_len, connection->delivery_confirm_mode, timeout);
10651065
switch (status)

0 commit comments

Comments
 (0)