rabbitMq 消息丢失处理机制 Confirm模式 [go 版本]


本文摘自网络,作者,侵删。

此篇文章本作者理解的一知半解,还不够深刻,有时间我再补充,今天有点忙,欢迎大家评论讲解,3Q!!!!

参考文章:https://studygolang.com/artic...

生产者 producer.go
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

//因:快速实现逻辑,故:不处理错误逻辑
func main() {
    conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost")
    ch, _ := conn.Channel()
    body := "Hello World!! " + time.Now().Format("2006-01-02 15:04:05")
    fmt.Println(body)
    var exchange_name string = "j_exch_head"
    var routing_key string = "jkey"
    var etype string = amqp.ExchangeHeaders

    a := ch.Confirm(false)
    //声明交换器
    ch.ExchangeDeclare(exchange_name, etype, true, false, false, true, nil)

    confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1)) // 处理确认逻辑
    defer confirmOne(confirms) // 处理方法
    ch.Publish(
        exchange_name, // exchange 这里为空则不选择 exchange
        routing_key,   // routing key
        false,         // mandatory
        false,         // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:     amqp.Table{"x-match": "any", "mail": "470047253@qq.com", "author": "Jhonny"}, // 头部信息 any:匹配一个即可 all:全部匹配
            //Expiration:  "3000", // 设置过期时间
        },
    )

    fmt.Println(a)
    // defer 关键字
    defer conn.Close() // 压栈 后进先出
    defer ch.Close()   // 压栈 后进先出
}
// 消息确认
func confirmOne(confirms <-chan amqp.Confirmation) {
    if confirmed := <-confirms; confirmed.Ack {
        fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
    } else {
        fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag)
    }
}
消费者 consumer.go
package main

import (
    "github.com/streadway/amqp"
    "log"
)

func main() {
    conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost")
    ch, _ := conn.Channel()
    var exchange_name string = "j_exch_head"
    var routing_key string = "jkey"
    var queue_name string = "j_queue"
    var etype string = amqp.ExchangeHeaders // 头部交换机
    ch.QueueDeclare(queue_name, true, false, true, false, nil)
    ch.QueueBind(
        queue_name,    // queue name
        routing_key,   // routing key: Headers 头部交换机跟routing_key 没关系
        exchange_name, // exchange
        false,
        amqp.Table{"mail": "470047253@qq.com"}, // 头部信息 any:匹配一个即可 all:全部匹配
    )
    //声明交换器
    ch.ExchangeDeclare(exchange_name, etype, true, false, false, false, nil)
    //监听
    msgs, _ := ch.Consume(
        queue_name, // queue name
        "",         // consumer
        true,       // auto-ack 自动确认
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // args
    )
    forever := make(chan bool)
    go func() {
        for d := range msgs {
            //println("tset")
            log.Printf(" [x] %s", d.Body)
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

本文来自:Segmentfault

感谢作者:forlife

查看原文:rabbitMq 消息丢失处理机制 Confirm模式 [go 版本]

相关阅读 >>

Go2设计草案介绍

Golang最适合做什么

聊聊Golang的ddd项目结构

手撸Golang 行为型设计模式 责任链模式

手撸Golang Go与微服务 grpc

模块二 Go语言进阶技术-错误处理(上)

手撸Golang 结构型设计模式 代理模式

Golang调试工具有哪些?

Golang语言学习之Go语言变量

分享一个Golang http 验证码示例

更多相关阅读请进入《Go》频道 >>




打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...