diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..976f73c --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.pyc +srv.pid +pelican.pid +.DS_Store +.idea/ +*.iml diff --git a/Architectural_Patterns_for_Near_Real-Time_Data_Processing_with_Apache_Hadoop.html b/Architectural_Patterns_for_Near_Real-Time_Data_Processing_with_Apache_Hadoop.html index a2a2252..40e8d93 100644 --- a/Architectural_Patterns_for_Near_Real-Time_Data_Processing_with_Apache_Hadoop.html +++ b/Architectural_Patterns_for_Near_Real-Time_Data_Processing_with_Apache_Hadoop.html @@ -3,7 +3,7 @@
-你好,Eric Levine!Eric是我们的信用和安全组的一名工程主管。Eric告诉我们如何把坏人留下然后去土耳其的港湾(就是一个池子,但是叫“港湾”更好听些)游泳。 +(译注:这貌似是在开玩笑……参考本文最后一段,貌似是去游泳时不叫着同事。不好理解,欢迎指正)
+我从事计算机科学的道路起源于我的哥哥和导师Matthew。Matthew大学毕业回家以后,想要教他的小弟弟写些代码。他教了我一些基础知识,然后帮我从书上继续学习,继续开发。由此开始,等到我高中毕业的时候,我已经在运营一个有十几个普通用户的网络游戏了。
+我最后到Airbnb是挺长的一个过程,从2008年开始。08年的时候,我收到了一个YouTube的猎头的电话,问我想不想去YouTube实习。这个实习以一种非常积级的方式改变了我的职业生涯。四年以前,我在YouTube +时候的同一个猎头,向我提供了一个在Airbnb工作的机会。因为有了以前的事,我知道自己可以相信她,并且从外边看来,Airbnb很有吸引力。当时我还在Google工作,我认为自己积累的技能并不如我希望的那样是可移植的。因为你在离开的时候,不能带着很多Google的基础设施一起走,但是我想要自己的技能更灵活一些。Airbnb很适合我的想法,这也的确是我做出的最佳决策。
+在我加入Airbnb以后,遇到的最有趣的技术挑战绝对是我们基于机器学习的风险检测系统。我的同事Nassem HaKim和Aaron Keys写过一篇很好的博客来描述这个系统,接下来的迭代一定会使这个系统更上一层楼。
+我们想要开始思考关于已认证ID(Verified ID) +的事,我们想要在这方面做更多。现在处于一个很好的开始,我们开始将我们从其它领域学到的东西进行应用,把它们应用于身份识别,来更好地理解我们这个平台的用户。接下来,我们还有很多非常棒的工作计划来提升我们的系统,来对我们的社区进行更多的保护。
+我最喜欢的核心价值是“拥抱冒险”(Embrace the Adventure)。 我对它的理解是:要认识到想要完美是不可能的,当遇到未知的情况时,一个人要学会“逆来顺受(roll with the punches)”。我们鼓励员工来修补有问题的东西,来真正地做产品的主人,将这个核心价值来应用到我们的工程文化当中。是这个核心价值以及我们应用它的方式使得Airbnb如此独特。
+我最棒的Airbnb体验是和一个土耳其的小村庄Kirazli的一对夫妇一起。房子非常惊艳,设计得很美,并且很适合我们停留。每天早上我被村庄呼叫祷告者的声音唤醒。在懒懒地起床之后,在主人提供传统土耳其早餐之前,我会出去游一小会泳。主人接着会带我去山上走一走,从树上摘些新鲜的无花果。整个体验非常愉快和令人鼓舞。
+ +你是否曾想过Airbnb给房东的价格提示是怎么实现的呢?
+
在这个动态定价功能里,我们给房东显示产生订单的概率(绿色表示几率大,红色表示几率小),或者说预测的订单量,让房东能够简单的点点按钮,就能够对他们的房源进行动态调价。
+有很多特征会季节性的参与到房源订单预测中,针对房源和价格的独特特征。这些特征之间以复杂的方式进行相互作用,结果是机器学习模型难以解释。于是我们开始开发一个机器学习包,帮助产生可解释、可理解的机器学习模型。这对我们的开发人员和用户都有很大帮助,可解释性意味着我们可以向房东解释,为什么他们得到的实际订单量比他们期望的订单量低或者高。
+引入Aerosolve:一个为人类构建的机器学习包。
+我们已经实践了让人和机器(机器学习)搭档共同工作的理念,其结果也超过了人或者机器单独工作的效果。
+从项目初期,我们就致力于通过易于理解的模型协助人们解释复杂的数据,以改善对数据集的理解。Aerosolve模型将数据暴露出来让人们易于理解,而不是将其含义隐藏在模型错综复杂的层次之中。
+举个例子,我们可以简单的通过下图,得到一个房源在市场上的价格与其产生的订单之间的负相关关系。我们让模型很宽,每一个变量或者变量的组合,都用一个明确的附加函数来建模,而不是将特征传给很多个非线性的深深隐藏的层。这使得模型易于解释,同时保证了学习能力。
+
图1 模型权重 vs. 市场价格百分比
+图中的红线表示的是经验值,在这个实例中,我们通常认为需求会随着价格的增加而下降。在Aerosolve中,我们可以通过一个简单的文本配置,把经验值添加到训练过程中,影响模型。图中的黑线是用数百万数据学习得到的模型的值,模型使用实际数据修正了人的假设,同时允许人为设置变量初始值。
+我们对全球的社区进行建模,基于Airbnb房源的位置信息,通过算法自动生成本地社区。这跟手工实现的社区多边形有两个不同之处。首先,它们是自动创建的,我们可以为新建市场快速构建社区信息;其次,它们是分层构建的,我们可以快速可扩展的累积点状(如房源浏览)或者多边形(如搜索结果)的统计信息。
+分层还让我们能够借助上层社区的统计优势,上层社区完全包含子社区。这些使用Kd-tree构建的社区对用户是不可见的,但是它们会用于计算一些地域特征,并最终应用在机器学习模型中。我们用下图展示Kd-tree数据结构自动创建本地社区的能力。这里,注意到我们让算法避免跨越较大的水域。即使是“金银岛”也会有它自己的社区。为了避免社区边界突然变化,我们采用了多种方法来平滑社区信息。你可以进一步阅读,在Github上的Image Impressionism demo of Aerosolve中,你能看到这种类型的平滑。
+
图2 为San Francisco自动生成的本地社区示意图
+因为每一个房源都有它的独特之处,我们为Aerosolve创建了图像分析算法,负责处理房源的装修、内饰等细节。我们使用两种训练数据来训练Aerosolve模型,下图左侧模型是我们基于专业摄影师的打分训练得出的,右侧模型是在自然预定数据基础上训练得出的。
+
图3 通过机器学习做图片排序。左侧:图片基于专业摄影师打分的训练结果排序。右侧:图片基于自然预定、点击、展现数据的训练结果排序。
+我们还负责很多其他计算任务,包括地域事件。例如下图,我们可以发现SXSW(译注:西南偏南,美国德克萨斯州州府奥斯汀举办的音乐节)期间Austin的住宿需求的增加,我们可能会让房东在这个需求高峰期开门迎客。
+
图4 Austin的季节性需求
+有一些特征,例如季节性的需求天生就是“很尖的”。其他的一些特征,例如评论数量,则一般不应该体现出同样的剧烈波动。我们使用三次多项式曲线(cubic polynomial splines)来平滑那些比较平滑的特征,使用迪拉克δ函数(Dirac delta functions)来保留波动。例如,在表示评论数和三星数(总共5星)的关系时,没有评论和有一个评论之间存在很强的不连续性。
+
图5 使用二项式曲线平滑特征
+最后,当所有的特征都转换、平滑之后,所有的这些数据会集成到一个价格模型中,这个模型有成千上万互相作用的参数,房东可以根据这个模型的预测结果,得到一个房源的定价带来订单的概率。
+你可以通过Github检出Aerosolve的源码。里面提供了一些实例,阐述了怎样将Aerosolve应用到你自己的模型中,例如,教一个算法绘出一幅点彩画派风格的作品。例子里面还有一个基于美国人口普查数据预测收入的实例。
+
图6 Aerosolve学习绘出点彩画派风格的作品
+ +Spark和MapReduce使用YARN管理集群资源的简单比较。
+继MapReduce之后,最著名的Apache YARN应用要数Apache Spark了。在Cloudera,我们通过努力让Spark-on-YARN(SPARK-1101)保持稳定,并在CDH 5.0.0中加入了对Spark的支持。
+通过这篇文章你可以学习Spark和MapReduce在架构上的区别,了解你为什么要关注这个问题的原因,以及搞清楚它们是怎样在YARN集群资源管理系统上运行的。
+在MapReduce中,计算的最上层单元是是job。系统加载数据,执行一个map函数,shuffle数据,执行一个reduce函数,然后将数据写回到持久化存储器。Spark有一个类似job的概念(虽然一个job可以由多个stage组成,而不是仅仅只包含map和reduce),不过Spark还有一个更高层级的概念叫做应用程序(application),应用程序可以以并行或者串行的方式跑多个job。
+
熟悉Spark API的人都知道,一个应用程序对应着一个SparkContext类的实例。一个应用程序可以只用于一个批量计算的任务,或者一个包含了多个独立任务的交互式会话,再或是一个持续响应请求的服务。不同于MapReduce,一个应用程序拥有一系列进程,叫做executors,他们在集群上运行,即使没有job在运行,这些executor仍然被这个应用程序占有。这种方式让数据存储在内存中,以支持快速访问,同时让task快速启动成为现实。
+MapReduce的每个task是在其自己的进程中运行的,当一个task结束,进程随之终结。在Spark中,多个task可以在同一个进程中并行的执行,并且这些进程常驻在Spark应用程序的整个生命周期中,即使是没有job在运行的时候。
+上述Spark模型的优势在于速度:task可以快速的启动,处理内存中的数据。不过他的缺点是粗粒度的资源管理。因为一个应用程序的executor数量是固定的,并且每个executor有固定的资源分配,一个应用程序在其整个运行周期中始终占有相同数量的资源。(当YARN支持container resizing时,我们计划在Spark中利用它,实现资源动态分配。)(译注:原文写于2014年05月30日)
+Spark需要依赖一个活跃的driver(active driver)进程来管理job和调度task。一般的,这个driver进程跟启动应用程序的客户端进程是同一个,不过在YARN模式中(后面会讲到),driver进程可以在集群中运行。相反的,在MapReduce中,客户端进程在job启动之后可以退出,而job还会继续运行。在Hadoop 1.x中,JobTracker负责task的调度,在Hadoop 2.x中,MapReduce应用程序master接管了这个功能。
+Spark支持可插拔的集群管理。集群管理系统负责启动executor进程。Spark应用程序开发人员不必担心到底使用的是哪一种集群管理系统。
+Spark支持YARN、Mesos以及自带的Standalone集群管理系统。这三个系统都包含了两个组件。一个master服务(YARN ResourceManager、Mesos master或Spark standalone master),他会决定哪个应用程序运行executor进程,同时也决定在哪儿、什么时候运行。每个节点上运行一个slave服务(YARN NodeManager, Mesos slave或者Spark standalone slave),slave实际启动executor进程。资源管理系统同时还监控服务的可用性和资源消耗。
+使用YARN来管理集群资源较使用Spark standalone和Mesos有一些优势:
+YARN集中配置一个集群资源池,允许运行在YARN之上的不同框架动态共享这个资源池。你可以用整个集群跑一个MapReduce任务,然后使用其中一部分做一次Impala查询,另一部分运行一个Spark应用程序,并且完全不用修改任何配置。
+你可以利用YARN的所有调度特性来做分类、隔离以及任务优先。
+最后,YARN是唯一支持Spark安全特性的集群资源管理系统。使用YARN,Spark可以在Kerberized Hadoop之上运行,在它的进程之间使用安全认证。
+当在YARN上运行Spark时,每个Spark executor作为一个YARN container运行。MapReduce为每一个task生成一个container并启动一个JVM,而Spark在同一个container中执行多个task。Spark的这种方式让task的启动时间快了几个量级。
+Spark支持以yarn-cluster和yarn-client两种模式在YARN上运行。明显的,yarn-cluster用于产品环境较为合理,而yarn-client更适合用在交互式调试的场景中,它能让开发者实时的看到应用程序的输出。
+要想理解这两种模式的区别,需要先理解YARN应用程序的master概念。在YARN中,每个应用程序实例都有一个master进程,它是为应用程序启动的第一个container。应用程序负责从ResourceManager申请资源,同时分配这些资源,告诉NodeManagers为该应用程序启动container。master的存在,就可以避免一个活跃的客户端 - 启动程序的进程可以退出,而由YARN管理的进程会继续在集群中运行。
+在yarn-cluster模式中,driver在master上运行,这意味着这个进程同时负责驱动应用程序和向YARN申请资源,并且这个进程在YARN container中运行。启动应用程序的客户端不用常驻在应用程序的运行周期中。下图是yarn-cluster模式:
+
然而yarn-cluster并不太适合交互式的Spark实用场景。需要用户输入,如spark-shell和PySpark,需要Spark deriver在启动应用程序的客户端进程中运行。在yarn-client模式中,master仅仅出现在向YARN申请executor container的过程中。客户端进程在containers启动之后,与它们进行通讯,调度任务。下图是yarn-client模式:
+
下图的表格简要的给出了这两种模式的区别:
+
应用程序(Application):应用程序可以是一个单独的job,也可以是一系列的job,或者是一个持续运行的可以进行交互的会话服务。
+Spark Driver:Spark driver是在Spark上下文(代表一个应用程序会话)中运行的一个进程。Driver负责将应用程序转化成可在集群上运行的独立的步骤。每个应用程序有一个driver。
+Spark Application Master:master负责接受driver的资源请求,然后向YARN申请资源,并选择一组合适的hosts/containers来运行Spark程序。每个应用程序有一个master。
+Spark Executor:在计算节点上的一个JVM实例,服务于一个单独的Spark应用程序。一个executor在他的生命周期中可以运行多个task,这些task可以是并行运行的。一个节点可能有多个executor,一个应用程序又会有多个节点在运行executor。
+Spark Task:一个Spark Task表示一个任务单元,它在一个分区上运行,处理数据的一个子集。
+原文作者Sandy Ryza是Cloudera的数据科学家,同时还是Apache Hadoop的贡献者。
+ +原编者注:“Estimating Financial Risk with Spark”是原文作者Sandy在Spark Summit East 2015的一个演讲,内附视频和演讲的内容。
-当你在编写Spark程序时,需要理解transformation、action和RDD,它们是编写Spark程序的基础。然而当你发现任务失败,或者从打算从Spark的web UI去查找任务运行太慢的原因时,你会发现你还需要理解job、stage和task,它们对于写出“好”的、“快”的Spark程序至关重要。只有理解了Spark的底层运行机制,才能在不同场景下写出高效的程序。
-通过这篇文章,你可以了解Spark程序是如何在集群上执行的,文章最后会给出一些如何写出高效Spark程序的实用样例。
-一个Spark程序的运行需要一个driver进程和一组分散在集群节点上的executor进程。
-Driver进程负责控制任务的总体流程,而executor进程则是负责以task的形式来执行任务,同时存储用户选择cache的任何数据。Driver进程和executor进程一般都是常驻在整个程序运行的生命周期的,然而"dynamic resource allocation"实现了动态分配executor。每个executor都有一定数量的slots(槽位)来执行task,并且在executor的整个生命周期中,多个task是可以并行执行的。虽然集群上的具体处理过程会跟所使用的集群管理系统有关(YARN、Mesos、Spark ...
-Apache Spark在批处理/ETL和流式处理方面的广泛应用,让Lambda架构成为可能。
+几件事让你将注意力集中在从一个临时需求,到一个核心项目上。
+记得有一次,在跟一个客户历经了三周时间,设计实现了一个POC(proof-of-concept,概念验证)数据摄取管道之后,客户的架构主管告诉我们:
+你知道吗,我很喜欢这个设计 - 我喜欢数据在到达时已经经过验证;我喜欢设计里面,将原始数据存储起来做研究分析,同时又做了一层汇总预处理,以快速响应业务分析师的需求;我还喜欢自动处理数据延迟,以及自动处理数据结构和算法变化的机制。
+但是,他继续说,我真的希望有一个实时处理模块。从数据被收集,到数据报表展现有一个小时的时延。我知道这样做可以提高效率,同时也让我们能够对数据做清洗。但是在我们的部分应用场景中,能够立马对新的数据做出反应,比保证数据100%经过验证更为重要。
+我们能快速的在POC中增加一个实时数据处理模块吗?它可以让我们的用户更加满意。
+这个架构师客户所提到的正是我们所说的Lambda架构 - 最早由Nathan Marz提出,该架构通常包含批处理和实时处理两个部分。人们经常同时需要这两部分(译注:而不仅仅是实时部分),原因是实时的数据天生存在一个问题:不能百分百保证每条数据发送/接收了一次,存在一些重复发送/接收的数据,会产生噪音。数据因为网络或者服务器不稳定造成的延迟,也常常会导致一些问题。Lambda架构通过处理两次数据来处理这些问题:一次是数据实时处理,另一次是数据批量处理 - 实时处理保证快速,而批量处理保证可靠。
+但是这种方法有一定的代价:你需要在两个不同的系统中实现和维护两套相同的业务逻辑。举个例子,如果你的批处理系统是基于Apache Hive或者Apache Pig实现的,而实时系统是基于Apache Storm实现的,那么你需要分别用SQL和Java编写和维护同样的汇总逻辑。就像Jay Kreps的文章Questioning the Lambda Architecture提到的那样,这种情况会很快变成运维噩梦。
+如果我们是基于Hive为这个客户实现的POC系统,我不得不告诉他们:“不,已经没有足够的时间让我们基于Storm重新实现整个业务逻辑了。”,不过幸运的是,我们用的是Apache Spark,而不是Hive。
+Spark是一个有名的机器学习框架,不过它同样很适合做ETL任务。Spark有一套简洁易用的APIs(比MapReduce可读性强,也没有MapReduce那么呆板),它的REPL接口还允许跟业务人员快速的进行业务逻辑原型设计。很显然,如果汇总数据执行的显著比MapReduce快的话,是没有人会抱怨的。
+在这个实例中,Spark给我们带来的最大好处是Spark Streaming,它让我们能够在实时流处理中重用为批处理编写的汇总程序。我们不需要重新实现一套业务逻辑,也不用测试、维护另一套代码。这样的话,我们就可以迅速的在剩余的有限时间内部署一套实时系统 - 给用户、开发人员,甚至他们的管理层都留下了深刻的印象。
+这里用一个简单的例子来说明我们是怎么做的。简单起见,只列出了最重要的步骤,可以点这里查看完整的源码。
+1 首先,我们写了一个函数来实现业务逻辑。在这个例子里面,我们想要从一系列事件日志中统计每天的errors的数量。这些日志由日期、时间、日志级别、进程以及日志内容组成: +
+14/08/07 19:19:26 INFO Executor: Finished task ID 11 ++为了统计每天的errors数量,我们需要对每天的日志按照日志等级过滤,然后进行计数: +
+def countErrors(rdd: RDD[String]): RDD[(String, Int)] = {
+ rdd
+ .filter(_.contains("ERROR")) // Keep "ERROR" lines
+ .map( s => (s.split(" ")(0), 1) ) // Return tuple with date & count
+ .reduceByKey(_ + _) // Sum counts for each date
+}
+
+在上述函数中,我们过滤保留了所有含有“ERROR”的行,然后用一个map函数,将每一行的第一位,也就是日期作为key。然后运行一个reduceByKey函数计算得到每天的errors数量。
+如我们所见,这个函数将一个RDD转换成另一个。RDD是Spark的主要数据结构。Spark对用户隐藏了处理分布式数据集的复杂操作,我们可以像处理其他任何数据那样处理RDD。
+2 我们可以在Spark ETL过程中使用这个函数,从HDFS读取数据到RDD,统计errors,然后将结果保存到HDFS: +
+val sc = new SparkContext(conf) +val lines = sc.textFile(...) +val errCount = countErrors(lines) +errCount.saveAsTextFile(...) ++
在这个例子里面,我们初始化一个SparkContext来在Spark集群中执行我们的代码。(注:在Spark REPL中我们不必自己初始化SparkContext,REPL自动做了初始化)SparkContext初始化完成之后,我们用它来从文件读取数据到RDD,执行我们的计数函数,然后将结果保存到文件。
+spark.textFile and errCount.saveAsTextFile接收的URLs参数可以是hdfs://…,本地文件系统,Amazon S3或者其他的文件系统。
+3 现在,假设我们不能等到一天结束之后再去统计errors,而是每分钟都想更新每天的errors数量。我们不用重新实现汇总方法 - 我们只需要在streaming代码里面重用它: +
+val ssc = new StreamingContext(sparkConf, 60)
+// Create the DStream from data sent over the network
+val dStream = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK_SER)
+// Counting the errors in each RDD in the stream
+val errCountStream = dStream.transform(rdd => ErrorCount.countErrors(rdd))
+// printing out the current error count
+errCountStream.foreachRDD(rdd => {
+ System.out.println("Errors this minute:%d".format(rdd.first()._2))
+})
+// creating a stream with running error count
+val stateStream = errCountStream.updateStateByKey[Int](updateFunc)
+// printing the running error count
+stateStream.foreachRDD(rdd => {
+ System.out.println("Errors today:%d".format(rdd.first()._2))
+})
+
+我们又一次初始化了一个上下文,这次是SteamingContext。SteamingContext监听一个数据流事件(这个例子里面是监听网络socket,产品环境下,我们通常使用像Apache Kafka这样更可靠的服务),然后将数据转换成一个RDDs流。
+每一个RDD表示了一个微批量的流。每个微批量的时间窗是可以配置的(这个例子里面设置的是60秒一个批量),可以用来平衡吞吐量(大批量)和延迟(小批量)。
+我们在DStream上跑一个map操作,使用countErrors函数,将流里面每一行数据组成的RDD转换成(date, errorCount)的RDD。
+对于每一个RDD,我们输出每一个批量的error数量,然后用同一个RDD来更新总的error数(天的总数)的流。我们用这个流来打印总数。
+简单起见,我们这里将输出打印到屏幕,这里也可以输出到HDFS、Apache HBase或者Kafka(实时程序和用户可以使用它)。
+总的来说:Spark Streaming让你只需要实现一次业务逻辑,然后在批量ETL处理、流式处理中重用这个实现。在前述的客户使用场景中,Spark让我们可以基于批量处理快速(几个小时之内)实现一个实时数据处理层,这个时髦的demo给了我们的客户一个深刻的印象。然而这不仅仅让我们短期受益,从长期来看,这个架构只需要少量的运维开销,同时也减少了因为多套代码导致错误的风险。
+感谢Hari Shreedharan, Ted Malaska, Grant Henke和Sean Owen的贡献和反馈。
+Gwen Shapira是Cloudera的软件工程师(之前是解决方案架构师)。她也是O’Reilly出版的Hadoop Application Architectures一书的作者之一。
+ ++14/08/07 19:19:26 INFO Executor: Finished task ID 11 ++为了统计每天的errors数量,我们需要对每天的日志按照日志等级过滤,然后进行计数: +
+def countErrors(rdd: RDD\[String\]): RDD\[(String, Int)\] = {
+ rdd
+ .filter(\_.contains("ERROR")) // Keep "ERROR" lines
+ .map( s => (s.split(" ")(0), 1) ) // Return tuple with date & count
+ .reduceByKey(\_ + \_) // Sum counts for each date
+}
+
+在上述函数中,我们过滤保留了所有含有“ERROR”的行,然后用一个map函数,将每一行的第一位,也就是日期作为key。然后运行一个reduceByKey函数计算得到每天的errors数量。
+如我们所见,这个函数将一个RDD转换成另一个。RDD是Spark的[主要数据结构](http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds)。Spark对用户隐藏了处理分布式数据集的复杂操作,我们可以像处理其他任何数据那样处理RDD。
+ ++ 2 我们可以在Spark ETL过程中使用这个函数,从HDFS读取数据到RDD,统计errors,然后将结果保存到HDFS: ++val sc = new SparkContext(conf) +val lines = sc.textFile(...) +val errCount = countErrors(lines) +errCount.saveAsTextFile(...) ++
在这个例子里面,我们初始化一个SparkContext来在Spark集群中执行我们的代码。(注:在Spark REPL中我们不必自己初始化SparkContext,REPL自动做了初始化)SparkContext初始化完成之后,我们用它来从文件读取数据到RDD,执行我们的计数函数,然后将结果保存到文件。
+spark.textFile and errCount.saveAsTextFile接收的URLs参数可以是hdfs://…,本地文件系统,Amazon S3或者其他的文件系统。
+ ++ 3 现在,假设我们不能等到一天结束之后再去统计errors,而是每分钟都想更新每天的errors数量。我们不用重新实现汇总方法 - 我们只需要在streaming代码里面重用它: +
+val ssc = new StreamingContext(sparkConf, 60)
+// Create the DStream from data sent over the network
+val dStream = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK_SER)
+// Counting the errors in each RDD in the stream
+val errCountStream = dStream.transform(rdd => ErrorCount.countErrors(rdd))
+// printing out the current error count
+errCountStream.foreachRDD(rdd => {
+ System.out.println("Errors this minute:%d".format(rdd.first()._2))
+})
+// creating a stream with running error count
+val stateStream = errCountStream.updateStateByKey\[Int\](updateFunc)
+// printing the running error count
+stateStream.foreachRDD(rdd => {
+ System.out.println("Errors today:%d".format(rdd.first()._2))
+})
+
+我们又一次初始化了一个上下文,这次是SteamingContext。SteamingContext监听一个数据流事件(这个例子里面是监听网络socket,产品环境下,我们通常使用像Apache Kafka这样更可靠的服务),然后将数据转换成一个RDDs流。
+每一个RDD表示了一个微批量的流。每个微批量的时间窗是可以配置的(这个例子里面设置的是60秒一个批量),可以用来平衡吞吐量(大批量)和延迟(小批量)。
+我们在DStream上跑一个map操作,使用countErrors函数,将流里面每一行数据组成的RDD转换成(date, errorCount)的RDD。
+对于每一个RDD,我们输出每一个批量的error数量,然后用同一个RDD来更新总的error数(天的总数)的流。我们用这个流来打印总数。
+ +简单起见,我们这里将输出打印到屏幕,这里也可以输出到HDFS、Apache HBase或者Kafka(实时程序和用户可以使用它)。 + +#####结论##### + +总的来说:Spark Streaming让你只需要实现一次业务逻辑,然后在批量ETL处理、流式处理中重用这个实现。在前述的客户使用场景中,Spark让我们可以基于批量处理快速(几个小时之内)实现一个实时数据处理层,这个时髦的demo给了我们的客户一个深刻的印象。然而这不仅仅让我们短期受益,从长期来看,这个架构只需要少量的运维开销,同时也减少了因为多套代码导致错误的风险。 + +#####感谢##### + +感谢Hari Shreedharan, Ted Malaska, Grant Henke和Sean Owen的贡献和反馈。 + +Gwen Shapira是Cloudera的软件工程师(之前是解决方案架构师)。她也是O’Reilly出版的Hadoop Application Architectures一书的作者之一。 + + diff --git a/content/how-to-tune-your-apache-spark-jobs-part-1.md b/content/how-to-tune-your-apache-spark-jobs-part-1.md new file mode 100644 index 0000000..5f1e08f --- /dev/null +++ b/content/how-to-tune-your-apache-spark-jobs-part-1.md @@ -0,0 +1,154 @@ +Title: Apache Spark性能调优(一) +Slug: how-to-tune-your-apache-spark-jobs-part-1 +Date: 2015-06-16 +Category: Spark +Author: 杨文华 +Tags: Spark, YARN, Tuning +Type: 翻译 +OriginAuthor: Sandy Ryza +OriginTime: 2015-03-09 +OriginUrl: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/ + + +原编者注:[Estimating Financial Risk with Spark](http://spark-summit.org/east/2015/talk/estimating-financial-risk-with-spark)是原文作者Sandy在Spark Summit East 2015的一个演讲,内附视频和演讲的内容。 + +当你写Spark代码时,需要理解transformation、action和RDD,它们是编写Spark程序的基础;当你发现任务失败,或者打算分析任务运行慢的原因时,你还需要理解job、stage和task,它们对于写出“好”的、“快”的Spark应用程序至关重要。理解Spark的底层运行机制,对于写出高效的应用程序是非常有帮助的。 + +通过这篇文章,你可以学到Spark程序是怎样在集群上执行的基础知识,同时还可以获得怎样写出高效的Spark应用程序的一些建议。 + + +#####你的Spark程序是怎样执行的##### + + +一个Spark应用程序由一个driver进程和一组分散在集群节点上的executor进程组成。 + +Driver进程负责控制任务的总体流程,executor进程负责以task的形式来执行任务,同时存储用户需要cache的数据。Driver进程和executor进程一般都是常驻在应用程序的整个运行周期中的([dynamic resource allocation](http://spark.apache.org/docs/1.2.0/job-scheduling.html#dynamic-resource-allocation)实现了动态分配executor)。每个executor都有一定数量的slots(槽位)来执行task,在executor的整个生命周期中多个task是可以并行执行的。虽然集群上的具体处理过程会跟所使用的集群管理系统有关(YARN、Mesos、Spark Standalone),但每个Spark应用程序都有driver和executor。 + + + +Job是应用程序执行层级中的第一层,在应用程序里面执行一个action操作即会触发启动一个对应的Spark job。Spark通过检查action所依赖的RDDs来制定执行计划,这个执行计划从“最远”的RDDs(即没有其他依赖的RDDs或者是已经cache的数据)开始,到最后一个需要生成action结果的RDD结束。 + +执行计划会将job的transformation操作合并成stages。一个stage对应于一组执行同一段代码的task的集合,每个task处理数据的不同子集。每个stage都包含了一系列不需要shuffle所有数据就可以完成的transformations。 + +Spark如何确定数据是否需要shuffle呢?一个RDD是由固定数量的分区组成了,每个分区又包含了一定数量的数据记录。对于窄依赖(比如map、filter),计算一个分区的数据只来源于其父RDD的某一个分区,每一条数据记录也只对应于其父RDD中的唯一一条数据记录(特殊的,coalesce这样的transformation虽然可以依赖父RDD的多个分区,但是子RDD的某一个分区只依赖父RDD的有限分区-不需要shuffle所有数据,我们仍然认为coalesce是一种窄依赖)。 + +Spark还有另一种transformation称作宽依赖(比如groupByKey、reduceByKey),在宽依赖中,计算一个分区所需的数据可能来源于父RDD的多个分区。所有key相同的数据都需要被放到同一个分区中,且由同一个task来处理,为了实现这样的操作,Spark就需要执行shuffle,即让数据在集群中传输并最终落到一个新的stage和一些新的分区中。 + +看下面的这段代码: + +
+sc.textFile("someFile.txt").
+ map(mapFunc).
+ flatMap(flatMapFunc).
+ filter(filterFunc).
+ count()
+
+
+这段代码执行了一个action操作,这个action依赖一系列针对源于一个文本文件的RDD的transformation操作。这段代码会在一个stage执行,因为所有这三个transformation操作(map、flatMap、filter)的输出,都只依赖于各自的输入所对应的分区。
+
+相反的,看看下面这段代码,目标是在文本文件中出现次数大等于1000的单词中,找出每个字符出现的次数。
+
+
+val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
+val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
+val filtered = wordCounts.filter(_._2 >= 1000)
+val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).
+ reduceByKey(_ + _)
+charCounts.collect()
+
+
+整个过程可以划分为三个stages。reduceByKey是stage的边界,因为计算reduceByKey的结果需要按照key对数据进程重新分区。
+
+下图是一个更为复杂的转换图,包含了一个join操作。
+
+
+
+见下图,粉色方框的部分是上图的stage划分。
+
+
+
+在每一个stage边界上,数据会被父stages的任务写到磁盘(译者注:本地磁盘),然后子stage的task再通过网络拉取这些数据。由于stage边界会产生大量的磁盘和网络I/O,stage边界是昂贵的,应当尽量避免。父stage的数据分区数量跟子stage的数据分区数量可能是不一样的。触发stage边界的transformation操作(比如reduceByKey)一般都接受numPartitions这个参数,以让开发人员可以决定给子stage的数据划分多少个分区。
+
+在MapReduce的任务调优中,reducer的个数是一个重要参数,类似的,在Spark中,stage边界的分区数量调优通常也能对程序的性能起到关键性作用。在[Apache Spark性能调优(二)](/how-to-tune-your-apache-spark-jobs-part-2.html)中我们会深入探讨该数量的调优。
+
+
+#####选择正确的操作#####
+
+
+当一个开发人员尝试用Spark解决一个问题时,可以选择不同的actions和transformations的组合来得到想要的结果。然而并非所有的实现都具有相同的性能:避开一些常规的陷阱,选择一种正确的实现,往往可以让程序的执行效率完全不同。对于怎样选择合适的actions和transformations,一些规则可以给你指明方向。
+
+近期[SPARK-5097](https://issues.apache.org/jira/browse/SPARK-5097)的一些工作让SchemaRDD逐渐稳定,这将向开发人员开放Spark的Catalyst优化程序,Spark可以自动选择使用那些更优的操作。待SchemaRDD成为Spark的一个稳定组件之后,对于开发者来说,某些操作的选择将是透明的。
+
+选择合适的操作组合,主要的目的就是减少shuffle的次数和数据量,这是因为shuffle往往是昂贵的,所有的shuffle数据都必须写磁盘和走网络传输。repartition、join、cogroup,以及任何\*by、\*ByKey的transformation都会产生shuffle。并非所有的操作都具有相同的性能,新手Spark开发者常常遇到的性能陷阱是选择了错误的操作。选择合适的操作通常有以下几个规则:
+
++ 在做组合归约时,避免使用groupByKey。例如,rdd.groupByKey().mapValues(\_.sum)跟rdd.reduceByKey(\_ + \_)可以得到一样的结果,但是前者会让所有数据走网络传输,而后者会在每个分区中,对每个key先算出本地sum,然后在shuffle之后,使用这些本地sum计算全局sum。
+
++ 当输入和输出的数据类型不同时,避免使用reduceByKey。例如,想要找出每个key对应的不重复字符串,一种方法是用map操作将每条数据转换成一个Set,然后用reduceByKey来合并这些Sets:
++rdd.map(kv => (kv.\_1, new Set\[String\]() + kv.\_2)) + .reduceByKey(\_ ++ \_) ++
这段代码会产生大量的不必要的对象,因为每一条记录都会产生新的set对象。更好的方式是使用aggregateByKey,它可以更高效的处理map-side的累计:
++val zero = new collection.mutable.Set\[String\]() +rdd.aggregateByKey(zero)( + (set, v) => set += v, + (set1, set2) => set1 ++= set2) ++ ++ 避免使用flatMap-join-groupBy这种形式的组合。当两个数据集已经按照key group了,如果这时候你想join两个数据集,然后再group,你可以改而使用cogroup,后者可以避免unpacking和repacking的消耗。 + + +#####什么时候不产生shuffle##### + +知道什么情况下上述的那些transformations不产生shuffle同样很重要。当前一个transformation已经按照同样的partitioner对数据进行了分区,Spark就可以避免产生shuffle。考虑下面这段代码: + +
+rdd1 = someRdd.reduceByKey(...) +rdd2 = someOtherRdd.reduceByKey(...) +rdd3 = rdd1.join(rdd2) ++ +因为没有给reduceByKey传递partitioner,这里会使用默认的partitioner,rdd1和rdd2都会采用hash-partitioned的方式来分区,于是这两个reduceByKey会产生两次shuffle。如果RDDs(rdd1和rdd2)有同样数量的分区,那么join操作不会产生额外的shuffle。因为上述的RDDs是使用相同的分区方式,rdd1里面同一个分区中的key一定是分布在rdd2的某一个分区里面的。所以,rdd3里面任何一个分区的数据只会依赖rdd1的某一个分区以及rdd2的某一个分区,不会出现第三次shuffle。 + +假设someRdd有四个分区,而someOtherRdd有两个分区,两个reduceByKey都是三个分区,task的执行是如下图所示的这样: + + + +但是如果rdd1和rdd2使用不同的partitioner,或者使用默认partitioner(hash)但是具有不同的分区数,结果有什么不同呢?这种情况下,只有其中一个rdd(分区数较少的那一个)需要在做join时重新shuffle。 + +相同的transformation操作,相同的输入,不同的分区数量(译者注:rdd1的reduceByKey用两个分区,rdd2的reduceByKey用三个分区): + + + +对两个数据集做join操作时,一种避免shuffle的方法是利用[广播变量](http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)。当其中一个数据集足够小,小到可以放进executor的内存里面时,就可以在driver上把它加载进一个hash table,然后广播到每个executor,然后map操作可以引用这个hash table来做查找(lookups)。 + + +#####什么情况下shuffle多反而更好##### + +通常我们都需要最小化shuffle的次数。当并行度增加时,额外的shuffle反而可以提升性能。例如,如果需要处理的数据由一些较大的不可分割的文件组成,那么由InputFormat指定的分区方式可能会让每个分区都分配了大量的数据,产生的分区过少,不能充分利用可用CPU核数。在这种情况下,加载完数据之后对数据做一次repartition,给一个更大的分区数(会触发shuffle),就可以让后续的操作利用更多的集群CPU。 + +另外一种例外的场景是使用reduce或者aggregate操作来聚合数据。由于driver负责合并计算所有executor的结果,如果在一个具有很大分区的数据上做聚合,driver的计算资源很快就会成为瓶颈。为了缓解driver的负载,可以首先使用reduceByKey或者aggregateByKey来做一次分布式的聚合,以将整个数据集分解成为更小的分区。各个分区内的数据在发送给driver做最后的聚合之前,会先在各个分区内并行的合并。可以参见[treeReduce](https://github.com/apache/spark/blob/f90ad5d426cb726079c490a9bb4b1100e2b4e602/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L58)和[treeAggregate](https://github.com/apache/spark/blob/f90ad5d426cb726079c490a9bb4b1100e2b4e602/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala#L90)的例子看它们是怎么工作的(注意:在Spark1.2中,目前最近的版本都是把他们标记为developer APIs的,但是[SPARK-5430](https://issues.apache.org/jira/browse/SPARK-5430)寻求将他们添加到Spark core的稳定版本中)。 + +这个技巧在需要聚合的数据已经按照key分组的情况下特别有用。假设有这样一个场景,一个Spark应用程序需要统计一个语料库中每个单词出现的次数,然后把结果汇总到driver上,生成一个map。一种方法是直接使用aggregate操作,计算每个分区上的本地map,然后合并这些map到driver上。另外一种方法是先使用aggregateByKey,在各个分区上并行的计算单词数量,然后简单的通过collectAsMap操作在driver上得到结果。 + + +#####二次排序##### + +另外一个重要需要注意的性能陷阱是repartitionAndSortWithinPartitions操作,他看起来是一个神秘的操作,但是似乎总是出现各种奇怪的现象。这个操作让排序与shuffle机制相结合,排序可以跟其他的操作合并到一起,这样就可以更高效的处理大量的数据。 + +Hive on Spark里join的内部实现都使用了这个操作。它同时还是[二次排序(secondary sort)](http://www.quora.com/What-is-secondary-sort-in-Hadoop-and-how-does-it-work)的重要组成部分,即当你想要对一组数据记录按照key分组,同时在遍历key对应的值的时候,让这些值能够按照特定的顺序被遍历。实际应用中,某些算法想要按照用户对事件进行分组,然后按照用户出现(译者注:如注册、访问)的先后顺序对每个用户的事件进行分析。目前利用repartitionAndSortWithinPartitions来做次要排序还需要一点额外的准备工作,不过[SPARK-3655](https://issues.apache.org/jira/browse/SPARK-3655)可以让事情变得简单很多。 + + +#####结语##### + +你现在应该对影响Spark程序运行效率的基本因素有了较好的认识。在[下一节](/how-to-tune-your-apache-spark-jobs-part-2.html),我们会继续从资源请求、并行和数据结构等方面介绍Spark的调优。 + +原文作者Sandy Ryza是Cloudera的数据科学家,他同时还为Apache Hadoop和Apache Spark项目贡献代码。他是O’Reilly出版的[Advanced Analytics with Spark](http://shop.oreilly.com/product/0636920035091.do)一书的作者之一。 + + diff --git a/content/how-to-tune-your-apache-spark-jobs-part-2.md b/content/how-to-tune-your-apache-spark-jobs-part-2.md new file mode 100644 index 0000000..bfc04ab --- /dev/null +++ b/content/how-to-tune-your-apache-spark-jobs-part-2.md @@ -0,0 +1,142 @@ +Title: Apache Spark性能调优(二) +Slug: how-to-tune-your-apache-spark-jobs-part-2 +Date: 2015-06-24 +Category: Spark +Author: 杨文华 +Tags: Spark, YARN, Tuning +Type: 翻译 +OriginAuthor: Sandy Ryza +OriginTime: 2015-03-30 +OriginUrl: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ + + +这篇文章是Apache Spark性能调优的第二部分,延续[Apache Spark性能调优(一)](/how-to-tune-your-apache-spark-jobs-part-1.html)。本文尝试涵盖几乎所有让Spark程序高效运行的问题。首先,你会学习资源调优,或者说通过配置,最大限度的利用Spark集群提供的所有资源;然后会学习并行调优,并行度是影响job性能的调优最困难同时也是最重要的参数;最后你会学到数据本身的表现形式,存储在磁盘上供Spark读取的形式,以及在内存中作为cache或者传输的形式。 + + +#####资源分配调优##### + +在Spark用户邮件组里面充斥着类似这样的一些问题:“我有一个500个节点的Spark集群,但是当我跑一个应用程序时,只有两个tasks同时执行,不应该啊,怎么破?”。考虑到Spark有如此多的参数来控制Spark集群资源的使用,产生这些问题也还不算不合理。在这一部分,你可以学习怎样尽可能多的使用Spark集群的所有资源(榨干你的Spark集群)。不同的集群管理系统(YARN、Mesos、Spark Standalone)的推荐配置会有一些差别,这里我们只关注YARN,Cloudera也建议所有用户使用YARN。 + +选择YARN的背景以及在YARN上运行Spark的更详细资料可以参见[Apache Spark Resource Management and YARN App Models](http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/)。 + +Spark(以及YARN)关注的最主要的两个资源是CPU和内存。磁盘和网络I/O当然也是影响Spark性能的一部分因素,但是Spark和YARN目前在这方面都没有什么动作。 + +Spark应用程序的每个executor都有相同的固定的CPU核数,以及相同的固定的堆(heap)大小。在执行spark-submit, spark-shell, 和pyspark时,CPU核数可以通过--executor-cores来指定;CPU核数也可以通过在spark-defaults.conf文件或者SparkConf对象中设置spark.executor.cores来指定。类似的,堆大小也可以通过--executor-memory(译者注:原文这里应该是笔误写成了--executor-cores)或者spark.executor.memory来指定。CPU核数控制每个executor可以并行运行的task数量。设置“--executor-cores 5”意思是每个executor最多可以同时跑5个任务。堆大小(spark.executor.memory)影响Spark能够cache的数据量,同时影响用于做group、aggregation、join时产生的shuffle的数据结构的内存大小。 + +可以通过设置--num-executors或者spark.executor.instances来控制申请的executor数量。从CDH 5.4/Spark 1.3开始,如果通过设置spark.dynamicAllocation.enabled属性开启了[动态分配executor(dynamic allocation)](https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation)就可以不用自己设置executor数量。动态分配executor可以让一个Spark应用程序在有task积压时申请增加executor,在executor变为idle状态时释放executor。 + +考虑Spark申请的资源怎样与YARN的可用资源相适应也是比较重要的。相关的YARN属性如下: + ++ yarn.nodemanager.resource.memory-mb控制一个节点上所有container可用的内存总和。 + ++ yarn.nodemanager.resource.cpu-vcores控制一个节点上所有container可用的CPU核数总和。 + +给executor申请5个核(cores)结果是向YARN申请5个虚拟核(virtual cores)。从YARN申请内存会复杂一些: + ++ --executor-memory/spark.executor.memory控制executor的堆大小,但是JVMs同时还会使用一部分的堆外内存,比如interned Strings和direct byte buffers。每个executor最终向YARN申请的总内存还需要加上spark.yarn.executor.memoryOverhead,这个属性的默认值是max(384, 0.07 * spark.executor.memory),单位是MB。 + ++ YARN可能会对申请的内存做上舍入。YARN的yarn.scheduler.minimum-allocation-mb和yarn.scheduler.increment-allocation-mb属性分别控制了最小分配内存大小和申请的增量大小。 + +下图显示了Spark和YARN中的内存属性层级(默认不伸缩): + + + +最后还有其他一些关于Spark executor的配置需要考虑: + ++ 应用程序master是一个非executor的container,它负责从YARN申请containers。在yarn-client模式下,master的默认配置是1024MB内存和一个vcore;在yarn-cluster模式下,master同时还会跑driver,这时候配置--driver-memory和--driver-cores属性就比较重要了。 + ++ 给executor配置过多的内存常常会导致过多的垃圾回收。粗略估计,每个executor的内存大小最好不要超过64GB。 + ++ 我注意到HDFS client在大量线程并行操作时会出现一些问题。粗略估计,每个executor并行5个task就可以达到最高的写吞吐量,所以每个executor的核数最好不要超过5个。 + ++ 小executor(比如executor都设置成1核和只足够跑一个task的内存大小)无法利用同一个JVM跑多个task的优势。比如,每个executor都需要复制一份广播变量,很多的小executor会导致广播需要复制很多份。 + +更具体一些,这里给出一个最大限度的使用集群资源的实例:假设你有一个Spark集群,其中6个节点上跑了NodeManagers,每一个节点有16核、64GB内存。假设NodeManager的yarn.nodemanager.resource.memory-mb和yarn.nodemanager.resource.cpu-vcores分别设置成63 * 1024 = 64512MB和15。我们不会给YARN container分配节点所有的机器资源,因为节点的OS和Hadoop daemons还需要占用一些资源。这个例子里面,我们预留1GB和1核给这些系统进程。Cloudera Manager可以帮助自动计算和配置这些YARN属性。 + +也许最直接的配置是:--num-executors 6 --executor-cores 15 --executor-memory 63G,但这样是不对的: + ++ 63GB加上spark.yarn.executor.memoryOverhead超过了NodeManager的63GB内存限制。 + ++ master会用掉其中一个节点的一个核,就是说那个节点上容纳不了一个15核的executor。 + ++ 每个executor15个核达不到很好的HDFS I/O吞吐量。 + +一个较好的配置是:--num-executors 17 --executor-cores 5 --executor-memory 19G。为什么? + ++ 这个配置,除了master所在的节点分配了两个executor之外,其他节点都分配了三个executor。 + ++ --executor-memory是通过(每个节点使用63/3的内存) = 21,21 * 0.07 = 1.47,21 – 1.47 ~ 19推导出来的。 + + +#####并行化调优##### + +读到这里,你应该已经知道Spark是一个并行处理引擎。不过不那么明显的是,Spark并不是一个“神奇”的并行处理引擎,它在一定限制条件下实现最大化的并行。每个Spark的stage包含一定数量的task,这些task是顺序执行的。在Spark调优中,stage中的task数量应当说是影响性能最重要的一个参数了。 + +但是这个数量是如何确定的呢?Spark将RDDs合并成stages的方式在[前一部分](/how-to-tune-your-apache-spark-jobs-part-1.html)中已经讲过了。快速回顾一下,像repartition、reduceByKey这样的transformation会产生stage边界。一个stage中的task数量等于这个stage中的最后那个RDD的分区数量。一个stage中,一个RDD的分区数量又等于它所依赖的RDD的分区数量,除了几个例外:coalesce允许生成一个分区数比它依赖的RDD少的RDD,union生成一个分区数是它的父RDDs分区数的和的RDD,cartesian生成一个分区数是它的父RDDs分区数的乘积的RDD。 + +那没有父RDDs的RDD呢?由textFile或者hadoopFile生成的RDDs,他们的分区数量取决于所使用的底层MapReduce InputFormat。通常一个HDFS block就会生成一个分区。由parallelize生成的RDDs的分区数量,可以在程序中给定,如果没有给定就会使用spark.default.parallelism这个配置的值。 + +可以调用rdd.partitions().size()来确定一个RDD中的分区数量。 + +一个主要的问题是task的数量过于少。假如task的数量比可用的槽位(slot)少的话,这个stage就不能利用全部可用的CPU。 + +Task数量太少同时还意味着每个task中的所有聚合操作都面临更大的内存压力。所有join、cogroup、\*ByKey操作都涉及到将一些对象放到hashmaps或in-memory buffers中,以做分组或排序。join、cogroup、groupByKey在它们触发的shuffle的下游的stage中的task中使用这些数据结构;而reduceByKey和aggregateByKey在它们触发的shuffle的两边的stage中的task里面使用这些数据结构。 + +当给聚合操作的数据不能装入内存时,会出现一些严重问题。首先,将大量的数据保存在这些数据结构中会给垃圾回收带来压力;其次,当数据不能装入内存,Spark会将他们溢写(spill)到磁盘,产生磁盘I/O和排序。这种在大shuffle情况下产生的问题,或许是我在Cloudera客户中见过的任务失败的头号诱因。 + +那么怎样增加分区的数量呢?如果这个stage是来自于从Hadoop读取数据,你有这几个选择: + ++ 使用repartition操作,他会触发一次shuffle。 + ++ 配置InputFormat创建更多的切分。 + ++ 将输入数据以更小的块写到HDFS。 + +如果一个stage是从另外的stage得到的输入数据,触发stage边界的transformation可以接受一个numPartitions参数,就像: + +
+val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X) ++ +那么X应该取什么值呢?优化这个分区数量的最直接的方式是实验:先得到它的父RDD的分区数量,然后不断将这个数乘以1.5直到性能不再增加。 + +也有一个更学术的方法可以计算X的值,不过给定一个先验值比较困难,因为有些数量难以计算。我在这里提到这种方法,不是因为推荐它做一种常规方法,而是因为它可以帮助我们理解其中的原理。我们的目标是运行足够的task,以使所有输入给task的数据都能够装入到task的可用内存中。 + +每个task的可用内存等于(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores,spark.shuffle.memoryFraction和spark.shuffle.safetyFraction的默认值分别是0.2和0.8。 + +所有shuffle的数据在内存中的大小难以确定。最接近的尝试是找到这个stage的Shuffle Spill(内存)和Shuffle Spill(Disk)的比例,然后乘以总的shuffle数量。然而,如果stage在做归约(reduction)的话,计算就比较复杂了。 + + + +然后做一个上舍入,因为更多的分区总是比更少的分区好一些。 + +事实上,可能有人会有疑问,为何更多的task(以及更多的分区)更好。这个建议与MapReduce中的建议是相反的,MapReduce中要求你对task的数量保守一些。这里面的不同之处在于MapReduce启动一个任务的开销很高,而Spark不是。 + + +#####为你的数据结构瘦身##### + +数据是以数据记录(record)的形式流经Spark的。一条记录有两种表现形式:反序列化的Java对象的形式和序列化的二进制的形式。一般而言,Spark在内存中使用数据记录的反序列化形式,而当数据记录存储在磁盘或者做网络传输时,使用序列化的形式。目前有一些[计划好的](https://issues.apache.org/jira/browse/SPARK-2926)[工作](https://issues.apache.org/jira/browse/SPARK-4550)在将某些in-memory shuffle数据以序列化的方式来存储。 + +spark.serializer这个属性控制数据在这两种表现形式之间的转换方式。Kryo序列化方式对应org.apache.spark.serializer.KryoSerializer,是推荐的选项。不幸的是,在早期的Spark版本中,Kryo有一些不稳定性问题,后期版本为了不破坏兼容性,并没有把Kryo作为默认选项。不过开发人员还是应当首选Kryo来做序列化。 + +数据记录在这两种表现形式下的大小对Spark的性能有很大的影响。检查数据结构,尽量削减数据结构的大小是很值得做的一件事情。 + +臃肿的反序列化对象会导致Spark更多的溢写数据到磁盘,同时减少了Spark可以cache(比如在MEMORY storage level模式下)的反序列化数据记录的数量。Spark调优有一个很好的[章节](http://spark.apache.org/docs/1.2.0/tuning.html#memory-tuning)专门来讲数据结构的瘦身。 + +臃肿的序列化对象会导致更多的磁盘和网络I/O,同时减少了Spark可以cache(比如在MEMORY_SER storage level模式下)的序列化数据记录。这里要注意的是你需要确保使用SparkConf#registerKryoClasses这个API来对自定义的类进行注册。 + + +#####数据格式##### + +如果你可以决定数据存储在磁盘上的方式,那么你应当选择一种可扩展的二进制数据格式,例如Avro,Parquet,Thrift或者Protobuf。选择其中一种数据格式,并且一直都用这种格式。说的更明白一点,当谈及在Hadoop上使用Avro,Thrift或者Protobuf,意思应该是每条数据记录都是用Avro/Thrift/Protobuf格式存储在文件中的。JSON格式不值得尝试。 + +每次当你考虑将大量的数据以JSON格式存储时,你可以联想一下中世纪将要产生的冲突与对立,加拿大将要被大坝拦截的美丽河流,或者是将要在美国腹地建造的为了给你解析文件的CPU提供能源的核设施产生的核泄漏(译者注:原作者是诗人吗...)。同时,学习一点人际交往能力,以使你能够说服你的同僚和上级也不要使用JSON格式存储数据。 + +原文作者Sandy Ryza是Cloudera的数据科学家,他同时还为Apache Hadoop和Apache Spark项目贡献代码。他是O’Reilly出版的[Advanced Analytics with Spark](http://shop.oreilly.com/product/0636920035091.do)一书的作者之一。 + + diff --git a/content/images/aerosolve-machine-learning-for-humans-1.gif b/content/images/aerosolve-machine-learning-for-humans-1.gif new file mode 100644 index 0000000..78a5497 Binary files /dev/null and b/content/images/aerosolve-machine-learning-for-humans-1.gif differ diff --git a/content/images/aerosolve-machine-learning-for-humans-2.png b/content/images/aerosolve-machine-learning-for-humans-2.png new file mode 100644 index 0000000..e657d41 Binary files /dev/null and b/content/images/aerosolve-machine-learning-for-humans-2.png differ diff --git a/content/images/aerosolve-machine-learning-for-humans-3.png b/content/images/aerosolve-machine-learning-for-humans-3.png new file mode 100644 index 0000000..db59d57 Binary files /dev/null and b/content/images/aerosolve-machine-learning-for-humans-3.png differ diff --git a/content/images/aerosolve-machine-learning-for-humans-4.png b/content/images/aerosolve-machine-learning-for-humans-4.png new file mode 100644 index 0000000..8f2a804 Binary files /dev/null and b/content/images/aerosolve-machine-learning-for-humans-4.png differ diff --git a/content/images/aerosolve-machine-learning-for-humans-5.png b/content/images/aerosolve-machine-learning-for-humans-5.png new file mode 100644 index 0000000..a3de036 Binary files /dev/null and b/content/images/aerosolve-machine-learning-for-humans-5.png differ diff --git a/content/images/aerosolve-machine-learning-for-humans-6.png b/content/images/aerosolve-machine-learning-for-humans-6.png new file mode 100644 index 0000000..cc977d1 Binary files /dev/null and b/content/images/aerosolve-machine-learning-for-humans-6.png differ diff --git a/content/images/aerosolve-machine-learning-for-humans-7.gif b/content/images/aerosolve-machine-learning-for-humans-7.gif new file mode 100644 index 0000000..a0f9a91 Binary files /dev/null and b/content/images/aerosolve-machine-learning-for-humans-7.gif differ diff --git a/content/images/apache-spark-resource-management-and-yarn-app-models-f1.png b/content/images/apache-spark-resource-management-and-yarn-app-models-f1.png new file mode 100644 index 0000000..eedfbbb Binary files /dev/null and b/content/images/apache-spark-resource-management-and-yarn-app-models-f1.png differ diff --git a/content/images/apache-spark-resource-management-and-yarn-app-models-f2.png b/content/images/apache-spark-resource-management-and-yarn-app-models-f2.png new file mode 100644 index 0000000..d2c338b Binary files /dev/null and b/content/images/apache-spark-resource-management-and-yarn-app-models-f2.png differ diff --git a/content/images/apache-spark-resource-management-and-yarn-app-models-f3.png b/content/images/apache-spark-resource-management-and-yarn-app-models-f3.png new file mode 100644 index 0000000..9246aa3 Binary files /dev/null and b/content/images/apache-spark-resource-management-and-yarn-app-models-f3.png differ diff --git a/content/images/apache-spark-resource-management-and-yarn-app-models-f4.png b/content/images/apache-spark-resource-management-and-yarn-app-models-f4.png new file mode 100644 index 0000000..4a5f58e Binary files /dev/null and b/content/images/apache-spark-resource-management-and-yarn-app-models-f4.png differ diff --git a/content/images/how-to-tune-your-apache-spark-jobs-part-1-f1.png b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f1.png new file mode 100644 index 0000000..6d19829 Binary files /dev/null and b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f1.png differ diff --git a/content/images/how-to-tune-your-apache-spark-jobs-part-1-f2.png b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f2.png new file mode 100644 index 0000000..ef10764 Binary files /dev/null and b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f2.png differ diff --git a/content/images/how-to-tune-your-apache-spark-jobs-part-1-f3.png b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f3.png new file mode 100644 index 0000000..e4e62b6 Binary files /dev/null and b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f3.png differ diff --git a/content/images/how-to-tune-your-apache-spark-jobs-part-1-f4.png b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f4.png new file mode 100644 index 0000000..dc1c80e Binary files /dev/null and b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f4.png differ diff --git a/content/images/how-to-tune-your-apache-spark-jobs-part-1-f5.png b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f5.png new file mode 100644 index 0000000..5dbcb88 Binary files /dev/null and b/content/images/how-to-tune-your-apache-spark-jobs-part-1-f5.png differ diff --git a/content/images/how-to-tune-your-apache-spark-jobs-part-2-f1.png b/content/images/how-to-tune-your-apache-spark-jobs-part-2-f1.png new file mode 100644 index 0000000..535180b Binary files /dev/null and b/content/images/how-to-tune-your-apache-spark-jobs-part-2-f1.png differ diff --git a/content/images/how-to-tune-your-apache-spark-jobs-part-2-f2.png b/content/images/how-to-tune-your-apache-spark-jobs-part-2-f2.png new file mode 100644 index 0000000..7ee01b4 Binary files /dev/null and b/content/images/how-to-tune-your-apache-spark-jobs-part-2-f2.png differ diff --git a/content/images/streampatterns-f1.png b/content/images/streampatterns-f1.png new file mode 100644 index 0000000..16bb2a9 Binary files /dev/null and b/content/images/streampatterns-f1.png differ diff --git a/content/images/streampatterns-f2.png b/content/images/streampatterns-f2.png new file mode 100644 index 0000000..72cc201 Binary files /dev/null and b/content/images/streampatterns-f2.png differ diff --git a/content/images/streampatterns-f3.png b/content/images/streampatterns-f3.png new file mode 100644 index 0000000..09a98d0 Binary files /dev/null and b/content/images/streampatterns-f3.png differ diff --git a/content/images/streampatterns-f4.png b/content/images/streampatterns-f4.png new file mode 100644 index 0000000..75a5ea3 Binary files /dev/null and b/content/images/streampatterns-f4.png differ diff --git a/content/meet_the_nerds_Eric_Levine.md b/content/meet_the_nerds_Eric_Levine.md new file mode 100644 index 0000000..4a134fa --- /dev/null +++ b/content/meet_the_nerds_Eric_Levine.md @@ -0,0 +1,48 @@ +Title: 俺们公司的呆子们之:Eric Levine +Slug: Meet the Nerds: Eric Levine +Date: 2015-07-09 +Category: team +Author: 黄兴 +Tags: Airbnb +Type: 翻译 +OriginAuthor: Mike Curtis +OriginUrl: http://nerds.airbnb.com/meet-nerds-eric-levine/ + + +### 俺们公司的呆子们之:Eric Levine + + +你好,Eric Levine!Eric是我们的信用和安全组的一名工程主管。Eric告诉我们如何把坏人留下然后去土耳其的港湾(就是一个池子,但是叫“港湾”更好听些)游泳。 +(译注:这貌似是在开玩笑……参考本文最后一段,貌似是去游泳时不叫着同事。不好理解,欢迎指正) + +##### 你是怎么开始从事计算机科学的呢? + +我从事计算机科学的道路起源于我的哥哥和导师Matthew。Matthew大学毕业回家以后,想要教他的小弟弟写些代码。他教了我一些基础知识,然后帮我从书上继续学习,继续开发。由此开始,等到我高中毕业的时候,我已经在运营一个有十几个普通用户的网络游戏了。 + +##### 你是怎么到Airbnb来的呢? + +我最后到Airbnb是挺长的一个过程,从2008年开始。08年的时候,我收到了一个YouTube的猎头的电话,问我想不想去YouTube实习。这个实习以一种非常积级的方式改变了我的职业生涯。四年以前,我在YouTube +时候的同一个猎头,向我提供了一个在Airbnb工作的机会。因为有了以前的事,我知道自己可以相信她,并且从外边看来,Airbnb很有吸引力。当时我还在Google工作,我认为自己积累的技能并不如我希望的那样是可移植的。因为你在离开的时候,不能带着很多Google的基础设施一起走,但是我想要自己的技能更灵活一些。Airbnb很适合我的想法,这也的确是我做出的最佳决策。 + +##### 你加入Airbnb以后遇到的最有趣的技术挑战是啥呢? + +在我加入Airbnb以后,遇到的最有趣的技术挑战绝对是我们基于机器学习的风险检测系统。我的同事Nassem HaKim和Aaron Keys写过一篇很好的博客来描述这个系统,接下来的迭代一定会使这个系统更上一层楼。 + +##### 你接下来准备搞什么呢? + +我们想要开始思考关于已认证ID(Verified ID) +的事,我们想要在这方面做更多。现在处于一个很好的开始,我们开始将我们从其它领域学到的东西进行应用,把它们应用于身份识别,来更好地理解我们这个平台的用户。接下来,我们还有很多非常棒的工作计划来提升我们的系统,来对我们的社区进行更多的保护。 + +##### 你最喜欢的核心价值是什么,像是怎么去做的呢? + +我最喜欢的核心价值是“拥抱冒险”(Embrace the Adventure)。 我对它的理解是:要认识到想要完美是不可能的,当遇到未知的情况时,一个人要学会“逆来顺受(roll with the punches)”。我们鼓励员工来修补有问题的东西,来真正地做产品的主人,将这个核心价值来应用到我们的工程文化当中。是这个核心价值以及我们应用它的方式使得Airbnb如此独特。 + +##### 你最棒的Airbnb体验是什么? + +我最棒的Airbnb体验是和一个土耳其的小村庄Kirazli的一对夫妇一起。房子非常惊艳,设计得很美,并且很适合我们停留。每天早上我被村庄呼叫祷告者的声音唤醒。在懒懒地起床之后,在主人提供传统土耳其早餐之前,我会出去游一小会泳。主人接着会带我去山上走一走,从树上摘些新鲜的无花果。整个体验非常愉快和令人鼓舞。 + + \ No newline at end of file diff --git a/favicon.ico b/favicon.ico new file mode 100644 index 0000000..f3d2239 Binary files /dev/null and b/favicon.ico differ diff --git a/how-to-tune-your-apache-spark-jobs-part-1.html b/how-to-tune-your-apache-spark-jobs-part-1.html index 546d0e4..1f88c1a 100644 --- a/how-to-tune-your-apache-spark-jobs-part-1.html +++ b/how-to-tune-your-apache-spark-jobs-part-1.html @@ -3,7 +3,7 @@ -