Spark SQL Operations
Spark SQL does not mean using Spark as a SQL server; rather, it refers to a structured data abstraction within Spark. Since data in Spark is built on RDDs, it maps naturally onto database units (e.g., tables) for manipulation. At the same time, because RDDs are immutable (read-only) and hold data in flexible shapes (e.g., key-value pairs), SQL-style commands are a natural fit for working with them.
First, we enter the Spark shell and import the necessary classes:
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> import spark.implicits._
import spark.implicits._
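The spark.implicits._ import is what enables the $"column" syntax used throughout this section. As a quick check (a sketch; res0 assumes a fresh shell), $"age" builds a Column object:

scala> $"age"
res0: org.apache.spark.sql.ColumnName = age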
Next, we bring in a JSON-formatted file as the input data. The file's contents are as follows:
~$ cat /usr/lib/spark/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
In this file we can see that each JSON string carries two fields: name and age. Next, we read the JSON file into Spark. Since we did not declare the types of the field values, Spark will infer the data types for us automatically (schema inference).
scala> val df = spark.read.json("/usr/lib/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
df is the DataFrame produced from the JSON file; we can use df.show() to inspect its contents. The data structure is displayed in the form of a SQL table.
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.filter($"age" > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
Not only is the data presented as a SQL table; we can also operate on the DataFrame with SQL-like methods, for example filtering for rows whose age exceeds 21, or displaying only selected columns, as shown above.
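We can also register the DataFrame as a temporary view and query it with real SQL through spark.sql. A minimal sketch, where the view name people is our own choice:

scala> df.createOrReplaceTempView("people")

scala> spark.sql("SELECT name FROM people WHERE age > 21").show()
+----+
|name|
+----+
|Andy|
+----+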
Given JSON's support for nested structures, we can also try loading a more complex document, as shown below:
cat test.json
{"name":"Michael", "address":{"city": "New York","postcode": "10021"}}
{"name":"Andy", "age":30, "address":{"city": "New York","postcode": "10022"}}
{"name":"Justin", "age":19, "address":{"city": "New York","postcode": "10023"}}
Data with such a nested structure can also be loaded into Spark SQL; the nested object appears as a single struct column holding multiple values.
scala> val df_new = spark.read.json("/home/ubuntu/sparkcode/SQLtest/src/main/scala/test.json")
df_new: org.apache.spark.sql.DataFrame = [address: struct<city: string, postcode: string>, age: bigint ... 1 more field]
scala> df_new.show()
+----------------+----+-------+
| address| age| name|
+----------------+----+-------+
|[New York,10021]|null|Michael|
|[New York,10022]| 30| Andy|
|[New York,10023]| 19| Justin|
+----------------+----+-------+
scala> df_new.printSchema()
root
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- postcode: string (nullable = true)
|-- age: long (nullable = true)
|-- name: string (nullable = true)
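The nested fields can be reached with dot notation. A short sketch, using the column names from the schema above:

scala> df_new.select($"name", $"address.city").show()
+-------+--------+
|   name|    city|
+-------+--------+
|Michael|New York|
|   Andy|New York|
| Justin|New York|
+-------+--------+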
More information about Spark SQL is available in the official programming guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
Note: SQL command log
Log in to MySQL and switch to the spark database.
ubuntu@testspark:~$ mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 210
Server version: 5.7.24-0ubuntu0.16.04.1 (Ubuntu)
Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use spark;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
Creating tables
mysql> create table sampleCount (bandID int(4), AreaID int(4), APid int(4), RPid int(4), count int(4));
Query OK, 0 rows affected (0.03 sec)
mysql> create table avgSamples (bandID int(4), AreaID int(4), APid int(4), RPid int(4), RSSI float(4));
Query OK, 0 rows affected (0.03 sec)
mysql> create table allSamples (bandID int(4), AreaID int(4), APid int(4), RPid int(4), round int(4), RSSI float(4));
Query OK, 0 rows affected (0.04 sec)
Dropping tables
mysql> drop table sampleCount;
Query OK, 0 rows affected (0.01 sec)
mysql> drop table avgSamples;
Query OK, 0 rows affected (0.01 sec)
mysql> drop table allSamples;
Query OK, 0 rows affected (0.01 sec)
Listing the current tables
mysql> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| allSamples |
| avgSamples |
| sampleCount |
| student |
+-----------------+
4 rows in set (0.00 sec)
Inserting multiple rows into a table at once:
// INSERT INTO sampleCount (bandID, AreaID, APid, RPid, count)
// VALUES (0, 1, 1, 1, 59),
//        (0, 1, 2, 1, 60),
//        (0, 1, 3, 1, 37);
mysql> INSERT INTO sampleCount (bandID, AreaID, APid, RPid, count) VALUES (0, 1, 1, 1, 59),(0, 1, 2, 1, 60),(0, 1, 3, 1, 37);
Query OK, 3 rows affected (0.00 sec)
Records: 3 Duplicates: 0 Warnings: 0
Executing a text file of prepared SQL statements directly:
ubuntu@testspark:~$ mysql -u root -p spark < SQL-824-.txt
// mysql -u root -p [db_name] < [file_name]
Reading the contents of a table:
// select [column_name] from [table_name]
mysql> select * from sampleCount;
+--------+--------+------+------+-------+
| bandID | AreaID | APid | RPid | count |
+--------+--------+------+------+-------+
| 0 | 1 | 1 | 1 | 59 |
| 0 | 1 | 2 | 1 | 60 |
| 0 | 1 | 3 | 1 | 37 |
+--------+--------+------+------+-------+
3 rows in set (0.00 sec)
Counting the number of non-NULL values in one column of a table:
// select count([column_name]) from [table_name];
mysql> select count(RPid) from sampleCount;
+-------------+
| count(RPid) |
+-------------+
| 216 |
+-------------+
1 row in set (0.00 sec)
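These MySQL tables can also be pulled back into Spark as DataFrames over JDBC. The following is only a hedged sketch: the host, port, credentials, and the presence of the MySQL Connector/J jar on the Spark classpath are assumptions, not part of the log above.

scala> val sampleCountDF = spark.read.format("jdbc").
     |   option("url", "jdbc:mysql://localhost:3306/spark").
     |   option("driver", "com.mysql.jdbc.Driver").
     |   option("dbtable", "sampleCount").
     |   option("user", "root").
     |   option("password", "<password>").
     |   load()

scala> sampleCountDF.show()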