Spring Cloud中Zuul重试机制的示例分析

发布时间:2021-08-21 14:19:17 作者:小新
来源:亿速云 阅读:120

小编给大家分享一下Spring Cloud中Zuul重试机制的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

具体内容如下:

  1. 开启Zuul功能

  2. 通过源码了解Zuul的一次转发

  3. 怎么开启zuul的重试机制

  4. Edgware.RC1版本的优化

开启Zuul的功能

首先如何使用spring cloud zuul完成路由转发的功能,这个问题很简单,只需要进行如下准备工作即可:

  1. 注册中心(Eureka Server)

  2. zuul(同时也是Eureka Client)

  3. 应用服务(同时也是Eureka Client)

我们希望zuul和后端的应用服务同时都注册到Eureka Server上,当我们访问Zuul的某一个地址时,对应其实访问的是后端应用的某个地址,从而从这个地址返回一段内容,并展现到浏览器上。

注册中心(Eureka Server)

创建一个Eureka Server只需要在主函数上添加@EnableEurekaServer,并在properties文件进行简单配置即可,具体内容如下:

@EnableEurekaServer
@RestController
@SpringBootApplication
public class EurekaServerApplication {
  public static void main(String[] args) {
   SpringApplication.run(EurekaServerApplication.class, args);
  }
}
server.port=8761
eureka.client.register-with-eureka=false
eureka.client.fetch-registry=false

Zuul

主函数添加@EnableZuulProxy注解(因为集成Eureka,需要另外添加@EnableDiscoveryClient注解)。并配置properties文件,具体内容如下所示:

@EnableZuulProxy
@EnableDiscoveryClient
@SpringBootApplication
public class ZuulDemoApplication {
  /**
  * 省略代码...
  */
}
server.port=8081
spring.application.name=ZUUL-CLIENT
zuul.routes.api-a.serviceId=EUREKA-CLIENT
zuul.routes.api-a.path=/api-a/**
eureka.client.service-url.defaultZone=http://localhost:8761/eureka

应用服务

@RestController
@EnableEurekaClient
@SpringBootApplication
public class EurekaClientApplication {
  public static void main(String[] args) {
   SpringApplication.run(EurekaClientApplication.class, args);
  }
  @RequestMapping(value = "/hello")
  public String index() {
   return "hello spring...";
  }
}
spring.application.name=EUREKA-CLIENT
eureka.client.service-url.defaultZone=http://localhost:8761/eureka

三个工程全部启动,这时当我们访问localhost:8081/api-a/hello时,你会看到浏览器输出的内容是hello spring...

通过源码了解Zuul的一次转发

接下来我们通过源码层面来了解下,一次转发内部都做了哪些事情。

首先我们查看Zuul的配置类ZuulProxyAutoConfiguration在这个类中有一项工作是初始化Zuul默认自带的Filter,其中有一个Filter很重要,它就是RibbonRoutingFilter。它主要是完成请求的路由转发。接下来我们看下他的run方法

@Override
public Object run() {
  RequestContext context = RequestContext.getCurrentContext();
  try {
   RibbonCommandContext commandContext = buildCommandContext(context);
   ClientHttpResponse response = forward(commandContext);
   setResponse(response);
   return response;
  }
  catch (ZuulException ex) {
   throw new ZuulRuntimeException(ex);
  }
  catch (Exception ex) {
   throw new ZuulRuntimeException(ex);
  }
}

可以看到进行转发的方法是forward,我们进一步查看这个方法,具体内容如下:

省略部分代码

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
  RibbonCommand command = this.ribbonCommandFactory.create(context);
  try {
   ClientHttpResponse response = command.execute();
   return response;
  }
  catch (HystrixRuntimeException ex) {
   return handleException(info, ex);
  }
}

ribbonCommandFactory指的是HttpClientRibbonCommandFactory这个类是在RibbonCommandFactoryConfiguration完成初始化的(触发RibbonCommandFactoryConfiguration的加载动作是利用ZuulProxyAutoConfiguration类上面的@Import标签),具体代码如下:

@Configuration
@ConditionalOnRibbonHttpClient
protected static class HttpClientRibbonConfiguration {
  @Autowired(required = false)
  private Set<ZuulFallbackProvider> zuulFallbackProviders = Collections.emptySet();
  @Bean
  @ConditionalOnMissingBean
  public RibbonCommandFactory<?> ribbonCommandFactory(
     SpringClientFactory clientFactory, ZuulProperties zuulProperties) {
   return new HttpClientRibbonCommandFactory(clientFactory, zuulProperties, zuulFallbackProviders);
  }
}

知道了这个ribbonCommandFactory具体的实现类(HttpClientRibbonCommandFactory),接下来我们看看它的create方法具体做了那些事情

@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
  ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
  final String serviceId = context.getServiceId();
  final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
     serviceId, RibbonLoadBalancingHttpClient.class);
  client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
  return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
     clientFactory.getClientConfig(serviceId));
}

这个方法按照我的理解主要做了以下几件事情:

@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
  /**
  *获取所有ZuulFallbackProvider,即当Zuul
  *调用失败后的降级方法
  */
  ZuulFallbackProvider = xxxxx
  /**
  *创建处理请求转发类,该类会利用
  *Apache的Http client进行请求的转发
  */
  RibbonLoadBalancingHttpClient = xxxxx
  
  /**
  *将降级方法、处理请求转发类、以及其他一些内容
  *包装成HttpClientRibbonCommand(这个类继承了HystrixCommand)
  */
  return new HttpClientRibbonCommand(xxxxx);
}

到这里我们很清楚的知道了RibbonRoutingFilter类的forward方法中RibbonCommand command = this.ribbonCommandFactory.create(context);这一行代码都做了哪些内容.

接下来调用的是command.execute();方法,通过刚刚的分析我们知道了command其实指的是HttpClientRibbonCommand,同时我们也知道HttpClientRibbonCommand继承了HystrixCommand所以当执行command.execute();时其实执行的是HttpClientRibbonCommand的run方法。查看源码我们并没有发现run方法,但是我们发现HttpClientRibbonCommand直接继承了AbstractRibbonCommand。所以其实执行的是AbstractRibbonCommand的run方法,接下来我们看看run方法里面都做了哪些事情:

@Override
protected ClientHttpResponse run() throws Exception {
  final RequestContext context = RequestContext.getCurrentContext();
  RQ request = createRequest();
  RS response = this.client.executeWithLoadBalancer(request, config);
  context.set("ribbonResponse", response);
  if (this.isResponseTimedOut()) {
   if (response != null) {
     response.close();
   }
  }
  return new RibbonHttpResponse(response);
}

可以看到在run方法中会调用client的executeWithLoadBalancer方法,通过上面介绍我们知道client指的是RibbonLoadBalancingHttpClient,而RibbonLoadBalancingHttpClient里面并没有executeWithLoadBalancer方法。(这里面会最终调用它的父类AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法。)

具体代码如下:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
 /**
 * 创建一个RetryHandler,这个很重要它是用来
 * 决定利用RxJava的Observable是否进行重试的标准。
 */
  RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
  /**
   * 创建一个LoadBalancerCommand,这个类用来创建Observable
   * 以及根据RetryHandler来判断是否进行重试操作。
   */
  LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
      .withLoadBalancerContext(this)
      .withRetryHandler(handler)
      .withLoadBalancerURI(request.getUri())
      .build();
  try {
   /**
   *command.submit()方法主要是创建了一个Observable(RxJava)
   *并且为这个Observable设置了重试次数,这个Observable最终
   *会回调AbstractLoadBalancerAwareClient.this.execute()
   *方法。
   */
    return command.submit(
      new ServerOperation<T>() {
        @Override
        public Observable<T> call(Server server) {
          URI finalUri = reconstructURIWithServer(server, request.getUri());
          S requestForServer = (S) request.replaceUri(finalUri);
          try {
            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
          } 
          catch (Exception e) {
            return Observable.error(e);
          }
        }
      })
      .toBlocking()
      .single();
  } catch (Exception e) {
    Throwable t = e.getCause();
    if (t instanceof ClientException) {
      throw (ClientException) t;
    } else {
      throw new ClientException(e);
    }
  }
  
}

下面针对于每一块内容做详细说明:

首先getRequestSpecificRetryHandler(request, requestConfig);这个方法其实调用的是RibbonLoadBalancingHttpClient的getRequestSpecificRetryHandler方法,这个方法主要是返回一个RequestSpecificRetryHandler

@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
 /**
 *这个很关键,请注意该类构造器中的前两个参数的值
 *正因为一开始我也忽略了这两个值,所以后续给我造
 *成一定的干扰。
 */
  return new RequestSpecificRetryHandler(false, false,
     RetryHandler.DEFAULT, requestConfig);
}

接下来创建LoadBalancerCommand并将上一步获得的RequestSpecificRetryHandler作为参数内容。

最后调用LoadBalancerCommand的submit方法。该方法内容太长具体代码细节就不在这里贴出了,按照我个人的理解,只贴出相应的伪代码:

public Observable<T> submit(final ServerOperation<T> operation) {
 //相同server的重试次数(去除首次请求)
  final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
  //集群内其他Server的重试个数
  final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
  /**
   *创建一个Observable(RxJava),selectServer()方法是
   *利用Ribbon选择一个Server,并将其包装成Observable
   */
  Observable<T> o = selectServer().concatMap(new Func1<Server, Observable<T>>() { 
    @Override
    public Observable<T> call(final Server server) {
     /**
     *这里会回调submit方法入参ServerOperation类的call方法,
     */
   return operation.call(server).doOnEach(new Observer<T>() {}
 }
  }
  if (maxRetrysSame > 0) 
   o = o.retry(retryPolicy(maxRetrysSame, true));
    
  if (maxRetrysNext > 0 && server == null) 
    o = o.retry(retryPolicy(maxRetrysNext, false));
  
  return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
    @Override
    public Observable<T> call(Throwable e) {
    /**
    *转发请求失败时,会进入此方法。通过此方法进行判断
    *是否超过重试次数maxRetrysSame、maxRetrysNext。
    */
   }
  });
}

operation.call()方法最终会调用RibbonLoadBalancingHttpClient的execute方法,该方法内容如下:

@Override
public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,
   final IClientConfig configOverride) throws Exception {
  /**
  * 组装参数(RequestConfig)
  */
  final RequestConfig.Builder builder = RequestConfig.custom();
  IClientConfig config = configOverride != null ? configOverride : this.config;
  builder.setConnectTimeout(config.get(
     CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
  builder.setSocketTimeout(config.get(
     CommonClientConfigKey.ReadTimeout, this.readTimeout));
  builder.setRedirectsEnabled(config.get(
     CommonClientConfigKey.FollowRedirects, this.followRedirects));
  final RequestConfig requestConfig = builder.build();
  if (isSecure(configOverride)) {
   final URI secureUri = UriComponentsBuilder.fromUri(request.getUri())
      .scheme("https").build().toUri();
   request = request.withNewUri(secureUri);
  }
  final HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
  /**
  * 发送转发请求
  */
  final HttpResponse httpResponse = this.delegate.execute(httpUriRequest);
  /**
  * 返回结果
  */
  return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}

可以看到上面方法主要做的就是组装请求参数(包括各种超时时间),然后发起转发请求,最终获取相应结果。

说到这里,zuul转发一次请求的基本原理就说完了。让我们再回顾下整个流程。

  1. zuul的转发是通过RibbonRoutingFilter这个Filter进行操作的。

  2. 在转发之前,zuul利用Hystrix将此次转发请求包装成一个HystrixCommand,正应为这样才使得zuul具有了降级(Fallback)的功能,同时HystrixCommand是具备超时时间的(默认是1s)。而且Zuul默认采用的隔离级别是信号量模式。

  3. 在HystrixCommand内部zuul再次将请求包装成一个Observable,(有关RxJava的知识请参照其官方文档)。并且为Observable设置了重试次数。

事实真的是这样吗?当我看到源码中为Observable设置重试次数的时候,我以为这就是zuul的重试逻辑。遗憾的是我的想法是错误的。还记得上面我说的getRequestSpecificRetryHandler(request, requestConfig);这个方法吗?(不记得的同学可以回过头来再看下),这个方法返回的是RequestSpecificRetryHandler这个类,而且在创建该类时,构造器的前两个参数都为false。(这一点非常重要)。这两个参数分别是okToRetryOnConnectErrors和okToRetryOnAllErrors。

我原本的想法是这个请求被包装成Observable,如果这次请求因为超时出现异常或者其他异常,这样就会触发Observable的重试机制(RxJava),但是事实并非如此,为什么呢?原因就是上面的那两个参数,当出现了超时异常的时候,在触发重试机制之前会调用RequestSpecificRetryHandler的isRetriableException()方法,该方法的作用是用来判断是否执行重试动作,具体代码如下:

@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
  //此时该值为false
  if (okToRetryOnAllErrors) {
    return true;
  } 
  else if (e instanceof ClientException) {
    ClientException ce = (ClientException) e;
    if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
      return !sameServer;
    } else {
      return false;
    }
  } 
  else {
    //此时该值为false
    return okToRetryOnConnectErrors && isConnectionException(e);
  }
}

说道这里zuul转发一次请求的基本原理大概了解了,同时也验证了一个事实就是实现zuul进行重试的逻辑并不是Observable的重试机制。那么问题来了?是什么使zuul具有重试功能的呢?

怎么开启zuul的重试机制

开启Zuul重试的功能在原有的配置基础上需要额外进行以下设置:

  1. 在pom中添加spring-retry的依赖(maven工程)

  2. 设置zuul.retryable=true(该参数默认为false)

具体properties文件内容如下:

server.port=8081
spring.application.name=ZUUL-CLIENT
#路由信息
zuul.routes.api-a.serviceId=EUREKA-CLIENT
zuul.routes.api-a.path=/api-a/**
#是否开启重试功能
zuul.retryable=true
#同一个Server重试的次数(除去首次)
ribbon.MaxAutoRetries=3
#切换相同Server的次数
ribbon.MaxAutoRetriesNextServer=0
eureka.client.service-url.defaultZone=http://localhost:8761/eureka

为了模拟出Zuul重试的功能,需要对后端应用服务进行改造,改造后的内容如下:

@RequestMapping(value = "/hello")
public String index() {
  System.out.println("request is coming...");
  try {
   Thread.sleep(100000);
  } catch (InterruptedException e) {
   System.out.println("线程被打断... " + e.getMessage());
  }
  return "hello spring ...";
}

通过使用Thread.sleep(100000)达到Zuul转发超时情况(Zuul默认连接超时未2s、read超时时间为5s),从而触发Zuul的重试功能。这时候在此访问localhost:8081/api-a/hello时,查看应用服务后台,会发现最终打印三次"request is coming..."

通过现象看本质,接下来简单介绍下Zuul重试的原理。首先如果你工程classpath中存在spring-retry,那么zuul在初始化的时候就不会创建RibbonLoadBalancingHttpClient而是创建RetryableRibbonLoadBalancingHttpClient具体源代码如下:

@ConditionalOnClass(name = "org.apache.http.client.HttpClient")
@ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
public class HttpClientRibbonConfiguration {
  @Value("${ribbon.client.name}")
  private String name = "client";
  @Bean
  @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
  @ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate")
  public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
     IClientConfig config, ServerIntrospector serverIntrospector,
     ILoadBalancer loadBalancer, RetryHandler retryHandler) {
   RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(
      config, serverIntrospector);
   client.setLoadBalancer(loadBalancer);
   client.setRetryHandler(retryHandler);
   Monitors.registerObject("Client_" + this.name, client);
   return client;
  }
  @Bean
  @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
  @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
  public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
     IClientConfig config, ServerIntrospector serverIntrospector,
     ILoadBalancer loadBalancer, RetryHandler retryHandler,
     LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory) {
   RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
      config, serverIntrospector, loadBalancedRetryPolicyFactory);
   client.setLoadBalancer(loadBalancer);
   client.setRetryHandler(retryHandler);
   Monitors.registerObject("Client_" + this.name, client);
   return client;
  }
}

所以请求到来需要转发的时候(AbstractLoadBalancerAwareClient类中executeWithLoadBalancer方法会调用AbstractLoadBalancerAwareClient.this.execute())其实调用的是RetryableRibbonLoadBalancingHttpClient的execute方法(而不是没有重试时候RibbonLoadBalancingHttpClient的execute方法),源码内容如下:

@Override
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
  final RequestConfig.Builder builder = RequestConfig.custom();
  IClientConfig config = configOverride != null ? configOverride : this.config;
  builder.setConnectTimeout(config.get(
     CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
  builder.setSocketTimeout(config.get(
     CommonClientConfigKey.ReadTimeout, this.readTimeout));
  builder.setRedirectsEnabled(config.get(
     CommonClientConfigKey.FollowRedirects, this.followRedirects));
  final RequestConfig requestConfig = builder.build();
  final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
  RetryCallback retryCallback = new RetryCallback() {
   @Override
   public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception {
     //on retries the policy will choose the server and set it in the context
     //extract the server and update the request being made
     RibbonApacheHttpRequest newRequest = request;
     if(context instanceof LoadBalancedRetryContext) {
      ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();
      if(service != null) {
        //Reconstruct the request URI using the host and port set in the retry context
        newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
           newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
           newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
           newRequest.getURI().getFragment()));
      }
     }
     if (isSecure(configOverride)) {
      final URI secureUri = UriComponentsBuilder.fromUri(newRequest.getUri())
         .scheme("https").build().toUri();
      newRequest = newRequest.withNewUri(secureUri);
     }
     HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
     final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
     if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
      if(CloseableHttpResponse.class.isInstance(httpResponse)) {
        ((CloseableHttpResponse)httpResponse).close();
      }
      throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
         httpResponse.getStatusLine().getStatusCode());
     }
     return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
   }
  };
  return this.executeWithRetry(request, retryPolicy, retryCallback);
}

executeWithRetry方法内容如下:

private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, LoadBalancedRetryPolicy retryPolicy, RetryCallback<RibbonApacheHttpResponse, IOException> callback) throws Exception {
  RetryTemplate retryTemplate = new RetryTemplate();
  boolean retryable = request.getContext() == null ? true :
     BooleanUtils.toBooleanDefaultIfNull(request.getContext().getRetryable(), true);
  retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
     : new RetryPolicy(request, retryPolicy, this, this.getClientName()));
  return retryTemplate.execute(callback);
}

按照我的理解,主要逻辑如下:

@Override
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
  /**
  *创建RequestConfig(请求信息)
  */
  final RequestConfig requestConfig = builder.build();
  final LoadBalancedRetryPolicy retryPolicy =       loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
  /**
  * 创建RetryCallbck的实现类,用来完成重试逻辑
  */
  RetryCallback retryCallback = new RetryCallback() {};
  
  //创建Spring-retry的模板类,RetryTemplate。
  RetryTemplate retryTemplate = new RetryTemplate();
 /**
 *设置重试规则,即在什么情况下进行重试
 *什么情况下停止重试。源码中这部分存在
 *一个判断,判断的根据就是在zuul工程
 *的propertris中配置的zuul.retryable
 *该参数内容为true才可以具有重试功能。
 */
 retryTemplate.setRetryPolicy(xxx);
 /**
 *发起请求
 */
 return retryTemplate.execute(callback);
}

以上是“Spring Cloud中Zuul重试机制的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

推荐阅读:
  1. Spring Cloud Zuul路由规则动态更新的示例分析
  2. Spring Cloud 网关服务 zuul  动态路由的实现方法

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

springcloud zuul

上一篇:Java线程中断机制的示例分析

下一篇:非阻塞的同步机制CAS怎么用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》