提问 发文

流式计算来袭,使用Python和PySpark处理流式数据

微微菌

| 2024-03-14 13:49 163 0 0

Apache Spark是一个开源的分析工具,适用于数据工程、数据科学、机器学习和其他与数据有关的实践的从小型到大型的数据处理。使用Apache Spark,我们可以并行处理大量数据,将其连接到数百个来源,我们还可以利用微批流。此外,根据用户的需要,Spark流式数据具有高度的容错性。

PySpark使我们能够使用Apache Spark的力量,同时又能利用Python编程语言的简单性。

在这篇文章中,我们将尝试创建一个简单的Python脚本来创建流式数据管道。我们将解释如何使用具有非常低延迟的连续流。我们还将讨论微批流和比较这两种类型的流。

什么是PySpark流式数据?
当我们说到流式数据时,大多数人都会想到视频或音乐的流媒体。然而,流式数据被我们周围的各种设备所使用 -- 健康配件、智能手表、智能手机、智能家居设备等。

在PySpark中,我们区分了两种主要的流式数据类型 -- 连续流和微批流。

连续流
在连续流中,输入数据使用面向流的数据处理引擎进行处理,该引擎可以实时执行聚合、连接和窗口化等操作。

随着新数据的到来,这种处理管道的输出被持续更新(聚合/合并/连接)。在PySpark中,我们把这种流称为结构化流。虽然它是延迟最低的"最快"解决方案,但它不像微批流那样具有容错性,而且需要强大的集群来运行。

微批流
在微批流中,输入数据是以小而离散的数据批处理的。使用这种方法,我们可以使用通常用于成批数据的数据转换,同时保持低(但大于连续流)延迟、高容错性和对窗口功能的支持。

使用案例
在PySpark中,我们可以使用readStream和writeStream方法来读/写数据流。

readStream方法允许我们从Spark支持的任何来源开始读取。这个方法返回流式数据帧。

writeStream方法允许我们将readStream创建的流媒体DataFrame同时写到一个或多个位置。

对于连续流,我们可以使用这个简单的代码片段。它的作用是什么?当这个脚本运行时,我们将能够向MacOs终端输入文字,我们将把它作为一个源。同时,该脚本将在VS Code中的jupyter笔记本中打印处理后的流,延迟非常低。

from pyspark.sql import SparkSession

# 创建一个Spark会话
spark = SparkSession.builder.appName("Continuous Stream").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF")

# 从流式数据源读取数据
streamingDF = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# 定义处理逻辑
processedDF = streamingDF.selectExpr("value as input", "length(value) as length")

# 将输出写入一个流式处理器
query = processedDF.writeStream.format("console").start()

# 开启数据流
query.awaitTermination(20) # Start the stream for 20 seconds to allow input to be typed in the terminal

# 20秒后,关闭数据流
query.stop()


注意,我们使用readStream方法从源“socket”中读取数据流。我们的输入被加载为常规的PySpark DataFrame。加载后,我们使用selectExpr方法,通过表达式从DataFrame中进行选择。在我们的例子中,我们将创建一个新的列,叫做input,由sql命令值生成的输入,基本上就是我们在终端输入的值。然后,我们将有一个由sql命令length(value)生成的长度列,它是我们输入的长度(字符数)。

在PySpark中,在我们用.start()启动流之前,操作不会被运行。你可以同时应用许多转换,但除非我们执行动作--在我们的例子中是.start(),否则它不会开始执行过程。

在代码片段中,我们使用localhost:9999的套接字作为我们的源。为了将数据传递给流,我们必须首先在localhost上启动9999端口的监听服务器。要做到这一点,我们必须。

在MasOS终端输入:nc -l 9999 。这将使用netcat工具启动9999端口的监听服务器。在Windows PowerShell中输入:New-Object System.Net.Sockets.TcpListener(9999).Start()。这个命令使用.NET框架中的TcpListener类在9999端口创建了一个新的TCP监听器。在我们开始监听9999端口后,我们可以运行带有流代码的jupyer笔记本单元,并开始在终端/PowerShell中打字。

推荐书单
《Python从入门到精通(第2版)》
《Python从入门到精通(第2版)》从初学者角度出发,通过通俗易懂的语言、丰富多彩的实例,详细介绍了使用Python进行程序开发应该掌握的各方面技术。全书共分23章,包括初识Python、Python语言基础、运算符与表达式、流程控制语句、列表和元组、字典和集合、字符串、Python中使用正则表达式、函数、面向对象程序设计、模块、异常处理及程序调试、文件及目录操作、操作数据库、GUI界面编程、Pygame游戏编程、网络爬虫开发、使用进程和线程、网络编程、Web编程、Flask框架、e起去旅行网站、AI图像识别工具等内容。所有知识都结合具体实例进行介绍,涉及的程序代码都给出了详细的注释,读者可轻松领会Python程序开发的精髓,快速提升开发技能。除此之外,该书还附配了243集高清教学微视频及PPT电子教案。


————————————————

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/weixin_39915649/article/details/131121513

收藏 0
分享
分享方式
微信

评论

游客

全部 0条评论

轻松设计高效搭建,减少3倍设计改稿与开发运维工作量

开始免费试用 预约演示

扫一扫关注公众号 扫一扫联系客服

©Copyrights 2016-2022 杭州易知微科技有限公司 浙ICP备2021017017号-3 浙公网安备33011002011932号

互联网信息服务业务 合字B2-20220090

400-8505-905 复制
免费试用
微信社区
易知微-数据可视化
微信扫一扫入群