Je veux implémenter une lecture-traitement-écriture parallèle ligne par ligne basée sur boost::thread, mais la version actuelle a un comportement indéfini : le test suivant lit un fichier CSV en remplissant la file d'attente de lecture (concurrente), qui est simplement transféré dans la file d'attente d'écriture pour être écrit dans un fichier de sortie (aucun traitement pour l'instant).
Problèmes rencontrés :
- Sous Windows et Unix, le programme ne se termine jamais (~3/5 fois) et génère un SIGSEGV (~1/100).
- Sous Unix, il existe de nombreuses erreurs : SIGABRT à la création du thread, "memory cloberred before allocated block" (-> SIGABRT aussi) après la création, aléatoirement entre 1 et ~15 lignes.
Je DÉTESTE de donner les problèmes et les codes et de laisser les autres répondre (je suis parfois de votre côté des sujets), mais croyez-moi, je ne peux pas penser à autre chose pour le corriger (gérer les fils, le débogage est un cauchemar), donc je m'excuse par avance. Voici :
Main.cpp :
#include "io.hpp"
#include <iostream>
int main(int argc, char *argv[]) {
CSV::Reader reader;
CSV::Writer writer;
if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) {
CSV::Row row;
reader.run(); //Reads the CSV file and fills the read queue
writer.run(); //Reads the to-be-written queue and writes it to a txt file
//The loop is supposed to end only if the reader is finished and empty
while(!(reader.is_finished() && reader.empty())) {
//Transfers line by line from the read to the to-be-written queues
reader.wait_and_pop(row);
writer.push(row);
}
//The reader will likely finish before the writer, so he has to finish his queue before continuing.
writer.finish();
}
else {
std::cout << "File error";
}
return EXIT_SUCCESS;
}
Io.hpp :
#ifndef IO_H_INCLUDED
#define IO_H_INCLUDED
#include "threads.hpp"
#include <fstream>
namespace CSV {
class Row {
std::vector<std::string> m_data;
friend class Iterator;
friend void write_row(Row const &row, std::ostream &stream);
void read_next(std::istream& csv);
public:
inline std::string const& operator[](std::size_t index) const {
return m_data[index];
}
inline std::size_t size() const {
return m_data.size();
}
};
/** Reading *************************************************************************/
class Iterator {
public:
Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) {
++(*this);
}
Iterator() : m_csv(NULL) {}
//Pre-Increment
Iterator& operator++() {
if (m_csv != NULL) {
m_row.read_next(*m_csv);
m_csv = m_csv->good() ? m_csv : NULL;
}
return *this;
}
inline Row const& operator*() const {
return m_row;
}
inline bool operator==(Iterator const& rhs) {
return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
}
inline bool operator!=(Iterator const& rhs) {
return !((*this) == rhs);
}
private:
std::istream* m_csv;
Row m_row;
};
class Reader : public Concurrent_queue<Row>, public Thread {
std::ifstream m_csv;
Thread_safe_value<bool> m_finished;
void work() {
if(!!m_csv) {
for(Iterator it(m_csv) ; it != Iterator() ; ++it) {
push(*it);
}
m_finished.set(true);
}
}
public:
Reader() {
m_finished.set(false);
}
inline bool open(std::string path) {
m_csv.open(path.c_str());
return !!m_csv;
}
inline bool is_finished() {
return m_finished.get();
}
};
/** Writing ***************************************************************************/
void write_row(Row const &row, std::ostream &stream);
//Is m_finishing really thread-safe ? By the way, is it mandatory ?
class Writer : public Concurrent_queue<Row>, public Thread {
std::ofstream m_csv;
Thread_safe_value<bool> m_finishing;
void work() {
if(!!m_csv) {
CSV::Row row;
while(!(m_finishing.get() && empty())) {
wait_and_pop(row);
write_row(row, m_csv);
}
}
}
public:
Writer() {
m_finishing.set(false);
}
inline void finish() {
m_finishing.set(true);
catch_up();
}
inline bool open(std::string path) {
m_csv.open(path.c_str());
return !!m_csv;
}
};
}
#endif
Io.cpp :
#include "io.hpp"
#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>
void CSV::Row::read_next(std::istream& csv) {
std::string row;
std::getline(csv, row);
boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
m_data.assign(tokenizer.begin(), tokenizer.end());
}
void CSV::write_row(Row const &row, std::ostream &stream) {
std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
stream << std::endl;
}
Threads.hpp :
#ifndef THREADS_HPP_INCLUDED
#define THREADS_HPP_INCLUDED
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class Thread {
protected:
boost::thread *m_thread;
virtual void work() = 0;
void do_work() {
work();
}
public:
Thread() : m_thread(NULL) {}
virtual ~Thread() {
catch_up();
if(m_thread != NULL) {
delete m_thread;
}
}
inline void catch_up() {
if(m_thread != NULL) {
m_thread->join();
}
}
void run() {
m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this)));
}
};
/** Thread-safe datas **********************************************************/
#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
template <class T>
class Thread_safe_value : public boost::noncopyable {
T m_value;
boost::mutex m_mutex;
public:
T const &get() {
boost::mutex::scoped_lock lock(m_mutex);
return m_value;
}
void set(T const &value) {
boost::mutex::scoped_lock lock(m_mutex);
m_value = value;
}
};
template<typename Data>
class Concurrent_queue {
std::queue<Data> m_queue;
mutable boost::mutex m_mutex;
boost::condition_variable m_cond;
public:
void push(Data const& data) {
boost::mutex::scoped_lock lock(m_mutex);
m_queue.push(data);
lock.unlock();
m_cond.notify_one();
}
bool empty() const {
boost::mutex::scoped_lock lock(m_mutex);
return m_queue.empty();
}
void wait_and_pop(Data& popped) {
boost::mutex::scoped_lock lock(m_mutex);
while(m_queue.empty()) {
m_cond.wait(lock);
}
popped = m_queue.front();
m_queue.pop();
}
};
#endif // THREAD_HPP_INCLUDED
Ce projet est important et j'apprécierais vraiment si vous pouviez m'aider =)
Merci d'avance.
Regards,
Monsieur Mystère.