Ⅰ. 线程的概念
有些情况需要在一个进程中同时执行多个控制流程,比如实现一个图形界面的下载软件,一方面需要和用户交互,等待和处理用户的鼠标键盘事件,另一方面又需要同时下载多个文件,等待和处理从多个网络主机发来的数据,这些任务都需要一个“等待一处理”的循环,那么如何才能同时进行多项任务呢?这时候就可以使用线程。
线程(thread):是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以井发多个线程,每条线程并行执行不同的任务
由于同一进程的多个线程共享同一地址空间,因此 Text Segment
(代码段)、 Data Segment
(数据段)都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
- 文件描述符表
- 每种信号的处理方式
- 当前工作目录
- 用户id和组id
但有些资源是每个线程各有一份的:
- 线程id
- 上下文,包括各种寄存器的值、程序计数器和栈指针
- 栈空间
- errno变量
- 信号屏蔽字
signal mask
- 调度优先级
在 Linux上线程函数位于libpthread
共享库中,因此在编译时要加上-lpthread
选项
Ⅱ. 线程控制
1.创建线程
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
pthread_create
函数会在调用它的进程中开启一个新线程,这个线程会调用start_routine
所指向的函数开始执行,arg
作为start_routine()
函数的参数。thread
是结果参数,新线程的id存储在thread
指向的buffer
中
返回值
:成功返回0,失败返回错误号。(之前学习过的系统函数都是成功返回0,失败返回-1,而错误号保存在全局变量errno中,而pthread库的函数都是通过返回值返回错误号,虽然每个线程也都有一个errno,但这是为了兼容其他函数接口而提供的,pthread库本身并不使用它,通过返回值返回错误码更加清晰)
新线程将会继承创建它的线程的信号屏蔽字(signal mask
),但是新线程的未决信号集(pending signal set
)会是空的。也不会继承alternate signal stack
2.获取当前线程的id
#include <pthread.h>
pthread_t pthread_self(void);
返回值
:总是成功返回调用该函数的线程ID
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
void printid(char *);
void *thr_fun(void *arg) {
printid(arg);
return NULL;
}
void printid(char *tip) {
pid_t pid = getpid();
pthread_t tid = pthread_self();
printf("%s pid:%u tid:%u (%p)\n", tip, pid, tid, tid);
printf("%s thr_fun=%p\n", tip, thr_fun);
}
/**
* @brief 打印进程id、线程id
*
* @param argc
* @param argv
* @return int
*/
int main(int argc, char const *argv[]) {
pthread_t tid;
int rtn = pthread_create(&tid, NULL, thr_fun, "new thread.");
if (rtn) {
printf("create thread error: %s\n", strerror(rtn));
exit(-1);
}
sleep(1);
printid("main thread");
return 0;
}
3.终止线程
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
- 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
- 一个线程可以调用
pthread_cancel
终止同一进程的另一个线程。 - 线程可以调用
pthread_exit
终止自己。
#include <pthread.h>
void pthread_exit(void *retval);
retval
是void*
类型,和线程函数的返回值的用法一样,其他线程可以调用pthread_join
获取这个指针。
需要注意,pthread_exit
或者return
返回的指针所指向的内存单元必须是全局 的或者用malloc分配的,不能在线程函数的栈上分配,因为当其他线程得到这个返回值时线程函数已经退出了。
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
调用该函数的线程将被挂起,知道id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join
得到的终止状态是不同的,总结如下:
- 如果thread线程通过return返回,
retval
所指向的单元里存放的是thread线程函数的返回值。 - 如果thread线程被其他线程调用
pthread_cancel
异常终止掉,retval
所指向的单元里存放的是常数PTHREAD_CANCEL
,也就是-1。 - 如果thread线程是自己调用
pthread_exit
终止的,retval
所指向的单元存放的是传给pthread_exit
的参数。 - 如果对thread线程的终止状态不感兴趣,可以传
NULL
给retval
参数
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
/**
* @brief 线程1函数,使用return退出
*
* @param arg
* @return void*
*/
void* thr_fn1(void* arg) {
printf("thread 1 returning\n");
return (void*)1;
}
/**
* @brief 线程2函数,使用pthread_exit终止
*
* @param arg
* @return void*
*/
void* thr_fn2(void* arg) {
printf("thread 2 exiting\n");
pthread_exit((void*)2);
return NULL;
}
/**
* @brief 线程3函数,死循环,需要其他线程使用pthread_cancel终止它
*
* @param arg
* @return void*
*/
void* thr_fn3(void* arg) {
while (1) {
printf("thread 3 sleeping\n");
sleep(1);
}
return NULL;
}
int main(int argc, char const* argv[]) {
pthread_t tid;
void* stat;
pthread_create(&tid, NULL, thr_fn1, NULL);
pthread_join(tid, &stat);
printf("thread 1 exit code %ld\n", (long)stat);
pthread_create(&tid, NULL, thr_fn2, NULL);
pthread_join(tid, &stat);
printf("thread 2 exit code %ld\n", (long)stat);
pthread_create(&tid, NULL, thr_fn3, NULL);
sleep(3);
pthread_cancel(tid);
pthread_join(tid, &stat);
printf("thread 3 exit code %ld\n", (long)stat);
return 0;
}
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
void printid(char *);
void *thr_fun(void *arg) {
printid(arg);
return NULL;
}
void printid(char *tip) {
pid_t pid = getpid();
pthread_t tid = pthread_self();
printf("%s pid:%u tid:%u (%p)\n", tip, pid, tid, tid);
printf("%s thr_fun=%p\n", tip, thr_fun);
}
/**
* @brief 打印进程id、线程id
*
* @param argc
* @param argv
* @return int
*/
int main(int argc, char const *argv[]) {
pthread_t tid;
int rtn = pthread_create(&tid, NULL, thr_fun, "new thread.");
if (rtn) {
printf("create thread error: %s\n", strerror(rtn));
exit(-1);
}
//sleep(1);
pthread_join(tid, NULL); //使用pthread_join就不需要sleep来阻塞主线程了
printid("main thread");
return 0;
}
Ⅲ. 线程间同步
多个线程同时访问共享数据时可能会冲突,这跟前面讲信号时所说的可重入性是同样的问题。比如两个线程都要把某个全局变量增加1,这个操作在某平台需要三条指令完成:
- 从内存读变量值到寄存器(mov)
- 寄存器的值+1(add)
- 将寄存器的值写会内存(mov)
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
int cnt = 0;
/**
* @brief 将全局变量cnt加5000次1
*
* @param arg
* @return void*
*/
void *cntadd(void *arg) {
int val;
for (int i = 0; i < 5000; i++) {
val = cnt;
//插入一条printf让操作时间变长一点
printf("%ld: %d\n", pthread_self(), val);
cnt = val + 1;
}
return NULL;
}
/**
* @brief 创建两个线程同时对全局变量cnt进行累加计算,观察结果
*
* @param argc
* @param argv
* @return int
*/
int main(int argc, char const *argv[]) {
pthread_t tid_A, tid_B;
pthread_create(&tid_A, NULL, cntadd, NULL);
pthread_create(&tid_B, NULL, cntadd, NULL);
pthread_join(tid_A, NULL);
pthread_join(tid_B, NULL);
return 0;
}
1.互斥锁Mutex
对于多线程的程序,访问冲突的问题是很普遍的,解决的办法是引入互斥锁(Mutex, MutualExclusive Lock
),获得锁的线程可以完成“读一修改-写”的操作,然后释放锁给其它线程,没有获得锁的线程只能等待而不能访问共享数据,这样“读-修改一写”三步操作组成一个原子操作,要么都执行,要么都不执行,不会执行到中间被打断,也不会在其它处理器上并行做这个操作。
一把互斥锁有两种状态:非锁定的(不属于任何一个线程)和已锁定的(仅属于一个线程),互斥锁永远不能被两个线程同时拥有。如果有一个线程试图获取一把已经被锁定的互斥锁,那么这个线程将被挂起,直到拥有这把锁的另一个线程释放了这把互斥锁。
#include <pthread.h>
pthread_mutex_t fastmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t recmutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
pthread_mutex_t errchkmutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
mutexattr
:用来设定互斥锁的特性,如果为NULL
将使用缺省特性返回值
:pthread_mutex_init
总是返回0;其他函数成功返回0,失败返回错误码mutex
:结果参数
pthread_mutex_init
函数对Mutex
做初始化,参数attr
设定Mutex
的属性,如果attr
为NULL
则表示缺省属性。
用pthread_mutex_init
函数初始化的Mutex可用pthread_mutex_destroy
销毁。
如果Mutex变量是静态分配的(全局变量或static变量),也可以用宏定义PTHREAD_MUTEX_INITIALIZER
来初始化,相当于用pthread_mutex_init
初始化并且attr
参数为NULL
。
LinuxThreads 实现只支持一个互斥体属性,互斥体种类,它可以是“快速”(fast
)、“递归”(recursive
)或“错误检查”(error checking
)。 互斥锁的类型决定了它是否可以被已经拥有它的线程再次锁定。 默认类型是”fast
“。
pthread_mutex_lock
用于锁定给定的互斥锁mutex
,如果该所当前是unlocked
状态,那么mutex
会变为locked
状态并且被调用pthread_mutex_lock
函数的线程所拥有,这时pthread_mutex_lock
会立即返回;如果mutex
状态是locked
,那么pthread_mutex_lock
将挂起调用它的线程,直到拥有mutex
的线程释放这个锁为止。
如果mutex已经被调用pthread_mutex_lock
的这个线程锁定了,那么pthread_mutex_lock
的行为将取决于互斥锁mutex
的类型(fast
、recursive
、error checking
):
fast
类型:调用pthread_mutex_lock
的线程将被挂起直到锁被释放,因此这将会导致死锁发生。error checking
类型:pthread_mutex_lock
立即返回错误码EDEADLK
。recursive
类型:pthread_mutex_lock
成功执行并立即返回,同时会记录调用线程锁定mutex
的次数,要将mutex
变为unlocked
状态的话,需要执行相同次数的pthread_mutex_unlock
操作
pthread_mutex_trylock
的行为和pthread_mutex_lock
一致,区别在于当互斥锁mutex
已经被另一个线程(或调用线程的fast
类型)获得时,它无法阻塞调用线程,而是立即返回错误码EBUSY
pthread_mutex_unlock
会释放指定的互斥锁mutex
pthread_mutex_destroy
会销毁互斥对象mutex
,并释放它可能持有的资源。互斥锁在进入前必须是unlocked
的。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
pthread_mutex_t add_lock = PTHREAD_MUTEX_INITIALIZER;
int cnt = 0;
/**
* @brief 将全局变量cnt加5000次1
*
* @param arg
* @return void*
*/
void *cntadd(void *arg) {
int val;
for (int i = 0; i < 5000; i++) {
pthread_mutex_lock(&add_lock); //加锁
val = cnt;
//插入一条printf让操作时间变长一点
printf("%ld: %d\n", pthread_self(), val);
cnt = val + 1;
pthread_mutex_unlock(&add_lock); //解锁
}
return NULL;
}
/**
* @brief 创建两个线程同时对全局变量cnt进行累加计算,观察结果
*
* @param argc
* @param argv
* @return int
*/
int main(int argc, char const *argv[]) {
pthread_t tid_A, tid_B;
pthread_create(&tid_A, NULL, cntadd, NULL);
pthread_create(&tid_B, NULL, cntadd, NULL);
pthread_join(tid_A, NULL);
pthread_join(tid_B, NULL);
pthread_mutex_destroy(&add_lock); //销毁锁
return 0;
}
“挂起等待”和“唤醒等待线程”的操作如何实现?
每个 Mutex有一个等待队列,一个线程要在 Mutex上挂起等待,首先在把自己加入等待队列中,然后置线程状态为睡眠,然后调用调度器函数切换到别的线程。一个线程要唤醒等待队列中的其它线程,只需从等待队列中取出一项,把它的状态从睡眠改为就绪,加入就绪队列,那么下次调度器函数执行时就有可能切换到被唤醒的线程。
死锁(Deadlock)
如果同一个线程先后两次调用lock,在第二次调用时,由于锁已经被占用,该线程会挂起等待别的线程释放锁,然而锁正是被自己占用着的,该线程又被挂起而没有机会释放锁,因此就永远处于挂起等待状态了,这叫做死锁(Deadlock)(fast类型)
另一种典型的死锁情形是这样:线程A获得了锁1,线程B获得了锁2,这时线程A调用lock试图获得锁2,结果是需要挂起等待线程B释放锁2,而这时线程B也调用ock试图获得锁1,结果是需要挂起等待线程A释放锁1,于是线程A和B都永远处于挂起状态了。不难想象,如果涉及到更多的线程和更多的锁,有没有可能死锁的问题将会变得复杂和难以判断。
写程序时应该尽量避免同时获得多个锁,如果一定有必要这么做,则有一个原则:如果所有线程在需要多个锁时都按相同的先后顺序获得锁,则不会出现死锁。比如一个程序中用到锁1、锁2、锁3,它们所对应的Mtex変量是锁1 -> 锁2 -> 锁3,那么所有线程在需要同时获得2个或3个锁时都应该按锁1、锁2、锁3的顺序获得。如果要为所有的锁确定一个先后顺序比较困难,则应该尽量使用pthread mutex_ try lock
调用代替pthread mutex_lock
调用,以免死锁。
线程间的同步还有这样一种情况:线程A需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执行。
在pthread库中通过条件变量( Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。 Condition Variable用pthread_cond_t
类型的变量表示,可以这样初始化和销毁:
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
int pthread_cond_destroy(pthread_cond_t *cond);
- 返回值:成功返回0,失败返回错误号
pthread_cond_timedwait
函数还有一个额外的参数可以设定等待超时,如果到达了abstime所指定的时刻仍然没有别的线程来唤醒当前线程,就返回ETIMEDOUT
一个线程可以调用pthread_cond_signal
唤醒在某个Condition Variable上等待另一个线程,也可以调用pthread_cond_broadcast
唤醒在这个Condition Variable上等待的所有线程。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
typedef struct Goods {
int data;
struct Goods *next;
} Goods;
Goods *head = NULL; //使用一条链表作为全局操作对象
pthread_mutex_t headlock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t hasGoods = PTHREAD_COND_INITIALIZER;
void *producer(void *arg) {
Goods *new_goods;
while (1) {
new_goods = (Goods *)malloc(sizeof(Goods));
new_goods->data = rand() % 100;
pthread_mutex_lock(&headlock);
// 头插法
new_goods->next = head;
head = new_goods;
printf("produce %d\n", new_goods->data);
pthread_mutex_unlock(&headlock);
pthread_cond_broadcast(&hasGoods); //广播触发条件
sleep(rand() % 3);
}
}
void *consumer(void *arg) {
Goods *good;
while (1) {
pthread_mutex_lock(&headlock);
while (!head) {
//链表为空,没得消费,等待条件触发,同时暂时释放锁headlock
//这里用while不用if是为了让条件成立触发之后再判断一次head是否为null,
//因为有多个消费者, 当一个消费者抢到锁消费完之后释放锁,
//之后锁会被另一个消费者线程抢到, 如果用if,这个线程抢到锁之后就会往下运行,
//但head可能为null
pthread_cond_wait(&hasGoods, &headlock);
}
good = head;
head = head->next;
printf("consume %d\n", good->data);
pthread_mutex_unlock(&headlock);
free(good);
sleep(rand() % 3);
}
}
/**
* @brief 1个线程做生产者,4个线程做消费者
*
* @param argc
* @param argv
* @return int
*/
int main(int argc, char const *argv[]) {
srand(time(NULL)); //随机数种子
pthread_t producer_tid, consumer_tid1, consumer_tid2, consumer_tid3, consumer_tid4;
pthread_create(&producer_tid, NULL, producer, NULL);
pthread_create(&consumer_tid1, NULL, consumer, NULL);
pthread_create(&consumer_tid2, NULL, consumer, NULL);
pthread_create(&consumer_tid3, NULL, consumer, NULL);
pthread_create(&consumer_tid4, NULL, consumer, NULL);
pthread_join(producer_tid, NULL);
pthread_join(consumer_tid1, NULL);
pthread_join(consumer_tid2, NULL);
pthread_join(consumer_tid3, NULL);
pthread_join(consumer_tid4, NULL);
return 0;
}
2.信号量Semaphore
Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源,加锁时获得该资源,将Mtex减到0,表示不再有可用资源,解锁时释放该资源,将Mtex重新加到1,表示又有了一个可用资源。
信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1。这种信号量不仅可用于同一进程的线程间同步,也可用于不同进程间的同步。
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_destroy(sem_t *sem);
semaphore变量的类型为sem_t
sem_init
初始化一个 semaphore变量, value参数表示可用资源的数量,pshared
参数为0表示信号量用于同一进程的线程间同步。- 在用完semaphore变量之后应该调用
sem_ destroy
释放与semaphore相关的资源。 - 调用
sem_wait
可以获得资源,使semaphore的值减1,如果调用sem_ wait
时semaphore的值已经是0,则挂起等待。如果不希望挂起等待,可以调用sem_trywait
- 调用
sem_post
可以释放资源,使 semaphore的值加1,同时唤醒挂起等待的线程。
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#define NUM 5
int goods[NUM]; //全局变量,给生产者和消费者操作
sem_t blank_number, goods_number;
int head, tail;
void *producer(void *arg) {
while (1) {
sem_wait(&blank_number); //blank_number资源-1
goods[tail] = rand() % 100 + 1; //在tail位置填入数据,类似队列
printf("produce %d\n", goods[tail]);
tail = (tail + 1) % NUM;
sem_post(&goods_number); //goods_number资源+1
sleep(rand() % 3);
}
}
void *consumer(void *arg) {
while (1) {
sem_wait(&goods_number); //goods_number资源-1
printf("consume %d\n", goods[head]);
goods[head] = 0; //将位置head的数据置为0,代表已经消费掉了
head = (head + 1) % NUM;
sem_post(&blank_number); //blank_number资源+1
sleep(rand() % 3);
}
}
int main(int argc, char const *argv[]) {
srand(time(NULL));
sem_init(&blank_number, 0, NUM); //初始化blank_number资源数量为NUM
sem_init(&goods_number, 0, 0); //初始化goods_number资源数量为0
pthread_t ptid1, ptid2, ctid1, ctid2, ctid3;
pthread_create(&ptid1, NULL, producer, NULL);
pthread_create(&ptid2, NULL, producer, NULL);
pthread_create(&ctid1, NULL, consumer, NULL);
pthread_create(&ctid2, NULL, consumer, NULL);
pthread_create(&ctid3, NULL, consumer, NULL);
pthread_join(ptid1, NULL);
pthread_join(ptid2, NULL);
pthread_join(ctid1, NULL);
pthread_join(ctid2, NULL);
pthread_join(ctid3, NULL);
return 0;
}
评论 (0)