Usar multiprocesamiento en un marco de datos de Pandas

Mehvish Ashiq 15 febrero 2024
  1. Introducción al multiprocesamiento
  2. Importancia de usar multiprocesamiento
  3. Usar multiprocesamiento en un marco de datos de Pandas
Usar multiprocesamiento en un marco de datos de Pandas

Este tutorial presenta el multiprocesamiento en Python y lo educa mediante ejemplos de código y representaciones gráficas. También destaca la importancia del multiprocesamiento y demuestra cómo usar el módulo de multiprocesamiento con un dataframe de Pandas.

Introducción al multiprocesamiento

Multiprocesamiento significa tener la capacidad del sistema para soportar múltiples procesadores al mismo tiempo. En el multiprocesamiento, las aplicaciones se dividen en rutinas más pequeñas que pueden ejecutarse de forma independiente o menos independiente.

El sistema operativo asignará estos hilos a diferentes procesadores, mejorando el rendimiento del sistema. ¿Cómo? Entendámoslo a través del siguiente programa simple.

Código de ejemplo:

import time

start = time.perf_counter()


def print_something():
    print("Sleeping for 1 second.")
    time.sleep(1)
    print("Done with sleeping.")


print_something()
finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

Producción :

Sleeping for 1 second.
Done with sleeping.
Completed in 1.01 second(s)

El resultado anterior suena bien porque, en la cerca de código, primero importamos el módulo tiempo, que usaremos para medir cuánto tiempo tarda en ejecutarse el script. Para medir eso, calculamos el tiempo de inicio y fin usando start = time.perf_counter() y finish = time.perf_counter().

También tenemos una función llamada print_something() que imprime algo, duerme por un segundo e imprime otra declaración. Llamamos a esta función y finalmente imprimimos la última declaración que muestra que hemos completado el script.

Ahora, si ejecutamos print_something() dos veces, tardará casi dos segundos. Puede verificar eso a continuación.

Código de ejemplo:

import time

start = time.perf_counter()


def print_something():
    print("Sleeping for 1 second.")
    time.sleep(1)
    print("Done with sleeping.")


print_something()
print_something()

finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

Producción :

Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Completed in 2.02 second(s)

Ahora, el programa está dormido por un segundo dos veces, por lo que lleva casi dos segundos terminar el script.

Entonces, podemos ver que cada vez que ejecutamos esta función print_something(), agrega aproximadamente un segundo al script. Nuestro script solo espera durmiendo por un segundo, y luego, pasa a la siguiente función y se sienta a esperar otro segundo.

Luego, en ese punto, terminamos y nuestro script finaliza. Podemos entenderlo a través de la siguiente representación gráfica.

python usa multiprocesamiento en un marco de datos de pandas - representación gráfica uno

Esta representación gráfica muestra que ejecutamos una función (es print_something() en nuestro caso), esperamos un segundo, ejecutamos la función nuevamente, luego esperamos otro segundo, y una vez hecho esto, imprime el print final. declaración que demuestra que hemos terminado.

Como se presenta en la representación gráfica anterior, la ejecución del script en este orden se denomina ejecución sincrónica.

Ahora, si tenemos alguna tarea que no necesita ejecutarse sincrónicamente, entonces podemos usar el módulo de multiprocesamiento para dividir estas tareas en las otras CPU y ejecutarlas simultáneamente.

Recuerde, multihilo no es lo mismo que multiprocesamiento. Puede encontrar la diferencia aquí.

La representación gráfica se verá de la siguiente manera si se supone que vamos a utilizar el módulo de multiprocesamiento.

python usa multiprocesamiento en un marco de datos de pandas - representación gráfica dos

Aquí, todavía tenemos dos tareas, pero las dividimos en dos procesos que se ejecutan simultáneamente en diferentes procesos. Ahora, implementemos esta representación gráfica en nuestro programa de la siguiente manera:

import multiprocessing
import time


start = time.perf_counter()


def print_something():
    print("Sleeping for 1 second.")
    time.sleep(1)
    print("Done with sleeping.")


process1 = multiprocessing.Process(target=print_something)
process2 = multiprocessing.Process(target=print_something)

process1.start()
process2.start()

process1.join()
process2.join()

finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

Producción :

Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.
Completed in 1.02 second(s)

Como podemos ver ahora, el script tarda un segundo en lugar de dos. El guión anterior es similar al anterior excepto por algunas modificaciones.

Importamos el módulo multiprocesamiento, que usaremos para crear procesos. Luego, hacemos dos procesos usando multiprocessing.Process(target=print_something) y los guardamos en process1 y process2.

Después de eso, comenzamos ambos procesos usando process1.start() y process2.start(). Usamos el método .join() para evitar ejecutar el resto de nuestro script antes de terminar los procesos.

Significa que si omitimos el método .join(), ejecutará las siguientes dos declaraciones antes de finalizar los procesos:

finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

Podemos aprenderlo usando el siguiente código de ejemplo:

import multiprocessing
import time


start = time.perf_counter()


def print_something():
    print("Sleeping for 1 second.")
    time.sleep(1)
    print("Done with sleeping.")


process1 = multiprocessing.Process(target=print_something)
process2 = multiprocessing.Process(target=print_something)

process1.start()
process2.start()

# process1.join()
# process2.join()

finish = time.perf_counter()
print(f"Completed in {round(finish-start,2)} second(s)")

Producción :

Completed in 0.01 second(s)
Sleeping for 1 second.
Done with sleeping.
Sleeping for 1 second.
Done with sleeping.

Como podemos ver, imprime la sentencia Completed antes de las sentencias Sleeping y Done. Para evitar eso, podemos usar el método .join().

Importancia de usar multiprocesamiento

Supongamos que tenemos una máquina (PC/Laptop) con un procesador. Si le asignamos varios procesos simultáneamente, tendrá que molestar o interrumpir cada tarea y cambiar de una a otra para seguir ejecutando todos los procesos.

Aquí es donde entra en escena el concepto de multiprocesamiento. Una computadora multiprocesador puede tener varios procesadores centrales (multiprocesador) o un componente informático con dos o más unidades de procesamiento reales independientes conocidas como núcleos (procesadores multinúcleo).

Aquí, la CPU ejecutará fácilmente múltiples tareas simultáneamente, donde cada tarea usará su proceso. De esta manera, aceleraremos nuestro programa y ahorraremos mucho tiempo y dinero.

Usar multiprocesamiento en un marco de datos de Pandas

Tenemos suficiente conocimiento sobre el módulo de multiprocesamiento, su uso básico y su importancia. Aprendamos cómo podemos usar este módulo con marcos de datos.

  • Importe las bibliotecas y los módulos.

    Primero, importe todos los módulos y bibliotecas necesarios.

    import pandas as pd
    from geopy.distance import geodesic
    from itertools import combinations
    import multiprocessing as mp
    
  • Cree un marco de datos.

    Creamos un marco de datos que contiene las columnas serial_number, column_name, lat y lon y sus valores.

    df = pd.DataFrame(
        {
            "serial_number": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
            "column_name": ["aa", "aa", "aa", "bb", "bb", "bb", "bb", "cc", "cc", "cc"],
            "lat": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            "lon": [21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
        }
    )
    
  • Divida las tareas.

    Dividimos la tarea entre procesos, probablemente enviando cada tupla (grp, lst) a un proceso separado. Las siguientes líneas de código hacen lo mismo:

    grp_lst_args = list(df.groupby("column_name").groups.items())
    print(grp_lst_args)
    # [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
    

    Después de eso, enviamos cada tupla como argumento a una función (calc_dist() en este caso) en un proceso separado. Veamos eso a continuación.

  • Envía la lista de tuplas a una función.

    El calc_dist() toma un parámetro de tipo lista para calcular la distancia y lo devuelve como un marco de datos. Tenga en cuenta que pd.DataFrame() se utiliza para convertir a un marco de datos.

    Aquí, la lista contiene tres tuplas [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])] que creamos en el anterior.

    def calc_dist(arg):
        grp, lst = arg
        return pd.DataFrame(
            [
                [
                    grp,
                    df.loc[c[0]].serial_number,
                    df.loc[c[1]].serial_number,
                    geodesic(df.loc[c[0], ["lat", "lon"]], df.loc[c[1], ["lat", "lon"]]),
                ]
                for c in combinations(lst, 2)
            ],
            columns=["column_name", "machine_A", "machine_B", "Distance"],
        )
    
  • Implementar multiprocesamiento.

    En este punto, usaremos el multiprocesamiento para llamar simultáneamente a la función calc_dist() para cada tupla. En el siguiente código, usamos Pool() para la ejecución paralela de la función calc_dist() para cada tupla.

    pool = mp.Pool(processes=(mp.cpu_count() - 1))
    results = pool.map(calc_dist, grp_lst_args)
    pool.close()
    pool.join()
    results_df = pd.concat(results)
    

El código fuente completo se proporciona a continuación:

import pandas as pd
from geopy.distance import geodesic
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame(
    {
        "serial_number": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
        "column_name": ["aa", "aa", "aa", "bb", "bb", "bb", "bb", "cc", "cc", "cc"],
        "lat": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        "lon": [21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
    }
)

grp_lst_args = list(df.groupby("column_name").groups.items())
print(grp_lst_args)
# [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]


def calc_dist(arg):
    grp, lst = arg
    return pd.DataFrame(
        [
            [
                grp,
                df.loc[c[0]].serial_number,
                df.loc[c[1]].serial_number,
                geodesic(df.loc[c[0], ["lat", "lon"]], df.loc[c[1], ["lat", "lon"]]),
            ]
            for c in combinations(lst, 2)
        ],
        columns=["column_name", "machine_A", "machine_B", "Distance"],
    )


pool = mp.Pool(processes=(mp.cpu_count() - 1))
results = pool.map(calc_dist, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)

results_df

Producción :

|      | column_name | machine_A | machine_B | Distance              |
| ---- | ----------- | --------- | --------- | --------------------- |
| 0    | aa          | 1         | 2         | 156.87614940188664 km |
| 1    | aa          | 1         | 3         | 313.70544546930296 km |
| 2    | aa          | 2         | 3         | 156.82932911607335 km |
| 0    | bb          | 4         | 5         | 156.66564184752647 km |
| 1    | bb          | 4         | 6         | 313.21433304645853 km |
| 2    | bb          | 4         | 7         | 469.6225353706956 km  |
| 3    | bb          | 5         | 6         | 156.54889742502786 km |
| 4    | bb          | 5         | 7         | 312.95759748703733 km |
| 5    | bb          | 6         | 7         | 156.40899678081678 km |
| 0    | cc          | 8         | 9         | 156.0601654009819 km  |
| 1    | cc          | 8         | 0         | 311.91099818906036 km |
| 2    | cc          | 9         | 0         | 155.85149814424545 km |
Mehvish Ashiq avatar Mehvish Ashiq avatar

Mehvish Ashiq is a former Java Programmer and a Data Science enthusiast who leverages her expertise to help others to learn and grow by creating interesting, useful, and reader-friendly content in Computer Programming, Data Science, and Technology.

LinkedIn GitHub Facebook

Artículo relacionado - Pandas DataFrame