多线程编程的重要性
多线程编程是提高程序并发性能的重要手段。Python的threading模块提供了完整的线程支持,虽然受到GIL(全局解释器锁)的限制,但在I/O密集型任务中仍然非常有用。掌握多线程编程对于构建高性能应用至关重要。
多线程基础
1. 基本线程操作
import threading
import time
import random
def threading_basics():
"""多线程基础操作"""
def worker(name, duration):
"""工作线程函数"""
print(f"线程 {name} 开始工作")
for i in range(3):
print(f"线程 {name}: 执行任务 {i+1}")
time.sleep(duration)
print(f"线程 {name} 完成工作")
# 创建多个线程
threads = []
for i in range(3):
thread = threading.Thread(target=worker, args=(f"Worker-{i+1}", 0.5))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("所有线程执行完成")
threading_basics()
2. 线程状态管理
def thread_state_management():
"""线程状态管理"""
class WorkerThread(threading.Thread):
"""自定义工作线程"""
def __init__(self, name, work_duration):
super().__init__()
self.name = name
self.work_duration = work_duration
self.is_running = False
self.is_paused = False
self.result = None
def run(self):
"""线程执行逻辑"""
self.is_running = True
print(f"线程 {self.name} 开始执行")
try:
for i in range(5):
if not self.is_running:
break
# 检查暂停状态
while self.is_paused and self.is_running:
time.sleep(0.1)
if not self.is_running:
break
print(f"线程 {self.name}: 执行步骤 {i+1}")
time.sleep(self.work_duration)
self.result = f"线程 {self.name} 执行完成"
print(self.result)
except Exception as e:
print(f"线程 {self.name} 执行出错: {e}")
finally:
self.is_running = False
def pause(self):
"""暂停线程"""
self.is_paused = True
print(f"线程 {self.name} 已暂停")
def resume(self):
"""恢复线程"""
self.is_paused = False
print(f"线程 {self.name} 已恢复")
def stop(self):
"""停止线程"""
self.is_running = False
print(f"线程 {self.name} 已停止")
# 创建和管理线程
worker = WorkerThread("Worker-1", 0.3)
worker.start()
# 让线程运行一段时间
time.sleep(1)
# 暂停线程
worker.pause()
time.sleep(0.5)
# 恢复线程
worker.resume()
time.sleep(1)
# 停止线程
worker.stop()
worker.join()
print(f"线程结果: {worker.result}")
thread_state_management()
线程同步
1. 锁机制
def lock_mechanisms():
"""锁机制示例"""
class BankAccount:
"""银行账户类"""
def __init__(self, initial_balance=0):
self.balance = initial_balance
self.lock = threading.Lock()
def deposit(self, amount):
"""存款"""
with self.lock:
old_balance = self.balance
time.sleep(0.01) # 模拟处理时间
self.balance += amount
print(f"存款 {amount},余额从 {old_balance} 变为 {self.balance}")
def withdraw(self, amount):
"""取款"""
with self.lock:
if self.balance >= amount:
old_balance = self.balance
time.sleep(0.01) # 模拟处理时间
self.balance -= amount
print(f"取款 {amount},余额从 {old_balance} 变为 {self.balance}")
return True
else:
print(f"取款失败:余额不足,当前余额 {self.balance}")
return False
def get_balance(self):
"""获取余额"""
with self.lock:
return self.balance
def account_operations(account, operations):
"""账户操作函数"""
for operation, amount in operations:
if operation == 'deposit':
account.deposit(amount)
elif operation == 'withdraw':
account.withdraw(amount)
time.sleep(0.01)
# 创建银行账户
account = BankAccount(1000)
# 定义操作序列
operations1 = [('deposit', 100), ('withdraw', 50), ('deposit', 200)]
operations2 = [('withdraw', 150), ('deposit', 300), ('withdraw', 100)]
# 创建线程执行操作
thread1 = threading.Thread(target=account_operations, args=(account, operations1))
thread2 = threading.Thread(target=account_operations, args=(account, operations2))
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join()
thread2.join()
print(f"最终余额: {account.get_balance()}")
lock_mechanisms()
2. 条件变量和信号量
def synchronization_primitives():
"""同步原语示例"""
class ProducerConsumer:
"""生产者消费者模式"""
def __init__(self, max_size=5):
self.queue = []
self.max_size = max_size
self.lock = threading.Lock()
self.not_empty = threading.Condition(self.lock)
self.not_full = threading.Condition(self.lock)
def produce(self, item):
"""生产物品"""
with self.not_full:
while len(self.queue) >= self.max_size:
print("队列已满,等待消费...")
self.not_full.wait()
self.queue.append(item)
print(f"生产物品: {item},队列长度: {len(self.queue)}")
self.not_empty.notify()
def consume(self):
"""消费物品"""
with self.not_empty:
while len(self.queue) == 0:
print("队列为空,等待生产...")
self.not_empty.wait()
item = self.queue.pop(0)
print(f"消费物品: {item},队列长度: {len(self.queue)}")
self.not_full.notify()
return item
def producer(pc, items):
"""生产者线程"""
for item in items:
pc.produce(item)
time.sleep(0.1)
print("生产者完成")
def consumer(pc, count):
"""消费者线程"""
for _ in range(count):
item = pc.consume()
time.sleep(0.15)
print("消费者完成")
# 创建生产者消费者系统
pc = ProducerConsumer(max_size=3)
# 创建线程
producer_thread = threading.Thread(target=producer, args=(pc, ['A', 'B', 'C', 'D', 'E']))
consumer_thread = threading.Thread(target=consumer, args=(pc, 5))
# 启动线程
producer_thread.start()
consumer_thread.start()
# 等待完成
producer_thread.join()
consumer_thread.join()
print("生产者消费者模式演示完成")
synchronization_primitives()
队列通信
1. 线程安全队列
def queue_communication():
"""队列通信示例"""
import queue
def producer_worker(q, name, items):
"""生产者工作函数"""
for item in items:
q.put(item)
print(f"生产者 {name} 生产: {item}")
time.sleep(0.1)
q.put(None) # 结束信号
print(f"生产者 {name} 完成")
def consumer_worker(q, name):
"""消费者工作函数"""
while True:
item = q.get()
if item is None:
break
print(f"消费者 {name} 消费: {item}")
time.sleep(0.15)
q.task_done()
print(f"消费者 {name} 完成")
# 创建队列
q = queue.Queue(maxsize=3)
# 创建生产者线程
producer1 = threading.Thread(target=producer_worker, args=(q, "P1", ['A', 'B', 'C']))
producer2 = threading.Thread(target=producer_worker, args=(q, "P2", ['D', 'E', 'F']))
# 创建消费者线程
consumer1 = threading.Thread(target=consumer_worker, args=(q, "C1"))
consumer2 = threading.Thread(target=consumer_worker, args=(q, "C2"))
# 启动线程
producer1.start()
producer2.start()
consumer1.start()
consumer2.start()
# 等待生产者完成
producer1.join()
producer2.join()
# 发送结束信号
q.put(None)
q.put(None)
# 等待消费者完成
consumer1.join()
consumer2.join()
print("队列通信演示完成")
queue_communication()
2. 优先级队列
def priority_queue_example():
"""优先级队列示例"""
import queue
import heapq
class PriorityQueue:
"""优先级队列"""
def __init__(self):
self._queue = []
self._index = 0
self._lock = threading.Lock()
def put(self, item, priority):
"""添加项目"""
with self._lock:
heapq.heappush(self._queue, (priority, self._index, item))
self._index += 1
def get(self):
"""获取项目"""
with self._lock:
if self._queue:
return heapq.heappop(self._queue)[-1]
return None
def empty(self):
"""检查是否为空"""
with self._lock:
return len(self._queue) == 0
def task_processor(pq, name):
"""任务处理器"""
while True:
if pq.empty():
time.sleep(0.1)
continue
task = pq.get()
if task is None:
break
print(f"处理器 {name} 处理任务: {task}")
time.sleep(0.2)
print(f"处理器 {name} 完成")
# 创建优先级队列
pq = PriorityQueue()
# 添加任务(数字越小优先级越高)
tasks = [
("高优先级任务1", 1),
("低优先级任务1", 5),
("中优先级任务1", 3),
("高优先级任务2", 1),
("低优先级任务2", 5),
("中优先级任务2", 3)
]
for task, priority in tasks:
pq.put(task, priority)
# 创建处理器线程
processor1 = threading.Thread(target=task_processor, args=(pq, "P1"))
processor2 = threading.Thread(target=task_processor, args=(pq, "P2"))
# 启动处理器
processor1.start()
processor2.start()
# 等待所有任务完成
while not pq.empty():
time.sleep(0.1)
# 停止处理器
pq.put(None)
pq.put(None)
processor1.join()
processor2.join()
print("优先级队列演示完成")
priority_queue_example()
实际应用案例
1. 线程池实现
def thread_pool_example():
"""线程池示例"""
class ThreadPool:
"""简单线程池实现"""
def __init__(self, num_threads):
self.num_threads = num_threads
self.task_queue = queue.Queue()
self.result_queue = queue.Queue()
self.threads = []
self.shutdown = False
# 启动工作线程
for i in range(num_threads):
thread = threading.Thread(target=self._worker, args=(i,))
thread.daemon = True
thread.start()
self.threads.append(thread)
def _worker(self, worker_id):
"""工作线程函数"""
while not self.shutdown:
try:
task = self.task_queue.get(timeout=1)
if task is None:
break
func, args, kwargs = task
result = func(*args, **kwargs)
self.result_queue.put(result)
self.task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"工作线程 {worker_id} 出错: {e}")
def submit(self, func, *args, **kwargs):
"""提交任务"""
self.task_queue.put((func, args, kwargs))
def get_result(self):
"""获取结果"""
return self.result_queue.get()
def shutdown_pool(self):
"""关闭线程池"""
self.shutdown = True
for _ in range(self.num_threads):
self.task_queue.put(None)
for thread in self.threads:
thread.join()
def worker_task(task_id, duration):
"""工作任务"""
print(f"任务 {task_id} 开始执行")
time.sleep(duration)
result = f"任务 {task_id} 完成,耗时 {duration} 秒"
print(result)
return result
# 创建线程池
pool = ThreadPool(3)
# 提交任务
for i in range(5):
pool.submit(worker_task, i, random.uniform(0.5, 2.0))
# 获取结果
results = []
for _ in range(5):
result = pool.get_result()
results.append(result)
# 关闭线程池
pool.shutdown_pool()
print("线程池演示完成")
print("结果:", results)
thread_pool_example()
2. 并发下载器
def concurrent_downloader():
"""并发下载器示例"""
class Downloader:
"""下载器类"""
def __init__(self, max_workers=3):
self.max_workers = max_workers
self.download_queue = queue.Queue()
self.results = []
self.lock = threading.Lock()
def download_file(self, url, filename):
"""下载文件(模拟)"""
print(f"开始下载: {url}")
time.sleep(random.uniform(1, 3)) # 模拟下载时间
print(f"下载完成: {filename}")
return {"url": url, "filename": filename, "status": "success"}
def worker(self, worker_id):
"""工作线程"""
while True:
try:
task = self.download_queue.get(timeout=1)
if task is None:
break
url, filename = task
result = self.download_file(url, filename)
with self.lock:
self.results.append(result)
self.download_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"下载线程 {worker_id} 出错: {e}")
def download_files(self, file_list):
"""下载文件列表"""
# 创建下载线程
threads = []
for i in range(self.max_workers):
thread = threading.Thread(target=self.worker, args=(i,))
thread.daemon = True
thread.start()
threads.append(thread)
# 添加下载任务
for url, filename in file_list:
self.download_queue.put((url, filename))
# 等待所有任务完成
self.download_queue.join()
# 停止工作线程
for _ in range(self.max_workers):
self.download_queue.put(None)
for thread in threads:
thread.join()
return self.results
# 创建下载器
downloader = Downloader(max_workers=3)
# 定义下载列表
files_to_download = [
("http://example.com/file1.zip", "file1.zip"),
("http://example.com/file2.zip", "file2.zip"),
("http://example.com/file3.zip", "file3.zip"),
("http://example.com/file4.zip", "file4.zip"),
("http://example.com/file5.zip", "file5.zip")
]
# 开始下载
print("开始并发下载...")
results = downloader.download_files(files_to_download)
print("下载完成!")
for result in results:
print(f"文件: {result['filename']}, 状态: {result['status']}")
concurrent_downloader()
总结
掌握Python多线程编程是提高程序性能的关键:
- 线程基础:理解线程的创建、启动和管理
- 线程同步:掌握锁、条件变量等同步机制
- 队列通信:使用队列进行线程间通信
- 线程池:实现高效的线程池管理
- 实际应用:在下载器、任务处理等场景中应用
- 最佳实践:遵循多线程编程的最佳实践
通过系统学习这些概念,你将能够编写出高效、安全的多线程程序,提高应用的并发性能。
转载请注明:周志洋的博客 » Python实用技巧-多线程基础


