Kafka Partitions Assignment Optimizer
====

- If you have more than 4 brokers spread on several top-of-rack switches (_TOR_),
- you might be interested in balancing replicas and leaders properly to
- survive to a switch failure and to avoid bottlenecks.
+ If you have more than 4 brokers spread on several top-of-rack switches (_TOR_)
+ or availability zones (_AZ_), you might be interested in balancing replicas
+ and leaders properly to survive a switch failure and to avoid bottlenecks.

- On addition to that, when you're re-assigning replicas because of server failure,
+ In addition to that, when you're re-assigning replicas because of broker failure,
or changing the topology (server(s) addition) or the replication factor,
- you might be interested in minimizing the number of partitions to move.
+ you might be interested in minimizing the number of partitions to move
+ to avoid killing your network.

- For this latter, the `kafka-reassign-partitions.sh` utility is not doing a perfect
- job at minimizing the number of replicas moves.
+ For the latter, the `kafka-reassign-partitions.sh` utility provided with Kafka
+ does not do a perfect job at minimizing the number of replica moves.

To give a concrete example, adding or removing a server from the cluster is
generating lots of replica moves (i.e. network traffic) that might impact the
- overall cluster availability.
+ overall cluster performance.

- Also, if you running a version of Kafka which does not include
- [KIP-36 (rack aware replica assignment)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment))
+ Last but not least, if you're running a version of Kafka which does not include
+ [KIP-36 (rack aware replica assignment)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment)
you don't have any knowledge about the network topology in the
assignment algorithm.

## Demonstration: `kafka-reassign-partitions.sh` inefficiency

- Lets assume we have a cluster with 20 brokers, named 0-19, spread across 2 switches.
- Brokers with odd numbers are all on the same _TOR_ `tor1`,
- brokers with even numbers are wired to `tor2`.
+ Let's assume we have a cluster with 20 brokers, named 0-19, spread across 2 AZs.
+ Brokers with odd numbers are all in the same _AZ_ `b`,
+ brokers with even numbers are in `a`.

We have a topic `x.y.z.t` with 10 partitions and a replication factor of 2.

@@ -84,7 +85,7 @@ That's rather unfortunate, since computing the diff manually,
we could simply change the assignment of partition `1`, like:

```
- {"topic":"x.y.z.t","partition":1,"replicas":[8,1]},
+ {"topic":"x.y.z.t","partition":1,"replicas":[8,1]},
```

All the other moves are not required.
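
To make "number of moves" concrete, here is a minimal Python sketch that counts how many replicas a proposed assignment would copy to a broker that did not host them before. The function name `replica_moves` and the two tiny assignments are made up for illustration; they are not the real `x.y.z.t` data and not part of the optimizer.

```python
def replica_moves(current, proposed):
    """Count replicas that land on a broker which did not host that partition before."""
    before = {
        (p["topic"], p["partition"]): set(p["replicas"])
        for p in current["partitions"]
    }
    moves = 0
    for p in proposed["partitions"]:
        key = (p["topic"], p["partition"])
        # Only brokers that are new for this partition require data to be copied.
        moves += len(set(p["replicas"]) - before.get(key, set()))
    return moves


# Hypothetical assignments, in the JSON layout used by kafka-reassign-partitions.sh.
current = {"version": 1, "partitions": [
    {"topic": "demo", "partition": 0, "replicas": [0, 1]},
    {"topic": "demo", "partition": 1, "replicas": [1, 2]},
]}
proposed = {"version": 1, "partitions": [
    {"topic": "demo", "partition": 0, "replicas": [0, 1]},
    {"topic": "demo", "partition": 1, "replicas": [3, 2]},
]}

print(replica_moves(current, proposed))  # 1: only partition 1 gets a new broker
```

Running the same function on the output of `--generate` and on a hand-written assignment gives a quick way to compare the two.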
@@ -103,13 +104,16 @@ of writing.


# Replica assignment as a constraint satisfaction problem
-
+
If you think out of the box, replica assignment looks like an
- [optimization function](https://en.wikipedia.org/wiki/Mathematical_optimization)
- under specific constraints, or a
- [constraint satisfaction problem](https://en.wikipedia.org/wiki/Constraint_satisfaction_problem)
+ [constraint satisfaction problem](https://en.wikipedia.org/wiki/Constraint_satisfaction_problem).
+
For instance, "no two replicas of the same partition assigned to the same broker" is one of
- these constraints.
+ these constraints, which could be expressed as an equation, opening the door
+ to [mathematical optimization](https://en.wikipedia.org/wiki/Mathematical_optimization)
+ to find the optimum.
+
+ ## Minimize the number of replicas to move

To minimize the move of replicas, the idea is to assign more weight (i.e. more value)
to existing assignments, so that the linear optimization will try to preserve
@@ -124,7 +128,7 @@ would be expressed as

![Constraint example](images/constraint1.png)

- Now you got the trick, there are no limits on constraints to add. The current implementation
+ Now that you've got the trick, there are (almost) no limits on constraints to add. The current implementation
includes for instance _leader preservation_, i.e. the preferred leader has more weight
than the other partitions.
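
The weighting idea can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function name `objective_weights` and the weights 1, 2 and 4 are hypothetical, and the actual values and variable layout used by the optimizer are not given in this text.

```python
def objective_weights(assignment, brokers, w_new=1, w_keep=2, w_leader=4):
    """Give each (topic, partition, broker) candidate a weight for a maximizing objective.

    The weights are hypothetical: an existing replica is worth more than a new one,
    and the current preferred leader (first replica in the list) is worth the most.
    """
    weights = {}
    for p in assignment["partitions"]:
        for b in brokers:
            if b not in p["replicas"]:
                w = w_new        # placing a replica here would be a move
            elif b == p["replicas"][0]:
                w = w_leader     # current preferred leader
            else:
                w = w_keep       # existing follower replica
            weights[(p["topic"], p["partition"], b)] = w
    return weights


assignment = {"version": 1, "partitions": [
    {"topic": "public.tweets", "partition": 6, "replicas": [1, 3]},
]}
weights = objective_weights(assignment, brokers=[0, 1, 2, 3])
for broker in range(4):
    print(broker, weights[("public.tweets", 6, broker)])
# 0 1
# 1 4
# 2 1
# 3 2
```

A solver maximizing the sum of the selected weights would then prefer to keep broker 1 as leader and broker 3 as follower, unless a constraint forces a move.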
t1b1p0, t1b1p0_l, ... , t1b32p9, t1b32p9_l;
```

- # Usage
+ # Real World Usage
+
+ Kafka Partitions Assignment Optimizer is made public with ❤ by the DAPLAB: [https://kafka-optimizer.daplab.ch/](https://kafka-optimizer.daplab.ch/).
+
+ API endpoint: **https://kafka-optimizer.daplab.ch/submit**
+
+ ## Set `$ZK`
+
+ To run the example below seamlessly, set the ZooKeeper server(s):
+
+ ```
+ ZK=daplab-wn-22.fri.lan:2181
+ ```

## Retrieve current assignment
@@ -189,7 +205,7 @@ $ echo '{"topics": [{"topic": "public.tweets"},{"topic": "trumpet"}], "version":
```

```
- $ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper daplab-wn-22.fri.lan:2181 --generate --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3
+ $ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper $ZK --generate --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3
Current partition replica assignment
{"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[1,3]},{"topic":"public.tweets","partition":5,"replicas":[0,2]},{"topic":"public.tweets","partition":0,"replicas":[3,0]},{"topic":"trumpet","partition":0,"replicas":[1,3,0]},{"topic":"public.tweets","partition":3,"replicas":[2,3]},{"topic":"public.tweets","partition":8,"replicas":[3,2]},{"topic":"public.tweets","partition":7,"replicas":[2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,1]},{"topic":"public.tweets","partition":2,"replicas":[1,2]},{"topic":"public.tweets","partition":9,"replicas":[0,3]},{"topic":"public.tweets","partition":4,"replicas":[3,1]}]}
@@ -200,39 +216,68 @@ Proposed partition reassignment configuration

## Generate REST payload

- Copy the `Current partition replica assignment` part and past it in the
+ Copy the `Current partition replica assignment` part of the above output and paste it in the
`partitions` attribute in the `payload.json` file, i.e. something like:

- * `brokers` attribute is of the format: `brokerId[:rack][,brokerId[:rack]]*`,
- i.e. a list of comma-separated broker ids and optional `:rack` assignment.
- * `partitions` attribute is a copy-paste of the `kafka-reassign-partitions` command
-
```
{
- "brokers": "0:tor2,1:tor1,2:tor2,3:tor1",
+ "brokers": "0:a,1:b,2:a,3:b",
"partitions": {"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[1,3]},{"topic":"public.tweets","partition":5,"replicas":[0,2]},{"topic":"public.tweets","partition":0,"replicas":[3,0]},{"topic":"trumpet","partition":0,"replicas":[1,3,0]},{"topic":"public.tweets","partition":3,"replicas":[2,3]},{"topic":"public.tweets","partition":8,"replicas":[3,2]},{"topic":"public.tweets","partition":7,"replicas":[2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,1]},{"topic":"public.tweets","partition":2,"replicas":[1,2]},{"topic":"public.tweets","partition":9,"replicas":[0,3]},{"topic":"public.tweets","partition":4,"replicas":[3,1]}]}
}
```

+ * `brokers` attribute is of the format: `brokerId[:rack][,brokerId[:rack]]*`,
+ i.e. a list of comma-separated broker ids and optional `:rack` assignment.
+ * `partitions` attribute is a copy-paste of the `kafka-reassign-partitions` command output
+
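
As an illustration of the `brokerId[:rack][,brokerId[:rack]]*` format, the following Python sketch parses such a string into a broker-to-rack mapping. The helper name `parse_brokers` is hypothetical and this is not the service's own parser; it only mirrors the format described above and tolerates stray spaces around the separators.

```python
def parse_brokers(spec):
    """Parse 'brokerId[:rack][,brokerId[:rack]]*' into {broker_id: rack or None}."""
    racks = {}
    for item in spec.split(","):
        item = item.strip()
        if not item:
            continue
        # partition() returns an empty rack when no ':' is present.
        broker_id, _, rack = item.partition(":")
        racks[int(broker_id)] = rack.strip() or None
    return racks


print(parse_brokers("0:a,1:b,2:a,3:b"))  # {0: 'a', 1: 'b', 2: 'a', 3: 'b'}
print(parse_brokers("0,1,2"))            # {0: None, 1: None, 2: None}
```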
# Call the REST API
POST the previously generated payload:

```
- $ curl -X POST --data @payload.json http://localhost:4567/submit
+ $ curl -X POST --data @payload.json https://kafka-optimizer.daplab.ch/submit
{"version":1,"partitions":[{"topic":"public.tweets","partition":4,"replicas":[3,2]},{"topic":"public.tweets","partition":5,"replicas":[0,1]},{"topic":"public.tweets","partition":6,"replicas":[1,0]},{"topic":"public.tweets","partition":7,"replicas":[2,1]}]}
```
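
The four returned partitions can be checked against the rack layout of the payload. The Python sketch below copies the `public.tweets` replicas for partitions 4 to 7 from the output above and the `0:a,1:b,2:a,3:b` layout from the payload; the helper name `spans_racks` is made up. It suggests, without proving it is the optimizer's actual reasoning, that each of these partitions had both replicas in a single rack before the change and spans both racks afterwards.

```python
racks = {0: "a", 1: "b", 2: "a", 3: "b"}

# public.tweets partitions 4-7: current assignment and the changes returned by the API.
current = {4: [3, 1], 5: [0, 2], 6: [1, 3], 7: [2, 0]}
changes = {4: [3, 2], 5: [0, 1], 6: [1, 0], 7: [2, 1]}


def spans_racks(replicas, racks):
    """True when the replicas of a partition sit in more than one rack."""
    return len({racks[b] for b in replicas}) > 1


before = [p for p, r in current.items() if not spans_racks(r, racks)]
after = [p for p, r in changes.items() if not spans_racks(r, racks)]

print(before)  # [4, 5, 6, 7]: every one of them was confined to a single rack
print(after)   # []: all of them span both racks after the change
```

In each of these partitions the first replica is unchanged and only one replica moves.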

- You can now copy the output and paste it into `reassignment-file.json` file and call
+ You can now copy the output of the command above,
+ paste it into the `reassignment-file.json` file and call:
+
+ ```
+ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper $ZK --reassignment-json-file reassignment-file.json --execute
+ ```
+
+ You can now verify the re-assignment by calling:

```
- kafka-reassign-partitions --zookeeper $ZK --reassignment-json-file reassignment-file.json -execute
+ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper $ZK --reassignment-json-file reassignment-file.json --verify
```

+ # Increase Number of Replicas
+
+ Another common use case is increasing the replication factor.
+ This can be done quickly by setting the attribute `newReplicationFactor` in the payload:
+ ```
+ {
+ "brokers": "0,1,2,3,4,5,6,7",
+ "partitions": {"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[1,3]},{"topic":"public.tweets","partition":5,"replicas":[0,2]},{"topic":"public.tweets","partition":0,"replicas":[3,0]},{"topic":"trumpet","partition":0,"replicas":[1,3,0]},{"topic":"public.tweets","partition":3,"replicas":[2,3]},{"topic":"public.tweets","partition":8,"replicas":[3,2]},{"topic":"public.tweets","partition":7,"replicas":[2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,1]},{"topic":"public.tweets","partition":2,"replicas":[1,2]},{"topic":"public.tweets","partition":9,"replicas":[0,3]},{"topic":"public.tweets","partition":4,"replicas":[3,1]}]},
+ "newReplicationFactor": 3
+ }
+ ```
+
+ Call the service again:
+
+ ```
+ $ curl -X POST --data @payload.json https://kafka-optimizer.daplab.ch/submit
+ {"version":1,"partitions":[{"topic":"public.tweets","partition":0,"replicas":[3,2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,2,1]},{"topic":"public.tweets","partition":2,"replicas":[1,3,2]},{"topic":"public.tweets","partition":3,"replicas":[2,3,1]},{"topic":"public.tweets","partition":4,"replicas":[3,0,1]},{"topic":"public.tweets","partition":5,"replicas":[0,2,1]},{"topic":"public.tweets","partition":6,"replicas":[1,0,3]},{"topic":"public.tweets","partition":7,"replicas":[2,1,0]},{"topic":"public.tweets","partition":8,"replicas":[3,0,2]},{"topic":"public.tweets","partition":9,"replicas":[0,1,3]}]}
+ ```
+
+ All the partitions have 3 replicas now, and the existing ones are preserved as much as possible (in this example, all the existing replicas are preserved, some leaders have been changed though).
+
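
A response like this can be checked mechanically before it is applied. The Python sketch below uses three `public.tweets` partitions copied from the payload and the response above; the function name `lost_replicas` is hypothetical. It verifies the new replication factor and lists the partitions that would lose an existing replica.

```python
# public.tweets partitions 0, 1 and 6, before and after the request.
current = {0: [3, 0], 1: [0, 1], 6: [1, 3]}
proposed = {0: [3, 2, 0], 1: [0, 2, 1], 6: [1, 0, 3]}


def lost_replicas(current, proposed, replication_factor):
    """Return the partitions where an existing replica would be dropped."""
    lost = []
    for partition, new in proposed.items():
        # Each partition must have exactly `replication_factor` distinct brokers.
        if len(set(new)) != replication_factor or len(new) != replication_factor:
            raise ValueError(f"partition {partition} has replicas {new}")
        if not set(current.get(partition, [])) <= set(new):
            lost.append(partition)
    return lost


print(lost_replicas(current, proposed, replication_factor=3))  # []
```

An empty list means that every existing replica is kept and only new replicas have to be copied.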
+
# No changes

- If the current assignment is already optimal, the API will simply answer with an empty list,
- as follow:
+ Please note that the API returns only the changes. If the current assignment is already optimal,
+ the API will simply answer with an empty list, as follows:

```
{"version":1,"partitions":[]}