sparkstreaming测试之二使用网络数据源
测试思路:
创新互联致力于互联网品牌建设与网络营销,包括成都网站设计、网站建设、SEO优化、网络推广、整站优化营销策划推广、电子商务、移动互联网营销等。创新互联为不同类型的客户提供良好的互联网应用定制及解决方案,创新互联核心团队十年专注互联网开发,积累了丰富的网站经验,为广大企业客户提供一站式企业网站建设服务,在网站建设行业内树立了良好口碑。
首先,创建网络数据源数据发送器(程序一);
其次,创建spark接收数据程序(程序二);
接着,将程序一打包,放在服务器上执行。这里有三个参数分别是:所要发送的数据文件,通过哪个端口号发送,每隔多少毫秒发送一次数据;
最后,运行spark程序,这里每隔5秒处理一次数据。有两个参数:监听的端口号,每隔多少毫秒接收一次数据。
观察效果。
程序一:
sparkStreaming import java.io.PrintWriter import java.net.ServerSocket import scala.io.Source object SalaSimulation { (length: ) = { java.util.Random rdm = Random rdm.nextInt(length) } (args: Array[]){ (args.length != ){ System..println() System.() } filename = args() lines = Source.(filename).getLines.toList filerow = lines.length listener = ServerSocket(args().toInt) (){ socket = listener.accept() Thread(){ = { (+socket.getInetAddress) out = PrintWriter(socket.getOutputStream()) (){ Thread.(args().toLong) content = lines((filerow)) (content) out.write(content +) out.flush() } socket.close() } }.start() } } }
程序二:
sparkStreaming import org.apache.log4j.{LoggerLevel} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{SecondsStreamingContext} import org.apache.spark.{SparkContextSparkConf} import org.apache.spark.streaming.StreamingContext._ object NetworkWordCount { def main(args: Array[]){ Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) conf = SparkConf().setAppName().setMaster() sc = SparkContext(conf) ssc = StreamingContext(sc()) lines = ssc.socketTextStream(args()args().toIntStorageLevel.) words = lines.flatMap(_.split()) wordCounts = words.map(x=>(x)).reduceByKey(_+_) wordCounts.print() ssc.start() ssc.awaitTermination() } }
新闻标题:sparkstreaming测试之二使用网络数据源
网站URL:http://cdiso.cn/article/gohgsi.html