Servlet-3asynchronous上下文,如何做asynchronous写入?

问题描述

Servlet-3.0 API允许分离请求/响应上下文,并在稍后进行回答。

但是,如果我尝试写入大量的数据,如:

AsyncContext ac = getWaitingContext() ; ServletOutputStream out = ac.getResponse().getOutputStream(); out.print(some_big_data); out.flush() 

对于Tomcat 7和Jetty 8,它可能实际上会阻塞 – 而且会阻塞在简单的testing用例中。教程build议创build一个处理这种设置的线程池 – 巫婆通常对传统的10K架构是相反的。

但是,如果我有10,000个打开的连接和10个线程的线程池,即使只有1%的低速连接的客户端或只是阻塞的连接阻塞线程池,并完全阻止彗星响应或减速显著。

预期的做法是获得“写入就绪”通知或I / O完成通知,并继续推送数据。

这怎么可以使用Servlet-3.0 API来完成,也就是说我怎么得到:

  • I / O操作的asynchronous完成通知。
  • 通过写入就绪通知获取非阻塞I / O。

如果Servlet-3.0 API不支持这种情况,那么是否有任何特定于Web服务器的API(如Jetty Continuation或Tomcat CometEvent)可以asynchronous处理这些事件,而不用使用线程池伪造asynchronousI / O。

有人知道吗?

如果这是不可能的,你可以通过参考文档来确认吗?

示例代码中的问题演示

我附上了模拟事件stream的代码。

笔记:

  • 它使用ServletOutputStream抛出IOException检测断开连接的客户端
  • 它发送keep-alive消息,以确保客户仍然在那里
  • 我创build了一个线程池来“模拟”asynchronous操作。

在这样的例子中,我明确定义了大小为1的线程池来显示问题:

  • 启动应用程序
  • 从两个terminal运行curl http://localhost:8080/path/to/app (两次)
  • 现在用curd -dm=message http://localhost:8080/path/to/app发送数据
  • 两个客户都收到了这些数据
  • 现在挂起其中一个客户端(Ctrl + Z)并再次发送消息curd -dm=message http://localhost:8080/path/to/app
  • 观察到另一个未挂起的客户端或者什么也没有收到,或者在消息被转发后停止接收保持活动请求,因为其他线程被阻塞。

我想在不使用线程池的情况下解决这样的问题,因为有了1000-5000个打开的连接,我可以非常快地耗尽线程池。

下面的示例代码。


 import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; import javax.servlet.AsyncContext; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.ServletOutputStream; @WebServlet(urlPatterns = "", asyncSupported = true) public class HugeStreamWithThreads extends HttpServlet { private long id = 0; private String message = ""; private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); // it is explicitly small for demonstration purpose private final Thread timer = new Thread(new Runnable() { public void run() { try { while(true) { Thread.sleep(1000); sendKeepAlive(); } } catch(InterruptedException e) { // exit } } }); class RunJob implements Runnable { volatile long lastUpdate = System.nanoTime(); long id = 0; AsyncContext ac; RunJob(AsyncContext ac) { this.ac = ac; } public void keepAlive() { if(System.nanoTime() - lastUpdate > 1000000000L) pool.submit(this); } String formatMessage(String msg) { StringBuilder sb = new StringBuilder(); sb.append("id"); sb.append(id); for(int i=0;i<100000;i++) { sb.append("data:"); sb.append(msg); sb.append("\n"); } sb.append("\n"); return sb.toString(); } public void run() { String message = null; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) { this.id = HugeStreamWithThreads.this.id; message = HugeStreamWithThreads.this.message; } } if(message == null) message = ":keep-alive\n\n"; else message = formatMessage(message); if(!sendMessage(message)) return; boolean once_again = false; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) once_again = true; } if(once_again) pool.submit(this); } boolean sendMessage(String message) { try { ServletOutputStream out = ac.getResponse().getOutputStream(); out.print(message); out.flush(); lastUpdate = System.nanoTime(); return true; } catch(IOException e) { ac.complete(); removeContext(this); return false; } } }; private HashSet<RunJob> asyncContexts = new HashSet<RunJob>(); @Override public void init(ServletConfig config) throws ServletException { super.init(config); timer.start(); } @Override public void destroy() { for(;;){ try { timer.interrupt(); timer.join(); break; } catch(InterruptedException e) { continue; } } pool.shutdown(); super.destroy(); } protected synchronized void removeContext(RunJob ac) { asyncContexts.remove(ac); } // GET method is used to establish a stream connection @Override protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // Content-Type header response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); // Access-Control-Allow-Origin header response.setHeader("Access-Control-Allow-Origin", "*"); final AsyncContext ac = request.startAsync(); ac.setTimeout(0); RunJob job = new RunJob(ac); asyncContexts.add(job); if(id!=0) { pool.submit(job); } } private synchronized void sendKeepAlive() { for(RunJob job : asyncContexts) { job.keepAlive(); } } // POST method is used to communicate with the server @Override protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setCharacterEncoding("utf-8"); id++; message = request.getParameter("m"); for(RunJob job : asyncContexts) { pool.submit(job); } } } 

上面的示例使用线程来防止阻塞…但是,如果阻塞客户端的数量大于线程池的大小,则会阻塞。

如何实现不受阻碍?

我发现Servlet 3.0 Asynchronous API很难实现正确和有用的文档是稀疏的。 经过大量的试验和错误,尝试了许多不同的方法,我find了一个我非常满意的强大解决scheme。 当我查看我的代码并将其与您的代码进行比较时,我注意到一个主要区别可能会帮助您解决特定的问题。 我使用ServletResponse来写入数据而不是ServletOutputStream

在这里,我的转到asynchronousServlet类稍微适应你的some_big_data情况:

 import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebInitParam; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; import org.apache.log4j.Logger; @javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") }) public class AsyncServlet extends HttpServlet { private static final Logger logger = Logger.getLogger(AsyncServlet.class); public static final int CALLBACK_TIMEOUT = 10000; // ms /** executor service */ private ExecutorService exec; @Override public void init(ServletConfig config) throws ServletException { super.init(config); int size = Integer.parseInt(getInitParameter("threadpoolsize")); exec = Executors.newFixedThreadPool(size); } @Override public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { final AsyncContext ctx = req.startAsync(); final HttpSession session = req.getSession(); // set the timeout ctx.setTimeout(CALLBACK_TIMEOUT); // attach listener to respond to lifecycle events of this AsyncContext ctx.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent event) throws IOException { logger.info("onComplete called"); } @Override public void onTimeout(AsyncEvent event) throws IOException { logger.info("onTimeout called"); } @Override public void onError(AsyncEvent event) throws IOException { logger.info("onError called: " + event.toString()); } @Override public void onStartAsync(AsyncEvent event) throws IOException { logger.info("onStartAsync called"); } }); enqueLongRunningTask(ctx, session); } /** * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact) * <p/> * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked). */ private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) { exec.execute(new Runnable() { @Override public void run() { String some_big_data = getSomeBigData(); try { ServletResponse response = ctx.getResponse(); if (response != null) { response.getWriter().write(some_big_data); ctx.complete(); } else { throw new IllegalStateException(); // this is caught below } } catch (IllegalStateException ex) { logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called. } catch (Exception e) { logger.error("ERROR IN AsyncServlet", e); } } }); } /** destroy the executor */ @Override public void destroy() { exec.shutdown(); } } 

在我对这个话题的研究过程中,这个线程一直在popup,所以我想在这里提到它:

Servlet 3.1引入了对ServletInputStreamServletOutputStreamasynchronous操作。 请参阅ServletOutputStream.setWriteListener

一个例子可以在http://docs.oracle.com/javaee/7/tutorial/servlets013.htmfind

我们不能完全导致写入asynchronous。 我们实际上必须忍受这样一个限制,即当我们向客户写信时,我们期望能够迅速做到这一点,如果我们不这样做,就能把它当作错误来对待。 也就是说,如果我们的目标是尽可能快地将数据stream式传输到客户端,并使用通道的阻塞/非阻塞状态作为控制stream量的方式,那我们就倒霉了。 但是,如果我们以低速率发送客户端应该能够处理的数据,那么我们至less能够立即断开读取速度不够快的客户端。

例如,在您的应用程序中,我们以慢速率(每隔几秒)发送Keepalive,并希望客户端能够跟上发送的所有事件。 我们把数据浪费到客户端,如果跟不上,我们可以及时,干净地断开连接。 这比真正的asynchronousI / O更有限,但它应该满足您的需要(顺便说一下,我的)。

诀窍是写出只抛出IOExceptions的输出的所有方法实际上会做得比这更多:在实现中,所有对可以被中断()的事件的调用都将被包装成这样的东西(从docker9):

 catch (InterruptedException x) throw (IOException)new InterruptedIOException().initCause(x); 

(我也注意到,在Jetty 8 中没有发生这种情况,在这种情况下logging了一个InterruptedException,并且立即重试了阻塞循环。假设你确定你的servlet容器使用这个技巧是行得通的。

也就是说,当一个缓慢的客户端导致写入线程阻塞时,我们只需通过调用线程上的interrupt()来强制将该写入抛出为IOException。 想一想:无阻塞的代码会在我们的一个处理线程上占用一个时间单位来执行,所以使用阻塞写操作(在一毫秒之后)就是原则上是相同的。 我们仍然只是在线上咀嚼一小段时间,效率稍低。

我已经修改了代码,以便主计时器线程在开始写入之前运行一个作业来限制每次写入的时间,如果写入快速完成,作业将被取消。

最后一个注意事项:在一个实施良好的servlet容器中,导致I / O抛出应该是安全的。 如果我们可以捕获InterruptedIOException并稍后再尝试写入,那将会很好。 也许我们想给慢速客户端一个事件的子集,如果他们跟不上全部stream。 据我所知,在docker这是不完全安全的。 如果写入抛出,HttpResponse对象的内部状态可能不足以处理稍后安全地重新进入写入。 我希望尝试以这种方式推送一个servlet容器是不明智的,除非有特定的文档,我错过了提供这种保证。 我认为这个想法是,如果发生IOException,连接被devise为closures。

下面是代码,使用一个糟糕的简单插图修改版本的RunJob :: run()(实际上,我们希望在这里使用主计时器线程,而不是每旋转一个傻写)。

 public void run() { String message = null; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) { this.id = HugeStreamWithThreads.this.id; message = HugeStreamWithThreads.this.message; } } if(message == null) message = ":keep-alive\n\n"; else message = formatMessage(message); final Thread curr = Thread.currentThread(); Thread canceller = new Thread(new Runnable() { public void run() { try { Thread.sleep(2000); curr.interrupt(); } catch(InterruptedException e) { // exit } } }); canceller.start(); try { if(!sendMessage(message)) return; } finally { canceller.interrupt(); while (true) { try { canceller.join(); break; } catch (InterruptedException e) { } } } boolean once_again = false; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) once_again = true; } if(once_again) pool.submit(this); } 

spring是你的select吗? Spring-MVC 3.2有一个名为DeferredResult的类,它将优雅地处理“10,000个开放连接/ 10个服务器池线程”场景。

例如: http : //blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support/

我已经快速浏览了您的列表,所以我可能错过了一些观点。 池线程的优点是随着时间的推移在几个任务之间共享线程资源。 也许你可以通过不同的http连接的keepAlive响应来解决你的问题,而不是同时分组所有的。