并发编程
进程
进程创建
进程类
import os
import time
from multiprocessing import Process
def func(arg1, arg2, arg3): # 元组元素与函数参数对应
time.sleep(3)
print('p1', os.getppid(), os.getpid())
print(arg1, arg2, arg3)
class ProcessDemo(Process):
def run(self) -> None:
time.sleep(3)
print('p2', os.getppid(), os.getpid())
if __name__ == "__main__":
print('main', os.getppid(), os.getpid()) # 任务管理器详细信息可以查看PID
p1 = Process(target=func, args=('rm', '-rf', '*'))
p1.start()
p2 = ProcessDemo()
p2.start()
[p.join() for p in [p1, p2]]
# main 4948 13156
# p1 13156 3696
# rm -rf *
# p2 13156 17000
注:不能将进程启动代码直接放在模块全局,应当放在if __name__ == '__main__'或其他函数中。
进程池
import os
import time
from multiprocessing import Pool
def func1(n):
print('in func1', n, os.getpid())
return n
def func2(n):
print('in func2', n * n, os.getpid())
return n * n
if __name__ == "__main__":
print('in main', os.getpid())
p = Pool(5)
for i in range(10):
p.apply(func1, args=(i,)) # 同步提交
p.apply_async(func1, args=(i,), callback=func2) # 异步提交,回调是在主进程调用的
p.close() # 关闭任务接受
p.join() # 同步进程任务结束
# p.terminate()
进程通信
同步原语
信号量
from multiprocessing import Semaphore
sem = Semaphore(4)
sem.acquire() # 申请
sem.acquire() # 申请
sem.acquire() # 申请
sem.acquire() # 申请
sem.acquire() # 申请 此处卡住
sem.release() # 释放
事件
基本使用
from multiprocessing import Event
e = Event()
print(e.is_set())
e.set()
print(e.is_set())
e.clear()
print(e.is_set())
e.wait(3)
print("12345")
红绿灯
import random
import time
from multiprocessing import Event, Process
def car_process(e, i):
if not e.is_set():
print("car%i在等待" % i)
e.wait()
print("\033[0;32;40mcar%i通过\033[0m" % i)
def light_process(e):
while True:
if not e.is_set():
time.sleep(5)
e.set()
print("\033[32m绿灯亮了\033[0m")
else:
time.sleep(2)
e.clear()
print("\033[31m红灯亮了\033[0m")
if __name__ == "__main__":
e = Event()
traffic = Process(target=light_process, args=(e,))
traffic.start()
for i in range(20):
car = Process(target=car_process, args=(e, i))
car.start()
time.sleep(random.random())
消息机制
Queue
from multiprocessing import Queue, Process
def producer(q):
while True:
q.put("food")
print('put food')
def consumer(q):
while True:
q.get()
print('get food')
if __name__ == "__main__":
q = Queue()
p = Process(target=producer, args=(q,))
c = Process(target=consumer, args=(q,))
p.start()
c.start()
JoinableQueue
import time
import random
from multiprocessing import JoinableQueue, Process
def producer(q, name, food):
for i in range(5):
time.sleep(random.random())
fd = '%s(%s)' % (food, i + 1)
q.put(fd)
print('%s生产了一个%s' % (name, food))
q.join() # 生产者等待队列中消费完毕
print('生产完毕')
def consumer(q, name):
while True:
food = q.get()
time.sleep(random.random())
print('%s吃了%s' % (name, food))
q.task_done()
if __name__ == '__main__':
jq = JoinableQueue()
p = Process(target=producer, args=(jq, '厨师', '包子'))
p.start()
c = Process(target=consumer, args=(jq, '顾客'), daemon=True)
c.start()
p.join()
Manger,管理器
from multiprocessing import Manager, Process
def func(dic):
dic['count'] -= 1
print(dic)
if __name__ == "__main__":
m = Manager()
dic = m.dict({'count': 100})
p = Process(target=func, args=(dic,))
p.start()
p.join()
print("主进程", dic)
线程
GIL,Global Interpreter Lock,全局解释器锁
CPython解释器中,GIL制约同一时刻某一进程下多个线程只能有一个线占用CPU,在CPython中计算密集型任务不适合多线程
线程创建
import os
import time
from threading import Thread
def func(n):
time.sleep(1)
print(n,os.getpid())
for i in range(10):
t = Thread(target=func, args=(i,))
t.start()
import time
from threading import Thread
class ThreadDemo(Thread):
def __init__(self, args):
super().__init__()
self.args = args
def run(self) -> None:
time.sleep(1)
print(self.args)
t = ThreadDemo((1, 2))
t.start()
全局变量在多个线程之间共享
import os
from threading import Thread
def func(a, b):
global g
g += a
print(g, os.getpid())
g = 100
f = g
t_list = []
for i in range(10):
t = Thread(target=func, args=(i, 5))
t.start()
t_list.append(t)
for t in t_list: t.join()
print(g)
print(g, f)
线程通信
Condition
from threading import Condition, Thread
def func(condition, i):
condition.acquire()
condition.wait()
print("在第%s个循环里" % i)
condition.release()
condition = Condition()
for i in range(10):
Thread(target=func, args=(condition, i)).start()
while True:
num = int(input(">>> "))
condition.acquire()
condition.notify(num)
condition.release()
from threading import Semaphore, Event, Lock, RLock, Condition
Timer
计时器
import time
from threading import Timer
def func():
print("func")
start = time.perf_counter()
t = Timer(1, func) # 一秒后执行func
t.start()
print(time.perf_counter() - start)
concurrent
ThreadPoolExecutor
import time
from concurrent.futures import ThreadPoolExecutor
def func(n):
time.sleep(2)
return n * n
def call_back(n):
print("结果是" + n.result().__str__())
tpool = ThreadPoolExecutor()
t_list = []
for i in range(20):
t = tpool.submit(func, i)
t.add_done_callback(call_back)
t_list.append(t)
# tpool.shutdown()#可以不等部分获取结果
print("主线程")
for t in t_list:
print(t.result())
协程
协程由用户进行切换,该过程对操作系统透明
yield
import time
def eat():
while True:
print("eat")
yield
time.sleep(0.5)
def drink():
while True:
print("drink")
yield
time.sleep(0.5)
if __name__ == "__main__":
e = eat()
d = drink()
while True:
e.__next__()
d.__next__()
greenlet
from greenlet import greenlet
def eat():
while True:
print("eat")
gl2.switch()
def drink():
while True:
print("drink")
gl1.switch()
if __name__ == "__main__":
gl1 = greenlet(run=eat)
gl2 = greenlet(run=drink)
gl1.switch() # 没有运行会先运行run
gevent
import threading
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
while True:
print("eat", threading.current_thread().name, threading.current_thread())
time.sleep(0.5)
def drink():
while True:
print("drink", threading.current_thread().name, threading.current_thread())
time.sleep(0.5)
if __name__ == "__main__":
g1 = gevent.spawn(eat)
g2 = gevent.spawn(drink)
g1.join()
g2.join()