2 votes

Les tuyaux se bloquent - aucune autre solution sur stack overflow ne fonctionne

(MISE A JOUR) Je suis en train de construire un module pour distribuer des modèles basés sur des agents, l'idée est de répartir le modèle sur plusieurs processus et lorsque les agents atteignent une frontière, ils sont transmis au processeur qui s'occupe de cette région. Je peux mettre en place les processus et les faire fonctionner sans communication, mais je n'arrive pas à faire passer les données dans les tuyaux et à mettre à jour le segment du modèle sur l'autre processeur.

J'ai essayé les solutions proposées sur stackoverflow et j'ai construit une version simple du modèle. Dès que j'introduis un objet modèle dans le pipe, le modèle se bloque (il fonctionne avec les types de données standard de Python). La version simple ne fait que passer les agents d'un côté à l'autre.

 from pathos.multiprocessing import ProcessPool
 from pathos.helpers import mp
 import copy

 class TestAgent:

 "Agent Class-- Schedule iterates through each agent and \
  executes step function"

 def __init__(self, unique_id, model):
      self.unique_id = unique_id
      self.model = model
      self.type = "agent"

 def step(self):

       pass 
       #print ('     ', self.unique_id, "I have stepped")

class TestModel:

   "Model Class iterates through schedule and executes step function for \
   each agent"

   def __init__(self):
       self.schedule = []
       self.pipe = None
       self.process = None

       for i in range(1000):
           a = TestAgent(i, self)
           self.schedule.append(a)

   def step(self):

       for a in self.schedule:
           a.step()

if __name__ == '__main__':

   pool = ProcessPool(nodes=2)

   #create instance of model
   test_model = TestModel()
   #create copies of model to be run on 2 processors
   test1 = copy.deepcopy(test_model)
   #clear schedule
   test1.schedule = []
   #Put in only half the schedule
   for i in range(0,500):
       test1.schedule.append(test_model.schedule[i])  
   #Give process tracker number
   test1.process = 1
   #repeat for other processor
   test2= copy.deepcopy(test_model)
   test2.schedule = []
   for i in range(500,1000):
       test2.schedule.append(test_model.schedule[i])
   test2.process = 2

   #create pipe
   end1, end2 = mp.Pipe()

   #Main run function for each process
   def run(model, pipe):

      for i in range(5):
          print (model.process)#, [a.unique_id for a in model.schedule])
          model.step() # IT HANGS AFTER INITIAL STEP
          print ("send")
          pipe.send(model.schedule)
          print ("closed")
          sched = pipe.recv()
          print ("received")
          model.schedule = sched

   pool.map(run, [test1, test2], [end1,end2])

Les agents doivent changer de processeur et exécuter leurs fonctions d'impression. (Mon prochain problème sera de synchroniser les processeurs pour qu'ils restent à chaque étape, mais une chose à la fois).

enter image description here

0voto

TPike Points 110

J'ai réussi à le faire fonctionner. Je dépassais la limite de la mémoire tampon de Python (8192). Ceci est particulièrement vrai si l'agent détient une copie du modèle en tant qu'attribut. Une version fonctionnelle du code ci-dessus, qui passe les agents un par un, est présentée ci-dessous. Elle utilise Pympler pour obtenir la taille de tous les agents.

from pathos.multiprocessing import ProcessPool
from pathos.helpers import mp
import copy

# do a blocking map on the chosen function

class TestAgent:

"Agent Class-- Schedule iterates through each agent and \
executes step function"

   def __init__(self, unique_id, model):
       self.unique_id = unique_id
       self.type = "agent"

   def step(self):
       pass 

class TestModel:

   "Model Class iterates through schedule and executes step function for \
   each agent"

   def __init__(self):
       from pympler import asizeof 

       self.schedule = []
       self.pipe = None
       self.process = None
       self.size = asizeof.asizeof

       for i in range(1000):
           a = TestAgent(i, self)
           self.schedule.append(a)

   def step(self):

       for a in self.schedule:
           a.step()

if __name__ == '__main__':

   pool = ProcessPool(nodes=2)

   #create instance of model
   test_model = TestModel()
   #create copies of model to be run on 2 processors
   test1 = copy.deepcopy(test_model)
   #clear schedule
   test1.schedule = []
   #Put in only half the schedule
   for i in range(0,500):
       test1.schedule.append(test_model.schedule[i])  
   #Give process tracker number
   test1.process = 1
   #repeat for other processor
   test2= copy.deepcopy(test_model)
   test2.schedule = []
   for i in range(500,1000):
       test2.schedule.append(test_model.schedule[i])
   test2.process = 2

   #create pipe
   end1, end2 = mp.Pipe()

   #Main run function for each process
   def run(model, pipe):

      for i in range(5):
        agents = []
        print (model.process, model.size(model.schedule) ) 
        model.step() # IT HANGS AFTER INITIAL STEP
        #agent_num = list(model.schedule._agents.keys())
        for agent in model.schedule[:]:
            model.schedule.remove(agent)
            pipe.send(agent)
            agent = pipe.recv()
            agents.append(agent)
        print (model.process, "all agents received")
        for agent in agents: 
            model.schedule.append(agent)

        print (model.process, len(model.schedule))

   pool.map(run, [test1, test2], [end1,end2])

Mike McKerns et Thomas Moreau -Merci pour l'aide que vous m'avez apportée et qui m'a mis sur la bonne voie.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X