当前位置: 首页 > news >正文

第07讲:Java High Level Client,读写 ES 利器

SkyWalking OAP 后端可以使用多种存储对数据进行持久化,例如 MySQL、TiDB 等,默认使用 ElasticSearch 作为持久化存储,在后面的源码分析过程中也将以 ElasticSearch 作为主要存储进行分析。

ElasticSearch 基本概念

本课时将快速介绍一下 ElasticSearch 的基本概念,如果你没有用过 ElasticSearch ,可以通过本小节迅速了解 ElasticSearch 中涉及的基本概念。Elasticsearch 是一个基于 Apache Lucene 的开源搜索引擎,无论在开源还是专业领域,Apache Lucene 可以被认为是迄今为止最先进、性能最好、功能最全的搜索引擎库。但是,Apache Lucene 只是一个工具库,本身使用也比较复杂,直接使用 Apache Lucene 对开发人员的要求比较高,需要检索方面的知识来理解它是如何工作的。ElasticSearch 使用 Java 对 Apache Lucene 进行了封装,提供了简单易用的 RESTful API,隐藏 Apache Lucene 的复杂性,降低了全文检索的编程门槛。

ElasticSearch 中有几个比较核心的概念,为了方便你理解,我将其与数据库中的概念进行映射,如下图所示:

注意:在老版本的 ElasticSearch 中,Index 和 Document 之间还有个 Type 的概念,每个 Index 下可以建立多个 Type,Document 存储时需要指定 Index 和 Type。从 ES 6.0 版本开始单个 Index 中只能有一个 Type,ES 7.0 版本以后将不建议使用 Type,ES 8.0 以后完全不支持 Type。

Document 是构建 Index 的基本单元。例如,一条订单数据就可以是一个 Document,其中可以包含多个 Field,例如,订单的创建时间、价格、明细,等等。Document 以 JSON 格式表示,Field 则是这条 JSON 数据中的字段,如下所示:

{"_index""order_index","_type""1","_id""kg72dW8BOCMUWGurIFiy","_version"1,"_score"1,"_source": {"create_time""2020-02-01 17:35:00","creator""xxxxx","order_status""NEW","price"100.00,}
}

Index 是具有某些类似特征的 Document 的集合,Index 与 Document 之间的关系就类似于数据库中 Table 与 Row 之间的关系。在 Index 中可以存储任意数量的 Document。在后续介绍的示例中可以看到,对 Document 的添加、删除、更新、搜索等操作,都需要明确的指定 Index 名称。

最后,还需要了解 ElasticSearch 中一个叫作 Index Template(模板)的概念。Index Template 一般会包含 settings、mappings、index_patterns 、order、aliases 几部分:

  • index_patterns 负责匹配 Index 名称,Index Template 只会应用到名称与之匹配的 Index 上,而且 ElasticSearch 只会在 Index 创建的时候应用匹配的 Index Template,后续修改 Index Template 时不会影响已有的 Index。通过 index_patterns 匹配可以让多个 Index 重用一个 Index Template。
  • settings 主要用于设置 Index 中的一些相关配置信息,如分片数、副本数、refresh 间隔等信息(后面会介绍分片数和副本数的概念);
  • mappings 主要是一些说明信息,类似于定义该 Index 的 schema 信息,例如,指定每个 Field 字段的数据类型;>
  • order 主要作用于在多个 Index Template 同时匹配到一个 Index 的情况,如果此时这些Index Template 中的配置出现不一致,则以 order 的最大值为准,order 默认值为 0。另外,创建 Index 的命令中如果自带了 settings 或 mappings 配置,则其优先级最高;
  • aliases 则是为匹配的 Index 创建别名。我们可以通过请求 http://localhost:9200/_alias/*获取所有别名与 Index 之间的对应关系。

下面是 SkyWalking 使用的 segment 模板,它会匹配所有 segment-* 索引,segment-yyyyMMdd 索引是用来存储 Trace 数据的:

{"segment": {"order"0,"index_patterns": ["segment-*"],"settings": {"index": {"refresh_interval""3s","number_of_shards""2","number_of_replicas""0"// 省略 analysis字段设置}},"mappings": {"type": {"properties": {"segment_id": {"type""keyword"},"trace_id": {"type""keyword"},"service_id": {"type""integer"},// 省略其他字段的设置}}},"aliases": { // 为匹配的Index创建别名"segment": {}}}
}

节点角色

一个 ElasticSearch 集群是由一个或多个节点组成,这些节点共同存储了集群中的所有数据,并且 ElasticSearch 提供了跨节点的联合索引和搜索功能。集群名称是一个 ElasticSearch 集群的唯一标识,在请求 ElasticSearch 集群时都需要使用到这个集群名称。在同一个网络环境中,需要保证集群名称不重复,否则集群中的节点可能会加入到错误的集群中。

ElasticSearch 集群是去中心化的,ElasticSearch 节点的相互发现是基于 Pull-Push 版本的 Gossip 算法实现的。Zen Discovery 是 ElasticSearch 默认的发现实现,提供了广播和单播的能力,帮助一个集群内的节点完成快速的相互发现。

ElasticSearch 集群中的节点有多个可选的角色,这些角色都是通过在节点的配置文件中配置的。

  • Master Eligible Node (候选主节点):可以被选举为 Master 的候选节点;
  • Master Node (主节点):完成节点发现阶段之后,才会进入主节点选举阶段,为了防止在网络分区的场景下出现脑裂问题,一般采用 quorum 版本的 Bully 算法变体(本课时重点是帮助你快速了解 ElasticSearch 基础知识,不展开该算法的具体原理)。所以,主节点是从候选主节点中选举出来的,主要负责管理 ElasticSearch 集群,通过广播的机制与其他节点维持关系,负责集群中的 DDL 操作(创建/删除索引),管理其他节点上的分片;
  • Data Node(数据节点):存放数据的节点,负责数据的增删改查;
  • Coordinating Node(协调节点):每个节点都是一个潜在的协调节点,协调节点最大的作用就是响应客户端的请求,将各个分片里的数据汇总起来一并返回给客户端,因此 ElasticSearch 的节点需要有足够的 CPU 和内存资源去处理汇总操作;
  • Ingest Node(提取节点):能执行预处理管道,不负责数据也不负责集群相关的事务。

分片&副本

在 ElasticSearch 中的一个 Index 可以存储海量的 Document,单台机器的磁盘大小是无法存储的,而且在进行数据检索的时候,单台机器也存在性能瓶颈,无法为海量数据提供高效的检索。

为了解决上述问题,ElasticSearch 将单个 Index 分割成多个分片,创建 Index 时,可以按照预估值指定任意数量的分片。虽然逻辑上每个分片都属于一个 Index,但是单个分片都是一个功能齐全且独立的 Index,一个分片可以被分配到集群中的任意节点上。

通过分片的功能,Index 就有了容量水平扩展的能力,运维人员可以通过添加节点的方式扩充整个集群的容量。在处理检索请求时,不同的分片由不同的 ElasticSearch 节点进行检索,可以实现并发操作,这样也就可以大大提高检索性能。

最后,某条 Document 数据具体存储在哪个分片,完全由 ElasticSearch 的分片机制决定。当写入一条 Document 的时候,ElasticSearch 会根据指定的 key (默认是 ElasticSearch 自动生成的 Id,用户也可以手动指定)决定其所在的分片编号,计算公式如下:

分片编号 = hash(key) % 主分片数量

主分片的数量决定了 Document 所在的分片编号,所以在创建 Index 之后,主分片数量不能改变。

在进行搜索时,每个分片产生的部分查询结果,也是由 ElasticSearch 集群负责进行聚合的,整个过程对于 Client 来说是透明的,如同操作一个单节点 ElasticSearch 实例。

单台服务器在实际使用中可能会因为这样或那样的原因发生故障,例如意外断电、系统崩溃、磁盘寿命到期等,这些故障是无法预知的。当发生故障时,该节点负责的分片就无法对外提供服务了,此时需要有一定的容错机制,在发生故障时保证此分片可以继续对外提供服务。

ElasticSearch 提供的副本功能就可以很好的解决这一问题,在副本模式下,每个分片分为主分片和副本分片,下图中一个 Index 有两个分片,p0 和 p1 是两个主分片,r0 和 r1 则是相应的副本分片:

副本带来了两个好处:一个是在主分片出现故障的时候,可以通过副本继续提供服务(所以,分片副本一般不与主分片分配到同一个节点上);另一个就是查询操作可以在分片副本上执行,因此可以提升整个 ElasticSearch 查询性能。

ElasticSearch 写入流程简介

分片是 ElasticSearch 中最小的数据分配单位,即一个分片总是作为一个整体被分配到集群中的某个节点。继续深入分片的结构会发现,一个分片是由多个 Segment 构成的,如下图所示:

Segment 是最小的数据存储单元,ElasticSearch 每隔一段时间会产生一个新的 Segment,用于写入最新的数据。旧的 Segment 是不可改变的,只能用于数据查询,是无法继续向其中写入数据的。

在很多分布式系统中都能看到类似的设计,这种设计有下面几点好处:

  • 旧 Segment 不支持修改,那么在读操作的时候就不需要加锁,省去了锁本身以及竞争锁相关的开销;
  • 只有最新的 Segment 支持写入,可以实现顺序写入的效果,增加写入性能;
  • 只有最新的 Segment 支持写入,可以更好的利用文件系统的 Cache 进行缓存,提高写入和查询性能。

介绍完分片内部的 Segment 结构之后,接下来简单介绍一下 ElasticSearch 集群处理一个写入请求的大致过程:

写入请求会首先发往协调节点(Coordinating Node),之前提到,协调节点可能是 Client 连接上的任意一个节点,协调节点根据 Document Id 找到对应的主分片所在的节点。

接下来,由主分片所在节点处理写入请求,先是写入 Transaction Log 【很多分布式系统都有 WAL (Write-ahead Log)的概念,可以防止数据丢失】,而后将数据写入内存中,默认情况下每隔一秒会同步到 FileSystem Cache 中,Cache 中的数据在后续查询中已经可以被查询了,默认情况下每隔 30s,会将 FileSystem cache 中的数据写入磁盘中,当然为了降低数据丢失的概率,可以将这个时间缩短,甚至设置成同步的形式,相应地,写入性能也会受到影响。

写入其他副本的方式与写入主分片的方式类似,不再重复。需要注意的是,这里可以设置三种副本写入策略:

  • quorum:默认为 quorum 策略,即超过半数副本写入成功之后,相应写入请求即可返回给客户端;
  • one :one 策略是只要成功写入一个副本,即可向客户端返回;
  • all:all 策略是要成功写入所有副本之后,才能向客户端返回。

ElasticSearch 的删除操作只是逻辑删除, 在每个 Segment 中都会维护一个 .del 文件,删除操作会将相应 Document 在 .del 文件中标记为已删除,查询时依然可以查到,但是会在结果中将这些"已删除"的 Document 过滤掉。

由于旧 Segment 文件无法修改,ElasticSearch 是无法直接进行修改的,而是引入了版本的概念,它会将旧版本的 Document 在 .del 文件中标记为已删除,而将新版本的 Document 索引到最新的 Segment 中。

另外,随着数据的不断写入,将产生很多小 Segment 文件,ElasticSearch 会定期进行 Segment Merge,从而减少碎片文件,降低文件打开数,提升 I/O 性能。在 Merge 过程中可以同时根据 .del 文件,将被标记的 Document 真正删除,此时才是真正的物理删除。

ElasticSearch 查询流程简介

读操作分为两个阶段:查询阶段和聚合提取阶段。在查询阶段中,协调节点接受到读请求,并将请求分配到相应的分片上(如果没有特殊指定,请求可能落到主分片,也有可能落到副本分片,由协调节点的负载均衡算法来确定)。默认情况下,每个分片会创建一个固定大小的优先级队列(其中只包含 Document Id 以及 Score,并不包含 Document 的具体内容),并以 Score 进行排序,返回给协调节点。如下图所示:

在聚合阶段中,协调节点会将拿到的全部优先级队列进行合并排序,然后再通过 Document ID 查询对应的 Document ,并将这些 Document 组装到队列里返回给客户端。

High Level REST Client 入门

ElasticSearch 提供了两种 Java Client,分别是 Low Level REST Client 和 High Level REST Client,两者底层都是通过 HTTP 接口与 ElasticSearch 进行交互的:

  • Low Level REST Client  需要使用方自己完成请求的序列化以及响应的反序列化;
  • High Level REST Client 是基于 Low Level REST Client 实现的,调用方直接使用特定的请求/响应对象即可完成数据的读写,完全屏蔽了底层协议的细节,无需再关心底层的序列化问题。另外, High Level REST Client 提供的 API 都会有同步和异步( async 开头)两个版本,其中同步方法直接返回相应的 response 对象,异步方法需要添加相应的 Listener 来监听并处理返回结果。

SkyWalking 中提供的 ElasticSearchClient 是对 High Level REST Client 的封装,本课时将简单介绍 High Level REST Client 的基本操作,你可以将本课时作为 High Level REST Client 的入门参考,更加完整的 API 使用可以参考 ElasticSearch  官方文档。

使用 High Level REST Client 的第一步就是初始化 RestHighLevelClient 对象,该过程底层会初始化线程池以及网络请求所需的资源,类似于 JDBC 中的 Connection 对象,相关 API 代码如下:

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder( // 指定 ElasticSearch 集群各个节点的地址和端口号new HttpHost("localhost"9200"http"),new HttpHost("localhost"9201"http")));

拿到 RestHighLevelClient 对象之后,我们就可以通过它发送 CreateIndexRequest 请求创建 Index,示例代码如下:

// 创建 CreateIndexRequest请求,该请求会创建一个名为"skywalking"的 Index,
// 注意,Index 的名称必须是小写
CreateIndexRequest request = new CreateIndexRequest("skywalking");
// 在 CreateIndexRequest请求中设置 Index的 setting信息
request.settings(Settings.builder().put("index.number_of_shards"3)   // 设置分片数量.put("index.number_of_replicas"2)  // 设置副本数量
);
// 在 CreateIndexRequest请求中设置 Index的 Mapping信息,新建的 Index里有
// 个user和message两个字段,都为text类型,还有一个 age字段,为 integer类型
request.mapping("type""user""type=text""age""type=integer""message""type=text");
// 设置请求的超时时间
request.timeout(TimeValue.timeValueSeconds(5));
// 发送 CreateIndex请求
CreateIndexResponse response = client.indices().create(request);
// 这里关心 CreateIndexResponse响应的 isAcknowledged字段值
// 该字段为 true则表示 ElasticSearch已处理该请求
boolean acknowledged = response.isAcknowledged();
Assert.assertTrue(acknowledged);

完成 Index 的创建之后,我们就可以通过 IndexRequest 请求向其中写入 Document 数据了,需要使用 RestHighLevelClient 发送 IndexRequest 实现 Document 的写入:

// 创建 IndexRequest请求,这里需要指定 Index名称
IndexRequest request = new IndexRequest("skywalking"); 
request.type("type");
request.id("1");  // Document Id,不指定的话,ElasticSearch 为其自动分配
// Document的具体内容
String jsonString = "{" +"\"user\":\"kim\"," +"\"postDate\":\"2013-01-30\"," +"\"age\":29," +"\"message\":\"trying out Elasticsearch\"" +"}";
request.source(jsonString, XContentType.JSON);
// 发送 IndexRequest请求,不抛异常,就是创建成了
IndexResponse response = client.index(request);
System.out.println(response);
// 输出如下:
// IndexResponse[index=skywalking,type=type,id=1,version=1,
// result=created,seqNo=0,primaryTerm=1,shards=
// {"total":3,"successful":1,"failed":0}]
// -----------------------
// IndexResponse 中包含写入的 Index名称、Document Id,Document 版本,创建的
// result等重要信息

在明确知道 Document 的 Id 以及所属的 Index 时,我们可以通过 GetRequest 请求查询该 Document 内容,相关代码如下:

try {// 创建 GetRequest请求,这里指定 Index、type以及 Document Id,// 在高版本中,type参数已经消失GetRequest request = new GetRequest("skywalking""type""1");// 发送 GetRequest请求GetResponse response = client.get(request);// 从 GetResponse响应中可以拿到相应的 Document以及相关信息String index = response.getIndex(); // 获取 Index名称String id = response.getId();// 获取 Document Idif (response.isExists()) { // 检查 Document是否存在long version = response.getVersion(); // Document版本System.out.println("version:" + version);// 获取不同格式的 Document内容String sourceAsString = response.getSourceAsString();System.out.println("sourceAsString:" + sourceAsString);Map<String, Object> sourceAsMap = response.getSourceAsMap();System.out.println("sourceAsMap:" + sourceAsMap);byte[] sourceAsBytes = response.getSourceAsBytes();// 按照字段进行遍历Map<String, DocumentField> fields = response.getFields();for (Map.Entry<String, DocumentField> entry : fields.entrySet()) {DocumentField documentField = entry.getValue();String name = documentField.getName();Object value = documentField.getValue();System.out.println(name + ":" + value);}} else {System.out.println("Document Not Exist!");}
} catch (ElasticsearchParseException e) { // Index不存在的异常if (e.status() == RestStatus.NOT_FOUND) {System.err.println("Can find index");}
}
// 输出如下:
// version:1
// sourceAsString:{"user":"kim","postDate":"2013-01-30",
// "age":29,"message":"trying out Elasticsearch"}
// sourceAsMap:{postDate=2013-01-30, message=trying out Elasticsearch, 
// user=kim, age=29}
// _seq_no:0
// _primary_term:1

有时候,我们只想检测某个 Document 是否存在,并不想查询其具体内容,可以通过 exists() 方法实现。该方法发送的还是 GetRequest 请求,但我们可以指明此次请求不查询 Document 的具体内容,在 Document 较大的时候可以节省带宽,具体使用方式如下:

GetRequest request = new GetRequest("skywalking""type""1");
// 不查询 "_source"字段以及其他字段
request.fetchSourceContext(new FetchSourceContext(false));
request.storedFields("_none_");
// 通过exists()方法发送 GetRequest
boolean exists = client.exists(request);
Assert.assertTrue(exists);

删除一个 Document 的时候,使用的是 DeleteRequest,其中需要指定 Document Id 以及所属 Index 即可,使用方式如下:

DeleteRequest request = new DeleteRequest("skywalking""type""1");
DeleteResponse response = client.delete(request);
System.out.println(response);
// 输出:
// DeleteResponse[index=skywalking,type=type,id=1,version=5,
// result=deleted,shards=ShardInfo{total=3, successful=1,
// failures=[]}]

最后来看看更新操作,使用的是 UpdateRequest,示例如下:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{builder.field("age"30);builder.field("message""update Test");
}
builder.endObject();
// 创建 UpdateReques请求
UpdateRequest request = new UpdateRequest("skywalking""type","1").doc(builder);
// 发送请求
UpdateResponse updateResponse = client.update(request);
System.out.println(updateResponse);
// 输出:
// UpdateResponse[index=skywalking,type=type,id=1,version=2,seqNo=1,
// primaryTerm=1,result=updated,shards=ShardInfo{total=3, 
// successful=1, failures=[]}]
// 注意,更新之后 Document version 会增加

除了通过 GetRequest 请求根据 Document Id 进行查询之外,我们还可以通过 SearchRequest 请求进行检索,在检索请求里面我们可以通过 QueryBuilder 构造具体的检索条件,下面是一个简单的示例:

// 创建 SearchRequest请求
SearchRequest searchRequest = new SearchRequest("skywalking");
// 通过 SearchSourceBuilder,用来构造检索条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 通过id进行检索,这里查询 Id为1和2的两个Document
searchSourceBuilder.query(QueryBuilders.idsQuery().addIds("1""2"));
searchRequest.source(searchSourceBuilder);
// 发送 SearchRequest 请求
SearchResponse searchResponse = client.search(searchRequest);
// 遍历 SearchHit
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {System.out.println(searchHit.getId() + ":" + searchHit.getSourceAsMap());
}
// 输出:
// 1:{postDate=2013-01-30, message=update Test, user=kim, age=31}
// 2:{postDate=2020-01-30, message=Test Message, user=Tom, age=51}

按照 Document Id 进行查询之外,ElasticSearch 还提供了一套 Query DSL 语言帮助我们构造复杂的查询条件,查询条件主要分为下面两部分:

  • Leaf 子句:Leaf 子句一般是针对某个字段的查询,例如:
    • Match Query:最常用的 Full Text Query 。Match Query 既能处理全文字段,又能处理精确字段(可参考:Match Query 的官方文档)。
    • Term Query:精确匹配字段值的查询(可参考:Term Query 的官方文档。
    • Range Query:针对一个字段的范围查询(可参考:Range Query 的官方文档。
  • Compound 子句:Conpound 子句是由一个或多个 Leaf 子句或 Compound 子句构成的,例如 Bool Query,包含多个返回 Boolean 值的子句(可参考:Bool Query 的官方文档)。

下面使用 ElasticSearch Query DSL 实现一个复杂查询:

// 创建 SearchRequest请求,查询的Index为skywalking
SearchRequest searchRequest = new SearchRequest("skywalking");
// 通过 SearchSourceBuilder,用来构造检索条件
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
// 创建 BoolQueryBuilder
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 符合条件的 Document中 age字段的值必须位于[10,40]这个范围
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
mustQueryList.add(QueryBuilders.rangeQuery("age").gte(10));
mustQueryList.add(QueryBuilders.rangeQuery("age").lte(40));
// 符合条件的 Document中 user字段的值必须为kim
boolQueryBuilder.must().add(QueryBuilders.termQuery("user""kim"));
sourceBuilder.query(boolQueryBuilder);
searchRequest.source(sourceBuilder);
// 发送 SearchRequest 请求
SearchResponse searchResponse = client.search(searchRequest);
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {System.out.println(searchHit.getId() + ":" + searchHit.getSourceAsMap());
}
// 输出:
// 1:{postDate=2013-01-30, message=update Test, user=kim, age=31}

示例中匹配的 Document 中 age 字段的值位于[10,40]这个范围中,且 user 字段值为 "kim",类似于 SQL 语句中的【age >=10 and  age <=40 and user == "kim"】语句。

High Level REST Client 还提供了批量操作的 API —— BulkProcessor 会将多个请求积攒成一个 bulk,然后批量发给 ElasticSearch 集群进行处理。使用 BulkProcessor 批量操作可以减少请求发送次数,提高请求消息的有效负载,降低 ElasticSearch 集群压力。

BulkProcessor 中有几个核心参数需要设置。

  • setBulkActions() 方法:设置每个 BulkRequest  包含的请求数量,默认值为 1000;
  • setBulkSize():设置每个 BulkRequest 的大小,默认值为 5MB;
  • setFlushInterval():设置两次发送 BulkRequest 执行的时间间隔。没有默认值;
  • setConcurrentRequests():设置并发请求数。默认是 1,表示允许执行 1 个并发请求,积攒 BulkRequest 和发送 bulk 是异步的,其数值表示发送 bulk 的并发线程数(取值可以是任意大于 0 的数字),若设置为 0 表示二者同步;
  • setBackoffPolicy():设置最大重试次数和重试周期。默认最大重试次数为 8 次,初始延迟是 50 ms。

下面来看创建 BulkProcessor 的具体流程:

// 该 Listener可以监听每个 BulkRequest请求相关事件
BulkProcessor.Listener listener = new BulkProcessor.Listener() {...}
// 创建 BulkProcessor
BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync,listener).setBulkActions(bulkActions) .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB)).setFlushInterval(TimeValue.timeValueSeconds(flushInterval)).setConcurrentRequests(concurrentRequests).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();

这里添加的 BulkProcessor.Listener 实现如下,其中提供了三个方法分别监听 BulkRequest 请求触发的不同事件:

new BulkProcessor.Listener() {public void beforeBulk(long executionId, BulkRequest request) { ... // 在 BulkRequest请求发送之前,会触发该方法}

   public void afterBulk(long executionId, BulkRequest request,
      BulkResponse response)
 

      … // 在收到 BulkResponse响应时触发该方法,这里可以通过 
          // response.hasFailures()方法判断请求是否失败
   } 

   public void afterBulk(long executionId, BulkRequest request,
        Throwable failure)
 

        … // 在 BulkRequest请求抛出异常的时候,会触发该方法
   } 
}

之后我们就可以通过 BulkProcessor.add() 方法向其中添加请求,这些请求最终会积攒成一个 BulkRequest 请求发出:

bulkProcessor.add(new IndexRequest(...));
bulkProcessor.add(new DeleteRequest(...));

一般情况下,我们只需要在全局维护一个 BulkProcessor 对象即可,在应用程序关闭之前需要通过 awaitClose() 方法等待全部请求发送出去后再关闭 BulkProcessor 对象。

相关文章:

第07讲:Java High Level Client,读写 ES 利器

SkyWalking OAP 后端可以使用多种存储对数据进行持久化&#xff0c;例如 MySQL、TiDB 等&#xff0c;默认使用 ElasticSearch 作为持久化存储&#xff0c;在后面的源码分析过程中也将以 ElasticSearch 作为主要存储进行分析。 ElasticSearch 基本概念 本课时将快速介绍一下 E…...

dockerfile暴力处理配置文件外提

前言&#xff1a; 一般来说&#xff0c;springboot打成的jar运行时&#xff0c;同目录/config目录下放application.yml文件会被进行加载&#xff0c;然后通过设置docker映射出宿主机即可做到配置文件外配的效果&#xff0c;但很多时候别的配置文件做不到这种效果&#xff0c;说…...

如何快速给出解释——正交矩阵子矩阵的特征值的模必然不大于1

Memory 首先快速回忆一下正交矩阵的定义&#xff1a; A为n阶实矩阵&#xff0c;且满足A‘AE或是说AA’E&#xff0c;那么A为正交矩阵。 &#xff08;啊&#xff0c;多么简洁的定义&#xff09; 其次快速想到它的性质&#xff1a; ① 实特征值必然 或 其他复数…...

c语言-位运算

位运算小结 ​ 位运算不管是在C语言中&#xff0c;或者其他语言&#xff0c;都是经常会用到的&#xff0c;所以本文也就不固定以某种语言来举例子了&#xff0c;原始点就从0、1开始。位运算主要包括按位与(&)、按位或(|)、按位异或(^)、取反(~)、左移(<<)、右移(>…...

【Android学习专题】安卓样式学习(学习内容记录)

学习记录内容来自《Android编程权威指南&#xff08;第三版&#xff09;》 样式调整和添加 调整颜色资源&#xff08;res/values/colors.xml&#xff09; 格式&#xff1a; 添加样式&#xff08;res/values/styles.xml&#xff09;&#xff0c;&#xff08;创建BeatBox项目时…...

普罗米修斯统计信息上报结构设计

为了实现高效的监控和警报&#xff0c;普罗米修斯提供了一个强大的统计信息上报机制。通过这个机制&#xff0c;可以将应用程序的各种统计信息发送到普罗米修斯&#xff0c;普罗米修斯会对这些信息进行处理&#xff0c;然后提供丰富的监控和警报功能。下面是基本的统计信息上报…...

两个系统之间的传值

在两个系统之间传值可以采用以下几种方式&#xff1a; 使用 URL 参数&#xff1a;可以将数据作为 URL 参数传递给另一个系统&#xff0c;另一个系统可以解析 URL 参数并获取数据。例如&#xff1a;Example Domain 使用 Cookie&#xff1a;可以在一个系统中设置 Cookie&#xf…...

PostgreSQL(五)JDBC连接串常用参数

目录 1.单机 PostgreSQL 连接串2.集群PostgreSQL 连接串 PostgreSQL JDBC 官方驱动下载地址&#xff1a; https://jdbc.postgresql.org/download/ PostgreSQL JDBC 官方参数说明文档&#xff1a; https://jdbc.postgresql.org/documentation/use/ 驱动类&#xff1a; driver-…...

如何修改浏览器中导航栏的背景色和字体

在日常使用电脑时&#xff0c;我们总会使用浏览器来浏览网页。而浏览器中的导航栏是用户进行网页浏览的主要界面之一&#xff0c;其背景色和字体的选择对用户的体验有着重要的影响。因此&#xff0c;为了让导航栏更加美观和易于使用&#xff0c;我们需要对其背景色和字体进行修…...

如何选择合适的智能氮气柜?

随着电子产品的普及&#xff0c;IC、半导体、精密元件、检测仪器之类的物品对湿度要求越来越高&#xff0c;潮湿、霉菌和金属氧化所造成的损害&#xff0c;随时在发生。人们对于物品的存放环境要求逐渐提高&#xff0c;利用防潮设备如智能氮气柜、电子防潮柜来存储产品也越来越…...

双向链表(数据结构)(C语言)

目录 概念 带头双向循环链表的实现 前情提示 双向链表的结构体定义 双向链表的初始化 关于无头单向非循环链表无需初始化函数&#xff0c;顺序表、带头双向循环链表需要的思考 双向链表在pos位置之前插入x 双向链表的打印 双链表删除pos位置的结点 双向链表的尾插 关…...

离线安装Percona

前言 安装还是比较简单&#xff0c;这边简单进行记录一下。 版本差异 一、离线安装Percona 下载percona官网 去下载你需要对应的版本 jemalloc-3.6.0-1.el7.x86_64.rpm 需要单独下载 安装Percona 进入RPM安装文件目录&#xff0c;执行下面的脚本 yum localinstall *.rpm修改…...

界面控件Telerik UI for WinForms使用指南 - 数据绑定 填充(二)

Telerik UI for WinForms拥有适用Windows Forms的110多个令人惊叹的UI控件&#xff0c;所有的UI for WinForms控件都具有完整的主题支持&#xff0c;可以轻松地帮助开发人员在桌面和平板电脑应用程序提供一致美观的下一代用户体验。 Telerik UI for WinForms组件为可视化任何类…...

通过栈/队列/优先级队列/了解容器适配器,仿函数和反向迭代器

文章目录 一.stack二.queue三.deque&#xff08;双端队列&#xff09;四.优先级队列优先级队列中的仿函数手搓优先级队列 五.反向迭代器手搓反向迭代器 vector和list我们称为容器&#xff0c;而stack和queue却被称为容器适配器。 这和它们第二个模板参数有关系&#xff0c;可以…...

leetcode 704. 二分查找

题目描述解题思路执行结果 leetcode 704. 二分查找 题目描述 二分查找 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1。 示…...

蓝牙耳机什么牌子好?500内好用的蓝牙耳机推荐

随着蓝牙耳机的受欢迎程度越来越高&#xff0c;近几年来&#xff0c;无蓝牙耳机市场呈爆发式增长&#xff0c;蓝牙耳机品牌也越来越多。那么蓝牙耳机什么牌子好&#xff1f;接下来&#xff0c;我来给大家推荐几款500内好用的蓝牙耳机&#xff0c;一起来看看吧。 一、南卡小音舱…...

设计模式 -- 中介者模式

前言 月是一轮明镜,晶莹剔透,代表着一张白纸(啥也不懂) 央是一片海洋,海乃百川,代表着一块海绵(吸纳万物) 泽是一柄利剑,千锤百炼,代表着千百锤炼(输入输出) 月央泽,学习的一种过程,从白纸->吸收各种知识->不断输入输出变成自己的内容 希望大家一起坚持这个过程,也同…...

人工智能的未来之路:语音识别的应用与挑战

随着人工智能技术的不断发展&#xff0c;语音识别已成为人工智能领域的一个重要应用。语音识别是指通过计算机对语音信号进行处理&#xff0c;将其转换为可以被计算机识别的文本或指令的过程。语音识别技术的应用范围非常广泛&#xff0c;例如智能家居、语音助手、智能客服、智…...

c++ 友元介绍

友元的目的就是让一个函数或类访问另一个函数中的私有成员 友元函数 &#xff08;1&#xff09;普通函数作为友元函数 class 类名{friend 函数返回值类型 友元函数名(形参列表);//这个形参一般是此类的对象.... } 经过以上操作后&#xff0c;友元函数就可以访问此类中的私有…...

四维轻云地理空间数据在线管理软件能够在线管理哪些数据?

四维轻云是一款地理空间数据在线管理软件&#xff0c;支持各类地理空间数据的在线管理、浏览及分享&#xff0c;用户可不受时间地点限制&#xff0c;随时随地查看各类地理空间数据。软件还具有项目管理、场景搭建、素材库等功能模块&#xff0c;支持在线协作管理&#xff0c;便…...

学习 GitHub 对我们有什么好处?

学习 GitHub 对我们有什么好处&#xff1f; 为什么要学习 GitHub&#xff0c;或者说学习 GitHub 对我们有什么好处&#xff1f; 理由一&#xff1a;GitHub 上有很多大牛出没&#xff0c;国外的咱先不说&#xff0c;就国内的像百度、腾讯、阿里之类的大公司&#xff0c;里面的很…...

java记录-反射

什么是反射 反射是一种让Java拥有一定动态性的机制&#xff0c;它允许程序在执行期间取得任何类的内部信息&#xff0c;并且直接操作任意对象的内部属性及方法 类加载 类加载后通过堆内存方法区的Class类型对象就能了解该类的结构信息&#xff0c;这个对象就像该类的一面镜子…...

这次彻底不需要账号了,无需魔法永久白嫖GPT

免费GPT 自GPT风靡以来&#xff0c;大家用的是不亦乐乎&#xff0c;你用他去解决过实际问题&#xff0c;你用他去写过代码&#xff0c;你用他去修改过bug&#xff0c;你用他去写过sql&#xff0c;你用他去画过图&#xff0c;你问过他你能想到的任何“刁钻”问题。 你&#xff…...

远程桌面连接是什么?如何开启远程桌面连接详细教程

远程桌面连接是一种非常方便的技术&#xff0c;它允许用户通过互联网在不同的计算机之间共享资源和访问数据。目前这个技术已经广泛地应用于企业、教育、医疗和其他领域&#xff0c;使得人们能够更高效地工作和学习。 这篇文章&#xff0c;我将解释远程桌面连接是什么&#xf…...

lua实战(2)

目录 值和类型子类型类型字符串type (v) 值和类型 Lua是一种动态类型语言。这意味着变量没有类型;只有价值观才有意义。该语言中没有类型定义。所有值都有自己的类型。 Lua中的所有值都是一等值。这意味着所有的值都可以存储在变量中&#xff0c;作为参数传递给其他函数&…...

UI自动化测试案例——简单的Google搜索测试

以下是一个UI自动化测试的经典案例&#xff1a; import unittest from selenium import webdriverclass GoogleSearchTest(unittest.TestCase):def setUp(self):# 创建Chrome浏览器实例self.driver webdriver.Chrome()self.driver.maximize_window() # 最大化浏览器窗口def t…...

C++之虚函数原理

对象数据和函数的存储方式 注意说的是对象。 C中的对象存储方式是 每个对象占用的存储空间只是该对象的数据部分&#xff08;虚函数指针和虚基类指针也属于数据部分&#xff09;&#xff0c;函数属于公共部分。 虚函数表 虚函数是通过虚函数表实现的。 C实现虚函数的方法是…...

Windows Information Protection(WIP)部署方案

目录 前言 一、方案准备工作 1、确定哪些数据需要保护 2、选择合适的加密方式...

细说Hibernate的缓存机制

Hibernate 的缓存机制主要包括一级缓存和二级缓存。 1. 一级缓存&#xff08;Session 缓存&#xff09;&#xff1a; 一级缓存是 Hibernate 的 Session 级别的缓存&#xff0c;与每个 Session 对象相关联。当您通过 Session 对象执行查询、保存或更新操作时&#xff0c;Hibern…...

初识C++之线程库

目录 一、C中的线程使用 二、C的线程安全问题 1. 加锁 2. 变为原子操作 3. 递归里面的锁 4. 定时锁 5. RAII的锁 三、条件变量 1. 为什么需要条件变量 2. 条件变量的使用 2.1 条件变量的相关函数 2.2 wait函数 一、C中的线程使用 线程的概念在linux中的线程栏已经…...