50 votes

Servlet-3 Async Context, comment faire des écritures asynchrones?

La Description Du Problème

Servlet-API 3.0 permet de détacher une demande/réponse, le contexte et la réponse à plus tard.

Cependant, si j'essaie d'écrire une grande quantité de données, quelque chose comme:

AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()

Il peut effectivement bloquer et il ne bloque au trivial cas de test pour les deux Tomcat 7 et Jetée 8. Les tutoriels vous recommandons de créer un pool de threads qui serait gérer une telle installation - sorcière est généralement la contre-positif à un traditionnel 10K architecture.

Cependant si j'ai 10 000 connexions ouvertes et un pool de threads de, disons, 10 fils, c'est assez pour que même 1% des clients qui ont des connexions à vitesse réduite ou tout simplement bloqué connexion pour bloquer le pool de threads et de bloquer complètement la comète de réponse ou ralentir de manière significative.

La pratique courante est d'obtenir de l'écriture "-prêt" notification d'e/S ou de notification de fin et que de continuer à repousser les données.

Comment cela peut-il être fait à l'aide de Servlet-API 3.0, c'est à dire comment puis-je obtenir soit:

  • Asynchrone de notification de fin sur les I/O de l'opération.
  • Obtenir des non-blocage I/S en écriture prêt de notification.

Si cela n'est pas pris en charge par le Servlet-API 3.0, il n'existe aucun Serveur Web Api spécifiques (comme la Jetée de la Poursuite ou de Tomcat CometEvent) qui permettent de manipuler de tels événements vraiment de manière asynchrone sans truquer les e/S asynchrones à l'aide de pool de threads.

Quelqu'un sait?

Et si ce n'est pas possible, pouvez-vous confirmer par une référence à la documentation?

Problème de démonstration dans un exemple de code

J'avais joint le code ci-dessous qui émule événement-stream.

Notes:

  • il utilise ServletOutputStream qui jette IOException pour détecter les clients déconnectés
  • il envoie keep-alive des messages assurez-vous que les clients sont toujours là
  • J'ai créé un pool de threads pour "émuler" les opérations asynchrones.

Dans un tel exemple, j'ai défini explicitement pool de threads de taille 1 pour montrer le problème:

  • Démarrer une application
  • Exécuter à partir de deux terminaux curl http://localhost:8080/path/to/app (deux fois)
  • Maintenant envoyer les données avec curd -d m=message http://localhost:8080/path/to/app
  • Les deux clients ont reçu les données
  • Maintenant suspendre l'un des clients (Ctrl+Z) et d'envoyer le message, une fois encore, curd -d m=message http://localhost:8080/path/to/app
  • Observer qu'un autre non-suspendu client soit rien reçu, ou après que le message a été transféré cessé de recevoir keep-alive demandes parce que les autres thread est bloqué.

Je veux résoudre ce problème sans l'aide de pool de threads, car avec 1000 à 5000 ouvrir connexions je peut épuiser le pool de threads très rapide.

L'exemple de code ci-dessous.


import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;


@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {

    private long id = 0;
    private String message = "";
    private final ThreadPoolExecutor pool = 
        new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        // it is explicitly small for demonstration purpose

    private final Thread timer = new Thread(new Runnable() {
        public void run()
        {
            try {
                while(true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });


    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;
        RunJob(AsyncContext ac) 
        {
            this.ac = ac;
        }
        public void keepAlive()
        {
            if(System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }
        String formatMessage(String msg)
        {
            StringBuilder sb = new StringBuilder();
            sb.append("id");
            sb.append(id);
            for(int i=0;i<100000;i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("\n");
            }
            sb.append("\n");
            return sb.toString();
        }
        public void run()
        {
            String message = null;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if(message == null)
                message = ":keep-alive\n\n";
            else
                message = formatMessage(message);

            if(!sendMessage(message))
                return;

            boolean once_again = false;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if(once_again)
                pool.submit(this);

        }
        boolean sendMessage(String message) 
        {
            try {
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            }
            catch(IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    };

    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException
    {
        super.init(config);
        timer.start();
    }
    @Override
    public void destroy()
    {
        for(;;){
            try {
                timer.interrupt();
                timer.join();
                break;
            }
            catch(InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }


    protected synchronized void removeContext(RunJob ac)
    {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();

        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if(id!=0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive()
    {
        for(RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException 
    {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");        
        for(RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }


}

L'exemple ci-dessus utilise des threads, pour éviter le blocage... Cependant, si le nombre de blocage de clients est plus grande que la taille du pool de threads, il pourrait bloquer.

Comment pourrait-il être mis en œuvre sans blocage?

29voto

herrtim Points 973

J'ai trouvé l' Servlet 3.0 Asynchronous API difficile à mettre en œuvre correctement et de la documentation utile d'être rares. Après beaucoup d'essais et d'erreurs et d'essayer différentes approches, j'ai été en mesure de trouver une solution robuste que j'ai été très heureux avec. Quand je regarde mon code, et de la comparer à la vôtre, je remarque une différence importante qui peut vous aider avec votre problème particulier. J'utilise un ServletResponse pour écrire des données et non pas un ServletOutputStream.

Ici mon Asynchrone classe de Servlet adapté légèrement pour votre some_big_data cas:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {

    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {

      @Override
      public void onComplete(AsyncEvent event) throws IOException {

        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {

        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {

        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {

        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {

    exec.execute(new Runnable() {

      @Override
      public void run() {

        String some_big_data = getSomeBigData();

        try {

          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
  }

  /** destroy the executor */
  @Override
  public void destroy() {

    exec.shutdown();
  }
}

10voto

Erich Eichinger Points 838

Au cours de mes recherches sur ce sujet, ce fil traîne partout, donc pensé que je le mentionne ici:

Servlet 3.1 introduit des opérations asynchrones sur ServletInputStream et ServletOutputStream. Voir ServletOutputStream.setWriteListener.

Un exemple peut être trouvé à http://docs.oracle.com/javaee/7/tutorial/doc/servlets013.htm

3voto

Itcutives Points 333

3voto

Nicholas Wilson Points 4464

Nous ne pouvons pas tout à fait la cause de l'écrit asynchrone. Nous avons réellement avoir à vivre avec la restriction que lorsque nous faisons écrire quelque chose à un client, nous nous attendons à être en mesure de le faire rapidement et sont en mesure de la traiter comme une erreur si nous n'avons pas. C'est, si notre objectif est de flux de données pour le client aussi vite que possible et d'utiliser les bloquant ou non bloquant le statut de la chaîne comme un moyen de contrôler le flux, nous sommes hors de la chance. Mais, si nous sommes d'envoyer des données à un taux faible qu'un client doit être en mesure de gérer, nous sommes en mesure, à moins de débrancher rapidement clients qui n'ont pas de lire assez rapidement.

Par exemple, dans votre demande, nous vous envoyons le keepalives à un lent-ish taux (quelques secondes) et s'attendent les clients pour être en mesure de suivre avec tous les événements qui ils sont envoyés. Nous faire des folies les données au client, et s'il ne peut pas le garder, on peut le déconnecter rapidement et proprement. C'est un peu plus limitée que le vrai asynchronous I/O, mais il devrait répondre à votre besoin (et d'ailleurs, le mien).

Le truc, c'est que toutes les méthodes de l'écriture de sortie, qui vient de se jeter IOExceptions réellement faire un peu plus que cela: dans la mise en œuvre, tous les appels à des choses qui peuvent être interrupt()ed sera emballé avec quelque chose comme ceci (prises à partir de la Jetée 9):

catch (InterruptedException x)
    throw (IOException)new InterruptedIOException().initCause(x);

(Je note aussi que ce n'est pas se produire dans la Jetée de 8, où une InterruptedException est connecté et la boucle de blocage est immédiatement tentée à nouveau. Sans doute vous faire pour vous assurer que votre conteneur de servlet est bien comporté à utiliser cette astuce.)

C'est, quand un client lente provoque un thread d'écriture de bloc, nous avons tout simplement la force de l'écrire pour être jetés comme une IOException par l'appel de l'interruption() sur le fil. Pensez-y: le non-blocage du code de la consommer une unité de temps sur l'un de nos threads de traitement pour exécuter de toute façon, à l'aide de blocage de l'écrit qui sont tout simplement abandonnées (après les dire de la milliseconde) est vraiment identique dans son principe. Il suffit de nous mâcher un court laps de temps sur le fil, seulement légèrement moins efficace.

J'ai modifié ton code pour que le timer thread exécute un travail pour limiter le temps dans chaque écris juste avant de commencer à l'écrire, et le travail est annulé si l'écriture est terminée rapidement, qu'il le devrait.

Une dernière remarque: dans un bien mis en œuvre conteneur de servlet, provoquant l'I/O de jeter devrait être en sécurité. Ce serait bien si on pouvait attraper le InterruptedIOException et essayer de l'écrire à nouveau plus tard. Peut-être que nous aimerions donner lent clients un sous-ensemble des événements s'ils ne peuvent pas suivre avec le jet. Aussi loin que je peux dire, de la Jetée, ce n'est pas tout à fait sûr. Si une écriture jette, l'état interne de l'objet HttpResponse pourrait ne pas être assez cohérent pour gérer re-entrer dans l'écriture en toute sécurité plus tard. J'attends qu'il n'est pas sage d'essayer de pousser un conteneur de servlet de cette façon, à moins qu'il y a des docs que j'ai manqué offre cette garantie. Je pense que l'idée est que la connexion est conçu pour être fermé si une IOException qui se passe.

Voici le code, avec une version modifiée de RunJob::run() à l'aide d'un grotty simple illustration (en réalité, nous souhaitons utiliser le main thread horloge ici plutôt que de faire tourner l'un par l'écriture qui est stupide).

public void run()
{
    String message = null;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id) {
            this.id = HugeStreamWithThreads.this.id;
            message = HugeStreamWithThreads.this.message;
        }
    }
    if(message == null)
        message = ":keep-alive\n\n";
    else
        message = formatMessage(message);

    final Thread curr = Thread.currentThread();
    Thread canceller = new Thread(new Runnable() {
        public void run()
        {
            try {
                Thread.sleep(2000);
                curr.interrupt();
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });
    canceller.start();

    try {
        if(!sendMessage(message))
            return;
    } finally {
        canceller.interrupt();
        while (true) {
            try { canceller.join(); break; }
            catch (InterruptedException e) { }
        }
    }

    boolean once_again = false;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id)
            once_again = true;
    }
    if(once_again)
        pool.submit(this);

}

2voto

JJ Zabkar Points 510

Le printemps est-il une option pour vous? Spring-MVC 3.2 possède une classe appelée DeferredResult , qui gérera gracieusement votre scénario "10 000 connexions ouvertes / 10 threads de pool de serveurs".

Exemple: http://blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support/

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X