Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Sync master with dev #38

Merged
merged 13 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Publish to Bus API
  • Loading branch information
xxcxy committed Oct 28, 2020
commit f7a859aed47cc7b6d4f017a18d220a2c16e2dc6d
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ The following parameters can be set in config files or in env variables:
- AMAZON.IS_LOCAL_DB: Use local or AWS Amazon DynamoDB
- AMAZON.DYNAMODB_URL: The local url, if using local Amazon DynamoDB
- HEALTH_CHECK_TIMEOUT: health check timeout in milliseconds
- BUSAPI_URL: Topcoder Bus API URL
- KAFKA_ERROR_TOPIC: The error topic at which bus api will publish any errors
- KAFKA_MESSAGE_ORIGINATOR: The originator value for the kafka messages
- LEADERBOARD_CREATE_TOPIC: Kafka topic for create message
- LEADERBOARD_UPDATE_TOPIC: Kafka topic for update message
- LEADERBOARD_DELETE_TOPIC: Kafka topic for delete message
- LEADERBOARD_AGGREGATE_TOPIC: Kafka topic that is used to combine all create, update and delete message(s)

## Local DynamoDB
Change to the ./docker-dynamodb directory and run `docker-compose up`.
Expand Down
10 changes: 9 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,13 @@ module.exports = {
// Below configuration is required if IS_LOCAL_DB is true
DYNAMODB_URL: process.env.DYNAMODB_URL || 'http://localhost:8000'
},
HEALTH_CHECK_TIMEOUT: process.env.HEALTH_CHECK_TIMEOUT || 3000
HEALTH_CHECK_TIMEOUT: process.env.HEALTH_CHECK_TIMEOUT || 3000,

BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'leaderboard-api',
LEADERBOARD_CREATE_TOPIC: process.env.LEADERBOARD_CREATE_TOPIC || 'leaderboard.action.create',
LEADERBOARD_UPDATE_TOPIC: process.env.LEADERBOARD_UPDATE_TOPIC || 'leaderboard.action.update',
LEADERBOARD_DELETE_TOPIC: process.env.LEADERBOARD_DELETE_TOPIC || 'leaderboard.action.delete',
LEADERBOARD_AGGREGATE_TOPIC: process.env.LEADERBOARD_AGGREGATE_TOPIC || 'leaderboard.action.aggregate'
}
52 changes: 52 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"joi": "^14.0.0",
"lodash": "^4.17.19",
"superagent": "^5.1.0",
"tc-bus-api-wrapper": "github:topcoder-platform/tc-bus-api-wrapper",
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#v2.6.3",
"uuid": "^8.3.1",
"winston": "^3.1.0"
Expand Down
47 changes: 46 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,22 @@ const _ = require('lodash')
const config = require('config')
const request = require('superagent')
const models = require('../models')
const logger = require('./logger')
const m2mAuth = require('tc-core-library-js').auth.m2m
const busApi = require('tc-bus-api-wrapper')

const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))

const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))

// mapping operation to topic
const OP_TO_TOPIC = {
create: config.LEADERBOARD_CREATE_TOPIC,
update: config.LEADERBOARD_UPDATE_TOPIC,
delete: config.LEADERBOARD_DELETE_TOPIC
}

/*
* Check if the Group ID is configured to be processed
* @param {String []} groupIds Array of group ID
Expand Down Expand Up @@ -75,9 +87,42 @@ const autoWrapExpress = (obj) => {
return obj
}

/**
* Posts the message to bus api
* @param {String} op The action
* @param {String} resource The name of the resource
* @param {Object} result The payload
*/
async function publishMessage (op, resource, result) {
logger.debug(`Publishing message to bus: resource ${resource}, data ${JSON.stringify(result, null, 2)}`)
const topic = OP_TO_TOPIC[op]
const payload = _.assign({ resource }, result)
if (!OP_TO_TOPIC[op]) {
logger.warn(`Invalid operation: ${op}`)
return
}

logger.debug(`Posting event to Kafka topic ${topic}, ${JSON.stringify(payload, null, 2)}`)
const message = {
topic,
originator: config.KAFKA_MESSAGE_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': 'application/json',
payload
}
await busApiClient.postEvent(message)

// Post to the aggregate topic
message.payload.originalTopic = topic
message.topic = config.LEADERBOARD_AGGREGATE_TOPIC
logger.debug(`Posting event to aggregate topic ${message.topic}`)
await busApiClient.postEvent(message)
}

module.exports = {
isGroupIdValid,
reqToAPI,
wrapExpress,
autoWrapExpress
autoWrapExpress,
publishMessage
}
16 changes: 15 additions & 1 deletion src/services/GroupService.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
const _ = require('lodash')
const joi = require('joi')
const helper = require('../common/helper')
const logger = require('../common/logger')
const errors = require('../common/errors')
const { Group } = require('../models')
Expand All @@ -25,7 +26,15 @@ async function createGroup (groupId) {
if (entity) {
throw new errors.ConflictError(`groupId # ${groupId} already exists.`)
}
return Group.create({ groupId })

const dbEntity = await Group.create({ groupId })
try {
await helper.publishMessage('create', 'group', { groupId })
} catch (err) {
logger.logFullError(err)
}

return dbEntity
}

createGroup.schema = {
Expand All @@ -42,6 +51,11 @@ async function deleteGroup (groupId) {
throw new errors.NotFoundError(`groupId # ${groupId} doesn't exist`)
}
await Group.delete(entity)
try {
await helper.publishMessage('create', 'group', { groupId })
} catch (err) {
logger.logFullError(err)
}
}

deleteGroup.schema = {
Expand Down
24 changes: 22 additions & 2 deletions src/services/LeaderboardService.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,14 @@ async function createLeaderboard (challengeId, memberId, review) {
groupIds: _.map(groupIds, e => String(e))
}

return Leaderboard.create(record)
const dbEntity = await Leaderboard.create(record)
try {
await helper.publishMessage('create', 'leaderboard', record)
} catch (err) {
logger.logFullError(err)
}

return dbEntity
}

createLeaderboard.schema = {
Expand Down Expand Up @@ -244,7 +251,14 @@ async function updateLeaderboard (challengeId, memberId, review) {
})
}

return existRecords[0].save()
const dbEntity = await existRecords[0].save()
try {
await helper.publishMessage('update', 'leaderboard', existRecords[0])
} catch (err) {
logger.logFullError(err)
}

return dbEntity
}

updateLeaderboard.schema = {
Expand Down Expand Up @@ -326,6 +340,12 @@ async function deleteLeaderboard (reviewId) {
throw new errors.NotFoundError(`Leaderboard record with review id: ${reviewId} doesn't exist`)
}
await Leaderboard.delete(entity)

try {
await helper.publishMessage('delete', 'leaderboard', { reviewId })
} catch (err) {
logger.logFullError(err)
}
}

deleteLeaderboard.schema = {
Expand Down