Spark Streaming

Spark Streaming 基本介紹

Spark Streaming 基本介紹

對於雲端運算而言,另一個問題是如何提供即時的資料處理。在原本 hadoop 的架構下,由於目標在處理大量的資料,並不提供即時的資料運算。然而,隨著雲端運算的概念發展,越來越多的應用需要及時結果,因此,也有許多平台 (如: Storm, Infosphere) 考慮了即時資料串流處理的方式,簡單來說,把輸入資料作為資料流,把運算做為流節點。提供透過各節點的處理,可以把有用的資料留下,作為放入儲存前的前處理,對於此類流運算而言,節點之間的同步成了最大的問題,也常常是該運算架構中的瓶頸。 對於 Spark 來說,流運算架構也基於前述的 RDD 資料結構,而把所輸入的資料切為多個小型的 RDD,並對每一個 RDD 進行運算,如下圖所示:

相同的,Spark Streaming 也可以用 WordCount 來簡單作為範例,如果只是為了 DEMO Spark Streaming 的功用,我們可以輸入以下兩行程式:

./bin/run-example streaming.NetworkWordCount localhost 9999

第一個程式是 Spark Streaming 的範例,該程式會透過監聽 port 為 9999 的 TCP 連線作為輸入。

$ nc -l -p 9999

另一方面,第二支程式則是作為輸入,利用 netcat 的功能,建立 TCP 連線,並可以輸入內容,而運作之後結果如下:

若運行範例出現以下錯誤: java.net.ConnectException: Connection refused (Connection refused),確認一下,指令: nc -l -p 9999,必須先運行。如果仍有問題,可以進一步確認 9999 這個通訊埠是否被占用。

在圖中,我們可以看到左邊視窗是 WordCount 的結果,而右邊是輸入的內容。在此範例中,每一行輸入都會被單獨計算,並和下一個時間的輸入互相獨立,因此,就算輸入相同的內容 ("hi"),字數並不會累計。

當然, 我們也可以一次輸入大量文字,此時可以看到在每秒 (1000 ms) 的區間內產生大量的計數結果。然而,若是要對串流資訊做更好的處理,就必須要考慮不同時間點計算結果之間的關聯性,這也是使用 Spark Streaming 時要特別設計的部分。

NetworkWordCount.scala

上面的程式是執行 NetworkWordCount 的成果,其原始的程式碼可以在下列網址找到: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

在此程式中,作為輸入的是 TCP socket 的連線。在這程式中,負責處理進入 streaming 資料的指令為:

val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

在第一行中,宣告了讀取 Streaming 的時間間格,在範例程式中,為 1 秒讀取一次。在第二行程式中,定義了輸入的資料格式為 socket 文字輸入 (ssc.socketTextStream),同時也設定了 socket 所聽取的 IP 位址 (args(0)) 以及通訊埠 (args(1).toInt),args(0) 和 args(1) 為使用者執行程式時輸入的指令,以及最後是資料儲存的格式 (MEMORY_AND_DISK_SER)。

在 Spark 中,資料儲存格式受到以下 5 個參數影響: useDisk、useMemory、useOffHeap、deserialized、replication。舉例來說,範例程式中的 MEMORY_AND_DISK_SER 就對應到 {T, T, F, F, 1} 的設定。如果我們希望資料有容錯 (有備份),則可以使用 MEMORY_AND_DISK_SER_2 的設定。 可以進一步參考: https://www.cnblogs.com/luogankun/p/3801047.html https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/storage/StorageLevel.html

在第二行程式中,資料來源的格式除了 socket 輸入外,還可以用其他的格式輸入,例如: fileStream,也可以支援其他的格式,包含: Kafka、Flume、Kinesis,甚至可以自定義格式 (python 目前不支援)。

在 Spark Streaming 中,需要特別注意的是,NetworkWordCount 這一隻程式是 socket client,而非 socket server。這點,我們可以從和 netcat (nc) 的分工中看出來,nc -l -p 9999的意義即是建立一個 socket server 並監聽 port 9999,也因此,在執行 NetworkWordCount 之前必須先開啟 netcat,以免找不到 socket server 而出錯。

NetworkWordCount.py

考慮到程式維護的便利性 (Scala 的入門門檻有點高,和學弟co-work有點麻煩...),之後應該會慢慢轉移到 python 來進行程式的開發。為此,在之後寫的文章中,都會加上 python 的部分。

from __future__ import print_function

import sys
import findspark
findspark.init() # for the SPARK environment 

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext("local[3]", appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
        .map(lambda word: (word, 1))\
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

Last updated