Zookeeper 的 Golang 客户端


当前第2页 返回上一页

zk golang 库中有专为digest构造的方法:

zk.DigestACL(perms int32, user, password string)

此方法传入的密码需要是明文,其内部逻辑会将明文转为密文再向 zookeeper 传递。

使用示例:

conn.SetACL("/test", zk.DigestACL(31, "user1", "123456"), 0)

4 ip

ip 权限顾名思义,就是限制 ip 地址的访问权限。

把节点的权限设置给指定的 ip 地址后,其他 ip 将无法访问该节点。

设置指定 ip

func main() {
    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    _, err = conn.SetACL("/ip", ipACL(31, "172.17.0.1"), 0)
    if err != nil {
        panic(err)
    }
    fmt.Println("节点[/ip] 已设置 ip 权限")
}

其他 ip 访问此节点

新建了一个 docker 容器,ip 地址为 172.17.0.5,访问此节点时:

    data, _, err := conn.Get("/ip")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(data))
panic: zk: not authenticated

报认证错误。

使用指定 ip 访问此节点

    data, _, err := conn.Get("/ip")
    if err != nil {
        panic(err)
    }
    fmt.Println("节点[/ip] 存储的内容:", string(data))

    acl, _, err := conn.GetACL("/ip")
    if err != nil {
        panic(err)
    }
    fmt.Println("\nacl 信息:")
    fmt.Println("scheme =", acl[0].Scheme)
    fmt.Println("id =", acl[0].ID)
    fmt.Println("permissions =", acl[0].Perms)

结果:

节点[/ip] 存储的内容:ip

acl 信息:
scheme = ip
id = 172.17.0.1
permissions = 31

三、监听 / watch

watch 用来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。

每个 watch 仅有一次触发的机会,一旦触发会立即失效,想要持续监听,就需要一直注册。

go-zookeeper 中的监听机制是通过事件/ Event 完成的。

1 监听一个节点

1.1 全局监听

将监听器放到Connect函数中,如果有监听事件发生,会一直执行监听器的回调函数。

示例代码:

func callback(e zk.Event) {
    fmt.Println("++++++++++++++++++++++++")
    fmt.Println("path:", e.Path)
    fmt.Println("type:", e.Type.String())
    fmt.Println("state:", e.State.String())
    fmt.Println("------------------------")
}

func main() {
    eventCallbackOption := zk.WithEventCallback(callback)

    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second, eventCallbackOption)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // 注册一个 watch
    exists, state, _, err := conn.ExistsW("/watch")
    if err != nil {
        panic(err)
    }

    if !exists {
        // 创建 /watch 时,触发监听事件,watch 失效
        _, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if err != nil {
            panic(err)
        }

        // 再注册一个 watch
        _, state, _, err = conn.ExistsW("/watch")
        if err != nil {
            panic(err)
        }
    }

    // 删除 /watch 时,触发监听事件,watch 失效
    err = conn.Delete("/watch", state.Version)
    if err != nil {
        panic(err)
    }
}

结果:

++++++++++++++++++++++++
path: 
type: EventSession
state: StateConnecting
------------------------
++++++++++++++++++++++++
path: 
type: EventSession
state: StateConnected
------------------------
2021/04/18 16:57:11 connected to 172.17.0.2:2181
++++++++++++++++++++++++
path: 
type: EventSession
state: StateHasSession
------------------------
2021/04/18 16:57:11 authenticated: id=72057651018596414, timeout=4000
2021/04/18 16:57:11 re-submitting `0` credentials after reconnect
++++++++++++++++++++++++
path: /watch
type: EventNodeCreated
state: Unknown
------------------------
++++++++++++++++++++++++
path: /watch
type: EventNodeDeleted
state: Unknown
------------------------
2021/04/18 16:57:11 recv loop terminated: EOF
2021/04/18 16:57:11 send loop terminated: <nil>
++++++++++++++++++++++++
path: 
type: EventSession
state: StateDisconnected
------------------------

1.2 只监听部分事件

对于不需要设置全局监听器的场景,需要对事件 channel 进行操作,watch 使用一次就失效。

func main() {

    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // 注册一个 watch
    exists, _, eventChannel, err := conn.ExistsW("/watch")
    if err != nil {
        panic(err)
    }

    go func() {
        // 从事件 channel 中取出事件
        e := <-eventChannel
        fmt.Println("++++++++++++++++++++++++")
        fmt.Println("path:", e.Path)
        fmt.Println("type:", e.Type.String())
        fmt.Println("state:", e.State.String())
        fmt.Println("------------------------")
    }()

    if !exists {
        // 创建 /watch 时,触发监听事件,watch 失效
        _, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if err != nil {
            panic(err)
        }
    }
}

2 监听子节点

TODO: 监听子节点时服务端返回的事件类型可能不正确,待完善

四、客户端随机 host

func main() {
    hosts := []string{"172.17.0.2:2181", "172.17.0.3:2181", "172.17.0.4:2181"}
    hostProvider := new(zk.DNSHostProvider)

    err := hostProvider.Init(hosts)
    if err != nil {
        panic(err)
    }

    host, retry := hostProvider.Next() // 获得host
    fmt.Println(host, retry)

    time.Sleep(10 * time.Second) // 做一些事情

    hostProvider.Connected() // 将使用过的 host 放到 host_list 最后
}

zk.Connect函数在执行时,conn内的私有属性hostProvider已经对传的 host 切片做了 conn.hostProvider.Init(srvs) 处理。

但同时,也可以通过WithHostProvider函数替换默认的hostProvider


本文来自:简书

感谢作者:thepoy

查看原文:Zookeeper 的 Golang 客户端

返回前面的内容

相关阅读 >>

Golang判断数据类型和获取数据类型

Golang的zap日志库的简单封装

Golang 如何安装包

tools easily execute sql against structured text like csv or tsv

Go Go.mod详解

Golang调用cmd命令时如何隐藏dos窗口

基于Golang的手机号格式验证和邮箱格式验证

Golang怎么判断map是否为空

聊聊Gost的countwatch

json分级解析及数字解析实践

更多相关阅读请进入《Go》频道 >>




打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...