聊聊tempo的ExclusiveQueues


本文摘自网络,作者,侵删。

本文主要研究一下tempo的ExclusiveQueues

ExclusiveQueues

tempo/pkg/flushqueues/exclusivequeues.go

type ExclusiveQueues struct {
    queues     []*util.PriorityQueue
    index      *atomic.Int32
    activeKeys sync.Map
}
ExclusiveQueues定义了queues、index、activeKeys属性

New

tempo/pkg/flushqueues/exclusivequeues.go

// New creates a new set of flush queues with a prom gauge to track current depth
func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {
    f := &ExclusiveQueues{
        queues: make([]*util.PriorityQueue, queues),
        index:  atomic.NewInt32(0),
    }

    for j := 0; j < queues; j++ {
        f.queues[j] = util.NewPriorityQueue(metric)
    }

    return f
}
New方法先创建ExclusiveQueues,然后根据指定的queue个数通过util.NewPriorityQueue(metric)创建PriorityQueue

Enqueue

tempo/pkg/flushqueues/exclusivequeues.go

// Enqueue adds the op to the next queue and prevents any other items to be added with this key
func (f *ExclusiveQueues) Enqueue(op util.Op) {
    _, ok := f.activeKeys.Load(op.Key())
    if ok {
        return
    }

    f.activeKeys.Store(op.Key(), struct{}{})
    f.Requeue(op)
}
Enqueue方法先从activeKeys查找指定的key,若已经存在则提前返回,不存在则放入activeKeys中,然后执行f.Requeue(op)

Requeue

tempo/pkg/flushqueues/exclusivequeues.go

// Requeue adds an op that is presumed to already be covered by activeKeys
func (f *ExclusiveQueues) Requeue(op util.Op) {
    flushQueueIndex := int(f.index.Inc()) % len(f.queues)
    f.queues[flushQueueIndex].Enqueue(op)
}
Requeue方法首先通过int(f.index.Inc()) % len(f.queues)计算flushQueueIndex,然后找到对应的queue,执行Enqueue方法

Dequeue

tempo/pkg/flushqueues/exclusivequeues.go

// Dequeue removes the next op from the requested queue.  After dequeueing the calling
//  process either needs to call ClearKey or Requeue
func (f *ExclusiveQueues) Dequeue(q int) util.Op {
    return f.queues[q].Dequeue()
}
Dequeue方法执行f.queues[q]对应queue的Dequeue

Clear

tempo/pkg/flushqueues/exclusivequeues.go

// Clear unblocks the requested op.  This should be called only after a flush has been successful
func (f *ExclusiveQueues) Clear(op util.Op) {
    f.activeKeys.Delete(op.Key())
}
Clear方法将指定key从activeKeys中移除

Stop

tempo/pkg/flushqueues/exclusivequeues.go

// Stop closes all queues
func (f *ExclusiveQueues) Stop() {
    for _, q := range f.queues {
        q.Close()
    }
}
Stop方法遍历f.queues,挨个执行q.Close()

小结

tempo的ExclusiveQueues定义了queues、index、activeKeys属性;它提供了Enqueue、Requeue、Dequeue、Clear、Stop方法。

doc

  • tempo

本文来自:Segmentfault

感谢作者:.container .card .information strong

查看原文:聊聊tempo的ExclusiveQueues

相关阅读 >>

如何处理Golang返回值较多问题

我用Go-zero开发了第一个线上项目

手撸Golang 基本数据结构与算法 图的最短路径  狄克斯特拉算法

Go语言从入门到实战,带你拿下Golang的高效编程法

Golang如何捕获错误

Go - 如何解析 json 数据?

分享一个Go json 踩坑记录

vim--Golang开发配置

我想和你聊聊:freetsdb不只是influxdb集群

无缝连接 dubbo-Go 与 grpc

更多相关阅读请进入《Go》频道 >>




打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...