电子商务网站建设用什么登录/慈溪seo排名
Sharding-JDBC系列
1、Sharding-JDBC分库分表的基本使用
2、Sharding-JDBC分库分表之SpringBoot分片策略
3、Sharding-JDBC分库分表之SpringBoot主从配置
4、SpringBoot集成Sharding-JDBC-5.3.0分库分表
5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表
6、【源码】Sharding-JDBC源码分析之JDBC
7、【源码】Sharding-JDBC源码分析之SPI机制
8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理
9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)
10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)
11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理
12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理
13、【源码】Sharding-JDBC源码分析之ContextManager创建中mode分片配置信息的持久化存储的原理
14、【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理
15、【源码】Sharding-JDBC源码分析之分片规则生成器DatabaseRuleBuilder实现规则配置到规则对象的生成原理
16、【源码】Sharding-JDBC源码分析之配置数据库定义的表的元数据解析原理
17、【源码】Sharding-JDBC源码分析之ShardingSphereConnection的创建原理
18、【源码】Sharding-JDBC源码分析之ShardingSpherePreparedStatement的创建原理
19、【源码】Sharding-JDBC源码分析之Sql解析的原理
20、【源码】Sharding-JDBC源码分析之SQL路由及SingleSQLRouter单表路由
21、【源码】Sharding-JDBC源码分析之SQL中分片键路由ShardingSQLRouter的原理
22、【源码】Sharding-JDBC源码分析之SQL中读写分离路由ReadwriteSplittingSQLRouter的原理
23、【源码】Sharding-JDBC源码分析之SQL中读写分离动态策略、数据库发现规则及DatabaseDiscoverySQLRouter路由的原理
前言
在上一篇
【源码】Sharding-JDBC源码分析之SQL中读写分离路由ReadwriteSplittingSQLRouter的原理
介绍了读写分离策略时,策略分为静态策略和动态策略,并详细介绍了静态策略的实现。本篇从源码的角度,分析动态策略的实现。读写分离规则的动态策略是结合数据库发现规则来实现的,且数据库发现规则可单独作为路由器,对应的路由器为DatabaseDiscoverySQLRouter,对应的规则对象为DatabaseDiscoveryRule。
ReadwriteSplittingStrategyFactory
在ReadwriteSplittingDataSourceRule读写分离规则类的构造方法中,通过ReadwriteSplittingStrategyFactory的newInstance()方法,创建ReadwriteSplittingStrategy读写分离策略。
ReadwriteSplittingStrategyFactory的源码如下:
package org.apache.shardingsphere.readwritesplitting.strategy;/*** 读写分离策略工厂*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ReadwriteSplittingStrategyFactory {/*** 实例化读写分离策略* @param readwriteSplittingConfig 读写分离配置* @param builtRules 配置的规则* @return*/public static ReadwriteSplittingStrategy newInstance(final ReadwriteSplittingDataSourceRuleConfiguration readwriteSplittingConfig, final Collection<ShardingSphereRule> builtRules) {// 如果没有指定静态策略return null == readwriteSplittingConfig.getStaticStrategy()// 默认创建动态策略? createDynamicReadwriteSplittingStrategy(readwriteSplittingConfig.getDynamicStrategy(), builtRules)// 创建静态策略: createStaticReadwriteSplittingStrategy(readwriteSplittingConfig.getStaticStrategy());}/*** 创建一个静态读写分离策略* @param staticConfig* @return*/private static StaticReadwriteSplittingStrategy createStaticReadwriteSplittingStrategy(final StaticReadwriteSplittingStrategyConfiguration staticConfig) {return new StaticReadwriteSplittingStrategy(staticConfig.getWriteDataSourceName(), staticConfig.getReadDataSourceNames());}/*** 创建动态读写分离策略* @param dynamicConfig 读写分离的动态策略配置* @param builtRules 配置的规则* @return*/private static DynamicReadwriteSplittingStrategy createDynamicReadwriteSplittingStrategy(final DynamicReadwriteSplittingStrategyConfiguration dynamicConfig,final Collection<ShardingSphereRule> builtRules) {// 从配置的规则中获取DynamicDataSourceContainedRule,默认只有DatabaseDiscoveryRule,即动态数据库发现规则Optional<ShardingSphereRule> dynamicDataSourceStrategy = builtRules.stream().filter(each -> each instanceof DynamicDataSourceContainedRule).findFirst();// 获取是否允许写库支持读,默认为支持boolean allowWriteDataSourceQuery = Strings.isNullOrEmpty(dynamicConfig.getWriteDataSourceQueryEnabled()) ? Boolean.TRUE : Boolean.parseBoolean(dynamicConfig.getWriteDataSourceQueryEnabled());// 创建动态策略return new DynamicReadwriteSplittingStrategy(dynamicConfig.getAutoAwareDataSourceName(), allowWriteDataSourceQuery, (DynamicDataSourceContainedRule) dynamicDataSourceStrategy.get());}
}
通过createDynamicReadwriteSplittingStrategy()方法,创建DynamicReadwriteSplittingStrategy动态读写分离策略。
DynamicReadwriteSplittingStrategy
DynamicReadwriteSplittingStrategy的源码如下:
package org.apache.shardingsphere.readwritesplitting.strategy.type;/*** 动态读写分离策略*/
@RequiredArgsConstructor
@Getter
public final class DynamicReadwriteSplittingStrategy implements ReadwriteSplittingStrategy {// 自动感知的数据源名称private final String autoAwareDataSourceName;// 是否允许写数据源进行读操作private final boolean allowWriteDataSourceQuery;// 动态数据源包含规则private final DynamicDataSourceContainedRule dynamicDataSource;/*** 获取写数据源名称* @return*/@Overridepublic String getWriteDataSource() {return dynamicDataSource.getPrimaryDataSourceName(autoAwareDataSourceName);}/*** 获取读数据源名称* @return*/@Overridepublic List<String> getReadDataSources() {return new ArrayList<>(dynamicDataSource.getReplicaDataSourceNames(autoAwareDataSourceName));}@Overridepublic Collection<String> getAllDataSources() {return Collections.singletonList(autoAwareDataSourceName);}
}
在DynamicReadwriteSplittingStrategy动态读写分离策略中,通过DynamicDataSourceContainedRule对象,获取读写分离中的读数据源和写数据源名称。其中DynamicDataSourceContainedRule对象通过构造方法传入,即从ReadwriteSplittingStrategyFactory的createDynamicReadwriteSplittingStrategy()方法中传入的,且传入的规则是从系统配置的规则中,查找实现DynamicDataSourceContainedRule接口的规则,在系统中为DatabaseDiscoveryRule对象,即数据库发现规则。
动态读写分离配置示例
rules:- !READWRITE_SPLITTINGdataSources:readwrite_ds: rw_ds #逻辑数据源dynamicStrategy: #动态策略autoAwareDataSourceName: awareDataSource #允许自动识别的数据源名writeDataSourceQueryEnabled: true #允许写库进行读操作- !DB_DISCOVERYdataSources:awareDataSource: # 组名,同dynamicStrategy的autoAwareDataSourceNamedataSourceNames: ds_$->{1..2}discoveryHeartbeatName: ds_heartbeat #心跳检测discoveryTypeName: MySQLMGRdiscoveryHeartbeats:ds_heartbeat:props:keep-alive-cron: '0/5 * * * * ?'discoveryTypes: # 发现类型MySQLMGR:type: MySQL.MGRprops:group-name: your_group_name
DatabaseDiscoveryRule
数据库发现规则配置最后会解析成DatabaseDiscoveryRule对象。DatabaseDiscoveryRule的源码如下:
package org.apache.shardingsphere.dbdiscovery.rule;/*** 数据库发现规则*/
public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceContainedRule, DynamicDataSourceContainedRule, ExportableRule {// DatabaseDiscoveryRuleConfiguration对象@Getterprivate final RuleConfiguration configuration;// 数据源名称,默认为logic_dbprivate final String databaseName;// 数据库发现配置的数据源及对应的数据源对象private final Map<String, DataSource> dataSourceMap;// 配置的数据库发现提供算法对象信息private final Map<String, DatabaseDiscoveryProviderAlgorithm> discoveryTypes;// 数据库规则对象。key:组名(组名需为逻辑数据源或真实数据源的名称),一组一个DatabaseDiscoveryDataSourceRule对象@Getterprivate final Map<String, DatabaseDiscoveryDataSourceRule> dataSourceRules;private final InstanceContext instanceContext;// 针对Zookeeper集群的mode的日程上下文对象(非Zookeeper为空方法实例)private final ScheduleContext scheduleContext;/*** @param databaseName 数据源名称,默认为logic_db* @param dataSourceMap 数据库发现配置的数据源及对应的数据源对象* @param ruleConfig 数据库发现规则配置对象* @param instanceContext 实例上下文*/public DatabaseDiscoveryRule(final String databaseName, final Map<String, DataSource> dataSourceMap, final DatabaseDiscoveryRuleConfiguration ruleConfig, final InstanceContext instanceContext) {configuration = ruleConfig;this.databaseName = databaseName;this.dataSourceMap = dataSourceMap;this.instanceContext = instanceContext;this.scheduleContext = ScheduleContextFactory.newInstance(instanceContext.getModeConfiguration());discoveryTypes = getDiscoveryProviderAlgorithms(ruleConfig.getDiscoveryTypes());dataSourceRules = getDataSourceRules(ruleConfig.getDataSources(), ruleConfig.getDiscoveryHeartbeats());findPrimaryReplicaRelationship(databaseName, dataSourceMap);initHeartBeatJobs();}/*** 解析配置的算法信息,根据类型创建对应的算法对象* @param discoveryTypesConfig* @return*/private static Map<String, DatabaseDiscoveryProviderAlgorithm> getDiscoveryProviderAlgorithms(final Map<String, AlgorithmConfiguration> discoveryTypesConfig) {Map<String, DatabaseDiscoveryProviderAlgorithm> result = new LinkedHashMap<>(discoveryTypesConfig.size(), 1);for (Entry<String, AlgorithmConfiguration> entry : discoveryTypesConfig.entrySet()) {result.put(entry.getKey(), ShardingSphereAlgorithmFactory.createAlgorithm(entry.getValue(), DatabaseDiscoveryProviderAlgorithm.class));}return result;}/*** 获取数据源对象,key:组名,一组一个DatabaseDiscoveryDataSourceRule对象* @param dataSources 配置的数据源集合* @param heartbeatConfig 心跳配置* @return*/private Map<String, DatabaseDiscoveryDataSourceRule> getDataSourceRules(final Collection<DatabaseDiscoveryDataSourceRuleConfiguration> dataSources,final Map<String, DatabaseDiscoveryHeartBeatConfiguration> heartbeatConfig) {Map<String, DatabaseDiscoveryDataSourceRule> result = new HashMap<>(dataSources.size(), 1);// 遍历 配置的数据源集合for (DatabaseDiscoveryDataSourceRuleConfiguration each : dataSources) {// key为组名result.put(each.getGroupName(), new DatabaseDiscoveryDataSourceRule(each, Strings.isNullOrEmpty(each.getDiscoveryHeartbeatName()) ? new Properties(): heartbeatConfig.get(each.getDiscoveryHeartbeatName()).getProps(), discoveryTypes.get(each.getDiscoveryTypeName())));}return result;}/*** 查找主副本关系* @param databaseName* @param dataSourceMap 数据源Map集合*/private void findPrimaryReplicaRelationship(final String databaseName, final Map<String, DataSource> dataSourceMap) {// 遍历配置的数据源规则for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {String groupName = entry.getKey();DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue();// 从dataSourceMap中获取数据库发现配置的数据源对象Map<String, DataSource> originalDataSourceMap = dataSourceRule.getDataSourceGroup(dataSourceMap);// 创建数据库发现引擎DatabaseDiscoveryEngine engine = new DatabaseDiscoveryEngine(dataSourceRule.getDatabaseDiscoveryProviderAlgorithm(), instanceContext.getEventBusContext());engine.checkEnvironment(databaseName, originalDataSourceMap);// 通过引擎,修改主数据源名称dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource(databaseName, groupName, entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap, dataSourceRule.getDisabledDataSourceNames()));}}/*** 从Map数据源集合的value中获取第一个数据源规则对象* @return*/public DatabaseDiscoveryDataSourceRule getSingleDataSourceRule() {return dataSourceRules.values().iterator().next();}/*** 根据数据源名称,获取数据库发现数据源规则* @param dataSourceName* @return*/public Optional<DatabaseDiscoveryDataSourceRule> findDataSourceRule(final String dataSourceName) {return Optional.ofNullable(dataSourceRules.get(dataSourceName));}@Overridepublic Map<String, Collection<String>> getDataSourceMapper() {Map<String, Collection<String>> result = new HashMap<>();for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {result.putAll(entry.getValue().getDataSourceMapper());}return result;}@Overridepublic void restartHeartBeatJob(final DataSourceStatusChangedEvent event) {PrimaryDataSourceChangedEvent dataSourceEvent = (PrimaryDataSourceChangedEvent) event;QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase();DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName());Preconditions.checkNotNull(dataSourceRule, "Can not find database discovery data source rule in database `%s`", databaseName);dataSourceRule.changePrimaryDataSourceName(qualifiedDatabase.getDataSourceName());initHeartBeatJobs();}@Overridepublic void closeSingleHeartBeatJob(final String groupName) {DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(groupName);Preconditions.checkNotNull(dataSourceRule, "Can not find database discovery data source rule in database `%s`", databaseName);scheduleContext.closeSchedule(dataSourceRule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + dataSourceRule.getGroupName());}@Overridepublic void closeAllHeartBeatJob() {for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {DatabaseDiscoveryDataSourceRule rule = entry.getValue();scheduleContext.closeSchedule(rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName());}}/*** 初始化心跳检测的任务*/private void initHeartBeatJobs() {for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {DatabaseDiscoveryDataSourceRule rule = entry.getValue();// 创建任务名称String jobName = rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName();// 根据配置信息,创建cron任务CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),rule.getDatabaseDiscoveryProviderAlgorithm(), rule.getDisabledDataSourceNames(), instanceContext.getEventBusContext()).execute(null),rule.getHeartbeatProps().getProperty("keep-alive-cron"));// 添加到日程上下文中,根据cron,定时检测心跳scheduleContext.startSchedule(job);}}@Overridepublic String getPrimaryDataSourceName(final String dataSourceName) {return dataSourceRules.get(dataSourceName).getPrimaryDataSourceName();}/*** 通过数据源规则,从对应的逻辑数据源(组名)中获取副本数据源名称* @param dataSourceName data source name* @return*/@Overridepublic Collection<String> getReplicaDataSourceNames(final String dataSourceName) {return dataSourceRules.get(dataSourceName).getReplicaDataSourceNames();}@Overridepublic void updateStatus(final DataSourceStatusChangedEvent event) {StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(dataSourceChangedEvent.getQualifiedDatabase().getGroupName());Preconditions.checkNotNull(dataSourceRule, "Can not find database discovery data source rule in database `%s`", databaseName);if (StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())) {dataSourceRule.disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());} else {dataSourceRule.enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());}}@Overridepublic Map<String, Object> getExportData() {return Collections.singletonMap(ExportableConstants.EXPORT_DB_DISCOVERY_PRIMARY_DATA_SOURCES, exportPrimaryDataSourceMap());}private Map<String, String> exportPrimaryDataSourceMap() {Map<String, String> result = new HashMap<>(dataSourceRules.size(), 1);dataSourceRules.forEach((name, dataSourceRule) -> result.put(dataSourceRule.getGroupName(), dataSourceRule.getPrimaryDataSourceName()));return result;}@Overridepublic String getType() {return DatabaseDiscoveryRule.class.getSimpleName();}
}
4.1 构造方法
在构造方法中,执行如下:
1)记录配置对象、数据库名、配置的数据源、示例上下文;
2)创建日程上下文,用于心跳检测;
3)通过配置的发现规则,创建对应的发现提供器算法DatabaseDiscoveryProviderAlgorithm对象,存放在Map中,key为配置的提供者名称;
3.1)通过配置的发现类型的 type 值,使用SPI,获取对应的 DatabaseDiscoveryProviderAlgorithm 算法对象;
3.2)可配置的type值包括:
3.2.1)openGauss.NORMAL_REPLICATION:openGauss数据库的普通复制(主从),对应的算法对象为:OpenGaussNormalReplicationDatabaseDiscoveryProviderAlgorithm;
3.2.2)MySQL.MGR:MySQL是MGR机制,对应的算法对象为:MGRMySQLDatabaseDiscoveryProviderAlgorithm;
3.2.3)MySQL.NORMAL_REPLICATION:MySQL的普通复制(主从),对应的算法对象为:MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm;
4)通过配置的发现数据源、心跳检测规则,创建DatabaseDiscoveryDataSourceRule对象,存放在Map中,key为组名;
5)查找 DatabaseDiscoveryDataSourceRule 数据源中的主副本关系;
5.1)遍历 4)中的 DatabaseDiscoveryDataSourceRule;
5.2)执行DatabaseDiscoveryDataSourceRule的getDataSourceGroup()方法,从当前系统配置的数据源中查找当前数据库发现规则中配置的数据源DataSource的Map对象;
5.3)创建DatabaseDiscoveryEngine引擎;
5.4)通过DatabaseDiscoveryEngine引擎,进行环境检测,即检测对应数据源的有效性。通过对应的发现提供器算法DatabaseDiscoveryProviderAlgorithm对象,连接数据源,进行检测;
5.5)自动检测数据库发现中配置的数据源中的主数据源,并记录。通过对应的发现提供器算法DatabaseDiscoveryProviderAlgorithm对象,连接数据源,获取当前对应组的主数据源;
4.2 获取主数据源
通过当前分片或单表转换后的数据源名,匹配对应数据库发现规则的组名,找到 DatabaseDiscoveryDataSourceRule,获取主数据源名称。
4.3 获取副本数据源
通过当前分片或单表转换后的数据源名,匹配对应数据库发现规则的组名,找到 DatabaseDiscoveryDataSourceRule,获取数据库发现规则中配置的数据源中,可用且不是主数据源的其他数据源。
MySQL的MGR
简介
MySQL MGR(MySQL Group Replication)是MySQL官方在MySQL 5.7.17版本中以插件形式推出的主从复制高可用技术,可以为MySQL集群系统提供数据冗余和故障转移能力。
MGR基于原生的主从复制,将各节点归入到一个组中,通过组内节点的通信协商(组通信协议基于Paxos算法),实现数据的强一致性、故障探测、冲突检测、节点加组、节点离组等功能。它使用Paxos分布式算法来提供节点间的分布式协调,要求组中大多数节点在线才能达到法定票数,从而对一个决策做出一致的决定。
工作模式
MGR可以以单主模式或多主模式运行,通过group_replication_single_primary_mode=[ON|OFF]变量指定工作模式。组内所有成员都必须运行相同的工作模式。
-
单主模式:从复制组中众多个MySQL节点中自动选举一个master节点,只有master节点可以写,其他节点自动设置为read only。当master节点故障时,会自动选举一个新的master节点,选举成功后,它将设置为可写,其他slave将指向这个新的master。
-
多主模式:复制组中的任何一个节点都可以写,因此没有master和slave的概念。只要突然故障的节点数量不太多,这个多主模型就能继续可用。但需要注意的是,多主模式下可能存在数据冲突和一致性问题,因此在使用时需要谨慎。
与普通主从复制的区别
MySQL组复制分单主模式和多主模式。如果仅使用MySQL主从模式,MySQL主从模式的复制技术仅解决了数据同步的问题,如果 master 宕机,意味着数据库管理员需要介入,应用系统可能需要修改数据库连接地址或者重启才能实现。组复制在数据库层面上做到了,只要集群中大多数主机可用,则服务可用,例如有一个3台的服务器集群,则允许其中1台宕机。
常用SQL语句
1)查看集群成员及其状态
SELECT * FROM performance_schema.replication_group_members;
此语句用于显示集群中所有成员的信息,包括成员ID、主机名、端口号、状态、角色和版本等。
2)检查MySQL服务器上group_replication插件的状态
SELECT PLUGIN_STATUS FROM information_schema.PLUGINS WHERE PLUGIN_NAME='group_replication';
这个查询会返回group_replication插件的当前状态,状态值可能是以下几种之一:
ACTIVE:表示插件已启用并正在运行。
INACTIVE:表示插件已安装但未启用。
DISABLED:表示插件已被禁用(在某些MySQL版本中,这个状态可能不被明确使用,插件可能只显示为INACTIVE,但通过设置disabled_plugins系统变量来禁用)。
3)检查MySQL服务器上MGR是否以单主模式执行
SELECT VARIABLE_VALUE FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_single_primary_mode'
这个查询会返回一个结果集,其中包含一个列VARIABLE_VALUE,该列的值指示了group_replication_single_primary_mode的设置:
如果值为ON,则表示MGR集群配置为单主模式。在这种模式下,集群会自动选举一个主节点来处理写操作,而其他节点则作为从节点,只能处理读操作(除非配置了读写分离策略)。
如果值为OFF,则表示MGR集群配置为多主模式。在这种模式下,集群中的任何节点都可以处理写操作,但需要小心处理数据冲突和一致性问题。
MGRMySQLDatabaseDiscoveryProviderAlgorithm
MGRMySQLDatabaseDiscoveryProviderAlgorithm为MySQL的MGR对应的数据库发现提供者算法对象。
MGRMySQLDatabaseDiscoveryProviderAlgorithm的源码如下:
package org.apache.shardingsphere.dbdiscovery.mysql.type;/*** MySQL Group Replication 数据库发现提供算法*/
@Getter
public final class MGRMySQLDatabaseDiscoveryProviderAlgorithm implements DatabaseDiscoveryProviderAlgorithm {/*** 查询 MGR 状态*/private static final String QUERY_PLUGIN_STATUS = "SELECT PLUGIN_STATUS FROM information_schema.PLUGINS WHERE PLUGIN_NAME='group_replication'";/*** 查询单主节点模式的可用值*/private static final String QUERY_SINGLE_PRIMARY_MODE = "SELECT VARIABLE_VALUE FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_single_primary_mode'";/*** 查询组名*/private static final String QUERY_GROUP_NAME = "SELECT VARIABLE_VALUE FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_group_name'";/*** 查询所有数据源节点信息*/private static final String QUERY_MEMBER_LIST = "SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members";/*** 查询主数据源节点*/private static final String QUERY_PRIMARY_DATA_SOURCE = "SELECT MEMBER_HOST, MEMBER_PORT FROM performance_schema.replication_group_members WHERE MEMBER_ID = "+ "(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'group_replication_primary_member')";/*** 查询某个ip节点的数据源状态*/private static final String QUERY_CURRENT_MEMBER_STATE = "SELECT MEMBER_STATE FROM performance_schema.replication_group_members WHERE MEMBER_HOST=? AND MEMBER_PORT=?";private Properties props;@Overridepublic void init(final Properties props) {this.props = props;}/*** 环境检测* @param databaseName database name* @param dataSources data sources*/@Overridepublic void checkEnvironment(final String databaseName, final Collection<DataSource> dataSources) {// 创建线程池ExecutorService executorService = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();Collection<CompletableFuture<Void>> completableFutures = new LinkedList<>();// 遍历检测数据源for (DataSource dataSource : dataSources) {completableFutures.add(runAsyncCheckEnvironment(databaseName, dataSource, executorService));}CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));Iterator<CompletableFuture<Void>> mgrInstancesFuture = completableFutures.stream().iterator();while (mgrInstancesFuture.hasNext()) {// 确保所有的数据源都是有效的mgrInstancesFuture.next().join();}}/*** 异步运行环境检测* @param databaseName* @param dataSource* @param executorService* @return*/private CompletableFuture<Void> runAsyncCheckEnvironment(final String databaseName, final DataSource dataSource, final ExecutorService executorService) {return CompletableFuture.runAsync(() -> {try {checkSingleDataSourceEnvironment(databaseName, dataSource);} catch (final SQLException ex) {throw new SQLWrapperException(ex);}}, executorService);}/*** 检测单数据源环境* @param databaseName* @param dataSource* @throws SQLException*/private void checkSingleDataSourceEnvironment(final String databaseName, final DataSource dataSource) throws SQLException {try (Connection connection = dataSource.getConnection();Statement statement = connection.createStatement()) {checkPluginActive(databaseName, statement);checkSinglePrimaryMode(databaseName, statement);checkGroupName(databaseName, statement);checkMemberInstanceURL(databaseName, connection.getMetaData().getURL(), statement);}}/*** 检测MGR插件状态,确认是否为ACTIVE* @param databaseName* @param statement* @throws SQLException*/private void checkPluginActive(final String databaseName, final Statement statement) throws SQLException {try (ResultSet resultSet = statement.executeQuery(QUERY_PLUGIN_STATUS)) {ShardingSpherePreconditions.checkState(resultSet.next() && "ACTIVE".equals(resultSet.getString("PLUGIN_STATUS")), () -> new InvalidMGRPluginException(databaseName));}}/*** 检测单主节点模式的可用值* @param databaseName* @param statement* @throws SQLException*/private void checkSinglePrimaryMode(final String databaseName, final Statement statement) throws SQLException {try (ResultSet resultSet = statement.executeQuery(QUERY_SINGLE_PRIMARY_MODE)) {ShardingSpherePreconditions.checkState(resultSet.next() && "ON".equals(resultSet.getString("VARIABLE_VALUE")), () -> new InvalidMGRModeException(databaseName));}}/*** 检测组名是否和配置的组名一致* @param databaseName* @param statement* @throws SQLException*/private void checkGroupName(final String databaseName, final Statement statement) throws SQLException {try (ResultSet resultSet = statement.executeQuery(QUERY_GROUP_NAME)) {ShardingSpherePreconditions.checkState(resultSet.next() && props.getProperty("group-name", "").equals(resultSet.getString("VARIABLE_VALUE")),() -> new InvalidMGRGroupNameConfigurationException(props.getProperty("group-name"), databaseName));}}/*** 检测url对应的数据源的地址是否为MGR集群中的节点* @param databaseName* @param url* @param statement* @throws SQLException*/private void checkMemberInstanceURL(final String databaseName, final String url, final Statement statement) throws SQLException {try (ResultSet resultSet = statement.executeQuery(QUERY_MEMBER_LIST)) {while (resultSet.next()) {if (url.contains(String.join(":", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT")))) {return;}}}throw new InvalidMGRReplicationGroupMemberException(url, databaseName);}/*** 判断dataSource是否为主节点* @param dataSource data source to be judged* @return* @throws SQLException*/@Overridepublic boolean isPrimaryInstance(final DataSource dataSource) throws SQLException {try (Connection connection = dataSource.getConnection();Statement statement = connection.createStatement();ResultSet resultSet = statement.executeQuery(QUERY_PRIMARY_DATA_SOURCE)) {if (resultSet.next()) {MySQLDataSourceMetaData metaData = new MySQLDataSourceMetaData(connection.getMetaData().getURL());return metaData.getHostname().equals(resultSet.getString("MEMBER_HOST")) && Integer.toString(metaData.getPort()).equals(resultSet.getString("MEMBER_PORT"));}}return false;}@Overridepublic ReplicaDataSourceStatus loadReplicaStatus(final DataSource replicaDataSource) throws SQLException {try (Connection connection = replicaDataSource.getConnection()) {return new ReplicaDataSourceStatus(isOnlineDataSource(connection, new MySQLDataSourceMetaData(connection.getMetaData().getURL())), 0L);}}/*** 判断当前的metaData是否为online的节点* @param connection* @param metaData* @return* @throws SQLException*/private boolean isOnlineDataSource(final Connection connection, final MySQLDataSourceMetaData metaData) throws SQLException {try (PreparedStatement preparedStatement = connection.prepareStatement(QUERY_CURRENT_MEMBER_STATE)) {preparedStatement.setString(1, metaData.getHostname());preparedStatement.setString(2, Integer.toString(metaData.getPort()));try (ResultSet resultSet = preparedStatement.executeQuery()) {return resultSet.next() && "ONLINE".equals(resultSet.getString("MEMBER_STATE"));}}}@Overridepublic String getType() {return "MySQL.MGR";}
}
在MGRMySQLDatabaseDiscoveryProviderAlgorithm类中,主要完成以下实现:
1)数据源有效性检测;
通过数据源对象,执行对应MGR的SQL查询语句,检查MGR对应的状态;
2)主节点判断;
通过数据源对象,执行MGR查询主节点的SQL语句,检查对应数据源在MGR组中的状态是否为主节点;
3)副本节点状态检测;
通过数据源对象,执行MGR查询节点的SQL语句,检查对应数据源在MGR组中的状态是否为在线;
对于数据库发现规则为MySQL.MGR的读写分离动态策略中,对应的DynamicReadwriteSplittingStrategy对象中的DynamicDataSourceContainedRule对象为DatabaseDiscoveryRule对象,其对应的提供者算法为MGRMySQLDatabaseDiscoveryProviderAlgorithm对象。
在DatabaseDiscoveryRule对象中,通过MGRMySQLDatabaseDiscoveryProviderAlgorithm对象,获取组中的主数据源。从而保证了主库的可用性。
DatabaseDiscoverySQLRouter
数据库发现规则也可以单独作为路由器进行SQL语句的数据源路由,其执行在读写分离路由之后。
DatabaseDiscoverySQLRouter的源码如下:
package org.apache.shardingsphere.dbdiscovery.route;/*** 数据库发现SQL路由器*/
public final class DatabaseDiscoverySQLRouter implements SQLRouter<DatabaseDiscoveryRule> {@Overridepublic RouteContext createRouteContext(final QueryContext queryContext, final ShardingSphereDatabase database,final DatabaseDiscoveryRule rule, final ConfigurationProperties props, final ConnectionContext connectionContext) {RouteContext result = new RouteContext();// 获取第一个DatabaseDiscoveryDataSourceRuleDatabaseDiscoveryDataSourceRule singleDataSourceRule = rule.getSingleDataSourceRule();// 路由到主数据源String dataSourceName = new DatabaseDiscoveryDataSourceRouter(singleDataSourceRule).route();// 创建路由单元,添加到路由上下文中result.getRouteUnits().add(new RouteUnit(new RouteMapper(singleDataSourceRule.getGroupName(), dataSourceName), Collections.emptyList()));return result;}@Overridepublic void decorateRouteContext(final RouteContext routeContext,final QueryContext queryContext, final ShardingSphereDatabase database, final DatabaseDiscoveryRule rule,final ConfigurationProperties props, final ConnectionContext connectionContext) {Collection<RouteUnit> toBeRemoved = new LinkedList<>();Collection<RouteUnit> toBeAdded = new LinkedList<>();for (RouteUnit each : routeContext.getRouteUnits()) {// 获取当前路由上下文的数据源名称String dataSourceName = each.getDataSourceMapper().getLogicName();// 获取数据源对应的数据库发现规则Optional<DatabaseDiscoveryDataSourceRule> dataSourceRule = rule.findDataSourceRule(dataSourceName);// 存在对应的数据库发现规则if (dataSourceRule.isPresent() && dataSourceRule.get().getGroupName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {toBeRemoved.add(each);// 获取规则中的主数据源String actualDataSourceName = new DatabaseDiscoveryDataSourceRouter(dataSourceRule.get()).route();// 创建路由单元,添加到路由上下文中toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));}}routeContext.getRouteUnits().removeAll(toBeRemoved);routeContext.getRouteUnits().addAll(toBeAdded);}@Overridepublic int getOrder() {return DatabaseDiscoveryOrder.ORDER;}@Overridepublic Class<DatabaseDiscoveryRule> getTypeClass() {return DatabaseDiscoveryRule.class;}
}
数据库发现路由器的路由规则比较简单,在RouteContext上下文装饰的方法中,遍历路由单元,获取当前路由数据源,匹配数据库发现中的组名,获取对应组中的主数据源。
注:在数据库发现规则中,所有的操作都是路由到主库。
小结
关于数据库发现规则先介绍到这里,以下做一个小结:
1)数据库发现规则可以和读写分离规则结合一起使用;
1.1)在读写分离规则中,配置读写分离策略为动态策略及“允许自动识别的数据源名称”;
1.2)配置数据库发现规则的数据源组名,为对应的“允许自动识别的数据源名称”;
1.3)配置数据库发现规则的提供者类型,不同的提供者类型有对应的提供者算法;
1.4)数据库发现规则对应的对象为DatabaseDiscoveryRule,该对象主要执行如下:
a)创建日程上下文,用于心跳检测,根据配置的cron,定时访问数据源,确认数据源的状态;
b)通过配置的发现规则,创建对应的发现提供器算法DatabaseDiscoveryProviderAlgorithm对象,存放在Map中,key为配置的提供者名称;
c)通过配置的发现数据源、心跳检测规则,创建DatabaseDiscoveryDataSourceRule对象,存放在Map中,key为组名;
d)查找 DatabaseDiscoveryDataSourceRule 数据源中的主副本关系,确认主数据源名称;
e)提供通过组名,获取主数据源的方法。从DatabaseDiscoveryDataSourceRule中获取;
f)提供通过组名,获取副本数据源的方法。从DatabaseDiscoveryDataSourceRule中获取;
g)通过主、副数据源,结合读写分离规则,实现了读写分离;
2)数据库发现规则也可以单独作为数据源路由器使用;
在数据库发现规则路由器中,通过相应的数据库提供者算法,获取主从数据库中的主库。将SQL路由到对应组的主数据源中执行;
3)系统默认执行的数据库发现类型包括openGauss.NORMAL_REPLICATION、MySQL.MGR:MySQL、MySQL.NORMAL_REPLICATION;
3.1)NORMAL_REPLICATION:为普通的主从复制,通过对应数据库的主从操作SQL语句确认数据源的信息(如 show slave status 等);
3.2)MySQL.MGR:MySQL Group Replication 是MySQL官方在MySQL 5.7.17版本中以插件形式推出的主从复制高可用技术,在主从复制的基础上,添加了主库选举等功能,确保主库的高可用等。在ShardingSphere中,通过MGR相关的操作SQL语句确认数据源的信息;
关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。
相关文章:

【源码】Sharding-JDBC源码分析之SQL中读写分离动态策略、数据库发现规则及DatabaseDiscoverySQLRouter路由的原理
Sharding-JDBC系列 1、Sharding-JDBC分库分表的基本使用 2、Sharding-JDBC分库分表之SpringBoot分片策略 3、Sharding-JDBC分库分表之SpringBoot主从配置 4、SpringBoot集成Sharding-JDBC-5.3.0分库分表 5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表 6、【…...

Spark 之 partitons
Listing leaf files and directories 分析其并行化 org.apache.spark.util.HadoopFSUtils sc.parallelize(paths, numParallelism).mapPartitions { pathsEachPartition >val hadoopConf serializableConfiguration.valuepathsEachPartition.map { path >val leafFiles…...

使用Zerotier配置虚拟局域网,踏坑
配置虚拟局域网有多种方式,包括带桌面的和纯网络的。 一、带桌面的(不是本次重点) 常见工具:向日葵、todesk、anydesk、restDesk 前两者是常见商业工具,anydesk好像很轻,restDesk是开源项目,…...

【优选算法 二分查找】二分查找算法模板详解:二分查找 & 在排序数组中查找元素的第一个和最后一个位置
二分查找 题目描述 题目解析 暴力解法 我们可以从左往右遍历一次数组,如果存在 target 则返回数组的下标,否则返回 -1; 时间复杂度 O(N),因为没有利用数组有序的特点,每次比较只能舍弃一个要比较的数&…...

gitlab 生成并设置 ssh key
一、介绍 🎯 本文主要介绍 SSH Key 的生成方法,以及如何在GitLab上添加SSH Key。GitLab 使用SSH协议与Git 进行安全通信。当您使用 SSH密钥 对 GitLab远程服务器进行身份验证时,您不需要每次都提供您的用户名和密码。SSH使用两个密钥&#x…...

计算机视觉在科学研究(数字化)中的实际应用
计算机视觉是一种利用计算机技术来解析和理解图像和视频的方法。.随着计算机技术的不断发展,计算机视觉被广泛应用于科学研究领域,为科学家提供了无限的可能。 一、生命科学领域 在生命科学领域,计算机视觉被广泛用于图像识别、分类和测量等…...

移动应用开发课程第六次实验:为实验2添加登陆页面,用SQList存储好友基本信息
1、在Android Studio中,请在第二次实验成果的基础上完成以下实验要求。 向右滑动 请添加登录页面。在登录页面中,如果用户输入的用户名和密码正确,则跳转至如上图所示的好友列表,并记录用户的登录信息,在用户第一次登…...

nextjs增加系统路径前缀(basePath)适配方案
在 Next.js 中,路由是通过文件夹结构来定义的,使用类似于 History 模式的 URL 结构。所以如果想通过nginx来代理一个nextjs开发的系统, 除非直接使用跟路径/来进行代理,否则代理将非常麻烦,这时添加basePath就非常有必…...

嵌入式蓝桥杯学习拓展 LCD翻转显示
通过配置SS和GS两个标志位,实现扫描方向的切换。 将lcd.c的REG_932X_Init函数进行部分修改。 将LCD_WriteReg(R1, 0x0000);修改为LCD_WriteReg(R1,0x0100); 将LCD_WriteReg(R96, 0x2700); 修改为LCD_WriteReg(R96, 0xA700); void REG_932X_Init1(void) {LCD_Wr…...

学习threejs,实现配合使用WebWorker
👨⚕️ 主页: gis分享者 👨⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️WebWorker web端多线程 二、…...

TDengine 新功能 复合主键
1. 简介 从 TDengine 3.3.0.0 版本之后,新增了复合主键的功能。 TDengine 原来的时间列是不允许有重复时间戳的,有了复合主键功能后,时间列即允许有重复,重复后的时间戳按紧跟其后第二列主键列的值来确定唯一性。 此功能的常用…...

JVM 面试题
Java 虚拟机(JVM)是运行 Java 程序的引擎,它是 Java 语言 “一次编译,处处运行” 的核心技术。JVM 的主要任务是将 Java 字节码(Bytecode)解释成机器码并执行,负责内存管理、线程管理、垃圾回收…...

组件上传图片不回显问题
import { Plus } from "element-plus/icons-vue"; // 图片上传 const img_add ref([]); function httpRequest_add(option) {let dataForm new FormData();dataForm.append("file", option.file);dataForm.append("id", user.data.id);axios({…...

【JavaWeb后端学习笔记】Spring AOP面向切面编程
AOP 1、Spring AOP概述2、SpringAOP快速入门3、SpringAOP核心概念4、通知类型5、通知顺序6、切入点表达式6.1 execution方式6.2 annotation方式 7、连接点 1、Spring AOP概述 AOP:Aspect Oriented Programming,面向特定方法编程。 AOP是通过动态代理技术…...

6.584-Lab5B
6.584-Lab5B Reference CodeReference BlogHomeworkMyself Code Sharded Key/Value Service 梗概 这个图是我从上面参考blog中拿来的,觉得做的不错,借助这张图来讲解一下需要一个什么样的 Service。 ShardCtrler Client: 接收来自客户发出的命…...

OceanBase 的探索与实践
作者:来自 vivo 互联网数据库团队- Xu Shaohui 本文总结了目前我们遇到的痛点问题并通过 OceanBase 的技术方案解决了这些痛点问题,完整的描述了 OceanBase 的实施落地,通过迁移到 OceanBase 实践案例中遇到的问题与解决方案让大家能更好的了…...

安卓调试环境搭建
前言 前段时间电脑重装了系统,最近准备调试一个apk,没想到装环境的过程并不顺利,很让人火大,于是记录一下。 反编译工具下载 下载apktool.bat和apktool.jar 官网地址:https://ibotpeaches.github.io/Apktool/install…...

动画Lottie
Lottie简介 Lottie是一个Airbnb 开发的用于Android,iOS,Web和Windows的库,用于解析使用Bodymovin导出为json的Adobe After Effects动画,并在移动设备和网络上呈现 — GitHub Lottie主要特性 After Effects 兼容性: …...

C++感受14-Hello Object 封装版 - 上
1. 封装即约束——封装和派生、多态的本质区别 一门计算机语言,要如何帮助程序员写出优秀的代码?两个方法:一是给程序员更多能力,二是给程序员更多约束。之前我们学习的派生和多态,更多的是给我们技能,而封…...

网络安全中大数据和人工智能应用实践
传统的网络安全防护手段主要是通过单点的网络安全设备,随着网络攻击的方式和手段不断的变化,大数据和人工智能技术也在最近十年飞速地发展,网络安全防护也逐渐开始拥抱大数据和人工智能。传统的安全设备和防护手段容易形成数据孤岛࿰…...

RISC-V架构下OP-TEE 安全系统实践
安全之安全(security)博客目录导读 本篇博客,我们聚焦RISC-V 2024中国峰会上的RISC-V和OP-TEE结合的一个安全系统实践,来自芯来科技桂兵老师。 关于RISC-V TEE(可信执行环境)的相关方案,如感兴趣可参考R...

40分钟学 Go 语言高并发:【实战】分布式缓存系统
【实战课程】分布式缓存系统 一、整体架构设计 首先,让我们通过架构图了解分布式缓存系统的整体设计: 核心组件 组件名称功能描述技术选型负载均衡层请求分发、节点选择一致性哈希缓存节点数据存储、过期处理内存存储 持久化同步机制节点间数据同步…...

[创业之路-186]:《华为战略管理法-DSTE实战体系》-1-为什么UTStarcom死了,华为却活了,而且越活越好?
目录 前言 一、市场定位与战略选择 二、技术创新能力 三、企业文化与团队建设 四、应对危机的能力 五、客户为中心的理念 六、市场适应性与战略灵活性 七、技术创新与研发投入 八、企业文化与团队建设 九、应对危机的能力 前言 UT斯达康(UTStarcom&#…...

python如何多行注释
在Python中,多行注释通常有两种方式: 使用三个单引号()或三个双引号(""")来创建多行字符串,这可以被用来作为多行注释。这种方式在Python中实际上是创建了一个多行的字符串对象…...

前端工程化面试题目常见
前端工程化面试常见题目包括: • 谈谈你对WebPack的认识。 • Webpack打包的流程是什么? • 说说你工作中几个常用的loader。 • 说说HtmlWebpackPlugin插件的作用。 • Webpack支持的脚本模块规范有哪些? • Webpack和gulp/grunt相比有什么特…...

定点数的乘除运算
原码一位乘法 乘积的符号由两个数的符号位异或而成。(不参与运算)被乘数和乘数均取绝对值参与运算,看作无符号数。乘数的最低位为Yn: 若Yn1,则部分积加上被乘数|x|,然后逻辑右移一位;若Yn0&…...

页面置换算法模拟 最近最久未使用(LRU)算法
最近最久未使用(LRU)算法是一种基于页面访问历史的页面置换算法。它选择最久未使用的页面进行置换。当需要访问一个不在内存中的页面时,如果内存已满,则选择最久未使用的页面进行置换。LRU算法通过记录页面的访问时间戳来判断页面…...

Ubuntu与Centos系统有何区别?
Ubuntu和CentOS都是基于Linux内核的操作系统,但它们在设计理念、使用场景和技术实现上有显著的区别。以下是详细的对比: 1. 基础和发行版本 Ubuntu: 基于Debian,使用.deb包管理系统。包含两个主要版本: LTSÿ…...

RK3568平台开发系列讲解(pinctrl 子系统篇)pinctrl_debug
🚀返回专栏总目录 文章目录 1. Overview2. debug信息2.1 pinctrl-devices2.2. pinctrl-handles2.3. pinctrl-handles3. debug信息3.1. 查看(pinctrl_register_pins)注册了哪些pins3.2. 查看pin groups;3.3. 查看每种functions所占用的gpio groups信息:3.4. pinconf沉淀、…...

避大坑!Vue3中reactive丢失响应式的问题
在vue3中,我们定义响应式数据无非是ref和reactive。 但是有的小伙伴会踩雷!导致定义的响应式丢失的问题。 reactive丢失响应式的情况1(直接赋值) 场景: 1.你定义了一个数据:let datareactive({name:"",age:"" }) 2.然后你…...