@@ -13,6 +13,7 @@ AWS.config.region = config.ES.AWS_REGION
13
13
14
14
// Elasticsearch client
15
15
let esClient
16
+
16
17
// Mutex to ensure that only one elasticsearch action is carried out at any given time
17
18
const esClientMutex = new Mutex ( )
18
19
@@ -55,20 +56,20 @@ async function getESClient () {
55
56
} )
56
57
}
57
58
58
- // Patch the transport to enable mutex
59
- esClient . transport . originalRequest = esClient . transport . request
60
- esClient . transport . request = async ( params ) => {
61
- const release = await esClientMutex . acquire ( )
62
- try {
63
- return await esClient . transport . originalRequest ( params )
64
- } finally {
65
- release ( )
66
- }
67
- }
68
-
69
59
return esClient
70
60
}
71
61
62
+ /**
63
+ * Wraps original get es client function
64
+ * to control access to elasticsearch using a mutex
65
+ */
66
+ async function getESClientWrapper ( ) {
67
+ const client = await getESClient ( )
68
+ const release = await esClientMutex . acquire ( )
69
+
70
+ return { client, release }
71
+ }
72
+
72
73
/**
73
74
* Function to valid require keys
74
75
* @param {Object } payload validated object
@@ -86,17 +87,25 @@ function validProperties (payload, keys) {
86
87
/**
87
88
* Function to get user from es
88
89
* @param {String } userId
89
- * @param {Boolean } sourceOnly
90
+ * @param {Boolean } isTransaction Is this part of a transaction?
90
91
* @returns {Object } user
91
92
*/
92
- async function getUser ( userId , sourceOnly = true ) {
93
- const client = await getESClient ( )
93
+ async function getUser ( userId , isTransaction = false ) {
94
+ const { client, release } = await getESClientWrapper ( )
94
95
95
- if ( sourceOnly ) {
96
- return client . getSource ( { index : config . get ( 'ES.USER_INDEX' ) , type : config . get ( 'ES.USER_TYPE' ) , id : userId } )
97
- }
96
+ try {
97
+ const user = await client . get ( { index : config . get ( 'ES.USER_INDEX' ) , type : config . get ( 'ES.USER_TYPE' ) , id : userId } )
98
+
99
+ if ( isTransaction ) {
100
+ return { seqNo : user . _seq_no , primaryTerm : user . _primary_term , user : user . _source , release }
101
+ }
98
102
99
- return client . get ( { index : config . get ( 'ES.USER_INDEX' ) , type : config . get ( 'ES.USER_TYPE' ) , id : userId } )
103
+ return { seqNo : user . _seq_no , primaryTerm : user . _primary_term , user : user . _source }
104
+ } finally {
105
+ if ( ! isTransaction ) {
106
+ release ( )
107
+ }
108
+ }
100
109
}
101
110
102
111
/**
@@ -105,43 +114,92 @@ async function getUser (userId, sourceOnly = true) {
105
114
* @param {Number } seqNo
106
115
* @param {Number } primaryTerm
107
116
* @param {Object } body
117
+ * @param {Boolean } isTransaction If this is part of a transaction, it will not attempt to release
108
118
*/
109
- async function updateUser ( userId , body , seqNo , primaryTerm ) {
110
- const client = await getESClient ( )
111
- await client . update ( {
112
- index : config . get ( 'ES.USER_INDEX' ) ,
113
- type : config . get ( 'ES.USER_TYPE' ) ,
114
- id : userId ,
115
- body : { doc : body } ,
116
- if_seq_no : seqNo ,
117
- if_primary_term : primaryTerm
118
- } )
119
+ async function updateUser ( userId , body , seqNo , primaryTerm , isTransaction = false ) {
120
+ let client , release
121
+
122
+ if ( isTransaction ) {
123
+ client = await getESClient ( )
124
+ } else {
125
+ const esClient = await getESClientWrapper ( )
126
+ client = esClient . client
127
+ release = esClient . release
128
+ }
129
+
130
+ try {
131
+ await client . update ( {
132
+ index : config . get ( 'ES.USER_INDEX' ) ,
133
+ type : config . get ( 'ES.USER_TYPE' ) ,
134
+ id : userId ,
135
+ body : { doc : body } ,
136
+ if_seq_no : seqNo ,
137
+ if_primary_term : primaryTerm
138
+ } )
139
+ } finally {
140
+ if ( ! isTransaction ) {
141
+ release ( )
142
+ }
143
+ }
119
144
}
120
145
121
146
/**
122
147
* Function to get org from es
123
148
* @param {String } organizationId
149
+ * @param {Boolean } isTransaction Is this part of a transaction?
124
150
* @returns {Object } organization
125
151
*/
126
- async function getOrg ( organizationId ) {
127
- const client = await getESClient ( )
128
- return client . getSource ( { index : config . get ( 'ES.ORGANIZATION_INDEX' ) , type : config . get ( 'ES.ORGANIZATION_TYPE' ) , id : organizationId } )
152
+ async function getOrg ( organizationId , isTransaction = false ) {
153
+ const { client, release } = await getESClientWrapper ( )
154
+
155
+ try {
156
+ const org = await client . get ( { index : config . get ( 'ES.ORGANIZATION_INDEX' ) , type : config . get ( 'ES.ORGANIZATION_TYPE' ) , id : organizationId } )
157
+
158
+ if ( isTransaction ) {
159
+ return { seqNo : org . _seq_no , primaryTerm : org . _primary_term , org : org . _source , release }
160
+ }
161
+
162
+ return { seqNo : org . _seq_no , primaryTerm : org . _primary_term , org : org . _source }
163
+ } finally {
164
+ if ( ! isTransaction ) {
165
+ release ( )
166
+ }
167
+ }
129
168
}
130
169
131
170
/**
132
171
* Function to update es organization
133
172
* @param {String } organizationId
134
173
* @param {Object } body
174
+ * @param {Number } seqNo
175
+ * @param {Number } primaryTerm
176
+ * @param {Boolean } isTransaction If this is part of a transaction, it will not attempt to lock
135
177
*/
136
- async function updateOrg ( organizationId , body ) {
137
- const client = await getESClient ( )
138
- await client . update ( {
139
- index : config . get ( 'ES.ORGANIZATION_INDEX' ) ,
140
- type : config . get ( 'ES.ORGANIZATION_TYPE' ) ,
141
- id : organizationId ,
142
- body : { doc : body } ,
143
- refresh : 'true'
144
- } )
178
+ async function updateOrg ( organizationId , body , seqNo , primaryTerm , isTransaction = false ) {
179
+ let client , release
180
+
181
+ if ( isTransaction ) {
182
+ client = await getESClient ( )
183
+ } else {
184
+ const esClient = await getESClientWrapper ( )
185
+ client = esClient . client
186
+ release = esClient . release
187
+ }
188
+
189
+ try {
190
+ await client . update ( {
191
+ index : config . get ( 'ES.ORGANIZATION_INDEX' ) ,
192
+ type : config . get ( 'ES.ORGANIZATION_TYPE' ) ,
193
+ id : organizationId ,
194
+ body : { doc : body } ,
195
+ if_seq_no : seqNo ,
196
+ if_primary_term : primaryTerm
197
+ } )
198
+ } finally {
199
+ if ( ! isTransaction ) {
200
+ release ( )
201
+ }
202
+ }
145
203
}
146
204
147
205
/**
@@ -158,7 +216,7 @@ function getErrorWithStatus (message, statusCode) {
158
216
159
217
module . exports = {
160
218
getKafkaOptions,
161
- getESClient ,
219
+ getESClientWrapper ,
162
220
validProperties,
163
221
getUser,
164
222
updateUser,
0 commit comments