Multiprocessing en python pour accélérer les fonctions

Je suis confondu avec le multitraitement Python.

J’essaie d’accélérer une fonction qui traite des chaînes à partir d’une firebase database, mais j’ai du mal à comprendre comment fonctionne le multitraitement, car la fonction prend plus de temps pour un groupe de travailleurs que pour un “traitement normal”.

Voici un exemple de ce que j’essaie de réaliser.

from time import clock, time from multiprocessing import Pool, freeze_support from random import choice def foo(x): TupWerteMany = [] for i in range(0,len(x)): TupWerte = [] s = list(x[i][3]) NewValue = choice(s)+choice(s)+choice(s)+choice(s) TupWerte.append(NewValue) TupWerte = tuple(TupWerte) TupWerteMany.append(TupWerte) return TupWerteMany if __name__ == '__main__': start_time = time() List = [(u'1', u'aa', u'Jacob', u'Emily'), (u'2', u'bb', u'Ethan', u'Kayla')] List1 = List*1000000 # METHOD 1 : NORMAL (takes 20 seconds) x2 = foo(List1) print x2[1:3] # METHOD 2 : APPLY_ASYNC (takes 28 seconds) # pool = Pool(4) # Werte = pool.apply_async(foo, args=(List1,)) # x2 = Werte.get() # print '--------' # print x2[1:3] # print '--------' # METHOD 3: MAP (!! DOES NOT WORK !!) # pool = Pool(4) # Werte = pool.map(foo, args=(List1,)) # x2 = Werte.get() # print '--------' # print x2[1:3] # print '--------' print 'Time Elaspse: ', time() - start_time 

Mes questions:

  1. Pourquoi apply_async prend-il plus de temps que la “manière normale”?
  2. Qu’est-ce que je fais mal avec la carte?
  3. Est-il judicieux d’accélérer de telles tâches avec le multitraitement?
  4. Enfin: après tout ce que j’ai lu ici, je me demande si le multitraitement en Python fonctionne sur Windows du tout?

Donc, votre premier problème est qu’il n’ya pas de parallélisme réel dans foo(x) , vous transmettez la liste entière à la fonction une fois.

1) L’idée d’un pool de processus est d’avoir de nombreux processus effectuant des calculs sur des bits distincts de certaines données.

  # METHOD 2 : APPLY_ASYNC jobs = 4 size = len(List1) pool = Pool(4) results = [] # split the list into 4 equally sized chunks and submit those to the pool heads = range(size/jobs, size, size/jobs) + [size] tails = range(0,size,size/jobs) for tail,head in zip(tails, heads): werte = pool.apply_async(foo, args=(List1[tail:head],)) results.append(werte) pool.close() pool.join() # wait for the pool to be done for result in results: werte = result.get() # get the return value from the sub jobs 

Cela ne vous donnera une accélération réelle que si le temps nécessaire au traitement de chaque morceau est supérieur au temps nécessaire au lancement du processus, dans le cas de quatre processus et de quatre tâches à accomplir, ces dynamics changent évidemment si vous Nous avons 4 processus et 100 emplois à accomplir. Rappelez-vous que vous créez un interpréteur python complètement nouveau quatre fois, ce n’est pas gratuit.

2) Le problème que vous avez avec la carte est qu’elle applique foo à chaque élément de List1 dans un processus séparé, cela prendra du temps. Donc, si vous avez un pool avec 4 processus, map affichera un élément de la liste quatre fois et l’enverra à un processus à traiter – attendez que le processus se termine – affichez d’autres éléments de la liste – attendez que le processus se termine . Cela n’a de sens que si le traitement d’un seul élément prend beaucoup de temps, comme par exemple si chaque élément est un nom de fichier pointant vers un fichier texte d’un gigaoctet. Mais en l’état, la carte ne prendra qu’une seule chaîne de la liste et la passera à fooapply_async prend une partie de la liste. Essayez le code suivant

 def foo(thing): print thing map(foo, ['a','b','c','d']) 

C’est la carte Python intégrée qui exécutera un processus unique, mais l’idée est exactement la même pour la version multiprocess.

Ajouté selon le commentaire de JFSebastian: Vous pouvez cependant utiliser l’argument chunksize pour map pour spécifier une taille approximative pour chaque segment.

 pool.map(foo, List1, chunksize=size/jobs) 

Je ne sais pas s’il y a un problème avec la map sous Windows car je n’en ai pas de disponible pour les tests.

3) oui, étant donné que votre problème est suffisamment important pour justifier la sortie de nouveaux interprètes python

4) ne peut pas vous donner une réponse définitive à ce sujet, car cela dépend du nombre de cœurs / processeurs, etc.

Sur la question (2) Avec l’aide de Dougal et Matti, j’ai compris ce qui n’allait pas. La fonction foo originale traite une liste de listes, tandis que la carte nécessite une fonction pour traiter des éléments individuels.

La nouvelle fonction devrait être

 def foo2 (x): TupWerte = [] s = list(x[3]) NewValue = choice(s)+choice(s)+choice(s)+choice(s) TupWerte.append(NewValue) TupWerte = tuple(TupWerte) return TupWerte 

et le bloc pour l’appeler:

 jobs = 4 size = len(List1) pool = Pool() #Werte = pool.map(foo2, List1, chunksize=size/jobs) Werte = pool.map(foo2, List1) pool.close() print Werte[1:3] 

Merci à tous ceux qui m’ont aidé à comprendre cela.

Résultats de toutes les méthodes: pour List * 2 Mio enregistrements: normal 13,3 secondes, parallèle avec asynchrone: 7,5 secondes, parallèle avec avec map avec chuncksize: 7.3, sans taille 5.2.

Voici un modèle générique de multitraitement si vous êtes intéressé.

 import multiprocessing as mp import time def worker(x): time.sleep(0.2) print "x= %s, x squared = %s" % (x, x*x) return x*x def apply_async(): pool = mp.Pool() for i in range(100): pool.apply_async(worker, args = (i, )) pool.close() pool.join() if __name__ == '__main__': apply_async() 

Et la sortie ressemble à ceci:

 x= 0, x squared = 0 x= 1, x squared = 1 x= 2, x squared = 4 x= 3, x squared = 9 x= 4, x squared = 16 x= 6, x squared = 36 x= 5, x squared = 25 x= 7, x squared = 49 x= 8, x squared = 64 x= 10, x squared = 100 x= 11, x squared = 121 x= 9, x squared = 81 x= 12, x squared = 144 

Comme vous pouvez le voir, les numéros ne sont pas en ordre, car ils sont exécutés de manière asynchrone.