---
layout: post
title: Reporting on Kafka Connect Jobs
---

At the risk of diluting the brand message (i.e. testing Kafka stuff
using Clojure), in this post I'm going to introduce some code for
extracting a report on the status of Kafka Connect jobs. I'd argue
it's still "on-message", falling as it does under the
observability/metrics umbrella, and since observability is an integral
part of [testing in
production](https://medium.com/@copyconstruct/testing-in-production-the-safe-way-18ca102d0ef1),
I think we're on safe ground.

I know I promised a deep dive on the test-machine journal, but it's
been a crazy week and I needed to self-soothe by writing about
something simpler that was mostly ready to go.

## Kafka Connect API

The distributed version of Kafka Connect provides an HTTP API for
managing jobs and providing access to their configuration and current
status, including any errors that have caused a job to stop
working. It also provides metrics over JMX, but that requires

 1. Server configuration that is not enabled by default
 2. Access to a port which is often only exposed inside the production
    stack and is intended to be queried by a "proper" monitoring
    system

This is not to say that you shouldn't go ahead and set up proper
monitoring. You definitely should. But you needn't let its absence
prevent you from quickly getting an idea of the overall health of your
Kafka Connect system.

For this script we'll be hitting two of the endpoints provided by
Kafka Connect:

## GET /connectors

Here's the function that hits the `/connectors` endpoint. It uses Zach
Tellman's [aleph](https://github.com/ztellman/aleph) and
[manifold](https://github.com/ztellman/manifold) libraries. The
`http/get` function returns a deferred, which allows the API call to
be handled asynchronously by setting up a "chain" of operations to
deal with the response when it arrives.

{% highlight clojure %}

(ns grumpybank.observability.kc
  (:require
   [aleph.http :as http]
   [manifold.deferred :as d]
   [clojure.data.json :as json]
   [byte-streams :as bs]))

(defn connectors
  [connect-url]
  (d/chain (http/get (format "%s/connectors" connect-url))
           #(update % :body bs/to-string)
           #(update % :body json/read-str)
           #(:body %)))

{% endhighlight %}

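To try it at the REPL, deref the deferred, which blocks until the
response arrives. This is just a sketch: the worker address and the
connector names in the comment are invented for illustration.

```clojure
;; Assumes a Kafka Connect worker listening on localhost:8083 (the
;; default REST port); substitute your own URL.
@(connectors "http://localhost:8083")
;; yields a vector of connector names,
;; e.g. ["orders-sink" "users-source"]
```
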
## GET /connectors/:connector-id/status

Here's the function that hits the `/connectors/:connector-id/status`
endpoint. Again, we invoke the API endpoint and set up a chain to deal
with the response by first converting the raw bytes to a string and
then reading the JSON string into a Clojure map. Just the same as
before.

{% highlight clojure %}
(defn connector-status
  [connect-url connector]
  (d/chain (http/get (format "%s/connectors/%s/status"
                             connect-url
                             connector))
           #(update % :body bs/to-string)
           #(update % :body json/read-str)
           #(:body %)))
{% endhighlight %}

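It helps to know roughly what that map looks like, since the report
code picks it apart by key. Here's a hand-written example of a parsed
status (the connector name and worker ids are invented): the
connector's own state lives under `"connector"`, while each task
reports its own state and, when it has failed, a `"trace"` key holding
the stacktrace.

```clojure
;; An illustrative parsed status map, not real output.
(def example-status
  {"name" "orders-sink"
   "connector" {"state" "RUNNING" "worker_id" "10.0.0.1:8083"}
   "tasks" [{"id" 0 "state" "RUNNING" "worker_id" "10.0.0.1:8083"}
            {"id" 1 "state" "FAILED" "worker_id" "10.0.0.2:8083"
             "trace" "org.apache.kafka.connect.errors.ConnectException: ..."}]})

;; Pull out the per-task states.
(map #(get % "state") (get example-status "tasks"))
;; => ("RUNNING" "FAILED")
```
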
## Generating a report

Depending on how big your Kafka Connect installation becomes and how
you deploy connectors, you might easily end up with hundreds of
connectors returned by the request above. Submitting a request to the
status endpoint for each one in serial would take quite a while. On
the other hand, the server on the other side is capable of handling
many requests in parallel. This is especially true if there are a few
Kafka Connect nodes co-operating behind a load-balancer.

This is why it is advantageous to use aleph here for the HTTP requests
instead of the more commonly used clj-http. Once we have our list of
connectors, we can fire off simultaneous requests for the status of
each connector and collect the results asynchronously.

{% highlight clojure %}
(defn connector-report
  [connect-url]
  (let [task-failed? #(= "FAILED" (get % "state"))
        task-running? #(= "RUNNING" (get % "state"))
        task-paused? #(= "PAUSED" (get % "state"))
        ;; the connector's own state is reported under "connector",
        ;; separately from the states of its tasks
        failed? #(= "FAILED" (get-in % ["connector" "state"]))
        ;; failed tasks carry a "trace" key with the stacktrace
        traces #(->> (get % "tasks")
                     (keep (fn [task] (get task "trace"))))]
    (d/chain (connectors connect-url)
             #(apply d/zip (map (partial connector-status connect-url) %))
             #(map (fn [s]
                     {:connector (get s "name")
                      :failed? (failed? s)
                      :total-tasks (count (get s "tasks"))
                      :failed-tasks (->> (get s "tasks")
                                         (filter task-failed?)
                                         count)
                      :running-tasks (->> (get s "tasks")
                                          (filter task-running?)
                                          count)
                      :paused-tasks (->> (get s "tasks")
                                         (filter task-paused?)
                                         count)
                      :trace (when (failed? s)
                               (traces s))})
                   %))))
{% endhighlight %}

Here we first define a few helper predicates (`task-failed?`,
`task-running?`, and `task-paused?`) for classifying the statuses
eventually returned by `connector-status`. Then we kick off the
asynchronous pipeline by requesting a list of connectors using
`connectors`.

The first operation on the chain applies the result to `d/zip` which,
as described above, will invoke the status API calls concurrently and
return a vector with all the responses once they are all complete.

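If `d/zip` is unfamiliar, its behaviour is easy to see in isolation;
this minimal sketch uses deferreds that are already realised rather
than live HTTP calls.

```clojure
(require '[manifold.deferred :as d])

;; d/zip combines several deferreds into a single deferred that
;; completes once every input has; derefing it yields all of their
;; values in the original order.
@(d/zip (d/success-deferred 1) (d/success-deferred 2))
;; yields the two values, in order
```
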
Then we simply map the results over an anonymous function which builds
a map with the connector id together with whether it has failed, how
many of its tasks are in each state, and, when the connector *has*
failed, the stacktrace provided by the status endpoint.

If you have a huge number of connect jobs you might need to split the
initial list into smaller batches and submit the requests batch by
batch. This can easily be done using Clojure's built-in
`partition-all` function (unlike `partition`, it won't silently drop a
final incomplete batch), but I didn't find this to be necessary on our
fairly large collection of Kafka Connect jobs.

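For completeness, here's a hypothetical sketch of what that batching
might look like; `batched-statuses` is a name invented for this post,
not part of the script above.

```clojure
;; Hypothetical: issue status requests at most batch-size at a time,
;; realising each batch of responses before starting the next.
(defn batched-statuses
  [connect-url batch-size]
  (d/chain (connectors connect-url)
           (fn [conns]
             (->> (partition-all batch-size conns)
                  (mapcat (fn [batch]
                            @(apply d/zip
                                    (map (partial connector-status
                                                  connect-url)
                                         batch))))
                  vec))))
```

The resulting deferred yields the same flat list of status maps that
the report-building code consumes.
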
Here's a [gist](https://gist.github.com/cddr/da5215ed83653872bee3febdbb435e65)
that wraps these functions up into a quick and dirty command line
script that reports the results to STDOUT. Feel free to re-use,
refactor, and integrate it with your own tooling, and run it after
making any changes to your deployed Kafka Connect configuration to
make sure everything remains hunky-dory.