Redis - 如何通过 Sentinel (哨兵) 进行主从切换
Redis Sentinel
Redis Sentinel 是用于监控Redis集群中Master状态的工具 , Sentinel 只在 Server 端做主从切换 , 客户端需要额外的开发 , 所以我们的介绍 , 会分为Redis服务端 , 和 客户端两部分.
Sentinel的作用 :
1.Master的状态检测
2.如果Master出现异常 , 则会进行Master-Slave切换, 将其中一个Slave作为Master , 而将之前的Master实例切换为Slave .
3.Master-Slave切换后 , master_redis.conf , slave_redis.conf 和 sentinel.conf的内容都会发生改变 , 即 master_redis.conf中会多出一行slaveof的配置 , sentinel.conf的监控目标会随着变更
服务端工作方式:
Sentinel 对于不可用有两种定义
主观不可用(SDOWN) : SDOWN 是单个Sentinel实例检测到redis实例的状态 , 自己主观上判断为不可用
客观不可用(ODOWN) : ODOWN 需要Sentinel集群内一定数量(配置文件)的实例达成一致 , 才会认为Master节点不可用 , 然后执行failover策略 .
1.每个Sentinel实例以每秒一次的频率向自己所监控的Master,Slave以及其他Sentinel实例发送一个PING命令
2.如果一个实例距离最后一次有效回复PING命令的时间超过 down-after-millisenconds配置所指定的值 , 则这个实例会被Sentinel标记为主观不可用(SDOWN) .
3.如果一个Master实例被标记为主观不可用 , 则Sentinel 集群会进行投票 , 通过SENTINEL is_master_down_by_addr 命令 来获得其他Sentinel对Master的检测结果 , 如果超过指定数量的Sentinel认为该实例不可用 , 则Master会被标记为客观不可用(ODOWN) .
4.从SDOWN切换到ODOWN状态 , 不需要使用一致性算法 , 只使用gossip协议.
5.ODOWN状态只适用于Master节点 , Slave节点和Sentinel节点不会存在ODOWN状态 , 也不需要进行投票
客户端工作方式:
对于客户端的工作方式 , 我们以分析Java版本客户端 Jedis的源码为主。
构造器 :
public JedisSentinelPool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig,
final int connectionTimeout, final int soTimeout,
final String password,
final int database,
final String clientName) {
this.poolConfig = poolConfig;
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
this.database = database;
this.clientName = clientName;
//初始化 Sentinels
HostAndPort master = initSentinels(sentinels, masterName);
// 初始化连接池
initPool(master);
}
初始化方法 (initSentinels) :
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
HostAndPort master = null;
boolean sentinelAvailable = false;
log.info("Trying to find master from available Sentinels...");
// 遍历Sentinel集群
for (String sentinel : sentinels) {
// 解析 Sentinel 地址
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
log.fine("Connecting to Sentinel " + hap);
Jedis jedis = null;
try {
// 创建Sentinel 连接
jedis = new Jedis(hap.getHost(), hap.getPort());
// 根据MasterName获取Master地址 , 返回一个集合 , 下标 0 是地址 , 下标 1 是端口
List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
// connected to sentinel...
sentinelAvailable = true;
if (masterAddr == null || masterAddr.size() != 2) {
log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + ".");
continue;
}
// 实例化地址
master = toHostAndPort(masterAddr);
log.fine("Found Redis master at " + master);
// 如果在任何一个Sentinel中找到了master , 跳出循环
break;
} catch (JedisException e) {
// resolves #1036, it should handle JedisException there's another chance
// of raising JedisDataException
log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one.");
} finally {
if (jedis != null) {
jedis.close();
}
}
}
if (master == null) {
if (sentinelAvailable) {
// can connect to sentinel, but master name seems to not
// monitored
throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored...");
} else {
throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running...");
}
}
log.info("Redis master running at " + master + ", starting Sentinel listeners...");
// 遍历Sentinel集群地址 , 针对每一个实例 , 启动一个监听器
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}
return master;
}
在initSentinels方法中 , 遍历Sentinel集群 , 并通过与Jedis绑定的客户端 , 发送一个 get-master-addr-by-name命令 ,来询问master节点的地址 。 直到找到master节点的地址 , 或者确认不存在指定名字的master节点。
在这段代码的最后 , 我们可以看到针对每一个Sentinel实例 都启动了一个监听器 , 我们来分析一下 MasterListener做了什么。
MasterListener
protected class MasterListener extends Thread { protected String masterName; protected String host; protected int port; protected long subscribeRetryWaitTimeMillis = 5000; // 因监听器可能被多个线程访问 , 所以jedis对象被修饰为**可见的** , // 即一个线程修改了Jedis实例 , 其他的线程也可以得到最新的实例 protected volatile Jedis j; protected AtomicBoolean running = new AtomicBoolean(false); protected MasterListener() { } public MasterListener(String masterName, String host, int port) { super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port)); this.masterName = masterName; this.host = host; this.port = port; } public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) { this(masterName, host, port); this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; } public void run() { running.set(true); while (running.get()) { j = new Jedis(host, port); try { // double check that it is not being shutdown if (!running.get()) { break; } // 订阅 channelName 为 "+switch-master" 的消息 j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.fine("Sentinel " + host + ":" + port + " published: " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 3) { if (masterName.equals(switchMasterMsg[0])) { // 初始连接池 initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); } else { log.fine("Ignoring message on +switch-master for master name "+ switchMasterMsg[0] + ", our master name is " + masterName); } } else { log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +switch-master: " + message); } } //channelName }, "+switch-master"); } catch (JedisConnectionException e) { if (running.get()) { log.log(Level.SEVERE, "Lost connection to Sentinel at " + host + ":" + port + ". Sleeping 5000ms and retrying.",e); try { Thread.sleep(subscribeRetryWaitTimeMillis); } catch (InterruptedException e1) { log.log(Level.SEVERE, "Sleep interrupted: ", e1); } } else { log.fine("Unsubscribing from Sentinel at " + host + ":" + port); } } finally { j.close(); } } }
protected volatile Jedis j; protected AtomicBoolean running = new AtomicBoolean(false);
从成员变量中我们可以看到 为了保障线程安全 , 代码中使用了volatile(可见性关键字) , 和布尔的原子变量 , 我会在另外的文章里 , 描述他们在使用上的区别 。
在 MasterListener 这个监听器里 , 我们可以看到这里订阅了一个名为 “+switch-master” 的事件 , 当得到这个事件的时候,调用 initPool 方法 , 用来更新Master节点的地址 , 并且初始化连接池 。
还没有评论,来说两句吧...