Concurrency
The Python standard library provides the threading and multiprocessing modules for writing multithreaded and multiprocess code, but frequently creating and destroying threads or processes is very expensive, so we turn to thread pools and process pools, trading space for time. Since Python 3.2 the standard library has shipped the concurrent.futures module, which provides two classes, ThreadPoolExecutor and ProcessPoolExecutor, as a further abstraction over threading and multiprocessing.
submit: submitting tasks to a thread pool / process pool
from concurrent.futures import ThreadPoolExecutor
import time

def task(message):
    time.sleep(10)
    return message

pool = ThreadPoolExecutor(max_workers=2)
# submit takes the callable followed by its arguments.
task1 = pool.submit(task, 'Thread1')
task2 = pool.submit(task, 'Thread2')
# Poll until both futures have finished, then fetch the results.
while not (task1.done() and task2.done()):
    time.sleep(1)
print(task1.result())
print(task2.result())
Test:
~]
Thread1
Thread2
python threadpool.py 0.10s user 0.06s system 1% cpu 10.176 total
The submit method adds a task to the thread pool and returns a Future object. Because of time.sleep(10) the main thread blocks while the workers run, so during the whole run there should be three threads: the main thread plus the two workers.
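Besides done() and result(), a Future also supports completion callbacks. The sketch below (work and its inputs are made-up examples, not from the text above) shows add_done_callback and result() with a timeout:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def work(n):
    time.sleep(0.1)
    return n * 2

results = []

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(work, 21)
    # The callback fires once the future finishes; it receives the future itself.
    future.add_done_callback(lambda f: results.append(f.result()))
    # result() blocks until the value is ready (or raises after the timeout).
    print(future.result(timeout=5))   # 42

# Exiting the with-block calls shutdown(wait=True), so the callback has run.
print(results)   # [42]
```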
from concurrent.futures import ProcessPoolExecutor
import time

def task(message):
    time.sleep(10)
    return message

# The __main__ guard is required for ProcessPoolExecutor on platforms that
# spawn worker processes (e.g. Windows and macOS).
if __name__ == '__main__':
    pool = ProcessPoolExecutor(max_workers=2)
    task1 = pool.submit(task, 'Process1')
    task2 = pool.submit(task, 'Process2')
    while not (task1.done() and task2.done()):
        time.sleep(1)
    print(task1.result())
    print(task2.result())
Test:
~]
Process1
Process2
python processpool.py 0.11s user 0.08s system 1% cpu 10.185 total
map/wait: operating on the thread pool / process pool
import concurrent.futures
from bs4 import BeautifulSoup
import requests

URLS = ['http://www.baidu.com', 'http://www.sina.com', 'http://www.163.com', 'http://www.qq.com']

def parse_title(url):
    with requests.get(url, timeout=60) as conn:
        html = conn.content.decode('utf-8')
        bs = BeautifulSoup(html, 'html5lib')
        return bs.title.get_text()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its URL so we can tell which result belongs where.
    future_get_title = {executor.submit(parse_title, url): url for url in URLS}
    # as_completed yields futures in the order they finish.
    for future in concurrent.futures.as_completed(future_get_title):
        url = future_get_title[future]
        try:
            title = future.result()
        except Exception as e:
            print(e)
        else:
            print('{} title is {}'.format(url, title))
map works just like the built-in map we use every day:
import concurrent.futures
from bs4 import BeautifulSoup
import requests

URLS = ['http://www.baidu.com', 'http://www.sina.com', 'http://www.163.com', 'http://www.qq.com']

def parse_title(url):
    with requests.get(url, timeout=60) as conn:
        html = conn.content.decode('utf-8')
        bs = BeautifulSoup(html, 'html5lib')
        return bs.title.get_text()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # map returns results in the same order as the input URLs.
    for url, title in zip(URLS, executor.map(parse_title, URLS)):
        print('{} title is {}'.format(url, title))
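Note the ordering difference: as_completed yields futures as they finish, while executor.map yields results in input order, waiting for slower early items if necessary. A small self-contained sketch (delayed_square is a made-up helper, not from the examples above):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def delayed_square(n):
    # Later inputs finish first, yet map still yields results in input order.
    time.sleep(0.3 - 0.1 * n)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    squares = list(executor.map(delayed_square, [0, 1, 2]))
print(squares)   # [0, 1, 4]
```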
The wait function returns a named tuple containing two sets of futures: done (completed) and not_done (uncompleted). Its return_when parameter accepts one of three values: FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED (the default).
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from time import sleep
from random import randint

def return_after_random(num):
    sleep(randint(1, 5))
    return 'Return of {}'.format(num)

pool = ThreadPoolExecutor(5)
futures = []
for i in range(5):
    futures.append(pool.submit(return_after_random, i))
# Blocks until every future has finished, then returns (done, not_done).
done, not_done = wait(futures, timeout=None, return_when=ALL_COMPLETED)
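With FIRST_COMPLETED, wait returns as soon as any future finishes, splitting the futures into the done and not_done sets. A small sketch (return_after and its delays are made-up for illustration):

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from time import sleep

def return_after(delay):
    sleep(delay)
    return delay

pool = ThreadPoolExecutor(3)
futures = [pool.submit(return_after, d) for d in (0.1, 1, 1)]
# Returns once the fastest future (0.1 s) has finished; the slower
# futures land in not_done and keep running in the background.
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
print(len(done), len(not_done))
pool.shutdown()
```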