My Avatar

Shadow

I love bleak day, like something will happen

Kafka

2017年11月16日 星期四, 发表于 北京

如果你对本文有任何的建议或者疑问, 可以在 这里给我提 Issues, 谢谢! :)

  学习一下Kafka,原理还有集群搭建以及使用


Kafka原理

  kafka是一个分布式的消息系统,确切的说是一个分布式的流式平台(streaming platform),因为它不只是一个简单的消息订阅系统,他除了能发布、订阅消息外、某一方面也能存储数据、还有流式的处理数据,总结来说它的功能下面这个图展示的比较清楚:

  一方面,应用可以作为producer,产生数据,发送给kafka,应用也可以作为consumer,从kafka消费数据,这是消息队列的作用。

  另一方面,kafka提供Streaming processor的功能,应用可以作为流式处理器,订阅一部分数据,并进行实时的处理,然后再将处理后的数据发布到kafka中供其他应用订阅。

  最后,kafka还提供了connector的功能,可以从db、file等其他datasource中接收数据,并转发其他的db、file等数据目的地。

主题和日志

  kafka里面,某一类别的流式数据被称为一个主题,一个主题可以有多个订阅者,也就是订阅了这个主题的消费者能够接收到该主题的数据。

  对于每个主题,kafka是如何存储的呢?是通过分区的日志来存储的。具体见下图:

  每个主题的数据被分成多个分区(分区数用户可以自己指定,也可以是一个分区),每一个分区是一个顺序的、不变的数据记录的序列,分区中的记录每一个都被分配了一个连续的id,也就是偏移量,用来唯一标识分区中每一个记录。

  kafka将所有的发布的记录都保存了下来,即使被消费了还是保存着,这样消费者可以重复消费,直到数据超过了配置的保存时间,才会被丢弃。

  kafka的容错性是怎么保证的? 首先kafka是分布式的,那么每个分区,可以在多个服务器上都有备份。而对于每一个分区,在有其数据的所有服务器中,会有一个是leader的角色,而其他的服务器都是follower,leader负责接收所有客户端的读写请求,而follower则只是单纯的备份了所有的数据,但是一旦leader崩溃了,则会有新的follower成为leader。

  可是这样,leader的压力不会很大吗?那么kafka是怎么做负载均衡的呢?其实,因为每个topic一般都会有多个分区(partition),而每一个分区的leader服务器都是随机选择的,所以A分区的leader虽然是a,但是B分区的leader有可能是b,这样就不存在一台服务器压力过大的情况了。

  接着,我们来谈谈,kafka中的消费者角色的问题,他和普通的消息队列的消费者还不太一样,见下图:

  首先,它这里有个消费者群组的概念,每个消费者群组里都有一定数量的消费者实体。而每个topic的数据,都会广播发到所有的消费者群组里,也就是不同的消费者群组只要订阅了相同主题的数据,那么这些数据肯定是每个消费者群组都会发一份。

  但是,在一个消费者群组里面,这一份数据只会被其中一个消费者实体接收到并消费掉。

  并且,能保证,一个partition的数据,都是发送到一个相同的消费者实体中,并且是按照顺序消费。所以一个消费者群组里的消费者实体肯定不会超过partition的数量。

  这样做的优点是什么呢?

  首先来看传统的消息系统,当做为消息队列使用时,可能有一堆的消费者,但每份数据只被一个consumer消费,而在发布订阅模式下,消息则广播给所有的消费者,这两种方式都有各自的优缺点,作为队列时,一个消息有多个消费者可以接受,这样扩展了系统的处理能力,但是缺点是,一旦一个消息被消费了,它就不存在了,因为别的消费者都没有接收到。而发布订阅模式,虽然所有消费者都接收到消息,但是他没有办法扩展消费者处理数据的能力,因为每个消费者只有一个实例去处理数据,而kafka通过消费者群组的概念解决了这个问题。

Kafka搭建

  搭建kafka需要zookeeper,那么先搭建一个zookeeper集群吧

  

zookeeper集群搭建

  Apache ZooKeeper 是一个针对分布式系统的、开箱即用的、可靠的、可扩展的、高性能的协调服务,他提供的主要功能包括: 名称服务、锁定、同步、配置管理、领导者选举。

  zookeeper本身也是一个分布式架构,它有多个server,并且当超过半数的server可用时,整个zookeeper服务就是可用的,这也就是为什么zookeeper服务器一般都是奇数个的原因,因为如果是三个服务器,那么2个坏掉就不可用了,四个服务,也是2个坏掉就不可用了,并没有增加容错性,那为什么不用三个服务器呢?是吧

  每个zookeeper的客户端都和其中的一台server连接,所有的server都在内存中维持着相同的一份数据,所以,客户端读数据时,都是直接从与之相连的server上读取,而写入数据的时候,首先请求发送到与之相连的zookeeper服务器,然后该服务器告诉zookeeper的leader服务器,leader服务器再将写请求发送给各个zookeeper服务器,只有超过半数的服务器写成功了,才会返回给客户端写成功。

  所以,可见增加zookeeper的服务器数量,对于读取性能没有影响,但是在写数据时,因为要写的机器多了,所以会有一定的性能影响。

  至于zookeeper具体的原理还有功能实现,我们以后再介绍,因为此篇是为了搭建kafka,所以这里主要介绍下如何搭建zookeeper集群。

  首先,下载zookeeper安装包:http://mirror.bit.edu.cn/apache/zookeeper/stable/

  zookeeper在开发测试时也可以使用单机模式,配置安装比较简单,我们不介绍,主要介绍下集群搭建模式(生产环境必须是集群模式)

  这里我用的zookeeper是3.4.10版本,是当前的稳定版本

  首先准备了三台机器,假设ip为:1.1.1.1,1.1.1.2,1.1.1.3

  将安装包下载到三台服务器上,解压缩,同时创建一个存储zookeeper的快照和日志文件数据的目录,假设为/data/zookeeper/zkdata

  接下来在三台机器都做如下操作:

  首先,进入zookeeper的安装目录,进入conf目录,会发现有个zoo_sample.cfg的文件,这是示例配置文件,我们将它拷贝为zoo.cfg,这才是zookeeper默认使用的配置文件,然后修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
tickTime=2000

initLimit=10

syncLimit=5

dataDir=/data/zookeeper/zkdata

clientPort=2181

server.1=1.1.1.1:2888:3888
server.2=1.1.1.2:2888:3888
server.3=1.1.1.3:2888:3888

autopurge.snapRetainCount=3

autopurge.purgeInterval=12

  tickTime:   这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。而且也是zookeeper的基本时间单位

  initLimit:   这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

  syncLimit:   这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒

  dataDir:   快照日志的存储路径,快照是指对所有znode数据的一个拷贝,而事务日志是指每次的更新操作就会记录到日志里。

  dataLogDir:   事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多,但是就算配置到不同的目录,也是基于这两个目录分布在不同的磁盘上时才会提高性能,而我们只有一个设备,所以就没有配置这个目录。

  clientPort:   这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

  autopurge.snapRetainCount:   在zookeeper3.4.0之后才出现的参数,在3.4.0之前,zookeeper产生的数据文件,即快照和事务日志是需要自己定时清理的,写脚本或者写定时任务清理,而在3.4.0之后,通过配置autopurge相关参数可以自动清理不需要的数据文件。这里的snapRetainCount就是指保留的快照数量,设置了之后,每次清理就只会保存最近的这么多快照文件,以及这些快照对应的事务日志文件

  autopurge.purgeInterval:   自动清理的频率,以小时数为单位,默认为0,即代表不清理

  server.x=ip:port1:port2   当配置集群时,需要设置该参数,指定集群中的所有服务器ip地址以及服务器与服务器之间通讯的端口即port1,而port2则是用来选举leader单独用到的端口。

  在所有服务器上都写好配置文件后,我们需要在各个服务器的dataDir下创建一个myid的文件,文件内容为该服务器的id,如下操作:

1
2
3
echo "1" > /data/zookeeper/zkdata/myid
echo "2" > /data/zookeeper/zkdata/myid
echo "3" > /data/zookeeper/zkdata/myid

  然后cd到bin目录下:

1
./zkServer.sh start

  三台服务器都启动后,执行:

1
./zkServer.sh status

  查询状态,可以看到服务器是follower还是leader

  需要注意的是,zookeeper是用java写的,因此服务器上得有java环境。

kafka集群搭建

  这里我使用的是1.0.0版本,下载地址:http://kafka.apache.org/downloads

  这里我们也使用三台服务器搭建集群

  将安装包扔到三台服务器上,解压

  然后分别配置config目录下的server.properties文件,因为基本上自带的文件就已经配置的差不多了,我们只需要改几个地方:

1
2
3
4
5
6
7
8
9
10
11
12
broker.id=1
log.dirs=/data/kafka/data
zookeeper.connect=1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181

listeners=PLAINTEXT://1.1.1.1:9092

log.retention.hours=168
log.cleanup.policy=delete
log.retention.bytes=-1
auto.create.topics.enable=true
default.replication.factor=3
num.partitions=1

  broker.id:每个服务器都是不同的且唯一的一个整数,我们这里取1,2,3即可

  log.dirs:这里的log并不是指系统的日志输出,而是kafka里需要持久化的数据,所以这里的目录是持久化数据存储的目录,可以用逗号分隔多个目录,这里我就填一个即可

  zookeeper.connect:填写用逗号分隔的每个zookeeper服务器的地址即可,当然kafka是自带了一个zookeeper server的,如果是开发测试,不想自己搭zookeeper,可以用它自带的单机的zookeeper server,但是还是推荐自己搭一个zookeeper服务,毕竟用的地方还是蛮多的。

  listeners:这个listeners一定要写上ip地址,不然后面用java client连接的时候会报超时

  还有几个常用配置如下:

  log.retention.hours:日志保存的时间,也就是在这个时间之后,不管数据有没有被消费者消费,都会被系统clean掉,至于clean的方式是delete还是压缩保存,就参考下一个参数了

  log.cleanup.policy:日志清除的方式,支持delete和compact

  log.retention.bytes:指定每个分区的最大文件大小,如果一个分区的大小超过了这个限制,就会被clean掉,默认是-1,代表没有限制

  auto.create.topics.enable:是否允许自动创建主题,如果producer向一个不存在的主题发送数据,则会自动创建一个主题,主题分区数和备份数由下两个参数决定

  default.replication.factor:默认创建的主题的备份数,一般建议设置成server的数量即可

  num.partitions:默认创建的主题的分区数量,消费者消费的数据,能保证来自一个分区的数据都是顺序的,所有如果有强制要求,消费者接收到的数据全部有序,则可以设定一个分区来保证。

  写完配置文件后,在每个服务器上进入bin目录,后台启动kafka服务

1
./kafka-server-start.sh -daemon ../config/server.properties

  在log目录下可以查看系统日志,看是否有启动出错,我在第一次启动就是出了个莫名其妙的错误,好像是读取配置文件错误,我检查了下没问题,然后第二次启动就成功了。。很奇怪

  用jps命令可以看到kafka已经启动了

  接下来我们测试下,首先在一台机器上,我们用命令行创建一个topic

1
./kafka-topics.sh --create --zookeeper 1.1.1.1:2181 --replication-factor 3 --partitions 1 --topic test

  接着,我们在1.1.1.1机器上,创建一个producer

1
./kafka-console-producer.sh --broker-list 1.1.1.1:9092 --topic test

  然后,我们在1.1.1.2机器上,创建一个consumer

1
./kafka-console-consumer.sh --bootstrap-server 1.1.1.2:9092 --topic test --from-beginning

  接着我们在producer端输入一行行的数据,就可以看到consumer端也会一行行地接收到数据了,如下图:

  生产者:

  消费者:

  如果我们要看topic的状态,我们可以按照如下指令查看:

1
./kafka-topics.sh --describe --zookeeper 1.1.1.1:2181 --topic test

  结果如下:

  其中,第一行,描述了,topic名,分区数,备份数,接下来的每一行,代表对每一个分区的描述,因为我们这个test主题只有一个分区,因此下面只有一行数据,这一行里,首先是分区所属的主题名,然后是分区id,然后是该分区的leader服务器id,这里是2,然后是所有备份该分区的服务器id列表,这里3,2,1三个服务器都备份了,最后的Isr是代表,目前alive并且跟leader已经同步上的服务器列表,这里同样也是3,2,1三个服务器都是已经完成同步并且活跃的状态

  至此我们的kafka集群环境就搭建完毕了。

kafka client producer使用

  当我们需要用java来作为producer向kafka生产数据时,可以引入maven包:

1
2
3
4
5
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.0.0</version>
</dependency>

  然后,我们可以按照如下方式构建一个producer:

1
2
3
4
5
6
7
8
9
10
11
Properties props = new Properties();
props.put("bootstrap.servers", "1.1.1.1:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

  在props里面对producer进行配置,具体配置的含义可以参考http://kafka.apache.org/documentation/#producerconfigs

  这里对主要的几个做说明:

  bootstrap.servers:这是用来初始化连接时指定的kafka服务器名和端口号,可以指定多个,用逗号分隔,例如1.1.1.1:9092,1.1.1.2:9092,需要注意的是,这里的服务器只是初始化连接用的,并不需要指定所有的kafka集群中的机器,一般来说列表中指定的服务器数量大于1个即可,防止指定一个时服务器down了

  acks:这是指partition的leader需要接受多少确认才返回客户端写入成功,有三个备选值,当acks为0时,代表producer send操作后立即返回成功,这时实际上数据还是放在buffer里,还没有request到服务器;如果acks为1,则代表leader会将该数据写到自己的本地日志中,但并不等其他follower确认写入,就立即返回成功;如果acks为all,则代表leader会等到所有在线并且与leader同步的其他follower都写入成功后,才会返回

  建立了producer之后,我们就可以通过send方法来发送数据了:

1
2
3
4
5
6
7
8
9
10
11
//方法1
producer.send(new ProducerRecord<String, String>(topic, key, value));

//方法2
producer.send(new ProducerRecord<String, String>(topic, key, value),(metadata,exception) ->{
	if(exception !=null){
		//写入失败
	}else{
		//写入成功
	}
});

  我们发送的都是一个ProducerRecord的对象,在构造该对象实例时,可以指定topic、partition、key、value

  如果指定了partition,那么就会将数据发送到指定的分区,如果没有指定partition,但是指定了key,则会根据key的hash值选择一个分区,所有相同key的数据保证分到一个分区,如果key也没有指定,那么数据会随机分到一个分区。

  send方法都是异步操作,立即返回,但这时数据还没有request到服务器,而是在buffer区,等到buffer区的数据一起request到服务器并且成功后,对于方法1,方法的返回值是一个future,可以通过future.get()来阻塞程序,如果抛出异常,则send失败,如果get方法返回了,则代表成功

  对于方法2,更好理解点,传入了一个callback参数,这里用lambda表达式展示,callback就是一个带metadata和exception两个参数的接口,如果失败了,则exception不为空,否则成功