-1 votes

Fil sans fin aléatoire et divers bogues, pendant la lecture/écriture parallèle ligne par ligne.

Je veux implémenter une lecture-traitement-écriture parallèle ligne par ligne basée sur boost::thread, mais la version actuelle a un comportement indéfini : le test suivant lit un fichier CSV en remplissant la file d'attente de lecture (concurrente), qui est simplement transféré dans la file d'attente d'écriture pour être écrit dans un fichier de sortie (aucun traitement pour l'instant).

Problèmes rencontrés :

  • Sous Windows et Unix, le programme ne se termine jamais (~3/5 fois) et génère un SIGSEGV (~1/100).
  • Sous Unix, il existe de nombreuses erreurs : SIGABRT à la création du thread, "memory cloberred before allocated block" (-> SIGABRT aussi) après la création, aléatoirement entre 1 et ~15 lignes.

Je DÉTESTE de donner les problèmes et les codes et de laisser les autres répondre (je suis parfois de votre côté des sujets), mais croyez-moi, je ne peux pas penser à autre chose pour le corriger (gérer les fils, le débogage est un cauchemar), donc je m'excuse par avance. Voici :

Main.cpp :

#include "io.hpp"

#include <iostream>

int main(int argc, char *argv[]) {
    CSV::Reader reader;
    CSV::Writer writer;

    if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) {
        CSV::Row row;

        reader.run(); //Reads the CSV file and fills the read queue
        writer.run(); //Reads the to-be-written queue and writes it to a txt file

        //The loop is supposed to end only if the reader is finished and empty
        while(!(reader.is_finished() && reader.empty())) {
            //Transfers line by line from the read to the to-be-written queues
            reader.wait_and_pop(row);
            writer.push(row);
        }
        //The reader will likely finish before the writer, so he has to finish his queue before continuing.
        writer.finish(); 
    }
    else {
        std::cout << "File error";
    }

    return EXIT_SUCCESS;
}

Io.hpp :

#ifndef IO_H_INCLUDED
#define IO_H_INCLUDED

#include "threads.hpp"

#include <fstream>

namespace CSV {
    class Row {
        std::vector<std::string> m_data;

        friend class Iterator;
        friend void write_row(Row const &row, std::ostream &stream);

        void read_next(std::istream& csv);

        public:
            inline std::string const& operator[](std::size_t index) const {
                return m_data[index];
            }
            inline std::size_t size() const {
                return m_data.size();
            }
    };

    /** Reading *************************************************************************/

    class Iterator {
        public:
            Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) {
                ++(*this);
            }
            Iterator() : m_csv(NULL) {}

            //Pre-Increment
            Iterator& operator++() {
                if (m_csv != NULL) {
                    m_row.read_next(*m_csv);
                    m_csv = m_csv->good() ? m_csv : NULL;
                }

                return *this;
            }
            inline Row const& operator*() const {
                return m_row;
            }

            inline bool operator==(Iterator const& rhs) {
                return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
            }
            inline bool operator!=(Iterator const& rhs) {
                return !((*this) == rhs);
            }
        private:
            std::istream* m_csv;
            Row m_row;
    };

    class Reader : public Concurrent_queue<Row>, public Thread {
        std::ifstream m_csv;

        Thread_safe_value<bool> m_finished;

        void work() {
            if(!!m_csv) {
                for(Iterator it(m_csv) ; it != Iterator() ; ++it) {
                    push(*it);
                }
                m_finished.set(true);
            }
        }

    public:
        Reader() {
            m_finished.set(false);
        }

        inline bool open(std::string path) {
            m_csv.open(path.c_str());

            return !!m_csv;
        }

        inline bool is_finished() {
            return m_finished.get();
        }
    };

    /** Writing ***************************************************************************/

    void write_row(Row const &row, std::ostream &stream);

    //Is m_finishing really thread-safe ? By the way, is it mandatory ?
    class Writer : public Concurrent_queue<Row>, public Thread {
        std::ofstream m_csv;

        Thread_safe_value<bool> m_finishing;

        void work() {
            if(!!m_csv) {
                CSV::Row row;

                while(!(m_finishing.get() && empty())) {
                    wait_and_pop(row);
                    write_row(row, m_csv);
                }
            }
        }

    public:
        Writer() {
            m_finishing.set(false);
        }

        inline void finish() {
            m_finishing.set(true);
            catch_up();
        }

        inline bool open(std::string path) {
            m_csv.open(path.c_str());

            return !!m_csv;
        }
    };
}

#endif

Io.cpp :

#include "io.hpp"

#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>

void CSV::Row::read_next(std::istream& csv) {
    std::string row;
    std::getline(csv, row);

    boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
    m_data.assign(tokenizer.begin(), tokenizer.end());
}

void CSV::write_row(Row const &row, std::ostream &stream) {
    std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
    stream << std::endl;
}

Threads.hpp :

#ifndef THREADS_HPP_INCLUDED
#define THREADS_HPP_INCLUDED

#include <boost/bind.hpp>
#include <boost/thread.hpp>

class Thread {
protected:
    boost::thread *m_thread;

    virtual void work() = 0;

    void do_work() {
        work();
    }

public:
    Thread() : m_thread(NULL) {}
    virtual ~Thread() {
        catch_up();
        if(m_thread != NULL) {
            delete m_thread;
        }
    }

    inline void catch_up() {
        if(m_thread != NULL) {
            m_thread->join();
        }
    }

    void run() {
        m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this)));
    }
};

/** Thread-safe datas **********************************************************/

#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

template <class T>
class Thread_safe_value : public boost::noncopyable {
    T m_value;
    boost::mutex m_mutex;

    public:
        T const &get() {
            boost::mutex::scoped_lock lock(m_mutex);
            return m_value;
        }
        void set(T const &value) {
            boost::mutex::scoped_lock lock(m_mutex);
            m_value = value;
        }
};

template<typename Data>
class Concurrent_queue {
    std::queue<Data> m_queue;
    mutable boost::mutex m_mutex;
    boost::condition_variable m_cond;

public:
    void push(Data const& data) {
        boost::mutex::scoped_lock lock(m_mutex);
        m_queue.push(data);
        lock.unlock();
        m_cond.notify_one();
    }

    bool empty() const {
        boost::mutex::scoped_lock lock(m_mutex);
        return m_queue.empty();
    }

    void wait_and_pop(Data& popped) {
        boost::mutex::scoped_lock lock(m_mutex);
        while(m_queue.empty()) {
            m_cond.wait(lock);
        }

        popped = m_queue.front();
        m_queue.pop();
    }
};

#endif // THREAD_HPP_INCLUDED

Ce projet est important et j'apprécierais vraiment si vous pouviez m'aider =)

Merci d'avance.

Regards,

Monsieur Mystère.

1voto

Anthony Williams Points 28904

Vous avez une erreur dans votre logique d'achèvement.

El main() est en train de lire la dernière entrée de la file d'attente, et de bloquer en attendant la suivante avant que la boucle de la m_finished est activé.

Si vous collez une attente importante avant l'appel à m_finished.set(true) (par exemple sleep(5) sur linux ou Sleep(5000) sous Windows pour une attente de 5 secondes), votre code se bloquera à chaque fois.

(Ceci n'aborde pas les défauts de segmentation ou les erreurs d'allocation de mémoire, qui sont probablement quelque chose d'autre).

L'exécution problématique se déroule comme suit :

  1. Le lecteur lit le dernier élément du fichier et le place dans la file d'attente.
  2. Le fil principal extrait le dernier élément de la file d'attente.
  3. Le thread principal pousse le dernier élément dans la file d'attente pour le thread écrivain.
  4. le fil principal fait des boucles en rond ; m_finished n'est pas défini, il appelle donc wait_and_pop .
  5. Le lecteur réalise qu'il est à la fin du fichier et définit m_finished .
  6. Le thread principal est maintenant bloqué en attendant un autre élément dans la file d'attente du lecteur, mais le lecteur ne le fournira pas.

L'appel au sommeil force cet ordre d'événements en mettant un grand délai entre les étapes 1 et 5 sur le thread du lecteur, de sorte que le thread principal a tout le loisir d'effectuer les étapes 2-4. Il s'agit d'une technique de débogage utile pour les conditions de course.

-1voto

SoapBox Points 14183

Après une lecture rapide, le seul problème évident que j'ai repéré, et qui pourrait probablement expliquer une partie (mais peut-être pas la totalité) de vos problèmes, est que vous ne signalez pas correctement votre état dans la section Concurrent_queue::push .

Chaque fois que vous vous retrouvez à appeler unlock() sur un mutex scopé devrait vous indiquer que quelque chose ne va pas. L'un des principaux avantages de l'utilisation des mutex scopés est que les verrouillages et déverrouillages sont implicites lorsque l'objet entre ou sort de la portée. Si vous avez besoin d'un déverrouillage, vous devrez peut-être restructurer votre code.

Dans ce cas, cependant, vous n'avez pas besoin de restructurer le code. Dans ce cas, le déverrouillage est tout simplement faux. Quand une condition est signalée, le mutex doit être verrouillé. Il est déverrouillé après que le signal ait eu lieu. Vous pouvez donc remplacer unlock par ce code :

void push(Data const& data) {
    boost::mutex::scoped_lock lock(m_mutex);
    m_queue.push(data);
    m_cond.notify_one();
}

Ce qui déverrouillera le mutex au retour de la fonction, après que la condition ait été signalée.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X