Scala: 存取 MySQL 的資料

在 Spark 上透過 JDBC 存取 mySQL

在之前,我們介紹了 Spark SQL 的格式,在這個實作中,我們將不再使用 Spark SQL 的 RDD 作為資料庫讀取,而是使用一個j外部的 mySQL 資料庫作為讀寫的資料庫,並透過 JDBC 來進行操作。當我們要以 Spark 存取 SQL 資料庫時,由於 Spark 是一個基於 Java 的運算架構,最簡單的方式就是透過 JDBC (Java Database Connectivity) 來建立連線與資料存取。

資料庫設定

以下是測試用的 mySQL 資料庫設定:

$ mysql -u root -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 18
Server version: 5.7.24-0ubuntu0.16.04.1 (Ubuntu)

Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database spark;
Query OK, 1 row affected (0.00 sec)

mysql> use spark;
Database changed
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
Query OK, 0 rows affected (0.17 sec)

mysql> insert into student values (1, 'Xueqian', 'F', '23');
Query OK, 1 row affected (0.01 sec)

mysql> select * from student;
+------+---------+--------+------+
| id   | name    | gender | age  |
+------+---------+--------+------+
|    1 | Xueqian | F      |   23 |
+------+---------+--------+------+
1 row in set (0.00 sec)

目前的設定為:

(user) root -> (database) spark -> (table) student

student 的格式可以參考上方的執行成果。

Spark 程式撰寫 (讀寫 JDBC)

首先,要先更改 SBT 的設定檔,加入 SQL 的支援:

name := "testSQL"
version := "1.0"
scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "6.0.5"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

接著,以下是讀取 SQL 的 Scala 程式:

import scala.math.random
import org.apache.spark.sql.SparkSession
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object testSQL {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("testSQL")
      .config("spark.master", "local")
      .getOrCreate()

    // jdbc url
    val jdbcurl = "jdbc:mysql://localhost:3306/spark"

    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "lab711")
    prop.put("driver", "com.mysql.jdbc.Driver")


    // schema (table format)
    val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("gender", StringType, true), StructField("age", IntegerType, true)))

    // input data
    val studentRDD1 = spark.sparkContext.parallelize(Array("1 Alice F 26","2 Bob M 27")).map(_.split(" "))
    val studentRDD2 = spark.sparkContext.parallelize(Array("3 Cindy F 26","4 David M 27")).map(_.split(" "))

    val rowRDD1 = studentRDD1.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val rowRDD2 = studentRDD2.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))

    val studentDF1 = spark.createDataFrame(rowRDD1, schema)
    val studentDF2 = spark.createDataFrame(rowRDD2, schema)

    println("========sparkSQL format========")
    studentDF1.show()
    studentDF1.printSchema()

    // Insert (append)
    println("========Insert SQL========")
    studentDF1.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
    studentDF2.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)

    // Read 1 (for reuse, defined as var)
    var readdf = spark.sqlContext.read.jdbc(jdbcurl, "spark.student", prop)

    readdf.show()

    // Insert (Overwrite)
    studentDF1.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
    studentDF2.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)

    // Read 2
    readdf = spark.sqlContext.read.jdbc(jdbcurl, "spark.student", prop)

    readdf.show()
    
    spark.stop()
  }
}

在這份程式中,我們示範了 3 種不同的操作: 附加寫入 (append)、覆蓋寫入 (overwrite) 以及讀取 (read)。

  • 附加寫入 (append): 將資料附在 table 下方,不會清除原有資料

  • 覆蓋寫入 (overwrite): 清空 table 資料後,將資料寫入 table

  • 讀取 (read): 讀取 table 中的資訊

相同的,透過 sbt compilesbt run 就可以執行該程式,並將結果寫入資料庫中。執行的結果,我們分別列出 Read 1 和 Read 2 兩個時間點的執行結果。

// Read 1
+---+-------+------+---+
| id|   name|gender|age|
+---+-------+------+---+
|  1|Xueqian|     F| 23|
|  1|  Alice|     F| 26|
|  2|    Bob|     M| 27|
|  3|  Cindy|     F| 26|
|  4|  David|     M| 27|
+---+-------+------+---+

// Read 2
+---+-----+------+---+
| id| name|gender|age|
+---+-----+------+---+
|  3|Cindy|     F| 26|
|  4|David|     M| 27|
+---+-----+------+---+

在 Read 1時,資料一共有 5 行,包括一開始創建 table 留下來的資訊,而在 Read 2 時,由於 overwrite 清空所有資料,也因此只剩下 2 行資訊。在程式結束後,我們也可以透過 SQL 指令查詢 "student" 裡的資訊,結果會和 Read 2 的資訊一致。

mysql> select * from student;
+------+-------+--------+------+
| id   | name  | gender | age  |
+------+-------+--------+------+
|    3 | Cindy | F      |   26 |
|    4 | David | M      |   27 |
+------+-------+--------+------+
2 rows in set (0.00 sec)

Last updated