1. Source Code Interpretation is Divided into Three Parts: Initialization and Running Process, and Extension Points#
1.1 Sentinel Self-Initialization#
What does Sentinel do during the Spring Boot startup?
What side effects does it have after introducing dependencies?
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2023.0.1.0</version>
</dependency>
Let's see what is included in the dependencies?
There are many, and we can see several classic parts. Let's first look at the contents of the main body spring-cloud-starter-alibaba-sentinel. Generally, we look at three parts: first, the auto-configuration class; second, the SPI interface; and third, the extension points in Spring Boot, spring.factories.
Let's see what we have.
We can see that only the auto-configuration class is imported. Let's see what configuration classes are imported inside.
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration
The first two are adaptations for the Spring Web framework, the third provides external access ports for Sentinel, the fourth is for initializing and customizing Sentinel, and the fifth is for supporting Feign. Let's look at them one by one, starting with Sentinel's own initialization.
1.2 Sentinel's Own Initialization: Property Initialization, Data Source Initialization, Aspect Initialization, RestTemplate Initialization. This part adapts to Spring projects without using MVC.#
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelAutoConfiguration {
@Value("${project.name:${spring.application.name:}}")
private String projectName;
@Autowired
private SentinelProperties properties;
// Property initialization
@PostConstruct
public void init() {
if (StringUtils.isEmpty(System.getProperty(LogBase.LOG_DIR))
&& StringUtils.isNotBlank(properties.getLog().getDir())) {
// Log
System.setProperty(LogBase.LOG_DIR, properties.getLog().getDir());
}
if (StringUtils.isEmpty(System.getProperty(LogBase.LOG_NAME_USE_PID))
&& properties.getLog().isSwitchPid()) {
// Log
System.setProperty(LogBase.LOG_NAME_USE_PID,
String.valueOf(properties.getLog().isSwitchPid()));
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.APP_NAME_PROP_KEY))
&& StringUtils.isNotBlank(projectName)) {
// Set project name or Spring application name, usually the application name ${spring.application.name:}
System.setProperty(SentinelConfig.APP_NAME_PROP_KEY, projectName);
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.SERVER_PORT))
&& StringUtils.isNotBlank(properties.getTransport().getPort())) {
// Sentinel's external control port, like the dashboard is accessed through this port, default is public static final String API_PORT = "8719";
System.setProperty(TransportConfig.SERVER_PORT,
properties.getTransport().getPort());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.CONSOLE_SERVER))
&& StringUtils.isNotBlank(properties.getTransport().getDashboard())) {
// Dashboard port address
System.setProperty(TransportConfig.CONSOLE_SERVER,
properties.getTransport().getDashboard());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.HEARTBEAT_INTERVAL_MS))
&& StringUtils
.isNotBlank(properties.getTransport().getHeartbeatIntervalMs())) {
// Heartbeat interval time, default is private static final long DEFAULT_INTERVAL = 1000 * 10;
System.setProperty(TransportConfig.HEARTBEAT_INTERVAL_MS,
properties.getTransport().getHeartbeatIntervalMs());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.HEARTBEAT_CLIENT_IP))
&& StringUtils.isNotBlank(properties.getTransport().getClientIp())) {
// Heartbeat client IP, default is local IP
System.setProperty(TransportConfig.HEARTBEAT_CLIENT_IP,
properties.getTransport().getClientIp());
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.CHARSET))
&& StringUtils.isNotBlank(properties.getMetric().getCharset())) {
System.setProperty(SentinelConfig.CHARSET,
properties.getMetric().getCharset());
}
if (StringUtils
.isEmpty(System.getProperty(SentinelConfig.SINGLE_METRIC_FILE_SIZE))
&& StringUtils.isNotBlank(properties.getMetric().getFileSingleSize())) {
System.setProperty(SentinelConfig.SINGLE_METRIC_FILE_SIZE,
properties.getMetric().getFileSingleSize());
}
if (StringUtils
.isEmpty(System.getProperty(SentinelConfig.TOTAL_METRIC_FILE_COUNT))
&& StringUtils.isNotBlank(properties.getMetric().getFileTotalCount())) {
System.setProperty(SentinelConfig.TOTAL_METRIC_FILE_COUNT,
properties.getMetric().getFileTotalCount());
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.COLD_FACTOR))
&& StringUtils.isNotBlank(properties.getFlow().getColdFactor())) {
System.setProperty(SentinelConfig.COLD_FACTOR,
properties.getFlow().getColdFactor());
}
if (StringUtils.isNotBlank(properties.getBlockPage())) {
setConfig(BLOCK_PAGE_URL_CONF_KEY, properties.getBlockPage());
}
// earlier initialize
// Whether to initialize at the beginning, default is false, instead wait until the first call to initialize
if (properties.isEager()) {
InitExecutor.doInit();
}
}
// This is the aspect class initialization that supports the SentinelResource annotation
@Bean
@ConditionalOnMissingBean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
// Initialization of SentinelRestTemplate, initializes a post-processor and adds interceptors to it
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.web.client.RestTemplate")
@ConditionalOnProperty(name = "resttemplate.sentinel.enabled", havingValue = "true",
matchIfMissing = true)
public static SentinelBeanPostProcessor sentinelBeanPostProcessor(
ApplicationContext applicationContext) {
return new SentinelBeanPostProcessor(applicationContext);
}
// Handling of external property sources, initialization
// After all singleton beans are initialized
/*
public void postRegister(AbstractDataSource dataSource) {
switch (this.getRuleType()) {
case FLOW -> FlowRuleManager.register2Property(dataSource.getProperty());
case DEGRADE -> DegradeRuleManager.register2Property(dataSource.getProperty());
case PARAM_FLOW -> ParamFlowRuleManager.register2Property(dataSource.getProperty());
case SYSTEM -> SystemRuleManager.register2Property(dataSource.getProperty());
case AUTHORITY -> AuthorityRuleManager.register2Property(dataSource.getProperty());
case GW_FLOW -> GatewayRuleManager.register2Property(dataSource.getProperty());
case GW_API_GROUP -> GatewayApiDefinitionManager.register2Property(dataSource.getProperty());
}
}
*/
@Bean
@ConditionalOnMissingBean
public SentinelDataSourceHandler sentinelDataSourceHandler(
DefaultListableBeanFactory beanFactory, SentinelProperties sentinelProperties,
Environment env) {
return new SentinelDataSourceHandler(beanFactory, sentinelProperties, env);
}
// Some converters, for example, setting converters when configuring external property sources
@ConditionalOnClass(ObjectMapper.class)
@Configuration(proxyBeanMethods = false)
protected static class SentinelConverterConfiguration {
// json
@Configuration(proxyBeanMethods = false)
protected static class SentinelJsonConfiguration {
private ObjectMapper objectMapper = new ObjectMapper();
public SentinelJsonConfiguration() {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
}
@Bean("sentinel-json-flow-converter")
public JsonConverter jsonFlowConverter() {
return new JsonConverter(objectMapper, FlowRule.class);
}
@Bean("sentinel-json-degrade-converter")
public JsonConverter jsonDegradeConverter() {
return new JsonConverter(objectMapper, DegradeRule.class);
}
@Bean("sentinel-json-system-converter")
public JsonConverter jsonSystemConverter() {
return new JsonConverter(objectMapper, SystemRule.class);
}
@Bean("sentinel-json-authority-converter")
public JsonConverter jsonAuthorityConverter() {
return new JsonConverter(objectMapper, AuthorityRule.class);
}
@Bean("sentinel-json-param-flow-converter")
public JsonConverter jsonParamFlowConverter() {
return new JsonConverter(objectMapper, ParamFlowRule.class);
}
}
// xml
@ConditionalOnClass(XmlMapper.class)
@Configuration(proxyBeanMethods = false)
protected static class SentinelXmlConfiguration {
private XmlMapper xmlMapper = new XmlMapper();
public SentinelXmlConfiguration() {
xmlMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
}
@Bean("sentinel-xml-flow-converter")
public XmlConverter xmlFlowConverter() {
return new XmlConverter(xmlMapper, FlowRule.class);
}
@Bean("sentinel-xml-degrade-converter")
public XmlConverter xmlDegradeConverter() {
return new XmlConverter(xmlMapper, DegradeRule.class);
}
@Bean("sentinel-xml-system-converter")
public XmlConverter xmlSystemConverter() {
return new XmlConverter(xmlMapper, SystemRule.class);
}
@Bean("sentinel-xml-authority-converter")
public XmlConverter xmlAuthorityConverter() {
return new XmlConverter(xmlMapper, AuthorityRule.class);
}
@Bean("sentinel-xml-param-flow-converter")
public XmlConverter xmlParamFlowConverter() {
return new XmlConverter(xmlMapper, ParamFlowRule.class);
}
}
}
}
1.3 Adaptation Initialization for Spring MVC#
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = Type.SERVLET)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@ConditionalOnClass(SentinelWebInterceptor.class)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration implements WebMvcConfigurer {
private static final Logger log = LoggerFactory
.getLogger(SentinelWebAutoConfiguration.class);
@Autowired
private SentinelProperties properties;
@Autowired
private Optional<UrlCleaner> urlCleanerOptional;
@Autowired
private Optional<BlockExceptionHandler> blockExceptionHandlerOptional;
@Autowired
private Optional<RequestOriginParser> requestOriginParserOptional;
// Here, a global interceptor is initialized
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebInterceptor sentinelWebInterceptor(
SentinelWebMvcConfig sentinelWebMvcConfig) {
return new SentinelWebInterceptor(sentinelWebMvcConfig);
}
// Some configurations for the above interceptor
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebMvcConfig sentinelWebMvcConfig() {
SentinelWebMvcConfig sentinelWebMvcConfig = new SentinelWebMvcConfig();
// Whether to include request method in resource name
sentinelWebMvcConfig.setHttpMethodSpecify(properties.getHttpMethodSpecify());
sentinelWebMvcConfig.setWebContextUnify(properties.getWebContextUnify());
// Exception handling after flow control
if (blockExceptionHandlerOptional.isPresent()) {
blockExceptionHandlerOptional
.ifPresent(sentinelWebMvcConfig::setBlockExceptionHandler);
}
else {
if (StringUtils.hasText(properties.getBlockPage())) {
sentinelWebMvcConfig.setBlockExceptionHandler(((request, response,
e) -> response.sendRedirect(properties.getBlockPage())));
}
else {
// Default exception handling after flow control
sentinelWebMvcConfig
.setBlockExceptionHandler(new DefaultBlockExceptionHandler());
}
}
urlCleanerOptional.ifPresent(sentinelWebMvcConfig::setUrlCleaner);
// Source name parsing
requestOriginParserOptional.ifPresent(sentinelWebMvcConfig::setOriginParser);
return sentinelWebMvcConfig;
}
// Register interceptor
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebMvcConfigurer sentinelWebMvcConfigurer() {
return new SentinelWebMvcConfigurer();
}
}
2 Running Process#
2.1 Spring MVC Interceptor Running Logic#
Interceptors generally have two methods: one for pre-request and one for post-request. Let's first look at the pre-request method.
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
// First, get the resource name
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
// If there is $$sentinel_spring_web_entry_attr-rc in the requests attribute, count and pass
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
// Parse the request origin using registered origin parser.
// Generate origin based on HTTP, default is empty, used in StatisticSlot
String origin = parseOrigin(request);
// Get the name of the monitoring container, default is sentinel_spring_web_context
String contextName = getContextName(request);
// Key code, initialize call context
ContextUtil.enter(contextName, origin);
// Enter resource, enter slot plugin module, create
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
// Slot plugin module creation process
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
// Sentinel default ProcessorSlots
// com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
// com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
// com.alibaba.csp.sentinel.slots.logger.LogSlot
// com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
// com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
// com.alibaba.csp.sentinel.slots.system.SystemSlot
// com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
// com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
// com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
// All default slots, executed in the above order
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
2.2 Entering chain.entry(context, resourceWrapper, null, count, prioritized, args) to see the actual running process#
Let's first look at the slot interface for better understanding.
public interface ProcessorSlot<T> {
// The running logic of this slot
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;
// What to do after the running of this slot is completed? The implementation of the abstract class casts the parameters and enters the next slot, forming a unidirectional linked list of their relationships
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;
// Similarly
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
Let's look at the abstract class.
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
// Save the next slot to be executed
private AbstractLinkedProcessorSlot<?> next = null;
// The implementation of the abstract class casts the parameters and enters the next slot, forming a unidirectional list of their relationships
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}
public AbstractLinkedProcessorSlot<?> getNext() {
return next;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}
Now we are entering the process of calling the slot, let's look at them one by one in the order above.#
// Resolve the slot chain builder SPI.
// Sentinel default ProcessorSlots
// com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
// com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
// com.alibaba.csp.sentinel.slots.logger.LogSlot
// com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
// com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
// com.alibaba.csp.sentinel.slots.system.SystemSlot
// com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
// com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
// com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
// All default slots, executed in the above order
NodeSelectorSlot, Resource Node Selector#
We must clarify one point: a node is bound to a slot chain. Each unique resource has a unique slot chain, and the slot chain contains a node (defaultNode) and different nodes for different calling contexts (defaultNode). The official website has explanations.
Each resource has a unique slot chain.
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
Let's see how NodeSelectorSlot is implemented.
DefaultNode node = map.get(context.getName());
// You can see that it looks up the node using context.getName() as the key, meaning that a resource may have multiple nodes, but it will only be counted once. This can be seen in the following ClusterBuilderSlot call.
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
// Set the invocation chain, as it may be nested into different resources,
// context saves the entry node and invocation chain (doubly linked list), here the current invoked node is added to the child of the previous node's CTNode. If it is the first entry, it is added to the entry node.
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
// Set the current invocation, set to the current invocation CTNODE's CurNode, added twice, once to the upper invocation context and once to the current one.
context.setCurNode(node);
ClusterBuilderSlot, Resource Statistics Node Construction, i.e., ClusterNode#
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
// First check if the current slot chain exists
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// You can see that each slot chain will only generate one clusterNode, shared with different context names' nodes.
node.setClusterNode(clusterNode);
/*
* if context origin is set, we should get or create a new {@link Node} of
* the specific origin.
*/
// Set the origin, default is none, if there is, maintain a map in clusterNode.
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
Okay, the logic here is simple: create a clusterNode to collect entry information.
Next, enter LogSlot#
The logic here is also quite simple: record BlockException.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), e.getRule().getId(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
Okay, we have reached the core StatisticSlot#
First, it checks whether it can enter. Here, it first executes AuthoritySlot, SystemSlot, ParamFlowSlot, FlowSlot, so let's first look at the logic of the other slots and skip this section for now.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// First, check whether it can enter, here it first executes AuthoritySlot, SystemSlot, ParamFlowSlot, FlowSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
AuthoritySlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
// Check authorization information
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// This is to retrieve the configured authorization rules to see if it can pass
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
SystemSlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// Check system health status
// We won't go into this, it adjusts the requests that can pass based on system metrics. The official explanation is https://sentinelguard.io/zh-cn/docs/system-adaptive-protection.html
SystemRuleManager.checkSystem(resourceWrapper, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ParamFlowSlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// Check if there are rules
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
// Limit access based on parameter position https://sentinelguard.io/zh-cn/docs/parameter-flow-control.html
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
FlowSlot#
Similar to ParamFlowSlot, it determines whether to pass based on FlowRule flow-control | Sentinel (sentinelguard.io)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
DegradeSlot#
- Slow Call Ratio (
SLOW_REQUEST_RATIO
): Choose to use the slow call ratio as a threshold, you need to set the allowed slow call RT (i.e., maximum response time). If the response time of the request exceeds this value, it is counted as a slow call. When the number of requests during the unit statistical period (statIntervalMs
) exceeds the minimum request number set, and the ratio of slow calls exceeds the threshold, the requests during the subsequent circuit-breaking duration will be automatically circuit broken. After the circuit-breaking duration, the circuit breaker will enter the probe recovery state (HALF-OPEN state). If the response time of the next request is less than the set slow call RT, the circuit-breaking ends; if it exceeds the set slow call RT, it will be circuit broken again. - Error Ratio (
ERROR_RATIO
): When the number of requests during the unit statistical period (statIntervalMs
) exceeds the minimum request number set, and the error ratio exceeds the threshold, the requests during the subsequent circuit-breaking duration will be automatically circuit broken. After the circuit-breaking duration, the circuit breaker will enter the probe recovery state (HALF-OPEN state). If the next request completes successfully (no errors), the circuit-breaking ends; otherwise, it will be circuit broken again. The error ratio threshold range is[0.0, 1.0]
, representing 0% - 100%. - Error Count (
ERROR_COUNT
): When the number of errors during the unit statistical period exceeds the threshold, it will automatically circuit break. After the circuit-breaking duration, the circuit breaker will enter the probe recovery state (HALF-OPEN state). If the next request completes successfully (no errors), the circuit-breaking ends; otherwise, it will be circuit broken again. - circuit-breaking | Sentinel (sentinelguard.io)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
If all the previous checks pass, we return to StatisticSlot.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Now we have reached here
// Increase thread count, there will be two types: one is transient, the other is cluster.
node.increaseThreadNum();
// Increase access count
node.addPassRequest(count);
// Increase source access count
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
// Count the access times and threads of the entry node
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Callbacks for passing flow
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
// This indicates that during default processing, waiting a while allows entry, still passing
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Similarly count the access times and threads of the entry node
Constants.ENTRY_NODE.increaseThreadNum();
}
// Callbacks for passing flow
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Callbacks for not passing flow
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
3. Extension Points#
Sentinel has many extension points.
3.1 Initialization Process Extension InitExecutor#
Execution timing: the first call to entry, or not in Eager mode.
public static void doInit() {
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
// Find all InitFunc
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : initFuncs) {
RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();
RecordLog.info("[InitExecutor] Executing {} with order {}",
w.func.getClass().getCanonicalName(), w.order);
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
By default, it will find the initialization of clients and servers in cluster mode as well as the initialization of metric callback functions.
public class MetricCallbackInit implements InitFunc {
@Override
public void init() throws Exception {
StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
new MetricEntryCallback());
StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
new MetricExitCallback());
}
}
3.2 Slot/Slot Chain Extension#
Calling timing: when no matching slot chain is found for the resource.
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
/**
* The load and pick process is not thread-safe, but it's okay since the method should be only invoked
* via {@code lookProcessChain} in {@link com.alibaba.csp.sentinel.CtSph} under lock.
*
* @return new created slot chain
*/
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
private SlotChainProvider() {}
}
3.3 Transport Extension#
This is actually the interface exposed by the client, which also exposes some interfaces by default for checking client status.
First, there is an API center responsible for receiving external information.
public class SimpleHttpCommandCenter implements CommandCenter {
private static final int PORT_UNINITIALIZED = -1;
private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
private static final int DEFAULT_PORT = 8719;
@SuppressWarnings("rawtypes")
private static final Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private ExecutorService executor = Executors.newSingleThreadExecutor(
new NamedThreadFactory("sentinel-command-center-executor", true));
private ExecutorService bizExecutor;
private ServerSocket socketReference;
@Override
@SuppressWarnings("rawtypes")
public void beforeStart() throws Exception {
// Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
@Override
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor", true),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
int port;
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
/**
* Get a server socket from an available port from a base port.<br>
* Increasing on port number will occur when the port has already been used.
*
* @param basePort base port to start
* @return new socket with available port
*/
private static ServerSocket getServerSocketFromBasePort(int basePort) {
int tryCount = 0;
while (true) {
try {
ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
server.setReuseAddress(true);
return server;
} catch (IOException e) {
tryCount++;
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e1) {
break;
}
}
}
return null;
}
@Override
public void stop() throws Exception {
if (socketReference != null) {
try {
socketReference.close();
} catch (IOException e) {
CommandCenterLog.warn("Error when releasing the server socket", e);
}
}
if (bizExecutor != null) {
bizExecutor.shutdownNow();
}
executor.shutdownNow();
TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
handlerMap.clear();
}
/**
* Get the name set of all registered commands.
*/
public static Set<String> getCommands() {
return handlerMap.keySet();
}
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
}
}
After receiving, find the matching handler.
public void run() {
if (socket == null) {
return;
}
PrintWriter printWriter = null;
InputStream inputStream = null;
try {
long start = System.currentTimeMillis();
inputStream = new BufferedInputStream(socket.getInputStream());
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String firstLine = readLine(inputStream);
CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
+ ", addr: " + socket.getInetAddress());
CommandRequest request = processQueryString(firstLine);
if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
// Deal with post method
processPostRequest(inputStream, request);
}
// Validate the target command.
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
return;
}
// Find the matching command handler.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter);
} else {
// No matching command handler.
writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
}
long cost = System.currentTimeMillis() - start;
CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
+ ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
} catch (RequestException e) {
writeResponse(printWriter, e.getStatusCode(), e.getMessage());
} catch (Throwable e) {
CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
try {
if (printWriter != null) {
String errorMessage = SERVER_ERROR_MESSAGE;
e.printStackTrace();
if (!writtenHead) {
writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
} else {
printWriter.println(errorMessage);
}
printWriter.flush();
}
} catch (Exception e1) {
CommandCenterLog.warn("Failed to write error response", e1);
}
} finally {
closeResource(inputStream);
closeResource(printWriter);
closeResource(socket);
}
}
By default, some handlers are defined.
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerFlowConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerTransportConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyServerNamespaceSetHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerInfoCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyClusterClientConfigHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterClientConfigHandler
3.4 Cluster Flow Control Extension#
Cluster Flow Control · alibaba/Sentinel Wiki (github.com)
This article was synchronized to xLog by Mix Space. The original link is https://me.liuyaowen.club/posts/default/springcloud-sentinel