Ich versuche, Multiprocessing mit Pandas-Datenrahmen zu verwenden, dh den Datenrahmen in 8 Teile aufzuteilen. Wenden Sie mit apply eine Funktion auf jedes Teil an (wobei jedes Teil in einem anderen Prozess verarbeitet wird).
EDIT: Hier ist die Lösung, die ich endlich gefunden habe:
import multiprocessing as mp
import pandas.util.testing as pdt
def process_apply(x):
# do some stuff to data here
def process(df):
res = df.apply(process_apply, axis=1)
return res
if __== '__main__':
p = mp.Pool(processes=8)
split_dfs = np.array_split(big_df,8)
pool_results = p.map(aoi_proc, split_dfs)
p.close()
p.join()
# merging parts processed by different processes
parts = pd.concat(pool_results, axis=0)
# merging newly calculated parts to big_df
big_df = pd.concat([big_df, parts], axis=1)
# checking if the dfs were merged correctly
pdt.assert_series_equal(parts['id'], big_df['id'])
Da ich nicht viel von Ihrem Datenskript habe, ist dies eine Vermutung, aber ich würde vorschlagen, beim Rückruf p.map
anstelle von apply_async
zu verwenden.
p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df,8))
p.close()
p.join()
results = []
for result in pool_results:
results.extend(result)
Sie können https://github.com/nalepae/pandarallel wie im folgenden Beispiel verwenden:
from pandarallel import pandarallel
from math import sin
pandarallel.initialize()
def func(x):
return sin(x**2)
df.parallel_apply(func, axis=1)
Eine allgemeinere Version, die auf der Autorenlösung basiert und es ermöglicht, sie auf allen Funktionen und Datenrahmen auszuführen:
from multiprocessing import Pool
from functools import partial
import numpy as np
def parallelize(data, func, num_of_processes=8):
data_split = np.array_split(data, num_of_processes)
pool = Pool(num_of_processes)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data
def run_on_subset(func, data_subset):
return data_subset.apply(func, axis=1)
def parallelize_on_rows(data, func, num_of_processes=8):
return parallelize(data, partial(run_on_subset, func), num_of_processes)
Also die folgende Zeile:
df.apply(some_func, axis=1)
Wird werden:
parallelize_on_rows(df, some_func)
Ich habe auch das gleiche Problem, wenn ich multiprocessing.map()
verwende, um eine Funktion auf einen anderen Teil eines großen Datenrahmens anzuwenden.
Ich möchte nur einige Punkte hinzufügen, nur für den Fall, dass andere Leute auf dasselbe Problem stoßen wie ich.
if __== '__main__':
hinzuzufügen.py
-Datei aus. Wenn Sie ipython/jupyter notebook
verwenden, können Sie multiprocessing
nicht ausführen. (Dies gilt für meinen Fall, obwohl ich keine Ahnung habe.)Das hat bei mir gut funktioniert:
rows_iter = (row for _, row in df.iterrows())
with multiprocessing.Pool() as pool:
df['new_column'] = pool.map(process_apply, rows_iter)