Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Correct enrich behaviour #17

Merged
merged 2 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Correct enrich behaviour
  • Loading branch information
callmekatootie committed Oct 21, 2020
commit 7e0c04fef1138c84e489be6e2befa0dd7f8b06d4
25 changes: 17 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,16 @@ The following parameters can be set in config files or in env variables:
- ES.USER_SKILL_PROPERTY_NAME: the user property name of skill, default value is 'skills'
- ES.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: the org property name of org skill providers, default value is 'skillProviders'
- ES.USER_GROUP_PROPERTY_NAME: the user property name of group, default value is 'groups'
- ENRICH_USER_PIPELINE_NAME: the user enrich pipeline name, default value is 'enrich_user'
- ATTRIBUTE_GROUP_PIPELINE_ID: The pipeline id for enrichment with attribute group. Default is `attributegroup-pipeline`
- SKILL_PROVIDER_PIPELINE_ID: The pipeline id for enrichment with skill provider. Default is `skillprovider-pipeline`
- USER_PIPELINE_ID: The pipeline id for enrichment of user details. Default is `user-pipeline`
- ATTRIBUTE_GROUP_ENRICH_POLICYNAME: The enrich policy for attribute group. Default is `attributegroup-policy`
- SKILL_PROVIDER_ENRICH_POLICYNAME: The enrich policy for skill provider. Default is `skillprovider-policy`
- ROLE_ENRICH_POLICYNAME: The enrich policy for role. Default is `role-policy`
- ACHIEVEMENT_PROVIDER_ENRICH_POLICYNAME: The enrich policy for achievement provider. Default is `achievementprovider-policy`
- SKILL_ENRICH_POLICYNAME: The enrich policy for skill. Default is `skill-policy`
- ATTRIBUTE_ENRICH_POLICYNAME: The enrich policy for skill. Default is `attribute-policy`


There is a `/health` endpoint that checks for the health of the app. This sets up an expressjs server and listens on the environment variable `PORT`. It's not part of the configuration file and needs to be passed as an environment variable

Expand Down Expand Up @@ -103,15 +112,10 @@ Configuration for the tests is at `config/test.js`, only add such new configurat
```

4. Initialize Elasticsearch index
For this, refer to the [Ubahn API](https://github.com/topcoder-platform/u-bahn-api) repository. In this repository, you need to execute the following script (after following their deployment guide):

```bash
npm run init-es
```

To delete and re-create the index:

```bash
npm run init-es force
npm run insert-data
```

5. Start the processor and health check dropin
Expand Down Expand Up @@ -148,3 +152,8 @@ To run the UBahn ES Processor using docker, follow the below steps
## Verification

see [VERIFICATION.md](VERIFICATION.md)

## TODO

- Correct the tests
- Update API codebase config and README for enrich policy env vars
29 changes: 28 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,33 @@ module.exports = {

ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders',

ENRICH_USER_PIPELINE_NAME: process.env.ENRICH_USER_PIPELINE_NAME || 'enrich_user'
ENRICHMENT: {
attributegroup: {
enrichPolicyName: process.env.ATTRIBUTE_GROUP_ENRICH_POLICYNAME || 'attributegroup-policy',
pipelineId: process.env.ATTRIBUTE_GROUP_PIPELINE_ID || 'attributegroup-pipeline'
},
skillprovider: {
enrichPolicyName: process.env.SKILL_PROVIDER_ENRICH_POLICYNAME || 'skillprovider-policy',
pipelineId: process.env.SKILL_PROVIDER_PIPELINE_ID || 'skillprovider-pipeline'
},
user: {
pipelineId: process.env.USER_PIPELINE_ID || 'user-pipeline'
},
role: {
enrichPolicyName: process.env.ROLE_ENRICH_POLICYNAME || 'role-policy'
},
achievementprovider: {
enrichPolicyName: process.env.ACHIEVEMENT_PROVIDER_ENRICH_POLICYNAME || 'achievementprovider-policy'
},
skill: {
enrichPolicyName: process.env.SKILL_ENRICH_POLICYNAME || 'skill-policy'
},
attribute: {
enrichPolicyName: process.env.ATTRIBUTE_ENRICH_POLICYNAME || 'attribute-policy'
},
organization: {
enrichPolicyName: process.env.ORGANIZATION_ENRICH_POLICYNAME || 'organization-policy'
}
}
}
}
14 changes: 7 additions & 7 deletions docker-kafka-es/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ services:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "u-bahn.action.aggregate:1:1,groups.notification.member.add:1:1,groups.notification.member.delete:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
esearch:
image: elasticsearch:7.7.1
container_name: ubahn-data-processor-es_es
ports:
- "9200:9200"
environment:
- discovery.type=single-node
# esearch:
# image: elasticsearch:7.7.1
# container_name: ubahn-data-processor-es_es
# ports:
# - "9200:9200"
# environment:
# - discovery.type=single-node
63 changes: 40 additions & 23 deletions src/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,78 +8,95 @@ const { validProperties } = require('./helper')
const topResources = {
achievementprovider: {
index: config.get('ES.ACHIEVEMENT_PROVIDER_INDEX'),
mappingFields: ['id', 'achievementsProviderId', 'name', 'uri', 'certifierId', 'certifiedDate', 'created', 'updated', 'createdBy', 'updatedBy'],
enrichPolicy: 'achievementprovider-policy',
type: config.get('ES.ACHIEVEMENT_PROVIDER_TYPE')
type: config.get('ES.ACHIEVEMENT_PROVIDER_TYPE'),
enrich: {
policyName: config.get('ES.ENRICHMENT.achievementprovider.enrichPolicyName')
}
},
attribute: {
index: config.get('ES.ATTRIBUTE_INDEX'),
enrichPolicy: 'attribute-policy',
mappingFields: ['id', 'name', 'attributeGroupId', 'created', 'updated', 'createdBy', 'updatedBy'],
type: config.get('ES.ATTRIBUTE_TYPE')
type: config.get('ES.ATTRIBUTE_TYPE'),
enrich: {
policyName: config.get('ES.ENRICHMENT.attribute.enrichPolicyName')
},
ingest: {
pipeline: {
id: config.get('ES.ENRICHMENT.attributegroup.pipelineId')
}
}
},
attributegroup: {
index: config.get('ES.ATTRIBUTE_GROUP_INDEX'),
type: config.get('ES.ATTRIBUTE_GROUP_TYPE')
type: config.get('ES.ATTRIBUTE_GROUP_TYPE'),
enrich: {
policyName: config.get('ES.ENRICHMENT.attributegroup.enrichPolicyName')
}
},

organization: {
index: config.get('ES.ORGANIZATION_INDEX'),
enrichPolicy: 'organization-policy',
mappingFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy'],
type: config.get('ES.ORGANIZATION_TYPE')
},
role: {
index: config.get('ES.ROLE_INDEX'),
enrichPolicy: 'role-policy',
mappingFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy'],
type: config.get('ES.ROLE_TYPE')
type: config.get('ES.ROLE_TYPE'),
enrich: {
policyName: config.get('ES.ENRICHMENT.role.enrichPolicyName')
}
},
skill: {
index: config.get('ES.SKILL_INDEX'),
enrichPolicy: 'skill-policy',
mappingFields: ['id', 'skillProviderId', 'name', 'externalId', 'uri', 'created', 'updated', 'createdBy', 'updatedBy'],
type: config.get('ES.SKILL_TYPE')
type: config.get('ES.SKILL_TYPE'),
enrich: {
policyName: config.get('ES.ENRICHMENT.skill.enrichPolicyName')
},
ingest: {
pipeline: {
id: config.get('ES.ENRICHMENT.skillprovider.pipelineId')
}
}
},
skillprovider: {
index: config.get('ES.SKILL_PROVIDER_INDEX'),
type: config.get('ES.SKILL_PROVIDER_TYPE')
type: config.get('ES.SKILL_PROVIDER_TYPE'),
enrich: {
policyName: config.get('ES.ENRICHMENT.skillprovider.enrichPolicyName')
}
},
user: {
index: config.get('ES.USER_INDEX'),
type: config.get('ES.USER_TYPE')
type: config.get('ES.USER_TYPE'),
ingest: {
pipeline: {
id: config.get('ES.ENRICHMENT.user.pipelineId')
}
}
}
}

const userResources = {
achievement: {
propertyName: config.get('ES.USER_ACHIEVEMENT_PROPERTY_NAME'),
relateTopResource: 'achievementprovider',
relateKey: 'achievementsProviderId',
validate: payload => validProperties(payload, ['userId', 'achievementsProviderId'])
},
externalprofile: {
propertyName: config.get('ES.USER_EXTERNALPROFILE_PROPERTY_NAME'),
relateTopResource: 'organization',
relateKey: 'organizationId',
validate: payload => validProperties(payload, ['userId', 'organizationId'])
},
userattribute: {
propertyName: config.get('ES.USER_ATTRIBUTE_PROPERTY_NAME'),
relateTopResource: 'attribute',
relateKey: 'attributeId',
validate: payload => validProperties(payload, ['userId', 'attributeId']),
isNested: true // For ES index creation
},
userrole: {
propertyName: config.get('ES.USER_ROLE_PROPERTY_NAME'),
relateTopResource: 'role',
relateKey: 'roleId',
validate: payload => validProperties(payload, ['userId', 'roleId'])
},
userskill: {
propertyName: config.get('ES.USER_SKILL_PROPERTY_NAME'),
relateTopResource: 'skill',
relateKey: 'skillId',
validate: payload => validProperties(payload, ['userId', 'skillId'])
}
Expand Down
7 changes: 4 additions & 3 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
body,
if_seq_no: seqNo,
if_primary_term: primaryTerm,
pipeline: config.get('ES.ENRICH_USER_PIPELINE_NAME'),
pipeline: config.get('ES.ENRICHMENT.user.pipelineId'),
refresh: 'wait_for'
})
}
Expand All @@ -153,16 +153,17 @@ async function getOrg (organizationId, transactionId) {
*/
async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) {
const client = await getESClient()
await client.update({
await client.index({
index: config.get('ES.ORGANIZATION_INDEX'),
type: config.get('ES.ORGANIZATION_TYPE'),
id: organizationId,
transactionId,
body: { doc: body },
body,
if_seq_no: seqNo,
if_primary_term: primaryTerm,
refresh: 'wait_for'
})
await client.enrich.executePolicy({ name: config.get('ES.ENRICHMENT.organization.enrichPolicyName') })
}

/**
Expand Down
22 changes: 9 additions & 13 deletions src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ async function processCreate (message, transactionId) {
// process the top resources such as user, skill...
helper.validProperties(message.payload, ['id'])
const client = await helper.getESClient()
await client.create({
await client.index({
index: topResources[resource].index,
type: topResources[resource].type,
id: message.payload.id,
transactionId,
body: _.omit(message.payload, ['resource', 'originalTopic']),
pipeline: topResources[resource].ingest ? topResources[resource].ingest.pipeline.id : undefined,
refresh: 'wait_for'
})
if (topResources[resource].enrichPolicy) {
await client.enrich.executePolicy({ name: topResources[resource].enrichPolicy })
if (topResources[resource].enrich) {
await client.enrich.executePolicy({ name: topResources[resource].enrich.policyName })
}
} else if (_.includes(_.keys(userResources), resource)) {
// process user resources such as userSkill, userAttribute...
Expand Down Expand Up @@ -105,20 +106,19 @@ async function processUpdate (message, transactionId) {
const { index, type } = topResources[resource]
const id = message.payload.id
const { body: source } = await client.get({ index, type, id, transactionId })
await client.update({
await client.index({
index,
type,
id,
transactionId,
body: {
doc: _.assign(source._source, _.omit(message.payload, ['resource', 'originalTopic']))
},
body: _.assign(source._source, _.omit(message.payload, ['resource', 'originalTopic'])),
pipeline: topResources[resource].ingest ? topResources[resource].ingest.pipeline.id : undefined,
if_seq_no: source._seq_no,
if_primary_term: source._primary_term,
refresh: 'wait_for'
})
if (topResources[resource].enrichPolicy) {
await client.enrich.executePolicy({ name: topResources[resource].enrichPolicy })
if (topResources[resource].enrich) {
await client.enrich.executePolicy({ name: topResources[resource].enrich.policyName })
}
} else if (_.includes(_.keys(userResources), resource)) {
// process user resources such as userSkill, userAttribute...
Expand All @@ -129,7 +129,6 @@ async function processUpdate (message, transactionId) {
logger.info(`Resource validated for ${relateId}`)
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
logger.info(`User fetched ${user.id} and ${relateId}`)
// const relateId = message.payload[userResource.relateKey]

// check the resource exist
if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
Expand Down Expand Up @@ -196,9 +195,6 @@ async function processDelete (message, transactionId) {
transactionId,
refresh: 'wait_for'
})
if (topResources[resource].enrichPolicy) {
await client.enrich.executePolicy({ name: topResources[resource].enrichPolicy })
}
} else if (_.includes(_.keys(userResources), resource)) {
// process user resources such as userSkill, userAttribute...
const userResource = userResources[resource]
Expand Down