当前位置: 首页 > news >正文

聊聊ShardingSphere是怎么进行sql重写的

本文主要研究一下ShardingSphere进行sql重写的原理

prepareStatement

org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java

public final class ShardingSphereConnection extends AbstractConnectionAdapter {@Overridepublic PreparedStatement prepareStatement(final String sql) throws SQLException {return new ShardingSpherePreparedStatement(this, sql);}//......
}    

ShardingSphereConnection的prepareStatement创建的是ShardingSpherePreparedStatement

ShardingSpherePreparedStatement

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {@Getterprivate final ShardingSphereConnection connection;public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null);}private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,final String[] columns) throws SQLException {if (Strings.isNullOrEmpty(sql)) {throw new EmptySQLException().toSQLException();}this.connection = connection;metaDataContexts = connection.getContextManager().getMetaDataContexts();SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);hintValueContext = sqlParserRule.isSqlCommentParseEnabled() ? new HintValueContext() : SQLHintUtils.extractHint(sql).orElseGet(HintValueContext::new);this.sql = sqlParserRule.isSqlCommentParseEnabled() ? sql : SQLHintUtils.removeHint(sql);statements = new ArrayList<>();parameterSets = new ArrayList<>();SQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()));sqlStatement = sqlParserEngine.parse(this.sql, true);sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData(), sqlStatement, connection.getDatabaseName());parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);statementOption = returnGeneratedKeys ? new StatementOption(true, columns) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);executor = new DriverExecutor(connection);JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext());batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getDatabaseName());kernelProcessor = new KernelProcessor();statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData());trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();statementManager = new StatementManager();}//......
}    

ShardingSpherePreparedStatement继承了AbstractPreparedStatementAdapter,其构造器主要是通过SQLParserEngine解析sql得到SQLStatement,创建DriverExecutor、BatchPreparedStatementExecutor、KernelProcessor、StatementManager;这里即使useServerPrepStmts=true,也不会触发mysql server的prepare操作

executeUpdate

    public int executeUpdate() throws SQLException {try {if (statementsCacheable && !statements.isEmpty()) {resetParameters();return statements.iterator().next().executeUpdate();}clearPrevious();QueryContext queryContext = createQueryContext();trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);if (null != trafficInstanceId) {JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());}executionContext = createExecutionContext(queryContext);if (hasRawExecutionRule()) {Collection<ExecuteResult> executeResults = executor.getRawExecutor().execute(createRawExecutionGroupContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback());return accumulate(executeResults);}return isNeedImplicitCommitTransaction(connection, executionContext) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();// CHECKSTYLE:OFF} catch (final RuntimeException ex) {// CHECKSTYLE:ONhandleExceptionInTransaction(connection, metaDataContexts);throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());} finally {clearBatch();}}private void clearPrevious() {statements.clear();parameterSets.clear();generatedValues.clear();}private ExecutionContext createExecutionContext(final QueryContext queryContext) {ShardingSphereRuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName());SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext());ExecutionContext result = kernelProcessor.generateExecutionContext(queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());findGeneratedKey(result).ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));return result;}

这里executeUpdate会先执行clearPrevious方法,清空statements、parameterSets、generatedValues,然后createExecutionContext,这里有一步是kernelProcessor.generateExecutionContext

KernelProcessor

generateExecutionContext

shardingsphere-infra-context-5.4.0-sources.jar!/org/apache/shardingsphere/infra/connection/kernel/KernelProcessor.java

    public ExecutionContext generateExecutionContext(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,final ConfigurationProperties props, final ConnectionContext connectionContext) {RouteContext routeContext = route(queryContext, database, globalRuleMetaData, props, connectionContext);SQLRewriteResult rewriteResult = rewrite(queryContext, database, globalRuleMetaData, props, routeContext, connectionContext);ExecutionContext result = createExecutionContext(queryContext, database, routeContext, rewriteResult);logSQL(queryContext, props, result);return result;}

KernelProcessor的generateExecutionContext方法先创建routeContext,然后执行rewrite,最后执行createExecutionContext

rewrite

    private SQLRewriteResult rewrite(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,final ConfigurationProperties props, final RouteContext routeContext, final ConnectionContext connectionContext) {SQLRewriteEntry sqlRewriteEntry = new SQLRewriteEntry(database, globalRuleMetaData, props);return sqlRewriteEntry.rewrite(queryContext.getSql(), queryContext.getParameters(), queryContext.getSqlStatementContext(), routeContext, connectionContext, queryContext.getHintValueContext());}

rewrite主要是通过SQLRewriteEntry的rewrite方法进行的

SQLRewriteEntry

shardingsphere-infra-rewrite-5.4.0-sources.jar!/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java

    /*** Rewrite.* * @param sql SQL* @param params SQL parameters* @param sqlStatementContext SQL statement context* @param routeContext route context* @param connectionContext connection context* @param hintValueContext hint value context* * @return route unit and SQL rewrite result map*/public SQLRewriteResult rewrite(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext,final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) {SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, params, sqlStatementContext, routeContext, connectionContext, hintValueContext);SQLTranslatorRule rule = globalRuleMetaData.getSingleRule(SQLTranslatorRule.class);DatabaseType protocolType = database.getProtocolType();Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();return routeContext.getRouteUnits().isEmpty()? new GenericSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext): new RouteSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext, routeContext);}private SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext,final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) {SQLRewriteContext result = new SQLRewriteContext(database.getName(), database.getSchemas(), sqlStatementContext, sql, params, connectionContext, hintValueContext);decorate(decorators, result, routeContext, hintValueContext);result.generateSQLTokens();return result;}private void decorate(final Map<ShardingSphereRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext,final RouteContext routeContext, final HintValueContext hintValueContext) {if (hintValueContext.isSkipSQLRewrite()) {return;}for (Entry<ShardingSphereRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {entry.getValue().decorate(entry.getKey(), props, sqlRewriteContext, routeContext);}}

SQLRewriteEntry的rewrite方法,先通过createSQLRewriteContext来创建SQLRewriteContext,这里通过decorate方法遍历decorators,挨个执行SQLRewriteContextDecorator的decorate方法;最后通过GenericSQLRewriteEngine或者RouteSQLRewriteEngine进行rewrite

SQLRewriteContextDecorator

org/apache/shardingsphere/infra/rewrite/context/SQLRewriteContextDecorator.java

@SingletonSPI
public interface SQLRewriteContextDecorator<T extends ShardingSphereRule> extends OrderedSPI<T> {/*** Decorate SQL rewrite context.** @param rule rule* @param props ShardingSphere properties* @param sqlRewriteContext SQL rewrite context to be decorated* @param routeContext route context*/void decorate(T rule, ConfigurationProperties props, SQLRewriteContext sqlRewriteContext, RouteContext routeContext);
}

SQLRewriteContextDecorator定义了decorate方法,它有诸如ShardingSQLRewriteContextDecorator、EncryptSQLRewriteContextDecorator的实现类

EncryptSQLRewriteContextDecorator

org/apache/shardingsphere/encrypt/rewrite/context/EncryptSQLRewriteContextDecorator.java

/*** SQL rewrite context decorator for encrypt.*/
public final class EncryptSQLRewriteContextDecorator implements SQLRewriteContextDecorator<EncryptRule> {@Overridepublic void decorate(final EncryptRule encryptRule, final ConfigurationProperties props, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext();if (!containsEncryptTable(encryptRule, sqlStatementContext)) {return;}Collection<EncryptCondition> encryptConditions = createEncryptConditions(encryptRule, sqlRewriteContext);if (!sqlRewriteContext.getParameters().isEmpty()) {Collection<ParameterRewriter> parameterRewriters = new EncryptParameterRewriterBuilder(encryptRule,sqlRewriteContext.getDatabaseName(), sqlRewriteContext.getSchemas(), sqlStatementContext, encryptConditions).getParameterRewriters();rewriteParameters(sqlRewriteContext, parameterRewriters);}Collection<SQLTokenGenerator> sqlTokenGenerators = new EncryptTokenGenerateBuilder(encryptRule,sqlStatementContext, encryptConditions, sqlRewriteContext.getDatabaseName()).getSQLTokenGenerators();sqlRewriteContext.addSQLTokenGenerators(sqlTokenGenerators);}private Collection<EncryptCondition> createEncryptConditions(final EncryptRule encryptRule, final SQLRewriteContext sqlRewriteContext) {SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext();if (!(sqlStatementContext instanceof WhereAvailable)) {return Collections.emptyList();}Collection<WhereSegment> whereSegments = ((WhereAvailable) sqlStatementContext).getWhereSegments();Collection<ColumnSegment> columnSegments = ((WhereAvailable) sqlStatementContext).getColumnSegments();return new EncryptConditionEngine(encryptRule, sqlRewriteContext.getSchemas()).createEncryptConditions(whereSegments, columnSegments, sqlStatementContext, sqlRewriteContext.getDatabaseName());}private boolean containsEncryptTable(final EncryptRule encryptRule, final SQLStatementContext sqlStatementContext) {for (String each : sqlStatementContext.getTablesContext().getTableNames()) {if (encryptRule.findEncryptTable(each).isPresent()) {return true;}}return false;}private void rewriteParameters(final SQLRewriteContext sqlRewriteContext, final Collection<ParameterRewriter> parameterRewriters) {for (ParameterRewriter each : parameterRewriters) {each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());}}@Overridepublic int getOrder() {return EncryptOrder.ORDER;}@Overridepublic Class<EncryptRule> getTypeClass() {return EncryptRule.class;}
}

rewriteParameters是通过ParameterRewriter进行rewrite,主要是修改ParameterBuilder;而具体sql语句的修改则通过sqlTokenGenerators进行

SQLToken

@RequiredArgsConstructor
@Getter
public abstract class SQLToken implements Comparable<SQLToken> {private final int startIndex;@Overridepublic final int compareTo(final SQLToken sqlToken) {return startIndex - sqlToken.startIndex;}
}

SQLToken它有诸如InsertValuesToken、SubstitutableColumnNameToken、InsertColumnsToken之类的实现类

RouteSQLRewriteEngine

    /*** Rewrite SQL and parameters.** @param sqlRewriteContext SQL rewrite context* @param routeContext route context* @return SQL rewrite result*/public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1F);for (Entry<String, Collection<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {Collection<RouteUnit> routeUnits = entry.getValue();if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {sqlRewriteUnits.put(routeUnits.iterator().next(), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));} else {addSQLRewriteUnits(sqlRewriteUnits, sqlRewriteContext, routeContext, routeUnits);}}return new RouteSQLRewriteResult(translate(sqlRewriteContext.getSqlStatementContext().getSqlStatement(), sqlRewriteUnits));}private void addSQLRewriteUnits(final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits, final SQLRewriteContext sqlRewriteContext,final RouteContext routeContext, final Collection<RouteUnit> routeUnits) {for (RouteUnit each : routeUnits) {sqlRewriteUnits.put(each, new SQLRewriteUnit(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeContext, each)));}}private Map<RouteUnit, SQLRewriteUnit> translate(final SQLStatement sqlStatement, final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits) {Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(sqlRewriteUnits.size(), 1F);for (Entry<RouteUnit, SQLRewriteUnit> entry : sqlRewriteUnits.entrySet()) {DatabaseType storageType = storageTypes.get(entry.getKey().getDataSourceMapper().getActualName());String sql = translatorRule.translate(entry.getValue().getSql(), sqlStatement, protocolType, storageType);SQLRewriteUnit sqlRewriteUnit = new SQLRewriteUnit(sql, entry.getValue().getParameters());result.put(entry.getKey(), sqlRewriteUnit);}return result;}

addSQLRewriteUnits是往sqlRewriteUnits添加SQLRewriteUnit,最后translate方法构建SQLRewriteUnit;SQLRewriteUnit包含了更改之后的sql以及对应改动后的参数

useDriverToExecuteUpdate

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

    private int useDriverToExecuteUpdate() throws SQLException {ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();cacheStatements(executionGroupContext.getInputGroups());return executor.getRegularExecutor().executeUpdate(executionGroupContext,executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());}private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getDatabaseName()));} private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {each.getInputs().forEach(eachInput -> {statements.add((PreparedStatement) eachInput.getStorageResource());parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());});}replay();}private void replay() throws SQLException {replaySetParameter();for (Statement each : statements) {getMethodInvocationRecorder().replay(each);}}private void replaySetParameter() throws SQLException {for (int i = 0; i < statements.size(); i++) {replaySetParameter(statements.get(i), parameterSets.get(i));}}protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> params) throws SQLException {setParameterMethodInvocations.clear();addParameters(params);for (PreparedStatementInvocationReplayer each : setParameterMethodInvocations) {each.replayOn(preparedStatement);}}private void addParameters(final List<Object> params) {int i = 0;for (Object each : params) {int index = ++i;setParameterMethodInvocations.add(preparedStatement -> preparedStatement.setObject(index, each));}}

useDriverToExecuteUpdate方法会执行createExecutionGroupContext(会执行prepare方法),cacheStatements这里主要是把eachInput.getStorageResource()真正的PrepareStatement赋值到ShardingSpherePreparedStatement的statements变量中,把eachInput.getExecutionUnit().getSqlUnit().getParameters()赋值到parameterSets,然后执行replay方法通过PreparedStatementInvocationReplayer把修改后的变量replay到真正的PrepareStatement
该方法委托给executor.getRegularExecutor().executeUpdate,最后一个参数为callback,即createExecuteUpdateCallback

DriverExecutionPrepareEngine.prepare

org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java

    public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits,final ExecutionGroupReportContext reportContext) throws SQLException {return prepare(routeContext, Collections.emptyMap(), executionUnits, reportContext);}public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Map<String, Integer> connectionOffsets, final Collection<ExecutionUnit> executionUnits,final ExecutionGroupReportContext reportContext) throws SQLException {Collection<ExecutionGroup<T>> result = new LinkedList<>();for (Entry<String, List<SQLUnit>> entry : aggregateSQLUnitGroups(executionUnits).entrySet()) {String dataSourceName = entry.getKey();List<SQLUnit> sqlUnits = entry.getValue();List<List<SQLUnit>> sqlUnitGroups = group(sqlUnits);ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;result.addAll(group(dataSourceName, connectionOffsets.getOrDefault(dataSourceName, 0), sqlUnitGroups, connectionMode));}return decorate(routeContext, result, reportContext);}protected List<ExecutionGroup<T>> group(final String dataSourceName, final int connectionOffset, final List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException {List<ExecutionGroup<T>> result = new LinkedList<>();List<C> connections = databaseConnectionManager.getConnections(dataSourceName, connectionOffset, sqlUnitGroups.size(), connectionMode);int count = 0;for (List<SQLUnit> each : sqlUnitGroups) {result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode));}return result;}private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final ConnectionMode connectionMode) throws SQLException {List<T> result = new LinkedList<>();for (SQLUnit each : sqlUnits) {result.add((T) sqlExecutionUnitBuilder.build(new ExecutionUnit(dataSourceName, each), statementManager, connection, connectionMode, option, databaseTypes.get(dataSourceName)));}return new ExecutionGroup<>(result);}

group方法调用遍历SQLUnit执行createExecutionGroup,而后者则执行sqlExecutionUnitBuilder.build;这里databaseConnectionManager.getConnections获取的connection是通过真正driver获取的connection(com.mysql.jdbc.Driver)

PreparedStatementExecutionUnitBuilder

org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java

    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager,final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException {PreparedStatement preparedStatement = createPreparedStatement(executionUnit, statementManager, connection, connectionMode, option, databaseType);return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement);}private PreparedStatement createPreparedStatement(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection,final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException {return (PreparedStatement) statementManager.createStorageResource(executionUnit, connection, connectionMode, option, databaseType);}

PreparedStatementExecutionUnitBuilder的build方法这里才真正创建PreparedStatement

StatementManager

org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java

    public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final ConnectionMode connectionMode, final StatementOption option,final DatabaseType databaseType) throws SQLException {Statement result = cachedStatements.get(new CacheKey(executionUnit, connectionMode));if (null == result || result.isClosed() || result.getConnection().isClosed()) {String sql = executionUnit.getSqlUnit().getSql();if (option.isReturnGeneratedKeys()) {result = null == option.getColumns() || 0 == option.getColumns().length? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS): connection.prepareStatement(sql, option.getColumns());} else {result = connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());}cachedStatements.put(new CacheKey(executionUnit, connectionMode), result);}return result;}

createStorageResource则是通过connection.prepareStatement来创建真正的PrepareStatement,而此时传入的sql也是经过重写之后的sql

createExecuteUpdateCallback

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown) {@Overrideprotected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {return ((PreparedStatement) statement).executeUpdate();}@Overrideprotected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {return Optional.empty();}};}

createExecuteUpdateCallback创建的JDBCExecutorCallback,其executeSQL方法则是通过((PreparedStatement) statement).executeUpdate()来执行,即委托给了真正的PreparedStatement

小结

  • ShardingSphereConnection的prepareStatement创建的是ShardingSpherePreparedStatement,它在ShardingSpherePreparedStatement的executeUpdate的时候进行sql重写,然后prepare,最后执行的时候是通过JDBCExecutorCallback,其executeSQL方法则是通过((PreparedStatement) statement).executeUpdate()来执行,即委托给了真正的PreparedStatement
  • rewriteParameters是通过ParameterRewriter进行rewrite,主要是修改ParameterBuilder;而具体sql语句的修改则通过sqlTokenGenerators进行
  • PreparedStatementExecutionUnitBuilder的build方法这里才真正创建PreparedStatement:它通过StatementManager.createStorageResource则是通过connection.prepareStatement来创建真正的PrepareStatement,而此时传入的sql也是经过重写之后的sql
  • useDriverToExecuteUpdate方法会执行createExecutionGroupContext(会执行prepare方法),cacheStatements这里主要是把eachInput.getStorageResource()真正的PrepareStatement赋值到ShardingSpherePreparedStatement的statements变量中,把eachInput.getExecutionUnit().getSqlUnit().getParameters()赋值到parameterSets,然后执行replay方法通过PreparedStatementInvocationReplayer把修改后的变量replay到真正的PrepareStatement

ShardingSpherePreparedStatement实现了java.sql.PreparedStatement接口,其sql属性是用户传入的sql,即未经过重写的sql,而实际execute的时候,会触发sql重写(包括重写sql语句及参数),最后会通过connection.prepareStatement(传入重写之后的sql)来创建真正的PrepareStatement,然后有一步replay操作,把重写后的参数作用到真正的PrepareStatement,最后通过((PreparedStatement) statement).executeUpdate()来触发执行
至此我们可以得到sql重写的一个基本思路:通过实现java.sql.PreparedStatement接口伪装一个PreparedStatement类,其创建和set参数先内存缓存起来,之后在execute的时候进行sql重写,创建真正的PreparedStatement,replay参数,执行execute方法

相关文章:

聊聊ShardingSphere是怎么进行sql重写的

序 本文主要研究一下ShardingSphere进行sql重写的原理 prepareStatement org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java public final class ShardingSphereConnection extends AbstractConnectionAdapter {Overridepublic Prepar…...

软件设计模式系列之二——抽象工厂模式

1 抽象工厂模式的定义 抽象工厂模式是一种创建型设计模式&#xff0c;它提供了一种创建一组相关或相互依赖对象的方式&#xff0c;而无需指定它们的具体类。该模式以一组抽象接口为核心&#xff0c;包括抽象工厂接口和一组抽象产品接口&#xff0c;每个具体工厂类负责创建特定…...

P2719 搞笑世界杯 (期望dp

#include <bits/stdc.h> using namespace std; using VI vector<int>;double dp[2000][2000]; int n; //求dp[2][0] //dp[0][2] //期望dp要从终末态&#xff0c;向起始态转移 //dp[a][b] - > dp[a][b-1] or dp[a-1][b] //dp[a][b] 1/2 * dp[a][b1] 1/2 * dp…...

spring cloud新版本使用loadbalancer替代Ribbon

Nacos 2021 不再集成 Ribbon&#xff0c;建议使用spring cloud loadbalancer 引入 一、简单使用 引入依赖spring cloud loadbalancer <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-loadbalancer<…...

【Git-Exception】Git报错:fatal: unable to auto-detect email address

报错信息&#xff1a; *** Please tell me who you are. Run git config --global user.email “youexample.com” git config –global user.name “Your Name” to set your account’s default identity. Omit --global to set the identity only in this repository. fatal…...

JVM性能优化 —— 类加载器,手动实现类的热加载

一、类加载的机制的层次结构 每个编写的”.java”拓展名类文件都存储着需要执行的程序逻辑&#xff0c;这些”.java”文件经过Java编译器编译成拓展名为”.class”的文件&#xff0c;”.class”文件中保存着Java代码经转换后的虚拟机指令&#xff0c;当需要使用某个类时&#…...

SSH连接MobaXterm

IT常用软件的安装 ssh连接MobaXterm详细使用教程 数据库Navicat安装&#xff1a;https://www.jianshu.com/p/2494e02caf63 java SE安装 https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html windows安装pip https://www.cnblogs.com/NanShan2016/…...

本地虚机Jumpserver使用域名访问报错 使用IP+端口没有错误

背景&#xff1a; 我在本地Windows VMware 15的环境中部署了CentOS7.5&#xff0c;下载jumpserver-offline-installer-v2.28.1-amd64-138.tar.gz并安装部署。 需求&#xff1a; 1、能使用http:ip访问堡垒机。达成&#xff1b; 2、能使用http:域名访问堡垒机。达成&#xff…...

备战计算机二级公共基础知识(五)----数据库设计基础

数据库设计基础 目录 数据库设计基础 数据库的基本概念&#xff1a;数据库&#xff0c;数据库管理系统&#xff0c;数据库系统 数据模型&#xff0c;实体联系模型及 &#xff25;&#xff0d;&#xff32; 图&#xff0c;从 &#xff25;&#xff0d;&#xff32; 图导出关系…...

【excel密码】excel文件加密方法总结:

想要给Excel文件进行加密&#xff0c;方法有很多&#xff0c;今天分享三种Excel加密方法给大家。 打开密码 设置了打开密码的excel文件&#xff0c;打开文件就会提示输入密码才能打开excel文件&#xff0c;只有输入了正确的密码才能打开并且编辑文件&#xff0c;如果密码错误…...

MySQL之用户管理

用户 用户信息 MySQL中的用户&#xff0c;都存储在系统数据库mysql的user表中 ps&#xff1a; host&#xff1a; 表示这个用户可以从哪个主机登陆&#xff0c;如果是localhost&#xff0c;表示只能从本机登陆 user&#xff1a; 用户名 authentication_string&#xff1a; 用户…...

伪静态web.config常见规则写法与参数介绍说明

伪静态web.config常见规则写法与参数介绍说明. 示例1&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <configuration><system.webServer><rewrite><rules><rule name"规则 1" stopProcessing"tru…...

使用kubasz快速搭建Kubernetes集群

Kubernetes安装 Kubernetes 也称为 K8s,是用于自动部署、扩缩和管理容器化应用程序的开源系统。ansible是新出现的自动化运维工具,基于Python开发,集合了众多运维工具(puppet、chef、func、fabric)的优点,实现了批量系统配置、批量程序部署、批量运行命令等功能。使用ans…...

php常用加密算法大全aes、3des、rsa等

目录 一、可解密加解密算法 1、aes 加解密算法 2、旧3des加解密方法 3、新3des加解密方法 4、rsa公私钥加解密、签名验签方法 5、自定义加密算法1 6、自定义加密算法2 7、自定义加密算法3 二、不可解密加密算法 1、md5算法 2、crypt算法 3、sha1算法 5、hash 算…...

ubuntu22.04搭建verilator仿真环境

概述 操作系统为 Ubuntu(22.04.2 LTS)&#xff0c;本次安装verilator开源verilog仿真工具&#xff0c;进行RTL功能仿真。下面构建版本为5.008的verilator仿真环境。先看一下我系统的版本&#xff1a; 安装流程 安装依赖 sudo apt-get install git perl python3 make autoc…...

python中如何使用正则表达匹配\本身?(文末赠书)

点击上方“Python爬虫与数据挖掘”&#xff0c;进行关注 回复“书籍”即可获赠Python从入门到进阶共10本电子书 今 日 鸡 汤 将军向宠&#xff0c;性行淑均。 大家好&#xff0c;我是皮皮。 一、前言 前几天在Python钻石群【空】问了一个Python正则表达式的问题&#xff0c;一起…...

Linux学习之MySQL连接查询

接上一篇 连接查询 连接查询也中多表查询&#xff0c;常用于查询来自于多张表的数据&#xff0c;通过不同的连接方式把多张表组成一张新的临时表&#xff0c;再对临时表做数据处理。 #表基础信息&#xff0c;内容可从上一篇博客中查看 mysql> desc departments; ---------…...

【Hello Algorithm】二叉树相关算法

本篇博客介绍&#xff1a;介绍二叉树的相关算法 二叉树相关算法 二叉树结构遍历二叉树递归序二叉树的交集非递归方式实现二叉树遍历二叉树的层序遍历 二叉树难题二叉树的序列化和反序列化lc431求二叉树最宽的层二叉树的后继节点谷歌面试题 二叉树结构 如果对于二叉树的结构还有…...

ExpressLRS开源代码之工程结构

ExpressLRS开源代码之工程结构 1. 源由2. 工程3. 开发环境安装4. pio命令5. ExpressLRS配置6. 硬件认证过程7. 参考资料 1. 源由 ExpressLRS开源代码基于Arduino框架设计&#xff0c;在所支持的硬件环境下&#xff0c;提供900/2400发射机和接收机硬件方案。 该设计提供了一个…...

fastjson 1.2.24 反序列化导致任意命令执行漏洞复现

拉取docker容器 访问并抓包 修改为POST 方式&#xff0c;文件类型改为json格式&#xff0c;发送json数据包&#xff0c;发送成功 这里安装一个bp的插件 使用安装的插件 可以看到&#xff0c;插件告诉我们这里有漏洞&#xff0c;并且提供了POC 既然我们发现有 rmi &#xff0c;…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别

【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而&#xff0c;传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案&#xff0c;能够实现大范围覆盖并远程采集数据。尽管具备这些优势&#xf…...

Python Einops库:深度学习中的张量操作革命

Einops&#xff08;爱因斯坦操作库&#xff09;就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库&#xff0c;用类似自然语言的表达式替代了晦涩的API调用&#xff0c;彻底改变了深度学习工程…...