Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit b004459

Browse files
Revert "Undo changes carried out for the issue of ES not updating all users and instead add mutex at top level"
This reverts commit 8942530.
1 parent 835878c commit b004459

File tree

3 files changed

+130
-40
lines changed

3 files changed

+130
-40
lines changed

src/app.js

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
global.Promise = require('bluebird')
66
const config = require('config')
77
const Kafka = require('no-kafka')
8+
const _ = require('lodash')
89
const healthcheck = require('topcoder-healthcheck-dropin')
910
const logger = require('./common/logger')
1011
const helper = require('./common/helper')
@@ -16,62 +17,79 @@ logger.info('Starting kafka consumer')
1617
// create consumer
1718
const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
1819

20+
let count = 0
1921
let mutex = new Mutex()
2022

23+
async function getLatestCount () {
24+
const release = await mutex.acquire()
25+
26+
try {
27+
count = count + 1
28+
29+
return count
30+
} finally {
31+
release()
32+
}
33+
}
34+
2135
/*
2236
* Data handler linked with Kafka consumer
2337
* Whenever a new message is received by Kafka consumer,
2438
* this function will be invoked
2539
*/
2640
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
27-
const release = await mutex.acquire()
2841
const message = m.message.value.toString('utf8')
2942
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
3043
m.offset}; Message: ${message}.`)
3144
let messageJSON
45+
let messageCount = await getLatestCount()
3246

47+
logger.debug(`Current message count: ${messageCount}`)
3348
try {
3449
messageJSON = JSON.parse(message)
3550
} catch (e) {
3651
logger.error('Invalid message JSON.')
3752
logger.logFullError(e)
3853

54+
logger.debug(`Commiting offset after processing message with count ${messageCount}`)
55+
3956
// commit the message and ignore it
4057
await consumer.commitOffset({ topic, partition, offset: m.offset })
4158
return
42-
} finally {
43-
release()
4459
}
4560

4661
if (messageJSON.topic !== topic) {
4762
logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}.`)
4863

64+
logger.debug(`Commiting offset after processing message with count ${messageCount}`)
65+
4966
// commit the message and ignore it
5067
await consumer.commitOffset({ topic, partition, offset: m.offset })
51-
release()
5268
return
5369
}
54-
70+
const transactionId = _.uniqueId('transaction_')
5571
try {
5672
switch (topic) {
5773
case config.UBAHN_CREATE_TOPIC:
58-
await ProcessorService.processCreate(messageJSON)
74+
await ProcessorService.processCreate(messageJSON, transactionId)
5975
break
6076
case config.UBAHN_UPDATE_TOPIC:
61-
await ProcessorService.processUpdate(messageJSON)
77+
await ProcessorService.processUpdate(messageJSON, transactionId)
6278
break
6379
case config.UBAHN_DELETE_TOPIC:
64-
await ProcessorService.processDelete(messageJSON)
80+
await ProcessorService.processDelete(messageJSON, transactionId)
6581
break
6682
}
6783

68-
logger.debug('Successfully processed message')
84+
logger.debug(`Successfully processed message with count ${messageCount}`)
6985
} catch (err) {
7086
logger.logFullError(err)
7187
} finally {
88+
helper.checkEsMutexRelease(transactionId)
89+
logger.debug(`Commiting offset after processing message with count ${messageCount}`)
90+
7291
// Commit offset regardless of error
7392
await consumer.commitOffset({ topic, partition, offset: m.offset })
74-
release()
7593
}
7694
})
7795

src/common/helper.js

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,17 @@ const config = require('config')
77
const elasticsearch = require('elasticsearch')
88
const _ = require('lodash')
99
const Joi = require('@hapi/joi')
10+
const { Mutex } = require('async-mutex')
11+
const logger = require('./logger')
1012

1113
AWS.config.region = config.ES.AWS_REGION
1214

1315
// Elasticsearch client
1416
let esClient
17+
let transactionId
18+
// Mutex to ensure that only one elasticsearch action is carried out at any given time
19+
const esClientMutex = new Mutex()
20+
const mutexReleaseMap = {}
1521

1622
/**
1723
* Get Kafka options
@@ -51,6 +57,31 @@ async function getESClient () {
5157
host
5258
})
5359
}
60+
61+
// Patch the transport to enable mutex
62+
esClient.transport.originalRequest = esClient.transport.request
63+
esClient.transport.request = async (params) => {
64+
const tId = _.get(params.query, 'transactionId')
65+
params.query = _.omit(params.query, 'transactionId')
66+
if (!tId || tId !== transactionId) {
67+
const release = await esClientMutex.acquire()
68+
mutexReleaseMap[tId || 'noTransaction'] = release
69+
transactionId = tId
70+
}
71+
try {
72+
return await esClient.transport.originalRequest(params)
73+
} finally {
74+
if (params.method !== 'GET' || !tId) {
75+
const release = mutexReleaseMap[tId || 'noTransaction']
76+
delete mutexReleaseMap[tId || 'noTransaction']
77+
transactionId = undefined
78+
if (release) {
79+
release()
80+
}
81+
}
82+
}
83+
}
84+
5485
return esClient
5586
}
5687

@@ -71,50 +102,66 @@ function validProperties (payload, keys) {
71102
/**
72103
* Function to get user from es
73104
* @param {String} userId
105+
* @param {String} transactionId
74106
* @returns {Object} user
75107
*/
76-
async function getUser (userId) {
108+
async function getUser (userId, transactionId) {
77109
const client = await getESClient()
78-
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
110+
const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, transactionId })
111+
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
79112
}
80113

81114
/**
82115
* Function to update es user
83116
* @param {String} userId
117+
* @param {Number} seqNo
118+
* @param {Number} primaryTerm
119+
* @param {String} transactionId
84120
* @param {Object} body
85121
*/
86-
async function updateUser (userId, body) {
122+
async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
87123
const client = await getESClient()
88124
await client.update({
89125
index: config.get('ES.USER_INDEX'),
90126
type: config.get('ES.USER_TYPE'),
91127
id: userId,
92-
body: { doc: body }
128+
transactionId,
129+
body: { doc: body },
130+
if_seq_no: seqNo,
131+
if_primary_term: primaryTerm
93132
})
94133
}
95134

96135
/**
97136
* Function to get org from es
98137
* @param {String} organizationId
138+
* @param {String} transactionId
99139
* @returns {Object} organization
100140
*/
101-
async function getOrg (organizationId) {
141+
async function getOrg (organizationId, transactionId) {
102142
const client = await getESClient()
103-
return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId })
143+
const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, transactionId })
144+
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
104145
}
105146

106147
/**
107148
* Function to update es organization
108149
* @param {String} organizationId
150+
* @param {Number} seqNo
151+
* @param {Number} primaryTerm
152+
* @param {String} transactionId
109153
* @param {Object} body
110154
*/
111-
async function updateOrg (organizationId, body) {
155+
async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) {
112156
const client = await getESClient()
113157
await client.update({
114158
index: config.get('ES.ORGANIZATION_INDEX'),
115159
type: config.get('ES.ORGANIZATION_TYPE'),
116160
id: organizationId,
117-
body: { doc: body }
161+
transactionId,
162+
body: { doc: body },
163+
if_seq_no: seqNo,
164+
if_primary_term: primaryTerm
118165
})
119166
}
120167

@@ -130,6 +177,21 @@ function getErrorWithStatus (message, statusCode) {
130177
return error
131178
}
132179

180+
/**
181+
* Ensure the esClient mutex is released
182+
* @param {String} tId transactionId
183+
*/
184+
function checkEsMutexRelease (tId) {
185+
if (tId === transactionId) {
186+
const release = mutexReleaseMap[tId]
187+
delete mutexReleaseMap[tId]
188+
transactionId = undefined
189+
if (release) {
190+
release()
191+
}
192+
}
193+
}
194+
133195
module.exports = {
134196
getKafkaOptions,
135197
getESClient,
@@ -138,5 +200,6 @@ module.exports = {
138200
updateUser,
139201
getOrg,
140202
updateOrg,
141-
getErrorWithStatus
203+
getErrorWithStatus,
204+
checkEsMutexRelease
142205
}

0 commit comments

Comments
 (0)