back to index

Kafka

2026/01/05
loading

Kafka是一个分布式的,支持多分区、多副本,基于Zookeeper的分布式消息流平台,同时也是一款开源的基于发布订阅模式的消息引擎系统

基本术语

消息:kafka中是数据单元被称为消息,也被称为记录,可以把它看做数据库表中某一行记录

批次:为了提高效率,消息会分批次写入kafka,批次就代指一组消息

主题:消息的种类称为主题, 可以说一个主题代表了一类信息,相当于是对消息进行了分类,主题就像是数据库中的表

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

偏移量偏移量是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

重平衡:消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

消费者****群组:指的是由一个或多个消费者组成的群体

kafka特性

**高吞吐、**低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。

高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。

持久性、可靠性:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的

容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作

高并发:支持数千个客户端同时读写

使用场景

活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。

传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。

度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

流式处理:流式处理是有一个能够提供多种应用程序的领域。

限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

kafka系统架构

img

一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

Kafka特点

顺序写入

Kafka 将消息持久化到磁盘时,采用的是只追加 (Append-Only) 的模式。这种顺序写入的方式避免了磁盘磁头频繁寻道(随机写入)的巨大开销,

零拷贝

在传统的数据传输中(从磁盘读取并发送到网络),数据需要在内核空间和用户空间之间多次拷贝,消耗大量 CPU 和内存资源。

kafka利用linux的sendfile()系统调用,数据直接从磁盘传输到网卡,完全绕过来用户空间,减少了两次数据拷贝和上下文切换,显著提升了数据传输效率

批处理

无论是生产者发送还是消费者拉取,Kafka 都以“批”为单位进行。

  • 生产者:不会每产生一条消息就立即发送,而是先在本地攒成一个批次(例如,攒够 16KB 或等待 5ms),然后一次性发送。
  • 消费者:也是一次性拉取一批消息,而不是一条一条地获取。

消息压缩

Kafka 支持对批量消息进行压缩(如 Snappy, LZ4, GZIP 等)。由于批量消息的数据量大且有重复性,压缩率非常高(例如 JSON 消息可达 80%)。压缩后的数据在网络上传输和磁盘上存储时,体积大幅减小,进一步提升了网络 I/O 和磁盘 I/O 的效率。

消费者组优势

消费者组解决的是分布式下的分区分配、位移和容错,多进程消费通常需要自己分区,且偏移量需要自己记录,否则重启容易重复消费或丢进度。

消费者组由Kafka在broker侧维护组内有哪些成员,消费者会向broker发送心跳来维护自己是消费者组的一员。组内每个消费者实例拿到一部分分区,但一个分区同一时刻在组内只应被一个消费者处理。成员上下线会rebalance,分区在存活成员间自动重分。偏移量按group.id+topic+partition提交到集群,别的示例接管分区后可以从上次提交继续

Kafka使用

在Go语言中操作Kafka主流的客户端有两个:

  • Sarama:纯Go实现,无第三方依赖
  • confluent-kafka-go:基于c语言 librdkafka,性能高

安装依赖

1
go get github.com/IBM/sarama

通用配置

1
2
3
4
5
6
7
8
9
10
import (
"github.com/IBM/sarama"
)
func newConfig() *sarama.Config {
cfg := sarama.NewConfig()
cfg.Version = sarama.V3_6_0_0
cfg.Producer.Return.Successes = true
cfg.Consumer.Return.Errors = true
return cfg
}
  • Version:Kafka协议版本
  • Producer.Return.Successes:Producer是否需要确认消息发送成功
  • Consumer.Return.Errors:是否需要自动将错误推送到Errors通道

同步生产者

适合需要确认发送结果的场景(如订单消息),SendMessage发送后会阻塞等待Kafka响应,发完一条消息需要等待后才能再发一条。创建主题时建议使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 使用默认分区策略(有 Key 时按 Key 哈希;无 Key 时由分区器决定)。
func runProducer(brokers []string, topic string) error {
cfg := newConfig()
p, err := sarama.NewSyncProducer(brokers, cfg)
if err != nil {
return fmt.Errorf("创建生产者: %w", err)
}
defer func() { _ = p.Close() }()

for i := 0; i < 5; i++ {
key := fmt.Sprintf("user-%d", i%3)
body := fmt.Sprintf("msg-%d-%s", i, time.Now().Format(time.RFC3339Nano))
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.StringEncoder(body),
}
partition, offset, err := p.SendMessage(msg)
if err != nil {
return fmt.Errorf("发送失败: %w", err)
}
}
return nil
}

异步生产者

异步生产者适合海量消息,不阻塞的场景,性能远高于同步生产者,通过Input()投递,结果在Successes()Errors()中获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func runProducerAsync(brokers []string, topic string) error {
cfg := newConfig()
p, err := sarama.NewAsyncProducer(brokers, cfg)
if err != nil {
return fmt.Errorf("创建 AsyncProducer: %w", err)
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for msg := range p.Successes() {
log.Printf("发送成功:",mshg)
}
}()
go func() {
defer wg.Done()
for pe := range p.Errors() {
log.Printf("发送失败 %v", pe.Err)
}
}()
for i := 0; i < 5; i++ {
key := fmt.Sprintf("user-%d", i%3)
body := fmt.Sprintf("async-%d-%s", i, time.Now().Format(time.RFC3339Nano))

p.Input() <- &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.StringEncoder(body),
}
}
// 关闭后内部会刷完队列并关闭 Successes/Errors,wg 的 goroutine 随之退出
if err := p.Close(); err != nil {
return fmt.Errorf("关闭 AsyncProducer: %w", err)
}
wg.Wait()
return nil
}

消费者

Kafka只需写入一次消息,可以支持多个应用读取该消息,每个应用都可以读到全量的消息,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

c.ConsumePartition 启动该 topic partition 的消费者

并返回对应的 PartitionConsumer(pc),用于读取该分区的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func runConsumerAll(brokers []string, topic string) error {
cfg := newConfig()
c, err := sarama.NewConsumer(brokers, cfg)
if err != nil {
return fmt.Errorf("创建消费者: %w", err)
}
defer func() { _ = c.Close() }()

parts, err := c.Partitions(topic)
if err != nil {
return fmt.Errorf("查询分区: %w", err)
}
if len(parts) == 0 {
return fmt.Errorf("topic %q 无分区或不存在", topic)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
// 为每个分区各启一个消费者
for _, p := range parts {
wg.Add(1)
go func(partition int32) {
defer wg.Done()
pc, err := c.ConsumePartition(topic, partition, sarama.OffsetNewest)
if err != nil {
log.Printf("分区 %d 启动失败: %v", partition, err)
return
}
defer func() { _ = pc.Close() }()

for {
select {
// 监听取消信号,当cancel执行或上下文超时退出
case <-ctx.Done():
return
case err := <-pc.Errors():
if err != nil {
log.Printf("分区 %d 错误: %v", partition, err)
}
case msg, ok := <-pc.Messages():
if !ok {
return
}
log.Printf("接收消息:",msg)
}
}
}(p)
}
wg.Wait()
return nil
}

消费者组

消费者组处理器必须实现 sarama.ConsumerGroupHandler 接口,group.Consume启动消费者组消费流程,处理被分配的分区消息,Consume需循环调用,应对rebalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type consumerHandler struct{}

// 初始化:启动时执行一次
func (c consumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }

// 清理:退出时执行一次
func (c consumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// 核心:消费消息
func (c consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// 循环读取消息
for msg := range claim.Messages() {
// 手动提交偏移量(生产环境必须加)
session.MarkMessage(msg, "")
}
return nil
}
// 消费者组
func runConsumerGroup(brokers []string, topic, groupID string) {
config := getConfig()
config.Consumer.Offsets.Initial = sarama.OffsetNewest // 新消费者从最新消息开始消费

group, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
log.Fatalf("创建消费者组失败: %v", err)
}
defer group.Close()

fmt.Println("消费者启动成功,等待消息...")
// 持续消费
handler := consumerHandler{}
for {
err := group.Consume(nil, []string{topic}, handler)
if err != nil {
log.Printf("消费异常: %v", err)
}
}
}
CATALOG
  1. 1. 基本术语
  2. 2. kafka特性
  3. 3. 使用场景
  4. 4. kafka系统架构
  5. 5. Kafka特点
    1. 5.1. 顺序写入
    2. 5.2. 零拷贝
    3. 5.3. 批处理
  6. 6. 消息压缩
  7. 7. 消费者组优势
  8. 8. Kafka使用
    1. 8.1. 安装依赖
    2. 8.2. 通用配置
    3. 8.3. 同步生产者
    4. 8.4. 异步生产者
    5. 8.5. 消费者
    6. 8.6. 消费者组