Jusene's Blog

Python Concurrency: Thread Pools and Process Pools

2018/05/15

Concurrency

Python's standard library provides the threading and multiprocessing modules for writing multi-threaded and multi-process programs, but frequently creating and destroying threads or processes is expensive. In those cases we want a thread pool or process pool, trading space for time. Since Python 3.2 the standard library has provided the concurrent.futures module, which offers two classes, ThreadPoolExecutor and ProcessPoolExecutor, implementing a further abstraction on top of threading and multiprocessing.
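The "further abstraction" means the two executor classes share one interface, so the same driver code works with either pool. A minimal sketch (the square/run names are illustrative, not from the post):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(n):
    return n * n

def run(executor_cls):
    # Exiting the with-block waits for all submitted tasks to finish.
    with executor_cls(max_workers=2) as pool:
        futures = [pool.submit(square, n) for n in range(4)]
        return [f.result() for f in futures]

print(run(ThreadPoolExecutor))    # [0, 1, 4, 9]
# run(ProcessPoolExecutor) works the same way (on some platforms it must
# be called under an `if __name__ == "__main__":` guard).
```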

Submitting tasks to a thread pool/process pool with submit

from concurrent.futures import ThreadPoolExecutor
import time

def task(message):
    time.sleep(10)
    return message

pool = ThreadPoolExecutor(max_workers=2)  # create a thread pool that runs at most 2 tasks at a time
task1 = pool.submit(task, 'Thread1')
task2 = pool.submit(task, 'Thread2')
while not (task1.done() and task2.done()):
    print(task1.result())
    print(task2.result())

Test:

~]# time python threadpool.py
Thread1
Thread2
python threadpool.py  0.10s user 0.06s system 1% cpu 10.176 total

submit adds a task to the pool and returns a Future object. Because each task runs time.sleep(10), the call to result() blocks the main thread until the value is ready, so during the run there are three threads in total: the main thread plus the two pool workers.
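Besides done() and result(), a Future has a few more methods worth knowing. A quick sketch (the slow function is illustrative, not from the post):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow(x):
    time.sleep(0.5)
    return x * 2

pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(slow, 21)
# Schedule a callback that fires once the future resolves.
future.add_done_callback(lambda f: print('callback got', f.result()))
print(future.result(timeout=5))   # blocks until the value is ready, prints 42
pool.shutdown()                   # wait for the worker threads to exit
```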

from concurrent.futures import ProcessPoolExecutor
import time

def task(message):
    time.sleep(10)
    return message

pool = ProcessPoolExecutor(max_workers=2)  # create a process pool that runs at most 2 tasks at a time
task1 = pool.submit(task, 'Process1')
task2 = pool.submit(task, 'Process2')
while not (task1.done() and task2.done()):
    print(task1.result())
    print(task2.result())

Test:

~]# time python processpool.py
Process1
Process2
python processpool.py  0.11s user 0.08s system 1% cpu 10.185 total

Operating on thread pools/process pools with map/wait

import concurrent.futures
from bs4 import BeautifulSoup
import requests

URLS = ['http://wwww.baidu.com','http://www.sina.com','http://www.163.com','http://www.qq.com']

def parse_title(url):
    with requests.get(url, timeout=60) as conn:
        html = conn.content.decode('utf-8')
        bs = BeautifulSoup(html, 'html5lib')
        return bs.title.get_text()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_get_title = {executor.submit(parse_title, url): url for url in URLS}
    for future in concurrent.futures.as_completed(future_get_title):
        url = future_get_title[future]
        try:
            title = future.result()
        except Exception as e:
            print(e)
        else:
            print('{} title is {}'.format(url, title))

map is used just like the built-in map we use every day:

import concurrent.futures
from bs4 import BeautifulSoup
import requests

URLS = ['http://wwww.baidu.com','http://www.sina.com','http://www.163.com','http://www.qq.com']

def parse_title(url):
    with requests.get(url, timeout=60) as conn:
        html = conn.content.decode('utf-8')
        bs = BeautifulSoup(html, 'html5lib')
        return bs.title.get_text()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for url, title in zip(URLS, executor.map(parse_title, URLS)):
        print('{} title is {}'.format(url, title))
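One practical difference between the two approaches: executor.map yields results in input order, while as_completed yields futures in completion order. A small sketch with illustrative delays:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def work(delay):
    time.sleep(delay)
    return delay

delays = [0.3, 0.2, 0.1]
with ThreadPoolExecutor(max_workers=3) as pool:
    in_order = list(pool.map(work, delays))   # always [0.3, 0.2, 0.1]
    futures = [pool.submit(work, d) for d in delays]
    completed = [f.result() for f in as_completed(futures)]
    # completed is typically [0.1, 0.2, 0.3]: fastest-finishing first
print(in_order, completed)
```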

wait returns a named tuple containing two sets: one of completed futures (done) and one of uncompleted ones (not_done). Its return_when parameter accepts three constants: FIRST_COMPLETED, FIRST_EXCEPTION, and ALL_COMPLETED (the default).

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from time import sleep
from random import randint

def return_after_random(num):
    sleep(randint(1, 5))
    return 'Return of {}'.format(num)

pool = ThreadPoolExecutor(5)
futures = []
for i in range(5):
    futures.append(pool.submit(return_after_random, i))
done, not_done = wait(futures, timeout=None, return_when=ALL_COMPLETED)
for future in done:
    print(future.result())
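The done/not_done split is easiest to see with FIRST_COMPLETED, which returns as soon as any future finishes. A minimal sketch (the nap function and its delays are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def nap(seconds):
    time.sleep(seconds)
    return seconds

pool = ThreadPoolExecutor(3)
futures = [pool.submit(nap, s) for s in (0.1, 2, 2)]
# Returns as soon as one future finishes, i.e. the quick 0.1s task.
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
print(len(done), len(not_done))   # typically 1 2
pool.shutdown()                   # block until the remaining tasks finish
```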