SpringBoot多线程进行异步请求的处理方式

SpringBoot多线程进行异步请求的处理

近期在协会博客园中,有人发布了博客,系统进行查重的时候由于机器最低配置进行大量计算时需要十秒左右时间才能处理完,由于一开始是单例模式,导致,在某人查重的时候整个系统是不会再响应别的请求的,导致了系统假死状态,那么我们就应该使用多线程进行处理,将这种不能快速返回结果的方法交给线程池进行处理。

而我们自己使用Java Thread实现多线程又比较麻烦,在这里我们使用SpringBoot自带的多线程支持,仅需两步就可以对我们的查重方法利用多线程进行处理

第一步:编写配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 
import java.util.concurrent.Executor;
 
@Configuration
@EnableAsync  // 启用异步任务
public class AsyncConfig {
 
    // 声明一个线程池(并指定线程池的名字)
    @Bean("AsyncThread")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程数5:线程池创建时候初始化的线程数
        executor.setCorePoolSize(5);
        //最大线程数5:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(10);
        //缓冲队列500:用来缓冲执行任务的队列
        executor.setQueueCapacity(500);
        //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(60);
        //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }
}

第二步:对方法使用注解标注为使用多线程进行处理

并将方法改写返回类型(因为不能立即返回,所以需要将返回值改为CompletableFuture,

返回的时候使用return CompletableFuture.completedFuture(str);)

方法示范:

    @Async("AsyncThread")
    @RequestMapping(value = "/addBlog")
    public CompletableFuture addBlog(HttpSession httpSession, Blog blog, String blogId, boolean tempSave) {
        System.out.println("\n\n----------------------------------------------");
        System.out.println(Thread.currentThread().getName() + "正在处理请求");
        System.out.println("----------------------------------------------");
        String result = "请求失败";
        //....你的业务逻辑
        return CompletableFuture.completedFuture(result);
    }

这样以后你的这个方法将会交由线程池去进行处理,并将结果返回,一定要记得改返回值类型,否则返回的将是空的。

SpringBoot请求线程优化

在我们的实际生产中,常常会遇到下面的这种情况,某个请求非常耗时(大约5s返回),当大量的访问该请求的时候,再请求其他服务时,会造成没有连接使用的情况,造成这种现象的主要原因是,我们的容器(tomcat)中线程的数量是一定的,例如500个,当这500个线程都用来请求服务的时候,再有请求进来,就没有多余的连接可用了,只能拒绝连接。要是我们在请求耗时服务的时候,能够异步请求(请求到controller中时,则容器线程直接返回,然后使用系统内部的线程来执行耗时的服务,等到服务有返回的时候,再将请求返回给客户端),那么系统的吞吐量就会得到很大程度的提升了。当然,大家可以直接使用Hystrix的资源隔离来实现,今天我们的重点是spring mvc是怎么来实现这种异步请求的。

使用Callable来实现

controller如下:

    @RestController
    public class HelloController {undefined     
        private static final Logger logger = LoggerFactory.getLogger(HelloController.class);        
        @Autowired
        private HelloService hello;
     
        @GetMapping("/helloworld")
        public String helloWorldController() {undefined
            return hello.sayHello();
        }
     
        /**
         * 异步调用restful
         * 当controller返回值是Callable的时候,springmvc就会启动一个线程将Callable交给TaskExecutor去处理
         * 然后DispatcherServlet还有所有的spring拦截器都退出主线程,然后把response保持打开的状态
         * 当Callable执行结束之后,springmvc就会重新启动分配一个request请求,然后DispatcherServlet就重新
         * 调用和处理Callable异步执行的返回结果, 然后返回视图
         *
         * @return
         */
        @GetMapping("/hello")
        public Callable helloController() {undefined
            logger.info(Thread.currentThread().getName() + " 进入helloController方法");
            Callable callable = new Callable() {undefined
     
                @Override
                public String call() throws Exception {undefined
                    logger.info(Thread.currentThread().getName() + " 进入call方法");
                    String say = hello.sayHello();
                    logger.info(Thread.currentThread().getName() + " 从helloService方法返回");
                    return say;
                }
            };
            logger.info(Thread.currentThread().getName() + " 从helloController方法返回");
            return callable;
        }
    }

我们首先来看下上面这两个请求的区别:

下面这个是没有使用异步请求的

2017-12-07 18:05:42.351 INFO 3020 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 进入helloWorldController方法
2017-12-07 18:05:42.351 INFO 3020 --- [nio-8060-exec-5] com.travelsky.service.HelloService : http-nio-8060-exec-5 进入sayHello方法!
2017-12-07 18:05:44.351 INFO 3020 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 从helloWorldController方法返回

我们可以看到,请求从头到尾都只有一个线程,并且整个请求耗费了2s钟的时间。

下面,我们再来看下使用Callable异步请求的结果:

2017-12-07 18:11:55.671 INFO 6196 --- [nio-8060-exec-1] c.travelsky.controller.HelloController : http-nio-8060-exec-1 进入helloController方法
2017-12-07 18:11:55.672 INFO 6196 --- [nio-8060-exec-1] c.travelsky.controller.HelloController : http-nio-8060-exec-1 从helloController方法返回
2017-12-07 18:11:55.676 INFO 6196 --- [nio-8060-exec-1] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-1 进入afterConcurrentHandlingStarted方法
2017-12-07 18:11:55.676 INFO 6196 --- [ MvcAsync1] c.travelsky.controller.HelloController : MvcAsync1 进入call方法
2017-12-07 18:11:55.676 INFO 6196 --- [ MvcAsync1] com.travelsky.service.HelloService : MvcAsync1 进入sayHello方法!
2017-12-07 18:11:57.677 INFO 6196 --- [ MvcAsync1] c.travelsky.controller.HelloController : MvcAsync1 从helloService方法返回
2017-12-07 18:11:57.721 INFO 6196 --- [nio-8060-exec-2] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-2服务调用完成,返回结果给客户端

从上面的结果中,我们可以看出,容器的线程http-nio-8060-exec-1这个线程进入controller之后,就立即返回了,具体的服务调用是通过MvcAsync2这个线程来做的,当服务执行完要返回后,容器会再启一个新的线程http-nio-8060-exec-2来将结果返回给客户端或浏览器,整个过程response都是打开的,当有返回的时候,再从server端推到response中去。

1、异步调用的另一种方式

上面的示例是通过callable来实现的异步调用,其实还可以通过WebAsyncTask,也能实现异步调用,下面看示例:

    @RestController
    public class HelloController {undefined     
        private static final Logger logger = LoggerFactory.getLogger(HelloController.class);        
        @Autowired
        private HelloService hello;     
            /**
         * 带超时时间的异步请求 通过WebAsyncTask自定义客户端超时间
         *
         * @return
         */
        @GetMapping("/world")
        public WebAsyncTask worldController() {undefined
            logger.info(Thread.currentThread().getName() + " 进入helloController方法");
     
            // 3s钟没返回,则认为超时
            WebAsyncTask webAsyncTask = new WebAsyncTask<>(3000, new Callable() {undefined
     
                @Override
                public String call() throws Exception {undefined
                    logger.info(Thread.currentThread().getName() + " 进入call方法");
                    String say = hello.sayHello();
                    logger.info(Thread.currentThread().getName() + " 从helloService方法返回");
                    return say;
                }
            });
            logger.info(Thread.currentThread().getName() + " 从helloController方法返回");
     
            webAsyncTask.onCompletion(new Runnable() {undefined
     
                @Override
                public void run() {undefined
                    logger.info(Thread.currentThread().getName() + " 执行完毕");
                }
            });
     
            webAsyncTask.onTimeout(new Callable() {undefined
     
                @Override
                public String call() throws Exception {undefined
                    logger.info(Thread.currentThread().getName() + " onTimeout");
                    // 超时的时候,直接抛异常,让外层统一处理超时异常
                    throw new TimeoutException("调用超时");
                }
            });
            return webAsyncTask;
        }
     
        /**
         * 异步调用,异常处理,详细的处理流程见MyExceptionHandler类
         *
         * @return
         */
        @GetMapping("/exception")
        public WebAsyncTask exceptionController() {undefined
            logger.info(Thread.currentThread().getName() + " 进入helloController方法");
            Callable callable = new Callable() {undefined
     
                @Override
                public String call() throws Exception {undefined
                    logger.info(Thread.currentThread().getName() + " 进入call方法");
                    throw new TimeoutException("调用超时!");
                }
            };
            logger.info(Thread.currentThread().getName() + " 从helloController方法返回");
            return new WebAsyncTask<>(20000, callable);
        }     
    }

运行结果如下:

2017-12-07 19:10:26.582 INFO 6196 --- [nio-8060-exec-4] c.travelsky.controller.HelloController : http-nio-8060-exec-4 进入helloController方法
2017-12-07 19:10:26.585 INFO 6196 --- [nio-8060-exec-4] c.travelsky.controller.HelloController : http-nio-8060-exec-4 从helloController方法返回
2017-12-07 19:10:26.589 INFO 6196 --- [nio-8060-exec-4] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-4 进入afterConcurrentHandlingStarted方法
2017-12-07 19:10:26.591 INFO 6196 --- [ MvcAsync2] c.travelsky.controller.HelloController : MvcAsync2 进入call方法
2017-12-07 19:10:26.591 INFO 6196 --- [ MvcAsync2] com.travelsky.service.HelloService : MvcAsync2 进入sayHello方法!
2017-12-07 19:10:28.591 INFO 6196 --- [ MvcAsync2] c.travelsky.controller.HelloController : MvcAsync2 从helloService方法返回
2017-12-07 19:10:28.600 INFO 6196 --- [nio-8060-exec-5] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-5服务调用完成,返回结果给客户端
2017-12-07 19:10:28.601 INFO 6196 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 执行完毕

这种方式和上面的callable方式最大的区别就是,WebAsyncTask支持超时,并且还提供了两个回调函数,分别是onCompletion和onTimeout,顾名思义,这两个回调函数分别在执行完成和超时的时候回调。

3、Deferred方式实现异步调用

在我们是生产中,往往会遇到这样的情景,controller中调用的方法很多都是和第三方有关的,例如JMS,定时任务,队列等,拿JMS来说,比如controller里面的服务需要从JMS中拿到返回值,才能给客户端返回,而从JMS拿值这个过程也是异步的,这个时候,我们就可以通过Deferred来实现整个的异步调用。

首先,我们来模拟一个长时间调用的任务,代码如下:

    @Component
    public class LongTimeTask {undefined
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
        @Async
        public void execute(DeferredResult deferred){undefined
            logger.info(Thread.currentThread().getName() + "进入 taskService 的 execute方法");
            try {undefined
                // 模拟长时间任务调用,睡眠2s
                TimeUnit.SECONDS.sleep(2);
                // 2s后给Deferred发送成功消息,告诉Deferred,我这边已经处理完了,可以返回给客户端了
                deferred.setResult("world");
            } catch (InterruptedException e) {undefined
                e.printStackTrace();
            }
        }
    }

接着,我们就来实现异步调用,controller如下:

    @RestController
    public class AsyncDeferredController {undefined
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
        private final LongTimeTask taskService;        
        @Autowired
        public AsyncDeferredController(LongTimeTask taskService) {undefined
            this.taskService = taskService;
        }
        
        @GetMapping("/deferred")
        public DeferredResult executeSlowTask() {undefined
            logger.info(Thread.currentThread().getName() + "进入executeSlowTask方法");
            DeferredResult deferredResult = new DeferredResult<>();
            // 调用长时间执行任务
            taskService.execute(deferredResult);
            // 当长时间任务中使用deferred.setResult("world");这个方法时,会从长时间任务中返回,继续controller里面的流程
            logger.info(Thread.currentThread().getName() + "从executeSlowTask方法返回");
            // 超时的回调方法
            deferredResult.onTimeout(new Runnable(){undefined
            
                @Override
                public void run() {undefined
                    logger.info(Thread.currentThread().getName() + " onTimeout");
                    // 返回超时信息
                    deferredResult.setErrorResult("time out!");
                }
            });
            
            // 处理完成的回调方法,无论是超时还是处理成功,都会进入这个回调方法
            deferredResult.onCompletion(new Runnable(){undefined
            
                @Override
                public void run() {undefined
                    logger.info(Thread.currentThread().getName() + " onCompletion");
                }
            });            
            return deferredResult;
        }
    }

执行结果如下:

2017-12-07 19:25:40.192 INFO 6196 --- [nio-8060-exec-7] c.t.controller.AsyncDeferredController : http-nio-8060-exec-7进入executeSlowTask方法
2017-12-07 19:25:40.193 INFO 6196 --- [nio-8060-exec-7] .s.a.AnnotationAsyncExecutionInterceptor : No TaskExecutor bean found for async processing
2017-12-07 19:25:40.194 INFO 6196 --- [nio-8060-exec-7] c.t.controller.AsyncDeferredController : http-nio-8060-exec-7从executeSlowTask方法返回
2017-12-07 19:25:40.198 INFO 6196 --- [nio-8060-exec-7] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-7 进入afterConcurrentHandlingStarted方法
2017-12-07 19:25:40.202 INFO 6196 --- [cTaskExecutor-1] com.travelsky.controller.LongTimeTask : SimpleAsyncTaskExecutor-1进入 taskService 的 execute方法
2017-12-07 19:25:42.212 INFO 6196 --- [nio-8060-exec-8] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-8服务调用完成,返回结果给客户端
2017-12-07 19:25:42.213 INFO 6196 --- [nio-8060-exec-8] c.t.controller.AsyncDeferredController : http-nio-8060-exec-8 onCompletion

从上面的执行结果不难看出,容器线程会立刻返回,应用程序使用线程池里面的cTaskExecutor-1线程来完成长时间任务的调用,当调用完成后,容器又启了一个连接线程,来返回最终的执行结果。

这种异步调用,在容器线程资源非常宝贵的时候,能够大大的提高整个系统的吞吐量。

ps:异步调用可以使用AsyncHandlerInterceptor进行拦截,使用示例如下:

    @Component
    public class MyAsyncHandlerInterceptor implements AsyncHandlerInterceptor {undefined        
        private static final Logger logger = LoggerFactory.getLogger(MyAsyncHandlerInterceptor.class);     
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
                throws Exception {undefined
            return true;
        }
     
        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
                ModelAndView modelAndView) throws Exception {undefined
    //        HandlerMethod handlerMethod = (HandlerMethod) handler;
            logger.info(Thread.currentThread().getName()+ "服务调用完成,返回结果给客户端");
        }
     
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
                throws Exception {undefined
            if(null != ex){undefined
                System.out.println("发生异常:"+ex.getMessage());
            }
        }
     
        @Override
        public void afterConcurrentHandlingStarted(HttpServletRequest request, HttpServletResponse response, Object handler)
                throws Exception {undefined            
            // 拦截之后,重新写回数据,将原来的hello world换成如下字符串
            String resp = "my name is chhliu!";
            response.setContentLength(resp.length());
            response.getOutputStream().write(resp.getBytes());            
            logger.info(Thread.currentThread().getName() + " 进入afterConcurrentHandlingStarted方法");
        }     
    }

有兴趣的可以了解下,我这里的主题是异步调用,其他的相关知识点,以后在讲解吧。希望能给大家一个参考,也希望大家多多支持脚本之家。

你可能感兴趣的