• 隐藏侧边栏
  • 展开分类目录
  • 关注微信公众号
  • 我的GitHub
  • QQ:1753970025
Chen Jiehua

Zookeeper 

Kafka的运行依赖于zookeeper,那么zookeeper这货究竟是啥……

简介

Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

安装配置

官方下载Zookeeper的二进制包,解压到本地即可直接运行:

$ wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.8.tar.gz
$ tar -zxvf zookeeper-3.4.8.tar.gz
$ cd zookeeper-3.4.8

配置文件在 conf/zoo.cfg,其中部分参数:

# The number of milliseconds of each tick
tickTime=2000
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
  • tickTime:Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
  • dataDir:Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
  • clientPort:客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

然后我们就可以启动zookeeper了:

$ ./bin/zkServer.sh start
$ ./bin/zkServer.sh status
$ ./bin/zkServer.sh stop

集群

上面我们以standalone的形式启动了一台zookeeper服务器,但这样的部署方式并非高可用的。 虽然ZooKeeper是一个针对分布式系统的协调服务,但它本身也是一个分布式应用程序。为了实现高可用性,我们可以考虑以集群的方式进行部署:

ZooKeeper 服务器的集合形成了一个 ZooKeeper 集合体(ensemble)。在任何给定的时间内,一个 ZooKeeper 客户端可连接到一个 ZooKeeper 服务器。每个 ZooKeeper 服务器都可以同时处理大量客户端连接。每个客户端定期发送 ping 到它所连接的 ZooKeeper 服务器,让服务器知道它处于活动和连接状态。被询问的 ZooKeeper 服务器通过 ping 确认进行响应,表示服务器也处于活动状态。如果客户端在指定时间内没有收到服务器的确认,那么客户端会连接到集合体中的另一台服务器,而且客户端会话会被透明地转移到新的 ZooKeeper 服务器。

配置

修改配置文件 conf/zoo.cfg:

# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5

# zookeeper server
server.1=192.168.211.1:2888:3888 
server.2=192.168.211.2:2888:3888
  • initLimit:Zookeeper 服务器集群中 Follower 服务器连接到 Leader 服务器,初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳时间(tickTime)的长度后表明连接失败。总的时间长度就是 10*2000=20 秒;
  • syncLimit:标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒;
  • server.A=B:C:D
    •  A 是一个数字,表示这个是第几号服务器;
    • B 是这个服务器的 ip 地址;
    • C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
    • D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

规模

当客户端请求读取特定 znode 的内容时,读取操作是在客户端所连接的服务器上进行的。因此,由于只涉及集合体中的一个服务器,所以读取是快速和可扩展的。

然而,为了成功完成写入操作,要求 ZooKeeper 集合体的严格意义上的多数节点都是可用的。在启动 ZooKeeper 服务时,集合体中的某个节点被选举为领导者。当客户端发出一个写入请求时,所连接的服务器会将请求传递给领导者。此领导者对集合体的所有节点发出相同的写入请求。如果严格意义上的多数节点(也被称为法定数量(quorum))成功响应该写入请求,那么写入请求被视为已成功完成。然后,一个成功的返回代码会返回给发起写入请求的客户端。

法定数量是通过严格意义上的多数节点来表示的。因此,节点数量应该是奇数的。而对于数据的写入,只有当写入法定数量的节点时,写入操作才是成功的。这意味着,随着在集合体中的节点数量的增加,写入性能会下降,因为必须将写入内容写入到更多的服务器中,并在更多服务器之间进行协调。

因此,在 ZooKeeper 集合体中,三、五或七是最典型的节点数量

数据模型

ZooKeeper 的数据模型类似于Unix文件系统,由 znodes 组成。可以将 znodes(ZooKeeper 数据节点)视为类似 UNIX 的传统系统中的文件,但它们可以有子节点。存储在 znode 中的数据的默认最大大小为 1 MB。

不过,即使 ZooKeeper 的层次结构看起来与文件系统相似,也不应该将它用作一个通用的文件系统。相反,应该只将它用作少量数据的存储机制,以便为分布式应用程序提供可靠性、可用性和协调。

zookeeper node通过 zkCli.sh 我们可以查看zookeeper中存储的数据:

$ ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
	connect host:port
	get path [watch]
	ls path [watch]
	set path data [version]
	rmr path
	delquota [-n|-b] path
	quit 
	printwatches on|off
	create [-s] [-e] path data acl
	stat path [watch]
	close 
	ls2 path [watch]
	history 
	listquota path
	setAcl path acl
	getAcl path
	sync path
	redo cmdno
	addauth scheme auth
	delete path [version]
	setquota -n|-b val path
[zk: localhost:2181(CONNECTED) 1] ls /
[test, zookeeper]

[zk: localhost:2181(CONNECTED) 2] get /zookeeper/quota

cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0

[zk: localhost:2181(CONNECTED) 3] rmr /test

Zk in Kafka

说了这么多,我们现在来看看zookeeper在kakfa中的作用。我们知道Kafka的运行必须依赖于zookeeper,对于kafka的配置 config/server.properties:

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

delete.topic.enable=true

其中,最关键的参数 zookeeper.connect,这里我们也可以指定某一个znode,比如:

zookeeper.connect=localhost:2181/kafka_test

那么所有相关的数据都是存储到 /kafka_test 这个 znode 下。

zookeeper kafka

KafkaOffsetMonitor

对于kafka中的topics、consumer groups、offset等数据,为了方便查看,我们可以通过KafkaOffsetMonitor进行查看,具体详见github

首先,下载kom的jar包,然后启动:

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --offsetStorage kafka
     --zk zk-server1,zk-server2 \
     --port 8080 \
     --refresh 10.seconds \
     --retain 2.days

Pykafka

参考官方文档,初始化 KafkaClient 时指定 zookeeper_hosts,然后就可以创建 balanced_consumers:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykafka


def main():
    client = pykafka.client.KafkaClient(zookeeper_hosts="localhost:2181/kafka_test")
    print client.topics
    topic = client.topics["test"]
    consumer = topic.get_balanced_consumer(consumer_group="spark")
    for msg in consumer:
        print msg, msg.offset, msg.value
        break


if __name__ == "__main__":
    main()
码字很辛苦,转载请注明来自ChenJiehua《Zookeeper》

评论