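# Fescar Example Analysis: TM Sending Logic #

素颜马尾好姑娘i · 2022-02-13 02:15

## Introduction ##

This article traces how the Fescar TM sends requests, working from both a sequence diagram and the source code.

Along the way it answers two questions that puzzled me while reading the code (**and will probably puzzle most readers too**): **how TmRpcClient is initialized** and **how the configuration is loaded**.

The article ends with a class diagram of the GlobalAction-related Request classes, to make the dependencies easier to follow.

## Fescar TM Sending Flow ##

![TM Sender.jpg][]

**Notes:**

1. DefaultGlobalTransaction calls DefaultTransactionManager to perform begin/commit/rollback.
2. DefaultTransactionManager calls its internal syncCall() method, which in turn calls TmRpcClient's sendMsgWithResponse() method.
3. TmRpcClient calls sendAsyncRequest() in its ancestor class AbstractRpcRemoting, which puts the message on the send queue.
4. The MergedSendRunnable thread in AbstractRpcRemotingClient drains the send queue, builds a MergedWarpMessage, and sends it via sendRequest().
5. sendRequest() calls writeAndFlush to complete the send.

![TmRcpClient][]

**Notes:**

* The diagram above shows TmRpcClient's class dependencies.
* TmRpcClient extends AbstractRpcRemotingClient.

## Fescar TM Sending Source Analysis ##

    public class DefaultTransactionManager implements TransactionManager {

        private static class SingletonHolder {
            private static final TransactionManager INSTANCE = new DefaultTransactionManager();
        }

        /**
         * Get transaction manager.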
         *
         * @return the transaction manager
         */
        public static TransactionManager get() {
            return SingletonHolder.INSTANCE;
        }

        private DefaultTransactionManager() {
        }

        @Override
        public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
            throws TransactionException {
            GlobalBeginRequest request = new GlobalBeginRequest();
            request.setTransactionName(name);
            request.setTimeout(timeout);
            GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
            return response.getXid();
        }

        @Override
        public GlobalStatus commit(String xid) throws TransactionException {
            long txId = XID.getTransactionId(xid);
            GlobalCommitRequest globalCommit = new GlobalCommitRequest();
            globalCommit.setTransactionId(txId);
            GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
            return response.getGlobalStatus();
        }

        @Override
        public GlobalStatus rollback(String xid) throws TransactionException {
            long txId = XID.getTransactionId(xid);
            GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
            globalRollback.setTransactionId(txId);
            GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
            return response.getGlobalStatus();
        }

        @Override
        public GlobalStatus getStatus(String xid) throws TransactionException {
            long txId = XID.getTransactionId(xid);
            GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
            queryGlobalStatus.setTransactionId(txId);
            GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
            return response.getGlobalStatus();
        }

        private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
            try {
                return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
            } catch (TimeoutException toe) {
                throw new TransactionException(TransactionExceptionCode.IO, toe);
            }
        }
    }

**Notes:**

* DefaultTransactionManager's begin/commit/rollback methods all end up calling syncCall().
* syncCall() executes TmRpcClient.getInstance().sendMsgWithResponse(request), handing the request to TmRpcClient.

TmRpcClient (excerpt):

    public final class TmRpcClient extends AbstractRpcRemotingClient {

        @Override
        public Object sendMsgWithResponse(Object msg) throws TimeoutException {
            return sendMsgWithResponse(msg, NettyClientConfig.getRpcRequestTimeout());
        }

        @Override
        public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout)
            throws TimeoutException {
            return sendAsyncRequestWithResponse(serverAddress, connect(serverAddress), msg, timeout);
        }
    }

**Notes:**

* TmRpcClient's sendMsgWithResponse() delegates to sendAsyncRequestWithResponse().
* sendAsyncRequestWithResponse() is implemented in the ancestor class AbstractRpcRemoting.

AbstractRpcRemoting (excerpt):

    public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

        protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout)
            throws TimeoutException {
            if (timeout <= 0) {
                throw new FrameworkException("timeout should more than 0ms");
            }
            return sendAsyncRequest(address, channel, msg, timeout);
        }

        private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
            throws TimeoutException {
            if (channel == null) {
                LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
                return null;
            }

            // Build the RpcMessage
            final RpcMessage rpcMessage = new RpcMessage();
            rpcMessage.setId(RpcMessage.getNextMessageId());
            rpcMessage.setAsync(false);
            rpcMessage.setHeartbeat(false);
            rpcMessage.setRequest(true);
            rpcMessage.setBody(msg);

            // Wrap it in a MessageFuture to support timeouts
            final MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            messageFuture.setTimeout(timeout);
            futures.put(rpcMessage.getId(), messageFuture);

            // The example code takes this branch
            if (address != null) {
                // Bucket by address: each address has its own queue in the map
                ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
                BlockingQueue<RpcMessage> basket = map.get(address);
                if (basket == null) {
                    map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
                    basket = map.get(address);
                }
                basket.offer(rpcMessage);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("offer message: " + rpcMessage.getBody());
                }
                // The actual send is performed by a separate thread
                if (!isSending) {
                    synchronized (mergeLock) {
                        mergeLock.notifyAll();
                    }
                }
            } else {
                ChannelFuture future;
                channelWriteableCheck(channel, msg);
                future = channel.writeAndFlush(rpcMessage);
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        if (!future.isSuccess()) {
                            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                            if (messageFuture != null) {
                                messageFuture.setResultMessage(future.cause());
                            }
                            destroyChannel(future.channel());
                        }
                    }
                });
            }

            // Wait on the future, bounded by the timeout
            if (timeout > 0) {
                try {
                    return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
                } catch (Exception exx) {
                    LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
                    if (exx instanceof TimeoutException) {
                        throw (TimeoutException)exx;
                    } else {
                        throw new RuntimeException(exx);
                    }
                }
            } else {
                return null;
            }
        }
    }

**Notes:**

* An RpcMessage is built to wrap the Request.
* A MessageFuture is built to wrap the RpcMessage and provide the wait-with-timeout behavior.
* Messages are bucketed by address via basket; the code that actually sends them is MergedSendRunnable in AbstractRpcRemotingClient.
* Sending a Request follows a producer-consumer model; the code above is only the producer side.

AbstractRpcRemotingClient (excerpt):

    public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
        implements RemotingService, RegisterMsgListener, ClientMessageSender {

        public class MergedSendRunnable implements Runnable {

            @Override
            public void run() {
                while (true) {
                    synchronized (mergeLock) {
                        try {
                            mergeLock.wait(MAX_MERGE_SEND_MILLS);
                        } catch (InterruptedException e) {}
                    }
                    isSending = true;
                    for (String address : basketMap.keySet()) {
                        BlockingQueue<RpcMessage> basket = basketMap.get(address);
                        if (basket.isEmpty()) {
                            continue;
                        }
                        MergedWarpMessage mergeMessage = new MergedWarpMessage();
                        while (!basket.isEmpty()) {
                            RpcMessage msg = basket.poll();
                            mergeMessage.msgs.add((AbstractMessage)msg.getBody());
                            mergeMessage.msgIds.add(msg.getId());
                        }
                        if (mergeMessage.msgIds.size() > 1) {
                            printMergeMessageLog(mergeMessage);
                        }
                        Channel sendChannel = connect(address);
                        try {
                            sendRequest(sendChannel, mergeMessage);
                        } catch (FrameworkException e) {
                            if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && address != null) {
                                destroyChannel(address, sendChannel);
                            }
                            LOGGER.error("", "client merge call failed", e);
                        }
                    }
                    isSending = false;
                }
            }
        }
    }

**Notes:**

* MergedSendRunnable consumes the pending messages and assembles them into a MergedWarpMessage.
* sendRequest() wraps the MergedWarpMessage in another RpcMessage and sends it.

AbstractRpcRemoting (excerpt):

    public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

        protected void sendRequest(Channel channel, Object msg) {
            RpcMessage rpcMessage = new RpcMessage();
            rpcMessage.setAsync(true);
            rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
            rpcMessage.setRequest(true);
            rpcMessage.setBody(msg);
            rpcMessage.setId(RpcMessage.getNextMessageId());
            if (msg instanceof MergeMessage) {
                mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
            }
            channelWriteableCheck(channel, msg);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
                    + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?"
                    + channel.isOpen());
            }
            channel.writeAndFlush(rpcMessage);
        }
    }

**Notes:**

* The MergeMessage is wrapped in a new RpcMessage and sent.

## TmRpcClient Initialization ##

    public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean {

        public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
                                        FailureHandler failureHandlerHook) {
            setOrder(ORDER_NUM);
            setProxyTargetClass(true);
            this.applicationId = applicationId;
            this.txServiceGroup = txServiceGroup;
            this.mode = mode;
            this.failureHandlerHook = failureHandlerHook;
        }

        private void initClient() {
            TMClient.init(applicationId, txServiceGroup);
            if ((AT_MODE & mode) > 0) {
                RMClientAT.init(applicationId, txServiceGroup);
            }
        }

        public void afterPropertiesSet() {
            initClient();
        }
    }

**Notes:**

* After the GlobalTransactionScanner constructor has run, Spring invokes afterPropertiesSet(), which calls initClient().
* initClient() calls TMClient.init(applicationId, txServiceGroup) to initialize the TMClient.

TMClient and TmRpcClient (excerpts):

    public class TMClient {

        public static void init(String applicationId, String transactionServiceGroup) {
            TmRpcClient tmRpcClient = TmRpcClient.getInstance(
                    applicationId, transactionServiceGroup);
            tmRpcClient.init();
        }
    }

    public final class TmRpcClient extends AbstractRpcRemotingClient {

        public void init() {
            if (initialized.compareAndSet(false, true)) {
                init(SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS);
            }
        }

        public void init(long healthCheckDelay, long healthCheckPeriod) {
            // Note the initVars() call
            initVars();
            ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(
                MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
            timerExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        reconnect();
                    } catch (Exception ignore) {
                        LOGGER.error(ignore.getMessage());
                    }
                }
            }, healthCheckDelay, healthCheckPeriod, TimeUnit.SECONDS);
        }

        private void initVars() {
            enableDegrade = CONFIG.getBoolean(
                ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX);
            super.init();
        }
    }

**Notes:**

* The key thing to notice is initVars(), which calls super.init().

AbstractRpcRemotingClient and AbstractRpcRemoting (excerpts):

    public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
        implements RemotingService, RegisterMsgListener, ClientMessageSender {

        public void init() {
            NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
            // Core step: build the pool of connections used for sending
            nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
            nettyClientKeyPool.setConfig(getNettyPoolConfig());
            serviceManager = new ServiceManagerStaticConfigImpl();
            super.init();
        }
    }

    public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

        public void init() {
            timerExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());
                    for (MessageFuture future : futures.values()) {
                        if (future.isTimeout()) {
                            timeoutMessageFutures.add(future);
                        }
                    }
                    for (MessageFuture messageFuture : timeoutMessageFutures) {
                        futures.remove(messageFuture.getRequestMessage().getId());
                        messageFuture.setResultMessage(null);
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
                        }
                    }
                    nowMills = System.currentTimeMillis();
                }
            }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
        }
    }

**Notes:**

* The core of AbstractRpcRemotingClient's init() method is building nettyClientKeyPool.
* nettyClientKeyPool is the keyed object pool from which connections to the TC are obtained.

## Configuration Loading ##

![Config][]

    public class FileConfiguration implements Configuration {

        private static final Logger LOGGER = LoggerFactory.getLogger(FileConfiguration.class);

        private static final Config CONFIG = ConfigFactory.load();
    }

    package com.typesafe.config;

    public final class ConfigFactory {

        private ConfigFactory() {
        }

        public static Config load() {
            return load(ConfigParseOptions.defaults());
        }
    }

**Notes:**

* Configuration is loaded with [typesafe.config, a Java configuration management library][JAVA _ typesafe.config].
* By default, ConfigFactory.load() loads the application.conf, application.json, and application.properties files from the classpath.

## Request Class Diagram ##

![GlobalActionRequest.png][]

[TM Sender.jpg]: /images/20220213/9c0c498bb0b949e59d0523b485dad8e4.png
[TmRcpClient]: /images/20220213/122cbd096bdb4ae19e137050b60a37b0.png
[Config]: /images/20220213/b6a4bdbd4f804566bab4f11fc4482b2f.png
[JAVA _ typesafe.config]: https://blog.csdn.net/u010039929/article/details/76215687
[GlobalActionRequest.png]: /images/20220213/d40b4e13117b429197c1eb2814a9ab70.png
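
The producer-consumer split described in the source analysis (callers queue requests per address and wait on a future, while a separate thread merges and sends them) can be sketched in a few lines of plain Java. This is a simplified model under stated assumptions, not Fescar's code: `MergedSendSketch`, `Msg`, `enqueue`, `sendAndWait`, `flushOnce`, `startSender`, `batchSizes`, the address string, and the `ack:` reply are all hypothetical names invented for illustration. `CompletableFuture` stands in for MessageFuture, and the network round trip is replaced by an immediate acknowledgement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified model of the basket/merged-send pattern; not Fescar's code. */
final class MergedSendSketch {

    /** One queued request: an id plus its payload. */
    static final class Msg {
        final long id;
        final String body;
        Msg(long id, String body) { this.id = id; this.body = body; }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<String, BlockingQueue<Msg>> basketMap = new ConcurrentHashMap<>();
    private final Map<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
    private final Object mergeLock = new Object();

    /** Sizes of the batches "sent" so far, recorded only so the merging can be observed. */
    final List<Integer> batchSizes = new CopyOnWriteArrayList<>();

    /** Producer side: register a future, queue the message under its address, wake the sender. */
    CompletableFuture<String> enqueue(String address, String body) {
        Msg msg = new Msg(nextId.incrementAndGet(), body);
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(msg.id, future);
        basketMap.computeIfAbsent(address, a -> new LinkedBlockingQueue<>()).offer(msg);
        synchronized (mergeLock) {
            mergeLock.notifyAll();
        }
        return future;
    }

    /** Blocking call: enqueue, then wait for the reply or fail with a TimeoutException. */
    String sendAndWait(String address, String body, long timeoutMillis) throws Exception {
        return enqueue(address, body).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Consumer side, one pass: drain each basket into a single batch and "send" it. */
    void flushOnce() {
        for (BlockingQueue<Msg> basket : basketMap.values()) {
            List<Msg> batch = new ArrayList<>();
            basket.drainTo(batch);
            if (batch.isEmpty()) {
                continue;
            }
            batchSizes.add(batch.size());
            // Stand-in for the network round trip: acknowledge every message immediately.
            for (Msg msg : batch) {
                CompletableFuture<String> future = futures.remove(msg.id);
                if (future != null) {
                    future.complete("ack:" + msg.body);
                }
            }
        }
    }

    /** Starts a daemon sender thread that wakes on notify or after 1 ms, then flushes. */
    Thread startSender() {
        Thread sender = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(1);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                flushOnce();
            }
        }, "merge-send-sketch");
        sender.setDaemon(true);
        sender.start();
        return sender;
    }

    public static void main(String[] args) throws Exception {
        MergedSendSketch client = new MergedSendSketch();
        client.startSender();
        // The address is a made-up placeholder; nothing is actually connected.
        System.out.println(client.sendAndWait("127.0.0.1:8091", "begin", 1000));
    }
}
```

Calling `enqueue()` twice for the same address and then `flushOnce()` once produces a single batch of two messages, which is the merging effect that MergedSendRunnable achieves with MergedWarpMessage. The timed `wait(1)` mirrors the original's timed wait on mergeLock: a missed notification only delays the flush instead of stalling it.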