RocketMQ 4.7 源码解析之二(NameServer、Broker 启动流程)
文章目录
- broker启动流程
- NameServer启动流程
- main
- createNamesrvController
- NamesrvController构造
- start
- initialize
- start
broker启动流程
broker配置了配置文件地址,以及环境变量配置文件所在目录
vf
NameServer启动流程
main
/**
 * Boots the name server: builds the controller from the command-line arguments,
 * starts it, and announces success on both the logger and standard output.
 *
 * @param args command-line arguments, forwarded to {@code createNamesrvController}
 * @return the started controller; {@code null} is only reachable on the failure
 *         path, after {@code System.exit(-1)} has been requested
 */
public static NamesrvController main0(String[] args) {
    try {
        final NamesrvController namesrvController = createNamesrvController(args);
        start(namesrvController);

        final String bootMessage = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(bootMessage);
        System.out.printf("%s%n", bootMessage);
        return namesrvController;
    } catch (Throwable failure) {
        // Any startup failure is fatal: dump the stack trace and terminate the JVM.
        failure.printStackTrace();
        System.exit(-1);
        return null;
    }
}
createNamesrvController
/**
 * Builds a {@code NamesrvController} from the command line.
 *
 * <p>Steps, in order: publish the remoting version as a system property, parse the
 * command line, create the name-server and Netty configuration objects (listen port
 * preset to 9876), overlay values from the {@code -c} properties file if given,
 * handle {@code -p} (print the configuration and exit), overlay command-line
 * properties onto the name-server configuration, verify that the RocketMQ home is
 * set, configure logback, and finally create the controller.
 *
 * @param args raw command-line arguments
 * @return the newly created (not yet initialized or started) controller; {@code null}
 *         only after {@code System.exit(-1)} was requested because parsing failed
 * @throws IOException    if the {@code -c} configuration file cannot be opened or read
 * @throws JoranException if the logback configuration cannot be applied
 */
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    // Publish the current version number as a system property.
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();

    Options options = ServerUtil.buildCommandlineOptions(new Options());
    // Parse the command line.
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        // Nothing usable was parsed: terminate.
        System.exit(-1);
        return null;
    }

    // Name-server business parameters.
    // NOTE(review): per the original author's note, NamesrvConfig picks up ROCKETMQ_HOME
    // from the environment when first loaded; that is not visible in this method.
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // Network (Netty) parameters.
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // Preset the listen port; the configuration file below may override it.
    nettyServerConfig.setListenPort(9876);

    // "-c <configFile>" names a properties file whose entries are applied to both
    // configuration objects.
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            // Close the stream even when loading or applying the properties fails,
            // so a malformed file does not leak the file handle.
            try {
                properties = new Properties();
                properties.load(in);
                // Copy matching properties onto the configuration objects.
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                // Remember where the configuration was loaded from.
                namesrvConfig.setConfigStorePath(file);
                System.out.printf("load config properties file OK, %s%n", file);
            } finally {
                in.close();
            }
        }
    }

    // "-p": print the currently loaded configuration and exit.
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    // Overlay properties given directly on the command line (name-server config only).
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    // The RocketMQ home directory is mandatory: it locates the logback configuration below.
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // Reset logback and reconfigure it from <rocketmqHome>/conf/logback_namesrv.xml.
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    // Log the effective name-server and Netty configuration.
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    // Register the properties loaded from the configuration file with the controller's
    // configuration (properties stays unset when no -c file was given).
    controller.getConfiguration().registerConfig(properties);

    return controller;
}
NamesrvController构造
/**
 * Creates the controller and its collaborating managers.
 *
 * @param namesrvConfig     name-server business configuration
 * @param nettyServerConfig Netty server (network) configuration
 */
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;
    this.nettyServerConfig = nettyServerConfig;

    kvConfigManager = new KVConfigManager(this);
    // Route information manager (per the original author's note it tracks topics,
    // brokers, clusters and heartbeat data).
    routeInfoManager = new RouteInfoManager();
    brokerHousekeepingService = new BrokerHousekeepingService(this);

    // Register both configuration objects with a single Configuration instance.
    configuration = new Configuration(log, namesrvConfig, nettyServerConfig);
    configuration.setStorePathFromConfig(namesrvConfig, "configStorePath");
}
/**
 * Creates a configuration holder and registers every supplied configuration object.
 *
 * @param log           logger used by this instance
 * @param configObjects configuration objects to register, in order; may be
 *                      {@code null} or empty, in which case nothing is registered
 */
public Configuration(InternalLogger log, Object... configObjects) {
    this.log = log;

    if (configObjects != null) {
        // Register each object in the order given.
        for (int i = 0; i < configObjects.length; i++) {
            registerConfig(configObjects[i]);
        }
    }
}
/**
 * Registers a configuration object: its properties are merged into
 * {@code allConfigs} and the object itself is recorded in {@code configObjectList}.
 * Both steps happen under the write lock.
 *
 * <p>If the calling thread is interrupted while waiting for the lock, nothing is
 * registered; the failure is logged and the thread's interrupt status is restored
 * so that callers can still observe the interruption.
 *
 * @param configObject the configuration object to register
 * @return this instance, for call chaining
 */
public Configuration registerConfig(Object configObject) {
    try {
        readWriteLock.writeLock().lockInterruptibly();

        try {
            // Convert the object to properties.
            Properties registerProps = MixAll.object2Properties(configObject);

            // Merge those properties into the combined configuration.
            merge(registerProps, this.allConfigs);

            // Remember the object itself.
            configObjectList.add(configObject);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("registerConfig lock error", e);
        // Do not swallow the interruption: re-assert the flag for the caller.
        Thread.currentThread().interrupt();
    }
    return this;
}
start
/**
 * Initializes and starts the given controller, installing a JVM shutdown hook that
 * shuts the controller down before the process exits.
 *
 * @param controller the controller to start; must not be {@code null}
 * @return the same controller, once started
 * @throws IllegalArgumentException if {@code controller} is {@code null}
 * @throws Exception                if starting the controller fails
 */
public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (controller == null) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // A failed initialization is fatal: release what was created and exit.
    if (!controller.initialize()) {
        controller.shutdown();
        System.exit(-3);
    }

    // Shut the controller down when the JVM exits so its resources are released promptly.
    final Callable<Void> shutdownTask = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    };
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, shutdownTask));

    controller.start();
    return controller;
}
initialize
/**
 * Prepares the controller for start-up: loads the KV configuration, creates the
 * remoting server and its request executor, registers the request processor,
 * schedules the periodic maintenance tasks and, unless TLS is disabled, installs a
 * file watcher that reloads the server SSL context when certificate files change.
 *
 * @return {@code true}; the code shown has no path that returns {@code false}
 */
public boolean initialize() {
    // Load the KV configuration.
    this.kvConfigManager.load();

    // Create the Netty-based remoting server.
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // Fixed-size thread pool sized by the configured number of server worker threads.
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // Scan for inactive brokers every 10 seconds, first run after 5 seconds.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // Print the KV configuration every 10 minutes, first run after 1 minute.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    // The certificate and its private key are only reloaded once BOTH
                    // have been reported as changed; these flags track the pending half.
                    boolean certChanged;
                    boolean keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        // A trust-certificate change is applied immediately.
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            // Best effort: without the watcher the server still runs, it just cannot
            // pick up new certificates dynamically. Keep the cause in the log.
            log.warn("FileWatchService created error, can't load the certificate dynamically", e);
        }
    }

    return true;
}
start
/**
 * Starts the Netty remoting server: creates the event executor group used by the
 * channel handlers, configures and binds the server bootstrap, records the bound
 * port, starts the event executor when a channel event listener is present, and
 * schedules the periodic scan of the response table.
 *
 * @throws RuntimeException if the thread is interrupted while waiting for the bind
 *                          to complete; the interrupt status is restored first
 */
public void start() {
    // Executor group on which the pipeline handlers below run.
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // Initialize the sharable handlers and the encoder.
    prepareSharableHandlers();

    ServerBootstrap childHandler =
        // Boss group accepts new connections; accepted sockets are then handled by
        // the selector group (reads and writes).
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            // Use the epoll channel when available, NIO otherwise.
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            // Bind to the configured listen port.
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // Handshake handler first, then the codec / idle / connection /
                    // request handlers, all on defaultEventExecutorGroup.
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            // Encoder.
                            encoder,
                            // Decoder (a new instance per channel).
                            new NettyDecoder(),
                            /*
                             * Fires an IdleStateEvent when the connection has been idle for
                             * too long; a ChannelInboundHandler handles it by overriding
                             * userEventTriggered().
                             */
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        // bind() is asynchronous; sync() blocks until the bind has completed.
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        // Record the port that was actually bound.
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        // Preserve the interruption for code further up the stack before converting
        // it to an unchecked exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // Scan the response table every second, starting 3 seconds from now. Failures
    // are logged rather than propagated so one bad scan does not stop the timer task.
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
还没有评论,来说两句吧...