1. kafka是什么? 分布式的、水平可扩展的、高容错的、有分区和副本的 消息缓冲系统或流式数据处理和存储平台 为什么kafka的性能比较高? i. 消息集:message set,生产者将消息发送给kafka的时候,可以将多条消息一起发送,减少io的次数 ii. 二进制传输:kafka的消息传递过程中,会将数据转换为字节数组来进行传递 iii. 顺序读写磁盘:根据offset递增的顺序读取磁盘 iv. "零"拷贝:kafka在传输数据的时候,log文件中的数据直接通过系统内存直接网络传输,不经过应用的内存(kafka进程的内存)在进行数据的交换 v. 端到端的数据压缩:kafka支持直接传输压缩的数据(压缩是kafka的producer api负责),kafka接收到producer传输过来的压缩过的数据,不会进行解压缩操作而直接存储, 然后当consumer获取数据的时候,直接将压缩好的数据传输过去,consumer接收到压缩数据后再进行解压缩操作。kafka默认支持三种方式的压缩,即lz4(一般不用)、gzip和 snappy(最常用),producer端配置参数为 compression.codec,参数默认为none,表示不进行压缩。 2. 相关概念: 消息 offset,key,value,timestamp broker 指的是一个kafka服务进程 topic 一组消息的逻辑整体,producer将消息写到一个topic中,consumer消费某个topic的消息 partition 一个topic包含多个分区,producer发布到topic的消息根据key的不同发布到不同的partition中,一个分区中的消息有两个特性 有序:按照进入kafka的时间排序 数据不可变:进入kafka集群的数据不可进行变动 producer 生产者 consumer & consumerGroup 消费者和消费者组 多个consumer共同进行数据消费,然后多个consumer之间形成负载均衡的特性 一个consumerGroup中如果consumer的数量和消费的topic的partition数量一样多,那么每个consumer消费一个partition的数据, 一个consumerGroup中如果consumer的数量超过消费的topic的partition的数量,那么将会有部分consumer处于不消费数据的状态 streams[0.10+] 处理kafka流式数据的一个模块/api,类似于storm或sparkstreaming等流式数据处理平台 connector[0.10+] 可以讲kafka中的数据持久化到存储系统,或者从存储系统读入数据,比较类似于flume的数据收集功 3. 架构图和原理 a,b 消息发送的格式: 一个kafka的message有一个固定长度的header和一个变长的消息体body构成,header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成, 当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,如是否压缩、压缩格式等),如果magic的值为0,那么不存在attributes属性。 body是由N个字节构成的消息体,包含了具体的key-value信息【注意:每个版本的kafka的消息格式是不一样的】 消息存储的格式: 存储在磁盘上的日志采用不同于producer发送的消息的格式,每个日志文件都是一个”log entries“序列,每个log entry包含一个四字节整数(message长度,值为1+4+N), 一个字节magic,四个字节的crc32值,最终是N个字节的消息数据。每条消息都有一个当前partition下唯一的64位的offset,指定该消息的起始下标位置,存储消息格式如下: ------------------------------------ message length: 4 bytes(value:1+4+N) magic value: 1 byte crc: 4 bytes payload: n bytes ------------------------------------ 这个”log entries“并非由一个文件构成,而是分成多个segment file(日志文件,存储具体的消息记录)和一个索引文件(存储每个segment文件的offset偏移量范围),如图所示: 图 消息存储的过程: a.一个topic分为多个partition来进行数据的管理,一个partition中的数据是有序、不可变的,使用偏移量(offset)唯一标识的一条数据 b.partition接收到producer发送来到数据后,会产生一个递增的offset偏移量数据,同时将数据存储在本地的磁盘文件中(文件内容追加的方式写入数据),partition的数据存活时间 超过参数值(log.retention.{ms,minutes,hours},默认为7天)的时候进行删除 c.consumer根据offset消费对应topic partition中的数据(也就是每个topic的partition都拥有自己的offset偏移量) 注意:kafka的数据消费是顺序读写的,磁盘的顺序读写速度是(600M/s),比随机读写速度(100k/s)快得多 消息存储的分布式机制: a.一个topic中的所有数据分布式地存储在kafka集群的所有broker上,以分区(partition)的形式进行数据的存储,每个分区允许存在备份分区。 b.每个分区在kafka集群中存储在一个broker节点上的分区叫做leader,存储在其他broker上的备份分区叫做followers,只有leader节点负责该分区的数据读写操作,followers节点 作为leader节点的热备节点,从leader节点备份数据,当leader节点挂掉的时候,followers节点中会有一个节点编程leader节点,重新提供服务。 c.kafka集群的partition的leader和followers切换依赖zookeeper。 消息的产生和收集机制: a.kafka集群中由producer负责数据的产生,并发送到对应的topic,producer通过push的方式将数据发送到对应的topic中。 b.producer发送到topic的数据是由key-value键值对组成的,kafka根据key的不同决定将数据发送到哪一个partition,默认采用hash的机制发送数据到不同的partition中,配置参数 为 partitioner.class。 c.producer发送数据的方式分为sync(同步)和 async(异步)两种,默认为同步方式,由参数 producer.type 决定,producer提供重试机制,默认失败重试3次。 消息的消费机制: a.kafka有两种模式消费数据:队列和发布订阅。在队列模式下,一条数据只会发送给consumer group中的一个consumer进行消费,在发布订阅模式下,一条数据会发送给多个consumer消费 两种模式是通过consumer的 group.id 参数来决定的,当多个consumer的group.id是一样情况下是队列模式(相同group.id的consumer组成一个 consumer group组),如果不一 样,则是发布订阅模式。consumer基于offset对kafka中的数据进行消费,对于一个consumer group中的所有consumer共享一个offset偏移量。 b.kafka的数据是按照分区进行排序的(插入的顺序),也就是每个分区的数据是有序的。在consumer进行消费的时候,也是对分区的数据进行有序的消费的,但是不保证所有数据的有序性,因为 存在多个分区 c.当一个consumer group组中的消费者数量和topic的分区的数量一致的时候,此时一个consumer消费一个partition的数据,如果不一致,那么可能出现一个consumer消费多个 partition的数据或不消费数据的情况,这个机制是根据consumer和partitiond的数量动态变化的。 d.consumer通过poll的方式主动从kafka集群中获取数据。 高可用性和分区的副本: a.kafka的replication指的是partitiond的复制,一个partition的所有分区中只有一个分区是leader节点,其他分区都是follower节点。replication对kafka的吞吐率有一定的 影响,但是极大增强了可用性 b.follower节点会定时从leader节点获取增量数据,一个活跃的follower节点必须满足以下两个条件: i.所有的节点必须维护和zookeeper的连接(通过zk的心跳实现) ii.follower必须能够及时将leader上的writing复制过来,不能落后太多,“落后太多”由参数 replica.log.time.max.ms 和 replica.log.max.messages来决定 c.kafka leader election:kafka分区主节点的选择机制,通过ISR实现(in-sync-replication)leader的选择。当leader宕机的时候,即当一个partition宕机或者落后数据 太多,leader会将该partition的broker标识符从isr列表删除,kafka将会从剩余的isr列表中选择一个节点来充当leader(此时所有follower会在zk中试图创建一个文件夹,创建 成功的follower称为leader)。 d.message delivery semantics:消息传输协议/类型,是对消息系统中消息传递可靠性保障的一个定义。 三种类型的可靠性保证: i. at most once: 最多只发送一次,允许数据丢失。消费者fetch消息,在zk保存offset,然后处理消息 ii. at least once: 最少发送一次,允许数据重复。消费者fetch消息,处理消息,然后在zk保存offset iii. exactly once: 仅发送一次,不允许数据丢失和数据重复,有可能存在数据发送失败的情况。没有严格去实现 kafka中两种实现数据传输可靠性的方式: producer端:通过参数 request.required.acks 来设置,acks是生产者等待kafka集群的接收确认返回值,主要有三个值: i. 0,表示producer不等待kafka的返回结果,有可能存在数据丢失 ii. 1,表示producer等待kafka的一个分区(主分区)的数据保存成功的返回结果,有可能数据在分区的备份分区copy主分区数据的时候主分区宕机,导致数据丢失 iii. -1,表示producer等待kafka的所有分区的数据保存成功的返回结果 注意:配合以下参数使用 超时时间:request.timeout.ms 最大尝试次数:message.send.max.retries 尝试间隔时间:retry.backoff.ms consumer端:通过offset偏移量实现数据消费的可靠性,小于offset的消息是已经被消费的,大于或等于offset的数据是没有被消费的。 4. 安装 单机:一台主机且只有一个broker 伪分布式:一台主机且有多个broker 完全分布式:多台主机且有多个broker 以完全分布式搭建为例: 1. 搭建jdk、scala环境 2. 搭建zookeeper环境 修改zookeeper.out日志文件的位置 $ vim $ZOOKEEPER_HOME/bin/zkServer.sh 修改 _ZOO_DAEMON_OUT="$ZOO_LOG_DIR/zookeeper.out" 为 _ZOO_DAEMON_OUT="/var/log/zookeeper/zookeeper.out" 3. 根据scala的版本下载kafka的版本,解压到指定目录和创建软链接 4. 配置信息 vim config/server.properties #kafka集群中broker的id broker.id=0 //#监听的端口号,默认为9092 //port=9092 //#远程能访问的hostname,默认为localhost //host.name=nimbusz //#外网远程能访问的ip //#advertised.listeners=PLAINTEXT://your.host.name:9092 [kafka0.10+] listeners=PLAINTEXT://nimbusz:9092 #advertised.listeners=PLAINTEXT://your.host.name:9092 #处理网络请求的线程数量,默认为3个 num.network.threads=3 #磁盘io的线程数,默认为8个 num.io.threads=8 #数据发送和接收时字节缓冲大小,默认都为100k socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 #一个请求过程中服务端能够接收的最大数据大小,默认为100M,配置的目的是防止出现OOM socket.request.max.bytes=104857600 #指定kafka才能出磁盘的路径,可以使用","分割,指定多个磁盘路径,将数据分布到不同的磁盘中,以提升磁盘的读写性能 log.dirs=/tmp/kafka-logs #指定最小的分区数,默认为1 num.partitions=1 #指定每一个数据文件或日志文件有多少个线程来j进行数据恢复 num.recovery.threads.per.data.dir=1 #配置zk的连接信息,默认kafka在zk中的根路径为“/” zookeeper.connect=numbusz:2181,supervisor01z:2181,supervisor02z:2181/kafka #配置连接zk的过期时间 zookeeper.connection.timeout.ms=6000 vim log4j.properties kafka的日志默认存储在${KAFKA_HOME}/logs文件夹中,可以通过修改log4j.properties文件来更改日志存储路径 kafka.logs.dir=logs 或者可以通过修改kafka-run-class.sh更改kafka打印的日志目录 vim kafka-run-class.sh #LOG_DIR="$base_dir/logs" LOG_DIR="/var/log/kafka/logs" 5. 复制分发kafka安装目录到各个节点,并修改server.properties文件相关参数【主要是broker.id】 scp -r kafka-xxx ubuntu@supervisor01z:/xxx ... 6. 启动kafka集群【注意前提是先启动zk】 bin/kafka-server-start.sh -daemon config/server.properties 7. 关闭kafka服务【关闭的是当前主机的所有kafka进程】 bin/kafka-server-stop.sh 5. kafka在zk中的元数据的描述 /kafka /consumers 连接kafka的消费者的信息,包括消费者&读取数据的偏移量 /brokers kafka集群中broker的相关描述信息 /config topic的配置信息 /controller topic分区各个副本之间leader选举时候用到的一个目录 /controller_epoch 和controller类似 /admin kafka管理相关信息 6. 常用命令 topic常用命令 #创建一个topic: bin/kafka-topics.sh --create --zookeepeer xxx,xxx,xxx/kafka --topic xxx --partitions 5 --replication-factor 3 --config xxx=xxx partitions:topic分区数,通常该值是broker数量的1-2倍 replication-factor:需要给定每个分区的副本因子,该值不能超过broker的数量,一般不会超过3 #查看集群中有哪些topic: bin/kafka-topics.sh --list --zookeepeer xxx,xxx,xxx/kafka #查看集群中topic的详细信息: bin/kafka-topics.sh --describe --zookeepeer xxx,xxx,xxx/kafka --topic xxx #修改topic配置信息: 修改配置:bin/kafka-topics.sh --alter --zookeepeer xxx,xxx,xxx/kafka --topic xxx --config xxx=xxx 删除配置:bin/kafka-topics.sh --alter --zookeepeer xxx,xxx,xxx/kafka --topic xxx --delete-config xxx 重置分区【注意分区数量只允许增加,而不允许减小】:bin/kafka-topics.sh --alter --zookeepeer xxx,xxx,xxx/kafka --topic xxx --partitions 10 副本重置:bin/kafka-topics.sh --alter --zookeepeer xxx,xxx,xxx/kafka --topic xxx --replication-assignment 0:1:2,1:2:3 #删除topic: bin/kafka-topics.sh --delete --zookeepeer xxx,xxx,xxx/kafka --topic xxx bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --zookeeper xxx,xxx,xxx/kafka --topic xxx 注意:默认情况下是标记删除,即在zk的/kafka/admin/delete_topics目录下有一个标记,没有实际删除topic对应zookeeper的元数据信息和topic对应的磁盘文件 #如果需要实际删除topic,则有两种方式: 方式一:通过delete命令标记删除后,手动将本地磁盘以及zk上的相关topicd的信息删除即可 方式二:配置server.properties文件,给定参数delete.topic.enable=true,使用delete命令删除时就真正删除了这个topic producer常用命令 #启动生产者: bin/kafka-console-producer.sh --broker-list nimbusz:9092,supervisor01z:9092,supervisor02z:9092 --topic xxx cusumer常用命令 #启动消费者: bin/kafka-console-consumer.sh --zookeeper xxx,xxx,xxx/kafka --topic xxx [--from-beginning] [--property group.id=g1]