前言
前一篇笔记:I/O多路复用学习(一)select ,梳理了一下select的调用过程,这一篇笔记对其中的do_select函数的细节做一下简单的讨论。
本文中涉及的源码均出自Linux内核5.4.0版本
相关结构体
了解相关的结构体可以帮助我们更好地理解select内部的实现
相关结构体关系示意图:
struct poll_wqueues
poll_wqueues
结构体(轮询等待队列)用于维护若干特定于文件描述符的设备驱动等待队列。但是它并不直接存储设备驱动等待队列本身;它通过inline_entries[]
管理一个或多个poll_table_entry
项,每个项与一个特定的文件描述符和其等待事件相关联。
1 2 3 4 5 6 7 8 9 10
| struct poll_wqueues { poll_table pt; struct poll_table_page *table; struct task_struct *polling_task; int triggered; int error; int inline_index; struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; };
|
struct poll_table_page
poll_table_page
结构体用于管理一组poll_table_entry
对象,可以形成链表
1 2 3 4 5
| struct poll_table_page { struct poll_table_page * next; struct poll_table_entry * entry; struct poll_table_entry entries[0]; };
|
struct poll_table_struct
poll_table_struct
是 Linux 内核中用于轮询机制的一个数据结构,其成员_qproc
是一个函数指针,一般会被设置为__pollwait
或NULL
1 2 3 4 5
| typedef struct poll_table_struct { poll_queue_proc _qproc; __poll_t _key; } poll_table;
|
struct poll_table_entry
poll_table_entry
用于管理与特定文件描述符相关的设备驱动等待队列
1 2 3 4 5 6 7
| struct poll_table_entry { struct file *filp; __poll_t key; wait_queue_entry_t wait; wait_queue_head_t *wait_address; };
|
wait_queue_head_t
等待队列头节点。这个等待队列的底层数据结构其实就是双向链表
1 2 3 4 5 6
| typedef struct wait_queue_head wait_queue_head_t; struct wait_queue_head { spinlock_t lock; struct list_head head; };
|
wait_queue_entry_t
等待队列的节点,每个wait_queue_entry
实例代表一个在等待队列中等待特定事件的进程或线程
1 2 3 4 5 6 7 8
| typedef struct wait_queue_entry wait_queue_entry_t; struct wait_queue_entry { unsigned int flags; void *private; wait_queue_func_t func; struct list_head entry; };
|
fd_set_bits
内核中管理位图的结构体,in
,out
,ex
用于表示输入,输出,异常的位图。res_in
,res_out
,res_ex
用于存储输入,输出,异常事件的结果,当某个fd活跃时,结构体中对应的位就会被置1。
1 2 3 4 5
| typedef struct { unsigned long *in, *out, *ex; unsigned long *res_in, *res_out, *res_ex; } fd_set_bits;
|
do_select
代码省略了一些,比如超时标志位timed_out
的置位操作等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time) { struct poll_wqueues table; poll_table *wait; int retval, i, timed_out = 0; ... retval = max_select_fd(n, fds);
if (retval < 0) return retval; n = retval;
poll_initwait(&table); wait = &table.pt; if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { wait->_qproc = NULL; timed_out = 1; }
retval = 0; for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false;
inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) { unsigned long in, out, ex, all_bits, bit = 1, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; __poll_t mask; in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; if (all_bits == 0) { i += BITS_PER_LONG; continue; } for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { struct fd f; if (i >= n) break; if (!(bit & all_bits)) continue; mask = EPOLLNVAL; f = fdget(i); if (f.file) { wait_key_set(wait, in, out, bit, busy_flag); mask = vfs_poll(f.file, wait); fdput(f); } if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait->_qproc = NULL; } ... } if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait->_qproc = NULL; if (retval || timed_out || signal_pending(current)) break; ... } poll_freewait(&table);
return retval; }
|
poll_initwait
poll_initwait
是do_select中比较关键的一个函数,它的作用是初始化struct poll_wqueues
对象的成员:
1 2 3 4 5 6 7 8 9 10
| void poll_initwait(struct poll_wqueues *pwq) { init_poll_funcptr(&pwq->pt, __pollwait); pwq->polling_task = current; pwq->triggered = 0; pwq->error = 0; pwq->table = NULL; pwq->inline_index = 0; } EXPORT_SYMBOL(poll_initwait);
|
其中的init_poll_funcptr
函数就是将__polwait
绑定到pwq->pt->_qproc
上
1 2 3 4 5
| static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc) { pt->_qproc = qproc; pt->_key = ~(__poll_t)0; }
|
这就注册了__pollwait
函数,他会在设备驱动调用自己的poll方法时被回调,后面会介绍。
三层for循环
三层for循环也是do_select中的关键部分,下面我们逐层分析
第一层for循环
一个无限循环,代表一整次扫描。这个循环会一直进行,直到满足以下条件之一:有事件发生(retval
不为0),超时(timed_out
为1),或者当前进程收到挂起信号(signal_pending(current)
为真)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; ... wait->_qproc = NULL; if (retval || timed_out || signal_pending(current)) break; ... }
|
第二层for循环
第二层循环是以unsigned long为单位进行遍历。也就是说是以64bit(64位机)为一组,按组进行遍历。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| for (i = 0; i < n; ++rinp, ++routp, ++rexp) { unsigned long in, out, ex, all_bits, bit = 1, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; __poll_t mask; in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; if (all_bits == 0) { i += BITS_PER_LONG; continue; } ... if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; ... }
|
其中以读事件为例,in
大小为64bit,代表64个fd。在本层循环开始时,in
被赋值为inp
指向的值(64bit),然后inp
向后移动sizeof(unsigned long)
个字节,指向后64bit的起始位置。以此实现了以unsigned long为单位的遍历。
all_bits
代表了一组fd中,每个fd在读,写,异常上是否有相应的事件需要监听,若64个fd在读,写,异常上都没有要监听的事件,则直接跳过本组。
第三层for循环
第三层循环是遍历一组中的所有fd,并对每个监听的fd做相应操作。这些操作中最关键的是vfs_poll
,后面会详细介绍。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { struct fd f; if (i >= n) break; if (!(bit & all_bits)) continue; mask = EPOLLNVAL; f = fdget(i); if (f.file) { wait_key_set(wait, in, out, bit, busy_flag); mask = vfs_poll(f.file, wait); fdput(f); } if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait->_qproc = NULL; } ... }
|
循环总结
总结一下三层循环:
第一层for循环就是进行select的一次扫描,包括
- 扫描开始的变量初始化环节
- 中间for循环遍历fd的环节
- 扫描结束之前变量重置以及退出条件判断环节
第二、三层for循环本质是遍历所有fd,并对每个感兴趣的fd做相应操作。操作中最关键的一步就是vfs_poll
,后面会介绍。
poll_freewait
待讨论
vfs_poll
前面的select调用链提到,vfs_poll
会调用设备驱动中的poll方法:
1 2 3 4 5 6 7
| static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) { if (unlikely(!file->f_op->poll)) return DEFAULT_POLLMASK; return file->f_op->poll(file, pt); }
|
下面以socket中的tcp为例,继续对调用链进行分析:
vfs_poll
->sock_poll
->tcp_poll
->sock_poll_wait
->poll_wait
。
梳理调用链可以知道,最后调用到poll_wait
:
1 2 3 4 5 6
| static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p); }
|
p->_qproc(filp, wait_address, p)
中的p就是do_select
中的变量wait
,因此这一行实际执行的操作就是回调__pollwait
函数。
所以vfs_poll
的主要操作就是调用设备的poll方法从而回调__pollwait
函数。
__pollwait
__pollwait
用于处理轮询时的等待队列。这个函数通过创建并初始化poll_table_entry
实例,将等待进程(或线程)添加到指定设备或文件的等待队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake); entry->wait.private = pwq; add_wait_queue(wait_address, &entry->wait); }
|
其中三个关键函数:poll_get_entry
,init_waitqueue_func_entry
,add_wait_queue
,下面逐个介绍:
poll_get_entry
poll_get_entry
用于分配一个poll_table_entry对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p) { struct poll_table_page *table = p->table; if (p->inline_index < N_INLINE_POLL_ENTRIES) return p->inline_entries + p->inline_index++; if (!table || POLL_TABLE_FULL(table)) { struct poll_table_page *new_table; new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL); if (!new_table) { p->error = -ENOMEM; return NULL; } new_table->entry = new_table->entries; new_table->next = table; p->table = new_table; table = new_table; } return table->entry++; }
|
poll_wqueues
内嵌的poll_table_entry
数组inline_entries[]
的大小是固定的:N_INLINE_POLL_ENTRIES
,如果空间已满,就会动态申请物理内存页并以链表的形式挂在poll_wqueues
对象上统一管理。
所以源码中就有对inline_entries
中poll_table_entry
对象个数的判断:
1
| p->inline_index < N_INLINE_POLL_ENTRIES
|
若inline_entries[]
还有空间,就从里面分配,若inline_entries[]
没有空间了,且NULL == p->table
(说明还没有分配过内存页)或p->table
满了,就新分配一页内存。
那么如何判断一个poll_table_page
对象是否还有地去存储poll_table_entry
对象(内存页已满)?
源码中使用了一个名为POLL_TABLE_FULL
的宏定义:
1 2 3 4
| #define PAGE_SHIFT 12 #define PAGE_SIZE (_AC(1,UL) << PAGE_SHIFT) #define POLL_TABLE_FULL(table) \ ((unsigned long)((table)->entry+1) > PAGE_SIZE + (unsigned long)(table))
|
这个宏就是在分配poll_table_entry
对象之前,先计算当前页是否还有空间存放新的poll_table_entry
对象
我自己在gcc 10.3.0
下测得poll_table_entry
对象大小为64字节(poll_table_entry
中的key
和wait_queue_entry_t
中的flags
各有4字节对齐),而一个poll_table_page
对象大小为4096字节,也就是说一个poll_table_page
对象最多可以管理63个poll_table_entry
对象。
POLL_TABLE_FULL
为true情况的示意图:
init_waitqueue_func_entry
init_waitqueue_func_entry
用于初始化wait_queue_entry
实例的各个成员。
1 2 3 4 5 6 7 8
| static inline void init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func) { wq_entry->flags = 0; wq_entry->private = NULL; wq_entry->func = func; }
|
在__pollwit
中有init_waitqueue_func_entry(&entry->wait, pollwake);
,这是注册了pollwake
函数,它会在设备驱动中被回调。
add_wait_queue
add_wait_queue
主要用于将一个 wait_queue_entry
对象添加到一个 wait_queue_head
指向的等待队列中(直接添加到头节点后面)。
1 2 3 4 5 6 7 8 9 10 11
| void add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry) { unsigned long flags;
wq_entry->flags &= ~WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&wq_head->lock, flags); __add_wait_queue(wq_head, wq_entry); spin_unlock_irqrestore(&wq_head->lock, flags); } EXPORT_SYMBOL(add_wait_queue);
|
查看源码可知其调用链为:add_wait_queue
->__add_wait_queue
->list_add
->__list_add
list_add
和__list_add
源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| static inline void list_add(struct list_head *new, struct list_head *head) { __list_add(new, head, head->next); } static inline void __list_add(struct list_head *new, struct list_head *prev, struct list_head *next) { if (!__list_add_valid(new, prev, next)) return;
next->prev = new; new->next = next; new->prev = prev; WRITE_ONCE(prev->next, new); }
|
可见底层是通过将wq_entry->entry
链接到wq_head->head
后面,从而实现的将一个 wait_queue_entry
对象添加到一个 wait_queue_head
指向的等待队列中。
pollwake
回调过程
前面提到pollwake
会在设备驱动中被回调,下面简单梳理一下在设备驱动中的调用链,还是以socket中的tcp为例,当有数据包来临时函数的调用链为:
tcp_data_queue
->sock_def_readable
->wake_up_interruptible_sync_poll
->__wake_up_sync_key
->__wake_up_common_lock
->__wake_up_common
在__wake_up_common
中有ret = curr->func(curr, mode, wake_flags, key);
,这里的curr->func
就是之前注册好的pollwake。所以在数据来临时,pollwake会被回调,以唤醒当前进程。
pollwake调用链
pollwake
->__pollwake
->default_wake_function
->try_to_wake_up
。最终调用try_to_wake_up
唤醒当前进程。
总结
本文讨论了do_select函数调用链上的一些实现细节,梳理了相关结构体之间的关系。有些地方可能解释的不是很详细,后面持续更新。
参考文章
1.select 机制 - 访问方式(三)