threading
threading库是一个提供线程相关的操作,线程是应用程序工作的最小单位,python当前版本的多线程没有实现优先级、线程,组线程也不能被停止、暂停、恢复、中断,且cPython的GIL锁的存在,python的多线程并不适合处理cpu密集型的应用,建议使用多进程。
Thread类
构造方法:
Thread(group=None,target=None,name=None,arg=(),kwargs=None,*,daemon=None)
- group: 线程组,目前还没有实现,库引用中必须是None
- target: 要执行的方法
- name: 线程名
- args/kwargs: 要传入方法的参数
实例方法:
- isAlive(): 返回线程是否在运行,正在运行指启动后、终止前
- getName(): 获取线程名
- isDaemon(): 获取是否为后台线程
- join(timeout=None): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout
- setDaemon(): 设置为后台线程
- setName(name): 设置线程名
- start(): 启动线程
创建线程的两种方法:
1 | import threading |
2 | import time |
3 | |
4 | def action(arg): |
5 | time.sleep(arg) |
6 | print('sleep {}'.format(arg)) |
7 | |
8 | for i in range(4): |
9 | t=threadig.Thread(target=action,args=(i,)) |
10 | t.start() |
11 | |
12 | print('main thread end!') |
1 | import threading |
2 | import time |
3 | |
4 | class MyThread(threading.Thread): |
5 | def __init__(self,arg): |
6 | super(MyThread,self).__init__() #显式的调用父类的初始化函数 |
7 | self.arg=arg |
8 | def run(self): |
9 | time.sleep(self.arg) |
10 | print('sleep {}'.format(self.arg)) |
11 | |
12 | for i in range(4): |
13 | t=MyThread(i) |
14 | #t.setDaemon(True) #设置为后台线程,后台线程在前台线程退出就会退出 |
15 | t.start() |
16 | #t.join() #阻塞当前环境的上下文 |
17 | |
18 | print('main thread end!') |
Lock、Rlock类
由于线程的随机调度:某线程可能在执行n条后,cpu接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。
实例方法:
- acquire([timeout]): 尝试获得锁定,使线程进入同步阻塞状态。
- release(): 释放锁,使用前线程必须已获得锁定,否则抛出异常。
1 | import threading,time |
2 | |
3 | gl_num=0 |
4 | lock=threading.RLock() |
5 | |
6 | def action(): |
7 | lock.acquire() |
8 | global gl_num |
9 | gl_num+=1 |
10 | time.sleep(1) |
11 | print('gl_num') |
12 | lock.release() |
13 | |
14 | |
15 | for i in range(10): |
16 | t=threading.Thread(target=action) |
17 | t.start() |
Lock属于全局,重复锁定会产生死锁;RLock属于线程,可重复施加锁,需要执行相同次数的锁释放。
Condition类
Condition通常与一个锁关联,需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自动生产一个RLock实例。
可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
实例方法:
- acquire([timeout])/release():调用关联的锁的相应方法
- wait([timeout]):调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常
- notify():调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定,其他线程仍然在等待池中
-notifyAll():调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定
生产者消费者模型:
1
import threading
2
import time
3
4
product=None
5
con=threading.Condition()
6
7
#生产者方法
8
def produce():
9
global product
10
if con.acquire():
11
while True:
12
if product is None:
13
print('produce...')
14
product='anything'
15
16
#通知消费者,商品已经生产
17
con.notify()
18
19
#等待通知
20
con.wait()
21
time.sleep(2)
22
23
#消费者方法
24
def consume():
25
global product
26
if con.acquire():
27
while True:
28
if product is not None:
29
print('consume...')
30
product=None
31
32
#通知生产者,商品已经没了
33
con.notify()
34
35
#等待通知
36
con.wait()
37
time.sleep(2)
38
39
t1=threading.Thread(target=produce)
40
t2=threading.Thread(target=consume)
41
t2.start()
42
t1.start()
1 | import threading |
2 | import time |
3 | |
4 | con=threading.Condition() |
5 | product=0 |
6 | |
7 | class Producer(threading.Thread): |
8 | def __init__(self,name='phone'): |
9 | super(Producer,self).__init__() |
10 | self.name=name |
11 | def run(self): |
12 | global product |
13 | while True: |
14 | if con.acquire(): |
15 | if product < 10: |
16 | product+=1 |
17 | print('Producer(%s):deliver one,now product:%s' % (self.name,product)) |
18 | con.notify() |
19 | con.release() |
20 | else: |
21 | print('Producer(%s): already 10,stop deliver,now product:%s' % (self.name,product)) |
22 | con.wait() |
23 | time.sleep(2) |
24 | |
25 | class Consumer(threading.Thread): |
26 | def __init__(self,name='phone'): |
27 | super(Consumer,self).__init__() |
28 | self.name=name |
29 | def run(self): |
30 | global product |
31 | while True: |
32 | if con.acquire(): |
33 | if product > 1: |
34 | product -= 1 |
35 | print('Consumer(%s):consume one,now product:%s' % (self.name,product)) |
36 | con.notify() |
37 | con.release() |
38 | else: |
39 | print('Consumer(%s):only one,stop consume,product:%s' % (self.name,product)) |
40 | con.wait() |
41 | time.sleep(2) |
42 | |
43 | if __name__ == "__main__": |
44 | for p in range(2): |
45 | p=Producer() |
46 | p.start() |
47 | |
48 | for c in range(3): |
49 | c=Consumer() |
50 | c.start() |
1 | import threading |
2 | |
3 | alist=None |
4 | con=threading.Condition() |
5 | |
6 | def doSet(): |
7 | if con.acquire(): |
8 | while alist is None: |
9 | con.wait() |
10 | alist.reverse() |
11 | con.release() |
12 | |
13 | def doPrint(): |
14 | if con.acquire(): |
15 | while alist is None: |
16 | con.wait() |
17 | for i in alist: |
18 | print(i) |
19 | con.release() |
20 | |
21 | def doCreate(): |
22 | global alist |
23 | if con.acquire(): |
24 | if alist is None: |
25 | alist=[i for i in range(10)] |
26 | con.notifyAll() |
27 | con.release() |
28 | |
29 | tset=threading.Thread(target=doSet,name='tset') |
30 | tprint=threading.Thread(target=doPrint,name='tprint') |
31 | tcreate=threading.Thread(target=doCreate,name='tcreate') |
32 | |
33 | tset.start() |
34 | tprint.start() |
35 | tcreate.start() |
Event类
Event是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个为False的标志,当调用set()时设为True,调用clear()时重置为False。wait()将阻塞线程至等待阻塞状态。
实例方法:
- isSet(): 当内置标志为True时返回True
- set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态
- clear(): 将标志设为False
- wait([timeout]): 如果标志True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()
1 | import threading |
2 | import time |
3 | |
4 | event=threading.Event() |
5 | |
6 | def action(): |
7 | print('%s wait for event...' % threading.currentThread().getName()) |
8 | event.wait() |
9 | if event.isSet(): |
10 | print('%s recv event.' % threading.currentThread().getName()) |
11 | |
12 | t1=threading.Thread(target=action) |
13 | t2=threading.Thread(target=action) |
14 | t1.start() |
15 | t2.start() |
16 | time.sleep(2) |
17 | |
18 | print('MainThread set event') |
19 | event.set() |
timer类
Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。
构造方法:
Timer(interval,function,args=[],kwarg={})
- interval:指定的时间
- function:要执行的方法
- args/kwargs:方法的参数
1 | import threading |
2 | |
3 | def func(): |
4 | print('hello timer!') |
5 | |
6 | timer=threading.Timer(5,func) |
7 | timer.start() |
local类
local是一个小写字母开头的类,用于管理thread-local(线程局部)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。
1 | import threading |
2 | |
3 | local=threading.local() |
4 | local.tname='main' |
5 | |
6 | def func(): |
7 | local.tname='notmain' |
8 | print(local.tname) |
9 | |
10 | t1=threading.Thread(target=func) |
11 | t1.start() |
12 | t1.join() |
13 | |
14 | print(local.tname) |