Multiprocessing: una piscina de procesos en tu Python
Cada año durante las vacaciones, como buen informático, aprovecho para leer cosas diferentes y aprender algo nuevo, en esta ocasión he podido estudiar sobre la librería...
Cada año durante las vacaciones, como buen informático, aprovecho para leer cosas diferentes y aprender algo nuevo, en esta ocasión he podido estudiar sobre la librería Multiprocessing de Python y poder ver toda la potencia que esta ofrece al programador. Lo que antes era complicado (comunicación entre procesos del sistema operativo) se ha simplificado mucho gracias a esta potente librería que ahora os presento.
Si alguien me preguntase “¡Pero en pocas palabras! ¿qué hace la librería Multiprocessing de Python?”, diría que es como la librería “Threading” que facilita la creación y manejo de hilos (Threads), pero desde el punto de vista de procesos del sistema operativo (Forks), algunos programadores hablan de hilos blandos y duros, a mi personalmente esa definición no me gusta y por tanto hablaré todo el tiempo de Hilos y Procesos donde para mi un hilo es un Thread y un proceso es el resultado de un Fork.
Pero la librería no sólo se encarga de hacer “forks”, su habilidad más interesante es la de facilitar la gestión de estos procesos, para ello provee de los mecanismos necesarios para que todos los procesos puedan comunicarse entre si, puedan ponerse en sincronía e incluso gestionar cola comunes a todos ellos de una forma fiable y segura. Igualmente y si buscas resolver una tarea que no requiere trabajadores demasiado inteligentes o autónomos, pone a disposición del programador una piscina o Pool de procesos.
En este artículo voy a presentar las funciones básicas de esta librería, como comenzar a sacarle jugo desde el primer momento y terminaré mostrando los mecanismos de gestión disponibles: Cola, Tuberías y Semáforos.
La diferencia entre hilo y proceso es fácil de reconocer, un hilo ocurre dentro del espacio de memoria de un programa y un proceso es una copia completa del programa, por esta razón, los hilos son rápidos de crear y destruir además de que consumen poca memoria y los procesos son lentos de crear y destruir además de que requieren clonar el espacio de memoria del programa en otro lugar de la RAM, y esto es lento. Ejemplos de esto serían, subrutinas que recogen mensajes de un puerto de comunicaciones y los usan para actuar sobre emails almacenados en un servidor (lo que podría ser un servidor imap para gestionar los emails), desde el punto de vista del servidor, el proceso remoto (o cliente de correo) sólo necesitan usar el servidor durante un corto plazo de tiempo, porque envía un mensaje al servidor donde le indica lo que el usuario desea hacer (saber si hay mensajes nuevos, borrar un correo, moverlo), el servidor abre un hilo para atender a ese usuario y el hilo sólo vive mientras dure la conexión del usuario, una vez el usuario ha terminado (ve que no tiene correos nuevos) el cliente de correo desconecta hasta nueva acción. Este proceso que he descrito es rápido, ocurre en milisegundos y generalmente se resuelve con hilos porque es más ligero para el sistema operativo y su vida media es especialmente corta, además de que el sistema podrá aceptar ciento o miles de conexiones por segundo y será ligero, rápido y eficiente en esta tarea.
Matizando:
La tendencia actual entre los desarrolladores es hacer una aplicaciones que sean rápidas en un sólo hilo y luego escalar a tantas instancias como sea necesario para cubrir nuestros objetivos de aprovechamiento (ejemplo de esto es Redis o Python Tornado que usan epoll async), estos servidores pueden atender en un sólo proceso a miles o decena de miles de conexiones.
Por lo tanto, si queremos realizar un programa que aproveche las diferentes CPUs y pueda realizar múltiples tareas a la vez tenemos muchos mecanismos para llevar esta tarea a cabo. Dependiendo del uso que se quiera dar probablemente queramos usar hilos o procesos, es aquí donde querremos escribir nuestro código con Threading (hilos) o Multiprocessing (procesos).
Empezar a usar Multiprocessing es muy sencillo, como se puede observar a continuación:
from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': p = Pool(5) print(p.map(f, [1, 2, 3]))
Resultado:
[1, 4, 9]
En este ejemplo se busca resolver un problema mediante la técnica del “divide y vencerás“, separando el trabajo a hacer entre varios procesos para que entre todos consigan terminar lo antes posible porque cada parte de la solución se resolvería en paralelo con las demás. Como se puede observar es muy sencillo empezar a trabajar con múltiples procesos.
En mi caso me preocupaba mucho el poder controlar qué tipo de procesos se iniciaban y qué hacían, que realmente el decirle al sistema “computa” y sentarme de brazos cruzados a esperar un resultado mágicamente, ya que mi objetivo en este desarrollo era controlar múltiples colas de trabajo de diferente tipo organizadas por prioridades y poder decidir (cuando cada hijo terminaba) qué tarea se metía en la cola para hacer las prioritarias primero. Es por esto que decidí entrar más a fondo en cómo usar Multiprocessing de forma más granular. De aquí surgió el siguiente código en el que nosotros controlamos el Pool:
import time import random import multiprocessing class tarea: def __init__(self, cid): self.__cid=cid print("HIJO {0} - Nace".format(self.__cid)) def __del__(self): print("HIJO {0} - Muere".format(self.__cid)) def run(self): # Generamos un tiempo de espera aleatorio s=1+int(10*random.random()) print("HIJO {0} - Inicio (Durmiendo {1} segundos)".format(self.__cid,s)) time.sleep(s) print("HIJO {0} - Fin".format(self.__cid)) # Creamos la piscina (Pool) piscina = [] for i in range(1,5): print("PADRE: creando HIJO {0}".format(i)) piscina.append(multiprocessing.Process(name="Proceso {0}".format(i), target=tarea(i).run)) # Arrancamos a todos los hijos print("PADRE: arrancando hijos") for proceso in piscina: proceso.start() print("PADRE: esperando a que los procesos hijos hagan su trabajo") for proceso in piscina: proceso.join() print("PADRE: todos los hijos han terminado, cierro")
La primera vez obtenemos:
PADRE: creando HIJO 1 HIJO 1 - Nace PADRE: creando HIJO 2 HIJO 2 - Nace PADRE: creando HIJO 3 HIJO 3 - Nace PADRE: creando HIJO 4 HIJO 4 - Nace PADRE: arrancando hijos PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 1 - Inicio (Durmiendo 1 segundos) HIJO 2 - Inicio (Durmiendo 4 segundos) HIJO 4 - Inicio (Durmiendo 4 segundos) HIJO 3 - Inicio (Durmiendo 2 segundos) HIJO 1 - Fin HIJO 3 - Fin HIJO 2 - Fin HIJO 4 - Fin PADRE: todos los hijos han terminado, cierro HIJO 3 - Muere HIJO 2 - Muere HIJO 1 - Muere HIJO 4 - Muere
La segunda vez obtenemos:
PADRE: creando HIJO 1 HIJO 1 - Nace PADRE: creando HIJO 2 HIJO 2 - Nace PADRE: creando HIJO 3 HIJO 3 - Nace PADRE: creando HIJO 4 HIJO 4 - Nace PADRE: arrancando hijos PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 1 - Inicio (Durmiendo 1 segundos) HIJO 3 - Inicio (Durmiendo 2 segundos) HIJO 4 - Inicio (Durmiendo 5 segundos) HIJO 2 - Inicio (Durmiendo 6 segundos) HIJO 1 - Fin HIJO 3 - Fin HIJO 4 - Fin HIJO 2 - Fin PADRE: todos los hijos han terminado, cierro HIJO 3 - Muere HIJO 2 - Muere HIJO 1 - Muere HIJO 4 - Muere
Si nos fijamos en los resultados después de ejecutar dicho código, se pueden ver 2 problemas importantes:
1) Que el join() de los procesos se hace en el orden en que los procesos están almacenados en la piscina (Pool), eso no es “Multiproceso” ya que el proceso 4 podría haber terminado primero y estaría esperando a ser recogido por el padre, pero el padre lo ha recogido en último lugar en ambas ejecuciones.
2) Que el proceso en sí (objeto) sigue ocupando memoria porque todavía no ha sido destruido por Python, esto se debe a que sigue vinculado a la piscina (lista). Para que liberase verdaderamente la memoria debería liberarse también de la piscina.
Para dar este último paso de verdadero paralelismo, os propongo el siguiente ejemplo adaptado, donde nos centramos en cómo el padre interactúa con los hijos a fin de conseguir controlarlos granularmente y actuar en consecuencia al estado real de cada hijo (sea cuando sea). Ejemplo padre controla hijos:
import time import random import multiprocessing class tarea: def __init__(self, cid): self.__cid=cid print("HIJO {0} - Nace".format(self.__cid)) def __del__(self): print("HIJO {0} - Muere".format(self.__cid)) def run(self): # Generamos un tiempo de espera aleatorio s=1+int(10*random.random()) print("HIJO {0} - Inicio (Durmiendo {1} segundos)".format(self.__cid,s)) time.sleep(s) print("HIJO {0} - Fin".format(self.__cid)) # Creamos la piscina (Pool) piscina = [] for i in range(1,5): print("PADRE: creando HIJO {0}".format(i)) piscina.append(multiprocessing.Process(name="Proceso {0}".format(i), target=tarea(i).run)) # Arrancamos a todos los hijos print("PADRE: arrancando hijos") for proceso in piscina: proceso.start() print("PADRE: esperando a que los procesos hijos hagan su trabajo") # Mientras la piscina tenga procesos while piscina: # Para cada proceso de la piscina for proceso in piscina: # Revisamos si el proceso ha muerto if not proceso.is_alive(): # Recuperamos el proceso y lo sacamos de la piscina proceso.join() piscina.remove(proceso) del(proceso) # Para no saturar, dormimos al padre durante 1 segundo print("PADRE: esperando a que los procesos hijos hagan su trabajo") time.sleep(1) print("PADRE: todos los hijos han terminado, cierro")
La primera vez obtenemos:
PADRE: creando HIJO 1 HIJO 1 - Nace PADRE: creando HIJO 2 HIJO 2 - Nace PADRE: creando HIJO 3 HIJO 3 - Nace PADRE: creando HIJO 4 HIJO 4 - Nace PADRE: arrancando hijos PADRE: esperando a que los procesos hijos hagan su trabajo PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 2 - Inicio (Durmiendo 4 segundos) HIJO 4 - Inicio (Durmiendo 1 segundos) HIJO 3 - Inicio (Durmiendo 3 segundos) HIJO 1 - Inicio (Durmiendo 6 segundos) PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 4 - Fin HIJO 4 - Muere PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 3 - Fin PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 3 - Muere PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 2 - Fin HIJO 2 - Muere PADRE: esperando a que los procesos hijos hagan su trabajo PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 1 - Fin HIJO 1 - Muere PADRE: esperando a que los procesos hijos hagan su trabajo PADRE: todos los hijos han terminado, cierro
La segunda vez obtenemos:
PADRE: creando HIJO 1 HIJO 1 - Nace PADRE: creando HIJO 2 HIJO 2 - Nace PADRE: creando HIJO 3 HIJO 3 - Nace PADRE: creando HIJO 4 HIJO 4 - Nace PADRE: arrancando hijos PADRE: esperando a que los procesos hijos hagan su trabajo PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 1 - Inicio (Durmiendo 5 segundos) HIJO 2 - Inicio (Durmiendo 3 segundos) HIJO 3 - Inicio (Durmiendo 1 segundos) HIJO 4 - Inicio (Durmiendo 7 segundos) PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 3 - Fin HIJO 3 - Muere PADRE: esperando a que los procesos hijos hagan su trabajo PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 2 - Fin HIJO 2 - Muere PADRE: esperando a que los procesos hijos hagan su trabajo PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 1 - Fin HIJO 1 - Muere PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 4 - Fin PADRE: esperando a que los procesos hijos hagan su trabajo HIJO 4 - Muere PADRE: esperando a que los procesos hijos hagan su trabajo PADRE: todos los hijos han terminado, cierro
En resumen, con este sencillo ejemplo pasamos a tener un perfecto control de qué hace cada hijo y cuando, incluso podríamos hacer que arranque un proceso nuevo cuando otro termine, para mantener al sistema ocupado y maximizar así el trabajo realizado. Este es el enfoque que yo buscaba cuando comencé a trabajar con Multiprocessing: concretamente una cola compartida de tareas y que cada proceso estuviera recogiendo esas tareas de la cola y maximizase el uso de la CPU mientras hubiese trabajo que hacer en la cola. Incluso uno de los procesos podría inyectar nuevas tareas en la CPU conforme fuesen llegando al sistema.
He hablado de la gestión de procesos y del Pool, pero también indiqué previamente que existen otros mecanismos que hacen que Multiprocessing sea realmente interesante, estos son:
Y todas estas herramientas “thread safe“, es decir, que su utilización es segura entre los procesos y proporciona integridad en su funcionamiento.
Las colas permiten colocar en ellas elementos que están a disposición de todos los procesos de tal modo que si un proceso extrae algo de la cola este elemento NO estará disponible para el resto de procesos. Esto permite gestionar trabajos comunes y resultados de estos de una forma coherente para todos y en especial para el proceso padre tanto para la gestión del trabajo a realizar como a la hora de recoger los resultados generados por cada proceso. Ejemplo de cola (Queue):
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hola']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() p.join()
Resultado:
[42, None, 'hola']
Si algunos os preguntáis si existen colas con Prioridad en la librería Multiprocessing, la respuesta corta y rápida es NO, ya que al ser un sistema donde obligatoriamente los procesos deben comunicarse (mediante flujos compartidos), el mantener una cola por prioridades obligaría al sistema a usar semáforos para que los procesos no leyesen y escribiesen en la cola concurrentemente, esto provocaría una bajada importante del rendimiento. Yo, por las características de mi sistemas (el que he desarrollado), cada proceso tiene un tiempo alto de ejecución (por diseño) y por tanto he desarrollado una gestión dinámica de colas en la que el sistema mide el tiempo de ejecución de cada proceso y le pregunta a cada proceso cuando estima que el resultado estará listo (en base a la tarea que está realizando), de este modo mantiene la cola de prioridades actualizadas en función a estas 2 variables. Sin embargo en mi sistema no existe pérdida de rendimiento puesto que los semáforos sólo actúan cuando muere un proceso y puesto que mis procesos trabajan durante mucho tiempo, no existe posibilidad de postergación y retrasos entre los procesos por el efecto de los semáforos, y por consiguiente no existe pérdida de rendimiento apreciable.
Las tubería ayudan a establecer un canal de comunicación entre 2 actores. Cuando se genera una tubería, Python devuelve 2 objetos que responden a cada uno de los extremos de dicha tubería. Por lo general se pretende que existan sólo 2 procesos que interaccionen en el canal, aunque es viable que existan más procesos hablando si la interacción en el canal ocurre de forma ordenada. Por defecto el canal se abre en modo full-duplex, es decir, es posible hablar por ambos extremos a la vez sin corromper el contenido del canal.
El funcionamiento es sencillo, en cada extremo es posible enviar datos al canal y recibir datos de este. Es importante destacar que el canal se corrompe si 2 procesos leen o escriben simultáneamente por el mismo extremo, obviamente esto no ocurre si cada procesos lee o habla por un extremo diferente del canal. Ejemplo de tubería (Pipe):
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hola']) conn.close() if __name__ == '__main__': conexion_padre, conexion_hijo = Pipe() p = Process(target=f, args=(conexion_hijo,)) p.start() print conexion_padre.recv() p.join()
Resultado:
[42, None, 'hola']
Los semáforos funcionan de forma equivalente a su homólogo en la librería Threading (o flock() en otros lenguajes). El funcionamiento es muy sencillo: instancias el objeto y adquieres el candado de bloqueo, una vez lo consigues haces la tarea que tenías prevista realizar y posteriormente al finalizar liberas dicho candado. Ejemplo de semáforo (Lock):
from multiprocessing import Process, Lock def f(l, i): l.acquire() print 'hola mundo', i l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
Resultado:
hola mundo 0 hola mundo 1 hola mundo 2 hola mundo 3 hola mundo 4 hola mundo 5 hola mundo 6 hola mundo 7 hola mundo 8 hola mundo 9
La última genialidad de esta librería es la capacidad de disponer de una memoria compartida entre los procesos. Gracias a esto es posible que 2 o más procesos compartan una variable o array de forma segura. Sin embargo, la variable o el array deben usar elementos de C-Python, por lo que no es posible asignar objetos de otras clases a estas variables o arrays. No obstante, siempre se pueden inyectar objetos de clases que hereden de la clase Structure de ctypes y mediante esta definan un nuevo tipo C-Python. Ejemplo de memoria compartida (shared memory):
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]
Resultado:
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
No obstante, a todo este cocktail le añadiría un desarrollo “Lock-free” o libre de bloqueos (Non-blocking algorithms), aquí os dejo unos enlaces que os ayudarán a entenderlo mejor (gracias a @m_karmona por esta aportación):
En resumen, la librería Multiprocessing es un equivalente a la librería Threading, pero orientada a procesos, y el trabajo que han hecho los desarrolladores de la misma es realmente admirable debido a que han simplificado y abstraído las capas más complejas de la gestión de procesos para permitir usar estos de una forma sencilla y simplificada.
Para más información podéis acudir a las siguiente referencias que os serán de utilidad (todas en inglés):
During our operations, very often, we use a Waterlinked Underwater GPS. This helps us to keep track in 3D of our movements and link video recording’s timing and telemetry to...
In my last post, “Finishing the frame for an Underwater ROV”, I gave all details about the design I used to build the frame for Alioli Underwater ROV. In this post, I...
Hola, me he confundido algo con esta seccion
if not proceso.is_alive():
# Recuperamos el proceso y lo sacamos de la piscina
proceso.join()
piscina.remove(proceso)
del(proceso)
Mi duda es si “if not proceso.is_alive()” esta verificando que el proceso ya este “muerto”, no me queda claro si el objetivo de proceso.join() es estar seguro que ha finalizado?, gracias! excelente tutorial
Que esté “muerto” no significa que haya liberado sus recursos:
Sigue este enlace a Wikipedia para obtener más información.
Fabuloso artículo, Juanmi, sobre un tema en el que muchos tenemos muy poca experiencia.
Estoy usando tu código (gracias por compartirlo) como base para un trabajo del siguiente tipo:
Tengo una colección de un centenar de tuplas para pasarle a una misma función.
Tenemos un i7 al que quiero darle un máximo de 7 tareas concurrentes.
Cuando vea que un proceso ha terminqdo, quiero lanzar otro.
¿Puedo utilizar el is_alive si el proceso está recién creado y aún no ha ejecutado ningún trabajo?
¿Puedo lanzar otro trabajo en un procedimiento que haya terminado el trabajo anterior?
Una vez más: gracias por tu artículo. Nos está siendo de gran ayuda
Estimado Vicente:
Me alegro que os esté sirviendo de ayuda.
is_alive() lo puedes usar desde el momento en que inicias la tarea. Si la tarea está haciendo algo o no es un tema interno que es ajeno a la librería Muiltiprocesing. Aunque estés haciendo un sleep is_alive() responderá con True.
Siempre que recuperes el proceso con join() puedes enganchar nuevos procesos en la piscina, en realidad el proceso padre es el encargado de ocuparse de que en la piscina siempre haya 7 procesos funcionando y no más. En tal caso yo haría un bucle while que integrara varios estados de control, si hay hueco lanzo otra tarea start(), si no hay hueco me duermo, si no hay más tareas salgo, y siempre que una tarea de mi piscina haya terminado la recupero con join().
Espero haber servido de ayuda,