Kafka生产者(producer)

生产者发送消息的基本流程

Kafka生产者(producer)

  • 从创建一个 ProducerRecord 对象开始, Producer Record 对象需要包含目标主题和要 发送的内容。我们还可以指定键或分区。在发送 ProducerReco rd 对象时,生产者要先把键 和值对象序列化成字节数组,这样它们才能够在网络上传输。

  • 接下来,数据被传给分区器。如果之前在 Producer Record 对象里指定了分区,那么分 区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 Producer Record 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和 分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息 会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

  • 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka ,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入 失败, 则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还 是失败,就返回错误信息。

使用 Kafka 生产者

三种发送方式

1、 发送并忘记

忽略 send 方法的返回值,不做任何处理。大多数情况下,消息会正常到达,而且生产 者会自动重试,但有时会丢失消息。

2、 同步非阻塞发送

获得 send 方法返回的 Future 对象,在合适的时候调用 Future 的 get 方法。参见代码, 模块 kafka-no-spring 下包 sendtype 中。

3、 异步发送

实现接口 org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递 给 send 方法。参见代码,模块 kafka-no-spring 下包 sendtype 中。

生产者常用配置

生产者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内 存使用,性能和可靠性方面有较大影响。可以参考 org.apache.kafka.clients.producer 包下的 ProducerConfig 类。

1. acks:

指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参 数对消息丢失的可能性有重大影响。

acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞 吐量高。

acks=1:只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消 息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误 响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新 首领,消息还是会丢失。默认使用这个配置。

acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的 成功响应。延迟高。

2. buffer.memory

设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生 速度大于向 broker 发送的速度,导致生产者空间不足,producer 会阻塞或者抛出异常。缺 省 33554432 (32M)

3. max.block.ms

指定了在调用 send()方法或者使用 partitionsFor()方法获取元数据时生产者的阻塞时间。 当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达 到 max.block.ms 时,生产者会抛出超时异常。缺省 60000ms

4. retries

发送失败时,指定生产者可以重发消息的次数。默认情况下,生产者在每次重试之间等 待 100ms,可以通过参数 retry.backoff.ms 参数来改变这个时间间隔。缺省 0

5. receive.buffer.bytes 和 send.buffer.bytes

指定 TCP socket 接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系 统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨 数据中心的网络一般都有比较高的延迟和比较低的带宽。缺省 102400

6. batch.size

当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一 个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会 被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批 次也有可能被发送。缺省 16384(16k)

7. linger.ms

指定了生产者在发送批次前等待更多消息加入批次的时间。它和 batch.size 以先到者为 先。也就是说,一旦我们获得消息的数量够 batch.size 的数量了,他将会立即发送而不顾这 项设置,然而如果我们获得消息字节数比 batch.size 设置要小的多,我们需要“linger”特定的 时间以获取更多的消息。这个设置默认为 0,即没有延迟。设定 linger.ms=5,例如,将会减 少请求数目,但是同时会增加 5ms 的延迟,但也会提升消息的吞吐量。

8. compression.type

producer 用于压缩数据的压缩类型。默认是无压缩。正确的选项值是 none、gzip、snappy。 压缩最好用于批量处理,批量处理消息越多,压缩性能越好。snappy 占用 cpu 少,提供较 好的性能和可观的压缩比,如果比较关注性能和网络带宽,用这个。如果带宽紧张,用 gzip, 会占用较多的 cpu,但提供更高的压缩比。

9. client.id

当向 server 发出请求时,这个字符串会发送给 server。目的是能够追踪请求源头,以此 来允许 ip/port 许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因 为没有任何功能性的目的,除了记录和跟踪。

10. max.in.flight.requests.per.connection

指定了生产者在接收到服务器响应之前可以发送多个消息,值越高,占用的内存越大, 当然也可以提升吞吐量。发生错误时,可能会造成数据的发送顺序改变,默认是 5 (修改)。

如果需要保证消息在一个分区上的严格顺序,这个值应该设为 1。不过这样会严重影响 生产者的吞吐量。

11. request.timeout.ms

客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发 请求;超过重试次数将抛异常

12. metadata.fetch.timeout.ms

是指我们所获取的一些元数据的第一个时间数据。元数据包含:topic,host,partitions。 此项配置是指当等待元数据 fetch 成功完成所需要的时间,否则会跑出异常给客户端

13. timeout.ms

此配置选项控制 broker 等待副本确认的最大时间。如果确认的请求数目在此时间内没 有实现,则会返回一个错误。这个超时限制是以 server 端度量的,没有包含请求的网络延迟。 这个参数和 acks 的配置相匹配。

14. max.request.size

控制生产者发送请求最大大小。假设这个值为 1M,如果一个请求里只有一个消息,那 这个消息不能大于 1M,如果一次请求是一个批次,该批次包含了 1000 条消息,那么每个 消息不能大于 1KB。注意:broker 具有自己对消息记录尺寸的覆盖,如果这个尺寸小于生产 者的这个设置,会导致消息被拒绝。

消息顺序发送

Kafka 可以保证同一个分区里的消息是有序的。也就是说,如果生产者一定的顺序发送 消息, broker 就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。 在某些情况下, 顺序是非常重要的。例如,往一个账户存入 100 元再取出来,这个与先取 钱再存钱是截然不同的!不过,有些场景对顺序不是很敏感。

如果把 retires 设为非零整数,同时把 max.in.flight.request.per.connection 设为比 1 大的 数,那么,如果第一个批次消息写入失败,而第二个批次写入成功, broker 会重试写入第 一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。

一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所 以不建议把 retires 设为 0 。可以把 max.in.flight.request.per.connection 设为 1,这样在生产 者尝试发送第一批消息时,就不会有其他的消息发送给 broker 。不过这样会严重影响生产 者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

序列化和反序列化

Kafka为我们提供了不同类型的序列化器和反序列化器

内置序列化器

Kafka生产者(producer)

内置反序列化器

Kafka生产者(producer)

创建生产者对象必须指定序列化器,默认的序列化器并不能满足我们所有的场景。我们 完全可以自定义序列化器。只要实现 org.apache.kafka.common.serialization.Serializer 接口即 可。 示例代码如下:

/**
 * @Author: 张钰博
 * @Date: 2020/4/7 14:07
 * @Description: 自定义序列化器,序列化JavaBean
 */
//实现Kafka中的Serializer
public class SelfSerializer  implements Serializer<DemoUser> {
    //一般做相关配置
    @Override
    public void configure(Map configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, DemoUser data) {
        try {
            byte[] name;
            int nameSize;
            if(data==null){
                return null;
            }
            if(data.getName()!=null){
                name = data.getName().getBytes("UTF-8");
                //字符串的长度
                nameSize = data.getName().length();
            }else{
                name = new byte[0];
                nameSize = 0;
            }
            /*id的长度4个字节,字符串的长度描述4个字节,
            字符串本身的长度nameSize个字节*/
            ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
            buffer.putInt(data.getId());//4
            buffer.putInt(nameSize);//4
            buffer.put(name);//nameSize
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error serialize DemoUser:"+e);
        }
    }

    //一般用于释放资源
    @Override
    public void close() {

    }
}

反序列化

/**
 * @Author: 张钰博
 * @Date: 2020/4/7 14:21
 * @Description: 自定义反序列化器,反序列化JavaBean
 */
public class SelfDeserializer implements Deserializer<DemoUser> {


    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    @Override
    public DemoUser deserialize(String topic, byte[] data) {
        try {
            if(data==null){
                return null;
            }
            if(data.length<8){
                throw new SerializationException("Error data size.");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id;
            String name;
            int nameSize;
            id = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameByte = new byte[nameSize];
            buffer.get(nameByte);
            name = new String(nameByte,"UTF-8");
            return new DemoUser(id,name);
        } catch (Exception e) {
            throw new SerializationException("Error Deserializer DemoUser."+e);
        }

    }

    @Override
    public void close() {
        //do nothing
    }
}

提示:
有很多成熟的序列化框架,可以帮我们把自定义JavaBean序列化为数组,例如messagePack,Avro,Protobuf等。首先推荐Apache的Avro和Google的Protobuf。

分区

我们在新增ProducerRecord对象中可以看到,ProducerRecord包含了目标主题,键和值, Kafka 的消息都是一个个的键值对。键可以设置为默认的 null。

键的主要用途有两个:一,用来决定消息被写往主题的哪个分区,拥有相同键的消息将 被写往同一个分区,二,还可以作为消息的附加消息。

如果键值为 null,并且使用默认的分区器,分区器使用轮询算法将消息均衡地分布到各 个分区上。

如果键不为空,并且使用默认的分区器,Kafka 对键进行散列(Kafka 自定义的散列算法, 具体算法原理不知),然后根据散列值把消息映射到特定的分区上。很明显,同一个键总是 被映射到同一个分区。但是只有不改变主题分区数量的情况下,键和分区之间的映射才能保 持不变,一旦增加了新的分区,就无法保证了,所以如果要使用键来映射分区,那就要在创 建主题的时候把分区规划好,而且永远不要增加新分区。

1. 分区原因和策略

(1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;

(2)可以提高并发,因为可以以 Partition 为单位读写。

2. 分区的原则

我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。

Kafka生产者(producer)

(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;

(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;

(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后 面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

3. 自定义分区器

某些情况下,数据特性决定了需要进行特殊分区,比如电商业务,北京的业务量明显比 较大,占据了总业务量的 20%,我们需要对北京的订单进行单独分区处理,默认的散列分区 算法不合适了, 我们就可以自定义分区算法,对北京的订单单独处理,其他地区沿用散列 分区算法。或者某些情况下,我们用 value 来进行分区。 示例代码如下:

/**
 * @Author: 张钰博
 * @Date: 2020/4/7 14:34
 * @Description: 自定义分区器,以value作为分区
 */
public class SelfPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取某一个主题下的所有分区
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        //获取该主题下的所有分区数目
        int num = partitionInfos.size();
        //对num进行取模运算
        int parId = value.hashCode() % num;
        //将消息发送到该分区Id下
        return parId;
    }

    @Override
    public void close() {
        //do nothing
    }

    @Override
    public void configure(Map<String,?> configs) {
        //do nothing
    }

}

保障生产者数据高可用

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

Kafka生产者(producer)

1.副本数据同步策略

方案 优点 缺点
半数以上完成同步,就发送ack 延迟低 选举新的 leader 时,容忍 n 台 节点的故障,需要 2n+1 个副 本
全部完成同步,才发送 ack 选举新的 leader 时,容忍 n 台 节点的故障,需要 n+1 个副 本 延迟高

Kafka 选择了第二种方案,原因如下:

1.同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1 个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。

2.虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。

2. ISR策略

采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据, 但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去, 直到它完成同步,才能发送 ack。这个问题怎么解决呢?

答: Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集 合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower 长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由

3.ACK应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失, 所以没必要等 ISR 中的 follower 全部接收成功。 所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡, 选择以下的配置。

acks 参数配置

0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还 没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;

1:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据;

acks = 1 数据丢失案例

Kafka生产者(producer)

-1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会 造成数据重复。

acks = -1 数据重复案例

Kafka生产者(producer)

4.故障处理细节

Log文件中的HW和LEO

Kafka生产者(producer)

LEO:指的是每个副本最大的 offset;

HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。

(1)follower 故障 follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘 记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。 等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重 新加入 ISR 了。

(2)leader 故障 leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的 数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

  • 注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

Exactly Once 语义

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被 发送一次,即 At Most Once 语义。

At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once 可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说 交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。在 0.11 版 本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局 去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。 0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论 向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语 义,就构成了 Kafka 的 Exactly Once 语义。即:

At Least Once + 幂等性 = Exactly Once

要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka 的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在 初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而 Broker 端会对做缓存,当具有相同主键的消息提交时,Broker 只 会持久化一条。

但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨 分区跨会话的 Exactly Once。

酷客网相关文章:

赞(0)

评论 抢沙发

评论前必须登录!