Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/multi.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions CPU-bound.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import concurrent.futures
from hashlib import md5
from random import choice


def coin_gen(quantity):
count = 0
while count < quantity:
s = "".join([choice("0123456789") for i in range(50)])
h = md5(s.encode('utf8')).hexdigest()

if h.endswith("0000"):
count += 1
print(s, h)


def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
coin_gen(5)


if __name__ == '__main__':
main()
186 changes: 12 additions & 174 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,186 +1,17 @@
# Параллелизм и асинхронность

Мы затронем только самые общие аспекты работы с потоками и процессами. Задачи, которые мы будем рассматривать обладают свойством [чрезвычайная параллельности](https://ru.wikipedia.org/wiki/%D0%A7%D1%80%D0%B5%D0%B7%D0%B2%D1%8B%D1%87%D0%B0%D0%B9%D0%BD%D0%B0%D1%8F_%D0%BF%D0%B0%D1%80%D0%B0%D0%BB%D0%BB%D0%B5%D0%BB%D1%8C%D0%BD%D0%BE%D1%81%D1%82%D1%8C).

Образцом для работы мы примем два куска кода из примера документации CPython для модуля [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html). Класc, больше подходящий для IO-bound задач: `ThreadPoolExecutor` (используются потоки), а для CPU-bound &mdash; `ProcessPoolExecutor` (используются процессы). Оба работают по принципу запуска одноранговых воркеров с некоторой функцией внутри (как кассы в &laquo;Пятерочке&raquo;).

## ThreadPoolExecutor

```python
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
```
## ProcessPoolExecutor

```python
import concurrent.futures
import math

PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]

def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
main()
```

Возврат в синхронный код происходит благодаря использованию генератора `concurrent.futures.as_completed`, который возвращает результаты по мере готовности их в воркерах. Ручная синхронизация отсутствует, что очень удобно.

Помните о том, что в CPython есть [GIL](https://docs.python.org/3/glossary.html#term-global-interpreter-lock), что не позволяет эффективно работать с потоками в CPU-bound задачах.

### IO-bound. Проверяем ссылки на страницах Википедии

Википедия &mdash; вторичный источник информации: высказывания в ней должны опираться на авторитетные источники в виде ссылок. Публикация оригинальных исследований запрещена. Со временем ссылки становятся нерабочими (сайт сделал редизайн, DNS больше не принадлежит владельцам, за хостинг не заплатили, сервис закрылся).

Давайте попытаемся оценить количество неработающих ссылок. Возьмем 100 случайных страниц Википедии (пройдем по ссылке [Случайная страница](https://ru.wikipedia.org/wiki/%D0%A1%D0%BB%D1%83%D0%B6%D0%B5%D0%B1%D0%BD%D0%B0%D1%8F:%D0%A1%D0%BB%D1%83%D1%87%D0%B0%D0%B9%D0%BD%D0%B0%D1%8F_%D1%81%D1%82%D1%80%D0%B0%D0%BD%D0%B8%D1%86%D0%B0)).

```python
from urllib.request import urlopen
from urllib.parse import unquote
from bs4 import BeautifulSoup

url = 'https://ru.wikipedia.org/wiki/%D0%A1%D0%BB%D1%83%D0%B6%D0%B5%D0%B1%D0%BD%D0%B0%D1%8F:%D0%A1%D0%BB%D1%83%D1%87%D0%B0%D0%B9%D0%BD%D0%B0%D1%8F_%D1%81%D1%82%D1%80%D0%B0%D0%BD%D0%B8%D1%86%D0%B0'

for i in range(100):
s = urlopen(url)
print(unquote(s.url))

##https://ru.wikipedia.org/wiki/Бурасы
##https://ru.wikipedia.org/wiki/Волшебный_куст
##https://ru.wikipedia.org/wiki/Льюис,_Леннокс
##https://ru.wikipedia.org/wiki/Ильинская_Поповка
##https://ru.wikipedia.org/wiki/Стрельцов,_Василий_Витальевич
##https://ru.wikipedia.org/wiki/Anasimyia
##https://ru.wikipedia.org/wiki/Малая_Осница
##https://ru.wikipedia.org/wiki/Владимиров,_Георгий_Петрович
##https://ru.wikipedia.org/wiki/Bhutan_Today
##https://ru.wikipedia.org/wiki/Польтроньери,_Альберто
##https://ru.wikipedia.org/wiki/Радзивилл,_Мартин_Николай
##https://ru.wikipedia.org/wiki/Эренрайк,_Олден
```

Из полученных страниц извлечем все ссылки и сохраним их в файл.

```python
from urllib.request import urlopen
from urllib.parse import unquote
from bs4 import BeautifulSoup
from tqdm import tqdm

url = 'https://ru.wikipedia.org/wiki/%D0%A1%D0%BB%D1%83%D0%B6%D0%B5%D0%B1%D0%BD%D0%B0%D1%8F:%D0%A1%D0%BB%D1%83%D1%87%D0%B0%D0%B9%D0%BD%D0%B0%D1%8F_%D1%81%D1%82%D1%80%D0%B0%D0%BD%D0%B8%D1%86%D0%B0'

res = open('res.txt', 'w', encoding='utf8')

for i in tqdm(range(100)):
html = urlopen(url).read().decode('utf8')
soup = BeautifulSoup(html, 'html.parser')
links = soup.find_all('a')

for l in links:
href = l.get('href')
if href and href.startswith('http') and 'wiki' not in href:
print(href, file=res)
```

Попробуем теперь синхронно, в 1 поток спрашивать каждую ссылку. Возможно иногда будет 404, возможно будет ошибка соединения.

```python
from urllib.request import Request, urlopen
from urllib.parse import unquote

links = open('res.txt', encoding='utf8').read().split('\n')

for url in links:
try:
request = Request(
url,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 9.0; Win65; x64; rv:97.0) Gecko/20105107 Firefox/92.0'},
)
resp = urlopen(request, timeout=5)
code = resp.code
print(code)
resp.close()
except Exception as e:
print(url, e)
```

* Замерьте время синхронной проверки ссылок.
* Перепишите код, используя `ThreadPoolExecutor`.
* Изменяйте количество воркеров: 5, 10, 100.
* Во время работы посмотрите с использованием стандартных утилит вашей OC загрузку памяти, процессора, сети, время работы. Зависят ли они от количества воркеров и как?

## CPU-bound. Генерируем монетки

Придумаем некоторый прототип криптовалюты, построенный на концепции [Proof of work](https://en.wikipedia.org/wiki/Proof_of_work). Монетой будет считаться некоторая строка длины 50 из последовательности цифр 0-9, у которой md5-hash заканчивается на `00000`. Так как md5 &mdash; односторонняя функция, мы не можем по ее результату судить об аргументе, найти монеты мы можем только одим способом &mdash; перебором.

```python
from hashlib import md5
from random import choice
* 5 workers: 96.50568079948425 seconds
* 10 workers: 25.422240495681763 seconds
* 100 workers: 18.888001918792725 seconds

Разница составляет 1% между 5 и 100 воркерами. Минимальное значение 0.1%. Максимальное 1.3%

while True:
s = "".join([choice("0123456789") for i in range(50)])
h = md5(s.encode('utf8')).hexdigest()

if h.endswith("00000"):
print(s, h)
```

Я нашел несколько монет:

```
91625571520935147263403534421427761877088219542499 8adaf58d5c51fc1216820c1201100000
49262841446921579383645162499800846153508846372671 974d52bc5430d4c8ed96963648e00000
34359601233782192016006582448729953029075086207271 0209b01867080f7eaf20f6c674000000
02809251779741159345845523287375801745436182367614 2fd27ad5f1d1efe1f000c3ee66f00000
```
## CPU-bound. Генерируем монетки

У нас отсутсвует Блокчейн, то есть мы не можем доказать, что монета была сгенерирована именно нами или принадлежит нам: если мы кому-то ее покажем, ее тут же украдут. Эту часть мы оставим за рамками задания.

Expand All @@ -190,3 +21,10 @@ while True:
* Во время работы посмотрите с использованием стандартных утилит вашей OC загрузку памяти, процессора, сети, время работы. Зависят ли они от количества воркеров и как?
* Убедитесь в том, что так как задача CPU bound, наращивать количество воркеров, большее количества ядер, бесполезно.

* 2 workers: 94.197070837020874 seconds
* 4 workers: 58.564107233047485 seconds
* 5 workers: 55.39698505401611 seconds
* 10 workers: 25.912094116210938 seconds
* 100 workers: 27.17059063911438 seconds

При увеличении воркеров снижается нагрузка на каждый поток в среднем. Снижается максимальная нагрузка на поток.
32 changes: 32 additions & 0 deletions bound.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from urllib.request import Request, urlopen
from urllib.parse import unquote
import time
import concurrent.futures

start_time = time.time()
threads_count = 10
with open('res.txt', "r", encoding='utf8') as f:
links = f.readlines()


def url_check(url):
for url in links:
try:
request = Request(
url,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 9.0; Win65; x64; rv:97.0) Gecko/20105107 Firefox/92.0'},
)
resp = urlopen(request, timeout=5)
code = resp.code
print(code)
resp.close()
except Exception as e:
return links, e


with concurrent.futures.ThreadPoolExecutor(threads_count) as executor:
futures = [executor.submit(url_check, url=url) for url in links]
for future in concurrent.futures.as_completed(futures):
future.result()

print("---%s seconds ---" % (time.time() - start_time))
Loading