La Description Du Problème
Servlet-API 3.0 permet de détacher une demande/réponse, le contexte et la réponse à plus tard.
Cependant, si j'essaie d'écrire une grande quantité de données, quelque chose comme:
AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()
Il peut effectivement bloquer et il ne bloque au trivial cas de test pour les deux Tomcat 7 et Jetée 8. Les tutoriels vous recommandons de créer un pool de threads qui serait gérer une telle installation - sorcière est généralement la contre-positif à un traditionnel 10K architecture.
Cependant si j'ai 10 000 connexions ouvertes et un pool de threads de, disons, 10 fils, c'est assez pour que même 1% des clients qui ont des connexions à vitesse réduite ou tout simplement bloqué connexion pour bloquer le pool de threads et de bloquer complètement la comète de réponse ou ralentir de manière significative.
La pratique courante est d'obtenir de l'écriture "-prêt" notification d'e/S ou de notification de fin et que de continuer à repousser les données.
Comment cela peut-il être fait à l'aide de Servlet-API 3.0, c'est à dire comment puis-je obtenir soit:
- Asynchrone de notification de fin sur les I/O de l'opération.
- Obtenir des non-blocage I/S en écriture prêt de notification.
Si cela n'est pas pris en charge par le Servlet-API 3.0, il n'existe aucun Serveur Web Api spécifiques (comme la Jetée de la Poursuite ou de Tomcat CometEvent) qui permettent de manipuler de tels événements vraiment de manière asynchrone sans truquer les e/S asynchrones à l'aide de pool de threads.
Quelqu'un sait?
Et si ce n'est pas possible, pouvez-vous confirmer par une référence à la documentation?
Problème de démonstration dans un exemple de code
J'avais joint le code ci-dessous qui émule événement-stream.
Notes:
- il utilise
ServletOutputStream
qui jetteIOException
pour détecter les clients déconnectés - il envoie
keep-alive
des messages assurez-vous que les clients sont toujours là - J'ai créé un pool de threads pour "émuler" les opérations asynchrones.
Dans un tel exemple, j'ai défini explicitement pool de threads de taille 1 pour montrer le problème:
- Démarrer une application
- Exécuter à partir de deux terminaux
curl http://localhost:8080/path/to/app
(deux fois) - Maintenant envoyer les données avec
curd -d m=message http://localhost:8080/path/to/app
- Les deux clients ont reçu les données
- Maintenant suspendre l'un des clients (Ctrl+Z) et d'envoyer le message, une fois encore,
curd -d m=message http://localhost:8080/path/to/app
- Observer qu'un autre non-suspendu client soit rien reçu, ou après que le message a été transféré cessé de recevoir keep-alive demandes parce que les autres thread est bloqué.
Je veux résoudre ce problème sans l'aide de pool de threads, car avec 1000 à 5000 ouvrir connexions je peut épuiser le pool de threads très rapide.
L'exemple de code ci-dessous.
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;
@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {
private long id = 0;
private String message = "";
private final ThreadPoolExecutor pool =
new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
// it is explicitly small for demonstration purpose
private final Thread timer = new Thread(new Runnable() {
public void run()
{
try {
while(true) {
Thread.sleep(1000);
sendKeepAlive();
}
}
catch(InterruptedException e) {
// exit
}
}
});
class RunJob implements Runnable {
volatile long lastUpdate = System.nanoTime();
long id = 0;
AsyncContext ac;
RunJob(AsyncContext ac)
{
this.ac = ac;
}
public void keepAlive()
{
if(System.nanoTime() - lastUpdate > 1000000000L)
pool.submit(this);
}
String formatMessage(String msg)
{
StringBuilder sb = new StringBuilder();
sb.append("id");
sb.append(id);
for(int i=0;i<100000;i++) {
sb.append("data:");
sb.append(msg);
sb.append("\n");
}
sb.append("\n");
return sb.toString();
}
public void run()
{
String message = null;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id) {
this.id = HugeStreamWithThreads.this.id;
message = HugeStreamWithThreads.this.message;
}
}
if(message == null)
message = ":keep-alive\n\n";
else
message = formatMessage(message);
if(!sendMessage(message))
return;
boolean once_again = false;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id)
once_again = true;
}
if(once_again)
pool.submit(this);
}
boolean sendMessage(String message)
{
try {
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(message);
out.flush();
lastUpdate = System.nanoTime();
return true;
}
catch(IOException e) {
ac.complete();
removeContext(this);
return false;
}
}
};
private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();
@Override
public void init(ServletConfig config) throws ServletException
{
super.init(config);
timer.start();
}
@Override
public void destroy()
{
for(;;){
try {
timer.interrupt();
timer.join();
break;
}
catch(InterruptedException e) {
continue;
}
}
pool.shutdown();
super.destroy();
}
protected synchronized void removeContext(RunJob ac)
{
asyncContexts.remove(ac);
}
// GET method is used to establish a stream connection
@Override
protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// Content-Type header
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
// Access-Control-Allow-Origin header
response.setHeader("Access-Control-Allow-Origin", "*");
final AsyncContext ac = request.startAsync();
ac.setTimeout(0);
RunJob job = new RunJob(ac);
asyncContexts.add(job);
if(id!=0) {
pool.submit(job);
}
}
private synchronized void sendKeepAlive()
{
for(RunJob job : asyncContexts) {
job.keepAlive();
}
}
// POST method is used to communicate with the server
@Override
protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
{
request.setCharacterEncoding("utf-8");
id++;
message = request.getParameter("m");
for(RunJob job : asyncContexts) {
pool.submit(job);
}
}
}
L'exemple ci-dessus utilise des threads, pour éviter le blocage... Cependant, si le nombre de blocage de clients est plus grande que la taille du pool de threads, il pourrait bloquer.
Comment pourrait-il être mis en œuvre sans blocage?