Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 8ec4b57

Browse files
Merge pull request #17 from topcoder-platform/enrich-2
Correct enrich behaviour
2 parents affbf6e + dd1b563 commit 8ec4b57

File tree

6 files changed

+116
-54
lines changed

6 files changed

+116
-54
lines changed

README.md

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,16 @@ The following parameters can be set in config files or in env variables:
5252
- ES.USER_SKILL_PROPERTY_NAME: the user property name of skill, default value is 'skills'
5353
- ES.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: the org property name of org skill providers, default value is 'skillProviders'
5454
- ES.USER_GROUP_PROPERTY_NAME: the user property name of group, default value is 'groups'
55-
- ENRICH_USER_PIPELINE_NAME: the user enrich pipeline name, default value is 'enrich_user'
55+
- ATTRIBUTE_GROUP_PIPELINE_ID: The pipeline id for enrichment with attribute group. Default is `attributegroup-pipeline`
56+
- SKILL_PROVIDER_PIPELINE_ID: The pipeline id for enrichment with skill provider. Default is `skillprovider-pipeline`
57+
- USER_PIPELINE_ID: The pipeline id for enrichment of user details. Default is `user-pipeline`
58+
- ATTRIBUTE_GROUP_ENRICH_POLICYNAME: The enrich policy for attribute group. Default is `attributegroup-policy`
59+
- SKILL_PROVIDER_ENRICH_POLICYNAME: The enrich policy for skill provider. Default is `skillprovider-policy`
60+
- ROLE_ENRICH_POLICYNAME: The enrich policy for role. Default is `role-policy`
61+
- ACHIEVEMENT_PROVIDER_ENRICH_POLICYNAME: The enrich policy for achievement provider. Default is `achievementprovider-policy`
62+
- SKILL_ENRICH_POLICYNAME: The enrich policy for skill. Default is `skill-policy`
63+
- ATTRIBUTE_ENRICH_POLICYNAME: The enrich policy for skill. Default is `attribute-policy`
64+
5665

5766
There is a `/health` endpoint that checks for the health of the app. This sets up an expressjs server and listens on the environment variable `PORT`. It's not part of the configuration file and needs to be passed as an environment variable
5867

@@ -103,15 +112,10 @@ Configuration for the tests is at `config/test.js`, only add such new configurat
103112
```
104113

105114
4. Initialize Elasticsearch index
115+
For this, refer to the [Ubahn API](https://github.com/topcoder-platform/u-bahn-api) repository. In this repository, you need to execute the following script (after following their deployment guide):
106116

107117
```bash
108-
npm run init-es
109-
```
110-
111-
To delete and re-create the index:
112-
113-
```bash
114-
npm run init-es force
118+
npm run insert-data
115119
```
116120

117121
5. Start the processor and health check dropin
@@ -148,3 +152,8 @@ To run the UBahn ES Processor using docker, follow the below steps
148152
## Verification
149153

150154
see [VERIFICATION.md](VERIFICATION.md)
155+
156+
## TODO
157+
158+
- Correct the tests
159+
- Update API codebase config and README for enrich policy env vars

config/default.js

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,33 @@ module.exports = {
5151

5252
ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders',
5353

54-
ENRICH_USER_PIPELINE_NAME: process.env.ENRICH_USER_PIPELINE_NAME || 'enrich_user'
54+
ENRICHMENT: {
55+
attributegroup: {
56+
enrichPolicyName: process.env.ATTRIBUTE_GROUP_ENRICH_POLICYNAME || 'attributegroup-policy',
57+
pipelineId: process.env.ATTRIBUTE_GROUP_PIPELINE_ID || 'attributegroup-pipeline'
58+
},
59+
skillprovider: {
60+
enrichPolicyName: process.env.SKILL_PROVIDER_ENRICH_POLICYNAME || 'skillprovider-policy',
61+
pipelineId: process.env.SKILL_PROVIDER_PIPELINE_ID || 'skillprovider-pipeline'
62+
},
63+
user: {
64+
pipelineId: process.env.USER_PIPELINE_ID || 'user-pipeline'
65+
},
66+
role: {
67+
enrichPolicyName: process.env.ROLE_ENRICH_POLICYNAME || 'role-policy'
68+
},
69+
achievementprovider: {
70+
enrichPolicyName: process.env.ACHIEVEMENT_PROVIDER_ENRICH_POLICYNAME || 'achievementprovider-policy'
71+
},
72+
skill: {
73+
enrichPolicyName: process.env.SKILL_ENRICH_POLICYNAME || 'skill-policy'
74+
},
75+
attribute: {
76+
enrichPolicyName: process.env.ATTRIBUTE_ENRICH_POLICYNAME || 'attribute-policy'
77+
},
78+
organization: {
79+
enrichPolicyName: process.env.ORGANIZATION_ENRICH_POLICYNAME || 'organization-policy'
80+
}
81+
}
5582
}
5683
}

docker-kafka-es/docker-compose.yml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ services:
1414
KAFKA_ADVERTISED_HOST_NAME: localhost
1515
KAFKA_CREATE_TOPICS: "u-bahn.action.aggregate:1:1,groups.notification.member.add:1:1,groups.notification.member.delete:1:1"
1616
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
17-
esearch:
18-
image: elasticsearch:7.7.1
19-
container_name: ubahn-data-processor-es_es
20-
ports:
21-
- "9200:9200"
22-
environment:
23-
- discovery.type=single-node
17+
# esearch:
18+
# image: elasticsearch:7.7.1
19+
# container_name: ubahn-data-processor-es_es
20+
# ports:
21+
# - "9200:9200"
22+
# environment:
23+
# - discovery.type=single-node

src/common/constants.js

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,78 +8,95 @@ const { validProperties } = require('./helper')
88
const topResources = {
99
achievementprovider: {
1010
index: config.get('ES.ACHIEVEMENT_PROVIDER_INDEX'),
11-
mappingFields: ['id', 'achievementsProviderId', 'name', 'uri', 'certifierId', 'certifiedDate', 'created', 'updated', 'createdBy', 'updatedBy'],
12-
enrichPolicy: 'achievementprovider-policy',
13-
type: config.get('ES.ACHIEVEMENT_PROVIDER_TYPE')
11+
type: config.get('ES.ACHIEVEMENT_PROVIDER_TYPE'),
12+
enrich: {
13+
policyName: config.get('ES.ENRICHMENT.achievementprovider.enrichPolicyName')
14+
}
1415
},
1516
attribute: {
1617
index: config.get('ES.ATTRIBUTE_INDEX'),
17-
enrichPolicy: 'attribute-policy',
18-
mappingFields: ['id', 'name', 'attributeGroupId', 'created', 'updated', 'createdBy', 'updatedBy'],
19-
type: config.get('ES.ATTRIBUTE_TYPE')
18+
type: config.get('ES.ATTRIBUTE_TYPE'),
19+
enrich: {
20+
policyName: config.get('ES.ENRICHMENT.attribute.enrichPolicyName')
21+
},
22+
ingest: {
23+
pipeline: {
24+
id: config.get('ES.ENRICHMENT.attributegroup.pipelineId')
25+
}
26+
}
2027
},
2128
attributegroup: {
2229
index: config.get('ES.ATTRIBUTE_GROUP_INDEX'),
23-
type: config.get('ES.ATTRIBUTE_GROUP_TYPE')
30+
type: config.get('ES.ATTRIBUTE_GROUP_TYPE'),
31+
enrich: {
32+
policyName: config.get('ES.ENRICHMENT.attributegroup.enrichPolicyName')
33+
}
2434
},
25-
2635
organization: {
2736
index: config.get('ES.ORGANIZATION_INDEX'),
28-
enrichPolicy: 'organization-policy',
29-
mappingFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy'],
3037
type: config.get('ES.ORGANIZATION_TYPE')
3138
},
3239
role: {
3340
index: config.get('ES.ROLE_INDEX'),
34-
enrichPolicy: 'role-policy',
35-
mappingFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy'],
36-
type: config.get('ES.ROLE_TYPE')
41+
type: config.get('ES.ROLE_TYPE'),
42+
enrich: {
43+
policyName: config.get('ES.ENRICHMENT.role.enrichPolicyName')
44+
}
3745
},
3846
skill: {
3947
index: config.get('ES.SKILL_INDEX'),
40-
enrichPolicy: 'skill-policy',
41-
mappingFields: ['id', 'skillProviderId', 'name', 'externalId', 'uri', 'created', 'updated', 'createdBy', 'updatedBy'],
42-
type: config.get('ES.SKILL_TYPE')
48+
type: config.get('ES.SKILL_TYPE'),
49+
enrich: {
50+
policyName: config.get('ES.ENRICHMENT.skill.enrichPolicyName')
51+
},
52+
ingest: {
53+
pipeline: {
54+
id: config.get('ES.ENRICHMENT.skillprovider.pipelineId')
55+
}
56+
}
4357
},
4458
skillprovider: {
4559
index: config.get('ES.SKILL_PROVIDER_INDEX'),
46-
type: config.get('ES.SKILL_PROVIDER_TYPE')
60+
type: config.get('ES.SKILL_PROVIDER_TYPE'),
61+
enrich: {
62+
policyName: config.get('ES.ENRICHMENT.skillprovider.enrichPolicyName')
63+
}
4764
},
4865
user: {
4966
index: config.get('ES.USER_INDEX'),
50-
type: config.get('ES.USER_TYPE')
67+
type: config.get('ES.USER_TYPE'),
68+
ingest: {
69+
pipeline: {
70+
id: config.get('ES.ENRICHMENT.user.pipelineId')
71+
}
72+
}
5173
}
5274
}
5375

5476
const userResources = {
5577
achievement: {
5678
propertyName: config.get('ES.USER_ACHIEVEMENT_PROPERTY_NAME'),
57-
relateTopResource: 'achievementprovider',
5879
relateKey: 'achievementsProviderId',
5980
validate: payload => validProperties(payload, ['userId', 'achievementsProviderId'])
6081
},
6182
externalprofile: {
6283
propertyName: config.get('ES.USER_EXTERNALPROFILE_PROPERTY_NAME'),
63-
relateTopResource: 'organization',
6484
relateKey: 'organizationId',
6585
validate: payload => validProperties(payload, ['userId', 'organizationId'])
6686
},
6787
userattribute: {
6888
propertyName: config.get('ES.USER_ATTRIBUTE_PROPERTY_NAME'),
69-
relateTopResource: 'attribute',
7089
relateKey: 'attributeId',
7190
validate: payload => validProperties(payload, ['userId', 'attributeId']),
7291
isNested: true // For ES index creation
7392
},
7493
userrole: {
7594
propertyName: config.get('ES.USER_ROLE_PROPERTY_NAME'),
76-
relateTopResource: 'role',
7795
relateKey: 'roleId',
7896
validate: payload => validProperties(payload, ['userId', 'roleId'])
7997
},
8098
userskill: {
8199
propertyName: config.get('ES.USER_SKILL_PROPERTY_NAME'),
82-
relateTopResource: 'skill',
83100
relateKey: 'skillId',
84101
validate: payload => validProperties(payload, ['userId', 'skillId'])
85102
}

src/common/helper.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
126126
body,
127127
if_seq_no: seqNo,
128128
if_primary_term: primaryTerm,
129-
pipeline: config.get('ES.ENRICH_USER_PIPELINE_NAME'),
129+
pipeline: config.get('ES.ENRICHMENT.user.pipelineId'),
130130
refresh: 'wait_for'
131131
})
132132
}
@@ -153,16 +153,17 @@ async function getOrg (organizationId, transactionId) {
153153
*/
154154
async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) {
155155
const client = await getESClient()
156-
await client.update({
156+
await client.index({
157157
index: config.get('ES.ORGANIZATION_INDEX'),
158158
type: config.get('ES.ORGANIZATION_TYPE'),
159159
id: organizationId,
160160
transactionId,
161-
body: { doc: body },
161+
body,
162162
if_seq_no: seqNo,
163163
if_primary_term: primaryTerm,
164164
refresh: 'wait_for'
165165
})
166+
await client.enrich.executePolicy({ name: config.get('ES.ENRICHMENT.organization.enrichPolicyName') })
166167
}
167168

168169
/**

src/services/ProcessorService.js

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,20 @@ async function processCreate (message, transactionId) {
2424
// process the top resources such as user, skill...
2525
helper.validProperties(message.payload, ['id'])
2626
const client = await helper.getESClient()
27-
await client.create({
27+
await client.index({
2828
index: topResources[resource].index,
2929
type: topResources[resource].type,
3030
id: message.payload.id,
3131
transactionId,
3232
body: _.omit(message.payload, ['resource', 'originalTopic']),
33+
pipeline: topResources[resource].ingest ? topResources[resource].ingest.pipeline.id : undefined,
3334
refresh: 'wait_for'
3435
})
35-
if (topResources[resource].enrichPolicy) {
36-
await client.enrich.executePolicy({ name: topResources[resource].enrichPolicy })
36+
if (topResources[resource].enrich) {
37+
await client.enrich.executePolicy({
38+
name: topResources[resource].enrich.policyName,
39+
transactionId
40+
})
3741
}
3842
} else if (_.includes(_.keys(userResources), resource)) {
3943
// process user resources such as userSkill, userAttribute...
@@ -105,20 +109,22 @@ async function processUpdate (message, transactionId) {
105109
const { index, type } = topResources[resource]
106110
const id = message.payload.id
107111
const { body: source } = await client.get({ index, type, id, transactionId })
108-
await client.update({
112+
await client.index({
109113
index,
110114
type,
111115
id,
112116
transactionId,
113-
body: {
114-
doc: _.assign(source._source, _.omit(message.payload, ['resource', 'originalTopic']))
115-
},
117+
body: _.assign(source._source, _.omit(message.payload, ['resource', 'originalTopic'])),
118+
pipeline: topResources[resource].ingest ? topResources[resource].ingest.pipeline.id : undefined,
116119
if_seq_no: source._seq_no,
117120
if_primary_term: source._primary_term,
118121
refresh: 'wait_for'
119122
})
120-
if (topResources[resource].enrichPolicy) {
121-
await client.enrich.executePolicy({ name: topResources[resource].enrichPolicy })
123+
if (topResources[resource].enrich) {
124+
await client.enrich.executePolicy({
125+
name: topResources[resource].enrich.policyName,
126+
transactionId
127+
})
122128
}
123129
} else if (_.includes(_.keys(userResources), resource)) {
124130
// process user resources such as userSkill, userAttribute...
@@ -129,7 +135,6 @@ async function processUpdate (message, transactionId) {
129135
logger.info(`Resource validated for ${relateId}`)
130136
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
131137
logger.info(`User fetched ${user.id} and ${relateId}`)
132-
// const relateId = message.payload[userResource.relateKey]
133138

134139
// check the resource exist
135140
if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
@@ -196,8 +201,11 @@ async function processDelete (message, transactionId) {
196201
transactionId,
197202
refresh: 'wait_for'
198203
})
199-
if (topResources[resource].enrichPolicy) {
200-
await client.enrich.executePolicy({ name: topResources[resource].enrichPolicy })
204+
if (topResources[resource].enrich) {
205+
await client.enrich.executePolicy({
206+
name: topResources[resource].enrich.policyName,
207+
transactionId
208+
})
201209
}
202210
} else if (_.includes(_.keys(userResources), resource)) {
203211
// process user resources such as userSkill, userAttribute...

0 commit comments

Comments
 (0)