
Commit e1c858d

Merge pull request scribd#65 from scribd/delta-streams: Add a blog post with some of the caveats and praise for Delta Lake

2 parents: d04026e + 8c87530

File tree

4 files changed: +182 -0 lines changed

_posts/2019-08-28-real-time-data-platform.md

Lines changed: 1 addition & 0 deletions
@@ -6,6 +6,7 @@ tags:
  - kafka
  - aws
  - data
+ - real-time
  team: Core Platform
  ---

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
---
layout: post
title: Streaming data in and out of Delta Lake
tags:
- databricks
- real-time
- kafka
- featured
team: Core Platform
author: rtyler
---

With [Delta Lake](https://delta.io) we don't have the hard lines between
streaming and batch data typically found in data platforms. Scribd
developers can treat data as real-time as they wish! Delta Lake enables some
workloads to treat data sets like traditional "batchy" data stores,
while other workloads work with _the same data_ as a streaming source or sink.
This immense flexibility allows our data engineers and scientists to mix and
match data quickly, providing the business with valuable results at
unprecedented speed.

At its core, Delta Lake combines [Apache Parquet](https://parquet.apache.org/) with a transaction log. This simple
foundation enables _incredible_ higher-level data-access behaviors from [Apache Spark](https://spark.apache.org), which powers the vast majority of our data platform at Scribd.
When we first considered building a
[Real-time Data Platform](/blog/2019/real-time-data-platform.html),
the storage layer was "to be determined". In hindsight, I cannot imagine how a
team of fewer than ten developers could have successfully delivered on the
"Real-time" vision in so short a time. Much of that success rests on adopting
Delta Lake, and in this post I would like to share some of the motivations,
valuable features, and, perhaps most importantly, the **caveats** to adopting Delta
Lake for streaming data needs.

## Beforetimes

Storage is the most foundational component of a data platform, and we were in
bad shape at the beginning of the effort. The original storage layer was built on top
of [HDFS](https://en.wikipedia.org/wiki/HDFS), which was a _very_ reasonable decision at the time. Unfortunately, as the years
went on, our use of HDFS did not keep up with the times. Technical debt accrued in many forms:

* Uncompressed data
* Multiple different file formats: depending on which era a partition of data was written in, it might be Parquet, ORC, RCFile, or plain text.
* [Small files](https://www.quora.com/What-is-the-small-file-problem-in-Hadoop?share=1): over 60% of the files in the cluster were considered "small files".

![HDFS is fine](/post-images/2020-06-delta-lake/this-is-fine.png)

The storage layer was failing to meet our _batch_ needs well before we had even
considered layering streaming data on top of it.

## Welcome to the future

[Delta Lake](https://delta.io) solved a **lot** of the problems we had, and
even a few we did not know we had yet! We adopted Delta Lake in line with our shift into the cloud, which I recently wrote about on the Databricks blog:
[Accelerating developers by ditching the data center](https://databricks.com/blog/2020/06/10/accelerating-developers-by-ditching-the-data-center.html).
Yet Delta Lake wasn't our first choice and didn't motivate our shift to AWS.
Our original prototype consisted of writing Parquet files to S3, where we
immediately noticed potential problems.

### Data Consistency

S3 is _eventually consistent_. If you create an object `bucket/foo.gz`, you can
retrieve `bucket/foo.gz` immediately, but other clients issuing list or
metadata commands may see `foo.gz` appear at different times. In a system where
one job is writing data into a bucket and another is reading data out of that
bucket, **consistency** becomes a major concern. Many organizations address this
by deploying
[S3Guard](https://hadoop.apache.org/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/s3guard.html),
which mitigates the problem. Delta Lake provides us with **ACID transactions**
that make the entire data consistency question moot.

> What I wrote to storage is exactly what the other job will read

### Streaming to Delta

Delta Lake makes building a streaming platform almost _trivial_ with two key
higher-level behaviors: streaming sinks and sources. Like practically any data
store, you can stream data _into_ a Delta table, though Delta's transactions
make this a much safer operation when deploying on eventually consistent data
stores like S3. Delta tables can, however, also act as a **source** for another streaming consumer.

In Scribd's deployment, this allows us to have some Spark Streaming jobs
consuming directly from [Apache Kafka](https://kafka.apache.org), while
other downstream streaming jobs consume _from Delta tables themselves_.
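
To make that sink/source pairing concrete, here is a minimal sketch of what such a pipeline can look like with Spark Structured Streaming. The broker, topic, bucket paths, and checkpoint locations are hypothetical placeholders rather than our production configuration:

```python
from pyspark.sql import SparkSession

# On a Databricks runtime the Delta and Kafka connectors are already available;
# elsewhere the delta-core and spark-sql-kafka packages need to be on the classpath.
spark = SparkSession.builder.appName("delta-streaming-sketch").getOrCreate()

# Stream events from Kafka and append them to a Delta table (Delta as a sink).
events = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
    .option("subscribe", "view-events")                # placeholder topic
    .load())

(events.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://example-bucket/checkpoints/raw-views")
    .outputMode("append")
    .start("s3://example-bucket/delta/raw-views"))

# A separate downstream job can then treat that same Delta table as a streaming source.
raw_views = (spark.readStream
    .format("delta")
    .load("s3://example-bucket/delta/raw-views"))
```
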
My [previous post](/blog/2020/shipping-rust-to-production.html) alluded to the "View Analytics" project, which relies heavily on Delta Lake's streaming support:

![View Analytics data pipeline](/post-images/2020-06-delta-lake/view-analytics.png)

By utilizing Delta tables as _streaming sources_ in the pipeline above, we
enable ad-hoc and other batch workloads to query data within the pipeline as if
it were just another table in the catalog. This is **key** for us because it
means many of our data consumers are _only_ interacting with Delta tables,
rather than having to pull data from Kafka topics and, separately, from batch
tables. Delta Lake allows the data consumers to treat them all as just tables,
although some are a little more real-time than others!
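
As an example, an ad-hoc or batch query can read the very same table the streaming jobs write to. A minimal sketch, reusing the placeholder path from the earlier snippet:

```python
# An ad-hoc batch read of the same Delta table the streaming job is appending to.
batch_views = spark.read.format("delta").load("s3://example-bucket/delta/raw-views")
print(batch_views.count())
```
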
#### Caveats

Delta tables within a streaming context do have some limitations which are
important to be aware of when designing a system. Many of our lessons learned
came from partnering with [Databricks](https://databricks.com) during the design
phase of the View Analytics project. Fortunately, they were able to identify
some ways in which our original designs would have failed before we ended up
building and deploying anything to production.

Some things to keep in mind:

* Multiple streams can _append_ to the same table concurrently, *but* if there
are any non-append writers (e.g. [merge writers](https://docs.delta.io/latest/delta-update.html)) then no other
writers should run concurrently with the non-append writer. There are some distinctions here depending on whether the jobs are running in a Databricks runtime or not, and whether those jobs are running in the same workspace. Generally speaking, it's best to only use append-only tables as streaming sources.
* When there are any non-append writers, an optimize cannot run externally. In essence, it should be executed inline in a streaming job when the merge writer is not running, i.e. periodically within a `foreachBatch` (see the sketch after this list). Locking features only available in the Databricks runtime may allow for concurrent upsert writers, but your mileage may vary!
* [Checkpoints](https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing) must be managed *carefully*. Each checkpoint location should belong exclusively to a single write stream. Restarts of the job must always use the same checkpoint location. Do not reference the same checkpoint location from multiple write streams, as they will overwrite each other's checkpoints (very bad).
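
As a sketch of how the first two points play out in practice, the upsert and the periodic optimize can live inside the same `foreachBatch` function so they never run concurrently with one another. The table paths, checkpoint location, and the every-hundred-batches cadence below are hypothetical placeholders, and `views_stream` stands in for a streaming DataFrame like the one read from Delta earlier:

```python
from delta.tables import DeltaTable

def upsert_batch(batch_df, batch_id):
    """Merge one micro-batch into the target table, compacting it occasionally."""
    target = DeltaTable.forPath(spark, "s3://example-bucket/delta/user-views")

    (target.alias("t")
        .merge(batch_df.alias("s"), "t.user_id = s.user_id AND t.view_id = s.view_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

    # Run OPTIMIZE inline, between merges, rather than from an external job.
    # (OPTIMIZE requires the Databricks runtime.)
    if batch_id % 100 == 0:
        spark.sql("OPTIMIZE delta.`s3://example-bucket/delta/user-views`")

(views_stream.writeStream
    .foreachBatch(upsert_batch)
    # This checkpoint location belongs to this write stream and nothing else.
    .option("checkpointLocation", "s3://example-bucket/checkpoints/user-views-merge")
    .start())
```
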

### Optimize

Building further upon the foundation laid by transactions, Delta Lake provides
an `OPTIMIZE` command, which helps prevent the small files problem entirely.

In the streaming context, it is highly unlikely that event data will come in
perfectly even-sized batches that can be written to storage. At a high level,
when the `OPTIMIZE` command is run it will:

* Create a new transaction
* Take a bunch of small files
* Combine them together into larger, well-sized, compressed files
* Write those to storage
* Complete the transaction

This can all be done _online_, which means whatever data is streaming into the table can continue to stream.

**NOTE:** Since `OPTIMIZE` doesn't delete those small files after it operates, periodic [vacuum commands](https://docs.delta.io/0.3.0/delta-utility.html#vacuum) are necessary to reduce storage bloat.
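
For append-only tables, both maintenance commands can run from a small scheduled job. A minimal sketch, assuming a Databricks runtime and the placeholder path used above:

```python
# Periodic maintenance for an append-only Delta table (placeholder path).
table_path = "s3://example-bucket/delta/raw-views"

# Compact small files into larger, well-sized ones.
spark.sql(f"OPTIMIZE delta.`{table_path}`")

# Clean up the now-unreferenced small files once they age past the retention window.
spark.sql(f"VACUUM delta.`{table_path}` RETAIN 168 HOURS")
```
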
### General caveats

Delta Lake is a great piece of technology for streams and batch workloads
alike. Regardless of how it is used, there are some general limitations to bear
in mind. Perhaps the most notable is that it can/should _only_ be accessed from
Apache Spark. This usually means that a Spark cluster must be launched in order
to read from or write to Delta tables, and the corresponding costs associated with
that should be understood. While there are
[efforts](https://github.com/reiseburo/delta.rs) to provide client APIs
external to Spark, nothing yet exists which is production-ready.

We are Databricks customers, which means we're actually using the proprietary
Delta libraries which are included in the Databricks Spark runtimes. In our
testing and development we have observed a number of important gaps between
what is available in the open source delta.io client libraries and what is
present in the Databricks version. These gaps have mostly affected our
ability to do fully local testing, dictating a certain amount of manual testing
for Spark streaming jobs launched in Databricks before they are deployed to
production.

There are a number of other caveats when using Delta that are important to be
aware of, and while I cannot cite every single one here, I will stress: read the
documentation **thoroughly**. There are a number of callouts in the docs which
highlight a constraint or behavior that will impact the design of streaming
systems you may build on Delta Lake. Very little of what we learned working
with Databricks was _not_ documented; in almost all cases we had misread,
misunderstood, or simply missed a salient detail in the documentation.

---

We started working on the real-time data platform in January, and our
first production streaming workloads were deployed by March. Adopting Delta
Lake allowed us to move quickly and deploy streaming systems at an incredible
pace. For a company that has a long history of batch workloads, the sudden
arrival of streaming data has been transformative. Rather than waiting 24-48
hours for data in some cases, data consumers are able to access newly written
data within _seconds_.