wake-up-neo.net

Python: Multiprocessing für einen Pandas-Datenrahmen verwenden

Ich möchte multiprocessing in einem großen Datensatz verwenden, um den Abstand zwischen zwei GPS-Punkten zu ermitteln. Ich habe ein Test-Set erstellt, aber ich konnte multiprocessing nicht dazu bringen, an diesem Set zu arbeiten.

import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def calc_dist(x):
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], x], 
                           df.loc[c[1], x])
                 ]
                 for grp,lst in df.groupby('co_nm').groups.items()
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])

if __== '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()

Ich verwende Python 2.7.11 und Ipython 4.1.2 mit Anaconda 2.5.0 64-Bit unter Windows7 Professional, wenn dieser Fehler auftritt.

runfile ('C: /.../ Desktop/Multiprocessing test.py', wdir = 'C: /.../ Desktop') Traceback (letzter Anruf zuletzt):

Datei "", Zeile 1, in runfile ('C: /.../ Desktop/Multiprocessing test.py', wdir = 'C: /.../ Desktop')

Datei "C: ...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", Zeile 699, in der Ausführungsdatei Execfile (Dateiname, Namensraum)

Datei "C: ...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", Zeile 74 in der Ausführungsdatei exec (kompilieren (Skripttext, Dateiname, 'exec'), glob, loc)

Datei "C: /..../ Multiprocessing test.py", Zeile 33, in pool.map (calc_dist, ['lat', 'lon'])

Datei "C: ...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", Zeile 251, in Karte return self.map_async (func, iterable, chunksize) .get ()

Datei "C: ...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", Zeile 567, in get Selbstwert erhöhen

TypeError: Fehler beim Erstellen der Point-Instanz von 1.

def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value
20
dustin

Was ist falsch

Diese Zeile aus Ihrem Code:

pool.map(calc_dist, ['lat','lon'])

spawns 2 Prozesse - einer führt calc_dist('lat') und der andere calc_dist('lon') aus. Vergleichen Sie das erste Beispiel in doc . (Grundsätzlich ruft pool.map(f, [1,2,3])f dreimal mit den in der folgenden Liste angegebenen Argumenten auf: f(1), f(2) und f(3).) Wenn ich mich nicht irre, kann Ihre Funktion calc_dist nur calc_dist('lat', 'lon') aufgerufen werden. Und es erlaubt keine parallele Verarbeitung.

Lösung

Ich glaube, Sie möchten die Arbeit zwischen den Prozessen aufteilen, indem Sie jeden Tuple (grp, lst) an einen separaten Prozess senden. Der folgende Code macht genau das. 

Zuerst wollen wir uns auf das Aufteilen vorbereiten:

grp_lst_args = list(df.groupby('co_nm').groups.items())

print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]

Wir werden jedes dieser Tupel (hier gibt es drei) als Argument an eine Funktion in einem separaten Prozess senden. Wir müssen die Funktion neu schreiben, nennen wir sie calc_dist2. Der Einfachheit halber ist das Argument ein Tuple wie in calc_dist2(('aa',[0,1,2])).

def calc_dist2(arg):
    grp, lst = arg
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], ['lat','lon']], 
                           df.loc[c[1], ['lat','lon']])
                 ]
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])

Und jetzt kommt das Multiprocessing:

pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()

results_df = pd.concat(results)

results ist eine Liste von Ergebnissen (hier Datenrahmen) von Aufrufen calc_dist2((grp,lst)) für (grp,lst) in grp_lst_args. Elemente von results werden später zu einem Datenrahmen verkettet.

print(results_df)
  co_nm  machineA  machineB          distance
0    aa         1         2  156.876149391 km
1    aa         1         3  313.705445447 km
2    aa         2         3  156.829329105 km
0    cc         8         9  156.060165391 km
1    cc         8         0  311.910998169 km
2    cc         9         0  155.851498134 km
0    bb         4         5  156.665641837 km
1    bb         4         6  313.214333025 km
2    bb         4         7  469.622535339 km
3    bb         5         6  156.548897414 km
4    bb         5         7  312.957597466 km
5    bb         6         7   156.40899677 km

Übrigens, in Python 3 können wir eine with-Konstruktion verwenden:

with mp.Pool() as pool:
    results = pool.map(calc_dist2, grp_lst_args)

Update

Ich habe diesen Code nur unter Linux getestet. Unter Linux kann auf den schreibgeschützten Datenrahmen df von untergeordneten Prozessen zugegriffen werden und er wird nicht in den Speicherbereich kopiert. Ich bin jedoch nicht sicher, wie er unter Windows genau funktioniert. Sie können erwägen, df in Chunks aufzuteilen (gruppiert nach co_nm) und diese Chunks als Argumente an eine andere Version von calc_dist zu senden.

19
ptrj

Seltsam. Es scheint unter Python2 zu funktionieren, aber nicht unter Python3.

Dies ist eine minimal geänderte Version, um die Ausgabe zu drucken:

import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def calc_dist(x):
    ret =  pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], x],
                           df.loc[c[1], x])
                 ]
                 for grp,lst in df.groupby('co_nm').groups.items()
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])
    print(ret)
    return ret

if __== '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()

Und das ist die Ausgabe von Python2

0     aa         1         2  110.723608682 km
1     aa         1         3  221.460709525 km
2     aa         2         3  110.737100843 km
3     cc         8         9  110.827576495 km
4     cc         8         0  221.671650552 km
   co_nm  machineA  machineB          distance
5     cc         9         0  110.844074057 km
0     aa         1         2  110.575064814 km
1     aa         1         3  221.151481337 km
6     bb         4         5  110.765515243 km
2     aa         2         3  110.576416524 km
7     bb         4         6    221.5459187 km
3     cc         8         9  110.598565514 km
4     cc         8         0  221.203121352 km
8     bb         4         7  332.341640771 km
5     cc         9         0  110.604555838 km
6     bb         4         5   110.58113908 km
9     bb         5         6  110.780403457 km
7     bb         4         6  221.165643396 km
10    bb         5         7  221.576125528 km
8     bb         4         7  331.754177186 km
9     bb         5         6  110.584504316 km
10    bb         5         7  221.173038106 km
11    bb         6         7  110.795722071 km
11    bb         6         7   110.58853379 km

Und das ist die Stack-Spur von Python3

"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 123, in __new__
    seq = iter(arg)
TypeError: 'numpy.int64' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "gps.py", line 29, in calc_dist
    for grp, lst in df.groupby('co_nm').groups.items()
  File "gps.py", line 30, in <listcomp>
    for c in combinations(lst, 2)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 322, in __init__
    super(vincenty, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 115, in __init__
    kilometers += self.measure(a, b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 342, in measure
    a, b = Point(a), Point(b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 126, in __new__
    "Failed to create Point instance from %r." % (arg,)
TypeError: Failed to create Point instance from 8.
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "gps.py", line 38, in <module>
    pool.map(calc_dist, ['lat', 'lon'])
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
TypeError: Failed to create Point instance from 8.

Ich weiß, das ist nicht die Antwort, aber vielleicht hilft es ...

1
salomonderossi