刘耀文

刘耀文

java开发者
github

スプリングクラウド センティネル

1. ソースコードの解読は 3 つの部分に分かれ、初期化と実行プロセス、および拡張ポイント#

1.1 sentinel 自身の初期化#

sentinel は springboot 起動時に何を行ったのか?
依存関係を導入した後、spring boot にどのような副作用があったのか

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    <version>2023.0.1.0</version>
</dependency>

依存関係には何が含まれているのか見てみましょう。

image-20240709114212024

たくさんありますが、いくつかのクラシックな部分が見えます。まずは本体の spring-cloud-starter-alibaba-sentinel の内容を見てみましょう。一般的には 3 つの部分を見ます。1 つは自動構成クラス、2 つ目は SPI インターフェース、3 つ目は springboot の拡張ポイントである spring.factories です。

まずは何があるのか見てみましょう。

image-20240709114953153

自動構成クラスがインポートされているだけで、内部でどの構成クラスがインポートされているのか見てみましょう。

com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration

最初の 2 つは springweb フレームワークへの適合、3 つ目は sentinel が外部に提供するアクセスポート、4 つ目は sentinel の初期化とカスタマイズ、5 つ目は feign のサポートです。1 つずつ見ていきましょう。まずは sentinel 自身の初期化を見てみましょう。

1.2 sentinel 自身の初期化、属性の初期化、データソースの初期化、アスペクトの初期化、restTemplate の初期化。この部分は mvc を使用していない場合に spring プロジェクトに適合しました。#

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelAutoConfiguration {

	@Value("${project.name:${spring.application.name:}}")
	private String projectName;

	@Autowired
	private SentinelProperties properties;
    // 属性の初期化
	@PostConstruct
	public void init() {
		if (StringUtils.isEmpty(System.getProperty(LogBase.LOG_DIR))
				&& StringUtils.isNotBlank(properties.getLog().getDir())) {
            // ログ
			System.setProperty(LogBase.LOG_DIR, properties.getLog().getDir());
		}
		if (StringUtils.isEmpty(System.getProperty(LogBase.LOG_NAME_USE_PID))
				&& properties.getLog().isSwitchPid()) {
            // ログ
			System.setProperty(LogBase.LOG_NAME_USE_PID,
					String.valueOf(properties.getLog().isSwitchPid()));
		}
		if (StringUtils.isEmpty(System.getProperty(SentinelConfig.APP_NAME_PROP_KEY))
				&& StringUtils.isNotBlank(projectName)) {
            // プロジェクト名またはspringアプリケーション名を設定します。一般的にはアプリ名${spring.application.name:}
			System.setProperty(SentinelConfig.APP_NAME_PROP_KEY, projectName);
		}
		if (StringUtils.isEmpty(System.getProperty(TransportConfig.SERVER_PORT))
				&& StringUtils.isNotBlank(properties.getTransport().getPort())) {
             // sentinelの外部制御ポート、dashboardはこのポートを使ってアクセスされます。デフォルトはpublic static final String API_PORT = "8719";
			System.setProperty(TransportConfig.SERVER_PORT,
					properties.getTransport().getPort());
		}
		if (StringUtils.isEmpty(System.getProperty(TransportConfig.CONSOLE_SERVER))
				&& StringUtils.isNotBlank(properties.getTransport().getDashboard())) {
           // dashboardのポートアドレス	
			System.setProperty(TransportConfig.CONSOLE_SERVER,
					properties.getTransport().getDashboard());
		}
		if (StringUtils.isEmpty(System.getProperty(TransportConfig.HEARTBEAT_INTERVAL_MS))
				&& StringUtils
						.isNotBlank(properties.getTransport().getHeartbeatIntervalMs())) {
            // ハートビートパケットの時間間隔、デフォルトはprivate static final long DEFAULT_INTERVAL = 1000 * 10;
			System.setProperty(TransportConfig.HEARTBEAT_INTERVAL_MS,
					properties.getTransport().getHeartbeatIntervalMs());
		}
		if (StringUtils.isEmpty(System.getProperty(TransportConfig.HEARTBEAT_CLIENT_IP))
				&& StringUtils.isNotBlank(properties.getTransport().getClientIp())) {
            // ハートビートパケットのクライアントip、デフォルトはローカルip
			System.setProperty(TransportConfig.HEARTBEAT_CLIENT_IP,
					properties.getTransport().getClientIp());
		}
		if (StringUtils.isEmpty(System.getProperty(SentinelConfig.CHARSET))
				&& StringUtils.isNotBlank(properties.getMetric().getCharset())) {
			System.setProperty(SentinelConfig.CHARSET,
					properties.getMetric().getCharset());
		}
		if (StringUtils
				.isEmpty(System.getProperty(SentinelConfig.SINGLE_METRIC_FILE_SIZE))
				&& StringUtils.isNotBlank(properties.getMetric().getFileSingleSize())) {
			System.setProperty(SentinelConfig.SINGLE_METRIC_FILE_SIZE,
					properties.getMetric().getFileSingleSize());
		}
		if (StringUtils
				.isEmpty(System.getProperty(SentinelConfig.TOTAL_METRIC_FILE_COUNT))
				&& StringUtils.isNotBlank(properties.getMetric().getFileTotalCount())) {
			System.setProperty(SentinelConfig.TOTAL_METRIC_FILE_COUNT,
					properties.getMetric().getFileTotalCount());
		}
		if (StringUtils.isEmpty(System.getProperty(SentinelConfig.COLD_FACTOR))
				&& StringUtils.isNotBlank(properties.getFlow().getColdFactor())) {
			System.setProperty(SentinelConfig.COLD_FACTOR,
					properties.getFlow().getColdFactor());
		}
		if (StringUtils.isNotBlank(properties.getBlockPage())) {
			setConfig(BLOCK_PAGE_URL_CONF_KEY, properties.getBlockPage());
		}

		// 早期初期化
        // 最初から初期化するかどうか、デフォルトはfalseで、最初の呼び出し時に初期化されます
		if (properties.isEager()) {
			InitExecutor.doInit();
		}

	}
	
    // これはSentinelResource注釈をサポートするアスペクトクラスの初期化です
	@Bean
	@ConditionalOnMissingBean
	public SentinelResourceAspect sentinelResourceAspect() {
		return new SentinelResourceAspect();
	}
	
    // SentinelRestTemplateの初期化、後処理器を初期化し、インターセプターを追加します
	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnClass(name = "org.springframework.web.client.RestTemplate")
	@ConditionalOnProperty(name = "resttemplate.sentinel.enabled", havingValue = "true",
			matchIfMissing = true)
	public static SentinelBeanPostProcessor sentinelBeanPostProcessor(
			ApplicationContext applicationContext) {
		return new SentinelBeanPostProcessor(applicationContext);
	}
	// 外部属性ソースの処理、初期化
    // すべてのシングルトンbeanの初期化後
        /*
        public void postRegister(AbstractDataSource dataSource) {
        switch (this.getRuleType()) {
            case FLOW -> FlowRuleManager.register2Property(dataSource.getProperty());
            case DEGRADE -> DegradeRuleManager.register2Property(dataSource.getProperty());
            case PARAM_FLOW -> ParamFlowRuleManager.register2Property(dataSource.getProperty());
            case SYSTEM -> SystemRuleManager.register2Property(dataSource.getProperty());
            case AUTHORITY -> AuthorityRuleManager.register2Property(dataSource.getProperty());
            case GW_FLOW -> GatewayRuleManager.register2Property(dataSource.getProperty());
            case GW_API_GROUP -> GatewayApiDefinitionManager.register2Property(dataSource.getProperty());
        }

    }
    */
	@Bean
	@ConditionalOnMissingBean
	public SentinelDataSourceHandler sentinelDataSourceHandler(
			DefaultListableBeanFactory beanFactory, SentinelProperties sentinelProperties,
			Environment env) {
		return new SentinelDataSourceHandler(beanFactory, sentinelProperties, env);
	}
	
    // 一部の変換器、例えば外部属性ソースを設定する際に変換器を設定します
	@ConditionalOnClass(ObjectMapper.class)
	@Configuration(proxyBeanMethods = false)
	protected static class SentinelConverterConfiguration {
		
        // json
		@Configuration(proxyBeanMethods = false)
		protected static class SentinelJsonConfiguration {

			private ObjectMapper objectMapper = new ObjectMapper();

			public SentinelJsonConfiguration() {
				objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
						false);
			}

			@Bean("sentinel-json-flow-converter")
			public JsonConverter jsonFlowConverter() {
				return new JsonConverter(objectMapper, FlowRule.class);
			}

			@Bean("sentinel-json-degrade-converter")
			public JsonConverter jsonDegradeConverter() {
				return new JsonConverter(objectMapper, DegradeRule.class);
			}

			@Bean("sentinel-json-system-converter")
			public JsonConverter jsonSystemConverter() {
				return new JsonConverter(objectMapper, SystemRule.class);
			}

			@Bean("sentinel-json-authority-converter")
			public JsonConverter jsonAuthorityConverter() {
				return new JsonConverter(objectMapper, AuthorityRule.class);
			}

			@Bean("sentinel-json-param-flow-converter")
			public JsonConverter jsonParamFlowConverter() {
				return new JsonConverter(objectMapper, ParamFlowRule.class);
			}

		}
		// xml
		@ConditionalOnClass(XmlMapper.class)
		@Configuration(proxyBeanMethods = false)
		protected static class SentinelXmlConfiguration {

			private XmlMapper xmlMapper = new XmlMapper();

			public SentinelXmlConfiguration() {
				xmlMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
						false);
			}

			@Bean("sentinel-xml-flow-converter")
			public XmlConverter xmlFlowConverter() {
				return new XmlConverter(xmlMapper, FlowRule.class);
			}

			@Bean("sentinel-xml-degrade-converter")
			public XmlConverter xmlDegradeConverter() {
				return new XmlConverter(xmlMapper, DegradeRule.class);
			}

			@Bean("sentinel-xml-system-converter")
			public XmlConverter xmlSystemConverter() {
				return new XmlConverter(xmlMapper, SystemRule.class);
			}

			@Bean("sentinel-xml-authority-converter")
			public XmlConverter xmlAuthorityConverter() {
				return new XmlConverter(xmlMapper, AuthorityRule.class);
			}

			@Bean("sentinel-xml-param-flow-converter")
			public XmlConverter xmlParamFlowConverter() {
				return new XmlConverter(xmlMapper, ParamFlowRule.class);
			}

		}

	}

}

1.3 springMVC の適合初期化#

@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = Type.SERVLET)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@ConditionalOnClass(SentinelWebInterceptor.class)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration implements WebMvcConfigurer {

    private static final Logger log = LoggerFactory
          .getLogger(SentinelWebAutoConfiguration.class);

    @Autowired
    private SentinelProperties properties;

    @Autowired
    private Optional<UrlCleaner> urlCleanerOptional;

    @Autowired
    private Optional<BlockExceptionHandler> blockExceptionHandlerOptional;

    @Autowired
    private Optional<RequestOriginParser> requestOriginParserOptional;

    // ここでグローバルインターセプターを初期化します
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
          matchIfMissing = true)
    public SentinelWebInterceptor sentinelWebInterceptor(
          SentinelWebMvcConfig sentinelWebMvcConfig) {
       return new SentinelWebInterceptor(sentinelWebMvcConfig);
    }
	// 上記インターセプターのいくつかの設定
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
          matchIfMissing = true)
    public SentinelWebMvcConfig sentinelWebMvcConfig() {
       SentinelWebMvcConfig sentinelWebMvcConfig = new SentinelWebMvcConfig();
       // リクエストメソッドをリソース名に追加するかどうか
       sentinelWebMvcConfig.setHttpMethodSpecify(properties.getHttpMethodSpecify());
       sentinelWebMvcConfig.setWebContextUnify(properties.getWebContextUnify());
	   // 限流後の例外処理
       if (blockExceptionHandlerOptional.isPresent()) {
          blockExceptionHandlerOptional
                .ifPresent(sentinelWebMvcConfig::setBlockExceptionHandler);
       }
       else {
          if (StringUtils.hasText(properties.getBlockPage())) {
             sentinelWebMvcConfig.setBlockExceptionHandler(((request, response,
                   e) -> response.sendRedirect(properties.getBlockPage())));
          }
          else {
              // 限流後のデフォルトの例外処理
             sentinelWebMvcConfig
                   .setBlockExceptionHandler(new DefaultBlockExceptionHandler());
          }
       }

       urlCleanerOptional.ifPresent(sentinelWebMvcConfig::setUrlCleaner);
        // ソース名の解析
       requestOriginParserOptional.ifPresent(sentinelWebMvcConfig::setOriginParser);
       return sentinelWebMvcConfig;
    }
	// インターセプターの登録
    @Bean
    @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
          matchIfMissing = true)
    public SentinelWebMvcConfigurer sentinelWebMvcConfigurer() {
       return new SentinelWebMvcConfigurer();
    }

}

2 実行プロセス#

2.1 springMVC インターセプターの実行ロジック#

インターセプターには一般的に 2 つのメソッドがあります。1 つはリクエスト前のメソッド、もう 1 つはリクエスト後のメソッドです。まずはリクエスト前のメソッドを見てみましょう。

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
        throws Exception {
        try {
            // まずリソース名を取得します
            String resourceName = getResourceName(request);

            if (StringUtil.isEmpty(resourceName)) {
                return true;
            }
            // requests属性に$$sentinel_spring_web_entry_attr-rcがあれば、カウント後に放行します
            if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
                return true;
            }
            
            // 登録されたオリジンパーサーを使用してリクエストオリジンを解析します。
            // HTTPから生成されたオリジン、デフォルトは空です。StatisticSlotで使用されます
            String origin = parseOrigin(request);
            // 監視コンテナの名前を取得します。ここではデフォルトでsentinel_spring_web_contextです
            String contextName = getContextName(request);
            
        	// 重要なコード、呼び出しコンテキストを初期化します
            ContextUtil.enter(contextName, origin);
            // リソースに入ります。slotプラグインモジュールに入ります。作成されます
            
            
            Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
            return true;
        } catch (BlockException e) {
            try {
                handleBlockException(request, response, e);
            } finally {
                ContextUtil.exit();
            }
            return false;
        }
    }
// slotプラグインモジュールの作成プロセス
public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        // スロットチェーンビルダーSPIを解決します。
    	// SentinelデフォルトProcessorSlots
        //com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
        //com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
        //com.alibaba.csp.sentinel.slots.logger.LogSlot
        //com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
        //com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
        //com.alibaba.csp.sentinel.slots.system.SystemSlot
    	//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
        //com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
        //com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
    	// すべてのデフォルトスロット、実行順序も上記の順序です
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder == null) {
            // ここを通過するべきではありません。
            RecordLog.warn("[SlotChainProvider] スロットチェーンビルダーを解決する際の状態が不正です。デフォルトを使用します");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] グローバルスロットチェーンビルダーが解決されました: {}",
                slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }

2.2 chain.entry (context, resourceWrapper, null, count, prioritized, args) に入って実際の実行プロセスを見てみましょう#

まずは slot インターフェースを見て、理解を深めましょう。

public interface ProcessorSlot<T> {
	
	// このslotの実行ロジック
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;
	// このslotの実行が完了した後、何をする必要がありますか? 抽象クラスの実装は、パラメータをキャストして次のslotに入ることです。彼らの関係は単方向の連結リストです
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                   Object... args) throws Throwable;
	// 同様に
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

抽象クラスを見てみましょう。


public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
	
	// 次に実行するslotを保存します
    private AbstractLinkedProcessorSlot<?> next = null;
	
	// 抽象クラスの実装は、パラメータをキャストして次のslotに入ることです。彼らの関係は単方向のリストです
    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

    @SuppressWarnings("unchecked")
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        if (next != null) {
            next.exit(context, resourceWrapper, count, args);
        }
    }

    public AbstractLinkedProcessorSlot<?> getNext() {
        return next;
    }

    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        this.next = next;
    }

}

さて、呼び出し slot のプロセスに入ります。上記の順序に従って 1 つずつ見ていきましょう。#

// スロットチェーンビルダーSPIを解決します。
// SentinelデフォルトProcessorSlots
//com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
//com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
//com.alibaba.csp.sentinel.slots.logger.LogSlot
//com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
//com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
//com.alibaba.csp.sentinel.slots.system.SystemSlot
//com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
//com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
//com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
// すべてのデフォルトスロット、実行順序も上記の順序です

NodeSelectorSlot、リソースノードセレクター#

まず明確にしておくべきことは、ノードは slotchain にバインドされていることです。各ユニークリソースにはユニークな slotchain があり、slotchain にはノード(defaultNode)が保存されており、異なる呼び出しコンテキストの異なるノード(defaultNode)も保存されています。公式サイトに解説があります。

image-20240709141403982

リソースにはユニークな slotchain があります。

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // エントリーサイズ制限。
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

NodeSelectorSlot がどのように実装されているか見てみましょう。

DefaultNode node = map.get(context.getName());
// context.getName()呼び出しコンテキストの名前をキーにしてノードを取得します。つまり、1つのリソースには複数のノードがありますが、実際には1回だけ統計されます。次のClusterBuilderSlotの呼び出しで見ることができます。
if (node == null) {
    synchronized (this) {
        node = map.get(context.getName());
        if (node == null) {
            node = new DefaultNode(resourceWrapper, null);
            HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
            cacheMap.putAll(map);
            cacheMap.put(context.getName(), node);
            map = cacheMap;
            // 呼び出しツリーを構築します
            // 呼び出しチェーンを設定します。異なるリソースにネストして入る可能性があるため、
            // contextはエントリノードと呼び出しチェーン(双方向リスト)を保持しており、現在呼び出しているノードを前のノードのCTNodeの子に追加します。もちろん、最初に入る場合はエントリノードに追加します。
            ((DefaultNode) context.getLastNode()).addChild(node);
        }

    }
}
// 現在の呼び出しを設定し、現在の呼び出しCTNODEのCurNodeに設定します。2回追加されます。1回は上位呼び出しコンテキストに追加され、もう1回は現在の呼び出しに追加されます。
context.setCurNode(node);

ClusterBuilderSlot、リソース統計ノードの構築、つまり ClusterNode#

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args)
    throws Throwable {
    // まず現在のslotchainがあるかどうかを確認します
    if (clusterNode == null) {
        synchronized (lock) {
            if (clusterNode == null) {
                // クラスタノードを作成します。
                clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                newMap.putAll(clusterNodeMap);
                newMap.put(node.getId(), clusterNode);

                clusterNodeMap = newMap;
            }
        }
    }
    // 各slotchainは1つのclusterNodeを生成し、異なるcontextnameのノードに共有されます
    node.setClusterNode(clusterNode);

    /*
     * context originが設定されている場合、特定のオリジンの新しい{@link Node}を取得または作成する必要があります。
     */
    // オリジンを設定します。デフォルトではありません。以前にあった場合、clusterNodeに追加のmapを維持します。

    if (!"".equals(context.getOrigin())) {
        Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
        context.getCurEntry().setOriginNode(originNode);
    }

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

ok、ここでのロジックはシンプルです。clusterNode を作成して情報を統計します。

次に LogSlot に入ります#

ここでのロジックも比較的シンプルで、BlockException を記録します。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
    throws Throwable {
    try {
        fireEntry(context, resourceWrapper, obj, count, prioritized, args);
    } catch (BlockException e) {
        EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
            context.getOrigin(), e.getRule().getId(), count);
        throw e;
    } catch (Throwable e) {
        RecordLog.warn("予期しないエントリー例外", e);
    }

}

ok、コアの StatisticSlot に到達しました#

まず、入ることができるかどうかを判断します。ここでは AuthoritySlot、SystemSlot、ParamFlowSlot、FlowSlot を実行しますので、他のいくつかの slot のロジックを見てみましょう。ここではこのセクションをスキップします。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // まず入ることができるかどうかを判断します。ここではAuthoritySlot、SystemSlot、ParamFlowSlot、FlowSlotを実行します
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // リクエストが通過した場合、スレッド数と通過数を追加します。
        node.increaseThreadNum();
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // オリジンノードのカウントを追加します。
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // グローバルインバウンドエントリノードのカウントを追加します。グローバル統計用。
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // 登録されたエントリーコールバックハンドラで通過イベントを処理します。
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // オリジンノードのカウントを追加します。
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // グローバルインバウンドエントリノードのカウントを追加します。グローバル統計用。
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        // 通過イベントを処理します。
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // ブロックされました。現在のエントリーにブロック例外を設定します。
        context.getCurEntry().setBlockError(e);

        // ブロックカウントを追加します。
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // グローバルインバウンドエントリノードのカウントを追加します。グローバル統計用。
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        // ブロックイベントを処理します。
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable e) {
        // 予期しない内部エラー、現在のエントリーにエラーを設定します。
        context.getCurEntry().setError(e);

        throw e;
    }
}

AuthoritySlot#

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
    throws Throwable {
    // 認証情報をチェックします
    checkBlackWhiteAuthority(resourceWrapper, context);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// 設定された認証ルールを取得して通過できるかどうかを確認します
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
    Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

    if (authorityRules == null) {
        return;
    }

    Set<AuthorityRule> rules = authorityRules.get(resource.getName());
    if (rules == null) {
        return;
    }

    for (AuthorityRule rule : rules) {
        if (!AuthorityRuleChecker.passCheck(rule, context)) {
            throw new AuthorityException(context.getOrigin(), rule);
        }
    }
}

SystemSlot#

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // システムの健康状態をチェックします
    // ここは見ませんが、具体的にはシステムの指標に基づいて通過するリクエストを調整します。公式サイトの説明https://sentinelguard.io/zh-cn/docs/system-adaptive-protection.html
    SystemRuleManager.checkSystem(resourceWrapper, count);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

ParamFlowSlot#

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // ルールがあるかどうかを確認します
    if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
        return;
    }
	// パラメータの位置に基づいてアクセスを制限しますhttps://sentinelguard.io/zh-cn/docs/parameter-flow-control.html
    checkFlow(resourceWrapper, count, args);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

FlowSlot#

ParamFlowSlot と似ており、FlowRule に基づいて通過するかどうかを決定しますflow-control | Sentinel (sentinelguard.io)

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

DegradeSlot#

  • 遅い呼び出し比率 (SLOW_REQUEST_RATIO):遅い呼び出し比率を閾値として選択し、許可される遅い呼び出し RT(最大応答時間)を設定する必要があります。リクエストの応答時間がこの値を超えると、遅い呼び出しとして統計されます。単位統計期間(statIntervalMs)内のリクエスト数が設定された最小リクエスト数を超え、遅い呼び出しの比率が閾値を超えると、次の熔断期間内のリクエストは自動的に熔断されます。熔断期間後、熔断器は探査回復状態(HALF-OPEN 状態)に入り、次のリクエストの応答時間が設定された遅い呼び出し RT よりも小さい場合は熔断が終了し、設定された遅い呼び出し RT よりも大きい場合は再度熔断されます。
  • 異常比率 (ERROR_RATIO):単位統計期間(statIntervalMs)内のリクエスト数が設定された最小リクエスト数を超え、異常の比率が閾値を超えると、次の熔断期間内のリクエストは自動的に熔断されます。熔断期間後、熔断器は探査回復状態(HALF-OPEN 状態)に入り、次のリクエストが正常に完了(エラーなし)した場合は熔断が終了し、そうでない場合は再度熔断されます。異常比率の閾値範囲は [0.0, 1.0] で、0% - 100% を表します。
  • 異常数 (ERROR_COUNT):単位統計期間内の異常数が閾値を超えた場合、自動的に熔断されます。熔断期間後、熔断器は探査回復状態(HALF-OPEN 状態)に入り、次のリクエストが正常に完了(エラーなし)した場合は熔断が終了し、そうでない場合は再度熔断されます。
  • circuit-breaking | Sentinel (sentinelguard.io)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    performChecking(context, resourceWrapper);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

前のすべてが通過した後、StatisticSlot に戻ります。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // いくつかのチェックを行います。
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // ここに到達しました
        // スレッド数を増やします。ここには瞬時のものと集団の2種類があります。
        node.increaseThreadNum();
        // アクセス回数を増やします
        node.addPassRequest(count);
		// ソースアクセス回数を増やします
        if (context.getCurEntry().getOriginNode() != null) {
            // オリジンノードのカウントを追加します。
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }
		
        // エントリノードのアクセス回数とスレッドを統計します
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // グローバルインバウンドエントリノードのカウントを追加します。グローバル統計用。
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }
		
        // 通過した場合のコールバック関数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // ここではデフォルト処理時に待機し、後で入ることができる場合、やはり放行します
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // オリジンノードのカウントを追加します。
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // 同様にエントリノードのアクセス回数とスレッドを統計します
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        // 通過した場合のコールバック関数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // ブロックされました。現在のエントリーにブロック例外を設定します。
        context.getCurEntry().setBlockError(e);

        // ブロックカウントを追加します。
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // グローバルインバウンドエントリノードのカウントを追加します。グローバル統計用。
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        // 通過しなかった場合のコールバック関数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable e) {
        // 予期しない内部エラー、現在のエントリーにエラーを設定します。
        context.getCurEntry().setError(e);

        throw e;
    }
}

3. 拡張ポイント#

sentinel には多くの拡張ポイントがあります。

3.1 初期化プロセスの拡張 Initexector#

実行タイミング、最初の呼び出し時 enty、または Earler モードではない場合

public static void doInit() {
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    try {
        // すべてのInitFuncを見つけます
        List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
        List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
        for (InitFunc initFunc : initFuncs) {
            RecordLog.info("[InitExecutor] 初期化関数を見つけました: {}", initFunc.getClass().getCanonicalName());
            insertSorted(initList, initFunc);
        }
        for (OrderWrapper w : initList) {
            w.func.init();
            RecordLog.info("[InitExecutor] {}を順序{}で実行しています",
                w.func.getClass().getCanonicalName(), w.order);
        }
    } catch (Exception ex) {
        RecordLog.warn("[InitExecutor] 警告: 初期化に失敗しました", ex);
        ex.printStackTrace();
    } catch (Error error) {
        RecordLog.warn("[InitExecutor] エラー: 致命的なエラーで初期化に失敗しました", error);
        error.printStackTrace();
    }
}

デフォルトでは、クラスター モードでのクライアントとサーバーの初期化、およびメトリック コールバック関数の初期化が見つかります。

public class MetricCallbackInit implements InitFunc {
    @Override
    public void init() throws Exception {
        StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
            new MetricEntryCallback());
        StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
            new MetricExitCallback());
    }
}

3.2 Slot/Slot Chain の拡張#

呼び出しタイミング:リソースに一致する slotchain が見つからなかった場合

public final class SlotChainProvider {

    private static volatile SlotChainBuilder slotChainBuilder = null;

    /**
     * ロードとピックプロセスはスレッドセーフではありませんが、問題ありません。なぜなら、このメソッドはロックの下で{@code lookProcessChain}を介してのみ呼び出されるべきだからです。
     *
     * @return 新しく作成されたスロットチェーン
     */
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        // スロットチェーンビルダーSPIを解決します。
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder == null) {
            // ここを通過するべきではありません。
            RecordLog.warn("[SlotChainProvider] スロットチェーンビルダーを解決する際の状態が不正です。デフォルトを使用します");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] グローバルスロットチェーンビルダーが解決されました: {}",
                slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }

    private SlotChainProvider() {}
}

3.3 Transport の拡張#

実際には、クライアントが外部に公開するインターフェースであり、デフォルトでもいくつかのインターフェースが公開され、クライアントの状況を確認しやすくなります。

まず、外部情報を受け取る API センターがあります。


public class SimpleHttpCommandCenter implements CommandCenter {

    private static final int PORT_UNINITIALIZED = -1;

    private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
    private static final int DEFAULT_PORT = 8719;

    @SuppressWarnings("rawtypes")
    private static final Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();

    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private ExecutorService executor = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-command-center-executor", true));
    private ExecutorService bizExecutor;

    private ServerSocket socketReference;

    @Override
    @SuppressWarnings("rawtypes")
    public void beforeStart() throws Exception {
        // ハンドラーを登録します
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        registerCommands(handlers);
    }

    @Override
    public void start() throws Exception {
        int nThreads = Runtime.getRuntime().availableProcessors();
        this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            new NamedThreadFactory("sentinel-command-center-service-executor", true),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    CommandCenterLog.info("EventTask rejected");
                    throw new RejectedExecutionException();
                }
            });

        Runnable serverInitTask = new Runnable() {
            int port;

            {
                try {
                    port = Integer.parseInt(TransportConfig.getPort());
                } catch (Exception e) {
                    port = DEFAULT_PORT;
                }
            }

            @Override
            public void run() {
                boolean success = false;
                ServerSocket serverSocket = getServerSocketFromBasePort(port);

                if (serverSocket != null) {
                    CommandCenterLog.info("[CommandCenter] ポート" + serverSocket.getLocalPort()でリスニングを開始します);
                    socketReference = serverSocket;
                    executor.submit(new ServerThread(serverSocket));
                    success = true;
                    port = serverSocket.getLocalPort();
                } else {
                    CommandCenterLog.info("[CommandCenter] ポートの選択に失敗しました。httpコマンドセンターは機能しません");
                }

                if (!success) {
                    port = PORT_UNINITIALIZED;
                }

                TransportConfig.setRuntimePort(port);
                executor.shutdown();
            }

        };

        new Thread(serverInitTask).start();
    }

    /**
     * 利用可能なポートから新しいソケットを取得します。<br>
     * ポートがすでに使用されている場合、ポート番号が増加します。
     *
     * @param basePort 基本ポートを開始します
     * @return 利用可能なポートの新しいソケット
     */
    private static ServerSocket getServerSocketFromBasePort(int basePort) {
        int tryCount = 0;
        while (true) {
            try {
                ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
                server.setReuseAddress(true);
                return server;
            } catch (IOException e) {
                tryCount++;
                try {
                    TimeUnit.MILLISECONDS.sleep(30);
                } catch (InterruptedException e1) {
                    break;
                }
            }
        }
        return null;
    }

    @Override
    public void stop() throws Exception {
        if (socketReference != null) {
            try {
                socketReference.close();
            } catch (IOException e) {
                CommandCenterLog.warn("サーバーソケットを解放する際のエラー", e);
            }
        }

        if (bizExecutor != null) {
            bizExecutor.shutdownNow();
        }
        executor.shutdownNow();
        TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
        handlerMap.clear();
    }

    /**
     * 登録されたコマンドの名前セットを取得します。
     */
    public static Set<String> getCommands() {
        return handlerMap.keySet();
    }

    class ServerThread extends Thread {

        private ServerSocket serverSocket;

        ServerThread(ServerSocket s) {
            this.serverSocket = s;
            setName("sentinel-courier-server-accept-thread");
        }

        @Override
        public void run() {
            while (true) {
                Socket socket = null;
                try {
                    socket = this.serverSocket.accept();
                    setSocketSoTimeout(socket);
                    HttpEventTask eventTask = new HttpEventTask(socket);
                    bizExecutor.submit(eventTask);
                } catch (Exception e) {
                    CommandCenterLog.info("サーバーエラー", e);
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (Exception e1) {
                            CommandCenterLog.info("オープンソケットを閉じる際のエラー", e1);
                        }
                    }
                    try {
                        // 無限ログを防ぐために。
                        Thread.sleep(10);
                    } catch (InterruptedException e1) {
                        // タスクを停止する必要があることを示します。
                        break;
                    }
                }
            }
        }
    }

}

受信後、適切なハンドラーを見つけます。

public void run() {
    if (socket == null) {
        return;
    }

    PrintWriter printWriter = null;
    InputStream inputStream = null;
    try {
        long start = System.currentTimeMillis();
        inputStream = new BufferedInputStream(socket.getInputStream());
        OutputStream outputStream = socket.getOutputStream();

        printWriter = new PrintWriter(
            new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

        String firstLine = readLine(inputStream);
        CommandCenterLog.info("[SimpleHttpCommandCenter] ソケットの受信: " + firstLine
            + ", アドレス: " + socket.getInetAddress());
        CommandRequest request = processQueryString(firstLine);

        if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
            // POSTメソッドを処理します
            processPostRequest(inputStream, request);
        }

        // 対象コマンドを検証します。
        String commandName = HttpCommandUtils.getTarget(request);
        if (StringUtil.isBlank(commandName)) {
            writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
            return;
        }

        // 一致するコマンドハンドラーを見つけます。
        CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
        if (commandHandler != null) {
            CommandResponse<?> response = commandHandler.handle(request);
            handleResponse(response, printWriter);
        } else {
            // 一致するコマンドハンドラーがありません。
            writeResponse(printWriter, StatusCode.BAD_REQUEST, "不明なコマンド `" + commandName + '`');
        }

        long cost = System.currentTimeMillis() - start;
        CommandCenterLog.info("[SimpleHttpCommandCenter] ソケットタスクを処理しました: " + firstLine
            + ", アドレス: " + socket.getInetAddress() + ", 時間コスト: " + cost + " ms");
    } catch (RequestException e) {
        writeResponse(printWriter, e.getStatusCode(), e.getMessage());
    } catch (Throwable e) {
        CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenterエラー", e);
        try {
            if (printWriter != null) {
                String errorMessage = SERVER_ERROR_MESSAGE;
                e.printStackTrace();
                if (!writtenHead) {
                    writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
                } else {
                    printWriter.println(errorMessage);
                }
                printWriter.flush();
            }
        } catch (Exception e1) {
            CommandCenterLog.warn("エラーレスポンスの書き込みに失敗しました", e1);
        }
    } finally {
        closeResource(inputStream);
        closeResource(printWriter);
        closeResource(socket);
    }
}

デフォルトでは、いくつかのハンドルが定義されます。

com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerFlowConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerTransportConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyServerNamespaceSetHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerInfoCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterMetricCommandHandler

com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler

com.alibaba.csp.sentinel.command.handler.ModifyClusterClientConfigHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterClientConfigHandler

3.4 クラスタ流量制御の拡張#

クラスタ流量制御・alibaba/Sentinel Wiki (github.com)

この文は Mix Space によって xLog に同期更新されました
元のリンクは https://me.liuyaowen.club/posts/default/springcloud-sentinel


読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。