Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit e85851f

Browse files
Merge pull request #16 from topcoder-platform/revert-15-develop
Revert "Fix bug in processor"
2 parents 5e53d8c + afe0d51 commit e85851f

File tree

6 files changed

+38
-33
lines changed

6 files changed

+38
-33
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@ The following parameters can be set in config files or in env variables:
1313

1414
- LOG_LEVEL: the log level
1515
- PORT: the server port
16-
- KAFKA_GROUP_ID: group id of the consumer; default value: 'leaderboard-processor-group'
1716
- KAFKA_URL: comma separated Kafka hosts
1817
- KAFKA_CLIENT_CERT: Kafka connection certificate, optional;
1918
if not provided, then SSL connection is not used, direct insecure connection is used;

config/default.js

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@ module.exports = {
66
LOG_LEVEL: process.env.LOG_LEVEL || 'debug',
77
PORT: process.env.PORT || 3000,
88

9-
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'leaderboard-processor-group',
109
KAFKA_URL: process.env.KAFKA_URL || 'localhost:9092',
1110
// below two params are used for secure Kafka connection, they are optional
1211
// for the local Kafka, they are not needed

package-lock.json

Lines changed: 18 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@
2727
"joi": "^14.0.0",
2828
"lodash": "^4.17.11",
2929
"mongoose": "^5.3.3",
30-
"no-kafka": "^3.4.3",
30+
"no-kafka": "^3.2.4",
3131
"superagent": "^3.8.3",
3232
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#feature/m2mtoken",
3333
"topcoder-healthcheck-dropin": "^1.0.3",

src/app.js

Lines changed: 15 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44

55
require('./bootstrap')
66
const config = require('config')
7+
const _ = require('lodash')
78
const logger = require('./common/logger')
89
const Kafka = require('no-kafka')
910
const healthcheck = require('topcoder-healthcheck-dropin')
@@ -12,14 +13,14 @@ const ProcessorService = require('./services/ProcessorService')
1213
// start Kafka consumer
1314
logger.info('Start Kafka consumer.')
1415
// create consumer
15-
const options = { connectionString: config.KAFKA_URL, groupId: config.KAFKA_GROUP_ID }
16+
const options = { connectionString: config.KAFKA_URL }
1617
if (config.KAFKA_CLIENT_CERT && config.KAFKA_CLIENT_CERT_KEY) {
1718
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }
1819
}
19-
const consumer = new Kafka.GroupConsumer(options)
20+
const consumer = new Kafka.SimpleConsumer(options)
2021

2122
// data handler
22-
const dataHandler = async (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
23+
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
2324
const message = m.message.value.toString('utf8')
2425
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
2526
m.offset}; Message: ${message}.`)
@@ -46,7 +47,7 @@ const dataHandler = async (messageSet, topic, partition) => Promise.each(message
4647
return
4748
}
4849

49-
try {
50+
return (async () => {
5051
switch (topic) {
5152
case config.CREATE_DATA_TOPIC:
5253
await ProcessorService.upsert(messageJSON)
@@ -60,12 +61,10 @@ const dataHandler = async (messageSet, topic, partition) => Promise.each(message
6061
default:
6162
throw new Error(`Invalid topic: ${topic}`)
6263
}
63-
64+
})()
6465
// commit offset
65-
await consumer.commitOffset({ topic, partition, offset: m.offset })
66-
} catch (err) {
67-
logger.logFullError(err)
68-
}
66+
.then(() => consumer.commitOffset({ topic, partition, offset: m.offset }))
67+
.catch((err) => logger.logFullError(err))
6968
})
7069

7170
// check if there is kafka connection alive
@@ -81,15 +80,16 @@ function check () {
8180
return connected
8281
}
8382

84-
const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
85-
// consume configured topics
8683
consumer
87-
.init([{
88-
subscriptions: topics,
89-
handler: dataHandler
90-
}])
84+
.init()
85+
// consume configured topics
9186
.then(() => {
9287
healthcheck.init([check])
88+
89+
const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
90+
_.each(topics, (tp) => {
91+
consumer.subscribe(tp, { time: Kafka.LATEST_OFFSET }, dataHandler)
92+
})
9393
})
9494
.catch((err) => logger.logFullError(err))
9595

src/services/ProcessorService.js

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ const {
1414
* Returns the tests passed using the metadata information
1515
* @param {object} metadata the object from which to retrieve the tests passed
1616
*/
17-
function getTestsPassed (metadata = {}) {
17+
function getTestsPassed(metadata = {}) {
1818
const tests = metadata.assertions || {}
1919

2020
let testsPassed = tests.total - tests.pending - tests.failed
@@ -30,7 +30,7 @@ function getTestsPassed (metadata = {}) {
3030
* Handle create / update topic messages from Kafka queue
3131
* @param {Object} message the Kafka message in JSON format
3232
*/
33-
const upsert = async (message) => {
33+
const upsert = async(message) => {
3434
const submission = await helper.reqToAPI(`${config.SUBMISSION_API_URL}/${message.payload.submissionId}`)
3535

3636
const existRecord = await Leaderboard.findOne({
@@ -71,7 +71,7 @@ const upsert = async (message) => {
7171

7272
if (!helper.isGroupIdValid(challengeDetail.body.result.content[0].groupIds)) {
7373
logger.debug(`Group ID of Challenge # ${submission.body.challengeId} is not configured for processing!`)
74-
// Ignore the message
74+
// Ignore the message
7575
return
7676
}
7777

@@ -107,7 +107,7 @@ upsert.schema = {
107107
* Handle delete topic message from Kafka Queue
108108
* @param {Object} message the Kafka message in JSON format
109109
*/
110-
const remove = async (message) => {
110+
const remove = async(message) => {
111111
// Remove the record from MongoDB
112112
await Leaderboard.deleteOne({
113113
reviewSummationId: message.payload.id

0 commit comments

Comments (0)