# Scala: Accessing Data in MySQL

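Earlier we introduced the Spark SQL format. In this exercise we no longer read data from Spark SQL RDDs. Instead, we use an external MySQL database as the store for reading and writing, and access it through JDBC. Spark runs on the JVM, so the simplest way for it to connect to an SQL database and read or write data is JDBC (Java Database Connectivity).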
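To show what JDBC itself looks like, here is a minimal sketch of plain JDBC access from Scala without Spark. It assumes the `spark` database and `student` table created in the next section, a MySQL server on `localhost:3306`, and MySQL Connector/J on the classpath. `PlainJdbcSketch`, `connectionProps` and `readStudents` are hypothetical names used only for illustration.

```scala
import java.sql.DriverManager
import java.util.Properties
import scala.collection.mutable.ListBuffer

object PlainJdbcSketch {
  // Same URL format as the Spark program: jdbc:mysql://<host>:<port>/<database>
  val url = "jdbc:mysql://localhost:3306/spark"

  // Credentials are passed as java.util.Properties, as in the Spark program.
  def connectionProps(user: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props
  }

  // Runs a plain SELECT and returns each row as "id name gender age".
  // Requires a running MySQL server and the Connector/J driver on the classpath.
  def readStudents(props: Properties): List[String] = {
    val conn = DriverManager.getConnection(url, props)
    try {
      val stmt = conn.createStatement()
      try {
        val rs = stmt.executeQuery("SELECT id, name, gender, age FROM student")
        val rows = ListBuffer.empty[String]
        while (rs.next()) {
          rows += s"${rs.getInt("id")} ${rs.getString("name")} ${rs.getString("gender")} ${rs.getInt("age")}"
        }
        rows.toList
      } finally stmt.close() // closing the Statement also closes its ResultSet
    } finally conn.close()
  }
}
```

Spark's JDBC data source is built on the same `java.sql` API. The difference is that Spark turns the result into a DataFrame and can write DataFrames back, so you do not have to loop over a `ResultSet` yourself.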

## Database Setup

The MySQL database used for testing is set up as follows:

```
$ mysql -u root -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 18
Server version: 5.7.24-0ubuntu0.16.04.1 (Ubuntu)

Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database spark;
Query OK, 1 row affected (0.00 sec)

mysql> use spark;
Database changed
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
Query OK, 0 rows affected (0.17 sec)

mysql> insert into student values (1, 'Xueqian', 'F', '23');
Query OK, 1 row affected (0.01 sec)

mysql> select * from student;
+------+---------+--------+------+
| id   | name    | gender | age  |
+------+---------+--------+------+
|    1 | Xueqian | F      |   23 |
+------+---------+--------+------+
1 row in set (0.00 sec)
```

The current setup is:

**(user)** root -> **(database)** spark -> **(table)** student

The columns of `student` are shown in the output above.

## Writing the Spark Program (Reading and Writing via JDBC)

First, edit the SBT build file to add Spark SQL support and the MySQL JDBC driver (Connector/J):

```
name := "testSQL"
version := "1.0"
scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "6.0.5"
// No extra resolver is needed: Spark and Connector/J are both published on Maven Central.
```

Next, here is the Scala program that writes to and reads from the database:

```
import java.util.Properties
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object testSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("testSQL")
      .master("local")
      .getOrCreate()

    // jdbc url: jdbc:mysql://<host>:<port>/<database>
    val jdbcurl = "jdbc:mysql://localhost:3306/spark"
    val table = "spark.student"

    // connection properties
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "lab711")
    // Connector/J 6.x driver class; com.mysql.jdbc.Driver is the older, deprecated name
    prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    // schema (table format), matching the columns of student
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true)))

    // input data, one "id name gender age" string per student
    val studentRDD1 = spark.sparkContext.parallelize(Array("1 Alice F 26", "2 Bob M 27")).map(_.split(" "))
    val studentRDD2 = spark.sparkContext.parallelize(Array("3 Cindy F 26", "4 David M 27")).map(_.split(" "))

    // convert to Rows whose value types match the schema (Int for IntegerType)
    val rowRDD1 = studentRDD1.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val rowRDD2 = studentRDD2.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))

    val studentDF1 = spark.createDataFrame(rowRDD1, schema)
    val studentDF2 = spark.createDataFrame(rowRDD2, schema)

    println("========sparkSQL format========")
    studentDF1.show()
    studentDF1.printSchema()

    // Insert (append): new rows are added after the existing ones
    println("========Insert SQL========")
    studentDF1.write.mode("append").jdbc(jdbcurl, table, prop)
    studentDF2.write.mode("append").jdbc(jdbcurl, table, prop)

    // Read 1
    val readdf1 = spark.read.jdbc(jdbcurl, table, prop)
    readdf1.show()

    // Insert (overwrite): by default Spark drops and recreates the table;
    // .option("truncate", "true") would truncate it and keep its definition instead
    studentDF1.write.mode("overwrite").jdbc(jdbcurl, table, prop)
    studentDF2.write.mode("overwrite").jdbc(jdbcurl, table, prop)

    // Read 2
    val readdf2 = spark.read.jdbc(jdbcurl, table, prop)
    readdf2.show()

    spark.stop()
  }
}
```

{% hint style="info" %}
For the data types available in Spark SQL, see:\
<https://taiwansparkusergroup.gitbooks.io/spark-programming-guide-zh-tw/content/spark-sql/spark-sql-dataType-reference.html>
{% endhint %}
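The `Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt)` step in the program matters because `createDataFrame(rowRDD, schema)` does not convert values. A `String` placed in a column declared as `IntegerType` typically fails at runtime, once the rows are evaluated. Below is a stdlib-only sketch of the same parsing step. `ParseSketch` and `parseStudent` are hypothetical names. Unlike the original, which splits on a single space, this version splits on any run of whitespace.

```scala
object ParseSketch {
  // Parses "id name gender age" into values in schema order:
  // (IntegerType <- Int, StringType <- String, StringType <- String, IntegerType <- Int)
  def parseStudent(line: String): (Int, String, String, Int) = {
    val p = line.trim.split("\\s+")
    require(p.length == 4, s"expected 4 fields, got ${p.length}: '$line'")
    (p(0).toInt, p(1), p(2), p(3).toInt)
  }
}
```

A malformed line fails early with a clear `require` message, before Spark ever sees the row.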

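This program demonstrates three operations: append, overwrite, and read.

* Append: adds the new rows to the table without deleting the existing data
* Overwrite: replaces the table's contents with the new rows. By default Spark drops and recreates the table, so column definitions such as `char(20)` are replaced by the types Spark chooses. Setting the JDBC option `truncate` to `true` makes Spark truncate the existing table instead.
* Read: reads the contents of the table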

As before, compile and run the program with `sbt compile` and `sbt run`, which writes the results to the database. The output below shows the table at two checkpoints, Read 1 and Read 2.

```
// Read 1
+---+-------+------+---+
| id|   name|gender|age|
+---+-------+------+---+
|  1|Xueqian|     F| 23|
|  1|  Alice|     F| 26|
|  2|    Bob|     M| 27|
|  3|  Cindy|     F| 26|
|  4|  David|     M| 27|
+---+-------+------+---+

// Read 2
+---+-----+------+---+
| id| name|gender|age|
+---+-----+------+---+
|  3|Cindy|     F| 26|
|  4|David|     M| 27|
+---+-----+------+---+
```

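At Read 1 the table holds 5 rows: the row inserted when the table was created, plus the 4 appended rows. At Read 2 only 2 rows remain. The first overwrite replaced all 5 rows with the contents of `studentDF1`, and the second overwrite replaced those with `studentDF2`. After the program finishes, querying `student` with SQL returns the same rows as Read 2.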
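The row counts follow directly from the write sequence. The following is a toy in-memory model of that sequence: append concatenates rows, and overwrite discards everything written before it. `SaveModeModel`, `append` and `overwrite` are hypothetical names. This is not how Spark's JDBC writer works internally, since Spark issues SQL statements against MySQL.

```scala
object SaveModeModel {
  final case class Student(id: Int, name: String, gender: String, age: Int)

  // append: new rows are added after the existing ones
  def append(table: Vector[Student], rows: Seq[Student]): Vector[Student] = table ++ rows

  // overwrite: the existing rows are discarded; only the new rows remain
  def overwrite(table: Vector[Student], rows: Seq[Student]): Vector[Student] = rows.toVector

  // the row inserted by hand when the table was created
  val initial = Vector(Student(1, "Xueqian", "F", 23))
  val df1 = Seq(Student(1, "Alice", "F", 26), Student(2, "Bob", "M", 27))
  val df2 = Seq(Student(3, "Cindy", "F", 26), Student(4, "David", "M", 27))

  // Read 1: initial row, then df1 and df2 appended
  val read1 = append(append(initial, df1), df2)
  // Read 2: each overwrite replaces everything written before it
  val read2 = overwrite(overwrite(read1, df1), df2)
}
```

`read1` has 5 rows and `read2` has only the two rows of `df2`, matching the Read 1 and Read 2 output above.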

```
mysql> select * from student;
+------+-------+--------+------+
| id   | name  | gender | age  |
+------+-------+--------+------+
|    3 | Cindy | F      |   26 |
|    4 | David | M      |   27 |
+------+-------+--------+------+
2 rows in set (0.00 sec)
```


