• 隐藏侧边栏
  • 展开分类目录
  • 关注微信公众号
  • 我的GitHub
  • QQ:1753970025
Chen Jiehua

Kafka 简介 

目录

最近接触到了kafka和zookeeper,顺便把一些知识点记录一下备忘……

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

简介

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展。

为什么我们要使用消息系统呢?

  • 解耦:消息系统在数据处理过程中间插入了一个隐含的接口层,两边的处理过程都要实现这一接口。这允许我们独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  • 扩展:消息队列解耦了处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
  • 易恢复:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  • 异步:消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

Kafka 架构

概念

  • Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker;
  • Producer:负责发布消息到Kafka broker;
  • Consumer:消息消费者,向Kafka broker读取消息的客户端;
  • Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group);
  • Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处);
  • Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition;
  • Segment:Partition物理上由多个segment组成;
  • Offset:每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。Partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息;

拓扑

一个kafka集群一般由以下部分构成:

kafka

 

  • 若干个Producer:产生消息数据,push到broker;
  • 若干个Broker:Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高;
  • 若干个Consumer:从broker pull消息,对消息数据进行处理;
  • 若干个Consumer Group:一般情况下,同一个group的consumer不会重复处理数据;
  • 一个zookeeper集群:Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance;

物理结构

在逻辑上,我们可以认为topic是一个queue,每个producer指定了topic之后就将其产生的消息push到相应的queue。

在物理上,为了使得Kafka的吞吐率可以线性提高,kafka把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

配置文件 server.properties 中主要涉及的参数有:

  • log.dirs=/data/kafka:数据文件存储根目录
  • num.partitions=1:默认的partitions数量

在单机环境(只有一个broker),创建一个名为 test 的 topic,partitions=2:

# 首先下载 kafka
$ tar -xzf kafka_2.11-0.9.0.0.tgz
$ cd kafka_2.11-0.9.0.0
# kafka 的运行需要依赖 zookeeper,下载的包内置了一个 zookeeper,我们可以直接启动
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 kafka
$ ./bin/kafka-server-start.sh config/server.properties
# 创建 topic
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

这样,我们在 /data/kafka 目录下将可以看到:

drwxrwxr-x 2 jachua jachua 4.0K  5月  9 17:24 test-0
drwxrwxr-x 2 jachua jachua 4.0K  5月  9 17:24 test-1

在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录。每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中,segment文件生命周期由服务端配置参数决定。

segment文件

segment文件由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。

segment文件的命名,partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度。

# /data/kafka/test-0
-rw-rw-r-- 1 jachua jachua 10M  5月  9 17:24 00000000000000000000.index
-rw-rw-r-- 1 jachua jachua   0  5月  9 17:24 00000000000000000000.log

kafka-fs-index-correspond-data

索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。

Pykafka

接下来我们可以很容易使用python的第三方库pykafka进行操作,另外也有一个同类的kafka库kafka-python,其对balance consumer的支持比较弱,具体的对比可以参考issue

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
print client.topics
topic = client.topics['test']
with topic.get_sync_producer() as producer:
     for i in range(4):
          producer.produce('test message ' + str(i ** 2))
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
print client.topics
topic = client.topics['test']
consumer = topic.get_simple_consumer()
   for message in consumer:
      if message is not None:
          print message.offset, message.value

参考:

码字很辛苦,转载请注明来自ChenJiehua《Kafka 简介》

评论