当前位置: 首页 > news >正文

从零开发短视频电商 使用Spring WebClient发起远程Http调用

文章目录

    • 依赖
    • 使用
      • 创建WebClient实例
        • 创建带有超时的WebClient实例
        • 示例
      • 请求准备
      • 获取响应
    • 高级
      • 过滤器
        • 自定义过滤器
      • 自定义线程池
      • 自定义WebClient连接池
      • 开启日志
      • 错误处理
        • 最佳实践
    • 示例
      • 异步请求
      • 同步请求
      • 上传文件
      • 重试
      • 过滤错误
      • 错误处理
    • 参考

Spring WebClient 是 Spring WebFlux 项目中 Spring 5 中引入的异步、反应式 HTTP 客户端,用于替换旧的 RestTemplate,以便在使用 Spring Boot 框架构建的应用程序中进行 REST API 调用。

它支持同步、异步和流式场景。

它是一种基于 HTTP/1.1 协议的反应式、非阻塞解决方案。

依赖

为了使用WebClient,我们需要添加对 Spring WebFlux 启动器模块的依赖:

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>

使用

创建WebClient实例

WebClient client = WebClient.create("http://localhost:8080");WebClient client = WebClient.builder().baseUrl("http://localhost:8080") // 基本 URL.defaultCookie("cookieKey", "cookieValue") // 定义默认cookie.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) // 定义默认header.defaultUriVariables(Collections.singletonMap("url", "http://localhost:8080")) // 定义默认 quringstring参数.build();WebClient.builder().baseUrl(host).exchangeStrategies(ExchangeStrategies.builder().codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(500 * 1024))// 编解码器内存中数据的缓冲,默认值为 262,144 字节.build()).build();

创建带有超时的WebClient实例

默认的 30 秒超时

  • 通过 ChannelOption.CONNECT_TIMEOUT_MILLIS 选项设置连接超时时间
  • 使用 ReadTimeoutHandler 和 WriteTimeoutHandler 分别设置读取写入超时时间
  • 使用 responseTimeout 指令配置响应超时时间
HttpClient httpClient = HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时时间.responseTimeout(Duration.ofMillis(5000)) // 响应超时时间.doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS)) // 读超时.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))); // 写超时WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();

示例

// 配置资源工厂,以控制连接池、线程池和其他资源的创建和管理
ReactorResourceFactory factory = new ReactorResourceFactory();
// 设置是否使用全局资源。设置为 false将创建独立的资源。默认情况下,为 true,表示使用全局资源。
factory.setUseGlobalResources(false);
// 配置连接池的提供者,控制连接的创建、分配和回收。 创建名为"httpClient"的连接池,最大连接数为 50 默认是 max(cpu,8)*2 。
factory.setConnectionProvider(ConnectionProvider.create("httpClient", 50));
// 用于配置事件循环资源,它管理底层事件循环线程。创建一个名为 "httpClient" 的事件循环资源,最大线程数为 50,而第三个参数 线程是否在 JVM 关闭时释放 max(cpu,4)
factory.setLoopResources(LoopResources.create("httpClient", 50, true));
WebClient.builder().baseUrl("")// 用于配置请求和响应的处理策略 通常用于配置序列化和反序列化。.exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)).build())// 用于配置编解码器。.codecs(clientCodecConfigurer -> {clientCodecConfigurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024); // 设置最大内存大小}).clientConnector(new ReactorClientHttpConnector(factory, client -> client// 设置连接建立的超时时间,单位为毫秒。.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)// 启用或禁用 HTTP 响应的压缩。默认情况下,压缩是禁用的。.compress(true)// 启用或禁用 "wiretap",这将允许你记录请求和响应的详细信息,用于调试和监控。.wiretap(true).responseTimeout(Duration.ofMillis(5000)) // 响应超时时间.doOnConnected(connection -> {// 添加读超时处理器,单位为毫秒connection.addHandlerLast(new ReadTimeoutHandler(10));// 添加写超时处理器,单位为毫秒connection.addHandlerLast(new WriteTimeoutHandler(8));})// 启用跟随重定向,默认情况下启用.followRedirect(true))).filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {// 在请求发出之前记录请求信息System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());return Mono.just(clientRequest);}).andThen(ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {// 在响应接收后记录响应信息System.out.println("Response: " + clientResponse.statusCode());return Mono.just(clientResponse);}))).build();

请求准备

// 1.方法
UriSpec<RequestBodySpec> uriSpec = client.method(HttpMethod.POST);
UriSpec<RequestBodySpec> uriSpec = client.post();
// 2.uri
RequestBodySpec bodySpec = uriSpec.uri("/resource");
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder -> uriBuilder.pathSegment("/resource").build());
//  "/products/2"
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/{id}").build(2))
//  "/products/2/attributes/13"    
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/{id}/attributes/{attributeId}").build(2, 13))
//	"/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019"    
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/").queryParam("name", "AndroidPhone").queryParam("color", "black").queryParam("deliveryDate", "13/04/2019").build())
//	"/products/?name=AndroidPhone&color=black&deliveryDate=13%2F04%2F2019" 这种'/'符被转义了
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/").queryParam("name", "{title}").queryParam("color", "{authorId}").queryParam("deliveryDate", "{date}").build("AndroidPhone", "black", "13/04/2019"))
// 数组参数 "/products/?category=Phones&category=Tablets"
webClient.get().uri(uriBuilder - > uriBuilder.path("/products/").queryParam("category", "Phones", "Tablets").build())
// 数组参数 "/products/?category=Phones,Tablets"
webClient.get().uri(uriBuilder - > uriBuilder.path("/products/").queryParam("category", String.join(",", "Phones", "Tablets")).build())    // 3.内容
RequestHeadersSpec<?> headersSpec = bodySpec.bodyValue("data");
RequestHeadersSpec<?> headersSpec = bodySpec.body(Mono.just(new Foo("name")), Foo.class);
RequestHeadersSpec<?> headersSpec = bodySpec.body(BodyInserters.fromValue("data"));
LinkedMultiValueMap map = new LinkedMultiValueMap();
map.add("key1", "value1");
map.add("key2", "value2");
RequestHeadersSpec<?> headersSpec = bodySpec.body(BodyInserters.fromMultipartData(map));
// 4.header标头
ResponseSpec responseSpec = headersSpec.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML).acceptCharset(StandardCharsets.UTF_8).ifNoneMatch("*").ifModifiedSince(ZonedDateTime.now()).retrieve();

获取响应

发送请求并接收响应。我们可以通过使用exchangeToMono/exchangeToFlux或retrieve方法来实现。

// ExchangeToMono和ExchangeToFlux方法允许访问ClientResponse及其状态和标头
Mono<String> response = headersSpec.exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(String.class);} else if (response.statusCode().is4xxClientError()) {return Mono.just("Error response");} else {return response.createException().flatMap(Mono::error);}
});
// 而retrieve方法是直接获取body
Mono<String> response = headersSpec.retrieve().bodyToMono(String.class);

需要注意的是 ResponseSpec.bodyToMono 方法,如果状态码为4xx(客户端错误)或5xx(服务器错误),它将抛出一个 WebClientException。

单个资源

Mono<Employee> employeeMono = client.get().uri("/employees/{id}", "1").retrieve().bodyToMono(Employee.class);employeeMono.subscribe(System.out::println);

多个资源

Flux<Employee> employeeFlux = client.get().uri("/employees").retrieve().bodyToFlux(Employee.class);employeeFlux.subscribe(System.out::println);

高级

过滤器

过滤器可以拦截、检查和修改客户端请求(或响应)。过滤器非常适合为每个请求添加功能,因为逻辑保留在一个位置。用例包括监视、修改、记录和验证客户端请求。

一个请求具有一个有序链,包含零个或多个过滤器。

在Spring Reactive中,过滤器是 ExchangeFilterFunction 的实例。过滤器函数有两个参数:要修改的 ClientRequest 和下一个 ExchangeFilterFunction。

通常,过滤器函数通过调用过滤器链中的下一个函数来返回:

ExchangeFilterFunction filterFunction = (clientRequest, nextFilter) -> {LOG.info("WebClient fitler executed");return nextFilter.exchange(clientRequest);
};// 添加过滤器
WebClient webClient = WebClient.builder().filter(filterFunction).build();WebClient.builder().filter((request, next) -> {  //过滤器,3次重试,header打印log.info(String.format("请求地址: %s", request.url()));log.info(String.format("请求头信息: %s", request.headers()));Mono<ClientResponse> exchange = next.exchange(request).retry(3);ClientResponse clientResponse = exchange.block();log.info(String.format("响应头信息: %s", clientResponse.headers().asHttpHeaders()));return exchange;}).clientConnector(connector).build();

自定义过滤器

让我们从一个对客户端发送的 HTTP GET 请求进行计数的过滤器开始。

过滤器检查请求方法并在 GET 请求的情况下增加“全局”计数器:

ExchangeFilterFunction countingFunction = (clientRequest, nextFilter) -> {HttpMethod httpMethod = clientRequest.method();if (httpMethod == HttpMethod.GET) {getCounter.incrementAndGet();}return nextFilter.exchange(clientRequest);
};

我们将定义的第二个过滤器将版本号附加到请求 URL 路径。我们利用ClientRequest.from()方法从当前请求对象创建一个新的请求对象并设置修改后的 URL。

随后,我们继续使用新修改的请求对象执行过滤器链:

ExchangeFilterFunction urlModifyingFilter = (clientRequest, nextFilter) -> {String oldUrl = clientRequest.url().toString();URI newUrl = URI.create(oldUrl + "/" + version);ClientRequest filteredRequest = ClientRequest.from(clientRequest).url(newUrl).build();return nextFilter.exchange(filteredRequest);
};

自定义线程池

WebClient 默认使用 Project Reactor 提供的线程池来执行异步操作。但是,你可以根据应用程序的需求进行自定义线程池配置。以下是一些相关原理和最佳实践:

  • 调度器(Schedulers):WebClient 使用调度器来管理线程,例如 Schedulers.elastic() 用于 CPU 密集型操作,Schedulers.parallel() 用于 I/O 密集型操作。你可以通过 publishOnsubscribeOn 方法来切换调度器。
webClient.get().uri("/todos/1").retrieve().bodyToMono(Todo.class).subscribeOn(Schedulers.elastic()) // 切换订阅线程.publishOn(Schedulers.parallel()) // 切换发布线程.subscribe(result -> {// 处理响应});
  • 自定义线程池:如果需要更精细的线程控制,你可以创建自定义的线程池,并在调度器中使用它。这对于控制并发度和资源管理非常有用。
ExecutorService customExecutorService = Executors.newFixedThreadPool(10);webClient.get().uri("/todos/1").retrieve().bodyToMono(Todo.class).subscribeOn(Schedulers.fromExecutor(customExecutorService)).subscribe(result -> {// 处理响应});

自定义WebClient连接池

WebClient 使用 Reactor Netty 作为底层的 HTTP 客户端,它管理着连接池。连接池是一组可重用的连接,以提高性能和资源利用率。以下是有关连接池的原理和最佳实践:

  • 连接池大小:连接池的大小可以通过 WebClient 的配置进行设置。默认情况下,它会根据应用程序的需求动态分配连接。你可以使用 HttpClientmaxConnections 方法来设置最大连接数。
HttpClient httpClient = HttpClient.create().maxConnections(50); // 设置最大连接数为 50WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).baseUrl("https://jsonplaceholder.typicode.com").build();
  • 连接超时:你可以配置连接超时和读写超时,以确保请求不会永远等待。这对于防止应用程序被慢速或不响应的服务挂起非常重要。
HttpClient httpClient = HttpClient.create().responseTimeout(Duration.ofSeconds(10)) // 设置响应超时时间为 10 秒.doOnConnected(connection -> {connection.addHandlerLast(new ReadTimeoutHandler(10)); // 设置读取超时connection.addHandlerLast(new WriteTimeoutHandler(10)); // 设置写入超时});WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).baseUrl("https://jsonplaceholder.typicode.com").build();
  • 默认情况下,WebClient 会自动管理连接池,无需手动配置。
  • 通常情况下,你不需要担心连接池的具体细节,因为它会在后台自动处理连接的创建、重用和关闭。
  • 如果你需要更精细的控制,可以考虑配置连接池的大小和超时设置。

开启日志

在开发和调试期间,启用详细的 WebClient 日志记录可以帮助你识别问题。你可以使用 Spring Boot 的日志配置来启用 WebClient 的日志记录。在 application.propertiesapplication.yml 中添加以下配置:

logging.level.org.springframework.web.reactive.function.client=DEBUG
logging.level.reactor.netty.http.client=DEBUG
logging.level.reactor.netty.tcp.client=DEBUG

这将为 WebClient 的 HTTP 请求和响应生成详细的日志信息,包括请求头、响应头和响应体。

WebClient 提供了一种简单的方法来记录请求和响应的日志,以便进行调试和监控。你可以通过添加过滤器来实现日志记录。以下是一个示例:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient.Builder webClientBuilder() {return WebClient.builder().filter(logRequest()).filter(logResponse());}private ExchangeFilterFunction logRequest() {return ExchangeFilterFunction.ofRequestProcessor(request -> {// 记录请求日志System.out.println("Request: " + request.method() + " " + request.url());return Mono.just(request);});}private ExchangeFilterFunction logResponse() {return ExchangeFilterFunction.ofResponseProcessor(response -> {// 记录响应日志System.out.println("Response: " + response.statusCode());return Mono.just(response);});}// ... 省略其他配置
}

在上面的示例中,我们定义了两个过滤器 logRequestlogResponse,分别用于记录请求和响应的日志。你可以根据需要将日志输出到日志文件或其他监控工具。

错误处理

WebClient 提供了多种处理错误的方式。在上面的示例中,我们使用了 onStatus 方法来处理特定的 HTTP 状态码。这里有一些更多的错误处理策略和最佳实践:

  • onStatus 方法onStatus 方法允许你根据 HTTP 响应的状态码来处理错误。你可以根据需要定义不同的处理逻辑,例如重试、返回默认值或引发自定义异常。
  • onErrorResume 方法:使用 onErrorResume 方法,你可以在出现错误时返回一个备用的 Mono。这可以用于从缓存中获取数据或返回默认值。
  • 全局错误处理器:你可以注册一个全局错误处理器来处理 WebClient 的全局错误,例如连接失败、超时等。通过 ExchangeStrategies 可以定义一个全局错误处理器。
@Bean
public WebClient.Builder webClientBuilder() {return WebClient.builder().exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)).build()).clientConnector(new ReactorClientHttpConnector(HttpClient.newConnection().compress(true).resolver(DefaultAddressResolverGroup.INSTANCE))).baseUrl("https://jsonplaceholder.typicode.com").filter((request, next) -> next.exchange(request).doOnError(throwable -> {// 全局错误处理逻辑}));
}

WebClient 提供了丰富的错误处理机制,可以通过 onStatus 和其他方法来捕获和处理不同类型的错误。例如,onStatus 方法用于根据 HTTP 响应的状态码来处理错误。你可以在 onStatus 方法中返回一个 Mono.error,将错误包装成异常并传播。

最佳实践

  • 使用 onStatus 处理不同的 HTTP 状态码。这可以让你根据状态码执行不同的错误处理逻辑。
  • 使用 onErrorResume 来提供备用值或执行备用操作,以确保即使出现错误,也能返回有意义的响应。
  • 使用 onErrorReturn 来在发生错误时返回一个默认值。
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误,或返回备用值return Mono.error(new CustomException("Client error: " + response.statusCode()));
})
.onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误,或返回备用值return Mono.error(new CustomException("Server error: " + response.statusCode()));
})
.bodyToMono(Todo.class)    
.onErrorResume(CustomClientException.class, ex -> {// 处理自定义客户端异常return Mono.just(createDefaultTodo()); // 返回一个默认值
})
.onErrorResume(CustomServerException.class, ex -> {// 处理自定义服务器异常return Mono.error(new CustomFallbackException("Fallback error: " + ex.getMessage()));
}) 
.onErrorResume(Exception.class, error -> {// 处理其他类型的异常,或返回备用值return Mono.just(new DefaultResponse());
})
.onErrorReturn(CustomClientException.class, createDefaultTodo()) // 在客户端错误时返回默认值
.onErrorReturn(CustomServerException.class, createDefaultTodo()) // 在服务器错误时返回默认值    
.doOnError(error -> {// 在发生错误时执行的操作System.err.println("Error occurred: " + error.getMessage());})

示例

异步请求

client.get().uri(URLConstants.URL).header(URLConstants.API_KEY_NAME, URLConstants.API_KEY_VALUE).retrieve().bodyToMono(String.class).subscribe(result->System.out.println(result));// 创建 WebClient 实例WebClient webClient = webClientBuilder().baseUrl("https://jsonplaceholder.typicode.com").build();// 执行 GET 请求Mono<ResponseEntity<String>> responseMono = webClient.get().uri("/posts/1").retrieve().toEntity(String.class);// 订阅响应并处理结果responseMono.subscribe(responseEntity -> {if (responseEntity.getStatusCode().is2xxSuccessful()) {System.out.println("Response Body: " + responseEntity.getBody());} else {System.err.println("Request failed with status code: " + responseEntity.getStatusCode());}});

以非阻塞的方式订阅subscribe(),该方法返回Mono的包装器。

同步请求

虽然Spring WebClient是异步的,但是我们仍然可以通过调用阻塞线程直到执行结束的方法block()来进行同步调用。方法执行后我们得到结果。

 String block = webClient.get().uri("https://jsonplaceholder.typicode.com/t3odos/1").retrieve().onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误,或返回备用值return Mono.error(new RuntimeException("Client error: " + response.statusCode()));}).onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误,或返回备用值return Mono.error(new RuntimeException("Server error: " + response.statusCode()));}).bodyToMono(String.class).onErrorResume(RuntimeException.class, ex -> {// 处理自定义异常 // 返回一个默认值return Mono.just("createDefaultTodo()");}).doOnError(error -> {// 在发生错误时执行的操作System.out.println("Error occurred: " + error.getMessage());}).block(Duration.ofSeconds(10));System.out.println(block);String result = client.post().uri("https://reqbin.com/echo/post/json").body(BodyInserters.fromValue(prepareRequest())).exchange().flatMap(response -> response.bodyToMono(String.class)).block();System.out.println("result::" + result);private String prepareRequest() {var values = new HashMap<String, String>() {{put("Id", "12345");put("Customer", "Roger Moose");put("Quantity", "3");put("Price", "167.35");}};var objectMapper = new ObjectMapper();String requestBody;try {requestBody = objectMapper.writeValueAsString(values);} catch (JsonProcessingException e) {e.printStackTrace();return null;}return requestBody;}}

创建了一个 JSON 字符串prepareRequest(),然后将该字符串作为请求正文发送到 HTTPPOST方法中。

exchange()与之前使用的retrieve()方法相比,该方法通过提供对来自 HTTP 客户端的响应的访问来提供更多控制。

上传文件

Mono<HttpStatus> httpStatusMono = webClient.post().uri(url).contentType(MediaType.APPLICATION_PDF).body(BodyInserters.fromResource(resource)).exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(HttpStatus.class).thenReturn(response.statusCode());} else {throw new ServiceException("Error uploading file");}});
// 从多部分资源上传文件
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("file", multipartFile.getResource());Mono<HttpStatus> httpStatusMono = webClient.post().uri(url).contentType(MediaType.MULTIPART_FORM_DATA).body(BodyInserters.fromMultipartData(builder.build())).exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(HttpStatus.class).thenReturn(response.statusCode());} else {throw new ServiceException("Error uploading file");}});

重试

// 使用retry方法
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class).retry(3); // 无论 Web 客户端返回什么错误,这都会重试最多 3 次。
}
// 使用retryWhen方法的可配置策略
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class).retryWhen(Retry.max(3));
}
// 固定延迟重试
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class)// 尝试之间有两秒的延迟,这可能会增加成功的机会.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));
}
// 不是按固定间隔重试
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class)// 会逐渐增加尝试之间的延迟- 大约为 2 秒、4 秒,然后是 8 秒.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)));
}
// 抖动重试 为计算的延迟间隔增加了随机性
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));
}

过滤错误

服务中的任何错误都将导致重试尝试,包括 4xx 错误,例如400:Bad Request或401:Unauthorized。

显然,我们不应该重试此类客户端错误,因为服务器响应不会有任何不同。因此,让我们看看如何仅在出现特定错误的情况下应用重试策略

public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve()// 当是5xx 错误的异常,返回我们自定义的异常.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode()))).bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(5))// 仅在抛出ServiceException时重试.filter(throwable -> throwable instanceof ServiceException));
}

所有重试尝试均不成功的时候。在这种情况下,该策略的默认行为是传播 RetryExhaustedException ,包装最后一个错误。

public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode()))).bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(5)).filter(throwable -> throwable instanceof ServiceException)// 一系列失败的重试结束后,请求将失败并出现 ServiceException       .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value());}));
}

错误处理

使用onStatus

onStatus是一种内置机制,可用于处理WebClient响应。这使我们能够根据特定响应(例如 400、500、503 等)或状态类别(例如 4XX 和 5XX 等)应用细粒度的功能:

WebClient.builder().build().post().uri("/some-resource").retrieve().onStatus(HttpStatus.INTERNAL_SERVER_ERROR::equals,response -> response.bodyToMono(String.class).map(Exception::new))

onStatus方法需要两个参数。第一个是接收状态代码的谓词。第二个参数的执行基于第一个参数的输出。第二个是将响应映射到Mono或异常的函数。

示例为,如果我们看到 INTERNAL_SERVER_ERROR (即 500),我们将使用bodyToMono获取主体,然后将其映射到新的Exception。

我们可以链接onStatus调用,以便能够为不同的状态条件提供功能:

Mono<String> response = WebClient.builder().build().post().uri("some-resource").retrieve().onStatus( HttpStatus.INTERNAL_SERVER_ERROR::equals,response -> response.bodyToMono(String.class).map(CustomServerErrorException::new)) .onStatus(HttpStatus.BAD_REQUEST::equals,response -> response.bodyToMono(String.class).map(CustomBadRequestException::new))... .bodyToMono(String.class);// do something with responsewebClient.get().uri("/todos/{id}", id).retrieve().onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误return Mono.error(new CustomException("Client error: " + response.statusCode()));}).onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误return Mono.error(new CustomException("Server error: " + response.statusCode()));}).bodyToMono(Todo.class);

现在onStatus调用映射到我们的自定义异常。我们为这两种错误状态分别定义了异常类型。onStatus方法允许我们使用我们选择的任何类型。

使用ExchangeFilterFunction

ExchangeFilterFunction是处理特定状态代码和获取响应正文的另一种方法 。与onStatus不同,交换过滤器非常灵活,适用于基于任何布尔表达式的过滤器功能。

我们可以受益于ExchangeFilterFunction的灵活性,涵盖与onStatus函数相同的类别。

处理返回的逻辑:

private static Mono<ClientResponse> exchangeFilterResponseProcessor(ClientResponse response) {HttpStatus status = response.statusCode();if (HttpStatus.INTERNAL_SERVER_ERROR.equals(status)) {return response.bodyToMono(String.class).flatMap(body -> Mono.error(new CustomServerErrorException(body)));}if (HttpStatus.BAD_REQUEST.equals(status)) {return response.bodyToMono(String.class).flatMap(body -> Mono.error(new CustomBadRequestException(body)));}return Mono.just(response);
}

接下来,我们将定义过滤器并使用对处理程序的方法引用:

ExchangeFilterFunction errorResponseFilter = ExchangeFilterFunction.ofResponseProcessor(WebClientStatusCodeHandler::exchangeFilterResponseProcessor);

与onStatus调用类似,我们在错误时映射到异常类型。但是,使用Mono.error会将此异常包装ReactiveException 中。处理错误时应牢记这种嵌套。

现在我们将其应用于WebClient的实例,以达到与onStatus链式调用相同的效果:

Mono<String> response = WebClient.builder().filter(errorResponseFilter).build().post().uri("some-resource").retrieve().bodyToMono(String.class);// do something with response

参考

  • https://reflectoring.io/comparison-of-java-http-clients/
  • https://reflectoring.io/spring-webclient/
  • https://docs.flydean.com/spring-framework-documentation5/webreactive/2.webclient
  • https://blog.hanqunfeng.com/2020/04/18/http-utils/#WebClientUtil

相关文章:

从零开发短视频电商 使用Spring WebClient发起远程Http调用

文章目录 依赖使用创建WebClient实例创建带有超时的WebClient实例示例 请求准备获取响应 高级过滤器自定义过滤器 自定义线程池自定义WebClient连接池开启日志错误处理最佳实践 示例异步请求同步请求上传文件重试过滤错误错误处理 参考 Spring WebClient 是 Spring WebFlux 项目…...

Python实现成语接龙

如图&#xff1a; 详细代码实现&#xff1a; # coding:utf-8 import string import pypinyin import sys import randomprint("初始化中&#xff0c;请稍等……")def main():f2 open(./idiom.txt, r, encodingutf-8)f f2.read() # 一次性读取完成new3_list f.sp…...

继续上一个爬虫,所以说selenium加browsermobproxy

继续&#xff0c;书接上回&#xff0c;这次我通过jsrpc&#xff0c;也学会了不少逆向的知识&#xff0c;感觉对于一般的网站应该都能应付了。当然我说的是简单的网站&#xff0c;遇到那些混淆的&#xff0c;还有那种猿人学里面的题目&#xff0c;还是免谈了。那种需要的水平太高…...

Sentinel服务熔断和流控

Sentinel服务熔断和流控 简介 Sentinel ​ 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的流量控制组件&#xff0c;主要以流量为切入点&#xff0c;从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来…...

01_TMS320F28004x系列MCU介绍和资料搜集

1. TI C2000 实时微控制器 TI公司在处理器方面的产品线有&#xff1a;基于ARM内核的微控制器/微处理器、MSP430微控制器、C2000系列实时微控制器、还有数字信号处理器&#xff08;DSP&#xff09;。 其中&#xff0c;C2000是TI公司专门针对实时控制推出的32位微控制器。TI公司…...

JavaScript中获取对象属性的不同方法

JavaScript中获取对象属性的不同方法 文章目录 JavaScript中获取对象属性的不同方法一、点记法二、方括号记法三、Object.keys()方法四、Object.values()方法五、Object.entries()方法六、Object.getOwnPropertyNames()方法七、Object.getOwnPropertyDescriptors()方法 JavaScr…...

【STM32教程】第四章 STM32的外部中断EXTI

案例代码及相关资料下载链接&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1hsIibEmsB91xFclJd-YTYA?pwdjauj 提取码&#xff1a;jauj 1 中断系统 1.1 中断的概念 中断系统的定义&#xff1a;中断是指在主程序运行过程中&#xff0c;出现了特定的中断触发条件…...

力扣第40天----第121题、第122题

# 力扣第40天----第121题、第122题 文章目录 一、第121题--买卖股票的最佳时机二、第122题--买卖股票的最佳时机II 一、第121题–买卖股票的最佳时机 ​ 分2种情况考虑&#xff0c;根据持有股票、不持有股票这2种情况&#xff0c;完成递推公式。另外&#xff0c;这里要求只买卖…...

Flask 使用 JWT(二)

在 Python 使用 JWT 主要的方案是 PyJWT 工具。 安装与基本使用 可以使用 pip 安装 PyJWT: $ pip install pyjwt编码与解码 编码函数 def encode( self, payload: Dict[str, Any], # payload 参数 key: str, …...

从0到1理解ChatGPT原理

目录 写在前面 1.Tansformer架构模型 2.ChatGPT原理 3.提示学习与大模型能力的涌现 3.1提示学习 3.2上下文学习 3.3思维链 4.行业参考建议 4.1拥抱变化 4.2定位清晰 4.3合规可控 4.4经验沉淀 机械工业出版社京东自购链接 写在前面 2022年11月30日&#xff0c;ChatG…...

如何解决 “Component cannot be used as a JSX component“

原因是react版本与types/react版本不一致导致的, 在tsconfig.json中加入以下代码,将依赖指向项目里的node_modules "paths": {"react": [ "./node_modules/types/react" ]}改完后代码大概是长这样的 {"compilerOptions": {..."…...

小程序自定义tabbar

前言 使用小程序默认的tabbar可以满足常规开发&#xff0c;但是满足不了个性化需求&#xff0c;如果想个性化开发就需要用到自定义tabbar,以下图为例子 一、在app.json配置 先按照以往默认的形式配置&#xff0c;如果中间的样式特殊则不需要配置 "tabBar": {&qu…...

分布式系统第五讲:分布式事务及实现方案

分布式系统第五讲&#xff1a;分布式事务及实现方案 事务是一个程序执行单元&#xff0c;里面的所有操作要么全部执行成功&#xff0c;要么全部执行失败。而分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。本…...

算法通关村17关 | 透析跳跃游戏

1. 跳跃游戏 题目 LeetCode55 给定一个非负整数数组&#xff0c;最初位于数组的第一个位置&#xff0c;数组中的每个元素代表你再该位置可以跳跃的最大长度&#xff0c;判断你是否能够达到最后一个位置。 思路 如果当前位置元素如果是3&#xff0c;我们无需考虑是跳几步&#…...

ARM接口编程—RTC(exynos 4412平台)

RTC简介 RTC(Real Time Clock)即实时时钟&#xff0c;它是一个可以为系统提供精确的时间基准的元器件&#xff0c;RTC一般采用精度较高的晶振作为时钟源&#xff0c;有些RTC为了在主电源掉电时还可以工作&#xff0c;需要外加电池供电。 RTC内部原理 RTC寄存器 RTC控制寄存器 …...

数据分享|WEKA信贷违约预测报告:用决策树、随机森林、支持向量机SVM、朴素贝叶斯、逻辑回归...

完整报告链接&#xff1a;http://tecdat.cn/?p28579 作者&#xff1a;Nuo Liu 数据变得越来越重要&#xff0c;其核心应用“预测”也成为互联网行业以及产业变革的重要力量。近年来网络 P2P借贷发展形势迅猛&#xff0c;一方面普通用户可以更加灵活、便快捷地获得中小额度的贷…...

逆市而行:如何在市场恐慌时保持冷静并抓住机会?

市场中的恐慌和波动是投资者所不可避免的。当市场出现恐慌情绪时&#xff0c;很多投资者会盲目跟从大众&#xff0c;导致决策出现错误。然而&#xff0c;聪明的投资者懂得在恐慌中保持冷静&#xff0c;并将其视为抓住机会的时机。本文将分享一些在市场恐慌时保持冷静并抓住机会…...

SpringBoot项目在Linux上启动、停止脚本

文章目录 SpringBoot项目在Linux上启动、停止脚本1. 在项目jar包同一目录&#xff0c;创建脚本xxx.sh【注: 和项目Jar同一目录】2. xxx.sh脚本内容&#xff0c;实际项目使用&#xff0c;只需修改jar包的名称&#xff1a;xxxxxx.jar3. 给xxx.sh赋予执行权限4. xxx.sh脚本的使用 …...

基于32位单片机的感应灯解决方案

感应灯是一种常见照明灯&#xff0c;提起感应灯&#xff0c;相信大家并不陌生&#xff0c; 它在一些公共场所、卫生间或者走廊等场所&#xff0c;使用的较为广泛&#xff0c;同时它使用起来也较为方便省电。“人来灯亮&#xff0c;人走灯灭”的特性&#xff0c;使他们在部分场景…...

机器学习——支持向量机(SVM)

机器学习——支持向量机&#xff08;SVM&#xff09; 文章目录 前言一、SVM算法原理1.1. SVM介绍1.2. 核函数&#xff08;Kernel&#xff09;介绍1.3. 算法和核函数的选择1.4. 算法步骤1.5. 分类和回归的选择 二、代码实现&#xff08;SVM&#xff09;1. SVR&#xff08;回归&a…...

HTTP协议初识·下篇

介绍 承接上篇&#xff1a;HTTP协议初识中篇_清风玉骨的博客-CSDN博客 本篇内容&#xff1a; 长链接 网络病毒 cookie使用&session介绍 基本工具介绍 postman 模拟客户端请求 fiddler 本地抓包的软件 https介绍 https协议原理 为什么加密 怎么加密 CA证书介绍 数字签名介绍…...

c++ 类的实例化顺序

其他类对象有作为本类成员&#xff0c;先构造类中的其他类对象&#xff0c; 释放先执行本对象的析构函数再执行包含的类对象的析构函数 #include <iostream> #include <string.h> using namespace std;class Phone { public:Phone(string name):m_PName(name){…...

Vue自动生成二维码并可下载二维码

遇到一个需求&#xff0c;需要前端自行生成用户的个人名片分享二维码&#xff0c;并提供二维码下载功能。在网上找到很多解决方案&#xff0c;最终吭哧吭哧做完了&#xff0c;把它整理记录一下&#xff0c;方便后续学习使用&#xff01;嘿嘿O(∩_∩)O~ 这个小东西有以下功能特点…...

应该下那个 ActiveMQ

最近在搞 ActiveMQ 的时候&#xff0c;发现有 2 个 ActiveMQ 可以下载。 应该下那个呢&#xff1f; JMS 即Java Message Service&#xff0c;是JavaEE的消息服务接口。 JMS主要有两个版本&#xff1a;1.1和2.0。 2.0和1.1相比&#xff0c;主要是简化了收发消息的代码。 所谓…...

【C语言】指针详解(3)

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解指针(2)&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 一.函数指针数组二.指向函数指针数组的指针&#xff08;不重要&#xff09;三.回调函数 一.函…...

告别HR管理繁琐,免费低代码平台来帮忙

编者按&#xff1a;本文着重介绍了使用免费且高效的低代码平台实现的HR管理系统在一般日常人力资源管理工作中的关键作用。 关键词&#xff1a;低代码平台、HR管理系统 1.HR管理系统有什么作用&#xff1f; HR管理系统作为一款数字化工具&#xff0c;可为企业提供全方位的人力资…...

Java开发面试--Redis专区

1、 什么是Redis&#xff1f;它的主要特点是什么&#xff1f; 答&#xff1a; Redis是一个开源的、基于内存的高性能键值对存储系统。它主要用于缓存、数据存储和消息队列等场景。 高性能&#xff1a;Redis将数据存储在内存中&#xff0c;并采用单线程的方式处理请求&#xf…...

Ansible-roles学习

目录 一.roles角色介绍二.示例一.安装httpd服务 一.roles角色介绍 roles能够根据层次型结构自动装载变量文件&#xff0c;tasks以及handlers登。要使用roles只需在playbook中使用include指令即可。roles就是通过分别将变量&#xff0c;文件&#xff0c;任务&#xff0c;模块以…...

python3如何安装各类库的小总结

我的python3的安装路径是&#xff1a; C:\Users\Administrator\AppData\Local\Programs\Python\Python38 C:\Users\Administrator\AppData\Local\Programs\Python\Python38\python3.exeC:\Users\Administrator\AppData\Local\Programs\Python\Python38\Scripts C:\Users\Admin…...

ffmpeg 特效 转场 放大缩小

案例 ffmpeg \ -i input.mp4 \ -i image1.png \ -i image2.png \ -filter_complex \ [1:v]scale100:100[img1]; \ [2:v]scale1280:720[img2]; \ [0:v][img1]overlay(main_w-overlay_w)/2:(main_h-overlay_h)/2[bkg];\ [bkg][img2]overlay0:0 \ -y output.mp4 -i input.mp4//这…...

可以做淘宝联盟的免费网站/最新腾讯新闻

前言众所周知Angular响应式表单相比较模板驱动表单更大操作性、更易测试性。因此&#xff0c;我更推荐这类表单创造方式。当一个用于修改用户信息的表单&#xff0c;数据的来源总是来自远程&#xff1b;而对于一个 FormGroup 的创建总在 ngOnInit 中完成。因此&#xff0c;这里…...

网站建设项目报价/今日特大军事新闻

推荐资料 https://www.cnblogs.com/xiaokang01/p/9865724.html socket传输文件 思路:# 先将报头转换成字符串(json.dumps), 再将字符串的长度打包# 发送报头长度,发送报头内容,最后放真是内容# 报头内容包括文件名,文件信息,报头# 接收时:先接收4个字节的报头长度,# …...

姜堰网站制作/软文文章

接上次的内容&#xff0c;我们在用java调用外部exe&#xff0c;有时会发生exe一闪而过&#xff0c;或者长时间的进程没有内存&#xff0c;cpu变化&#xff0c;看上去好像阻塞了一样&#xff0c;这是因为再调用process Runtime.getRuntime().exec("cmd /k dir",null,…...

专业做ppt的网站/一元友情链接平台

前端是最贴近用户的程序员&#xff0c;比后端、数据库、产品经理、运营、安全都近 实现界面交互 提升用户体验 基于NodeJS&#xff0c;可跨平台开发 前端是最贴近用户的程序员&#xff0c;前端的能力就是能让产品从 90分进化到 100 分&#xff0c;甚至更好&#xff0c; 与团队…...

重庆北京网站建设/sem是什么职位

欢迎关注”生信修炼手册”!影响基因型填充效果的因素有很多&#xff0c;比如填充软件的选择&#xff0c;reference panel的选择&#xff0c;样本个数&#xff0c;SNP的密度或者测序深度等等因素。目前基因型填充的软件有很多种&#xff0c;每个软件各有优劣&#xff0c;如何选择…...

网站qq访客统计/搜索引擎营销优缺点

本课的核心内容&#xff1a; info 和 thread 命令next、step、util、finish、return 和 jump 命令 info 和 thread 命令 在前面使用 info break 命令查看当前断点时介绍过&#xff0c;info 命令是一个复合指令&#xff0c;还可以用来查看当前进程的所有线程运行情况。下面以 …...