農林漁牧網

您現在的位置是:首頁 > 林業

程式碼+案例詳解:使用Spark處理大資料最全指南

2022-07-24由 讀芯術 發表于 林業

森林類別程式碼22表示什麼

全文共

17984

字,預計學習時長

30

分鐘或更長

程式碼+案例詳解:使用Spark處理大資料最全指南

如今,有不少關於Spark的相關介紹,但很少有人從資料科學家的角度來解釋該計算機引擎。因此,本文將試著介紹並詳細闡述——如何執行Spark?

程式碼+案例詳解:使用Spark處理大資料最全指南

一切是如何開始的呢?--- MapReduce(用於大規模資料集的程式設計模型)

程式碼+案例詳解:使用Spark處理大資料最全指南

假設我們的任務是砍伐森林中的所有樹木,有兩種選擇:

· 讓戴夫·巴蒂斯塔(美國職業摔跤運動員)用電動電鋸把樹一棵接一棵地砍掉。

· 找500個普通人用一般的斧頭砍伐不同的樹。

你更喜歡哪種方法?

雖然選項1仍然是一些人的選擇,但是對選項2的需求促使了MapReduce的出現。

在大資料中,巴蒂斯塔的解決方案稱為垂直擴充套件/擴大,就像在單個工作單元中新增/填充大量記憶體和硬碟一樣。

而第二種解決方法稱為水平擴充套件/橫向擴充套件。就像把許多普通機器連線在一起(用更少的記憶體),然後並行使用這些機器。

垂直擴充套件相對於水平擴充套件而言,有以下幾個優勢:

· 問題規模較小時,速度更快:假設是2棵樹。巴蒂斯塔會用那可怕的電鋸一下把兩棵樹砍掉,而兩個普通人則還得用斧頭砍這兩棵樹。

· 易於理解:這就是做事的一貫方式,通常按順序思考問題,這也是整個計算機體系結構和設計的演變過程。

而水平擴充套件優勢如下:

· 更加便宜:僱傭50個普通人比僱傭一個像巴蒂斯塔這樣的人要便宜得多。除此之外,巴蒂斯塔需要很多的照顧和保養,以幫助他保持冷靜。他非常敏感,就算對一些小事情也是這樣,猶如記憶體過高的機器。

· 問題規模較大時速度更快:設想一下有1000棵樹,1000普通工人VS 巴蒂斯塔。利用水平擴充套件時,如果面臨一個很大的問題,只需要僱傭100或1000個廉價工人即可。但和巴蒂斯塔工作卻不是這樣。你必須增加記憶體,而這也意味著需要更多的冷卻基礎設施和保養費用。

程式碼+案例詳解:使用Spark處理大資料最全指南

MapReduce使第二種選擇成為可能,透過允許使用計算機叢集進行並行化來實現這種可能性。

MapReduce由兩個術語組成:

對映:

其主要是apply/map函式。將資料分成n個組塊,並將每個塊傳送給不同的工作單元 (對映器)。若想對資料行應用某個函式,該工作單元就會照做。

歸約:

使用基於groupby key的某個函式彙總資料。其主要是利用groupby。

當然,系統如期工作還有許多事情需要完成。

程式碼+案例詳解:使用Spark處理大資料最全指南

為什麼使用Spark?

Hadoop(大資料平臺)是引入MapReduce程式設計正規化的首個開源系統,而Spark是使其速度更快(100倍)的系統。

Hadoop過去有很多資料傳送指令,因為其過去常常將中間結果寫入檔案系統。

這就影響了分析速度。

Spark有一個記憶體模型,因此Spark在工作時不會向磁碟寫入太多內容。

簡單地說,Spark比Hadoop更快,現在很多人都在使用Spark。

程式碼+案例詳解:使用Spark處理大資料最全指南

開始使用Spark

安裝Spark本身就是一個令人頭痛的問題。

如果想了解Spark是如何工作的以及如何真正地使用,建議在社群版線上Databricks上使用Sparks。別擔心,這是免費的。

程式碼+案例詳解:使用Spark處理大資料最全指南

註冊並登入後,螢幕會出現以下顯示。

程式碼+案例詳解:使用Spark處理大資料最全指南

在此可建立新的筆記本。

選擇Python筆記本,自定義筆記本名稱。

一旦啟動一個新的筆記本並嘗試執行任何命令,筆記本會詢問是否要啟動一個新的叢集。點選確定。

下一步檢查sparkcontext是否存在。要檢查sparkcontext是否存在,只需執行以下命令:

程式碼+案例詳解:使用Spark處理大資料最全指南

這意味著執行Spark就需要新建一個筆記本。

程式碼+案例詳解:使用Spark處理大資料最全指南

載入資料

下一步是上傳用於學習Spark的一些資料。只需點選主頁選項卡上的“匯入並檢視資料”。

本文末尾會使用多個數據集來說明,但現在先從一些非常簡單的東西開始。

程式碼+案例詳解:使用Spark處理大資料最全指南

可以看到檔案載入到這個位置了。

程式碼+案例詳解:使用Spark處理大資料最全指南

第一個Spark程式

本文傾向透過示例學習,所以讓我們完成分散式計算的“Hello World”: WordCount 程式。

程式碼+案例詳解:使用Spark處理大資料最全指南

這是一個小例子,其統計了文件字數並輸出了其中的10。

大多數工作是在第二指令中完成的。

如果目前還是跟不上,也別擔心,你的任務就是執行Spark。

但是在討論Spark的基礎知識之前,先了解一下Python基礎知識。如果使用過Python的函數語言程式設計,那麼理解Spark將變得容易得多。

對於沒有使用過Python的人,以下是一個簡短介紹。

程式碼+案例詳解:使用Spark處理大資料最全指南

Python中程式設計的函式方法

程式碼+案例詳解:使用Spark處理大資料最全指南

1。 對映

map用於將函式對映到陣列或列表中。如果想應用某函式到列表中的各元素中,只需透過使用for迴圈來實現,但是python lambda函式可允許在python的單行中實現這一點。

my_list = [1,2,3,4,5,6,7,8,9,10]

# Lets say I want to square each term in my_list。

squared_list = map(lambda x:x**2,my_list)

print(list(squared_list))

——————————————————————————————

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

在上面的例子中,可將map看作一個函式,該函式輸入兩個引數—一個函式和一個列表。

然後,其將該函式應用於列表中各元素,而lambda則可供編寫行內函數使用。在這裡lambda x:x**2定義了一個函式,將x輸入,返回x。

也可以用另外一個合適的函式來代替lambda。例如:

def squared(x):

return x**2

my_list = [1,2,3,4,5,6,7,8,9,10]

# Lets say I want to square each term in my_list。

squared_list = map(squared,my_list)

print(list(squared_list))

——————————————————————————————

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

同樣的結果,但是lambda表示式使程式碼更緊湊,可讀性更強。

2。 篩選

另一個廣泛使用的函式是filter函式。此函式輸入兩個引數—一個條件和一個篩選列表。

如果想使用條件篩選列表,請使用filter函式。

my_list = [1,2,3,4,5,6,7,8,9,10]

# Lets say I want only the even numbers in my list。

filtered_list = filter(lambda x:x%2==0,my_list)

print(list(filtered_list))

————————————————————————————————-

[2, 4, 6, 8, 10]

3。 約歸

下面介紹的函式是reduce函式。這個函式將是Spark中的主力部分。

這個函式輸入兩個引數——一個歸約函式,該函式輸入兩個引數,以及一個應用約歸函式的列表。

import functools

my_list = [1,2,3,4,5]

# Lets say I want to sum all elements in my list。

sum_list = functools。reduce(lambda x,y:x+y,my_list)

print(sum_list)

在python2中,約歸曾經是Python的一部分,現在我們必須使用reduce,使其作為函式工具的一部分。

在這裡,lambda函式輸入兩個值x和y,返回它們的和。直觀地,可以認為約歸函式的工作原理如下:

Reduce function first sends 1,2 ; the lambda function returns 3

Reduce function then sends 3,3 ; the lambda function returns 6

Reduce function then sends 6,4 ; the lambda function returns 10

Reduce function finally sends 10,5 ; the lambda function returns 15

在約歸中使用的lambda函式的一個條件是它必須是:

· 交換律 a + b = b + a 和

· 結合律 (a + b) + c == a + (b + c)。

在上面的例子中,使用了交換律和結合律。另外還可以使用的其他函式:max, min, *等等。

程式碼+案例詳解:使用Spark處理大資料最全指南

再次回到Spark

既然已經掌握了Python函數語言程式設計的基本知識,現在開始瞭解Spark。

首先深入研究一下spark是如何工作的。Spark實際上由驅動和工作單元兩部分組成。

工作單元通常執行這些需要完成的任務,而驅動則是釋出任務指令的。

彈性分散式資料集

RDD(彈性分散式資料集)是一種並行的資料結構,分佈在工作單元節點之間。RDD是Spark程式設計的基本單元。

在wordcount示例中,其第一行

lines = sc。textFile(“/FileStore/tables/shakespeare”)

獲取一個文字檔案,將其分佈到工作單元節點上,這樣RDD就可以並行地處理此檔案。還可以使用sc。parallelize函式平行計算列表。

例如:

data = [1,2,3,4,5,6,7,8,9,10]

new_rdd = sc。parallelize(data,4)

new_rdd

————————————————————————————————-

ParallelCollectionRDD[22] at parallelize at PythonRDD。scala:267

在Spark中,可以對RDD執行兩種不同型別的操作:轉換和操作。

1。 轉換:從現有的RDD中建立新的資料集

2。 操作:從Spark中獲取結果的機制

程式碼+案例詳解:使用Spark處理大資料最全指南

轉換基礎

程式碼+案例詳解:使用Spark處理大資料最全指南

假設已經以RDD的形式獲取了資料。

目前可以透過訪問工作機器來重報資料。現在想對資料進行一些轉換。

比如你可能想要篩選、應用某個功能等等。

在Spark中,這可以由Transformation函式完成。

Spark提供了很多轉換函式。

以下列出一些筆者常用的函式:

1。 Map函式:

將給定函式用於RDD。

注意其句法與Python略有不同,但是可以完成同樣的操作。現在還不必擔心collect操作,因為目前只需要將其視為在squared_rdd中收集資料然後返回列表的函式。

data = [1,2,3,4,5,6,7,8,9,10]

rdd = sc。parallelize(data,4)

squared_rdd = rdd。map(lambda x:x**2)

squared_rdd。collect()

————————————————————————————

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

2。 Filter函式:

此處依舊沒什麼驚喜。將輸入作為一個條件,僅保留滿足該條件的元素。

data = [1,2,3,4,5,6,7,8,9,10]

rdd = sc。parallelize(data,4)

filtered_rdd = rdd。filter(lambda x:x%2==0)

filtered_rdd。collect()

————————————————————————————

[2, 4, 6, 8, 10]

3。 distinct函式:

僅返回RDD中的不同元素。

data = [1,2,2,2,2,3,3,3,3,4,5,6,7,7,7,8,8,8,9,10]

rdd = sc。parallelize(data,4)

distinct_rdd = rdd。distinct()

distinct_rdd。collect()

————————————————————————————

[8, 4, 1, 5, 9, 2, 10, 6, 3, 7]

4。 flatmap函式:

與 map函式相似,但是每個輸入項可對映到0個或更多個輸出項。

data = [1,2,3,4]

rdd = sc。parallelize(data,4)

flat_rdd = rdd。flatMap(lambda x:[x,x**3])

flat_rdd。collect()

————————————————————————————

[1, 1, 2, 8, 3, 27, 4, 64]

5。 Reduce By Key函式:

此函式與Hadoop MapReduce中的reduce相似。

目前,Spark若只與Lists一起使用,是無法求得數值的。

在Spark中,有一對RDD的概念使其更加靈活。假設有一個數據,其中包含產品、類別和售價。這種情況下仍然可以並行化資料。

data = [(‘Apple’,‘Fruit’,200),(‘Banana’,‘Fruit’,24),(‘Tomato’,‘Fruit’,56),(‘Potato’,‘Vegetable’,103),(‘Carrot’,‘Vegetable’,34)]

rdd = sc。parallelize(data,4)

現在的RDD rdd 包含各元組。

目前想找出從每個類別中獲得的總收入。

要實現這一目標,必須將rdd轉換為一對rdd,以使其只包含鍵值對/元組。

category_price_rdd = rdd。map(lambda x: (x[1],x[2]))

category_price_rdd。collect()

————————————————————————————————-

[(‘Fruit’, 200), (‘Fruit’, 24), (‘Fruit’, 56), (‘Vegetable’, 103), (‘Vegetable’, 34)]

此處應用map函式獲取所需格式的rdd。使用文字格式執行時,形成的RDD有很多字串。然後使用map函式將其轉換為所需格式。

所以現在的category_price_rdd中包含產品類別和售價。

如果想將關鍵類別進行約歸併統計總價,那麼可以這樣做:

category_total_price_rdd = category_price_rdd。reduceByKey(lambda x,y:x+y)

category_total_price_rdd。collect()

————————————————————————————-[(‘Vegetable’, 137), (‘Fruit’, 280)]

6。 Group By Key函式:

與reduceByKey相似,Group By Key只是把所有元素放入迭代器中,並不會reduce。舉個例子,如果想保留關鍵類別和所有產品你的價值,可以使用此函式。

再次使用map函式,獲取所需形式的資料。

data = [(‘Apple’,‘Fruit’,200),(‘Banana’,‘Fruit’,24),(‘Tomato’,‘Fruit’,56),(‘Potato’,‘Vegetable’,103),(‘Carrot’,‘Vegetable’,34)]

rdd = sc。parallelize(data,4)

category_product_rdd = rdd。map(lambda x: (x[1],x[0]))

category_product_rdd。collect()

——————————————————————————————

[(‘Fruit’, ‘Apple’), (‘Fruit’, ‘Banana’), (‘Fruit’, ‘Tomato’), (‘Vegetable’, ‘Potato’), (‘Vegetable’, ‘Carrot’)]

然後像下面這樣使用groupByKey:

grouped_products_by_category_rdd = category_product_rdd。groupByKey()

findata = grouped_products_by_category_rdd。collect()

for data in findata:

print(data[0],list(data[1]))

——————————————————————————————

Vegetable [‘Potato’, ‘Carrot’]

Fruit [‘Apple’, ‘Banana’, ‘Tomato’]

此處groupByKey函式執行,其返回該類別中的類別和產品列表。

程式碼+案例詳解:使用Spark處理大資料最全指南

操作基礎

程式碼+案例詳解:使用Spark處理大資料最全指南

至此已經篩選了資料,並在其上映射了一些函式。接下來要完成計算。

現在希望獲取本地計算機上的資料或將其儲存到檔案中,或者以excel或任何視覺化工具中的某些圖形的形式顯示結果。

為此需要進行一些操作。

筆者傾向使用的一些常見操作如下:

1。 collect

上文已多次使用過此操作。該操作將整個RDD返回到應用程式中。

2。 reduce

使用函式func(該函式接受兩個引數並返回一個)來聚合資料集的元素。該函式可交換和組合,以便並行進行正確計算。

rdd = sc。parallelize([1,2,3,4,5])

rdd。reduce(lambda x,y : x+y)

————————————————-

15

3。 take

有時需要檢視RDD包含內容,但無需獲取記憶體中的所有元素。take操作返回包含RDD前n個元素的列表。

rdd = sc。parallelize([1,2,3,4,5])

rdd。take(3)

————————————————-

[1, 2, 3]

4。 takeOrdered

takeOrdered操作使用自然順序或自定義比較器返回RDD的前n個元素。

rdd = sc。parallelize([5,3,12,23])

# descending order

rdd。takeOrdered(3,lambda s:-1*s)

——

[23, 12, 5]

rdd = sc。parallelize([(5,23),(3,34),(12,344),(23,29)])

# descending order

rdd。takeOrdered(3,lambda s:-1*s[1])

——-

[(12, 344), (3, 34), (23, 29)]

至此所有的基礎都已涉及,接下來回到wordcount示例。

程式碼+案例詳解:使用Spark處理大資料最全指南

理解WordCount示例

程式碼+案例詳解:使用Spark處理大資料最全指南

目前已基本瞭解Spark所提供的轉換和操作。

現在理解wordcount程式應該不難。接下來一起逐行完成該程式。

第一行建立RDD並將其分發給工作單位。

lines = sc。textFile(“/FileStore/tables/shakespeare”)

此RDD行包含檔案中的語句列表。使用take操作可檢視rdd內容。

lines。take(5)

——————————————————————

[‘The Project Gutenberg EBook of The Complete Works of William Shakespeare, by ’, ‘William Shakespeare’, ‘’, ‘This eBook is for the use of anyone anywhere at no cost and with’, ‘almost no restrictions whatsoever。 You may copy it, give it away or’]

此RDD格式如下:

[‘word1 word2 word3’,‘word4 word3 word2’]

實際上,下一行是整個工序中的主要函式。

counts = (lines。flatMap(lambda x: x。split(‘ ’))

。map(lambda x: (x, 1))

。reduceByKey(lambda x,y : x + y))

該函式包含對RDD行進行的一系列轉換。首先進行flatmap轉換。flatmap轉換將行作為輸入,單詞作為輸出。因此,進行flatmap轉換之後,RDD的形式如下:

[‘word1’,‘word2’,‘word3’,‘word4’,‘word3’,‘word2’]

接下來,對flatmap輸出進行map轉換,將RDD轉換為:

[(‘word1’,1),(‘word2’,1),(‘word3’,1),(‘word4’,1),(‘word3’,1),(‘word2’,1)]

最後,進行reduceByKey轉換以計算每個單詞出現的時間。

隨後,RDD接近最終的理想形式。

[(‘word1’,1),(‘word2’,2),(‘word3’,2),(‘word4’,1)]

下一行是一個操作,它在本地獲取生成的RDD的前10個元素。

output = counts。take(10)

此行僅輸出結果。

for (word, count) in output:

print(“%s: %i” % (word, count))

以上就是wordcount程式。

到目前為止,我們討論了Wordcount示例以及可以在Spark中使用的基本轉換和操作。但是在現實生活中並不做文字計數。

程式碼+案例詳解:使用Spark處理大資料最全指南

Spark應用例項

程式碼+案例詳解:使用Spark處理大資料最全指南

接下來用具體例項解決一些常見的轉換。

所研究的資料集是Movielens,該資料集是一個穩定基準資料集。1700部電影中的1000名使用者給出了100000份評分,釋出於1998年4月。

Movielens資料集包含大量檔案,但本文僅處理3個檔案:

1。 使用者: 此檔名為 “u。user”, 檔案中的列如下:

[‘user_id’, ‘age’, ‘sex’, ‘occupation’, ‘zip_code’]

2。 評分: 此檔名為 “u。data”, 檔案中的列如下:

[‘user_id’, ‘movie_id’, ‘rating’, ‘unix_timestamp’]

3。 電影: 此檔名為 “u。item”, 檔案中的列如下:

[‘movie_id’, ‘title’, ‘release_date’, ‘video_release_date’, ‘imdb_url’, and 18 more columns。。。。。]

首先使用主頁選項卡上的“匯入和瀏覽資料”將這3個檔案匯入spark例項。

程式碼+案例詳解:使用Spark處理大資料最全指南

程式碼+案例詳解:使用Spark處理大資料最全指南

程式碼+案例詳解:使用Spark處理大資料最全指南

業務合作伙伴聯絡我們並要求從這些資料中找出25部評分最高的電影。一部電影會收到多少次評分?

在不同的RDD中載入資料,看看資料包含什麼內容吧。

userRDD = sc。textFile(“/FileStore/tables/u。user”)

ratingRDD = sc。textFile(“/FileStore/tables/u。data”)

movieRDD = sc。textFile(“/FileStore/tables/u。item”)

print(“userRDD:”,userRDD。take(1))

print(“ratingRDD:”,ratingRDD。take(1))

print(“movieRDD:”,movieRDD。take(1))

——————————————————————————————-

userRDD: [‘1|24|M|technician|85711’]

ratingRDD: [‘196\t242\t3\t881250949’]

movieRDD: [‘1|Toy Story (1995)|01-Jan-1995|-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0’]

值得注意的是,回答這個問題需要使用ratingRDD。但是ratingRDD中沒有電影名稱。

所以必須使用 movie_id合併movieRDD和ratingRDD。

如何在Spark中做到這一點?

以下是使用的程式碼。其中還使用了一個新的轉換leftOuterJoin。請閱讀以下程式碼中的文件和評論。

# Create a RDD from RatingRDD that only contains the two columns of interest i。e。 movie_id,rating。

RDD_movid_rating = ratingRDD。map(lambda x : (x。split(“\t”)[1],x。split(“\t”)[2]))

print(“RDD_movid_rating:”,RDD_movid_rating。take(4))

# Create a RDD from MovieRDD that only contains the two columns of interest i。e。 movie_id,title。

RDD_movid_title = movieRDD。map(lambda x : (x。split(“|”)[0],x。split(“|”)[1]))

print(“RDD_movid_title:”,RDD_movid_title。take(2))

# merge these two pair RDDs based on movie_id。 For this we will use the transformation leftOuterJoin()。 See the transformation document。

rdd_movid_title_rating = RDD_movid_rating。leftOuterJoin(RDD_movid_title)

print(“rdd_movid_title_rating:”,rdd_movid_title_rating。take(1))

# use the RDD in previous step to create (movie,1) tuple pair RDD

rdd_title_rating = rdd_movid_title_rating。map(lambda x: (x[1][1],1 ))

print(“rdd_title_rating:”,rdd_title_rating。take(2))

# Use the reduceByKey transformation to reduce on the basis of movie_title

rdd_title_ratingcnt = rdd_title_rating。reduceByKey(lambda x,y: x+y)

print(“rdd_title_ratingcnt:”,rdd_title_ratingcnt。take(2))

# Get the final answer by using takeOrdered Transformation

print “#####################################”

print “25 most rated movies:”,rdd_title_ratingcnt。takeOrdered(25,lambda x:-x[1])

print “#####################################”

OUTPUT:

——————————————————————————————————RDD_movid_rating: [(‘242’, ‘3’), (‘302’, ‘3’), (‘377’, ‘1’), (‘51’, ‘2’)]

RDD_movid_title: [(‘1’, ‘Toy Story (1995)’), (‘2’, ‘GoldenEye (1995)’)]

rdd_movid_title_rating: [(‘1440’, (‘3’, ‘Above the Rim (1994)’))] rdd_title_rating: [(‘Above the Rim (1994)’, 1), (‘Above the Rim (1994)’, 1)]

rdd_title_ratingcnt: [(‘Mallrats (1995)’, 54), (‘Michael Collins (1996)’, 92)]

25 most rated movies: [(‘Star Wars (1977)’, 583), (‘Contact (1997)’, 509), (‘Fargo (1996)’, 508), (‘Return of the Jedi (1983)’, 507), (‘Liar Liar (1997)’, 485), (‘English Patient, The (1996)’, 481), (‘Scream (1996)’, 478), (‘Toy Story (1995)’, 452), (‘Air Force One (1997)’, 431), (‘Independence Day (ID4) (1996)’, 429), (‘Raiders of the Lost Ark (1981)’, 420), (‘Godfather, The (1972)’, 413), (‘Pulp Fiction (1994)’, 394), (‘Twelve Monkeys (1995)’, 392), (‘Silence of the Lambs, The (1991)’, 390), (‘Jerry Maguire (1996)’, 384), (‘Chasing Amy (1997)’, 379), (‘Rock, The (1996)’, 378), (‘Empire Strikes Back, The (1980)’, 367), (‘Star Trek: First Contact (1996)’, 365), (‘Back to the Future (1985)’, 350), (‘Titanic (1997)’, 350), (‘Mission: Impossible (1996)’, 344), (‘Fugitive, The (1993)’, 336), (‘Indiana Jones and the Last Crusade (1989)’, 331)] #####################################

《星球大戰》是Movielens資料集中評分最高的電影。

現在可以使用以下命令在一個命令中完成所有步驟,但現在程式碼有點亂。

這樣做是為了表明Spark的連結功能可以使用,藉此繞過變數建立的過程。

print(((ratingRDD。map(lambda x : (x。split(“\t”)[1],x。split(“\t”)[2])))。

leftOuterJoin(movieRDD。map(lambda x : (x。split(“|”)[0],x。split(“|”)[1]))))。

map(lambda x: (x[1][1],1))。

reduceByKey(lambda x,y: x+y)。

takeOrdered(25,lambda x:-x[1]))

再來一次。練習:

現在想要使用相同的資料集找到評分最高的25部電影。事實上只需要那些至少有100次評分的電影。

# We already have the RDD rdd_movid_title_rating: [(u‘429’, (u‘5’, u‘Day the Earth Stood Still, The (1951)’))]

# We create an RDD that contains sum of all the ratings for a particular movie

rdd_title_ratingsum = (rdd_movid_title_rating。

map(lambda x: (x[1][1],int(x[1][0])))。

reduceByKey(lambda x,y:x+y))

print(“rdd_title_ratingsum:”,rdd_title_ratingsum。take(2))

# Merge this data with the RDD rdd_title_ratingcnt we created in the last step

# And use Map function to divide ratingsum by rating count。

rdd_title_ratingmean_rating_count = (rdd_title_ratingsum。

leftOuterJoin(rdd_title_ratingcnt)。

map(lambda x:(x[0],(float(x[1][0])/x[1][1],x[1][1]))))

print(“rdd_title_ratingmean_rating_count:”,rdd_title_ratingmean_rating_count。take(1))

# We could use take ordered here only but we want to only get the movies which have count

# of ratings more than or equal to 100 so lets filter the data RDD。

rdd_title_rating_rating_count_gt_100 = (rdd_title_ratingmean_rating_count。

filter(lambda x: x[1][1]>=100))

print(“rdd_title_rating_rating_count_gt_100:”,rdd_title_rating_rating_count_gt_100。take(1))

# Get the final answer by using takeOrdered Transformation

print(“#####################################”)

print (“25 highly rated movies:”)

print(rdd_title_rating_rating_count_gt_100。takeOrdered(25,lambda x:-x[1][0]))

print(“#####################################”)

OUTPUT:

——————————————————————————————

rdd_title_ratingsum: [(‘Mallrats (1995)’, 186), (‘Michael Collins (1996)’, 318)]

rdd_title_ratingmean_rating_count: [(‘Mallrats (1995)’, (3。4444444444444446, 54))]

rdd_title_rating_rating_count_gt_100: [(‘Butch Cassidy and the Sundance Kid (1969)’, (3。949074074074074, 216))]

25 highly rated movies: [(‘Close Shave, A (1995)’, (4。491071428571429, 112)), (“Schindler‘s List (1993)”, (4。466442953020135, 298)), (’Wrong Trousers, The (1993)‘, (4。466101694915254, 118)), (’Casablanca (1942)‘, (4。45679012345679, 243)), (’Shawshank Redemption, The (1994)‘, (4。445229681978798, 283)), (’Rear Window (1954)‘, (4。3875598086124405, 209)), (’Usual Suspects, The (1995)‘, (4。385767790262173, 267)), (’Star Wars (1977)‘, (4。3584905660377355, 583)), (’12 Angry Men (1957)‘, (4。344, 125)), (’Citizen Kane (1941)‘, (4。292929292929293, 198)), (’To Kill a Mockingbird (1962)‘, (4。292237442922374, 219)), (“One Flew Over the Cuckoo’s Nest (1975)”, (4。291666666666667, 264)), (‘Silence of the Lambs, The (1991)’, (4。28974358974359, 390)), (‘North by Northwest (1959)’, (4。284916201117318, 179)), (‘Godfather, The (1972)’, (4。283292978208232, 413)), (‘Secrets & Lies (1996)’, (4。265432098765432, 162)), (‘Good Will Hunting (1997)’, (4。262626262626263, 198)), (‘Manchurian Candidate, The (1962)’, (4。259541984732825, 131)), (‘Dr。 Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)’, (4。252577319587629, 194)), (‘Raiders of the Lost Ark (1981)’, (4。252380952380952, 420)), (‘Vertigo (1958)’, (4。251396648044692, 179)), (‘Titanic (1997)’, (4。2457142857142856, 350)), (‘Lawrence of Arabia (1962)’, (4。23121387283237, 173)), (‘Maltese Falcon, The (1941)’, (4。2101449275362315, 138)), (‘Empire Strikes Back, The (1980)’, (4。204359673024523, 367))]

到目前為止,已經討論過RDD,因為其非常強大。

RDD也可以處理非關係型資料庫。

他們讓你完成很多無法用SparkSQL完成的事情?

是的,你也可以透過Spark使用 SQL,接下來就談談這個。

程式碼+案例詳解:使用Spark處理大資料最全指南

Spark Datafrmes

程式碼+案例詳解:使用Spark處理大資料最全指南

Spark為資料科學家提供了DataFrame API來處理關係資料。

請記住,在後臺它仍然是所有RDD,這就是本文伊始關注RDD的原因。

下文將從使用Spark DataFrames所需的一些常用功能開始。它的一些句法變化與Pandas很相像。

1。 讀取檔案

ratings = spark。read。load(“”,format=“csv”, sep=“\t”, inferSchema=“true”, header=“false”)

2。 顯示檔案

使用Spark DataFrames顯示檔案有兩種方式。

程式碼+案例詳解:使用Spark處理大資料最全指南

程式碼+案例詳解:使用Spark處理大資料最全指南

本文傾向於 display,因其看起來更為美觀簡潔。

3。 改變列名

這是一個好功能,一直都很有用。注意不要遺漏列表前的*。

ratings = ratings。toDF(*[‘user_id’, ‘movie_id’, ‘rating’, ‘unix_timestamp’])

程式碼+案例詳解:使用Spark處理大資料最全指南

4。 一些基本統計結果

print(ratings。count()) #Row Count

print(len(ratings。columns)) #Column Count

————————————————————————————-

100000

4

還可以使用以下方式看到資料幀的統計結果

程式碼+案例詳解:使用Spark處理大資料最全指南

5。 選擇部分列

程式碼+案例詳解:使用Spark處理大資料最全指南

6。 篩選

使用多個條件篩選資料幀:

程式碼+案例詳解:使用Spark處理大資料最全指南

7。 Groupby函式

Groupby函式可以與spark資料幀結合使用。操作與pandas groupby函式基本相同,只是需要匯入pyspark。sql。 functions函式。

from pyspark。sql import functions as F

display(ratings。groupBy(“user_id”)。agg(F。count(“user_id”),F。mean(“rating”)))

本文中已從每個user_id中找到了評分數以及平均評分。

程式碼+案例詳解:使用Spark處理大資料最全指南

8。 排序

程式碼+案例詳解:使用Spark處理大資料最全指南

如下所示,還可以使用F。desc函式進行降序排序

程式碼+案例詳解:使用Spark處理大資料最全指南

程式碼+案例詳解:使用Spark處理大資料最全指南

使用spark Dataframes資料幀進行增加/合併

無法找到與Spark DataFrames合併的pandas對應項,但SQL可以與與dataframes一起使用,所以可以使用SQL合併dataframes。

試著在Ratings上執行SQL。

首先將ratings df註冊到臨時表ratings_table,其上可執行sql操作。

如你所見,SQL select語句的結果還是Spark Datadframe。

程式碼+案例詳解:使用Spark處理大資料最全指南

現在再新增一個Spark Dataframe,觀察是否可以使用SQL查詢來使用連線:

#get one more dataframe to join

movies = spark。read。load(“/FileStore/tables/u。item”,format=“csv”, sep=“|”, inferSchema=“true”, header=“false”)

# change column names

movies = movies。toDF(*[“movie_id”,“movie_title”,“release_date”,“video_release_date”,“IMDb_URL”,“unknown”,“Action”,“Adventure”,“Animation ”,“Children”,“Comedy”,“Crime”,“Documentary”,“Drama”,“Fantasy”,“Film_Noir”,“Horror”,“Musical”,“Mystery”,“Romance”,“Sci_Fi”,“Thriller”,“War”,“Western”])

程式碼+案例詳解:使用Spark處理大資料最全指南

現在嘗試加入movie_id上的表格,以獲得評級表中的電影名。

程式碼+案例詳解:使用Spark處理大資料最全指南

嘗試用RDD做之前做的事情。找到評分最高的25部電影:

程式碼+案例詳解:使用Spark處理大資料最全指南

同時找到擁有超過100票的25部評分最高電影:

程式碼+案例詳解:使用Spark處理大資料最全指南

上面的查詢中使用了GROUP BY,HAVING和ORDER BY子句以及別名。這表明使用sqlContext。sql可以完成相當複雜的事情。

程式碼+案例詳解:使用Spark處理大資料最全指南

關於Display命令的小提示

還可以使用display命令以在筆記本中顯示圖表。

程式碼+案例詳解:使用Spark處理大資料最全指南

選擇“繪圖選項”時,可以看到更多選項。

程式碼+案例詳解:使用Spark處理大資料最全指南

程式碼+案例詳解:使用Spark處理大資料最全指南

Spark Dataframe與RDD的相互轉換

有時可能希望Spark Dataframe與RDD的相互轉換,這樣就可以充分利用這兩個不同的功能。

要從DF轉換為RDD,只需執行以下操作:

程式碼+案例詳解:使用Spark處理大資料最全指南

由RDD轉換為資料幀:

程式碼+案例詳解:使用Spark處理大資料最全指南

RDD以時間和編碼工作上的付出為代價提供更多控制。而Dataframes則提供熟悉的編碼平臺。但是現在可以在這兩者之間來回轉換了。

程式碼+案例詳解:使用Spark處理大資料最全指南

結論

Spark提供了一個介面,在此可以對資料進行轉換和操作。Spark還擁有Dataframe API,便於簡化資料科學家向大資料的過渡。

程式碼+案例詳解:使用Spark處理大資料最全指南

留言 點贊 發個朋友圈

留言 點贊 發個朋友圈

如需轉載,請後臺留言,遵守轉載規範