elasticsearch源码分析-08Serch查询流程
Serch查询流程
查询请求Rest路由注册也是在actionModule中
//查询操作
registerHandler.accept(new RestSearchAction());@Override
public List<Route> routes() {return unmodifiableList(asList(new Route(GET, "/_search"),new Route(POST, "/_search"),new Route(GET, "/{index}/_search"),new Route(POST, "/{index}/_search"),// Deprecated typed endpoints.new Route(GET, "/{index}/{type}/_search"),new Route(POST, "/{index}/{type}/_search")));
}
和Get查询一样,请求会经过转发dispatchRequest,查询路由对应的handler处理类,然后执行处理
@Overridepublic void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {//小图标请求处理if (request.rawPath().equals("/favicon.ico")) {handleFavicon(request.method(), request.uri(), channel);return;}try {//处理rest请求tryAllHandlers(request, channel, threadContext);} catch (Exception e) {try {channel.sendResponse(new BytesRestResponse(channel, e));} catch (Exception inner) {inner.addSuppressed(e);logger.error(() ->new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);}}}
调用BaseRestHandler的handleRequest方法
@Overridepublic final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {// prepare the request for execution; has the side effect of touching the request parameters//处理请求final RestChannelConsumer action = prepareRequest(request, client);// validate unconsumed params, but we must exclude params used to format the response// use a sorted set so the unconsumed parameters appear in a reliable sorted orderfinal SortedSet<String> unconsumedParams =request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));// validate the non-response paramsif (!unconsumedParams.isEmpty()) {final Set<String> candidateParams = new HashSet<>();candidateParams.addAll(request.consumedParams());candidateParams.addAll(responseParams());throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));}if (request.hasContent() && request.isContentConsumed() == false) {throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");}usageCount.increment();// execute the actionaction.accept(channel);}
首先构造请求request对象,RestSearchAction对该方法进行了重写
@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {//创建查询请求对象SearchRequest searchRequest = new SearchRequest();IntConsumer setSize = size -> searchRequest.source().size(size);request.withContentOrSourceParamParserOrNull(parser ->parseSearchRequest(searchRequest, request, parser, setSize));return channel -> {RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));};}
首先根据请求的request解析成查询请求
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,XContentParser requestContentParser,IntConsumer setSize) throws IOException {//查询源包含哪些字段,为空查询所有字段if (searchRequest.source() == null) {searchRequest.source(new SearchSourceBuilder());}//查询索引searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));if (requestContentParser != null) {searchRequest.source().parseXContent(requestContentParser, true);}//协调节点的保护,从每个shard获取的数据最大数据量final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());searchRequest.setBatchedReduceSize(batchedReduceSize);//预过滤分片数量if (request.hasParam("pre_filter_shard_size")) {searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));}//分片并发查询数量if (request.hasParam("max_concurrent_shard_requests")) {// only set if we have the parameter since we auto adjust the max concurrency on the coordinator// based on the number of nodes in the clusterfinal int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",searchRequest.getMaxConcurrentShardRequests());searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);}//允许部分搜索结果if (request.hasParam("allow_partial_search_results")) {// only set if we have the parameter passed to override the cluster-level defaultsearchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));}// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types// from the REST layer. these modes are an internal optimization and should// not be specified explicitly by the user.//搜索类型,默认query_then_fetchString searchType = request.param("search_type");//下面两种查询已经被废弃if ("query_and_fetch".equals(searchType) ||"dfs_query_and_fetch".equals(searchType)) {throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");} else {searchRequest.searchType(searchType);}parseSearchSource(searchRequest.source(), request, setSize);//查询缓存searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache()));//滚动查询String scroll = request.param("scroll");if (scroll != null) {searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));}//type字段已废弃if (request.hasParam("type")) {deprecationLogger.deprecatedAndMaybeLog("search_with_types", TYPES_DEPRECATION_MESSAGE);searchRequest.types(Strings.splitStringByCommaToArray(request.param("type")));}searchRequest.routing(request.param("routing"));searchRequest.preference(request.param("preference"));searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));checkRestTotalHits(request, searchRequest);}private static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request, IntConsumer setSize) {QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);if (queryBuilder != null) {searchSourceBuilder.query(queryBuilder);}//分页查询int from = request.paramAsInt("from", -1);if (from != -1) {searchSourceBuilder.from(from);}int size = request.paramAsInt("size", -1);if (size != -1) {setSize.accept(size);}//特定文档如何与查询匹配的详细信息和解释if (request.hasParam("explain")) {searchSourceBuilder.explain(request.paramAsBoolean("explain", null));}//带版本号查询if (request.hasParam("version")) {searchSourceBuilder.version(request.paramAsBoolean("version", null));}//是否返回seqNo和primaryTermif (request.hasParam("seq_no_primary_term")) {searchSourceBuilder.seqNoAndPrimaryTerm(request.paramAsBoolean("seq_no_primary_term", null));}//超时时间if (request.hasParam("timeout")) {searchSourceBuilder.timeout(request.paramAsTime("timeout", null));}//收集后终止搜索if (request.hasParam("terminate_after")) {int terminateAfter = request.paramAsInt("terminate_after",SearchContext.DEFAULT_TERMINATE_AFTER);if (terminateAfter < 0) {throw new IllegalArgumentException("terminateAfter must be > 0");} else if (terminateAfter > 0) {searchSourceBuilder.terminateAfter(terminateAfter);}}//指示应如何获取存储的字段StoredFieldsContext storedFieldsContext =StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);if (storedFieldsContext != null) {searchSourceBuilder.storedFields(storedFieldsContext);}//从doc_value字段查询String sDocValueFields = request.param("docvalue_fields");if (sDocValueFields != null) {if (Strings.hasText(sDocValueFields)) {String[] sFields = Strings.splitStringByCommaToArray(sDocValueFields);for (String field : sFields) {searchSourceBuilder.docValueField(field, null);}}}//从_source获取结果FetchSourceContext fetchSourceContext = FetchSourceContext.parseFromRestRequest(request);if (fetchSourceContext != null) {searchSourceBuilder.fetchSource(fetchSourceContext);}//在排序时应用,并控制是否也会跟踪分数if (request.hasParam("track_scores")) {searchSourceBuilder.trackScores(request.paramAsBoolean("track_scores", false));}//如果值未设置,则返回应跟踪的总命中数或 nullif (request.hasParam("track_total_hits")) {if (Booleans.isBoolean(request.param("track_total_hits"))) {searchSourceBuilder.trackTotalHits(request.paramAsBoolean("track_total_hits", true));} else {searchSourceBuilder.trackTotalHitsUpTo(request.paramAsInt("track_total_hits", SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO));}}//排序字段String sSorts = request.param("sort");if (sSorts != null) {String[] sorts = Strings.splitStringByCommaToArray(sSorts);for (String sort : sorts) {int delimiter = sort.lastIndexOf(":");if (delimiter != -1) {String sortField = sort.substring(0, delimiter);String reverse = sort.substring(delimiter + 1);if ("asc".equals(reverse)) {searchSourceBuilder.sort(sortField, SortOrder.ASC);} else if ("desc".equals(reverse)) {searchSourceBuilder.sort(sortField, SortOrder.DESC);}} else {searchSourceBuilder.sort(sort);}}}//此请求将汇总的统计信息组String sStats = request.param("stats");if (sStats != null) {searchSourceBuilder.stats(Arrays.asList(Strings.splitStringByCommaToArray(sStats)));}//推荐词String suggestField = request.param("suggest_field");if (suggestField != null) {String suggestText = request.param("suggest_text", request.param("q"));int suggestSize = request.paramAsInt("suggest_size", 5);String suggestMode = request.param("suggest_mode");searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(suggestField,termSuggestion(suggestField).text(suggestText).size(suggestSize).suggestMode(SuggestMode.resolve(suggestMode))));}}
创建rest client开始发送请求
return channel -> {RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));};
执行execute方法
public < Request extends ActionRequest,Response extends ActionResponse> Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {//获取transportAction开始执行return transportAction(action).execute(request, listener);}
最终调用到TransportSearchAction的doExecute方法
search操作会发送请求到索引下的所有分片,相同分片只会发送到主分片或副本分片
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {//服务启动相对时间final long relativeStartNanos = System.nanoTime();final SearchTimeProvider timeProvider =new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {if (source != searchRequest.source()) {// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch// situations when source is rewritten to null due to a bugsearchRequest.source(source);}//对其他集群的索引和当前集群的索引分组,合并本集群shard列表和远程集群的shardfinal ClusterState clusterState = clusterService.state();final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);//执行本集群搜索if (remoteClusterIndices.isEmpty()) {executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);} else {//最小化查询次数if (shouldMinimizeRoundtrips(searchRequest)) {ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider,searchService.aggReduceContextBuilder(searchRequest),remoteClusterService, threadPool, listener,(r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));} else {AtomicInteger skippedClusters = new AtomicInteger(0);//收集所有需要查询的分片collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,ActionListener.wrap(//查询分片返回的结果searchShardsResponses -> {List<SearchShardIterator> remoteShardIterators = new ArrayList<>();Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);//查询集群数int localClusters = localIndices == null ? 0 : 1;int totalClusters = remoteClusterIndices.size() + localClusters;int successfulClusters = searchShardsResponses.size() + localClusters;//执行查询executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));},listener::onFailure));}}}, listener::onFailure);if (searchRequest.source() == null) {rewriteListener.onResponse(searchRequest.source());} else {Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),rewriteListener);}}
这里收集本集群和跨集群及其他集群需要查询的索引,这里我们主要是分析本集群搜索
ES中查询主要包括两个方式:
- QUERY_THEN_FETCH:默认搜索方式, 先向所有的 shard 发出请求, 各分片只返回文档 id和排
名相关的信息及文档的打分然后按照各分片返回的文档的分数进行重新排序和排名, 取前
size 个文档。然后根据文档 id 去相关的 shard 取 document。 数据量是准确的,数据排名不准确 - DFS_QUERY_THEN_FETCH:在上面的查询方式之前多个一个DFS的过程,也就是在进行查询之前, 先对所有分片发送请求, 把所有分片中的词频和文档频率等打分依据全部汇总到一块, 再执行后面的操作,数据排名准确,但是性能比上面性能要差
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,SearchResponse.Clusters clusters) {clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead// of just for the _search apifinal Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),searchRequest.indices());routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);String[] concreteIndices = new String[indices.length];for (int i = 0; i < indices.length; i++) {concreteIndices[i] = indices[i].getName();}//每个节点查询次数Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);//合并需要查询的shard分片GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,searchRequest.getLocalClusterAlias(), remoteShardIterators);//检查需要查询分片数量是否超过阈值failIfOverShardCountLimit(clusterService, shardIterators.size());Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);// optimize search type for cases where there is only one shard group to search on//针对只有一个分片组要搜索的情况优化搜索类型if (shardIterators.size() == 1) {// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shardsearchRequest.searchType(QUERY_THEN_FETCH);}if (searchRequest.allowPartialSearchResults() == null) {// No user preference defined in search request - apply cluster service default//允许部分搜索结果返回searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());}if (searchRequest.isSuggestOnly()) {// disable request cache if we have only suggest// 如果我们只有建议,则禁用请求缓存searchRequest.requestCache(false);switch (searchRequest.searchType()) {case DFS_QUERY_THEN_FETCH:// convert to Q_T_F if we have only suggestsearchRequest.searchType(QUERY_THEN_FETCH);break;}}final DiscoveryNodes nodes = clusterState.nodes();BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),nodes::get, remoteConnections, searchTransportService::getConnection);//提前过滤分片boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, indices, shardIterators.size());//异步查询searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();}
根据两个搜索方式创建不同的action
AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction;switch (searchRequest.searchType()) {case DFS_QUERY_THEN_FETCH:searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,shardIterators, timeProvider, clusterState, task, clusters);break;case QUERY_THEN_FETCH:searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, /*查询后的回调*/listener,shardIterators, timeProvider, clusterState, task, clusters);break;default:throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");}return searchAsyncAction;
创建action后开始执行
public final void start() {if (getNumShards() == 0) {//no search shards to search on, bail with empty response//(it happens with search across _all with no indices around and consistent with broadcast operations)int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :request.source().trackTotalHitsUpTo();// total hits is null in the response if the tracking of total hits is disabledboolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;listener.onResponse(new SearchResponse(InternalSearchResponse.empty(withTotalHits), null, 0, 0, 0, buildTookInMillis(),ShardSearchFailure.EMPTY_ARRAY, clusters));return;}//执行搜索executePhase(this);
}private void executePhase(SearchPhase phase) {try {phase.run();} catch (Exception e) {if (logger.isDebugEnabled()) {logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);}onPhaseFailure(phase, "", e);}
}
遍历所有分片发送请求,并设置回调函数接收分片返回结果
public final void run() {for (final SearchShardIterator iterator : toSkipShardsIts) {assert iterator.skip();skipShard(iterator);}if (shardsIts.size() > 0) {...for (int index = 0; index < shardsIts.size(); index++) {//分片路由信息final SearchShardIterator shardRoutings = shardsIts.get(index);assert shardRoutings.skip() == false;//分片执行performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());}}}private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {if (shard == null) {fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));} else {//节流并发请求final PendingExecutions pendingExecutions = throttleConcurrentRequests ?pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)): null;Runnable r = () -> {final Thread thread = Thread.currentThread();try {//执行查询executePhaseOnShard(shardIt, shard,new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {@Overridepublic void innerOnResponse(Result result) {try {//分片返回结果onShardResult(result, shardIt);} finally {executeNext(pendingExecutions, thread);}}@Overridepublic void onFailure(Exception t) {try {onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);} finally {executeNext(pendingExecutions, thread);}}});} catch (final Exception e) {try { fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));} finally {executeNext(pendingExecutions, thread);}}};//是否限流if (throttleConcurrentRequests) {pendingExecutions.tryRun(r);} else {r.run();}}}
默认为QUERY_THEN_FETCH方式,调用SearchQueryThenFetchAsyncAction的executePhaseOnShard方法
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,final SearchActionListener<SearchPhaseResult> listener) {//封装请求ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),request, getTask(), listener);}
获取连接然后发送action为indices:data/read/search[phase/query]的RPC请求
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchRequest request, SearchTask task,final SearchActionListener<SearchPhaseResult> listener) {// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request// this used to be the QUERY_AND_FETCH which doesn't exist anymore.final boolean fetchDocuments = request.numberOfShards() == 1;Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;final ActionListener handler = responseWrapper.apply(connection, listener);//发送查询transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()));}
- 数据节点
数据节点对应的RPC注册的处理handler
//注册分片查询处理transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,(request, channel, task) -> {//执行查询searchService.executeQueryPhase(request, (SearchShardTask) task,/*查询后回调*/new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));});
数据节点接收RPC请求执行query阶段
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1: "empty responses require more than one shard";//索引服务IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//索引分片IndexShard shard = indexService.getShard(request.shardId().id());rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {@Overridepublic void onResponse(ShardSearchRequest orig) {//如果没有查询到doc可以返回nullif (orig.canReturnNullResponseIfMatchNoDocs()) { ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);try (Engine.Searcher searcher = shard.acquireCanMatchSearcher()) {QueryShardContext context = indexService.newQueryShardContext(canMatchRequest.shardId().id(), searcher,canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true);} catch (Exception exc) {listener.onFailure(exc);return;}if (canRewriteToMatchNone(canMatchRequest.source())&& canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) {assert canMatchRequest.scroll() == null : "must always create search context for scroll requests";listener.onResponse(QuerySearchResult.nullInstance());return;}}// fork the execution in the search thread pool//在搜索线程池中执行runAsync(shard, () -> executeQueryPhase(orig, task), listener);}@Overridepublic void onFailure(Exception exc) {listener.onFailure(exc);}});}
执行search操作
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {final SearchContext context = createAndPutContext(request, task);context.incRef();try {final long afterQueryTime;try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {contextProcessing(context);//执行查询阶段loadOrExecuteQueryPhase(request, context);if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {freeContext(context.id());} else {contextProcessedSuccessfully(context);}afterQueryTime = executor.success();}//如果只有一个分片,执行fetch阶段if (request.numberOfShards() == 1) {return executeFetchPhase(context, afterQueryTime);}return context.queryResult();} catch (Exception e) {// execution exception can happen while loading the cache, strip itif (e instanceof ExecutionException) {e = (e.getCause() == null || e.getCause() instanceof Exception) ?(Exception) e.getCause() : new ElasticsearchException(e.getCause());}logger.trace("Query phase failed", e);processFailure(context, e);throw e;} finally {cleanContext(context);}}
创建查询上下文对象SearchContext跟踪整个search流程,并且查询后的结果也会填充到SearchContext中
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {final boolean canCache = indicesService.canCache(request, context);context.getQueryShardContext().freezeContext();if (canCache) {indicesService.loadIntoContext(request, context, queryPhase);} else {//执行查询queryPhase.execute(context);}}public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {if (searchContext.hasOnlySuggest()) {suggestPhase.execute(searchContext);searchContext.queryResult().topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),new DocValueFormat[0]);return;}if (LOGGER.isTraceEnabled()) {LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));}//尽可能晚地预处理聚合。在 DFS_Q_T_F 的情况下// 请求,preProcess 在 DFS 阶段调用,这就是我们预处理它们的原因//这里是为了确保它发生在QUERY阶段aggregationPhase.preProcess(searchContext);//lucene搜索boolean rescore = executeInternal(searchContext);if (rescore) { // only if we do a regular search//重新计算打分rescorePhase.execute(searchContext);}//纠错suggestPhase.execute(searchContext);//聚合aggregationPhase.execute(searchContext);if (searchContext.getProfilers() != null) {ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());searchContext.queryResult().profileResults(shardResults);}}
这里可以看到首先会执行lucene的查询,然后对查询结果进行打分、执行聚合逻辑,然后生成SearchPhaseResult返回
数据节点查询数据后返回协调节点,我们继续回到executePhaseOnShard方法执行回调
//执行查询
executePhaseOnShard(shardIt, shard,new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {@Overridepublic void innerOnResponse(Result result) {try {//分片返回结果onShardResult(result, shardIt);} finally {executeNext(pendingExecutions, thread);}}@Overridepublic void onFailure(Exception t) {try {onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);} finally {executeNext(pendingExecutions, thread);}}});
保存分片查询返回的数据到results数组中并进行计数
protected void onShardResult(Result result, SearchShardIterator shardIt) {assert result.getShardIndex() != -1 : "shard index is not set";assert result.getSearchShardTarget() != null : "search shard target must not be null";successfulOps.incrementAndGet();results.consumeResult(result);hasShardResponse.set(true);if (logger.isTraceEnabled()) {logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);}AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();if (shardFailures != null) {shardFailures.set(result.getShardIndex(), null);}successfulShardExecution(shardIt);}
判断所有分片查询是否都已经返回,如果都已经返回则执行下一阶段及fetch阶段
private void successfulShardExecution(SearchShardIterator shardsIt) {final int remainingOpsOnIterator;if (shardsIt.skip()) {remainingOpsOnIterator = shardsIt.remaining();} else {remainingOpsOnIterator = shardsIt.remaining() + 1;}final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);//所有分片都已经返回if (xTotalOps == expectedTotalOps) {onPhaseDone();} else if (xTotalOps > expectedTotalOps) {throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["+ expectedTotalOps + "]");}}final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()executeNextPhase(this, getNextPhase(results, this));}@Overrideprotected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {//获取数据阶段return new FetchSearchPhase(results, searchPhaseController, context, clusterState());}FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,SearchPhaseController searchPhaseController,SearchPhaseContext context,ClusterState clusterState) {this(resultConsumer, searchPhaseController, context, clusterState,//最后收尾阶段(response, scrollId) -> new ExpandSearchPhase(context, response, scrollId));}
- fetch阶段
@Overridepublic void run() {context.execute(new AbstractRunnable() {@Overrideprotected void doRun() throws Exception {// we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase// off immediately instead of forking when we send back the response to the user since there we only need// to merge together the fetched results which is a linear operation.innerRun();}@Overridepublic void onFailure(Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}});}private void innerRun() throws IOException {//查询分片数量final int numShards = context.getNumShards();//是否是滚动查询final boolean isScrollSearch = context.getRequest().scroll() != null;//search阶段返回结果final List<SearchPhaseResult> phaseResults = queryResults.asList();final String scrollId;//如果是滚动查询则生成scrollIdif (isScrollSearch) {final boolean includeContextUUID = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0);scrollId = TransportSearchHelper.buildScrollId(queryResults, includeContextUUID);} else {scrollId = null;}final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();final boolean queryAndFetchOptimization = queryResults.length() == 1;//fetch结束后执行任务final Runnable finishPhase = ()-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?queryResults : fetchResults);if (queryAndFetchOptimization) {assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()+ "], single result: " + phaseResults.get(0).fetchResult();// query AND fetch optimizationfinishPhase.run();} else {ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);// no docs to fetch -- sidestep everything and return//无文档执行fetchif (scoreDocs.length == 0) {// we have to release contexts here to free up resourcesphaseResults.stream().map(SearchPhaseResult::queryResult).forEach(this::releaseIrrelevantSearchContext);finishPhase.run();} else {final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards): null;final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or notfinishPhase, context);for (int i = 0; i < docIdsToLoad.length; i++) {IntArrayList entry = docIdsToLoad[i];SearchPhaseResult queryResult = queryResults.get(i);if (entry == null) { // no results for this shard IDif (queryResult != null) {// if we got some hits from this shard we have to release the context there// we do this as we go since it will free up resources and passing on the request on the// transport layer is cheap.releaseIrrelevantSearchContext(queryResult.queryResult());progressListener.notifyFetchResult(i);}// in any case we count down this result since we don't talk to this shard anymorecounter.countDown();} else {SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),searchShardTarget.getNodeId());//创建fetch请求ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getContextId(), i, entry,lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());//执行fetchexecuteFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),connection);}}}}}
创建fetch请求,执行fetch
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,final CountedCollector<FetchSearchResult> counter,final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,final Transport.Connection connection) {//发送fetch请求context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {@Overridepublic void innerOnResponse(FetchSearchResult result) {try {progressListener.notifyFetchResult(shardIndex);counter.onResult(result);} catch (Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}}@Overridepublic void onFailure(Exception e) {try {logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.contextId()), e);progressListener.notifyFetchFailure(shardIndex, shardTarget, e);counter.onFailure(shardIndex, shardTarget, e);} finally {releaseIrrelevantSearchContext(querySearchResult);}}});}
发送RPC为indices:data/read/search[phase/fetch/id]的fetch请求
public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,final SearchActionListener<FetchSearchResult> listener) {sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener);}
数据节点注册的handler
//fetch请求transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,(request, channel, task) -> {//执行请求searchService.executeFetchPhase(request, (SearchShardTask) task,new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request));});
- 数据节点执行fetch阶段
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {runAsync(request.contextId(), () -> {final SearchContext context = findContext(request.contextId(), request);context.incRef();try {context.setTask(task);contextProcessing(context);if (request.lastEmittedDoc() != null) {context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();}context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {//fetch查询fetchPhase.execute(context);if (fetchPhaseShouldFreeContext(context)) {freeContext(request.contextId());} else {contextProcessedSuccessfully(context);}executor.success();}//返回结果return context.fetchResult();} catch (Exception e) {logger.trace("Fetch phase failed", e);processFailure(context, e);throw e;} finally {cleanContext(context);}}, listener);}
遍历文档id集合,查询详细信息
@Overridepublic void execute(SearchContext context) {if (LOGGER.isTraceEnabled()) {LOGGER.trace("{}", new SearchContextSourcePrinter(context));}final FieldsVisitor fieldsVisitor;Map<String, Set<String>> storedToRequestedFields = new HashMap<>();StoredFieldsContext storedFieldsContext = context.storedFieldsContext();if (storedFieldsContext == null) {// no fields specified, default to return source if no explicit indicationif (!context.hasScriptFields() && !context.hasFetchSourceContext()) {context.fetchSourceContext(new FetchSourceContext(true));}fieldsVisitor = new FieldsVisitor(context.sourceRequested());} else if (storedFieldsContext.fetchFields() == false) {// disable stored fields entirelyfieldsVisitor = null;} else {for (String fieldNameOrPattern : context.storedFieldsContext().fieldNames()) {if (fieldNameOrPattern.equals(SourceFieldMapper.NAME)) {FetchSourceContext fetchSourceContext = context.hasFetchSourceContext() ? context.fetchSourceContext(): FetchSourceContext.FETCH_SOURCE;context.fetchSourceContext(new FetchSourceContext(true, fetchSourceContext.includes(), fetchSourceContext.excludes()));continue;}Collection<String> fieldNames = context.mapperService().simpleMatchToFullName(fieldNameOrPattern);for (String fieldName : fieldNames) {MappedFieldType fieldType = context.smartNameFieldType(fieldName);if (fieldType == null) {// Only fail if we know it is a object field, missing paths / fields shouldn't fail.if (context.getObjectMapper(fieldName) != null) {throw new IllegalArgumentException("field [" + fieldName + "] isn't a leaf field");}} else {String storedField = fieldType.name();Set<String> requestedFields = storedToRequestedFields.computeIfAbsent(storedField, key -> new HashSet<>());requestedFields.add(fieldName);}}}boolean loadSource = context.sourceRequested();if (storedToRequestedFields.isEmpty()) {// empty list specified, default to disable _source if no explicit indicationfieldsVisitor = new FieldsVisitor(loadSource);} else {fieldsVisitor = new CustomFieldsVisitor(storedToRequestedFields.keySet(), loadSource);}}try {SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext();for (int index = 0; index < context.docIdsToLoadSize(); index++) {if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}//文档idint docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());LeafReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);int subDocId = docId - subReaderContext.docBase;final SearchHit searchHit;int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);if (rootDocId != -1) {searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId,storedToRequestedFields, subReaderContext);} else {searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId,storedToRequestedFields, subReaderContext);}hits[index] = searchHit;hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());//执行各种元数据获取for (FetchSubPhase fetchSubPhase : fetchSubPhases) {fetchSubPhase.hitExecute(context, hitContext);}}if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}//执行各种元数据获取for (FetchSubPhase fetchSubPhase : fetchSubPhases) {fetchSubPhase.hitsExecute(context, hits);if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}}TotalHits totalHits = context.queryResult().getTotalHits();context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));} catch (IOException e) {throw ExceptionsHelper.convertToElastic(e);}}private SearchHit createSearchHit(SearchContext context,FieldsVisitor fieldsVisitor,int docId,int subDocId,Map<String, Set<String>> storedToRequestedFields,LeafReaderContext subReaderContext) {DocumentMapper documentMapper = context.mapperService().documentMapper();Text typeText = documentMapper.typeText();if (fieldsVisitor == null) {return new SearchHit(docId, null, typeText, null, null);}//查询lucene保存的字段Map<String, DocumentField> searchFields = getSearchFields(context, fieldsVisitor, subDocId,storedToRequestedFields, subReaderContext);Map<String, DocumentField> metaFields = new HashMap<>();Map<String, DocumentField> documentFields = new HashMap<>();SearchHit.splitFieldsByMetadata(searchFields, documentFields, metaFields);SearchHit searchHit = new SearchHit(docId, fieldsVisitor.uid().id(), typeText, documentFields, metaFields);// Set _source if requested.SourceLookup sourceLookup = context.lookup().source();sourceLookup.setSegmentAndDocument(subReaderContext, subDocId);if (fieldsVisitor.source() != null) {sourceLookup.setSource(fieldsVisitor.source());}return searchHit;}
数据节点fetch数据后返回,协调节点。继续回到executeFetch方法的回调函数
@Override
public void innerOnResponse(FetchSearchResult result) {try {progressListener.notifyFetchResult(shardIndex);counter.onResult(result);} catch (Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}
}
将返回的数据填充到fetchResults中,最后执行finish阶段
final Runnable finishPhase = ()-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?queryResults : fetchResults);private void moveToNextPhase(SearchPhaseController searchPhaseController,String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));}
执行ExpandSearchPhase的run方法
@Overridepublic void run() {if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {SearchRequest searchRequest = context.getRequest();CollapseBuilder collapseBuilder = searchRequest.source().collapse();final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();MultiSearchRequest multiRequest = new MultiSearchRequest();if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());}for (SearchHit hit : searchResponse.hits().getHits()) {BoolQueryBuilder groupQuery = new BoolQueryBuilder();Object collapseValue = hit.field(collapseBuilder.getField()).getValue();if (collapseValue != null) {groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));} else {groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));}QueryBuilder origQuery = searchRequest.source().query();if (origQuery != null) {groupQuery.must(origQuery);}for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder).query(groupQuery).postFilter(searchRequest.source().postFilter());SearchRequest groupRequest = new SearchRequest(searchRequest);groupRequest.source(sourceBuilder);multiRequest.add(groupRequest);}}context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),ActionListener.wrap(response -> {Iterator<MultiSearchResponse.Item> it = response.iterator();for (SearchHit hit : searchResponse.hits.getHits()) {for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {MultiSearchResponse.Item item = it.next();if (item.isFailure()) {context.onPhaseFailure(this, "failed to expand hits", item.getFailure());return;}SearchHits innerHits = item.getResponse().getHits();if (hit.getInnerHits() == null) {hit.setInnerHits(new HashMap<>(innerHitBuilders.size()));}hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);}}context.sendSearchResponse(searchResponse, scrollId);}, context::onFailure));} else {//返回查询结果context.sendSearchResponse(searchResponse, scrollId);}}
返回查询结果
@Overridepublic void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {ShardSearchFailure[] failures = buildShardFailures();Boolean allowPartialResults = request.allowPartialSearchResults();assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";if (allowPartialResults == false && failures.length > 0){raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));} else {listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId, failures));}}
这里的listener就是new RestStatusToXContentListener<>(channel)
@Overridepublic RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {assert response.isFragment() == false; //would be nice if we could make default methods finalresponse.toXContent(builder, channel.request());RestResponse restResponse = new BytesRestResponse(response.status(), builder);if (RestStatus.CREATED == restResponse.status()) {final String location = extractLocation.apply(response);if (location != null) {restResponse.addHeader("Location", location);}}return restResponse;}@Overridepublic final RestResponse buildResponse(Response response) throws Exception {return buildResponse(response, channel.newBuilder());}protected final void processResponse(Response response) throws Exception {channel.sendResponse(buildResponse(response));}
最后通过channel将结果返回客户端
相关文章:
elasticsearch源码分析-08Serch查询流程
Serch查询流程 查询请求Rest路由注册也是在actionModule中 //查询操作 registerHandler.accept(new RestSearchAction());Override public List<Route> routes() {return unmodifiableList(asList(new Route(GET, "/_search"),new Route(POST, "/_searc…...
【协作提效 Go - gin ! swagger】
什么是swagger Swagger 是一个用于设计、构建、记录和使用 RESTful Web 服务的工具集。它的主要作用包括: API 文档生成:Swagger 可以自动生成详细的 API 文档,包括每个端点的请求和响应格式、参数、状态码等。这使得开发者和用户可以轻松理…...
栈和队列——3.滑动窗口最大值
力扣题目链接 给定一个数组 nums,有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值。 示例: 输入:nums[1,3,-1,-3,5,3,6,7],k 3 …...
嵌入式智能手表开发系列文章之开篇
不好意思,朋友们,我回来了。想想已经断更了好久了。在这段断更的日子里。开拓了个新领域,不搞android 产品,而是去搞嵌入式智能手表啦。 接下来我会用几篇文章来介绍下我对这个领域的看法体会,以及我自己所负责领域的…...
24.8.2数据结构|双链表
双链表 1、定义结构:2个指针域、数据域 2、初始化:创建一个含有N个结点的带头结点双链表head (双链表头结点的前驱与和尾节点的后继与置为空) 3、求表长:返回双链表head的长度 4、取元素:取出双链表head中…...
RabbitMQ高级特性 - 事务消息
文章目录 RabbitMQ 事务消息概述实现原理代码实现不采用事务采用事务 RabbitMQ 事务消息 概述 RabbitMQ 的 AMQP 协议实现了事务机制,允许开发者保证消息的发送和接收时原子性的,也就是说,要么消息全都发送成功,要么全都发送失败…...
leetcode:心算挑战
题目: 心算项目的挑战比赛中,要求选手从N张卡牌中选出cnt张卡牌,若这cnt张卡牌数字总和为偶数,则选手成绩「有效」且得分为cnt张卡牌数字总和。给定数组cards和cnt,其中cards[i]表示第i张卡牌上的数字。 请帮参赛选手计…...
docker部署java项目(war包方式)
场景描述:java项目war包,在开发开电脑上使用dockerfile构建镜像,上传镜像到客户服务器中使用docker加载docker镜像,然后部署。 目录 一、本地环境安装 docker git 二、服务器环境安装 docker 三、构建docker镜像(win系统) 四、注意事项 (1)系统架构 (2)使…...
jsp 自定义taglib
一、简介 我们在javaWeb开发中,经常会用到jsp的taglib标签,有时候并不能满足我们的实际需要,这就需要我们自定义taglib标签, 二、开发步骤 1、编写control方法,继承BodyTagSupport 2、定义zdytaglib.tld标签文件 3、…...
从一到无穷大 #32 TimeCloth,云上的快速 Point-in-Time Recovery
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。 文章目录 引言解决方案FAST FINE-GRAINED PITRLog FilterInter-Record Dependency ResolutionL…...
时间序列论文1——Forecasting at Scale
目录 0. AI总结0.1 文章概述0.2 研究背景0.3 研究思路0.4 研究结论与讨论1. Introduction2 Features of Business Time Series3 The Prophet Forecasting Model3.1 The Trend Model3.2 Seasonality3.3 Holidays and Events3.4 Model Fitting3.5 Analyst-in-the-Loop Modeling4 …...
HDFS常用命令
HDFS常用命令 1.HDFS命令介绍1.1基本语法格式1.2常用命令 1.HDFS命令介绍 HDFS 提供了一组命令行工具,用于管理和操作 HDFS 文件系统。 1.1基本语法格式 hdfs dfs -<命令> [选项] <参数>1.2常用命令 1.显示<path>指定的文件的详细信息。 had…...
请问如何做好软件测试工作呢?
一、明确测试目标和范围 理解测试目的:在开始测试之前,首先要明确测试的目标和范围,确保测试计划 与需求相匹配。这有助于测试人员聚焦在关键功能上,避免浪费时间和资源。制定详细的测试计划:根据项目需求࿰…...
单片机开发与Linux开发的区别
引言 单片机(MCU)和Linux开发是嵌入式系统领域的两大主要方向。它们在硬件平台、开发环境、应用场景和开发难度上存在显著区别。本文将系统性地比较单片机开发和Linux开发,探讨它们的主要区别及各自的应用场景和难度体系。 一、基本概念 1…...
【机器学习】回归类算法-相关性分析
一、前言 前面的几篇博客我们学习了分类算法,今天我们来了解一下回归类的算法吧。首先我们来谈谈两者有什么区别,首先是我们在之前的分类算法,这类算法可以将让我们学会如何将不同的数据划分到不同的类里面,输出的是一些离散的值。…...
java基础 之 集合与栈的使用(三)
文章目录 Map接口(一)实现类:HashMap特点HashMap集合的一些方法 (二)实现类: TreeMap特点【自然排序】代码【定制排序】代码TreeMap集合的一些方法 HashMap 和 TreeMap的区别 前文回顾: 戳这里 …...
JDK-java.nio包详解
JDK-java.nio包详解 概述 一直以来Java三件套(集合、io、多线程)都是最热门的Java基础技术点,我们要深入掌握好这三件套才能在日常开发中得心应手,之前有编写集合相关的文章,这里出一篇文章来梳理一下io相关的知识点。…...
虚拟机与服务器的区别是什么?虚拟机与服务器的区别和联系
服务器和虚拟机是两个不同的概念,它们在计算机领域有着不同的含义和作用。今天飞飞就和你分享虚拟机和服务器的区别和联系,希望可以帮助到你~ 1、物理形态 a)服务器是实实在在的物理设备,拥有独立的硬件架构。如CPU、硬盘、内存等 b)虚拟机…...
Linux CentOS stream9 命令
初学linux,对字符界面的命令并不陌生。问到什么是linux命令直接答cd、pwd、ls是linux命令。对于命令的定义并熟悉,也不太关心命令的底层执行逻辑,更关心录入命令,马上获取需要的结果。 本文就命令的定义、分类或执行优先级作一简单介绍。 一、定义 搜索网上对linux命令的…...
JavaScript基础——JavaScript变量声明
变量是存储数据的容器,可以变的量,值可以改变,在JavaScript中,变量声明的关键字有var、let,其中,var是ES5的语法,let是ES6的语法,变量需要先声明,在使用。 声明一个age变…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
WPF八大法则:告别模态窗口卡顿
⚙️ 核心问题:阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程,导致后续逻辑无法执行: var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题:…...
基于江科大stm32屏幕驱动,实现OLED多级菜单(动画效果),结构体链表实现(独创源码)
引言 在嵌入式系统中,用户界面的设计往往直接影响到用户体验。本文将以STM32微控制器和OLED显示屏为例,介绍如何实现一个多级菜单系统。该系统支持用户通过按键导航菜单,执行相应操作,并提供平滑的滚动动画效果。 本文设计了一个…...
