Scala: Accessing Data in MySQL
Accessing MySQL from Spark via JDBC
Earlier we introduced the Spark SQL data format. In this exercise, instead of using Spark SQL's RDDs as the data source, we will read from and write to an external MySQL database, operated through JDBC. Since Spark is a Java-based computing framework, the simplest way to access a SQL database from Spark is to establish the connection and perform the data access through JDBC (Java Database Connectivity).
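As a quick preview of what this looks like in practice, the sketch below reads a single table into a DataFrame through Spark's built-in JDBC data source (the format("jdbc") variant of the API used later on this page). The URL, table name, and credentials are placeholders for your own MySQL instance; the full read/write program is developed step by step in the rest of this page.
import org.apache.spark.sql.SparkSession

object JdbcPreview {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("JdbcPreview")
      .config("spark.master", "local")
      .getOrCreate()

    // Read one table into a DataFrame through Spark's JDBC data source.
    // URL, table name, user, and password are placeholders for your own setup.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark")
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "yourpassword")
      .load()

    df.show()
    spark.stop()
  }
}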
Database Setup
The following is the MySQL database setup used for testing:
$ mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 18
Server version: 5.7.24-0ubuntu0.16.04.1 (Ubuntu)
Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> create database spark;
Query OK, 1 row affected (0.00 sec)
mysql> use spark;
Database changed
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
Query OK, 0 rows affected (0.17 sec)
mysql> insert into student values (1, 'Xueqian', 'F', '23');
Query OK, 1 row affected (0.01 sec)
mysql> select * from student;
+------+---------+--------+------+
| id   | name    | gender | age  |
+------+---------+--------+------+
|    1 | Xueqian | F      |   23 |
+------+---------+--------+------+
1 row in set (0.00 sec)
The current setup is:
(user) root -> (database) spark -> (table) student
The schema of the student table can be seen in the output above.
Writing the Spark Program (Reading and Writing over JDBC)
First, update the SBT build file to add Spark SQL support and the MySQL JDBC connector:
name := "testSQL"
version := "1.0"
scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "6.0.5"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
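One detail worth noting: with mysql-connector-java 6.x the driver class is com.mysql.cj.jdbc.Driver; the older com.mysql.jdbc.Driver name used in the program below still works, but logs a deprecation warning. If the connection fails, a minimal plain-JDBC check like the sketch below can confirm that the connector jar is on the classpath and that the credentials are valid. The object name and credentials here are placeholders, not part of the original program.
import java.sql.DriverManager

object JdbcSanityCheck {
  def main(args: Array[String]): Unit = {
    // Optional with JDBC 4+ drivers, but makes a missing connector jar fail loudly.
    Class.forName("com.mysql.cj.jdbc.Driver")

    // Placeholder credentials -- replace with your own MySQL user and password.
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/spark", "root", "yourpassword")
    try {
      val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM student")
      rs.next()
      println(s"student rows: ${rs.getInt(1)}")
    } finally {
      conn.close()
    }
  }
}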
Next, here is the Scala program that reads from and writes to the database over JDBC:
import scala.math.random
import org.apache.spark.sql.SparkSession
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object testSQL {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("testSQL")
      .config("spark.master", "local")
      .getOrCreate()

    // JDBC URL and connection properties
    val jdbcurl = "jdbc:mysql://localhost:3306/spark"
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "lab711")
    prop.put("driver", "com.mysql.jdbc.Driver")

    // Schema (table format)
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true)))

    // Input data
    val studentRDD1 = spark.sparkContext.parallelize(Array("1 Alice F 26", "2 Bob M 27")).map(_.split(" "))
    val studentRDD2 = spark.sparkContext.parallelize(Array("3 Cindy F 26", "4 David M 27")).map(_.split(" "))
    val rowRDD1 = studentRDD1.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val rowRDD2 = studentRDD2.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val studentDF1 = spark.createDataFrame(rowRDD1, schema)
    val studentDF2 = spark.createDataFrame(rowRDD2, schema)

    println("========sparkSQL format========")
    studentDF1.show()
    studentDF1.printSchema()

    // Insert (append)
    println("========Insert SQL========")
    studentDF1.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)
    studentDF2.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)

    // Read 1 (for reuse, defined as var)
    var readdf = spark.sqlContext.read.jdbc(jdbcurl, "spark.student", prop)
    readdf.show()

    // Insert (overwrite)
    studentDF1.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)
    studentDF2.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)

    // Read 2
    readdf = spark.sqlContext.read.jdbc(jdbcurl, "spark.student", prop)
    readdf.show()

    spark.stop()
  }
}
This program demonstrates three different operations: append, overwrite, and read (a short sketch of the corresponding write modes follows this list).
append: appends the rows below the existing data in the table, without removing what is already there
overwrite: clears the data in the table first, then writes the new rows
read: reads the rows currently stored in the table
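As a side note, the string modes passed to .mode(...) correspond to the SaveMode enum. The sketch below is a minimal, self-contained illustration of the three operations against the same spark.student table; the WriteModes object name, the example row, and the password are placeholders, not part of the original program.
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteModes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("WriteModes")
      .config("spark.master", "local")
      .getOrCreate()
    import spark.implicits._

    val jdbcurl = "jdbc:mysql://localhost:3306/spark"
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "yourpassword") // placeholder

    // A one-row DataFrame with the same columns as the student table.
    val df = Seq((5, "Emma", "F", 24)).toDF("id", "name", "gender", "age")

    // append: keep whatever is already in the table and add these rows.
    df.write.mode(SaveMode.Append).jdbc(jdbcurl, "spark.student", prop)

    // overwrite: replace the table contents with this DataFrame.
    // Other modes: SaveMode.ErrorIfExists (the default) and SaveMode.Ignore.
    df.write.mode(SaveMode.Overwrite).jdbc(jdbcurl, "spark.student", prop)

    // read: show what is currently stored in the table.
    spark.read.jdbc(jdbcurl, "spark.student", prop).show()

    spark.stop()
  }
}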
As before, the program is built with sbt compile and executed with sbt run, which writes the results into the database. Below we list the output at the two read points, Read 1 and Read 2.
// Read 1
+---+-------+------+---+
| id|   name|gender|age|
+---+-------+------+---+
|  1|Xueqian|     F| 23|
|  1|  Alice|     F| 26|
|  2|    Bob|     M| 27|
|  3|  Cindy|     F| 26|
|  4|  David|     M| 27|
+---+-------+------+---+
// Read 2
+---+-----+------+---+
| id| name|gender|age|
+---+-----+------+---+
|  3|Cindy|     F| 26|
|  4|David|     M| 27|
+---+-----+------+---+
At Read 1 there are five rows in total, including the row left over from when the table was first created. At Read 2, because overwrite clears all of the existing data, only two rows remain. After the program finishes, we can also query the student table directly with a SQL statement; the result matches Read 2.
mysql> select * from student;
+------+-------+--------+------+
| id   | name  | gender | age  |
+------+-------+--------+------+
|    3 | Cindy | F      |   26 |
|    4 | David | M      |   27 |
+------+-------+--------+------+
2 rows in set (0.00 sec)