Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 48e701d

Browse files
Pass sequence and primary term during update to ensure that we are updating sequentially
1 parent 64bfee1 commit 48e701d

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

src/common/helper.js

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,16 @@ let esClient = {
6666
}
6767
},
6868

69+
async get () {
70+
const release = await this.mutex.acquire()
71+
72+
try {
73+
return this.client.get(...arguments)
74+
} finally {
75+
release()
76+
}
77+
},
78+
6979
indices: undefined
7080
}
7181

@@ -127,26 +137,35 @@ function validProperties (payload, keys) {
127137
/**
128138
* Function to get user from es
129139
* @param {String} userId
140+
* @param {Boolean} sourceOnly
130141
* @returns {Object} user
131142
*/
132-
async function getUser (userId) {
143+
async function getUser (userId, sourceOnly = true) {
133144
const client = await getESClient()
134-
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
145+
146+
if (sourceOnly) {
147+
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
148+
}
149+
150+
return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
135151
}
136152

137153
/**
138154
* Function to update es user
139155
* @param {String} userId
156+
* @param {Number} seqNo
157+
* @param {Number} primaryTerm
140158
* @param {Object} body
141159
*/
142-
async function updateUser (userId, body) {
160+
async function updateUser (userId, body, seqNo, primaryTerm) {
143161
const client = await getESClient()
144162
await client.update({
145163
index: config.get('ES.USER_INDEX'),
146164
type: config.get('ES.USER_TYPE'),
147165
id: userId,
148166
body: { doc: body },
149-
refresh: 'true'
167+
if_seq_no: seqNo,
168+
if_primary_term: primaryTerm
150169
})
151170
}
152171

src/services/ProcessorService.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ async function processUpdate (message) {
112112
logger.info(`Processing user level resource: ${resource}:${relateId}`)
113113
userResource.validate(message.payload)
114114
logger.info(`Resource validated for ${relateId}`)
115-
const user = await helper.getUser(message.payload.userId)
115+
let user = await helper.getUser(message.payload.userId, false)
116+
const seqNo = user._seq_no
117+
const primaryTerm = user._primary_term
118+
user = user._source
116119
logger.info(`User fetched ${user.id} and ${relateId}`)
117120
// const relateId = message.payload[userResource.relateKey]
118121

@@ -124,7 +127,7 @@ async function processUpdate (message) {
124127
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
125128
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
126129
logger.info(`Updating ${user.id} and ${relateId}`)
127-
await helper.updateUser(message.payload.userId, user)
130+
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm)
128131
logger.info(`Updated ${user.id} and ${relateId}`)
129132
}
130133
} else if (_.includes(_.keys(organizationResources), resource)) {

0 commit comments

Comments
 (0)