|
| 1 | +Kafka Partitions Assignment Optimizer |
| 2 | +==== |
| 3 | + |
| 4 | +If you have more than 4 brokers spread on several top-of-rack switches (TOR), |
| 5 | +you might be interested in balancing replicas and leaders properly to |
| 6 | +survive to a switch failure and to avoid bottlenecks. |
| 7 | + |
| 8 | +On addition to that, when you're re-assigning replicas because of server failure, |
| 9 | +or changing the topology (server(s) addition) or the replication factor, |
| 10 | +you might be interested in minimizing the number of partitions to move. |
| 11 | + |
| 12 | +For this latter, the `kafka-reassign-partitions.sh` utility is not doing a perfect |
| 13 | +job at minimizing the number of replicas moves. |
| 14 | + |
| 15 | +To give a concrete example, adding or removing a server from the cluster is |
| 16 | +generating lots of replica moves (i.e. network traffic) that might impact the |
| 17 | +overall cluster availability. |
| 18 | + |
| 19 | +Also, if you running a version of Kafka which does not include |
| 20 | +[KIP-36 (rack aware replica assignment)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment)) |
| 21 | +you don't have any knowledge of the network topology in the |
| 22 | +assignment algorithm. |
| 23 | + |
| 24 | +## Demonstration: `kafka-reassign-partitions.sh` under-efficiency |
| 25 | + |
| 26 | +Assume have a cluster with 20 brokers, named 0-19, spread across 2 switches. |
| 27 | +Brokers with odd numbers are all on the same TOR `tor1`, |
| 28 | +brokers with even numbers are wired to `tor2`. |
| 29 | + |
| 30 | +We have a topic `x.y.z.t` with 10 partitions and a replication factor of 2. |
| 31 | + |
| 32 | +Lets run `kafka-reassign-partitions` and see what's happening. |
| 33 | + |
| 34 | +1. Generate a file `topics-to-move.json` with the topic |
| 35 | +``` |
| 36 | +{ |
| 37 | + "topics": [{"topic": "x.y.z.t"}], |
| 38 | + "version": 1 |
| 39 | +} |
| 40 | +``` |
| 41 | + |
| 42 | +2. Call `kafka-reassign-partitions` trying to remove broker `19` |
| 43 | + |
| 44 | +``` |
| 45 | +$ kafka-reassign-partitions --zookeeper $ZK --generate \ |
| 46 | + --topics-to-move-json-file topics-to-move.json \ |
| 47 | + --broker-list 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18 |
| 48 | +
|
| 49 | +Current partition replica assignment |
| 50 | +
|
| 51 | +{"version":1,"partitions":[ |
| 52 | + {"topic":"x.y.z.t","partition":0,"replicas":[7,18]}, |
| 53 | + {"topic":"x.y.z.t","partition":1,"replicas":[8,19]}, |
| 54 | + {"topic":"x.y.z.t","partition":2,"replicas":[9,10]}, |
| 55 | + {"topic":"x.y.z.t","partition":3,"replicas":[0,11]}, |
| 56 | + {"topic":"x.y.z.t","partition":4,"replicas":[1,12]}, |
| 57 | + {"topic":"x.y.z.t","partition":5,"replicas":[2,13]}, |
| 58 | + {"topic":"x.y.z.t","partition":6,"replicas":[3,14]}, |
| 59 | + {"topic":"x.y.z.t","partition":7,"replicas":[4,15]}, |
| 60 | + {"topic":"x.y.z.t","partition":8,"replicas":[5,16]}, |
| 61 | + {"topic":"x.y.z.t","partition":9,"replicas":[6,17]} |
| 62 | +]} |
| 63 | +
|
| 64 | +Proposed partition reassignment configuration |
| 65 | +
|
| 66 | +{"version":1,"partitions":[ |
| 67 | + {"topic":"x.y.z.t","partition":0,"replicas":[14,17]}, |
| 68 | + {"topic":"x.y.z.t","partition":1,"replicas":[15,18]}, |
| 69 | + {"topic":"x.y.z.t","partition":2,"replicas":[16,0]}, |
| 70 | + {"topic":"x.y.z.t","partition":3,"replicas":[17,1]}, |
| 71 | + {"topic":"x.y.z.t","partition":4,"replicas":[18,2]}, |
| 72 | + {"topic":"x.y.z.t","partition":5,"replicas":[0,3]}, |
| 73 | + {"topic":"x.y.z.t","partition":6,"replicas":[1,4]}, |
| 74 | + {"topic":"x.y.z.t","partition":7,"replicas":[2,5]}, |
| 75 | + {"topic":"x.y.z.t","partition":8,"replicas":[3,6]}, |
| 76 | + {"topic":"x.y.z.t","partition":9,"replicas":[4,7]} |
| 77 | +]} |
| 78 | +``` |
| 79 | + |
| 80 | +(I did just re-format and sort the json output for sake of clarity). |
| 81 | + |
| 82 | +If you compare partition by partition, you can see a **lot** of changes in the partition assignment. |
| 83 | +When computing the diff manually, we could simply change the assignment of partition `1`, like |
| 84 | + |
| 85 | +``` |
| 86 | + {"topic":"x.y.z.t","partition":1,"replicas":[8,1]}, |
| 87 | +``` |
| 88 | + |
| 89 | +All the other moves are not required. |
| 90 | + |
| 91 | +Of course `kafka-reassign-partitions` is only proposing an example reassignment |
| 92 | +configuration and editing manually might appear easy, |
| 93 | +but when you're dealing with bigger topics with 40 or more partitions |
| 94 | +and you're under fire, you'd like to have a tool which is doing that for you properly |
| 95 | +without too many manual edits. |
| 96 | + |
| 97 | +LinkedIn open-sourced its [kafka-tools](https://github.com/linkedin/kafka-tools) |
| 98 | +which has really nice features for day to day operations, but lots of |
| 99 | +`random.shuffle(replicas)` are used internally, which might end-up in |
| 100 | +sub-optimal placements. The tool don't have rack awareness either. |
| 101 | + |
| 102 | + |
| 103 | +# Replica assignment as an optimization function |
| 104 | + |
| 105 | +If you think out of the box, replicas assignments looks like an |
| 106 | +[optimization function](https://en.wikipedia.org/wiki/Mathematical_optimization) |
| 107 | +under specific constraints. |
| 108 | + |
| 109 | +For instance, "no two replicas of the same partition assigned to the same broker" is one of |
| 110 | +these constraint. |
| 111 | + |
| 112 | +To minimize the move of replicas, the idea is to assign more weight (i.e. more value) |
| 113 | +to existing assignments, so that the linear optimization will try to preserve |
| 114 | +existing assignment (and in turn minimising the number of bytes moved across the brokers). |
| 115 | + |
| 116 | +Let's define a variable as a concatenation of broker id and partition id, such as |
| 117 | +`b9_p6`. This variable will be 1 if the partition 6 is assigned to the broker 9. |
| 118 | + |
| 119 | +The previous constraint, "no two replicas of the same partition assigned to the same broker", |
| 120 | +would be expressed as |
| 121 | + |
| 122 | + |
| 123 | + |
| 124 | +Now you got the trick, there are no limits on constraints to add. The current implementation |
| 125 | +includes for instance _leader preservation_, i.e. the preferred leader has more weight |
| 126 | +than the other partitions. |
| 127 | + |
| 128 | +[lp_solve]() is used in the background to solve the linear equation generated. |
| 129 | + |
| 130 | + |
| 131 | + |
| 132 | + |
| 133 | +## Example of equation |
| 134 | + |
| 135 | +Here is an example of equations generated based on the current assignment and the list of |
| 136 | +brokers. |
| 137 | + |
| 138 | +``` |
| 139 | +// Optimization function, based on current assignment |
| 140 | +max: 1 t1b12p5 + 4 t1b19p6_l + 2 t1b14p8 + 2 t1b11p2_l + 1 t1b21p2 ... |
| 141 | +
|
| 142 | +// Constrain on replication factor for every partition |
| 143 | +t1b1p0 + ... + t1b19p0_l + ... + t1b32p0_l = 2; |
| 144 | +t1b1p1 + ... + t1b19p1_l + ... + t1b32p1_l = 2; |
| 145 | +... |
| 146 | +
|
| 147 | +// Constraint on having one and only one leader per partition |
| 148 | +t1b1p0_l + ... + t1b19p0_l + ... + t1b32p0_l = 1; |
| 149 | +t1b1p1_l + ... + t1b19p1_l + ... + t1b32p1_l = 1; |
| 150 | +... |
| 151 | +
|
| 152 | +// Constraint on min/max replicas per broker |
| 153 | +t1b1p0 + t1b1p0_l + ... + t1b1p9 + t1b1p9_l <= 2; |
| 154 | +t1b1p0 + t1b1p0_l + ... + t1b1p9 + t1b1p9_l >= 1; |
| 155 | +... |
| 156 | +
|
| 157 | +// Constraint on min/max leaders per broker |
| 158 | +t1b1p0_l + t1b1p1_l + ... + t1b1p8_l + t1b1p9_l <= 1; |
| 159 | +t1b2p0_l + t1b2p1_l + ... + t1b2p8_l + t1b2p9_l >= 0; |
| 160 | +... |
| 161 | +
|
| 162 | +// Constraint on no leader and replicas on the same broker |
| 163 | +t1b1p0 + t1b1p0_l <= 1; |
| 164 | +t1b1p1 + t1b1p1_l <= 1; |
| 165 | +... |
| 166 | +
|
| 167 | +// Constrain on min/max total replicas per racks. tor02 here |
| 168 | +t1b2p0 + t1b2p0_l + ... t1b30p9 + t1b30p9_l <= 10; |
| 169 | +t1b2p0 + t1b2p0_l + ... t1b30p9 + t1b30p9_l >= 10; |
| 170 | +... |
| 171 | +
|
| 172 | +// Constrain on min/max replicas per partitions per racks. p0 on tor02 here |
| 173 | +t1b2p0 + t1b2p0_l + ... + t1b30p0 + t1b30p0_l <= 1; |
| 174 | +... |
| 175 | +
|
| 176 | +// All variables are binary |
| 177 | +bin |
| 178 | +t1b1p0, t1b1p0_l, ... , t1b32p9, t1b32p9_l; |
| 179 | +``` |
| 180 | + |
| 181 | +# Usage |
| 182 | + |
| 183 | +## Retrieve current assignment |
| 184 | + |
| 185 | +``` |
| 186 | +$ echo '{"topics": [{"topic": "public.tweets"},{"topic": "trumpet"}], "version":1}' > topics-to-move.json |
| 187 | +``` |
| 188 | + |
| 189 | +``` |
| 190 | +$ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper daplab-wn-22.fri.lan:2181 --generate --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3 |
| 191 | +Current partition replica assignment |
| 192 | +
|
| 193 | +{"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[1,3]},{"topic":"public.tweets","partition":5,"replicas":[0,2]},{"topic":"public.tweets","partition":0,"replicas":[3,0]},{"topic":"trumpet","partition":0,"replicas":[1,3,0]},{"topic":"public.tweets","partition":3,"replicas":[2,3]},{"topic":"public.tweets","partition":8,"replicas":[3,2]},{"topic":"public.tweets","partition":7,"replicas":[2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,1]},{"topic":"public.tweets","partition":2,"replicas":[1,2]},{"topic":"public.tweets","partition":9,"replicas":[0,3]},{"topic":"public.tweets","partition":4,"replicas":[3,1]}]} |
| 194 | +Proposed partition reassignment configuration |
| 195 | +
|
| 196 | +{"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[2,1]},{"topic":"public.tweets","partition":5,"replicas":[1,0]},{"topic":"public.tweets","partition":0,"replicas":[0,2]},{"topic":"trumpet","partition":0,"replicas":[1,0,2]},{"topic":"public.tweets","partition":3,"replicas":[3,1]},{"topic":"public.tweets","partition":8,"replicas":[0,1]},{"topic":"public.tweets","partition":1,"replicas":[1,3]},{"topic":"public.tweets","partition":7,"replicas":[3,2]},{"topic":"public.tweets","partition":2,"replicas":[2,0]},{"topic":"public.tweets","partition":9,"replicas":[1,2]},{"topic":"public.tweets","partition":4,"replicas":[0,3]}]} |
| 197 | +``` |
| 198 | + |
| 199 | +## Generate REST payload |
| 200 | + |
| 201 | +Copy the `Current partition replica assignment` part and past it in the |
| 202 | +`partitions` attribute in the `payload.json` file, i.e. something like: |
| 203 | + |
| 204 | +``` |
| 205 | +{ |
| 206 | + "brokers": "0:tor2,1:tor1,2:tor2,3:tor1", |
| 207 | + "partitions": {"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[1,3]},{"topic":"public.tweets","partition":5,"replicas":[0,2]},{"topic":"public.tweets","partition":0,"replicas":[3,0]},{"topic":"trumpet","partition":0,"replicas":[1,3,0]},{"topic":"public.tweets","partition":3,"replicas":[2,3]},{"topic":"public.tweets","partition":8,"replicas":[3,2]},{"topic":"public.tweets","partition":7,"replicas":[2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,1]},{"topic":"public.tweets","partition":2,"replicas":[1,2]},{"topic":"public.tweets","partition":9,"replicas":[0,3]},{"topic":"public.tweets","partition":4,"replicas":[3,1]}]} |
| 208 | +} |
| 209 | +``` |
| 210 | + |
| 211 | +# Call the REST API |
| 212 | + |
| 213 | +POST the previously generated payload: |
| 214 | + |
| 215 | +``` |
| 216 | +$ curl -X POST --data @payload.json http://localhost:4567/submit |
| 217 | +{"version":1,"partitions":[{"topic":"public.tweets","partition":4,"replicas":[3,2]},{"topic":"public.tweets","partition":5,"replicas":[0,1]},{"topic":"public.tweets","partition":6,"replicas":[1,0]},{"topic":"public.tweets","partition":7,"replicas":[2,1]}]} |
| 218 | +``` |
| 219 | + |
| 220 | +You can now copy the output and paste it into `reassignment-file.json` file and call |
| 221 | + |
| 222 | +``` |
| 223 | +kafka-reassign-partitions --zookeeper $ZK --reassignment-json-file reassignment-file.json -execute |
| 224 | +``` |
| 225 | + |
| 226 | +# No changes |
| 227 | + |
| 228 | +If no change, the API call answers: |
| 229 | + |
| 230 | +``` |
| 231 | +{"version":1,"partitions":[]} |
| 232 | +``` |
0 commit comments