Параллельное программирование#

Теоретический минимум#

Типы задач требующих распаралеливания:

  • CPU-bound (вычислительно сложные)

  • IO-bound (требующие большого объема ввода-вывода)

Уровни параллелизма с прикладной точки зрения#

  • Кооперативная многозадачность (async, await, тасклеты)

  • Потоки

  • Процессы

  • Гомогенные вычислительные кластеры

  • Гетерагенные вычислительные кластеры

Критический путь#

import pydot
from IPython.display import Image, display
def view_pydot(pdot): plt = Image(pdot.create_png()); display(plt);

mydot = ''' graph my_graph { rankdir="LR";
    a [color="red", style="solid"]; b1; b2; b3 [color="red", style="solid"]; b4; c [color="red", style="solid"];
    d1 [color="red", style="solid"]; d2; d3; e [color="red", style="solid"]; a -- b1 -- c; a -- b2 -- c; a -- b3 -- c;
    a -- b4 -- c; c -- d1 -- e; c -- d2 -- e; c -- d3 -- e; } '''

graphs = pydot.graph_from_dot_data(mydot)
view_pydot(graphs[0])
_images/17c5c698517f553f6a6021a69c96a5744f1509dc87df523d17cc322e9de56015.png

Парадигма map-reduce#

Есть способ записи программ способствующий их эффективному распараллеливанию - парадигма map-reduce.

import pydot
from IPython.display import Image, display
def view_pydot(pdot): plt = Image(pdot.create_png()); display(plt);

mydot = ''' digraph my_graph { rankdir="LR"; aa [shape=record, label="A1|A2|A3|A4|A5|A6"];
    nmap[label="map(f, [A]) → [B]"]; nmapfoo[label="f(A) → B"]; bb [shape=record, label="B1|B2|B3|B4|B5|B6"];
    nfilter[label="filter(g, [B]) → [B]"]; nfilterfoo[label="g(B) → True/False"]; cc [shape=record, label="B1|B3|B5"];
    nreduce[label="reduce(h, [B]) → B"]; nreducefoo[label="h(B,B,I) → B"]; dd [shape=record, label="B"];
    aa -- nmap -- bb -- nfilter -- cc -- nreduce -- dd; nmapfoo -- nmap; nfilterfoo -- nfilter; nreducefoo -- nreduce; } '''

graphs = pydot.graph_from_dot_data(mydot)
view_pydot(graphs[0])
_images/4b07baf846b3324750f8664cff458fd6d6dbb92c7f88564c86e451bef8402c9e.png
from functools import reduce

#Находим колисчество символов с самой короткой непустой строке

data = ["mklsf", "", "woiejfwe", "mkm", "oiejwefw", "pujskdlfjiejsmd", "", "", "sdfe"]

def count_symbols(s):
    return len(s)

def fiter_empty(n):
    return n > 0

def find_min(a,b):
    return min(a,b)

result = reduce(find_min,filter(fiter_empty, map(count_symbols, data)), 1000)

print(result)
3

Запуск параллельных вычислений с использованием пулов#

Запуска многопоточного выполнения функции через ThreadPoolExecutor#

from time import sleep
import numpy.random as random
from concurrent.futures import ThreadPoolExecutor

common_data = []

def work(i):
    sleep(random.random())
    common_data.append(i)
    # print(f'Task {i} is complete')

data = list(range(8))

results = []

with ThreadPoolExecutor(2) as executor:
    executor.map(work, data)

print(common_data)
[1, 0, 3, 2, 4, 5, 7, 6]

Запуска многопоточного выполнения функции через ProcessPoolExecutor#

from time import sleep
import numpy.random as random
from concurrent.futures import ProcessPoolExecutor

#Этот пример не работет - common_data не является общей переменный для разных процессов

common_data = []

def work(i):
    sleep(random.random())
    common_data.append(i)
    # print(f'Task {i} is complete')
    
if __name__ == '__main__':
    data = list(range(8))
    with ProcessPoolExecutor(2) as executor:
        executor.map(work, data)
    print(common_data)
[]

Автоматический сбор результатов через return#

from time import sleep
import numpy.random as random
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.sharedctypes import Array

def work(i):
    sleep(random.random())
    # print(f'Task {i} is complete')
    return i
    
if __name__ == '__main__':
    data = list(range(8))
    with ProcessPoolExecutor(2) as executor:
        ft = executor.map(work, data)
    
    print(list(ft))
[0, 1, 2, 3, 4, 5, 6, 7]

Запуска многопоточного выполнения функции через MPIPoolExecutor#

Для запуск данного кода нужно раскоментировать все закоментированные строки, а запуск следует производить командой mpiexec -n 4 python -m mpi4py.futures file.py, где 4 - число вычислительных узлов.

# from mpi4py import MPI
# from mpi4py.futures import MPIPoolExecutor
from time import sleep
import numpy.random as random

def work(i):
    sleep(random.random())
    return i

if __name__ == '__main__':
    data = list(range(8))
    results = []
    #with MPIPoolExecutor(2) as executor:
    #    s = executor.map(work, data)
    # print(list(s))