【源码解析】流控框架Sentinel源码解析
Sentinel简介
Sentinel是阿里开源的一款面向分布式、多语言异构化服务架构的流量治理组件。
主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。
核心概念
资源
资源是Sentinel中一个非常重要的概念,资源就是Sentinel所保护的对象。
资源可以是一段代码,又或者是一个接口,Sentinel中并没有什么强制规定,但是实际项目中一般以一个接口为一个资源,比如说一个http接口,又或者是rpc接口,它们就是资源,可以被保护。
资源是通过Sentinel的API定义的,每个资源都有一个对应的名称,比如对于一个http接口资源来说,Sentinel默认的资源名称就是请求路径。
规则
规则也是一个重要的概念,规则其实比较好理解,比如说要对一个资源进行限流,那么限流的条件就是规则,后面在限流的时候会基于这个规则来判定是否需要限流。
Sentinel的规则分为流量控制规则、熔断降级规则以及系统保护规则,不同的规则实现的效果不一样。
快速入门
引入依赖
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId><version>2021.1</version></dependency>
设置流控规则
@Configuration
public class FLowRulesConfig {/*** 定义限流规则*/@PostConstructpublic void initFLowRules() {//1.创建限流规则List<FlowRule> rules = new ArrayList<>();FlowRule rule = new FlowRule();// 定义资源,表示Sentinel会对哪个资源生效rule.setResource("hello");// 定义限流规则类型,QPS限流类型rule.setGrade(RuleConstant.FLOW_GRADE_QPS);// 定义QPS每秒能通过的请求个数rule.setCount(2);rules.add(rule);// 2、加载限流规则FlowRuleManager.loadRules(rules);}
}
编写Controller,进行测试
@Slf4j
@RestController
public class HelloController {@GetMapping("/hello")public String startHello() {// 进行限流try (Entry entry = SphU.entry("hello")) { // 加载限流规则,若果存在才会往下执行// 被保护的资源return "hello Sentinel";} catch (BlockException e) {e.printStackTrace();// 被限流或者被降级的操作return "系统繁忙";}}
}
源码拆解
CtSph
核心是Entry entry = SphU.entry("hello")
。
CtSph#entryWithPriority()
,获取Context
,如果不存在,则使用默认的。构造责任链,执行。
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)throws BlockException {Context context = ContextUtil.getContext();if (context instanceof NullContext) {// The {@link NullContext} indicates that the amount of context has exceeded the threshold,// so here init the entry only. No rule checking will be done.return new CtEntry(resourceWrapper, null, context);}if (context == null) {// Using default context.context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}// Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);/** Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},* so no rule checking will be done.*/if (chain == null) {return new CtEntry(resourceWrapper, null, context);}Entry e = new CtEntry(resourceWrapper, chain, context);try {chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info("Sentinel unexpected exception", e1);}return e;}ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ProcessorSlotChain chain = chainMap.get(resourceWrapper);if (chain == null) {synchronized (LOCK) {chain = chainMap.get(resourceWrapper);if (chain == null) {// Entry size limit.if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {return null;}chain = SlotChainProvider.newSlotChain();Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);newMap.putAll(chainMap);newMap.put(resourceWrapper, chain);chainMap = newMap;}}}return chain;}
ContextUtil
静态方法,会初始化一个默认的EntranceNode
。
static {// Cache the entrance node for default context.initDefaultContext();}private static void initDefaultContext() {String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);Constants.ROOT.addChild(node);contextNameNodeMap.put(defaultContextName, node);}
ContextUtil
ContextUtil#enter()
,当执行该方法的时候,会往线程变量ThreadLocal
存入当前的Context
public static Context enter(String name, String origin) {if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {throw new ContextNameDefineException("The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");}return trueEnter(name, origin);}protected static Context trueEnter(String name, String origin) {Context context = contextHolder.get();if (context == null) {Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;DefaultNode node = localCacheNameMap.get(name);if (node == null) {if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {LOCK.lock();try {node = contextNameNodeMap.get(name);if (node == null) {if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {setNullContext();return NULL_CONTEXT;} else {node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);// Add entrance node.Constants.ROOT.addChild(node);Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);newMap.putAll(contextNameNodeMap);newMap.put(name, node);contextNameNodeMap = newMap;}}} finally {LOCK.unlock();}}}context = new Context(node, name);context.setOrigin(origin);contextHolder.set(context);}return context;}
ProcessorSlotChain
SlotChainProvider#newSlotChain
,按照默认的DefaultSlotChainBuilder
生成责任链。
public static ProcessorSlotChain newSlotChain() {if (slotChainBuilder != null) {return slotChainBuilder.build();}// Resolve the slot chain builder SPI.slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);if (slotChainBuilder == null) {// Should not go through here.RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");slotChainBuilder = new DefaultSlotChainBuilder();} else {RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "+ slotChainBuilder.getClass().getCanonicalName());}return slotChainBuilder.build();}
DefaultSlotChainBuilder#build
,根据spi加载ProcessorSlot
。
public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();// Note: the instances of ProcessorSlot should be different, since they are not stateless.List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);for (ProcessorSlot slot : sortedSlotList) {if (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");continue;}chain.addLast((AbstractLinkedProcessorSlot<?>) slot);}return chain;}
}
NodeSelectorSlot
NodeSelectorSlot
,处理同一个resource,使用同一个node
@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {/*** {@link DefaultNode}s of the same resource in different context.*/private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)throws Throwable {DefaultNode node = map.get(context.getName());if (node == null) {synchronized (this) {node = map.get(context.getName());if (node == null) {node = new DefaultNode(resourceWrapper, null);HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());cacheMap.putAll(map);cacheMap.put(context.getName(), node);map = cacheMap;// Build invocation tree((DefaultNode) context.getLastNode()).addChild(node);}}}context.setCurNode(node);fireEntry(context, resourceWrapper, node, count, prioritized, args);}
}
ClusterBuilderSlot
对于每一个 resource,会对应一个 ClusterNode
实例,如果不存在,就创建一个实例。当设置了 origin 的时候,会额外生成一个 StatisticsNode
实例,挂在 ClusterNode
上。
- 不同入口的访问数据是
DefaultNode
- 统计所有入口访问数据之和是
ClusterNode
- 来自某个服务的访问数据是
OriginNode
@SpiOrder(-9000)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();private static final Object lock = new Object();private volatile ClusterNode clusterNode = null;@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args)throws Throwable {if (clusterNode == null) {synchronized (lock) {if (clusterNode == null) {// Create the cluster node.clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));newMap.putAll(clusterNodeMap);newMap.put(node.getId(), clusterNode);clusterNodeMap = newMap;}}}node.setClusterNode(clusterNode);/** if context origin is set, we should get or create a new {@link Node} of* the specific origin.*/if (!"".equals(context.getOrigin())) {Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());context.getCurEntry().setOriginNode(originNode);}fireEntry(context, resourceWrapper, node, count, prioritized, args);}
}
LogSlot
对于下面的节点如果有抛出阻塞异常,进行捕捉,并且打印日志。记录哪些接口被规则挡住了。
@SpiOrder(-8000)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)throws Throwable {try {fireEntry(context, resourceWrapper, obj, count, prioritized, args);} catch (BlockException e) {EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),context.getOrigin(), count);throw e;} catch (Throwable e) {RecordLog.warn("Unexpected entry exception", e);}}
}
StatisticSlot
负责进行数据统计,为限流降级提供数据支持的。
@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// Do some checking.fireEntry(context, resourceWrapper, node, count, prioritized, args);// Request passed, add thread count and pass count.node.increaseThreadNum();node.addPassRequest(count);if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (PriorityWaitException ex) {node.increaseThreadNum();if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (BlockException e) {// Blocked, set block exception to current entry.context.getCurEntry().setBlockError(e);// Add block count.node.increaseBlockQps(count);if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseBlockQps(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseBlockQps(count);}// Handle block event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onBlocked(e, context, resourceWrapper, node, count, args);}throw e;} catch (Throwable e) {// Unexpected internal error, set error to current entry.context.getCurEntry().setError(e);throw e;}}
}
AuthoritySlot
AuthoritySlot
做权限控制,根据 origin 做黑白名单的控制:
@SpiOrder(-6000)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)throws Throwable {checkBlackWhiteAuthority(resourceWrapper, context);fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();if (authorityRules == null) {return;}Set<AuthorityRule> rules = authorityRules.get(resource.getName());if (rules == null) {return;}for (AuthorityRule rule : rules) {if (!AuthorityRuleChecker.passCheck(rule, context)) {throw new AuthorityException(context.getOrigin(), rule);}}}
}
AuthorityRuleChecker#passCheck
对origin进行规则校验
final class AuthorityRuleChecker {static boolean passCheck(AuthorityRule rule, Context context) {String requester = context.getOrigin();// Empty origin or empty limitApp will pass.if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {return true;}// Do exact match with origin name.int pos = rule.getLimitApp().indexOf(requester);boolean contain = pos > -1;if (contain) {boolean exactlyMatch = false;String[] appArray = rule.getLimitApp().split(",");for (String app : appArray) {if (requester.equals(app)) {exactlyMatch = true;break;}}contain = exactlyMatch;}int strategy = rule.getStrategy();if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {return false;}if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {return false;}return true;}private AuthorityRuleChecker() {}
}
SystemSlot
根据整个系统运行的统计数据来限流的,防止当前系统负载过高。它支持入口qps、线程数、响应时间、cpu使用率、负载5个限流的维度。
@SpiOrder(-5000)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {SystemRuleManager.checkSystem(resourceWrapper);fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}}
Sentinel 针对所有的入口流量,使用了一个全局的 ENTRY_NODE 进行统计,所以我们也要知道,系统保护规则是全局的,和具体的某个资源没有关系。
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {if (resourceWrapper == null) {return;}// Ensure the checking switch is on.if (!checkSystemStatus.get()) {return;}// for inbound traffic onlyif (resourceWrapper.getEntryType() != EntryType.IN) {return;}// total qpsdouble currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();if (currentQps > qps) {throw new SystemBlockException(resourceWrapper.getName(), "qps");}// total threadint currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();if (currentThread > maxThread) {throw new SystemBlockException(resourceWrapper.getName(), "thread");}double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();if (rt > maxRt) {throw new SystemBlockException(resourceWrapper.getName(), "rt");}// load. BBR algorithm.if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {if (!checkBbr(currentThread)) {throw new SystemBlockException(resourceWrapper.getName(), "load");}}// cpu usageif (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {throw new SystemBlockException(resourceWrapper.getName(), "cpu");}}
FlowSlot
流控的核心处理类
@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;public FlowSlot() {this(new FlowRuleChecker());}/*** Package-private for test.** @param checker flow rule checker* @since 1.6.1*/FlowSlot(FlowRuleChecker checker) {AssertUtil.notNull(checker, "flow checker should not be null");this.checker = checker;}@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {checkFlow(resourceWrapper, context, node, count, prioritized);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);}
}
FlowRuleChecker#checkFlow
public class FlowRuleChecker {public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {for (FlowRule rule : rules) {if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode == null) {return true;}return rule.getRater().canPass(selectedNode, acquireCount, prioritized);}
}
DegradeSlot
降级策略。Sentinel支持三种熔断策略:慢调用比例、异常比例 、异常数。
Sentinel会为每个设置的规则都创建一个熔断器,熔断器有三种状态,OPEN(打开)、HALF_OPEN(半开)、CLOSED(关闭)
- 当处于CLOSED状态时,可以访问资源,访问之后会进行慢调用比例、异常比例、异常数的统计,一旦达到了设置的阈值,就会将熔断器的状态设置为OPEN
- 当处于OPEN状态时,会去判断是否达到了熔断时间,如果没到,拒绝访问,如果到了,那么就将状态改成HALF_OPEN,然后访问资源,访问之后会对访问结果进行判断,符合规则设置的要求,直接将熔断器设置为CLOSED,关闭熔断器,不符合则还是改为OPEN状态
- 当处于HALF_OPEN状态时,直接拒绝访问资源
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {performChecking(context, resourceWrapper);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void performChecking(Context context, ResourceWrapper r) throws BlockException {List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {return;}for (CircuitBreaker cb : circuitBreakers) {if (!cb.tryPass(context)) {throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}}}
}
SentinelWebInterceptor
SentinelWebInterceptor
继承AbstractSentinelInterceptor
,当有请求访问,会执行AbstractSentinelInterceptor#preHandle
。
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {try {String resourceName = this.getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;} else if (this.increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {return true;} else {String origin = this.parseOrigin(request);String contextName = this.getContextName(request);ContextUtil.enter(contextName, origin);Entry entry = SphU.entry(resourceName, 1, EntryType.IN);request.setAttribute(this.baseWebMvcConfig.getRequestAttributeName(), entry);return true;}} catch (BlockException var12) {BlockException e = var12;try {this.handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;}}
InitFunc
SphU#entry()
,执行该方法会调用Env
的静态变量,Env进行初始化。
public static Entry entry(String name) throws BlockException {return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);}
Env
执行静态方法。
public class Env {public static final Sph sph = new CtSph();static {// If init fails, the process will exit.InitExecutor.doInit();}}
InitExecutor#doInit
使用 SPI 加载 InitFunc
的实现
public static void doInit() {if (!initialized.compareAndSet(false, true)) {return;}try {ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);List<OrderWrapper> initList = new ArrayList<OrderWrapper>();for (InitFunc initFunc : loader) {RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());insertSorted(initList, initFunc);}for (OrderWrapper w : initList) {w.func.init();RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",w.func.getClass().getCanonicalName(), w.order));}} catch (Exception ex) {RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);ex.printStackTrace();} catch (Error error) {RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);error.printStackTrace();}}
HeartbeatSenderInitFunc
,使用HeartbeatSenderProvider
获取加载HeartbeatSender
,定时执行。
@InitOrder(-1)
public class HeartbeatSenderInitFunc implements InitFunc {private ScheduledExecutorService pool = null;public HeartbeatSenderInitFunc() {}private void initSchedulerIfNeeded() {if (this.pool == null) {this.pool = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("sentinel-heartbeat-send-task", true), new DiscardOldestPolicy());}}public void init() {HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();if (sender == null) {RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded", new Object[0]);} else {this.initSchedulerIfNeeded();long interval = this.retrieveInterval(sender);this.setIntervalIfNotExists(interval);this.scheduleHeartbeatTask(sender, interval);}}private boolean isValidHeartbeatInterval(Long interval) {return interval != null && interval > 0L;}private void setIntervalIfNotExists(long interval) {SentinelConfig.setConfig("csp.sentinel.heartbeat.interval.ms", String.valueOf(interval));}long retrieveInterval(HeartbeatSender sender) {Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs();if (this.isValidHeartbeatInterval(intervalInConfig)) {RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval in Sentinel config property: " + intervalInConfig, new Object[0]);return intervalInConfig;} else {long senderInterval = sender.intervalMs();RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in config property or invalid, using sender default: " + senderInterval, new Object[0]);return senderInterval;}}private void scheduleHeartbeatTask(final HeartbeatSender sender, long interval) {this.pool.scheduleAtFixedRate(new Runnable() {public void run() {try {sender.sendHeartbeat();} catch (Throwable var2) {RecordLog.warn("[HeartbeatSender] Send heartbeat error", var2);}}}, 5000L, interval, TimeUnit.MILLISECONDS);RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: " + sender.getClass().getCanonicalName(), new Object[0]);}
}
HeartbeatSenderProvider
静态方法块进行初始化,根据SPI获取HeartbeatSender
。
public final class HeartbeatSenderProvider {private static HeartbeatSender heartbeatSender = null;private static void resolveInstance() {HeartbeatSender resolved = (HeartbeatSender)SpiLoader.loadHighestPriorityInstance(HeartbeatSender.class);if (resolved == null) {RecordLog.warn("[HeartbeatSenderProvider] WARN: No existing HeartbeatSender found", new Object[0]);} else {heartbeatSender = resolved;RecordLog.info("[HeartbeatSenderProvider] HeartbeatSender activated: " + resolved.getClass().getCanonicalName(), new Object[0]);}}public static HeartbeatSender getHeartbeatSender() {return heartbeatSender;}private HeartbeatSenderProvider() {}static {resolveInstance();}
}
StatisticNode
ClusterNode
,DefaultNode
,EntranceNode
都继承StatisticNode
。
StatisticNode#addPassRequest
,增加通过的请求数。
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);@Overridepublic void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);}
ArrayMetric
的data属性默认实现OccupiableBucketLeapArray
private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) {this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);}
LeapArray#currentWindow(long)
。添加数据的时候,我们要先获取操作的目标窗口,也就是 currentWindow
这个方法。
public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.long windowStart = calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.*/while (true) {WindowWrap<T> old = array.get(idx);if (old == null) {WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart == old.windowStart()) {return old;} else if (windowStart > old.windowStart()) {if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}
LeapArray#values(long)
,获取有效的窗口中的数据。
public List<T> values(long timeMillis) {if (timeMillis < 0) {return new ArrayList<T>();}int size = array.length();List<T> result = new ArrayList<T>(size);for (int i = 0; i < size; i++) {WindowWrap<T> windowWrap = array.get(i);if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {continue;}result.add(windowWrap.value());}return result;}
TrafficShapingController
FlowRuleUtil#generateRater
,根据规则生成对应的TrafficShapingController
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {switch (rule.getControlBehavior()) {case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:default:// Default mode or unknown mode: default traffic shaping controller (fast-reject).}}return new DefaultController(rule.getCount(), rule.getGrade());}
DefaultController#canPass()
,超过流量阈值的会直接拒绝。
@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {int curCount = avgUsedTokens(node);if (curCount + acquireCount > count) {if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {long currentTime;long waitInMs;currentTime = TimeUtil.currentTimeMillis();waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);sleep(waitInMs);// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.throw new PriorityWaitException(waitInMs);}}return false;}return true;}
SentinelResourceAspect
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")public void sentinelResourceAnnotationPointcut() {}@Around("sentinelResourceAnnotationPointcut()")public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {Method originMethod = resolveMethod(pjp);SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);if (annotation == null) {// Should not go through here.throw new IllegalStateException("Wrong state for SentinelResource annotation");}String resourceName = getResourceName(annotation.value(), originMethod);EntryType entryType = annotation.entryType();int resourceType = annotation.resourceType();Entry entry = null;try {entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());Object result = pjp.proceed();return result;} catch (BlockException ex) {return handleBlockException(pjp, annotation, ex);} catch (Throwable ex) {Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();// The ignore list will be checked first.if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {throw ex;}if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {traceException(ex);return handleFallback(pjp, annotation, ex);}// No fallback function can handle the exception, so throw it out.throw ex;} finally {if (entry != null) {entry.exit(1, pjp.getArgs());}}}
}
相关文章:

【源码解析】流控框架Sentinel源码解析
Sentinel简介 Sentinel是阿里开源的一款面向分布式、多语言异构化服务架构的流量治理组件。 主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。 核心概念 资源 资源…...

redis面试重点------源于黑马
缓存问题三兄弟 是因为不同的原因让请求全部打到了数据库而造成的问题 什么是缓存穿透? 缓存穿透是指查询一个数据,在redis和MySQL中都不存在。也就是查询一个数据不存在的数据,导致每次请求都会到达数据库,给数据造成很大的压力…...

jQuery知识点二
一、 jQuery 属性操作 1. 元素固有属性值 prop() 获取属性:prop("属性") 设置属性:prop("属性","属性值") 所谓元素固有属性就是元素本身自带的属性,比如 <a> 元素里…...

4 月份 火火火火 的开源项目
盘点 4 月份 GitHub 上 Star 攀升最多的开源项目,整个 4 月份最火项目 90% 都是 AI 项目(准确的说,最近半年的热榜都是 AI 项目) 本期推荐开源项目目录: 1. AI 生成逼真语音 2. 复旦大模型 MOSS! 3. 让画中…...

PAT A1011 World Cup Betting
1011 World Cup Betting 分数 20 作者 CHEN, Yue 单位 浙江大学 With the 2010 FIFA World Cup running, football fans the world over were becoming increasingly excited as the best players from the best teams doing battles for the World Cup trophy in South Af…...

Android 拍照以及相册中选择(适配高版本)————上传头像并裁剪(一)
前言 在项目研发中,相信大家都遇到过给用户增加头像照片的需求。 随着手机版本的不断更新,android 8、android 9、android 10、android 12、android 13、鸿蒙系统等等;遇到这个功能需求,大家肯定会想,“这还不好写&…...

带你了解现在的LED显示屏技术
随着LED显示屏技术的空前繁荣,LED显示屏产品备受关注,广泛应用于商业广告、实况播映、交通诱导、舞台演绎等领域,发展至今。你了解十大中国LED显示屏制造商吗? LED显示屏技术已经得到了长足的发展,现在的LED显示屏技术…...

AI模型推理(1)——入门篇
前言 本文主要介绍AI模型推理的相关基础概念,为后续云原生模型推理服务的学习做准备。 初识模型部署 对于深度学习模型来说,模型部署指让训练好的模型在特定环境中运行的过程。相比于常规的软件部署,模型部署会面临更多的难题: …...

MySQL--表的基本查询--0410--15
目录 1. Create 1.1 insert 1.1.2 插入否则更新 1.2 replace 2.Retrieve 2.1 select 2.1.1 全列查询 2.1.2 指定列查询 2.1.3 查询字段为表达式 2.1.4 为查询结果指定名称 2.1.5 去重 2.2 where 2.2.1 > and > and < and < and 2.2.2 in between…...

Scala语言入门以及基本语法
文章目录 前言1.环境搭建1) IDEA中插件下载2) SDK下载配置 2.基本使用1)var与val的区别2) .基本数据类型3).字符串的基本用法4) 控制结构1) if else2) for 循环3) while循环 5)类6) 函数 前言 scala在一种简洁的高级语言中结合了面向对象和函数式编程。Scala的静态…...

Linux shell编程 循环语句for continue break
for循环是编程语言中一种循环语句 示例1:循环读取user.txt中的用户名,创建用户。设置密码。 for i in $(cat /opt/user.txt) douseradd $iecho 123456 | passwd --stdin $i done 示例2:循环读取ipaddr文本文件中地址,执行ping命令…...

leetcode 643. 子数组最大平均数 I
题目描述解题思路执行结果 leetcode 643. 子数组最大平均数 I 题目描述 子数组最大平均数 I 给你一个由 n 个元素组成的整数数组 nums 和一个整数 k 。 请你找出平均数最大且 长度为 k 的连续子数组,并输出该最大平均数。 任何误差小于 10-5 的答案都将被视为正确答…...

TDA4VM/VH 芯片硬件 mailbox
请从官网下载 TD4VM 技术参考手册,地址如下: TDA4VM 技术参考手册地址 概述 (Mailbox 的介绍在 TRM 的第7.1章节) Mailbox 使用邮箱中断机制实现了 VM 芯片的核间通信。 Mailbox 是集成在 NAVSS0 域下的一个外设(NAVSS0 的说明可以查看&a…...

如何利用Trimble RealWorks三维激光扫描仪进行外业测量和内业处理?
文章目录 0.引言1.Trimble RealWorks介绍2.外业测量3.内业处理 0.引言 笔者所在资源与环境工程学院实验室采购有一台Trimble RealWorks三维激光扫描仪(仪器名:Trimble TX8),因项目需要,在学校实验场地进行实地测量训练…...

mysql数据备份
数据备份分类 数据库的备份类型 完全备份:对整个数据库的数据进行备份部分备份:对部分数据进行备份(可以是一张表也可以是多张表) 增量备份:是以上一次备份为基础来备份变更数据的,节约空间差异备份&#x…...

排队接水--贪心
排队接水 题目描述 有 n n n 个人在一个水龙头前排队接水,假如每个人接水的时间为 T i T_i Ti,请编程找出这 n n n 个人排队的一种顺序,使得 n n n 个人的平均等待时间最小。 输入格式 第一行为一个整数 n n n。 第二行 n n n 个…...

数字温度传感器-DS18B20
文章目录 一、DS18B20器件图二、DS18B20特点三、DS18B20内部结构内部构成 四、工作时序1.初始化时序2.ReadOneChar2.WriteOneChar 一、DS18B20器件图 DS18B20的管脚排列: GND为电源地;DQ为数字信号输入/输出端;VDD为外接供电电源…...

【算法】【算法杂谈】从M个数中等概率的选出n个数,保证每一个数的选中概率都是n/m(蓄水池算法)
目录 前言问题介绍解决方案代码编写java语言版本c语言版本c语言版本 思考感悟写在最后 前言 当前所有算法都使用测试用例运行过,但是不保证100%的测试用例,如果存在问题务必联系批评指正~ 在此感谢左大神让我对算法有了新的感悟认识! 问题介…...

vue3+ts+vite自适应项目——路由、layout布局
系列文章目录 第一章:搭建项目 目录 系列文章目录 前言 一、vue-router 1.安装vue-router 2.引入 2.1 新建页面 2.2 公共样式引入 2.3 layout 布局 2.4路由配置 总结 前言 上一章我们搭建了项目,这一张主要讲路由和layout布局,和…...

数据库之约束、索引和事务
一、约束 约束,顾名思义就是数据库对数据库中的数据所给出的一组检验规则.负责判断元素是否符合数据库要求.其目的就是为了提高效率以及准确性. 1.not null - > 数据元素非空 表示如果插入数据,则当前数据不能为空. //创建一张学生表,其班级id和年级id不为空 create …...

centos --libreoffice使用
您可以按照以下步骤在CentOS上安装LibreOffice: 打开终端并使用root用户登录。 运行以下命令更新系统软件包: yum update安装LibreOffice依赖项: yum install -y libreoffice-headless libreoffice-writer libreoffice-calc libreoffice-…...

Steam-V Rising 私人服务器架设教程
一、安装前的准备 一台服务器 拥有公网IP并且做好了端口映射 二、使用SteamCMD安装服务器 1.下载SteamCMD SteamCMD是Steam专用的命令行式客户端程序,所有的安装方式可以参照:https://developer.valvesoftware.com/wiki/SteamCMD 或者在其他站点自行…...

SpringBoot+Vue3实现登录验证码功能
系列文章目录 Redis缓存穿透、击穿、雪崩问题及解决方法Spring Cache的使用–快速上手篇分页查询–Java项目实战篇全局异常处理–Java实战项目篇 Java实现发送邮件(定时自动发送邮件)_java邮件通知_心态还需努力呀的博客-CSDN博客 该系列文章持续更新…...

spring2:创建和使用
目录 1.创建Spring项目 1.1创建Maven类 1.2添加Spring支持框架 1.3添加启动类 2.存储Bean对象 2.0 spring项目中添加配置文件(第一次) 2.1创建Bean 2.2把Bean注册到容器中 3.获取并使用Bean对象 3.1创建上下文 3.2获取指定Bean对象 getBean()方法 --> 获取什么…...

前端如何处理后端一次性传来的10w条数据?
写在前面 如果你在面试中被问到这个问题,你可以用下面的内容回答这个问题,如果你在工作中遇到这个问题,你应该先揍那个写 API 的人。 创建服务器 为了方便后续测试,我们可以使用node创建一个简单的服务器。 const http requir…...

Codeforces Round 867 (Div. 3)(A-G2)
文章目录 A. TubeTube Feed1、题目2、分析3、代码, B. Karina and Array1、题目2、分析3、代码 C. Bun Lover1、问题2、分析(1)观察样例法(2)正解推导 3、代码 D. Super-Permutation1、问题2、分析(1&#…...

蓝奥声核心技术分享——一种无线低功耗配置技术
1.技术背景 无线低功耗配置技术指基于对目标场景状态变化的协同感知而获得触发响应并进行智能决策,属于蓝奥声核心技术--边缘协同感知(EICS)技术的关键支撑性技术之一。该项技术涉及物联网边缘域的无线通信技术领域,具体主要涉及网络服务节点…...

kafka集群模拟单节点故障
这里通过kafka manage来展示节点宕机效果 现在三台主机节点均正常 topic正常识别到三个broker leader也均匀分配到了三个broker上 现在把节点id为0的主机模拟宕机 可以通过以上两张图片看到每个topic现在只识别到了两个broker节点,broker id为0的节点已经被剔除掉了 isr列…...

笔记:vue-cli-service
vue-cli-service serve 这个是什么意思? vue-cli-service serve 是一个 Vue.js CLI 命令,用于在本地开发环境下运行一个开发服务器,以便你可以在浏览器中查看和测试你的 Vue.js 应用程序。它在开发期间提供了自动重载、热模块替换和其它实用…...

Amazon S3 对象存储Java API操作记录(Minio与S3 SDK两种实现)
缘起 今年(2023年) 2月的时候做了个适配Amazon S3对象存储接口的需求,由于4月份自学考试临近,一直在备考就拖着没总结记录下,开发联调过程中也出现过一些奇葩的问题,最近人刚从考试缓过来顺手记录一下。 S3对象存储的基本概念 …...