基于 MySQL 分布式锁,防止多副本应用初始化数据重复


当前第2页 返回上一页

核心代码:

func NewLockDb(action, holder string, lease time.Duration) *lockDb {
    return &lockDb{
        db:       GetDB(context.Background()),
        stopCh:   make(chan struct{}),
        action:   action,
        holder:   holder,
        leaseAge: lease,
    }
}

func (s *lockDb) Lock() (bool, error) {
    err := s.cleanExpired()
    if err != nil {
        return false, errx.WithStackOnce(err)
    }

    err = s.db.Create(&lock{
        ExpiredAt: time.Now().Add(s.leaseAge),
        Action:    s.action,
        Holder:    s.holder,
    }).Error
    if err != nil {
        // Duplicate entry '<action_val>' for key 'action'
        if strings.Contains(err.Error(), "Duplicate entry") {
            return false, nil
        }
        return false, errx.WithStackOnce(err)
    }

    s.startLease()

    log.Debugf("%s get lock", s.holder)

    return true, nil
}

func (s *lockDb) UnLock() error {
    s.stopLease()
    var err error

    defer func() {
        err = s.db.
            Where("action = ? and holder = ?", s.action, s.holder).
            Delete(&lock{}).
            Error
    }()

    return err
}

func (s *lockDb) cleanExpired() error {
    err := s.db.
        Where("expired_at < ?", time.Now()).
        Delete(&lock{}).
        Error

    return err
}

func (s *lockDb) startLease() {
    go func() {
        // 剩余 1/4 时刷新租约
        ticker := time.NewTicker(s.leaseAge * 3 / 4)
        for {
            select {
            case <-ticker.C:
                err := s.refreshLease()
                if err != nil {
                    log.Errorf("refreash lease err: %s", err)
                } else {
                    log.Debug("lease refreshed")
                }
            case <-s.stopCh:
                log.Debug("lease stopped")
                return
            }
        }
    }()
}

func (s *lockDb) stopLease() {
    close(s.stopCh)
}

func (s *lockDb) refreshLease() error {
    err := s.db.Model(&lock{}).
        Where("action = ? and holder = ?", s.action, s.holder).
        Update("expired_at", time.Now().Add(s.leaseAge)).
        Error

    return err
}

使用及测试:

func TestLock(t *testing.T) {
    i := 3
    wg := &sync.WaitGroup{}
    wg.Add(i)

    for i > 0 {
        holder := strconv.Itoa(i)
        action := "test"

        i--
        go func() {
            defer wg.Done()

            locker := dbcore.NewLockDb(action, holder, 10*time.Second)

            if _, err := locker.Lock(); err != nil {
                t.Logf("not hold the lock, err: %+v", err)
                return
            }

            time.Sleep(30 * time.Second)
            locker.UnLock()
        }()
    }

    wg.Wait()
}

完整代码:https://github.com/win5do/go-...

这个分布式锁实现在初始数据场景是够用了,但并不完美,例如:依赖时间同步,不能容忍时间偏斜;获取锁不是阻塞的,如果要抢锁需要使用方自旋; 锁不可重入,粒度是进程级别,同一个 Action,当前进程获取锁后,释放后才能再次获取锁。

大家可以思考一下如何完善。


本文来自:Segmentfault

感谢作者:.container .card .information strong

查看原文:基于 MySQL 分布式锁,防止多副本应用初始化数据重复

返回前面的内容

相关阅读 >>

分享一个Go语言采坑:闭包共享变量问题

详解Golang编译成dll文件

Golang json乱码解决方法

Golang 怎么做热更新

Go语言函数引用传递值

leetcode232 用栈实现队列 Golang

Go是什么动态语言?

手撸Golang 行为型设计模式 责任链模式

deepin下配置protobuf(含Go语言protoc-gen-Go插件安装)

Golang map 不排序怎么办

更多相关阅读请进入《Go》频道 >>




打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...