四时宝库

程序员的知识宝库

干货:重试利器之Guava Retrying(guava详解)

重试的使用场景

在很多业务场景中,为了排除系统中的各种不稳定因素,以及逻辑上的错误,并最大概率保证获得预期的结果,重试机制都是必不可少的。

尤其是调用远程服务,在高并发场景下,很可能因为服务器响应延迟或者网络原因,造成我们得不到想要的结果,或者根本得不到响应。这个时候,一个优雅的重试调用机制,可以让我们更大概率保证得到预期的响应。

sequenceDiagram
 Client->>Server:{"msg":"hello,server"}
 Note right of Server: busying ......
 Client->>Server:{"msg":"hello,server"}
 Server-->>Client:{"Exception":"500"}
 Note right of Server: busying ......
 loop ServerBlock
 Server -->> Server: Too busy to deal with so many requests...
 end
 Client->>Server:{"msg":"hello,server"}
 activate Server
 Server-->>Client:{"msg":"hello,client"}
 deactivate Server

通常情况下,我们会通过<strong>定时任务</strong>进行重试。例如某次操作失败,则记录下来,当定时任务再次启动,则将数据放到定时任务的方法中,重新跑一遍。最终直至得到想要的结果为止。

无论是基于定时任务的重试机制,还是我们自己写的简单的重试器,缺点都是重试的机制太单一,而且实现起来不优雅。

如何优雅地设计重试实现

一个完备的重试实现,要很好地解决如下问题:

  1. 什么条件下重试
  2. 什么条件下停止
  3. 如何停止重试
  4. 停止重试等待多久
  5. 如何等待
  6. 请求时间限制
  7. 如何结束
  8. 如何监听整个重试过程

并且,为了更好地封装性,重试的实现一般分为两步:

  1. 使用工厂模式构造重试器
  2. 执行重试方法并得到结果

一个完整的重试流程可以简单示意为:

graph LR
 A((Start)) -->|build| B(Retryer)
 B --> C{need call?}
 C -->|continue| D[call]
 D --> Z[call count++]
 Z --> C
 C -->|finished| E[result]
 E --> F((success))
 E --> G((failed ))

guava-retrying基础用法

guava-retrying是基于谷歌的核心类库guava的重试机制实现,可以说是一个重试利器。

下面就快速看一下它的用法。

1.Maven配置

<!-- https://mvnrepository.com/artifact/com.github.rholder/guava-retrying -->
<dependency>
 <groupId>com.github.rholder</groupId>
 <artifactId>guava-retrying</artifactId>
 <version>2.0.0</version>
</dependency>

需要注意的是,此版本依赖的是27.0.1版本的guava。如果你项目中的guava低几个版本没问题,但是低太多就不兼容了。这个时候你需要升级你项目的guava版本,或者直接去掉你自己的guava依赖,使用guava-retrying传递过来的guava依赖。

2.实现Callable

Callable<Boolean> callable = new Callable<Boolean>() {
 public Boolean call() throws Exception {
 return true; // do something useful here
 }
};

Callable的call方法中是你自己实际的业务调用。

  1. 通过RetryerBuilder构造Retryer
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
 .retryIfResult(Predicates.<Boolean>isNull())
 .retryIfExceptionOfType(IOException.class)
 .retryIfRuntimeException()
 .withStopStrategy(StopStrategies.stopAfterAttempt(3))
 .build();
  1. 使用重试器执行你的业务
retryer.call(callable);

下面是完整的参考实现。

public Boolean test() throws Exception {
 //定义重试机制
 Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
 //retryIf 重试条件
 .retryIfException()
 .retryIfRuntimeException()
 .retryIfExceptionOfType(Exception.class)
 .retryIfException(Predicates.equalTo(new Exception()))
 .retryIfResult(Predicates.equalTo(false))
 //等待策略:每次请求间隔1s
 .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
 //停止策略 : 尝试请求6次
 .withStopStrategy(StopStrategies.stopAfterAttempt(6))
 //时间限制 : 某次请求不得超过2s , 类似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
 .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
 .build();
 //定义请求实现
 Callable<Boolean> callable = new Callable<Boolean>() {
 int times = 1;
 @Override
 public Boolean call() throws Exception {
 log.info("call times={}", times);
 times++;
 if (times == 2) {
 throw new NullPointerException();
 } else if (times == 3) {
 throw new Exception();
 } else if (times == 4) {
 throw new RuntimeException();
 } else if (times == 5) {
 return false;
 } else {
 return true;
 }
 }
 };
 //利用重试器调用请求
 return retryer.call(callable);
}

guava-retrying实现原理

guava-retrying的核心是Attempt类、Retryer类以及一些Strategy(策略)相关的类。

  1. Attempt

Attempt既是一次重试请求(call),也是请求的结果,并记录了当前请求的次数、是否包含异常和请求的返回值。

/**
 * An attempt of a call, which resulted either in a result returned by the call,
 * or in a Throwable thrown by the call.
 *
 * @param <V> The type returned by the wrapped callable.
 * @author JB
 */
public interface Attempt<V>
  1. Retryer

Retryer通过RetryerBuilder这个工厂类进行构造。RetryerBuilder负责将定义的重试策略赋值到Retryer对象中。

在Retryer执行call方法的时候,会将这些重试策略一一使用。

下面就看一下Retryer的call方法的具体实现。

/**
 * Executes the given callable. If the rejection predicate
 * accepts the attempt, the stop strategy is used to decide if a new attempt
 * must be made. Then the wait strategy is used to decide how much time to sleep
 * and a new attempt is made.
 *
 * @param callable the callable task to be executed
 * @return the computed result of the given callable
 * @throws ExecutionException if the given callable throws an exception, and the
 * rejection predicate considers the attempt as successful. The original exception
 * is wrapped into an ExecutionException.
 * @throws RetryException if all the attempts failed before the stop strategy decided
 * to abort, or the thread was interrupted. Note that if the thread is interrupted,
 * this exception is thrown and the thread's interrupt status is set.
 */
 public V call(Callable<V> callable) throws ExecutionException, RetryException {
 long startTime = System.nanoTime();
 //说明: 根据attemptNumber进行循环——也就是重试多少次
 for (int attemptNumber = 1; ; attemptNumber++) {
 //说明:进入方法不等待,立即执行一次
 Attempt<V> attempt;
 try {
 //说明:执行callable中的具体业务
 //attemptTimeLimiter限制了每次尝试等待的时常
 V result = attemptTimeLimiter.call(callable);
 //利用调用结果构造新的attempt
 attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
 } catch (Throwable t) {
 attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
 }
 //说明:遍历自定义的监听器
 for (RetryListener listener : listeners) {
 listener.onRetry(attempt);
 }
 //说明:判断是否满足重试条件,来决定是否继续等待并进行重试
 if (!rejectionPredicate.apply(attempt)) {
 return attempt.get();
 }
 //说明:此时满足停止策略,因为还没有得到想要的结果,因此抛出异常
 if (stopStrategy.shouldStop(attempt)) {
 throw new RetryException(attemptNumber, attempt);
 } else {
 //说明:执行默认的停止策略——线程休眠
 long sleepTime = waitStrategy.computeSleepTime(attempt);
 try {
 //说明:也可以执行定义的停止策略
 blockStrategy.block(sleepTime);
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new RetryException(attemptNumber, attempt);
 }
 }
 }
 }

Retryer执行过程如下。

graph TB
 sq[Retryer] --> ci((call))
 subgraph Retrying
 rb>RetryerBuilder]-- build retryer<br/>with strategies --> ro
 di{Retryer:<br/>using callable whith <br/>strategies execute call...} -.->
 ro(<br>.retryIf...<br>.withWaitStrategy<br>.withStopStrategy<br>.withAttemptTimeLimiter<br>.withBlockStrategy<br>.withRetryListene)
 di==>ro2(Attempt: get the result)
 end
 classDef green fill:#9f6,stroke:#333,stroke-width:2px;
 classDef orange fill:#f96,stroke:#333,stroke-width:4px;
 class sq,e green
 class di orange

guava-retrying高级用法

基于guava-retrying的实现原理,我们可以根据实际业务来确定自己的重试策略。

下面以数据同步这种常规系统业务为例,自定义重试策略。

如下实现基于Spring Boot 2.1.2.RELEASE版本。

并使用Lombok简化Bean。

<dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 <optional>true</optional>
</dependency>

业务描述

当商品创建以后,需要另外设置商品的价格。由于两个操作是有两个人进行的,因此会出现如下问题,即商品没有创建,但是价格数据却已经建好了。遇到这种情况,价格数据需要等待商品正常创建以后,继续完成同步。

我们通过一个http请求进行商品的创建,同时通过一个定时器来修改商品的价格。

当商品不存在,或者商品的数量小于1的时候,商品的价格不能设置。需要等商品成功创建且数量大于0的时候,才能将商品的价格设置成功。

实现过程

  1. 自定义重试阻塞策略

默认的阻塞策略是线程休眠,这里使用自旋锁实现,不阻塞线程。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy;
import com.github.rholder.retry.BlockStrategy;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.LocalDateTime;
/**
 * 自旋锁的实现, 不响应线程中断
 */
@Slf4j
@NoArgsConstructor
public class SpinBlockStrategy implements BlockStrategy {
 @Override
 public void block(long sleepTime) throws InterruptedException {
 LocalDateTime startTime = LocalDateTime.now();
 long start = System.currentTimeMillis();
 long end = start;
 log.info("[SpinBlockStrategy]...begin wait.");
 while (end - start <= sleepTime) {
 end = System.currentTimeMillis();
 }
 //使用Java8新增的Duration计算时间间隔
 Duration duration = Duration.between(startTime, LocalDateTime.now());
 log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis());
 }
}
  1. 自定义重试监听器

RetryListener可以监控多次重试过程,并可以使用attempt做一些额外的事情。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RetryLogListener implements RetryListener {
 @Override
 public <V> void onRetry(Attempt<V> attempt) {
 // 第几次重试,(注意:第一次重试其实是第一次调用)
 log.info("retry time : [{}]", attempt.getAttemptNumber());
 // 距离第一次重试的延迟
 log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt());
 // 重试结果: 是异常终止, 还是正常返回
 log.info("hasException={}", attempt.hasException());
 log.info("hasResult={}", attempt.hasResult());
 // 是什么原因导致异常
 if (attempt.hasException()) {
 log.info("causeBy={}" , attempt.getExceptionCause().toString());
 } else {
 // 正常返回时的结果
 log.info("result={}" , attempt.getResult());
 }
 log.info("log listen over.");
 }
}
  1. 自定义Exception

有些异常需要重试,有些不需要。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception;
/**
 * 当抛出这个异常的时候,表示需要重试
 */
public class NeedRetryException extends Exception {
 public NeedRetryException(String message) {
 super("NeedRetryException can retry."+message);
 }
}
  1. 实现具体重试业务与Callable接口

使用call方法调用自己的业务。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;
/**
 * 商品model
 */
@Data
@AllArgsConstructor
public class Product {
 private Long id;
 private String name;
 private Integer count;
 private BigDecimal price;
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import org.springframework.stereotype.Repository;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 商品DAO
 */
@Repository
public class ProductRepository {
 private static ConcurrentHashMap<Long,Product> products=new ConcurrentHashMap();
 private static AtomicLong ids=new AtomicLong(0);
 public List<Product> findAll(){
 return new ArrayList<>(products.values());
 }
 public Product findById(Long id){
 return products.get(id);
 }
 public Product updatePrice(Long id, BigDecimal price){
 Product p=products.get(id);
 if (null==p){
 return p;
 }
 p.setPrice(price);
 return p;
 }
 public Product addProduct(Product product){
 Long id=ids.addAndGet(1);
 product.setId(id);
 products.put(id,product);
 return product;
 }
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
/**
 * 业务方法实现
 */
@Component
@Slf4j
public class ProductInformationHander implements Callable<Boolean> {
 @Autowired
 private ProductRepository pRepo;
 private static Map<Long, BigDecimal> prices = new HashMap<>();
 static {
 prices.put(1L, new BigDecimal(100));
 prices.put(2L, new BigDecimal(200));
 prices.put(3L, new BigDecimal(300));
 prices.put(4L, new BigDecimal(400));
 prices.put(8L, new BigDecimal(800));
 prices.put(9L, new BigDecimal(900));
 }
 @Override
 public Boolean call() throws Exception {
 log.info("sync price begin,prices size={}", prices.size());
 for (Long id : prices.keySet()) {
 Product product = pRepo.findById(id);
 if (null == product) {
 throw new NeedRetryException("can not find product by id=" + id);
 }
 if (null == product.getCount() || product.getCount() < 1) {
 throw new NeedRetryException("product count is less than 1, id=" + id);
 }
 Product updatedP = pRepo.updatePrice(id, prices.get(id));
 if (null == updatedP) {
 return false;
 }
 prices.remove(id);
 }
 log.info("sync price over,prices size={}", prices.size());
 return true;
 }
}
  1. 构造重试器Retryer

将上面的实现作为参数,构造Retryer。

package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import com.github.rholder.retry.*;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener.RetryLogListener;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy.SpinBlockStrategy;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * 构造重试器
 */
@Component
public class ProductRetryerBuilder {
 public Retryer build() {
 //定义重试机制
 Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
 //retryIf 重试条件
 //.retryIfException()
 //.retryIfRuntimeException()
 //.retryIfExceptionOfType(Exception.class)
 //.retryIfException(Predicates.equalTo(new Exception()))
 //.retryIfResult(Predicates.equalTo(false))
 .retryIfExceptionOfType(NeedRetryException.class)
 //等待策略:每次请求间隔1s
 .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
 //停止策略 : 尝试请求3次
 .withStopStrategy(StopStrategies.stopAfterAttempt(3))
 //时间限制 : 某次请求不得超过2s , 类似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
 .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
 //默认的阻塞策略:线程睡眠
 //.withBlockStrategy(BlockStrategies.threadSleepStrategy())
 //自定义阻塞策略:自旋锁
 .withBlockStrategy(new SpinBlockStrategy())
 //自定义重试监听器
 .withRetryListener(new RetryLogListener())
 .build();
 return retryer;
 }
}
  1. 与定时任务结合执行Retryer

定时任务只需要跑一次,但是实际上实现了所有的重试策略。这样大大简化了定时器的设计。

首先使用@EnableScheduling声明项目支持定时器注解。

@SpringBootApplication
@EnableScheduling
public class DemoRetryerApplication {
 public static void main(String[] args) {
 SpringApplication.run(DemoRetryerApplication.class, args);
 }
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.task;
import com.github.rholder.retry.Retryer;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductInformationHander;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductRetryerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 商品信息定时器
*/
@Component
public class ProductScheduledTasks {
 @Autowired
 private ProductRetryerBuilder builder;
 @Autowired
 private ProductInformationHander hander;
 /**
 * 同步商品价格定时任务
 * @Scheduled(fixedDelay = 30000) :上一次执行完毕时间点之后30秒再执行
 */
 @Scheduled(fixedDelay = 30*1000)
 public void syncPrice() throws Exception{
 Retryer retryer=builder.build();
 retryer.call(hander);
 }
}

执行结果:由于并没有商品,因此重试以后,抛出异常。

2019-二月-28 14:37:52.667 INFO [scheduling-1] n.i.t.f.s.i.d.r.g.l.RetryLogListener - log listen over.
2019-二月-28 14:37:52.672 ERROR [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler - Unexpected error occurred in scheduled task.
com.github.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts.
 at com.github.rholder.retry.Retryer.call(Retryer.java:174)

动态调节重试策略

在实际使用过程中,有时经常需要调整重试的次数、等待的时间等重试策略,因此,将重试策略的配置参数化保存,可以动态调节。

例如在秒杀、双十一购物节等时期增加等待的时间与重试次数,以保证错峰请求。在平时,可以适当减少等待时间和重试次数。

对于系统关键性业务,如果多次重试步成功,可以通过RetryListener进行监控与报警。

关于 『动态调节重试策略 』下面提供一个参考实现。

import com.github.rholder.retry.Attempt; 
import com.github.rholder.retry.WaitStrategy; 
 
/** 
 * 自定义等待策略:根据重试次数动态调节等待时间,第一次请求间隔1s,第二次间隔10s,第三次及以后都是20s。 
 * 
 * 
 * 在创建Retryer的时候通过withWaitStrategy将该等待策略生效即可。 
 * 
 * RetryerBuilder.<Boolean>newBuilder() 
 * .withWaitStrategy(new AlipayWaitStrategy()) 
 * 
 * 类似的效果也可以通过自定义 BlockStrategy 来实现,你可以写一下试试。 
 * 
 */ 
public class AlipayWaitStrategy implements WaitStrategy { 
 
 @Override 
 public long computeSleepTime(Attempt failedAttempt) { 
 long number = failedAttempt.getAttemptNumber(); 
 if (number==1){ 
 return 1*1000; 
 } 
 if (number==2){ 
 return 10*1000; 
 } 
 return 20*1000; 
 } 
 
}

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接