Skip to content

Commit e62aee1

Browse files
committed
Fix CI performance run
1 parent 2f86688 commit e62aee1

File tree

3 files changed

+28
-22
lines changed

3 files changed

+28
-22
lines changed

ci/tests/run_perf_test.js

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,11 @@ function belowTarget(value, target) {
4242
async function main() {
4343
// Run performance tests and store outputs in memory
4444
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 50000;
45-
const skipCtpTest = process.env.SKIP_CTP_TEST ? process.env.SKIP_CTP_TEST === 'true' : false;
45+
let skipCtpTest = process.env.SKIP_CTP_TEST ? process.env.SKIP_CTP_TEST === 'true' : false;
4646
const concurrentRun = process.env.CONCURRENT_RUN ? process.env.CONCURRENT_RUN === 'true' : false;
47+
if (concurrentRun) {
48+
skipCtpTest = true;
49+
}
4750
const consumerMode = process.env.CONSUMER_MODE || 'all';
4851
const produceToSecondTopic = process.env.PRODUCE_TO_SECOND_TOPIC ? process.env.PRODUCE_TO_SECOND_TOPIC === 'true' : false;
4952
const produceToSecondTopicParam = produceToSecondTopic ? '--produce-to-second-topic' : '';
@@ -119,11 +122,11 @@ async function main() {
119122

120123
console.log('Running Confluent CTP test...');
121124
const outputConfluentCtp = skipCtpTest ? '' :
122-
runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');
125+
(await runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp'));
123126

124127
console.log('Running KafkaJS CTP test...');
125128
const outputKjsCtp = skipCtpTest ? '' :
126-
runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');
129+
(await runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp'));
127130

128131
// Extract Confluent results
129132
let ctpConfluent, ctpKjs;
@@ -278,9 +281,11 @@ async function main() {
278281
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
279282
console.log(`Average RSS (eachMessage): confluent ${consumerConfluentMessageAverageRSS}, kafkajs ${consumerKjsMessageAverageRSS}`);
280283
console.log(`Max RSS (eachMessage): confluent ${consumerConfluentMessageMaxRSS}, kafkajs ${consumerKjsMessageMaxRSS}`);
281-
console.log(`Average broker lag (eachMessage): confluent ${consumerConfluentMessageAverageBrokerLag}, kafkajs ${consumerKjsMessageAverageBrokerLag}`);
282-
console.log(`Max broker lag (eachMessage): confluent ${consumerConfluentMessageMaxBrokerLag}, kafkajs ${consumerKjsMessageMaxBrokerLag}`);
283-
console.log(`Sample size for broker lag measurement (eachMessage): confluent ${consumerConfluentMessageTotalLagMeasurements}, kafkajs ${consumerKjsMessageTotalLagMeasurements}`);
284+
if (concurrentRun) {
285+
console.log(`Average broker lag (eachMessage): confluent ${consumerConfluentMessageAverageBrokerLag}, kafkajs ${consumerKjsMessageAverageBrokerLag}`);
286+
console.log(`Max broker lag (eachMessage): confluent ${consumerConfluentMessageMaxBrokerLag}, kafkajs ${consumerKjsMessageMaxBrokerLag}`);
287+
console.log(`Sample size for broker lag measurement (eachMessage): confluent ${consumerConfluentMessageTotalLagMeasurements}, kafkajs ${consumerKjsMessageTotalLagMeasurements}`);
288+
}
284289
}
285290
if (consumerModeAll || consumerModeEachBatch) {
286291
console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
@@ -297,9 +302,11 @@ async function main() {
297302
console.log(`Average eachBatch size: confluent ${consumerConfluentBatchAverageSize}, kafkajs ${consumerKjsBatchAverageSize}`);
298303
console.log(`Average RSS (eachBatch): confluent ${consumerConfluentBatchAverageRSS}, kafkajs ${consumerKjsBatchAverageRSS}`);
299304
console.log(`Max RSS (eachBatch): confluent ${consumerConfluentBatchMaxRSS}, kafkajs ${consumerKjsBatchMaxRSS}`);
300-
console.log(`Average broker lag (eachBatch): confluent ${consumerConfluentBatchAverageBrokerLag}, kafkajs ${consumerKjsBatchAverageBrokerLag}`);
301-
console.log(`Max broker lag (eachBatch): confluent ${consumerConfluentBatchMaxBrokerLag}, kafkajs ${consumerKjsBatchMaxBrokerLag}`);
302-
console.log(`Sample size for broker lag measurement (eachBatch): confluent ${consumerConfluentBatchTotalLagMeasurements}, kafkajs ${consumerKjsBatchTotalLagMeasurements}`);
305+
if (concurrentRun) {
306+
console.log(`Average broker lag (eachBatch): confluent ${consumerConfluentBatchAverageBrokerLag}, kafkajs ${consumerKjsBatchAverageBrokerLag}`);
307+
console.log(`Max broker lag (eachBatch): confluent ${consumerConfluentBatchMaxBrokerLag}, kafkajs ${consumerKjsBatchMaxBrokerLag}`);
308+
console.log(`Sample size for broker lag measurement (eachBatch): confluent ${consumerConfluentBatchTotalLagMeasurements}, kafkajs ${consumerKjsBatchTotalLagMeasurements}`);
309+
}
303310
}
304311
if (!concurrentRun) {
305312
console.log(`Average RSS: confluent ${consumerConfluentAverageRSS}, kafkajs ${consumerKjsAverageRSS}`);

examples/performance/performance-consolidated.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,9 @@ function logParameters(parameters) {
207207
console.log(` ProduceTopic: ${topic2}`);
208208
console.log(` Message Count: ${messageCount}`);
209209
// Seed the topic with messages
210-
await runProducer(parameters, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
210+
await runProducerCKJS(parameters, topic, batchSize,
211+
warmupMessages, messageCount, messageSize, compression,
212+
randomness, limitRPS);
211213
startTrackingMemory();
212214
const ctpRate = await runConsumeTransformProduce(parameters, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
213215
endTrackingMemory('consume-transform-produce', `consume-transform-produce-${mode}.json`);

examples/performance/performance-primitives-common.js

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ else {
1414
}
1515
}
1616

17-
function installHandlers() {
17+
function installHandlers(useTerminateTimeout) {
1818
const handlers = {
1919
terminationRequested: false,
2020
terminateTimeout: null,
@@ -26,8 +26,10 @@ function installHandlers() {
2626
process.on('SIGINT', terminationRequestedCallback);
2727
process.on('SIGTERM', terminationRequestedCallback);
2828
handlers.terminationRequestedCallback = terminationRequestedCallback;
29-
handlers.terminateTimeout = setTimeout(terminationRequestedCallback,
30-
TERMINATE_TIMEOUT_MS);
29+
if (useTerminateTimeout) {
30+
handlers.terminateTimeout = setTimeout(terminationRequestedCallback,
31+
TERMINATE_TIMEOUT_MS);
32+
}
3133
return handlers;
3234
}
3335

@@ -57,7 +59,7 @@ function genericProduceToTopic(producer, topic, messages) {
5759
}
5860

5961
async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
60-
const handlers = installHandlers();
62+
const handlers = installHandlers(totalMessageCnt === -1);
6163
await consumer.connect();
6264
await consumer.subscribe({ topic });
6365

@@ -179,12 +181,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
179181
if (!startTime) {
180182
startTime = hrtime.bigint();
181183
} else if (totalMessageCnt > 0 && messagesMeasured >= totalMessageCnt) {
182-
let durationNanos = Number(hrtime.bigint() - startTime);
183-
durationSeconds = durationNanos / 1e9;
184-
rate = durationNanos === 0 ? Infinity :
185-
(totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
186-
console.log(`Recvd ${messagesMeasured} messages in ${durationSeconds} seconds, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
187-
consumer.pause([{ topic }]);
184+
stopConsuming();
188185
}
189186
}
190187

@@ -243,7 +240,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
243240
}
244241

245242
async function runProducer(producer, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) {
246-
const handlers = installHandlers();
243+
const handlers = installHandlers(totalMessageCnt === -1);
247244
let totalMessagesSent = 0;
248245
let totalBytesSent = 0;
249246

@@ -347,7 +344,7 @@ async function runProducer(producer, topic, batchSize, warmupMessages, totalMess
347344
}
348345

349346
async function runLagMonitoring(admin, topic) {
350-
const handlers = installHandlers();
347+
const handlers = installHandlers(true);
351348
let groupId = process.env.GROUPID_MONITOR;
352349
if (!groupId) {
353350
throw new Error("GROUPID_MONITOR environment variable not set");

0 commit comments

Comments
 (0)