IO多路复用之select结合源码剖析


   select ,同步IO多路复用。select调用后,会调用内核中的sys_select的系统调用方法中。而sys_select的方法,定义于 /fs/select.c文件内。

asmlinkage long
sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp)
{
	fd_set_bits fds;
	char *bits;
	long timeout;
	int ret, size, max_fdset;

	timeout = MAX_SCHEDULE_TIMEOUT;
	if (tvp) {
		time_t sec, usec;
		//.............
	}

	ret = -EINVAL;
	if (n < 0)
		goto out_nofds;

	/* max_fdset can increase, so grab it once to avoid race */
	max_fdset = current->files->max_fdset;
	if (n > max_fdset)
		n = max_fdset;

	/*
	 * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
	 * since we used fdset we need to allocate memory in units of
	 * long-words. 
	 */
	ret = -ENOMEM;
	size = FDS_BYTES(n);
	bits = select_bits_alloc(size);
	if (!bits)
		goto out_nofds;
	fds.in      = (unsigned long *)  bits;
	fds.out     = (unsigned long *) (bits +   size);
	fds.ex      = (unsigned long *) (bits + 2*size);
	fds.res_in  = (unsigned long *) (bits + 3*size);
	fds.res_out = (unsigned long *) (bits + 4*size);
	fds.res_ex  = (unsigned long *) (bits + 5*size);

	if ((ret = get_fd_set(n, inp, fds.in)) ||
	    (ret = get_fd_set(n, outp, fds.out)) ||
	    (ret = get_fd_set(n, exp, fds.ex)))
		goto out;

	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);

	ret = do_select(n, &fds, &timeout);
        
       //........

out:
	select_bits_free(bits, size);
out_nofds:
	return ret;
}

我们首先分析,sys_select的参数类型。

fd_set参数的实质类型:从下面的内容可以看出,fd_set是一个long类型的数组,数组的长度是1024位/long的位数,数组元素的每一位用来与文件描述符,管道等建立联系。

 // include/linux/types.h
 typedef __kernel_fd_set		fd_set;

 // include/linux/posix_types.h
#undef __NFDBITS
#define __NFDBITS	(8 * sizeof(unsigned long))

#undef __FD_SETSIZE
#define __FD_SETSIZE	1024

#undef __FDSET_LONGS
#define __FDSET_LONGS	(__FD_SETSIZE/__NFDBITS)

typedef struct {
	unsigned long fds_bits [__FDSET_LONGS];
} __kernel_fd_set;
timeval类型,timeval用来设置超时时间:
// include/linux/time.h

struct timeval {
	time_t		tv_sec;		/* seconds  秒*/
	suseconds_t	tv_usec;	/* microseconds  微秒*/
};

sys_select函数分析

    1.定义一个类型为fd_set_bits的fds变量

    2.处理超时参数tvp,如果tvp设置了值,则从tvp变量中取出超时的时间,转换成时钟数。如果没有设置值,直接进入步骤3

    3.获取进程打开的文件描述符的当前最大数目,存入局部变量max_fdset,用这个值对比传来的参数n,参数n表示监测的最大描述符+1.如果n大于max_fdset,则设置n的值为max_fdset。

    4.调用宏FDS_BYTES计算参数n需要的long类型的个数。定义指针bits指向将要表示的位空间,调用select_bits_alloc()方法,分配内存,方法内部会分配6*size个单位的空间,因为我们需要6个位图去表示in/out/ex/以及等待内核传递过来的res_in/res_out/res_ex

    5.分别设置变量fds的成员指向bits内的空间。

    6.调用get_fd_set()方法,将用户传递过来的inp,oup,exp的值写入到变量fds的in/out/ex中。如果get_fd_set()有一个返回失败,则跳到步骤12.

    7.分别调用zero_fd_set()方法,设置fds中的输出参数res_in/res_out/res_ex的值为0

    8.调用do_select()方法,去监听检查文件设备的状态,do_select会修改输出参数fds,fds既是输入参数,也是输出参数。

    9.检查do_select()方法的返回值ret,如果小于0,则进入步骤12。等于0,则进入步骤10,大于0,则进入步骤11.

    10.ret值为0,没有就绪的IO设备,设置ret的值为-ERESTARTNOHAND,调用signal_pending()方法检查进程是否有信号处理,如果是,跳入步骤12.如果不是,则说明是超时时间到了,此时修改ret的值为0,然后进入步骤12.

    11.有就绪的IO设备,则分别调用set_fd_set()方法,从fds中读取res_in/res_out/res_ex,写入到参数inp/oup/exp中,inp,oup,exp既是输入参数,也是输出参数。如果函数有一个执行失败,则设置ret的值为-EFAULT。

    12.释放申请的bits空间。

    13.返回ret值。

这样的话,我们的sys_select()方法分析就到这里。接下来,我们分析do_select函数到底做了啥。依然,先从参数分析入手.

分析参数fds,类型是结构体的指针,fd_set_bits结构体定义:

include/poll.h

/*
 * Scalable version of the fd_set. fd_set的可扩展版本
 */

typedef struct {
	unsigned long *in, *out, *ex;
	unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;

include/poll.h

/*
 * How many longwords for "nr" bits?
 */
#define FDS_BITPERLONG	(8*sizeof(long))
#define FDS_LONGS(nr)	(((nr)+FDS_BITPERLONG-1)/FDS_BITPERLONG)
#define FDS_BYTES(nr)	(FDS_LONGS(nr)*sizeof(long))



注释中说明是fd_set的可扩展版本。我们可以看到,结构体中,包含了指向in,out,ex等的long类型指针,我们也可以看成是指向long类型的数组。如果这么理解的话,就和fd_set的类型类似,说不定这些指针,就是指向了fd_set,这个我们暂时先这么假设。接下来看函数实现

int do_select(int n, fd_set_bits *fds, long *timeout)
{
	//........
	for (;;) {
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

		set_current_state(TASK_INTERRUPTIBLE);

		inp = fds->in; outp = fds->out; exp = fds->ex;
		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;
			struct file_operations *f_op = NULL;
			struct file *file = NULL;

			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			if (all_bits == 0) {
				i += __NFDBITS;
				continue;
			}

			for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
				if (i >= n)
					break;
				if (!(bit & all_bits))
					continue;
				file = fget(i);
				if (file) {
					f_op = file->f_op;
					mask = DEFAULT_POLLMASK;
					if (f_op && f_op->poll)
						mask = (*f_op->poll)(file, retval ? NULL : wait);
					fput(file);
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit;
						retval++;
					}
					if ((mask & POLLOUT_SET) && (out & bit)) {
						res_out |= bit;
						retval++;
					}
					if ((mask & POLLEX_SET) && (ex & bit)) {
						res_ex |= bit;
						retval++;
					}
				}
			}
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
		}
		wait = NULL;
		if (retval || !__timeout || signal_pending(current))
			break;
		if(table.error) {
			retval = table.error;
			break;
		}
		__timeout = schedule_timeout(__timeout);
	}
	__set_current_state(TASK_RUNNING);

	poll_freewait(&table);

	/*
	 * Up-to-date the caller timeout.
	 */
	*timeout = __timeout;
	return retval;
}

    在函数实现体内,首先是个大的循环。在循环内部,先设置当前进程的状态为TASK_INTERRUPTIBLE,此时进程是否会继续运行,取决于是否有其他的进程调度,抢得cpu。如果没有,则继续执行下面的循环步骤。否则,就等进行被唤醒时,再执行后面的检测设备文件状态并记录,退出循环等操作。

(*f_op->poll)(file, retval ? NULL : wait)方法,会获取fd描述符对应的文件的状态,比如读写/异常等。

    do_select退出循环存在以下三种情况:

        1.设备的状态同调用者感兴趣的IO事件匹配,则记录下来,并且退出循环体,将描述符个数和修改后的fd_set返回给上层函数

        2.当前进程有信号处理,也会退出do_select的循环。

        3.超时时间已到    

当第一遍检测IO是否就绪的循环结束后,如果没有就绪的IO事件,或者没有信号处理,那么,进程会进入睡眠状态。然后等待fd设备唤醒进程或者超时定时器来唤醒。

    以上就是do_select的大致功能。


需要了解的知识点:

 这里涉及进程状态转换的内容,进程的超时内容。

    TASK_INTERRUPTIBLE(可中断的睡眠状态)  -------------唤醒后------》TASK_RUNNING(就绪)状态,处于这个状态的进程,在下次进程调度的时候,就会被执行

       函数schedule_timeout,调度进程的超时时间,注意,在调用该方法前,进程状态必须设置成TASK_INTERRUPTABLE或TASK_UNINTERRUPTABLE否则进程不会睡眠。由于schedule_timeout()函数需要调用schedule()调度函数,所以调用它的代码必须处于进程上下文中,并且不能持有锁。下面我们看看函数的实现源码

    /kernel/timer.c

fastcall signed long __sched schedule_timeout(signed long timeout)
{
	struct timer_list timer;
	unsigned long expire;

	switch (timeout)
	{
	case MAX_SCHEDULE_TIMEOUT:
		schedule();
		goto out;
	default:
		if (timeout < 0)
		{
			printk(KERN_ERR "schedule_timeout: wrong timeout "
			       "value %lx from %p\n", timeout,
			       __builtin_return_address(0));
			current->state = TASK_RUNNING;
			goto out;
		}
	}

	expire = timeout + jiffies;

	init_timer(&timer);
	timer.expires = expire;
	timer.data = (unsigned long) current;
	timer.function = process_timeout;
	add_timer(&timer);
	schedule();
	del_singleshot_timer_sync(&timer);
	timeout = expire - jiffies;
 out:
	return timeout < 0 ? 0 : timeout;
}

EXPORT_SYMBOL(schedule_timeout);

    函数定义了一个类型为timer_list的软件时钟变量timer,如果timeout的值是MAX_SCHEDULE_TIMEOUT,则不设置定时器,直接调用schedule(),此时进程睡眠,交出CPU控制权。反之,设置时间到期后的处理函数process_timeout,以及过期时间expire,调用add_timer函数将变量timer加入到内核管理中。

    之后调用schedule()函数,此时进程睡眠,交出执行权,内核调用其他进程运行。但内核在每一个时钟中断处理结束后都会检测这个软件时钟变量是否到期。如果到期,将调用process_timeout函数,参数为睡眠的进程的进程描述符。函数process_timeout调用wake_up_process函数将进程唤醒。当内核重新调用该进程执行时,该进程继续执行schedule_timeout函数,执行流则从schedule函数中返回。

    之后调用del_singleshot_timer_sync函数,卸载软件时钟timer。之后执行的 timeout = expire - jiffies; 主要是迎来判断进程被唤醒的原因。因为有可能在timer没到期时,其他进程唤醒了该进程,这种情况下,timeout = expire - jiffies算出的timeout是> 0 的,我们直接返回此时的timeout值。0意味着进程是因为超时而被唤醒,一个正数意味着进程在时间还没超时的情况下被唤醒。(以上说法,来源于:http://blog.sina.com.cn/s/blog_aed82f6f0101ap8m.html






评论

发表评论