---
layout: post
title: "Importing MySQL Data into Delta Lake"
author: alexk
tags:
- databricks
- spark
- deltalake
team: Data Engineering
---

OLTP databases are a common data source for Data Lake based warehouses that use Big Data tools to run
batch analytics pipelines. The classic Hadoop toolset comes with
[Apache Sqoop](https://sqoop.apache.org/) - a tool for bulk import/export
of data between HDFS and relational data stores. Our pipelines were using this tool as well, primarily
to import MySQL data into HDFS. When the Platform Engineering team at Scribd took on an effort
to migrate our on-premises Hadoop workloads to the [Databricks Lakehouse Platform](https://databricks.com/product/data-lakehouse)
on AWS, we had to write our own tool to import data from MySQL directly into the S3-backed [Delta Lake](https://delta.io/).
In this post I will share the details of [sql-delta-import](https://github.com/scribd/sql-delta-import) - an
open-source Spark utility to import data from any JDBC compatible database into Delta Lake.

### Sample import

Importing data into a Delta Lake table is as easy as

```shell script
spark-submit \
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.1.0-SNAPSHOT.jar \
--jdbc-url jdbc:mysql://hostName:port/database \
--source source.table \
--destination destination.table \
--split-by id
```

### This looks a lot like `sqoop`... why didn't you just use that?

We considered using `sqoop` at first but quickly dismissed that option for multiple reasons.

#### 1. Databricks Lakehouse Platform does not come with `sqoop`
Yes, we could have run our `sqoop` jobs on EMR clusters, but we wanted to run everything in Databricks and
avoid an additional technology footprint. But even if we drop that restriction...

#### 2. `sqoop` does not support writing data directly to Delta Lake
`sqoop` can only import data as text or parquet. Writing to Delta directly allows us to
optimize data storage for the best read performance by just adding a couple of configuration options:

```shell script
spark-submit \
--conf spark.databricks.delta.optimizeWrite.enabled=true \
--conf spark.databricks.delta.autoCompact.enabled=true \
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.1.0-SNAPSHOT.jar \
--jdbc-url jdbc:mysql://hostName:port/database \
--source source.table \
--destination destination.table \
--split-by id
```

#### 3. `--num-mappers` is just not good enough to control parallelism when working with a database
`sqoop` uses map-reduce under the hood. We can specify the `--num-mappers` parameter, which controls how many
mappers will be used to import data. A small number of mappers can result in a large volume
of data per mapper and long-running transactions. A large number of mappers will result in many connections
to the database, potentially overloading it, especially when there are a lot of `sqoop` jobs running in parallel.
Additionally, since there are no reduce stages in `sqoop` jobs, a large number of mappers will result in a large
number of output files, potentially introducing a small files problem.
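
For comparison, a typical `sqoop` invocation ties the number of database connections *and* the number of output files to the same `--num-mappers` knob. The command below is only an illustrative sketch - the host, credentials, table name and target directory are placeholders, not values from our pipelines:

```shell script
# Hypothetical sqoop job: 8 mappers means 8 concurrent connections AND 8 output files,
# with no way to tune connection count and file layout independently.
sqoop import \
--connect jdbc:mysql://hostName:port/database \
--username import_user -P \
--table source_table \
--split-by id \
--num-mappers 8 \
--as-parquetfile \
--target-dir /warehouse/source_table
```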

`sql-delta-import` uses the `--chunks` parameter to control the number of... well... chunks to split the source table
into, and standard Spark parameters like `--num-executors` and `--executor-cores` to control data import
concurrency, thus allowing you to tune those parameters independently:

```shell script
spark-submit --num-executors 15 --executor-cores 4 \
--conf spark.databricks.delta.optimizeWrite.enabled=true \
--conf spark.databricks.delta.autoCompact.enabled=true \
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.1.0-SNAPSHOT.jar \
--jdbc-url jdbc:mysql://hostName:port/database \
--source source.table \
--destination destination.table \
--split-by id \
--chunks 500
```

In the example above the source table will be split into 500 chunks, resulting in quick transactions and promptly released connections,
but no more than 60 concurrent connections will be used for the import, since the max degree of parallelism is 60 (15 executors x 4 cores).
The `delta.optimizeWrite` and `delta.autoCompact` configurations will yield optimally sized output files for the destination table.

#### 3.1 `--num-mappers` and data skew just don't play nicely together

When `sqoop` imports data, the source table is split into ranges based on the `--split-by` column and each mapper
imports its corresponding range. This works well when the `--split-by` column has a near uniform distribution
of data, but that's not always the case with source tables... As tables age we tend to add additional columns to them to
take on new business requirements, so over time the latest rows have a higher fill rate than earlier rows.



Our source tables here at Scribd definitely have these characteristics. We also have some tables that have entire
ranges of data missing due to data cleanup. At some point large chunks of data were simply deleted from these tables.



This type of data skew will result in processing time skew and output file size skew when all you can control is the number of
mappers. Yes, we could introduce an additional computed synthetic column in the source table as our `--split-by` column, but now
there is an additional column that does not add business value, app developers need to be aware of it, computing and
storing it takes up database resources, and if we plan to use it for imports it had better be indexed, requiring even more
compute and storage resources.

With `sql-delta-import` we can "solve" this problem by making the number of chunks much larger than the max degree of parallelism.
This way large chunks with high data density are broken up into smaller pieces that a single executor can handle.
Executors that get chunks with little or no data can quickly process them and move on to do some real work.
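
To make the "many small chunks" idea concrete, here is a minimal Scala sketch of the same pattern expressed with Spark's built-in partitioned JDBC read. The table names, bounds and chunk count are illustrative assumptions, and `sql-delta-import` layers its own chunking and transform logic on top, so treat this as the underlying idea rather than the tool's exact internals:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Assumed connection details - replace with real values.
val jdbcUrl = "jdbc:mysql://hostName:port/database"

// Split the id range into 500 chunks; Spark issues one bounded query per chunk and
// schedules the chunks across the available executor cores (e.g. 15 x 4 = 60 at a time),
// so skewed or empty ranges just become cheap tasks instead of slow mappers.
val source = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "source_table")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")          // min(id) in the source table (assumed)
  .option("upperBound", "100000000")  // max(id) in the source table (assumed)
  .option("numPartitions", "500")     // analogous to --chunks 500
  .load()

// Each chunk is a short-lived query and connection; with optimizeWrite/autoCompact
// enabled, the Delta writer consolidates the many small outputs into sane file sizes.
source.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("destination_table")
```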


### Advanced use cases

For advanced use cases you don't have to use the provided Spark application directly. The `sql-delta-import`
libraries can be imported into your own project. You can specify custom data transformations or a custom JDBC dialect to gain
more precise control over data type handling:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types

import com.scribd.importer.spark._
import com.scribd.importer.spark.transform.DataTransform._

implicit val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()

// All additional possible JDBC connector properties are described here:
// https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-configuration-properties.html
val jdbcUrl = "jdbc:mysql://hostName:port/database"

val config = ImportConfig(source = "table", destination = "target_database.table", splitBy = "id", chunks = 10)

// Whatever functions are passed to the transform below will be applied during the import
val transforms = new DataTransform(Seq(
  df => df.withColumn("id", col("id").cast(types.StringType)), // custom function to cast the id column to string
  timeStampsToStrings // included transform function converts all Timestamp columns to their string representation
))

val importer = new JDBCImport(jdbcUrl = jdbcUrl, importConfig = config, dataTransform = transforms)

importer.run()
```
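
On the dialect side, the sketch below shows Spark's standard mechanism for registering a custom `JdbcDialect`; it is not an API of `sql-delta-import` itself, and the example mapping (reading MySQL `TIMESTAMP` columns as strings at the source instead of converting them afterwards) is only an assumption chosen to mirror the `timeStampsToStrings` transform above:

```scala
import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType}

// Illustrative dialect: claim MySQL URLs and map TIMESTAMP columns to StringType on read.
object MySQLStringTimestampDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
    if (sqlType == Types.TIMESTAMP) Some(StringType) else None
}

// Registered dialects take precedence over the built-in ones for matching URLs.
JdbcDialects.registerDialect(MySQLStringTimestampDialect)
```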

---
Prior to migrating to the Databricks Lakehouse Platform we had roughly 300 `sqoop` jobs. We were able to
successfully port all of them to `sql-delta-import`. Today they happily coexist in production with other Spark
jobs, allowing us to use a uniform set of tools for orchestrating, scheduling, monitoring and logging across all of our jobs.