Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 6ba12fe

Browse files
Debugging why members solution did not work
1 parent dc3c600 commit 6ba12fe

File tree

3 files changed

+86
-44
lines changed

3 files changed

+86
-44
lines changed

src/app.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
global.Promise = require('bluebird')
66
const config = require('config')
77
const Kafka = require('no-kafka')
8+
const _ = require('lodash')
89
const healthcheck = require('topcoder-healthcheck-dropin')
910
const logger = require('./common/logger')
1011
const helper = require('./common/helper')
@@ -66,24 +67,25 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
6667
await consumer.commitOffset({ topic, partition, offset: m.offset })
6768
return
6869
}
69-
70+
const transactionId = _.uniqueId('transaction_')
7071
try {
7172
switch (topic) {
7273
case config.UBAHN_CREATE_TOPIC:
73-
await ProcessorService.processCreate(messageJSON)
74+
await ProcessorService.processCreate(messageJSON, transactionId)
7475
break
7576
case config.UBAHN_UPDATE_TOPIC:
76-
await ProcessorService.processUpdate(messageJSON)
77+
await ProcessorService.processUpdate(messageJSON, transactionId)
7778
break
7879
case config.UBAHN_DELETE_TOPIC:
79-
await ProcessorService.processDelete(messageJSON)
80+
await ProcessorService.processDelete(messageJSON, transactionId)
8081
break
8182
}
8283

8384
logger.debug(`Successfully processed message with count ${messageCount}`)
8485
} catch (err) {
8586
logger.logFullError(err)
8687
} finally {
88+
helper.checkEsMutexRelease(transactionId)
8789
logger.debug(`Commiting offset after processing message with count ${messageCount}`)
8890

8991
// Commit offset regardless of error

src/common/helper.js

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ AWS.config.region = config.ES.AWS_REGION
1313

1414
// Elasticsearch client
1515
let esClient
16+
let transactionId
1617
// Mutex to ensure that only one elasticsearch action is carried out at any given time
1718
const esClientMutex = new Mutex()
19+
const mutexReleaseMap = {}
1820

1921
/**
2022
* Get Kafka options
@@ -58,11 +60,24 @@ async function getESClient () {
5860
// Patch the transport to enable mutex
5961
esClient.transport.originalRequest = esClient.transport.request
6062
esClient.transport.request = async (params) => {
61-
const release = await esClientMutex.acquire()
63+
const tId = _.get(params.query, 'transactionId')
64+
params.query = _.omit(params.query, 'transactionId')
65+
if (!tId || tId !== transactionId) {
66+
const release = await esClientMutex.acquire()
67+
mutexReleaseMap[tId || 'noTransaction'] = release
68+
transactionId = tId
69+
}
6270
try {
6371
return await esClient.transport.originalRequest(params)
6472
} finally {
65-
release()
73+
if (params.method !== 'GET' || !tId) {
74+
const release = mutexReleaseMap[tId || 'noTransaction']
75+
delete mutexReleaseMap[tId || 'noTransaction']
76+
transactionId = undefined
77+
if (release) {
78+
release()
79+
}
80+
}
6681
}
6782
}
6883

@@ -86,32 +101,30 @@ function validProperties (payload, keys) {
86101
/**
87102
* Function to get user from es
88103
* @param {String} userId
89-
* @param {Boolean} sourceOnly
104+
* @param {String} transactionId
90105
* @returns {Object} user
91106
*/
92-
async function getUser (userId, sourceOnly = true) {
107+
async function getUser (userId, transactionId) {
93108
const client = await getESClient()
94-
95-
if (sourceOnly) {
96-
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
97-
}
98-
99-
return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
109+
const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, transactionId })
110+
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
100111
}
101112

102113
/**
103114
* Function to update es user
104115
* @param {String} userId
105116
* @param {Number} seqNo
106117
* @param {Number} primaryTerm
118+
* @param {String} transactionId
107119
* @param {Object} body
108120
*/
109-
async function updateUser (userId, body, seqNo, primaryTerm) {
121+
async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
110122
const client = await getESClient()
111123
await client.update({
112124
index: config.get('ES.USER_INDEX'),
113125
type: config.get('ES.USER_TYPE'),
114126
id: userId,
127+
transactionId,
115128
body: { doc: body },
116129
if_seq_no: seqNo,
117130
if_primary_term: primaryTerm
@@ -121,26 +134,33 @@ async function updateUser (userId, body, seqNo, primaryTerm) {
121134
/**
122135
* Function to get org from es
123136
* @param {String} organizationId
137+
* @param {String} transactionId
124138
* @returns {Object} organization
125139
*/
126-
async function getOrg (organizationId) {
140+
async function getOrg (organizationId, transactionId) {
127141
const client = await getESClient()
128-
return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId })
142+
const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, transactionId })
143+
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
129144
}
130145

131146
/**
132147
* Function to update es organization
133148
* @param {String} organizationId
149+
* @param {Number} seqNo
150+
* @param {Number} primaryTerm
151+
* @param {String} transactionId
134152
* @param {Object} body
135153
*/
136-
async function updateOrg (organizationId, body) {
154+
async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) {
137155
const client = await getESClient()
138156
await client.update({
139157
index: config.get('ES.ORGANIZATION_INDEX'),
140158
type: config.get('ES.ORGANIZATION_TYPE'),
141159
id: organizationId,
160+
transactionId,
142161
body: { doc: body },
143-
refresh: 'true'
162+
if_seq_no: seqNo,
163+
if_primary_term: primaryTerm
144164
})
145165
}
146166

@@ -156,6 +176,21 @@ function getErrorWithStatus (message, statusCode) {
156176
return error
157177
}
158178

179+
/**
180+
* Ensure the esClient mutex is released
181+
* @param {String} tId transactionId
182+
*/
183+
function checkEsMutexRelease (tId) {
184+
if (tId === transactionId) {
185+
const release = mutexReleaseMap[tId]
186+
delete mutexReleaseMap[tId]
187+
transactionId = undefined
188+
if (release) {
189+
release()
190+
}
191+
}
192+
}
193+
159194
module.exports = {
160195
getKafkaOptions,
161196
getESClient,
@@ -164,5 +199,6 @@ module.exports = {
164199
updateUser,
165200
getOrg,
166201
updateOrg,
167-
getErrorWithStatus
202+
getErrorWithStatus,
203+
checkEsMutexRelease
168204
}

src/services/ProcessorService.js

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ const {
1515
/**
1616
* Process create entity message
1717
* @param {Object} message the kafka message
18+
* @param {String} transactionId
1819
*/
19-
async function processCreate (message) {
20+
async function processCreate (message, transactionId) {
2021
const resource = message.payload.resource
2122
if (_.includes(_.keys(topResources), resource)) {
2223
// process the top resources such as user, skill...
@@ -33,7 +34,7 @@ async function processCreate (message) {
3334
// process user resources such as userSkill, userAttribute...
3435
const userResource = userResources[resource]
3536
userResource.validate(message.payload)
36-
const user = await helper.getUser(message.payload.userId)
37+
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
3738
const relateId = message.payload[userResource.relateKey]
3839
if (!user[userResource.propertyName]) {
3940
user[userResource.propertyName] = []
@@ -45,13 +46,13 @@ async function processCreate (message) {
4546
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
4647
} else {
4748
user[userResource.propertyName].push(_.omit(message.payload, 'resource'))
48-
await helper.updateUser(message.payload.userId, user)
49+
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
4950
}
5051
} else if (_.includes(_.keys(organizationResources), resource)) {
5152
// process org resources such as org skill provider
5253
const orgResources = organizationResources[resource]
5354
orgResources.validate(message.payload)
54-
const org = await helper.getOrg(message.payload.organizationId)
55+
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
5556
const relateId = message.payload[orgResources.relateKey]
5657
if (!org[orgResources.propertyName]) {
5758
org[orgResources.propertyName] = []
@@ -63,7 +64,7 @@ async function processCreate (message) {
6364
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
6465
} else {
6566
org[orgResources.propertyName].push(_.omit(message.payload, 'resource'))
66-
await helper.updateOrg(message.payload.organizationId, org)
67+
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
6768
}
6869
} else {
6970
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
@@ -79,14 +80,16 @@ processCreate.schema = {
7980
payload: Joi.object().keys({
8081
resource: Joi.string().required()
8182
}).required().unknown(true)
82-
}).required()
83+
}).required(),
84+
transactionId: Joi.string().required()
8385
}
8486

8587
/**
8688
* Process update entity message
8789
* @param {Object} message the kafka message
90+
* @param {String} transactionId
8891
*/
89-
async function processUpdate (message) {
92+
async function processUpdate (message, transactionId) {
9093
const resource = message.payload.resource
9194
if (_.includes(_.keys(topResources), resource)) {
9295
logger.info(`Processing top level resource: ${resource}`)
@@ -95,15 +98,16 @@ async function processUpdate (message) {
9598
const client = await helper.getESClient()
9699
const { index, type } = topResources[resource]
97100
const id = message.payload.id
98-
const source = await client.getSource({ index, type, id })
101+
const source = await client.get({ index, type, id, transaction: true })
99102
await client.update({
100103
index,
101104
type,
102105
id,
103106
body: {
104-
doc: _.assign(source, _.omit(message.payload, 'resource'))
107+
doc: _.assign(source._source, _.omit(message.payload, 'resource'))
105108
},
106-
refresh: true
109+
if_seq_no: source._seq_no,
110+
if_primary_term: source._primary_term
107111
})
108112
} else if (_.includes(_.keys(userResources), resource)) {
109113
// process user resources such as userSkill, userAttribute...
@@ -112,10 +116,7 @@ async function processUpdate (message) {
112116
logger.info(`Processing user level resource: ${resource}:${relateId}`)
113117
userResource.validate(message.payload)
114118
logger.info(`Resource validated for ${relateId}`)
115-
let user = await helper.getUser(message.payload.userId, false)
116-
const seqNo = user._seq_no
117-
const primaryTerm = user._primary_term
118-
user = user._source
119+
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
119120
logger.info(`User fetched ${user.id} and ${relateId}`)
120121
// const relateId = message.payload[userResource.relateKey]
121122

@@ -127,15 +128,15 @@ async function processUpdate (message) {
127128
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
128129
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
129130
logger.info(`Updating ${user.id} and ${relateId}`)
130-
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm)
131+
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
131132
logger.info(`Updated ${user.id} and ${relateId}`)
132133
}
133134
} else if (_.includes(_.keys(organizationResources), resource)) {
134135
logger.info(`Processing org level resource: ${resource}`)
135136
// process org resources such as org skill providers
136137
const orgResource = organizationResources[resource]
137138
orgResource.validate(message.payload)
138-
const org = await helper.getOrg(message.payload.organizationId)
139+
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
139140
const relateId = message.payload[orgResource.relateKey]
140141

141142
// check the resource exist
@@ -145,7 +146,7 @@ async function processUpdate (message) {
145146
} else {
146147
const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
147148
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
148-
await helper.updateOrg(message.payload.organizationId, org)
149+
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
149150
}
150151
} else {
151152
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
@@ -161,14 +162,16 @@ processUpdate.schema = {
161162
payload: Joi.object().keys({
162163
resource: Joi.string().required()
163164
}).required().unknown(true)
164-
}).required()
165+
}).required(),
166+
transactionId: Joi.string().required()
165167
}
166168

167169
/**
168170
* Process delete entity message
169171
* @param {Object} message the kafka message
172+
* @param {String} transactionId
170173
*/
171-
async function processDelete (message) {
174+
async function processDelete (message, transactionId) {
172175
const resource = message.payload.resource
173176
if (_.includes(_.keys(topResources), resource)) {
174177
// process the top resources such as user, skill...
@@ -184,7 +187,7 @@ async function processDelete (message) {
184187
// process user resources such as userSkill, userAttribute...
185188
const userResource = userResources[resource]
186189
userResource.validate(message.payload)
187-
const user = await helper.getUser(message.payload.userId)
190+
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
188191
const relateId = message.payload[userResource.relateKey]
189192

190193
// check the resource exist
@@ -193,13 +196,13 @@ async function processDelete (message) {
193196
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
194197
} else {
195198
_.remove(user[userResource.propertyName], [userResource.relateKey, relateId])
196-
await helper.updateUser(message.payload.userId, user)
199+
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
197200
}
198201
} else if (_.includes(_.keys(organizationResources), resource)) {
199202
// process user resources such as org skill provider
200203
const orgResource = organizationResources[resource]
201204
orgResource.validate(message.payload)
202-
const org = await helper.getOrg(message.payload.organizationId)
205+
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
203206
const relateId = message.payload[orgResource.relateKey]
204207

205208
// check the resource exist
@@ -208,7 +211,7 @@ async function processDelete (message) {
208211
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
209212
} else {
210213
_.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId])
211-
await helper.updateOrg(message.payload.organizationId, org)
214+
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
212215
}
213216
} else {
214217
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
@@ -224,7 +227,8 @@ processDelete.schema = {
224227
payload: Joi.object().keys({
225228
resource: Joi.string().required()
226229
}).required().unknown(true)
227-
}).required()
230+
}).required(),
231+
transactionId: Joi.string().required()
228232
}
229233

230234
module.exports = {

0 commit comments

Comments
 (0)