
Redis Cluster Multi-Datacenter High Availability: A Client-Based Approach (Part 1)


Original article: https://cachecloud.github.io/2016/11/03/Redis%20Cluster多机房高可用实现/

This article uses Redis Cluster as the example; the same approach applies to Redis Sentinel and Redis Standalone.


I. The Problem

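Because of some of Redis's own characteristics (replication, for example) and the way it is typically used, Redis is poorly suited to being deployed across datacenters, so a Redis cluster is normally deployed inside a single datacenter. Redis Cluster is already highly available on its own: if a few nodes misbehave or go down, it fails over automatically, and the application recovers within a short time. A whole-datacenter failure (an extreme case such as a power or network outage) is different. If the application's degradation or fault-tolerance mechanisms are weak, or the business cannot be degraded at all, such a failure can lose important data or instantly exhaust the application's thread pool and make the service unavailable, which is fatal for critical services. To keep applications serving normally in such extreme cases (the system keeps running and the data stays correct), we need a cross-datacenter Redis solution.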

II. Approach and Goals

1. Approach

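Use CacheCloud to provision two Redis Cluster clusters located in two different datacenters (for example, Zhaowei and Beixian): one, called major, serves as the primary Redis service; the other, called minor, serves as the backup.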

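Develop a customized client that relies on the dependency-isolation feature of Netflix's Hystrix: when major fails, the fault is isolated and requests are automatically forwarded to minor, without affecting the application's main thread pool. (The Hystrix request flow is shown in the figure below; for how to use Hystrix, see http://hot66hot.iteye.com/blog/2155036.)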

[Figure: Hystrix request flow]
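The isolation-plus-failover idea can be reproduced in outline with plain java.util.concurrent, which makes it easier to see what Hystrix does for the client. This is not Hystrix's API: IsolatedFailover and its call method are hypothetical names, and the sketch covers only pool rejection, timeouts and exceptions (no circuit breaker, no metrics).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper, not Hystrix: run the primary call on an isolated pool, fall back on failure. */
final class IsolatedFailover {
    private IsolatedFailover() {}

    static <T> T call(ExecutorService primaryPool, Callable<T> primary,
                      long timeoutMs, Supplier<T> fallback) {
        Future<T> future;
        try {
            // Throws if the isolated pool cannot accept more work (pool saturated).
            future = primaryPool.submit(primary);
        } catch (RejectedExecutionException e) {
            return fallback.get();
        }
        try {
            // The calling thread waits at most timeoutMs for the primary result.
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the slow primary call so its thread is freed
            return fallback.get();
        } catch (ExecutionException e) {
            return fallback.get(); // the primary call threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            return fallback.get();
        }
    }
}
```

For example, IsolatedFailover.call(majorPool, () -> major.get(key), 1000, () -> minor.get(key)) would read from minor whenever major is saturated, slow or failing; majorPool, major and minor stand for any pool and any two clients.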

2. Goals:

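Easy to adopt: the client is used just like the Jedis API.

Genuine cross-datacenter failover.

Dependency isolation: even if Redis has problems, the application's main thread pool is not affected.

Reads work normally.

Writes are kept as consistent as possible.

More configurable failover parameters (via Hystrix), such as the isolation thread pool size and timeouts.

Exposed statistics and reports, such as JMX and hystrix-dashboard.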

III. Implementation

1. Use Hystrix's dependency isolation to put major and minor into separate thread pools, both isolated from the application's main thread pool.
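Thread-pool isolation works because each cluster's pool is bounded and refuses extra work instead of queuing it, so a hanging major cannot tie up callers. A stdlib approximation, assuming a pool with no task queue (Hystrix's own default, since a maxQueueSize of -1 makes it use a SynchronousQueue); IsolatedPools and its methods are hypothetical names:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical factory for one bounded, isolated pool per Redis cluster (major or minor). */
final class IsolatedPools {
    private IsolatedPools() {}

    static ThreadPoolExecutor newIsolatedPool(String name, int threads) {
        // SynchronousQueue stores no tasks: a submission succeeds only if a thread can take it;
        // otherwise AbortPolicy throws RejectedExecutionException instead of blocking the caller.
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                r -> {
                    Thread t = new Thread(r, name);
                    t.setDaemon(true); // isolation threads should not keep the JVM alive
                    return t;
                },
                new ThreadPoolExecutor.AbortPolicy());
    }

    /** Occupies the pool with blocked tasks and counts how many submissions were rejected. */
    static int countRejections(ThreadPoolExecutor pool, int attempts) {
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < attempts; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a Redis call that hangs
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the blocked tasks finish
        return rejected;
    }
}
```

With 2 threads, countRejections(pool, 5) reports 3 rejections: the first two tasks occupy both threads and every further submission fails immediately, which in the client would send the call to the fallback.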

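2. Client interface and initialization: because this is a customized client, there is no generic mechanism for now; every API has to be implemented by hand.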




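    public interface RedisCrossRoomClient {

        // Writes go to both clusters, so set returns the per-cluster results
        // (matching the MultiWriteResult<String> return type of the implementation)
        MultiWriteResult<String> set(String key, String value);

        String get(String key);
    }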


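Initialization takes two already-initialized PipelineCluster instances.

PipelineCluster is our internal extension of JedisCluster; for the purposes of this article you can treat it as JedisCluster.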




    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {

        private Logger logger = LoggerFactory.getLogger(RedisClusterCrossRoomClientImpl.class);

        /**
         * Major (primary) cluster
         */
        private final PipelineCluster majorPipelineCluster;

        /**
         * Minor (backup) cluster
         */
        private final PipelineCluster minorPipelineCluster;

        public RedisClusterCrossRoomClientImpl(PipelineCluster majorPipelineCluster, PipelineCluster minorPipelineCluster) {
            this.majorPipelineCluster = majorPipelineCluster;
            this.minorPipelineCluster = minorPipelineCluster;
        }
    }


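3. Read path: as shown in the figure below, the normal run() path goes to major, and every abnormal case (each path in the Hystrix flow figure above that ends in getFallback) goes to minor.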

[Figure: read path, run() to major, getFallback to minor]

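In other words, reads normally come from majorPipelineCluster; when something abnormal happens (the Hystrix circuit breaker is open, the thread pool rejects the request, the call times out, or it throws an exception), the minorPipelineCluster logic runs instead.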
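Of the four triggers, the open circuit breaker is the one that lets calls skip a major that keeps failing, instead of each call waiting for its own timeout. A simplified sketch of the mechanism: SimpleCircuitBreaker is hypothetical; it borrows the meaning of Hystrix's circuitBreaker.requestVolumeThreshold, errorThresholdPercentage and sleepWindowInMilliseconds settings, but counts calls since the last reset instead of using Hystrix's rolling time window.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified circuit breaker. Unlike Hystrix it counts all calls since the
 * last reset instead of using a rolling time window.
 */
final class SimpleCircuitBreaker {
    private final int requestVolumeThreshold;   // minimum calls before the error rate is judged
    private final int errorThresholdPercentage; // open when errors / calls >= this percentage
    private final long sleepWindowMs;           // how long to stay open before a trial call
    private final LongSupplier clock;           // current time in ms, injectable for tests

    private int requests;
    private int errors;
    private long openedAt = -1;                 // -1 while the circuit is closed

    SimpleCircuitBreaker(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMs, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    /** False means: skip major and go straight to the fallback (minor). */
    synchronized boolean allowRequest() {
        if (openedAt < 0) {
            return true;
        }
        long now = clock.getAsLong();
        if (now - openedAt >= sleepWindowMs) {
            openedAt = now; // half-open: admit one trial call, block others for another window
            return true;
        }
        return false;
    }

    synchronized void onSuccess() {
        if (openedAt >= 0) { // a successful trial call closes the circuit
            openedAt = -1;
            requests = 0;
            errors = 0;
            return;
        }
        requests++;
    }

    synchronized void onFailure() {
        if (openedAt >= 0) { // a failed trial call keeps the circuit open
            openedAt = clock.getAsLong();
            return;
        }
        requests++;
        errors++;
        if (requests >= requestVolumeThreshold
                && errors * 100 >= errorThresholdPercentage * requests) {
            openedAt = clock.getAsLong();
        }
    }
}
```

In the read path, a caller would check allowRequest() before trying major, report onSuccess() or onFailure() afterwards, and go straight to the minor read when allowRequest() returns false.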

Base class:




    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class BaseCommand {

        protected final Logger logger = LoggerFactory.getLogger(this.getClass());

        /**
         * Hystrix parameters for major, e.g. timeout, thread pool,
         * circuit-breaker close and open policy, etc.
         */
        protected static final String MAJOR_READ_COMMAND_KEY = "major_read_command";
        protected static final String MAJOR_WRITE_COMMAND_KEY = "major_write_command";
        protected static final String MAJOR_GROUP_KEY = "major_redis_group";
        protected static final String MAJOR_THREAD_POOL_KEY = "major_redis_pool";
        public static int majorTimeOut = 1000;
        public static int majorThreads = 100;

        /**
         * Hystrix parameters for minor, e.g. timeout, thread pool,
         * circuit-breaker close and open policy, etc.
         */
        protected static final String MINOR_READ_COMMAND_KEY = "minor_read_command";
        protected static final String MINOR_WRITE_COMMAND_KEY = "minor_write_command";
        protected static final String MINOR_GROUP_KEY = "minor_redis_group";
        protected static final String MINOR_THREAD_POOL_KEY = "minor_redis_pool";
        public static int minorTimeOut = 1000;
        public static int minorThreads = 100;
    }


Read command class:




    public abstract class ReadCommand<T> extends BaseCommand {

        protected abstract T readMajor();

        protected abstract T readMinor();

        public T read() {
            // 1. Count all read requests
            RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.ALL);
            DataComponentCommand<T> majorCommand =
                    new DataComponentCommand<T>(MAJOR_READ_COMMAND_KEY, MAJOR_GROUP_KEY, MAJOR_THREAD_POOL_KEY,
                            majorTimeOut, majorThreads) {
                        @Override
                        protected T run() throws Exception {
                            // 2. Count reads that actually run against major
                            RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.RUN);
                            return readMajor();
                        }

                        @Override
                        public T getBusinessFallback() {
                            // 3. Count all fallbacks (circuit open, pool rejection, timeout, exception)
                            RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_ALL);
                            RedisCrossRoomHystrixStat.counterFallBack(MAJOR_READ_COMMAND_KEY);
                            // The fallback reads from minor inside its own Hystrix command and thread pool
                            return new DataComponentCommand<T>(MINOR_READ_COMMAND_KEY, MINOR_GROUP_KEY, MINOR_THREAD_POOL_KEY,
                                    minorTimeOut, minorThreads) {
                                @Override
                                protected T run() throws Exception {
                                    // 4. Count fallback reads that run against minor (fallback-run)
                                    RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_RUN);
                                    return readMinor();
                                }

                                @Override
                                public T getBusinessFallback() throws RedisCrossRoomReadMinorFallbackException {
                                    // 5. Count cases where minor failed too (fallback-fallback)
                                    RedisCrossRoomClientStatusCollector.collectCrossRoomStatus(HystrixStatCountTypeEnum.FALLBACK_FALLBACK);
                                    throw new RedisCrossRoomReadMinorFallbackException("MinorFallbackException");
                                }
                            }.execute();
                        }
                    };
            return majorCommand.execute();
        }
    }


For example, the get(String key) command:




    public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {
        ...
        @Override
        public String get(final String key) {
            return new ReadCommand<String>() {
                @Override
                protected String readMajor() {
                    return majorPipelineCluster.get(key);
                }

                @Override
                protected String readMinor() {
                    return minorPipelineCluster.get(key);
                }
            }.read();
        }
        ...
    }
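Every command in this design needs its own anonymous ReadCommand (or WriteCommand) subclass. Because major and minor expose the same API, another option (not the author's code) is to write each command once as a lambda and apply it to either cluster. In this sketch KeyValueCluster is a hypothetical stand-in for PipelineCluster, InMemoryCluster is a test double, and a plain try/catch replaces the Hystrix command, so there is no isolation or timeout here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for PipelineCluster, limited to two commands. */
interface KeyValueCluster {
    String get(String key);

    String set(String key, String value);
}

/** Hypothetical client: each command is written once as a lambda over either cluster. */
final class LambdaCrossRoomClient {
    private final KeyValueCluster major;
    private final KeyValueCluster minor;

    LambdaCrossRoomClient(KeyValueCluster major, KeyValueCluster minor) {
        this.major = major;
        this.minor = minor;
    }

    /** Reads from major and falls back to minor on any runtime exception (no Hystrix here). */
    private <T> T read(Function<KeyValueCluster, T> command) {
        try {
            return command.apply(major);
        } catch (RuntimeException e) {
            return command.apply(minor);
        }
    }

    String get(String key) {
        return read(cluster -> cluster.get(key));
    }
}

/** In-memory cluster for the example; setting down simulates a datacenter outage. */
final class InMemoryCluster implements KeyValueCluster {
    private final Map<String, String> data = new HashMap<>();
    volatile boolean down;

    @Override
    public String get(String key) {
        if (down) throw new IllegalStateException("cluster unavailable");
        return data.get(key);
    }

    @Override
    public String set(String key, String value) {
        if (down) throw new IllegalStateException("cluster unavailable");
        data.put(key, value);
        return "OK";
    }
}
```

Adding a read command then costs one line in the client, since the failover logic lives in the shared read helper.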


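4. Write path goal: write to both clusters whenever possible. When one side fails, the client currently only isolates the fault and does not resynchronize the data (an MQ may be added for this in the future); it simply returns each side's write result to the application, which is responsible for maintaining consistency.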

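The MultiWriteResult class has four fields:

1. DataStatusEnum majorStatus: execution status on the major cluster
2. T majorResult: result of the Redis command on the major cluster
3. DataStatusEnum minorStatus: execution status on the minor (backup) cluster
4. T minorResult: result of the Redis command on the minor (backup) cluster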
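Because the client only reports what happened on each side, reconciling the clusters is up to the application. A minimal sketch of that bookkeeping, assuming a MultiWriteResult with the four fields above and the DataStatusEnum values SUCCESS and FAIL that WriteCommand uses (both bodies are reconstructions, not the author's code). WriteRepairLog is a hypothetical in-memory list of writes each cluster missed; a real system would keep it somewhere durable, such as the MQ mentioned as future work.

```java
import java.util.ArrayList;
import java.util.List;

enum DataStatusEnum { SUCCESS, FAIL }

/** Sketch of the four-field dual-write result. */
final class MultiWriteResult<T> {
    final DataStatusEnum majorStatus;
    final T majorResult;
    final DataStatusEnum minorStatus;
    final T minorResult;

    MultiWriteResult(DataStatusEnum majorStatus, T majorResult,
                     DataStatusEnum minorStatus, T minorResult) {
        this.majorStatus = majorStatus;
        this.majorResult = majorResult;
        this.minorStatus = minorStatus;
        this.minorResult = minorResult;
    }

    boolean fullySucceeded() {
        return majorStatus == DataStatusEnum.SUCCESS && minorStatus == DataStatusEnum.SUCCESS;
    }
}

/** Hypothetical application-side log of which cluster missed which write. */
final class WriteRepairLog {
    private final List<String> pending = new ArrayList<>();

    /** Records one pending repair per failed side; returns true if both sides succeeded. */
    synchronized boolean record(String command, MultiWriteResult<?> result) {
        if (result.majorStatus == DataStatusEnum.FAIL) {
            pending.add("major: " + command);
        }
        if (result.minorStatus == DataStatusEnum.FAIL) {
            pending.add("minor: " + command);
        }
        return result.fullySucceeded();
    }

    synchronized List<String> pending() {
        return new ArrayList<>(pending);
    }
}
```

A background job could later replay each pending entry against the cluster named in it once that cluster is healthy again.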



    import java.util.concurrent.Future;

    public abstract class WriteCommand<T> extends BaseCommand {

        protected abstract T writeMajor();

        protected abstract T writeMinor();

        /** Description of the command and its arguments, used in failure logs. */
        protected abstract String getCommandParam();

        public MultiWriteResult<T> write() {
            DataComponentCommand<T> majorCommand =
                    new DataComponentCommand<T>(MAJOR_WRITE_COMMAND_KEY, majorThreads) {
                        @Override
                        protected T run() throws Exception {
                            return writeMajor();
                        }

                        @Override
                        public T getBusinessFallback() {
                            // Writes do not fail over: log the failed command and return null
                            logger.warn("major cross-room failed: {}", getCommandParam());
                            return null;
                        }
                    };
            DataComponentCommand<T> minorCommand =
                    new DataComponentCommand<T>(MINOR_WRITE_COMMAND_KEY, minorThreads) {
                        @Override
                        protected T run() throws Exception {
                            return writeMinor();
                        }

                        @Override
                        public T getBusinessFallback() {
                            logger.warn("minor cross-room failed: {}", getCommandParam());
                            return null;
                        }
                    };
            // queue() submits both commands asynchronously, so the two writes run in parallel
            Future<T> majorFuture = majorCommand.queue();
            Future<T> minorFuture = minorCommand.queue();
            T majorResult = null;
            T minorResult = null;
            try {
                majorResult = majorFuture.get();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            try {
                minorResult = minorFuture.get();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            // Any null result, including a fallback's null, is reported as FAIL
            DataStatusEnum majorStatus = (majorResult == null) ? DataStatusEnum.FAIL : DataStatusEnum.SUCCESS;
            DataStatusEnum minorStatus = (minorResult == null) ? DataStatusEnum.FAIL : DataStatusEnum.SUCCESS;
            return new MultiWriteResult<T>(majorStatus, majorResult, minorStatus, minorResult);
        }
    }
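One caveat in write(): a null result is reported as FAIL, so a write that succeeds but legitimately returns null (for example SET with the NX option on a key that already exists, which Redis answers with a nil reply) looks the same as a failed one. A possible alternative, sketched here with CompletableFuture instead of Hystrix and with hypothetical names, is to derive each side's status from whether its call completed normally within a timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical dual-write helper: status comes from how the call ended, not from its value. */
final class DualWrite {
    /** Outcome of one side; value may be null even when success is true. */
    static final class Side<T> {
        final boolean success;
        final T value;

        Side(boolean success, T value) {
            this.success = success;
            this.value = value;
        }
    }

    private DualWrite() {}

    /** Starts one write on its cluster's pool; a rejected submission becomes a failed future. */
    static <T> CompletableFuture<T> start(Supplier<T> write, ExecutorService pool) {
        try {
            return CompletableFuture.supplyAsync(write, pool);
        } catch (RejectedExecutionException e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    /** Waits for one side: success means it completed normally within timeoutMs. */
    static <T> Side<T> await(CompletableFuture<T> future, long timeoutMs) {
        try {
            return new Side<>(true, future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (ExecutionException | TimeoutException e) {
            return new Side<>(false, null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new Side<>(false, null);
        }
    }
}
```

Starting both futures before awaiting either keeps the two writes parallel, in the same way that write() calls queue() on both commands before calling get().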


For example, the set command:




    public class RedisClusterCrossRoomClientImpl implements RedisCrossRoomClient {
        ...
        @Override
        public MultiWriteResult<String> set(final String key, final String value) {
            return new WriteCommand<String>() {
                @Override
                protected String writeMajor() {
                    return majorPipelineCluster.set(key, value);
                }

                @Override
                protected String writeMinor() {
                    return minorPipelineCluster.set(key, value);
                }

                @Override
                protected String getCommandParam() {
                    return String.format("set key %s value %s", key, value);
                }
            }.write();
        }
        ...
    }


IV. Exposed Metrics and Reports

(1) hystrix-dashboard: real-time statistics charts.


(2) JMX data: statistics for major and minor, including the number of run and fallback calls and the number of exceptions.
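The JMX side can be built on the JDK's platform MBeanServer. The sketch below is a hypothetical read-only DynamicMBean whose attributes mirror the stages counted in ReadCommand (the HystrixStatCountTypeEnum values); the class names and the ObjectName domain com.example.redis are illustrative, since the article does not show the real MBean. DynamicMBean keeps the sketch in one file, because a standard MBean requires a public management interface.

```java
import java.lang.management.ManagementFactory;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.*;

/** Hypothetical stages, mirroring the HystrixStatCountTypeEnum values counted in ReadCommand. */
enum CrossRoomStage { ALL, RUN, FALLBACK_ALL, FALLBACK_RUN, FALLBACK_FALLBACK }

/** Read-only JMX view of the counters: one long attribute per stage. */
final class CrossRoomStats implements DynamicMBean {
    private final Map<CrossRoomStage, AtomicLong> counters = new EnumMap<>(CrossRoomStage.class);

    CrossRoomStats() {
        for (CrossRoomStage stage : CrossRoomStage.values()) {
            counters.put(stage, new AtomicLong());
        }
    }

    void increment(CrossRoomStage stage) {
        counters.get(stage).incrementAndGet();
    }

    /** Registers on the platform MBeanServer, so jconsole or VisualVM can read it. */
    ObjectName register(String objectName) {
        try {
            ObjectName name = new ObjectName(objectName);
            ManagementFactory.getPlatformMBeanServer().registerMBean(this, name);
            return name;
        } catch (JMException e) {
            throw new IllegalStateException("JMX registration failed", e);
        }
    }

    /** Reads one counter back through JMX, the way a monitoring tool would. */
    static long readViaJmx(ObjectName name, String attribute) {
        try {
            return (Long) ManagementFactory.getPlatformMBeanServer().getAttribute(name, attribute);
        } catch (JMException e) {
            throw new IllegalStateException("JMX read failed", e);
        }
    }

    @Override
    public Object getAttribute(String attribute) throws AttributeNotFoundException {
        try {
            return counters.get(CrossRoomStage.valueOf(attribute)).get();
        } catch (IllegalArgumentException e) {
            throw new AttributeNotFoundException(attribute);
        }
    }

    @Override
    public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
        throw new AttributeNotFoundException("read-only attribute: " + attribute.getName());
    }

    @Override
    public AttributeList getAttributes(String[] attributes) {
        AttributeList list = new AttributeList();
        for (String attribute : attributes) {
            try {
                list.add(new Attribute(attribute, getAttribute(attribute)));
            } catch (AttributeNotFoundException ignored) {
                // unknown names are left out of the result
            }
        }
        return list;
    }

    @Override
    public AttributeList setAttributes(AttributeList attributes) {
        return new AttributeList(); // nothing is writable
    }

    @Override
    public Object invoke(String actionName, Object[] params, String[] signature) throws ReflectionException {
        throw new ReflectionException(new NoSuchMethodException(actionName));
    }

    @Override
    public MBeanInfo getMBeanInfo() {
        CrossRoomStage[] stages = CrossRoomStage.values();
        MBeanAttributeInfo[] attributes = new MBeanAttributeInfo[stages.length];
        for (int i = 0; i < stages.length; i++) {
            attributes[i] = new MBeanAttributeInfo(stages[i].name(), "long",
                    "calls counted in stage " + stages[i], true, false, false);
        }
        return new MBeanInfo(getClass().getName(), "Redis cross-room counters",
                attributes, null, new MBeanOperationInfo[0], null);
    }
}
```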

V. Read Tests

1. Major is healthy, but its thread pool genuinely is not large enough

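Test method: give major a small thread pool and send a high level of concurrent requests; each thread performs 1000 random reads, then signals the main thread.

Expected result: every request returns a value (provided the key exists).

Test code: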




    /**
     * Major thread pool saturated because its size is too small
     * (major: 30, minor: 80, 100 concurrent request threads).
     * Uses test-class fields not shown here: redisClusterCrossRoomClient,
     * TOTAL_SIZE (number of keys), readSuccessCounter and logger.
     *
     * @throws InterruptedException
     */
    @Test
    public void testRandomReadWithEnoughThreads() throws InterruptedException {
        redisClusterCrossRoomClient.setMajorThreads(30);
        redisClusterCrossRoomClient.setMinorThreads(80);
        int threadNum = 100;
        int perSize = TOTAL_SIZE / threadNum;
        int totalNeedGet = 1000;
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            // Each thread reads from its own key range [start, end]
            int start = perSize * i + 1;
            int end = perSize * (i + 1);
            Thread thread = new RandomReadThread(start, end, totalNeedGet, countDownLatch);
            thread.start();
        }
        countDownLatch.await();
        // Requests sent = threads x reads per thread
        System.out.println("request counter: " + (long) threadNum * totalNeedGet);
        System.out.println("readSuccess counter: " + readSuccessCounter.get());
    }

    class RandomReadThread extends Thread {
        private final int start;
        private final int end;
        private final int totalNeedGet;
        private final CountDownLatch countDownLatch;
        private long counter;

        public RandomReadThread(int start, int end, int totalNeedGet, CountDownLatch countDownLatch) {
            this.start = start;
            this.end = end;
            this.totalNeedGet = totalNeedGet;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                while (counter < totalNeedGet) {
                    if (counter % 100 == 0) {
                        logger.info("{} executed {} reads, total {}", Thread.currentThread().getName(), counter,
                                totalNeedGet);
                    }
                    // Random id in [start, end]; ThreadLocalRandom avoids creating a Random per read
                    int id = ThreadLocalRandom.current().nextInt(start, end + 1);
                    String key = "user:" + id;
                    try {
                        String result = redisClusterCrossRoomClient.get(key);
                        if (StringUtils.isBlank(result)) {
                            logger.warn("key {}, value is null", key);
                        } else {
                            readSuccessCounter.incrementAndGet();
                        }
                    } catch (Exception e) {
                        // A failed read still counts toward totalNeedGet, so the loop always ends
                        logger.error("read {} failed", key, e);
                    }
                    counter++;
                    TimeUnit.MILLISECONDS.sleep(10);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always release the main thread, even if this thread is interrupted
                countDownLatch.countDown();
            }
        }
    }

