周志洋

个人站

持续学习 才能不被淘汰


Python实用技巧-多线程编程详解

多线程编程的重要性

多线程编程是提高程序并发性能的重要手段。Python的threading模块提供了完整的线程支持,虽然受到GIL(全局解释器锁)的限制,但在I/O密集型任务中仍然非常有用。掌握多线程编程对于构建高性能应用至关重要。

多线程基础

1. 基本线程操作

import threading
import time
import random

def threading_basics():
    """多线程基础操作"""
    
    def worker(name, duration):
        """工作线程函数"""
        print(f"线程 {name} 开始工作")
        for i in range(3):
            print(f"线程 {name}: 执行任务 {i+1}")
            time.sleep(duration)
        print(f"线程 {name} 完成工作")
    
    # 创建多个线程
    threads = []
    for i in range(3):
        thread = threading.Thread(target=worker, args=(f"Worker-{i+1}", 0.5))
        threads.append(thread)
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()
    
    print("所有线程执行完成")

threading_basics()

2. 线程状态管理

def thread_state_management():
    """线程状态管理"""
    
    class WorkerThread(threading.Thread):
        """自定义工作线程"""
        
        def __init__(self, name, work_duration):
            super().__init__()
            self.name = name
            self.work_duration = work_duration
            self.is_running = False
            self.is_paused = False
            self.result = None
        
        def run(self):
            """线程执行逻辑"""
            self.is_running = True
            print(f"线程 {self.name} 开始执行")
            
            try:
                for i in range(5):
                    if not self.is_running:
                        break
                    
                    # 检查暂停状态
                    while self.is_paused and self.is_running:
                        time.sleep(0.1)
                    
                    if not self.is_running:
                        break
                    
                    print(f"线程 {self.name}: 执行步骤 {i+1}")
                    time.sleep(self.work_duration)
                
                self.result = f"线程 {self.name} 执行完成"
                print(self.result)
                
            except Exception as e:
                print(f"线程 {self.name} 执行出错: {e}")
            finally:
                self.is_running = False
        def pause(self):
            """暂停线程"""
            self.is_paused = True
            print(f"线程 {self.name} 已暂停")
        
        def resume(self):
            """恢复线程"""
            self.is_paused = False
            print(f"线程 {self.name} 已恢复")
        
        def stop(self):
            """停止线程"""
            self.is_running = False
            print(f"线程 {self.name} 已停止")
    
    # 创建和管理线程
    worker = WorkerThread("Worker-1", 0.3)
    worker.start()
    
    # 让线程运行一段时间
    time.sleep(1)
    
    # 暂停线程
    worker.pause()
    time.sleep(0.5)
    
    # 恢复线程
    worker.resume()
    time.sleep(1)
    
    # 停止线程
    worker.stop()
    worker.join()
    
    print(f"线程结果: {worker.result}")

thread_state_management()

线程同步

1. 锁机制

def lock_mechanisms():
    """锁机制示例"""
    
    class BankAccount:
        """银行账户类"""
        
        def __init__(self, initial_balance=0):
            self.balance = initial_balance
            self.lock = threading.Lock()
        
        def deposit(self, amount):
            """存款"""
            with self.lock:
                old_balance = self.balance
                time.sleep(0.01)  # 模拟处理时间
                self.balance += amount
                print(f"存款 {amount},余额从 {old_balance} 变为 {self.balance}")
        
        def withdraw(self, amount):
            """取款"""
            with self.lock:
                if self.balance >= amount:
                    old_balance = self.balance
                    time.sleep(0.01)  # 模拟处理时间
                    self.balance -= amount
                    print(f"取款 {amount},余额从 {old_balance} 变为 {self.balance}")
                    return True
                else:
                    print(f"取款失败:余额不足,当前余额 {self.balance}")
                    return False
        
        def get_balance(self):
            """获取余额"""
            with self.lock:
                return self.balance
    
    def account_operations(account, operations):
        """账户操作函数"""
        for operation, amount in operations:
            if operation == 'deposit':
                account.deposit(amount)
            elif operation == 'withdraw':
                account.withdraw(amount)
            time.sleep(0.01)
    # 创建银行账户
    account = BankAccount(1000)
    
    # 定义操作序列
    operations1 = [('deposit', 100), ('withdraw', 50), ('deposit', 200)]
    operations2 = [('withdraw', 150), ('deposit', 300), ('withdraw', 100)]
    
    # 创建线程执行操作
    thread1 = threading.Thread(target=account_operations, args=(account, operations1))
    thread2 = threading.Thread(target=account_operations, args=(account, operations2))
    
    # 启动线程
    thread1.start()
    thread2.start()
    
    # 等待线程完成
    thread1.join()
    thread2.join()
    
    print(f"最终余额: {account.get_balance()}")

lock_mechanisms()

2. 条件变量和信号量

def synchronization_primitives():
    """同步原语示例"""
    
    class ProducerConsumer:
        """生产者消费者模式"""
        
        def __init__(self, max_size=5):
            self.queue = []
            self.max_size = max_size
            self.lock = threading.Lock()
            self.not_empty = threading.Condition(self.lock)
            self.not_full = threading.Condition(self.lock)
        
        def produce(self, item):
            """生产物品"""
            with self.not_full:
                while len(self.queue) >= self.max_size:
                    print("队列已满,等待消费...")
                    self.not_full.wait()
                
                self.queue.append(item)
                print(f"生产物品: {item},队列长度: {len(self.queue)}")
                self.not_empty.notify()
        
        def consume(self):
            """消费物品"""
            with self.not_empty:
                while len(self.queue) == 0:
                    print("队列为空,等待生产...")
                    self.not_empty.wait()
                
                item = self.queue.pop(0)
                print(f"消费物品: {item},队列长度: {len(self.queue)}")
                self.not_full.notify()
                return item
    def producer(pc, items):
        """生产者线程"""
        for item in items:
            pc.produce(item)
            time.sleep(0.1)
        print("生产者完成")
    
    def consumer(pc, count):
        """消费者线程"""
        for _ in range(count):
            item = pc.consume()
            time.sleep(0.15)
        print("消费者完成")
    
    # 创建生产者消费者系统
    pc = ProducerConsumer(max_size=3)
    
    # 创建线程
    producer_thread = threading.Thread(target=producer, args=(pc, ['A', 'B', 'C', 'D', 'E']))
    consumer_thread = threading.Thread(target=consumer, args=(pc, 5))
    
    # 启动线程
    producer_thread.start()
    consumer_thread.start()
    
    # 等待完成
    producer_thread.join()
    consumer_thread.join()
    
    print("生产者消费者模式演示完成")

synchronization_primitives()

队列通信

1. 线程安全队列

def queue_communication():
    """队列通信示例"""
    
    import queue
    
    def producer_worker(q, name, items):
        """生产者工作函数"""
        for item in items:
            q.put(item)
            print(f"生产者 {name} 生产: {item}")
            time.sleep(0.1)
        q.put(None)  # 结束信号
        print(f"生产者 {name} 完成")
    
    def consumer_worker(q, name):
        """消费者工作函数"""
        while True:
            item = q.get()
            if item is None:
                break
            print(f"消费者 {name} 消费: {item}")
            time.sleep(0.15)
            q.task_done()
        print(f"消费者 {name} 完成")
    
    # 创建队列
    q = queue.Queue(maxsize=3)
    
    # 创建生产者线程
    producer1 = threading.Thread(target=producer_worker, args=(q, "P1", ['A', 'B', 'C']))
    producer2 = threading.Thread(target=producer_worker, args=(q, "P2", ['D', 'E', 'F']))
    
    # 创建消费者线程
    consumer1 = threading.Thread(target=consumer_worker, args=(q, "C1"))
    consumer2 = threading.Thread(target=consumer_worker, args=(q, "C2"))
    
    # 启动线程
    producer1.start()
    producer2.start()
    consumer1.start()
    consumer2.start()
    
    # 等待生产者完成
    producer1.join()
    producer2.join()
    
    # 发送结束信号
    q.put(None)
    q.put(None)
    
    # 等待消费者完成
    consumer1.join()
    consumer2.join()
    
    print("队列通信演示完成")

queue_communication()

2. 优先级队列

def priority_queue_example():
    """优先级队列示例"""
    
    import queue
    import heapq
    
    class PriorityQueue:
        """优先级队列"""
        
        def __init__(self):
            self._queue = []
            self._index = 0
            self._lock = threading.Lock()
        
        def put(self, item, priority):
            """添加项目"""
            with self._lock:
                heapq.heappush(self._queue, (priority, self._index, item))
                self._index += 1
        
        def get(self):
            """获取项目"""
            with self._lock:
                if self._queue:
                    return heapq.heappop(self._queue)[-1]
                return None
        def empty(self):
            """检查是否为空"""
            with self._lock:
                return len(self._queue) == 0
    def task_processor(pq, name):
        """任务处理器"""
        while True:
            if pq.empty():
                time.sleep(0.1)
                continue
            task = pq.get()
            if task is None:
                break
            print(f"处理器 {name} 处理任务: {task}")
            time.sleep(0.2)
        
        print(f"处理器 {name} 完成")
    
    # 创建优先级队列
    pq = PriorityQueue()
    
    # 添加任务(数字越小优先级越高)
    tasks = [
        ("高优先级任务1", 1),
        ("低优先级任务1", 5),
        ("中优先级任务1", 3),
        ("高优先级任务2", 1),
        ("低优先级任务2", 5),
        ("中优先级任务2", 3)
    ]
    
    for task, priority in tasks:
        pq.put(task, priority)
    
    # 创建处理器线程
    processor1 = threading.Thread(target=task_processor, args=(pq, "P1"))
    processor2 = threading.Thread(target=task_processor, args=(pq, "P2"))
    
    # 启动处理器
    processor1.start()
    processor2.start()
    
    # 等待所有任务完成
    while not pq.empty():
        time.sleep(0.1)
    
    # 停止处理器
    pq.put(None)
    pq.put(None)
    
    processor1.join()
    processor2.join()
    
    print("优先级队列演示完成")

priority_queue_example()

实际应用案例

1. 线程池实现

def thread_pool_example():
    """线程池示例"""
    
    class ThreadPool:
        """简单线程池实现"""
        
        def __init__(self, num_threads):
            self.num_threads = num_threads
            self.task_queue = queue.Queue()
            self.result_queue = queue.Queue()
            self.threads = []
            self.shutdown = False
            
            # 启动工作线程
            for i in range(num_threads):
                thread = threading.Thread(target=self._worker, args=(i,))
                thread.daemon = True
                thread.start()
                self.threads.append(thread)
        
        def _worker(self, worker_id):
            """工作线程函数"""
            while not self.shutdown:
                try:
                    task = self.task_queue.get(timeout=1)
                    if task is None:
                        break
                    
                    func, args, kwargs = task
                    result = func(*args, **kwargs)
                    self.result_queue.put(result)
                    self.task_queue.task_done()
                    
                except queue.Empty:
                    continue
                except Exception as e:
                    print(f"工作线程 {worker_id} 出错: {e}")
        
        def submit(self, func, *args, **kwargs):
            """提交任务"""
            self.task_queue.put((func, args, kwargs))
        
        def get_result(self):
            """获取结果"""
            return self.result_queue.get()
        
        def shutdown_pool(self):
            """关闭线程池"""
            self.shutdown = True
            for _ in range(self.num_threads):
                self.task_queue.put(None)
            
            for thread in self.threads:
                thread.join()
    def worker_task(task_id, duration):
        """工作任务"""
        print(f"任务 {task_id} 开始执行")
        time.sleep(duration)
        result = f"任务 {task_id} 完成,耗时 {duration} 秒"
        print(result)
        return result
    # 创建线程池
    pool = ThreadPool(3)
    
    # 提交任务
    for i in range(5):
        pool.submit(worker_task, i, random.uniform(0.5, 2.0))
    
    # 获取结果
    results = []
    for _ in range(5):
        result = pool.get_result()
        results.append(result)
    
    # 关闭线程池
    pool.shutdown_pool()
    
    print("线程池演示完成")
    print("结果:", results)

thread_pool_example()

2. 并发下载器

def concurrent_downloader():
    """并发下载器示例"""
    
    class Downloader:
        """下载器类"""
        
        def __init__(self, max_workers=3):
            self.max_workers = max_workers
            self.download_queue = queue.Queue()
            self.results = []
            self.lock = threading.Lock()
        
        def download_file(self, url, filename):
            """下载文件(模拟)"""
            print(f"开始下载: {url}")
            time.sleep(random.uniform(1, 3))  # 模拟下载时间
            print(f"下载完成: {filename}")
            return {"url": url, "filename": filename, "status": "success"}
        
        def worker(self, worker_id):
            """工作线程"""
            while True:
                try:
                    task = self.download_queue.get(timeout=1)
                    if task is None:
                        break
                    
                    url, filename = task
                    result = self.download_file(url, filename)
                    
                    with self.lock:
                        self.results.append(result)
                    
                    self.download_queue.task_done()
                    
                except queue.Empty:
                    continue
                except Exception as e:
                    print(f"下载线程 {worker_id} 出错: {e}")
        
        def download_files(self, file_list):
            """下载文件列表"""
            # 创建下载线程
            threads = []
            for i in range(self.max_workers):
                thread = threading.Thread(target=self.worker, args=(i,))
                thread.daemon = True
                thread.start()
                threads.append(thread)
            
            # 添加下载任务
            for url, filename in file_list:
                self.download_queue.put((url, filename))
            
            # 等待所有任务完成
            self.download_queue.join()
            
            # 停止工作线程
            for _ in range(self.max_workers):
                self.download_queue.put(None)
            
            for thread in threads:
                thread.join()
            
            return self.results
    # 创建下载器
    downloader = Downloader(max_workers=3)
    
    # 定义下载列表
    files_to_download = [
        ("http://example.com/file1.zip", "file1.zip"),
        ("http://example.com/file2.zip", "file2.zip"),
        ("http://example.com/file3.zip", "file3.zip"),
        ("http://example.com/file4.zip", "file4.zip"),
        ("http://example.com/file5.zip", "file5.zip")
    ]
    
    # 开始下载
    print("开始并发下载...")
    results = downloader.download_files(files_to_download)
    
    print("下载完成!")
    for result in results:
        print(f"文件: {result['filename']}, 状态: {result['status']}")

concurrent_downloader()

总结

掌握Python多线程编程是提高程序性能的关键:

  1. 线程基础:理解线程的创建、启动和管理
  2. 线程同步:掌握锁、条件变量等同步机制
  3. 队列通信:使用队列进行线程间通信
  4. 线程池:实现高效的线程池管理
  5. 实际应用:在下载器、任务处理等场景中应用
  6. 最佳实践:遵循多线程编程的最佳实践

通过系统学习这些概念,你将能够编写出高效、安全的多线程程序,提高应用的并发性能。


转载请注明:周志洋的博客 » Python实用技巧-多线程基础

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦