Posix Thread,简称 PThread,算不上是 iOS 基础知识,而是 UNIX 中的线程管理,iOS 也是类 UNIX 系统,当然可以在其中使用 PThread,PThread 都是 C API,当用 C++ 写一些跨平台的代码时需要操作线程,PThread 是很自然的选择。

Apple Develop Xcode

PThread

  • pthread_create 创建新的控制流
  • pthread_exit 从现有的控制流中退出
  • pthread_join 从控制流中得到退出状态
  • pthread_cleanup_push 注册在退出控制流时调用的函数
  • pthread_self 获取控制流的 ID
  • pthread_cancel 请求控制流的非正常退出

线程标识

每个线程也有一个线程标识,也就是线程 ID,其类型是 pthread_t,获取自身的线程 ID。

创建线程

PThread 函数在调用失败时通常会返回错误码,它们并不像其他的 POSIX 函数一样设置 errno。

#include <stdio.h>
#include <pthread.h>
#include <zconf.h>

pthread_t ntid;

void printids(const char *s) {
    pid_t pid;
    pthread_t tid;

    pid = getpid();
    tid = pthread_self();
    printf("%s pid %lu tid %lu (0x%lx)\n", s, (unsigned long)pid, (unsigned long)tid, (unsigned long)tid);
}

void* thr_fn(void *arg) {
    printids("new thread: ");
    return ((void *)0);
}

int main() {
    int err;
    err = pthread_create(&ntid, NULL, thr_fn, NULL);
    if (err != 0) {
        printf("can`t create thread\n");
        return -1;
    }
    printids("main thread: ");
    sleep(1);
    return 0;
}
main thread:  pid 30220 tid 4641422784 (0x114a67dc0)
new thread:  pid 30220 tid 123145522049024 (0x70000d18f000)

线程终止

单个线程可以通过3种方式退出,在不终止整个进程的情况下,停止它的控制流:

  1. 线程可以简单地从启动例程中返回,返回值是线程的退出码。
  2. 线程可以被同一进程中的其他线程取消 pthread_cancel。
  3. 线程调用 pthread_exit。

调用 pthread_join 的线程将一直阻塞,直到指定的线程调用 pthread_exit、从启动例程中返回或者被取消。

#include <stdio.h>
#include <pthread.h>
#include <zconf.h>

void* thr_fn1(void *arg) {
    printf("thread 1 returning\n");
    return ((void *)1);
}

void* thr_fn2(void *arg) {
    printf("thread 2 exiting\n");
    pthread_exit((void *)2);
}

int main() {
    int err;
    pthread_t tid1, tid2;
    void *tret;

    err = pthread_create(&tid1, NULL, thr_fn1, NULL);
    if (err != 0) {
        printf("can`t create thread 1");
    }
    err = pthread_create(&tid2, NULL, thr_fn2, NULL);
    if (err != 0) {
        printf("can`t create thread 2");
    }

    err = pthread_join(tid1, &tret);
    if (err != 0) {
        printf("can`t join with thread 1");
    }
    printf("thread 1 exit code %ld\n", (long)tret);
    err = pthread_join(tid2, &tret);
    if (err != 0) {
        printf("can`t join with thread 2");
    }
    printf("thread 2 exit code %ld\n", (long)tret);

    return 0;
}
thread 1 returning
thread 2 exiting
thread 1 exit code 1
thread 2 exit code 2

线程可以安排它退出时需要调用的函数

利用 pthread_cleanup_push 和 pthread_cleanup_pop。

从下面的示例程序可以看出,只有第二个线程能正确地启动和退出,且清理处理程序被调用了,因为第二个线程调用 pthread_exit 来退出。

第一个线程是通过从它的启动例程中返回而终止的,它的清理处理程序就不会被调用,在 macOS 上运行相同的程序,可以看到程序会出现段异常,这是因为在这个平台上,pthread_cleanup_push 是用宏实现的,而宏把某些上下文存放在栈上,当线程 1 在调用 pthread_cleanup_push 和调用 pthread_cleanup_pop 之间返回时,栈已被改写,而这两个平台在调用清理处理程序时就用了这个被改写的上下文。

#include <stdio.h>
#include <pthread.h>
#include <zconf.h>

void cleanup(void *arg) {
    printf("cleanup %s\n", (char *)arg);
}

void* thr_c_fn1(void *arg) {
    printf("thread 1 start\n");
    pthread_cleanup_push(cleanup, "thread 1 first handler");
    pthread_cleanup_push(cleanup, "thread 1 second handler");
    printf("thread 1 push complete\n");
    if (arg) {
        return ((void *) 1); // exit with error
    }
    pthread_cleanup_pop(0);
    pthread_cleanup_pop(0);
    return ((void *) 1);
}

void* thr_c_fn2(void *arg) {
    printf("thread 2 start\n");
    pthread_cleanup_push(cleanup, "thread 2 first handler");
    pthread_cleanup_push(cleanup, "thread 2 second handler");
    printf("thread 2 push complete\n");
    if (arg) {
        pthread_exit((void *) 2);
    }
    pthread_cleanup_pop(0);
    pthread_cleanup_pop(0);
    pthread_exit((void *) 2);
}

int run_cleanup(void) {
    int err;
    pthread_t tid1, tid2;
    void *tret;

    err = pthread_create(&tid1, NULL, thr_c_fn1, (void *)1);
    if (err != 0) {
        printf("can`t create thread 1");
    }
    err = pthread_create(&tid2, NULL, thr_c_fn2, (void *)1);
    if (err != 0) {
        printf("can`t create thread 2");
    }

    err = pthread_join(tid1, &tret);
    if (err != 0) {
        printf("can`t join with thread 1");
    }
    printf("thread 1 exit code %ld\n", (long)tret);
    err = pthread_join(tid2, &tret);
    if (err != 0) {
        printf("can`t join with thread 2");
    }
    printf("thread 2 exit code %ld\n", (long)tret);

    sleep(2);

    return 0;
}

线程同步机制

互斥量 mutex

  • pthread_mutex_t
  • pthread_mutex_init
  • pthread_mutex_destroy

  • pthread_mutex_lock
  • pthread_mutex_trylock
  • pthread_mutex_unlock

  • pthread_mutexattr_init
  • pthread_mutexattr_destroy

可以使用 PThread 的互斥接口来保护数据,确保同一时间只有一个线程访问数据。互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行设置(加锁),在访问完成后释放(解锁)互斥量。对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线程都会被阻塞直到当前线程释放该互斥锁。如果释放互斥量时有一个以上的线程阻塞,那么所有该锁上的阻塞线程都会变成可运行状态,第一个变为运行的线程就可以对互斥量加锁,其他线程就会看到互斥量依然是锁着的,只能回去再次等待它重新变为可用。在这种方式下,每次只有一个线程可以向前执行。

#include <stdlib.h>
#include <pthread.h>

struct foo {
    int             f_count;
    pthread_mutex_t f_lock;
    int             f_id;
    /* ... more stuff here ... */
};

struct foo * foo_alloc(int id) /* allocate the object */
{
    struct foo *fp;

    if ((fp = malloc(sizeof(struct foo))) != NULL) {
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
            free(fp);
            return(NULL);
        }
        /* ... continue initialization ... */
    }
    return(fp);
}

void foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}

void foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    if (--fp->f_count == 0) { /* last reference */
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        pthread_mutex_unlock(&fp->f_lock);
    }
}

条件变量

条件变量是线程可用的另一种同步机制。条件变量给多个线程提供了一个会合的场所。条件变量与互斥量一起使用时,允许线程以无竞争的方式等待特定的条件发生。条件本身是由互斥量保护的。线程在改变条件状态之前必须首先锁住互斥量。其他线程在获得互斥量之前不会察觉到这种改变,因为互斥量必须在锁定以后才能计算条件。

  • pthread_cond_t
  • pthread_cond_init
  • pthread_cond_destroy

  • pthread_cond_wait
    • 该函数原子地执行两个动作:
      1. 给互斥锁 mutex 解锁。
      2. 把调用线程投入睡眠,直到另外某个线程就本条件变量调用pthread_cond_signal。
    • pthread_cond_wait 在返回前重新给互斥锁 mutex 上锁。
    • 当 pthread_cond_wait 返回时,我们总是再次测试相应条件成立与否,因为可能发生虚假的唤醒:期待的条件尚不成立时的唤醒。各种线程实现都试图最大限度减少这些虚假唤醒的数量,但是仍有可能发生。
  • pthread_cond_signal
  • pthread_cond_broadcast 唤醒阻塞在相应条件变量上的所有线程。
  • pthread_cond_timedwait 允许线程就阻塞时间设置一个限制值。

  • pthread_condattr_init
  • pthread_condattr_destroy
#include <pthread.h>

struct msg {
    struct msg *m_next;
    /* ... more stuff here ... */
};

struct msg *workq;

pthread_cond_t qready = PTHREAD_COND_INITIALIZER;

pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void process_msg(void) {
    struct msg *mp;

    for (;;) {
        pthread_mutex_lock(&qlock);
        while (workq == NULL)
            pthread_cond_wait(&qready, &qlock);
        mp = workq;
        workq = mp->m_next;
        pthread_mutex_unlock(&qlock);
        /* now process the message mp */
    }
}

void enqueue_msg(struct msg *mp) {
    pthread_mutex_lock(&qlock);
    mp->m_next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    pthread_cond_signal(&qready);
}

生产者消费者问题

同步中有一个称为生产者—消费者(producer consumer)问题的经典问题,也称为有界缓冲区(bounded buffer)问题。一个或多个生产者(线程或进程)创建着一个个的数据条目,然后这些条目由一个或多个消费者(线程或进程)处理。

采用上面介绍的互斥量和条件变量来解决这个问题:

/* include globals */
#include	"unpipc.h"

#define	MAXNITEMS 		1000000
#define	MAXNTHREADS			100

/* globals shared by threads */
int		nitems;				/* read-only by producer and consumer */
int		buff[MAXNITEMS];
struct {
    pthread_mutex_t	mutex;
    int				nput;	/* next index to store */
    int				nval;	/* next value to store */
} put = { PTHREAD_MUTEX_INITIALIZER };

struct {
    pthread_mutex_t	mutex;
    pthread_cond_t	cond;
    int				nready;	/* number ready for consumer */
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
/* end globals */

int		nsignals;

void	*produce(void *), *consume(void *);

/* include main */
int main(int argc, char **argv)
{
    int			i, nthreads, count[MAXNTHREADS];
    pthread_t	tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
        err_quit("usage: prodcons7 <#items> <#threads>");
    nitems = min(atoi(argv[1]), MAXNITEMS);
    nthreads = min(atoi(argv[2]), MAXNTHREADS);

    Set_concurrency(nthreads + 1);
    /* 4create all producers and one consumer */
    for (i = 0; i < nthreads; i++) {
        count[i] = 0;
        Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
    Pthread_create(&tid_consume, NULL, consume, NULL);

    /* wait for all producers and the consumer */
    for (i = 0; i < nthreads; i++) {
        Pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    Pthread_join(tid_consume, NULL);
    printf("nsignals = %d\n", nsignals);

    exit(0);
}
/* end main */

/* include prodcons */
void * produce(void *arg)
{
    for ( ; ; ) {
        Pthread_mutex_lock(&put.mutex);
        if (put.nput >= nitems) {
            Pthread_mutex_unlock(&put.mutex);
            return(NULL);		/* array is full, we're done */
        }
        buff[put.nput] = put.nval;
        put.nput++;
        put.nval++;
        Pthread_mutex_unlock(&put.mutex);

        Pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0) {
            Pthread_cond_signal(&nready.cond);
            nsignals++;
        }
        nready.nready++;
        Pthread_mutex_unlock(&nready.mutex);

        *((int *) arg) += 1;
    }
}

void * consume(void *arg)
{
    int		i;

    for (i = 0; i < nitems; i++) {
        Pthread_mutex_lock(&nready.mutex);
        while (nready.nready == 0)
            Pthread_cond_wait(&nready.cond, &nready.mutex);
        nready.nready--;
        Pthread_mutex_unlock(&nready.mutex);

        if (buff[i] != i)
            printf("buff[%d] = %d\n", i, buff[i]);
    }
    return(NULL);
}
/* end prodcons */

读写锁 reader-writer lock

  • pthread_rwlock_t
  • pthread_rwlock_init
  • pthread_rwlock_destroy

  • pthread_rwlock_wrlock
  • pthread_rwlock_rdlock
  • pthread_rwlock_unlock
  • pthread_rwlock_tryrdlock
  • pthread_rwlock_trywrlock

  • pthread_rwlockattr_init
  • pthread_rwlockattr_destroy

当读写锁是写加锁状态时,在这个锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞。当读写锁在读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权,但是任何希望以写模式对此锁进行加锁的线程都会阻塞,直到所有的线程释放它们的读锁为止。虽然各操作系统对读写锁的实现各不相同,但当读写锁处于读模式锁住的状态,而这时有一个线程试图以写模式获取锁时,读写锁通常会阻塞随后的读模式锁请求。这样可以避免读模式锁长期占用,而等待的写模式锁请求一直得不到满足。

#include <stdlib.h>
#include <pthread.h>

struct job {
    struct job *j_next;
    struct job *j_prev;
    pthread_t   j_id;   /* tells which thread handles this job */
    /* ... more stuff here ... */
};

struct queue {
    struct job      *q_head;
    struct job      *q_tail;
    pthread_rwlock_t q_lock;
};

/*
 * Initialize a queue.
 */
int queue_init(struct queue *qp) {
    int err;

    qp->q_head = NULL;
    qp->q_tail = NULL;
    err = pthread_rwlock_init(&qp->q_lock, NULL);
    if (err != 0)
        return(err);
    /* ... continue initialization ... */
    return(0);
}

/*
 * Insert a job at the head of the queue.
 */
void job_insert(struct queue *qp, struct job *jp) {
    pthread_rwlock_wrlock(&qp->q_lock);
    jp->j_next = qp->q_head;
    jp->j_prev = NULL;
    if (qp->q_head != NULL)
        qp->q_head->j_prev = jp;
    else
        qp->q_tail = jp;	/* list was empty */
    qp->q_head = jp;
    pthread_rwlock_unlock(&qp->q_lock);
}

/*
 * Append a job on the tail of the queue.
 */
void job_append(struct queue *qp, struct job *jp) {
    pthread_rwlock_wrlock(&qp->q_lock);
    jp->j_next = NULL;
    jp->j_prev = qp->q_tail;
    if (qp->q_tail != NULL)
        qp->q_tail->j_next = jp;
    else
        qp->q_head = jp;	/* list was empty */
    qp->q_tail = jp;
    pthread_rwlock_unlock(&qp->q_lock);
}

/*
 * Remove the given job from a queue.
 */
void job_remove(struct queue *qp, struct job *jp) {
    pthread_rwlock_wrlock(&qp->q_lock);
    if (jp == qp->q_head) {
        qp->q_head = jp->j_next;
        if (qp->q_tail == jp)
            qp->q_tail = NULL;
        else
            jp->j_next->j_prev = jp->j_prev;
    } else if (jp == qp->q_tail) {
        qp->q_tail = jp->j_prev;
        jp->j_prev->j_next = jp->j_next;
    } else {
        jp->j_prev->j_next = jp->j_next;
        jp->j_next->j_prev = jp->j_prev;
    }
    pthread_rwlock_unlock(&qp->q_lock);
}

/*
 * Find a job for the given thread ID.
 */
struct job * job_find(struct queue *qp, pthread_t id) {
    struct job *jp;

    if (pthread_rwlock_rdlock(&qp->q_lock) != 0)
        return(NULL);

    for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
        if (pthread_equal(jp->j_id, id))
            break;

    pthread_rwlock_unlock(&qp->q_lock);
    return(jp);
}

参考资料