Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit d6a48bc

Browse files
Revert "Update to listen to only aggregate topic"
This reverts commit f558d59.
1 parent f558d59 commit d6a48bc

File tree

6 files changed

+12
-45
lines changed

6 files changed

+12
-45
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ The following parameters can be set in config files or in env variables:
2323
- UBAHN_CREATE_TOPIC: the create ubahn entity Kafka message topic, default value is 'u-bahn.action.create'
2424
- UBAHN_UPDATE_TOPIC: the update ubahn entity Kafka message topic, default value is 'u-bahn.action.update'
2525
- UBAHN_DELETE_TOPIC: the delete ubahn entity Kafka message topic, default value is 'u-bahn.action.delete'
26-
- UBAHN_AGGREGATE_TOPIC: the ubahn entity aggregate topic, that contains create, update and delete topics. Default value is 'u-bahn.action.aggregate'
2726
- ES.HOST: Elasticsearch host, default value is 'localhost:9200'
2827
- ES.AWS_REGION: The Amazon region to use when using AWS Elasticsearch service, default value is 'us-east-1'
2928
- ES.API_VERSION: Elasticsearch API version, default value is '7.4'

VERIFICATION.md

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,5 @@
11
# Verification
22

3-
**NOTE** - For all kafka message below, update the topic to be the one set in config.UBAHN_AGGREGATE_TOPIC and inside the payload object, create a new attribute named `originalTopic` with the value of the original topic. Example:
4-
5-
```
6-
{
7-
"topic": "u-bahn.action.aggregate",
8-
"originator": "u-bahn-api",
9-
"timestamp": "2019-07-08T00:00:00.000Z",
10-
"mime-type": "application/json",
11-
"payload": {
12-
"originalTopic": "u-bahn.action.create"
13-
"resource": "user",
14-
"id": "391a3656-9a01-47d4-8c6d-64b68c44f212",
15-
"handle": "user"
16-
}
17-
}
18-
```
19-
20-
Additionally, you will be entering the messages into only one topic:
21-
22-
```
23-
docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic u-bahn.action.aggregate
24-
```
25-
263
1. start kafka server, start elasticsearch, initialize Elasticsearch, start processor app
274
2. start kafka-console-producer to write messages to `u-bahn.action.create`
285
topic:

config/default.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ module.exports = {
1717
UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
1818
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
1919
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
20-
UBAHN_AGGREGATE_TOPIC: process.env.UBAHN_AGGREGATE_TOPIC || 'u-bahn.action.aggregate',
2120

2221
ES: {
2322
HOST: process.env.ES_HOST || 'localhost:9200',

docker-kafka-es/docker-compose.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ services:
1212
- "9092:9092"
1313
environment:
1414
KAFKA_ADVERTISED_HOST_NAME: localhost
15-
# KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
16-
KAFKA_CREATE_TOPICS: "u-bahn.action.aggregate:1:1"
15+
KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
1716
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
1817
# esearch:
1918
# image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2

src/app.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
5353
}
5454

5555
try {
56-
switch (messageJSON.payload.originalTopic) {
56+
switch (topic) {
5757
case config.UBAHN_CREATE_TOPIC:
5858
await ProcessorService.processCreate(messageJSON)
5959
break
@@ -63,8 +63,6 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
6363
case config.UBAHN_DELETE_TOPIC:
6464
await ProcessorService.processDelete(messageJSON)
6565
break
66-
default:
67-
throw new Error(`Unknown original topic: ${messageJSON.payload.originalTopic}`)
6866
}
6967

7068
logger.debug('Successfully processed message')
@@ -90,8 +88,7 @@ const check = () => {
9088
return connected
9189
}
9290

93-
// const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
94-
const topics = [config.UBAHN_AGGREGATE_TOPIC]
91+
const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
9592

9693
consumer
9794
.init([{

src/services/ProcessorService.js

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ const {
1111
userResources,
1212
organizationResources
1313
} = require('../common/constants')
14-
const config = require('config')
1514

1615
/**
1716
* Process create entity message
@@ -27,7 +26,7 @@ async function processCreate (message) {
2726
index: topResources[resource].index,
2827
type: topResources[resource].type,
2928
id: message.payload.id,
30-
body: _.omit(message.payload, ['resource', 'originalTopic']),
29+
body: _.omit(message.payload, 'resource'),
3130
refresh: 'wait_for'
3231
})
3332
} else if (_.includes(_.keys(userResources), resource)) {
@@ -45,7 +44,7 @@ async function processCreate (message) {
4544
logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId}`)
4645
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
4746
} else {
48-
user[userResource.propertyName].push(_.omit(message.payload, ['resource', 'originalTopic']))
47+
user[userResource.propertyName].push(_.omit(message.payload, 'resource'))
4948
await helper.updateUser(message.payload.userId, user)
5049
}
5150
} else if (_.includes(_.keys(organizationResources), resource)) {
@@ -63,7 +62,7 @@ async function processCreate (message) {
6362
logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId}`)
6463
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
6564
} else {
66-
org[orgResources.propertyName].push(_.omit(message.payload, ['resource', 'originalTopic']))
65+
org[orgResources.propertyName].push(_.omit(message.payload, 'resource'))
6766
await helper.updateOrg(message.payload.organizationId, org)
6867
}
6968
} else {
@@ -78,8 +77,7 @@ processCreate.schema = {
7877
timestamp: Joi.date().required(),
7978
'mime-type': Joi.string().required(),
8079
payload: Joi.object().keys({
81-
resource: Joi.string().required(),
82-
originalTopic: Joi.string().required().valid(config.UBAHN_CREATE_TOPIC)
80+
resource: Joi.string().required()
8381
}).required().unknown(true)
8482
}).required()
8583
}
@@ -103,7 +101,7 @@ async function processUpdate (message) {
103101
type,
104102
id,
105103
body: {
106-
doc: _.assign(source, _.omit(message.payload, ['resource', 'originalTopic']))
104+
doc: _.assign(source, _.omit(message.payload, 'resource'))
107105
},
108106
refresh: 'wait_for'
109107
})
@@ -124,7 +122,7 @@ async function processUpdate (message) {
124122
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
125123
} else {
126124
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
127-
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, ['resource', 'originalTopic']))
125+
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
128126
logger.info(`Updating ${user.id} and ${relateId}`)
129127
await helper.updateUser(message.payload.userId, user)
130128
logger.info(`Updated ${user.id} and ${relateId}`)
@@ -143,7 +141,7 @@ async function processUpdate (message) {
143141
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
144142
} else {
145143
const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
146-
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, ['resource', 'originalTopic']))
144+
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
147145
await helper.updateOrg(message.payload.organizationId, org)
148146
}
149147
} else {
@@ -158,8 +156,7 @@ processUpdate.schema = {
158156
timestamp: Joi.date().required(),
159157
'mime-type': Joi.string().required(),
160158
payload: Joi.object().keys({
161-
resource: Joi.string().required(),
162-
originalTopic: Joi.string().required().valid(config.UBAHN_UPDATE_TOPIC)
159+
resource: Joi.string().required()
163160
}).required().unknown(true)
164161
}).required()
165162
}
@@ -222,8 +219,7 @@ processDelete.schema = {
222219
timestamp: Joi.date().required(),
223220
'mime-type': Joi.string().required(),
224221
payload: Joi.object().keys({
225-
resource: Joi.string().required(),
226-
originalTopic: Joi.string().required().valid(config.UBAHN_DELETE_TOPIC)
222+
resource: Joi.string().required()
227223
}).required().unknown(true)
228224
}).required()
229225
}

0 commit comments

Comments
 (0)