(MISE A JOUR) Je suis en train de construire un module pour distribuer des modèles basés sur des agents, l'idée est de répartir le modèle sur plusieurs processus et lorsque les agents atteignent une frontière, ils sont transmis au processeur qui s'occupe de cette région. Je peux mettre en place les processus et les faire fonctionner sans communication, mais je n'arrive pas à faire passer les données dans les tuyaux et à mettre à jour le segment du modèle sur l'autre processeur.
J'ai essayé les solutions proposées sur stackoverflow et j'ai construit une version simple du modèle. Dès que j'introduis un objet modèle dans le pipe, le modèle se bloque (il fonctionne avec les types de données standard de Python). La version simple ne fait que passer les agents d'un côté à l'autre.
from pathos.multiprocessing import ProcessPool
from pathos.helpers import mp
import copy
class TestAgent:
"Agent Class-- Schedule iterates through each agent and \
executes step function"
def __init__(self, unique_id, model):
self.unique_id = unique_id
self.model = model
self.type = "agent"
def step(self):
pass
#print (' ', self.unique_id, "I have stepped")
class TestModel:
"Model Class iterates through schedule and executes step function for \
each agent"
def __init__(self):
self.schedule = []
self.pipe = None
self.process = None
for i in range(1000):
a = TestAgent(i, self)
self.schedule.append(a)
def step(self):
for a in self.schedule:
a.step()
if __name__ == '__main__':
pool = ProcessPool(nodes=2)
#create instance of model
test_model = TestModel()
#create copies of model to be run on 2 processors
test1 = copy.deepcopy(test_model)
#clear schedule
test1.schedule = []
#Put in only half the schedule
for i in range(0,500):
test1.schedule.append(test_model.schedule[i])
#Give process tracker number
test1.process = 1
#repeat for other processor
test2= copy.deepcopy(test_model)
test2.schedule = []
for i in range(500,1000):
test2.schedule.append(test_model.schedule[i])
test2.process = 2
#create pipe
end1, end2 = mp.Pipe()
#Main run function for each process
def run(model, pipe):
for i in range(5):
print (model.process)#, [a.unique_id for a in model.schedule])
model.step() # IT HANGS AFTER INITIAL STEP
print ("send")
pipe.send(model.schedule)
print ("closed")
sched = pipe.recv()
print ("received")
model.schedule = sched
pool.map(run, [test1, test2], [end1,end2])
Les agents doivent changer de processeur et exécuter leurs fonctions d'impression. (Mon prochain problème sera de synchroniser les processeurs pour qu'ils restent à chaque étape, mais une chose à la fois).