一、背景
某系統作為公司產品矩陣底座,每天面對數十億級流量請求。在覈心介面全鏈路壓測時發現需多次透過網路請求分散式快取,影響介面耗時。快取、熔斷、限流作為應對高併發系統的三板斧,其中熔斷限流作為系統的自我保護機制,而快取作為介面效能提升利器備受關注。
二、問題分析
分散式記憶體快取讀取相對訪問db磁碟至少是數量級領先,但仍然存在網路請求,於是引入本地快取形成本地快取、分散式快取與db形成三級儲存結構。但任何事情都具有兩面性,引入快取後同一份資料會儲存在不同的地方,帶來資料一致性的問題。
目前資料一致性解決方案有強一致、弱一致及最終一致,強一致的需要額外的手段保障實現成本高,最終一致為為弱一致性的特例。結合當前業務場景及實現成本,此處選擇最終一致性作為當前系統的一致性解決方案,放棄強一致不代表不追求一致性,會盡可能追求資料的一致性。
三、本地快取選型
LinkedHashMap
優點:jdk內建數據結構,無需引入其他元件,執行緒安全;
缺點:缺少容量限制、無淘汰策略,需要自己開發;
Guava
優點:Guava作為Google團隊開源的一款Java核心增強庫,在效能和穩定性上都有保障,同時提供容量限制、兩種淘汰策略(LRU和LFU);
缺點:淘汰策略不夠完善,使用任何一種都存在一定問題。比如LRU在面對偶爾批次重新整理資料時很容易將快取內容擠出,帶來擊穿風險。LFU在面對突發流量時顯得力不從心,需要時間累積使用頻率。
Caffeine
優點:caffeine以Guava為基礎改進而來,站在巨人的肩膀上有更高的起點。在LFU基礎上改進形成W-TinyLFU淘汰演算法,在一定程度上解決了LFU的缺點,讓caffeine有更高的快取命中效率;
根據官方的測試資料來看,Caffeine在效能上優於其他幾種元件,因此此處選用Caffeine作為本地快取,Caffeine過期策略:
expireAfterAccess: 在指定的過期時間內沒有讀寫,再次訪問時會將資料判斷為失效。如果一直訪問則永不過期,不能單獨使用;
expireAfterWrite: 在指定的過期時間內沒有再次寫入,再次訪問時會將資料判斷為失效,此時請求會阻塞等待載入新資料返回;
refreshAfterWrite: 在指定的過期時間後訪問時,會再次載入重新整理快取資料,在重新整理任務未完成之前,其他執行緒返回舊值;
上述任何一種策略單獨使用都存在一定的問題,下面嘗試將兩者組合進行分析:
組合1:expireAfterAccess和expireAfterWrite 缺點:快取過期後,由一個請求去執行後端查詢,其他請求都在阻塞等待結果返回。如果同時有大量的請求阻塞,對系統吞吐量有一定影響。
組合2:expireAfterAccess和refreshAfterWrite 缺點:如果某個key一直被訪問,則會一直不過期。
組合3:expireAfterWrite和refreshAfterWrite 將組合3兩個過期時間比例設成3:1,可以在保證資料失效的同時可以防止大量請求因資料過期造成阻塞,因此選用組合3作為本地快取的過期策略。
四、本地快取選型
常見的分散式快取有Memcache、Redis等,但Redis相比Memcache有更豐富的功能特性(持久化、釋出訂閱、主從複製等), 因此選用Redis作為分散式快取。
五、MySQL與Redis資料同步
1、先更新MySQL再更新Redis
缺點:A、在高併發情況下,假設請求A先寫MySQL然後卡頓,隨後請求B寫MySQL再更新Redis,請求A最後再更新Redis,會存在舊值覆蓋新值;B、寫完資料庫立即寫快取,可能會存在浪費系統資源;
是否推薦使用:不推薦
2、先更新Redis再更新MySQL
缺點:更新Redis成功但寫MySQL失敗;
是否推薦使用:不推薦
3、先刪除Redis再寫MySQL
缺點:在高併發情況下,假設請求A先刪除Redis然後卡頓,請求B請求Redis沒值則讀取MySQL,再快取在Redis,然後請求A再寫MySQL,則此時快取中的值為髒資料。
解決辦法:延時雙刪
是否推薦使用:不推薦
4、先寫MySQL再刪除Redis
缺點1:假設寫請求A先來,請求A寫MySQL然後卡頓沒來得及刪除快取,請求B讀取快取會是舊值,隨後請求A就將快取刪除,B讀取了一次舊值,可以接受;
缺點2:
快取到過期時間會自動失效;
請求A查詢快取,發快取中沒有資料,查詢資料庫的舊值,但由於網路原因卡頓了,沒有來得及更新快取;
請求B寫資料庫,接著刪除了快取;
請求A更新舊值到快取中;
上述情形出現條件需要滿足:快取剛好失效請求A查詢耗時比請求B更新資料庫耗時時間還要長,出現機率極小。
是否推薦使用:推薦
5、利用Canal監聽MySQL binlog
Canal是用Java開發的基於資料庫增量日誌解析,提供增量資料訂閱&消費的中介軟體。目前Canal支援MySQL的binlog解析,解析完成後才利用Canal Client來處理獲得的相關資料。
綜上所述,前3種方案存在問題,第四種可以接受但是需在執行MySQL變更時需要操作Redis操作重新整理快取,存在耦合,因此選用方案5。
六、redis與本地快取同步
1、訊息佇列廣播模式
當binlog有變更時推送訊息到訊息佇列,應用例項採用廣播模式消費,消費時搶鎖更新Redis快取,然後更新本地快取;
優點:廣播機制實現本地快取更新;
缺點:依賴分散式鎖、需要支援廣播的訊息佇列;
2、基於redis釋出訂閱
當canal收到binlog變更時,將變更訊息推送到訊息佇列,應用例項以叢集模式消費後更新分散式快取,然後向Redis叢集釋出訊息。註冊訂閱的例項收到釋出訊息後更新本地快取(覆蓋或刪除本地快取),具體流程如上所示。
缺點:當訂閱Redis叢集的例項比較多時,更新本地快取可能會存在時延;
優點:輕量;
目前訊息佇列技術棧選用的是Kafka,但Kafka無法支援廣播消費(RocketMQ支援廣播消費),同時廣播消費需要分散式鎖支援相對較重,因此此處選用Redis釋出訂閱模式。
七、方案實踐
1、canal部署
此處在雲環境搭建canal服務,示意圖如上:
canal配置binlog監聽:
anal.instance.master.address:資料庫db連線; canal.instance.dbUsername:連線資料庫賬號; canal.instance.dbPassword:連線資料庫密碼; canal.instance.filter.regex:監聽庫中哪些表格的正規表示式; canal.mq.topic:傳送訊息佇列的主題;
2、消費kafka更新分散式快取及釋出變更:
消費kafka:
public void onMessage(List<String> messages) throws KafkaConsumeRetryException { if (CollectionUtils.isEmpty(messages)) { return; } log.info("[Binlog] receive msg from bin log {}", messages); for (String msg : messages) { BinlogKafkaDTO binlogKafkaDTO = JSONObject.parseObject(msg, BinlogKafkaDTO.class); //省略非關鍵程式碼 ICouponCacheHandler iCouponCacheHandler = handleMap.get(binlogKafkaDTO.getTable()); if (iCouponCacheHandler == null) { log.error("[Binlog] no handler find msg:{}", binlogKafkaDTO); continue; } //判斷是否為插入還是更新,刪除需走另外的邏輯 boolean insertOrUpdate = SfStrUtil.equals(binlogKafkaDTO.getType(), CacheConstant.EventType.INSERT) || SfStrUtil.equals(binlogKafkaDTO.getType(), CacheConstant.EventType.UPDATE); iCouponCacheHandler.updateCache(multiLevelCacheUtil, binlogKafkaDTO, insertOrUpdate); } } default void updateCache(MultiLevelCacheUtil multiLevelCacheUtil, BinlogKafkaDTO binlogKafkaDTO, Boolean insertOrUpdate) { JSONArray jsonArray = binlogKafkaDTO.getData(); for (int i = 0, num = jsonArray.size(); i < num; ++i) { BinlogBaseInfo obj = JSONObject.parseObject(JSONObject.toJSONString(jsonArray.get(i)), BinlogBaseInfo.class); String id = obj.getId(); if (insertOrUpdate) { multiLevelCacheUtil.putForCache(String.format(getCachePrefix(), id), JSONObject.toJSONString(obj), CacheConstant.EXPIRE_SIZE); } else { //delete multiLevelCacheUtil.deleteCache(String.format(getCachePrefix(), id)); } } }
更新分散式快取及釋出變更:
public <T> void putForCache(String key, T value, long expireTime) { if (expireTime <= 0) { redisUtil.set(key, value); } else { redisUtil.set(key, value, expireTime); } redisUtil.getRedisTemplate().convertAndSend(key, value instanceof String ? value : SfJsonUtil.toJsonStr(value)); } public void deleteCache(String key) { try { redisUtil.unlink(key); redisUtil.getRedisTemplate().convertAndSend(key, CacheConstant.DELETE_FLAG); } catch (Exception e) { log.error("deleteCache error key:{}", key, e); } }
3、向redis訂閱註冊及回撥處理
監聽回撥基類:
public abstract class AbstractRedisMessageListener implements MessageListener { private CaffeineCacheUtil caffeineCacheUtil; public AbstractRedisMessageListener(CaffeineCacheUtil caffeineCacheUtil) { this.caffeineCacheUtil = caffeineCacheUtil; } @Override public void onMessage(Message message, byte[] pattern) { String data = LocalCacheConstant.VALUE_SERIALIZER.deserialize(message.getBody()); String channel = new String(message.getChannel()); //如果是刪除redis快取,則清除本地快取 否則更新本地快取或者插入 則直接覆蓋 if (SfStrUtil.equals(LocalCacheConstant.DELETE_FLAG, data)) { caffeineCacheUtil.evictCache(getCacheName(), channel); } else { caffeineCacheUtil.putCache(getCacheName(), channel, data); } } public abstract Set<String> getTopics(); public abstract SubscribeType getSubscribeType(); public abstract String getCacheName(); }
渠道黑名單監聽實現如下:
@Slf4j @Component public class BlackWaybillSourceRedisMessageListener extends AbstractRedisMessageListener { public BlackIpRedisMessageListener(CaffeineCacheUtil caffeineCacheUtil) { super(caffeineCacheUtil); } @Override public Set<String> getTopics() { return Sets.newHashSet(LocalCacheConstant.RedisMessageTopic.BlackWaybillSource); } @Override public SubscribeType getSubscribeType() { return SubscribeType.CHANNEL_TYPE; } @Override public String getCacheName() { return LocalCacheConstant.CaffineCacheName.BlackWaybillSource; } }
此處以將所有渠道黑名單作為集合當成一個key,則以頻道註冊。比如需快取訂單,需要以訂單號維度快取訂單,則以模式註冊。
向redis訂閱註冊:
@Configuration public class RedisMessageListenerConfig { @Bean @ConditionalOnBean(AbstractRedisMessageListener.class) public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,List<AbstractRedisMessageListener> redisMessageListeners) { RedisMessageListenerContainer redisMessageListenercontainer = new RedisMessageListenerContainer(); redisMessageListenercontainer.setConnectionFactory(factory); for (AbstractRedisMessageListener redisMessageListener : redisMessageListeners) { //判斷是模式註冊還是頻道訂閱 boolean channelType = ObjectUtil.equals(SubscribeType.CHANNEL_TYPE, redisMessageListener.getSubscribeType()); redisMessageListener.getTopics().stream().forEach(channel -> redisMessageListenercontainer.addMessageListener(redisMessageListener, channelType ? new ChannelTopic(channel):new PatternTopic(channel))); } log.info("Register listener for redisMessageListenerContainer num:{}", redisMessageListeners.size()); return redisMessageListenercontainer; } }
4、快取建立
利用spring cacheManager批次建立快取:
public CacheManager cacheManagerWithCaffeine() { CaffeineCacheManager cacheManager = new CaffeineCacheManager(); Caffeine caffeine = Caffeine.newBuilder() //cache的初始容量值 .initialCapacity(StringUtils.isEmpty(initCapacity)?100:Integer.parseInt(initCapacity)) //maximumSize用來控制cache的最大快取數量,maximumSize和maximumWeight(最大權重)不可以同時使用, .maximumSize(StringUtils.isEmpty(maxSize)?1000:Long.parseLong(maxSize)) //建立或更新之後多久重新整理,需要設定cacheLoader .refreshAfterWrite(StringUtils.isEmpty(refreshAfterWrite)?10:Long.parseLong(refreshAfterWrite), TimeUnit.SECONDS); if (StringUtils.isNotBlank(expireAfterAccess)) { caffeine.expireAfterAccess(Long.parseLong(expireAfterAccess), TimeUnit.SECONDS); } if (StringUtils.isNotBlank(expireAfterWrite)) { caffeine.expireAfterWrite(Long.parseLong(expireAfterWrite), TimeUnit.SECONDS); } cacheManager.setCaffeine(caffeine); cacheManager.setCacheLoader(cacheLoader()); if (StringUtils.isEmpty(cacheNames)){ cacheNames = "userCache,commonCache"; } // 根據名字可以建立多個cache,但是多個cache使用相同的策略 cacheManager.setCacheNames(Arrays.asList(cacheNames.split(","))); // 是否允許值為空 cacheManager.setAllowNullValues(false); return cacheManager; }
快取引數引數設定:
caffine配置 caffeine.cacheNames=waybillSource caffeine.initCapacity=32 caffeine.maxSize=64 #寫入多久後過期 caffeine.expireAfterWrite=1800 #寫入多久後過期過,同時請求先返回舊資料然後再載入 caffeine.refreshAfterWrite=600
批次建立caffeine簡單方便,但因追求通用性缺乏個性化定製.比如所有快取過期時間、容量都是一致的,此時可以採用caffeine手動建立快取進行深度定製,同樣以渠道黑名單為例。
根據api手動構建快取:
@Component public class BlackWaybillSourceCache { private LoadingCache<String, List< BlackWaybillSource>> cache = Caffeine.newBuilder() .refreshAfterWrite(10, TimeUnit.MINUTES) .expireAfterWrite(30, TimeUnit.MINUTES) .initialCapacity(2) .maximumSize(2) .build(new CacheLoader<String, List< BlackWaybillSource>>() { @Nullable @Override public List<WaybillSource> load(@Nonnull String key) throws Exception { ....... return JSONObject.parseArray(jsonString, BlackWaybillSource.class); } }); public List< BlackWaybillSource> queryBlackWaybillSource() { return cache.get(WAYBILL_SOUERCE); } public void put(String key,String value){ cache.put(key,JSONObject.parseArray(value,BlackWaybillSource.class)); } public void evict(String key){ cache.invalidate(key); } }
此時BlackWaybillSourceRedisMessageListener需實現onMessage,更新及刪除分別刪除上面WaybillSourceCache的put和evict方法實現回撥更新。
5、效果
功能上線前後Redis叢集併發量及執行命令數對比:
叢集IP | 每秒併發量 | 每分鐘命令數 |
---|---|---|
xx.78.32 | 4737—>1896 | 263400—>113400 |
xx.78.35 | 6184—>1716 | 351060—>103680 |
xx.78.36 | 6262—>2203 | 354840—>123540 |
xx.78.39 | 5409—>1656 | 307560—>101640 |
從redis執行併發量及命令數來看,本地快取攔截很多請求,降低Redis叢集的負載,提升響應速度。
根據記憶體dump計算出本地快取佔用記憶體約為60M左右;
核心介面耗時下降約為19%;
redis命令數及併發量降低67%;
八、總結
經過分析選型,用redis與caffeine構建兩級快取,用canal解決db和redis之間的資料同步,用redis釋出訂閱解決分散式快取與本地快取之間的資料同步,構建近乎實時的兩級快取,介面效能得到約19%的提升,redis叢集負載下降一半以上。