Spark Streaming
Spark Streaming 基本介紹
Last updated
Spark Streaming 基本介紹
Last updated
對於雲端運算而言,另一個問題是如何提供即時的資料處理。在原本 hadoop 的架構下,由於目標在處理大量的資料,並不提供即時的資料運算。然而,隨著雲端運算的概念發展,越來越多的應用需要及時結果,因此,也有許多平台 (如: Storm, Infosphere) 考慮了即時資料串流處理的方式,簡單來說,把輸入資料作為資料流,把運算做為流節點。提供透過各節點的處理,可以把有用的資料留下,作為放入儲存前的前處理,對於此類流運算而言,節點之間的同步成了最大的問題,也常常是該運算架構中的瓶頸。 對於 Spark 來說,流運算架構也基於前述的 RDD 資料結構,而把所輸入的資料切為多個小型的 RDD,並對每一個 RDD 進行運算,如下圖所示:
相同的,Spark Streaming 也可以用 WordCount 來簡單作為範例,如果只是為了 DEMO Spark Streaming 的功用,我們可以輸入以下兩行程式:
第一個程式是 Spark Streaming 的範例,該程式會透過監聽 port 為 9999 的 TCP 連線作為輸入。
另一方面,第二支程式則是作為輸入,利用 netcat 的功能,建立 TCP 連線,並可以輸入內容,而運作之後結果如下:
若運行範例出現以下錯誤: java.net.ConnectException: Connection refused (Connection refused),確認一下,指令: nc -l -p 9999,必須先運行。如果仍有問題,可以進一步確認 9999 這個通訊埠是否被占用。
在圖中,我們可以看到左邊視窗是 WordCount 的結果,而右邊是輸入的內容。在此範例中,每一行輸入都會被單獨計算,並和下一個時間的輸入互相獨立,因此,就算輸入相同的內容 ("hi"),字數並不會累計。
當然, 我們也可以一次輸入大量文字,此時可以看到在每秒 (1000 ms) 的區間內產生大量的計數結果。然而,若是要對串流資訊做更好的處理,就必須要考慮不同時間點計算結果之間的關聯性,這也是使用 Spark Streaming 時要特別設計的部分。
上面的程式是執行 NetworkWordCount 的成果,其原始的程式碼可以在下列網址找到: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
在此程式中,作為輸入的是 TCP socket 的連線。在這程式中,負責處理進入 streaming 資料的指令為:
在第一行中,宣告了讀取 Streaming 的時間間格,在範例程式中,為 1 秒讀取一次。在第二行程式中,定義了輸入的資料格式為 socket 文字輸入 (ssc.socketTextStream),同時也設定了 socket 所聽取的 IP 位址 (args(0)) 以及通訊埠 (args(1).toInt),args(0) 和 args(1) 為使用者執行程式時輸入的指令,以及最後是資料儲存的格式 (MEMORY_AND_DISK_SER)。
在 Spark 中,資料儲存格式受到以下 5 個參數影響: useDisk、useMemory、useOffHeap、deserialized、replication。舉例來說,範例程式中的 MEMORY_AND_DISK_SER 就對應到 {T, T, F, F, 1} 的設定。如果我們希望資料有容錯 (有備份),則可以使用 MEMORY_AND_DISK_SER_2 的設定。 可以進一步參考: https://www.cnblogs.com/luogankun/p/3801047.html https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/storage/StorageLevel.html
在第二行程式中,資料來源的格式除了 socket 輸入外,還可以用其他的格式輸入,例如: fileStream,也可以支援其他的格式,包含: Kafka、Flume、Kinesis,甚至可以自定義格式 (python 目前不支援)。
在 Spark Streaming 中,需要特別注意的是,NetworkWordCount 這一隻程式是 socket client,而非 socket server。這點,我們可以從和 netcat (nc) 的分工中看出來,nc -l -p 9999
的意義即是建立一個 socket server 並監聽 port 9999,也因此,在執行 NetworkWordCount 之前必須先開啟 netcat,以免找不到 socket server 而出錯。
考慮到程式維護的便利性 (Scala 的入門門檻有點高,和學弟co-work有點麻煩...),之後應該會慢慢轉移到 python 來進行程式的開發。為此,在之後寫的文章中,都會加上 python 的部分。