Spring Cloud Gateway调用Feign异步问题记录

版本设定:Spring Cloud 2020.0.2

HttpMessageConverters

原因

由于Spring Cloud Gateway 是基于Spring 5、Spring Boot 2.X和Reactor开发的响应式组件,运用了大量的异步实现。

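在项目启动过程中,并不会创建HttpMessageConverters实例:HttpMessageConvertersAutoConfiguration带有“非响应式Web应用”的条件注解(NotReactiveWebApplicationCondition),而Gateway属于响应式应用,因此该自动配置不会生效,Feign解码时也就拿不到HttpMessageConverters。具体可查看源码HttpMessageConvertersAutoConfiguration。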

解决方法

启动时创建相应的Bean,注入到Spring容器

  1. @Configuration
  2. public class FeignConfig {
  3.  
  4.      @Bean
  5.      public Decoder decoder(){
  6.          return new ResponseEntityDecoder(new SpringDecoder(feignHttpMessageConverter()));
  7.      }
  8.      private ObjectFactory<HttpMessageConverters> feignHttpMessageConverter(){
  9.          HttpMessageConverters httpMessageConverters=new HttpMessageConverters
  10.                  (new MappingJackson2HttpMessageConverter());
  11.          return ()->httpMessageConverters;
  12.      }
  13. }

Filter异步调用问题

场景

以鉴权为例,外部访问经由Gateway路由转发,需要验证当前请求中是否携带token,可以通过自定义过滤器(实现GlobalFilter接口)来完成。

  1. @PropertySource(value = “classpath:loginfilter.properties”)
  2. @Component
  3. public class AuthLoginGlobalFilter implements GlobalFilter, Ordered {
  4.      @Value(“#{‘/per-user/login,/goods/**’.split(‘,’)}”)
  5.      private List<String> ignoreUrls;
  6.      @Autowired
  7.      private IUserFeign userFeign;
  8.      ExecutorService executorService = Executors.newFixedThreadPool(1);
  9.      @Override
  10.      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  11.          ServerHttpRequest request = exchange.getRequest();
  12.          if(ignoreUrls !=null && ignoreUrls.contains(request.getURI().getPath())) {
  13.              return chain.filter(exchange);
  14.          }
  15.          String Access_token = request.getHeaders().getFirst(“access_token”);
  16.          if(StringUtils.isBlank(access_token)) {
  17.              return onError(exchange,“尚未登录”);
  18.          }
  19.          R<String> r = userFeign.validToken(access_token);
  20.          if(r.getCode() == 200) {
  21.              ServerHttpRequest serverHttpRequest = request.mutate().header(“uid”,r.getData()).build();
  22.              return chain.filter(exchange.mutate().request(serverHttpRequest).build());
  23.          }
  24.  
  25.          return onError(exchange,r.getMsg());
  26.      }
  27.  
  28.      @Override
  29.      public int getOrder() {
  30.          return 0;
  31.      }
  32.  
  33.      private Mono<Void> onError(ServerWebExchange exchange,String msg) {
  34.          ServerHttpResponse response = exchange.getResponse();
  35.          response.setStatusCode(HttpStatus.UNAUTHORIZED);
  36.          response.getHeaders().add(“Content-Type”,“application/json;charset=UTF-8″);
  37.          R r = new R.Builder().buildCustomize(HttpStatus.UNAUTHORIZED.value(),msg);
  38.          ObjectMapper objectMapper = new ObjectMapper();
  39.          String rs = “”;
  40.          try {
  41.              rs = objectMapper.writeValueAsString(r);
  42.          } catch (JsonProcessingException e) {
  43.              e.printStackTrace();
  44.          }
  45.          DataBuffer dataBuffer =response.bufferFactory().wrap(rs.getBytes());
  46.          return response.writeWith(Flux.just(dataBuffer));
  47.      }
  48. }

R r = userFeign.validToken(access_token);属于同步调用,会报以下错误:

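java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-N(N为线程编号;原文此处为报错截图)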

错误原因

在Reactor的BlockingSingleSubscriber中,blockingGet()会先判断当前线程是否为非阻塞线程:

  1. final T blockingGet() {
  2.          if (Schedulers.isInNonBlockingThread()) {
  3.              throw new IllegalStateException(“block()/blockFirst()/blockLast() are blocking, which is not supported in thread “ + Thread.currentThread().getName());
  4.          }
  5.          if (getCount() != 0) {
  6.              try {
  7.                  await();
  8.              }
  9.              catch (InterruptedException ex) {
  10.                  dispose();
  11.                  throw Exceptions.propagate(ex);
  12.              }
  13.          }
  14.  
  15.          Throwable e = error;
  16.          if (!= null) {
  17.              RuntimeException re = Exceptions.propagate(e);
  18.              //this is ok, as re is always a new non-singleton instance
  19.              re.addSuppressed(new Exception(“#block terminated with an error”));
  20.              throw re;
  21.          }
  22.          return value;
  23.      }

解决方案

将同步调用转为异步执行:把Feign调用提交到独立的线程池中;如果需要获取返回结果,可以通过Future方式获取。

  1. @PropertySource(value = “classpath:loginfilter.properties”)
  2. @Component
  3. public class AuthLoginGlobalFilter implements GlobalFilter, Ordered {
  4.      @Value(“#{‘/per-user/login,/goods/**’.split(‘,’)}”)
  5.      private List<String> ignoreUrls;
  6.      @Autowired
  7.      private IUserFeign userFeign;
  8.      ExecutorService executorService = Executors.newFixedThreadPool(1);
  9.      @Override
  10.      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  11.          ServerHttpRequest request = exchange.getRequest();
  12.          if(ignoreUrls !=null && ignoreUrls.contains(request.getURI().getPath())) {
  13.              return chain.filter(exchange);
  14.          }
  15.          String access_token = request.getHeaders().getFirst(“access_token”);
  16.          if(StringUtils.isBlank(access_token)) {
  17.              return onError(exchange,“尚未登录”);
  18.          }
  19.          // WebFlux异步调用,同步会报错
  20.          Future future = executorService.submit((Callable<R>) () -> userFeign.validToken(access_token));
  21.          R<String> r = null;
  22.          try {
  23.              r = (R<String>) future.get();
  24.              if(r.getCode() == 200) {
  25.                  ServerHttpRequest serverHttpRequest = request.mutate().header(“uid”,r.getData()).build();
  26.                  return chain.filter(exchange.mutate().request(serverHttpRequest).build());
  27.              }
  28.          } catch (InterruptedException e) {
  29.              e.printStackTrace();
  30.          } catch (ExecutionException e) {
  31.              e.printStackTrace();
  32.          }
  33.  
  34.          return onError(exchange,r.getMsg());
  35.      }
  36.  
  37.      @Override
  38.      public int getOrder() {
  39.          return 0;
  40.      }
  41.  
  42.      private Mono<Void> onError(ServerWebExchange exchange,String msg) {
  43.          ServerHttpResponse response = exchange.getResponse();
  44.          response.setStatusCode(HttpStatus.UNAUTHORIZED);
  45.          response.getHeaders().add(“Content-Type”,“application/json;charset=UTF-8”);
  46.          R r = new R.Builder().buildCustomize(HttpStatus.UNAUTHORIZED.value(),msg);
  47.          ObjectMapper objectMapper = new ObjectMapper();
  48.          String rs = “”;
  49.          try {
  50.              rs = objectMapper.writeValueAsString(r);
  51.          } catch (JsonProcessingException e) {
  52.              e.printStackTrace();
  53.          }
  54.          DataBuffer dataBuffer =response.bufferFactory().wrap(rs.getBytes());
  55.          return response.writeWith(Flux.just(dataBuffer));
  56.      }
  57. }

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

标签

发表评论