Refactor @xianwill's post a bit with some copy edits and tightening.
I removed some of the more casual tone but tried to keep @xianwill shining
through. Also put a little bit easier of an onramp into the topic at the front
:smiling_imp:

Streaming data from Apache Kafka into Delta Lake is an integral part of
Scribd's data platform, but it has been challenging to manage and
scale. We use Spark Structured Streaming jobs to read data from
Kafka topics and write that data into [Delta Lake](https://delta.io) tables. This approach gets the job
done, but in production our experience has convinced us that a different
approach is necessary to efficiently bring data from Kafka to Delta Lake.

The user requirements are likely relatable to a lot of folks:

* _My application emits data into Kafka that I want to analyze later._
* _I want my Kafka data to land in the data warehouse and be queryable pretty soon after ingestion._

Looking around the internet, there are a few approaches people blog about,
but many either cost too much, are really complicated to set up and maintain,
or both. Our first Spark-based attempt at solving this problem falls under
"both."

Spark Structured Streaming is a powerful streaming framework that can easily
satisfy the requirements described above with a few lines of code (about 70 in
our case), but the cost profile is pretty high. Despite the relative simplicity
of the code, the cluster resources necessary are significant. Many of our
variable-throughput Kafka topics leave us wishing for auto-scaling too.

[Kafka Delta Ingest](https://github.com/delta-io/kafka-delta-ingest) is an open
source daemon created by Scribd in the [Delta Lake project](https://delta.io)
with the very specific goal of optimizing the path from Kafka to Delta Lake. By
focusing on this very specific use case, we can remove many of the pain points
we currently experience with our Spark streaming jobs. The daemon is written in
[Rust](https://rust-lang.org), which has helped us keep the runtime super
efficient. It is also fully distributed with no coordination between workers,
meaning no driver node hanging out and a smaller overall infrastructure
footprint.

## In depth

There is a bit of an impedance mismatch between Kafka streams and data warehouse
file structure. [Parquet is a columnar
format](https://parquet.apache.org/documentation/latest/), and each Parquet
file (in fact each row group within a file) in a Delta Lake table should
include a lot of rows to enable queries to leverage all the neat optimization
features of Parquet and run as fast as possible. Messages consumed from a Kafka
topic come in one at a time, though. To bridge this mismatch, Kafka Delta Ingest
spends most of its time buffering messages in memory. It checks a few process
arguments to make the largest possible Parquet files. Those arguments are:

* allowed_latency - the latency allowed between each Delta write
* max_messages_per_batch - the maximum number of messages/records to include in each Parquet row group within a file

Internally, our Kafka usage guidelines include these constraints:

* Messages written to Kafka
  * Must be JSON
  * Must include an ISO 8601 timestamp representing when the message was ingested/created (field name is flexible, but this timestamp must be included somewhere in the message)

* Records written to Delta Lake

Other potential users of Kafka Delta Ingest may have different guidelines on how
they use Kafka, but given the constraints above, what we need from Kafka Delta
Ingest is:

* JSON formatted messages
* Buffer flush triggers that thread the needle between query performance and persistence latency
* Very basic message transformations to limit the message schema constraints we push up to our producer applications

### Example

Let's say we have an application that writes messages onto a Kafka topic called
`web_requests` every time it handles an HTTP request. The message schema
written by the producer application includes fields such as:

* `status`: 200, 404, 500, 302, etc.
* `method`: `GET`, `POST`, etc.
* `url`: Requested URL, e.g. `/documents/42`, etc.
* `meta.producer.timestamp`: an ISO-8601 timestamp representing the time the producer wrote the message.
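
As a concrete illustration (the field values here are hypothetical, not a real request), a single message on `web_requests` might look like:

```json
{
  "status": 200,
  "method": "GET",
  "url": "/documents/42",
  "meta": {
    "producer": {
      "timestamp": "2021-03-24T15:06:17Z"
    }
  }
}
```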

Many of our tables are partitioned by a field called `date` which
has a `yyyy-MM-dd` format. We choose not to force our producer application to
provide this field explicitly. Instead, we will configure our Kafka Delta
Ingest stream to perform a transformation of the `meta.producer.timestamp`
field that the producer already intends to send.

To accomplish this with Kafka Delta Ingest, using the "web_requests" stream as an example, we would:

1. Create the "web_requests" topic.
2. Create the schema for our Delta Lake table ("kafka_delta_ingest.web_requests").
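
The actual `CREATE` statement isn't reproduced in this excerpt; as a rough sketch (column types, nesting, and the S3 location are assumptions, not our production definition), the table definition could look something like:

```sql
-- Sketch only: Delta table partitioned by the derived `date` column.
CREATE TABLE kafka_delta_ingest.web_requests (
  status INT,
  method STRING,
  url    STRING,
  meta   STRUCT<
    producer: STRUCT<timestamp: STRING>,
    kafka: STRUCT<topic: STRING, partition: INT, offset: BIGINT, timestamp: TIMESTAMP>
  >,
  date   STRING
)
USING DELTA
PARTITIONED BY (date)
LOCATION 's3://path_to_web_requests_delta_table';
```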

The Delta Lake schema we create includes more fields than the producer
actually sends. Fields not written by the producer include the `meta.kafka`
struct and the `date` field.

3. Launch one or more kafka-delta-ingest workers to handle the topic-to-table pipeline:
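
The full command isn't shown in this excerpt, but a minimal sketch of a worker launch, assuming the flags documented in the readme (`ingest` subcommand with positional topic and table URI, `-l`/`--allowed_latency`, `-K` for librdkafka settings, `-t`/`--transform`), might look like this; the transform expressions and paths are illustrative:

```bash
# Sketch only: read from the web_requests topic, write to the Delta table,
# and buffer up to 60 seconds of messages before each write.
kafka-delta-ingest ingest web_requests s3://path_to_web_requests_delta_table \
  -l 60 \
  -K "auto.offset.reset=earliest" \
  -t 'date: substr(meta.producer.timestamp, `0`, `10`)' \
     'meta.kafka.offset: kafka.offset' \
     'meta.kafka.partition: kafka.partition'
```

Each `-t` value either applies a JMESPath-style expression to a field already present in the message or pulls in a well-known Kafka metadata field (partition, offset, topic, timestamp), which is how the `date` and `meta.kafka` columns get populated without pushing that burden onto the producer.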

The parameters passed to the daemon configure the allowed latency, some primitive data augmentation, the source topic, and the destination Delta table. For more detailed documentation, consult the [readme](https://github.com/delta-io/kafka-delta-ingest#readme).

Internally, Kafka Delta Ingest relies on Kafka consumer groups to coordinate
partition assignment across many workers handling the same topic. If we
want to scale out the number of workers handling "web_requests" we can just
launch more ECS tasks with the same configuration and respond to Kafka's
rebalance events.

We have one Kafka Delta Ingest ECS service per topic-to-table ETL workload. Each service runs 24x7. We expect high volume topics to require more worker nodes and scale out and in occasionally, and low volume topics to require a single worker (more on that later).
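
For example (the cluster and service names here are hypothetical), scaling a hot topic out for a traffic peak can be as simple as bumping the service's desired count and letting the consumer group rebalance:

```bash
# Add a worker to the web_requests ingest service; Kafka's consumer group
# rebalance redistributes the topic partitions across the new set of workers.
aws ecs update-service \
  --cluster data-pipeline \
  --service kafka-delta-ingest-web-requests \
  --desired-count 4
```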

### 💙

My favorite thing about Kafka Delta Ingest is its very narrow scope to optimize
and replace a _very_ common use case that you _could_ support with Spark
Structured Streaming, but much less efficiently. Basically, I love that we are
creating a very specific tool for a very common need.

Compare/contrast of Kafka Delta Ingest vs Spark Structured Streaming:

* Kafka Delta Ingest is an application that makes strong assumptions about the source and sink and is only configurable via command line arguments, whereas Spark Structured Streaming is a library that you must write and compile code against to yield a jar that can then be hosted as a job.
* Kafka Delta Ingest is fully distributed and master-less - there is no "driver" node. Nodes can be spun up on a platform like ECS with little thought to coordination or special platform dependencies. A Spark Structured Streaming job must be launched on a platform like Databricks or EMR capable of running a Spark cluster.

## Get Involved!

Contributions to Kafka Delta Ingest are very welcome and encouraged. Our core team has been focused on supporting our internal use case so far, but we would love to see Kafka Delta Ingest grow into a more well rounded solution. We have not been using the [GitHub issue list](https://github.com/delta-io/kafka-delta-ingest/issues) for managing work just yet since we are mostly managing work internally until we have our primary workloads fully covered, but we will be paying much more attention to this channel in the very near future.

One especially interesting area for contribution is related to data format. A
lot of folks are using Avro and Protobuf instead of JSON these days. We use
JSON on all of our ingestion streams at the moment, but I'd love to see Avro
and Protobuf support in Kafka Delta Ingest.

Another big contribution would be support for running periodically
rather than continuously (24x7). I suspect a lot of folks have situations
where Kafka is used as a buffer between data warehouse writes that
occur periodically throughout the day. We have several low-volume topics that
are not a good fit for 24x7 streaming because they only produce one or two
messages per second. Having a 24x7 process buffer these topics in memory would
be very awkward. It would make a lot more sense to let these buffer in Kafka
and launch a periodic cron-style job to do the ETL a few times a day. This is
similar to the "Trigger Once" capability in [Spark Structured
Streaming](https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html).

Another vector for contribution is [delta-rs](https://github.com/delta-io/delta-rs), another Scribd-sponsored open source project and a key dependency of kafka-delta-ingest. Any write-oriented improvement accepted in delta-rs is very likely to benefit kafka-delta-ingest.