3 votes

Python : Utiliser sqlite3 avec le multiprocessing

J'ai une base de données SQLite3. Je dois analyser 10000 fichiers. Je lis des données dans chaque fichier, puis j'interroge la base de données avec ces données pour obtenir un résultat. Mon code fonctionne bien dans un environnement à processus unique. Mais j'obtiens une erreur lorsque j'essaie d'utiliser le pool mulitprocessing.

My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files: 
     foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB

My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4) 
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files])
4. pool.close()
5. Close DB 

J'obtiens l'erreur suivante : sqlite3.ProgrammingError : Base Cursor.__init__ n'a pas été appelé.

Ma classe DB est mise en œuvre comme suit :

def open_db(sqlite_file):
    """Open SQLite database connection.

    Args:
    sqlite_file -- File path

    Return:
    Connection
    """

    log.info('Open SQLite database %s', sqlite_file)
    try:
        conn = sqlite3.connect(sqlite_file)
    except sqlite3.Error, e:
        log.error('Unable to open SQLite database %s', e.args[0])
        sys.exit(1)

    return conn

def close_db(conn, sqlite_file):
    """Close SQLite database connection.

    Args:
    conn -- Connection
    """

    if conn:
        log.info('Close SQLite database %s', sqlite_file)
        conn.close()

class MapDB:

    def __init__(self, sqlite_file):
        """Initialize.

        Args:
        sqlite_file -- File path
        """

        # 1. Open database.
        # 2. Setup to receive data as dict().
        # 3. Get cursor to execute queries.
        self._sqlite_file      = sqlite_file
        self._conn             = open_db(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor           = self._conn.cursor()

    def close(self):
        """Close DB connection."""

        if self._cursor:
            self._cursor.close()
        close_db(self._conn, self._sqlite_file)

    def check(self):
        ...

    def get_driver_net(self, net):
        ...

    def get_cell_id(self, net):
       ...

La fonction foo() ressemble à ceci :

def foo(f, x1, x2, db):

  extract some data from file f
  r1 = db.get_driver_net(...)
  r2 = db.get_cell_id(...)

La mise en œuvre globale de l'arrêt de travail est la suivante :

mapdb = MapDB(sqlite_file)

log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]                 
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)    
pool.close()
mapdb.close()

Pour résoudre ce problème, je pense que je dois créer l'objet MapDB() à l'intérieur de chaque travailleur du pool (afin d'avoir 4 connexions parallèles/indépendantes). Mais je ne sais pas trop comment faire. Je ne suis pas sûr de savoir comment faire. Quelqu'un peut-il me montrer un exemple de la façon dont on peut faire cela avec Pool ?

2voto

user5402 Points 8479

Qu'en est-il de la définition de foo comme ceci :

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)
    ... open mapdb
    ... process data ...
    ... close mapdb

puis modifiez votre appel à pool.map en :

pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)    

Mise à jour

Une autre option consiste à gérer soi-même les fils de travail et à distribuer le travail par l'intermédiaire d'un fichier Queue .

from Queue import Queue
from threading import Thread

q = Queue()

def worker():
  mapdb = ...open the sqlite database
  while True:
    item = q.get()
    if item[0] == "file":
      file = item[1]
      ... process file ...
      q.task_done()
    else:
      q.task_done()
      break
  ...close sqlite connection...

# Start up the workers

nworkers = 4

for i in range(nworkers):
  worker = Thread(target=worker)
  worker.daemon = True
  worker.start()

# Place work on the Queue

for x in ...list of files...:
  q.put(("file",x))

# Place termination tokens onto the Queue

for i in range(nworkers):
  q.put(("end",))

# Wait for all work to be done.

q.join()

Les jetons de terminaison sont utilisés pour s'assurer que les connexions sqlite sont fermées - au cas où cela aurait de l'importance.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X