J'ai une base de données SQLite3. Je dois analyser 10000 fichiers. Je lis des données dans chaque fichier, puis j'interroge la base de données avec ces données pour obtenir un résultat. Mon code fonctionne bien dans un environnement à processus unique. Mais j'obtiens une erreur lorsque j'essaie d'utiliser le pool mulitprocessing.
My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files:
foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB
My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4)
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files])
4. pool.close()
5. Close DB
J'obtiens l'erreur suivante : sqlite3.ProgrammingError : Base Cursor.__init__ n'a pas été appelé.
Ma classe DB est mise en œuvre comme suit :
def open_db(sqlite_file):
"""Open SQLite database connection.
Args:
sqlite_file -- File path
Return:
Connection
"""
log.info('Open SQLite database %s', sqlite_file)
try:
conn = sqlite3.connect(sqlite_file)
except sqlite3.Error, e:
log.error('Unable to open SQLite database %s', e.args[0])
sys.exit(1)
return conn
def close_db(conn, sqlite_file):
"""Close SQLite database connection.
Args:
conn -- Connection
"""
if conn:
log.info('Close SQLite database %s', sqlite_file)
conn.close()
class MapDB:
def __init__(self, sqlite_file):
"""Initialize.
Args:
sqlite_file -- File path
"""
# 1. Open database.
# 2. Setup to receive data as dict().
# 3. Get cursor to execute queries.
self._sqlite_file = sqlite_file
self._conn = open_db(sqlite_file)
self._conn.row_factory = sqlite3.Row
self._cursor = self._conn.cursor()
def close(self):
"""Close DB connection."""
if self._cursor:
self._cursor.close()
close_db(self._conn, self._sqlite_file)
def check(self):
...
def get_driver_net(self, net):
...
def get_cell_id(self, net):
...
La fonction foo() ressemble à ceci :
def foo(f, x1, x2, db):
extract some data from file f
r1 = db.get_driver_net(...)
r2 = db.get_cell_id(...)
La mise en œuvre globale de l'arrêt de travail est la suivante :
mapdb = MapDB(sqlite_file)
log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)
pool.close()
mapdb.close()
Pour résoudre ce problème, je pense que je dois créer l'objet MapDB() à l'intérieur de chaque travailleur du pool (afin d'avoir 4 connexions parallèles/indépendantes). Mais je ne sais pas trop comment faire. Je ne suis pas sûr de savoir comment faire. Quelqu'un peut-il me montrer un exemple de la façon dont on peut faire cela avec Pool ?