轻松入门进阶Flink第六课 项目实战实时电商数据大屏-kafka生产消费_flink实时电商项目运用场景-程序员宅基地

技术标签: flink  kafka  大数据  教程  

第22讲:项目背景和整体架构设计

从这一课时开始我们进入实战课程的学习。本项目是一个模拟实时电商数据大屏,本课时先介绍该项目的背景、架构设计和技术选型。

背景

我们在第 01 课时“Flink 的应用场景和架构模型”中提到过,Flink 应用最广的一个场景便是实时计算大屏。每年的双十一、618 电商大促等,各大公司的实时数据战报和数据大屏是一道亮丽的风景线。

01.jpeg

实时大屏对数据有非常高的稳定性和精确性要求,特别是面向公众第三方的数据大屏,同时要求高吞吐、低延迟、极高的稳定性和绝对零误差。随时电商大促的成交记录一次次被刷新,背后是下单、支付、发货高达几万甚至十几万的峰值 QPS。

在面向实际运营的数据大屏中,需要提供高达几十种维度的数据,每秒的数据量高达千万甚至亿级别,这对于我们的实时计算架构提出了相当高的要求。那么我们的大屏背后的实时处理在这种数据量规模如何才能达到高吞吐、低延迟、极高的稳定性和绝对零误差的呢?

技术选型和整体架构

image (2).png
  典型的实时计算大屏服务的背后技术架构图

在上图的架构图中,涉及几个关键的技术选型,我们下面一一进行讲解。

业务库 Binlog 同步利器——Canal

我们的实时计算架构一般是基于业务数据进行的,但无论是实时计算大屏还是常规的数据分析报表,都不能影响业务的正常进行,所以这里需要引入消息中间件或增量同步框架 Canal。

我们生产环境中的业务数据绝大多数都是基于 MySQL 的,所以需要一个能够实时监控 MySQL 业务数据变化的工具。Canal 是阿里巴巴开源的数据库 Binlog 日志解析框架,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

image (3).png

Canal 的原理也非常简单,它会伪装成一个数据库的从库,来读取 Binlog 并进行解析。关于 Canal 的更多资料,你可以参考这里

解耦和海量数据支持——Kafka

在实时大屏的技术架构下,我们的数据源绝大多数情况下都是消息。我们需要一个强大的消息中间件来支撑高达几十万 QPS,同时支持海量数据存储。

首先,我们为什么需要引入消息中间件?主要是下面三个目的:

  • 同步变异步

  • 应用解耦

  • 流量削峰

在我们的架构中,为了和业务数据互相隔离,需要使用消息中间件进行解耦从而互不影响。另外在双十一等大促场景下,交易峰值通常出现在某一个时间段,这个时间段系统压力陡增,数据量暴涨,消息中间件还起到了削峰的作用。

为什么选择 Kafka?

Kafka 是最初由 Linkedin 公司开发,是一个分布式、高吞吐、多分区的消息中间件。Kafka 经过长时间的迭代和实践检验,因为其独特的优点已经成为目前主流的分布式消息引擎,经常被用作企业的消息总线、实时数据存储等。

Kafka 从众多的消息中间件中脱颖而出,主要是因为高吞吐、低延迟的特点;另外基于 Kafka 的生态越来越完善,各个实时处理框架包括 Flink 在消息处理上都会优先进行支持。在第 14 课时“Flink Exactly-once 实现原理解析”中提到了 Flink 和 Kafka 结合实现端到端精确一次语义的原理。

Kafka 作为大数据生态系统中已经必不可少的一员,主要的特性如下所示。

  • 高吞吐:可以满足每秒百万级别消息的生产和消费,并且可以通过横向扩展,保证数据处理能力可以得到线性扩展。

  • 低延迟:以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。

  • 高容错:Kafka 允许集群的节点出现失败。

  • 可靠性:消息可以根据策略进行磁盘的持久化,并且读写效率都很高。

  • 生态丰富:Kafka 周边生态极其丰富,与各个实时处理框架结合紧密。

实时计算服务——Flink

Flink 在当前的架构中主要承担了消息消费、维表关联、消息发送等,我们在之前的课程中多次提到过 Flink 的优势,主要包括:

  • 状态管理,实时数仓里面会进行很多的聚合计算,这些都需要对状态进行访问和管理,Flink 支持强大的状态管理;

  • 丰富的 API,Flink 提供极为丰富的多层次 API,包括 Stream API、Table API 及 Flink SQL;

  • 生态完善,实时数仓的用途广泛,Flink 支持多种存储(HDFS、ES 等);

  • 批流一体,Flink 已经在将流计算和批计算的 API 进行统一。

对于 Flink 的一些特点我们不做过多展开了。这里需要注意的是,Flink 在消费完成后一般会把计算结果数据发往三个方向:

  • 高度汇总,高度汇总指标一般存储在 Redis、HBase 中供前端直接查询使用;

  • 明细数据,在一些场景下,我们的运营和业务人员需要查询明细数据,有一些明细数据极其重要,比如双十一派送的包裹中会有一些丢失和破损;

  • 实时消息,Flink 在计算完成后,有一个下游是发往消息系统,这里的作用主要是提供给其他业务复用;另外,在一些情况下,我们计算好明细数据也需要再次经过消息系统才能落库,将原来直接落库拆成两步,方便我们进行问题定位和排查。

百花齐放——OLAP 数据库选择

OLAP 的选择是当前实时架构中最有争议和最困难的。目前市面上主流的开源 OLAP 引擎包含但不限于:Hive、Hawq、Presto、Kylin、Impala、SparkSQL、Druid、Clickhouse、Greeplum 等,可以说目前没有一个引擎能在数据量,灵活程度和性能上做到完美,用户需要根据自己的需求进行选型。

我个人曾经在之前写过的博客中用了两万字分析了目前市面上主流的 OLAP 数据库的选型问题,这里直接给出结论:

  • Hive、Hawq、Impala:基于 SQL on Hadoop

  • Presto 和 Spark SQL 类似:基于内存解析 SQL 生成执行计划

  • Kylin:用空间换时间、预计算

  • Druid:数据实时摄入加实时计算

  • ClickHouse:OLAP 领域的 HBase,单表查询性能优势巨大

  • Greenpulm:OLAP 领域的 PostgreSQL

如果你的场景是基于 HDFS 的离线计算任务,那么 Hive、Hawq 和 Imapla 就是你的调研目标。

如果你的场景解决分布式查询问题,有一定的实时性要求,那么 Presto 和 SparkSQL 可能更符合你的期望。

如果你的汇总维度比较固定,实时性要求较高,可以通过用户配置的维度 + 指标进行预计算,那么不妨尝试 Kylin 和 Druid。

ClickHouse 则在单表查询性能上独领风骚,远超过其他的 OLAP 数据库。

Greenpulm 作为关系型数据库产品,性能可以随着集群的扩展线性增长,更加适合进行数据分析。

实时大屏方案和指标

在我们本次的案例中,将针对实时大屏中几个重要的指标进行计算,其中包括实时交易额、销售额排名 TOPN 等指标。

整个课程的设计包含以下几个部分:

  • 实时数据的模拟

  • Flink 消费 Kafka 数据开发

  • Flink 中的业务逻辑开发

  • Flink 计算结果写入 Redis 等

  • 其他

在这个案例中,我们还会从原理和源码层面讲解 Flink 消费 Kafka 的方式和注意事项等。

点击这里下载本课程源码

总结

这一课时我们讲解了 Flink 实时大屏的架构设计和技术选型,其中涉及 Binlog 增量同步、消息中间件 Kafka 的优点、OLAP 的技术选型等。在接下来的课程中,我们会一一讲解这些知识点。通过本课时,你将学习实时数据处理和大屏展示背后的技术架构和实现,在面对相似的业务场景时也可以根据我们本课时的技术选型和架构进行灵活处理。


第23讲:Mock Kafka 消息并发送

本课时主要讲解 Kafka 的一些核心概念,以及模拟消息并发送。

大数据消息中间件的王者——Kafka

在上一课时中提过在实时计算的场景下,我们绝大多数的数据源都是消息系统。所以,一个强大的消息中间件来支撑高达几十万的 QPS,以及海量数据存储就显得极其重要。

Kafka 从众多的消息中间件中脱颖而出,主要是因为高吞吐低延迟的特点;另外基于 Kafka 的生态越来越完善,各个实时处理框架包括 Flink 在消息处理上都会优先进行支持。在第 14 课时“Flink Exactly-once 实现原理解析”中提到 Flink 和 Kafka 结合实现端到端精确一次语义的原理。

Kafka 从众多的消息中间件中脱颖而出,已经成为大数据生态系统中必不可少的一员,主要的特性包括:

  • 高吞吐

  • 低延迟

  • 高容错

  • 可靠性

  • 生态丰富

为了接下来更好地理解和使用 Kafka,我们首先来看一下 Kafka 中的核心概念和基本入门。

Kafka 核心概念

Kafka 是一个消息队列,生产者向消息队列中写入数据,消费者从队列中获取数据并进行消费。作为一个企业级的消息中间件,Kafka 会支持庞大的业务,不同的业务会有多个队列,我们用 Topic 来给队列命名,在使用 Kafka 时必须指定 Topic。

我们可以认为一个 Topic 就是一个队列,每个 Topic 又会被分成多个 Partition,这样做是为了横向扩展,提高吞吐量。

Kafka 中每个 Partition 都对应一个 Broker,一个 Broker 可以管理多个 Partition。举个例子,假如 Kafka 的某个 Topic 有 10 个 Partition、2 个 Broker,那么每个 Broker 就会管理 5 个 Partition。我们可以把 Partition 简单理解为一个文件,在接收生产者的数据时,需要将数据动态追加到 Partition 上。

生产者会决定将数据写入哪个 Partition,消费者自己维护消费数据的位置,我们称为 Offset

1.png

同时,Kafka 提供了时间策略对过期的消息进行处理。

Kafka 的每个消费者都有一个消费组来进行标识,同一个消费组的不同实例分布在多个进程或者多个机器上。

Kafka 的源数据存储在 ZooKeeper 中,其中包含 Broker、Topic、Partition 等信息。在 0.8 版本之前,Kafka 还会将消费的 Offset 存储在 ZooKeeper 中。

此外,ZooKeeper 还负责集群的 Broker 选举,以及所有 Topic 的 Partition 副本信息等。

Kafka 连接 Flink

我们在第 12 课时“Flink 常用的 Source 和 Connector”中提过,Flink 中支持了比较丰富的用来连接第三方的连接器,Kafka Connector 是 Flink 支持的各种各样的连接器中比较完善的之一。

Flink 提供了专门的 Kafka 连接器,向 Kafka Topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 机制,可提供 exactly-once 的处理语义。为此,Flink 并不完全依赖于跟踪 Kafka 消费组的偏移量,而是在内部跟踪和检查偏移量。

同时也提过,我们在使用 Kafka 连接器时需要引用相对应的 Jar 包依赖。对于某些连接器比如 Kafka 是有版本要求的,一定要去官方网站找到对应的依赖版本。

我在下表中给出了不同版本的 Kafka,以及对应的 Connector 关系:

2.png

Kafka 本地环境搭建

我们在本地环境搭建一个 Kafka_2.11-2.1.0 版本的 Kafka 单机环境,然后模拟一些数据写入到队列中。

我们可以在这里下载对应版本的 Kafka,把压缩包进行解压,然后使用下面的命令启动单机版本的 Kafka。

解压:

> tar -xzf kafka_2.11-2.1.0.tgz
> cd kafka_2.11-2.1.0

启动 ZooKeeper 和 Kafka Server:

启动ZK:nohup bin/zookeeper-server-start.sh config/zookeeper.properties  &
启动Server: 
nohup bin/kafka-server-start.sh config/server.properties &

创建一个名为 test 的 Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Kafka Producer

首先我们需要新增一个依赖,然后向名为 test 的 Topic 中写入数据。

新增 Maven 依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

向 test 这个 Topic 中写入数据:

public class KafkaProducer {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);
        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // 2.0 配置 KafkaProducer
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
                "127.0.0.1:9092", //broker 列表
                "test",           //topic
                new SimpleStringSchema()); // 消息序列化
    <span class="hljs-comment">//写入 Kafka 时附加记录的事件时间戳</span>
    producer.setWriteTimestampToKafka(<span class="hljs-keyword">true</span>);
    text.addSink(producer);
    env.execute();
}

}

需要注意的是,我们这里使用了一个自定义的 MyNoParalleSource 类,该类使用了 Flink 提供的自定义 Source 方法,该方法会源源不断地产生一些测试数据,代码如下:

public class MyNoParalleSource implements SourceFunction<String> {
    //private long count = 1L;
    private boolean isRunning = true;
    /**
     * 主要的方法
     * 启动一个source
     * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while(isRunning){
            //图书的排行榜
            List<String> books = new ArrayList<>();
            books.add("Pyhton从入门到放弃");//10
            books.add("Java从入门到放弃");//8
            books.add("Php从入门到放弃");//5
            books.add("C++从入门到放弃");//3
            books.add("Scala从入门到放弃");
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));
            //每2秒产生一条数据
            Thread.sleep(2000);
        }
    }
    //取消一个cancel的时候会调用的方法
    @Override
    public void cancel() {
        isRunning = false;
    }
}

我们在后面会使用这个方法来模拟生产中的订单数据,并进行接续处理;然后通过下面的命令就可以查看本地 Kafka 的 test 这个 Topic 中的数据:

image (2).png

至此,我们就成功地向 Kafka 中写入数据了。

源码解析

FlinkKafkaProducer 的代码十分简洁,首先继承了 TwoPhaseCommitSinkFunction,在第 14 课时“Flink Exactly-once 实现原理解析”中详细讲解过,这个类是 Flink 和 Kafka 结合实现精确一次处理语义的关键。

FlinkProducer 提供了 6 种构造方法,我们可以根据需要选择不同的构造函数:

public FlinkKafkaProducer011(
            String brokerList, 
            String topicId, 
            SerializationSchema<IN> serializationSchema);
 
public FlinkKafkaProducer011(
            String topicId, 
            SerializationSchema<IN> serializationSchema, 
            Properties producerConfig);
 
public FlinkKafkaProducer011(
            String topicId, 
            SerializationSchema<IN> serializationSchema, 
            Properties producerConfig, 
            Optional<FlinkKafkaPartitioner<IN>> customPartitioner);
 
public FlinkKafkaProducer011(
            String brokerList, 
            String topicId, 
            KeyedSerializationSchema<IN> serializationSchema);
 
public FlinkKafkaProducer011(
            String topicId, 
            KeyedSerializationSchema<IN> serializationSchema, 
            Properties producerConfig);
 
public FlinkKafkaProducer011(
            String topicId, 
            KeyedSerializationSchema<IN> serializationSchema, 
            Properties producerConfig, 
            Semantic semantic);
 
public FlinkKafkaProducer011(
            String defaultTopicId, 
            KeyedSerializationSchema<IN> serializationSchema, 
            Properties producerConfig, 
            Optional<FlinkKafkaPartitioner<IN>> customPartitioner);
 
public FlinkKafkaProducer011(
			String defaultTopicId,
			KeyedSerializationSchema<IN> serializationSchema,
			Properties producerConfig,
			Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
			Semantic semantic,
			int kafkaProducersPoolSize);

这里有个特别需要注意的属性:FlinkKafkaPartitioner,这个类定义了数据写入 Kafka 的规则,如果用户没有指定,则会默认 FlinkFixedPartitioner,核心处理逻辑如下:

public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
...
   @Override
   public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
      Preconditions.checkArgument(
         partitions != null && partitions.length > 0,
         "Partitions of the target topic is empty.");
      return partitions[parallelInstanceId % partitions.length];
   }
...
}

此外,FlinkProducer 还封装了 beginTransaction、preCommit、commit、abort 等方法,这几个方法便是实现精确一次处理语义的关键。

总结

本课时我们介绍了 Kafka 的核心概念,可以对其有一个全面的了解,并且还搭建了单机版的 Kafka 环境,使用自定义的数据源向 Kafka 中写入数据,最后从源码层面介绍了 FlinkProducer 的核心实现。通过本课时的学习,你可以对 Kafka 有全面的了解,并且能够使用 Kafka 连接器发送消息。


第24讲:Flink 消费 Kafka 数据业务开发

在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为高吞吐低延迟的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费 Kafka 中的数据方式和源码实现。

Flink 如何消费 Kafka

Flink 在和 Kafka 对接的过程中,跟 Kafka 的版本是强相关的。上一课时也提到了,我们在使用 Kafka 连接器时需要引用相对应的 Jar 包依赖,对于某些连接器比如 Kafka 是有版本要求的,一定要去官方网站找到对应的依赖版本。

我们本地的 Kafka 版本是 2.1.0,所以需要对应的类是 FlinkKafkaConsumer。首先需要在 pom.xml 中引入 jar 包依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

下面将对 Flink 消费 Kafka 数据的方式进行分类讲解。

消费单个 Topic

上一课时我们在本地搭建了 Kafka 环境,并且手动创建了名为 test 的 Topic,然后向名为 test 的 Topic 中写入了数据。

那么现在我们要消费这个 Topic 中的数据,该怎么做呢?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.enableCheckpointing(5000);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
    // 如果你是0.8版本的Kafka,需要配置
    //properties.setProperty("zookeeper.connect", "localhost:2181");
    //设置消费组
    properties.setProperty("group.id", "group_test");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
    //设置从最早的ffset消费
    consumer.setStartFromEarliest();
    //还可以手动指定相应的 topic, partition,offset,然后从指定好的位置开始消费
    //HashMap<KafkaTopicPartition, Long> map = new HashMap<>();
    //map.put(new KafkaTopicPartition("test", 1), 10240L);
    //假如partition有多个,可以指定每个partition的消费位置
    //map.put(new KafkaTopicPartition("test", 2), 10560L);
    //然后各个partition从指定位置消费
    //consumer.setStartFromSpecificOffsets(map);
    env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            System.out.println(value);
        }
    });
    env.execute("start consumer...");
}

在设置消费 Kafka 中的数据时,可以显示地指定从某个 Topic 的每一个 Partition 中进行消费。

消费多个 Topic

我们的业务中会有这样的情况,同样的数据根据类型不同发送到了不同的 Topic 中,比如线上的订单数据根据来源不同分别发往移动端和 PC 端两个 Topic 中。但是我们不想把同样的代码复制一份,需重新指定一个 Topic 进行消费,这时候应该怎么办呢?

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 如果你是0.8版本的Kafka,需要配置
//properties.setProperty("zookeeper.connect", "localhost:2181");
//设置消费组
properties.setProperty("group.id", "group_test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
ArrayList<String> topics = new ArrayList<>();
        topics.add("test_A");
        topics.add("test_B");
       // 传入一个 list,完美解决了这个问题
        FlinkKafkaConsumer<Tuple2<String, String>> consumer = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties);
...

我们可以传入一个 list 来解决消费多个 Topic 的问题,如果用户需要区分两个 Topic 中的数据,那么需要在发往 Kafka 中数据新增一个字段,用来区分来源。

消息序列化

我们在上述消费 Kafka 消息时,都默认指定了消息的序列化方式,即 SimpleStringSchema。这里需要注意的是,在我们使用 SimpleStringSchema 的时候,返回的结果中只有原数据,没有 topic、parition 等信息,这时候可以自定义序列化的方式来实现自定义返回数据的结构。

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    //是否表示流的最后一条元素,设置为false,表示数据会源源不断地到来
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }
    //这里返回一个ConsumerRecord<String,String>类型的数据,除了原数据还包括topic,offset,partition等信息
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                new String(record.key()),
                new String(record.value())
        );
    }
    //指定数据的输入类型
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>(){});
    }
}

这里自定义了 CustomDeSerializationSchema 信息,就可以直接使用了。

Parition 和 Topic 动态发现

在很多场景下,随着业务的扩展,我们需要对 Kafka 的分区进行扩展,为了防止新增的分区没有被及时发现导致数据丢失,消费者必须要感知 Partition 的动态变化,可以使用 FlinkKafkaConsumer 的动态分区发现实现。

我们只需要指定下面的配置,即可打开动态分区发现功能:每隔 10ms 会动态获取 Topic 的元数据,对于新增的 Partition 会自动从最早的位点开始消费数据。

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");

如果业务场景需要我们动态地发现 Topic,可以指定 Topic 的正则表达式:

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(Pattern.compile("^test_([A-Za-z0-9]*)$"), new SimpleStringSchema(), properties);
Flink 消费 Kafka 设置 offset 的方法

Flink 消费 Kafka 需要指定消费的 offset,也就是偏移量。Flink 读取 Kafka 的消息有五种消费方式:

  • 指定 Topic 和 Partition

  • 从最早位点开始消费

  • 从指定时间点开始消费

  • 从最新的数据开始消费

  • 从上次消费位点开始消费

/**
* Flink从指定的topic和parition中指定的offset开始
*/
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition("test", 0), 10000L);
offsets.put(new KafkaTopicPartition("test", 1), 20000L);
offsets.put(new KafkaTopicPartition("test", 2), 30000L);
consumer.setStartFromSpecificOffsets(offsets);
/**
* Flink从topic中最早的offset消费
*/
consumer.setStartFromEarliest();
/**
* Flink从topic中指定的时间点开始消费
*/
consumer.setStartFromTimestamp(1559801580000l);
/**
* Flink从topic中最新的数据开始消费
*/
consumer.setStartFromLatest();
/**
* Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
*/
consumer.setStartFromGroupOffsets();

源码解析

Drawing 0.png

从上面的类图可以看出,FlinkKafkaConsumer 继承了 FlinkKafkaConsumerBase,而 FlinkKafkaConsumerBase 最终是对 SourceFunction 进行了实现。

整体的流程:FlinkKafkaConsumer 首先创建了 KafkaFetcher 对象,然后 KafkaFetcher 创建了 KafkaConsumerThread 和 Handover,KafkaConsumerThread 负责直接从 Kafka 中读取 msg,并交给 Handover,然后 Handover 将 msg 传递给 KafkaFetcher.emitRecord 将消息发出。

因为 FlinkKafkaConsumerBase 实现了 RichFunction 接口,所以当程序启动的时候,会首先调用 FlinkKafkaConsumerBase.open 方法:

public void open(Configuration configuration) throws Exception {
   // 指定offset的提交方式
   this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
         getIsAutoCommitEnabled(),
         enableCommitOnCheckpoints,
         ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
   // 创建分区发现器
   this.partitionDiscoverer = createPartitionDiscoverer(
         topicsDescriptor,
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getNumberOfParallelSubtasks());
   this.partitionDiscoverer.open();
   subscribedPartitionsToStartOffsets = new HashMap<>();
   final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
   if (restoredState != null) {
      for (KafkaTopicPartition partition : allPartitions) {
         if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
         }
      }
      for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
         if (!restoredFromOldState) {
        <span class="hljs-keyword">if</span> (KafkaTopicPartitionAssigner.assign(
           restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
              == getRuntimeContext().getIndexOfThisSubtask()){
           subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
        }
     } <span class="hljs-keyword">else</span> {
       subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
     }
  }
  <span class="hljs-keyword">if</span> (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
     subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -&gt; {
        <span class="hljs-keyword">if</span> (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
           LOG.warn(
              <span class="hljs-string">"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution."</span>,
              entry.getKey());
           <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;
        }
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;
     });
  }
  LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading {} partitions with offsets in restored state: {}"</span>,
     getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);

} else {

  <span class="hljs-keyword">switch</span> (startupMode) {
     <span class="hljs-keyword">case</span> SPECIFIC_OFFSETS:
        <span class="hljs-keyword">if</span> (specificStartupOffsets == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalStateException(
              <span class="hljs-string">"Startup mode for the consumer set to "</span> + StartupMode.SPECIFIC_OFFSETS +
                 <span class="hljs-string">", but no specific offsets were specified."</span>);
        }
        <span class="hljs-keyword">for</span> (KafkaTopicPartition seedPartition : allPartitions) {
           Long specificOffset = specificStartupOffsets.get(seedPartition);
           <span class="hljs-keyword">if</span> (specificOffset != <span class="hljs-keyword">null</span>) {
                             subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - <span class="hljs-number">1</span>);
           } <span class="hljs-keyword">else</span> {
           subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
           }
        }
        <span class="hljs-keyword">break</span>;
     <span class="hljs-keyword">case</span> TIMESTAMP:
        <span class="hljs-keyword">if</span> (startupOffsetsTimestamp == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalStateException(
              <span class="hljs-string">"Startup mode for the consumer set to "</span> + StartupMode.TIMESTAMP +
                 <span class="hljs-string">", but no startup timestamp was specified."</span>);
        }
        <span class="hljs-keyword">for</span> (Map.Entry&lt;KafkaTopicPartition, Long&gt; partitionToOffset
              : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
           subscribedPartitionsToStartOffsets.put(
              partitionToOffset.getKey(),
              (partitionToOffset.getValue() == <span class="hljs-keyword">null</span>)
                  KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                    : partitionToOffset.getValue() - <span class="hljs-number">1</span>);
        }
        <span class="hljs-keyword">break</span>;
     <span class="hljs-keyword">default</span>:
        <span class="hljs-keyword">for</span> (KafkaTopicPartition seedPartition : allPartitions) {
           subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
        }
  }
  <span class="hljs-keyword">if</span> (!subscribedPartitionsToStartOffsets.isEmpty()) {
     <span class="hljs-keyword">switch</span> (startupMode) {
        <span class="hljs-keyword">case</span> EARLIEST:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> LATEST:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> TIMESTAMP:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              startupOffsetsTimestamp,
              subscribedPartitionsToStartOffsets.keySet());
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> SPECIFIC_OFFSETS:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              specificStartupOffsets,
              subscribedPartitionsToStartOffsets.keySet());
           List&lt;KafkaTopicPartition&gt; partitionsDefaultedToGroupOffsets = <span class="hljs-keyword">new</span> ArrayList&lt;&gt;(subscribedPartitionsToStartOffsets.size());
           <span class="hljs-keyword">for</span> (Map.Entry&lt;KafkaTopicPartition, Long&gt; subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
              <span class="hljs-keyword">if</span> (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                 partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
              }
           }
           <span class="hljs-keyword">if</span> (partitionsDefaultedToGroupOffsets.size() &gt; <span class="hljs-number">0</span>) {
              LOG.warn(<span class="hljs-string">"Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}"</span> +
                    <span class="hljs-string">"; their startup offsets will be defaulted to their committed group offsets in Kafka."</span>,
                 getRuntimeContext().getIndexOfThisSubtask(),
                 partitionsDefaultedToGroupOffsets.size(),
                 partitionsDefaultedToGroupOffsets);
           }
           <span class="hljs-keyword">break</span>;
        <span class="hljs-keyword">case</span> GROUP_OFFSETS:
           LOG.info(<span class="hljs-string">"Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}"</span>,
              getRuntimeContext().getIndexOfThisSubtask(),
              subscribedPartitionsToStartOffsets.size(),
              subscribedPartitionsToStartOffsets.keySet());
     }
  } <span class="hljs-keyword">else</span> {
     LOG.info(<span class="hljs-string">"Consumer subtask {} initially has no partitions to read from."</span>,
        getRuntimeContext().getIndexOfThisSubtask());
  }

}
}

对 Kafka 中的 Topic 和 Partition 的数据进行读取的核心逻辑都在 run 方法中:

public void run(SourceContext<T> sourceContext) throws Exception {
   if (subscribedPartitionsToStartOffsets == null) {
      throw new Exception("The partitions were not set for the consumer");
   }
   this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
   this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
   final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
   this.offsetCommitCallback = new KafkaCommitCallback() {
      @Override
      public void onSuccess() {
         successfulCommits.inc();
      }
      @Override
      public void onException(Throwable cause) {
         LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
         failedCommits.inc();
      }
   };

if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}
LOG.info(“Consumer subtask {} creating fetcher with offsets {}.”,
getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);

this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
if (!running) {
return;
}
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
kafkaFetcher.runFetchLoop();
} else {
runWithPartitionDiscovery();
}
}

Flink 消费 Kafka 数据代码

上面介绍了 Flink 消费 Kafka 的方式,以及消息序列化的方式,同时介绍了分区和 Topic 的动态发现方法,那么回到我们的项目中来,消费 Kafka 数据的完整代码如下:

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        //设置消费组
        properties.setProperty("group.id", "group_test");
        properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        //设置从最早的ffset消费
        consumer.setStartFromEarliest();
        env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                System.out.println(value);
            }
        });
        env.execute("start consumer...");
    }
}

我们可以直接右键运行代码,在控制台中可以看到数据的正常打印,如下图所示:

Drawing 1.png

通过代码可知,我们之前发往 Kafka 的消息被完整地打印出来了。

总结

这一课时介绍了 Flink 消费 Kafka 的方式,比如从常用的指定单个或者多个 Topic、消息的序列化、分区的动态发现等,还从源码上介绍了 Flink 消费 Kafka 的原理。通过本课时的学习,相信你可以对 Flink 消费 Kafka 有一个较为全面地了解,根据业务场景可以正确选择消费的方式和配置。


第25讲:Flink 中 watermark 的定义和使用

第 08 课时我们提过窗口和时间的概念,Flink 框架支持事件时间、摄入时间和处理时间三种。Watermark(水印)的出现是用于处理数据从 Source 产生,再到转换和输出,在这个过程中由于网络和反压的原因导致了消息乱序问题。

那么在实际的开发过程中,如何正确地使用 Watermark 呢?

使用 Watermark 必知必会

Watermark 和事件时间

事件时间(Event Time)是数据产生的时间,这个时间一般在数据中自带,由消息的生产者生成。例如,我们的上游是 Kafka 消息,那么每个生成的消息中自带一个时间戳代表该条数据的产生时间,这个时间是固定的,从数据的诞生开始就一直携带。所以,我们在处理消息乱序的情况时,会用 EventTime 和 Watermark 进行配合使用。

我们只需要一行代码,就可以在代码中指定 Flink 系统使用的时间类型为 EventTime:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

那么为什么不用处理时间(Processing Time)和摄入时间(Ingestion Time)呢?

处理时间(Processing Time)指的是数据被 Flink 框架处理时机器的系统时间,这个时间本身存在不确定性,比如因为网络延迟等原因。

摄入时间(Ingestion Time)理论上处于事件时间(Event Time)和处理时间(Processing Time)之间,可以用来防止 Flink 内部处理数据发生乱序的情况,但是无法解决数据进入 Flink 之前的乱序行为。

所以我们一般都会用 EventTime、WaterMark 和窗口配合使用来解决消息的乱序和延迟问题。

水印的本质是时间戳

水印的本质是一个一个的时间戳,这个时间戳存在 DataStream 的数据流中,Watermark 的生成方式有两种:

  • AssignerWithPeriodicWatermarks 生成周期水印,周期默认的时间是 200ms;

  • AssignerWithPunctuatedWatermarks 按需生成水印。

当 Flink 系统中出现了一个 Watermark T,那么就意味着 EventTime <= T 的数据都已经到达。当 Wartermark T 通过窗口后,后续到来的迟到数据就会被丢弃。

窗口触发和乱序时间

Flink 在用时间 + 窗口 + 水印来解决实际生产中的数据乱序问题,有如下的触发条件:

  • watermark 时间 >= window_end_time;

  • 在 [window_start_time,window_end_time) 中有数据存在,这个窗口是左闭右开的。

但是有些业务场景需要我们等待一段时间,也就是接受一定范围的迟到数据,此时 allowedLateness 的设置就显得尤为重要。简单地说,allowedLateness 的设置就是对于那些水印通过窗口的结束时间后,还允许等待一段时间。

如果业务中的实际数据因为网络原因,乱序现象非常严重,allowedLateness 允许迟到的时间如果设置太小,则会导致很多次极少量数据触发窗口计算,严重影响数据的正确性。

Flink 消费 Kafka 保证消息有序

我们在第 23 课时“Mock Kafka 消息并发送”中提过,可以认为 Kafka 中的一个 Topic 就是一个队列,每个 Topic 又会被分成多个 Partition,每个 Partition 中的消息是有序的。但是有的业务场景需要我们保障所有 Partition 中的消息有序,一般情况下需要把 Partition 的个数设置为一个,但这种情况是不能接受的,会严重影响数据的吞吐量。

但是,Flink 消费 Kafka 时可以做到数据的全局有序,也可以多个 Partition 并发消费,这就是 Flink 中的 Kafka-partition-aware 特性。

我们在使用这种特性生成水印时,水印会在 Flink 消费 Kafka 的消费端生成,并且每个分区的时间戳严格升序。当数据进行 Shuffle 时,水印的合并机制会产生全局有序的水印。

image (2).png

我们从上图中可以看出,每个生成的水印是如何在多个分区的数据中进行传递的。

代码实现如下:

FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("topic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});
DataStream<MyType> stream = env.addSource(kafkaSource);

Flink 预定义的时间戳提取器和水印发射器

Flink 本身提供了两个预定义实现类去生成水印:

  • AscendingTimestampExtractor 时间戳递增

  • BoundedOutOfOrdernessTimestampExtractor 处理乱序消息和延迟时间

AscendingTimestampExtractor 递增时间戳提取器

AscendingTimestampExtractor 是周期性生成水印的一个简单实现,这种方式会产生严格递增的水印。它的实现如下:

public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
...
   public AscendingTimestampExtractor<T> withViolationHandler(MonotonyViolationHandler handler) {
      this.violationHandler = requireNonNull(handler);
      return this;
   }
   @Override
   public final long extractTimestamp(T element, long elementPrevTimestamp) {
      final long newTimestamp = extractAscendingTimestamp(element);
      if (newTimestamp >= this.currentTimestamp) {
         this.currentTimestamp = newTimestamp;
         return newTimestamp;
      } else {
         violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
         return newTimestamp;
      }
   }
   @Override
   public final Watermark getCurrentWatermark() {
      return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
   }
...
}

该种水印的生成方式适用于那些数据本身的时间戳在每个并行的任务中是单调递增的,例如,我们上面使用 AscendingTimestampExtractor 处理 Kafka 多个 Partition 的情况。

一个简单的案例如下所示:

DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});
BoundedOutOfOrdernessTimestampExtractor 允许特定数量延迟的提取器

我们在上面提过有些业务场景需要等待一段时间,也就是接受一定范围的迟到数据,此时 allowedLateness 的设置就显得尤为重要。这种提取器也是周期性生成水印的实现,接受 allowedLateness 作为参数。

它的实现如下:

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
...
   private final long maxOutOfOrderness;
   public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
      if (maxOutOfOrderness.toMilliseconds() < 0) {
         throw new RuntimeException("Tried to set the maximum allowed " +
            "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
      }
      this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
      this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
   }
   public long getMaxOutOfOrdernessInMillis() {
      return maxOutOfOrderness;
   }
...
   @Override
   public final Watermark getCurrentWatermark() {
      // this guarantees that the watermark never goes backwards.
      long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
      if (potentialWM >= lastEmittedWatermark) {
         lastEmittedWatermark = potentialWM;
      }
      return new Watermark(lastEmittedWatermark);
   }
   @Override
   public final long extractTimestamp(T element, long previousElementTimestamp) {
      long timestamp = extractTimestamp(element);
      if (timestamp > currentMaxTimestamp) {
         currentMaxTimestamp = timestamp;
      }
      return timestamp;
   }
}

BoundedOutOfOrdernessTimestampExtractor 的构造器接收 maxOutOfOrderness 这个参数,该参数是指定我们接收的消息允许滞后的最大时间。

案例

下面是一个接收 Kafka 消息进行处理,自定义窗口和水印的案例:

public class WindowWaterMark {
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> <span class="hljs-keyword">throws</span> Exception </span>{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    <span class="hljs-comment">//设置为eventtime事件类型</span>
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    <span class="hljs-comment">//设置水印生成时间间隔100ms</span>
    env.getConfig().setAutoWatermarkInterval(<span class="hljs-number">100</span>);
    DataStream&lt;String&gt; dataStream = env
            .socketTextStream(<span class="hljs-string">"127.0.0.1"</span>, <span class="hljs-number">9000</span>)
            .assignTimestampsAndWatermarks(<span class="hljs-keyword">new</span> AssignerWithPeriodicWatermarks&lt;String&gt;() {
                <span class="hljs-keyword">private</span> Long currentTimeStamp = <span class="hljs-number">0L</span>;
                <span class="hljs-comment">//设置允许乱序时间</span>
                <span class="hljs-keyword">private</span> Long maxOutOfOrderness = <span class="hljs-number">5000L</span>;
                <span class="hljs-meta">@Override</span>
                <span class="hljs-function"><span class="hljs-keyword">public</span> Watermark <span class="hljs-title">getCurrentWatermark</span><span class="hljs-params">()</span> </span>{
                    <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Watermark(currentTimeStamp - maxOutOfOrderness);
                }
                <span class="hljs-meta">@Override</span>
                <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">long</span> <span class="hljs-title">extractTimestamp</span><span class="hljs-params">(String s, <span class="hljs-keyword">long</span> l)</span> </span>{
                    String[] arr = s.split(<span class="hljs-string">","</span>);
                    <span class="hljs-keyword">long</span> timeStamp = Long.parseLong(arr[<span class="hljs-number">1</span>]);
                    currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                    System.err.println(s + <span class="hljs-string">",EventTime:"</span> + timeStamp + <span class="hljs-string">",watermark:"</span> + (currentTimeStamp - maxOutOfOrderness));
                    <span class="hljs-keyword">return</span> timeStamp;
                }
            });
    dataStream.map(<span class="hljs-keyword">new</span> MapFunction&lt;String, Tuple2&lt;String, Long&gt;&gt;() {
        <span class="hljs-meta">@Override</span>
        <span class="hljs-function"><span class="hljs-keyword">public</span> Tuple2&lt;String, Long&gt; <span class="hljs-title">map</span><span class="hljs-params">(String s)</span> <span class="hljs-keyword">throws</span> Exception </span>{
            String[] split = s.split(<span class="hljs-string">","</span>);
            <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Tuple2&lt;String, Long&gt;(split[<span class="hljs-number">0</span>], Long.parseLong(split[<span class="hljs-number">1</span>]));
        }
    })
            .keyBy(<span class="hljs-number">0</span>)
            .window(TumblingEventTimeWindows.of(Time.seconds(<span class="hljs-number">5</span>)))
            .aggregate(<span class="hljs-keyword">new</span> AggregateFunction&lt;Tuple2&lt;String,Long&gt;, Object, Object&gt;() {
                ...
            })
            .print();
    env.execute(<span class="hljs-string">"WaterMark Test Demo"</span>);
}<span class="hljs-comment">//</span>

}

在这个案例中,我们使用的 AssignerWithPeriodicWatermarks 来自定义水印发射器和时间戳提取器,设置允许乱序时间为 5 秒,并且在一个 5 秒的窗口内进行聚合计算。
在这个案例中,可以看到如何正确使用 Flink 提供的 API 进行水印和时间戳的设置。

总结

这一课时讲解了生产环境中正确使用 Watermark 需要注意的事项,并且介绍了如何保证 Kafka 消息的全局有序,Flink 中自定义的时间戳提取器和水印发射器;最后用一个案例讲解了如何正确使用水印和设置乱序事件。通过这一课时你可以学习到生产中设置水印的正确方法和原理。


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/fegus/article/details/124603499

智能推荐

c# 调用c++ lib静态库_c#调用lib-程序员宅基地

文章浏览阅读2w次,点赞7次,收藏51次。四个步骤1.创建C++ Win32项目动态库dll 2.在Win32项目动态库中添加 外部依赖项 lib头文件和lib库3.导出C接口4.c#调用c++动态库开始你的表演...①创建一个空白的解决方案,在解决方案中添加 Visual C++ , Win32 项目空白解决方案的创建:添加Visual C++ , Win32 项目这......_c#调用lib

deepin/ubuntu安装苹方字体-程序员宅基地

文章浏览阅读4.6k次。苹方字体是苹果系统上的黑体,挺好看的。注重颜值的网站都会使用,例如知乎:font-family: -apple-system, BlinkMacSystemFont, Helvetica Neue, PingFang SC, Microsoft YaHei, Source Han Sans SC, Noto Sans CJK SC, W..._ubuntu pingfang

html表单常见操作汇总_html表单的处理程序有那些-程序员宅基地

文章浏览阅读159次。表单表单概述表单标签表单域按钮控件demo表单标签表单标签基本语法结构<form action="处理数据程序的url地址“ method=”get|post“ name="表单名称”></form><!--action,当提交表单时,向何处发送表单中的数据,地址可以是相对地址也可以是绝对地址--><!--method将表单中的数据传送给服务器处理,get方式直接显示在url地址中,数据可以被缓存,且长度有限制;而post方式数据隐藏传输,_html表单的处理程序有那些

PHP设置谷歌验证器(Google Authenticator)实现操作二步验证_php otp 验证器-程序员宅基地

文章浏览阅读1.2k次。使用说明:开启Google的登陆二步验证(即Google Authenticator服务)后用户登陆时需要输入额外由手机客户端生成的一次性密码。实现Google Authenticator功能需要服务器端和客户端的支持。服务器端负责密钥的生成、验证一次性密码是否正确。客户端记录密钥后生成一次性密码。下载谷歌验证类库文件放到项目合适位置(我这边放在项目Vender下面)https://github.com/PHPGangsta/GoogleAuthenticatorPHP代码示例://引入谷_php otp 验证器

【Python】matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距-程序员宅基地

文章浏览阅读4.3k次,点赞5次,收藏11次。matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距

docker — 容器存储_docker 保存容器-程序员宅基地

文章浏览阅读2.2k次。①Storage driver 处理各镜像层及容器层的处理细节,实现了多层数据的堆叠,为用户 提供了多层数据合并后的统一视图②所有 Storage driver 都使用可堆叠图像层和写时复制(CoW)策略③docker info 命令可查看当系统上的 storage driver主要用于测试目的,不建议用于生成环境。_docker 保存容器

随便推点

网络拓扑结构_网络拓扑csdn-程序员宅基地

文章浏览阅读834次,点赞27次,收藏13次。网络拓扑结构是指计算机网络中各组件(如计算机、服务器、打印机、路由器、交换机等设备)及其连接线路在物理布局或逻辑构型上的排列形式。这种布局不仅描述了设备间的实际物理连接方式,也决定了数据在网络中流动的路径和方式。不同的网络拓扑结构影响着网络的性能、可靠性、可扩展性及管理维护的难易程度。_网络拓扑csdn

JS重写Date函数,兼容IOS系统_date.prototype 将所有 ios-程序员宅基地

文章浏览阅读1.8k次,点赞5次,收藏8次。IOS系统Date的坑要创建一个指定时间的new Date对象时,通常的做法是:new Date("2020-09-21 11:11:00")这行代码在 PC 端和安卓端都是正常的,而在 iOS 端则会提示 Invalid Date 无效日期。在IOS年月日中间的横岗许换成斜杠,也就是new Date("2020/09/21 11:11:00")通常为了兼容IOS的这个坑,需要做一些额外的特殊处理,笔者在开发的时候经常会忘了兼容IOS系统。所以就想试着重写Date函数,一劳永逸,避免每次ne_date.prototype 将所有 ios

如何将EXCEL表导入plsql数据库中-程序员宅基地

文章浏览阅读5.3k次。方法一:用PLSQL Developer工具。 1 在PLSQL Developer的sql window里输入select * from test for update; 2 按F8执行 3 打开锁, 再按一下加号. 鼠标点到第一列的列头,使全列成选中状态,然后粘贴,最后commit提交即可。(前提..._excel导入pl/sql

Git常用命令速查手册-程序员宅基地

文章浏览阅读83次。Git常用命令速查手册1、初始化仓库git init2、将文件添加到仓库git add 文件名 # 将工作区的某个文件添加到暂存区 git add -u # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,不处理untracked的文件git add -A # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,包括untracked的文件...

分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120-程序员宅基地

文章浏览阅读202次。分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120

【C++缺省函数】 空类默认产生的6个类成员函数_空类默认产生哪些类成员函数-程序员宅基地

文章浏览阅读1.8k次。版权声明:转载请注明出处 http://blog.csdn.net/irean_lau。目录(?)[+]1、缺省构造函数。2、缺省拷贝构造函数。3、 缺省析构函数。4、缺省赋值运算符。5、缺省取址运算符。6、 缺省取址运算符 const。[cpp] view plain copy_空类默认产生哪些类成员函数

推荐文章

热门文章

相关标签