您当前的位置:首页 > 互联网教程

如何确定Kafka的分区数,key和consumer线程数

发布时间:2025-05-19 18:16:25    发布人:远客网络

如何确定Kafka的分区数,key和consumer线程数

一、如何确定Kafka的分区数,key和consumer线程数

一、客户端/服务器端需要使用的内存就越多

先说说客户端的情况。Kafka 0.8.2之后推出了Java版的全新的producer,这个producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。

服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。

每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit-n的限制。

Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。

说了这么多“废话”,很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、软件、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于,官网说每秒能到10MB,为什么我的producer每秒才1MB?——且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数= Tt/ max(Tp, Tc)

Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大,因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。

另外,Kafka并不能真正地做到线性扩展(其实任何系统都不能),所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。

默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key)% numPartitions,如下图所示:

def partition(key: Any, numPartitions: Int): Int={

Utils.abs(key.hashCode)% numPartitions

这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?

if(key== null){//如果没有指定key

val id= sendPartitionPerTopicCache.get(topic)//先看看Kafka有没有缓存的现成的分区Id

partitionId//如果有的话直接使用这个分区Id就好了

case None=>//如果没有的话,

val availablePartitions= topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)//找出所有可用分区的leader所在的broker

if(availablePartitions.isEmpty)

throw new LeaderNotAvailableException("No leader for any partition in topic"+ topic)

val index= Utils.abs(Random.nextInt)% availablePartitions.size//从中随机挑一个

val partitionId= availablePartitions(index).partitionId

sendPartitionPerTopicCache.put(topic, partitionId)//更新缓存以备下一次直接使用

可以看出,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)

我个人的观点,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。让我们来看看具体Kafka是如何分配的。

topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。——其实ConsoleConsumer可以使用通配符的功能实现同时消费多个topic数据,但这和本文无关。

再讨论分配策略之前,先说说KafkaStream——它是consumer的关键类,提供了遍历方法用于consumer程序调用实现数据的消费。其底层维护了一个阻塞队列,所以在没有新消息到来时,consumer是处于阻塞状态的,表现出来的状态就是consumer程序一直在等待新消息的到来。——你当然可以配置成带超时的consumer,具体参看参数consumer.timeout.ms的用法。

下面说说Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0~ P9,consumer线程数是3, C0~ C2,那么每个线程都分配哪些分区呢?

val nPartsPerConsumer= curPartitions.size/ curConsumers.size//每个consumer至少保证消费的分区数

val nConsumersWithExtraPart= curPartitions.size% curConsumers.size//还剩下多少个分区需要单独分配给开头的线程们

for(consumerThreadId<- consumerThreadIdSet){//对于每一个consumer线程

val myConsumerPosition= curConsumers.indexOf(consumerThreadId)//算出该线程在所有线程中的位置,介于[0, n-1]

assert(myConsumerPosition>= 0)

// startPart就是这个线程要消费的起始分区数

val startPart= nPartsPerConsumer* myConsumerPosition+ myConsumerPosition.min(nConsumersWithExtraPart)

// nParts就是这个线程总共要消费多少个分区

val nParts= nPartsPerConsumer+(if(myConsumerPosition+ 1> nConsumersWithExtraPart) 0 else 1)

针对于这个例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart为10%3=1,说明每个线程至少保证3个分区,还剩下1个分区需要单独分配给开头的若干个线程。这就是为什么C0消费4个分区,后面的2个线程每个消费3个分区,具体过程详见下面的Debug截图信息:

nConsumersWithExtraPart= 10% 3= 1

startPart= 1* 3+ min(1, 1)= 4---也就是从分区4开始读

nParts= 3+(if(1+ 1> 1) 0 else 1)= 3读取3个分区,即4,5,6

startPart= 3* 0+ min(1, 0)=0---从分区0开始读

nParts= 3+(if(0+ 1> 1) 0 else 1)= 4读取4个分区,即0,1,2,3

startPart= 3* 2+ min(2, 1)= 7---从分区7开始读

nParts= 3+ if(2+ 1> 1) 0 else 1)= 3读取3个分区,即7, 8, 9

说到这里,经常有个需求就是我想让某个consumer线程消费指定的分区而不消费其他的分区。坦率来说,目前Kafka并没有提供自定义分配策略。做到这点很难,但仔细想一想,也许我们期望Kafka做的事情太多了,毕竟它只是个消息引擎,在Kafka中加入消息消费的逻辑也许并不是Kafka该做的事情。

二、【Kafka系列】Kafka Consumer消费时的线程模型

1、在讨论Kafka消费者消费消息时的线程模型之前,我们需要了解Kafka消息消费的两种方式:@KafkaListener注解用于声明方法作为Kafka消息监听器,被动等待Kafka集群推送消息。另一方面,poll方法允许消费者主动拉取消息。

2、消息监听器(@KafkaListener)示例:

3、通过注入KafkaConsumer bean并调用poll方法,手动消费消息。首先,在配置类或服务中注入KafkaConsumer,然后在服务类中注入KafkaConsumer并调用poll方法。

4、在生产环境中,实现更复杂的逻辑来管理KafkaConsumer生命周期、异常处理和线程安全。

5、总体而言,@KafkaListener适合简化消息监听和处理,而poll方法允许对消息的主动控制。

6、Kafka消费者采用消息拉取模型,要求消费者主动调用KafkaConsumer#poll(java.time.Duration)方法从broker拉取数据。Kafka客户端设计为非线程安全,允许在多线程环境中灵活使用。

7、消息拉取机制涉及内部维护的链表ConcurrentLinkedQueue来缓存已拉取的消息数据。

8、在实际项目中,消费速度可达到十万每秒,使用默认配置时,每个fetch请求通常返回1条消息记录,单机fetch请求QPS可达4,5k,保证了较低的消费延迟。

9、在高负载场景下,可采用单线程消费模型,但吞吐量受限,不适用于实时性要求高的应用。多线程消费模型,每个线程管理独立的KafkaConsumer实例,可提升吞吐量和消息处理效率。

10、多线程消费模型涉及消费者组与分区的关系,每个消费者组中的多个线程共同协作处理同一个topic下的不同分区。

11、为解决线程安全问题,可采用类似Reactor模型的多线程消费模型,实现消息接受与处理的完全解耦,提升消费效率。

12、总结,线程模型主要有单线程、多线程对应topic和Reactor模型。推荐使用Reactor模型,实现消息接受与处理完全解耦,提升消息处理效率。

三、java exchanger用在什么场景

1、Exchanger是在两个任务之间交换对象的栅栏。当两个任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有对方的对象。Exchanger的典型应用场景是:一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。

2、为了演示Exchanger类,我们将创建生产者和消费者任务。ExchangerProducer和ExchangerConsumer使用一个List<Fat>作为要求交换的对象,它们都包含一个用于这个List<Fat>的Exchanger。当你调用Exchanger.exchange()方法时,它将阻塞直至对方任务调用它自己的exchange()方法,那时,这两个exchange()方法将同时完成,而List<Fat>被交换:

3、import java.util.concurrent.CopyOnWriteArrayList;

4、import java.util.concurrent.Exchanger;

5、import java.util.concurrent.ExecutorService;

6、import java.util.concurrent.Executors;

7、import java.util.concurrent.TimeUnit;

8、class ExchangerProducer implements Runnable{

9、private List<Fat> holder;

10、private Exchanger<List<Fat>> exchanger;

11、public ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder){

12、for(int i= 0;i< ExchangerDemo.size; i++){

13、holder= exchanger.exchange(holder);

14、} catch(InterruptedException e){

15、System.out.println("Producer stopped.");

16、class ExchangerConsumer implements Runnable{

17、private List<Fat> holder;

18、private Exchanger<List<Fat>> exchanger;

19、public ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder){

20、holder= exchanger.exchange(holder);

21、//在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的

22、System.out.println("Exchanged count="+ num);

23、} catch(InterruptedException e){

24、System.out.println("Consumer stopped. Final value:"+ value);

25、public static void main(String[] args) throws Exception{

26、ExecutorService exec= Executors.newCachedThreadPool();

27、List<Fat> producerList= new CopyOnWriteArrayList<>();

28、List<Fat> consumerList= new CopyOnWriteArrayList<>();

29、Exchanger<List<Fat>> exchanger= new Exchanger<>();

30、exec.execute(new ExchangerProducer(exchanger, producerList));

31、exec.execute(new ExchangerConsumer(exchanger, consumerList));

32、TimeUnit.SECONDS.sleep(delay);

33、private static int counter= 1;

34、private final int id= counter++;

35、for(int i= 1; i<10000; i++){

36、d+=(Math.PI+ Math.E)/(double)i;

37、public void print(){System.out.println(this);}

38、public String toString(){return"Fat id="+ id;}

39、Consumer stopped. Final value: Fat id=88300

40、在main()中,创建了用于两个任务的单一的Exchanger,以及两个用于互换的CopyOnWriteArrayList。这个特定的List变体允许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常。ExchangerProducer将填充这个List,然后将这个满列表跟ExchangerConsumer的空列表交换。交换之后,ExchangerProducer可以继续的生产Fat对象,而ExchangerConsumer则开始使用满列表中的对象。因为有了Exchanger,填充一个列表和消费另一个列表便同时发生了。