Python3 多线程(千字长文)

Python3 多线程入门:从零开始理解并发编程

在日常开发中,我们经常遇到需要同时处理多个任务的场景。比如一个 Web 服务要同时响应多个用户的请求,或者一个爬虫程序要并发抓取多个网页。如果用传统的单线程方式,程序必须一个一个地等待任务完成,效率非常低。这时候,Python3 多线程 就派上用场了。

想象一下,你是一家小餐馆的老板。如果只有一个厨师,那他只能同时做一道菜。客人多的时候,排队等得要命。但如果增加多个厨师,每个人负责一道菜,就能大大提升出餐速度。Python3 多线程 就像是给你的厨房配备了多个厨师,让多个任务可以“并行”处理。

不过,和真实厨房不同的是,Python3 的多线程并不是真正意义上的并行执行。由于 GIL(全局解释器锁)的存在,同一时间只有一个线程能执行 Python 字节码。但对 I/O 密集型任务(如网络请求、文件读写),多线程依然能显著提升性能,因为线程在等待 I/O 时会自动释放 GIL,让其他线程有机会运行。

线程基础:threading 模块初探

Python3 内置了 threading 模块,它是实现多线程的核心工具。我们先来看一个最简单的线程示例。

import threading
import time

def print_numbers():
    for i in range(5):
        print(f"数字: {i}")
        time.sleep(1)  # 模拟耗时操作

thread = threading.Thread(target=print_numbers)

thread.start()

print("主线程正在运行...")

thread.join()

print("所有任务完成。")

这段代码中:

  • threading.Thread(target=print_numbers) 创建了一个线程对象,指定它要运行 print_numbers 函数。
  • .start() 启动线程,让线程开始执行。
  • .join() 是关键!它会让主线程等待子线程结束,否则主线程可能在子线程还没完成时就退出了。

注意:time.sleep(1) 模拟的是 I/O 等待,如网络请求或文件读写。在这种场景下,线程会释放 GIL,其他线程可以运行。

多线程任务并行执行的实战案例

让我们用一个更贴近实际的案例来展示 Python3 多线程 的威力。假设我们要同时下载多个网页,用单线程会逐个等待,耗时较长;用多线程则可以并行下载。

import threading
import requests
import time

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/3"
]

def download_url(url):
    print(f"开始下载: {url}")
    try:
        response = requests.get(url, timeout=5)
        print(f"下载完成: {url}, 状态码: {response.status_code}")
    except Exception as e:
        print(f"下载失败: {url}, 错误: {e}")

start_time = time.time()

threads = []

for url in urls:
    thread = threading.Thread(target=download_url, args=(url,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

end_time = time.time()
print(f"所有下载任务耗时: {end_time - start_time:.2f} 秒")

这个例子中:

  • args=(url,) 用于向线程函数传递参数,注意逗号不能少,否则会报错。
  • 我们创建了 4 个线程,每个线程负责一个 URL 的下载。
  • 使用 join() 等待所有线程结束,确保程序不会提前退出。

对比单线程版本(逐个下载,总耗时约 7 秒),多线程版本通常能在 3 秒内完成,效率提升明显。

线程同步:避免数据竞争

多线程虽然强大,但也会带来一个问题:数据竞争。当多个线程同时修改同一个变量时,结果可能不可预测。

举个例子:

import threading
import time

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # 这行代码不是原子操作

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

t1.start()
t2.start()

t1.join()
t2.join()

print(f"最终计数器值: {counter}")

运行结果可能不是 200000,而是 190000 左右。为什么?因为 counter += 1 实际上分为三步:读取、加 1、写回。两个线程可能在“读取”阶段同时读到相同的值,导致结果丢失。

解决方法是使用锁(Lock):

import threading
import time

counter = 0
lock = threading.Lock()  # 创建一个锁

def increment():
    global counter
    for _ in range(100000):
        with lock:  # 加锁,确保同一时间只有一个线程进入
            counter += 1

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

t1.start()
t2.start()

t1.join()
t2.join()

print(f"加锁后最终计数器值: {counter}")  # 一定是 200000

with lock: 会自动获取锁并释放,是推荐的使用方式。锁就像一个“独占门”,一次只能让一个线程通过。

线程通信:使用 Queue 实现安全数据传递

在多线程编程中,线程之间需要通信。最安全的方式是使用 queue.Queue,它是一个线程安全的队列。

import threading
import queue
import time

task_queue = queue.Queue()

def worker():
    while True:
        try:
            # 从队列中获取任务,timeout 防止无限等待
            task = task_queue.get(timeout=1)
            print(f"工作线程处理任务: {task}")
            time.sleep(1)  # 模拟处理时间
            task_queue.task_done()  # 标记任务完成
        except queue.Empty:
            print("工作线程退出。")
            break

threads = []
for i in range(3):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for i in range(10):
    task_queue.put(f"任务-{i}")

task_queue.join()

print("所有任务处理完毕。")

for t in threads:
    t.join()

关键点:

  • task_queue.put(task) 添加任务。
  • task_queue.get() 获取任务,自动阻塞直到有任务。
  • task_queue.task_done() 告诉队列这个任务已完成。
  • task_queue.join() 会阻塞,直到所有任务都被 task_done() 标记。

这个模式常用于生产者-消费者模型,比如爬虫中,一个线程负责生成 URL,多个线程负责下载。

高级技巧:线程池与性能优化

当需要大量线程时,手动管理线程很麻烦。Python3 提供了 concurrent.futures 模块,支持线程池。

from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url):
    print(f"正在下载: {url}")
    time.sleep(1)
    return f"完成: {url}"

with ThreadPoolExecutor(max_workers=5) as executor:
    urls = [f"https://httpbin.org/delay/{i}" for i in range(10)]
    
    # 提交所有任务
    futures = [executor.submit(fetch_url, url) for url in urls]
    
    # 获取结果
    for future in futures:
        print(future.result())

ThreadPoolExecutor 会自动管理线程的创建和销毁,避免资源浪费。executor.submit() 提交任务,返回 Future 对象,调用 .result() 获取返回值。

相比手动创建线程,这种方式更简洁、更安全,尤其适合批量任务处理。

总结:Python3 多线程 的最佳实践

Python3 多线程 是应对 I/O 密集型任务的利器。虽然受限于 GIL,无法提升 CPU 密集型任务的性能,但在网络请求、文件操作等场景下,多线程能显著提升程序响应速度。

核心要点:

  • 使用 threading.Thread 创建线程。
  • join() 等待线程结束。
  • Lock 避免数据竞争。
  • queue.Queue 安全传递数据。
  • 大量任务时优先使用 ThreadPoolExecutor

记住:多线程不是万能药,过度使用反而会增加复杂度。建议只在 I/O 密集型任务中使用,避免在 CPU 密集型任务中滥用。

掌握这些技巧后,你就能写出高效、安全的并发程序。下一步,可以探索 asyncio 异步编程,它在某些场景下比多线程更高效。但无论选择哪种方式,理解并发的本质才是关键。