1. 代码的“等待焦虑”:单线程的痛点 🕒
内容详解:
普通代码是“单线程”的——像“一个人干所有活”:如果有个任务要等 30 秒(比如查数据库、下载文件),整个程序会“卡着等”,啥也干不了。
比如 Web 应用里,用户请求一个“生成大数据报表”的接口,报表要算 30 秒——这期间其他用户的请求都会被卡住,体验超差!
类比:你做饭时“先等水烧开(3 分钟),再切菜”——单线程是“站在锅边干等 3 分钟”;而 线程 是“烧水的同时切菜”,并行做事,不浪费时间!
2. 线程入门:让代码“分身干活”🧵
关键字:线程、threading 模块、并行执行
内容详解:
Python 用 threading 模块创建线程——让一个程序同时跑多个“子任务”。比如“一边下载文件,一边显示进度”:
import threading
import time
# 任务 1:模拟“耗时工作”(比如查数据库)def long_task(task_name):
print(f"{task_name} 开始执行,需要 3 秒...")
time.sleep(3) # 模拟等待 3 秒
print(f"{task_name} 执行完成!")
# 单线程方式:卡着等
print("=== 单线程模式 ===")
long_task("任务 A")
long_task("任务 B") # 等任务 A 完成才会开始,总共花 6 秒
# 多线程方式:并行执行
print("\n=== 多线程模式 ===")
# 创建线程对象,指定要执行的函数
thread_a = threading.Thread(target=long_task, args=("任务 A",))
thread_b = threading.Thread(target=long_task, args=("任务 B",))
# 启动线程
thread_a.start()
thread_b.start()
# 等待所有线程完成(可选)thread_a.join()
thread_b.join() # 两个任务并行,总共花 3 秒
多线程下,任务 A 和 B 同时跑,原本 6 秒的工作 3 秒就做完了——这就是线程的价值!
3. 实战:Flask 里用线程处理“耗时请求”🕸️
关键字:Flask 线程、后台任务、用户体验
内容详解:
Web 应用中,耗时任务(比如生成报表)如果用单线程,用户会看到“加载中”卡 30 秒。用线程把任务丢到“后台”,让用户立刻得到响应:
from flask import Flask, jsonify
import threading
import time
app = Flask(__name__)
# 耗时的后台任务
def generate_report(user_id):
print(f"开始给用户 {user_id} 生成报表...")
time.sleep(10) # 模拟 10 秒的耗时工作
print(f"用户 {user_id} 的报表生成完成!")
# 这里可以把报表存到数据库 / 发邮件
# 前端请求接口:启动后台线程,立刻返回响应
@app.route("/generate_report/<int:user_id>")
def start_report(user_id):
# 创建线程,启动后台任务
thread = threading.Thread(
target=generate_report,
args=(user_id,),
daemon=True # 设为守护线程:主程序退出时自动结束
)
thread.start()
# 立刻给用户返回“任务已启动”return jsonify({"msg": f"报表生成任务已启动,请稍后查看!"})
if __name__ == "__main__":
app.run(debug=True)
用户访问/generate_report/123,会立刻收到“任务已启动”的响应,不用干等 10 秒——同时后台线程在悄悄生成报表,体验直接拉满!
4. 线程的“雷区”:共享数据要加“锁”🔒
关键字:线程安全、锁(Lock)、数据竞争
内容详解:
多个线程如果 同时改同一个变量,会出现“数据竞争”(结果混乱)。比如两个线程同时给计数器 +1:
# 错误示例:无锁的共享数据
counter = 0
def add_counter():
global counter
for _ in range(100000):
counter += 1 # 这行代码不是“原子操作”,多线程会出错
# 启动两个线程改计数器
thread1 = threading.Thread(target=add_counter)
thread2 = threading.Thread(target=add_counter)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("最终计数器值:", counter) # 预期 200000,但实际可能是 150000 左右
解决办法是用 threading.Lock() 加锁——同一时间只允许一个线程改共享数据:
# 正确示例:加锁保护共享数据
counter = 0
lock = threading.Lock() # 创建锁对象
def add_counter_safe():
global counter
for _ in range(100000):
lock.acquire() # 加锁:其他线程会等
counter += 1
lock.release() # 解锁:其他线程可以进来
# 再试一次,结果正确
thread1 = threading.Thread(target=add_counter_safe)
thread2 = threading.Thread(target=add_counter_safe)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("最终计数器值:", counter) # 输出 200000,正确!
这就是 线程安全——用锁保证共享数据的操作“原子性”(要么做完,要么没做,不会中间被打断)。
5. 进阶:Flask+ 线程的“优雅玩法”🚀
关键字:Flask 扩展、Celery、轻量级替代
内容详解:
简单线程适合“轻量级后台任务”,但复杂场景(比如任务队列、失败重试)需要更专业的工具:
- 🥕 Celery:Python 的分布式任务队列——把耗时任务丢到队列里,由专门的“worker 线程”处理,支持重试、定时任务等;
- ✨ 轻量级替代 :如果不想搞复杂的 Celery,可以用
Flask-APScheduler做定时任务,或threading.Timer做延迟任务。
比如用 threading.Timer 做“5 秒后给用户发通知”:
def send_notification(user_id):
print(f"给用户 {user_id} 发送通知:报表已生成!")
@app.route("/schedule_notify/<int:user_id>")
def schedule_notify(user_id):
# 5 秒后执行 send_notification
timer = threading.Timer(5, send_notification, args=(user_id,))
timer.start()
return "5 秒后会给你发通知!"
这比手动管理线程更灵活~
6. 划重点:线程的“适用 & 不适用”🎯
关键字:IO 密集型任务、CPU 密集型任务、GIL 锁
内容详解:
Python 的线程受GIL 锁(全局解释器锁)限制:
- ✅ 适用场景:IO 密集型任务(比如查数据库、下载文件、等待网络响应)——线程会在“等待 IO”时让出 CPU,让其他线程干活,能提升效率;
- ❌ 不适用场景:CPU 密集型任务(比如复杂计算)——GIL 锁会让多线程变成“伪并行”,效率不如多进程。
所以 Web 应用里的“等待数据库 / 接口”用线程很合适,而“大量数学计算”建议用多进程~
要不要我帮你整理一份Python 线程的核心语法 &Flask 实战代码清单?