10.并发编程

予早 2024-10-05 11:39:22
Categories: Tags:

并发编程

进程

进程创建

进程类

import os
import time
from multiprocessing import Process


def func(arg1, arg2, arg3):  # 元组元素与函数参数对应
    time.sleep(3)
    print('p1', os.getppid(), os.getpid())
    print(arg1, arg2, arg3)


class ProcessDemo(Process):
    def run(self) -> None:
        time.sleep(3)
        print('p2', os.getppid(), os.getpid())


if __name__ == "__main__":
    print('main', os.getppid(), os.getpid())  # 任务管理器详细信息可以查看PID
    p1 = Process(target=func, args=('rm', '-rf', '*'))
    p1.start()
    p2 = ProcessDemo()
    p2.start()
    [p.join() for p in [p1, p2]]
# main 4948 13156
# p1 13156 3696
# rm -rf *
# p2 13156 17000

注:不能将进程启动代码直接放在模块全局,应当放在if __name__ == '__main__'或其他函数中。

进程池

import os
import time
from multiprocessing import Pool


def func1(n):
    print('in func1', n, os.getpid())
    return n


def func2(n):
    print('in func2', n * n, os.getpid())
    return n * n


if __name__ == "__main__":
    print('in main', os.getpid())
    p = Pool(5)
    for i in range(10):
        p.apply(func1, args=(i,))  # 同步提交
        p.apply_async(func1, args=(i,), callback=func2)  # 异步提交,回调是在主进程调用的
    p.close()  # 关闭任务接受
    p.join()  # 同步进程任务结束
    # p.terminate()

进程通信

同步原语

信号量
from multiprocessing import Semaphore

sem = Semaphore(4)
sem.acquire()  # 申请
sem.acquire()  # 申请
sem.acquire()  # 申请
sem.acquire()  # 申请
sem.acquire()  # 申请 此处卡住
sem.release()  # 释放
事件

基本使用

from multiprocessing import Event

e = Event()
print(e.is_set())
e.set()
print(e.is_set())
e.clear()
print(e.is_set())
e.wait(3)
print("12345")

红绿灯

import random
import time
from multiprocessing import Event, Process


def car_process(e, i):
    if not e.is_set():
        print("car%i在等待" % i)
        e.wait()
    print("\033[0;32;40mcar%i通过\033[0m" % i)


def light_process(e):
    while True:
        if not e.is_set():
            time.sleep(5)
            e.set()
            print("\033[32m绿灯亮了\033[0m")
        else:
            time.sleep(2)
            e.clear()
            print("\033[31m红灯亮了\033[0m")


if __name__ == "__main__":
    e = Event()
    traffic = Process(target=light_process, args=(e,))
    traffic.start()
    for i in range(20):
        car = Process(target=car_process, args=(e, i))
        car.start()
        time.sleep(random.random())

消息机制

Queue
from multiprocessing import Queue, Process


def producer(q):
    while True:
        q.put("food")
        print('put food')


def consumer(q):
    while True:
        q.get()
        print('get food')


if __name__ == "__main__":
    q = Queue()
    p = Process(target=producer, args=(q,))
    c = Process(target=consumer, args=(q,))
    p.start()
    c.start()
JoinableQueue
import time
import random
from multiprocessing import JoinableQueue, Process


def producer(q, name, food):
    for i in range(5):
        time.sleep(random.random())
        fd = '%s(%s)' % (food, i + 1)
        q.put(fd)
        print('%s生产了一个%s' % (name, food))
    q.join()  # 生产者等待队列中消费完毕
    print('生产完毕')


def consumer(q, name):
    while True:
        food = q.get()
        time.sleep(random.random())
        print('%s吃了%s' % (name, food))
        q.task_done()


if __name__ == '__main__':
    jq = JoinableQueue()
    p = Process(target=producer, args=(jq, '厨师', '包子'))
    p.start()
    c = Process(target=consumer, args=(jq, '顾客'), daemon=True)
    c.start()
    p.join()

Manger,管理器

from multiprocessing import Manager, Process


def func(dic):
    dic['count'] -= 1
    print(dic)


if __name__ == "__main__":
    m = Manager()
    dic = m.dict({'count': 100})
    p = Process(target=func, args=(dic,))
    p.start()
    p.join()
    print("主进程", dic)

线程

GIL,Global Interpreter Lock,全局解释器锁
CPython解释器中,GIL制约同一时刻某一进程下多个线程只能有一个线占用CPU,在CPython中计算密集型任务不适合多线程

线程创建

import os
import time
from threading import Thread


def func(n):
    time.sleep(1)
    print(n,os.getpid())

for i in range(10):
    t = Thread(target=func, args=(i,))
    t.start()
import time
from threading import Thread


class ThreadDemo(Thread):
    def __init__(self, args):
        super().__init__()
        self.args = args

    def run(self) -> None:
        time.sleep(1)
        print(self.args)


t = ThreadDemo((1, 2))
t.start()

全局变量在多个线程之间共享

import os
from threading import Thread


def func(a, b):
    global g
    g += a
    print(g, os.getpid())


g = 100
f = g
t_list = []
for i in range(10):
    t = Thread(target=func, args=(i, 5))
    t.start()
    t_list.append(t)
for t in t_list: t.join()
print(g)
print(g, f)

线程通信

Condition

from threading import Condition, Thread


def func(condition, i):
    condition.acquire()
    condition.wait()
    print("在第%s个循环里" % i)
    condition.release()


condition = Condition()
for i in range(10):
    Thread(target=func, args=(condition, i)).start()
while True:
    num = int(input(">>> "))
    condition.acquire()
    condition.notify(num)
    condition.release()
from threading import Semaphore, Event, Lock, RLock, Condition

Timer

计时器

import time
from threading import Timer


def func():
    print("func")


start = time.perf_counter()
t = Timer(1, func)  # 一秒后执行func
t.start()
print(time.perf_counter() - start)

concurrent

ThreadPoolExecutor

import time
from concurrent.futures import ThreadPoolExecutor


def func(n):
    time.sleep(2)
    return n * n


def call_back(n):
    print("结果是" + n.result().__str__())


tpool = ThreadPoolExecutor()
t_list = []
for i in range(20):
    t = tpool.submit(func, i)
    t.add_done_callback(call_back)
    t_list.append(t)

# tpool.shutdown()#可以不等部分获取结果
print("主线程")
for t in t_list:
    print(t.result())

协程

强烈推荐:https://www.bilibili.com/video/BV1sS4y1b7qb/?spm_id_from=333.999.0.0&vd_source=1c6bbc08f3ca11adc6d983810711f9ca

协程由用户进行切换,该过程对操作系统透明

yield

import time


def eat():
    while True:
        print("eat")
        yield
        time.sleep(0.5)


def drink():
    while True:
        print("drink")
        yield
        time.sleep(0.5)


if __name__ == "__main__":
    e = eat()
    d = drink()
    while True:
        e.__next__()
        d.__next__()

greenlet

from greenlet import greenlet


def eat():
    while True:
        print("eat")
        gl2.switch()


def drink():
    while True:
        print("drink")
        gl1.switch()


if __name__ == "__main__":
    gl1 = greenlet(run=eat)
    gl2 = greenlet(run=drink)
    gl1.switch()  # 没有运行会先运行run

gevent

import threading
import time

import gevent
from gevent import monkey

monkey.patch_all()


def eat():
    while True:
        print("eat", threading.current_thread().name, threading.current_thread())
        time.sleep(0.5)


def drink():
    while True:
        print("drink", threading.current_thread().name, threading.current_thread())
        time.sleep(0.5)


if __name__ == "__main__":
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(drink)
    g1.join()
    g2.join()