從大方向來說,spark 算子大致可以分為以下兩類:
1)transformation 變換/轉換算子:這種變換并不觸發提交作業,完成作業中間過程處理。
transformation 操作是延遲計算的,也就是說從一個rdd 轉換生成另一個 rdd 的轉換操作不是馬上執行,需要等到有 action 操作的時候才會真正觸發運算。
2)action 行動算子:這類算子會觸發 sparkcontext 提交 job 作業。
action 算子會觸發 spark 提交作業(job),并將數據輸出 spark系統。
從小方向來說,spark 算子大致可以分為以下三類:
1)value數據類型的transformation算子,這種變換并不觸發提交作業,針對處理的數據項是value型的數據。
2)key-value數據類型的transfromation算子,這種變換并不觸發提交作業,針對處理的數據項是key-value型的數據對。
3)action算子,這類算子會觸發sparkcontext提交job作業。
引言
通常寫spark的程序用scala比較方便,畢竟spark的源碼就是用scala寫的。然而,目前java開發者特別多,尤其進行數據對接、上線服務的時候,這時候,就需要掌握一些spark在java中的使用方法了
一、map
map在進行數據處理、轉換的時候,不能更常用了
在使用map之前 首先要定義一個轉換的函數 格式如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
function<string, labeledpoint> transform = new function<string, labeledpoint>() { //string是某一行的輸入類型 labeledpoint是轉換后的輸出類型 @override public labeledpoint call(string row) throws exception { //重寫call方法 string[] rowarr = row.split( "," ); int rowsize = rowarr.length; double [] doublearr = new double [rowsize- 1 ]; //除了第一位的lable外 其余的部分解析成double 然后放到數組中 for ( int i = 1 ; i < rowsize; i++) { string each = rowarr[i]; doublearr[i] = double .parsedouble(each); } //用剛才得到的數據 轉成向量 vector feature = vectors.dense(doublearr); double label = double .parsedouble(rowarr[ 0 ]); //構造用于分類訓練的數據格式 labelpoint labeledpoint point = new labeledpoint(label, feature); return point; } }; |
需要特別注意的是:
1、call方法的輸入應該是轉換之前的數據行的類型 返回值應是處理之后的數據行類型
2、如果轉換方法中調用了自定義的類,注意該類名必須實現序列化 比如
1
2
|
public class treeensemble implements serializable { } |
3、轉換函數中如果調用了某些類的對象,比如該方法需要調用外部的一個參數,或者數值處理模型(標準化,歸一化等),則該對象需要聲明是final
然后就是在合適的時候調用該轉換函數了
1
|
javardd<labeledpoint> rdd = oridata.tojavardd().map(transform); |
這種方式是需要將普通的rdd轉成javardd才能使用的,轉成javardd的這一步操作不耗時,不用擔心
二、filter
在避免數據出現空值、0等場景中也非常常用,可以滿足sql中where的功能
這里首先也是要定義一個函數,該函數給定數據行 返回布爾值 實際效果是將返回為true的數據保留
1
2
3
4
5
6
7
|
function<string, boolean > boolfilter = new function<string, boolean >() { //string是某一行的輸入類型 boolean是對應的輸出類型 用于判斷數據是否保留 @override public boolean call(string row) throws exception { //重寫call方法 boolean flag = row!= null ; return flag; } }; |
通常該函數實際使用中需要修改的僅僅是row的類型 也就是數據行的輸入類型,和上面的轉換函數不同,此call方法的返回值應是固定為boolean
然后是調用方式
1
|
javardd<labeledpoint> rdd = oridata.tojavardd().filter(boolfilter); |
三、maptopair
該方法和map方法有一些類似,也是對數據進行一些轉換。不過此函數輸入一行 輸出的是一個元組,最常用的方法是用來做交叉驗證 或者統計錯誤率 召回率 計算auc等等
同樣,需要先定義一個轉換函數
1
2
3
4
5
6
7
8
|
function<string, boolean > transformer = new pairfunction<labeledpoint, object, object>() { //labeledpoint是輸入類型 后面的兩個object不要改動 @override public tuple2 call(labeledpoint row) throws exception { //重寫call方法 通常只改動輸入參數 輸出不要改動 double predicton = thismodel.predict(row.features()); double label = row.label(); return new tuple2(predicton, label); } }); |
關于調用的類、類的對象,要求和之前的一致,類需要實現序列化,類的對象需要聲明成final類型
相應的調用如下:
1
|
javapairrdd<object, object> predictionsandlabels = oridata.maptopair(transformer); |
然后對該predictionsandlabels的使用,計算準確率、召回率、精準率、auc,接下來的博客中會有,敬請期待
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。
原文鏈接:https://www.cnblogs.com/starwater/p/9195764.html