Monday 23 September 2019

spark_streaming_examples

Create Spark Streaming Context:
==========================================
scala:
---------------


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3


// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words
val words = lines.flatMap(_.split(" "))


// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate



java:
---------------
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;

import java.util.Arrays;


// Create a local StreamingContext with two working threads and a batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });


// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });


// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();


jssc.start();             // Start the computation
jssc.awaitTermination();  // Wait for the computation to terminate





python:
---------------
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate





==================================================================

Start a Netcat data server in one terminal, then run one of the word-count examples below in another:

nc -lk 9999

$SPARK_HOME/bin/run-example streaming.NetworkWordCount localhost 9999

$SPARK_HOME/bin/run-example streaming.JavaNetworkWordCount localhost 9999

$SPARK_HOME/bin/spark-submit $SPARK_HOME/examples/src/main/python/streaming/network_wordcount.py localhost 9999

==================================================================

Usage: TwitterPopularTags <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> [<filters>]

$SPARK_HOME/bin/run-example streaming.TwitterPopularTags <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret>
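
For reference, here is a minimal Scala sketch of the core of TwitterPopularTags (a rough reconstruction, not the bundled source). It assumes spark-streaming-twitter and twitter4j are on the classpath, with the four OAuth keys supplied as twitter4j system properties:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Supply Twitter OAuth credentials to twitter4j (placeholders, not real keys)
System.setProperty("twitter4j.oauth.consumerKey", "<consumerKey>")
System.setProperty("twitter4j.oauth.consumerSecret", "<consumerSecret>")
System.setProperty("twitter4j.oauth.accessToken", "<accessToken>")
System.setProperty("twitter4j.oauth.accessTokenSecret", "<accessTokenSecret>")

val conf = new SparkConf().setMaster("local[2]").setAppName("TwitterPopularTags")
val ssc = new StreamingContext(conf, Seconds(2))

// Pull hashtags out of the sample tweet stream
val hashTags = TwitterUtils.createStream(ssc, None)
  .flatMap(_.getText.split(" ").filter(_.startsWith("#")))

// Most popular tags over the last 60 seconds
val topCounts = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (tag, count) => (count, tag) }
  .transform(_.sortByKey(ascending = false))

topCounts.print()
ssc.start()
ssc.awaitTermination()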

==================================================================

$FLUME_HOME/bin/flume-ng agent -n spark-flume --conf $FLUME_HOME/conf -f $FLUME_HOME/test/spark-flume.conf -Dflume.root.logger=DEBUG,console

telnet localhost 3000

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.FlumeEventCount localhost 1234
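
FlumeEventCount takes <host> <port>, where the port is the one the Flume Avro sink writes to (so it should match the sink port configured in spark-flume.conf). A minimal sketch of its core, assuming spark-streaming-flume is on the classpath:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf().setMaster("local[2]").setAppName("FlumeEventCount")
val ssc = new StreamingContext(conf, Seconds(2))

// Receive events from the Flume Avro sink pointed at localhost:1234
val stream = FlumeUtils.createStream(ssc, "localhost", 1234)

// Count the events in each batch
stream.count().map(cnt => "Received " + cnt + " flume events.").print()

ssc.start()
ssc.awaitTermination()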

==================================================================

$SPARK_HOME/bin/run-example streaming.HdfsWordCount file:/home/orienit/spark/input/test
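
HdfsWordCount watches a directory and word-counts every new file that appears in it. A minimal sketch of the same logic using textFileStream:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("HdfsWordCount")
val ssc = new StreamingContext(conf, Seconds(2))

// Only files created in this directory after the job starts are processed
val lines = ssc.textFileStream("file:/home/orienit/spark/input/test")
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()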

==================================================================
$SPARK_HOME/bin/run-example streaming.QueueStream
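
QueueStream is handy for testing: it builds a DStream from a queue of RDDs instead of an external source. A minimal sketch:

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStream")
val ssc = new StreamingContext(conf, Seconds(1))

// Each RDD pushed into the queue becomes one batch of the stream
val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()

ssc.start()
// Push a few RDDs into the queue while the stream runs
for (_ <- 1 to 5) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
  }
  Thread.sleep(1000)
}
ssc.stop()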

==================================================================

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream activeUserCount localhost 44444

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream popularUsersSeen localhost 44444
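
The generator serves synthetic page views on port 44444, and PageViewStream computes the chosen metric over them. Below is a rough sketch of one metric (errorRatePerZipCode), assuming each generated line is a tab-separated page view of url, status, zipCode, userID:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class PageView(url: String, status: Int, zipCode: Int, userID: Int)

val conf = new SparkConf().setMaster("local[2]").setAppName("PageViewStream")
val ssc = new StreamingContext(conf, Seconds(1))

// Parse page views from the generator's socket; skip malformed lines
val views = ssc.socketTextStream("localhost", 44444).flatMap { line =>
  line.split("\t") match {
    case Array(url, status, zip, user) =>
      Some(PageView(url, status.toInt, zip.toInt, user.toInt))
    case _ => None
  }
}

// Per batch: fraction of requests per zip code that returned a 404
views.map(v => (v.zipCode, v.status == 404))
  .groupByKey()
  .map { case (zip, statuses) =>
    (zip, statuses.count(identity).toDouble / statuses.size)
  }
  .print()

ssc.start()
ssc.awaitTermination()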



==================================================================
Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 my-consumer-group topic1,topic2 1
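
A minimal sketch of the receiver-based KafkaWordCount matching the usage above, assuming the spark-streaming-kafka (0.8) artifact is on the classpath:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val (zkQuorum, group, topics, numThreads) =
  ("localhost:2181", "my-consumer-group", "topic1,topic2", 1)

val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
val ssc = new StreamingContext(conf, Seconds(2))

// Map of topic -> number of receiver threads
val topicMap = topics.split(",").map((_, numThreads)).toMap

// createStream yields (key, message) pairs; keep the message
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()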


==================================================================



$SPARK_HOME/bin/run-example streaming.StatefulNetworkWordCount localhost 9999
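
StatefulNetworkWordCount keeps a running count across batches with updateStateByKey, which requires a checkpoint directory. A minimal sketch (the checkpoint path here is an arbitrary local choice):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StatefulNetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("/tmp/stateful-checkpoint")  // required by updateStateByKey

// Merge this batch's counts into the running total for each word
val updateFunc = (newValues: Seq[Int], running: Option[Int]) =>
  Some(newValues.sum + running.getOrElse(0))

val lines = ssc.socketTextStream("localhost", 9999)
lines.flatMap(_.split(" "))
  .map((_, 1))
  .updateStateByKey[Int](updateFunc)
  .print()

ssc.start()
ssc.awaitTermination()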








