Kafka Configuration and Practice

Overview

Apache Kafka™ is a distributed streaming platform.
I am using Kafka 0.9+.
Following the official documentation, this article covers Kafka's cluster architecture and how it works, a walkthrough of the configuration files, common script commands, and Java API usage.

Kafka Cluster Architecture and Principles

(Figure: Kafka architecture)

Topics and Logs

Topic - the category a message belongs to, used to distinguish messages.
Partition - a topic can be configured with multiple partitions so that its messages are stored separately.
Within a single partition, messages are stored in order.
Messages produced to a topic are distributed across its partitions for storage; which partition a given message is assigned to is configurable.
Messages are ordered within each partition, but ordering is not guaranteed across the topic as a whole. The data of a partition is stored in log files, and each record in a partition is assigned a sequential id called the offset, which uniquely identifies that record within the partition.
Kafka retains all data, even records that have already been consumed. Data is deleted only when it expires, and the retention period is configurable.
Consuming messages
A consumer reads messages by their offset within a partition, much like indexing into an array, so any record can be read again at any time.
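
As a concrete illustration of reading by offset, here is a minimal sketch using the Java consumer client. It assumes a hypothetical topic named "test" with partition 0 and a broker on localhost:9092 (not part of the original post); it manually assigns the partition and seeks back to offset 0 to re-read records that may already have been consumed.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetSeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("test", 0);   // hypothetical topic/partition
        consumer.assign(Arrays.asList(tp));                  // manual assignment, no consumer group rebalance
        consumer.seek(tp, 0L);                               // jump back to the beginning of the partition

        // Records come back in offset order within the partition
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
        consumer.close();
    }
}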

Distribution

The partitions of the log are distributed over the servers (brokers) in the Kafka cluster, with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
Each partition has one server that acts as the "leader" and zero or more servers that act as "followers". The leader handles all read and write requests for the partition, while the followers passively replicate the leader. If the leader fails, one of the followers automatically becomes the new leader. Each server acts as the leader for some of its partitions and as a follower for the others, so a broker is typically both a leader and a follower at the same time.

Producers

Producers publish data to the topics of their choice. The producer is responsible for choosing which partition within the topic each record is assigned to. This can be done in a simple round-robin fashion to balance load, or according to some semantic partitioning function (for example, based on a key in the record).
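
To make the partitioning choice concrete, the sketch below uses the Java producer against a hypothetical topic "my-topic" and an assumed broker on localhost:9092. It sends a keyed record (the default partitioner hashes the key to a fixed partition), a keyless record (spread across partitions), and a record with an explicit partition number.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitioningDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Keyed record: the default partitioner hashes "user-42", so every record with
        // this key lands in the same partition (per-key ordering).
        producer.send(new ProducerRecord<String, String>("my-topic", "user-42", "login"));

        // Record without a key: the producer spreads such records across the partitions.
        producer.send(new ProducerRecord<String, String>("my-topic", "no particular key"));

        // Explicit partition: write directly to partition 0 of the topic.
        producer.send(new ProducerRecord<String, String>("my-topic", 0, "user-42", "logout"));

        producer.close();
    }
}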

Consumers

Each consumer can specify a consumer group; if none is specified, it belongs to the default group. The consumer group is the basic unit of topic subscription. Within a group, each partition of the subscribed topics is consumed by exactly one consumer, so if a group has more consumers than there are partitions, some consumers will never receive any messages; the number of consumers therefore should not exceed the number of partitions. If a consumer fails, the partitions it was responsible for are reassigned to the remaining consumers.
(Figure: consumer groups)
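
A minimal sketch of how this partition assignment can be observed from the client, assuming a hypothetical topic "foo", group id "demo-group" and a broker on localhost:9092: subscribing with a ConsumerRebalanceListener prints which partitions this instance owns, and starting a second process with the same group.id should cause the partitions to be split between the two.

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class GroupAssignmentDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "demo-group");                 // all instances share this group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo"), new ConsumerRebalanceListener() {
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after a rebalance: shows which partitions this instance now owns
                System.out.println("assigned: " + partitions);
            }
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("revoked: " + partitions);
            }
        });

        while (true) {
            consumer.poll(1000);   // polling drives the group's rebalance protocol
        }
    }
}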

Kafka as a Messaging System

Kafka combines the queue model and the publish-subscribe model: consumers read data sequentially as from a queue, and newly produced records are delivered to every subscribing consumer group.

Kafka as a Storage System

Kafka stores all of its data on local disk. Each partition is a directory (topic-xxx) under the log.dirs path specified in the configuration file. A partition consists of multiple segments, and each segment consists of an index file (.index) and a data file (.log).
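
A quick way to see this layout is simply to list the segment files of one partition. The sketch below uses java.nio and assumes log.dirs=/tmp/kafka-logs with a hypothetical partition directory test-0 (adjust the path to your own setup).

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SegmentListing {
    public static void main(String[] args) throws IOException {
        // Partition directory: <log.dirs>/<topic>-<partition>, e.g. /tmp/kafka-logs/test-0 (assumed path)
        Path partitionDir = Paths.get("/tmp/kafka-logs/test-0");
        try (DirectoryStream<Path> segments = Files.newDirectoryStream(partitionDir, "*.{log,index}")) {
            for (Path segment : segments) {
                // Each segment contributes a .log (data) file and a .index (offset index) file,
                // both named after the base offset of the segment, e.g. 00000000000000000000.log
                System.out.println(segment.getFileName() + "  " + Files.size(segment) + " bytes");
            }
        }
    }
}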

Configuration Files Explained

server.properties

Configuration file:

############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
# Globally unique id of this broker; must not be duplicated across the cluster
broker.id=1
# Switch to enable topic deletion or not, default value is false
# If this is not set to true, deleting a topic from a client only marks it for deletion instead of actually removing it.
# In production this is usually left as false: with many applications sharing the cluster, deleting a topic carries real risk.
delete.topic.enable=true
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Host of the broker
host.name=172.17.0.3
port=9092
# IP address and port advertised to producers and consumers (there is a pitfall here; see the note below)
advertised.host.name=10.0.0.100
advertised.port=9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads handling network requests
# Number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
# Number of threads handling disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
# Maximum size of a request the socket server will accept (protects against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
# Directory (or directories) where the message data is stored
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# Default number of partitions per topic on this broker
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
# Maximum time a log segment is retained before it is deleted
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# Maximum size of each log segment file
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# Interval at which log segments are checked against the retention policy
log.retention.check.interval.ms=300000
# Whether the log cleaner is enabled
log.cleaner.enable=true
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# ZooKeeper connection string
zookeeper.connect=zk1:2181
# Timeout in ms for connecting to zookeeper
# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=6000

Reference article:
According to the official documentation, advertised.host.name and advertised.port define the host and port that the cluster advertises to producers and consumers; if they are not set, the values of host.name and port are used instead. In practice, however, I found that when advertised.host.name is not set and a Java client connects to the cluster from a remote machine, the connection times out with the exception: org.apache.kafka.common.errors.TimeoutException: Batch Expired
Debugging showed that the initial connection to the cluster actually succeeded, but the cluster metadata fetched after connecting was wrong.
In that metadata, the hostname of the nodes in the Cluster info is a string like iZ25wuzqk91Z rather than the actual IP addresses 10.0.0.100 and 101. iZ25wuzqk91Z is in fact the hostname of the remote machine, which means that when advertised.host.name is not configured, Kafka does not fall back to advertising the configured host.name as the official documentation claims, but instead advertises the machine's own hostname. The remote client has no matching hosts entry, so naturally it cannot connect to that hostname. The fix is to set both host.name and advertised.host.name to the absolute IP address.
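
One way to verify what the cluster actually advertises is sketched below (not from the original post): it uses the Java producer against a hypothetical topic "test" and the IP 10.0.0.100 used above, fetches the partition metadata, and prints the leader's host. If this prints the broker's internal hostname rather than the expected IP, the remote client will fail exactly as described.

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class AdvertisedHostCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.0.100:9092");   // the address you expect the broker to advertise
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // partitionsFor() triggers a metadata fetch; the returned leader host is whatever the
        // broker advertised (advertised.host.name / advertised.listeners), not the address we dialed.
        List<PartitionInfo> partitions = producer.partitionsFor("test");   // hypothetical topic
        for (PartitionInfo p : partitions) {
            System.out.println("partition " + p.partition() + " leader: "
                    + p.leader().host() + ":" + p.leader().port());
        }
        producer.close();
    }
}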

producer.properties

############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
# List of Kafka brokers used for bootstrapping; it need not list every broker, since the full cluster metadata can be obtained from any one of them
bootstrap.servers=kfk1:9092,kfk2:9092,kfk3:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
# Compression codec for all generated data: none, gzip, snappy or lz4
# The compression type is recorded in the message header, so decompression on the consumer side is transparent and needs no configuration
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
# Partitioner class; the default hashes the message key to a partition
partitioner.class=kafka.producer.DefaultPartitioner
# Whether the producer requires an acknowledgement from the broker for each message
# 0: the producer does not wait for an ack
# 1: an ack is sent once the leader has written the message
# -1: an ack is sent once all in-sync replicas have the message
#request.required.acks=0
# the maximum amount of time the client will wait for the response of a request
request.timeout.ms=10000
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# Send messages synchronously or asynchronously; the default is "sync", use "async" for asynchronous sends
# Async mode improves throughput: messages are buffered locally and sent in batches, but messages may be lost if the producer dies
producer.type=sync
# In async mode, buffered messages are sent once they have been held longer than this
queue.buffering.max.ms=5000
# In async mode, the maximum number of messages held in the buffer; once this threshold is reached the producer either blocks or drops subsequent messages
queue.buffering.max.messages=10000
# In async mode, the number of messages sent per batch; the default is 200
batch.num.messages=500
# In async mode, what to do when the buffer is full and messages still cannot be sent:
# -1 : the producer blocks indefinitely and no messages are dropped
# 0 : the buffer is cleared immediately and the messages are dropped
queue.enqueue.timeout.ms=-1
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
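
Several of the keys above (producer.type, queue.buffering.*, batch.num.messages) belong to the old Scala producer; with the new Java producer the same knobs are expressed through acks, compression.type, linger.ms, batch.size and buffer.memory. A minimal sketch of setting them programmatically follows, with illustrative values rather than recommendations; the broker list is taken from the producer.properties above.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class TunedProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kfk1:9092,kfk2:9092,kfk3:9092"); // broker list from producer.properties
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("acks", "all");                 // wait for all in-sync replicas (analogue of request.required.acks=-1)
        props.put("compression.type", "snappy");  // compress batches on the producer side
        props.put("linger.ms", "5");              // wait up to 5 ms so sends can be batched together
        props.put("batch.size", "16384");         // per-partition batch size in bytes
        props.put("buffer.memory", "33554432");   // total memory for buffering unsent records

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}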

consumer.properties

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=zk1:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000

Common Script Commands

Start a single broker instance

bin/kafka-server-start.sh config/server.properties

Start multiple broker instances

bin/kafka-server-start.sh config/server1.properties
(broker.id must be unique, and when the brokers run on the same machine each config file needs its own listeners=PLAINTEXT://:9093 and log.dir=/tmp/kafka-logs-1 so that they do not clash)
bin/kafka-server-start.sh config/server2.properties

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Delete a topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

List all topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe a topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Produce messages from the console

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Consume messages from the console

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Use Kafka Connect to import/export data

Java API

producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The following three settings are required
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Send 100 records whose key and value are both the loop index
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
        producer.close();
    }
}
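
send() is asynchronous and returns a Future, so failures in the loop above are silently dropped. A variant with a callback (a sketch, not from the original post; it reuses the same assumed broker and topic) makes delivery results visible:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerWithCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            exception.printStackTrace();   // delivery failed after retries
                        } else {
                            System.out.println("stored at partition " + metadata.partition()
                                    + ", offset " + metadata.offset());
                        }
                    }
                });
        producer.close();   // flushes outstanding records before returning
    }
}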

consumer

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        // Commit offsets automatically every second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topics "foo" and "bar" as part of group "test"
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
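
With enable.auto.commit=true, offsets are committed on a timer regardless of whether processing succeeded. A common variation is to disable auto-commit and call commitSync() only after a batch has been processed; the sketch below assumes the same broker, group and topics as the example above.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");   // offsets are committed explicitly below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // process the record here; offsets are only committed after the whole batch
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
            consumer.commitSync();   // commit the offsets returned by the last poll()
        }
    }
}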