当前位置: 首页 > news >正文

【源码解析】流控框架Sentinel源码解析

Sentinel简介

Sentinel是阿里开源的一款面向分布式、多语言异构化服务架构的流量治理组件。

主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。

核心概念

资源

资源是Sentinel中一个非常重要的概念,资源就是Sentinel所保护的对象。

资源可以是一段代码,又或者是一个接口,Sentinel中并没有什么强制规定,但是实际项目中一般以一个接口为一个资源,比如说一个http接口,又或者是rpc接口,它们就是资源,可以被保护。

资源是通过Sentinel的API定义的,每个资源都有一个对应的名称,比如对于一个http接口资源来说,Sentinel默认的资源名称就是请求路径。

规则

规则也是一个重要的概念,规则其实比较好理解,比如说要对一个资源进行限流,那么限流的条件就是规则,后面在限流的时候会基于这个规则来判定是否需要限流。

Sentinel的规则分为流量控制规则、熔断降级规则以及系统保护规则,不同的规则实现的效果不一样。

快速入门

引入依赖

       <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId><version>2021.1</version></dependency>

设置流控规则

@Configuration
public class FLowRulesConfig {/*** 定义限流规则*/@PostConstructpublic void initFLowRules() {//1.创建限流规则List<FlowRule> rules = new ArrayList<>();FlowRule rule = new FlowRule();// 定义资源,表示Sentinel会对哪个资源生效rule.setResource("hello");// 定义限流规则类型,QPS限流类型rule.setGrade(RuleConstant.FLOW_GRADE_QPS);// 定义QPS每秒能通过的请求个数rule.setCount(2);rules.add(rule);// 2、加载限流规则FlowRuleManager.loadRules(rules);}
}

编写Controller,进行测试

@Slf4j
@RestController
public class HelloController {@GetMapping("/hello")public String startHello() {// 进行限流try (Entry entry = SphU.entry("hello")) { // 加载限流规则,若果存在才会往下执行// 被保护的资源return "hello Sentinel";} catch (BlockException e) {e.printStackTrace();// 被限流或者被降级的操作return "系统繁忙";}}
}

源码拆解

CtSph

核心是Entry entry = SphU.entry("hello")

CtSph#entryWithPriority(),获取Context,如果不存在,则使用默认的。构造责任链,执行。

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)throws BlockException {Context context = ContextUtil.getContext();if (context instanceof NullContext) {// The {@link NullContext} indicates that the amount of context has exceeded the threshold,// so here init the entry only. No rule checking will be done.return new CtEntry(resourceWrapper, null, context);}if (context == null) {// Using default context.context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}// Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);/** Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},* so no rule checking will be done.*/if (chain == null) {return new CtEntry(resourceWrapper, null, context);}Entry e = new CtEntry(resourceWrapper, chain, context);try {chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info("Sentinel unexpected exception", e1);}return e;}ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ProcessorSlotChain chain = chainMap.get(resourceWrapper);if (chain == null) {synchronized (LOCK) {chain = chainMap.get(resourceWrapper);if (chain == null) {// Entry size limit.if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {return null;}chain = SlotChainProvider.newSlotChain();Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);newMap.putAll(chainMap);newMap.put(resourceWrapper, chain);chainMap = newMap;}}}return chain;}

ContextUtil静态方法,会初始化一个默认的EntranceNode

    static {// Cache the entrance node for default context.initDefaultContext();}private static void initDefaultContext() {String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);Constants.ROOT.addChild(node);contextNameNodeMap.put(defaultContextName, node);}

ContextUtil

ContextUtil#enter(),当执行该方法的时候,会往线程变量ThreadLocal存入当前的Context

    public static Context enter(String name, String origin) {if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {throw new ContextNameDefineException("The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");}return trueEnter(name, origin);}protected static Context trueEnter(String name, String origin) {Context context = contextHolder.get();if (context == null) {Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;DefaultNode node = localCacheNameMap.get(name);if (node == null) {if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {LOCK.lock();try {node = contextNameNodeMap.get(name);if (node == null) {if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);// Add entrance node.Constants.ROOT.addChild(node);Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);newMap.putAll(contextNameNodeMap);newMap.put(name, node);contextNameNodeMap = newMap;}}} finally {LOCK.unlock();}}}context = new Context(node, name);context.setOrigin(origin);contextHolder.set(context);}return context;}

ProcessorSlotChain

SlotChainProvider#newSlotChain,按照默认的DefaultSlotChainBuilder生成责任链。

    public static ProcessorSlotChain newSlotChain() {if (slotChainBuilder != null) {return slotChainBuilder.build();}// Resolve the slot chain builder SPI.slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);if (slotChainBuilder == null) {// Should not go through here.RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");slotChainBuilder = new DefaultSlotChainBuilder();} else {RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "+ slotChainBuilder.getClass().getCanonicalName());}return slotChainBuilder.build();}

DefaultSlotChainBuilder#build,根据spi加载ProcessorSlot

public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();// Note: the instances of ProcessorSlot should be different, since they are not stateless.List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);for (ProcessorSlot slot : sortedSlotList) {if (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");continue;}chain.addLast((AbstractLinkedProcessorSlot<?>) slot);}return chain;}
}

NodeSelectorSlot

NodeSelectorSlot,处理同一个resource,使用同一个node

@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {/*** {@link DefaultNode}s of the same resource in different context.*/private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)throws Throwable {DefaultNode node = map.get(context.getName());if (node == null) {synchronized (this) {node = map.get(context.getName());if (node == null) {node = new DefaultNode(resourceWrapper, null);HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());cacheMap.putAll(map);cacheMap.put(context.getName(), node);map = cacheMap;// Build invocation tree((DefaultNode) context.getLastNode()).addChild(node);}}}context.setCurNode(node);fireEntry(context, resourceWrapper, node, count, prioritized, args);}
}

ClusterBuilderSlot

对于每一个 resource,会对应一个 ClusterNode 实例,如果不存在,就创建一个实例。当设置了 origin 的时候,会额外生成一个 StatisticsNode 实例,挂在 ClusterNode 上。

  • 不同入口的访问数据是DefaultNode
  • 统计所有入口访问数据之和是ClusterNode
  • 来自某个服务的访问数据是OriginNode
@SpiOrder(-9000)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();private static final Object lock = new Object();private volatile ClusterNode clusterNode = null;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args)throws Throwable {if (clusterNode == null) {synchronized (lock) {if (clusterNode == null) {// Create the cluster node.clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));newMap.putAll(clusterNodeMap);newMap.put(node.getId(), clusterNode);clusterNodeMap = newMap;}}}node.setClusterNode(clusterNode);/** if context origin is set, we should get or create a new {@link Node} of* the specific origin.*/if (!"".equals(context.getOrigin())) {Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());context.getCurEntry().setOriginNode(originNode);}fireEntry(context, resourceWrapper, node, count, prioritized, args);}
}

LogSlot

对于下面的节点如果有抛出阻塞异常,进行捕捉,并且打印日志。记录哪些接口被规则挡住了。

@SpiOrder(-8000)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)throws Throwable {try {fireEntry(context, resourceWrapper, obj, count, prioritized, args);} catch (BlockException e) {EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),context.getOrigin(), count);throw e;} catch (Throwable e) {RecordLog.warn("Unexpected entry exception", e);}}
}

StatisticSlot

负责进行数据统计,为限流降级提供数据支持的。

@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// Do some checking.fireEntry(context, resourceWrapper, node, count, prioritized, args);// Request passed, add thread count and pass count.node.increaseThreadNum();node.addPassRequest(count);if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (PriorityWaitException ex) {node.increaseThreadNum();if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (BlockException e) {// Blocked, set block exception to current entry.context.getCurEntry().setBlockError(e);// Add block count.node.increaseBlockQps(count);if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseBlockQps(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseBlockQps(count);}// Handle block event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onBlocked(e, context, resourceWrapper, node, count, args);}throw e;} catch (Throwable e) {// Unexpected internal error, set error to current entry.context.getCurEntry().setError(e);throw e;}}
}

AuthoritySlot

AuthoritySlot做权限控制,根据 origin 做黑白名单的控制:

@SpiOrder(-6000)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)throws Throwable {checkBlackWhiteAuthority(resourceWrapper, context);fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();if (authorityRules == null) {return;}Set<AuthorityRule> rules = authorityRules.get(resource.getName());if (rules == null) {return;}for (AuthorityRule rule : rules) {if (!AuthorityRuleChecker.passCheck(rule, context)) {throw new AuthorityException(context.getOrigin(), rule);}}}
}

AuthorityRuleChecker#passCheck

对origin进行规则校验

final class AuthorityRuleChecker {static boolean passCheck(AuthorityRule rule, Context context) {String requester = context.getOrigin();// Empty origin or empty limitApp will pass.if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {return true;}// Do exact match with origin name.int pos = rule.getLimitApp().indexOf(requester);boolean contain = pos > -1;if (contain) {boolean exactlyMatch = false;String[] appArray = rule.getLimitApp().split(",");for (String app : appArray) {if (requester.equals(app)) {exactlyMatch = true;break;}}contain = exactlyMatch;}int strategy = rule.getStrategy();if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {return false;}if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {return false;}return true;}private AuthorityRuleChecker() {}
}

SystemSlot

根据整个系统运行的统计数据来限流的,防止当前系统负载过高。它支持入口qps、线程数、响应时间、cpu使用率、负载5个限流的维度。

@SpiOrder(-5000)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {SystemRuleManager.checkSystem(resourceWrapper);fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}}

Sentinel 针对所有的入口流量,使用了一个全局的 ENTRY_NODE 进行统计,所以我们也要知道,系统保护规则是全局的,和具体的某个资源没有关系。

    public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {if (resourceWrapper == null) {return;}// Ensure the checking switch is on.if (!checkSystemStatus.get()) {return;}// for inbound traffic onlyif (resourceWrapper.getEntryType() != EntryType.IN) {return;}// total qpsdouble currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();if (currentQps > qps) {throw new SystemBlockException(resourceWrapper.getName(), "qps");}// total threadint currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();if (currentThread > maxThread) {throw new SystemBlockException(resourceWrapper.getName(), "thread");}double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();if (rt > maxRt) {throw new SystemBlockException(resourceWrapper.getName(), "rt");}// load. BBR algorithm.if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {if (!checkBbr(currentThread)) {throw new SystemBlockException(resourceWrapper.getName(), "load");}}// cpu usageif (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {throw new SystemBlockException(resourceWrapper.getName(), "cpu");}}

FlowSlot

流控的核心处理类

@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;public FlowSlot() {this(new FlowRuleChecker());}/*** Package-private for test.** @param checker flow rule checker* @since 1.6.1*/FlowSlot(FlowRuleChecker checker) {AssertUtil.notNull(checker, "flow checker should not be null");this.checker = checker;}@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {checkFlow(resourceWrapper, context, node, count, prioritized);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);}
}

FlowRuleChecker#checkFlow

public class FlowRuleChecker {public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {for (FlowRule rule : rules) {if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode == null) {return true;}return rule.getRater().canPass(selectedNode, acquireCount, prioritized);}
}

DegradeSlot

降级策略。Sentinel支持三种熔断策略:慢调用比例、异常比例 、异常数。

Sentinel会为每个设置的规则都创建一个熔断器,熔断器有三种状态,OPEN(打开)、HALF_OPEN(半开)、CLOSED(关闭)

  • 当处于CLOSED状态时,可以访问资源,访问之后会进行慢调用比例、异常比例、异常数的统计,一旦达到了设置的阈值,就会将熔断器的状态设置为OPEN
  • 当处于OPEN状态时,会去判断是否达到了熔断时间,如果没到,拒绝访问,如果到了,那么就将状态改成HALF_OPEN,然后访问资源,访问之后会对访问结果进行判断,符合规则设置的要求,直接将熔断器设置为CLOSED,关闭熔断器,不符合则还是改为OPEN状态
  • 当处于HALF_OPEN状态时,直接拒绝访问资源
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {performChecking(context, resourceWrapper);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void performChecking(Context context, ResourceWrapper r) throws BlockException {List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {return;}for (CircuitBreaker cb : circuitBreakers) {if (!cb.tryPass(context)) {throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}}}
}

SentinelWebInterceptor

SentinelWebInterceptor继承AbstractSentinelInterceptor,当有请求访问,会执行AbstractSentinelInterceptor#preHandle

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {try {String resourceName = this.getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;} else if (this.increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {return true;} else {String origin = this.parseOrigin(request);String contextName = this.getContextName(request);ContextUtil.enter(contextName, origin);Entry entry = SphU.entry(resourceName, 1, EntryType.IN);request.setAttribute(this.baseWebMvcConfig.getRequestAttributeName(), entry);return true;}} catch (BlockException var12) {BlockException e = var12;try {this.handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;}}

InitFunc

SphU#entry(),执行该方法会调用Env的静态变量,Env进行初始化。

    public static Entry entry(String name) throws BlockException {return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);}

Env执行静态方法。

public class Env {public static final Sph sph = new CtSph();static {// If init fails, the process will exit.InitExecutor.doInit();}}

InitExecutor#doInit

使用 SPI 加载 InitFunc 的实现

    public static void doInit() {if (!initialized.compareAndSet(false, true)) {return;}try {ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);List<OrderWrapper> initList = new ArrayList<OrderWrapper>();for (InitFunc initFunc : loader) {RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());insertSorted(initList, initFunc);}for (OrderWrapper w : initList) {w.func.init();RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",w.func.getClass().getCanonicalName(), w.order));}} catch (Exception ex) {RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);ex.printStackTrace();} catch (Error error) {RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);error.printStackTrace();}}

HeartbeatSenderInitFunc,使用HeartbeatSenderProvider获取加载HeartbeatSender,定时执行。

@InitOrder(-1)
public class HeartbeatSenderInitFunc implements InitFunc {private ScheduledExecutorService pool = null;public HeartbeatSenderInitFunc() {}private void initSchedulerIfNeeded() {if (this.pool == null) {this.pool = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("sentinel-heartbeat-send-task", true), new DiscardOldestPolicy());}}public void init() {HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();if (sender == null) {RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded", new Object[0]);} else {this.initSchedulerIfNeeded();long interval = this.retrieveInterval(sender);this.setIntervalIfNotExists(interval);this.scheduleHeartbeatTask(sender, interval);}}private boolean isValidHeartbeatInterval(Long interval) {return interval != null && interval > 0L;}private void setIntervalIfNotExists(long interval) {SentinelConfig.setConfig("csp.sentinel.heartbeat.interval.ms", String.valueOf(interval));}long retrieveInterval(HeartbeatSender sender) {Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs();if (this.isValidHeartbeatInterval(intervalInConfig)) {RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval in Sentinel config property: " + intervalInConfig, new Object[0]);return intervalInConfig;} else {long senderInterval = sender.intervalMs();RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in config property or invalid, using sender default: " + senderInterval, new Object[0]);return senderInterval;}}private void scheduleHeartbeatTask(final HeartbeatSender sender, long interval) {this.pool.scheduleAtFixedRate(new Runnable() {public void run() {try {sender.sendHeartbeat();} catch (Throwable var2) {RecordLog.warn("[HeartbeatSender] Send heartbeat error", var2);}}}, 5000L, interval, TimeUnit.MILLISECONDS);RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: " + sender.getClass().getCanonicalName(), new Object[0]);}
}

HeartbeatSenderProvider静态方法块进行初始化,根据SPI获取HeartbeatSender

public final class HeartbeatSenderProvider {private static HeartbeatSender heartbeatSender = null;private static void resolveInstance() {HeartbeatSender resolved = (HeartbeatSender)SpiLoader.loadHighestPriorityInstance(HeartbeatSender.class);if (resolved == null) {RecordLog.warn("[HeartbeatSenderProvider] WARN: No existing HeartbeatSender found", new Object[0]);} else {heartbeatSender = resolved;RecordLog.info("[HeartbeatSenderProvider] HeartbeatSender activated: " + resolved.getClass().getCanonicalName(), new Object[0]);}}public static HeartbeatSender getHeartbeatSender() {return heartbeatSender;}private HeartbeatSenderProvider() {}static {resolveInstance();}
}

StatisticNode

ClusterNodeDefaultNodeEntranceNode都继承StatisticNode

StatisticNode#addPassRequest,增加通过的请求数。

        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);@Overridepublic void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);}

ArrayMetric的data属性默认实现OccupiableBucketLeapArray

   	private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) {this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);}

LeapArray#currentWindow(long)。添加数据的时候,我们要先获取操作的目标窗口,也就是 currentWindow 这个方法。

    public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.long windowStart = calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.*/while (true) {WindowWrap<T> old = array.get(idx);if (old == null) {WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart == old.windowStart()) {return old;} else if (windowStart > old.windowStart()) {if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}

LeapArray#values(long),获取有效的窗口中的数据。

    public List<T> values(long timeMillis) {if (timeMillis < 0) {return new ArrayList<T>();}int size = array.length();List<T> result = new ArrayList<T>(size);for (int i = 0; i < size; i++) {WindowWrap<T> windowWrap = array.get(i);if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {continue;}result.add(windowWrap.value());}return result;}

TrafficShapingController

FlowRuleUtil#generateRater,根据规则生成对应的TrafficShapingController

    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {switch (rule.getControlBehavior()) {case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:default:// Default mode or unknown mode: default traffic shaping controller (fast-reject).}}return new DefaultController(rule.getCount(), rule.getGrade());}

DefaultController#canPass(),超过流量阈值的会直接拒绝。

    @Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {int curCount = avgUsedTokens(node);if (curCount + acquireCount > count) {if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {long currentTime;long waitInMs;currentTime = TimeUtil.currentTimeMillis();waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);sleep(waitInMs);// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.throw new PriorityWaitException(waitInMs);}}return false;}return true;}

SentinelResourceAspect

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")public void sentinelResourceAnnotationPointcut() {}@Around("sentinelResourceAnnotationPointcut()")public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {Method originMethod = resolveMethod(pjp);SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);if (annotation == null) {// Should not go through here.throw new IllegalStateException("Wrong state for SentinelResource annotation");}String resourceName = getResourceName(annotation.value(), originMethod);EntryType entryType = annotation.entryType();int resourceType = annotation.resourceType();Entry entry = null;try {entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());Object result = pjp.proceed();return result;} catch (BlockException ex) {return handleBlockException(pjp, annotation, ex);} catch (Throwable ex) {Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();// The ignore list will be checked first.if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {throw ex;}if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {traceException(ex);return handleFallback(pjp, annotation, ex);}// No fallback function can handle the exception, so throw it out.throw ex;} finally {if (entry != null) {entry.exit(1, pjp.getArgs());}}}
}

在这里插入图片描述

相关文章:

【源码解析】流控框架Sentinel源码解析

Sentinel简介 Sentinel是阿里开源的一款面向分布式、多语言异构化服务架构的流量治理组件。 主要以流量为切入点&#xff0c;从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。 核心概念 资源 资源…...

redis面试重点------源于黑马

缓存问题三兄弟 是因为不同的原因让请求全部打到了数据库而造成的问题 什么是缓存穿透&#xff1f; 缓存穿透是指查询一个数据&#xff0c;在redis和MySQL中都不存在。也就是查询一个数据不存在的数据&#xff0c;导致每次请求都会到达数据库&#xff0c;给数据造成很大的压力…...

jQuery知识点二

一、 jQuery 属性操作 1. 元素固有属性值 prop() 获取属性&#xff1a;prop("属性") 设置属性&#xff1a;prop&#xff08;"属性"&#xff0c;"属性值"&#xff09; ​所谓元素固有属性就是元素本身自带的属性&#xff0c;比如 <a> 元素里…...

4 月份 火火火火 的开源项目

盘点 4 月份 GitHub 上 Star 攀升最多的开源项目&#xff0c;整个 4 月份最火项目 90% 都是 AI 项目&#xff08;准确的说&#xff0c;最近半年的热榜都是 AI 项目&#xff09; 本期推荐开源项目目录&#xff1a; 1. AI 生成逼真语音 2. 复旦大模型 MOSS&#xff01; 3. 让画中…...

PAT A1011 World Cup Betting

1011 World Cup Betting 分数 20 作者 CHEN, Yue 单位 浙江大学 With the 2010 FIFA World Cup running, football fans the world over were becoming increasingly excited as the best players from the best teams doing battles for the World Cup trophy in South Af…...

Android 拍照以及相册中选择(适配高版本)————上传头像并裁剪(一)

前言 在项目研发中&#xff0c;相信大家都遇到过给用户增加头像照片的需求。 随着手机版本的不断更新&#xff0c;android 8、android 9、android 10、android 12、android 13、鸿蒙系统等等&#xff1b;遇到这个功能需求&#xff0c;大家肯定会想&#xff0c;“这还不好写&…...

带你了解现在的LED显示屏技术

随着LED显示屏技术的空前繁荣&#xff0c;LED显示屏产品备受关注&#xff0c;广泛应用于商业广告、实况播映、交通诱导、舞台演绎等领域&#xff0c;发展至今。你了解十大中国LED显示屏制造商吗&#xff1f; LED显示屏技术已经得到了长足的发展&#xff0c;现在的LED显示屏技术…...

AI模型推理(1)——入门篇

前言 本文主要介绍AI模型推理的相关基础概念&#xff0c;为后续云原生模型推理服务的学习做准备。 初识模型部署 对于深度学习模型来说&#xff0c;模型部署指让训练好的模型在特定环境中运行的过程。相比于常规的软件部署&#xff0c;模型部署会面临更多的难题&#xff1a; …...

MySQL--表的基本查询--0410--15

目录 1. Create 1.1 insert 1.1.2 插入否则更新 1.2 replace 2.Retrieve 2.1 select 2.1.1 全列查询 2.1.2 指定列查询 2.1.3 查询字段为表达式 2.1.4 为查询结果指定名称 2.1.5 去重 2.2 where 2.2.1 > and > and < and < and 2.2.2 in between…...

Scala语言入门以及基本语法

文章目录 前言1.环境搭建1) IDEA中插件下载2) SDK下载配置 2.基本使用1&#xff09;var与val的区别2) .基本数据类型3).字符串的基本用法4) 控制结构1) if else2) for 循环3) while循环 5)类6) 函数 前言 scala在一种简洁的高级语言中结合了面向对象和函数式编程。Scala的静态…...

Linux shell编程 循环语句for continue break

for循环是编程语言中一种循环语句 示例1&#xff1a;循环读取user.txt中的用户名&#xff0c;创建用户。设置密码。 for i in $(cat /opt/user.txt) douseradd $iecho 123456 | passwd --stdin $i done 示例2&#xff1a;循环读取ipaddr文本文件中地址&#xff0c;执行ping命令…...

leetcode 643. 子数组最大平均数 I

题目描述解题思路执行结果 leetcode 643. 子数组最大平均数 I 题目描述 子数组最大平均数 I 给你一个由 n 个元素组成的整数数组 nums 和一个整数 k 。 请你找出平均数最大且 长度为 k 的连续子数组&#xff0c;并输出该最大平均数。 任何误差小于 10-5 的答案都将被视为正确答…...

TDA4VM/VH 芯片硬件 mailbox

请从官网下载 TD4VM 技术参考手册&#xff0c;地址如下&#xff1a; TDA4VM 技术参考手册地址 概述 (Mailbox 的介绍在 TRM 的第7.1章节) Mailbox 使用邮箱中断机制实现了 VM 芯片的核间通信。 Mailbox 是集成在 NAVSS0 域下的一个外设&#xff08;NAVSS0 的说明可以查看&a…...

如何利用Trimble RealWorks三维激光扫描仪进行外业测量和内业处理?

文章目录 0.引言1.Trimble RealWorks介绍2.外业测量3.内业处理 0.引言 笔者所在资源与环境工程学院实验室采购有一台Trimble RealWorks三维激光扫描仪&#xff08;仪器名&#xff1a;Trimble TX8&#xff09;&#xff0c;因项目需要&#xff0c;在学校实验场地进行实地测量训练…...

mysql数据备份

数据备份分类 数据库的备份类型 完全备份&#xff1a;对整个数据库的数据进行备份部分备份&#xff1a;对部分数据进行备份&#xff08;可以是一张表也可以是多张表&#xff09; 增量备份&#xff1a;是以上一次备份为基础来备份变更数据的&#xff0c;节约空间差异备份&#x…...

排队接水--贪心

排队接水 题目描述 有 n n n 个人在一个水龙头前排队接水&#xff0c;假如每个人接水的时间为 T i T_i Ti​&#xff0c;请编程找出这 n n n 个人排队的一种顺序&#xff0c;使得 n n n 个人的平均等待时间最小。 输入格式 第一行为一个整数 n n n。 第二行 n n n 个…...

数字温度传感器-DS18B20

文章目录 一、DS18B20器件图二、DS18B20特点三、DS18B20内部结构内部构成 四、工作时序1.初始化时序2.ReadOneChar2.WriteOneChar 一、DS18B20器件图 DS18B20的管脚排列&#xff1a; GND为电源地&#xff1b;DQ为数字信号输入&#xff0f;输出端&#xff1b;VDD为外接供电电源…...

【算法】【算法杂谈】从M个数中等概率的选出n个数,保证每一个数的选中概率都是n/m(蓄水池算法)

目录 前言问题介绍解决方案代码编写java语言版本c语言版本c语言版本 思考感悟写在最后 前言 当前所有算法都使用测试用例运行过&#xff0c;但是不保证100%的测试用例&#xff0c;如果存在问题务必联系批评指正~ 在此感谢左大神让我对算法有了新的感悟认识&#xff01; 问题介…...

vue3+ts+vite自适应项目——路由、layout布局

系列文章目录 第一章&#xff1a;搭建项目 目录 系列文章目录 前言 一、vue-router 1.安装vue-router 2.引入 2.1 新建页面 2.2 公共样式引入 2.3 layout 布局 2.4路由配置 总结 前言 上一章我们搭建了项目&#xff0c;这一张主要讲路由和layout布局&#xff0c;和…...

数据库之约束、索引和事务

一、约束 约束,顾名思义就是数据库对数据库中的数据所给出的一组检验规则.负责判断元素是否符合数据库要求.其目的就是为了提高效率以及准确性. 1.not null - > 数据元素非空 表示如果插入数据,则当前数据不能为空. //创建一张学生表,其班级id和年级id不为空 create …...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

使用Spring AI和MCP协议构建图片搜索服务

目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式&#xff08;本地调用&#xff09; SSE模式&#xff08;远程调用&#xff09; 4. 注册工具提…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配

目录 一、C 内存的基本概念​ 1.1 内存的物理与逻辑结构​ 1.2 C 程序的内存区域划分​ 二、栈内存分配​ 2.1 栈内存的特点​ 2.2 栈内存分配示例​ 三、堆内存分配​ 3.1 new和delete操作符​ 4.2 内存泄漏与悬空指针问题​ 4.3 new和delete的重载​ 四、智能指针…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

windows系统MySQL安装文档

概览&#xff1a;本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容&#xff0c;为学习者提供全面的操作指导。关键要点包括&#xff1a; 解压 &#xff1a;下载完成后解压压缩包&#xff0c;得到MySQL 8.…...

【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error

在前端开发中&#xff0c;JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作&#xff08;如 Promise、async/await 等&#xff09;&#xff0c;开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝&#xff08;r…...

Oracle11g安装包

Oracle 11g安装包 适用于windows系统&#xff0c;64位 下载路径 oracle 11g 安装包...

Mac flutter环境搭建

一、下载flutter sdk 制作 Android 应用 | Flutter 中文文档 - Flutter 中文开发者网站 - Flutter 1、查看mac电脑处理器选择sdk 2、解压 unzip ~/Downloads/flutter_macos_arm64_3.32.2-stable.zip \ -d ~/development/ 3、添加环境变量 命令行打开配置环境变量文件 ope…...