当前位置: 首页 > news >正文

Rabbitmq 搭建使用案例 [附源码]

Rabbitmq 搭建使用案例

文章目录

    • RabbitMQ搭建
      • docker
    • 代码
      • golang
        • 生产者
        • 消费者
    • 可视化
      • 消费进度

RabbitMQ搭建

docker

docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management

代码

golang

生产者
package main

import (
	"flag"
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"strconv"
	"time"
)

func main() {
	var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
	var exchange = flag.String("exchange", "logs", "Exchange name")
	var key = flag.String("key", "log", "Routing key")

	flag.Parse()

	// 连接到RabbitMQ服务器
	conn, err := amqp.Dial(*url)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 声明一个交换机
	err = ch.ExchangeDeclare(
		*exchange, // name: 交换机名称
		"fanout",  // kind: 交换机类型
		true,      // durable: 是否持久化
		false,     // autoDelete: 没有队列绑定时是否自动删除
		false,     // internal: 是否是内部交换机
		false,     // noWait: 是否需要等待服务器响应
		nil,       // args: 其他参数
	)
	if err != nil {
		log.Fatalf("Failed to declare an exchange: %v", err)
	}

	// 发送消息
	body := "Hello World!" + fmt.Sprintf(time.Now().String())

	for i := 0; i < 20; i++ {
		body = strconv.Itoa(i) + body
		err = ch.Publish(
			*exchange, // 交换机名称
			*key,      // 路由键
			false,     // 强制发布
			false,     // 立即发布
			amqp.Publishing{
				ContentType:  "text/plain",
				DeliveryMode: amqp.Persistent,
				Body:         []byte(body),
				Expiration:   "10000", // 3000 3秒
			})
	}

	if err != nil {
		log.Fatalf("Failed to publish a message: %v", err)
	}

	fmt.Printf(" [x] Sent %s", body)
}

消费者
package main

import (
	"flag"
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
	var exchange = flag.String("exchange", "logs", "Exchange name")
	var key = flag.String("key", "log", "Routing key")

	flag.Parse()

	// 连接到RabbitMQ服务器
	conn, err := amqp.Dial(*url)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	// 声明一个交换机
	err = ch.ExchangeDeclare(
		*exchange, // name: 交换机名称
		"fanout",  // kind: 交换机类型
		true,      // durable: 是否持久化
		false,     // autoDelete: 没有队列绑定时是否自动删除
		false,     // internal: 是否是内部交换机
		false,     // noWait: 是否需要等待服务器响应
		nil,       // args: 其他参数
	)
	if err != nil {
		log.Fatalf("Failed to declare an exchange: %v", err)
	}

	// 声明一个队列
	q, err := ch.QueueDeclare(
		"queue01", // 随机生成队列名称
		true,      // 持久化
		false,     // 删除
		false,     // 独占
		false,     // 不等消息
		nil,       // 其他参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定队列到交换机
	err = ch.QueueBind(
		q.Name,    // 队列名称
		*key,      // 路由键
		*exchange, // 交换机名称
		false,     // 现在绑定
		nil,       // 其他参数
	)
	if err != nil {
		log.Fatalf("Failed to bind a queue: %v", err)
	}

	// 接收消息
	msgs, err := ch.Consume(
		q.Name,       // 队列名称
		"consumer01", // 消费者标签
		false,        // 自动ack
		false,        // 不独占
		false,        // 不等消息
		false,        // 不从服务器获取消息
		nil,          // 其他参数
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
	for d := range msgs {
		// 输出接收到的消息
		fmt.Printf(" [x] Received %s\n", d.Body)
		err = ch.Ack(d.DeliveryTag, true)
		if err != nil {
			log.Fatalf("Failed to ack message: %v", err)
		}
	}
}

可视化

看板

http://localhost:15672/

账户密码

admin
admin

在这里插入图片描述

消费进度

http://localhost:15672/#/queues

在这里插入图片描述

相关文章:

  • C语言面试题1-10
  • 油烟净化器清新餐饮生活,助力打造绿色餐饮
  • 【渗透测试】|文件上传
  • 动态规划part03 Day43
  • 网络请求客户端WebClient的使用
  • 【Linux】Socket中的心跳机制(心跳包)
  • C语言数据结构堆排序、向上调整和向下调整的时间复杂度的计算、TopK问题等的介绍
  • redis数据类型之Hash,Bitmaps
  • P3128 [USACO15DEC] Max Flow P题解(树上差分,最近公共祖先,图论)
  • HDR视频相关标准-HDR vivid(二)
  • WordPress外贸网站建设的成功要素与技术点
  • 摩尔线程MTT S4000 AI GPU助力30亿参数大模型训练,性能比肩英伟达同类解决方案
  • Go语言GoFly框架快速新增接口/上手写代码
  • 23种设计模式之一— — — —装饰模式详细介绍与讲解
  • Java final关键字
  • AI在肿瘤学临床决策中的应用:一种多模态方法
  • Web3 知识体系架构图
  • Unix环境高级编程--8-进程控制---8.1-8.2进程标识-8.3fork函数-8.4 vfork函数
  • 【Linux】使用AddressSanitizer分析内存非法使用问题
  • docker不删除容器更改其挂载目录
  • 全球前瞻|王毅赴巴西出席金砖外长会,加拿大迎来“几十年来最重要大选”
  • 广州海关原党委委员、副关长刘小威被开除党籍
  • 中国海警局新闻发言人就菲律宾非法登临铁线礁发表谈话
  • 伊朗南部港口火势蔓延,部分集装箱再次发生爆炸
  • 释新闻|SEVIS是什么?在美留学生遭身份中止意味什么?
  • 韩国检方重启调查金建希操纵股价案