技术标签: 2024年程序员学习 flink 学习 大数据
先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7
深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
如果你需要这些资料,可以添加V获取:vip204888 (备注大数据)
vim /etc/profile
添加内容:
export HADOOP\_CONF\_DIR=/export/server/hadoop/etc/hadoop
# 执行生效
source /etc/profile
6)、将Flink依赖Hadoop 框架JAR包上传至/export/server/flink-standalone/lib目录
[root@node1 ~]# cd /export/server/flink-standalone/lib/
[root@node1 lib]# rz
commons-cli-1.4.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
7)、分发到集群其他机器
scp -r /export/server/flink-standalone root@node2:/export/server
scp -r /export/server/flink-standalone root@node3:/export/server
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
接下来,启动服务进程,运行批处理程序:词频统计WordCount。
1)、启动HDFS集群,在node1上执行如下命令
start-dfs.sh
2)、启动集群,执行如下命令
# 一键启动所有服务JobManager和TaskManagers
[root@node1 ~]# /export/server/flink-standalone/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
3)、访问Flink UI界面:http://node1:8081/#/overview
4)、执行官方测试案例
# 准备测试数据
[root@node1 ~]# hdfs dfs -mkdir -p /wordcount/input/
[root@node1 ~]# hdfs dfs -put /root/words.txt /wordcount/input/
运行程序,使用--input指定处理数据文件路径
/export/server/flink-standalone/bin/flink run \
/export/server/flink-standalone/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt
# 使用--output指定处理结果数据存储目录
/export/server/flink-standalone/bin/flink run \
/export/server/flink-standalone/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt \
--output hdfs://node1:8020/wordcount/output/result
[root@node1 ~]# hdfs dfs -text /wordcount/output/result
5)、关闭Standalone集群服务
# 一键停止所有服务JobManager和TaskManagers
[root@node1 ~]# /export/server/flink-standalone/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 6600) on host node1.
Stopping taskexecutor daemon (pid: 3016) on host node2.
Stopping taskexecutor daemon (pid: 3034) on host node3.
Stopping standalonesession daemon (pid: 6295) on host node1.
补充:Flink Standalone集群启动与停止,也可以逐一服务启动
# 每个服务单独启动
# 在node1上启动
/export/server/flink-standalone/bin/jobmanager.sh start
# 在node1、node2、node3.
/export/server/flink-standalone/bin/taskmanager.sh start # 每台机器执行
# ===============================================================
# 每个服务单独停止
# 在node1上停止
/export/server/flink-standalone/bin/jobmanager.sh stop
# 在node1、node2、node3
/export/server/flink-standalone/bin/taskmanager.sh stop
从Standalone架构图中,可发现JobManager存在单点故障(SPOF
),一旦JobManager出现意外,整个集群无法工作。为了确保集群的高可用,需要搭建Flink的Standalone HA。
Flink Standalone HA集群,类似YARN HA 集群安装部署,可以启动多个主机点JobManager,使用Zookeeper集群监控JobManagers转态,进行选举leader,实现自动故障转移。
在 Zookeeper 的协助下,一个 Standalone的Flink集群会同时有多个活着的 JobManager,其中**只有一个处于Active工作状态,其他处于 Standby 状态。**当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选一个新的 JobManager 来接管 Flink 集群。
1)、集群规划
# 在node1上复制一份standalone
[root@node1 ~]# cd /export/server/
[root@node1 server]# cp -r flink-standalone flink-ha
# 删除日志文件
[root@node1 ~]# rm -rf /export/server/flink-ha/log/\*
2)、启动ZooKeeper,在node1上启动
start-zk.sh
3)、启动HDFS,在node1上启动,如果没有关闭,不用重启
start-dfs.sh
4)、停止集群,在node1操作,进行HA高可用配置
/export/server/flink-standalone/bin/stop-cluster.sh
5)、修改flink-conf.yaml,在node1操作
vim /export/server/flink-ha/conf/flink-conf.yaml
修改内容:
jobmanager.rpc.address: node1
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_standalone
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink/checkpoints
state.savepoints.dir: hdfs://node1:8020/flink/savepoints
6)、修改masters,在node1操作
vim /export/server/flink-ha/conf/masters
修改内容:
node1:8081
node2:8081
7)、分发到集群其他机器,在node1操作
scp -r /export/server/flink-ha root@node2:/export/server/
scp -r /export/server/flink-ha root@node3:/export/server/
8)、修改node2上的flink-conf.yaml
[root@node2 ~]# vim /export/server/flink-ha/conf/flink-conf.yaml
修改内容:33 行
jobmanager.rpc.address: node2
9)、重新启动Flink集群
# node1和node2上执行
/export/server/flink-ha/bin/jobmanager.sh start
# node1和node2、node3执行
/export/server/flink-ha/bin/taskmanager.sh start # 每台机器执行
在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的Workload,因此 Flink 也支持在 Yarn 集群运行。
为什么使用
Flink on Yarn或Spark on Yarn?
当应用程序(MR、Spark、Flink)运行在YARN集群上时,可以实现容灾恢复。
Flink on YARN安装配置,此处考虑高可用HA配置,集群机器安装软件框架示意图:
1)、关闭YARN的内存检查(node1
操作)
# yarn-site.xml中添加配置
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
添加如下内容:
<!-- 关闭yarn内存检查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
2)、 配置Application最大的尝试次数(node1
操作)
# yarn-site.xml中添加配置
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
添加如下内容:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
3)、同步yarn-site.xml配置文件(node1
操作)
cd /export/server/hadoop/etc/hadoop
scp -r yarn-site.xml root@node2:$PWD
scp -r yarn-site.xml root@node3:$PWD
4)、启动HDFS集群和YARN集群(node1
操作)
[root@node1 ~]# start-dfs.sh
[root@node1 ~]# start-yarn.sh
5)、添加HADOOP_CONF_DIR
环境变量(集群所有机器)
# 添加环境变量
vim /etc/profile
添加内容:
export HADOOP\_CONF\_DIR=/export/server/hadoop/etc/hadoop
环境变量生效
source /etc/profile
6)、上传软件及解压(node1
操作)
[root@node1 ~]# cd /export/software/
[root@node1 software]# rz
上传软件包:flink-1.13.1-bin-scala_2.11.tgz
[root@node1 software]# chmod u+x flink-1.13.1-bin-scala\_2.11.tgz
[root@node1 software]# tar -zxf flink-1.13.1-bin-scala\_2.11.tgz -C /export/server/
[root@node1 ~]# cd /export/server/
[root@node1 server]# chown -R root:root flink-1.13.1
[root@node1 server]# mv flink-1.13.1 flink-yarn
7)、将Flink依赖Hadoop 框架JAR包上传至/export/server/flink-yarn/lib
目录
[root@node1 ~]# cd /export/server/flink-yarn/lib/
[root@node1 lib]# rz
commons-cli-1.4.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
8)、配置HA高可用,依赖Zookeeper及重试次数(node1
操作)
# 修改配置文件
vim /export/server/flink-yarn/conf/flink-conf.yaml
添加如下内容:
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/yarn-ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink-yarn-ha
high-availability.cluster-id: /cluster_yarn
yarn.application-attempts: 10
9)、集群所有机器,同步分发Flink 安装包,便于任意机器提交运行Flink Job。
scp -r /export/server/flink-yarn root@node2:/export/server/
scp -r /export/server/flink-yarn root@node3:/export/server/
10)、启动Zookeeper集群(node1
操作)
start-zk.sh
在Flink中执行应用有如下三种部署模式(Deployment Mode):
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/28d9a859154f9ba2e2fd90d2110735.png#pic_center
Flink on YARN :Session 模式
,表示多个Flink Job运行共享Standalone集群资源。
先向Hadoop YARN申请资源,启动运行服务JobManager和TaskManagers,再提交多个Job到Flink 集群上执行。
Session 会话模式:arn-session.sh(开辟资源) + flink run(提交任务)
准备测试数据,测试运行批处理词频统计WordCount程序
[root@node1 ~]# vim /root/words.txt
添加数据
spark python spark hive spark hive
python spark hive spark python
mapreduce spark hadoop hdfs hadoop spark
hive mapreduce
数据文件上传
[root@node1 ~]# hdfs dfs -mkdir -p /wordcount/input/
[root@node1 ~]# hdfs dfs -put /root/words.txt /wordcount/input/
export HADOOP\_CLASSPATH=`hadoop classpath`
/export/server/flink-yarn/bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 2
# 参数说明
-d:后台执行
-s: 每个TaskManager的slot数量
-jm:JobManager的内存(单位MB)
-tm:每个TaskManager容器的内存(默认值:MB)
# 提交flink 集群运行yarn后,提示信息
JobManager Web Interface: http://node1:44263
..................................................................
$ echo "stop" | ./bin/yarn-session.sh -id application_1633441564219_0001
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1633441564219_0001
此时,没有任何TaskManager运行在容器Container中,需要等待有Flink Job提交执行时,才运行TaskManager。
/export/server/flink-yarn/bin/flink run \
-t yarn-session \
-Dyarn.application.id=application_1652168669227_0001 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt
# 优雅 停止应用,如果设置重启次数,即使停止应用,也会重启,一直到超过次数以后,才能真正停止应用
echo "stop" | /export/server/flink-yarn/bin/yarn-session.sh -id application_1633441564219_0001
# kill 命令,直接将运行在yarn应用杀死,毫不留情
yarn application -kill application_1633441564219_0001
每个Flink Job提交运行到Hadoop YARN集群时,根据自身的情况,单独向YARN申请资源,直到作业执行完成
在Hadoop YARN中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
采用Job分离模式,每个Flink Job运行,都会申请资源,运行属于自己的Flink 集群。
export HADOOP\_CLASSPATH=`hadoop classpath`
/export/server/flink-yarn/bin/flink run \
-t yarn-per-job -m yarn-cluster \
-yjm 1024 -ytm 1024 -ys 1 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input
# 参数说明
-m:指定需要连接的jobmanager(主节点)地址,指定为 yarn-cluster,启动一个新的yarn-session
-yjm:JobManager可用内存,单位兆
-ytm:每个TM所在的Container可申请多少内存,单位兆
-ys:每个TM会有多少个Slot
-yd:分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)
解决办法: 在 flink 配置文件里 flink-conf.yaml设置
classloader.check-leaked-classloader: false
Flink 1.11 引入了一种新的部署模式,即 Application 模式,目前可以支持基于 Hadoop YARN 和 Kubernetes 的 Application 模式。
# 1、Session 模式:
所有作业Job共享1个集群资源,隔离性差,JM 负载瓶颈,每个Job中main 方法在客户端执行。
# 2、Per-Job 模式:
每个作业单独启动1个集群,隔离性好,JM 负载均衡,Job作业main 方法在客户端执行。
以上两种模式,main方法都是在客户端执行,需要获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。此外,提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应的临时目录,带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。
Application 模式下,用户程序的 main 方法将在集群中运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。
Application 模式为每个提交的应用程序创建一个集群,并在应用程序完成时终止。Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main 可以[节省所需的 CPU 周期],还可以[节省本地下载依赖项所需的带宽]。
Application 模式==使用 bin/flink run-application提交作业,本质上是Session和Per-Job模式的折衷。
export HADOOP\_CLASSPATH=`hadoop classpath`
/export/server/flink-yarn/bin/flink run-application \
-t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=1 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input
由于MAIN方法在JobManager(也就是NodeManager的容器Container)中执行,当Flink Job执行完成以后,启动MRJobHistoryServer
历史服务器,查看AppMaster日志信息。
# node1 上启动历史服务
[root@node1 ~]# mr-jobhistory-daemon.sh start historyserver
第二步、查看UI界面:http://node1:8088/cluster
测试Flink Job不同运行模式时,注意事项如下
基于Flink计算引擎,分别实现批处理(Batch)和流计算(Streaming )中:词频统计WordCount。
第一点:Flink API== ,提供四个层次API,越在下面API,越复杂和灵活;越在上面API,使用越简单和抽象
第二点:编程模型==,无论编写批处理还是流计算程序,分为三个部分:Data Source、Transformations和Data Sink
# 第一步、从数据源DataSource获取数据
流计算:DataStream
批处理:DataSet
# 第二步、对数据进行转换处理
# 第三步、结果数据输出DataSink
无论批处理Batch,还是流计算Stream,首先需要创建执行环境ExecutionEnvironment对象
,类似Spark中SparkSession
或者SparkContext
。
创建整个Flink基础课程Maven Project,设置MAVEN Repository仓库目录及Maven安装目录
约定:每天创建一个Maven Module](),创建第1天Maven Module,模块结构:
POM文件添加如下内容:
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
<repository>
<id>central_maven</id>
<name>central maven</name>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn\_spark.jar META-INF/\*.RSA META-INF/\*.DSA META-INF/\*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
日志配置文件:log4j.properties
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console
# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
首先,基于Flink计算引擎,[实现离线批处理Batch:从文本文件读取数据,词频统计]。
批处理时词频统计思路如下伪代码所示:
spark flink flink flink spark
|
| flatMap
|
3-1. 分割单词 spark, flink, flink, flink, spark
|
| map
|
3-2. 转换二元组 (spark, 1) (flink, 1) (flink, 1) (flink, 1) (spark, 1)
|
| groupBy(0)
|
3-3. 按照单词分组
spark -> [(spark, 1) (spark, 1)]
flink -> [(flink, 1) (flink, 1) (flink, 1) ]
|
|sum(1)
|
3-4. 组内数据求和,第二元素值累加
spark -> 1 + 1 = 2
flink -> 1 + 1 + 1 =3
基于Flink编写批处理或流计算程序步骤如下:(5个步骤)
1.执行环境-env
2.数据源-source
3.数据转换-transformation
4.数据接收器-sink
5.触发执行-execute
编写批处理词频统计:BatchWordCount
,创建Java类
package cn.itqzd.flink.batch;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/\*\*
\* 使用Flink计算引擎实现离线批处理:词频统计WordCount
\* 1.执行环境-env
\* 2.数据源-source
\* 3.数据转换-transformation
\* 4.数据接收器-sink
\* 5.触发执行-execute
\*/
public class BatchWordCount {
public static void main(String[] args) throws Exception {
// 1.执行环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment() ;
// 2.数据源-source
DataSource<String> inputDataSet = env.readTextFile("datas/words.txt");
// 3.数据转换-transformation
/\*
spark flink spark hbase spark
|flatMap
分割单词: spark, flink, spark
|map
转换二元组:(spark, 1) (flink, 1) (spark, 1), TODO:Flink Java API中提供元组类Tuple
|groupBy(0)
分组:spark -> [(spark, 1), (spark, 1)] flink -> [(flink, 1)]
|sum(1)
求和:spark -> 1 + 1 = 2, flink = 1
\*/
// 3-1. 分割单词
FlatMapOperator<String, String> wordDataSet = inputDataSet.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.trim().split("\\s+");
for (String word : words) {
out.collect(word);
}
}
});
// 3-2. 转换二元组
MapOperator<String, Tuple2<String, Integer>> tupleDataSet = wordDataSet.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// 3-3. 分组及求和, TODO: 当数据类型为元组时,可以使用下标指定元素,从0开始
AggregateOperator<Tuple2<String, Integer>> resultDataSet = tupleDataSet.groupBy(0).sum(1);
// 4.数据接收器-sink
resultDataSet.print();
// 5.触发执行-execute, TODO:批处理时,无需触发,流计算必须触发执行
//env.execute("BatchWordCount") ;
}
}
编写Flink程序,接收TCP Socket的单词数据,并以空格进行单词拆分,分组统计单词个数。
package cn.itqzd.flink.stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/\*\*
\* 使用Flink计算引擎实现实时流计算:词频统计WordCount,从TCP Socket消费数据,结果打印控制台。
\* 1.执行环境-env
\* 2.数据源-source
\* 3.数据转换-transformation
\* 4.数据接收器-sink
\* 5.触发执行-execute
\*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1.执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.数据源-source
DataStreamSource<String> inputDataStream = env.socketTextStream("node1", 9999);
// 3.数据转换-transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
// 3-1. 分割单词
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.trim().split("\\s+")) {
out.collect(word);
}
}
})
// 3-2. 转换二元组
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return new Tuple2<>(word, 1);
}
})
// 3-3. 分组和组内求和
.keyBy(0).sum(1);
**网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
**需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)**
![img](https://img-blog.csdnimg.cn/img_convert/1b81a88fce4c35e9b0e26ce532d00816.png)
**一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
.执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.数据源-source
DataStreamSource<String> inputDataStream = env.socketTextStream("node1", 9999);
// 3.数据转换-transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
// 3-1. 分割单词
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.trim().split("\\s+")) {
out.collect(word);
}
}
})
// 3-2. 转换二元组
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return new Tuple2<>(word, 1);
}
})
// 3-3. 分组和组内求和
.keyBy(0).sum(1);
**网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
**需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)**
[外链图片转存中...(img-ogttqxTk-1713686056777)]
**一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
文章浏览阅读645次。这个肯定是末尾的IDAT了,因为IDAT必须要满了才会开始一下个IDAT,这个明显就是末尾的IDAT了。,对应下面的create_head()代码。,对应下面的create_tail()代码。不要考虑爆破,我已经试了一下,太多情况了。题目来源:UNCTF。_攻防世界困难模式攻略图文
文章浏览阅读2.9k次,点赞3次,收藏10次。偶尔会用到,记录、分享。1. 数据库导出1.1 切换到dmdba用户su - dmdba1.2 进入达梦数据库安装路径的bin目录,执行导库操作 导出语句:./dexp cwy_init/[email protected]:5236 file=cwy_init.dmp log=cwy_init_exp.log 注释: cwy_init/init_123..._达梦数据库导入导出
文章浏览阅读1.9k次。1. 在官网上下载KindEditor文件,可以删掉不需要要到的jsp,asp,asp.net和php文件夹。接着把文件夹放到项目文件目录下。2. 修改html文件,在页面引入js文件:<script type="text/javascript" src="./kindeditor/kindeditor-all.js"></script><script type="text/javascript" src="./kindeditor/lang/zh-CN.js"_kindeditor.js
文章浏览阅读2.3k次,点赞6次,收藏14次。SPI的详情简介不必赘述。假设我们通过SPI发送0xAA,我们的数据线就会变为10101010,通过修改不同的内容,即可修改SPI中0和1的持续时间。比如0xF0即为前半周期为高电平,后半周期为低电平的状态。在SPI的通信模式中,CPHA配置会影响该实验,下图展示了不同采样位置的SPI时序图[1]。CPOL = 0,CPHA = 1:CLK空闲状态 = 低电平,数据在下降沿采样,并在上升沿移出CPOL = 0,CPHA = 0:CLK空闲状态 = 低电平,数据在上升沿采样,并在下降沿移出。_stm32g431cbu6
文章浏览阅读1.2k次,点赞2次,收藏8次。数据链路层习题自测问题1.数据链路(即逻辑链路)与链路(即物理链路)有何区别?“电路接通了”与”数据链路接通了”的区别何在?2.数据链路层中的链路控制包括哪些功能?试讨论数据链路层做成可靠的链路层有哪些优点和缺点。3.网络适配器的作用是什么?网络适配器工作在哪一层?4.数据链路层的三个基本问题(帧定界、透明传输和差错检测)为什么都必须加以解决?5.如果在数据链路层不进行帧定界,会发生什么问题?6.PPP协议的主要特点是什么?为什么PPP不使用帧的编号?PPP适用于什么情况?为什么PPP协议不_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输
文章浏览阅读587次。软件测试工程师移民加拿大 无证移民,未受过软件工程师的教育(第1部分) (Undocumented Immigrant With No Education to Software Engineer(Part 1))Before I start, I want you to please bear with me on the way I write, I have very little gen...
文章浏览阅读304次。Thinkpad X250笔记本电脑,装的是FreeBSD,进入BIOS修改虚拟化配置(其后可能是误设置了安全开机),保存退出后系统无法启动,显示:secure boot failed ,把自己惊出一身冷汗,因为这台笔记本刚好还没开始做备份.....根据错误提示,到bios里面去找相关配置,在Security里面找到了Secure Boot选项,发现果然被设置为Enabled,将其修改为Disabled ,再开机,终于正常启动了。_安装完系统提示secureboot failure
文章浏览阅读10w+次,点赞93次,收藏352次。1、用strtok函数进行字符串分割原型: char *strtok(char *str, const char *delim);功能:分解字符串为一组字符串。参数说明:str为要分解的字符串,delim为分隔符字符串。返回值:从str开头开始的一个个被分割的串。当没有被分割的串时则返回NULL。其它:strtok函数线程不安全,可以使用strtok_r替代。示例://借助strtok实现split#include <string.h>#include <stdio.h&_c++ 字符串分割
文章浏览阅读2.3k次。1 .高斯日记 大数学家高斯有个好习惯:无论如何都要记日记。他的日记有个与众不同的地方,他从不注明年月日,而是用一个整数代替,比如:4210后来人们知道,那个整数就是日期,它表示那一天是高斯出生后的第几天。这或许也是个好习惯,它时时刻刻提醒着主人:日子又过去一天,还有多少时光可以用于浪费呢?高斯出生于:1777年4月30日。在高斯发现的一个重要定理的日记_2013年第四届c a组蓝桥杯省赛真题解答
文章浏览阅读851次,点赞17次,收藏22次。摘要:本文利用供需算法对核极限学习机(KELM)进行优化,并用于分类。
文章浏览阅读1.1k次。一、系统弱密码登录1、在kali上执行命令行telnet 192.168.26.1292、Login和password都输入msfadmin3、登录成功,进入系统4、测试如下:二、MySQL弱密码登录:1、在kali上执行mysql –h 192.168.26.129 –u root2、登录成功,进入MySQL系统3、测试效果:三、PostgreSQL弱密码登录1、在Kali上执行psql -h 192.168.26.129 –U post..._metasploitable2怎么进入
文章浏览阅读257次。本文将为初学者提供Python学习的详细指南,从Python的历史、基础语法和数据类型到面向对象编程、模块和库的使用。通过本文,您将能够掌握Python编程的核心概念,为今后的编程学习和实践打下坚实基础。_python人工智能开发从入门到精通pdf