当消息生产者本地事务处理成功与消息发送成功不一致时,传统的处理方式无法解决该问题,事务消息实现了消息生产者本地事务与消息发送的原子性,保证了消息生产者本地事务处理成功与消息发送成功的最终一致。用户实现类似 X/Open XA 的分布事务功能,通过 CMQ 事务消息能达到分布式事务的最终一致。
说明
模块交互图
?
?
其中,事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。1. 生产者向 MQ 服务端发送消息(为了方便说明,整个 MQ 服务端用 MQ Server 来表示)。
2. MQ Server 将消息持久化成功之后,向生产者 ACK 确认消息已经发送成功,此时消息为半消息(暂不能投递的消息)。
3. 生产者发送消息成功后,开始执行本地事务逻辑。
4. 生产者根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,消费者最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息(直接置消息已消费状态),消费者将不会接受该消息。
5. 在断网或者是生产者应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。生产者根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
发送事务消息
发送事务消息包含以下两个步骤:
1. 发送半消息并执行本地事务
2. 提交本地事务执行状态
其中,提交本地事务执行状态有两种方式:
执行本地事务完成后,SDK 主动提交。
执行本地事务后一直没有提交状态,MQ Server 会主动发送回查,此时 SDK 提交本地事务执行状态。
事务执行状态有以下三种情况:
TransactionStatus.COMMIT 提交事务,消费者可以消费到该消息。
TransactionStatus.ROLLBACK 回滚事务,消息被丢弃,消费者不会消费到该消息。
TransactionStatus.UN_KNOW 无法判断状态,等待 MQ Server 再次发送回查。
发送事务消息示例
package demo;?import com.qcloud.cmq.client.common.ClientConfig;import com.qcloud.cmq.client.common.LogHelper;import com.qcloud.cmq.client.common.ResponseCode;import com.qcloud.cmq.client.common.TransactionStatus;import com.qcloud.cmq.client.consumer.Message;import com.qcloud.cmq.client.exception.MQClientException;import com.qcloud.cmq.client.producer.*;import org.slf4j.Logger;?import java.util.ArrayList;import java.util.List;?public class ProducerTransactionDemo {?private final static Logger logger = LogHelper.getLog();?public static void main(String[] args) {TransactionProducer producer = new TransactionProducer();// 设置 Name Server地址,在控制台上获取, 必须设置producer.setNameServerAddress("http://cmq-nameserver-dev.api.tencentyun.com");// 设置 SecretId,在控制台上获取,必须设置producer.setSecretId("AKID******w7D7");// 设置 SecretKey,在控制台上获取,必须设置producer.setSecretKey("qV2N******TgiY");// 设置签名方式,可以不设置,默认为 SHA1producer.setSignMethod(ClientConfig.SIGN_METHOD_SHA256);// 设置发送消息失败时,重试的次数,设置为0表示不重试,默认为2producer.setRetryTimesWhenSendFailed(3);// 设置请求超时时间, 默认3000msproducer.setRequestTimeoutMS(5000);// 设置首次回查的时间间隔,默认5000msproducer.setFirstCheckInterval(5);// 消息发往的队列,在控制台创建String queue = "test_transaction";// 设置回查的回调函数,检查本地事务的校验结果producer.setChecker(queue, new TransactionStatusCheckerImpl());??try{// 启动对象前必须设置好相关参数producer.start();String msg = "test_message";?TransactionExecutor executor = new TransactionExecutor() {@Overridepublic TransactionStatus execute(String msg, Object arg) {//执行用户的本地事务 ... ...System.out.println("do local transaction service!");return TransactionStatus.COMMIT;}};?// 同步发送单条事务消息SendResult result = producer.sendTransactionMessage(queue, msg, executor, "test");?if (result.getReturnCode() == ResponseCode.SUCCESS) {System.out.println("==> send success! msg_id:" + result.getMsgId() + " request_id:" + result.getRequestId());} else {System.out.println("==> code:" + result.getReturnCode() + " error:" + result.getErrorMsg());}?}catch (MQClientException e){e.printStackTrace();}?}}?class TransactionStatusCheckerImpl implements TransactionStatusChecker{?@Overridepublic TransactionStatus checkStatus(Message msg) {//用户实现,检查本地事务的执行状态 ... ...return TransactionStatus.COMMIT;}}