深入理解Linux网络(五):TCP接收唤醒
TCP接收唤醒由软中断提供服务。
软中断(也就是 Linux ⾥的 ksoftirqd 进程)⾥收到数据包以后,发现是 tcp 的包的话就会执⾏到 tcp_v4_rcv 函数。接着如果是 ESTABLISH 状态下的数据包,则最终会把数据拆出来放到对应 socket 的接收队列中。然后调⽤ sk_data_ready 来唤醒⽤户进程。
// file: net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{......th = tcp_hdr(skb); //获取tcp headeriph = ip_hdr(skb); //获取ip header//根据数据包 header 中的 ip、端⼝信息查找到对应的socketsk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);......//socket 未被⽤户锁定if (!sock_owned_by_user(sk)) {{if (!tcp_prequeue(sk, skb))ret = tcp_v4_do_rcv(sk, skb);}}
}
在 tcp_v4_rcv 中⾸先根据收到的⽹络包的 header ⾥的 source 和 dest 信息来在本机上查询对应的 socket。找到以后,我们直接进⼊接收的主体函数 tcp_v4_do_rcv 来看。
//file: net/ipv4/tcp_ipv4.c
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{if (sk->sk_state == TCP_ESTABLISHED) {//执⾏连接状态下的数据处理if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {rsk = sk;goto reset;}return 0;}//其它⾮ ESTABLISH 状态的数据包处理......
}
我们假设处理的是 ESTABLISH 状态下的包,这样就⼜进⼊ tcp_rcv_established 函数中进⾏处理。
//file: net/ipv4/tcp_input.c
int tcp_rcv_established(struct sock *sk, struct sk_buff *skb,const struct tcphdr *th, unsigned int len)
{......//接收数据到队列中eaten = tcp_queue_rcv(sk, skb, tcp_header_len,&fragstolen);//数据 ready,唤醒 socket 上阻塞掉的进程sk->sk_data_ready(sk, 0);
在 tcp_rcv_established 中通过调⽤ tcp_queue_rcv 函数中完成了将接收数据放到 socket 的接收队列上。
//file: net/ipv4/tcp_input.c
static int __must_check tcp_queue_rcv(struct sock *sk, structsk_buff *skb, int hdrlen, bool *fragstolen)
{//把接收到的数据放到 socket 的接收队列的尾部if (!eaten) {__skb_queue_tail(&sk->sk_receive_queue, skb);skb_set_owner_r(skb, sk);}return eaten;
}
调⽤ tcp_queue_rcv 接收完成之后,接着再调⽤ sk_data_ready 来唤醒在socket上等待的⽤户进程。
这⼜是⼀个函数指针。 回想上⾯我们在 创建 socket 流程⾥执⾏到的 sock_init_data 函数,在这个函数⾥已经把 sk_data_ready 设置成 sock_def_readable 函数了。它是默认的数据就绪处理函数。
//file: net/core/sock.c
static void sock_def_readable(struct sock *sk, int len)
{struct socket_wq *wq;rcu_read_lock();wq = rcu_dereference(sk->sk_wq);//有进程在此 socket 的等待队列if (wq_has_sleeper(wq))//唤醒等待队列上的进程wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI| POLLRDNORM | POLLRDBAND);sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);rcu_read_unlock();
}
在 sock_def_readable 中再⼀次访问到了 sock->sk_wq 下的wait。回忆下我们前⾯调⽤ recvfrom 执⾏的最后,通过 DEFINE_WAIT(wait) 将当前进程关联的等待队列添加到 sock->sk_wq 下的 wait ⾥了。
那接下来就是调⽤ wake_up_interruptible_sync_poll 来唤醒在 socket 上因为等待数据⽽被阻塞掉的进程了。
//file: include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m) \__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))//file: kernel/sched/core.c
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key)
{unsigned long flags;int wake_flags = WF_SYNC;if (unlikely(!q))return;if (unlikely(!nr_exclusive))wake_flags = 0;spin_lock_irqsave(&q->lock, flags);__wake_up_common(q, mode, nr_exclusive, wake_flags, key);spin_unlock_irqrestore(&q->lock, flags);
}
__wake_up_common 实现唤醒。这⾥注意下, 该函数调⽤是参数 nr_exclusive 传⼊的是 1,这⾥指的是即使是有多个进程都阻塞在同⼀个 socket 上,也只唤醒 1 个进程。其作⽤是为了避免惊群。
//file: kernel/sched/core.c
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,int nr_exclusive, int wake_flags, void *key)
{wait_queue_t *curr, *next;list_for_each_entry_safe(curr, next, &q->task_list, task_list) {unsigned flags = curr->flags;if (curr->func(curr, mode, wake_flags, key) &&(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)break;}
}
在 __wake_up_common 中找出⼀个等待队列项 curr,然后调⽤其 curr->func。在 recv 函数执⾏的时候,使⽤ DEFINE_WAIT() 定义等待队列项的细节,内核把 curr->func 设置成了 autoremove_wake_function。
//file: include/linux/wait.h
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
#define DEFINE_WAIT_FUNC(name, function) \wait_queue_t name = { \.private = current, \.func = function, \.task_list = LIST_HEAD_INIT((name).task_list), \}
autoremove_wake_function 又调⽤了 default_wake_function。
//file: kernel/sched/core.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key)
{return try_to_wake_up(curr->private, mode, wake_flags);
}
调⽤ try_to_wake_up 时传⼊的 task_struct 是 curr->private。这个就是当时因为等待⽽被阻塞的进程项。 当这个函数执⾏完的时候,在 socket 上等待⽽被阻塞的进程就被推⼊到可运⾏队列⾥了,这⼜将是⼀次进程上下⽂切换的开销。