Kafka消费者(consumer)

消费者和消费者群组、分区再均衡

消费者的含义,同一般消息中间件中消费者的概念。在高并发的情况下,生产者产生消 息的速度是远大于消费者消费的速度,单个消费者很可能会负担不起,此时有必要对消费者 进行横向伸缩,于是我们可以使用多个消费者从同一个主题读取消息,对消息进行分流。

1.消费者群组

Kafka 里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个 消费者接收主题一部分分区的消息。

Kafka消费者(consumer)

如上图,主题 T 有 4 个分区,群组中只有一个消费者,则该消费者将收到主题 T1 全部 4 个分区的消息。

Kafka消费者(consumer)

如上图,在群组中增加一个消费者 2,那么每个消费者将分别从两个分区接收消息,上 图中就表现为消费者 1 接收分区 1 和分区 3 的消息,消费者 2 接收分区 2 和分区 4 的消息。

Kafka消费者(consumer)

如上图,在群组中有 4 个消费者,那么每个消费者将分别从 1 个分区接收消息。

Kafka消费者(consumer)

但是,当我们增加更多的消费者,超过了主题的分区数量,就会有一部分的消费者被闲 置,不会接收到任何消息。 往消费者群组里增加消费者是进行横向伸缩能力的主要方式。

所以我们有必要为主题设 定合适规模的分区,在负载均衡的时候可以加入更多的消费者。但是要记住,一个群组里消 费者数量超过了主题的分区数量,多出来的消费者是没有用处的。

Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。 换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者。

最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

2.分区再均衡

当消费者群组里的消费者发生变化,或者主题里的分区发生了变化,都会导致再均衡现 象的发生。从前面的知识中,我们知道,Kafka 中,存在着消费者对分区所有权的关系, 这样无论是消费者变化,比如增加了消费者,新消费者会读取原本由其他消费者读取的 分区,消费者减少,原本由它负责的分区要由其他消费者来读取,增加了分区,哪个消费者 来读取这个新增的分区,这些行为,都会导致分区所有权的变化,这种变化就被称为再均衡。

再均衡对 Kafka 很重要,这是消费者群组带来高可用性和伸缩性的关键所在。不过一般 情况下,尽量减少再均衡,因为再均衡期间,消费者是无法读取消息的,会造成整个群组一 小段时间的不可用。 消费者通过向称为群组协调器的 broker(不同的群组有不同的协调器)发送心跳来维持 它和群组的从属关系以及对分区的所有权关系。如果消费者长时间不发送心跳,群组协调器 认为它已经死亡,就会触发一次再均衡。 在 0.10.1 及以后的版本中,心跳由单独的线程负责,相关的控制参数为 max.poll.interval.ms。

3.消费者分区分配的过程

消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求,第一个加入群主的消 费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕后, 群组把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者 只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程在每次再 均衡时都会发生。

4.小结

消费者订阅某个主题,读取消息。消费者读取消息的时候按照消息生成的顺序依次进行读取。

Q1:消费者如何判断这条消息是否被读取过呢?

答:偏移量,消费者通过检查偏移量来判断消息是否进行读取过。偏移量保存到zookeeper上或者Kafka上

当服务器宕机或者kafka重启后,可以根据上一次的偏移量来继续进行读取

Q2:多个消费者可以消费同一个消息么?

答:不可以,同一个群组是不能消费同一个消息。在不同群组内可以消费同一个消息

Kafka消费者(consumer)

注意:

  1. 分区数量决定了消费者的数量,消费者的数量一旦大于分区数量,则有一部分消费者是无法消费消息的
  2. 多个消费者会平均消费分区里面内容,假设有4个分区3个消费者,则其中有一个消费者就会多消费一个分区消息
  3. 分区再均衡由kafka内部实现,一般情况下, 要禁止分区在均衡。在kafka的集群发现要分区再均衡的时候,消费者是不能够读取消息,会在一小段时间内处于不可用状态。
  4. 消费者会定期向Kafka控制器发送心跳,证明还处于活动期。一旦某个消费者挂了以后,则kafka会进行分区再均衡,将原本属于他消费的分区交给别人进行消费

Kafka消费者(consumer)

消费方式

consumer 采用 pull(拉)模式从 broker 中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。 它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息, 典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适 当的速率消费消息。

pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数 据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有 数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

使用 Kafka 消费者

1.订阅

创建消费者后,使用 subscribe()方法订阅主题,这个方法接受一个主题列表为参数,也 可以接受一个正则表达式为参数;正则表达式同样也匹配多个主题。如果新创建了新主题, 并且主题名字和正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的 主题。比如,要订阅所有和 test 相关的主题,可以 subscribe(“tets.*”)

2.轮询

为了不断的获取消息,我们要在循环中不断的进行轮询,也就是不停调用 poll 方法。

poll 方法的参数为超时时间,控制 poll 方法的阻塞时间,它会让消费者在指定的毫秒数 内一直等待 broker 返回数据。poll 方法将会返回一个记录(消息)列表,每一条记录都包含 了记录所属的主题信息,记录所在分区信息,记录在分区里的偏移量,以及记录的键值对。

poll 方法不仅仅只是获取数据,在新消费者第一次调用时,它会负责查找群组,加入群 组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。

3.多线程下的消费者

KafkaConsumer 的实现不是线程安全的,所以我们在多线程的环境下,使用 KafkaConsumer 的实例要小心,应该每个消费数据的线程拥有自己的 KafkaConsumer 实例,KafkaConsumer 的实现不是线程安全的,所以我们在多线程的环境下,使用 KafkaConsumer 的实例要小心,应该每个消费数据的线程拥有自己的 KafkaConsumer 实例。代码如下

public class KafkaConConsumer {
    //创建两个定长线程
    private static ExecutorService executorService = Executors.newFixedThreadPool(2);
    private static class ConsumerWorker implements Runnable{
        //创建Kafka生产者
        private KafkaConsumer<String,String> consumer;
        //封装Kafka需要传入的配置参数
        public ConsumerWorker(Map<String, Object> config, String topic) {
            Properties properties = new Properties();
            properties.putAll(config);
            this.consumer = new KafkaConsumer<String, String>(properties);
            //consumer可以订阅多个Topic主题,此时我们只订阅1个主题
            consumer.subscribe(Collections.singletonList(topic));
        }
        @Override
        public void run() {
            //获取线程的Id和producer在内存中的地址所算出来的哈希值
            final String id = Thread.currentThread().getId() +"-"+System.identityHashCode(consumer);
            try {
                while(true){
                    //拉取间隔时间 单位(毫秒)
                    ConsumerRecords<String, String> records = consumer.poll(500);
                    //遍历集合中的消息
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println("线程Id:"+id+","+"主题Topic:"+record.topic()+","+"分区:"+record.partition()+","+"" + "偏移量:"+record.offset()+","+"键:"+record.key()+"值:"+record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {
        //消费配置的实例
        Map<String, Object> config = KafkaConst.consumerConfigMap("concurrent", StringDeserializer.class, StringDeserializer.class);
        for(int i = 0; i< BusiConst.CONCURRENT_PARTITIONS_COUNT; i++){
            executorService.submit(new ConsumerWorker(config, BusiConst.CONCURRENT_USER_INFO_TOPIC));
        }
    }


消费者配置

消费者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内 存使用,性能和可靠性方面有较大影响。可以参考 org.apache.kafka.clients.consumer 包下 ConsumerConfig 类。

  • 1.fetch.min.bytes

每次 fetch 请求时,server 应该返回的最小字节数。如果没有足够的数据返回,请求会 等待,直到足够的数据才会返回。缺省为 1 个字节。多消费者下,可以设大这个值,以降低 broker 的工作负载

  • fetch.wait.max.ms

    如果没有足够的数据能够满足 fetch.min.bytes,则此项配置是指在应答 fetch 请求之前, server 会阻塞的最大时间。缺省为 500 个毫秒。和上面的 fetch.min.bytes 结合起来,要么满 足数据的大小,要么满足时间,就看哪个条件先满足

  • max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认 1MB。假设一个主题有 20 个分区和 5 个消费者,那么每个消费者至少要有 4MB 的可用内存来接收记录,而且一旦 有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的 message.max.bytes 更大, 否则消费者可能无法读取消息。

  • session.timeout.ms

如果 consumer 在这段时间内没有发送心跳信息,则它会被认为挂掉了。默认 3 秒。

  • auto.offset.reset

消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理。默认值是 latest,从最新的记录开始读取,另一个值是 earliest,表示消费者从起始位置读取分区的记 录。

注意:默认值是 latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开 始读取数据(在消费者启动之后生成的记录),可以先启动生产者,再启动消费者,观察到 这种情况。

  • enable .auto.commit

默认值 true,表明消费者是否自动提交偏移。为了尽量避免重复数据和数据丢失,可以 改为 false,自行控制何时提交。

  • partition.assignment.strategy

分区分配给消费者的策略。系统提供两种策略。默认为 Range。允许自定义策略。

  • Range:

把主题的连续分区分配给消费者

  • RoundRobin:

把主题的分区循环分配给消费者。

自定义分区策略:

   /**
  *自定义分区器,以value作为分区
  */
 public class SelfPartitioner implements Partitioner {
     @Override
     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
         //获取某一个主题下的所有分区
         List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
         //获取该主题下的所有分区数目
         int num = partitionInfos.size();
         //对num进行取模运算
         int parId = value.hashCode()%num;
         //将消息发送到该分区Id下
         return parId;
     }

     @Override
     public void close() {
         //do nothing
     }

     @Override
     public void configure(Map<String, ?> configs) {
         //do nothing
     }

 }
  • client.id

当向 server 发出请求时,这个字符串会发送给 server。目的是能够追踪请求源头,以此 来允许 ip/port 许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因 为没有任何功能性的目的,除了记录和跟踪。

  • max.poll.records

控制每次 poll 方法返回的的记录数量。

  • receive.buffer.bytes 和 send.buffer.bytes

指定 TCP socket 接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系 统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨 数据中心的网络一般都有比较高的延迟和比较低的带宽。

分区分配策略

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及 到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。

Kafka 有两种分配策略,一是 RoundRobin,一是 Range。

1. Round-Robin 轮训策略

Kafka默认分区策略

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。

Kafka消费者(consumer)

这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

2. Randomness 随机策略

所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示

Kafka消费者(consumer)

先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。 本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

3. Key-ordering 按消息键保序策略

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。

Kafka消费者(consumer)

保障消费者高可用

当消息变成已提交状态(也就是写入到所有in-sync副本)后,它才能被消费端读取。这保证了消费者读取到的数据始终是一致的,为了达到高可靠,消费者需要保证在消费消息时不丢失数据。

在处理分区消息时,消费者一般的处理流程为:拉取批量消息,处理完成后提交位移,然后再拉取下一批消息。提交位移保证了当前消费者发生故障或重启时,其他消费者可以接着上一次的消息位移来进行处理。需要提醒的是,消费端丢失消息的一个主要原因为:消费者拉取消息后还没处理完就提交位移,一旦在消息处理过程中发生故障,新的消费者会从已提交的位移接着处理,导致发生故障时的消息丢失。

下面来看下消费端处理流程中的一些需要注意的细节。

1. 重要的可靠性配置

如果希望设计一个高可靠的消费者,那么消费者中有4个重要的属性需要慎重考虑。

  • group.id

如果有多个消费者拥有相同的group.id并且订阅相同的主题,那么每个消费者会负责消费一部分的消息。如果消费组内存在多个消费者,那么一个消费者发生故障那么其他消费者可以接替其工作,保证高可用。

  • auto.offset.reset

当消费者读取消息,Kafka中没有提交的位移(比如消费者所属的消费组第一次启动)或者希望读取的位移不合法(比如消费组曾经长时间下线导致位移落后)时,消费者如何处理?当设置为earliest,消费者会从分区的起始端开始读取,这可能会导致消费者重复处理消息,但也将消息丢失可能性降低到最小;当设置为latest,消费者会从分区末端开始读取,这会导致消息丢失可能性加大,但会降低消息重复处理的概率。

  • enable.auto.commit

你希望消费者定期自动提交位移,还是应用手动提交位移?自动提交位移可以让应用在处理消息时不用实现提交位移的逻辑,并且如果我们是在poll循环中使用相同的线程处理消息,那么自动提交位移可以保证在消息处理完成后才提交位移。如果我们在poll循环中使用另外的线程处理消息,那么自动提交位移可能会导致提交还没完成处理的消息位移。

  • auto.commit.interval.ms

它与第三个属性有关。如果选择了自动提交位移,那么这个属性控制提交位移的时间间隔。默认值是5秒,通常来说降低间隔可以降低消息重复处理的可能性。

2. 需要注意的细节

  • 手动提交位移

如果我们选择手动提交位移,下面来根据不同场景来讨论如何实现更可靠的消费者。

  • 处理完消息后立即提交

如果在poll循环中进行消息处理,并且处理完后提交位移,那么提交位移的实现方式非常简单。对于这种场景,可以考虑使用自动提交而不是手动提交。

  • 在处理消息过程中多次提交

消费者拉取批量消息后处理消息时,在处理过程中可以使用手动提交位移方式来多次更新位移。这种方式可以使得消息重复处理可能性降低。不过在这个场景中,如果不加以注意,那么可能会提交上一次拉取的最大位移而不是当前已经处理的消息位移。

  • 消费者的重试

在某些场景下,消费者拉取消息后进行处理时会遇到一些问题,可能希望这些消息可以延迟处理。比如,对于从Kafka拉取消息然后持久化到数据库的应用来说,如果某个时刻数据库不可用,我们可能希望延后重试。延后重试的策略可以分成如下两大类:

第一种处理方式是,我们提交已经处理成功的位移,然后将处理失败的消息存储到一个缓冲区,并不断进行重试处理这些消息。另外,在处理这些消息时可能poll循环仍然在继续,我们可以使用pause()方法来使得poll不会返回新的数据,这样使得重试更容易。

第二种处理方式是,我们把处理失败的消息写入到另外的主题,然后继续处理当前的消息。对于失败消息的主题,我们既可以使用同一个消费组进行处理,也可以使用不同的消费组进行处理。这种主题类似于其他消息系统使用的死信队列(dead-letter-queue)。

  • 持久化状态

在某些场景下,我们可能需要在拉取消息时维护状态。比如,对于计算滑动平均数(moving average),我们每次拉取新消息时需要更新相应的平均数。当消费者重启时,我们不但需要从上一次提交的位移开始消费,同时还需要从相应的滑动平均数中恢复。一种处理方式是,我们提交位移时将滑动平均数写入到一个用于保存结果的主题,这样应用重启时可以获取上一次的处理结果。但由于Kafka不支持多操作的事务性,因此这种方式并不严谨。我们当然可以自己加以处理,但这个问题解决起来比较复杂,建议可以使用Kafka Streams这样的开源库。

  • 消息处理时间长

某些应用拉取消息回来后处理消息时间比较长(比如依赖于一个阻塞服务或者进行复杂的计算),而某些版本的消费者如果长时间不poll消息会导致会话超时,因此使用这些版本的应用需要不断的拉取消息来发送心跳包到broker。一种常见的处理方式是,我们使用多线程来处理消息,然后当前线程调用pause()来使得既可以调用poll()而且消费者不会拉取新的消息;当消息处理完成后,再调用resume()来使得消费者恢复正常拉取逻辑。

  • 有且仅有一次的语义

Kafka不支持有且仅有一次的语义,但可以支持至少一次的语义。因此对于需要实现有且仅有一次语义的应用来说,我们需要自己额外处理。

一种常见的处理方式为,我们使用支持唯一键的外部系统(比如关系型数据库、Elasticsearch等)来进行结果去重。我们可以自己实现唯一键并且在消息中加入此属性,也可以根据消息的主题、分区以及位移信息来生成唯一键。另外,如果该外部系统支持事务,那么我们可以在一个事务中同时保存消息处理结果和位移。消费者重启时可以从该系统中获取位移,并且使用seek()方法来开始从相应的位移开始消费。

酷客网相关文章:

赞(0)

评论 抢沙发

评论前必须登录!