Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Revert "Fix bug in processor" #16

Merged
merged 1 commit (branch names not captured in this rendering)
Jan 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ The following parameters can be set in config files or in env variables:

- LOG_LEVEL: the log level
- PORT: the server port
- KAFKA_GROUP_ID: group id of the consumer; default value: 'leaderboard-processor-group'
- KAFKA_URL: comma separated Kafka hosts
- KAFKA_CLIENT_CERT: Kafka connection certificate, optional;
if not provided, then SSL connection is not used, direct insecure connection is used;
Expand Down
1 change: 0 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ module.exports = {
LOG_LEVEL: process.env.LOG_LEVEL || 'debug',
PORT: process.env.PORT || 3000,

KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'leaderboard-processor-group',
KAFKA_URL: process.env.KAFKA_URL || 'localhost:9092',
// below two params are used for secure Kafka connection, they are optional
// for the local Kafka, they are not needed
Expand Down
29 changes: 18 additions & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"joi": "^14.0.0",
"lodash": "^4.17.11",
"mongoose": "^5.3.3",
"no-kafka": "^3.4.3",
"no-kafka": "^3.2.4",
"superagent": "^3.8.3",
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#feature/m2mtoken",
"topcoder-healthcheck-dropin": "^1.0.3",
Expand Down
30 changes: 15 additions & 15 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

require('./bootstrap')
const config = require('config')
const _ = require('lodash')
const logger = require('./common/logger')
const Kafka = require('no-kafka')
const healthcheck = require('topcoder-healthcheck-dropin')
Expand All @@ -12,14 +13,14 @@ const ProcessorService = require('./services/ProcessorService')
// start Kafka consumer
logger.info('Start Kafka consumer.')
// create consumer
const options = { connectionString: config.KAFKA_URL, groupId: config.KAFKA_GROUP_ID }
const options = { connectionString: config.KAFKA_URL }
if (config.KAFKA_CLIENT_CERT && config.KAFKA_CLIENT_CERT_KEY) {
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }
}
const consumer = new Kafka.GroupConsumer(options)
const consumer = new Kafka.SimpleConsumer(options)

// data handler
const dataHandler = async (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
const message = m.message.value.toString('utf8')
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
m.offset}; Message: ${message}.`)
Expand All @@ -46,7 +47,7 @@ const dataHandler = async (messageSet, topic, partition) => Promise.each(message
return
}

try {
return (async () => {
switch (topic) {
case config.CREATE_DATA_TOPIC:
await ProcessorService.upsert(messageJSON)
Expand All @@ -60,12 +61,10 @@ const dataHandler = async (messageSet, topic, partition) => Promise.each(message
default:
throw new Error(`Invalid topic: ${topic}`)
}

})()
// commit offset
await consumer.commitOffset({ topic, partition, offset: m.offset })
} catch (err) {
logger.logFullError(err)
}
.then(() => consumer.commitOffset({ topic, partition, offset: m.offset }))
.catch((err) => logger.logFullError(err))
})

// check if there is kafka connection alive
Expand All @@ -81,15 +80,16 @@ function check () {
return connected
}

const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
// consume configured topics
consumer
.init([{
subscriptions: topics,
handler: dataHandler
}])
.init()
// consume configured topics
.then(() => {
healthcheck.init([check])

const topics = [config.CREATE_DATA_TOPIC, config.UPDATE_DATA_TOPIC, config.DELETE_DATA_TOPIC]
_.each(topics, (tp) => {
consumer.subscribe(tp, { time: Kafka.LATEST_OFFSET }, dataHandler)
})
})
.catch((err) => logger.logFullError(err))

Expand Down
8 changes: 4 additions & 4 deletions src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const {
* Returns the tests passed using the metadata information
* @param {object} metadata the object from which to retrieve the tests passed
*/
function getTestsPassed (metadata = {}) {
function getTestsPassed(metadata = {}) {
const tests = metadata.assertions || {}

let testsPassed = tests.total - tests.pending - tests.failed
Expand All @@ -30,7 +30,7 @@ function getTestsPassed (metadata = {}) {
* Handle create / update topic messages from Kafka queue
* @param {Object} message the Kafka message in JSON format
*/
const upsert = async (message) => {
const upsert = async(message) => {
const submission = await helper.reqToAPI(`${config.SUBMISSION_API_URL}/${message.payload.submissionId}`)

const existRecord = await Leaderboard.findOne({
Expand Down Expand Up @@ -71,7 +71,7 @@ const upsert = async (message) => {

if (!helper.isGroupIdValid(challengeDetail.body.result.content[0].groupIds)) {
logger.debug(`Group ID of Challenge # ${submission.body.challengeId} is not configured for processing!`)
// Ignore the message
// Ignore the message
return
}

Expand Down Expand Up @@ -107,7 +107,7 @@ upsert.schema = {
* Handle delete topic message from Kafka Queue
* @param {Object} message the Kafka message in JSON format
*/
const remove = async (message) => {
const remove = async(message) => {
// Remove the record from MongoDB
await Leaderboard.deleteOne({
reviewSummationId: message.payload.id
Expand Down