以下代码实现了向数据库的批量写入:当批次长度达到预设值时立即持久化;若批次长度尚未达到预设值,则在等待超过设定时间后再写入数据库。数据通过通道 eventQueue1 传递给写入协程。
// Package dbwrite batches rows destined for Cassandra. Rows are pushed onto
// eventQueue1; each worker flushes its batch as soon as it holds
// BatchWriteSize1 statements, or once lingerTime1 has elapsed since the first
// statement of a partially filled batch was queued.
package dbwrite

import (
	"fmt"
	"time"

	"github.com/gocql/gocql"
	uuid "github.com/satori/go.uuid"

	"src/cassandra"
	"src/golangConfig"
)

// CassandraTable carries the INSERT statement and the column values of one row.
type CassandraTable struct {
	cassandraInsertCql string
	srcIp              string
	// ... remaining table fields (elided in the original listing) ...
}

// NewCassandraTable returns a CassandraTable populated from its arguments.
func NewCassandraTable(cassandraInsertCql string, srcIp string /* , ...remaining table fields */) *CassandraTable {
	return &CassandraTable{
		cassandraInsertCql: cassandraInsertCql,
		srcIp:              srcIp,
		// ... remaining table fields ...
	}
}

var (
	// eventQueue1 hands rows from PushCassandraElement to the workers.
	eventQueue1 chan *CassandraTable
	// BatchWriteSize1 is the batch size that triggers an immediate flush.
	BatchWriteSize1 int
	// Workers1 is the number of worker goroutines started by setWorkers1.
	Workers1 int
	// lingerTime1 is how long a partially filled batch may wait before it is
	// flushed anyway.
	lingerTime1 time.Duration

	// batchProcessor1 submits a batch through yxCassandra.InsertCassandraMany.
	// NOTE(review): any result of InsertCassandraMany is not inspected, so this
	// always reports success; yxCassandra is also not among the imports visible
	// in this listing - confirm where it comes from.
	batchProcessor1 = func(batch *gocql.Batch) error {
		yxCassandra.InsertCassandraMany(batch)
		return nil
	}

	// errHandler1 reports a failure. The error itself is printed so that the
	// cause is not lost.
	errHandler1 = func(err error, batch *gocql.Batch) {
		fmt.Println("some error happens", err)
	}
)

// setWorkers1 starts Workers1 goroutines that consume eventQueue1.
// The goroutines run for the lifetime of the process; there is no stop signal.
func setWorkers1() {
	for i := 0; i < Workers1; i++ {
		go runWorker1()
	}
}

// runWorker1 accumulates queued rows into a batch and flushes it when the
// batch reaches BatchWriteSize1 or when the linger timer fires.
func runWorker1() {
	batch := cassandra.CassandraSessionBatchInit()

	// Create the timer in a stopped state with an empty channel. It is armed
	// only while the batch holds statements that have not been flushed yet.
	lingerTimer := time.NewTimer(5 * time.Second)
	if !lingerTimer.Stop() {
		<-lingerTimer.C
	}
	defer lingerTimer.Stop()
	timerArmed := false

	flush := func() {
		if err := batchProcessor1(batch); err != nil {
			errHandler1(err, batch)
		}
		batch = cassandra.CassandraSessionBatchInit()
	}

	for {
		select {
		case row := <-eventQueue1:
			ul, err := uuid.NewV4()
			if err != nil {
				// Report the failure and drop the row instead of silently
				// inserting it with a zero-value UUID.
				errHandler1(err, batch)
				continue
			}
			batch.Query(row.cassandraInsertCql, row.srcIp /* , ...remaining table fields */, ul.String())

			if batch.Size() < BatchWriteSize1 {
				// Start the linger countdown with the first statement of a
				// new batch.
				if !timerArmed {
					lingerTimer.Reset(lingerTime1)
					timerArmed = true
				}
				continue
			}

			flush()
			// Only stop/drain a timer that was actually armed. Draining a
			// never-armed timer (e.g. BatchWriteSize1 == 1) would block this
			// worker forever.
			if timerArmed {
				if !lingerTimer.Stop() {
					<-lingerTimer.C
				}
				timerArmed = false
			}

		case <-lingerTimer.C:
			// The timer value has been received, so the channel is empty and
			// the timer may be Reset directly on the next row.
			timerArmed = false
			flush()
		}
	}
}

// PushCassandraElement queues e for batched insertion. It blocks while
// eventQueue1 is full.
func PushCassandraElement(e *CassandraTable) {
	eventQueue1 <- e
}

// CassandraInit reads the batching settings from golangConfig.InitConfigMap,
// initialises the Cassandra connection and starts the workers. It panics if a
// setting is missing or does not have the asserted type.
func CassandraInit() {
	cfg := golangConfig.InitConfigMap
	eventQueue1 = make(chan *CassandraTable, cfg["cassandraEventQueueCache"].(int))
	BatchWriteSize1 = cfg["cassandraBatchWriteSize"].(int)
	Workers1 = cfg["cassandraWorkers"].(int)
	lingerTime1 = cfg["cassandraLingerTime"].(time.Duration)

	cassandra.CassandraConnectInit()
	setWorkers1()
}
向通道存入值
// Build one row and hand it to the batching workers through the queue.
cassandraTable := dbwrite.NewCassandraTable(yxCassandra.CassandraInsertCql, packetInfo.srcIp /* , ...remaining table fields */)
dbwrite.PushCassandraElement(cassandraTable)
本文来自:51CTO博客
感谢作者:mb6018e97449ea1
查看原文:go cassandra 示例2
相关阅读 >>
一个微服务 demo 将我三年 Go web 开发经验传授给你
微服务实战Go micro v3 系列(二)- helloworld
更多相关阅读请进入《Go》频道 >>

Go语言101
一个与时俱进的Go编程知识库。