Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 5619585

Browse files
git push origin developMerge branch 'issue_40' into develop
2 parents ac91ab2 + 3ae937f commit 5619585

File tree

7 files changed

+152
-5
lines changed

7 files changed

+152
-5
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ The following parameters can be set in config files or in env variables:
2828
- AMAZON.IS_LOCAL_DB: Use local or AWS Amazon DynamoDB
2929
- AMAZON.DYNAMODB_URL: The local url, if using local Amazon DynamoDB
3030
- HEALTH_CHECK_TIMEOUT: health check timeout in milliseconds
31+
- BUSAPI_URL: Topcoder Bus API URL
32+
- KAFKA_ERROR_TOPIC: The error topic at which bus api will publish any errors
33+
- KAFKA_MESSAGE_ORIGINATOR: The originator value for the kafka messages
34+
- LEADERBOARD_CREATE_TOPIC: Kafka topic for create message
35+
- LEADERBOARD_UPDATE_TOPIC: Kafka topic for update message
36+
- LEADERBOARD_DELETE_TOPIC: Kafka topic for delete message
37+
- LEADERBOARD_AGGREGATE_TOPIC: Kafka topic that is used to combine all create, update and delete message(s)
3138

3239
## Local DynamoDB
3340
Change to the ./docker-dynamodb directory and run `docker-compose up`.

config/default.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,13 @@ module.exports = {
3131
// Below configuration is required if IS_LOCAL_DB is true
3232
DYNAMODB_URL: process.env.DYNAMODB_URL || 'http://localhost:8000'
3333
},
34-
HEALTH_CHECK_TIMEOUT: process.env.HEALTH_CHECK_TIMEOUT || 3000
34+
HEALTH_CHECK_TIMEOUT: process.env.HEALTH_CHECK_TIMEOUT || 3000,
35+
36+
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
37+
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
38+
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'leaderboard-api',
39+
LEADERBOARD_CREATE_TOPIC: process.env.LEADERBOARD_CREATE_TOPIC || 'leaderboard.action.create',
40+
LEADERBOARD_UPDATE_TOPIC: process.env.LEADERBOARD_UPDATE_TOPIC || 'leaderboard.action.update',
41+
LEADERBOARD_DELETE_TOPIC: process.env.LEADERBOARD_DELETE_TOPIC || 'leaderboard.action.delete',
42+
LEADERBOARD_AGGREGATE_TOPIC: process.env.LEADERBOARD_AGGREGATE_TOPIC || 'leaderboard.action.aggregate'
3543
}

package-lock.json

Lines changed: 52 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"joi": "^14.0.0",
3838
"lodash": "^4.17.19",
3939
"superagent": "^5.1.0",
40+
"tc-bus-api-wrapper": "github:topcoder-platform/tc-bus-api-wrapper",
4041
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#v2.6.3",
4142
"uuid": "^8.3.1",
4243
"winston": "^3.1.0"

src/common/helper.js

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,22 @@ const _ = require('lodash')
66
const config = require('config')
77
const request = require('superagent')
88
const models = require('../models')
9+
const logger = require('./logger')
910
const m2mAuth = require('tc-core-library-js').auth.m2m
11+
const busApi = require('tc-bus-api-wrapper')
1012

1113
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))
1214

15+
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
16+
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
17+
18+
// mapping operation to topic
19+
const OP_TO_TOPIC = {
20+
create: config.LEADERBOARD_CREATE_TOPIC,
21+
update: config.LEADERBOARD_UPDATE_TOPIC,
22+
delete: config.LEADERBOARD_DELETE_TOPIC
23+
}
24+
1325
/*
1426
* Check if the Group ID is configured to be processed
1527
* @param {String []} groupIds Array of group ID
@@ -75,9 +87,42 @@ const autoWrapExpress = (obj) => {
7587
return obj
7688
}
7789

90+
/**
91+
* Posts the message to bus api
92+
* @param {String} op The action
93+
* @param {String} resource The name of the resource
94+
* @param {Object} result The payload
95+
*/
96+
async function publishMessage (op, resource, result) {
97+
logger.debug(`Publishing message to bus: resource ${resource}, data ${JSON.stringify(result, null, 2)}`)
98+
const topic = OP_TO_TOPIC[op]
99+
const payload = _.assign({ resource }, result)
100+
if (!OP_TO_TOPIC[op]) {
101+
logger.warn(`Invalid operation: ${op}`)
102+
return
103+
}
104+
105+
logger.debug(`Posting event to Kafka topic ${topic}, ${JSON.stringify(payload, null, 2)}`)
106+
const message = {
107+
topic,
108+
originator: config.KAFKA_MESSAGE_ORIGINATOR,
109+
timestamp: new Date().toISOString(),
110+
'mime-type': 'application/json',
111+
payload
112+
}
113+
await busApiClient.postEvent(message)
114+
115+
// Post to the aggregate topic
116+
message.payload.originalTopic = topic
117+
message.topic = config.LEADERBOARD_AGGREGATE_TOPIC
118+
logger.debug(`Posting event to aggregate topic ${message.topic}`)
119+
await busApiClient.postEvent(message)
120+
}
121+
78122
module.exports = {
79123
isGroupIdValid,
80124
reqToAPI,
81125
wrapExpress,
82-
autoWrapExpress
126+
autoWrapExpress,
127+
publishMessage
83128
}

src/services/GroupService.js

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
const _ = require('lodash')
55
const joi = require('joi')
6+
const helper = require('../common/helper')
67
const logger = require('../common/logger')
78
const errors = require('../common/errors')
89
const { Group } = require('../models')
@@ -25,7 +26,15 @@ async function createGroup (groupId) {
2526
if (entity) {
2627
throw new errors.ConflictError(`groupId # ${groupId} already exists.`)
2728
}
28-
return Group.create({ groupId })
29+
30+
const dbEntity = await Group.create({ groupId })
31+
try {
32+
await helper.publishMessage('create', 'group', { groupId })
33+
} catch (err) {
34+
logger.logFullError(err)
35+
}
36+
37+
return dbEntity
2938
}
3039

3140
createGroup.schema = {
@@ -42,6 +51,11 @@ async function deleteGroup (groupId) {
4251
throw new errors.NotFoundError(`groupId # ${groupId} doesn't exist`)
4352
}
4453
await Group.delete(entity)
54+
try {
55+
await helper.publishMessage('delete', 'group', { groupId })
56+
} catch (err) {
57+
logger.logFullError(err)
58+
}
4559
}
4660

4761
deleteGroup.schema = {

src/services/LeaderboardService.js

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,14 @@ async function createLeaderboard (challengeId, memberId, review) {
114114
groupIds: _.map(groupIds, e => String(e))
115115
}
116116

117-
return Leaderboard.create(record)
117+
const dbEntity = await Leaderboard.create(record)
118+
try {
119+
await helper.publishMessage('create', 'leaderboard', record)
120+
} catch (err) {
121+
logger.logFullError(err)
122+
}
123+
124+
return dbEntity
118125
}
119126

120127
createLeaderboard.schema = {
@@ -244,7 +251,14 @@ async function updateLeaderboard (challengeId, memberId, review) {
244251
})
245252
}
246253

247-
return existRecords[0].save()
254+
const dbEntity = await existRecords[0].save()
255+
try {
256+
await helper.publishMessage('update', 'leaderboard', existRecords[0])
257+
} catch (err) {
258+
logger.logFullError(err)
259+
}
260+
261+
return dbEntity
248262
}
249263

250264
updateLeaderboard.schema = {
@@ -326,6 +340,12 @@ async function deleteLeaderboard (reviewId) {
326340
throw new errors.NotFoundError(`Leaderboard record with review id: ${reviewId} doesn't exist`)
327341
}
328342
await Leaderboard.delete(entity)
343+
344+
try {
345+
await helper.publishMessage('delete', 'leaderboard', { reviewId })
346+
} catch (err) {
347+
logger.logFullError(err)
348+
}
329349
}
330350

331351
deleteLeaderboard.schema = {

0 commit comments

Comments
 (0)