
Spark SQL

The Spark SQL data format and its use for data queries

SPARK SQL Operations

Spark SQL does not mean using Spark as a SQL server; it refers to a structured way of handling data inside Spark. Since data in Spark is built on the RDD abstraction, it maps naturally onto a database unit (for example, a table) for manipulation. At the same time, because RDDs are immutable (read-only) and accommodate varied data shapes (such as key-value pairs), SQL-style commands are a good fit for operating on RDD data.
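To make the idea concrete, here is a minimal sketch (hypothetical data, not taken from this page) of the pattern Spark SQL enables: register a DataFrame as a temporary view, then query it with a plain SQL statement. It assumes a running Spark shell with spark.implicits._ imported, as set up below.

val people = Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age")  // toDF comes from spark.implicits._
people.createOrReplaceTempView("people")                            // expose the DataFrame as a SQL table
spark.sql("SELECT name FROM people WHERE age > 21").show()          // query it with ordinary SQL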

First, start the Spark shell and import the necessary packages:

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import spark.implicits._
import spark.implicits._
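In the shell, a SparkSession is already created for us as spark (as the banner above notes). For reference, a standalone application would build its own session first; a minimal sketch, with a hypothetical application name:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SQLtest")      // hypothetical name; anything descriptive works
  .master("local[*]")      // for local testing; normally supplied via spark-submit
  .getOrCreate()
import spark.implicits._   // enables the $"column" syntax used later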

Next, we load a json-formatted file as the input data. Its contents are as follows:

~$ cat /usr/lib/spark/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

In this file, each json record carries at most two fields: name and age (the first record has no age, which will surface as null). Next we read the json file into Spark. Since we did not declare the types of the field values, Spark infers a data type for each field automatically (schema inference).

scala> val df = spark.read.json("/usr/lib/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df is the DataFrame built from the json file (backed by an RDD underneath); we can inspect its contents with df.show(), which displays the data in the form of a SQL table.
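When lower-level RDD operations are needed, the underlying RDD is still reachable; a small sketch:

val rows = df.rdd                                  // RDD[org.apache.spark.sql.Row]
rows.map(r => r.getAs[String]("name")).collect()   // pull the name column back as plain Scala strings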

scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
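printSchema() shows the types Spark inferred: age became long (bigint in the DataFrame header). If we would rather not rely on inference, a schema can also be supplied explicitly when reading; a minimal sketch over the same file:

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age",  LongType,   nullable = true)))

val dfTyped = spark.read.schema(schema)
  .json("/usr/lib/spark/examples/src/main/resources/people.json")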

scala> df.filter($"age" > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

scala> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Not only is the data laid out as a SQL table; we can also operate on the DataFrame with SQL-like commands, for example filtering the rows where age > 21, or selecting only certain columns.
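The same API covers most other SQL staples as well, such as grouped aggregation and computed columns; a brief sketch (output omitted):

df.groupBy("age").count().show()        // number of rows per distinct age
df.select($"name", $"age" + 1).show()   // a computed column: every age incremented by one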

Since the json format supports nesting, we can also try loading a more complex data structure, as shown below:

~$ cat test.json
{"name":"Michael", "address":{"city": "New York","postcode": "10021"}}
{"name":"Andy", "age":30, "address":{"city": "New York","postcode": "10022"}}
{"name":"Justin", "age":19, "address":{"city": "New York","postcode": "10023"}}

Nested data like this can also be loaded into Spark SQL; the nested object appears as a struct (multiple values) within a single column.

scala> val df_new = spark.read.json("/home/ubuntu/sparkcode/SQLtest/src/main/scala/test.json")
df_new: org.apache.spark.sql.DataFrame = [address: struct<city: string, postcode: string>, age: bigint ... 1 more field]

scala> df_new.show()
+----------------+----+-------+
|         address| age|   name|
+----------------+----+-------+
|[New York,10021]|null|Michael|
|[New York,10022]|  30|   Andy|
|[New York,10023]|  19| Justin|
+----------------+----+-------+

scala> df_new.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- postcode: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
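Nested fields can be addressed with dot notation, both in select and in filter; a brief sketch:

df_new.select($"name", $"address.city").show()          // project a field out of the struct
df_new.filter($"address.postcode" === "10022").show()   // filter on a nested field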

More information about Spark SQL can be found on the official website:

https://spark.apache.org/docs/latest/sql-programming-guide.html

Note: SQL Command Log

Log in to MySQL and switch to the spark database.

ubuntu@testspark:~$ mysql -u root -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 210
Server version: 5.7.24-0ubuntu0.16.04.1 (Ubuntu)

Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> use spark;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed

Create tables:

mysql> create table sampleCount (bandID int(4), AreaID int(4), APid int(4), RPid int(4), count int(4));
Query OK, 0 rows affected (0.03 sec)

mysql> create table avgSamples (bandID int(4), AreaID int(4), APid int(4), RPid int(4), RSSI float(4));
Query OK, 0 rows affected (0.03 sec)

mysql> create table allSamples (bandID int(4), AreaID int(4), APid int(4), RPid int(4), round int(4), RSSI float(4));
Query OK, 0 rows affected (0.04 sec)

Drop tables:

mysql> drop table sampleCount;
Query OK, 0 rows affected (0.01 sec)

mysql> drop table avgSamples;
Query OK, 0 rows affected (0.01 sec)

mysql> drop table allSamples;
Query OK, 0 rows affected (0.01 sec)

Show the current tables:

mysql> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| allSamples      |
| avgSamples      |
| sampleCount     |
| student         |
+-----------------+
4 rows in set (0.00 sec)

Insert multiple rows into a table at once:

// The same statement, spread over multiple lines for readability:
// INSERT INTO sampleCount (bandID, AreaID, APid, RPid, count)
// VALUES (0, 1, 1, 1, 59),
//        (0, 1, 2, 1, 60),
//        (0, 1, 3, 1, 37);
mysql> INSERT INTO sampleCount (bandID, AreaID, APid, RPid, count)VALUES (0, 1, 1, 1, 59),(0, 1, 2, 1, 60),(0, 1, 3, 1, 37);
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

Execute a text file of prepared SQL commands directly:

ubuntu@testspark:~$ mysql -u root -p spark < SQL-824-.txt
// mysql -u root -p [db_name] < [file_name]

Read the contents of a table:

// select [column_name] from [table_name]
mysql> select * from sampleCount;
+--------+--------+------+------+-------+
| bandID | AreaID | APid | RPid | count |
+--------+--------+------+------+-------+
|      0 |      1 |    1 |    1 |    59 |
|      0 |      1 |    2 |    1 |    60 |
|      0 |      1 |    3 |    1 |    37 |
+--------+--------+------+------+-------+
3 rows in set (0.00 sec)

Count the number of non-NULL values in a column of a table:

// select count([column_name]) from [table_name];
mysql> select count(RPid) from sampleCount;
+-------------+
| count(RPid) |
+-------------+
|         216 |
+-------------+
1 row in set (0.00 sec)
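
To tie this note back to the DataFrame examples above: these MySQL tables can be read straight into Spark SQL over JDBC. A minimal sketch, assuming the MySQL Connector/J jar is on the Spark classpath; the connection values are placeholders:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/spark")   // placeholder host/port; the "spark" database above
  .option("driver", "com.mysql.jdbc.Driver")            // Connector/J 5.x driver class
  .option("dbtable", "sampleCount")
  .option("user", "root")
  .option("password", "<password>")                     // placeholder
  .load()

jdbcDF.show()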