Pandas DataFrame でマルチプロセッシングを使用する

Mehvish Ashiq 2024年2月15日
  1. マルチプロセッシングの紹介
  2. マルチプロセッシングの重要性
  3. Pandas DataFrame でマルチプロセッシングを使用する
Pandas DataFrame でマルチプロセッシングを使用する

このチュートリアルでは、Python のマルチプロセッシングを紹介し、コード例とグラフィック表現を使用してそれについて説明します。 また、マルチプロセッシングの重要性を強調し、Pandas データフレームで multiprocessing モジュールを使用する方法を示します。

マルチプロセッシングの紹介

マルチプロセッシングとは、同時に複数のプロセッサをサポートするシステムの能力を持つことを意味します。 マルチプロセッシングでは、アプリケーションは独立して実行できる、または独立性が低いルーチンに分割されます。

オペレーティング システムはこれらのスレッドを異なるプロセッサに割り当て、システムのパフォーマンスを向上させます。 どうやって? 次の簡単なプログラムで理解してみましょう。

コード例:

import time

start = time.perf_counter()


def print_something():
    print("Sleeping for 1 second.")
    time.sleep(1)
    print("Done with sleeping.")


print_something()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

出力:

Sleeping for 1 second.
Done with sleeping.
Completed in 1.01 second(s)

上記の出力は、コード フェンスで最初に time モジュールをインポートしたため、正しく聞こえます。これを使用して、スクリプトの実行にかかる時間を測定します。 それを測定するために、start = time.perf_counter()finish = time.perf_counter() を使用して startfinish の時間を計算しました。

print_something() という名前の関数もあり、何かを出力し、1 秒間スリープし、別のステートメントを出力します。 この関数を呼び出し、最後に、スクリプトが完了したことを示す最後のステートメントを出力します。

ここで、print_something() を 2 回実行すると、ほぼ 2 秒かかります。 以下で確認できます。

コード例:

import time

start = time.perf_counter()


def print_something():
    print("Sleeping for 1 second.")
    time.sleep(1)
    print("Done with sleeping.")


print_something()
print_something()

finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

出力:

Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Completed in 2.02 second(s)

現在、プログラムは 1 秒間 2 回スリープしているため、スクリプトが完了するまでに約 2 秒かかります。

したがって、この print_something() 関数を実行するたびに、スクリプトに約 1 秒追加されることがわかります。 スクリプトは 1 秒間スリープ状態で待機しているだけで、次の関数に移動して、さらに 1 秒間待機します。

そして、その時点で作業は完了し、スクリプトは終了します。 次の図で理解できます。

python は pandas データ フレームで multiprocessing を使用 - グラフィカル表現 1

このグラフィカルな表現は、関数 (この場合は print_something()) を実行し、1 秒待ってから関数を再度実行し、さらに 1 秒待ってから、最終的な print を出力することを示しています。 私たちが終わったことを示す声明。

上の図に示されているように、この順序でスクリプトを実行することを同期実行と呼びます。

ここで、同期的に実行する必要のないタスクがある場合、multiprocessing モジュールを使用してこれらのタスクを他の CPU に分割し、同時に実行できます。

multithreadingmultiprocessing と同じではないことに注意してください。 ここで違いを見つけることができます。

multiprocessing モジュールを使用する場合、グラフィック表示は次のようになります。

python は pandas データ フレームで multiprocessing を使用 - グラフィカル表現 2

ここでは、まだ 2つのタスクがありますが、これを 2つのプロセスに分割し、異なるプロセスで同時に実行します。 それでは、このグラフィカルな表現を次のようにプログラムに実装しましょう。

import multiprocessing
import time


start = time.perf_counter()


def print_something():
    print("Sleeping for 1 second.")
    time.sleep(1)
    print("Done with sleeping.")


process1 = multiprocessing.Process(target=print_something)
process2 = multiprocessing.Process(target=print_something)

process1.start()
process2.start()

process1.join()
process2.join()

finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

出力:

Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Completed in 1.02 second(s)

ご覧のとおり、スクリプトの所要時間は 2 秒ではなく 1 秒です。 上記のスクリプトは、いくつかの変更を除いて前のスクリプトと似ています。

プロセスの作成に使用する multiprocessing モジュールをインポートしました。 次に、multiprocessing.Process(target=print_something) を使用して 2つのプロセスを作成し、process1process2 に保存します。

その後、process1.start()process2.start() を使用してこれらのプロセスの両方を開始します。 .join() メソッドを使用して、プロセスを終了する前に残りのスクリプトを実行しないようにしました。

.join() メソッドを省略すると、プロセスを終了する前に次の 2つのステートメントが実行されることを意味します。

finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

次のサンプルコードを使用して学習できます。

import multiprocessing
import time


start = time.perf_counter()


def print_something():
    print("Sleeping for 1 second.")
    time.sleep(1)
    print("Done with sleeping.")


process1 = multiprocessing.Process(target=print_something)
process2 = multiprocessing.Process(target=print_something)

process1.start()
process2.start()

# process1.join()
# process2.join()

finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

出力:

Completed in 0.01 second(s)
Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.

ご覧のとおり、Sleeping ステートメントと Done ステートメントの前に Completed ステートメントを出力します。 それを避けるために、.join() メソッドを使用できます。

マルチプロセッシングの重要性

プロセッサが 1つあるマシン (PC/ラップトップ) があるとします。 複数のプロセスを同時に割り当てると、すべてのタスクを妨害または中断し、すべてのプロセスを実行し続けるために、あるタスクから別のタスクに切り替える必要があります。

ここで、マルチプロセッシングの概念が登場します。 マルチプロセッシング コンピューターには、複数の中央処理装置 (マルチプロセッサー) を搭載することも、コアと呼ばれる 2つ以上の独立した実際の処理ユニット (マルチコア プロセッサー) を備えた 1つのコンピューティング コンポーネントを搭載することもできます。

ここでは、CPU は複数のタスクを同時に簡単に実行でき、各タスクはそのプロセスを使用します。 このようにして、プログラムを高速化し、多くの時間とコストを節約します。

Pandas DataFrame でマルチプロセッシングを使用する

multiprocessing モジュール、その基本的な使用法、およびその重要性について十分な知識があります。 このモジュールをデータ フレームで使用する方法を学びましょう。

  • ライブラリとモジュールをインポートします。

    まず、必要なモジュールとライブラリをすべてインポートします。

    import pandas as pd
    from geopy.distance import geodesic
    from itertools import combinations
    import multiprocessing as mp
    
  • データ フレームを作成します。

    serial_numbercolumn_namelat、および lon 列とその値を含むデータ フレームを作成します。

    df = pd.DataFrame(
        {
            "serial_number": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
            "column_name": ["aa", "aa", "aa", "bb", "bb", "bb", "bb", "cc", "cc", "cc"],
            "lat": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            "lon": [21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
        }
    )
    
  • タスクを分割します。

    タスクをプロセス間で分割し、おそらくすべてのタプル (grp, lst) を別のプロセスに送信します。 次のコード行は同じことを行います。

    grp_lst_args = list(df.groupby("column_name").groups.items())
    print(grp_lst_args)
    # [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
    

    その後、各タプルを別のプロセスで関数 (この場合は calc_dist()) に引数として送信します。 以下でそれを見てみましょう。

  • タプルのリストを関数に送信します。

    calc_dist()list 型のパラメーターを受け取り、距離を計算し、それをデータ フレームとして返します。 pd.DataFrame() を使用してデータ フレームに変換することに注意してください。

    ここで、リストには 3つのタプル [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])] 前に作成したもの。

    def calc_dist(arg):
        grp, lst = arg
        return pd.DataFrame(
            [
                [
                    grp,
                    df.loc[c[0]].serial_number,
                    df.loc[c[1]].serial_number,
                    geodesic(df.loc[c[0], ["lat", "lon"]], df.loc[c[1], ["lat", "lon"]]),
                ]
                for c in combinations(lst, 2)
            ],
            columns=["column_name", "machine_A", "machine_B", "Distance"],
        )
    
  • マルチプロセッシングを実装します。

    この時点で、マルチプロセッシングを使用して、各タプルに対して calc_dist() 関数を同時に呼び出します。 次のコードでは、タプルごとに calc_dist() 関数を並列実行するために Pool() を使用します。

    pool = mp.Pool(processes=(mp.cpu_count() - 1))
    results = pool.map(calc_dist, grp_lst_args)
    pool.close()
    pool.join()
    results_df = pd.concat(results)
    

完全なソース コードを以下に示します。

import pandas as pd
from geopy.distance import geodesic
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame(
    {
        "serial_number": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
        "column_name": ["aa", "aa", "aa", "bb", "bb", "bb", "bb", "cc", "cc", "cc"],
        "lat": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        "lon": [21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
    }
)

grp_lst_args = list(df.groupby("column_name").groups.items())
print(grp_lst_args)
# [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]


def calc_dist(arg):
    grp, lst = arg
    return pd.DataFrame(
        [
            [
                grp,
                df.loc[c[0]].serial_number,
                df.loc[c[1]].serial_number,
                geodesic(df.loc[c[0], ["lat", "lon"]], df.loc[c[1], ["lat", "lon"]]),
            ]
            for c in combinations(lst, 2)
        ],
        columns=["column_name", "machine_A", "machine_B", "Distance"],
    )


pool = mp.Pool(processes=(mp.cpu_count() - 1))
results = pool.map(calc_dist, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)

results_df

出力:

|      | column_name | machine_A | machine_B | Distance              |
| ---- | ----------- | --------- | --------- | --------------------- |
| 0    | aa          | 1         | 2         | 156.87614940188664 km |
| 1    | aa          | 1         | 3         | 313.70544546930296 km |
| 2    | aa          | 2         | 3         | 156.82932911607335 km |
| 0    | bb          | 4         | 5         | 156.66564184752647 km |
| 1    | bb          | 4         | 6         | 313.21433304645853 km |
| 2    | bb          | 4         | 7         | 469.6225353706956 km  |
| 3    | bb          | 5         | 6         | 156.54889742502786 km |
| 4    | bb          | 5         | 7         | 312.95759748703733 km |
| 5    | bb          | 6         | 7         | 156.40899678081678 km |
| 0    | cc          | 8         | 9         | 156.0601654009819 km  |
| 1    | cc          | 8         | 0         | 311.91099818906036 km |
| 2    | cc          | 9         | 0         | 155.85149814424545 km |
著者: Mehvish Ashiq
Mehvish Ashiq avatar Mehvish Ashiq avatar

Mehvish Ashiq is a former Java Programmer and a Data Science enthusiast who leverages her expertise to help others to learn and grow by creating interesting, useful, and reader-friendly content in Computer Programming, Data Science, and Technology.

LinkedIn GitHub Facebook

関連記事 - Pandas DataFrame