1. Test Data
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
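Note: these rows are the classic Scott emp dataset, with the columns empno, ename, job, mgr, hiredate, sal, comm, deptno. KING has no manager and most employees earn no commission, so those fields are left empty; the parsing code below assumes all eight comma-separated fields are present in every line.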
2. Creating a DataFrame

Method 1: DSL operations

- Instantiate the SparkContext and SparkSession objects
- Build a schema with the StructType type to define the structure of the data
- Read the file through the SparkContext, producing an RDD
- Convert the RDD[String] into an RDD[Row]
- Create the DataFrame through the SparkSession
- The complete code is as follows:
```scala
package com.scala.demo.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

object Demo01 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext and SparkSession objects
    val sc = new SparkContext(new SparkConf().setAppName("Demo01").setMaster("local[2]"))
    val sparkSession = SparkSession.builder().getOrCreate()

    // 2. Define the schema with StructType
    val mySchema = StructType(List(
      StructField("empno", DataTypes.IntegerType, false),
      StructField("ename", DataTypes.StringType, false),
      StructField("job", DataTypes.StringType, false),
      StructField("mgr", DataTypes.StringType, false),
      StructField("hiredate", DataTypes.StringType, false),
      StructField("sal", DataTypes.IntegerType, false),
      StructField("comm", DataTypes.StringType, false),
      StructField("deptno", DataTypes.IntegerType, false)
    ))

    // 3. Read the data
    val empRDD = sc.textFile("file:///D:\\TestDatas\\emp.csv")

    // 4. Map each line to a Row object
    val rowRDD = empRDD.map(line => {
      val strings = line.split(",")
      Row(strings(0).toInt, strings(1), strings(2), strings(3), strings(4),
        strings(5).toInt, strings(6), strings(7).toInt)
    })

    // 5. Create the DataFrame
    val dataFrame = sparkSession.createDataFrame(rowRDD, mySchema)

    // 6. Display the result using the DSL
    dataFrame.groupBy("deptno").sum("sal").as("result").sort("sum(sal)").show()
  }
}
```
The result is as follows:
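The original post showed the output as a screenshot, which is not reproduced here. With the test data above, show() should print something close to this (a reconstruction, not the post's original output):

```
+------+--------+
|deptno|sum(sal)|
+------+--------+
|    10|    8750|
|    30|    9400|
|    20|   10875|
+------+--------+
```

One caveat on the last DSL line: .as("result") aliases the whole Dataset rather than renaming the sum(sal) column. A sketch of an arguably cleaner variant (not from the original post) that names the aggregate column explicitly:

```scala
import org.apache.spark.sql.functions.sum

dataFrame.groupBy("deptno")
  .agg(sum("sal").as("total_sal")) // rename the aggregate column
  .orderBy("total_sal")            // sort ascending by the alias
  .show()
```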
Method 2: SQL operations

- Instantiate the SparkContext and SparkSession objects
- Define an Emp case class to describe the structure of the data
- Read the file through the SparkContext, producing an RDD[String]
- Convert the RDD[String] into an RDD[Emp]
- Import Spark's implicit conversions (this import is required)
- Convert the RDD[Emp] into a DataFrame
- Register the DataFrame as a view or temporary table
- Run the SQL statement through the SparkSession's sql function
- Stop the resources
- The full code is as follows:
```scala
package com.scala.demo.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

// 0. Shape of the input data, e.g.:
// 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30

// 1. Define the Emp case class
case class Emp(empNo: Int, empName: String, job: String, mgr: String,
               hiredate: String, sal: Int, comm: String, deptNo: Int)

object Demo02 {
  def main(args: Array[String]): Unit = {
    // 2. Read the data and map each line to an Emp object
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo02"))
    val mapRdd = sc.textFile("file:///D:\\TestDatas\\emp.csv")
      .map(_.split(","))
    val empRDD: RDD[Emp] = mapRdd.map(line => Emp(line(0).toInt, line(1), line(2),
      line(3), line(4), line(5).toInt, line(6), line(7).toInt))

    // 3. Create the DataFrame
    val spark = SparkSession.builder().getOrCreate()
    // Import Spark's implicit conversions (required for toDF)
    import spark.implicits._
    // Convert the RDD into a DataFrame
    val dataFrame = empRDD.toDF

    // 4. Operate with a SQL statement
    // 4.1 Register the DataFrame as a temporary view
    dataFrame.createOrReplaceTempView("emp")
    // 4.2 Run the SQL query
    spark.sql("select deptNo, sum(sal) as total from emp group by deptNo order by total desc").show()

    // Stop the resources
    spark.stop()
    sc.stop()
  }
}
```
The result is as follows:
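Again, the original screenshot is not reproduced here; with the test data above, the query should print something close to this (a reconstruction, not the post's original output):

```
+------+-----+
|deptNo|total|
+------+-----+
|    20|10875|
|    30| 9400|
|    10| 8750|
+------+-----+
```

As an aside that is not part of the original post: since Spark 2.3, the built-in CSV reader can replace the manual RDD mapping entirely. A minimal sketch, assuming the same file path and column layout:

```scala
// Load the CSV directly with an explicit DDL schema string
val df = spark.read
  .schema("empNo INT, empName STRING, job STRING, mgr STRING, hiredate STRING, sal INT, comm STRING, deptNo INT")
  .csv("file:///D:\\TestDatas\\emp.csv")
df.createOrReplaceTempView("emp")
```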
This concludes this article on the two ways of operating on a DataFrame in Spark SQL 2.4.8.
Original link: https://blog.csdn.net/sujiangming/article/details/120756862