有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

操作场景

本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
说明
以 Java 客户端为例说明,其他语言客户端请参见 SDK 文档

前提条件

操作步骤

步骤1:安装 Java 依赖库

在 Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
</dependencies>

步骤2. 生产消息

import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.client.java.example.AsyncProducerExample; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
?
public class NormalMessageSyncProducer {
private static final Logger log = LoggerFactory.getLogger(NormalMessageSyncProducer.class);
?
private NormalMessageSyncProducer() {
}
?
public static void main(String[] args) throws ClientException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
?
// 添加配置的ak和sk
String accessKey = "yourAccessKey"; //ak
String secretKey = "yourSecretKey"; //sk
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
?
// 填写腾讯云提供的接入地址
String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String topic = "yourNormalTopic";
// 通常在一个客户端内无需创建过多的生产者。
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置主题名,此处的设置非必须,但是推荐设置,以便生产者可以在正式发送消息前,预先抓取消息路由。
.setTopics(topic)
// 如生产者未初始化可能会报 M {@link ClientException} 的错误。
.build();
// 此处定义消息主体。
byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
// Set topic for the current message.
.setTopic(topic)
// 在 topic 下进行的消息二级分类,区别同一个主题内不同的消息。
.setTag(tag)
// 消息键,除消息 ID 外可以区别不同消息的其他途径。
.setKeys("yourMessageKey-1c151062f96e")
.setBody(body)
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
}
// 发送完成后,如无别的需要可以关闭生产者客户端。
producer.close();
}
}

步骤3. 消费消息

腾讯云消息队列 TDMQ RocketMQ 版 5.x 系列支持两种消费模式,分别为 Push Consumer 和 Simple Consumer,以下代码示例以 Push Consumer 为例。
import java.io.IOException; import java.util.Collections; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
?
public class NormalPushConsumer {
private static final Logger log = LoggerFactory.getLogger(NormalPushConsumer.class);
?
private NormalPushConsumer() {
}
?
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
?
// 添加配置的 ak 和 sk
String accessKey = "yourAccessKey"; //ak
String secretKey = "yourSecretKey"; //sk
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
?
// 填写腾讯云提供的接入地址
String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
// 通常在一个客户端内无需创建过多的消费者。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者组名称。
.setConsumerGroup(consumerGroup)
// 设置消费者订阅名称
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// 处理消息并返回消息消费结果。
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
// 生产环境无需阻塞主线程。
Thread.sleep(Long.MAX_VALUE);
// 消费完成后,如无别的需要可以关闭消费者客户端。
pushConsumer.close();
}
}

步骤4. 查看消息详情

发送完成消息后会得到一个消息 ID (messageID),开发者可以在 “消息查询” 页面查询刚刚发送的消息,如下图所示;并且可以查看特定消息的详情和轨迹等信息,详情见 消息查询 章节。
?
?
?


http://www.vxiaotou.com