Python и пляски с процессами

Про использование нескольких процессов в Python уже написано невообразимое множество статей, хороших и не очень; сломано немало копий в обсуждении проблемы GIL и псевдопотоков. Я же просто хочу рассказать о том, как решалась задача с управлением небольшого дерева дочерних процессов.

Исходная ситуация

Основной процесс запускает два дочерних, используя multiprocessing.Process, передает туда очередь multiprocessing.Queue. Первый дочерний процесс просто собирает какие-то данные и пишет в очередь, второй же запускает еще несколько процессов, куда передает ту-же очередь, с которой они и работают.

Задача 1: Завершение родительского процесса по любой причине должно вызывать автоматическое завершение всех дочерних.

Как известно, чтобы дочерний процесс не остался висеть после завершения родительского и не стал зомби, нужно вызвать его метод join. Это блокирующий метод, который остановит основной поток выполнения до завершения дочернего процесса. Другой вариант - выставление свойства daemon дочернего процесса в True, что приведет к автоматическому его завершению после закрытия родителя.

В простых случаях этого достаточно. Правда, второй метод нельзя использовать для процессов, которые должны в свою очередь запускать другие процессы. В этой ситуации явно вызывать join процесса при корректном завершении основного потока можно с помощью atexit.

from multiprocessing import Process
import time
import atexit

def func():
    for i in range(10):
        print(i)
        time.sleep(1)

# Blocking
p = Process(target=func)
p.start()
p.join()  # Waiting for 10 seconds

# Daemonized
p = Process(target=func)
p.daemon = True
p.start()

@atexit.register
def clean():
    p.terminate()
    p.join()

Но все это не очень помогает, если родительский процесс убивают извне, например используя команду kill. Хорошо, что у нас есть модуль signal, позволяющий повесить реакцию на системные сообщения. Например, вместо atexit повесим реакцию на обычный kill (по умолчанию он посылает код SIGTERM):

import signal

def clean(signum, frame):
    p.terminate()
    p.join()
    exit(15)

signal.signal(signal.SIGTERM, clean)

Можно также упростить себе жизнь, если дочерних процессов много. Для этого сначала определим себе группу процессов, а потом отправим сообщение сразу всей группе:

import os
import signal

os.setpgrp()

def clean(signum, frame):
    os.killpg(0, signal.SIGKILL)

signal.signal(signal.SIGTERM, clean)

# ... Start our child-processes ...

Задача 2. Завершение родительского процесса при падении дочернего по любой причине

Тут тоже гроздь способов, один другого лучше. Первый заключается в создании треда в основном процессе, который будет периодически проверять объекты процессов, вызывая у них метод is_alive, и в случае значения False хотя бы у одного вызывать тот же os.killpg(0, signal.SIGTERM). Второй - это тот-же atexit для обработки корректного завершения изнутри дочернего процесса. Ну и наконец ловля событий с использованием модуля signal.

Задача 3. Завершение дочернего процесса при смене родителя (перемещение на init)

Ну и наконец ситуация, когда родительский процесс убит быстро и в голову, например отправкой SIGKILL (kill -9 pid). В этом случае обработать закрытие процесса не поможет ни signal, ни atexit. Дочерние процессы вываливаются и их родителем становится корневой процесс OS, для linux это будет видимо init. А это значит, что обработкой придется заниматься дочернему процессу, который внезапно получил другого родителя. Ну в общем, почему бы и нет? Всего то и нужно - получить PID родительского процесса и периодически проверять, не изменился ли он.

parent_pid = os.getppid()
while True:
    if os.getppid() != parent_pid:
        exit(9)
    time.sleep(1)

Задача 4. Закрытие процесса, если он слишком долго живет

Также бывает такое, что процесс должен обязательно завершиться за ограниченное время, причины могут быть разные, не суть. Решения тоже возможны всякие, но вот например с использованием модуля signal: вешаем слушателя собщения на SIGALARM и отправляем себе таковое через 10 секунд:

def alarm_quit():
    exit(9)
signal.signal(signal.SIGALRM, alarm_quit)
signal.alarm(10)
Дата публикации: 2016-09-01