相关概念
并发(Concurrency)和并行(Parallelism)是两个极容易混淆的概念,它们都涉及到同时处理多个任务的能力,但它们的工作原理和适用场景有所不同。
并发
并发是指一个系统能够在同一时间段内处理多个任务的能力。这些任务可能不是真正“同时”执行,而是通过快速切换(如任务1执行0.01秒,切换到任务2执行0.02秒,再切换到任务1执行0.01秒)来给用户一种同时进行的错觉。
实现方式:可通过 多线程、异步 I/O 、事件循环等方式实现。
应用场景:适合 I/O 密集型 任务,如:网络请求、文件读写等。在这些情况下,任务可能会因为等待 I/O 操作完成而阻塞,此时可以切换到其他任务以提高资源利用率。
并行
并行是指多个 CPU 核心在同一时刻执行多个任务的能力。这些任务是真正“同时”执行,每个任务都有自己独立的执行环境。
实现方式:通常通过 多进程 或 多线程 结合 多核处理器 来实现。
应用场景:适合 CPU 密集型 任务,如:大规模数据处理、科学计算等。在这些情况下,并行可以显著缩短任务完成的时间。
多线程
多线程是指在同一个进程中创建多个线程来执行任务,每个线程可以想象为代码执行时的执行顺序流。所有线程共享同一个进程的资源,如:内存空间,文件描述符等,这使得线程间的通信更简单且成本更低。
资源共享:线程间共享全局数据和文件描述符,这使得线程间的通信变得简单。
轻量级:线程通常比进程更轻量级,创建和销毁线程的成本较低。
上下文切换开销较小:相比进程间的切换,线程间的上下文切换更快。
GIL锁限制:受限于 CPython 解释器 的 全局解释器锁(GIL),无法真正并行以充分利用多核 CPU 的能力,并不适合 CPU 密集型 任务。
适用于 I/O密集型 任务:如:网络请求、文件读写等,主要为等待 I/O 操作,多线程可有效利用等待时间来处理其他任务。
多线程执行
# -*- coding: utf-8 -*-
import time
import random
import threading
# 待执行函数:无参函数
def my_worker_1():
# 获取当前线程名
cur_trd_name = threading.current_thread().name
for i in range(10):
# 使用 flush=True 来强制刷新输出缓冲区
print( f"{cur_trd_name} | my_worker_1: {i+1}", flush=True )
# 随机暂停一段时间
time.sleep(random.uniform(0.5,2.0))
# 待执行函数:有参函数
def my_worker_2(num, name, age, score):
# 获取当前线程名
cur_trd_name = threading.current_thread().name
for i in range(num):
# 使用 flush=True 来强制刷新输出缓冲区
print( f"{cur_trd_name} | my_worker_2: name({name}) - age({age}) - score({score})", flush=True )
# 随机暂停一段时间
time.sleep(random.uniform(0.5,2.0))
if __name__ == "__main__":
# 声明线程对象,target=待调用的函数名(不带括号,带上括号表示调用执行函数取得返回结果)
my_trd_1 = threading.Thread(target=my_worker_1)
# 传参:args={tuple},元组中的参数与函数形参必须保持个数、顺序一致
my_trd_2 = threading.Thread(target=my_worker_2, args=(15, "juju", 18, 98))
# 传参:kwargs={dict},字典中的参数,key为函数形参名,value为函数实参值
my_trd_3 = threading.Thread(target=my_worker_2, kwargs={"age":20, "score":96, "num":20, "name":"zizi"})
# 混合传参:args=(10,) 当 args 只有一个参数时,后面要加逗号,是为了创建单元素元组
my_trd_4 = threading.Thread(target=my_worker_2, args=(10, ), kwargs={"name":"guagua", "age":20, "score":96})
# 启动线程(当调用 start() 时,才会真正创建线程,并开始执行)
my_trd_1.start()
my_trd_2.start()
my_trd_3.start()
my_trd_4.start()
# 获取当前程序中正在运行的所有线程信息 {list}
trd_info_list = threading.enumerate()
trd_length = len(trd_info_list)
print( f"当前运行的线程数为:{trd_length}" )
# 代码运行到此处,主线程的任务已完成
# 此时子线程的任务并不一定全完成了
# 但主线程默认会等待所有子线程结束后才结束
多线程消费任务队列
# -*- coding: utf-8 -*-
import time
import random
import queue
import threading
class ThreadQueue():
"""
# 功能:线程常用队列
"""
def __init__(self, queue_type="FIFO"):
"""
# 功能:线程常用队列 初始化
# 输入:
# queue_type: {str} 队列类型 ["FIFO", "LIFO", "Priority"],默认{"FIFO"}
# "FIFO": 先进先出队列
# "LIFO": 后进先出堆栈
# "Priority": 优先级队列
"""
self.queue_type = queue_type
if self.queue_type == "FIFO":
self.trd_queue = queue.Queue()
elif self.queue_type == "LIFO":
self.trd_queue = queue.LifoQueue()
elif self.queue_type == "Priority":
self.trd_queue = queue.PriorityQueue()
else:
raise ValueError( f"不支持的队列类型: {self.queue_type}\n当前仅支持 ['FIFO', 'LIFO', 'Priority']" )
print( f"已创建 {self.queue_type} 型队列" )
def is_empty(self):
"""
# 功能:判断队列是否为空
"""
return self.trd_queue.empty()
def queue_writer(self, writer_data, priority=None):
"""
# 功能:将数据存入队列
# 输入:
# writer_data: {str,int,float,bool,dict等} 待存入数据
# priority: {int} 仅在 优先级队列 中启用,数值越小,优先级越高, 越先取出
"""
if self.queue_type == "Priority":
if priority is None:
raise ValueError("优先级队列需要提供优先级参数 priority ")
self.trd_queue.put((priority, writer_data))
else:
self.trd_queue.put(writer_data)
def queue_reader(self, timeout=1):
"""
# 功能:从队列中取出一个数据
# 输入:
# timeout: {float} 超时限制,默认{1}
# 输出:
# reader_data: 读取到的数据
"""
try:
# 在指定时间内取出数据,防止阻塞
return self.trd_queue.get(True, timeout)
except queue.Empty:
# 如果指定时间内队列为空
return None
def run_task_multi_thread(trd_name, task_queue, task_result):
"""
# 功能:多线程消费任务队列
# 输入:
# trd_name: {str} 线程名
# task_queue: {ThreadQueue} 任务队列
# task_result: {list} 任务产物
"""
while True:
cur_task = task_queue.queue_reader(timeout=0.5)
if cur_task is None:
# 队列为空则退出循环
break
# 模拟任务处理
cur_result = f"{trd_name}|已处理:{cur_task}\n"
# 此处追加元素是线程安全的,如果涉及到列表修改,应使用线程锁 threading.Lock 来保护共享资源
task_result.append(cur_result)
print( f"{trd_name}|已处理:{cur_task}", flush=True )
# 随机暂停一段时间
time.sleep(random.uniform(0.5,2.0))
if __name__ == "__main__":
# 线程数
num_threads = 5
print( f"当前线程数:{num_threads}" )
# 模拟任务列表
task_list = [f"Task-{i}" for i in range(20)]
# 创建一个 FIFO 队列
task_queue = ThreadQueue(queue_type="FIFO")
# 将任务列表存入任务队列
print("正在读取任务列表,存入任务队列...")
for task_i in task_list:
task_queue.queue_writer(task_i)
print( "任务列表已存入任务队列" )
# 初始化任务产物
task_result = []
# 多线程消费队列中的任务
all_threads = []
for i in range(num_threads):
# 依次创建线程
cur_thread = threading.Thread(target=run_task_multi_thread, args=(f"Thread-{i}", task_queue, task_result))
# 启动线程
cur_thread.start()
all_threads.append(cur_thread)
# 等待所有线程结束,并释放资源
for trd_i in all_threads:
trd_i.join()
print("所有任务处理完毕")
print(task_result[:10])
线程池消费任务队列
【任务示例】
现有 task.txt 文件,包含了 1000 行内容,每行代表一条任务数据,由 序号|文本 组成,部分内容如下所示:
9000001|在一个遥远的星球上
9000002|有一个被蔚蓝海洋环抱的神秘国度
9000003|名为艾瑞斯利亚
9000004|艾瑞斯利亚的居民拥有一种独特的能力
9000005|能够通过心灵感应与自然界的万物沟通
9000006|无论是轻风的低语
9000007|还是古树的沉吟
9000008|对他们来说都是日常交流的一部分
9000009|王国的中心矗立着一座宏伟的水晶宫殿
9000010|宫殿闪耀着太阳的光辉
对于每条任务,需要进行如下处理:
(1)文本处理:在文本前添加 "已完成" 字符
(2)生成文件:将处理后的该条文本,存入独立的文件,文件名为 序号.txt
在处理完所有任务后,最终将每条处理后的文本汇总到一起,存入一个总的文件中,文件名为 task_result.txt
# -*- coding: utf-8 -*-
import os
import time
import random
import logging
import traceback
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class ThreadQueue():
"""
# 功能:线程常用队列
"""
def __init__(self, queue_type="FIFO"):
"""
# 功能:线程常用队列 初始化
# 输入:
# queue_type: {str} 队列类型 ["FIFO", "LIFO", "Priority"],默认{"FIFO"}
# "FIFO": 先进先出队列
# "LIFO": 后进先出堆栈
# "Priority": 优先级队列
"""
self.queue_type = queue_type
if self.queue_type == "FIFO":
self.trd_queue = queue.Queue()
elif self.queue_type == "LIFO":
self.trd_queue = queue.LifoQueue()
elif self.queue_type == "Priority":
self.trd_queue = queue.PriorityQueue()
else:
raise ValueError( f"不支持的队列类型: {self.queue_type}\n当前仅支持 ['FIFO', 'LIFO', 'Priority']" )
print( f"已创建 {self.queue_type} 型队列" )
def is_empty(self):
"""
# 功能:判断队列是否为空
"""
return self.trd_queue.empty()
def queue_writer(self, writer_data, priority=None):
"""
# 功能:将数据存入队列
# 输入:
# writer_data: {str,int,float,bool,dict等} 待存入数据
# priority: {int} 仅在 优先级队列 中启用,数值越小,优先级越高, 越先取出
"""
if self.queue_type == "Priority":
if priority is None:
raise ValueError("优先级队列需要提供优先级参数 priority ")
self.trd_queue.put((priority, writer_data))
else:
self.trd_queue.put(writer_data)
def queue_reader(self, timeout=1):
"""
# 功能:从队列中取出一个数据
# 输入:
# timeout: {float} 超时限制,默认{1}
# 输出:
# reader_data: 读取到的数据
"""
try:
# 在指定时间内取出数据,防止阻塞
return self.trd_queue.get(True, timeout)
except queue.Empty:
# 如果指定时间内队列为空
return None
def run_task_one(task_queue, task_done_event, task_result, task_output_path, error_list, trd_lock):
"""
# 功能:执行单个任务
# 输入:
# task_queue: {ThreadQueue} 共享任务队列
# task_done_event: {Event} 共享事件对象,用于通知所有任务已完成
# task_result: {list} 共享任务产物
# task_output_path: {str} 任务产物文件夹
# error_list: {list} 共享出错任务列表
# trd_lock: {Lock} 线程锁,用于保护共享资源
"""
while (not task_done_event.is_set()) or (not task_queue.is_empty()):
# 尝试获取任务,使用 timeout 来避免忙等待
cur_task = task_queue.queue_reader(timeout=0.5)
if cur_task is None:
# 当前队列为空时,该线程继续尝试从队列中获取新任务,直到所有任务都处理完毕
continue
try:
# 具体运行任务,以下仅作示例,根据实际修改
cur_split = cur_task.split("|")
cur_id, cur_text = cur_split[0], cur_split[1]
# (1)文本处理:在文本前添加 "已完成" 字符
cur_result = f"已完成|{cur_id}|{cur_text}\n"
# 存入最终任务结果
with trd_lock:
task_result.append(cur_result)
# (2)生成文件:将处理后的该条文本,存入独立的文件,文件名为 序号.txt
cur_file = f"{task_output_path}/{cur_id}.txt"
# 如果文件已存在,则跳过
if os.path.exists(cur_file):
print( f"文件已存在:{cur_file}" )
continue
# 生成文件
write_file_txt([cur_result], cur_file)
# 随机暂停一段时间
time.sleep(random.uniform(0.5,1.0))
logging.info( f"任务已完成: {cur_id}" )
except Exception as e:
# 记录出错任务列表
with trd_lock:
error_list.append( f"任务执行失败: {cur_id}\n" )
traceback.print_exc()
logging.error( f"任务执行失败: {cur_id} - {e}" )
finally:
# 无论如何,都标记该条任务完成
task_queue.trd_queue.task_done()
def run_task_multi_thread(task_list, task_output_path, task_log_path, num_threads=1):
"""
# 功能:多线程消费任务队列
# 输入:
# task_list: {list} 任务列表,每个元素为一个任务
# task_output_path: {str} 任务产物文件夹
# task_log_path: {str} 任务日志文件夹
# num_threads: {int} 线程数,默认{1}
"""
print( f"当前线程数:{num_threads}" )
# 确保输出和日志文件夹存在
os.makedirs(task_output_path, exist_ok=True)
os.makedirs(task_log_path, exist_ok=True)
# 任务运行日志
run_log_file = f"{task_log_path}/run.log"
# 设置日志格式和级别
logging.basicConfig(
filename=run_log_file,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 共享事件对象,用于通知所有任务已完成
task_done_event = threading.Event()
# 线程锁,用于保护共享资源
trd_lock = threading.Lock()
# 共享任务产物
task_result = []
task_result_file = f"{task_output_path}/task_result.txt"
# 共享出错任务列表
error_list = []
error_log_file = f"{task_log_path}/error.log"
# 创建一个 FIFO 队列
task_queue = ThreadQueue(queue_type="FIFO")
# 读取 task_list 并添加到任务队列中
print("正在读取任务列表,存入任务队列...")
for task_i in task_list:
task_queue.queue_writer(task_i)
print("任务列表已存入任务队列")
# 设置任务完成事件,确保不再添加新任务
# 在本示例中,先将所有任务加入任务队列,然后再开始消费任务队列,所以在此处即可声明任务完成,不再添加新任务
# 如果是一边消费任务队列任务,一边还要往任务队列中添加新任务,则不应在此处过早声明任务完成
task_done_event.set()
# 启动线程池并提交任务
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = {executor.submit(run_task_one, task_queue, task_done_event, task_result, task_output_path, error_list, trd_lock) for _ in range(num_threads)}
for future in as_completed(futures):
try:
# 等待所有线程完成
future.result()
except Exception as e:
logging.error( f"子线程发生异常: {e}" )
traceback.print_exc()
# 等待所有任务被处理
task_queue.trd_queue.join()
# 最终写入任务产物
if task_result:
write_file_txt(task_result, task_result_file)
logging.info( f"已写入任务产物:{task_result_file}" )
# 最终写入错误日志
if error_list:
write_file_txt(error_list, error_log_file)
logging.info( f"发现出错任务,已写入错误日志:{error_log_file}" )
else:
logging.info( f"未发现出错任务。" )
logging.info("所有任务已完成!")
def write_file_txt(file_lines, txt_file, write_type='w'):
"""
# 功能:将 file_lines列表 内容按行写入 txt_file 文件
"""
with open(txt_file, write_type, errors='ignore') as f:
f.writelines(file_lines)
if __name__ == "__main__":
def read_file_txt(txt_file, encode_type='utf-8'):
try:
file_lines = []
with open(txt_file, encoding=encode_type, errors='ignore') as f:
for line in f:
file_lines.append(line.rstrip())
except FileNotFoundError:
print( "[ERROR] 文件不存在:{}".format(txt_file) )
return []
else:
print( "[INFO] 已读取文件:{}".format(txt_file) )
return file_lines
# 基本参数
task_file = "task.txt"
task_output_path = "./task_output"
task_log_path = "./task_logs"
# 线程数
num_threads = 5
# 读取文本文件,获取任务列表
task_list = read_file_txt(task_file)
# 多线程执行任务
run_task_multi_thread(task_list, task_output_path, task_log_path, num_threads)
print( f"所有任务已完成,任务详情参见文件夹:{task_log_path}" )
多进程
多进程是指启动多个独立的 Python 解释器进程来执行任务,每个进程可以想象为代码+代码执行所需的资源,它是操作系统分配资源的基本单元。每个进程都有自己独立的内存空间和系统资源,因此进程之间是完全隔离的。
稳定性:一个进程的崩溃不会影响到其他进程,增加了系统的稳定性。
资源消耗较大:与线程相比,创建和销毁进程的开销更大,并且占用更多的系统资源。
通信复杂:进程间不共享内存,因此它们之间的通信和数据共享更复杂,通常需要通过IPC机制来实现,如:管道、消息队列等。
不适用于复杂数据结构:子进程在创建时,需要通过序列化的方式,来复制和打包一份父进程的数据和资源。然而有些复杂的数据结构(如结构复杂的类、深度学习训练和推理结构等)无法被序列化,导致这种情况下无法创建子进程,也即无法使用多进程。
适用于 CPU密集型 任务:如:大规模数据处理、科学计算等,每个进程都有自己独立的 Python解释器 和 内存空间,在多核 CPU 上真正并行,能够更好地利用硬件资源。
多进程执行
# -*- coding: utf-8 -*-
import time
import random
import multiprocessing
# 待执行函数:无参函数
def my_worker_1():
# 获取当前进程名
cur_prc_name = multiprocessing.current_process().name
for i in range(10):
# 使用 flush=True 来强制刷新输出缓冲区
print( f"{cur_prc_name} | my_worker_1: {i+1}", flush=True )
# 随机暂停一段时间
time.sleep(random.uniform(0.5,2.0))
# 待执行函数:有参函数
def my_worker_2(num, name, age, score):
# 获取当前进程名
cur_prc_name = multiprocessing.current_process().name
for i in range(num):
# 使用 flush=True 来强制刷新输出缓冲区
print( f"{cur_prc_name} | my_worker_2: name({name}) - age({age}) - score({score})", flush=True )
# 随机暂停一段时间
time.sleep(random.uniform(0.5,2.0))
if __name__ == "__main__":
# 声明进程对象,target=待调用的函数名(不带括号,带上括号表示调用执行函数取得返回结果)
my_prc_1 = multiprocessing.Process(target=my_worker_1)
# 传参:args={tuple},元组中的参数与函数形参必须保持个数、顺序一致
my_prc_2 = multiprocessing.Process(target=my_worker_2, args=(15, "juju", 18, 98))
# 传参:kwargs={dict},字典中的参数,key为函数形参名,value为函数实参值
my_prc_3 = multiprocessing.Process(target=my_worker_2, kwargs={"age":20, "score":96, "num":20, "name":"zizi"})
# 混合传参:args=(10,) 当 args 只有一个参数时,后面要加逗号,是为了创建单元素元组
my_prc_4 = multiprocessing.Process(target=my_worker_2, args=(10, ), kwargs={"name":"guagua", "age":20, "score":96})
# 启动进程(当调用 start() 时,才会真正创建进程,并开始执行)
my_prc_1.start()
my_prc_2.start()
my_prc_3.start()
my_prc_4.start()
# 代码运行到此处,主进程的任务已完成,但需要等待所有子进程结束
# 在线程中此步骤是隐式的,但对进程来说,显式地等待所有子进程结束非常重要
for prc_i in [my_prc_1, my_prc_2, my_prc_3, my_prc_4]:
prc_i.join()
print("所有子进程已结束")
多进程消费任务队列
# -*- coding: utf-8 -*-
import time
import random
import heapq
import multiprocessing
class ProcessQueue():
"""
# 功能:进程常用队列
"""
def __init__(self, prc_manager, queue_type="FIFO"):
"""
# 功能:进程常用队列 初始化
# 输入:
# prc_manager: {multiprocessing.Manager()} 进程管理器,用于同步数据
# queue_type: {str} 队列类型 ["FIFO", "LIFO", "Priority"],默认{"FIFO"}
# "FIFO": 先进先出队列
# "LIFO": 后进先出堆栈
# "Priority": 优先级队列
"""
self.prc_manager = prc_manager
self.queue_type = queue_type
if self.queue_type == "FIFO":
self.prc_queue = self.prc_manager.Queue()
elif self.queue_type == "LIFO":
# 使用 list 模拟 LIFO
self.prc_queue = self.prc_manager.list()
elif self.queue_type == "Priority":
# 使用普通列表与 heapq 模块实现优先级队列
self.prc_queue = []
# 加锁确保进程安全
self.prc_lock = self.prc_manager.Lock()
else:
raise ValueError( f"不支持的队列类型: {self.queue_type}\n当前仅支持 ['FIFO', 'LIFO', 'Priority']" )
print( f"已创建 {self.queue_type} 型队列" )
def is_empty(self):
"""
# 功能:判断队列是否为空
"""
if self.queue_type == "FIFO":
return self.prc_queue.empty()
else:
return (len(self.prc_queue)==0)
def queue_writer(self, writer_data, priority=None):
"""
# 功能:将数据存入队列
# 输入:
# writer_data: {str,int,float,bool,dict等} 待存入数据
# priority: {int} 仅在 优先级队列 中启用,数值越小,优先级越高, 越先取出
"""
if self.queue_type == "FIFO":
self.prc_queue.put(writer_data)
elif self.queue_type == "LIFO":
self.prc_queue.append(writer_data)
elif self.queue_type == "Priority":
if priority is None:
raise ValueError("优先级队列需要提供优先级参数 priority ")
with self.prc_lock:
heapq.heappush(self.prc_queue, (priority, writer_data))
def queue_reader(self, timeout=1):
"""
# 功能:从队列中取出一个数据
# 输入:
# timeout: {float} 超时限制,默认{1}
# 输出:
# reader_data: 读取到的数据
"""
try:
if self.queue_type == "FIFO":
# 在指定时间内取出数据,防止阻塞
return self.prc_queue.get(True, timeout)
elif self.queue_type == "LIFO":
if len(self.prc_queue) > 0:
return self.prc_queue.pop(-1)
else:
return None
elif self.queue_type == "Priority":
if self.prc_queue:
with self.prc_lock:
return heapq.heappop(self.prc_queue)[1]
else:
return None
except Exception as e:
return None
def run_task_multi_process(prc_name, task_queue, task_result):
"""
# 功能:多进程消费任务队列
# 输入:
# prc_name: {str} 进程名
# task_queue: {ProcessQueue} 任务队列
# task_result: {list} 任务产物
"""
while True:
cur_task = task_queue.queue_reader(timeout=0.5)
if cur_task is None:
# 队列为空则退出循环
break
# 模拟任务处理
cur_result = f"{prc_name}|已处理:{cur_task}\n"
task_result.append(cur_result)
print( f"{prc_name}|已处理:{cur_task}", flush=True )
# 随机暂停一段时间
time.sleep(random.uniform(0.5,2.0))
if __name__ == "__main__":
# 进程数
num_processes = 5
print( f"当前进程数:{num_processes}" )
# 模拟任务列表
task_list = [f"Task-{i}" for i in range(20)]
# 创建可共享的进程管理器
prc_manager = multiprocessing.Manager()
# 共享任务产物
task_result = prc_manager.list()
# 创建一个 FIFO 队列
task_queue = ProcessQueue(prc_manager, queue_type="FIFO")
# 将任务列表存入任务队列
print("正在读取任务列表,存入任务队列...")
for task_i in task_list:
task_queue.queue_writer(task_i)
print( "任务列表已存入任务队列" )
# 多进程消费队列中的任务
all_processes = []
for i in range(num_processes):
# 依次创建进程
cur_process = multiprocessing.Process(target=run_task_multi_process, args=(f"Process-{i}", task_queue, task_result))
# 启动进程
cur_process.start()
all_processes.append(cur_process)
# 等待所有进程结束,并释放资源
for prc_i in all_processes:
prc_i.join()
print("所有任务处理完毕")
print(task_result[:10])
进程池消费任务队列
【任务示例】
现有 task.txt 文件,包含了 1000 行内容,每行代表一条任务数据,由 序号|文本 组成,部分内容如下所示:
9000001|在一个遥远的星球上
9000002|有一个被蔚蓝海洋环抱的神秘国度
9000003|名为艾瑞斯利亚
9000004|艾瑞斯利亚的居民拥有一种独特的能力
9000005|能够通过心灵感应与自然界的万物沟通
9000006|无论是轻风的低语
9000007|还是古树的沉吟
9000008|对他们来说都是日常交流的一部分
9000009|王国的中心矗立着一座宏伟的水晶宫殿
9000010|宫殿闪耀着太阳的光辉
对于每条任务,需要进行如下处理:
(1)文本处理:在文本前添加 "已完成" 字符
(2)生成文件:将处理后的该条文本,存入独立的文件,文件名为 序号.txt
在处理完所有任务后,最终将每条处理后的文本汇总到一起,存入一个总的文件中,文件名为 task_result.txt
# -*- coding: utf-8 -*-
import os
import time
import random
import logging
import traceback
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
def run_task_one(task_queue, task_done_event, task_result, task_output_path, error_list):
"""
# 功能:执行单个任务
# 输入:
# task_queue: {JoinableQueue} 共享任务队列
# task_done_event: {Event} 共享事件对象,用于通知所有任务已完成
# task_result: {list} 共享任务产物
# task_output_path: {str} 任务产物文件夹
# error_list: {list} 共享出错任务列表
"""
while (not task_done_event.is_set()) or (not task_queue.empty()):
try:
# 尝试获取任务,使用 timeout 来避免忙等待
cur_task = task_queue.get(True, timeout=0.1)
except Exception as e:
# 主要用于拦截队列为空时的 Empty 错误
# 当队列当前为空时,该进程继续尝试从队列中获取新任务,直到所有任务都处理完毕
continue
try:
# 具体运行任务,以下仅作示例,根据实际修改
cur_split = cur_task.split("|")
cur_id, cur_text = cur_split[0], cur_split[1]
# (1)文本处理:在文本前添加 "已完成" 字符
cur_result = f"已完成|{cur_id}|{cur_text}\n"
# 存入最终任务结果
task_result.append(cur_result)
# (2)生成文件:将处理后的该条文本,存入独立的文件,文件名为 序号.txt
cur_file = f"{task_output_path}/{cur_id}.txt"
# 如果文件已存在,则跳过
if os.path.exists(cur_file):
print( f"文件已存在:{cur_file}" )
continue
# 生成文件
write_file_txt([cur_result], cur_file)
# 随机暂停一段时间
time.sleep(random.uniform(0.5,1.0))
logging.info( f"任务已完成: {cur_id}" )
except Exception as e:
# 记录出错任务列表
error_list.append( f"任务执行失败: {cur_id}\n" )
traceback.print_exc()
logging.error( f"任务执行失败: {cur_id} - {e}" )
finally:
# 无论如何,都标记该条任务完成
task_queue.task_done()
def run_task_multi_process(task_list, task_output_path, task_log_path, num_processes=1):
"""
# 功能:多进程消费任务队列
# 输入:
# task_list: {list} 任务列表,每个元素为一个任务
# task_output_path: {str} 任务产物文件夹
# task_log_path: {str} 任务日志文件夹
# num_processes: {int} 进程数,默认{1}
"""
# 实际进程数(设置进程数 与 CPU 核心数 取较小值)
max_workers = min(multiprocessing.cpu_count(), num_processes)
print( f"当前实际进程数:{max_workers}" )
# 确保输出和日志文件夹存在
os.makedirs(task_output_path, exist_ok=True)
os.makedirs(task_log_path, exist_ok=True)
# 任务运行日志
run_log_file = f"{task_log_path}/run.log"
# 设置日志格式和级别
logging.basicConfig(
filename=run_log_file,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 创建可共享的进程管理器
prc_manager = multiprocessing.Manager()
# 共享事件对象,用于通知所有任务已完成
task_done_event = prc_manager.Event()
# 共享任务产物
task_result = prc_manager.list()
task_result_file = f"{task_output_path}/task_result.txt"
# 共享出错任务列表
error_list = prc_manager.list()
error_log_file = f"{task_log_path}/error.log"
# 任务队列:使用 JoinableQueue 以便可以调用 join()
task_queue = prc_manager.JoinableQueue()
# 读取 task_list 并添加到任务队列中
print("正在读取任务列表,存入任务队列...")
for task_i in task_list:
task_queue.put(task_i)
print("任务列表已存入任务队列")
# 设置任务完成事件,确保不再添加新任务
# 在本示例中,先将所有任务加入任务队列,然后再开始消费任务队列,所以在此处即可声明任务完成,不再添加新任务
# 如果是一边消费任务队列任务,一边还要往任务队列中添加新任务,则不应在此处过早声明任务完成
task_done_event.set()
# 启动进程池并提交任务
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(run_task_one, task_queue, task_done_event, task_result, task_output_path, error_list) for _ in range(max_workers)}
for future in as_completed(futures):
try:
# 等待所有进程完成
future.result()
except Exception as e:
logging.error( f"子进程发生异常: {e}" )
traceback.print_exc()
# 等待所有任务被处理
task_queue.join()
# 最终写入任务产物
if task_result:
write_file_txt(task_result, task_result_file)
logging.info( f"已写入任务产物:{task_result_file}" )
# 最终写入错误日志
if error_list:
write_file_txt(error_list, error_log_file)
logging.info( f"发现出错任务,已写入错误日志:{error_log_file}" )
else:
logging.info( f"未发现出错任务。" )
logging.info("所有任务已完成!")
def write_file_txt(file_lines, txt_file, write_type='w'):
"""
# 功能:将 file_lines列表 内容按行写入 txt_file 文件
"""
with open(txt_file, write_type, errors='ignore') as f:
f.writelines(file_lines)
if __name__ == "__main__":
def read_file_txt(txt_file, encode_type='utf-8'):
try:
file_lines = []
with open(txt_file, encoding=encode_type, errors='ignore') as f:
for line in f:
file_lines.append(line.rstrip())
except FileNotFoundError:
print( "[ERROR] 文件不存在:{}".format(txt_file) )
return []
else:
print( "[INFO] 已读取文件:{}".format(txt_file) )
return file_lines
# 基本参数
task_file = "task.txt"
task_output_path = "./task_output"
task_log_path = "./task_logs"
# 进程数
num_processes = 5
# 读取文本文件,获取任务列表
task_list = read_file_txt(task_file)
# 多进程执行任务
run_task_multi_process(task_list, task_output_path, task_log_path, num_processes)
print( f"所有任务已完成,任务详情参见文件夹:{task_log_path}" )
全局解释器锁(GIL)
全局解释器锁(GIL, Global Interpreter Lock) 是指在 CPython 解释器中,执行 Python 字节码时,为了保护访问 Python 对象而阻止多个线程执行的一把互斥锁。这把锁存在的主要原因是 CPython 解释器的内存管理不是线程安全的。
GIL 与 Python
GIL 是为了让解释器在执行 Python 代码时,同一时刻只有一个线程在运行,以此保证内存管理是安全的。
由于 Python 默认的解释器是 CPython,而 GIL 存在于 CPython 解释器中,因此平时说到 GIL 就默认它是 Python 语言的特性(其实并不准确)。
准确来说,GIL 是存在于 CPython 解释器中的,属于解释器层级,而并非属于 Python 的语言特性。也就是说,如果你自己有能力实现一个 Python 解释器,完全可以避免 GIL。
常见的 Python 解释器有如下几种:
CPython:C 语言开发的解释器,官方默认,使用最为广泛,存在 GIL
IPython:基于 CPython 开发的交互式解释器,只是增强了交互功能,执行过程与 CPython 完全一样
PyPy:采用 JIT 技术对 Python 代码进行动态编译(不是解释),可以显著提高代码的执行速度,但执行结果可能与 CPython 不同,存在 GIL
Jython:运行在 Java 平台的 Python 解释器,可以把 Python 代码编译成 Java 字节码,依赖 Java 平台,不存在 GIL
IronPython:和 Jython 类似,运行在微软的 .Net 平台下的 Python 解释器,可以把 Python 代码编译成 .Net 字节码,不存在 GIL
由于官方默认的 CPython 解释器存在 GIL,逐渐导致很多 Python 开发者认为 Python 就是线程安全的,因此写代码时对共享资源的访问不会加锁😅
GIL 的影响
想要了解 GIL 对 Python 多线程的影响,首先来看一个示例
import threading
def loop():
count = 0
while count <= 10000:
count += 1
# 2 个线程执行 loop 方法
t1 = threading.Thread(target=loop)
t2 = threading.Thread(target=loop)
t1.start()
t2.start()
t1.join()
t2.join()
在上述示例中,虽然我们开启了 2 个线程去执行 loop,但观察 CPU 的使用情况,发现这个程序只能跑满一个 CPU 核心,没有利用到多核。
这就是 GIL 带来的问题。
其原因在于,一个 Python 线程想要执行一段代码,必须先拿到 GIL 锁后才被允许执行,也就是说,即使我们使用了多线程,但同一时刻却只有一个线程在执行。
但进一步思考一下,就算有 GIL 的存在,理论来说,如果 GIL 释放的够快,多线程怎么也要比单线程执行效率高吧?
但现实的结果是:多线程比我们想象的更糟糕。
如下的示例,还是运行一个 CPU 密集型的任务程序,我们来看单线程执行 2 次和 2 个线程同时执行,哪个效率更高?
# 单线程执行 2 次 CPU 密集型任务
import time
import threading
def loop():
count = 0
while count <= 1000000000:
count += 1
start = time.time()
loop()
loop()
end = time.time()
print( f"耗时: {end-start}" ) # 89.63111019134521
# 2 个线程同时执行 CPU 密集型任务
import time
import threading
def loop():
count = 0
while count <= 1000000000:
count += 1
start = time.time()
t1 = threading.Thread(target=loop)
t2 = threading.Thread(target=loop)
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
print( f"耗时: {end-start}" ) # 92.29994678497314
从执行结果来看,多线程的效率还不如单线程的执行效率高!为什么会导致这种情况?
其实,由于 Python 的线程就是 C 语言的 pthread,它是通过操作系统调度算法调度执行的。
Python 2.x 的代码执行是基于 opcode 数量的调度方式,简单来说就是每执行一定数量的字节码,或遇到系统 IO 时,会强制释放 GIL,然后触发一次操作系统的线程调度。
虽然在 Python 3.x 进行了优化,基于固定时间的调度方式,就是每执行固定时间的字节码,或遇到系统 IO 时,强制释放 GIL,触发系统的线程调度。
但这种线程的调度方式,都会导致同一时刻只有一个线程在运行。
而线程在调度时,又依赖系统的 CPU 环境,也就是在单核 CPU 或多核 CPU 下,多线程在调度切换时的成本是不同的。
如果是在单核 CPU 环境下,多线程在执行时,线程 A 释放了 GIL 锁,那么被唤醒的线程 B 能够立即拿到 GIL 锁,线程 B 可以无缝接力继续执行。
而在多核 CPU 环境下,当多线程执行时,线程 A 在 CPU0 执行完之后释放 GIL 锁,其他 CPU 上的线程都会进行竞争。但 CPU0 上的线程 B 可能又马上获取到了 GIL,这就导致其他 CPU 上被唤醒的线程,只能眼巴巴地看着 CPU0 上的线程愉快地执行着,而自己只能等待,直到又被切换到待调度的状态,这就会产生多核 CPU 频繁进行线程切换,消耗资源,这种情况也被叫做 CPU 颠簸。
这就是多线程在多核 CPU 下,执行效率还不如单线程或单核 CPU 效率高的原因。
至此我们可以得出一个结论:如果使用多线程运行一个 CPU 密集型任务,那么 Python 多线程是无法提高运行效率的。
等等,事情就这样结束了吗?
我们还需要考虑另一种场景:如果多线程运行的不是一个 CPU 密集型任务,而是一个 IO 密集型的任务,结果又会如何呢?
答案是:多线程可以显著提高 IO 密集型任务的运行效率!
其实原因也很简单,因为 IO 密集型的任务,大部分时间都花在等待 IO 上,并没有一直占用 CPU 的资源,所以并不会像上面的程序那样,进行无效的线程切换。
例如,如果我们想要下载 2 个网页的数据,也就是发起 2 个网络请求,如果使用单线程的方式运行,只能是依次串行执行,其中等待的总耗时是 2 个网络请求的时间之和。
而如果采用 2 个线程的方式同时处理,这 2 个网络请求会同时发送,然后同时等待数据返回(IO等待),最终等待的时间取决于耗时最久的线程时间,这会比串行执行效率要高得多。
所以,如果需要运行 IO 密集型任务,Python 多线程是可以提高运行效率的。
为什么会有 GIL
我们已经了解到 GIL 对于处理 CPU 密集型任务的场景,多线程是无法提高运行效率的。
既然 GIL 的影响这么大,那为什么 Python 解释器 CPython 在设计时要采用这种方式呢?
这就需要追溯历史原因了。
在 2000 年以前,各个 CPU 厂商为了提高计算机的性能,其努力方向都在提升单个 CPU 的运行频率上,但在之后的几年遇到了天花板,单个 CPU 性能已经无法再得到大幅度提升,所以在 2000 年以后,提升计算机性能的方向便改为向多 CPU 核心方向发展。
为了更有效的利用多核心 CPU,很多编程语言就出现了多线程的编程方式,但也正是有了多线程的存在,随之带来的问题就是多线程之间对于维护数据和状态一致性的困难。
Python 设计者在设计解释器时,可能没有想到 CPU 的性能提升会这么快转为多核心方向发展,所以在当时的场景下,设计一个全局锁是那个时代保护多线程资源一致性最简单经济的设计方案。
而随着多核心时代来临,当大家试图去拆分和去除 GIL 的时候,发现大量库的代码和开发者已经重度依赖 GIL(默认认为 Pythonn 内部对象是线程安全的,无需在开发时额外加锁),所以这个去除 GIL 的任务变得复杂且难以实现。
所以 GIL 的存在更多的是历史原因,在 Python 3 的版本,虽然对 GIL 做了优化,但依旧没有去除掉,Python 设计者的解释是,在去除 GIL 时,会破坏现有的 C 扩展模块,因为这些扩展模块都严重依赖于 GIL,去除 GIL 有可能会导致运行速度会比 Python 2 更慢。
Python 走到现在,已经有太多的历史包袱,所以现在只能背负着它们前行,如果一切推倒重来,想必 Python 设计者会设计得更加优雅一些。
GIL 解决方案
IO 密集型任务:使用多线程,可以显著提高运行效率
CPU 密集型任务:使用多进程,绕开 GIL 限制
更换没有 GIL 的 Python 解释器,但需要提前评估运行结果是否与 CPython 一致
编写 Python 的 C 扩展模块,把 CPU 密集型任务交给 C 模块处理,但是编码较为复杂
评论区