有奖捉虫:行业应用 & 管理与支持文档专题 HOT

操作场景

本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

步骤1:安装 Java 依赖库

在 pom.xml 添加以下依赖:
<!-- in your <dependencies> block -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>

步骤2:生产消息

编译并运行 MessageProducer.java。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tencent.tdmq.demo.cloud.Constant;
?
/**
?
* 消息生产者
*/
public class MessageProducer {
?
/**
?
* 交换机名称
*/
private static final String EXCHANGE_NAME = "exchange_name";
?
public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址 (完整复制控制台接入点地址)
factory.setUri("amqp://***");
// 设置Virtual Hosts (开源 RabbitMQ 控制台复制完整Vhost名称)
factory.setVirtualHost(VHOST_NAME);
// 设置用户名 (开源 RabbitMQ 控制台中Vhost的配置权限中的user名称)
factory.setUsername(USERNAME);
// 设置密码 (对应user的密钥)
factory.setPassword("****");
// 获取连接、建立通道
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
// 绑定消息交换机 (EXCHANGE_NAME必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 0; i < 10; i++) {
String message = "this is rabbitmq message " + i;
// 发布消息到交换机,交换机自动将消息投递到相应队列
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [producer] Sent '" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
参数
说明
EXCHANGE_NAME
Exchange 名称,在控制台 Exchange 列表获取。
factory.setUri
集群接入地址,在集群基本信息页面的客户端接入模块获取。
?
?
?
factory.setVirtualHost
Vhost 名称,在控制台 Vhost 列表获取。
factory.setUsername
用户名称,填写在控制台创建的用户名称。
factory.setPassword
用户密码,填写在控制台创建用户时填写的密码。

步骤3:消费消息

编译并运行 MessageConsumer.java。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.tencent.tdmq.demo.cloud.Constant;
?
import java.io.IOException;
import java.nio.charset.StandardCharsets;
?
/**
?
* 消息消费者
*/
public class MessageConsumer1 {
?
/**
?
* 队列名称
*/
public static final String QUEUE_NAME = "queue_name";
?
/**
?
* 交换机名称
*/
private static final String EXCHANGE_NAME = "exchange_name";
?
public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址 (完整复制控制台接入点地址)
factory.setUri("amqp://***");
// 设置Virtual Hosts (开源 RabbitMQ 控制台中复制完整Vhost名称)
factory.setVirtualHost(VHOST_NAME);
// 设置用户名 (开源 RabbitMQ 控制台中Vhost的配置权限中的user名称)
factory.setUsername(USERNAME);
// 设置密码 (对应user的密钥)
factory.setPassword("****");
// 获取连接
Connection connection = factory.newConnection();
// 建立通道
Channel channel = connection.createChannel();
// 绑定消息交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明队列信息
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定消息交换机 (EXCHANGE_NAME必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println(" [Consumer1] Waiting for messages.");
// 订阅消息
channel.basicConsume(QUEUE_NAME, false, "ConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//接收到的消息,进行业务逻辑处理。
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
?
参数
说明
QUEUE_NAME
Queue 名称,在控制台 Queue 列表获取。
EXCHANGE_NAME
Exchange 名称,在控制台 Exchange 列表获取。
factory.setUri
集群接入地址,在集群基本信息页面的客户端接入模块获取。
?
?
?
factory.setVirtualHost
Vhost 名称,在控制台 Vhost 列表获取。
factory.setUsername
用户名称,填写在控制台创建的用户名称。
factory.setPassword
用户密码,填写在控制台创建用户时填写的密码。
?


http://www.vxiaotou.com