有奖捉虫:行业应用 & 管理与支持文档专题 HOT
当消息生产者本地事务处理成功与消息发送成功不一致时,传统的处理方式无法解决该问题,事务消息实现了消息生产者本地事务与消息发送的原子性,保证了消息生产者本地事务处理成功与消息发送成功的最终一致。用户实现类似 X/Open XA 的分布事务功能,通过 CMQ 事务消息能达到分布式事务的最终一致。
说明
该功能目前处于灰度测试阶段,暂时支持广州、上海、孟买地域,其他地域在逐步支持中,如需试用请通过 提交工单 的方式开通白名单。

模块交互图

?
?
其中,事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。
1. 生产者向 MQ 服务端发送消息(为了方便说明,整个 MQ 服务端用 MQ Server 来表示)。
2. MQ Server 将消息持久化成功之后,向生产者 ACK 确认消息已经发送成功,此时消息为半消息(暂不能投递的消息)。
3. 生产者发送消息成功后,开始执行本地事务逻辑。
4. 生产者根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,消费者最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息(直接置消息已消费状态),消费者将不会接受该消息。
5. 在断网或者是生产者应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。生产者根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

发送事务消息

发送事务消息包含以下两个步骤:
1. 发送半消息并执行本地事务
2. 提交本地事务执行状态
其中,提交本地事务执行状态有两种方式:
执行本地事务完成后,SDK 主动提交。
执行本地事务后一直没有提交状态,MQ Server 会主动发送回查,此时 SDK 提交本地事务执行状态。
事务执行状态有以下三种情况:
TransactionStatus.COMMIT 提交事务,消费者可以消费到该消息。
TransactionStatus.ROLLBACK 回滚事务,消息被丢弃,消费者不会消费到该消息。
TransactionStatus.UN_KNOW 无法判断状态,等待 MQ Server 再次发送回查。

发送事务消息示例

package demo;
?
import com.qcloud.cmq.client.common.ClientConfig;
import com.qcloud.cmq.client.common.LogHelper;
import com.qcloud.cmq.client.common.ResponseCode;
import com.qcloud.cmq.client.common.TransactionStatus;
import com.qcloud.cmq.client.consumer.Message;
import com.qcloud.cmq.client.exception.MQClientException;
import com.qcloud.cmq.client.producer.*;
import org.slf4j.Logger;
?
import java.util.ArrayList;
import java.util.List;
?
public class ProducerTransactionDemo {
?
private final static Logger logger = LogHelper.getLog();
?
public static void main(String[] args) {
TransactionProducer producer = new TransactionProducer();
// 设置 Name Server地址,在控制台上获取, 必须设置
producer.setNameServerAddress("http://cmq-nameserver-dev.api.tencentyun.com");
// 设置 SecretId,在控制台上获取,必须设置
producer.setSecretId("AKID******w7D7");
// 设置 SecretKey,在控制台上获取,必须设置
producer.setSecretKey("qV2N******TgiY");
// 设置签名方式,可以不设置,默认为 SHA1
producer.setSignMethod(ClientConfig.SIGN_METHOD_SHA256);
// 设置发送消息失败时,重试的次数,设置为0表示不重试,默认为2
producer.setRetryTimesWhenSendFailed(3);
// 设置请求超时时间, 默认3000ms
producer.setRequestTimeoutMS(5000);
// 设置首次回查的时间间隔,默认5000ms
producer.setFirstCheckInterval(5);
// 消息发往的队列,在控制台创建
String queue = "test_transaction";
// 设置回查的回调函数,检查本地事务的校验结果
producer.setChecker(queue, new TransactionStatusCheckerImpl());
?
?
try{
// 启动对象前必须设置好相关参数
producer.start();
String msg = "test_message";
?
TransactionExecutor executor = new TransactionExecutor() {
@Override
public TransactionStatus execute(String msg, Object arg) {
//执行用户的本地事务 ... ...
System.out.println("do local transaction service!");
return TransactionStatus.COMMIT;
}
};
?
// 同步发送单条事务消息
SendResult result = producer.sendTransactionMessage(queue, msg, executor, "test");
?
if (result.getReturnCode() == ResponseCode.SUCCESS) {
System.out.println("==> send success! msg_id:" + result.getMsgId() + " request_id:" + result.getRequestId());
} else {
System.out.println("==> code:" + result.getReturnCode() + " error:" + result.getErrorMsg());
}
?
}catch (MQClientException e){
e.printStackTrace();
}
?
}
}
?
class TransactionStatusCheckerImpl implements TransactionStatusChecker{
?
@Override
public TransactionStatus checkStatus(Message msg) {
//用户实现,检查本地事务的执行状态 ... ...
return TransactionStatus.COMMIT;
}
}

消费事务消息

消费事务消息与从普通队列或者订阅中消费一致,详情请参考 队列模型消费消息


http://www.vxiaotou.com