Commit e42b8be

Detail integration with Datadog and Databricks based off internal documentation

I'm ghost-writing this for @houqp since he already did the majority of the work on our internal documentation 😸

---
layout: post
title: "Integrating Databricks jobs with Datadog"
author: qphou
tags:
- featured
- databricks
- datadog
- datapipe
team: Core Platform
---

Batch and streaming Spark jobs are an integral part of our data platform and,
like our other production applications, they need
[Datadog](https://datadoghq.com) instrumentation. We rely on
[Databricks](https://databricks.com/customers/scribd) to power those Spark
workloads, but integrating Datadog and Databricks wasn't turn-key. In this
post, I'll share the two code snippets necessary to enable this integration: a
custom cluster init script and a special class to load into the Spark job.

Rather than relying on the Spark UI in Databricks, piping these metrics into
Datadog allows us to build extremely useful dashboards and, more importantly,
**monitors** for our Spark workloads that can tie into our alerting
infrastructure.

## Configuring the Databricks cluster

When creating the cluster in Databricks, we use the following init-script-based
configuration to set up the Datadog agent. It is likely also possible to set
this up via [customized containers with Databricks Container
Services](https://docs.databricks.com/clusters/custom-containers.html), but the
`databricks` runtime images aren't updated frequently enough for our purposes.

* Add the cluster init script below to set up the Datadog agent
* Set the following environment variables for the cluster:
  * `ENVIRONMENT=development/staging/production`
  * `APP_NAME=your_spark_app_name`
  * `DATADOG_API_KEY=KEY`

All your Datadog metrics will be automatically tagged with `env` and
`spark_app` tags.

```bash
#!/bin/bash
# reference: https://docs.databricks.com/clusters/clusters-manage.html#monitor-performance
#
# This init script takes the following environment variables as input:
# * DATADOG_API_KEY
# * ENVIRONMENT
# * APP_NAME

echo "Setting up metrics for spark application: ${APP_NAME}"
echo "Running on the driver? $DB_IS_DRIVER"
echo "Driver ip: $DB_DRIVER_IP"

if [[ $DB_IS_DRIVER = "TRUE" ]]; then
  cat << EOF >> /home/ubuntu/databricks/spark/conf/metrics.properties
*.sink.statsd.host=${DB_DRIVER_IP}
EOF

  DD_INSTALL_ONLY=true \
  DD_AGENT_MAJOR_VERSION=7 \
  DD_API_KEY=${DATADOG_API_KEY} \
  DD_HOST_TAGS="[\"env:${ENVIRONMENT}\", \"spark_app:${APP_NAME}\"]" \
  bash -c "$(curl -L https://raw.githubusercontent.com/DataDog/datadog-agent/7.20.0-rc.10/cmd/agent/install_script.sh)"

  cat << EOF >> /etc/datadog-agent/datadog.yaml
use_dogstatsd: true
# bind on all interfaces so it's accessible from executors
bind_host: 0.0.0.0
dogstatsd_non_local_traffic: true
dogstatsd_stats_enable: false
logs_enabled: false
cloud_provider_metadata:
  - "aws"
EOF

  # NOTE: set this to true to debug dogstatsd metrics
  echo "dogstatsd_metrics_stats_enable: false" >> /etc/datadog-agent/datadog.yaml

  sudo service datadog-agent start
fi
```
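
For reference, here's a sketch of how the init script and environment variables
might be wired together at cluster-creation time with the Databricks [Clusters
API](https://docs.databricks.com/dev-tools/api/latest/clusters.html). The DBFS
path, instance types, and runtime version below are placeholders, and in
practice the API key should come from a secrets store rather than being
inlined:

```bash
# assumes the init script above was uploaded to DBFS first, e.g.:
#   dbfs cp datadog-install.sh dbfs:/databricks/init/datadog-install.sh
curl -X POST "https://<databricks-instance>/api/2.0/clusters/create" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
  -d '{
    "cluster_name": "datadog-instrumented-cluster",
    "spark_version": "6.4.x-scala2.11",
    "node_type_id": "i3.xlarge",
    "num_workers": 2,
    "init_scripts": [
      {"dbfs": {"destination": "dbfs:/databricks/init/datadog-install.sh"}}
    ],
    "spark_env_vars": {
      "ENVIRONMENT": "production",
      "APP_NAME": "my_spark_app",
      "DATADOG_API_KEY": "<your-api-key>"
    }
  }'
```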
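
Before wiring up the application itself, it's worth sanity-checking that the
agent is listening. DogStatsD speaks a plain-text protocol over UDP port 8125,
so a one-off packet from a shell on the driver is enough to confirm
connectivity (the metric name here is arbitrary):

```bash
# bash's /dev/udp pseudo-device sends the payload as a single UDP datagram;
# the counter should appear in Datadog's metrics explorer shortly afterwards
echo -n "spark.smoke_test:1|c|#spark_app:my_spark_app" > /dev/udp/${DB_DRIVER_IP}/8125
```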

Once the cluster has been launched with the appropriate Datadog agent support,
we must then integrate a StatsD client into the Spark app itself.

### Instrumenting Spark

Integrating StatsD in Spark is _very_ simple, but for consistency we use a
variant of the `Datadog` class listed below. Additionally, for Structured
Streaming applications, the `Datadog` class comes with a helper method that you
can use to forward all the streaming progress metrics into Datadog:

```scala
datadog.collectStreamsMetrics()
```

By invoking this method, all streaming progress metrics will be tagged with
`spark_app` and `query_name` tags. We use these streaming metrics to understand
stream lag, issues with our batch sizes, and a number of other actionable
metrics.
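
For example, a hypothetical Datadog monitor that alerts when a stream stops
processing rows might query these metrics like so (the app name and threshold
are illustrative):

```
avg(last_10m):avg:spark.streaming.input_rows_per_sec{spark_app:my_spark_app} < 1
```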

And that’s it for the application setup!

```scala
import com.timgroup.statsd.{NonBlockingStatsDClientBuilder, StatsDClient}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener

import scala.collection.JavaConverters._

/** Datadog class for automating Databricks <> Datadog integration.
  *
  * NOTE: this class relies on the Datadog agent being installed and configured
  * properly on the driver node.
  *
  * == Example ==
  * implicit val spark = SparkSession.builder().getOrCreate()
  * val datadog = new Datadog(AppName)
  * // automatically forward spark streaming metrics to datadog
  * datadog.collectStreamsMetrics()
  *
  * // you can use `datadog.statsdcli()` to create statsd clients from both
  * // driver and executors to emit custom metrics
  * val statsd = datadog.statsdcli()
  * statsd.count(s"${AppName}.foo_counter", 100)
  */
class Datadog(val appName: String)(implicit spark: SparkSession) extends Serializable {
  val driverHost: String = spark.sparkContext.getConf
    .getOption("spark.driver.host")
    .orElse(sys.env.get("SPARK_LOCAL_IP"))
    .get

  def statsdcli(): StatsDClient = {
    new NonBlockingStatsDClientBuilder()
      .prefix("spark")
      .hostname(driverHost)
      .build()
  }

  val metricsTag = s"spark_app:$appName"

  def collectStreamsMetrics(): Unit = {
    spark.streams.addListener(new StreamingQueryListener() {
      val statsd: StatsDClient = statsdcli()

      override def onQueryStarted(queryStarted: StreamingQueryListener.QueryStartedEvent): Unit = {}

      override def onQueryTerminated(queryTerminated: StreamingQueryListener.QueryTerminatedEvent): Unit = {}

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val progress = event.progress
        val queryNameTag = s"query_name:${progress.name}"
        statsd.gauge("streaming.batch_id", progress.batchId, metricsTag, queryNameTag)
        statsd.count("streaming.input_rows", progress.numInputRows, metricsTag, queryNameTag)
        statsd.gauge("streaming.input_rows_per_sec", progress.inputRowsPerSecond, metricsTag, queryNameTag)
        statsd.gauge("streaming.process_rows_per_sec", progress.processedRowsPerSecond, metricsTag, queryNameTag)
        progress.durationMs.asScala.foreach { case (op, v) =>
          statsd.gauge("streaming.duration", v, s"operation:$op", metricsTag, queryNameTag)
        }
      }
    })
  }
}
```
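
Putting it all together, here's a minimal sketch of how a job might use this
class end to end; the app name, S3 paths, and metric name are illustrative:

```scala
import org.apache.spark.sql.SparkSession

object ExampleJob {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder().getOrCreate()
    val datadog = new Datadog("my_spark_app")

    // forward all streaming progress metrics to Datadog
    datadog.collectStreamsMetrics()

    // emit a custom metric from the driver around a unit of work
    val statsd = datadog.statsdcli()
    val df = spark.read.json("s3://some-bucket/input/")
    df.write.parquet("s3://some-bucket/output/")
    statsd.count("my_spark_app.datasets_written", 1)
  }
}
```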

**Note:** there is a known issue for Spark applications that exit immediately
after a metric has been emitted. We still have some work to do in order to
properly flush metrics before the application exits.
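
Until that's sorted out, one mitigation we can sketch (an assumption-laden
workaround, not the final fix) is to stop the client explicitly and pad the
shutdown to cover the agent's default ~10 second flush interval:

```scala
import com.timgroup.statsd.StatsDClient

// sketch: stop() cleanly shuts down the non-blocking client so queued packets
// can reach the agent; the sleep is a crude buffer for the agent's own flush
// cycle before the driver JVM exits
def flushAndExit(statsd: StatsDClient): Unit = {
  statsd.stop()
  Thread.sleep(15000) // assumption: longer than dogstatsd's default 10s flush
  sys.exit(0)
}
```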

---

In the future, a more "native" integration between Databricks and Datadog would
be nice, but these two code snippets have helped bridge a crucial
instrumentation and monitoring gap for our production Spark workloads.
Hopefully you find them useful!

0 commit comments

Comments
 (0)