RocketMq源码刨析之分布式事务

RocketMq源码刨析

想必大家都比较熟悉RocketMQ,阿里开源消息队列项目。对于队列来说可以直接强势得理解成,处理并非、分布式事务得敌虫。

[源码地址]: https://github.com/apache/rocketmq

RocketMq4.3版本 支持分布式事物

案例入口【org.apache.rocketmq.example.transaction.TransactionProducer】

//实现监听
TransactionListener transactionListener = new TransactionListenerImpl();
//生产者本地初始化
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置线程池
producer.setExecutorService(executorService);
//设置生产者本地事务得回调组件
producer.setTransactionListener(transactionListener);
//开启消息处理
producer.start();

案例入口【org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction】

public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//获取之前注册得TransactionListener本地事务回调组件
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
//验证消息
Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;

//获取发送消息回调结果
switch (sendResult.getSendStatus()) {
case SEND_OK: {//发送成功
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}

//开启了本地事务回调组件才会进行回调处理
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

try {
//根据本地事务执行的结果去发送commit消息或者rollback消息
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

总要节点

  • 获取之前注册得TransactionListener本地事务回调组件:TransactionListener transactionListener = getCheckListener();
  • 验证消息: Validators.checkMessage(msg, this.defaultMQProducer);
  • 发送消息: sendResult = this.send(msg);
  • 获取发送消息回调结果:switch (sendResult.getSendStatus())
  • 如果开启事务transactionListener,执行本地事务:localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
  • 根据本地事务执行的结果去发送commit消息或者rollback消息:this.endTransaction(sendResult, localTransactionState, localException);

本地事务逻辑

案例入口【org.apache.rocketmq.example.transaction.executeLocalTransaction】
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

//这里会执行本地业务逻辑,此处省略...
//返回本地事物的执行结果(UNKNOW、commit、rollback)
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}

Netty线程检查事务状态

案例入口【org.apache.rocketmq.example.transaction.checkLocalTransaction】
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//实现本地事务处理得结果逻辑
//TODO 业务数据
//比如本地业务想表A插入数据,那么此处可以去表A查询数据是否存在,就可以指导本地事务是否成功
//根据本地事务响应得到结果,返回不同得状态。
//本地事物执行成功返回COMMIT_MESSAGE,反之失败返回ROLLBACK_MESSAGE
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}

业务场景源码正在创作..

文章作者: 陈 武
文章链接: http://www.updatecg.xin/2020/08/14/RocketMq/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 我的学习记录
打赏
  • 微信
  • 支付寶

评论