Python threading 模块常用函数与示例

Thread 类(创建与启动)

Thread 是模块的核心类,用于创建新线程。常用参数:

import threading
import time

def worker(num):
    print(f"Worker {num} start")
    time.sleep(2)
    print(f"Worker {num} end")

# 方式一:直接传入 target
t1 = threading.Thread(target=worker, args=(1,), name="T-1")
t1.start()

# 方式二:继承 Thread 并重写 run()
class MyThread(threading.Thread):
    def __init__(self, num):
        super().__init__(name=f"MyThread-{num}")
        self.num = num
    def run(self):
        worker(self.num)

t2 = MyThread(2)
t2.start()

# 等待线程结束
t1.join()
t2.join()
print("All done")
    

查询当前线程与活动线程列表

常用函数:

import threading, time

def demo():
    print("Current:", threading.current_thread().name)
    print("All threads:", [t.name for t in threading.enumerate()])
    print("Count:", threading.active_count())

t = threading.Thread(target=demo, name="DemoThread")
t.start()
t.join()
    

Lock 与 RLock(互斥)

Lock 用于保护共享资源,防止竞争条件。RLock 允许同一线程多次获取。

import threading, time

counter = 0
lock = threading.Lock()

def inc():
    global counter
    for _ in range(100000):
        with lock:          # 等价于 lock.acquire(); ...; lock.release()
            counter += 1

threads = [threading.Thread(target=inc) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print("Final counter:", counter)   # 400000
    

Condition(生产者‑消费者示例)

import threading, time
cond = threading.Condition()
queue = []

def producer():
    for i in range(5):
        time.sleep(1)
        with cond:
            queue.append(i)
            print("Produced", i)
            cond.notify()          # 通知消费者

def consumer():
    for _ in range(5):
        with cond:
            while not queue:
                cond.wait()       # 等待生产者
            item = queue.pop(0)
            print("Consumed", item)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
    

Event(线程间信号)

import threading, time

event = threading.Event()

def waiter():
    print("Waiting for event...")
    event.wait()               # 阻塞直到被 set
    print("Event received!")

def setter():
    time.sleep(3)
    print("Setting event")
    event.set()               # 触发

threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()
    

Semaphore(限制并发数)

import threading, time, random

sem = threading.Semaphore(3)   # 同时最多 3 个线程运行

def task(i):
    with sem:
        print(f"Task {i} start")
        time.sleep(random.uniform(1,3))
        print(f"Task {i} end")

threads = [threading.Thread(target=task, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()
    

Timer(延迟执行)

import threading

def hello():
    print("Hello after 2 seconds")

t = threading.Timer(2.0, hello)   # 2 秒后调用 hello
t.start()
t.join()
    

其他常用函数

帮助快速掌握 threading 模块的核心功能与常见用法。