Commit 0b0d716

Author: Andy Chambers
Add generating-test-data post
1 parent 2141c07

1 file changed: +205 −0 lines changed

---
layout: post
title: Generating Test Data
---

In [A Test Helper for JDBC Sinks](/test-machine-test-jdbc-sink/) one part of the testing process that I glossed over a bit was the line "Generate some example records to load into the input topic". I said this like it was no big deal, but actually there are a few moving parts that all need to come together for this to work. It's something I struggled to get to grips with at the beginning of our journey, and I have seen other experienced engineers struggle with it too. Part of the problem, I think, is that a lot of the Kafka ecosystem is made up of folks using statically typed languages like Scala and Kotlin. It does all work with dynamically typed languages like Clojure, but there are just fewer of us around, which makes it all the more important to share what we learn. So here's a quick guide to generating test-data and getting it into Kafka using the test-machine from Jackdaw.

## Basic Data Generator

You may recall the fields enumerated in the whitelist from the example sink config. They were as follows:

* customer-id
* current-balance
* updated-at

So a nice easy first step is to write a function to generate a map with these fields.

{% highlight clojure %}
(ns io.grumpybank.generators
  (:require
   [java-time :as t]))

(defn gen-customer-balance
  []
  {:customer-id (str (java.util.UUID/randomUUID))
   :current-balance (rand-int 1000)
   :updated-at (t/to-millis-from-epoch (t/instant))})
{% endhighlight %}
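
Calling this at the REPL gives you a map shaped like the one below (the actual values are random, so yours will differ):

{% highlight clojure %}
(gen-customer-balance)
;; => {:customer-id "0cd4482f-1574-4989-a3a7-4a3d79f51ff1"
;;     :current-balance 742
;;     :updated-at 1565347200000}
{% endhighlight %}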

## Schema Definition

However this is not enough on its own. The target database has a schema which is only implicit in the function above. The JDBC sink connector will create and evolve the schema for us if we allow it, but in order to do that, we need to write the data using the Avro serialization format. Here is Jay Kreps from Confluent [making the case for Avro](https://www.confluent.io/blog/avro-kafka-data/), and much of the Confluent tooling leverages various aspects of this particular serialization format, so it's a good default choice unless you have a good reason to choose otherwise.

So let's assume the app that produces the customer-balances topic has already defined an Avro schema. The thing we're trying to test is a consumer of that topic, but as a tester we have to wear the producer hat for a while, so we take a copy of the schema from the upstream app and make it available to our connector test.

{% highlight JSON %}
{
  "type": "record",
  "name": "CustomerBalance",
  "namespace": "io.grumpybank.tables.CustomerBalance",
  "fields": [
    {
      "name": "customer_id",
      "type": "string"
    },
    {
      "name": "updated_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "current_balance",
      "type": ["null", "long"],
      "default": null
    }
  ]
}
{% endhighlight %}
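
As a quick sanity check (entirely optional), you can parse the copied schema with the Avro library that Jackdaw already depends on and list its field names. The path here is just an assumption based on where the schema files live later in this post:

{% highlight clojure %}
;; Parse the copied schema to confirm it is valid Avro and has the
;; expected fields. The file path is an assumption for this example.
(let [schema (.parse (org.apache.avro.Schema$Parser.)
                     (slurp "./test/resources/schemas/customer-balances.json"))]
  (map #(.name %) (.getFields schema)))
;; => ("customer_id" "updated_at" "current_balance")
{% endhighlight %}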

We can use the schema above to create an Avro [Serde](https://www.apache.org/dist/kafka/2.3.0/javadoc/org/apache/kafka/common/serialization/Serde.html). Serde is just the name given to the composition of the Serialization and Deserialization operations. Since one is the opposite of the other, it has become a strong convention that they are defined together, and the Serde interface captures that convention.

The Serde will be used by the KafkaProducer to serialize the message value into a ByteArray before sending it off to the broker to be appended to the specified topic and replicated as per the topic settings. Here's a helper function for creating the Serde for a schema represented as JSON in a file, using Jackdaw.

{% highlight clojure %}
(ns io.grumpybank.avro-helpers
  (:require
   [jackdaw.serdes.avro :as avro]
   [jackdaw.serdes.avro.schema-registry :as reg]))

(def schema-registry-url "http://localhost:8081")
(def schema-registry-client (reg/client schema-registry-url 32))

(defn value-serde
  [filename]
  (avro/serde {:avro.schema-registry/client schema-registry-client
               :avro.schema-registry/url schema-registry-url}
              {:avro/schema (slurp filename)
               :key? false}))
{% endhighlight %}
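
To get a feel for what the Serde gives us, here's a minimal round-trip sketch. It assumes a Schema Registry is actually running at the URL above and that the schema file exists at the path shown; `balance-serde` is just a name made up for this example:

{% highlight clojure %}
;; Round-trip sketch: serialize a generated map to bytes and read it back.
;; Assumes a Schema Registry is reachable at schema-registry-url and that
;; the schema file below exists.
(def balance-serde
  (value-serde "./test/resources/schemas/customer-balances.json"))

(let [ser   (.serializer balance-serde)
      de    (.deserializer balance-serde)
      value (io.grumpybank.generators/gen-customer-balance)
      bytes (.serialize ser "customer-balances" value)]
  (.deserialize de "customer-balances" bytes))
;; => a Clojure map equivalent to the original value
{% endhighlight %}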

The Avro Serdes in Jackdaw ultimately use the KafkaAvroSerializer/KafkaAvroDeserializer, which share schemas via the Confluent Schema Registry and optionally check for various levels of compatibility. The Schema Registry is yet another topic worthy of its own blog-post, but fortunately Gwen Shapira has already [written it](https://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/). The Jackdaw Avro serdes convert Clojure data structures like the one output by `gen-customer-balance` into an [Avro GenericRecord](https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/generic/GenericRecord.html). I'll get into more gory detail about this some other time, but for now let's move quickly along and discuss the concept of "Topic Metadata".

## Topic Metadata

In Jackdaw, the convention adopted for associating Serdes with topics is known as "Topic Metadata". This is just a Clojure map, so you can put all kinds of information in there if it helps fulfill some requirement. Here are a few bits of metadata that Jackdaw will act upon:

### When creating a topic
* `:topic-name`
* `:replication-factor`
* `:partition-count`

### When serializing a message
* `:key-serde`
* `:value-serde`
* `:key-fn`
* `:partition-fn`

{% highlight clojure %}
(ns io.grumpybank.connectors.test-helpers
  (:require
   [jackdaw.serdes :as serde]
   [io.grumpybank.avro-helpers :as avro]))

(defn topic-config
  [topic-name]
  {:topic-name topic-name
   :replication-factor 1
   :key-serde (serde/string-serde)
   :value-serde (avro/value-serde (str "./test/resources/schemas/"
                                       topic-name
                                       ".json"))})
{% endhighlight %}
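
For the topic used in this post, that gives us a metadata map along these lines (the serde values are Java objects, elided here for readability):

{% highlight clojure %}
(topic-config "customer-balances")
;; => {:topic-name "customer-balances"
;;     :replication-factor 1
;;     :key-serde #object[...StringSerde...]
;;     :value-serde #object[...avro Serde...]}
{% endhighlight %}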

## Revisit the helper

Armed with all this new information, we can revisit the helper defined in the previous post and understand a bit more clearly what's going on and how it all ties together. For illustrative purposes, we've explicitly defined a few variables that were a bit obscured in the original example.

{% highlight clojure %}
(def kconfig {"bootstrap.servers" "localhost:9092"})
(def topics {:customer-balances (topic-config "customer-balances")})
(def seed-data (repeatedly 5 gen-customer-balance))
(def topic-id :customer-balances)
(def key-fn :customer-id)

(fix/with-fixtures [(fix/topic-fixture kconfig topics)]
  (jdt/with-test-machine (jdt/kafka-transport kconfig topics)
    (fn [machine]
      (jdt/run-test machine (concat
                             (->> seed-data
                                  (map (fn [record]
                                         [:write! topic-id record {:key-fn key-fn}])))
                             [[:watch watch-fn {:timeout 5000}]])))))
{% endhighlight %}

The vars `kconfig` and `topics` are used by both the `topic-fixture` (to create the required topic before starting to write test-data to it) and the `kafka-transport`, which teaches the test-machine how to read and write data from the listed topics. In fact the test-machine will start reading data from all listed topics straight away, even before it is instructed to write anything.

Finally we write the test-data to Kafka by supplying a list of commands to the `run-test` function. The `:write!` command takes a topic-identifier (one of the keys in the topics map), the message value, and a map of options, in this case specifying that the message key can be derived from the message by invoking `(:customer-id record)`. We could also specify things like the `:partition-fn`, `:timestamp` etc. When the command is executed by the test-machine, it looks up the topic-metadata for the specified identifier and uses it to build a ProducerRecord and send it off to the broker.
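
For reference, a fully spelled-out `:write!` command with some of those optional keys might look like the following. The literal values are made up for illustration, and only `:key-fn` is actually used in the test above:

{% highlight clojure %}
;; A single :write! command with explicit options. The values are invented
;; for illustration purposes.
[:write! :customer-balances
 {:customer-id "9a1b2c3d-0000-4e5f-8888-123456789abc"
  :current-balance 250
  :updated-at 1565347200000}
 {:key-fn :customer-id
  :timestamp 1565347200000}]
{% endhighlight %}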

Next up will be a deep-dive into the test-machine journal and the watch command.
