从零开发短视频电商 使用Spring WebClient发起远程Http调用
文章目录
- 依赖
- 使用
- 创建WebClient实例
- 创建带有超时的WebClient实例
- 示例
- 请求准备
- 获取响应
- 高级
- 过滤器
- 自定义过滤器
- 自定义线程池
- 自定义WebClient连接池
- 开启日志
- 错误处理
- 最佳实践
- 示例
- 异步请求
- 同步请求
- 上传文件
- 重试
- 过滤错误
- 错误处理
- 参考
Spring WebClient 是 Spring WebFlux 项目中 Spring 5 中引入的异步、反应式 HTTP 客户端,用于替换旧的 RestTemplate,以便在使用 Spring Boot 框架构建的应用程序中进行 REST API 调用。
它支持同步、异步和流式场景。
它是一种基于 HTTP/1.1 协议的反应式、非阻塞解决方案。
依赖
为了使用WebClient
,我们需要添加对 Spring WebFlux 启动器模块的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>
使用
创建WebClient实例
WebClient client = WebClient.create("http://localhost:8080");WebClient client = WebClient.builder().baseUrl("http://localhost:8080") // 基本 URL.defaultCookie("cookieKey", "cookieValue") // 定义默认cookie.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) // 定义默认header.defaultUriVariables(Collections.singletonMap("url", "http://localhost:8080")) // 定义默认 quringstring参数.build();WebClient.builder().baseUrl(host).exchangeStrategies(ExchangeStrategies.builder().codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(500 * 1024))// 编解码器内存中数据的缓冲,默认值为 262,144 字节.build()).build();
创建带有超时的WebClient实例
默认的 30 秒超时
- 通过 ChannelOption.CONNECT_TIMEOUT_MILLIS 选项设置连接超时时间
- 使用 ReadTimeoutHandler 和 WriteTimeoutHandler 分别设置读取和写入超时时间
- 使用 responseTimeout 指令配置响应超时时间
HttpClient httpClient = HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时时间.responseTimeout(Duration.ofMillis(5000)) // 响应超时时间.doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS)) // 读超时.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))); // 写超时WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
示例
// 配置资源工厂,以控制连接池、线程池和其他资源的创建和管理
ReactorResourceFactory factory = new ReactorResourceFactory();
// 设置是否使用全局资源。设置为 false将创建独立的资源。默认情况下,为 true,表示使用全局资源。
factory.setUseGlobalResources(false);
// 配置连接池的提供者,控制连接的创建、分配和回收。 创建名为"httpClient"的连接池,最大连接数为 50 默认是 max(cpu,8)*2 。
factory.setConnectionProvider(ConnectionProvider.create("httpClient", 50));
// 用于配置事件循环资源,它管理底层事件循环线程。创建一个名为 "httpClient" 的事件循环资源,最大线程数为 50,而第三个参数 线程是否在 JVM 关闭时释放 max(cpu,4)
factory.setLoopResources(LoopResources.create("httpClient", 50, true));
WebClient.builder().baseUrl("")// 用于配置请求和响应的处理策略 通常用于配置序列化和反序列化。.exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)).build())// 用于配置编解码器。.codecs(clientCodecConfigurer -> {clientCodecConfigurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024); // 设置最大内存大小}).clientConnector(new ReactorClientHttpConnector(factory, client -> client// 设置连接建立的超时时间,单位为毫秒。.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)// 启用或禁用 HTTP 响应的压缩。默认情况下,压缩是禁用的。.compress(true)// 启用或禁用 "wiretap",这将允许你记录请求和响应的详细信息,用于调试和监控。.wiretap(true).responseTimeout(Duration.ofMillis(5000)) // 响应超时时间.doOnConnected(connection -> {// 添加读超时处理器,单位为毫秒connection.addHandlerLast(new ReadTimeoutHandler(10));// 添加写超时处理器,单位为毫秒connection.addHandlerLast(new WriteTimeoutHandler(8));})// 启用跟随重定向,默认情况下启用.followRedirect(true))).filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {// 在请求发出之前记录请求信息System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());return Mono.just(clientRequest);}).andThen(ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {// 在响应接收后记录响应信息System.out.println("Response: " + clientResponse.statusCode());return Mono.just(clientResponse);}))).build();
请求准备
// 1.方法
UriSpec<RequestBodySpec> uriSpec = client.method(HttpMethod.POST);
UriSpec<RequestBodySpec> uriSpec = client.post();
// 2.uri
RequestBodySpec bodySpec = uriSpec.uri("/resource");
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder -> uriBuilder.pathSegment("/resource").build());
// "/products/2"
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/{id}").build(2))
// "/products/2/attributes/13"
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/{id}/attributes/{attributeId}").build(2, 13))
// "/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019"
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/").queryParam("name", "AndroidPhone").queryParam("color", "black").queryParam("deliveryDate", "13/04/2019").build())
// "/products/?name=AndroidPhone&color=black&deliveryDate=13%2F04%2F2019" 这种'/'符被转义了
RequestBodySpec bodySpec = uriSpec.uri(uriBuilder - > uriBuilder.path("/products/").queryParam("name", "{title}").queryParam("color", "{authorId}").queryParam("deliveryDate", "{date}").build("AndroidPhone", "black", "13/04/2019"))
// 数组参数 "/products/?category=Phones&category=Tablets"
webClient.get().uri(uriBuilder - > uriBuilder.path("/products/").queryParam("category", "Phones", "Tablets").build())
// 数组参数 "/products/?category=Phones,Tablets"
webClient.get().uri(uriBuilder - > uriBuilder.path("/products/").queryParam("category", String.join(",", "Phones", "Tablets")).build()) // 3.内容
RequestHeadersSpec<?> headersSpec = bodySpec.bodyValue("data");
RequestHeadersSpec<?> headersSpec = bodySpec.body(Mono.just(new Foo("name")), Foo.class);
RequestHeadersSpec<?> headersSpec = bodySpec.body(BodyInserters.fromValue("data"));
LinkedMultiValueMap map = new LinkedMultiValueMap();
map.add("key1", "value1");
map.add("key2", "value2");
RequestHeadersSpec<?> headersSpec = bodySpec.body(BodyInserters.fromMultipartData(map));
// 4.header标头
ResponseSpec responseSpec = headersSpec.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML).acceptCharset(StandardCharsets.UTF_8).ifNoneMatch("*").ifModifiedSince(ZonedDateTime.now()).retrieve();
获取响应
发送请求并接收响应。我们可以通过使用exchangeToMono/exchangeToFlux或retrieve方法来实现。
// ExchangeToMono和ExchangeToFlux方法允许访问ClientResponse及其状态和标头
Mono<String> response = headersSpec.exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(String.class);} else if (response.statusCode().is4xxClientError()) {return Mono.just("Error response");} else {return response.createException().flatMap(Mono::error);}
});
// 而retrieve方法是直接获取body
Mono<String> response = headersSpec.retrieve().bodyToMono(String.class);
需要注意的是 ResponseSpec.bodyToMono 方法,如果状态码为4xx(客户端错误)或5xx(服务器错误),它将抛出一个 WebClientException。
单个资源
Mono<Employee> employeeMono = client.get().uri("/employees/{id}", "1").retrieve().bodyToMono(Employee.class);employeeMono.subscribe(System.out::println);
多个资源
Flux<Employee> employeeFlux = client.get().uri("/employees").retrieve().bodyToFlux(Employee.class);employeeFlux.subscribe(System.out::println);
高级
过滤器
过滤器可以拦截、检查和修改客户端请求(或响应)。过滤器非常适合为每个请求添加功能,因为逻辑保留在一个位置。用例包括监视、修改、记录和验证客户端请求。
一个请求具有一个有序链,包含零个或多个过滤器。
在Spring Reactive中,过滤器是 ExchangeFilterFunction 的实例。过滤器函数有两个参数:要修改的 ClientRequest 和下一个 ExchangeFilterFunction。
通常,过滤器函数通过调用过滤器链中的下一个函数来返回:
ExchangeFilterFunction filterFunction = (clientRequest, nextFilter) -> {LOG.info("WebClient fitler executed");return nextFilter.exchange(clientRequest);
};// 添加过滤器
WebClient webClient = WebClient.builder().filter(filterFunction).build();WebClient.builder().filter((request, next) -> { //过滤器,3次重试,header打印log.info(String.format("请求地址: %s", request.url()));log.info(String.format("请求头信息: %s", request.headers()));Mono<ClientResponse> exchange = next.exchange(request).retry(3);ClientResponse clientResponse = exchange.block();log.info(String.format("响应头信息: %s", clientResponse.headers().asHttpHeaders()));return exchange;}).clientConnector(connector).build();
自定义过滤器
让我们从一个对客户端发送的 HTTP GET 请求进行计数的过滤器开始。
过滤器检查请求方法并在 GET 请求的情况下增加“全局”计数器:
ExchangeFilterFunction countingFunction = (clientRequest, nextFilter) -> {HttpMethod httpMethod = clientRequest.method();if (httpMethod == HttpMethod.GET) {getCounter.incrementAndGet();}return nextFilter.exchange(clientRequest);
};
我们将定义的第二个过滤器将版本号附加到请求 URL 路径。我们利用ClientRequest.from()方法从当前请求对象创建一个新的请求对象并设置修改后的 URL。
随后,我们继续使用新修改的请求对象执行过滤器链:
ExchangeFilterFunction urlModifyingFilter = (clientRequest, nextFilter) -> {String oldUrl = clientRequest.url().toString();URI newUrl = URI.create(oldUrl + "/" + version);ClientRequest filteredRequest = ClientRequest.from(clientRequest).url(newUrl).build();return nextFilter.exchange(filteredRequest);
};
自定义线程池
WebClient 默认使用 Project Reactor 提供的线程池来执行异步操作。但是,你可以根据应用程序的需求进行自定义线程池配置。以下是一些相关原理和最佳实践:
- 调度器(Schedulers):WebClient 使用调度器来管理线程,例如
Schedulers.elastic()
用于 CPU 密集型操作,Schedulers.parallel()
用于 I/O 密集型操作。你可以通过publishOn
和subscribeOn
方法来切换调度器。
webClient.get().uri("/todos/1").retrieve().bodyToMono(Todo.class).subscribeOn(Schedulers.elastic()) // 切换订阅线程.publishOn(Schedulers.parallel()) // 切换发布线程.subscribe(result -> {// 处理响应});
- 自定义线程池:如果需要更精细的线程控制,你可以创建自定义的线程池,并在调度器中使用它。这对于控制并发度和资源管理非常有用。
ExecutorService customExecutorService = Executors.newFixedThreadPool(10);webClient.get().uri("/todos/1").retrieve().bodyToMono(Todo.class).subscribeOn(Schedulers.fromExecutor(customExecutorService)).subscribe(result -> {// 处理响应});
自定义WebClient连接池
WebClient 使用 Reactor Netty 作为底层的 HTTP 客户端,它管理着连接池。连接池是一组可重用的连接,以提高性能和资源利用率。以下是有关连接池的原理和最佳实践:
- 连接池大小:连接池的大小可以通过 WebClient 的配置进行设置。默认情况下,它会根据应用程序的需求动态分配连接。你可以使用
HttpClient
的maxConnections
方法来设置最大连接数。
HttpClient httpClient = HttpClient.create().maxConnections(50); // 设置最大连接数为 50WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).baseUrl("https://jsonplaceholder.typicode.com").build();
- 连接超时:你可以配置连接超时和读写超时,以确保请求不会永远等待。这对于防止应用程序被慢速或不响应的服务挂起非常重要。
HttpClient httpClient = HttpClient.create().responseTimeout(Duration.ofSeconds(10)) // 设置响应超时时间为 10 秒.doOnConnected(connection -> {connection.addHandlerLast(new ReadTimeoutHandler(10)); // 设置读取超时connection.addHandlerLast(new WriteTimeoutHandler(10)); // 设置写入超时});WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).baseUrl("https://jsonplaceholder.typicode.com").build();
- 默认情况下,WebClient 会自动管理连接池,无需手动配置。
- 通常情况下,你不需要担心连接池的具体细节,因为它会在后台自动处理连接的创建、重用和关闭。
- 如果你需要更精细的控制,可以考虑配置连接池的大小和超时设置。
开启日志
在开发和调试期间,启用详细的 WebClient 日志记录可以帮助你识别问题。你可以使用 Spring Boot 的日志配置来启用 WebClient 的日志记录。在 application.properties
或 application.yml
中添加以下配置:
logging.level.org.springframework.web.reactive.function.client=DEBUG
logging.level.reactor.netty.http.client=DEBUG
logging.level.reactor.netty.tcp.client=DEBUG
这将为 WebClient 的 HTTP 请求和响应生成详细的日志信息,包括请求头、响应头和响应体。
WebClient 提供了一种简单的方法来记录请求和响应的日志,以便进行调试和监控。你可以通过添加过滤器来实现日志记录。以下是一个示例:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient.Builder webClientBuilder() {return WebClient.builder().filter(logRequest()).filter(logResponse());}private ExchangeFilterFunction logRequest() {return ExchangeFilterFunction.ofRequestProcessor(request -> {// 记录请求日志System.out.println("Request: " + request.method() + " " + request.url());return Mono.just(request);});}private ExchangeFilterFunction logResponse() {return ExchangeFilterFunction.ofResponseProcessor(response -> {// 记录响应日志System.out.println("Response: " + response.statusCode());return Mono.just(response);});}// ... 省略其他配置
}
在上面的示例中,我们定义了两个过滤器 logRequest
和 logResponse
,分别用于记录请求和响应的日志。你可以根据需要将日志输出到日志文件或其他监控工具。
错误处理
WebClient 提供了多种处理错误的方式。在上面的示例中,我们使用了 onStatus
方法来处理特定的 HTTP 状态码。这里有一些更多的错误处理策略和最佳实践:
onStatus
方法:onStatus
方法允许你根据 HTTP 响应的状态码来处理错误。你可以根据需要定义不同的处理逻辑,例如重试、返回默认值或引发自定义异常。onErrorResume
方法:使用onErrorResume
方法,你可以在出现错误时返回一个备用的 Mono。这可以用于从缓存中获取数据或返回默认值。- 全局错误处理器:你可以注册一个全局错误处理器来处理 WebClient 的全局错误,例如连接失败、超时等。通过
ExchangeStrategies
可以定义一个全局错误处理器。
@Bean
public WebClient.Builder webClientBuilder() {return WebClient.builder().exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)).build()).clientConnector(new ReactorClientHttpConnector(HttpClient.newConnection().compress(true).resolver(DefaultAddressResolverGroup.INSTANCE))).baseUrl("https://jsonplaceholder.typicode.com").filter((request, next) -> next.exchange(request).doOnError(throwable -> {// 全局错误处理逻辑}));
}
WebClient 提供了丰富的错误处理机制,可以通过 onStatus
和其他方法来捕获和处理不同类型的错误。例如,onStatus
方法用于根据 HTTP 响应的状态码来处理错误。你可以在 onStatus
方法中返回一个 Mono.error
,将错误包装成异常并传播。
最佳实践
- 使用
onStatus
处理不同的 HTTP 状态码。这可以让你根据状态码执行不同的错误处理逻辑。 - 使用
onErrorResume
来提供备用值或执行备用操作,以确保即使出现错误,也能返回有意义的响应。 - 使用
onErrorReturn
来在发生错误时返回一个默认值。
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误,或返回备用值return Mono.error(new CustomException("Client error: " + response.statusCode()));
})
.onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误,或返回备用值return Mono.error(new CustomException("Server error: " + response.statusCode()));
})
.bodyToMono(Todo.class)
.onErrorResume(CustomClientException.class, ex -> {// 处理自定义客户端异常return Mono.just(createDefaultTodo()); // 返回一个默认值
})
.onErrorResume(CustomServerException.class, ex -> {// 处理自定义服务器异常return Mono.error(new CustomFallbackException("Fallback error: " + ex.getMessage()));
})
.onErrorResume(Exception.class, error -> {// 处理其他类型的异常,或返回备用值return Mono.just(new DefaultResponse());
})
.onErrorReturn(CustomClientException.class, createDefaultTodo()) // 在客户端错误时返回默认值
.onErrorReturn(CustomServerException.class, createDefaultTodo()) // 在服务器错误时返回默认值
.doOnError(error -> {// 在发生错误时执行的操作System.err.println("Error occurred: " + error.getMessage());})
示例
异步请求
client.get().uri(URLConstants.URL).header(URLConstants.API_KEY_NAME, URLConstants.API_KEY_VALUE).retrieve().bodyToMono(String.class).subscribe(result->System.out.println(result));// 创建 WebClient 实例WebClient webClient = webClientBuilder().baseUrl("https://jsonplaceholder.typicode.com").build();// 执行 GET 请求Mono<ResponseEntity<String>> responseMono = webClient.get().uri("/posts/1").retrieve().toEntity(String.class);// 订阅响应并处理结果responseMono.subscribe(responseEntity -> {if (responseEntity.getStatusCode().is2xxSuccessful()) {System.out.println("Response Body: " + responseEntity.getBody());} else {System.err.println("Request failed with status code: " + responseEntity.getStatusCode());}});
以非阻塞的方式订阅subscribe()
,该方法返回Mono
的包装器。
同步请求
虽然Spring WebClient是异步的,但是我们仍然可以通过调用阻塞线程直到执行结束的方法block()
来进行同步调用。方法执行后我们得到结果。
String block = webClient.get().uri("https://jsonplaceholder.typicode.com/t3odos/1").retrieve().onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误,或返回备用值return Mono.error(new RuntimeException("Client error: " + response.statusCode()));}).onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误,或返回备用值return Mono.error(new RuntimeException("Server error: " + response.statusCode()));}).bodyToMono(String.class).onErrorResume(RuntimeException.class, ex -> {// 处理自定义异常 // 返回一个默认值return Mono.just("createDefaultTodo()");}).doOnError(error -> {// 在发生错误时执行的操作System.out.println("Error occurred: " + error.getMessage());}).block(Duration.ofSeconds(10));System.out.println(block);String result = client.post().uri("https://reqbin.com/echo/post/json").body(BodyInserters.fromValue(prepareRequest())).exchange().flatMap(response -> response.bodyToMono(String.class)).block();System.out.println("result::" + result);private String prepareRequest() {var values = new HashMap<String, String>() {{put("Id", "12345");put("Customer", "Roger Moose");put("Quantity", "3");put("Price", "167.35");}};var objectMapper = new ObjectMapper();String requestBody;try {requestBody = objectMapper.writeValueAsString(values);} catch (JsonProcessingException e) {e.printStackTrace();return null;}return requestBody;}}
创建了一个 JSON 字符串prepareRequest()
,然后将该字符串作为请求正文发送到 HTTPPOST
方法中。
exchange()
与之前使用的retrieve()
方法相比,该方法通过提供对来自 HTTP 客户端的响应的访问来提供更多控制。
上传文件
Mono<HttpStatus> httpStatusMono = webClient.post().uri(url).contentType(MediaType.APPLICATION_PDF).body(BodyInserters.fromResource(resource)).exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(HttpStatus.class).thenReturn(response.statusCode());} else {throw new ServiceException("Error uploading file");}});
// 从多部分资源上传文件
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("file", multipartFile.getResource());Mono<HttpStatus> httpStatusMono = webClient.post().uri(url).contentType(MediaType.MULTIPART_FORM_DATA).body(BodyInserters.fromMultipartData(builder.build())).exchangeToMono(response -> {if (response.statusCode().equals(HttpStatus.OK)) {return response.bodyToMono(HttpStatus.class).thenReturn(response.statusCode());} else {throw new ServiceException("Error uploading file");}});
重试
// 使用retry方法
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class).retry(3); // 无论 Web 客户端返回什么错误,这都会重试最多 3 次。
}
// 使用retryWhen方法的可配置策略
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class).retryWhen(Retry.max(3));
}
// 固定延迟重试
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class)// 尝试之间有两秒的延迟,这可能会增加成功的机会.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));
}
// 不是按固定间隔重试
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().bodyToMono(String.class)// 会逐渐增加尝试之间的延迟- 大约为 2 秒、4 秒,然后是 8 秒.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)));
}
// 抖动重试 为计算的延迟间隔增加了随机性
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));
}
过滤错误
服务中的任何错误都将导致重试尝试,包括 4xx 错误,例如400:Bad Request或401:Unauthorized。
显然,我们不应该重试此类客户端错误,因为服务器响应不会有任何不同。因此,让我们看看如何仅在出现特定错误的情况下应用重试策略。
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve()// 当是5xx 错误的异常,返回我们自定义的异常.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode()))).bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(5))// 仅在抛出ServiceException时重试.filter(throwable -> throwable instanceof ServiceException));
}
所有重试尝试均不成功的时候。在这种情况下,该策略的默认行为是传播 RetryExhaustedException ,包装最后一个错误。
public Mono<String> getData(String stockId) {return webClient.get().uri(PATH_BY_ID, stockId).retrieve().onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode()))).bodyToMono(String.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(5)).filter(throwable -> throwable instanceof ServiceException)// 一系列失败的重试结束后,请求将失败并出现 ServiceException .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value());}));
}
错误处理
使用onStatus
onStatus是一种内置机制,可用于处理WebClient响应。这使我们能够根据特定响应(例如 400、500、503 等)或状态类别(例如 4XX 和 5XX 等)应用细粒度的功能:
WebClient.builder().build().post().uri("/some-resource").retrieve().onStatus(HttpStatus.INTERNAL_SERVER_ERROR::equals,response -> response.bodyToMono(String.class).map(Exception::new))
onStatus方法需要两个参数。第一个是接收状态代码的谓词。第二个参数的执行基于第一个参数的输出。第二个是将响应映射到Mono或异常的函数。
示例为,如果我们看到 INTERNAL_SERVER_ERROR (即 500),我们将使用bodyToMono获取主体,然后将其映射到新的Exception。
我们可以链接onStatus调用,以便能够为不同的状态条件提供功能:
Mono<String> response = WebClient.builder().build().post().uri("some-resource").retrieve().onStatus( HttpStatus.INTERNAL_SERVER_ERROR::equals,response -> response.bodyToMono(String.class).map(CustomServerErrorException::new)) .onStatus(HttpStatus.BAD_REQUEST::equals,response -> response.bodyToMono(String.class).map(CustomBadRequestException::new))... .bodyToMono(String.class);// do something with responsewebClient.get().uri("/todos/{id}", id).retrieve().onStatus(HttpStatus::is4xxClientError, response -> {// 处理 4xx 错误return Mono.error(new CustomException("Client error: " + response.statusCode()));}).onStatus(HttpStatus::is5xxServerError, response -> {// 处理 5xx 错误return Mono.error(new CustomException("Server error: " + response.statusCode()));}).bodyToMono(Todo.class);
现在onStatus调用映射到我们的自定义异常。我们为这两种错误状态分别定义了异常类型。onStatus方法允许我们使用我们选择的任何类型。
使用ExchangeFilterFunction
ExchangeFilterFunction是处理特定状态代码和获取响应正文的另一种方法 。与onStatus不同,交换过滤器非常灵活,适用于基于任何布尔表达式的过滤器功能。
我们可以受益于ExchangeFilterFunction的灵活性,涵盖与onStatus函数相同的类别。
处理返回的逻辑:
private static Mono<ClientResponse> exchangeFilterResponseProcessor(ClientResponse response) {HttpStatus status = response.statusCode();if (HttpStatus.INTERNAL_SERVER_ERROR.equals(status)) {return response.bodyToMono(String.class).flatMap(body -> Mono.error(new CustomServerErrorException(body)));}if (HttpStatus.BAD_REQUEST.equals(status)) {return response.bodyToMono(String.class).flatMap(body -> Mono.error(new CustomBadRequestException(body)));}return Mono.just(response);
}
接下来,我们将定义过滤器并使用对处理程序的方法引用:
ExchangeFilterFunction errorResponseFilter = ExchangeFilterFunction.ofResponseProcessor(WebClientStatusCodeHandler::exchangeFilterResponseProcessor);
与onStatus调用类似,我们在错误时映射到异常类型。但是,使用Mono.error会将此异常包装在ReactiveException 中。处理错误时应牢记这种嵌套。
现在我们将其应用于WebClient的实例,以达到与onStatus链式调用相同的效果:
Mono<String> response = WebClient.builder().filter(errorResponseFilter).build().post().uri("some-resource").retrieve().bodyToMono(String.class);// do something with response
参考
- https://reflectoring.io/comparison-of-java-http-clients/
- https://reflectoring.io/spring-webclient/
- https://docs.flydean.com/spring-framework-documentation5/webreactive/2.webclient
- https://blog.hanqunfeng.com/2020/04/18/http-utils/#WebClientUtil
相关文章:
从零开发短视频电商 使用Spring WebClient发起远程Http调用
文章目录 依赖使用创建WebClient实例创建带有超时的WebClient实例示例 请求准备获取响应 高级过滤器自定义过滤器 自定义线程池自定义WebClient连接池开启日志错误处理最佳实践 示例异步请求同步请求上传文件重试过滤错误错误处理 参考 Spring WebClient 是 Spring WebFlux 项目…...

Python实现成语接龙
如图: 详细代码实现: # coding:utf-8 import string import pypinyin import sys import randomprint("初始化中,请稍等……")def main():f2 open(./idiom.txt, r, encodingutf-8)f f2.read() # 一次性读取完成new3_list f.sp…...

继续上一个爬虫,所以说selenium加browsermobproxy
继续,书接上回,这次我通过jsrpc,也学会了不少逆向的知识,感觉对于一般的网站应该都能应付了。当然我说的是简单的网站,遇到那些混淆的,还有那种猿人学里面的题目,还是免谈了。那种需要的水平太高…...

Sentinel服务熔断和流控
Sentinel服务熔断和流控 简介 Sentinel 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来…...

01_TMS320F28004x系列MCU介绍和资料搜集
1. TI C2000 实时微控制器 TI公司在处理器方面的产品线有:基于ARM内核的微控制器/微处理器、MSP430微控制器、C2000系列实时微控制器、还有数字信号处理器(DSP)。 其中,C2000是TI公司专门针对实时控制推出的32位微控制器。TI公司…...
JavaScript中获取对象属性的不同方法
JavaScript中获取对象属性的不同方法 文章目录 JavaScript中获取对象属性的不同方法一、点记法二、方括号记法三、Object.keys()方法四、Object.values()方法五、Object.entries()方法六、Object.getOwnPropertyNames()方法七、Object.getOwnPropertyDescriptors()方法 JavaScr…...

【STM32教程】第四章 STM32的外部中断EXTI
案例代码及相关资料下载链接: 链接:https://pan.baidu.com/s/1hsIibEmsB91xFclJd-YTYA?pwdjauj 提取码:jauj 1 中断系统 1.1 中断的概念 中断系统的定义:中断是指在主程序运行过程中,出现了特定的中断触发条件…...
力扣第40天----第121题、第122题
# 力扣第40天----第121题、第122题 文章目录 一、第121题--买卖股票的最佳时机二、第122题--买卖股票的最佳时机II 一、第121题–买卖股票的最佳时机 分2种情况考虑,根据持有股票、不持有股票这2种情况,完成递推公式。另外,这里要求只买卖…...
Flask 使用 JWT(二)
在 Python 使用 JWT 主要的方案是 PyJWT 工具。 安装与基本使用 可以使用 pip 安装 PyJWT: $ pip install pyjwt编码与解码 编码函数 def encode( self, payload: Dict[str, Any], # payload 参数 key: str, …...

从0到1理解ChatGPT原理
目录 写在前面 1.Tansformer架构模型 2.ChatGPT原理 3.提示学习与大模型能力的涌现 3.1提示学习 3.2上下文学习 3.3思维链 4.行业参考建议 4.1拥抱变化 4.2定位清晰 4.3合规可控 4.4经验沉淀 机械工业出版社京东自购链接 写在前面 2022年11月30日,ChatG…...
如何解决 “Component cannot be used as a JSX component“
原因是react版本与types/react版本不一致导致的, 在tsconfig.json中加入以下代码,将依赖指向项目里的node_modules "paths": {"react": [ "./node_modules/types/react" ]}改完后代码大概是长这样的 {"compilerOptions": {..."…...

小程序自定义tabbar
前言 使用小程序默认的tabbar可以满足常规开发,但是满足不了个性化需求,如果想个性化开发就需要用到自定义tabbar,以下图为例子 一、在app.json配置 先按照以往默认的形式配置,如果中间的样式特殊则不需要配置 "tabBar": {&qu…...

分布式系统第五讲:分布式事务及实现方案
分布式系统第五讲:分布式事务及实现方案 事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败。而分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。本…...

算法通关村17关 | 透析跳跃游戏
1. 跳跃游戏 题目 LeetCode55 给定一个非负整数数组,最初位于数组的第一个位置,数组中的每个元素代表你再该位置可以跳跃的最大长度,判断你是否能够达到最后一个位置。 思路 如果当前位置元素如果是3,我们无需考虑是跳几步&#…...

ARM接口编程—RTC(exynos 4412平台)
RTC简介 RTC(Real Time Clock)即实时时钟,它是一个可以为系统提供精确的时间基准的元器件,RTC一般采用精度较高的晶振作为时钟源,有些RTC为了在主电源掉电时还可以工作,需要外加电池供电。 RTC内部原理 RTC寄存器 RTC控制寄存器 …...

数据分享|WEKA信贷违约预测报告:用决策树、随机森林、支持向量机SVM、朴素贝叶斯、逻辑回归...
完整报告链接:http://tecdat.cn/?p28579 作者:Nuo Liu 数据变得越来越重要,其核心应用“预测”也成为互联网行业以及产业变革的重要力量。近年来网络 P2P借贷发展形势迅猛,一方面普通用户可以更加灵活、便快捷地获得中小额度的贷…...
逆市而行:如何在市场恐慌时保持冷静并抓住机会?
市场中的恐慌和波动是投资者所不可避免的。当市场出现恐慌情绪时,很多投资者会盲目跟从大众,导致决策出现错误。然而,聪明的投资者懂得在恐慌中保持冷静,并将其视为抓住机会的时机。本文将分享一些在市场恐慌时保持冷静并抓住机会…...
SpringBoot项目在Linux上启动、停止脚本
文章目录 SpringBoot项目在Linux上启动、停止脚本1. 在项目jar包同一目录,创建脚本xxx.sh【注: 和项目Jar同一目录】2. xxx.sh脚本内容,实际项目使用,只需修改jar包的名称:xxxxxx.jar3. 给xxx.sh赋予执行权限4. xxx.sh脚本的使用 …...

基于32位单片机的感应灯解决方案
感应灯是一种常见照明灯,提起感应灯,相信大家并不陌生, 它在一些公共场所、卫生间或者走廊等场所,使用的较为广泛,同时它使用起来也较为方便省电。“人来灯亮,人走灯灭”的特性,使他们在部分场景…...

机器学习——支持向量机(SVM)
机器学习——支持向量机(SVM) 文章目录 前言一、SVM算法原理1.1. SVM介绍1.2. 核函数(Kernel)介绍1.3. 算法和核函数的选择1.4. 算法步骤1.5. 分类和回归的选择 二、代码实现(SVM)1. SVR(回归&a…...

python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序
一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...
PostgreSQL——环境搭建
一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在࿰…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...

【LeetCode】算法详解#6 ---除自身以外数组的乘积
1.题目介绍 给定一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O…...

相关类相关的可视化图像总结
目录 一、散点图 二、气泡图 三、相关图 四、热力图 五、二维密度图 六、多模态二维密度图 七、雷达图 八、桑基图 九、总结 一、散点图 特点 通过点的位置展示两个连续变量之间的关系,可直观判断线性相关、非线性相关或无相关关系,点的分布密…...