ActiveMQ中的NetworkConnector(网络连接器)详解 逃离我推掉我的手 2022-06-12 05:08 410阅读 0赞 注:本文以ActiveMQ5.10版本为基础。 我们知道,ActiveMQ中的**TransportConnector**(传输连接器)主要用于配置ActiveMQ服务端和客户端之间的通信方式,**NetworkConnector**(网络连接器)则主要用来配置ActiveMQ服务端与服务端之间的通信。在某些场景,网络拓扑中我们可能会需要大量的生产者和消费者,也就是说我们会有大量的ActiveMQ客户端,这样,我们就需要在网络环境中有多个MQ服务器,服务器之间是可以透传消息的,这是,我们就需要用到NetworkConnector,我们先从简单的分析起,假设我们需要部署两台MQ服务器,如下图: ![10190bfe-30d7-3021-8bbe-9d7882530083.png][] ** 使用NetworkConnector的简单场景** 如上图所示,服务器S1和S2通过NewworkConnector相连,则生产者P1发送消息,消费者C3和C4都可以接收到,而生产者P3发送的消息,消费者C1和C2同样也可以接收到,要使用NewworkConnector的功能,需要在服务器S1的activemq.xml中的broker节点下添加如下配置(注:10.79.11.172:61617为S2的地址): **`<networkConnectors> <networkConnector uri="`** `static:(tcp://10.79.11.172:61617)` **`"/> </networkConnectors>`** 如果只是这样,S1可以将消息发送到S2,即S2对S1来说仿佛就是S1的消费者,但此时通信只是单向的,发送到S2上的的消息是不能发送到S1上的如果要想S1也收到从S2发来的消息,需要在S2的activemq.xml中的broker节点下也添加如下配置(注:10.79.11.171:61617为S1的地址): **`<networkConnectors> <networkConnector uri="`** `static:(tcp://10.79.11.171` `:61617)` **`"/> </networkConnectors>`** 这样,S1和S2就可以双向通信了。这时,我们可以访问S1和S2的控制台页面(S1默认是 [http://10.79.11.171:8161][http_10.79.11.171_8161])中的“Network”来看桥接的结果。如果有多个,可以用逗号隔开,如下: **`<networkConnectors> <networkConnector uri="`** `static:(tcp://10.79.11.171` `:61617,tcp://10.79.11.173:61617,tcp://10.79.11.174:61617)` **`"/> </networkConnectors>`** ** ** 静态连接器有一些常用的属性,如下: **【initialReconnectDelay】** : 默认值1000,每次重连的间隔时间,单位为毫秒,当然得useExponentialBackOff=false才有效。 【**maxReconnectDelay**】:默认值30000,最大重连间隔,单位为毫秒。 【**useExponentialBackOff**】:默认值为true,是否不断增加重连时间间隔。 【**backOffMultiplier**】:默认值为2,使用useExponentialBackOff特性的次数。 例如:uri="static:(tcp://host1:61616,tcp://host2:61616)?maxReconnectDelay=5000&useExponentialBackOff=false" 有人会想,如果只有两三台MQ服务器,我们可以像上面那样简单配置,如果MQ服务器非常多,我们就不太可能在每个MQ服务器的xml文件中去一个个配置其它服务器地址了,所以,我们希望MQ服务器能在自己的网络中发现其他的MQ服务器并自动建立“桥接”,幸运的是,ActiveMQ提供了多播发现(Multicast Discovery)的功能,我们可以在每台MQ服务器的activemq.xml中的broker节点下添加如下配置: **<networkConnectors> <networkConnector uri="**multicast://default **"/> </networkConnectors>** 这样,这些在网络中的MQ服务器,在启动和运行中,将不断的发现其他新的MQ服务器了,并和他们创建连接, **注意,如果新的MQ服务器中没有配置networkConnectors,那么这种连接就是单向的,而不是双向。** ** **当然,如果NetworkConnector的功能就只有如此,那本文也就没有继续下去的必要了,接下来,让我们看看NetworkConnector常用的几个属性配置。个人认为,结合源码看属性,才能理解的更深刻和贴切,因为需要通过好几段话来解释一个功能,还不如直接给读者几行源码(非程序猿除外)来的深刻。 **name**:默认值是bridge,这无需过多说明,可以给每个MQ服务器起个属于自己的名字,看MQ日志也方便。 **dynamicOnly**:默认为false,官方文档是这样说的,默认的时候,也就是该值为false的时候,网络中的持久化订阅者是在启动的时候被激活的,如果改值为true,则对应的持久化订阅者被激活时,才激活一个网络的持久化订阅者,笔者看了下源码,调试过,但不了解该值配置true或false带来的实际上的效果是什么,有兴趣的读者可以去看一下DurableConduitBridge\#setupStaticDestinations中的源码,笔者弄明白后会后续补充的。 **decreaseNetworkConsumerPriority**:默认值为false,如果该值为false,网络上的消费者和本地的消费者有相同的优先级,就是说一个消息发送到本地的消费者和其他的网络消费者有相同的概率。如果该值为true,则其他网络消费者的优先级从-5开始算起,根据网络跳数(network hops)来算优先级,当然是越“远”优先级越低。 **networkTTL**:默认值为1,该值表示消息或订阅可以通过的消息提供者(brokers)的数量。拿上图来说,假设P1向S1发了条消息,另外有个ActiveMQ服务器S3(上图未画出)与S2相互桥接,但不直接与S1桥接,因为该值为1,即使S3上有该队列的消费者或主题的订阅者,但对于S1来说S3这些网络消费者和订阅者是不可见的,所以P1发的这条消息就不能被S3上的消费者给消费,如果想要让S3上的消费者也有机会消费,可以让S3和S1桥接,或者将S1的networkTTL参数配置为2. **messageTTL**:默认为1,从5.9版本开始有效,表示消息能通过的消息提供者数量,该值可以覆盖掉networkTTL中对消息通过数量的限制。 **consumerTTL**:默认为1,从5.9版本开始有效,表示订阅能通过的消息提供者数量,该值可以覆盖掉networkTTL中对订阅通过数量的限制。 **conduitSubscriptions**:默认为true。该值表示多个消费者订阅一个相同的目的地是否在网络上被对待为一个消费者。以上图为例,如果采用默认的分配策略,P1连续发120条消息,结果应该是C1和C2都收到40条,而C3和C4共收到40条(C3和C4平分这40条),为什么不是C1、C2、C3和C4都收到大概30条呢?因为该值为true,C3和C4对于S1来说只是一个消费者,而不是两个。注意,如果你的多个网络提供者中的消费者使用了消息选择器(selector),你又设置了conduitSubscriptions为true,则有可能会导致消息不会被发送到正确的网络消费者从而消息不被消费,因为网络消费者中的消息选择器对生产者的提供者来说是透明的,所以此种情况下,请将conduitSubscriptions设置为false。 **excludedDestinations**:默认为空,用来配置不会在网络中转发的目的地(Destination)。 **dynamicallyIncludedDestinations**:默认为空,用来配置可以在网络中转发的目的地,和上面的excludedDestinations的功能相反。例如: <networkConnector uri="static:(tcp://host)"> <**dynamicallyIncludedDestinations**> <queue physicalName="queue1"/> <topic physicalName="topic1"/> </**dynamicallyIncludedDestinations**> </networkConnector> **staticallyIncludedDestinations**:默认为空,用来配置可以在网络中转发的目的地,但它和dynamicallyIncludedDestinations不同的是dynamicallyIncludedDestinations只在对方有该目的地的消费者时才将消息转发给对方,staticallyIncludedDestinations则不管对方有没有该目的地都将消息转发给对方。 举例配置如下: <networkConnectors> <networkConnector uri="static:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61618)" > <**staticallyIncludedDestinations**> <queue physicalName="TopicTestQueue1"/> </**staticallyIncludedDestinations**> </networkConnector> </networkConnectors> **duplex**:默认值为false,默认情况下,在两个提供者之间的连接上的消息流动方向是单向(单工连接),如果该值设置为true,则一个连接上可以双向流动消息(双工连接)。可以想象,单工连接比双工连接吞吐量要高,但增加了连接数量方面的开销。如果在使用双工连接时为了增加吞吐量,可以建立多个双工连接,但每个连接必须起不同的名字,例如: <networkConnectors> <networkConnector **name="SYSTEM1"** duplex="true" uri="static:(tcp://10.79.6.56:61616)"> <dynamicallyIncludedDestinations> <topic physicalName="outgoing.System1" /> </dynamicallyIncludedDestinations> </networkConnector> <networkConnector **name="SYSTEM2"** duplex="true" uri="static:(tcp://10.79.6.56:61616)"> <dynamicallyIncludedDestinations> <topic physicalName="outgoing.System2"/> </dynamicallyIncludedDestinations> </networkConnector> </networkConnectors> **prefetchSize**:默认是1000,指网络连接中的消费者的预读数量,该值必须大于0,因为网络消费者不能采用poll来获取消息(轮询消息)。 **suppressDuplicateQueueSubscriptions**:(从5.3版本开始有效)默认为false,如果该参数为true,会防止提供者之间相互桥接时的队列重复订阅的问题,其实该参数为true和false只是ActiveMQ代码中的微妙的性能变化,对实际的集群结果并不会带来什么影响。 **bridgeTempDestinations**:默认值为true,该值用来指定在MQ服务器在网络中创建临时目的地(temp destinations)时是否需要广播公告消息(broadcast advisory messages )。如果设置为false,表示禁用此功能,则当生产者和消费者不是连接到同一个MQ服务器时,可以减少网络开销。 **alwaysSyncSend**:(从5.6版本开始有效)默认为false,表示是否总是异步发送。默认时,非持久化消息被发送到远程的MQ服务器时将采用请求/应答方式而不是oneway方式。此参数对待持久化和非持久化消息相同。 **staticBridge**:(从5.6版本开始有效)默认为false,如果设置为true,MQ服务器将不会动态应答新的消费者,也意味着该提供者将对其他远程提供者的主题不感兴趣。当设置为true是,我们可以使用staticallyIncludedDestinations来创建配置需要订阅的主题,例如: <networkConnector uri="static:(tcp://host)" **staticBridge**="true"> <**staticallyIncludedDestinations**> <queue physicalName="always.include.queue"/> </**staticallyIncludedDestinations**> </networkConnector> 消息如果一开始就是持久化的化的或者是被持久订阅的,则在网络经纪人中传播时也会保持这种特性,然而,如果消息一开始就不是持久化的或者不是持久订阅的,则在网络的MQ服务器中传播时也一定不会有这些特性。 总的消息顺序在网络经纪人中是不保证的,当然,即使不是网络经纪人,在单个经纪人环境下,有多个消费者时,消息被消费的顺序也是不保证的。 如果你希望提供者不受任何远程提供者上消费者的影响,或者希望不管远程提供者上是否有消费者都将所有消息转发到远程提供者,那么你可以考虑使用静态网络(static networks)。 就如前面你说看到的,如果你想增加吞吐量,或者想采用不同的通信方式(如tcp或者nio),又或者你想采用更灵活的配置,两个提供者之间可以由多个NetworkConnector相连, 例如,如果使用分布式队列(distributed queues),你可能希望在通过网络对队列接收者能采用等效加权(equivalent weighting),但只针对活跃的接收者,你可以如下配置: <networkConnectors> <networkConnector uri="static:(tcp://localhost:61617)" name="queues\_only" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false"> <excludedDestinations> <topic physicalName=">"/> </excludedDestinations> </networkConnector> </networkConnectors> 这里你需要注意, **你只能在excludedDestinations和dynamicallyIncludedDestinations上使用通配符。 还有,当你使用持久主题订阅者时,请不要去修改NetworkConnector和提供者的(默认)名字,因为网络中的ActiveMQ是通过提供者名字和NetworkConnector名字加上持久订阅者名字来标志一个持久订阅者的。** ** **一般情况下,在采用网络提供者时,为了防止消息在提供者之间重复流动,一个消息是不允许再一次发送到同一个网络提供者的,但这样就有一个问题,如果某些消息发送到一个网络提供者时,该网络提供者刚好因为某种原因重启,导致消息消费失败,这些消息需要重新发送到该网络提供者,这样就矛盾了。有一种解决方案就是强制让客户端采用 **rebalanceClusterClients**方式进行重连( [http://manzhizhen.iteye.com/blog/2105572][http_manzhizhen.iteye.com_blog_2105572] )。再一个,就是让消息回到原来的地方重新发送,因为那个网络提供者已经没有消费者了。有一种目的地策略(destination policy),它通过配置一个replayWhenNoConsumers=true的conditionalNetworkBridgeFilterFactory来提供了这种行为,conditionalNetworkBridgeFilterFactory提供了一个可配置的基于提供者时间(broker-in time)的重发延迟(replayDelay ),配置如下: <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue="TEST.>" **enableAudit="false"**> <networkBridgeFilterFactory> ** <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>** </networkBridgeFilterFactory> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> 主要,在ActiveMQ5.9之前的版本使用replayWhenNoConsumers="true"需要加enableAudit="false",具体原因请看文章后面提供的官网链接,这里不细说。conditionalNetworkBridgeFilterFactory可以对目的地进行比例限制,可以有效的避免低优先级的网络消费者饿死本地的快消费者。 本文对网络连接器的属性介绍的不是很详细,官网的文章似乎也有些前后矛盾表述不清的地方,还是等以后有时间分析分析源代码再来补充吧! [10190bfe-30d7-3021-8bbe-9d7882530083.png]: /images/20220612/ee7ee2844dc54c5d899f1d98857f865d.png [http_10.79.11.171_8161]: http://10.79.11.171:8161/ [http_manzhizhen.iteye.com_blog_2105572]: http://manzhizhen.iteye.com/blog/2105572
还没有评论,来说两句吧...