I/O多路复用学习(二)do_select分析

paw5zx Lv4

前言

前一篇笔记:I/O多路复用学习(一)select ,梳理了一下select的调用过程,这一篇笔记对其中的do_select函数的细节做一下简单的讨论。
本文中涉及的源码均出自Linux内核5.4.0版本

相关结构体

了解相关的结构体可以帮助我们更好地理解select内部的实现
相关结构体关系示意图:

结构体关系图
结构体关系图

struct poll_wqueues

poll_wqueues结构体(轮询等待队列)用于维护若干特定于文件描述符的设备驱动等待队列。但是它并不直接存储设备驱动等待队列本身;它通过inline_entries[]管理一个或多个poll_table_entry项,每个项与一个特定的文件描述符和其等待事件相关联。

1
2
3
4
5
6
7
8
9
10
//file: include/linux/poll.h
struct poll_wqueues {
poll_table pt;
struct poll_table_page *table;
struct task_struct *polling_task;
int triggered;
int error;
int inline_index;
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

struct poll_table_page

poll_table_page结构体用于管理一组poll_table_entry 对象,可以形成链表

1
2
3
4
5
struct poll_table_page {
struct poll_table_page * next;
struct poll_table_entry * entry;
struct poll_table_entry entries[0];
};

struct poll_table_struct

poll_table_struct是 Linux 内核中用于轮询机制的一个数据结构,其成员_qproc是一个函数指针,一般会被设置为__pollwaitNULL

1
2
3
4
5
//file: include/linux/poll.h
typedef struct poll_table_struct {
poll_queue_proc _qproc;
__poll_t _key;
} poll_table;

struct poll_table_entry

poll_table_entry用于管理与特定文件描述符相关的设备驱动等待队列

1
2
3
4
5
6
7
//file: include/linux/poll.h
struct poll_table_entry {
struct file *filp;
__poll_t key;
wait_queue_entry_t wait;
wait_queue_head_t *wait_address;
};

wait_queue_head_t

等待队列头节点。这个等待队列的底层数据结构其实就是双向链表

1
2
3
4
5
6
//file: include/linux/wait.h
typedef struct wait_queue_head wait_queue_head_t;
struct wait_queue_head {
spinlock_t lock;
struct list_head head;
};

wait_queue_entry_t

等待队列的节点,每个wait_queue_entry实例代表一个在等待队列中等待特定事件的进程或线程

1
2
3
4
5
6
7
8
//file: include/linux/wait.h
typedef struct wait_queue_entry wait_queue_entry_t;
struct wait_queue_entry {
unsigned int flags;
void *private;
wait_queue_func_t func;
struct list_head entry;
};

fd_set_bits

内核中管理位图的结构体,inoutex用于表示输入,输出,异常的位图。res_inres_outres_ex用于存储输入,输出,异常事件的结果,当某个fd活跃时,结构体中对应的位就会被置1。

1
2
3
4
5
//file: fs/select.c
typedef struct {
unsigned long *in, *out, *ex;
unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;

do_select

do_select流程图
do_select流程图

代码省略了一些,比如超时标志位timed_out的置位操作等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
//file: fs/select.c
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
...
retval = max_select_fd(n, fds);

if (retval < 0)
return retval;
n = retval;

poll_initwait(&table);
wait = &table.pt;
//如果等待时间为0,则直接超时返回
//如果设置了超时时间且超时时间设置为0(非阻塞)
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait->_qproc = NULL;
//超时标志位
timed_out = 1;
}

retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;

inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
__poll_t mask;
//按组获取读写异常的位图
in = *inp++; out = *outp++; ex = *exp++;
//读写异常三个位图按位与,判断对应fd是否需要监听
//若一组fd都无需监听则直接跳过
all_bits = in | out | ex;
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}
//对于unsigned中的每个bit
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
if (i >= n)
break;
//若遍历到的bit在all_bits中对应位为0,则无感兴趣事件,直接跳过
if (!(bit & all_bits))
continue;
//掩码初始化
mask = EPOLLNVAL;
//fdget获取文件描述符对应的文件对象
//会增加fd的引用计数
f = fdget(i);
if (f.file) {
wait_key_set(wait, in, out, bit,
busy_flag);
//返回文件描述符当前状态,比如若fd可读可写,则返回EPOLLIN | EPOLLOUT
mask = vfs_poll(f.file, wait);
//减少fd的引用计数
fdput(f);
}
//检查是否有事件发生,并更新相关变量
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
...
}
//将发生的事件拷贝出去;最终传给用户态使用
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
//完成一轮循环后,CPU放权,给紧急任务让渡CPU资源(有条件重调度)
cond_resched();
}
wait->_qproc = NULL;
//如果有事件发生(`retval`不为0),超时(`timed_out`为1),或者当前进程收到挂起信号则停止循环检测
if (retval || timed_out || signal_pending(current))
break;
...
}
poll_freewait(&table);

return retval;
}

poll_initwait

poll_initwait是do_select中比较关键的一个函数,它的作用是初始化struct poll_wqueues对象的成员:

1
2
3
4
5
6
7
8
9
10
void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->polling_task = current;
pwq->triggered = 0;
pwq->error = 0;
pwq->table = NULL;
pwq->inline_index = 0;
}
EXPORT_SYMBOL(poll_initwait);

其中的init_poll_funcptr函数就是将__polwait绑定到pwq->pt->_qproc

1
2
3
4
5
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->_qproc = qproc;
pt->_key = ~(__poll_t)0; /* all events enabled */
}

这就注册了__pollwait函数,他会在设备驱动调用自己的poll方法时被回调,后面会介绍。

三层for循环

三层for循环也是do_select中的关键部分,下面我们逐层分析

第一层for循环

一个无限循环,代表一整次扫描。这个循环会一直进行,直到满足以下条件之一:有事件发生(retval不为0),超时(timed_out为1),或者当前进程收到挂起信号(signal_pending(current)为真)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
//读写异常
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
//内层循环,遍历所有fd并进行操作
...

wait->_qproc = NULL;
//如果有事件发生(`retval`不为0),超时(`timed_out`为1),或者当前进程收到挂起信号则停止循环检测
if (retval || timed_out || signal_pending(current))
break;
...
}

第二层for循环

第二层循环是以unsigned long为单位进行遍历。也就是说是以64bit(64位机)为一组,按组进行遍历。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
__poll_t mask;
//按组获取读写异常的位图
in = *inp++; out = *outp++; ex = *exp++;
//读写异常三个位图按位与,判断对应fd是否需要监听
//若一组fd都无需监听则直接跳过
all_bits = in | out | ex;
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}
//内层循环,对一组中的每个fd操作
...
//将发生的事件拷贝出去;最终传给用户态使用
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
...
}

其中以读事件为例,in大小为64bit,代表64个fd。在本层循环开始时,in被赋值为inp指向的值(64bit),然后inp向后移动sizeof(unsigned long)个字节,指向后64bit的起始位置。以此实现了以unsigned long为单位的遍历。

all_bits代表了一组fd中,每个fd在读,写,异常上是否有相应的事件需要监听,若64个fd在读,写,异常上都没有要监听的事件,则直接跳过本组。

第三层for循环

第三层循环是遍历一组中的所有fd,并对每个监听的fd做相应操作。这些操作中最关键的是vfs_poll,后面会详细介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
if (i >= n)
break;
//若遍历到的bit在all_bits中对应位为0,则无感兴趣事件,直接跳过
if (!(bit & all_bits))
continue;
//掩码初始化
mask = EPOLLNVAL;
//fdget获取文件描述符对应的文件对象
//会增加fd的引用计数
f = fdget(i);
if (f.file) {
wait_key_set(wait, in, out, bit,
busy_flag);
//返回文件描述符当前状态,比如若fd可读可写,则返回EPOLLIN | EPOLLOUT
mask = vfs_poll(f.file, wait);
//减少fd的引用计数
fdput(f);
}
//检查是否有事件发生,并更新相关变量
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
...
}

循环总结

总结一下三层循环:
第一层for循环就是进行select的一次扫描,包括

  • 扫描开始的变量初始化环节
  • 中间for循环遍历fd的环节
  • 扫描结束之前变量重置以及退出条件判断环节

第二、三层for循环本质是遍历所有fd,并对每个感兴趣的fd做相应操作。操作中最关键的一步就是vfs_poll,后面会介绍。

poll_freewait

待讨论

vfs_poll

前面的select调用链提到,vfs_poll会调用设备驱动中的poll方法:

1
2
3
4
5
6
7
//file: include/linux/poll.h
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{
if (unlikely(!file->f_op->poll))
return DEFAULT_POLLMASK;
return file->f_op->poll(file, pt);
}

下面以socket中的tcp为例,继续对调用链进行分析:
vfs_poll->sock_poll->tcp_poll->sock_poll_wait->poll_wait
梳理调用链可以知道,最后调用到poll_wait

1
2
3
4
5
6
//file: include/linux/poll.h
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}

p->_qproc(filp, wait_address, p)中的p就是do_select中的变量wait,因此这一行实际执行的操作就是回调__pollwait函数。
所以vfs_poll的主要操作就是调用设备的poll方法从而回调__pollwait函数。

__pollwait

pollwait
pollwait

__pollwait用于处理轮询时的等待队列。这个函数通过创建并初始化poll_table_entry实例,将等待进程(或线程)添加到指定设备或文件的等待队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//file: fs/select.c
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
//获取poll_wqueues,也就是do_select中的table变量
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
//为设备驱动分配一个poll_table_entry实例
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
entry->filp = get_file(filp);
entry->wait_address = wait_address;
entry->key = p->_key;
//初始化等待队列项,注册pollwake
init_waitqueue_func_entry(&entry->wait, pollwake);
entry->wait.private = pwq;
//将等待队列节点挂到等待队列中
add_wait_queue(wait_address, &entry->wait);
}

其中三个关键函数:poll_get_entryinit_waitqueue_func_entryadd_wait_queue,下面逐个介绍:

poll_get_entry

poll_get_entry用于分配一个poll_table_entry对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//file: fs/select.c
static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p)
{
struct poll_table_page *table = p->table;
//p->pt还有地,就从里面分配
if (p->inline_index < N_INLINE_POLL_ENTRIES)
return p->inline_entries + p->inline_index++;
//p->pt没有地了,且NULL == p->table(说明还没有分配过)或p->table满了
if (!table || POLL_TABLE_FULL(table)) {
//分配一页内存
struct poll_table_page *new_table;
new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
//分配失败
if (!new_table) {
p->error = -ENOMEM;
return NULL;
}
//将分配的页链接到p中
new_table->entry = new_table->entries;
new_table->next = table;
p->table = new_table;
table = new_table;
}
//返回一个新页的第一个entry,并将指针后移一位,方便后面的分配
return table->entry++;
}

poll_wqueues内嵌的poll_table_entry数组inline_entries[] 的大小是固定的:N_INLINE_POLL_ENTRIES,如果空间已满,就会动态申请物理内存页并以链表的形式挂在poll_wqueues对象上统一管理。

所以源码中就有对inline_entriespoll_table_entry对象个数的判断:

1
p->inline_index < N_INLINE_POLL_ENTRIES

inline_entries[]还有空间,就从里面分配,若inline_entries[]没有空间了,且NULL == p->table(说明还没有分配过内存页)或p->table满了,就新分配一页内存。

那么如何判断一个poll_table_page对象是否还有地去存储poll_table_entry对象(内存页已满)?

源码中使用了一个名为POLL_TABLE_FULL的宏定义:

1
2
3
4
#define PAGE_SHIFT		12
#define PAGE_SIZE (_AC(1,UL) << PAGE_SHIFT) //一个内存页的大小是4096字节
#define POLL_TABLE_FULL(table) \
((unsigned long)((table)->entry+1) > PAGE_SIZE + (unsigned long)(table))

这个宏就是在分配poll_table_entry对象之前,先计算当前页是否还有空间存放新的poll_table_entry对象

我自己在gcc 10.3.0下测得poll_table_entry对象大小为64字节(poll_table_entry中的keywait_queue_entry_t中的flags各有4字节对齐),而一个poll_table_page对象大小为4096字节,也就是说一个poll_table_page对象最多可以管理63个poll_table_entry对象。

POLL_TABLE_FULL为true情况的示意图:

POLL_TABLE_FULL为真
POLL_TABLE_FULL为真

init_waitqueue_func_entry

init_waitqueue_func_entry用于初始化wait_queue_entry实例的各个成员。

1
2
3
4
5
6
7
8
//file: include/linux/wait.h
static inline void
init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
{
wq_entry->flags = 0;
wq_entry->private = NULL;
wq_entry->func = func;
}

__pollwit中有init_waitqueue_func_entry(&entry->wait, pollwake);,这是注册了pollwake函数,它会在设备驱动中被回调。

add_wait_queue

add_wait_queue主要用于将一个 wait_queue_entry对象添加到一个 wait_queue_head 指向的等待队列中(直接添加到头节点后面)。

1
2
3
4
5
6
7
8
9
10
11
//file: kernel/sched/wait.c
void add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
unsigned long flags;

wq_entry->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&wq_head->lock, flags);
__add_wait_queue(wq_head, wq_entry);
spin_unlock_irqrestore(&wq_head->lock, flags);
}
EXPORT_SYMBOL(add_wait_queue);

查看源码可知其调用链为:add_wait_queue->__add_wait_queue->list_add->__list_add
list_add__list_add源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//file: include/linux/list.h
static inline void list_add(struct list_head *new, struct list_head *head)
{
__list_add(new, head, head->next);
}
static inline void __list_add(struct list_head *new,
struct list_head *prev,
struct list_head *next)
{
if (!__list_add_valid(new, prev, next))
return;

next->prev = new;
new->next = next;
new->prev = prev;
WRITE_ONCE(prev->next, new);
}

可见底层是通过将wq_entry->entry链接到wq_head->head后面,从而实现的将一个 wait_queue_entry对象添加到一个 wait_queue_head指向的等待队列中。

pollwake

pollwake
pollwake

回调过程

前面提到pollwake会在设备驱动中被回调,下面简单梳理一下在设备驱动中的调用链,还是以socket中的tcp为例,当有数据包来临时函数的调用链为:
tcp_data_queue->sock_def_readable->wake_up_interruptible_sync_poll->__wake_up_sync_key->__wake_up_common_lock->__wake_up_common

__wake_up_common中有ret = curr->func(curr, mode, wake_flags, key);,这里的curr->func就是之前注册好的pollwake。所以在数据来临时,pollwake会被回调,以唤醒当前进程。

pollwake调用链

pollwake->__pollwake->default_wake_function->try_to_wake_up。最终调用try_to_wake_up唤醒当前进程。

总结

本文讨论了do_select函数调用链上的一些实现细节,梳理了相关结构体之间的关系。有些地方可能解释的不是很详细,后面持续更新。

参考文章

1.select 机制 - 访问方式(三)

  • 标题: I/O多路复用学习(二)do_select分析
  • 作者: paw5zx
  • 创建于 : 2024-03-08 14:13:35
  • 更新于 : 2024-08-15 01:58:40
  • 链接: https://paw5zx.github.io/IO-Multiplexing-do-select/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论