4 votes

comment spacy-io utilise-t-il le multi threading sans GIL ?

Référence ce poste NLP multithreadé avec Spacy pipe qui en parle,

et ici de https://spacy.io/

from spacy.attrs import *
# All strings mapped to integers, for easy export to numpy
np_array = doc.to_array([LOWER, POS, ENT_TYPE, IS_ALPHA])

from reddit_corpus import RedditComments
reddit = RedditComments('/path/to/reddit/corpus')
# Parse a stream of documents, with multi-threading (no GIL!)
# Processes over 100,000 tokens per second.
for doc in nlp.pipe(reddit.texts, batch_size=10000, n_threads=4):
    # Multi-word expressions, such as names, dates etc
    # can be merged into single tokens
    for ent in doc.ents:
        ent.merge(ent.root.tag_, ent.text, ent.ent_type_)
    # Efficient, lossless serialization --- all annotations
    # saved, same size as uncompressed text
    byte_string = doc.to_bytes()

11voto

syllogism_ Points 16

Je dois rédiger un article de blog sur ce sujet. En résumé, spaCy est implémenté en Cython, un langage similaire à Python qui se transpose en C ou C++, et produit finalement une extension Python. Vous pouvez en savoir plus sur la publication du GIL avec Cython ici :

http://docs.cython.org/src/userguide/parallelism.html

Voici l'implémentation de la méthode .pipe dans spaCy :

https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/parser.pyx#L135

def pipe(self, stream, int batch_size=1000, int n_threads=2):
    cdef Pool mem = Pool()
    cdef TokenC** doc_ptr = <TokenC**>mem.alloc(batch_size, sizeof(TokenC*))
    cdef int* lengths = <int*>mem.alloc(batch_size, sizeof(int))
    cdef Doc doc
    cdef int i
    cdef int nr_class = self.moves.n_moves
    cdef int nr_feat = self.model.nr_feat
    cdef int status
    queue = []
    for doc in stream:
        doc_ptr[len(queue)] = doc.c
        lengths[len(queue)] = doc.length
        queue.append(doc)
        if len(queue) == batch_size:
            with nogil:
                for i in cython.parallel.prange(batch_size, num_threads=n_threads):
                    status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
                    if status != 0:
                        with gil:
                            sent_str = queue[i].text
                            raise ValueError("Error parsing doc: %s" % sent_str)
            PyErr_CheckSignals()
            for doc in queue:
                self.moves.finalize_doc(doc)
                yield doc
            queue = []
    batch_size = len(queue)
    with nogil:
        for i in cython.parallel.prange(batch_size, num_threads=n_threads):
            status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
            if status != 0:
                with gil:
                    sent_str = queue[i].text
                    raise ValueError("Error parsing doc: %s" % sent_str)
    PyErr_CheckSignals()
    for doc in queue:
        self.moves.finalize_doc(doc)
        yield doc

La mécanique réelle du multithreading est très simple, parce que le NLP est (souvent) embarrassant parallèle --- chaque document est analysé indépendamment, donc nous avons juste besoin de faire une boucle prange sur un flux de textes.

La mise en œuvre de l'analyseur syntaxique de manière multithread a été assez difficile, cependant. Pour utiliser efficacement le multithreading, il faut libérer la GIL et ne pas la réacquérir. Cela signifie ne pas utiliser d'objets Python, ne pas lever d'exceptions, etc.

Lorsque vous créez un objet Python --- disons une liste --- vous devez incrémenter son nombre de références, qui est stocké globalement. Cela signifie qu'il faut acquérir le GIL. Il n'y a aucun moyen de contourner cela. Mais si vous êtes dans une extension C et que vous voulez juste, disons, mettre un entier sur la pile, ou faire un appel à malloc ou free, vous n'avez pas besoin d'acquérir la GIL. Donc, si vous écrivez le programme à ce niveau, en utilisant uniquement des constructions C et C++, vous pouvez libérer la GIL.

J'écris des analyseurs statistiques en Cython depuis quelques années maintenant. (Avant spaCy, j'avais une implémentation pour mes recherches universitaires.) Obtenir l'écriture de la boucle d'analyse complète sans le GIL était difficile. Fin 2015, j'avais l'apprentissage automatique, la table de hachage, la boucle d'analyse externe et la plupart de l'extraction de caractéristiques sous forme de code nogil. Mais l'objet state avait une interface compliquée, et était implémenté comme une classe cdef. Je ne pouvais pas créer cet objet ou le stocker dans un conteneur sans acquérir le GIL.

La percée s'est produite lorsque j'ai trouvé un moyen non documenté d'écrire une classe C++ en Cython. Cela m'a permis d'évider la classe cdef existante qui contrôlait l'état de l'analyseur. J'ai transféré son interface à la classe C++ interne, méthode par méthode. De cette façon, je pouvais continuer à faire fonctionner le code et m'assurer que je n'introduisais pas de bogues subtils dans le calcul des fonctionnalités.

Vous pouvez voir la classe intérieure ici : https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/_state.pxd

Si vous naviguez dans l'historique git de ce fichier, vous pouvez voir les patchs où j'ai implémenté la méthode .pipe.

3voto

Dunes Points 6740

On peut supposer qu'il effectue l'analyse syntaxique au niveau C plutôt qu'au niveau python. Une fois que vous êtes passé au niveau C, si vous n'avez pas besoin d'accéder à des objets python, vous pouvez libérer le GIL en toute sécurité. Au plus bas niveau de lecture et d'écriture, CPython libère également le GIL. Le raisonnement est le suivant : si d'autres threads sont en cours d'exécution et que nous sommes sur le point d'appeler une fonction C bloquante, nous devons libérer le GIL pour la durée de l'appel de la fonction.

Vous pouvez voir cela en action dans l'implémentation la plus basse de CPython de écrire .

    if (gil_held) {
        do {
            Py_BEGIN_ALLOW_THREADS
            errno = 0;
#ifdef MS_WINDOWS
            n = write(fd, buf, (int)count);
#else
            n = write(fd, buf, count);
#endif
            /* save/restore errno because PyErr_CheckSignals()
             * and PyErr_SetFromErrno() can modify it */
            err = errno;
            Py_END_ALLOW_THREADS
        } while (n < 0 && err == EINTR &&
                !(async_err = PyErr_CheckSignals()));

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X