@@ -13,7 +13,6 @@ AWS.config.region = config.ES.AWS_REGION
13
13
14
14
// Elasticsearch client
15
15
let esClient
16
-
17
16
// Mutex to ensure that only one elasticsearch action is carried out at any given time
18
17
const esClientMutex = new Mutex ( )
19
18
@@ -56,18 +55,18 @@ async function getESClient () {
56
55
} )
57
56
}
58
57
59
- return esClient
60
- }
61
-
62
- /**
63
- * Wraps original get es client function
64
- * to control access to elasticsearch using a mutex
65
- */
66
- async function getESClientWrapper ( ) {
67
- const client = await getESClient ( )
68
- const release = await esClientMutex . acquire ( )
58
+ // Patch the transport to enable mutex
59
+ esClient . transport . originalRequest = esClient . transport . request
60
+ esClient . transport . request = async ( params ) => {
61
+ const release = await esClientMutex . acquire ( )
62
+ try {
63
+ return await esClient . transport . originalRequest ( params )
64
+ } finally {
65
+ release ( )
66
+ }
67
+ }
69
68
70
- return { client , release }
69
+ return esClient
71
70
}
72
71
73
72
/**
@@ -87,25 +86,17 @@ function validProperties (payload, keys) {
87
86
/**
88
87
* Function to get user from es
89
88
* @param {String } userId
90
- * @param {Boolean } isTransaction Is this part of a transaction?
89
+ * @param {Boolean } sourceOnly
91
90
* @returns {Object } user
92
91
*/
93
- async function getUser ( userId , isTransaction = false ) {
94
- const { client, release } = await getESClientWrapper ( )
95
-
96
- try {
97
- const user = await client . get ( { index : config . get ( 'ES.USER_INDEX' ) , type : config . get ( 'ES.USER_TYPE' ) , id : userId } )
98
-
99
- if ( isTransaction ) {
100
- return { seqNo : user . _seq_no , primaryTerm : user . _primary_term , user : user . _source , release }
101
- }
92
+ async function getUser ( userId , sourceOnly = true ) {
93
+ const client = await getESClient ( )
102
94
103
- return { seqNo : user . _seq_no , primaryTerm : user . _primary_term , user : user . _source }
104
- } finally {
105
- if ( ! isTransaction ) {
106
- release ( )
107
- }
95
+ if ( sourceOnly ) {
96
+ return client . getSource ( { index : config . get ( 'ES.USER_INDEX' ) , type : config . get ( 'ES.USER_TYPE' ) , id : userId } )
108
97
}
98
+
99
+ return client . get ( { index : config . get ( 'ES.USER_INDEX' ) , type : config . get ( 'ES.USER_TYPE' ) , id : userId } )
109
100
}
110
101
111
102
/**
@@ -114,92 +105,43 @@ async function getUser (userId, isTransaction = false) {
114
105
* @param {Number } seqNo
115
106
* @param {Number } primaryTerm
116
107
* @param {Object } body
117
- * @param {Boolean } isTransaction If this is part of a transaction, it will not attempt to release
118
108
*/
119
- async function updateUser ( userId , body , seqNo , primaryTerm , isTransaction = false ) {
120
- let client , release
121
-
122
- if ( isTransaction ) {
123
- client = await getESClient ( )
124
- } else {
125
- const esClient = await getESClientWrapper ( )
126
- client = esClient . client
127
- release = esClient . release
128
- }
129
-
130
- try {
131
- await client . update ( {
132
- index : config . get ( 'ES.USER_INDEX' ) ,
133
- type : config . get ( 'ES.USER_TYPE' ) ,
134
- id : userId ,
135
- body : { doc : body } ,
136
- if_seq_no : seqNo ,
137
- if_primary_term : primaryTerm
138
- } )
139
- } finally {
140
- if ( ! isTransaction ) {
141
- release ( )
142
- }
143
- }
109
+ async function updateUser ( userId , body , seqNo , primaryTerm ) {
110
+ const client = await getESClient ( )
111
+ await client . update ( {
112
+ index : config . get ( 'ES.USER_INDEX' ) ,
113
+ type : config . get ( 'ES.USER_TYPE' ) ,
114
+ id : userId ,
115
+ body : { doc : body } ,
116
+ if_seq_no : seqNo ,
117
+ if_primary_term : primaryTerm
118
+ } )
144
119
}
145
120
146
121
/**
147
122
* Function to get org from es
148
123
* @param {String } organizationId
149
- * @param {Boolean } isTransaction Is this part of a transaction?
150
124
* @returns {Object } organization
151
125
*/
152
- async function getOrg ( organizationId , isTransaction = false ) {
153
- const { client, release } = await getESClientWrapper ( )
154
-
155
- try {
156
- const org = await client . get ( { index : config . get ( 'ES.ORGANIZATION_INDEX' ) , type : config . get ( 'ES.ORGANIZATION_TYPE' ) , id : organizationId } )
157
-
158
- if ( isTransaction ) {
159
- return { seqNo : org . _seq_no , primaryTerm : org . _primary_term , org : org . _source , release }
160
- }
161
-
162
- return { seqNo : org . _seq_no , primaryTerm : org . _primary_term , org : org . _source }
163
- } finally {
164
- if ( ! isTransaction ) {
165
- release ( )
166
- }
167
- }
126
+ async function getOrg ( organizationId ) {
127
+ const client = await getESClient ( )
128
+ return client . getSource ( { index : config . get ( 'ES.ORGANIZATION_INDEX' ) , type : config . get ( 'ES.ORGANIZATION_TYPE' ) , id : organizationId } )
168
129
}
169
130
170
131
/**
171
132
* Function to update es organization
172
133
* @param {String } organizationId
173
134
* @param {Object } body
174
- * @param {Number } seqNo
175
- * @param {Number } primaryTerm
176
- * @param {Boolean } isTransaction If this is part of a transaction, it will not attempt to lock
177
135
*/
178
- async function updateOrg ( organizationId , body , seqNo , primaryTerm , isTransaction = false ) {
179
- let client , release
180
-
181
- if ( isTransaction ) {
182
- client = await getESClient ( )
183
- } else {
184
- const esClient = await getESClientWrapper ( )
185
- client = esClient . client
186
- release = esClient . release
187
- }
188
-
189
- try {
190
- await client . update ( {
191
- index : config . get ( 'ES.ORGANIZATION_INDEX' ) ,
192
- type : config . get ( 'ES.ORGANIZATION_TYPE' ) ,
193
- id : organizationId ,
194
- body : { doc : body } ,
195
- if_seq_no : seqNo ,
196
- if_primary_term : primaryTerm
197
- } )
198
- } finally {
199
- if ( ! isTransaction ) {
200
- release ( )
201
- }
202
- }
136
+ async function updateOrg ( organizationId , body ) {
137
+ const client = await getESClient ( )
138
+ await client . update ( {
139
+ index : config . get ( 'ES.ORGANIZATION_INDEX' ) ,
140
+ type : config . get ( 'ES.ORGANIZATION_TYPE' ) ,
141
+ id : organizationId ,
142
+ body : { doc : body } ,
143
+ refresh : 'true'
144
+ } )
203
145
}
204
146
205
147
/**
@@ -216,7 +158,7 @@ function getErrorWithStatus (message, statusCode) {
216
158
217
159
module . exports = {
218
160
getKafkaOptions,
219
- getESClientWrapper ,
161
+ getESClient ,
220
162
validProperties,
221
163
getUser,
222
164
updateUser,
0 commit comments