go cassandra 示例2


当前第2页 返回上一页

以下代码为批量插入数据库(到达预设长度后,立即持久化。当最后长度不满足后,到达一定时间插入数据库),通过通道eventQueue1操作数据

package dbwrite

import (
	"fmt"
	"github.com/gocql/gocql"
	uuid "github.com/satori/go.uuid"
	"time"
	"src/golangConfig"
	"src/cassandra"
)

type CassandraTable struct {
	cassandraInsertCql  string
	srcIp               string
	.....表字段

func NewCassandraTable(cassandraInsertCql string, srcIp string, .....表字段) *CassandraTable {
	var cassandraTable = new(CassandraTable)
	cassandraTable.cassandraInsertCql = cassandraInsertCql
	cassandraTable.srcIp = srcIp
	.....表字段
	return cassandraTable
}

var (
	eventQueue1     chan *CassandraTable
	BatchWriteSize1 int
	Workers1        int
	lingerTime1     time.Duration

	batchProcessor1 = func(batch *gocql.Batch) error {
		yxCassandra.InsertCassandraMany(batch)
		return nil
	}
	errHandler1 = func(err error, batch *gocql.Batch) {
		fmt.Println("some error happens")
	}
)

func setWorkers1() {
	for i := 0; i < Workers1; i++ {
		go func() {
			batch := cassandra.CassandraSessionBatchInit()
			lingerTimer := time.NewTimer(5 * time.Second)
			if !lingerTimer.Stop() {
				<-lingerTimer.C
			}
			defer lingerTimer.Stop()

			for {
				select {
				case cassandraTable := <-eventQueue1:
					ul, _ := uuid.NewV4()
					batch.Query(cassandraTable.cassandraInsertCql, cassandraTable.srcIp, .....表字段, ul.String())
					if batch.Size() != BatchWriteSize1 {
						if batch.Size() == 1 {
							lingerTimer.Reset(lingerTime1)
						}
						break
					}
					if err := batchProcessor1(batch); err != nil {
						errHandler1(err, batch)
					}

					if !lingerTimer.Stop() {
						<-lingerTimer.C
					}

					batch = cassandra.CassandraSessionBatchInit()
				case <-lingerTimer.C:
					if err := batchProcessor1(batch); err != nil {
						errHandler1(err, batch)
					}
					batch = cassandra.CassandraSessionBatchInit()
				}
			}
		}()
	}
}

func PushCassandraElement(e *CassandraTable) {
	eventQueue1 <- e
}

func CassandraInit() {
	eventQueue1 = make(chan *CassandraTable, golangConfig.InitConfigMap["cassandraEventQueueCache"].(int))
	BatchWriteSize1 = golangConfig.InitConfigMap["cassandraBatchWriteSize"].(int)
	Workers1 = golangConfig.InitConfigMap["cassandraWorkers"].(int)
	lingerTime1 = golangConfig.InitConfigMap["cassandraLingerTime"].(time.Duration)
	cassandra.CassandraConnectInit()
	setWorkers1()
}

向通道存入值

cassandraTable:=dbwrite.NewCassandraTable(yxCassandra.CassandraInsertCql, packetInfo.srcIp....)
dbwrite.PushCassandraElement(cassandraTable)

 


本文来自:51CTO博客

感谢作者:mb6018e97449ea1

查看原文:go cassandra 示例2

返回前面的内容

相关阅读 >>

Golang 协程占多大内存

Go设计模式之策略模式浅谈

Golang rune几个字节

Go-array

Golang不能隐式转换吗

Go的切片(进阶版)

一个微服务 demo 将我三年 Go web 开发经验传授给你

微服务实战Go micro v3 系列(二)- helloworld

easydss如何定制Go语言生成csv文件?

Golang中的defer关键字

更多相关阅读请进入《Go》频道 >>




打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...