L'approche la plus simple que j'ai trouvée pour exécuter un lot de requêtes est de passer une session Grakn à chaque thread dans un fichier de type ThreadPool
. Au sein de chaque thread, vous pouvez gérer des transactions et, bien sûr, effectuer des opérations logiques plus complexes :
from grakn.client import GraknClient
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial
def write_query_batch(session, batch):
tx = session.transaction().write()
for query in batch:
tx.query(query)
tx.commit()
def multi_thread_write_query_batches(session, query_batches, num_threads=8):
pool = ThreadPool(num_threads)
pool.map(partial(write_query_batch, session), query_batches)
pool.close()
pool.join()
def generate_query_batches(my_data_entries_list, batch_size):
batch = []
for index, data_entry in enumerate(my_data_entries_list):
batch.append(data_entry)
if index % batch_size == 0 and index != 0:
yield batch
batch = []
if batch:
yield batch
# (Part 2) Somewhere in your application open a client and a session
client = GraknClient(uri="localhost:48555")
session = client.session(keyspace="grakn")
query_batches_iterator = generate_query_batches(my_data_entries_list, batch_size)
multi_thread_write_query_batches(session, query_batches_iterator, num_threads=8)
session.close()
client.close()
La méthode ci-dessus est une méthode générique. Comme exemple concret, vous pouvez utiliser la méthode ci-dessus (en omettant la partie 2) pour paralléliser les lots de insert
à partir de deux fichiers. En ajoutant ceci à ce qui précède, cela devrait fonctionner :
files = [
{
"file_path": f"/path/to/your/file.gql",
},
{
"file_path": f"/path/to/your/file2.gql",
}
]
KEYSPACE = "grakn"
URI = "localhost:48555"
BATCH_SIZE = 10
NUM_BATCHES = 1000
# Entry point where migration starts
def migrate_graql_files():
start_time = time.time()
for file in files:
print('==================================================')
print(f'Loading from {file["file_path"]}')
print('==================================================')
open_file = open(file["file_path"], "r") # Here we are assuming you have 1 Graql query per line!
batches = generate_query_batches(open_file.readlines(), BATCH_SIZE)
with GraknClient(uri=URI) as client: # Using `with` auto-closes the client
with client.session(KEYSPACE) as session: # Using `with` auto-closes the session
multi_thread_write_query_batches(session, batches, num_threads=16) # Pick `num_threads` according to your machine
elapsed = time.time() - start_time
print(f'Time elapsed {elapsed:.1f} seconds')
elapsed = time.time() - start_time
print(f'Time elapsed {elapsed:.1f} seconds')
if __name__ == "__main__":
migrate_graql_files()
Vous devriez également être en mesure de voir comment vous pouvez charger depuis un csv
ou tout autre type de fichier de cette manière, mais en prenant les valeurs que vous trouvez dans ce fichier et en les substituant dans des modèles de chaînes de requêtes Graql. Jetez un coup d'œil au exemple de migration dans les docs pour en savoir plus.