# Spark Streaming

## Spark Streaming 基本介紹

對於雲端運算而言，另一個問題是如何提供即時的資料處理。在原本 hadoop 的架構下，由於目標在處理大量的資料，並不提供即時的資料運算。然而，隨著雲端運算的概念發展，越來越多的應用需要及時結果，因此，也有許多平台 (如: Storm, Infosphere) 考慮了即時資料串流處理的方式，簡單來說，把輸入資料作為資料流，把運算做為流節點。提供透過各節點的處理，可以把有用的資料留下，作為放入儲存前的前處理，對於此類流運算而言，節點之間的同步成了最大的問題，也常常是該運算架構中的瓶頸。\
\
對於 Spark 來說，流運算架構也基於前述的 RDD 資料結構，而把所輸入的資料切為多個小型的 RDD，並對每一個 RDD 進行運算，如下圖所示:

![來自: https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html](https://13218333-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LPzeYWaV4cPH8nI_rTk%2F-LQ7XQWNzY_I2F5UWY_s%2F-LQ7Xca42deocfy2oDoL%2Fstreaming-flow.png?alt=media\&token=a9007e03-17cc-468d-89de-03601bf021c7)

相同的，Spark Streaming 也可以用 WordCount 來簡單作為範例，如果只是為了 DEMO Spark Streaming 的功用，我們可以輸入以下兩行程式:

```
./bin/run-example streaming.NetworkWordCount localhost 9999
```

第一個程式是 Spark Streaming 的範例，該程式會透過監聽 port 為 9999 的 TCP 連線作為輸入。

```
$ nc -l -p 9999
```

另一方面，第二支程式則是作為輸入，利用 netcat 的功能，建立 TCP 連線，並可以輸入內容，而運作之後結果如下:

![Spark Streaming (example 1)](https://13218333-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LPzeYWaV4cPH8nI_rTk%2F-LQ7XQWNzY_I2F5UWY_s%2F-LQ7Y-d6PgMuRVntdz9M%2FSS1.PNG?alt=media\&token=6d073a6c-31a4-4de3-82c7-08df01055747)

{% hint style="warning" %}
若運行範例出現以下錯誤: java.net.ConnectException: Connection refused (Connection refused)，確認一下，指令: nc -l -p 9999，必須先運行。如果仍有問題，可以進一步確認 9999 這個通訊埠是否被占用。
{% endhint %}

在圖中，我們可以看到左邊視窗是 WordCount 的結果，而右邊是輸入的內容。在此範例中，每一行輸入都會被單獨計算，並和下一個時間的輸入互相獨立，因此，就算輸入相同的內容 ("hi")，字數並不會累計。

![Spark Streaming (example 2)](https://13218333-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LPzeYWaV4cPH8nI_rTk%2F-LQ7XQWNzY_I2F5UWY_s%2F-LQ7YAe5EcuGqFnCIJ6M%2FSS2.PNG?alt=media\&token=e503143f-4c7b-4753-b056-02fe8879f2c8)

當然, 我們也可以一次輸入大量文字，此時可以看到在每秒 (1000 ms) 的區間內產生大量的計數結果。然而，若是要對串流資訊做更好的處理，就必須要考慮不同時間點計算結果之間的關聯性，這也是使用 Spark Streaming 時要特別設計的部分。

## NetworkWordCount.scala

上面的程式是執行 NetworkWordCount 的成果，其原始的程式碼可以在下列網址找到: <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala>

```
// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println
```

在此程式中，作為輸入的是 TCP socket 的連線。在這程式中，負責處理進入 streaming 資料的指令為:

```
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
```

在第一行中，宣告了讀取 Streaming 的時間間格，在範例程式中，為 1 秒讀取一次。在第二行程式中，定義了輸入的資料格式為 socket 文字輸入 (ssc.socketTextStream)，同時也設定了 socket 所聽取的 IP 位址 (args(0)) 以及通訊埠 (args(1).toInt)，args(0) 和 args(1) 為使用者執行程式時輸入的指令，以及最後是資料儲存的格式 (MEMORY\_AND\_DISK\_SER)。

{% hint style="info" %}
在 Spark 中，資料儲存格式受到以下 5 個參數影響: useDisk、useMemory、useOffHeap、deserialized、replication。舉例來說，範例程式中的 MEMORY\_AND\_DISK\_SER 就對應到 {T, T, F, F, 1} 的設定。如果我們希望資料有容錯 (有備份)，則可以使用 MEMORY\_AND\_DISK\_SER\_2 的設定。\
可以進一步參考:\
<https://www.cnblogs.com/luogankun/p/3801047.html>\
<https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/storage/StorageLevel.html>
{% endhint %}

在第二行程式中，資料來源的格式除了 socket 輸入外，還可以用其他的格式輸入，例如:  fileStream，也可以支援其他的格式，包含: Kafka、Flume、Kinesis，甚至可以自定義格式 (python 目前不支援)。

在 Spark Streaming 中，需要特別注意的是，NetworkWordCount 這一隻程式是 socket client，而非 socket server。這點，我們可以從和 netcat (nc) 的分工中看出來，`nc -l -p 9999`的意義即是建立一個 socket server 並監聽 port 9999，也因此，在執行 NetworkWordCount 之前必須先開啟 netcat，以免找不到 socket server 而出錯。

## NetworkWordCount.py

考慮到程式維護的便利性 (Scala 的入門門檻有點高，和學弟co-work有點麻煩...)，之後應該會慢慢轉移到 python 來進行程式的開發。為此，在之後寫的文章中，都會加上 python 的部分。

```
from __future__ import print_function

import sys
import findspark
findspark.init() # for the SPARK environment 

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext("local[3]", appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
        .map(lambda word: (word, 1))\
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

```
