Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit d81eadd

Browse files
Fix bug in processor
1 parent 7043b27 commit d81eadd

File tree

6 files changed

+33
-38
lines changed

6 files changed

+33
-38
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The following parameters can be set in config files or in env variables:
1313

1414
- LOG_LEVEL: the log level
1515
- PORT: the server port
16+
- KAFKA_GROUP_ID: group id of the consumer; default value: 'leaderboard-processor-group'
1617
- KAFKA_URL: comma separated Kafka hosts
1718
- KAFKA_CLIENT_CERT: Kafka connection certificate, optional;
1819
if not provided, then SSL connection is not used, direct insecure connection is used;

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module.exports = {
66
LOG_LEVEL: process.env.LOG_LEVEL || 'debug',
77
PORT: process.env.PORT || 3000,
88

9+
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'leaderboard-processor-group',
910
KAFKA_URL: process.env.KAFKA_URL || 'localhost:9092',
1011
// below two params are used for secure Kafka connection, they are optional
1112
// for the local Kafka, they are not needed

package-lock.json

Lines changed: 11 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
"joi": "^14.0.0",
2828
"lodash": "^4.17.11",
2929
"mongoose": "^5.3.3",
30-
"no-kafka": "^3.2.4",
30+
"no-kafka": "^3.4.3",
3131
"superagent": "^3.8.3",
3232
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#feature/m2mtoken",
3333
"topcoder-healthcheck-dropin": "^1.0.3",

src/app.js

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
require('./bootstrap')
66
const config = require('config')
7-
const _ = require('lodash')
87
const logger = require('./common/logger')
98
const Kafka = require('no-kafka')
109
const healthcheck = require('topcoder-healthcheck-dropin')
@@ -13,14 +12,14 @@ const ProcessorService = require('./services/ProcessorService')
1312
// start Kafka consumer
1413
logger.info('Start Kafka consumer.')
1514
// create consumer
16-
const options = { connectionString: config.KAFKA_URL }
15+
const options = { connectionString: config.KAFKA_URL, groupId: config.KAFKA_GROUP_ID }
1716
if (config.KAFKA_CLIENT_CERT && config.KAFKA_CLIENT_CERT_KEY) {
1817
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }
1918
}
20-
const consumer = new Kafka.SimpleConsumer(options)
19+
const consumer = new Kafka.GroupConsumer(options)
2120

2221
// data handler
23-
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
22+
const dataHandler = async (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
2423
const message = m.message.value.toString('utf8')
2524
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
2625
m.offset}; Message: ${message}.`)
@@ -47,7 +46,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (
4746
return
4847
}
4948

50-
return (async () => {
49+
try {
5150
switch (topic) {
5251
case config.CREATE_DATA_TOPIC:
5352
await ProcessorService.upsert(messageJSON)
@@ -61,10 +60,12 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (
6160
default:
6261
throw new Error(`Invalid topic: ${topic}`)
6362
}
64-
})()
63+
6564
// commit offset
66-
.then(() => consumer.commitOffset({ topic, partition, offset: m.offset }))
67-
.catch((err) => logger.logFullError(err))
65+
await consumer.commitOffset({ topic, partition, offset: m.offset })
66+
} catch (err) {
67+
logger.logFullError(err)
68+
}
6869
})
6970

7071
// check if there is kafka connection alive
@@ -80,16 +81,15 @@ function check () {
8081
return connected
8182
}
8283

84+
const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
85+
// consume configured topics
8386
consumer
84-
.init()
85-
// consume configured topics
87+
.init([{
88+
subscriptions: topics,
89+
handler: dataHandler
90+
}])
8691
.then(() => {
8792
healthcheck.init([check])
88-
89-
const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
90-
_.each(topics, (tp) => {
91-
consumer.subscribe(tp, { time: Kafka.LATEST_OFFSET }, dataHandler)
92-
})
9393
})
9494
.catch((err) => logger.logFullError(err))
9595

src/services/ProcessorService.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ const {
1414
* Returns the tests passed using the metadata information
1515
* @param {object} metadata the object from which to retrieve the tests passed
1616
*/
17-
function getTestsPassed(metadata = {}) {
17+
function getTestsPassed (metadata = {}) {
1818
const tests = metadata.assertions || {}
1919

2020
let testsPassed = tests.total - tests.pending - tests.failed
@@ -30,7 +30,7 @@ function getTestsPassed(metadata = {}) {
3030
* Handle create / update topic messages from Kafka queue
3131
* @param {Object} message the Kafka message in JSON format
3232
*/
33-
const upsert = async(message) => {
33+
const upsert = async (message) => {
3434
const submission = await helper.reqToAPI(`${config.SUBMISSION_API_URL}/${message.payload.submissionId}`)
3535

3636
const existRecord = await Leaderboard.findOne({
@@ -71,7 +71,7 @@ const upsert = async(message) => {
7171

7272
if (!helper.isGroupIdValid(challengeDetail.body.result.content[0].groupIds)) {
7373
logger.debug(`Group ID of Challenge # ${submission.body.challengeId} is not configured for processing!`)
74-
// Ignore the message
74+
// Ignore the message
7575
return
7676
}
7777

@@ -107,7 +107,7 @@ upsert.schema = {
107107
* Handle delete topic message from Kafka Queue
108108
* @param {Object} message the Kafka message in JSON format
109109
*/
110-
const remove = async(message) => {
110+
const remove = async (message) => {
111111
// Remove the record from MongoDB
112112
await Leaderboard.deleteOne({
113113
reviewSummationId: message.payload.id

0 commit comments

Comments (0)