zk golang 库中有专为digest
构造的方法:
zk.DigestACL(perms int32, user, password string)
此方法传入的密码需要是明文,其内部逻辑会将明文转为密文再向 zookeeper 传递。
使用示例:
conn.SetACL("/test", zk.DigestACL(31, "user1", "123456"), 0)
4 ip
ip 权限顾名思义,就是限制 ip 地址的访问权限。
把节点的权限设置给指定的 ip 地址后,其他 ip 将无法访问该节点。
设置指定 ip
func main() {
conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
if err != nil {
panic(err)
}
defer conn.Close()
_, err = conn.SetACL("/ip", ipACL(31, "172.17.0.1"), 0)
if err != nil {
panic(err)
}
fmt.Println("节点[/ip] 已设置 ip 权限")
}
其他 ip 访问此节点
新建了一个 docker 容器,ip 地址为 172.17.0.5,访问此节点时:
data, _, err := conn.Get("/ip")
if err != nil {
panic(err)
}
fmt.Println(string(data))
panic: zk: not authenticated
报认证错误。
使用指定 ip 访问此节点
data, _, err := conn.Get("/ip")
if err != nil {
panic(err)
}
fmt.Println("节点[/ip] 存储的内容:", string(data))
acl, _, err := conn.GetACL("/ip")
if err != nil {
panic(err)
}
fmt.Println("\nacl 信息:")
fmt.Println("scheme =", acl[0].Scheme)
fmt.Println("id =", acl[0].ID)
fmt.Println("permissions =", acl[0].Perms)
结果:
节点[/ip] 存储的内容:ip
acl 信息:
scheme = ip
id = 172.17.0.1
permissions = 31
三、监听 / watch
watch 用来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。
每个 watch 仅有一次触发的机会,一旦触发会立即失效,想要持续监听,就需要一直注册。
go-zookeeper 中的监听机制是通过事件/ Event 完成的。
1 监听一个节点
1.1 全局监听
将监听器放到Connect
函数中,如果有监听事件发生,会一直执行监听器的回调函数。
示例代码:
func callback(e zk.Event) {
fmt.Println("++++++++++++++++++++++++")
fmt.Println("path:", e.Path)
fmt.Println("type:", e.Type.String())
fmt.Println("state:", e.State.String())
fmt.Println("------------------------")
}
func main() {
eventCallbackOption := zk.WithEventCallback(callback)
conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second, eventCallbackOption)
if err != nil {
panic(err)
}
defer conn.Close()
// 注册一个 watch
exists, state, _, err := conn.ExistsW("/watch")
if err != nil {
panic(err)
}
if !exists {
// 创建 /watch 时,触发监听事件,watch 失效
_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
// 再注册一个 watch
_, state, _, err = conn.ExistsW("/watch")
if err != nil {
panic(err)
}
}
// 删除 /watch 时,触发监听事件,watch 失效
err = conn.Delete("/watch", state.Version)
if err != nil {
panic(err)
}
}
结果:
++++++++++++++++++++++++
path:
type: EventSession
state: StateConnecting
------------------------
++++++++++++++++++++++++
path:
type: EventSession
state: StateConnected
------------------------
2021/04/18 16:57:11 connected to 172.17.0.2:2181
++++++++++++++++++++++++
path:
type: EventSession
state: StateHasSession
------------------------
2021/04/18 16:57:11 authenticated: id=72057651018596414, timeout=4000
2021/04/18 16:57:11 re-submitting `0` credentials after reconnect
++++++++++++++++++++++++
path: /watch
type: EventNodeCreated
state: Unknown
------------------------
++++++++++++++++++++++++
path: /watch
type: EventNodeDeleted
state: Unknown
------------------------
2021/04/18 16:57:11 recv loop terminated: EOF
2021/04/18 16:57:11 send loop terminated: <nil>
++++++++++++++++++++++++
path:
type: EventSession
state: StateDisconnected
------------------------
1.2 只监听部分事件
对于不需要设置全局监听器的场景,需要对事件 channel 进行操作,watch 使用一次就失效。
func main() {
conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
if err != nil {
panic(err)
}
defer conn.Close()
// 注册一个 watch
exists, _, eventChannel, err := conn.ExistsW("/watch")
if err != nil {
panic(err)
}
go func() {
// 从事件 channel 中取出事件
e := <-eventChannel
fmt.Println("++++++++++++++++++++++++")
fmt.Println("path:", e.Path)
fmt.Println("type:", e.Type.String())
fmt.Println("state:", e.State.String())
fmt.Println("------------------------")
}()
if !exists {
// 创建 /watch 时,触发监听事件,watch 失效
_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
}
}
2 监听子节点
TODO: 监听子节点时服务端返回的事件类型可能不正确,待完善
四、客户端随机 host
func main() {
hosts := []string{"172.17.0.2:2181", "172.17.0.3:2181", "172.17.0.4:2181"}
hostProvider := new(zk.DNSHostProvider)
err := hostProvider.Init(hosts)
if err != nil {
panic(err)
}
host, retry := hostProvider.Next() // 获得host
fmt.Println(host, retry)
time.Sleep(10 * time.Second) // 做一些事情
hostProvider.Connected() // 将使用过的 host 放到 host_list 最后
}
zk.Connect
函数在执行时,conn
内的私有属性hostProvider
已经对传的 host 切片做了 conn.hostProvider.Init(srvs)
处理。
但同时,也可以通过WithHostProvider
函数替换默认的hostProvider
。
本文来自:简书
感谢作者:thepoy
查看原文:Zookeeper 的 Golang 客户端
相关阅读 >>
tools easily execute sql against structured text like csv or tsv
更多相关阅读请进入《Go》频道 >>

Go语言101
一个与时俱进的Go编程知识库。