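1. Source walkthrough in three parts: initialization, runtime flow, and extension points#
1.1 Sentinel's own initialization#
Initialization here means: what does Sentinel do while Spring Boot starts up?
In other words, what side effects does adding the dependency below have on a Spring Boot application?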
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2023.0.1.0</version>
</dependency>
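Let's see what the dependency pulls in.
There is a lot, including several familiar pieces. We start with the starter itself, spring-cloud-starter-alibaba-sentinel. There are usually three things to check: the auto-configuration classes, the SPI interfaces, and Spring Boot's spring.factories extension point.
Let's see what it contains.
It only imports auto-configuration classes. These are the ones it imports: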
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration
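The first two adapt Sentinel to the Spring web frameworks (Spring MVC and WebFlux), the third exposes Sentinel's endpoint for external access, the fourth initializes and customizes Sentinel itself, and the fifth adds Feign support. We go through them one at a time, starting with Sentinel's own initialization.
1.2 Sentinel's own initialization: properties, data sources, the aspect, and RestTemplate. This part targets plain Spring projects that do not use MVC#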
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelAutoConfiguration {
@Value("${project.name:${spring.application.name:}}")
private String projectName;
@Autowired
private SentinelProperties properties;
// Property initialization
@PostConstruct
public void init() {
if (StringUtils.isEmpty(System.getProperty(LogBase.LOG_DIR))
&& StringUtils.isNotBlank(properties.getLog().getDir())) {
// Log directory
System.setProperty(LogBase.LOG_DIR, properties.getLog().getDir());
}
if (StringUtils.isEmpty(System.getProperty(LogBase.LOG_NAME_USE_PID))
&& properties.getLog().isSwitchPid()) {
// Whether the log file name includes the PID
System.setProperty(LogBase.LOG_NAME_USE_PID,
String.valueOf(properties.getLog().isSwitchPid()));
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.APP_NAME_PROP_KEY))
&& StringUtils.isNotBlank(projectName)) {
// Set the project name or the Spring application name; usually the latter, ${spring.application.name:}
System.setProperty(SentinelConfig.APP_NAME_PROP_KEY, projectName);
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.SERVER_PORT))
&& StringUtils.isNotBlank(properties.getTransport().getPort())) {
// Sentinel's externally reachable command port; the dashboard talks to the client through it. Default: public static final String API_PORT = "8719";
System.setProperty(TransportConfig.SERVER_PORT,
properties.getTransport().getPort());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.CONSOLE_SERVER))
&& StringUtils.isNotBlank(properties.getTransport().getDashboard())) {
// Address (host and port) of the dashboard
System.setProperty(TransportConfig.CONSOLE_SERVER,
properties.getTransport().getDashboard());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.HEARTBEAT_INTERVAL_MS))
&& StringUtils
.isNotBlank(properties.getTransport().getHeartbeatIntervalMs())) {
// Heartbeat interval. Default (10 seconds): private static final long DEFAULT_INTERVAL = 1000 * 10;
System.setProperty(TransportConfig.HEARTBEAT_INTERVAL_MS,
properties.getTransport().getHeartbeatIntervalMs());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.HEARTBEAT_CLIENT_IP))
&& StringUtils.isNotBlank(properties.getTransport().getClientIp())) {
// Client IP reported in the heartbeat; defaults to the local machine's IP
System.setProperty(TransportConfig.HEARTBEAT_CLIENT_IP,
properties.getTransport().getClientIp());
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.CHARSET))
&& StringUtils.isNotBlank(properties.getMetric().getCharset())) {
System.setProperty(SentinelConfig.CHARSET,
properties.getMetric().getCharset());
}
if (StringUtils
.isEmpty(System.getProperty(SentinelConfig.SINGLE_METRIC_FILE_SIZE))
&& StringUtils.isNotBlank(properties.getMetric().getFileSingleSize())) {
System.setProperty(SentinelConfig.SINGLE_METRIC_FILE_SIZE,
properties.getMetric().getFileSingleSize());
}
if (StringUtils
.isEmpty(System.getProperty(SentinelConfig.TOTAL_METRIC_FILE_COUNT))
&& StringUtils.isNotBlank(properties.getMetric().getFileTotalCount())) {
System.setProperty(SentinelConfig.TOTAL_METRIC_FILE_COUNT,
properties.getMetric().getFileTotalCount());
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.COLD_FACTOR))
&& StringUtils.isNotBlank(properties.getFlow().getColdFactor())) {
System.setProperty(SentinelConfig.COLD_FACTOR,
properties.getFlow().getColdFactor());
}
if (StringUtils.isNotBlank(properties.getBlockPage())) {
setConfig(BLOCK_PAGE_URL_CONF_KEY, properties.getBlockPage());
}
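// Eager initialization
// Whether to initialize right at startup. Defaults to false, in which case initialization is deferred until the first call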
if (properties.isEager()) {
InitExecutor.doInit();
}
}
// Registers the aspect that supports the @SentinelResource annotation
@Bean
@ConditionalOnMissingBean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
// @SentinelRestTemplate support: registers a bean post-processor that adds an interceptor to the RestTemplate
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.web.client.RestTemplate")
@ConditionalOnProperty(name = "resttemplate.sentinel.enabled", havingValue = "true",
matchIfMissing = true)
public static SentinelBeanPostProcessor sentinelBeanPostProcessor(
ApplicationContext applicationContext) {
return new SentinelBeanPostProcessor(applicationContext);
}
// Sets up the external data sources (rule sources)
// It runs after all singleton beans have been initialized
/*
public void postRegister(AbstractDataSource dataSource) {
switch (this.getRuleType()) {
case FLOW -> FlowRuleManager.register2Property(dataSource.getProperty());
case DEGRADE -> DegradeRuleManager.register2Property(dataSource.getProperty());
case PARAM_FLOW -> ParamFlowRuleManager.register2Property(dataSource.getProperty());
case SYSTEM -> SystemRuleManager.register2Property(dataSource.getProperty());
case AUTHORITY -> AuthorityRuleManager.register2Property(dataSource.getProperty());
case GW_FLOW -> GatewayRuleManager.register2Property(dataSource.getProperty());
case GW_API_GROUP -> GatewayApiDefinitionManager.register2Property(dataSource.getProperty());
}
}
*/
@Bean
@ConditionalOnMissingBean
public SentinelDataSourceHandler sentinelDataSourceHandler(
DefaultListableBeanFactory beanFactory, SentinelProperties sentinelProperties,
Environment env) {
return new SentinelDataSourceHandler(beanFactory, sentinelProperties, env);
}
// Converters, used for example when an external data source is configured
@ConditionalOnClass(ObjectMapper.class)
@Configuration(proxyBeanMethods = false)
protected static class SentinelConverterConfiguration {
// json
@Configuration(proxyBeanMethods = false)
protected static class SentinelJsonConfiguration {
private ObjectMapper objectMapper = new ObjectMapper();
public SentinelJsonConfiguration() {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
}
@Bean("sentinel-json-flow-converter")
public JsonConverter jsonFlowConverter() {
return new JsonConverter(objectMapper, FlowRule.class);
}
@Bean("sentinel-json-degrade-converter")
public JsonConverter jsonDegradeConverter() {
return new JsonConverter(objectMapper, DegradeRule.class);
}
@Bean("sentinel-json-system-converter")
public JsonConverter jsonSystemConverter() {
return new JsonConverter(objectMapper, SystemRule.class);
}
@Bean("sentinel-json-authority-converter")
public JsonConverter jsonAuthorityConverter() {
return new JsonConverter(objectMapper, AuthorityRule.class);
}
@Bean("sentinel-json-param-flow-converter")
public JsonConverter jsonParamFlowConverter() {
return new JsonConverter(objectMapper, ParamFlowRule.class);
}
}
// xml
@ConditionalOnClass(XmlMapper.class)
@Configuration(proxyBeanMethods = false)
protected static class SentinelXmlConfiguration {
private XmlMapper xmlMapper = new XmlMapper();
public SentinelXmlConfiguration() {
xmlMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
}
@Bean("sentinel-xml-flow-converter")
public XmlConverter xmlFlowConverter() {
return new XmlConverter(xmlMapper, FlowRule.class);
}
@Bean("sentinel-xml-degrade-converter")
public XmlConverter xmlDegradeConverter() {
return new XmlConverter(xmlMapper, DegradeRule.class);
}
@Bean("sentinel-xml-system-converter")
public XmlConverter xmlSystemConverter() {
return new XmlConverter(xmlMapper, SystemRule.class);
}
@Bean("sentinel-xml-authority-converter")
public XmlConverter xmlAuthorityConverter() {
return new XmlConverter(xmlMapper, AuthorityRule.class);
}
@Bean("sentinel-xml-param-flow-converter")
public XmlConverter xmlParamFlowConverter() {
return new XmlConverter(xmlMapper, ParamFlowRule.class);
}
}
}
}
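The init() method above applies one rule throughout: a value from SentinelProperties is copied into a JVM system property only when that system property is still empty, so a -D flag on the command line wins over the Spring configuration. Below is a minimal plain-Java sketch of that rule. The class name PropertyBridgeSketch, the helper setIfAbsent and the key in main are hypothetical names for illustration; none of them is part of Sentinel or Spring Cloud Alibaba.

```java
final class PropertyBridgeSketch {

    /**
     * Copies value into the system property key only if the key is currently
     * unset or empty and the value is not blank. Returns true if it wrote.
     */
    static boolean setIfAbsent(String key, String value) {
        String existing = System.getProperty(key);
        boolean alreadySet = existing != null && !existing.isEmpty();
        boolean hasValue = value != null && !value.trim().isEmpty();
        if (alreadySet || !hasValue) {
            return false;
        }
        System.setProperty(key, value);
        return true;
    }

    public static void main(String[] args) {
        String key = "sketch.demo.port"; // hypothetical key, not a Sentinel key
        System.clearProperty(key);
        System.out.println(setIfAbsent(key, "8719")); // true: nothing was set before
        System.out.println(setIfAbsent(key, "9000")); // false: existing value wins
        System.out.println(System.getProperty(key));  // 8719
    }
}
```

The practical consequence is that anything already passed as a system property cannot be overridden from application.yml.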
1.3 Initialization of the Spring MVC adapter#
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = Type.SERVLET)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@ConditionalOnClass(SentinelWebInterceptor.class)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration implements WebMvcConfigurer {
private static final Logger log = LoggerFactory
.getLogger(SentinelWebAutoConfiguration.class);
@Autowired
private SentinelProperties properties;
@Autowired
private Optional<UrlCleaner> urlCleanerOptional;
@Autowired
private Optional<BlockExceptionHandler> blockExceptionHandlerOptional;
@Autowired
private Optional<RequestOriginParser> requestOriginParserOptional;
// Creates the interceptor; it applies globally
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebInterceptor sentinelWebInterceptor(
SentinelWebMvcConfig sentinelWebMvcConfig) {
return new SentinelWebInterceptor(sentinelWebMvcConfig);
}
// Configuration for the interceptor above
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebMvcConfig sentinelWebMvcConfig() {
SentinelWebMvcConfig sentinelWebMvcConfig = new SentinelWebMvcConfig();
// Whether to include the HTTP method in the resource name
sentinelWebMvcConfig.setHttpMethodSpecify(properties.getHttpMethodSpecify());
sentinelWebMvcConfig.setWebContextUnify(properties.getWebContextUnify());
// How to handle a request that was blocked
if (blockExceptionHandlerOptional.isPresent()) {
blockExceptionHandlerOptional
.ifPresent(sentinelWebMvcConfig::setBlockExceptionHandler);
}
else {
if (StringUtils.hasText(properties.getBlockPage())) {
sentinelWebMvcConfig.setBlockExceptionHandler(((request, response,
e) -> response.sendRedirect(properties.getBlockPage())));
}
else {
// Default handler for a request that was blocked
sentinelWebMvcConfig
.setBlockExceptionHandler(new DefaultBlockExceptionHandler());
}
}
urlCleanerOptional.ifPresent(sentinelWebMvcConfig::setUrlCleaner);
// Parser for the request origin
requestOriginParserOptional.ifPresent(sentinelWebMvcConfig::setOriginParser);
return sentinelWebMvcConfig;
}
// Registers the interceptor
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebMvcConfigurer sentinelWebMvcConfigurer() {
return new SentinelWebMvcConfigurer();
}
}
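2 Runtime flow#
2.1 How the Spring MVC interceptor runs#
An interceptor usually has two methods, one that runs before the request is handled and one that runs after. We look at the pre-request method first: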
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
// First resolve the resource name
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
// If the request attributes already contain $$sentinel_spring_web_entry_attr-rc, just increase the counter and let the request through
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
// Parse the request origin using registered origin parser.
// Derive the origin from the HTTP request; empty by default. It is used later in StatisticSlot
String origin = parseOrigin(request);
// Get the context name; the default here is sentinel_spring_web_context
String contextName = getContextName(request);
// Key step: initialize the invocation context
ContextUtil.enter(contextName, origin);
// Enter the resource; this runs the slot chain
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
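The increaseReferece call in preHandle is easy to skim past. As used above, it keeps a counter on the request so that only the outermost dispatch of a request creates an Entry, while nested dispatches on the same request (a forward, for example) are let through. A minimal sketch of that idea follows, with a plain Map standing in for the request attributes. The class ReentryCounterSketch, the key RC_KEY and the method names are hypothetical, and the body of the real increaseReferece is assumed here, since the article does not show it.

```java
import java.util.HashMap;
import java.util.Map;

final class ReentryCounterSketch {
    // Hypothetical attribute key; the real one comes from getRequestRefName().
    static final String RC_KEY = "sketch_entry_rc";

    /** Adds step to the counter stored under key and returns the new value. */
    static int increase(Map<String, Object> attrs, String key, int step) {
        Object current = attrs.get(key);
        int next = (current == null ? 0 : (Integer) current) + step;
        attrs.put(key, next);
        return next;
    }

    /** True only for the outermost dispatch, which is the one that should create the entry. */
    static boolean shouldCreateEntry(Map<String, Object> attrs) {
        return increase(attrs, RC_KEY, 1) == 1;
    }

    public static void main(String[] args) {
        Map<String, Object> attrs = new HashMap<>(); // stands in for one request
        System.out.println(shouldCreateEntry(attrs)); // true: first dispatch
        System.out.println(shouldCreateEntry(attrs)); // false: nested dispatch on the same request
    }
}
```

A matching decrement on the way out (step of -1) would tell the interceptor when the outermost dispatch finishes and the entry can be exited.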
// How the slot chain is created
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
//Sentinel default ProcessorSlots
//com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
//com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
//com.alibaba.csp.sentinel.slots.logger.LogSlot
//com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
//com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
//com.alibaba.csp.sentinel.slots.system.SystemSlot
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
//com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
//com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
// These are all the default slots; they execute in the order listed above
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
2.2 Stepping into chain.entry(context, resourceWrapper, null, count, prioritized, args) to follow the actual execution#
First, the slot interface, which makes the rest easier to follow:
public interface ProcessorSlot<T> {
// The logic of this slot
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;
// What to do once this slot is done. The abstract class casts the parameter and enters the next slot; the slots form a singly linked list
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;
// Same idea for exit
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
Now the abstract class:
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
// The next slot to run
private AbstractLinkedProcessorSlot<?> next = null;
// Casts the parameter and enters the next slot; the slots form a singly linked list
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}
public AbstractLinkedProcessorSlot<?> getNext() {
return next;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}
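To make the linked-list structure concrete, here is a stripped-down sketch of the same pattern: each slot does its own work in entry and then calls fireEntry, which hands control to the next slot. It is not Sentinel code. The names SlotChainSketch, Slot, NamedSlot, chain and run are hypothetical, and the context, count and args parameters are dropped to keep the shape visible.

```java
import java.util.ArrayList;
import java.util.List;

final class SlotChainSketch {

    /** One link in a singly linked chain. */
    abstract static class Slot {
        private Slot next;

        /** The slot's own logic; implementations call fireEntry to continue. */
        abstract void entry(String resource, List<String> trace);

        /** Hands control to the next slot, if there is one. */
        void fireEntry(String resource, List<String> trace) {
            if (next != null) {
                next.entry(resource, trace);
            }
        }

        void setNext(Slot next) {
            this.next = next;
        }
    }

    /** Records its name, then passes control on. */
    static final class NamedSlot extends Slot {
        private final String name;

        NamedSlot(String name) {
            this.name = name;
        }

        @Override
        void entry(String resource, List<String> trace) {
            trace.add(name);
            fireEntry(resource, trace);
        }
    }

    /** Links the given slots in order and returns the head of the chain. */
    static Slot chain(Slot... slots) {
        for (int i = 0; i < slots.length - 1; i++) {
            slots[i].setNext(slots[i + 1]);
        }
        return slots[0];
    }

    /** Builds a chain of named slots, runs it once, and returns the visit order. */
    static List<String> run(String... names) {
        Slot[] slots = new Slot[names.length];
        for (int i = 0; i < names.length; i++) {
            slots[i] = new NamedSlot(names[i]);
        }
        List<String> trace = new ArrayList<>();
        chain(slots).entry("demo", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("NodeSelector", "ClusterBuilder", "Statistic"));
    }
}
```

Because each slot decides when to call fireEntry, a slot can act before the rest of the chain, after it, or both, which is exactly what StatisticSlot relies on.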
OK, we are now inside the slot invocation itself. Let's go through the slots one by one, in the order above#
// Resolve the slot chain builder SPI.
//Sentinel default ProcessorSlots
//com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
//com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
//com.alibaba.csp.sentinel.slots.logger.LogSlot
//com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
//com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
//com.alibaba.csp.sentinel.slots.system.SystemSlot
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
//com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
//com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
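// These are all the default slots; they execute in the order listed above
NodeSelectorSlot: selects the node for a resource#
One thing must be clear first: nodes are bound to the slot chain. Every unique resource has exactly one slot chain, and that chain holds the resource's nodes (DefaultNode), one for each invocation context. The official site explains this.
Each resource has exactly one slot chain: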
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
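lookProcessChain combines two techniques: double-checked locking, so the common case takes no lock, and copy-on-write, so readers never see a HashMap that is being modified. The sketch below isolates that combination in a small generic class. CopyOnWriteCacheSketch and its members are hypothetical names. The sketch also marks the map field volatile, which this pattern needs for safe publication.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class CopyOnWriteCacheSketch<K, V> {
    // Readers always see a complete snapshot; writers replace the whole map.
    private volatile Map<K, V> map = new HashMap<>();
    private final Object lock = new Object();
    private final int maxSize;

    CopyOnWriteCacheSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Returns the cached value, creating it at most once; returns null when the cache is full. */
    V getOrCreate(K key, Function<K, V> factory) {
        V value = map.get(key);           // lock-free fast path
        if (value == null) {
            synchronized (lock) {
                value = map.get(key);     // re-check under the lock
                if (value == null) {
                    if (map.size() >= maxSize) {
                        return null;      // size limit reached, like MAX_SLOT_CHAIN_SIZE
                    }
                    value = factory.apply(key);
                    Map<K, V> copy = new HashMap<>(map);
                    copy.put(key, value);
                    map = copy;           // publish the new snapshot
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }

    public static void main(String[] args) {
        CopyOnWriteCacheSketch<String, StringBuilder> cache = new CopyOnWriteCacheSketch<>(2);
        StringBuilder first = cache.getOrCreate("/orders", StringBuilder::new);
        System.out.println(first == cache.getOrCreate("/orders", StringBuilder::new)); // true: same instance
    }
}
```

Copying the map on every insert is cheap here because inserts happen once per resource, while lookups happen on every request.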
Here is how NodeSelectorSlot implements this:
DefaultNode node = map.get(context.getName());
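// The node is looked up by context.getName(), the name of the invocation context, so one resource can have several nodes. The statistics are still recorded only once per resource, as the ClusterBuilderSlot call below shows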
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
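// Set up the call chain, because different resources may be entered in a nested way.
// The context holds the entrance node and the call chain (a doubly linked list). Here the current node is added as a child of the node of the previous entry in the chain, or of the entrance node if this is the first entry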
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
// Set the node as curNode of the current entry. The node is thus recorded twice: once under its parent in the call chain, once on the current entry
context.setCurNode(node);
ClusterBuilderSlot: builds the resource's statistics node, the ClusterNode#
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
// First check whether this slot chain already has a ClusterNode
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// Each slot chain creates only one clusterNode, which is shared by the nodes of all context names
node.setClusterNode(clusterNode);
/*
* if context origin is set, we should get or create a new {@link Node} of
* the specific origin.
*/
// Handle the origin: there is none by default; if one is set, the clusterNode keeps an extra map of per-origin nodes
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
OK, the logic here is simple: create one clusterNode that collects the statistics for the resource.
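The relationship between the per-context nodes and the single ClusterNode can be sketched as follows: every context gets its own counter, and each update is also forwarded to one shared per-resource counter. This is an illustration only. NodeSharingSketch, ClusterStats and ContextNode are hypothetical names, and the forwarding inside addPass is my assumption about how the two kinds of node stay in sync.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

final class NodeSharingSketch {

    /** Per-resource totals, shared by every context. */
    static final class ClusterStats {
        final LongAdder pass = new LongAdder();
    }

    /** Per-context view of one resource that also feeds the shared totals. */
    static final class ContextNode {
        final LongAdder pass = new LongAdder();
        private final ClusterStats cluster;

        ContextNode(ClusterStats cluster) {
            this.cluster = cluster;
        }

        void addPass(int n) {
            pass.add(n);          // this context only
            cluster.pass.add(n);  // resource-wide total
        }
    }

    private final ClusterStats cluster = new ClusterStats();
    private final Map<String, ContextNode> byContext = new HashMap<>();

    /** Returns the node for the given context name, creating it on first use. */
    synchronized ContextNode nodeFor(String contextName) {
        return byContext.computeIfAbsent(contextName, k -> new ContextNode(cluster));
    }

    long totalPass() {
        return cluster.pass.sum();
    }

    public static void main(String[] args) {
        NodeSharingSketch resource = new NodeSharingSketch();
        resource.nodeFor("web_context").addPass(2);
        resource.nodeFor("job_context").addPass(3);
        System.out.println(resource.totalPass()); // 5
    }
}
```

Rules that limit a resource as a whole can then read the shared totals, while per-context figures remain available for the call tree.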
Next: LogSlot#
The logic here is also simple: it logs any BlockException.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), e.getRule().getId(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
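OK, now the core: StatisticSlot#
It first determines whether the request may enter, which means AuthoritySlot, SystemSlot, ParamFlowSlot and FlowSlot run first. So we look at those slots next and come back to this section afterwards.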
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// First determine whether the request may enter; this runs AuthoritySlot, SystemSlot, ParamFlowSlot and FlowSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// The request passed: increase the thread count and the pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
AuthoritySlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
// Check the authority (black/white list) rules
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// Fetches the authority rules configured beforehand and checks whether the request may pass
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
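The article does not step into AuthorityRuleChecker.passCheck, so here is a sketch of what a black/white list check on the request origin typically looks like. Everything in it is an assumption for illustration: the class OriginListSketch, the Strategy enum, the comma-separated list format, and the choice to let an empty origin or an empty list pass.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class OriginListSketch {
    enum Strategy { WHITE, BLACK }

    /**
     * Returns true if a caller with the given origin may pass.
     * limitApp is a comma-separated list of origin names.
     */
    static boolean passCheck(Strategy strategy, String limitApp, String origin) {
        // Assumption of this sketch: an empty origin or an empty list never blocks.
        if (origin == null || origin.isEmpty() || limitApp == null || limitApp.isEmpty()) {
            return true;
        }
        Set<String> listed = new HashSet<>(Arrays.asList(limitApp.split(",")));
        boolean contained = listed.contains(origin);
        // White list: only listed origins pass. Black list: listed origins are rejected.
        return strategy == Strategy.WHITE ? contained : !contained;
    }

    public static void main(String[] args) {
        System.out.println(passCheck(Strategy.WHITE, "appA,appB", "appA")); // true
        System.out.println(passCheck(Strategy.BLACK, "appA,appB", "appA")); // false
    }
}
```

This also shows why the origin parser registered in section 1.3 matters: without it the origin is empty, and a check of this kind has nothing to match against.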
SystemSlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
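// Check the health of the system
// We do not step in here. In short, it adjusts how many requests are let through based on system metrics. Official explanation: https://sentinelguard.io/zh-cn/docs/system-adaptive-protection.html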
SystemRuleManager.checkSystem(resourceWrapper, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ParamFlowSlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// Check whether the resource has any parameter flow rules
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
// Limit access based on the argument at the configured position: https://sentinelguard.io/zh-cn/docs/parameter-flow-control.html
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
FlowSlot#
Similar to ParamFlowSlot: it decides whether to let the request pass based on the FlowRule. See flow-control | Sentinel (sentinelguard.io).
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
DegradeSlot#
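- Slow request ratio (SLOW_REQUEST_RATIO): the threshold is a ratio of slow calls. You set the maximum allowed RT (response time), and any request that takes longer counts as a slow call. When the number of requests within the statistics interval (statIntervalMs) exceeds the configured minimum request count and the slow-call ratio exceeds the threshold, requests are rejected automatically for the following circuit-breaking period. After that period the breaker enters the probing state (HALF-OPEN). If the next request finishes within the slow-call RT, the breaker closes; if it takes longer, the breaker opens again.
- Error ratio (ERROR_RATIO): when the number of requests within the statistics interval (statIntervalMs) exceeds the configured minimum request count and the error ratio exceeds the threshold, requests are rejected automatically for the following circuit-breaking period. After that period the breaker enters the probing state (HALF-OPEN). If the next request completes successfully (no error), the breaker closes; otherwise it opens again. The threshold range for the error ratio is [0.0, 1.0], meaning 0% to 100%.
- Error count (ERROR_COUNT): when the number of errors within the statistics interval exceeds the threshold, the breaker opens automatically. After the circuit-breaking period the breaker enters the probing state (HALF-OPEN). If the next request completes successfully (no error), the breaker closes; otherwise it opens again.
See circuit-breaking | Sentinel (sentinelguard.io).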
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
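The three strategies above share one state machine: CLOSED, OPEN, HALF-OPEN. The sketch below implements the error-ratio variant of it so that the transitions can be followed in code. It is a simplified model, not Sentinel's implementation. The class ErrorRatioBreakerSketch and all of its members are hypothetical, time is passed in explicitly to keep it testable, a single fixed window replaces Sentinel's sliding statistics, and the boundary handling (at least minRequestAmount requests) is a choice of this sketch.

```java
final class ErrorRatioBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final double ratioThreshold;   // 0.5 means 50% errors
    private final int minRequestAmount;
    private final long statIntervalMs;
    private final long recoveryTimeoutMs;

    private State state = State.CLOSED;
    private long windowStartMs;
    private long openedAtMs;
    private int total;
    private int errors;

    ErrorRatioBreakerSketch(double ratioThreshold, int minRequestAmount,
                            long statIntervalMs, long recoveryTimeoutMs) {
        this.ratioThreshold = ratioThreshold;
        this.minRequestAmount = minRequestAmount;
        this.statIntervalMs = statIntervalMs;
        this.recoveryTimeoutMs = recoveryTimeoutMs;
    }

    synchronized State state() {
        return state;
    }

    /** Returns true if a request arriving at nowMs may proceed. */
    synchronized boolean tryPass(long nowMs) {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && nowMs - openedAtMs >= recoveryTimeoutMs) {
            state = State.HALF_OPEN; // let exactly one probe request through
            return true;
        }
        return false; // still open, or a probe is already in flight
    }

    /** Records the outcome of a request that was allowed through. */
    synchronized void onComplete(long nowMs, boolean error) {
        if (state == State.HALF_OPEN) {
            // The probe decides: success closes the breaker, failure reopens it.
            if (error) {
                open(nowMs);
            } else {
                close(nowMs);
            }
            return;
        }
        if (state == State.OPEN) {
            return; // late completion of a request admitted before the breaker opened
        }
        if (nowMs - windowStartMs >= statIntervalMs) {
            resetWindow(nowMs);
        }
        total++;
        if (error) {
            errors++;
        }
        if (total >= minRequestAmount && (double) errors / total > ratioThreshold) {
            open(nowMs);
        }
    }

    private void open(long nowMs) {
        state = State.OPEN;
        openedAtMs = nowMs;
    }

    private void close(long nowMs) {
        state = State.CLOSED;
        resetWindow(nowMs);
    }

    private void resetWindow(long nowMs) {
        windowStartMs = nowMs;
        total = 0;
        errors = 0;
    }
}
```

With a threshold of 0.5, a minimum of 4 requests and a 5 second recovery timeout, three errors out of four requests open the breaker, requests are rejected for 5 seconds, and a single successful probe closes it again.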
If all the slots above let the request through, control returns to StatisticSlot:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
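// Execution has now reached this point
// Increase the thread count; two counters are updated, the node's own and the ClusterNode's
node.increaseThreadNum();
// Increase the pass count
node.addPassRequest(count);
// Increase the counters of the origin node, if there is one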
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
// Count passes and threads on the global inbound entry node
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Callbacks for a request that passed
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
// This means the request may enter after waiting for a while, so it is still let through
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Likewise, count the thread on the global inbound entry node
Constants.ENTRY_NODE.increaseThreadNum();
}
// Callbacks for a request that passed
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Callbacks for a request that was blocked
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
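The shape of StatisticSlot is worth isolating: it calls the rest of the chain first, and only afterwards records the outcome, counting a pass when the call returns normally and a block when a BlockException comes back. A minimal sketch of that "check first, count after" shape follows. StatisticSketch, Downstream and BlockedException are hypothetical names standing in for the slot, the rest of the chain and BlockException.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

final class StatisticSketch {

    /** Stands in for BlockException. */
    static final class BlockedException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /** Stands in for the rest of the chain; throws BlockedException to reject. */
    interface Downstream {
        void fire(int count);
    }

    final LongAdder pass = new LongAdder();
    final LongAdder block = new LongAdder();
    final AtomicInteger threads = new AtomicInteger();

    /** Runs the downstream checks first, then records the outcome. */
    void entry(int count, Downstream downstream) {
        try {
            downstream.fire(count);
            // Reached only if no rule blocked the request.
            threads.incrementAndGet();
            pass.add(count);
        } catch (BlockedException e) {
            block.add(count);
            throw e; // the caller still needs to see the rejection
        }
    }

    /** Called when the protected code has finished. */
    void exit() {
        threads.decrementAndGet();
    }

    public static void main(String[] args) {
        StatisticSketch stats = new StatisticSketch();
        stats.entry(1, count -> { });
        try {
            stats.entry(1, count -> { throw new BlockedException(); });
        } catch (BlockedException expected) {
            // rejected request
        }
        System.out.println(stats.pass.sum() + " passed, " + stats.block.sum() + " blocked"); // 1 passed, 1 blocked
    }
}
```

Counting after the checks is what lets the rule slots further down the chain read statistics that do not yet include the request being decided.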
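3. Extension points#
Sentinel has many extension points.
3.1 Initialization extension: InitExecutor#
When it runs: at startup if eager mode is enabled (properties.isEager()), otherwise on the first call to entry.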
public static void doInit() {
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
// Find all InitFunc implementations
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : initFuncs) {
RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();
RecordLog.info("[InitExecutor] Executing {} with order {}",
w.func.getClass().getCanonicalName(), w.order);
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
By default this finds the initializers for the cluster-mode client and server, plus the one that registers the metric callbacks:
public class MetricCallbackInit implements InitFunc {
@Override
public void init() throws Exception {
StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
new MetricEntryCallback());
StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
new MetricExitCallback());
}
}
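doInit() has two properties that any extension relies on: it runs at most once, guarded by compareAndSet, and it runs the discovered functions in order. The sketch below reproduces just those two properties without SPI loading. InitOnceSketch, OrderedTask and register are hypothetical names; in Sentinel the functions are discovered through SpiLoader rather than registered by hand.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class InitOnceSketch {

    /** A task plus the order value that decides when it runs. */
    static final class OrderedTask {
        final int order;
        final Runnable task;

        OrderedTask(int order, Runnable task) {
            this.order = order;
            this.task = task;
        }
    }

    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final List<OrderedTask> tasks = new ArrayList<>();

    /** Hand registration replaces SPI discovery in this sketch. */
    synchronized void register(int order, Runnable task) {
        tasks.add(new OrderedTask(order, task));
    }

    /** Runs all tasks in ascending order; only the first caller does the work. */
    boolean doInit() {
        if (!initialized.compareAndSet(false, true)) {
            return false; // someone already initialized
        }
        List<OrderedTask> sorted;
        synchronized (this) {
            sorted = new ArrayList<>(tasks);
        }
        sorted.sort(Comparator.comparingInt((OrderedTask t) -> t.order));
        for (OrderedTask t : sorted) {
            t.task.run();
        }
        return true;
    }

    public static void main(String[] args) {
        InitOnceSketch init = new InitOnceSketch();
        init.register(2, () -> System.out.println("register callbacks"));
        init.register(1, () -> System.out.println("start transport"));
        init.doInit(); // prints "start transport", then "register callbacks"
        init.doInit(); // no output: already initialized
    }
}
```

The at-most-once guard is why it does not matter whether initialization is triggered eagerly at startup or lazily by the first request.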
3.2 Slot/Slot Chain extension#
When it is called: no slot chain has been found for the resource.
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
/**
* The load and pick process is not thread-safe, but it's okay since the method should be only invoked
* via {@code lookProcessChain} in {@link com.alibaba.csp.sentinel.CtSph} under lock.
*
* @return new created slot chain
*/
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
private SlotChainProvider() {}
}
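3.3 Transport extension#
This is the interface the client exposes to the outside. Some commands are exposed by default so that the client's state can be inspected.
First, there is a command center that receives requests from outside: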
public class SimpleHttpCommandCenter implements CommandCenter {
private static final int PORT_UNINITIALIZED = -1;
private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
private static final int DEFAULT_PORT = 8719;
@SuppressWarnings("rawtypes")
private static final Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private ExecutorService executor = Executors.newSingleThreadExecutor(
new NamedThreadFactory("sentinel-command-center-executor", true));
private ExecutorService bizExecutor;
private ServerSocket socketReference;
@Override
@SuppressWarnings("rawtypes")
public void beforeStart() throws Exception {
// Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
@Override
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor", true),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
int port;
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
/**
* Get a server socket from an available port from a base port.<br>
* Increasing on port number will occur when the port has already been used.
*
* @param basePort base port to start
* @return new socket with available port
*/
private static ServerSocket getServerSocketFromBasePort(int basePort) {
int tryCount = 0;
while (true) {
try {
ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
server.setReuseAddress(true);
return server;
} catch (IOException e) {
tryCount++;
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e1) {
break;
}
}
}
return null;
}
@Override
public void stop() throws Exception {
if (socketReference != null) {
try {
socketReference.close();
} catch (IOException e) {
CommandCenterLog.warn("Error when releasing the server socket", e);
}
}
if (bizExecutor != null) {
bizExecutor.shutdownNow();
}
executor.shutdownNow();
TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
handlerMap.clear();
}
/**
* Get the name set of all registered commands.
*/
public static Set<String> getCommands() {
return handlerMap.keySet();
}
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
}
}
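The port selection in getServerSocketFromBasePort above relies on integer division: the candidate port is basePort + tryCount / 3, so each port is attempted three times before the next one is tried. The following is a minimal sketch of that arithmetic, not Sentinel's code. The class name PortProbeDemo, the helpers candidatePort and bindFrom, the maxTries bound, and the example base port 8719 are all assumptions made for illustration (the original loops until it succeeds or is interrupted, and sleeps 30 ms between attempts).

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortProbeDemo {

    // Same arithmetic as the original: three attempts per port, then move to the next port.
    static int candidatePort(int basePort, int tryCount) {
        return basePort + tryCount / 3;
    }

    // Hypothetical bounded variant of the probing loop; returns null if every attempt fails.
    static ServerSocket bindFrom(int basePort, int maxTries) {
        for (int tryCount = 0; tryCount < maxTries; tryCount++) {
            try {
                return new ServerSocket(candidatePort(basePort, tryCount), 100);
            } catch (IOException e) {
                // Port is busy (or otherwise unavailable): fall through to the next attempt.
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        int basePort = 8719; // example base port, chosen for illustration
        for (int tryCount = 0; tryCount < 7; tryCount++) {
            System.out.println("try " + tryCount + " -> port " + candidatePort(basePort, tryCount));
        }
        // A null resource is permitted in try-with-resources and is simply not closed.
        try (ServerSocket socket = bindFrom(basePort, 9)) {
            System.out.println(socket == null
                ? "no port available in the probed range"
                : "bound to port " + socket.getLocalPort());
        }
    }
}
```

If several Sentinel-enabled applications run on the same host, this probing is why each one ends up on a different command center port; the port actually chosen is the one passed to TransportConfig.setRuntimePort in the listing above.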
Once a connection is accepted, HttpEventTask finds the matching handler:
public void run() {
if (socket == null) {
return;
}
PrintWriter printWriter = null;
InputStream inputStream = null;
try {
long start = System.currentTimeMillis();
inputStream = new BufferedInputStream(socket.getInputStream());
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String firstLine = readLine(inputStream);
CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
+ ", addr: " + socket.getInetAddress());
CommandRequest request = processQueryString(firstLine);
if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
// Deal with post method
processPostRequest(inputStream, request);
}
// Validate the target command.
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
return;
}
// Find the matching command handler.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter);
} else {
// No matching command handler.
writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
}
long cost = System.currentTimeMillis() - start;
CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
+ ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
} catch (RequestException e) {
writeResponse(printWriter, e.getStatusCode(), e.getMessage());
} catch (Throwable e) {
CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
try {
if (printWriter != null) {
String errorMessage = SERVER_ERROR_MESSAGE;
e.printStackTrace();
if (!writtenHead) {
writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
} else {
printWriter.println(errorMessage);
}
printWriter.flush();
}
} catch (Exception e1) {
CommandCenterLog.warn("Failed to write error response", e1);
}
} finally {
closeResource(inputStream);
closeResource(printWriter);
closeResource(socket);
}
}
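The run() method above boils down to three steps: read the request line, derive the command name from it, and look that name up in a handler registry, answering with a bad-request status when the name is blank or unknown. A minimal, self-contained sketch of that dispatch pattern follows. It does not use Sentinel's classes: CommandDispatchDemo, register, targetOf, paramsOf, dispatch, and the "echo" command are hypothetical names, the status strings are simplified, and query values are not URL-decoded.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class CommandDispatchDemo {

    // Hypothetical registry: command name -> handler (query parameters in, response body out).
    private static final Map<String, Function<Map<String, String>, String>> HANDLERS =
        new ConcurrentHashMap<>();

    static void register(String name, Function<Map<String, String>, String> handler) {
        HANDLERS.put(name, handler);
    }

    // Extracts "echo" from a request line such as "GET /echo?msg=hi HTTP/1.1".
    static String targetOf(String requestLine) {
        String[] parts = requestLine.split(" ");
        if (parts.length < 2) {
            return "";
        }
        String path = parts[1];
        int q = path.indexOf('?');
        if (q >= 0) {
            path = path.substring(0, q);
        }
        return path.startsWith("/") ? path.substring(1) : path;
    }

    // Parses the query string into a map; values are kept as-is (no URL decoding).
    static Map<String, String> paramsOf(String requestLine) {
        Map<String, String> params = new HashMap<>();
        String[] parts = requestLine.split(" ");
        if (parts.length < 2 || parts[1].indexOf('?') < 0) {
            return params;
        }
        String query = parts[1].substring(parts[1].indexOf('?') + 1);
        for (String pair : query.split("&")) {
            String[] kv = pair.split("=", 2);
            if (!kv[0].isEmpty()) {
                params.put(kv[0], kv.length > 1 ? kv[1] : "");
            }
        }
        return params;
    }

    // Mirrors the control flow of run(): validate the name, find the handler, respond.
    static String dispatch(String requestLine) {
        String name = targetOf(requestLine);
        if (name.trim().isEmpty()) {
            return "400 Invalid command";
        }
        Function<Map<String, String>, String> handler = HANDLERS.get(name);
        if (handler == null) {
            return "400 Unknown command `" + name + "`";
        }
        return "200 " + handler.apply(paramsOf(requestLine));
    }

    public static void main(String[] args) {
        register("echo", params -> params.getOrDefault("msg", ""));
        System.out.println(dispatch("GET /echo?msg=hi HTTP/1.1"));
        System.out.println(dispatch("GET /nope HTTP/1.1"));
        System.out.println(dispatch("GET / HTTP/1.1"));
    }
}
```

Keeping the registry as a plain name-to-handler map is what makes the command center extensible: adding a command means adding one entry, and the socket handling code does not change.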
A number of handlers are defined by default, including:
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerFlowConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerTransportConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyServerNamespaceSetHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerInfoCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyClusterClientConfigHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterClientConfigHandler
3.4 Cluster flow control extension#
Cluster flow control · alibaba/Sentinel Wiki (github.com)
This post is synced to xLog by Mix Space.
The original link is https://me.liuyaowen.club/posts/default/springcloud-sentinel