Spark SQL

The Spark SQL data format and its use in data queries

SPARK SQL OPERATIONS

Spark SQL does not mean using Spark as a SQL server; rather, it refers to a structured way of handling data inside Spark. Since data in Spark is organized as RDDs, a dataset maps naturally onto a database unit (e.g., a table) for processing. Moreover, the immutability of RDDs (they are read-only) and the variety of data they hold (e.g., key-value pairs) make RDD data well suited to manipulation with SQL-style commands.

First, start the Spark shell and import the necessary packages:

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import spark.implicits._
import spark.implicits._

Next, we load a JSON-format file as the input data. Its contents are as follows:

~$ cat /usr/lib/spark/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

In this file, each JSON record may carry two fields: name and age (note that the first record has no age value). We now read the JSON file into Spark. Since we have not declared the types of the field values, Spark will infer a data type for each field automatically.

scala> val df = spark.read.json("/usr/lib/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df is the DataFrame converted from the JSON file (a table-like structure built on top of RDDs); we can view its contents with df.show(), which displays the data in the form of a SQL table.
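
If the RDD underneath is needed, for example to mix in plain RDD operations, it can be obtained with df.rdd, which yields an RDD[Row]. A minimal sketch in the same session (the exact res numbering will differ):

scala> df.rdd.map(_.getAs[String]("name")).collect()
res0: Array[String] = Array(Michael, Andy, Justin)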

scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
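
The types above were inferred from the data. If we would rather declare the types ourselves instead of relying on inference, we can pass an explicit schema before reading. A sketch, assuming the same file path as above:

scala> import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

scala> val schema = StructType(Seq(
     |   StructField("name", StringType, nullable = true),
     |   StructField("age", LongType, nullable = true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,LongType,true))

scala> val dfTyped = spark.read.schema(schema).json("/usr/lib/spark/examples/src/main/resources/people.json")
dfTyped: org.apache.spark.sql.DataFrame = [name: string, age: bigint]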

scala> df.filter($"age" > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

scala> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Not only is the data laid out as a SQL table; we can also operate on the DataFrame with SQL-like methods, for example filtering for rows with age > 21, or selecting only certain columns.
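
Beyond these DataFrame methods, the DataFrame can be registered as a temporary view and queried with real SQL statements. A sketch in the same session:

scala> df.createOrReplaceTempView("people")

scala> spark.sql("SELECT name FROM people WHERE age > 21").show()
+----+
|name|
+----+
|Andy|
+----+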

Since the JSON format allows nesting, we can also try loading a more complex data structure, as shown below:

~$ cat test.json
{"name":"Michael", "address":{"city": "New York","postcode": "10021"}}
{"name":"Andy", "age":30, "address":{"city": "New York","postcode": "10022"}}
{"name":"Justin", "age":19, "address":{"city": "New York","postcode": "10023"}}

Such nested data can also be loaded into Spark SQL; the nested object becomes a struct column holding multiple sub-values.

scala> val df_new = spark.read.json("/home/ubuntu/sparkcode/SQLtest/src/main/scala/test.json")
df_new: org.apache.spark.sql.DataFrame = [address: struct<city: string, postcode: string>, age: bigint ... 1 more field]

scala> df_new.show()
+----------------+----+-------+
|         address| age|   name|
+----------------+----+-------+
|[New York,10021]|null|Michael|
|[New York,10022]|  30|   Andy|
|[New York,10023]|  19| Justin|
+----------------+----+-------+

scala> df_new.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- postcode: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
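
Nested fields can be addressed with dot notation, so a struct column behaves like a small table embedded in each row. A sketch in the same session:

scala> df_new.select($"name", $"address.city").show()
+-------+--------+
|   name|    city|
+-------+--------+
|Michael|New York|
|   Andy|New York|
| Justin|New York|
+-------+--------+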

For more information about Spark SQL, see the official website: https://spark.apache.org/sql/

Note: SQL command log

Log in to MySQL and switch to the spark database.

ubuntu@testspark:~$ mysql -u root -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 210
Server version: 5.7.24-0ubuntu0.16.04.1 (Ubuntu)

Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> use spark;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed

Creating tables

mysql> create table sampleCount (bandID int(4), AreaID int(4), APid int(4), RPid int(4), count int(4));
Query OK, 0 rows affected (0.03 sec)

mysql> create table avgSamples (bandID int(4), AreaID int(4), APid int(4), RPid int(4), RSSI float(4));
Query OK, 0 rows affected (0.03 sec)

mysql> create table allSamples (bandID int(4), AreaID int(4), APid int(4), RPid int(4), round int(4), RSSI float(4));
Query OK, 0 rows affected (0.04 sec)
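
To double-check the column definitions just created, describe lists each column's type, nullability, and default; for sampleCount the output should look like this:

// describe [table_name];
mysql> describe sampleCount;
+--------+--------+------+-----+---------+-------+
| Field  | Type   | Null | Key | Default | Extra |
+--------+--------+------+-----+---------+-------+
| bandID | int(4) | YES  |     | NULL    |       |
| AreaID | int(4) | YES  |     | NULL    |       |
| APid   | int(4) | YES  |     | NULL    |       |
| RPid   | int(4) | YES  |     | NULL    |       |
| count  | int(4) | YES  |     | NULL    |       |
+--------+--------+------+-----+---------+-------+
5 rows in set (0.00 sec)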

Dropping tables

mysql> drop table sampleCount;
Query OK, 0 rows affected (0.01 sec)

mysql> drop table avgSamples;
Query OK, 0 rows affected (0.01 sec)

mysql> drop table allSamples;
Query OK, 0 rows affected (0.01 sec)

Listing the current tables

mysql> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| allSamples      |
| avgSamples      |
| sampleCount     |
| student         |
+-----------------+
4 rows in set (0.00 sec)

Inserting multiple rows into a table at once:

// INSERT INTO sampleCount (bandID, AreaID, APid, RPid, count)
// VALUES (0, 1, 1, 1, 59),
// (0, 1, 2, 1, 60),
// (0, 1, 3, 1, 37);
============================================================
mysql> INSERT INTO sampleCount (bandID, AreaID, APid, RPid, count)VALUES (0, 1, 1, 1, 59),(0, 1, 2, 1, 60),(0, 1, 3, 1, 37);
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

Executing a text file of prepared SQL commands directly:

ubuntu@testspark:~$ mysql -u root -p spark < SQL-824-.txt
// mysql -u root -p [db_name] < [file_name]

Reading the contents of a table:

// select [column_name] from [table_name]
mysql> select * from sampleCount;
+--------+--------+------+------+-------+
| bandID | AreaID | APid | RPid | count |
+--------+--------+------+------+-------+
|      0 |      1 |    1 |    1 |    59 |
|      0 |      1 |    2 |    1 |    60 |
|      0 |      1 |    3 |    1 |    37 |
+--------+--------+------+------+-------+
3 rows in set (0.00 sec)
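
A WHERE clause narrows the result to matching rows; for example, against the three rows above, selecting only AP 2 would give:

// select [column_name] from [table_name] where [condition];
mysql> select * from sampleCount where APid = 2;
+--------+--------+------+------+-------+
| bandID | AreaID | APid | RPid | count |
+--------+--------+------+------+-------+
|      0 |      1 |    2 |    1 |    60 |
+--------+--------+------+------+-------+
1 row in set (0.00 sec)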

Counting the non-NULL values of a column in a table:

// select count([column_name]) from [table_name];
mysql> select count(RPid) from sampleCount;
+-------------+
| count(RPid) |
+-------------+
|         216 |
+-------------+
1 row in set (0.00 sec)
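
These tables can also be pulled back into Spark as DataFrames over JDBC, connecting the two halves of this note. A sketch, assuming the MySQL Connector/J jar is on the Spark classpath; the URL, user, and password here are placeholders:

scala> val jdbcDF = spark.read.format("jdbc")
     |   .option("url", "jdbc:mysql://localhost:3306/spark")
     |   .option("dbtable", "sampleCount")
     |   .option("user", "root")
     |   .option("password", "********")
     |   .load()
jdbcDF: org.apache.spark.sql.DataFrame = [bandID: int, AreaID: int ... 3 more fields]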
