《新手福利:Apache Spark入門攻略》要點(diǎn):
本文介紹了新手福利:Apache Spark入門攻略,希望對(duì)您有用。如果有疑問,可以聯(lián)系我們。
【編者按】時(shí)至今日,Spark已成為大數(shù)據(jù)領(lǐng)域最火的一個(gè)開源項(xiàng)目,具備高性能、易于使用等特性.然而作為一個(gè)年輕的開源項(xiàng)目,其使用上存在的挑戰(zhàn)亦弗成為不大,這里為大家分享SciSpike軟件架構(gòu)師Ashwini Kuntamukkala在Dzone上進(jìn)行的Spark入門總結(jié)(雖然有些地方基于的是Spark 1.0版本,但仍然值得閱讀)——Apache Spark:An Engine for Large-Scale Data Processing,由OneAPM工程師翻譯.
本文聚焦Apache Spark入門,了解其在大數(shù)據(jù)領(lǐng)域的地位,覆蓋Apache Spark的安裝及應(yīng)用法式的建立,并解釋一些常見的行為和操作.
一、 為什么要使用Apache Spark
時(shí)下,我們正處在一個(gè)“大數(shù)據(jù)”的時(shí)代,每時(shí)每刻,都有各種類型的數(shù)據(jù)被生產(chǎn).而在此紫外,數(shù)據(jù)增幅的速度也在顯著增加.從廣義上看,這些數(shù)據(jù)包括交易數(shù)據(jù)、社交媒體內(nèi)容(比如文本、圖像和視頻)以及傳感器數(shù)據(jù).那么,為什么要在這些內(nèi)容上投入如此多精力,其原因無非就是從海量數(shù)據(jù)中提取洞見可以對(duì)生活和生產(chǎn)實(shí)踐進(jìn)行很好的指導(dǎo).
在幾年前,只有少部分公司擁有足夠的技術(shù)力量和資金去儲(chǔ)存和挖掘大量數(shù)據(jù),并對(duì)其挖掘從而獲得洞見.然而,被雅虎2009年開源的Apache Hadoop對(duì)這一狀況產(chǎn)生了顛覆性的沖擊——通過使用商用服務(wù)器組成的集群大幅度地降低了海量數(shù)據(jù)處理的門檻.因此,許多行業(yè)(好比Health care、Infrastructure、Finance、Insurance、Telematics、Consumer、Retail、Marketing、E-commerce、Media、 Manufacturing和Entertainment)開始了Hadoop的征程,走上了海量數(shù)據(jù)提取價(jià)值的道路.著眼Hadoop,其主要提供了兩個(gè)方面的功能:
下圖展示了MapReduce的數(shù)據(jù)處理流程,其中一個(gè)Map-Reduce step的輸出將作為下一個(gè)典型Hadoop job的輸入結(jié)果.
在整個(gè)過程中,中間結(jié)果會(huì)借助磁盤傳遞,因此對(duì)比計(jì)算,大量的Map-Reduced作業(yè)都受限于IO.然而對(duì)于ETL、數(shù)據(jù)整合和清理這樣的用例來說,IO約束并不會(huì)產(chǎn)生很大的影響,因?yàn)檫@些場(chǎng)景對(duì)數(shù)據(jù)處理時(shí)間往往不會(huì)有較高的需求.然而,在現(xiàn)實(shí)世界中,同樣存在許多對(duì)延時(shí)要求較為苛刻的用例,好比:
毫無疑問,歷經(jīng)數(shù)年發(fā)展,Hadoop生態(tài)圈中的豐富工具已深受用戶喜愛,然而這里仍然存在眾多問題給使用帶來了挑戰(zhàn):
1.每個(gè)用例都需要多個(gè)不同的技術(shù)堆棧來支撐,在不同使用場(chǎng)景下,大量的辦理方案往往捉襟見肘.
2.在生產(chǎn)環(huán)境中機(jī)構(gòu)往往必要精通數(shù)門技術(shù).
3.很多技術(shù)存在版本兼容性問題.
4.無法在并行job中更快地共享數(shù)據(jù).
而通過Apache Spark,上述問題迎刃而解!Apache Spark是一個(gè)輕量級(jí)的內(nèi)存集群計(jì)算平臺(tái),通過分歧的組件來支撐批、流和交互式用例,如下圖.
二、 關(guān)于Apache Spark
Apache Spark是個(gè)開源和兼容Hadoop的集群計(jì)算平臺(tái).由加州大學(xué)伯克利分校的AMPLabs開發(fā),作為Berkeley Data Analytics Stack(BDAS)的一部分,當(dāng)下由大數(shù)據(jù)公司Databricks保駕護(hù)航,更是Apache旗下的頂級(jí)項(xiàng)目,下圖顯示了Apache Spark堆棧中的分歧組件.
Apache Spark的5年夜優(yōu)勢(shì):
1.更高的性能,因?yàn)閿?shù)據(jù)被加載到集群主機(jī)的分布式內(nèi)存中.數(shù)據(jù)可以被快速的轉(zhuǎn)換迭代,并緩存用以后續(xù)的頻繁拜訪需求.很多對(duì)Spark感興趣的朋友可能也會(huì)聽過這樣一句話——在數(shù)據(jù)全部加載到內(nèi)存的情況下,Spark可以比Hadoop快100倍,在內(nèi)存不夠存放所有數(shù)據(jù)的情況下快Hadoop 10倍.
2.通過建立在Java、Scala、Python、SQL(應(yīng)對(duì)交互式查詢)的標(biāo)準(zhǔn)API以便利各行各業(yè)使用,同時(shí)還含有大量開箱即用的機(jī)器學(xué)習(xí)庫(kù).
3.與現(xiàn)有Hadoop v1 (SIMR) 和2.x (YARN) 生態(tài)兼容,因此機(jī)構(gòu)可以進(jìn)行無縫遷徙.
4.便利下載和安裝.便利的shell(REPL: Read-Eval-Print-Loop)可以對(duì)API進(jìn)行交互式的學(xué)習(xí).
5.借助高品級(jí)的架構(gòu)提高生產(chǎn)力,從而可以講精力放到計(jì)算上.
同時(shí),Apache Spark由Scala實(shí)現(xiàn),代碼異常簡(jiǎn)潔.
三、安裝Apache Spark
下表列出了一些緊張鏈接和先決條件:
如圖6所示,Apache Spark的部署方式包含standalone、Hadoop V1 SIMR、Hadoop 2 YARN/Mesos.Apache Spark需求一定的Java、Scala或Python知識(shí).這里,我們將專注standalone配置下的安裝和運(yùn)行.
1.安裝JDK 1.6+、Scala 2.10+、Python [2.6,3] 和sbt
2.下載Apache Spark 1.0.1 Release
3.在指定目次下Untar和Unzip spark-1.0.1.tgz
akuntamukkala@localhost~/Downloads$ pwd /Users/akuntamukkala/Downloads akuntamukkala@localhost~/Downloads$ tar -zxvf spark- 1.0.1.tgz -C /Users/akuntamukkala/spark
4.運(yùn)行sbt樹立Apache Spark
akuntamukkala@localhost~/spark/spark-1.0.1$ pwd /Users/akuntamukkala/spark/spark-1.0.1 akuntamukkala@localhost~/spark/spark-1.0.1$ sbt/sbt assembly
5.宣布Scala的Apache Spark standalone REPL
/Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell
假如是Python
/Users/akuntamukkala/spark/spark-1.0.1/bin/ pyspark
四、Apache Spark的事情模式
Spark引擎提供了在集群中所有主機(jī)上進(jìn)行分布式內(nèi)存數(shù)據(jù)處理的才能,下圖顯示了一個(gè)典型Spark job的處理流程.
下圖顯示了Apache Spark如安在集群中執(zhí)行一個(gè)作業(yè).
Master控制數(shù)據(jù)如何被分割,利用了數(shù)據(jù)當(dāng)?shù)匦?并在Slaves上跟蹤所有分布式計(jì)算.在某個(gè)Slave不可用時(shí),其存儲(chǔ)的數(shù)據(jù)會(huì)分配給其他可用的Slaves.雖然當(dāng)下(1.0.1版本)Master還存在單點(diǎn)故障,但后期必然會(huì)被修復(fù).
五、彈性散布式數(shù)據(jù)集(Resilient Distributed Dataset,RDD)
彈性分布式數(shù)據(jù)集(RDD,從Spark 1.3版本開始已被DataFrame替代)是Apache Spark的核心理念.它是由數(shù)據(jù)組成的不可變分布式集合,其主要進(jìn)行兩個(gè)操作:transformation和action.Transformation是類似在RDD上做 filter、map或union 以生成另一個(gè)RDD的操作,而action則是count、first、take(n)、collect 等促發(fā)一個(gè)計(jì)算并返回值到Master或者穩(wěn)定存儲(chǔ)系統(tǒng)的操作.Transformations一般都是lazy的,直到action執(zhí)行后才會(huì)被執(zhí)行.Spark Master/Driver會(huì)保留RDD上的Transformations.這樣一來,如果某個(gè)RDD丟失(也就是salves宕掉),它可以快速和便捷地轉(zhuǎn)換到集群中存活的主機(jī)上.這也就是RDD的彈性所在.
下圖展示了Transformation的lazy:
我們可以通過下面示例來理解這個(gè)概念:從文本中發(fā)現(xiàn)5個(gè)最常用的word.下圖顯示了一個(gè)可能的辦理方案.
在上面命令中,我們對(duì)文本進(jìn)行讀取而且建立字符串的RDD.每個(gè)條目代表了文本中的1行.
scala> val hamlet = sc.textFile(“/Users/akuntamukkala/temp/gutenburg.txt”)hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
scala> val topWordCount = hamlet.flatMap(str=>str.split(“ “)). filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case (word, count) => (count, word)}.sortByKey(false)topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14
1. 通過上述命令我們可以發(fā)現(xiàn)這個(gè)操作異常簡(jiǎn)單——通過簡(jiǎn)單的Scala API來連接transformations和actions.
2. 可能存在某些words被1個(gè)以上空格分隔的情況,導(dǎo)致有些words是空字符串,因此必要使用filter(!_.isEmpty)將它們過濾掉.
3.每個(gè)word都被映射成一個(gè)鍵值對(duì):map(word=>(word,1)).
4.為了合計(jì)所有計(jì)數(shù),這里必要調(diào)用一個(gè)reduce步驟——reduceByKey(_+_). _+_ 可以非常便捷地為每個(gè)key賦值.
5. 我們得到了words以及各自的counts,下一步必要做的是根據(jù)counts排序.在Apache Spark,用戶只能根據(jù)key排序,而不是值.因此,這里必要使用map{case (word, count) => (count, word)}將(word, count)流轉(zhuǎn)到(count, word).
6. 必要計(jì)算最常用的5個(gè)words,因此必要使用sortByKey(false)做一個(gè)計(jì)數(shù)的遞減排序.
上述命令包括了一個(gè).take(5) (an action operation, which triggers computation)和在 /Users/akuntamukkala/temp/gutenburg.txt文本中輸出10個(gè)最常用的words.在Python shell中用戶可以實(shí)現(xiàn)同樣的功能.
RDD lineage可以經(jīng)由過程toDebugString(一個(gè)值得記住的操作)來跟蹤.
scala> topWordCount.take(5).foreach(x=>println(x))(1044,the)(730,and)(679,of)(648,to)(511,I)
常用的Transformations:
Transformation & Purpose | Example & Result |
---|---|
filter(func) Purpose:new RDD by selecting those data elements on which func returns true | scala> val rdd = sc.parallelize(List(“ABC”,”BCD”,”DEF”)) scala> val filtered = rdd.filter(_.contains(“C”)) scala> filtered.collect Result: Array[String] = Array(ABC, BCD) |
map(func) Purpose:return new RDD by applying func on each data element | scala> val rdd=sc.parallelize(List(1,2,3,4,5)) scala> val times2 = rdd.map(_*2) scala> times2.collect Result: Array[Int] = Array(2, 4, 6, 8, 10) |
flatMap(func) Purpose:Similar to map but func returns a Seq instead of a value. For example, mapping a sentence into a Seq of words | scala> val rdd=sc.parallelize(List(“Spark is awesome”,”It is fun”)) scala> val fm=rdd.flatMap(str=>str.split(“ “)) scala> fm.collect Result: Array[String] = Array(Spark, is, awesome, It, is, fun) |
reduceByKey(func,[numTasks]) Purpose:To aggregate values of a key using a function. “numTasks” is an optional parameter to specify number of reduce tasks | scala> val word1=fm.map(word=>(word,1)) scala> val wrdCnt=word1.reduceByKey(_+_) scala> wrdCnt.collect Result: Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1)) |
groupByKey([numTasks]) Purpose:To convert (K,V) to (K,Iterable<V>) | scala> val cntWrd = wrdCnt.map{case (word, count) => (count, word)} scala> cntWrd.groupByKey.collect Result: Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is))) |
distinct([numTasks]) Purpose:Eliminate duplicates from RDD | scala> fm.distinct.collect Result: Array[String] = Array(is, It, awesome, Spark, fun) |
常用的聚攏操作:
Transformation and Purpose | Example and Result |
---|---|
union Purpose:new RDD containing all elements from source RDD and argument. | Scala> val rdd1=sc.parallelize(List(‘A’,’B’)) scala> val rdd2=sc.parallelize(List(‘B’,’C’)) scala> rdd1.union(rdd2).collect Result: Array[Char] = Array(A, B, B, C) |
intersection Purpose:new RDD containing only common elements from source RDD and argument. | Scala> rdd1.intersection(rdd2).collect Result: Array[Char] = Array(B) |
cartesian Purpose:new RDD cross product of all elements from source RDD and argument | Scala> rdd1.cartesian(rdd2).collect Result: Array[(Char, Char)] = Array((A,B), (A,C), (B,B), (B,C)) |
subtract Purpose:new RDD created by removing data elements in source RDD in common with argument | scala> rdd1.subtract(rdd2).collect Result: Array[Char] = Array(A) |
join(RDD,[numTasks]) Purpose:When invoked on (K,V) and (K,W), this operation creates a new RDD of (K, (V,W)) | scala> val personFruit = sc.parallelize(Seq((“Andy”, “Apple”), (“Bob”, “Banana”), (“Charlie”, “Cherry”), (“Andy”,”Apricot”))) scala> val personSE = sc.parallelize(Seq((“Andy”, “Google”), (“Bob”, “Bing”), (“Charlie”, “Yahoo”), (“Bob”,”AltaVista”))) scala> personFruit.join(personSE).collect Result: Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista))) |
cogroup(RDD,[numTasks]) Purpose:To convert (K,V) to (K,Iterable<V>) | scala> personFruit.cogroup(personSe).collect Result: Array[(String, (Iterable[String], Iterable[String]))] = Array((Andy,(ArrayBuffer(Apple, Apricot),ArrayBuffer(google))), (Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))), (Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing, AltaVista)))) |
更多transformations信息,請(qǐng)查看http://spark.apache.org/docs/latest/programming-guide.html#transformations
常用的actions
Action & Purpose | Example & Result |
---|---|
count Purpose:get the number of data elements in the RDD | scala> val rdd = sc.parallelize(list(‘A’,’B’,’c’)) scala> rdd.count Result: long = 3 |
collect Purpose:get all the data elements in an RDD as an array | scala> val rdd = sc.parallelize(list(‘A’,’B’,’c’)) scala> rdd.collect Result: Array[char] = Array(A, B, c) |
reduce(func) Purpose:Aggregate the data elements in an RDD using this function which takes two arguments and returns one | scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.reduce(_+_) Result: Int = 10 |
take (n) Purpose:: fetch first n data elements in an RDD. computed by driver program. | Scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.take(2) Result: Array[Int] = Array(1, 2) |
foreach(func) Purpose:execute function for each data element in RDD. usually used to update an accumulator(discussed later) or interacting with external systems. | Scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.foreach(x=>println(“%s*10=%s”. format(x,x*10))) Result: 1*10=10 4*10=40 3*10=30 2*10=20 |
first Purpose:retrieves the first data element in RDD. Similar to take(1) | scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.first Result: Int = 1 |
saveAsTextFile(path) Purpose:Writes the content of RDD to a text file or a set of text files to local file system/ HDFS | scala> val hamlet = sc.textFile(“/users/akuntamukkala/ temp/gutenburg.txt”) scala> hamlet.filter(_.contains(“Shakespeare”)). saveAsTextFile(“/users/akuntamukkala/temp/ filtered”) Result: akuntamukkala@localhost~/temp/filtered$ ls _SUCCESS part-00000 part-00001 |
更多actions參見http://spark.apache.org/docs/latest/programming-guide.html#actions
六、RDD持久性
Apache Spark中一個(gè)主要的能力便是在集群內(nèi)存中持久化/緩存RDD.這將顯著地提升交互速度.下表顯示了Spark中各種選項(xiàng).
Storage Level | Purpose |
---|---|
MEMORY_ONLY (Default level) | This option stores RDD in available cluster memory as deserialized Java objects. Some partitions may not be cached if there is not enough cluster memory. Those partitions will be recalculated on the fly as needed. |
MEMORY_AND_DISK | This option stores RDD as deserialized Java objects. If RDD does not fit in cluster memory, then store those partitions on the disk and read them as needed. |
MEMORY_ONLY_SER | This options stores RDD as serialized Java objects (One byte array per partition). This is more CPU intensive but saves memory as it is more space efficient. Some partitions may not be cached. Those will be recalculated on the fly as needed. |
MEMORY_ONLY_DISK_SER | This option is same as above except that disk is used when memory is not sufficient. |
DISC_ONLY | This option stores the RDD only on the disk |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as other levels but partitions are replicated on 2 slave nodes |
上面的存儲(chǔ)等級(jí)可以通過RDD. cache操作上的 persist操作拜訪,可以方便地指定MEMORY_ONLY選項(xiàng).關(guān)于持久化等級(jí)的更多信息,可以拜訪這里http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
Spark使用Least Recently Used (LRU)算法來移除緩存中舊的、不常用的RDD,從而開釋出更多可用內(nèi)存.同樣還提供了一個(gè)unpersist 操作來強(qiáng)制移除緩存/持久化的RDD.
七、變量共享
Accumulators.Spark提供了一個(gè)非常便捷地途徑來避免可變的計(jì)數(shù)器和計(jì)數(shù)器同步問題——Accumulators.Accumulators在一個(gè)Spark context中通過默認(rèn)值初始化,這些計(jì)數(shù)器在Slaves節(jié)點(diǎn)上可用,但是Slaves節(jié)點(diǎn)不能對(duì)其進(jìn)行讀取.它們的作用便是來獲取原子更新,并將其轉(zhuǎn)發(fā)到Master.Master是唯一可以讀取和計(jì)算所有更新合集的節(jié)點(diǎn).舉個(gè)例子:
akuntamukkala@localhost~/temp$ cat output.logerrorwarninginfotraceerrorinfoinfoscala> val nErrors=sc.accumulator(0.0)scala> val logs = sc.textFile(“/Users/akuntamukkala/temp/output.log”)scala> logs.filter(_.contains(“error”)).foreach(x=>nErrors+=1)scala> nErrors.valueResult:Int = 2
Broadcast Variables.實(shí)際生產(chǎn)中,通過指定key在RDDs上對(duì)數(shù)據(jù)進(jìn)行合并的場(chǎng)景非常常見.在這種情況下,很可能會(huì)出現(xiàn)給slave nodes發(fā)送大體積數(shù)據(jù)集的情況,讓其負(fù)責(zé)托管需要做join的數(shù)據(jù).因此,這里很可能存在巨大的性能瓶頸,因?yàn)榫W(wǎng)絡(luò)IO比內(nèi)存拜訪速度慢100倍.為了解決這個(gè)問題,Spark提供了Broadcast Variables,如其名稱一樣,它會(huì)向slave nodes進(jìn)行廣播.因此,節(jié)點(diǎn)上的RDD操作可以快速拜訪Broadcast Variables值.舉個(gè)例子,期望計(jì)算一個(gè)文件中所有路線項(xiàng)的運(yùn)輸成本.通過一個(gè)look-up table指定每種運(yùn)輸類型的成本,這個(gè)look-up table就可以作為Broadcast Variables.
akuntamukkala@localhost~/temp$ cat packagesToShip.txt groundexpressmediapriorityprioritygroundexpressmediascala> val map = sc.parallelize(Seq((“ground”,1),(“med”,2), (“priority”,5),(“express”,10))).collect.toMapmap: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10)scala> val bcMailRates = sc.broadcast(map)
上述命令中,我們建立了一個(gè)broadcast variable,基于服務(wù)類別本錢的map.
scala> val pts = sc.textFile(“/Users/akuntamukkala/temp/packagesToShip.txt”)
在上述命令中,我們通過broadcast variable的mailing rates來計(jì)算運(yùn)輸本錢.
scala> pts.map(shipType=>(shipType,1)).reduceByKey(_+_). map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates. value(shipType))}.collect
通過上述命令,我們使用accumulator來累加所有運(yùn)輸?shù)谋惧X.詳細(xì)信息可通過下面的PDF查看http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.
八、Spark SQL
通過Spark Engine,Spark SQL提供了一個(gè)便捷的途徑來進(jìn)行交互式分析,使用一個(gè)被稱為SchemaRDD類型的RDD.SchemaRDD可以通過已有RDDs建立,或者其他外部數(shù)據(jù)格式,好比Parquet files、JSON數(shù)據(jù),或者在Hive上運(yùn)行HQL.SchemaRDD非常類似于RDBMS中的表格.一旦數(shù)據(jù)被導(dǎo)入SchemaRDD,Spark引擎就可以對(duì)它進(jìn)行批或流處理.Spark SQL提供了兩種類型的Contexts——SQLContext和HiveContext,擴(kuò)展了SparkContext的功能.
SparkContext提供了到簡(jiǎn)單SQL parser的拜訪,而HiveContext則提供了到HiveQL parser的拜訪.HiveContext允許企業(yè)利用已有的Hive基礎(chǔ)設(shè)施.
這里看一個(gè)簡(jiǎn)單的SQLContext示例.
下面文本中的用戶數(shù)據(jù)通過“|”來朋分.
John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854
定義Scala case class來表現(xiàn)每一行:
case class Customer(name:String,age:Int,gender:String,address: String)
下面的代碼片段體現(xiàn)了如何使用SparkContext來建立SQLContext,讀取輸入文件,將每一行都轉(zhuǎn)換成SparkContext中的一條記載,并通過簡(jiǎn)單的SQL語句來查詢30歲以下的男性用戶.
val sparkConf = new SparkConf.setAppName(“Customers”)val sc = new SparkContext(sparkConf)val sqlContext = new SQLContext(sc)val r = sc.textFile(“/Users/akuntamukkala/temp/customers.txt”) val records = r.map(_.split(‘|’))val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3))) c.registerAsTable(“customers”)
sqlContext.sql(“select * from customers where gender=’M’ and age < 30”).collect.foreach(println) Result:[John Ledger,28,M,203 Galaxy Way,Paris, TX,75461]
更多使用SQL和HiveQL的示例請(qǐng)拜訪下面鏈接https://spark.apache.org/docs/latest/sql-programming-guide.html、https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html.
九、Spark Streaming
Spark Streaming提供了一個(gè)可擴(kuò)展、容錯(cuò)、高效的途徑來處理流數(shù)據(jù),同時(shí)還利用了Spark的簡(jiǎn)易編程模型.從真正意義上講,Spark Streaming會(huì)將流數(shù)據(jù)轉(zhuǎn)換成micro batches,從而將Spark批處理編程模型應(yīng)用到流用例中.這種統(tǒng)一的編程模型讓Spark可以很好地整合批量處理和交互式流分析.下圖顯示了Spark Streaming可以從分歧數(shù)據(jù)源中讀取數(shù)據(jù)進(jìn)行分析.
Spark Streaming中的核心抽象是Discretized Stream(DStream).DStream由一組RDD組成,每個(gè)RDD都包括了規(guī)定時(shí)間(可配置)流入的數(shù)據(jù).圖12很好地展示了Spark Streaming如何通過將流入數(shù)據(jù)轉(zhuǎn)換成一系列的RDDs,再轉(zhuǎn)換成DStream.每個(gè)RDD都包括兩秒(設(shè)定的區(qū)間長(zhǎng)度)的數(shù)據(jù).在Spark Streaming中,最小長(zhǎng)度可以設(shè)置為0.5秒,因此處理延時(shí)可以達(dá)到1秒以下.
Spark Streaming同樣提供了 window operators,它有助于更有效率在一組RDD( a rolling window of time)上進(jìn)行計(jì)算.同時(shí),DStream還提供了一個(gè)API,其操作符(transformations和output operators)可以贊助用戶直接操作RDD.下面不妨看向包含在Spark Streaming下載中的一個(gè)簡(jiǎn)單示例.示例是在Twitter流中找出趨勢(shì)hashtags,詳見下面代碼.
spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scalaval sparkConf = new SparkConf.setAppName(“TwitterPopularTags”)val ssc = new StreamingContext(sparkConf, Seconds(2))val stream = TwitterUtils.createStream(ssc, None, filters)
上述代碼用于建立Spark Streaming Context.Spark Streaming將在DStream中建立一個(gè)RDD,包括了每2秒流入的tweets.
val hashTags = stream.flatMap(status => status.getText.split(“ “).filter(_.startsWith(“#”)))
上述代碼片段將Tweet轉(zhuǎn)換成一組words,并過濾出所有以a#開首的.
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}. transform(_.sortByKey(false))
上述代碼展示了若何整合計(jì)算60秒內(nèi)一個(gè)hashtag流入的總次數(shù).
topCounts60.foreachRDD(rdd => {val topList = rdd.take(10)println(“\nPopular topics in last 60 seconds (%stotal):”.format(rdd.count)) topList.foreach{case (count, tag) => println(“%s (%stweets)”.format(tag, count))} })
上面代碼將找出top 10趨向tweets,然后將其打印.
ssc.start
上述代碼讓Spark Streaming Context 開端檢索tweets.一起聚焦一些常用操作,假設(shè)我們正在從一個(gè)socket中讀入流文本.
al lines = ssc.socketTextStream(“l(fā)ocalhost”, 9999, StorageLevel.MEMORY_AND_DISK_SER)
更多operators請(qǐng)拜訪http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations
Spark Streaming擁有大量強(qiáng)大的output operators,比如上文提到的 foreachRDD,了解更多可拜訪 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations.
十、附加進(jìn)修資源
《新手福利:Apache Spark入門攻略》是否對(duì)您有啟發(fā),歡迎查看更多與《新手福利:Apache Spark入門攻略》相關(guān)教程,學(xué)精學(xué)透。維易PHP學(xué)院為您提供精彩教程。
轉(zhuǎn)載請(qǐng)注明本頁(yè)網(wǎng)址:
http://www.fzlkiss.com/jiaocheng/13479.html