Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 5424805

Browse files
Revert "Group elasticsearch actions on the same message to use the same mutex"
1 parent 555320c commit 5424805

File tree

3 files changed

+44
-86
lines changed

3 files changed

+44
-86
lines changed

src/app.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
global.Promise = require('bluebird')
66
const config = require('config')
77
const Kafka = require('no-kafka')
8-
const _ = require('lodash')
98
const healthcheck = require('topcoder-healthcheck-dropin')
109
const logger = require('./common/logger')
1110
const helper = require('./common/helper')
@@ -67,25 +66,24 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
6766
await consumer.commitOffset({ topic, partition, offset: m.offset })
6867
return
6968
}
70-
const transactionId = _.uniqueId('transaction_')
69+
7170
try {
7271
switch (topic) {
7372
case config.UBAHN_CREATE_TOPIC:
74-
await ProcessorService.processCreate(messageJSON, transactionId)
73+
await ProcessorService.processCreate(messageJSON)
7574
break
7675
case config.UBAHN_UPDATE_TOPIC:
77-
await ProcessorService.processUpdate(messageJSON, transactionId)
76+
await ProcessorService.processUpdate(messageJSON)
7877
break
7978
case config.UBAHN_DELETE_TOPIC:
80-
await ProcessorService.processDelete(messageJSON, transactionId)
79+
await ProcessorService.processDelete(messageJSON)
8180
break
8281
}
8382

8483
logger.debug(`Successfully processed message with count ${messageCount}`)
8584
} catch (err) {
8685
logger.logFullError(err)
8786
} finally {
88-
helper.checkEsMutexRelease(transactionId)
8987
logger.debug(`Commiting offset after processing message with count ${messageCount}`)
9088

9189
// Commit offset regardless of error

src/common/helper.js

Lines changed: 16 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@ AWS.config.region = config.ES.AWS_REGION
1313

1414
// Elasticsearch client
1515
let esClient
16-
let transactionId
1716
// Mutex to ensure that only one elasticsearch action is carried out at any given time
1817
const esClientMutex = new Mutex()
19-
const mutexReleaseMap = {}
2018

2119
/**
2220
* Get Kafka options
@@ -60,24 +58,11 @@ async function getESClient () {
6058
// Patch the transport to enable mutex
6159
esClient.transport.originalRequest = esClient.transport.request
6260
esClient.transport.request = async (params) => {
63-
const tId = _.get(params.query, 'transactionId')
64-
params.query = _.omit(params.query, 'transactionId')
65-
if (!tId || tId !== transactionId) {
66-
const release = await esClientMutex.acquire()
67-
mutexReleaseMap[tId || 'noTransaction'] = release
68-
transactionId = tId
69-
}
61+
const release = await esClientMutex.acquire()
7062
try {
7163
return await esClient.transport.originalRequest(params)
7264
} finally {
73-
if (params.method !== 'GET' || !tId) {
74-
const release = mutexReleaseMap[tId || 'noTransaction']
75-
delete mutexReleaseMap[tId || 'noTransaction']
76-
transactionId = undefined
77-
if (release) {
78-
release()
79-
}
80-
}
65+
release()
8166
}
8267
}
8368

@@ -101,30 +86,32 @@ function validProperties (payload, keys) {
10186
/**
10287
* Function to get user from es
10388
* @param {String} userId
104-
* @param {String} transactionId
89+
* @param {Boolean} sourceOnly
10590
* @returns {Object} user
10691
*/
107-
async function getUser (userId, transactionId) {
92+
async function getUser (userId, sourceOnly = true) {
10893
const client = await getESClient()
109-
const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, transactionId })
110-
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
94+
95+
if (sourceOnly) {
96+
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
97+
}
98+
99+
return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
111100
}
112101

113102
/**
114103
* Function to update es user
115104
* @param {String} userId
116105
* @param {Number} seqNo
117106
* @param {Number} primaryTerm
118-
* @param {String} transactionId
119107
* @param {Object} body
120108
*/
121-
async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
109+
async function updateUser (userId, body, seqNo, primaryTerm) {
122110
const client = await getESClient()
123111
await client.update({
124112
index: config.get('ES.USER_INDEX'),
125113
type: config.get('ES.USER_TYPE'),
126114
id: userId,
127-
transactionId,
128115
body: { doc: body },
129116
if_seq_no: seqNo,
130117
if_primary_term: primaryTerm
@@ -134,33 +121,26 @@ async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
134121
/**
135122
* Function to get org from es
136123
* @param {String} organizationId
137-
* @param {String} transactionId
138124
* @returns {Object} organization
139125
*/
140-
async function getOrg (organizationId, transactionId) {
126+
async function getOrg (organizationId) {
141127
const client = await getESClient()
142-
const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, transactionId })
143-
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
128+
return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId })
144129
}
145130

146131
/**
147132
* Function to update es organization
148133
* @param {String} organizationId
149-
* @param {Number} seqNo
150-
* @param {Number} primaryTerm
151-
* @param {String} transactionId
152134
* @param {Object} body
153135
*/
154-
async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) {
136+
async function updateOrg (organizationId, body) {
155137
const client = await getESClient()
156138
await client.update({
157139
index: config.get('ES.ORGANIZATION_INDEX'),
158140
type: config.get('ES.ORGANIZATION_TYPE'),
159141
id: organizationId,
160-
transactionId,
161142
body: { doc: body },
162-
if_seq_no: seqNo,
163-
if_primary_term: primaryTerm
143+
refresh: 'true'
164144
})
165145
}
166146

@@ -176,21 +156,6 @@ function getErrorWithStatus (message, statusCode) {
176156
return error
177157
}
178158

179-
/**
180-
* Ensure the esClient mutex is released
181-
* @param {String} tId transactionId
182-
*/
183-
function checkEsMutexRelease (tId) {
184-
if (tId === transactionId) {
185-
const release = mutexReleaseMap[tId]
186-
delete mutexReleaseMap[tId]
187-
transactionId = undefined
188-
if (release) {
189-
release()
190-
}
191-
}
192-
}
193-
194159
module.exports = {
195160
getKafkaOptions,
196161
getESClient,
@@ -199,6 +164,5 @@ module.exports = {
199164
updateUser,
200165
getOrg,
201166
updateOrg,
202-
getErrorWithStatus,
203-
checkEsMutexRelease
167+
getErrorWithStatus
204168
}

src/services/ProcessorService.js

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ const {
1515
/**
1616
* Process create entity message
1717
* @param {Object} message the kafka message
18-
* @param {String} transactionId
1918
*/
20-
async function processCreate (message, transactionId) {
19+
async function processCreate (message) {
2120
const resource = message.payload.resource
2221
if (_.includes(_.keys(topResources), resource)) {
2322
// process the top resources such as user, skill...
@@ -34,7 +33,7 @@ async function processCreate (message, transactionId) {
3433
// process user resources such as userSkill, userAttribute...
3534
const userResource = userResources[resource]
3635
userResource.validate(message.payload)
37-
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
36+
const user = await helper.getUser(message.payload.userId)
3837
const relateId = message.payload[userResource.relateKey]
3938
if (!user[userResource.propertyName]) {
4039
user[userResource.propertyName] = []
@@ -46,13 +45,13 @@ async function processCreate (message, transactionId) {
4645
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
4746
} else {
4847
user[userResource.propertyName].push(_.omit(message.payload, 'resource'))
49-
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
48+
await helper.updateUser(message.payload.userId, user)
5049
}
5150
} else if (_.includes(_.keys(organizationResources), resource)) {
5251
// process org resources such as org skill provider
5352
const orgResources = organizationResources[resource]
5453
orgResources.validate(message.payload)
55-
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
54+
const org = await helper.getOrg(message.payload.organizationId)
5655
const relateId = message.payload[orgResources.relateKey]
5756
if (!org[orgResources.propertyName]) {
5857
org[orgResources.propertyName] = []
@@ -64,7 +63,7 @@ async function processCreate (message, transactionId) {
6463
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
6564
} else {
6665
org[orgResources.propertyName].push(_.omit(message.payload, 'resource'))
67-
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
66+
await helper.updateOrg(message.payload.organizationId, org)
6867
}
6968
} else {
7069
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
@@ -80,16 +79,14 @@ processCreate.schema = {
8079
payload: Joi.object().keys({
8180
resource: Joi.string().required()
8281
}).required().unknown(true)
83-
}).required(),
84-
transactionId: Joi.string().required()
82+
}).required()
8583
}
8684

8785
/**
8886
* Process update entity message
8987
* @param {Object} message the kafka message
90-
* @param {String} transactionId
9188
*/
92-
async function processUpdate (message, transactionId) {
89+
async function processUpdate (message) {
9390
const resource = message.payload.resource
9491
if (_.includes(_.keys(topResources), resource)) {
9592
logger.info(`Processing top level resource: ${resource}`)
@@ -98,16 +95,15 @@ async function processUpdate (message, transactionId) {
9895
const client = await helper.getESClient()
9996
const { index, type } = topResources[resource]
10097
const id = message.payload.id
101-
const source = await client.get({ index, type, id, transaction: true })
98+
const source = await client.getSource({ index, type, id })
10299
await client.update({
103100
index,
104101
type,
105102
id,
106103
body: {
107-
doc: _.assign(source._source, _.omit(message.payload, 'resource'))
104+
doc: _.assign(source, _.omit(message.payload, 'resource'))
108105
},
109-
if_seq_no: source._seq_no,
110-
if_primary_term: source._primary_term
106+
refresh: true
111107
})
112108
} else if (_.includes(_.keys(userResources), resource)) {
113109
// process user resources such as userSkill, userAttribute...
@@ -116,7 +112,10 @@ async function processUpdate (message, transactionId) {
116112
logger.info(`Processing user level resource: ${resource}:${relateId}`)
117113
userResource.validate(message.payload)
118114
logger.info(`Resource validated for ${relateId}`)
119-
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
115+
let user = await helper.getUser(message.payload.userId, false)
116+
const seqNo = user._seq_no
117+
const primaryTerm = user._primary_term
118+
user = user._source
120119
logger.info(`User fetched ${user.id} and ${relateId}`)
121120
// const relateId = message.payload[userResource.relateKey]
122121

@@ -128,15 +127,15 @@ async function processUpdate (message, transactionId) {
128127
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
129128
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
130129
logger.info(`Updating ${user.id} and ${relateId}`)
131-
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
130+
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm)
132131
logger.info(`Updated ${user.id} and ${relateId}`)
133132
}
134133
} else if (_.includes(_.keys(organizationResources), resource)) {
135134
logger.info(`Processing org level resource: ${resource}`)
136135
// process org resources such as org skill providers
137136
const orgResource = organizationResources[resource]
138137
orgResource.validate(message.payload)
139-
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
138+
const org = await helper.getOrg(message.payload.organizationId)
140139
const relateId = message.payload[orgResource.relateKey]
141140

142141
// check the resource exist
@@ -146,7 +145,7 @@ async function processUpdate (message, transactionId) {
146145
} else {
147146
const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
148147
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
149-
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
148+
await helper.updateOrg(message.payload.organizationId, org)
150149
}
151150
} else {
152151
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
@@ -162,16 +161,14 @@ processUpdate.schema = {
162161
payload: Joi.object().keys({
163162
resource: Joi.string().required()
164163
}).required().unknown(true)
165-
}).required(),
166-
transactionId: Joi.string().required()
164+
}).required()
167165
}
168166

169167
/**
170168
* Process delete entity message
171169
* @param {Object} message the kafka message
172-
* @param {String} transactionId
173170
*/
174-
async function processDelete (message, transactionId) {
171+
async function processDelete (message) {
175172
const resource = message.payload.resource
176173
if (_.includes(_.keys(topResources), resource)) {
177174
// process the top resources such as user, skill...
@@ -187,7 +184,7 @@ async function processDelete (message, transactionId) {
187184
// process user resources such as userSkill, userAttribute...
188185
const userResource = userResources[resource]
189186
userResource.validate(message.payload)
190-
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
187+
const user = await helper.getUser(message.payload.userId)
191188
const relateId = message.payload[userResource.relateKey]
192189

193190
// check the resource exist
@@ -196,13 +193,13 @@ async function processDelete (message, transactionId) {
196193
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
197194
} else {
198195
_.remove(user[userResource.propertyName], [userResource.relateKey, relateId])
199-
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
196+
await helper.updateUser(message.payload.userId, user)
200197
}
201198
} else if (_.includes(_.keys(organizationResources), resource)) {
202199
// process user resources such as org skill provider
203200
const orgResource = organizationResources[resource]
204201
orgResource.validate(message.payload)
205-
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
202+
const org = await helper.getOrg(message.payload.organizationId)
206203
const relateId = message.payload[orgResource.relateKey]
207204

208205
// check the resource exist
@@ -211,7 +208,7 @@ async function processDelete (message, transactionId) {
211208
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
212209
} else {
213210
_.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId])
214-
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
211+
await helper.updateOrg(message.payload.organizationId, org)
215212
}
216213
} else {
217214
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
@@ -227,8 +224,7 @@ processDelete.schema = {
227224
payload: Joi.object().keys({
228225
resource: Joi.string().required()
229226
}).required().unknown(true)
230-
}).required(),
231-
transactionId: Joi.string().required()
227+
}).required()
232228
}
233229

234230
module.exports = {

0 commit comments

Comments
 (0)