# Spark Streaming

## Spark Streaming 基本介紹

對於雲端運算而言，另一個問題是如何提供即時的資料處理。在原本 hadoop 的架構下，由於目標在處理大量的資料，並不提供即時的資料運算。然而，隨著雲端運算的概念發展，越來越多的應用需要及時結果，因此，也有許多平台 (如: Storm, Infosphere) 考慮了即時資料串流處理的方式，簡單來說，把輸入資料作為資料流，把運算做為流節點。提供透過各節點的處理，可以把有用的資料留下，作為放入儲存前的前處理，對於此類流運算而言，節點之間的同步成了最大的問題，也常常是該運算架構中的瓶頸。\
\
對於 Spark 來說，流運算架構也基於前述的 RDD 資料結構，而把所輸入的資料切為多個小型的 RDD，並對每一個 RDD 進行運算，如下圖所示:

![來自: https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html](/files/-LQ7Xca42deocfy2oDoL)

相同的，Spark Streaming 也可以用 WordCount 來簡單作為範例，如果只是為了 DEMO Spark Streaming 的功用，我們可以輸入以下兩行程式:

```
./bin/run-example streaming.NetworkWordCount localhost 9999
```

第一個程式是 Spark Streaming 的範例，該程式會透過監聽 port 為 9999 的 TCP 連線作為輸入。

```
$ nc -l -p 9999
```

另一方面，第二支程式則是作為輸入，利用 netcat 的功能，建立 TCP 連線，並可以輸入內容，而運作之後結果如下:

![Spark Streaming (example 1)](/files/-LQ7Y-d6PgMuRVntdz9M)

{% hint style="warning" %}
若運行範例出現以下錯誤: java.net.ConnectException: Connection refused (Connection refused)，確認一下，指令: nc -l -p 9999，必須先運行。如果仍有問題，可以進一步確認 9999 這個通訊埠是否被占用。
{% endhint %}

在圖中，我們可以看到左邊視窗是 WordCount 的結果，而右邊是輸入的內容。在此範例中，每一行輸入都會被單獨計算，並和下一個時間的輸入互相獨立，因此，就算輸入相同的內容 ("hi")，字數並不會累計。

![Spark Streaming (example 2)](/files/-LQ7YAe5EcuGqFnCIJ6M)

當然, 我們也可以一次輸入大量文字，此時可以看到在每秒 (1000 ms) 的區間內產生大量的計數結果。然而，若是要對串流資訊做更好的處理，就必須要考慮不同時間點計算結果之間的關聯性，這也是使用 Spark Streaming 時要特別設計的部分。

## NetworkWordCount.scala

上面的程式是執行 NetworkWordCount 的成果，其原始的程式碼可以在下列網址找到: <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala>

```
// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println
```

在此程式中，作為輸入的是 TCP socket 的連線。在這程式中，負責處理進入 streaming 資料的指令為:

```
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
```

在第一行中，宣告了讀取 Streaming 的時間間格，在範例程式中，為 1 秒讀取一次。在第二行程式中，定義了輸入的資料格式為 socket 文字輸入 (ssc.socketTextStream)，同時也設定了 socket 所聽取的 IP 位址 (args(0)) 以及通訊埠 (args(1).toInt)，args(0) 和 args(1) 為使用者執行程式時輸入的指令，以及最後是資料儲存的格式 (MEMORY\_AND\_DISK\_SER)。

{% hint style="info" %}
在 Spark 中，資料儲存格式受到以下 5 個參數影響: useDisk、useMemory、useOffHeap、deserialized、replication。舉例來說，範例程式中的 MEMORY\_AND\_DISK\_SER 就對應到 {T, T, F, F, 1} 的設定。如果我們希望資料有容錯 (有備份)，則可以使用 MEMORY\_AND\_DISK\_SER\_2 的設定。\
可以進一步參考:\
<https://www.cnblogs.com/luogankun/p/3801047.html>\
<https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/storage/StorageLevel.html>
{% endhint %}

在第二行程式中，資料來源的格式除了 socket 輸入外，還可以用其他的格式輸入，例如:  fileStream，也可以支援其他的格式，包含: Kafka、Flume、Kinesis，甚至可以自定義格式 (python 目前不支援)。

在 Spark Streaming 中，需要特別注意的是，NetworkWordCount 這一隻程式是 socket client，而非 socket server。這點，我們可以從和 netcat (nc) 的分工中看出來，`nc -l -p 9999`的意義即是建立一個 socket server 並監聽 port 9999，也因此，在執行 NetworkWordCount 之前必須先開啟 netcat，以免找不到 socket server 而出錯。

## NetworkWordCount.py

考慮到程式維護的便利性 (Scala 的入門門檻有點高，和學弟co-work有點麻煩...)，之後應該會慢慢轉移到 python 來進行程式的開發。為此，在之後寫的文章中，都會加上 python 的部分。

```
from __future__ import print_function

import sys
import findspark
findspark.init() # for the SPARK environment 

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext("local[3]", appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
        .map(lambda word: (word, 1))\
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://spark-nctu.gitbook.io/spark/spark-jie/spark-streaming.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
