Skip to content

Commit e97edff

Browse files
author
Elias Van Ootegem
committed
Use rd_kafka_produce_batch function if possible
1 parent 73dfdd7 commit e97edff

File tree

1 file changed

+37
-9
lines changed

1 file changed

+37
-9
lines changed

kafka.c

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,8 @@ int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, in
447447
struct produce_cb_params pcb = {msg_cnt, 0, 0, 0, 0, NULL};
448448
void *opaque;
449449
int partition = RD_KAFKA_PARTITION_UA;
450-
int i;
450+
int i,
451+
err_cnt = 0;
451452

452453
if (report)
453454
opaque = (void *) &pcb;
@@ -471,20 +472,45 @@ int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, in
471472
/* Create topic */
472473
rkt = rd_kafka_topic_new(r, topic, topic_conf);
473474

474-
for (i=0;i<msg_cnt;++i)
475+
//do we have VLA?
476+
rd_kafka_message_t *messages = calloc(sizeof *messages, msg_cnt);
477+
if (messages == NULL)
478+
{//fallback to individual produce calls
479+
for (i=0;i<msg_cnt;++i)
480+
{
481+
if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, msg[i], msg_len[i], NULL, 0, opaque) == -1)
482+
{
483+
if (log_level)
484+
{
485+
openlog("phpkafka", 0, LOG_USER);
486+
syslog(LOG_INFO, "phpkafka - %% Failed to produce to topic %s "
487+
"partition %i: %s",
488+
rd_kafka_topic_name(rkt), partition,
489+
rd_kafka_err2str(
490+
rd_kafka_errno2err(errno)));
491+
}
492+
}
493+
}
494+
}
495+
else
475496
{
476-
if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, msg[i], msg_len[i], NULL, 0, opaque) == -1)
497+
for (i=0;i<msg_cnt;++i)
498+
{
499+
messages[i].payload = msg[i];
500+
messages[i].len = msg_len[i];
501+
}
502+
i = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, messages, msg_cnt, NULL, 0, opaque);
503+
if (i < msg_cnt)
477504
{
478505
if (log_level)
479506
{
480507
openlog("phpkafka", 0, LOG_USER);
481-
syslog(LOG_INFO, "phpkafka - %% Failed to produce to topic %s "
482-
"partition %i: %s",
483-
rd_kafka_topic_name(rkt), partition,
484-
rd_kafka_err2str(
485-
rd_kafka_errno2err(errno)));
508+
syslog(LOG_WARNING, "Failed to queue full message batch, %d of %d were put in queue", i, msg_cnt);
486509
}
487510
}
511+
err_cnt = msg_cnt - i;
512+
free(messages);
513+
messages = NULL;
488514
}
489515
/* Poll to handle delivery reports */
490516
rd_kafka_poll(r, 0);
@@ -495,7 +521,9 @@ int kafka_produce_batch(rd_kafka_t *r, char *topic, char **msg, int *msg_len, in
495521

496522
//set global to NULL again
497523
rd_kafka_topic_destroy(rkt);
498-
return pcb.err_count;
524+
if (report)
525+
err_cnt = pcb.err_count;
526+
return err_cnt;
499527
}
500528

501529
int kafka_produce(rd_kafka_t *r, char* topic, char* msg, int msg_len, int report)

0 commit comments

Comments
 (0)