python3-cookbook笔记:第十二章 并发编程,
python3-cookbook笔记:第十二章 并发编程,
12.1 启动与停止线程
import time from threading import Thread class CountdownTask: def __init__(self): self._running = True def terminate(self): self._running = False def run(self, n): while self._running and n > 0: print('T-minus', n) n -= 1 time.sleep(5) c = CountdownTask() t = Thread(, args=(10,)) t.start() # 主动终止线程 c.terminate() # 等待线程终止 t.join()
12.3 线程间通信
from queue import Queue from threading import Thread # 队列终止标志 _sentinel = object() def producer(out_q): while True: # 数据处理 ... # 向队列中添加数据 out_q.put(data) out_q.put(_sentinel) def consumer(in_q): while True: # 从队列获取数据 data = in_q.get() # 判断队列是否结束 if data is _sentinel: in_q.put(_sentinel) break # 数据处理 ... q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start()
import queue q = queue.Queue() try: data = q.get(block=False) except queue.Empty: ... try: data = q.get(timeout=5.0) except queue.Empty: ... try: q.put(item, block=False) except queue.Full: ...
12.4 给关键部分加锁
12.5 防止死锁的加锁机制
哲学家就餐问题:五位哲学家围坐在一张桌子前,每个人 面前有一个碗饭和一只筷子。在这里每个哲学家可以看做是一个独立的线程,而每只筷子可以看做是一个锁。每个哲学家可以处在静坐、 思考、吃饭三种状态中的一个。需要注意的是,每个哲学家吃饭是需要两只筷子的,这样问题就来了:如果每个哲学家都拿起自己左边的筷子, 那么他们五个都只能拿着一只筷子坐在那儿,直到饿死。此时他们就进入了死锁状态。
import threading from contextlib import contextmanager # 线程运行时,local()返回的实例会为每个线程创建一个属于它自己的本地存储,不同线程的本地存储互不影响,且互不可见 _local = threading.local() # 利用上下文管理器和锁的id值进行排序来控制锁的分配 @contextmanager def acquire(*locks): # Sort locks by object identifier locks = sorted(locks, key=lambda x: id(x)) # 每个线程第一次运行到这儿时,结果都是空列表 acquired = getattr(_local, 'acquired', []) if acquired and max(id(lock) for lock in acquired) >= id(locks[0]): raise RuntimeError('Lock Order Violation') # 为线程的本地存储添加一个列表,存储所有锁的id值 acquired.extend(locks) _local.acquired = acquired try: for lock in locks: lock.acquire() yield finally: # Release locks in reverse order of acquisition for lock in reversed(locks): lock.release() del acquired[-len(locks):] # 5个哲学家就餐问题实现 # The philosopher thread def philosopher(left, right): while True: with acquire(left, right): print(threading.currentThread(), 'eating') # The chopsticks (represented by locks) NSTICKS = 5 chopsticks = [threading.Lock() for n in range(NSTICKS)] # Create all of the philosophers for n in range(NSTICKS): t = threading.Thread(target=philosopher, args=(chopsticks[n], chopsticks[(n + 1) % NSTICKS])) t.start()
12.6 保存线程的状态信息
import threading from functools import partial from socket import socket, AF_INET, SOCK_STREAM class LazyConnection: def __init__(self, address, family=AF_INET, type=SOCK_STREAM): self.address = address = AF_INET self.type = SOCK_STREAM self.local = threading.local() def __enter__(self): if hasattr(self.local, 'sock'): raise RuntimeError('Already connected') self.local.sock = socket(, self.type) self.local.sock.connect(self.address) return self.local.sock def __exit__(self, exc_ty, exc_val, tb): self.local.sock.close() del self.local.sock def test(conn): with conn as s: s.send(b'GET /index.html HTTP/1.0\r\n') s.send(b'Host:\r\n') s.send(b'\r\n') resp = b''.join(iter(partial(s.recv, 8192), b'')) print('Got {} bytes'.format(len(resp))) if __name__ == '__main__': conn = LazyConnection(('', 80)) t1 = threading.Thread(target=test, args=(conn,)) t2 = threading.Thread(target=test, args=(conn,)) t1.start() t2.start() t1.join() t2.join()
12.7 创建一个线程池
如果程序中需要使用到线程池,或者需要线程所执行函数的返回结果时,可以考虑使用from concurrent.futures import ThreadPoolExecutor。
import urllib.request from concurrent.futures import ThreadPoolExecutor def fetch_url(url): u = urllib.request.urlopen(url) data = return data # 创建线程池对象,并允许同时运行10个线程 pool = ThreadPoolExecutor(10) # 传入线程要执行的函数,以及它的参数 a = pool.submit(fetch_url, '') b = pool.submit(fetch_url, '') # 获取线程执行结果时,会阻塞当前线程,直到该线程执行完毕并返回结果 x = a.result() y = b.result()
12.8 简单的并行编程
# 使用map批量执行 from concurrent.futures import ProcessPoolExecutor def work(x): ... return result # 普通做法 # results = map(work, data) # 利用CPU多核特点 with ProcessPoolExecutor() as pool: results =, data)
from concurrent.futures import ProcessPoolExecutor def work(x): ... return result def when_done(r): print('Got:', r.result()) with ProcessPoolExecutor() as pool: ... # 单独执行某个函数 future_result = pool.submit(work, arg) # 在使用result()获取结果时,当前程序会被阻塞,直到产生结果 r = future_result.result() ... # 单独执行如果不想被阻塞,可以使用add_done_callback指定一个回调函数 # 这个函数接受一个Future实例参数,可以在回调函数中获取执行结果 future_result.add_done_callback(when_done)
- 暂无相关文章