Ⅰ. 线程的概念

有些情况需要在一个进程中同时执行多个控制流程,比如实现一个图形界面的下载软件,一方面需要和用户交互,等待和处理用户的鼠标键盘事件,另一方面又需要同时下载多个文件,等待和处理从多个网络主机发来的数据,这些任务都需要一个“等待一处理”的循环,那么如何才能同时进行多项任务呢?这时候就可以使用线程。

线程(thread):是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以井发多个线程,每条线程并行执行不同的任务

由于同一进程的多个线程共享同一地址空间,因此 Text Segment(代码段)、 Data Segment(数据段)都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:

  • 文件描述符表
  • 每种信号的处理方式
  • 当前工作目录
  • 用户id和组id

但有些资源是每个线程各有一份的:

  • 线程id
  • 上下文,包括各种寄存器的值、程序计数器和栈指针
  • 栈空间
  • errno变量
  • 信号屏蔽字signal mask
  • 调度优先级

在 Linux上线程函数位于libpthread共享库中,因此在编译时要加上-lpthread选项

Ⅱ. 线程控制

1.创建线程

#include <pthread.h>

int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);

pthread_create函数会在调用它的进程中开启一个新线程,这个线程会调用start_routine所指向的函数开始执行,arg作为start_routine()函数的参数。thread是结果参数,新线程的id存储在thread指向的buffer

  • 返回值:成功返回0,失败返回错误号。(之前学习过的系统函数都是成功返回0,失败返回-1,而错误号保存在全局变量errno中,而pthread库的函数都是通过返回值返回错误号,虽然每个线程也都有一个errno,但这是为了兼容其他函数接口而提供的,pthread库本身并不使用它,通过返回值返回错误码更加清晰)

新线程将会继承创建它的线程的信号屏蔽字(signal mask),但是新线程的未决信号集(pending signal set)会是空的。也不会继承alternate signal stack

2.获取当前线程的id

#include <pthread.h>

pthread_t pthread_self(void);
  • 返回值:总是成功返回调用该函数的线程ID
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

void printid(char *);

void *thr_fun(void *arg) {
    printid(arg);
    return NULL;
}

void printid(char *tip) {
    pid_t pid = getpid();
    pthread_t tid = pthread_self();

    printf("%s pid:%u tid:%u (%p)\n", tip, pid, tid, tid);
    printf("%s thr_fun=%p\n", tip, thr_fun);
}

/**
 * @brief 打印进程id、线程id
 * 
 * @param argc 
 * @param argv 
 * @return int 
 */
int main(int argc, char const *argv[]) {
    pthread_t tid;
    int rtn = pthread_create(&tid, NULL, thr_fun, "new thread.");
    if (rtn) {
        printf("create thread error: %s\n", strerror(rtn));
        exit(-1);
    }

    sleep(1);
    printid("main thread");
    return 0;
}

3.终止线程

如果需要只终止某个线程而不终止整个进程,可以有三种方法:

  • 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
  • 一个线程可以调用pthread_cancel终止同一进程的另一个线程。
  • 线程可以调用pthread_exit终止自己。
#include <pthread.h>

void pthread_exit(void *retval);

retvalvoid*类型,和线程函数的返回值的用法一样,其他线程可以调用pthread_join获取这个指针。

需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局 的或者用malloc分配的,不能在线程函数的栈上分配,因为当其他线程得到这个返回值时线程函数已经退出了。

#include <pthread.h>

int pthread_join(pthread_t thread, void **retval);

调用该函数的线程将被挂起,知道id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:

  • 如果thread线程通过return返回,retval所指向的单元里存放的是thread线程函数的返回值。
  • 如果thread线程被其他线程调用pthread_cancel异常终止掉,retval所指向的单元里存放的是常数PTHREAD_CANCEL,也就是-1。
  • 如果thread线程是自己调用pthread_exit终止的,retval所指向的单元存放的是传给pthread_exit的参数。
  • 如果对thread线程的终止状态不感兴趣,可以传NULLretval参数
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/**
 * @brief 线程1函数,使用return退出
 * 
 * @param arg 
 * @return void* 
 */
void* thr_fn1(void* arg) {
    printf("thread 1 returning\n");
    return (void*)1;
}

/**
 * @brief 线程2函数,使用pthread_exit终止
 * 
 * @param arg 
 * @return void* 
 */
void* thr_fn2(void* arg) {
    printf("thread 2 exiting\n");
    pthread_exit((void*)2);
    return NULL;
}

/**
 * @brief 线程3函数,死循环,需要其他线程使用pthread_cancel终止它
 * 
 * @param arg 
 * @return void* 
 */
void* thr_fn3(void* arg) {
    while (1) {
        printf("thread 3 sleeping\n");
        sleep(1);
    }
    return NULL;
}

int main(int argc, char const* argv[]) {
    pthread_t tid;
    void* stat;
    pthread_create(&tid, NULL, thr_fn1, NULL);
    pthread_join(tid, &stat);
    printf("thread 1 exit code %ld\n", (long)stat);

    pthread_create(&tid, NULL, thr_fn2, NULL);
    pthread_join(tid, &stat);
    printf("thread 2 exit code %ld\n", (long)stat);

    pthread_create(&tid, NULL, thr_fn3, NULL);
    sleep(3);
    pthread_cancel(tid);
    pthread_join(tid, &stat);
    printf("thread 3 exit code %ld\n", (long)stat);

    return 0;
}
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

void printid(char *);

void *thr_fun(void *arg) {
    printid(arg);
    return NULL;
}

void printid(char *tip) {
    pid_t pid = getpid();
    pthread_t tid = pthread_self();

    printf("%s pid:%u tid:%u (%p)\n", tip, pid, tid, tid);
    printf("%s thr_fun=%p\n", tip, thr_fun);
}

/**
 * @brief 打印进程id、线程id
 * 
 * @param argc 
 * @param argv 
 * @return int 
 */
int main(int argc, char const *argv[]) {
    pthread_t tid;
    int rtn = pthread_create(&tid, NULL, thr_fun, "new thread.");
    if (rtn) {
        printf("create thread error: %s\n", strerror(rtn));
        exit(-1);
    }

    //sleep(1);
    pthread_join(tid, NULL);  //使用pthread_join就不需要sleep来阻塞主线程了
    printid("main thread");
    return 0;
}

Ⅲ. 线程间同步

多个线程同时访问共享数据时可能会冲突,这跟前面讲信号时所说的可重入性是同样的问题。比如两个线程都要把某个全局变量增加1,这个操作在某平台需要三条指令完成:

  1. 从内存读变量值到寄存器(mov)
  2. 寄存器的值+1(add)
  3. 将寄存器的值写会内存(mov)
Linux系统编程笔记(9)——线程-萤火
两个线程都要把某个全局变量增加1,一共加了2,结果只加了1
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

int cnt = 0;

/**
 * @brief 将全局变量cnt加5000次1
 * 
 * @param arg 
 * @return void* 
 */
void *cntadd(void *arg) {
    int val;
    for (int i = 0; i < 5000; i++) {
        val = cnt;
        //插入一条printf让操作时间变长一点
        printf("%ld: %d\n", pthread_self(), val);
        cnt = val + 1;
    }
    return NULL;
}
/**
 * @brief 创建两个线程同时对全局变量cnt进行累加计算,观察结果
 * 
 * @param argc 
 * @param argv 
 * @return int 
 */
int main(int argc, char const *argv[]) {
    pthread_t tid_A, tid_B;

    pthread_create(&tid_A, NULL, cntadd, NULL);
    pthread_create(&tid_B, NULL, cntadd, NULL);

    pthread_join(tid_A, NULL);
    pthread_join(tid_B, NULL);

    return 0;
}

1.互斥锁Mutex

对于多线程的程序,访问冲突的问题是很普遍的,解决的办法是引入互斥锁(Mutex, MutualExclusive Lock),获得锁的线程可以完成“读一修改-写”的操作,然后释放锁给其它线程,没有获得锁的线程只能等待而不能访问共享数据,这样“读-修改一写”三步操作组成一个原子操作,要么都执行,要么都不执行,不会执行到中间被打断,也不会在其它处理器上并行做这个操作。

一把互斥锁有两种状态:非锁定的(不属于任何一个线程)和已锁定的(仅属于一个线程),互斥锁永远不能被两个线程同时拥有。如果有一个线程试图获取一把已经被锁定的互斥锁,那么这个线程将被挂起,直到拥有这把锁的另一个线程释放了这把互斥锁。

#include <pthread.h>

pthread_mutex_t fastmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t recmutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
pthread_mutex_t  errchkmutex  =   PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
  • mutexattr:用来设定互斥锁的特性,如果为NULL将使用缺省特性
  • 返回值pthread_mutex_init总是返回0;其他函数成功返回0,失败返回错误码
  • mutex:结果参数

pthread_mutex_init函数对Mutex做初始化,参数attr设定Mutex的属性,如果attrNULL则表示缺省属性。
pthread_mutex_init函数初始化的Mutex可用pthread_mutex_destroy销毁。
如果Mutex变量是静态分配的(全局变量或static变量),也可以用宏定义PTHREAD_MUTEX_INITIALIZER来初始化,相当于用pthread_mutex_init初始化并且attr参数为NULL

LinuxThreads 实现只支持一个互斥体属性,互斥体种类,它可以是“快速”(fast)、“递归”(recursive)或“错误检查”(error checking)。 互斥锁的类型决定了它是否可以被已经拥有它的线程再次锁定。 默认类型是”fast“。

pthread_mutex_lock用于锁定给定的互斥锁mutex,如果该所当前是unlocked状态,那么mutex会变为locked状态并且被调用pthread_mutex_lock函数的线程所拥有,这时pthread_mutex_lock会立即返回;如果mutex状态是locked,那么pthread_mutex_lock将挂起调用它的线程,直到拥有mutex的线程释放这个锁为止。

如果mutex已经被调用pthread_mutex_lock的这个线程锁定了,那么pthread_mutex_lock的行为将取决于互斥锁mutex的类型(fastrecursiveerror checking):

  • fast类型:调用pthread_mutex_lock的线程将被挂起直到锁被释放,因此这将会导致死锁发生。
  • error checking类型:pthread_mutex_lock立即返回错误码EDEADLK
  • recursive类型:pthread_mutex_lock成功执行并立即返回,同时会记录调用线程锁定mutex的次数,要将mutex变为unlocked状态的话,需要执行相同次数的pthread_mutex_unlock操作

pthread_mutex_trylock的行为和pthread_mutex_lock一致,区别在于当互斥锁mutex已经被另一个线程(或调用线程的fast类型)获得时,它无法阻塞调用线程,而是立即返回错误码EBUSY

pthread_mutex_unlock会释放指定的互斥锁mutex

pthread_mutex_destroy会销毁互斥对象mutex,并释放它可能持有的资源。互斥锁在进入前必须是unlocked的。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

pthread_mutex_t add_lock = PTHREAD_MUTEX_INITIALIZER;
int cnt = 0;

/**
 * @brief 将全局变量cnt加5000次1
 * 
 * @param arg 
 * @return void* 
 */
void *cntadd(void *arg) {
    int val;
    for (int i = 0; i < 5000; i++) {
        pthread_mutex_lock(&add_lock);  //加锁
        val = cnt;
        //插入一条printf让操作时间变长一点
        printf("%ld: %d\n", pthread_self(), val);
        cnt = val + 1;
        pthread_mutex_unlock(&add_lock);  //解锁
    }
    return NULL;
}
/**
 * @brief 创建两个线程同时对全局变量cnt进行累加计算,观察结果
 * 
 * @param argc 
 * @param argv 
 * @return int 
 */
int main(int argc, char const *argv[]) {
    pthread_t tid_A, tid_B;

    pthread_create(&tid_A, NULL, cntadd, NULL);
    pthread_create(&tid_B, NULL, cntadd, NULL);

    pthread_join(tid_A, NULL);
    pthread_join(tid_B, NULL);

    pthread_mutex_destroy(&add_lock);  //销毁锁

    return 0;
}

“挂起等待”和“唤醒等待线程”的操作如何实现?

每个 Mutex有一个等待队列,一个线程要在 Mutex上挂起等待,首先在把自己加入等待队列中,然后置线程状态为睡眠,然后调用调度器函数切换到别的线程。一个线程要唤醒等待队列中的其它线程,只需从等待队列中取出一项,把它的状态从睡眠改为就绪,加入就绪队列,那么下次调度器函数执行时就有可能切换到被唤醒的线程。

死锁(Deadlock)

如果同一个线程先后两次调用lock,在第二次调用时,由于锁已经被占用,该线程会挂起等待别的线程释放锁,然而锁正是被自己占用着的,该线程又被挂起而没有机会释放锁,因此就永远处于挂起等待状态了,这叫做死锁(Deadlock)(fast类型)
另一种典型的死锁情形是这样:线程A获得了锁1,线程B获得了锁2,这时线程A调用lock试图获得锁2,结果是需要挂起等待线程B释放锁2,而这时线程B也调用ock试图获得锁1,结果是需要挂起等待线程A释放锁1,于是线程A和B都永远处于挂起状态了。不难想象,如果涉及到更多的线程和更多的锁,有没有可能死锁的问题将会变得复杂和难以判断。

写程序时应该尽量避免同时获得多个锁,如果一定有必要这么做,则有一个原则:如果所有线程在需要多个锁时都按相同的先后顺序获得锁,则不会出现死锁。比如一个程序中用到锁1、锁2、锁3,它们所对应的Mtex変量是锁1 -> 锁2 -> 锁3,那么所有线程在需要同时获得2个或3个锁时都应该按锁1、锁2、锁3的顺序获得。如果要为所有的锁确定一个先后顺序比较困难,则应该尽量使用pthread mutex_ try lock调用代替pthread mutex_lock调用,以免死锁。

线程间的同步还有这样一种情况:线程A需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执行。

在pthread库中通过条件变量( Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。 Condition Variable用pthread_cond_t类型的变量表示,可以这样初始化和销毁:

 #include <pthread.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

int   pthread_cond_init(pthread_cond_t   *cond,  pthread_condattr_t *cond_attr);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int   pthread_cond_wait(pthread_cond_t    *cond,    pthread_mutex_t *mutex);
int  pthread_cond_timedwait(pthread_cond_t  *cond,  pthread_mutex_t *mutex, const struct timespec *abstime);
int pthread_cond_destroy(pthread_cond_t *cond);
  • 返回值:成功返回0,失败返回错误号

pthread_cond_timedwait函数还有一个额外的参数可以设定等待超时,如果到达了abstime所指定的时刻仍然没有别的线程来唤醒当前线程,就返回ETIMEDOUT

一个线程可以调用pthread_cond_signal唤醒在某个Condition Variable上等待另一个线程,也可以调用pthread_cond_broadcast唤醒在这个Condition Variable上等待的所有线程。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

typedef struct Goods {
    int data;
    struct Goods *next;
} Goods;

Goods *head = NULL;  //使用一条链表作为全局操作对象
pthread_mutex_t headlock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t hasGoods = PTHREAD_COND_INITIALIZER;

void *producer(void *arg) {
    Goods *new_goods;
    while (1) {
        new_goods = (Goods *)malloc(sizeof(Goods));
        new_goods->data = rand() % 100;

        pthread_mutex_lock(&headlock);
        // 头插法
        new_goods->next = head;
        head = new_goods;
        printf("produce %d\n", new_goods->data);
        pthread_mutex_unlock(&headlock);
        pthread_cond_broadcast(&hasGoods);  //广播触发条件
        sleep(rand() % 3);
    }
}

void *consumer(void *arg) {
    Goods *good;
    while (1) {
        pthread_mutex_lock(&headlock);
        while (!head) {
            //链表为空,没得消费,等待条件触发,同时暂时释放锁headlock
            //这里用while不用if是为了让条件成立触发之后再判断一次head是否为null,
            //因为有多个消费者, 当一个消费者抢到锁消费完之后释放锁,
            //之后锁会被另一个消费者线程抢到, 如果用if,这个线程抢到锁之后就会往下运行,
            //但head可能为null
            pthread_cond_wait(&hasGoods, &headlock);
        }

        good = head;
        head = head->next;
        printf("consume %d\n", good->data);
        pthread_mutex_unlock(&headlock);
        free(good);
        sleep(rand() % 3);
    }
}
/**
 * @brief 1个线程做生产者,4个线程做消费者
 * 
 * @param argc 
 * @param argv 
 * @return int 
 */
int main(int argc, char const *argv[]) {
    srand(time(NULL));  //随机数种子
    pthread_t producer_tid, consumer_tid1, consumer_tid2, consumer_tid3, consumer_tid4;
    pthread_create(&producer_tid, NULL, producer, NULL);
    pthread_create(&consumer_tid1, NULL, consumer, NULL);
    pthread_create(&consumer_tid2, NULL, consumer, NULL);
    pthread_create(&consumer_tid3, NULL, consumer, NULL);
    pthread_create(&consumer_tid4, NULL, consumer, NULL);

    pthread_join(producer_tid, NULL);
    pthread_join(consumer_tid1, NULL);
    pthread_join(consumer_tid2, NULL);
    pthread_join(consumer_tid3, NULL);
    pthread_join(consumer_tid4, NULL);
    return 0;
}

2.信号量Semaphore

Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源,加锁时获得该资源,将Mtex减到0,表示不再有可用资源,解锁时释放该资源,将Mtex重新加到1,表示又有了一个可用资源。

信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1。这种信号量不仅可用于同一进程的线程间同步,也可用于不同进程间的同步。

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_destroy(sem_t *sem);

semaphore变量的类型为sem_t

  • sem_init初始化一个 semaphore变量, value参数表示可用资源的数量, pshared参数为0表示信号量用于同一进程的线程间同步。
  • 在用完semaphore变量之后应该调用sem_ destroy释放与semaphore相关的资源。
  • 调用sem_wait可以获得资源,使semaphore的值减1,如果调用sem_ wait时semaphore的值已经是0,则挂起等待。如果不希望挂起等待,可以调用sem_trywait
  • 调用sem_post可以释放资源,使 semaphore的值加1,同时唤醒挂起等待的线程。
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#define NUM 5

int goods[NUM];  //全局变量,给生产者和消费者操作

sem_t blank_number, goods_number;

int head, tail;

void *producer(void *arg) {
    while (1) {
        sem_wait(&blank_number);         //blank_number资源-1
        goods[tail] = rand() % 100 + 1;  //在tail位置填入数据,类似队列
        printf("produce %d\n", goods[tail]);
        tail = (tail + 1) % NUM;
        sem_post(&goods_number);  //goods_number资源+1
        sleep(rand() % 3);
    }
}

void *consumer(void *arg) {
    while (1) {
        sem_wait(&goods_number);  //goods_number资源-1
        printf("consume %d\n", goods[head]);
        goods[head] = 0;  //将位置head的数据置为0,代表已经消费掉了
        head = (head + 1) % NUM;
        sem_post(&blank_number);  //blank_number资源+1
        sleep(rand() % 3);
    }
}

int main(int argc, char const *argv[]) {
    srand(time(NULL));

    sem_init(&blank_number, 0, NUM);  //初始化blank_number资源数量为NUM
    sem_init(&goods_number, 0, 0);    //初始化goods_number资源数量为0

    pthread_t ptid1, ptid2, ctid1, ctid2, ctid3;

    pthread_create(&ptid1, NULL, producer, NULL);
    pthread_create(&ptid2, NULL, producer, NULL);
    pthread_create(&ctid1, NULL, consumer, NULL);
    pthread_create(&ctid2, NULL, consumer, NULL);
    pthread_create(&ctid3, NULL, consumer, NULL);

    pthread_join(ptid1, NULL);
    pthread_join(ptid2, NULL);
    pthread_join(ctid1, NULL);
    pthread_join(ctid2, NULL);
    pthread_join(ctid3, NULL);
    return 0;
}