Параллельное программирование#
Теоретический минимум#
Типы задач требующих распаралеливания:
CPU-bound (вычислительно сложные)
IO-bound (требующие большого объема ввода-вывода)
Уровни параллелизма с прикладной точки зрения#
Кооперативная многозадачность (async, await, тасклеты)
Потоки
Процессы
Гомогенные вычислительные кластеры
Гетерагенные вычислительные кластеры
Критический путь#
import pydot
from IPython.display import Image, display
def view_pydot(pdot): plt = Image(pdot.create_png()); display(plt);
mydot = ''' graph my_graph { rankdir="LR";
a [color="red", style="solid"]; b1; b2; b3 [color="red", style="solid"]; b4; c [color="red", style="solid"];
d1 [color="red", style="solid"]; d2; d3; e [color="red", style="solid"]; a -- b1 -- c; a -- b2 -- c; a -- b3 -- c;
a -- b4 -- c; c -- d1 -- e; c -- d2 -- e; c -- d3 -- e; } '''
graphs = pydot.graph_from_dot_data(mydot)
view_pydot(graphs[0])
Парадигма map-reduce#
Есть способ записи программ способствующий их эффективному распараллеливанию - парадигма map-reduce.
import pydot
from IPython.display import Image, display
def view_pydot(pdot): plt = Image(pdot.create_png()); display(plt);
mydot = ''' digraph my_graph { rankdir="LR"; aa [shape=record, label="A1|A2|A3|A4|A5|A6"];
nmap[label="map(f, [A]) → [B]"]; nmapfoo[label="f(A) → B"]; bb [shape=record, label="B1|B2|B3|B4|B5|B6"];
nfilter[label="filter(g, [B]) → [B]"]; nfilterfoo[label="g(B) → True/False"]; cc [shape=record, label="B1|B3|B5"];
nreduce[label="reduce(h, [B]) → B"]; nreducefoo[label="h(B,B,I) → B"]; dd [shape=record, label="B"];
aa -- nmap -- bb -- nfilter -- cc -- nreduce -- dd; nmapfoo -- nmap; nfilterfoo -- nfilter; nreducefoo -- nreduce; } '''
graphs = pydot.graph_from_dot_data(mydot)
view_pydot(graphs[0])
from functools import reduce
#Находим колисчество символов с самой короткой непустой строке
data = ["mklsf", "", "woiejfwe", "mkm", "oiejwefw", "pujskdlfjiejsmd", "", "", "sdfe"]
def count_symbols(s):
return len(s)
def fiter_empty(n):
return n > 0
def find_min(a,b):
return min(a,b)
result = reduce(find_min,filter(fiter_empty, map(count_symbols, data)), 1000)
print(result)
3
Запуск параллельных вычислений с использованием пулов#
Запуска многопоточного выполнения функции через ThreadPoolExecutor#
from time import sleep
import numpy.random as random
from concurrent.futures import ThreadPoolExecutor
common_data = []
def work(i):
sleep(random.random())
common_data.append(i)
# print(f'Task {i} is complete')
data = list(range(8))
results = []
with ThreadPoolExecutor(2) as executor:
executor.map(work, data)
print(common_data)
[1, 0, 3, 2, 4, 5, 7, 6]
Запуска многопоточного выполнения функции через ProcessPoolExecutor#
from time import sleep
import numpy.random as random
from concurrent.futures import ProcessPoolExecutor
#Этот пример не работет - common_data не является общей переменный для разных процессов
common_data = []
def work(i):
sleep(random.random())
common_data.append(i)
# print(f'Task {i} is complete')
if __name__ == '__main__':
data = list(range(8))
with ProcessPoolExecutor(2) as executor:
executor.map(work, data)
print(common_data)
[]
Автоматический сбор результатов через return#
from time import sleep
import numpy.random as random
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.sharedctypes import Array
def work(i):
sleep(random.random())
# print(f'Task {i} is complete')
return i
if __name__ == '__main__':
data = list(range(8))
with ProcessPoolExecutor(2) as executor:
ft = executor.map(work, data)
print(list(ft))
[0, 1, 2, 3, 4, 5, 6, 7]
Запуска многопоточного выполнения функции через MPIPoolExecutor#
Для запуск данного кода нужно раскоментировать все закоментированные строки, а запуск следует производить командой mpiexec -n 4 python -m mpi4py.futures file.py
, где 4 - число вычислительных узлов.
# from mpi4py import MPI
# from mpi4py.futures import MPIPoolExecutor
from time import sleep
import numpy.random as random
def work(i):
sleep(random.random())
return i
if __name__ == '__main__':
data = list(range(8))
results = []
#with MPIPoolExecutor(2) as executor:
# s = executor.map(work, data)
# print(list(s))