提问 发文

大数据Flink大屏实时计算深度剖析

微微菌

| 2024-04-02 15:06 118 0 0

1. 实时计算应用场景
1.1 智能推荐
什么是智能推荐?
定义: 根据用户行为习惯所提供的数据, 系统提供策略模型,自动推荐符合用户行为的信息。
例举:
比如根据用户对商品的点击数据(时间周期,点击频次), 推荐类似的商品;
根据用户的评价与满意度, 推荐合适的品牌;
根据用户的使用习惯与点击行为,推荐类似的资讯。

应用案例:

在这里插入图片描述


1.2 实时数仓
什么是实时数仓
数据仓库(Data Warehouse),可简写为DW或DWH,是一个庞大的数据存储集合,通过对各种业务数
据进行筛选与整合,生成企业的分析性报告和各类报表,为企业的决策提供支持。实时仓库是基于
Storm/Spark(Streaming)/Flink等实时处理框架,构建的具备实时性特征的数据仓库。

应用案例
分析物流数据, 提升物流处理效率。


在这里插入图片描述


阿里巴巴菜鸟网络实时数仓设计:


在这里插入图片描述


数仓分层处理架构(流式ETL):
ODS -> DWD -> DWS -> ADS
ODS(Operation Data Store):操作数据层, 一般为原始采集数据。
DWD(Data Warehouse Detail) :明细数据层, 对数据经过清洗,也称为DWI。
DWS(Data Warehouse Service):汇总数据层,基于DWD层数据, 整合汇总成分析某一个主题域的服
务数据,一般是宽表, 由多个属性关联在一起的表, 比如用户行为日志信息:点赞、评论、收藏等。
ADS(Application Data Store): 应用数据层, 将结果同步至RDS数据库中, 一般做报表呈现使用。


在这里插入图片描述


1.3 大数据分析应用
IoT数据分析
什么是IoT
物联网是新一代信息技术,也是未来发展的趋势,英文全称为: Internet of things(IOT),顾名
思义, 物联网就是万物相联。物联网通过智能感知、识别技术与普适计算等通信感知技术,广泛
应用于网络的融合中,也因此被称为继计算机、互联网之后世界信息产业发展的第三次浪潮。


应用案例
物联网设备运营分析:


在这里插入图片描述


华为Iot数据分析平台架构:


在这里插入图片描述


智慧城市
城市中汽车越来越多, 川流不息,高德地图等APP通过技术手段采集了越来越多的摄像头、车流
的数据。
但道路却越来越拥堵,越来越多的城市开始通过大数据技术, 对城市实行智能化管理。
2018年, 杭州采用AI智慧城市,平均通行速度提高15%,监控摄像头日报警次数高达500次,识
别准确率超过92%,AI智慧城市通报占全体95%以上,在中国城市交通堵塞排行榜, 杭州从中国
第5名降至57名。

在这里插入图片描述

在这里插入图片描述


金融风控
风险是金融机构业务固有特性,与金融机构相伴而生。金融机构盈利的来源就是承担风险的风险溢
价。
金融机构中常见的六种风险:市场风险、信用风险、流动性风险、操作风险、声誉风险及法律风
险。其中最主要的是市场风险和信用风险。
线上信贷流程,通过后台大数据系统进行反欺诈和信用评估:


在这里插入图片描述


电商行业
用户在电商的购物网站数据通过实时大数据分析之后, 通过大屏汇总展示, 比如天猫的双11购物
活动,通过大屏, 将全国上亿买家的订单数据可视化,实时性的动态展示,包含总览数据,流式
TopN数据,多维区域统计数据等,极大的增强了对海量数据的可读性。


在这里插入图片描述


TopN排行:


在这里插入图片描述


2 Flink快速入门
大数据Flink概述
大数据Flink入门案例

3. Flink接入体系
3.1 Flink Connectors
Flink 连接器包含数据源输入与汇聚输出两部分。Flink自身内置了一些基础的连接器,数据源输入包含文件、目录、Socket以及 支持从collections 和 iterators 中读取数据;汇聚输出支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。
官方地址
Flink还可以支持扩展的连接器,能够与第三方系统进行交互。目前支持以下系统:

Flink还可以支持扩展的连接器,能够与第三方系统进行交互。目前支持以下系统:

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
JDBC (sink)
常用的是Kafka、ES、HDFS以及JDBC。

3.2 JDBC(读/写)
Flink Connectors JDBC 如何使用?
功能: 将集合数据写入数据库中

 <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.2</version>
</dependency>


代码:


import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;


public class JDBCConnectorApplication {
public static void main(String[] args)throws Exception {
// 1. 创建运行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 创建集合数据
List<String> list = Arrays.asList(
"192.168.116.141\t1601297294548\tPOST\taddOrder",
"192.168.116.142\t1601297294549\tGET\tgetOrder"
);
// 3. 读取集合数据,写入数据库
env.fromCollection(list).addSink(JdbcSink.sink(
// 配置SQL语句
"insert into t_access_log(ip, time, type, api) values(?, ?, ?, ?)",
new JdbcStatementBuilder<String>() {
@Override
public void accept(PreparedStatement preparedStatement,
String s) throws SQLException {
System.out.println("receive ====> " + s);
// 解析数据
String[] elements = String.valueOf(s).split("\t");
for (int i = 0; i < elements.length; i++) {
// 新增数据
preparedStatement.setString(i+1, elements[i]);
}
}
},
// JDBC 连接配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://192.168.116.141:3306/flink?useSSL=false")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
));
// 4. 执行任务
env.execute("jdbc-job");
}
}


数据表:


DROP TABLE IF EXISTS `t_access_log`;
CREATE TABLE `t_access_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`ip` varchar(32) NOT NULL COMMENT 'IP地址',
`time` varchar(255) NULL DEFAULT NULL COMMENT '访问时间',
`type` varchar(32) NOT NULL COMMENT '请求类型',
`api` varchar(32) NOT NULL COMMENT 'API地址',
PRIMARY KEY (`id`)
) ENGINE = InnoDB AUTO_INCREMENT=1;


自定义写入数据源
功能:读取Socket数据, 采用流方式写入数据库中。
代码:


public class CustomSinkApplication {
public static void main(String[] args)throws Exception {
// 1. 创建运行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取Socket数据源
DataStreamSource<String> socketTextStream =
env.socketTextStream("192.168.116.141", 9911, "\n");
// 3. 转换处理流数据
SingleOutputStreamOperator<AccessLog> outputStreamOperator =
socketTextStream.map(new MapFunction<String, AccessLog>() {
@Override
public AccessLog map(String s) throws Exception {
System.out.println(s);
// 根据分隔符解析数据
String[] elements = s.split("\t");
// 将数据组装为对象
AccessLog accessLog = new AccessLog();
accessLog.setNum(1);
for (int i = 0; i < elements.length; i++) {
if (i == 0) accessLog.setIp(elements[i]);
if (i == 1) accessLog.setTime(elements[i]);
if (i == 2) accessLog.setType(elements[i]);
if (i == 3) accessLog.setApi(elements[i]);
}
return accessLog;
}
});
// 4. 配置自定义写入数据源
outputStreamOperator.addSink(new MySQLSinkFunction());
// 5. 执行任务
env.execute("custom jdbc sink");
}


自定义数据源


   private static class MySQLSinkFunction extends RichSinkFunction<AccessLog>{

private Connection connection;

private PreparedStatement preparedStatement;
@Override
public void open(Configuration parameters) throws Exception {
String url="jdbc:mysql://192.168.11.14:3306/flik?useSSL=fales";
String username="admin";
String password="admin";

connection= DriverManager.getConnection(url,username,password);
String sql="insert into xxx_log(ip,time,type,api) valuse(?,?,?,?)"
preparedStatement=connection.prepareStatement(sql);
}

@Override
public void close() throws Exception {
try {
if (null==connection)connection.close();
connection=null;
}catch (Exception e){
e.printStackTrace();
}
}


@Override
public void invoke(AccessLog accessLog, Context context) throws Exception {
preparedStatement.setString(1,accessLog.getIp());
preparedStatement.setString(2,accessLog.getTime());
preparedStatement.setString(3,accessLog.getType());
preparedStatement.setString(4,accessLog.getApi());
preparedStatement.execute();
}
}


AccessLog:


@Data
public class AccessLog {
/**
* IP地址
*/
private String ip;
/**
* 访问时间
*/
private String time;
/**
* 请求类型
*/
private String type;
/**
* API地址
*/
private String api;
private Integer num;


}


测试数据:注意 \t


192.168.116.141 1603166893313 GET getOrder
192.168.116.142 1603166893314 POST addOrder


自定义读取数据源
功能: 读取数据库中的数据, 并将结果打印出来。
代码:


   public static void main(String[] args) {
// 1. 创建运行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 配置自定义MySQL读取数据源
DataStreamSource<AccessLog> streamSource = env.addSource(new
MySQLSourceFunction());
// 3. 设置并行度
streamSource.print().setParallelism(1);
// 4. 执行任务
env.execute("custom jdbc source");

}


3.3 HDFS(读/写)

通过Sink写入HDFS数据
功能: 将Socket接收到的数据, 写入至HDFS文件中。

依赖


 <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.1</version>
</dependency>


代码:


import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class HDFSSinkApplication {
public static void main(String[] args) {
// 1. 创建运行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取Socket数据源
// DataStreamSource<String> socketTextStream =
env.socketTextStream("127.0.0.1", 9911, "\n");
DataStreamSource<String> socketTextStream =
env.socketTextStream("192.168.116.141", 9911, "\n");
// 3. 创建hdfs sink
BucketingSink<String> bucketingSink = new BucketingSink<>("F:/oldlu/Flink/hdfs");
bucketingSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
bucketingSink.setWriter(new StringWriter())
.setBatchSize(5 * 1024)// 设置每个文件的大小
.setBatchRolloverInterval(5 * 1000)// 设置滚动写入新文件的时间
.setInactiveBucketCheckInterval(30 * 1000)// 30秒检查一次不写入 的文件
.setInactiveBucketThreshold(60 * 1000);// 60秒不写入,就滚动写入新的文件
// 4. 写入至HDFS文件中
socketTextStream.addSink(bucketingSink).setParallelism(1);
// 5. 执行任务
env.execute("flink hdfs source");

}
}


数据源模拟实现:


     <!-- Netty 核心组件依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
<!-- spring boot 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- Spring data jpa 组件依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.jdbc.version}</version>
</dependency>
<!-- Redis 缓存依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>


代码:


public class NettyServerHandler extends ChannelInboundHandlerAdapter {

/* 客户端通道记录集合*/
public static List<Channel> channelList = new ArrayList<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Server >>>> 连接已建立:" + ctx);
super.channelActive(ctx);
// 将成功建立的连接通道,加入到集合当中
channelList.add(ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
System.out.println("Server >>>> 收到的消息:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println("Server >>>> 读取数据出现异常");
cause.printStackTrace();
ctx.close();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
super.channelUnregistered(ctx);
// 移除无效的连接通道
channelList.remove(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
// 移除无效的连接通道
channelList.remove(ctx.channel());
}
}


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.Random;


public class SocketSourceApplication {
/**
* 服务端的端口
*/
private int port;
/**
* 初始化构造方法
*
* @param port
*/
public SocketSourceApplication(int port) {
this.port = port;
}
/**
* ip 访问列表
*/
private static String[] accessIps = new String[]{
"192.168.116.141",
"192.168.116.142",
"192.168.116.143"
};
/**
* 请求访问类型
*/
private static String[] accessTypes = new String[]{
"GET",
"POST",
"PUT"
};
/**
* 请求接口信息
*/
private static String[] accessApis = new String[]{
"addOrder",
"getAccount",
"getOrder"
};
public void runServer() throws Exception {
// 1. 创建netty服务
// 2. 定义事件boss监听组
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 3. 定义用来处理已经被接收的连接
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 4. 定义nio服务启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 5. 配置nio服务启动的相关参数
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// tcp最大缓存连接个数,tcp_max_syn_backlog(半连接上限数量)
.option(ChannelOption.SO_BACKLOG, 128)
// 保持连接的正常状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 根据日志级别打印输出
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
// 管道注册handler
ChannelPipeline pipeline = socketChannel.pipeline();
// 编码通道处理
pipeline.addLast("decode", new StringDecoder());
// 转码通道处理
pipeline.addLast("encode", new StringEncoder());
// 处理接收到的请求
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println(">>>>>server 启动<<<<<<<");
// 6. 开启新线程,模拟数据,广播发送
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
String accessLog = getAccessLog();
System.out.println("broadcast (" +
NettyServerHandler.channelList.size() + ") ==> " + accessLog);
if (NettyServerHandler.channelList.size() > 0) {
for (Channel channel :
NettyServerHandler.channelList) {
channel.writeAndFlush(accessLog);
}
}
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
// 7. 启动netty服务
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取访问日志
*
* @return
*/
private String getAccessLog() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(accessIps[new
Random().nextInt(accessIps.length)]).append("\t")
.append(System.currentTimeMillis()).append("\t")
.append(accessTypes[new
Random().nextInt(accessTypes.length)]).append("\t")
.append(accessApis[new
Random().nextInt(accessApis.length)]).append("\t\n");
return stringBuilder.toString();
}
/**
* netty服务端启动
*
* @param args
*/
public static void main(String[] args) throws Exception {
new SocketSourceApplication(9911).runServer();
}
}


读取HDFS文件数据


import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @Auther: Ybb
* @Date: 2021/08/29/1:14 下午
* @Description:
*/
public class HDFSSourceApplication {
public static void main(String[] args) {
// 1. 创建运行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取HDFS数据源
DataStreamSource<String> file =
env.readTextFile("hdfs://192.168.116.141:9090/hadoop-env.sh");
// 3. 打印文件内容
file.print().setParallelism(1);
// 4. 执行任务
env.execute("flink hdfs source");

}
}


Hadoop环境安装

  1. 配置免密码登录
    生成秘钥:

[root@flink1 hadoop-2.6.0-cdh5.15.2]# ssh-keygen -t rsa -P ''
Generating public/private rsa key pair.


将秘钥写入认证文件:

[root@flink1 .ssh]# cat id_rsa.pub >> ~/.ssh/authorized_keys


 修改认证文件权限:

[root@flink1 .ssh]# chmod 600 ~/.ssh/authorized_keys


  1. 配置环境变量
    将Hadoop安装包解压, 将Hadoop加入环境变量/etc/profile:

export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.15.2
export PATH=$HADOOP_HOME/bin:$PATH


执行生效:

source /etc/profile
1
修改Hadoop配置文件
1) 修改hadoop-env.sh文件
vi /opt/hadoop-2.6.0-cdh5.15.2/etc/hadoop/hadoop-env.sh
1
修改JAVA_HOME:

export JAVA_HOME=/opt/jdk1.8.0_301
1
2)修改core-site.xml文件


<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://flink:9090</value>
</property>
</configuration>


这里的主机名称是flink。
3)修改hdfs-site.xml文件


<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop-2.6.0-cdh5.15.2/tmp</value>
</property>
</configuration>


4)修改mapred-site.xml文件


<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

</configuration>


5)修改slaves文件


flink


这里配置的是单节点, 指向本机主机名称。
6)修改yarn-site.xml


<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>


  1. 启动Hadoop服务

[root@flink hadoop-2.6.0-cdh5.15.2]# ./sbin/start-all.sh
This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
21/08/23 11:59:17 WARN util.NativeCodeLoader: Unable to load native-
hadoop library for your platform... using builtin-java classes where
applicable
Starting namenodes on [flink]
flink: starting namenode, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/hadoop-root-namenode-flink.out
flink: starting datanode, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/hadoop-root-datanode-flink.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/hadoop-root-secondarynamenode-flink.out
21/08/23 11:59:45 WARN util.NativeCodeLoader: Unable to load native-
hadoop library for your platform... using builtin-java classes where
applicable
starting yarn daemons
starting resourcemanager, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/yarn-root-resourcemanager-flink.out
flink: starting nodemanager, logging to /opt/hadoop-2.6.0-
cdh5.15.2/logs/yarn-root-nodemanager-flink.out


上传一个文件, 用于测试:


hdfs dfs -put /opt/hadoop-2.6.0-cdh5.15.2/etc/hadoop/hadoop-env.sh /


本文为二次转载。 

 

 

 

收藏 0
分享
分享方式
微信

评论

游客

全部 0条评论

10603

文章

11.88W+

人气

19

粉丝

1

关注

官方媒体

轻松设计高效搭建,减少3倍设计改稿与开发运维工作量

开始免费试用 预约演示

扫一扫关注公众号 扫一扫联系客服

©Copyrights 2016-2022 杭州易知微科技有限公司 浙ICP备2021017017号-3 浙公网安备33011002011932号

互联网信息服务业务 合字B2-20220090

400-8505-905 复制
免费试用
微信社区
易知微-数据可视化
微信扫一扫入群