RocketMq源码刨析
想必大家都比较熟悉RocketMQ,阿里开源消息队列项目。对于队列来说可以直接强势得理解成,处理并非、分布式事务得敌虫。
RocketMq4.3版本 支持分布式事物 案例入口【org.apache.rocketmq.example.transaction.TransactionProducer】 TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name" ); ExecutorService executorService = new ThreadPoolExecutor(2 , 5 , 100 , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000 ), new ThreadFactory() { @Override public Thread newThread (Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread" ); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start();
案例入口【org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction】 public TransactionSendResult sendMessageInTransaction (final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null" , null ); } Validators.checkMessage(msg, this .defaultMQProducer); SendResult sendResult = null ; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true" ); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this .defaultMQProducer.getProducerGroup()); try { sendResult = this .send(msg); } catch (Exception e) { throw new MQClientException("send message Exception" , e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null ; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null ) { msg.putUserProperty("__transactionId__" , sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"" .equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null ) { log.debug("Used new transaction API" ); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}" , localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception" , e); log.info(msg.toString()); localException = e; } } break ; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break ; default : break ; } try { this .endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed" , e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; }
总要节点
获取之前注册得TransactionListener本地事务回调组件:TransactionListener transactionListener = getCheckListener();
验证消息: Validators.checkMessage(msg, this.defaultMQProducer);
发送消息: sendResult = this.send(msg);
获取发送消息回调结果:switch (sendResult.getSendStatus())
如果开启事务transactionListener,执行本地事务:localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
根据本地事务执行的结果去发送commit消息或者rollback消息:this.endTransaction(sendResult, localTransactionState, localException);
本地事务逻辑 案例入口【org.apache.rocketmq.example.transaction.executeLocalTransaction】 @Override public LocalTransactionState executeLocalTransaction (Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3 ; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; }
Netty线程检查事务状态 案例入口【org.apache.rocketmq.example.transaction.checkLocalTransaction】 @Override public LocalTransactionState checkLocalTransaction (MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0 : return LocalTransactionState.UNKNOW; case 1 : return LocalTransactionState.COMMIT_MESSAGE; case 2 : return LocalTransactionState.ROLLBACK_MESSAGE; default : return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; }
业务场景源码正在创作..