多线程与多进程
大约 11 分钟
GIL
# GIL: the Global Interpreter Lock.
# Only one thread at a time may execute Python bytecode, so pure-Python threads
# cannot spread work across several CPU cores.
# The GIL does NOT make user code thread-safe: `counter += 1` is a
# read-modify-write spanning several bytecodes, and a thread switch may land
# between the read and the write.
import threading

counter = 0


def inc():
    """Add 1 to the global ``counter`` one million times, with no locking."""
    global counter
    for _ in range(1000000):
        counter += 1


def dec():
    """Subtract 1 from the global ``counter`` one million times, with no locking."""
    global counter
    for _ in range(1000000):
        counter -= 1
# The GIL is not held indefinitely. CPython asks the running thread to hand it
# over at regular intervals (sys.getswitchinterval(), 5 ms by default) and
# releases it whenever a thread blocks, e.g. on I/O.
thread1 = threading.Thread(target=inc)
thread2 = threading.Thread(target=dec)
for worker in (thread1, thread2):
    worker.start()
for worker in (thread1, thread2):
    worker.join()
# Without a lock the result is not guaranteed to be 0: if a switch lands between
# the read and the write of `counter`, one thread's update is lost.
# NOTE(review): on CPython 3.10+ switches are only taken at specific bytecodes,
# so this demo frequently prints 0 anyway.
print(counter)
threading
# Python's threading API is used much like Java's Thread class.
import time
import threading


def _announce_and_sleep(start_message, seconds, end_message):
    """Print *start_message*, block for *seconds*, then print *end_message*."""
    print(start_message)
    time.sleep(seconds)
    print(end_message)


def executer1():
    """Simulated task one: takes about 2 seconds."""
    _announce_and_sleep("开始执行任务一", 2, "任务一执行结束")


def executer2():
    """Simulated task two: takes about 3 seconds."""
    _announce_and_sleep("开始执行任务二", 3, "任务二执行结束")
thread1 = threading.Thread(target=executer1)
thread2 = threading.Thread(target=executer2)
# Daemon *threads*: the interpreter does not wait for them when the program
# exits; they are stopped abruptly at shutdown. Set the flag before start().
# (Thread.setDaemon() is deprecated since Python 3.10; assign .daemon instead.)
# Note that an explicit join() below still waits, daemon or not.
# thread1.daemon = True
# thread2.daemon = True
start = time.time()
thread1.start()
thread2.start()
# As in Java, join() blocks the main thread until that worker has finished.
thread1.join()
thread2.join()
print("主线程结束,耗时:", time.time() - start)
# Threads can also be created by subclassing threading.Thread.
class _BannerThread(threading.Thread):
    """Shared base for MyThread1/MyThread2: run ``func`` between two banners.

    Subclasses supply the ``_start_banner`` and ``_end_banner`` texts.

    name: thread name (also printed in the banners).
    func: zero-argument callable executed by run().
    """

    _start_banner = ""
    _end_banner = ""

    def __init__(self, name, func):
        # Thread.__init__ already stores the name (readable as self.name),
        # so it does not need to be assigned a second time.
        super().__init__(name=name)
        self.func = func

    # Override (not overload) run(): start() invokes it in the new thread.
    def run(self):
        print(self._start_banner, self.name)
        self.func()
        print(self._end_banner, self.name)


class MyThread1(_BannerThread):
    """Thread that prints MyThread1 banners around ``func``."""

    _start_banner = "MyThread1开始:"
    _end_banner = "MyThread1结束:"


class MyThread2(_BannerThread):
    """Thread that prints MyThread2 banners around ``func``."""

    _start_banner = "MyThread2开始:"
    _end_banner = "MyThread2结束:"
myThread1 = MyThread1("线程1", executer1)
myThread2 = MyThread2("线程2", executer2)
# Subclass instances are started exactly like plain Thread objects.
for _worker in (myThread1, myThread2):
    _worker.start()
Queue
import time
import threading
# def executer1():
# global task
# while True:
# if len(task):
# content = task.pop()
# print("执行任务 {content}".format(content=content))
# time.sleep(0.5)
#
# def executer2():
# global task
# while True:
# print("开始执行任务二")
# time.sleep(1)
# for i in range(10):
# task.append(i)
# print("任务二执行结束")
#
# # 线程间的通信方式
# # 1. 共享变量
# # 2. 通过参数传递,例如 executer1(task)、executer2(task)
# task = []
# thread2 = threading.Thread(target=executer2, args=())
# thread2.start()
# for i in range(10):
# thread = threading.Thread(target=executer1, args=())
# thread.start()
# Inter-thread communication, option 3: a queue.
# Python's queue.Queue is a thread-safe blocking queue, comparable to Java's
# ArrayBlockingQueue when it is given a maxsize.
from queue import Queue

task_queue = Queue(1000)  # holds at most 1000 pending items; put() blocks beyond that
def executer3(task_queue, delay=0.5):
    """Consume and print tasks from *task_queue* until a ``None`` item arrives.

    Every item taken, including the ``None`` stop signal, is acknowledged with
    ``task_done()``, so ``task_queue.join()`` can tell when all submitted work
    has been processed.

    task_queue: a ``queue.Queue`` shared with the producer(s).
    delay: seconds to pause after each task to simulate work (default 0.5).
    """
    while True:
        # get(block=True, timeout=None): wait while the queue is empty.
        content = task_queue.get()
        try:
            if content is None:
                # Stop signal: lets the owner shut this worker down.
                return
            print("执行任务{content}".format(content=content))
            time.sleep(delay)
        finally:
            task_queue.task_done()
def executer4(task_queue, rounds=None, interval=1, delay=0.5):
    """Put batches of the tasks 0-9 onto *task_queue*.

    task_queue: a ``queue.Queue`` shared with the consumers.
    rounds: number of batches to produce; ``None`` (default) produces forever.
    interval: seconds to wait before each batch (default 1).
    delay: seconds to wait after each put (default 0.5).
    """
    produced = 0
    while rounds is None or produced < rounds:
        time.sleep(interval)
        for i in range(10):
            print("放入任务{i}".format(i=i))
            # put(item, block=True, timeout=None): wait while the queue is full.
            task_queue.put(i)
            time.sleep(delay)
        produced += 1
# Ten consumer threads share one queue; get() hands each task to exactly one of them.
for _ in range(10):
    thread = threading.Thread(target=executer3, args=(task_queue,))
    thread.start()
# A single producer thread keeps the queue supplied.
thread2 = threading.Thread(target=executer4, args=(task_queue,))
thread2.start()
Lock和RLock
from threading import Lock, RLock
counter = 0
lock = Lock()
# RLock is reentrant: the thread holding it may call rlock.acquire() again
# without blocking, but must call rlock.release() the same number of times.
rlock = RLock()


# Using a plain Lock.
def inc1(times=1000000):
    """Add 1 to the global ``counter`` *times* times, holding ``lock`` for each step.

    times: number of increments (default 1,000,000).
    """
    global counter
    for _ in range(times):
        # `with lock` releases the lock even if the body raises; a bare
        # acquire()/release() pair would leave it held forever in that case.
        with lock:
            # Deliberate deadlock demo: Lock is not reentrant, so a second
            # acquire by the same thread blocks forever.
            # lock.acquire()
            counter += 1


def dec1(times=1000000):
    """Subtract 1 from the global ``counter`` *times* times, holding ``lock`` for each step.

    times: number of decrements (default 1,000,000).
    """
    global counter
    for _ in range(times):
        with lock:
            counter -= 1
def doingLogic(rlock):
    """Print a message while holding *rlock*.

    If the caller already holds *rlock*, the nested acquire below still
    succeeds because RLock is reentrant. With a plain Lock (e.g.
    ``lock.acquire()`` here) that nested acquire would deadlock.
    """
    with rlock:
        print("doingLogic......")
# Using a reentrant RLock.
def inc2(rlock, times=1000000):
    """Add 1 to the global ``counter`` *times* times, holding *rlock* for each step.

    rlock: a reentrant lock shared with other workers.
    times: number of increments (default 1,000,000).
    """
    global counter
    for _ in range(times):
        with rlock:
            # Calling doingLogic(rlock) here re-acquires the same lock; that is
            # only safe because RLock is reentrant.
            # doingLogic(rlock)
            counter += 1


def dec2(rlock, times=1000000):
    """Subtract 1 from the global ``counter`` *times* times, holding *rlock* for each step.

    rlock: a reentrant lock shared with other workers.
    times: number of decrements (default 1,000,000).
    """
    global counter
    for _ in range(times):
        with rlock:
            counter -= 1
# Locks cost performance, and used carelessly they can deadlock.
# inc1()
# dec1()
# print("counter1 ==>", counter)
# With a plain Lock, calling doingLogic() while already holding the lock would
# deadlock; the reentrant RLock allows the nested acquire.
# Run the two workers concurrently so the lock actually guards shared state.
from threading import Thread

rlock_workers = [
    Thread(target=inc2, args=(rlock,)),
    Thread(target=dec2, args=(rlock,)),
]
for rlock_worker in rlock_workers:
    rlock_worker.start()
for rlock_worker in rlock_workers:
    rlock_worker.join()
print("counter2 ==>", counter)
Condition
'''
Condition 是条件变量,可用于实现复杂的线程间同步
《不良人·第六季·第八集》片头对话:
将臣:《九幽玄天神功》又名《我好恨啊,我要杀了你》功,或者《有没有人再爱我一次》功
李星云: 很难想象尸祖当年都经历了什么
将臣:不要问!姓李的,我知道你在想什么,但...
李星云:但天生万物各具其性,我想让我修炼此功法以阴调阳,这法子可行
将臣:这法子虽然可行,但...
李星云:但风险极高
将臣:你以为你懂得很多吗?我可不止这一种法子能救你...
李星云:我明白,您还可以找我采阳补阴...这种老牛吃嫩草的事还是尽量别做的好...
将臣:你可知我平生最讨厌什么?我最讨厌...
李星云:别人打断你...
将臣:姓李的,你以为你又猜对了吧,其实我最讨厌的是...
李星云:有人猜到你要说的话
将臣:小姑娘,你可知他的身份...
李星云:前辈心思缜密,如滔滔江水一浪一浪又一浪,一浪更比一浪强...
将臣:你不是很能猜吗?接着猜呀...
李星云:刚刚都是巧合,这回我可真猜不出来。不会是帮您找到其他尸......祖~~~~~~
'''
import threading
from threading import Condition


def _speak_in_turns(speaker, first_turn, lines):
    """Print *lines* as *speaker*, strictly alternating with one partner thread.

    Both partners share ``speaker.condition``. The number of lines spoken so
    far is kept on that condition object (attribute ``spoken``, starting at 0).
    Each speaker blocks in ``wait_for`` until the count equals its own turn.
    The predicate is re-checked while holding the lock, so a notification sent
    before the partner starts waiting is never lost, and thread start order no
    longer matters.

    speaker: the running thread; its ``condition`` and ``name`` are used.
    first_turn: 0 for the speaker who opens the dialogue, 1 for the other.
    lines: this speaker's lines in order; its turns advance by 2.
    """
    condition = speaker.condition
    for index, line in enumerate(lines):
        turn = first_turn + 2 * index
        # `with condition:` is equivalent to condition.acquire() before the
        # block and condition.release() after it (even on error).
        with condition:
            condition.wait_for(
                lambda turn=turn: getattr(condition, 'spoken', 0) == turn)
            print('%s: %s\n' % (speaker.name, line))
            condition.spoken = turn + 1
            # Wake the partner so it re-checks whether it is its turn.
            condition.notify_all()


class Jiangchen(threading.Thread):
    """将臣: opens the dialogue and speaks on the even turns."""

    LINES = (
        '《九幽玄天神功》又名《我好恨啊,我要杀了你》功,或者《有没有人再爱我一次》功',
        '不要问!姓李的,我知道你在想什么,但...',
        '这法子虽然可行,但...',
        '你以为你懂得很多吗?我可不止这一种法子能救你...',
        '你可知我平生最讨厌什么?我最讨厌...',
        '姓李的,你以为你又猜对了吧,其实我最讨厌的是...',
        '小姑娘,你可知他的身份...',
        '你不是很能猜吗?接着猜呀...',
    )

    def __init__(self, condition):
        super().__init__(name='将臣')
        self.condition = condition

    def run(self):
        _speak_in_turns(self, 0, self.LINES)


class Lixingyun(threading.Thread):
    """李星云: answers on the odd turns."""

    LINES = (
        '很难想象尸祖当年都经历了什么',
        '但天生万物各具其性,我想让我修炼此功法以阴调阳,这法子可行',
        '但风险极高',
        '我明白,您还可以找我采阳补阴...这种老牛吃嫩草的事还是尽量别做的好...',
        '别人打断你...',
        '有人猜到你要说的话',
        '前辈心思缜密,如滔滔江水一浪一浪又一浪,一浪更比一浪强...',
        '刚刚都是巧合,这回我可真猜不出来。不会是帮您找到其他尸......祖~~~~~~',
    )

    def __init__(self, condition):
        super().__init__(name='李星云')
        self.condition = condition

    def run(self):
        _speak_in_turns(self, 1, self.LINES)


cond = Condition()
jiangchen = Jiangchen(cond)
lixingyun = Lixingyun(cond)
# A bare notify()/wait() ping-pong depends on start order, and even then it can
# deadlock if the first notify() fires before the partner is waiting. Waiting on
# a predicate (wait_for) over shared state avoids that, so either order works.
# notify() and wait() may still only be called while holding the condition
# (inside `with` or after acquire()).
lixingyun.start()
jiangchen.start()
Semaphore信号量
# Works like Java's Semaphore: it caps how many threads may hold a permit at
# the same time.
# A Python take on a restaurant queue-number caller: 3 permits = 3 tables.
import threading
import time

semaphore = threading.Semaphore(3)


class Customer(threading.Thread):
    """A seated customer: eats, leaves, then frees the table it was given."""

    # Seconds spent eating, then clearing the table.
    _EATING_SECONDS = 2
    _LEAVING_SECONDS = 0.5

    def __init__(self, semaphore):
        super().__init__()
        self.semaphore = semaphore

    def run(self):
        time.sleep(self._EATING_SECONDS)
        print('顾客就餐完毕')
        time.sleep(self._LEAVING_SECONDS)
        # Give one permit back so the next number can be called.
        self.semaphore.release()
class CallNumber(threading.Thread):
    """Calls queue numbers 0-19, seating a customer whenever a permit is free.

    semaphore: the permits (tables) shared with the Customer threads it starts.
    """

    def __init__(self, semaphore):
        super().__init__()
        self.semaphore = semaphore

    def run(self):
        for i in range(20):
            # acquire() blocks while every permit is taken (all tables busy).
            self.semaphore.acquire()
            print(i, '号顾客准备就餐')
            # Hand the customer this caller's own semaphore, so it releases
            # exactly the permit that was just acquired.
            customer = Customer(self.semaphore)
            customer.start()


callnumber = CallNumber(semaphore)
callnumber.start()
线程池
# concurrent.futures gives thread pools and process pools the same interface.
import random
import time
from concurrent import futures

# A pool of at most three worker threads.
executor = futures.ThreadPoolExecutor(max_workers=3)


def task(times):
    """Sleep *times* seconds, report it, and return a random int in [1, 10]."""
    time.sleep(times)
    message = "task has completed in {} seconds".format(times)
    print(message)
    return random.randint(1, 10)
# submit() schedules the call and immediately returns a Future.
result1 = executor.submit(task, 1)
result2 = executor.submit(task, 3)
# Future.result() blocks until the call has finished, so there is no need to
# poll done() in a loop (which spins a CPU core at 100% while waiting).
value1 = result1.result()
print("task1 is done")
print(value1)
value2 = result2.result()
print("task2 is done")
print(value2)
# Tasks can also be submitted in bulk with a list comprehension.
result_list = [executor.submit(task, i) for i in range(1, 10)]
# futures.wait() blocks the calling thread. With the default
# return_when=ALL_COMPLETED it returns once every future has finished; with
# FIRST_COMPLETED it returns as soon as any one of them finishes. The other
# tasks keep running (and printing) in the pool either way.
futures.wait(result_list, return_when=futures.FIRST_COMPLETED)
print("first task is done")
# as_completed() yields each future as soon as it finishes, fastest first.
for future in futures.as_completed(result_list):
    print(future.result())
    print("task {} is as_completed".format(future))
# executor.map() is like calling submit() for every input, but it yields the
# results themselves, in input order rather than completion order.
results = executor.map(task, range(1, 10))
for result in results:
    print(result)
    print("task {} is map".format(result))
# Release the pool's worker threads once it is no longer needed.
executor.shutdown(wait=True)
多进程
# 因为有GIL锁的存在,所以多线程无法利用多CPU的优势,因此才有了多进程编程,这样才能充分利用多CPU并发的能力
# 对于IO来说,进程的切换代价要高于线程,因此在IO密集型的应用中,多线程比多进程性能更高
from concurrent import futures
import time
# CPU-bound work: the cost is pure computation.
# Naive recursive Fibonacci, deliberately exponential so that it burns CPU.
def fib(n):
    """Return the n-th Fibonacci number by naive recursion (n < 2 returns n)."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)


# Simulated I/O: the cost is waiting, not computing.
def io_task(n):
    """Sleep *n* seconds, then return *n*."""
    time.sleep(n)
    return n
if __name__ == '__main__':
    # CPU-bound work: processes beat threads, because the GIL lets only one
    # thread run Python bytecode at a time.
    # with futures.ThreadPoolExecutor(max_workers=3) as executor:
    #     start = time.time()
    #     result_list = [executor.submit(fib, i) for i in range(25, 35)]
    #     for future in futures.as_completed(result_list):
    #         print(future.result())
    #     print('多线程计算耗时:', time.time() - start)
    #
    # with futures.ProcessPoolExecutor(max_workers=3) as executor:
    #     start = time.time()
    #     result_list = [executor.submit(fib, i) for i in range(25, 35)]
    #     for future in futures.as_completed(result_list):
    #         print(future.result())
    #     print('多进程计算耗时:', time.time() - start)

    # I/O-bound work: threads are the better fit; processes add start-up and
    # inter-process communication cost without any compensating gain.
    # The clock starts before submit(): tasks begin running as soon as they are
    # submitted, so timing only from after the submissions under-counts.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        start = time.time()
        result_list = [executor.submit(io_task, i) for i in [2] * 30]
        for future in futures.as_completed(result_list):
            print(future.result())
        print('多线程IO耗时:', time.time() - start)

    with futures.ProcessPoolExecutor(max_workers=3) as executor:
        start = time.time()
        result_list = [executor.submit(io_task, i) for i in [2] * 30]
        for future in futures.as_completed(result_list):
            print(future.result())
        print('多进程IO耗时:', time.time() - start)
# 多进程编程示例
import multiprocessing
import os
import time
# # fork 会新建子进程
# # 因此这段代码会在两个进程中同时执行
# pid = os.fork()
# print('lixingyun')
# if pid == 0:
# print('子进程 pid:', os.getpid())
# print('父进程 pid:', os.getppid())
# else:
# print('子进程 pid:', pid)  # 在父进程中,fork() 返回的是子进程的 pid
# time.sleep(2)
# 多进程编程首选 ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
# 不推荐使用 Process
from multiprocessing import Process
def task(n):
    """Sleep *n* seconds in a worker process, report completion, and return *n*."""
    seconds = n
    time.sleep(seconds)
    print('task进程执行完成')
    return seconds
if __name__ == '__main__':
    # Using a Process directly:
    # process = Process(target=task, args=(2, ))
    # process.start()
    # print(process.pid)
    # process.join()
    # print('main进程执行完成')
    #
    # Using a process pool. The with-block guarantees the pool's worker
    # processes are cleaned up instead of being left for interpreter exit.
    with multiprocessing.Pool(processes=3) as pool:
        # result = pool.apply_async(task, args=(2, ))
        #
        # # close() stops new submissions; join() waits for queued tasks to finish.
        # # (Submit everything you need before calling close().)
        # pool.close()
        # pool.join()
        # print(result.get())
        #
        # # imap: like the thread pool's map(), results are yielded in input
        # # order even if a later task finishes first.
        # for result in pool.imap(task, [1, 3, 2]):
        #     print("{} sleep".format(result))
        # imap_unordered: each result is yielded as soon as its task finishes.
        for result in pool.imap_unordered(task, [1, 3, 2]):
            print("{} sleep".format(result))
        # All results have been consumed; shut the workers down gracefully.
        pool.close()
        pool.join()
进程间通信
import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe
# 使用线程的Queue会报错
# from queue import Queue
# A bounded multiprocessing.Queue (at most 10 items) for the Process demos.
queue = Queue(10)


def producer1(queue):
    """Put the integers 0-9 on *queue*, followed by a ``None`` end marker.

    put() blocks while the queue is full, so a bounded queue simply paces the
    producer to the consumer.
    """
    for i in range(10):
        queue.put(i)
    # End-of-stream marker: tells consumer1 there is nothing more to read.
    queue.put(None)


def consumer1(queue):
    """Print items from *queue*, tab-separated, until the ``None`` marker arrives.

    The loop blocks on get() instead of polling empty(). An empty() check can end
    the loop before the producer has put anything, and the multiprocessing docs
    call empty() unreliable.
    """
    while True:
        item = queue.get()
        if item is None:
            break
        print(item, end='\t')
# Plain variables cannot carry data between processes: each process has its own
# memory, so a change made in one process is invisible to the others.
# NOTE: this pair does not touch a global at all. Each function receives its own
# copy of the int argument, and `+=` on an int rebinds the local name. So
# consumer2 prints the value it was started with, whatever producer2 does,
# with processes or with threads.
def producer2(vara):
    """Increment the local *vara* ten times; the result is discarded."""
    remaining = 10
    while remaining:
        vara += 1
        remaining -= 1


def consumer2(vara):
    """Wait 2 seconds, then print *vara* exactly as it was received."""
    time.sleep(2)
    print(vara)
# Inter-process communication through a Pipe().
def producer3(send_pipe):
    """Send the string 'lixingyun' through the connection *send_pipe*."""
    payload = 'lixingyun'
    send_pipe.send(payload)


def consumer3(receive_pipe):
    """After 2 seconds, receive one object from *receive_pipe* and print it."""
    time.sleep(2)
    message = receive_pipe.recv()
    print(message, end='\t')
def increment(data_dict, key, value):
    """Set ``data_dict[key]`` to *value*, then print the whole mapping.

    *data_dict* may be a plain dict or a Manager().dict() proxy shared with
    other processes.
    """
    data_dict[key] = value
    print(data_dict)
if __name__ == '__main__':
    # 1) A multiprocessing.Queue passed to Process objects works: the queue is
    #    handed over when each child process is created. A queue.Queue (the
    #    thread queue) would fail here with
    #    TypeError: cannot pickle '_thread.lock' object.
    producer_process1 = Process(target=producer1, args=(queue,))
    consumer_process1 = Process(target=consumer1, args=(queue,))
    producer_process1.start()
    consumer_process1.start()
    producer_process1.join()
    consumer_process1.join()
    print()

    # 2) "Sharing" a plain variable: each child works on its own copy, so
    #    nothing producer2 does is visible to consumer2 or to this process.
    a = 1
    producer_process2 = Process(target=producer2, args=(a,))
    consumer_process2 = Process(target=consumer2, args=(a,))
    producer_process2.start()
    consumer_process2.start()
    producer_process2.join()
    consumer_process2.join()
    print()

    # 3) A multiprocessing.Queue cannot be passed to Pool workers as a task
    #    argument. Pickling it raises RuntimeError ("Queue objects should only be
    #    shared between processes through inheritance"). apply_async() stores
    #    that error in the returned AsyncResult, and because .get() is never
    #    called here, the error is silently dropped.
    pool = Pool(processes=2)
    pool.apply_async(producer1, (queue,))
    pool.apply_async(consumer1, (queue,))
    pool.close()
    # Prints nothing.
    pool.join()
    print()

    # 4) Pool workers need a Manager().Queue() proxy instead. The with-block
    #    shuts the manager's server process down once the pool has finished.
    with Manager() as manager:
        queue = manager.Queue(10)
        pool = Pool(processes=2)
        pool.apply_async(producer1, (queue,))
        pool.apply_async(consumer1, (queue,))
        pool.close()
        # Now the items are printed.
        pool.join()
    print()

    # So there are three kinds of queue:
    # from queue import Queue            # thread queue: communication between threads
    # from multiprocessing import Queue  # process queue: works between Process objects, not with Pool
    # from multiprocessing import Manager
    # Manager().Queue()                  # managed queue: works with Process and with Pool workers

    # 5) Another inter-process channel: Pipe().
    #    A Pipe connects exactly two endpoints. It is lower level than Queue,
    #    which is built on a pipe plus locks and a feeder thread, so it is
    #    usually faster.
    receive_pipe, send_pipe = Pipe()
    producer_process3 = Process(target=producer3, args=(send_pipe, ))
    consumer_process3 = Process(target=consumer3, args=(receive_pipe, ))
    producer_process3.start()
    consumer_process3.start()
    producer_process3.join()
    consumer_process3.join()
    print()

    # 6) Other managed shared objects. Besides dict(), a Manager can create
    #    list, Namespace, Queue, Lock, RLock, Semaphore, BoundedSemaphore,
    #    Condition, Event, Array and Value proxies.
    with Manager() as manager:
        process_dict = manager.dict()
        first_process = Process(target=increment, args=(process_dict, 'first', 1))
        second_process = Process(target=increment, args=(process_dict, 'second', 2))
        third_process = Process(target=increment, args=(process_dict, 'third', 3))
        first_process.start()
        second_process.start()
        third_process.start()
        first_process.join()
        second_process.join()
        third_process.join()
感谢支持
更多内容,请移步《超级个体》。