Linux多线程编程详解

次浏览

概述

多线程编程是现代软件开发中不可或缺的技能。Linux 提供了 POSIX 线程库(pthread)来实现多线程编程,本文将深入讲解其核心概念和使用方法。


一、线程基础概念

1.1 什么是线程?

线程是进程中的执行单元,是 CPU 调度的基本单位。一个进程可以包含多个线程,它们共享进程的资源(内存、文件描述符等),但拥有独立的执行栈和寄存器状态。

特性 进程 线程
地址空间 独立 共享
创建开销
通信方式 IPC 共享内存
切换开销
安全性 高(隔离) 低(需同步)

1.2 为什么使用多线程?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 单线程处理多个任务
void process_tasks() {
    task1();  // 阻塞等待
    task2();  // 必须等 task1 完成
    task3();  // 必须等 task2 完成
}

// 多线程并行处理
void process_tasks_concurrent() {
    std::thread t1(task1);
    std::thread t2(task2);
    std::thread t3(task3);
    t1.join(); t2.join(); t3.join();
}

多线程的优势:

  • 并行计算:充分利用多核 CPU
  • 响应性:后台任务不阻塞主线程
  • 资源共享:线程间通信简单高效
  • 轻量级:创建和切换开销小

二、POSIX 线程库(pthread)

2.1 线程创建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

// 线程函数必须返回 void*,参数为 void*
void* thread_function(void* arg) {
    int* num = (int*)arg;
    printf("Thread running, arg = %d\n", *num);
    return NULL;
}

int main() {
    pthread_t thread_id;
    int arg = 42;
    
    // 创建线程
    int ret = pthread_create(&thread_id, NULL, thread_function, &arg);
    if (ret != 0) {
        perror("pthread_create failed");
        exit(1);
    }
    
    // 等待线程结束
    pthread_join(thread_id, NULL);
    
    printf("Thread finished\n");
    return 0;
}

编译命令:

1
gcc -o thread_test thread_test.c -lpthread

2.2 pthread_create 参数详解

1
2
3
4
5
6
int pthread_create(
    pthread_t *thread,              // 输出:线程ID
    const pthread_attr_t *attr,     // 线程属性(NULL为默认)
    void *(*start_routine)(void *), // 线程函数
    void *arg                        // 传递给线程函数的参数
);

2.3 线程退出与等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <pthread.h>
#include <stdio.h>

void* thread_func(void* arg) {
    printf("Thread is working...\n");
    
    // 线程退出方式1:return
    // return (void*)123;
    
    // 线程退出方式2:pthread_exit
    pthread_exit((void*)456);
}

int main() {
    pthread_t tid;
    pthread_create(&tid, NULL, thread_func, NULL);
    
    void* retval;
    pthread_join(tid, &retval);  // 等待并获取返回值
    printf("Thread returned: %ld\n", (long)retval);
    
    return 0;
}

2.4 线程分离

分离的线程结束后自动释放资源,不需要 join:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <pthread.h>
#include <unistd.h>

void* detached_thread(void* arg) {
    printf("Detached thread running...\n");
    sleep(1);
    printf("Detached thread exiting\n");
    return NULL;
}

int main() {
    pthread_t tid;
    pthread_create(&tid, NULL, detached_thread, NULL);
    
    // 分离线程
    pthread_detach(tid);
    
    // pthread_join(tid, NULL);  // 错误!分离的线程不能join
    
    sleep(2);  // 等待线程结束
    printf("Main thread exiting\n");
    return 0;
}

三、线程同步

多线程访问共享资源时必须同步,否则会产生数据竞争(Data Race)。

3.1 问题示例:数据竞争

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <pthread.h>
#include <stdio.h>

int counter = 0;

void* increment(void* arg) {
    for (int i = 0; i < 100000; i++) {
        counter++;  // 非原子操作!
    }
    return NULL;
}

int main() {
    pthread_t t1, t2;
    
    pthread_create(&t1, NULL, increment, NULL);
    pthread_create(&t2, NULL, increment, NULL);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    
    // 预期:200000,实际:小于200000的随机值
    printf("Counter = %d\n", counter);
    return 0;
}

3.2 互斥锁(Mutex)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <pthread.h>
#include <stdio.h>

int counter = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* increment(void* arg) {
    for (int i = 0; i < 100000; i++) {
        pthread_mutex_lock(&mutex);
        counter++;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main() {
    pthread_t t1, t2;
    
    pthread_create(&t1, NULL, increment, NULL);
    pthread_create(&t2, NULL, increment, NULL);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    
    printf("Counter = %d\n", counter);  // 正确:200000
    
    pthread_mutex_destroy(&mutex);
    return 0;
}

3.3 互斥锁的动态创建与销毁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <pthread.h>
#include <stdlib.h>

typedef struct {
    int data;
    pthread_mutex_t mutex;
} SharedData;

SharedData* create_shared_data() {
    SharedData* sd = malloc(sizeof(SharedData));
    sd->data = 0;
    pthread_mutex_init(&sd->mutex, NULL);
    return sd;
}

void destroy_shared_data(SharedData* sd) {
    pthread_mutex_destroy(&sd->mutex);
    free(sd);
}

void safe_increment(SharedData* sd) {
    pthread_mutex_lock(&sd->mutex);
    sd->data++;
    pthread_mutex_unlock(&sd->mutex);
}

3.4 避免死锁

死锁产生的四个必要条件(必须同时满足):

  1. 互斥条件
  2. 持有并等待
  3. 不可抢占
  4. 循环等待
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 死锁示例
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;

void* thread_a(void* arg) {
    pthread_mutex_lock(&mutex1);
    sleep(1);  // 模拟工作
    pthread_mutex_lock(&mutex2);  // 死锁!
    // ...
    pthread_mutex_unlock(&mutex2);
    pthread_mutex_unlock(&mutex1);
    return NULL;
}

void* thread_b(void* arg) {
    pthread_mutex_lock(&mutex2);
    sleep(1);
    pthread_mutex_lock(&mutex1);  // 死锁!
    // ...
    pthread_mutex_unlock(&mutex1);
    pthread_mutex_unlock(&mutex2);
    return NULL;
}

解决方案:按固定顺序加锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 正确做法:始终按相同顺序获取锁
void* thread_a_fixed(void* arg) {
    pthread_mutex_lock(&mutex1);  // 先锁 mutex1
    pthread_mutex_lock(&mutex2);  // 再锁 mutex2
    // ...
    pthread_mutex_unlock(&mutex2);
    pthread_mutex_unlock(&mutex1);
    return NULL;
}

void* thread_b_fixed(void* arg) {
    pthread_mutex_lock(&mutex1);  // 同样先锁 mutex1
    pthread_mutex_lock(&mutex2);  // 再锁 mutex2
    // ...
    pthread_mutex_unlock(&mutex2);
    pthread_mutex_unlock(&mutex1);
    return NULL;
}

四、条件变量(Condition Variable)

条件变量用于线程间的等待/通知机制,解决"等待某个条件成立"的问题。

4.1 生产者-消费者模型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define BUFFER_SIZE 10

typedef struct {
    int buffer[BUFFER_SIZE];
    int count;
    int in;
    int out;
    pthread_mutex_t mutex;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
} CircularBuffer;

void buffer_init(CircularBuffer* cb) {
    cb->count = 0;
    cb->in = 0;
    cb->out = 0;
    pthread_mutex_init(&cb->mutex, NULL);
    pthread_cond_init(&cb->not_full, NULL);
    pthread_cond_init(&cb->not_empty, NULL);
}

void buffer_put(CircularBuffer* cb, int item) {
    pthread_mutex_lock(&cb->mutex);
    
    // 等待缓冲区不满
    while (cb->count == BUFFER_SIZE) {
        pthread_cond_wait(&cb->not_full, &cb->mutex);
    }
    
    cb->buffer[cb->in] = item;
    cb->in = (cb->in + 1) % BUFFER_SIZE;
    cb->count++;
    
    // 通知消费者
    pthread_cond_signal(&cb->not_empty);
    pthread_mutex_unlock(&cb->mutex);
}

int buffer_get(CircularBuffer* cb) {
    pthread_mutex_lock(&cb->mutex);
    
    // 等待缓冲区不空
    while (cb->count == 0) {
        pthread_cond_wait(&cb->not_empty, &cb->mutex);
    }
    
    int item = cb->buffer[cb->out];
    cb->out = (cb->out + 1) % BUFFER_SIZE;
    cb->count--;
    
    // 通知生产者
    pthread_cond_signal(&cb->not_full);
    pthread_mutex_unlock(&cb->mutex);
    
    return item;
}

// 生产者线程
void* producer(void* arg) {
    CircularBuffer* cb = (CircularBuffer*)arg;
    for (int i = 0; i < 100; i++) {
        buffer_put(cb, i);
        printf("Produced: %d\n", i);
    }
    return NULL;
}

// 消费者线程
void* consumer(void* arg) {
    CircularBuffer* cb = (CircularBuffer*)arg;
    for (int i = 0; i < 100; i++) {
        int item = buffer_get(cb);
        printf("Consumed: %d\n", item);
    }
    return NULL;
}

int main() {
    CircularBuffer cb;
    buffer_init(&cb);
    
    pthread_t prod, cons;
    pthread_create(&prod, NULL, producer, &cb);
    pthread_create(&cons, NULL, consumer, &cb);
    
    pthread_join(prod, NULL);
    pthread_join(cons, NULL);
    
    pthread_mutex_destroy(&cb.mutex);
    pthread_cond_destroy(&cb.not_full);
    pthread_cond_destroy(&cb.not_empty);
    
    return 0;
}

4.2 条件变量使用要点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 正确使用模式
pthread_mutex_lock(&mutex);
while (condition_is_false) {  // 用 while,不用 if!
    pthread_cond_wait(&cond, &mutex);
}
// 条件满足,执行操作
pthread_mutex_unlock(&mutex);

// 通知条件变化
pthread_mutex_lock(&mutex);
// 改变条件
pthread_cond_signal(&cond);  // 或 pthread_cond_broadcast(&cond)
pthread_mutex_unlock(&mutex);

⚠️ 重要:必须用 while 循环检查条件,因为可能存在虚假唤醒


五、读写锁

读写锁允许多个读者同时读,但写者独占。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <pthread.h>
#include <stdio.h>

int shared_data = 0;
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;

void* reader(void* arg) {
    for (int i = 0; i < 5; i++) {
        pthread_rwlock_rdlock(&rwlock);  // 读锁
        printf("Reader %ld: data = %d\n", (long)arg, shared_data);
        pthread_rwlock_unlock(&rwlock);
    }
    return NULL;
}

void* writer(void* arg) {
    for (int i = 0; i < 5; i++) {
        pthread_rwlock_wrlock(&rwlock);  // 写锁
        shared_data++;
        printf("Writer: data = %d\n", shared_data);
        pthread_rwlock_unlock(&rwlock);
    }
    return NULL;
}

int main() {
    pthread_t readers[3], writer_thread;
    
    for (long i = 0; i < 3; i++) {
        pthread_create(&readers[i], NULL, reader, (void*)i);
    }
    pthread_create(&writer_thread, NULL, writer, NULL);
    
    for (int i = 0; i < 3; i++) {
        pthread_join(readers[i], NULL);
    }
    pthread_join(writer_thread, NULL);
    
    return 0;
}

六、线程池实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

typedef struct Task {
    void (*function)(void*);
    void* arg;
    struct Task* next;
} Task;

typedef struct {
    Task* head;
    Task* tail;
    int count;
    pthread_mutex_t lock;
    pthread_cond_t has_task;
    bool shutdown;
} TaskQueue;

typedef struct {
    pthread_t* threads;
    int thread_count;
    TaskQueue queue;
} ThreadPool;

// 任务队列操作
void task_queue_init(TaskQueue* queue) {
    queue->head = queue->tail = NULL;
    queue->count = 0;
    queue->shutdown = false;
    pthread_mutex_init(&queue->lock, NULL);
    pthread_cond_init(&queue->has_task, NULL);
}

void task_queue_push(TaskQueue* queue, void (*function)(void*), void* arg) {
    Task* task = malloc(sizeof(Task));
    task->function = function;
    task->arg = arg;
    task->next = NULL;
    
    pthread_mutex_lock(&queue->lock);
    
    if (queue->tail == NULL) {
        queue->head = queue->tail = task;
    } else {
        queue->tail->next = task;
        queue->tail = task;
    }
    queue->count++;
    
    pthread_cond_signal(&queue->has_task);
    pthread_mutex_unlock(&queue->lock);
}

Task* task_queue_pop(TaskQueue* queue) {
    pthread_mutex_lock(&queue->lock);
    
    while (queue->count == 0 && !queue->shutdown) {
        pthread_cond_wait(&queue->has_task, &queue->lock);
    }
    
    if (queue->shutdown && queue->count == 0) {
        pthread_mutex_unlock(&queue->lock);
        return NULL;
    }
    
    Task* task = queue->head;
    queue->head = queue->head->next;
    if (queue->head == NULL) {
        queue->tail = NULL;
    }
    queue->count--;
    
    pthread_mutex_unlock(&queue->lock);
    return task;
}

// 工作线程函数
void* worker(void* arg) {
    ThreadPool* pool = (ThreadPool*)arg;
    
    while (true) {
        Task* task = task_queue_pop(&pool->queue);
        if (task == NULL) break;
        
        task->function(task->arg);
        free(task);
    }
    
    return NULL;
}

// 线程池创建
ThreadPool* thread_pool_create(int thread_count) {
    ThreadPool* pool = malloc(sizeof(ThreadPool));
    pool->thread_count = thread_count;
    pool->threads = malloc(sizeof(pthread_t) * thread_count);
    
    task_queue_init(&pool->queue);
    
    for (int i = 0; i < thread_count; i++) {
        pthread_create(&pool->threads[i], NULL, worker, pool);
    }
    
    return pool;
}

// 提交任务
void thread_pool_submit(ThreadPool* pool, void (*function)(void*), void* arg) {
    task_queue_push(&pool->queue, function, arg);
}

// 销毁线程池
void thread_pool_destroy(ThreadPool* pool) {
    pthread_mutex_lock(&pool->queue.lock);
    pool->queue.shutdown = true;
    pthread_cond_broadcast(&pool->queue.has_task);
    pthread_mutex_unlock(&pool->queue.lock);
    
    for (int i = 0; i < pool->thread_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    
    pthread_mutex_destroy(&pool->queue.lock);
    pthread_cond_destroy(&pool->queue.has_task);
    
    free(pool->threads);
    free(pool);
}

// 测试
void print_task(void* arg) {
    int* num = (int*)arg;
    printf("Task %d executed by thread %lu\n", *num, pthread_self());
    free(num);
}

int main() {
    ThreadPool* pool = thread_pool_create(4);
    
    for (int i = 0; i < 20; i++) {
        int* num = malloc(sizeof(int));
        *num = i;
        thread_pool_submit(pool, print_task, num);
    }
    
    sleep(2);  // 等待任务完成
    thread_pool_destroy(pool);
    
    return 0;
}

七、C++11 线程库

C++11 提供了更现代的多线程支持:

7.1 std::thread

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#include <thread>
#include <iostream>

void thread_func(int id) {
    std::cout << "Thread " << id << " running\n";
}

int main() {
    std::thread t1(thread_func, 1);
    std::thread t2([](int id) {
        std::cout << "Lambda thread " << id << "\n";
    }, 2);
    
    t1.join();
    t2.join();
    
    return 0;
}

7.2 std::mutex 和 std::lock_guard

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <mutex>
#include <thread>
#include <iostream>

int counter = 0;
std::mutex mtx;

void increment() {
    for (int i = 0; i < 100000; i++) {
        std::lock_guard<std::mutex> lock(mtx);  // RAII 风格
        counter++;
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    
    t1.join();
    t2.join();
    
    std::cout << "Counter = " << counter << "\n";  // 正确:200000
    return 0;
}

7.3 std::condition_variable

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> dataQueue;
bool finished = false;

void producer() {
    for (int i = 0; i < 10; i++) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            dataQueue.push(i);
        }
        cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> lock(mtx);
        finished = true;
    }
    cv.notify_one();
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !dataQueue.empty() || finished; });
        
        while (!dataQueue.empty()) {
            int data = dataQueue.front();
            dataQueue.pop();
            std::cout << "Consumed: " << data << "\n";
        }
        
        if (finished) break;
    }
}

int main() {
    std::thread p(producer);
    std::thread c(consumer);
    
    p.join();
    c.join();
    
    return 0;
}

7.4 std::atomic

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <atomic>
#include <thread>
#include <iostream>

std::atomic<int> counter(0);

void increment() {
    for (int i = 0; i < 100000; i++) {
        counter++;  // 原子操作,无需锁
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    
    t1.join();
    t2.join();
    
    std::cout << "Counter = " << counter << "\n";  // 正确:200000
    return 0;
}

八、最佳实践

8.1 线程数量选择

1
2
3
4
5
6
7
8
#include <thread>

// 获取硬件支持的并发线程数
unsigned int hardware_threads = std::thread::hardware_concurrency();

// 一般原则:
// CPU密集型任务:线程数 = CPU核心数
// I/O密集型任务:线程数 = CPU核心数 * (1 + 等待时间/计算时间)

8.2 避免常见陷阱

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// ❌ 错误:传递局部变量的引用
void wrong_example() {
    int local = 42;
    std::thread t([&local]() {
        sleep(1);
        std::cout << local << "\n";  // 悬空引用!
    });
    t.detach();  // local 已被销毁
}

// ✅ 正确:值捕获
void right_example() {
    int local = 42;
    std::thread t([local]() {  // 值捕获
        sleep(1);
        std::cout << local << "\n";
    });
    t.detach();
}

// ✅ 正确:join 等待
void right_example2() {
    int local = 42;
    std::thread t([&local]() {
        std::cout << local << "\n";
    });
    t.join();  // 等待线程结束
}

8.3 使用 RAII 管理线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class ThreadGuard {
    std::thread& t;
public:
    explicit ThreadGuard(std::thread& t_) : t(t_) {}
    ~ThreadGuard() {
        if (t.joinable()) {
            t.join();  // 或 t.detach()
        }
    }
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;
};

void safe_function() {
    std::thread t([](){ /* work */ });
    ThreadGuard guard(t);  // 异常安全
}

总结

概念 用途
pthread_create 创建线程
pthread_join 等待线程结束
pthread_detach 分离线程
pthread_mutex_t 互斥锁保护共享资源
pthread_cond_t 条件变量实现等待/通知
pthread_rwlock_t 读写锁优化读多写少场景

关键要点

  1. 线程共享进程资源,必须正确同步
  2. 使用 while 而非 if 检查条件变量
  3. 注意死锁,按固定顺序加锁
  4. 优先使用 C++11 的 RAII 风格同步原语
  5. 避免传递局部变量的引用给线程

参考资料

  • 《UNIX环境高级编程》
  • 《C++ Concurrency in Action》
  • Linux pthread 手册:man pthread_*
使用 Hugo 构建
主题 StackJimmy 设计