4 votes

Comment paralléliser au mieux les requêtes grakn avec Python ?

J'utilise Windows 10, Python 3.7 et j'ai un processeur à 6 cœurs. Un seul thread Python sur ma machine soumet 1 000 insertions par seconde à grakn. J'aimerais paralléliser mon code pour insérer et faire correspondre encore plus rapidement. Comment les gens font-ils cela ?

Ma seule expérience de la parallélisation est sur un autre projet, où je soumets une fonction personnalisée à un client distribué dask pour générer des milliers de tâches. Actuellement, cette même approche échoue chaque fois que la fonction personnalisée reçoit ou génère un objet/manifeste de transaction grakn. J'obtiens des erreurs comme :

Traceback (most recent call last):
  File "C:\Users\dvyd\.conda\envs\activefiction\lib\site-packages\distributed\protocol\pickle.py", line 41, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
...
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

Je n'ai jamais utilisé directement le module de multiprocessing de Python. Que font les autres personnes pour paralléliser leurs requêtes à grakn ?

1voto

James Points 46

L'approche la plus simple que j'ai trouvée pour exécuter un lot de requêtes est de passer une session Grakn à chaque thread dans un fichier de type ThreadPool . Au sein de chaque thread, vous pouvez gérer des transactions et, bien sûr, effectuer des opérations logiques plus complexes :

from grakn.client import GraknClient
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def write_query_batch(session, batch):
    tx = session.transaction().write()
    for query in batch:
        tx.query(query)
    tx.commit()

def multi_thread_write_query_batches(session, query_batches, num_threads=8):
    pool = ThreadPool(num_threads)
    pool.map(partial(write_query_batch, session), query_batches)
    pool.close()
    pool.join()

def generate_query_batches(my_data_entries_list, batch_size):
    batch = []
    for index, data_entry in enumerate(my_data_entries_list):
        batch.append(data_entry)
        if index % batch_size == 0 and index != 0:
            yield batch
            batch = []
    if batch:
        yield batch

# (Part 2) Somewhere in your application open a client and a session
client = GraknClient(uri="localhost:48555")
session = client.session(keyspace="grakn")

query_batches_iterator = generate_query_batches(my_data_entries_list, batch_size)
multi_thread_write_query_batches(session, query_batches_iterator, num_threads=8)

session.close()
client.close()

La méthode ci-dessus est une méthode générique. Comme exemple concret, vous pouvez utiliser la méthode ci-dessus (en omettant la partie 2) pour paralléliser les lots de insert à partir de deux fichiers. En ajoutant ceci à ce qui précède, cela devrait fonctionner :

files = [
    {
        "file_path": f"/path/to/your/file.gql",
    },
    {
        "file_path": f"/path/to/your/file2.gql",
    }
]

KEYSPACE = "grakn"
URI = "localhost:48555"
BATCH_SIZE = 10
NUM_BATCHES = 1000

# Entry point where migration starts
def migrate_graql_files():
    start_time = time.time()

    for file in files:
        print('==================================================')
        print(f'Loading from {file["file_path"]}')
        print('==================================================')

        open_file = open(file["file_path"], "r")  # Here we are assuming you have 1 Graql query per line!
        batches = generate_query_batches(open_file.readlines(), BATCH_SIZE)

        with GraknClient(uri=URI) as client:  # Using `with` auto-closes the client
            with client.session(KEYSPACE) as session:  # Using `with` auto-closes the session
                multi_thread_write_query_batches(session, batches, num_threads=16)  # Pick `num_threads` according to your machine

        elapsed = time.time() - start_time
        print(f'Time elapsed {elapsed:.1f} seconds')

    elapsed = time.time() - start_time
    print(f'Time elapsed {elapsed:.1f} seconds')

if __name__ == "__main__":
    migrate_graql_files()

Vous devriez également être en mesure de voir comment vous pouvez charger depuis un csv ou tout autre type de fichier de cette manière, mais en prenant les valeurs que vous trouvez dans ce fichier et en les substituant dans des modèles de chaînes de requêtes Graql. Jetez un coup d'œil au exemple de migration dans les docs pour en savoir plus.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X