wake-up-neo.net

pandas Multiprocessing gelten

Ich versuche, Multiprocessing mit Pandas-Datenrahmen zu verwenden, dh den Datenrahmen in 8 Teile aufzuteilen. Wenden Sie mit apply eine Funktion auf jedes Teil an (wobei jedes Teil in einem anderen Prozess verarbeitet wird).

EDIT: Hier ist die Lösung, die ich endlich gefunden habe:

import multiprocessing as mp
import pandas.util.testing as pdt

def process_apply(x):
    # do some stuff to data here

def process(df):
    res = df.apply(process_apply, axis=1)
    return res

if __== '__main__':
    p = mp.Pool(processes=8)
    split_dfs = np.array_split(big_df,8)
    pool_results = p.map(aoi_proc, split_dfs)
    p.close()
    p.join()

    # merging parts processed by different processes
    parts = pd.concat(pool_results, axis=0)

    # merging newly calculated parts to big_df
    big_df = pd.concat([big_df, parts], axis=1)

    # checking if the dfs were merged correctly
    pdt.assert_series_equal(parts['id'], big_df['id'])
15
yemu

Da ich nicht viel von Ihrem Datenskript habe, ist dies eine Vermutung, aber ich würde vorschlagen, beim Rückruf p.map anstelle von apply_async zu verwenden.

p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df,8))
p.close()
p.join()
results = []
for result in pool_results:
    results.extend(result)
4
Rafael Barros

Sie können https://github.com/nalepae/pandarallel wie im folgenden Beispiel verwenden:

from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

2

Eine allgemeinere Version, die auf der Autorenlösung basiert und es ermöglicht, sie auf allen Funktionen und Datenrahmen auszuführen:

from multiprocessing import  Pool
from functools import partial
import numpy as np

def parallelize(data, func, num_of_processes=8):
    data_split = np.array_split(data, num_of_processes)
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(data, func, num_of_processes=8):
    return parallelize(data, partial(run_on_subset, func), num_of_processes)

Also die folgende Zeile:

df.apply(some_func, axis=1)

Wird werden:

parallelize_on_rows(df, some_func) 
2
Tom Raz

Ich habe auch das gleiche Problem, wenn ich multiprocessing.map() verwende, um eine Funktion auf einen anderen Teil eines großen Datenrahmens anzuwenden.

Ich möchte nur einige Punkte hinzufügen, nur für den Fall, dass andere Leute auf dasselbe Problem stoßen wie ich.

  1. denken Sie daran, if __== '__main__': hinzuzufügen
  2. führen Sie die Datei in einer .py-Datei aus. Wenn Sie ipython/jupyter notebook verwenden, können Sie multiprocessing nicht ausführen. (Dies gilt für meinen Fall, obwohl ich keine Ahnung habe.)
0
user6651227

Das hat bei mir gut funktioniert:

rows_iter = (row for _, row in df.iterrows())

with multiprocessing.Pool() as pool:
    df['new_column'] = pool.map(process_apply, rows_iter)
0
EliadL