信号量的创建
信号量的创建和共享内存类似,实际调用semget(),操作也大同小异:创建对应的ipc_namespaace指针并指向该进程的ipc_ns,初始化共享内存对应的操作sem_ops,并将传参key, size, semflg封装为传参sem_params,最终调用ipcget()。
SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
{
struct ipc_namespace *ns;
static const struct ipc_ops sem_ops = {
.getnew = newary,
.associate = sem_security,
.more_checks = sem_more_checks,
};
struct ipc_params sem_params;
ns = current->nsproxy->ipc_ns;
sem_params.key = key;
sem_params.flg = semflg;
sem_params.u.nsems = nsems;
return ipcget(ns, &sem_ids(ns), &sem_ops, &sem_params);
}
共享内存最终走到newseg()函数,而信号量则调用newary(),该函数也有着类似的逻辑:
- 通过kvmalloc()在直接映射区分配struct sem_array结构体描述该信号量。在该结构体中会有多个信号量保存在struct sem sems[]中,通过semval表示当前信号量。
- 初始化sem_array和sems中的各个链表
- 通过ipc_addid()将创建的sem_array挂载到基数树上,并返回对应id
static int newary(struct ipc_namespace *ns, struct ipc_params *params)
{
int retval;
struct sem_array *sma;
key_t key = params->key;
int nsems = params->u.nsems;
int semflg = params->flg;
int i;
......
sma = sem_alloc(nsems);
......
sma->sem_perm.mode = (semflg & S_IRWXUGO);
sma->sem_perm.key = key;
sma->sem_perm.security = NULL;
......
for (i = 0; i < nsems; i++) {
INIT_LIST_HEAD(&sma->sems[i].pending_alter);
INIT_LIST_HEAD(&sma->sems[i].pending_const);
spin_lock_init(&sma->sems[i].lock);
}
sma->complex_count = 0;
sma->use_global_lock = USE_GLOBAL_LOCK_HYSTERESIS;
INIT_LIST_HEAD(&sma->pending_alter);
INIT_LIST_HEAD(&sma->pending_const);
INIT_LIST_HEAD(&sma->list_id);
sma->sem_nsems = nsems;
sma->sem_ctime = ktime_get_real_seconds();
/* ipc_addid() locks sma upon success. */
retval = ipc_addid(&sem_ids(ns), &sma->sem_perm, ns->sc_semmni);
......
ns->used_sems += nsems;
sem_unlock(sma, -1);
rcu_read_unlock();
return sma->sem_perm.id;
}
struct sem_array {
struct kern_ipc_perm sem_perm; /* permissions .. see ipc.h */
time64_t sem_ctime; /* create/last semctl() time */
struct list_head pending_alter; /* pending operations */
/* that alter the array */
struct list_head pending_const; /* pending complex operations */
/* that do not alter semvals */
struct list_head list_id; /* undo requests on this array */
int sem_nsems; /* no. of semaphores in array */
int complex_count; /* pending complex operations */
unsigned int use_global_lock;/* >0: global lock required */
struct sem sems[];
} __randomize_layout;
struct sem {
int semval; /* current value */
/*
* PID of the process that last modified the semaphore. For
* Linux, specifically these are:
* - semop
* - semctl, via SETVAL and SETALL.
* - at task exit when performing undo adjustments (see exit_sem).
*/
struct pid *sempid;
spinlock_t lock; /* spinlock for fine-grained semtimedop */
struct list_head pending_alter; /* pending single-sop operations */
/* that alter the semaphore */
struct list_head pending_const; /* pending single-sop operations */
/* that do not alter the semaphore*/
time64_t sem_otime; /* candidate for sem_otime */
} ____cacheline_aligned_in_smp;
信号量的初始化
信号量通过semctl()实现初始化,主要使用semctl_main()和semctl_setval()函数。
SYSCALL_DEFINE4(semctl, int, semid, int, semnum, int, cmd, unsigned long, arg)
{
int version;
struct ipc_namespace *ns;
void __user *p = (void __user *)arg;
ns = current->nsproxy->ipc_ns;
switch (cmd) {
case IPC_INFO:
case SEM_INFO:
case IPC_STAT:
case SEM_STAT:
return semctl_nolock(ns, semid, cmd, version, p);
case GETALL:
case GETVAL:
case GETPID:
case GETNCNT:
case GETZCNT:
case SETALL:
return semctl_main(ns, semid, semnum, cmd, p);
case SETVAL:
return semctl_setval(ns, semid, semnum, arg);
case IPC_RMID:
case IPC_SET:
return semctl_down(ns, semid, cmd, version, p);
default:
return -EINVAL;
}
}
SETALL操作调用semctl_main(),传参为 union semun 里面的 unsigned short *array
,会设置整个信号量集合。semctl_main() 函数中,先是通过 sem_obtain_object_check()根据信号量集合的 id 在基数树里面找到 struct sem_array 对象,发现如果是 SETALL 操作,就将用户的参数中的 unsigned short *array 通过 copy_from_user() 拷贝到内核里面的 sem_io 数组,然后是一个循环,对于信号量集合里面的每一个信号量,设置 semval,以及修改这个信号量值的 pid。
static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
int cmd, void __user *p)
{
struct sem_array *sma;
struct sem *curr;
int err, nsems;
ushort fast_sem_io[SEMMSL_FAST];
ushort *sem_io = fast_sem_io;
DEFINE_WAKE_Q(wake_q);
sma = sem_obtain_object_check(ns, semid);
nsems = sma->sem_nsems;
......
switch (cmd) {
......
case SETALL:
{
int i;
struct sem_undo *un;
......
if (copy_from_user(sem_io, p, nsems*sizeof(ushort))) {
......
}
......
for (i = 0; i < nsems; i++) {
sma->sems[i].semval = sem_io[i];
sma->sems[i].sempid = task_tgid_vnr(current);
}
......
sma->sem_ctime = get_seconds();
/* maybe some queued-up processes were waiting for this */
do_smart_update(sma, NULL, 0, 0, &wake_q);
err = 0;
goto out_unlock;
}
}
......
wake_up_q(&wake_q);
......
}
SETVAL 操作调用semctl_setval()函数,传进来的参数 union semun 里面的 int val仅仅会设置某个信号量。在 semctl_setval() 函数中,我们先是通过 sem_obtain_object_check()根据信号量集合的 id 在基数树里面找到 struct sem_array 对象,对于 SETVAL 操作,直接根据参数中的 val 设置 semval,以及修改这个信号量值的 pid。
static int semctl_setval(struct ipc_namespace *ns, int semid, int semnum,
unsigned long arg)
{
struct sem_undo *un;
struct sem_array *sma;
struct sem *curr;
int err, val;
DEFINE_WAKE_Q(wake_q);
......
sma = sem_obtain_object_check(ns, semid);
......
curr = &sma->sems[semnum];
......
curr->semval = val;
curr->sempid = task_tgid_vnr(current);
sma->sem_ctime = get_seconds();
/* maybe some queued-up processes were waiting for this */
do_smart_update(sma, NULL, 0, 0, &wake_q);
......
wake_up_q(&wake_q);
return 0;
}
信号量的操作
信号量的操作通过semop()实现,实际调用sys_emtimedop(),最终调用为do_semtimedop()
SYSCALL_DEFINE3(semop, int, semid, struct sembuf __user *, tsops, unsigned, nsops)
{
return sys_semtimedop(semid, tsops, nsops, NULL);
}
do_semtimedop()是一个很较长的函数,逻辑比较复杂,主要为:
- 调用copy_from_user()拷贝用户参数至内核态,如对信号量的操作struct sembuf。
- 如果需要进入等待状态,,则需要设置超时
- 调用sem_obtain_object_check()根据id获取对应的信号量集合sma
- 创建struct sem_queue queue表示当前信号量操作。这里之所以称之为queue是因为操作的执行不可预期,因此排在队列之中等待信号量满足条件时再调用perform_atomic_semop()实施信号量操作。
- 如果不需要等待,则说明信号量操作已完成,也改变了信号量的值。接下来,就是一个标准流程。首先通过 DEFINE_WAKE_Q(wake_q) 声明一个 wake_q,调用 do_smart_update()看这次对于信号量的值的改变可以影响并可以激活等待队列中的哪些 struct sem_queue,然后把它们都放在 wake_q 里面,调用 wake_up_q() 唤醒这些进程。
- 如果需要等待,则会根据信号量操作是对单个信号量还是整个信号量集合,将queue挂载至信号量链表pending_alter或者信号量集合的链表pending_alter中
- 进入do-while循环等待,如果没有时间限制则调用schedule()让出CPU资源,如果有则调用schedule_timeout()让出资源并过一段时间后回来。当回来的时候,判断是否等待超时,如果没有等待超时则进入下一轮循环,再次等待,如果超时则退出循环,返回错误。在让出 CPU 的时候,设置进程的状态为 TASK_INTERRUPTIBLE,并且循环的结束会通过 signal_pending 查看是否收到过信号,这说明这个等待信号量的进程是可以被信号中断的,也即一个等待信号量的进程是可以通过 kill 杀掉的。
static long do_semtimedop(int semid, struct sembuf __user *tsops,
unsigned nsops, const struct timespec64 *timeout)
{
int error = -EINVAL;
struct sem_array *sma;
struct sembuf fast_sops[SEMOPM_FAST];
struct sembuf *sops = fast_sops, *sop;
struct sem_undo *un;
int max, locknum;
bool undos = false, alter = false, dupsop = false;
struct sem_queue queue;
unsigned long dup = 0, jiffies_left = 0;
struct ipc_namespace *ns;
ns = current->nsproxy->ipc_ns;
......
if (copy_from_user(sops, tsops, nsops * sizeof(*tsops))) {
error = -EFAULT;
goto out_free;
}
if (timeout) {
if (timeout->tv_sec < 0 || timeout->tv_nsec < 0 ||
timeout->tv_nsec >= 1000000000L) {
error = -EINVAL;
goto out_free;
}
jiffies_left = timespec64_to_jiffies(timeout);
}
......
un = find_alloc_undo(ns, semid);
......
sma = sem_obtain_object_check(ns, semid);
......
queue.sops = sops;
queue.nsops = nsops;
queue.undo = un;
queue.pid = task_tgid(current);
queue.alter = alter;
queue.dupsop = dupsop;
error = perform_atomic_semop(sma, &queue);
if (error == 0) { /* non-blocking succesfull path */
DEFINE_WAKE_Q(wake_q);
/*
* If the operation was successful, then do
* the required updates.
*/
if (alter)
do_smart_update(sma, sops, nsops, 1, &wake_q);
else
set_semotime(sma, sops);
......
}
......
/*
* We need to sleep on this operation, so we put the current
* task into the pending queue and go to sleep.
*/
if (nsops == 1) {
struct sem *curr;
int idx = array_index_nospec(sops->sem_num, sma->sem_nsems);
curr = &sma->sems[idx];
if (alter) {
if (sma->complex_count) {
list_add_tail(&queue.list,
&sma->pending_alter);
} else {
list_add_tail(&queue.list,
&curr->pending_alter);
}
} else {
list_add_tail(&queue.list, &curr->pending_const);
}
} else {
if (!sma->complex_count)
merge_queues(sma);
if (alter)
list_add_tail(&queue.list, &sma->pending_alter);
else
list_add_tail(&queue.list, &sma->pending_const);
sma->complex_count++;
}
do {
WRITE_ONCE(queue.status, -EINTR);
queue.sleeper = current;
__set_current_state(TASK_INTERRUPTIBLE);
......
if (timeout)
jiffies_left = schedule_timeout(jiffies_left);
else
schedule();
......
/*
* If an interrupt occurred we have to clean up the queue.
*/
if (timeout && jiffies_left == 0)
error = -EAGAIN;
} while (error == -EINTR && !signal_pending(current)); /* spurious */
......
}
do_smart_update() 会调用 update_queue(),update_queue() 会依次循环整个信号量集合的等待队列 pending_alter或者某个信号量的等待队列,试图在信号量的值变了的情况下,再次尝试 perform_atomic_semop 进行信号量操作。如果不成功,则尝试队列中的下一个;如果尝试成功,则调用 unlink_queue() 从队列上取下来,然后调用 wake_up_sem_queue_prepare()将 q->sleeper 加到 wake_q 上去。q->sleeper 是一个 task_struct,是等待在这个信号量操作上的进程。
static int update_queue(struct sem_array *sma, int semnum, struct wake_q_head *wake_q)
{
struct sem_queue *q, *tmp;
struct list_head *pending_list;
int semop_completed = 0;
if (semnum == -1)
pending_list = &sma->pending_alter;
else
pending_list = &sma->sems[semnum].pending_alter;
again:
list_for_each_entry_safe(q, tmp, pending_list, list) {
int error, restart;
......
error = perform_atomic_semop(sma, q);
/* Does q->sleeper still need to sleep? */
if (error > 0)
continue;
unlink_queue(sma, q);
......
wake_up_sem_queue_prepare(q, error, wake_q);
......
}
return semop_completed;
}
static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
struct wake_q_head *wake_q)
{
wake_q_add(wake_q, q->sleeper);
......
}
接下来wake_up_q 就依次唤醒 wake_q 上的所有 task_struct,调用的是进程调度中分析过的 wake_up_process()方法。
void wake_up_q(struct wake_q_head *head)
{
struct wake_q_node *node = head->first;
while (node != WAKE_Q_TAIL) {
struct task_struct *task;
task = container_of(node, struct task_struct, wake_q);
node = node->next;
task->wake_q.next = NULL;
wake_up_process(task);
put_task_struct(task);
}
}
perform_atomic_semop() 函数对于所有信号量操作都进行两次循环。在第一次循环中,如果发现计算出的 result 小于 0,则说明必须等待,于是跳到 would_block 中,设置 q->blocking = sop 表示这个 queue 是 block 在这个操作上,然后如果需要等待,则返回 1。如果第一次循环中发现无需等待,则第二个循环实施所有的信号量操作,将信号量的值设置为新的值,并且返回 0。
static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
{
int result, sem_op, nsops;
struct sembuf *sop;
struct sem *curr;
struct sembuf *sops;
struct sem_undo *un;
sops = q->sops;
nsops = q->nsops;
un = q->undo;
for (sop = sops; sop < sops + nsops; sop++) {
curr = &sma->sems[sop->sem_num];
sem_op = sop->sem_op;
result = curr->semval;
......
result += sem_op;
if (result < 0)
goto would_block;
......
if (sop->sem_flg & SEM_UNDO) {
int undo = un->semadj[sop->sem_num] - sem_op;
.....
}
}
for (sop = sops; sop < sops + nsops; sop++) {
curr = &sma->sems[sop->sem_num];
sem_op = sop->sem_op;
result = curr->semval;
if (sop->sem_flg & SEM_UNDO) {
int undo = un->semadj[sop->sem_num] - sem_op;
un->semadj[sop->sem_num] = undo;
}
curr->semval += sem_op;
curr->sempid = q->pid;
}
return 0;
would_block:
q->blocking = sop;
return sop->sem_flg & IPC_NOWAIT ? -EAGAIN : 1;
}
SEM_ UNDO机制
信号量是整个 Linux 可见的全局资源,而不是某个进程独占的资源,好处是可以跨进程通信,坏处就是如果一个进程通过操作拿到了一个信号量,但是不幸异常退出了,如果没有来得及归还这个信号量,可能所有其他的进程都阻塞了。为此,Linux设计了SEM_UNDO机制解决该问题。
该机制简而言之就是每一个 semop 操作都会保存一个反向 struct sem_undo 操作,当因为某个进程异常退出的时候,这个进程做的所有的操作都会回退,从而保证其他进程可以正常工作。在sem_flg标记位设置SUM_UNDO即可开启该功能。
struct sem_queue {
......
struct sem_undo *undo; /* undo structure */
......
};
在进程的 task_struct 里面对于信号量有一个成员 struct sysv_sem,里面是一个 struct sem_undo_list将这个进程所有的 semop 所带来的 undo 操作都串起来。
struct task_struct {
......
struct sysv_sem sysvsem;
......
}
struct sysv_sem {
struct sem_undo_list *undo_list;
};
struct sem_undo {
struct list_head list_proc; /* per-process list: *
* all undos from one process
* rcu protected */
struct rcu_head rcu; /* rcu struct for sem_undo */
struct sem_undo_list *ulp; /* back ptr to sem_undo_list */
struct list_head list_id; /* per semaphore array list:
* all undos for one array */
int semid; /* semaphore set identifier */
short *semadj; /* array of adjustments */
/* one per semaphore */
};
struct sem_undo_list {
atomic_t refcnt;
spinlock_t lock;
struct list_head list_proc;
};
这种设计思想较为常见,在MySQL的innodb的日志系统中也有着类似的实现。
总结
共享内存和信号量是有着相似性有可以共同使用从而完成进程通信的手段。下面引用极客时间中的两幅图来总结二者的整个过程。
酷客网相关文章:
评论前必须登录!
注册