Fig. Steps of Linux kernel receiving data process and the corresponding chapters in this post
本文尝试从技术研发与工程实践(而非纯理论学习)角度,在原理与实现、监控告警、 配置调优三方面介绍内核5.10 网络栈。由于内容非常多,因此分为了几篇系列文章。
从比较高的层次看,一个数据包从被网卡接收到进入 socket 的整个过程如下:
Fig. Steps of Linux kernel receiving data process and the corresponding chapters in this post
poll()
方法;ksoftirqd
;poll()
从 ring buffer 收包,并以 skb
的形式送至更上层处理;本文分 9 个章节来介绍图中内核接收及处理数据的全过程。
网卡成功完成初始化之后,数据包通过网线到达网卡时,才能被正确地收起来送到更上层去处理。 因此,我们的工作必须从网卡初始化开始,更准确地说,就是网卡驱动模块的初始化。 这一过程虽然是厂商相关的,但是非常重要;弄清楚了网卡驱动的工作原理, 才能对后面的部分有更加清晰的认识和理解。
网卡驱动一般都是作为独立的内核模块(kernel module)维护的,本文 将拿 mlx5_core
驱动作为例子,它是如今常见的 Mellanox ConnectX-4/ConnectX-5 25Gbps/40Gbps
以太网卡的驱动。
目前主流的网卡驱动都是以太网驱动,例如最常见的 Intel 系列:
i
是 intel
,gb
表示(每秒 1)Gb
x
是罗马数字 10,所以 xgb
表示 10Gb
,e
表示以太网intel
40Gbps 以太网mlx5_core
这个驱动有点特殊,它支持以太网驱动,但由于历史原因,它的实现与普通以太网驱动有很大不同: Mellanox 是做高性能传输起家的(2019 年被 NVIDIA 收购),早期产品是 InfiniBand, 这是一个平行于以太网的二层传输和互联方案:
Fig. Different L2 protocols for interconnecting in the industry (with Ethernet as the dominant one)
Infiniband 在高性能计算、RDMA 网络中应用广泛,但毕竟市场还是太小了,所以 后来 Mellanox 又对它的网卡添加了以太网支持。表现在驱动代码上,就是会看到它有一些 特定的术语、变量和函数命名、模块组织等等,读起来比 ixgbe 这样原生的以太网驱动要累一些。 这里列一些,方便后面看代码:
接下来看下一个具体 Mellanox 网卡的硬件相关信息:
$ lspci -vvv | grep Mellanox -A 50
d8:00.0 Ethernet controller: Mellanox Technologies MT27710 Family [ConnectX-4 Lx]
Subsystem: Mellanox ... ConnectX-4 Lx EN, 25GbE dual-port SFP28, PCIe3.0 x8, MCX4121A-ACAT
Interrupt: pin A routed to IRQ 114
Capabilities: [9c] MSI-X: Enable+ Count=64 Masked-
Vector table: BAR=0 offset=00002000
PBA: BAR=0 offset=00003000
Capabilities: [180 v1] Single Root I/O Virtualization (SR-IOV)
...
Initial VFs: 8, Total VFs: 8, Number of VFs: 0, Function Dependency Link: 00
...
Kernel driver in use: mlx5_core
Kernel modules: mlx5_core
其中的一些关键信息:
ConnectX-4
;25Gbps
以太网卡;双接口,也就是在系统中能看到 eth0
和 eth1
两个网卡;PCIe3.0 x8
;IRQ 114
, MSI-X
;mlx5_core
,对应的内核内核模块是 mlx5_core
。可以看到现在用的驱动叫 mlx5_core
,接下来就来看它的实现,代码位于 drivers/net/ethernet/mellanox/mlx5/core
。
module_init() -> init() -> pci/mlx5e init
首先看内核启动时,驱动的注册和初始化过程。非常简单直接, module_init()
注册一个初始化函数,内核加载驱动执行:
// https://github.com/torvalds/linux/blob/v5.10/drivers/net/ethernet/mellanox/mlx5/core/main.c
static int __init init(void) {
mlx5_register_debugfs(); // /sys/kernel/debug
pci_register_driver(&mlx5_core_driver); // 初始化 PCI 相关的东西
mlx5e_init(); // 初始化 ethernet 相关东西
}
module_init(init);
初始化的大部分工作在 pci_register_driver()
和 mlx5e_init()
里面完成。
网卡通过 PCIe 连接到 CPU,简单的场景就是 NIC 直连 CPU 上自带的 PCIe, 复杂的场景(比如 8 卡 GPU 主机)还会引入额外的 PCIe Switch 芯片,因为机器上设备太多, PCIe 接口不够用了。更多信息可参考:
典型 8 卡 A100 主机硬件拓扑
pci_register_driver()
前面 lspci
可以看到这个网卡是 PCI express 设备, 这种设备设备通过 PCI Configuration Space 识别。当设备驱动编译时,MODULE_DEVICE_TABLE
宏会导出一个 global 的 PCI 设备 ID 列表, 驱动据此识别它可以控制哪些设备,这样内核就能对各设备加载正确的驱动:
// https://github.com/torvalds/linux/blob/v5.10/include/linux/module.h
#ifdef MODULE
/* Creates an alias so file2alias.c can find device table. */
#define MODULE_DEVICE_TABLE(type, name) \
extern typeof(name) __mod_##type##__##name##_device_table __attribute__ ((unused, alias(__stringify(name))))
#else /* !MODULE */
#define MODULE_DEVICE_TABLE(type, name)
#endif
mlx5_core
驱动的设备表和 PCI 设备 ID:
// https://github.com/torvalds/linux/blob/v5.10/drivers/net/ethernet/mellanox/mlx5/core/main.c
static const struct pci_device_id mlx5_core_pci_table[] = {
{ PCI_VDEVICE(MELLANOX, PCI_DEVICE_ID_MELLANOX_CONNECTIB) },
{ PCI_VDEVICE(MELLANOX, 0x1012), MLX5_PCI_DEV_IS_VF}, /* Connect-IB VF */
{ PCI_VDEVICE(MELLANOX, PCI_DEVICE_ID_MELLANOX_CONNECTX4) },
{ PCI_VDEVICE(MELLANOX, 0x1014), MLX5_PCI_DEV_IS_VF}, /* ConnectX-4 VF */
{ PCI_VDEVICE(MELLANOX, PCI_DEVICE_ID_MELLANOX_CONNECTX4_LX) },
{ PCI_VDEVICE(MELLANOX, 0x1016), MLX5_PCI_DEV_IS_VF}, /* ConnectX-4LX VF */
{ PCI_VDEVICE(MELLANOX, 0x1017) }, /* ConnectX-5, PCIe 3.0 */
{ PCI_VDEVICE(MELLANOX, 0x1018), MLX5_PCI_DEV_IS_VF}, /* ConnectX-5 VF */
{ PCI_VDEVICE(MELLANOX, 0x1019) }, /* ConnectX-5 Ex */
{ PCI_VDEVICE(MELLANOX, 0x101a), MLX5_PCI_DEV_IS_VF}, /* ConnectX-5 Ex VF */
{ PCI_VDEVICE(MELLANOX, 0x101b) }, /* ConnectX-6 */
{ PCI_VDEVICE(MELLANOX, 0x101c), MLX5_PCI_DEV_IS_VF}, /* ConnectX-6 VF */
{ PCI_VDEVICE(MELLANOX, 0x101d) }, /* ConnectX-6 Dx */
{ PCI_VDEVICE(MELLANOX, 0x101e), MLX5_PCI_DEV_IS_VF}, /* ConnectX Family mlx5Gen Virtual Function */
{ PCI_VDEVICE(MELLANOX, 0x101f) }, /* ConnectX-6 LX */
{ PCI_VDEVICE(MELLANOX, 0x1021) }, /* ConnectX-7 */
{ PCI_VDEVICE(MELLANOX, 0xa2d2) }, /* BlueField integrated ConnectX-5 network controller */
{ PCI_VDEVICE(MELLANOX, 0xa2d3), MLX5_PCI_DEV_IS_VF}, /* BlueField integrated ConnectX-5 network controller VF */
{ PCI_VDEVICE(MELLANOX, 0xa2d6) }, /* BlueField-2 integrated ConnectX-6 Dx network controller */
{ 0, }
};
MODULE_DEVICE_TABLE(pci, mlx5_core_pci_table);
pci_register_driver()
会将该驱动的各种回调方法注册到一个 struct pci_driver mlx5_core_driver
变量,
// https://github.com/torvalds/linux/blob/v5.10/drivers/net/ethernet/mellanox/mlx5/core/main.c
static struct pci_driver mlx5_core_driver = {
.name = DRIVER_NAME,
.id_table = mlx5_core_pci_table,
.probe = init_one, // 初始化时执行这个方法
.remove = remove_one,
.suspend = mlx5_suspend,
.resume = mlx5_resume,
.shutdown = shutdown,
.err_handler = &mlx5_err_handler,
.sriov_configure = mlx5_core_sriov_configure,
};
更详细的 PCI 驱动信息不在本文讨论范围,如想进一步了解,可参考 分享, wiki, Linux Kernel Documentation: PCI。
pci_driver->probe()
内核启动过程中,会通过 PCI ID 依次识别各 PCI 设备,然后为设备选择合适的驱动。 每个 PCI 驱动都注册了一个 probe()
方法,为设备寻找驱动就是调用其 probe() 方法。 probe()
做的事情因厂商和设备而已,但总体来说这个过程涉及到的东西都非常多, 最终目标也都是使设备 ready。典型过程包括:
来看下 mlx5_core
驱动的 probe()
包含哪些过程:
// https://github.com/torvalds/linux/blob/v5.10/drivers/net/ethernet/mellanox/mlx5/core/main.c
static int init_one(struct pci_dev *pdev, const struct pci_device_id *id) {
struct devlink *devlink = mlx5_devlink_alloc(); // 内核通用结构体,包含 netns 等信息
struct mlx5_core_dev *dev = devlink_priv(devlink); // 私有数据区域存储的是 mlx5 自己的 device 表示
dev->device = &pdev->dev;
dev->pdev = pdev;
mlx5_mdev_init(dev, prof_sel); // 初始化 mlx5 debugfs 目录
mlx5_pci_init(dev, pdev, id); // 初始化 IO/DMA/Capabilities 等功能
mlx5_load_one(dev, true); // 初始化 IRQ 等
request_module_nowait(MLX5_IB_MOD);
pci_save_state(pdev);
if (!mlx5_core_is_mp_slave(dev))
devlink_reload_enable(devlink);
}
第一行创建 devlink 时,可以声明一段私有空间大小(封装在函数里面),里面放什么数 据完全由这个 devlink 的 owner 来决定;在 Mellanox 驱动中,这段空间的大小是 sizeof(struct mlx5_core_dev)
,存放具体的 device 信息,所以第二行 devlink_priv()
拿到的就是 device 指针。
.probe()
|-init_one(struct pci_dev *pdev, pci_device_id id)
|-mlx5_devlink_alloc()
| |-devlink_alloc(ops) // net/core/devlink.c
| |-devlink = kzalloc()
| |-devlink->ops = ops
| |-return devlink
|
|-mlx5_mdev_init(dev, prof_sel)
| |-debugfs_create_dir
| |-mlx5_pagealloc_init(dev)
| |-create_singlethread_workqueue("mlx5_page_allocator")
| |-alloc_ordered_workqueue
| |-alloc_workqueue // kernel/workqueue.c
|
|-mlx5_pci_init(dev, pdev, id);
| |-mlx5_pci_enable_device(dev);
| |-request_bar(pdev); // Reserve PCI I/O and memory resources
| |-pci_set_master(pdev); // Enables bus-mastering on the device
| |-set_dma_caps(pdev); // setting DMA capabilities mask, set max_seg <= 1GB
| |-dev->iseg = ioremap()
|
|-mlx5_load_one(dev, true);
| |-if interface already STATE_UP
| | return
| |
| |-dev->state = STATE_UP
| |-mlx5_function_setup // Init firmware functions
| |-if boot:
| | mlx5_init_once
| | |-mlx5_irq_table_init // Allocate IRQ table memory
| | |-mlx5_eq_table_init // events queue
| | |-dev->vxlan = mlx5_vxlan_create
| | |-dev->geneve = mlx5_geneve_create
| | |-mlx5_sriov_init
| | else:
| | mlx5_attach_device
| |-mlx5_load
| | |-mlx5_irq_table_create // 初始化硬中断
| | | |-pci_alloc_irq_vectors(MLX5_IRQ_VEC_COMP_BASE + 1, PCI_IRQ_MSIX);
| | | |-request_irqs
| | | for i in vectors:
| | | request_irq(irqn, mlx5_irq_int_handler) // 注册中断处理函数
| | |-mlx5_eq_table_create // 初始化事件队列(EventQueue)
| | |-create_comp_eqs(dev) // Completion EQ
| | for ncomp_eqs:
| | eq->irq_nb.notifier_call = mlx5_eq_comp_int;
| | create_map_eq()
| | mlx5_eq_enable()
| |-set_bit(MLX5_INTERFACE_STATE_UP, &dev->intf_state);
|
|-pci_save_state(pdev);
|
|-if (!mlx5_core_is_mp_slave(dev))
devlink_reload_enable(devlink);
Fig. PCI probe during device initialization
devlink
:mlx5_devlink_alloc()
struct devlink
是个通用内核结构体:
// include/net/devlink.h
struct devlink {
struct list_head list;
struct list_head port_list;
struct list_head sb_list;
struct list_head dpipe_table_list;
struct list_head resource_list;
struct list_head param_list;
struct list_head region_list;
struct list_head reporter_list;
struct mutex reporters_lock; /* protects reporter_list */
struct devlink_dpipe_headers *dpipe_headers;
struct list_head trap_list;
struct list_head trap_group_list;
struct list_head trap_policer_list;
const struct devlink_ops *ops;
struct xarray snapshot_ids;
struct devlink_dev_stats stats;
struct device *dev;
possible_net_t _net; // netns 相关
u8 reload_failed:1,
reload_enabled:1,
registered:1;
char priv[0] __aligned(NETDEV_ALIGN);
};
mlx5_devlink_alloc()
注册一些硬件特性相关的方法,例如 reload、info_get 等:
// drivers/net/ethernet/mellanox/mlx5/core/devlink.c
struct devlink *mlx5_devlink_alloc(void) {
return devlink_alloc(&mlx5_devlink_ops, sizeof(struct mlx5_core_dev));
}
static const struct devlink_ops mlx5_devlink_ops = {
.flash_update = mlx5_devlink_flash_update,
.info_get = mlx5_devlink_info_get,
.reload_actions = BIT(DEVLINK_RELOAD_ACTION_DRIVER_REINIT) | BIT(DEVLINK_RELOAD_ACTION_FW_ACTIVATE),
.reload_limits = BIT(DEVLINK_RELOAD_LIMIT_NO_RESET),
.reload_down = mlx5_devlink_reload_down,
.reload_up = mlx5_devlink_reload_up,
};
mlx5_mdev_init()
这里面会初始化 debugfs 目录:/sys/kernel/debug/mlx5/
$ tree -L 2 /sys/kernel/debug/mlx5/
/sys/kernel/debug/mlx5/
├── 0000:12:00.0
│ ├── cc_params
│ ├── cmd
│ ├── commands
│ ├── CQs
│ ├── delay_drop
│ ├── EQs
│ ├── mr_cache
│ └── QPs
└── 0000:12:00.1
├── cc_params
├── cmd
├── commands
├── CQs
├── delay_drop
├── EQs
├── mr_cache
└── QPs
里面的信息非常多,可以 cat 其中一些文件来帮助排障。
另外就是初始化一些 WQ。Workqueue (WQ) 也是一个内核通用结构体,更多信息,见 Linux 中断(IRQ/softirq)基础:原理及内核实现。
mlx5_pci_init()
static int
mlx5_pci_init(struct mlx5_core_dev *dev, struct pci_dev *pdev, const struct pci_device_id *id) {
mlx5_pci_enable_device(dev);
request_bar(pdev); // Reserve PCI I/O and memory resources
pci_set_master(pdev); // Enables bus-mastering on the device
set_dma_caps(pdev); // setting DMA capabilities mask, set max_seg <= 1GB
dev->iseg = ioremap(dev->iseg_base, sizeof(*dev->iseg)); // mapping initialization segment
mlx5_pci_vsc_init(dev);
dev->caps.embedded_cpu = mlx5_read_embedded_cpu(dev);
return 0;
}
mlx5_load_one()
int mlx5_load_one(struct mlx5_core_dev *dev, bool boot) {
dev->state = MLX5_DEVICE_STATE_UP;
mlx5_function_setup(dev, boot);
if (boot) {
mlx5_init_once(dev);
}
mlx5_load(dev);
set_bit(MLX5_INTERFACE_STATE_UP, &dev->intf_state);
if (boot) {
mlx5_devlink_register(priv_to_devlink(dev), dev->device);
mlx5_register_device(dev);
} else {
mlx5_attach_device(dev);
}
}
这里会初始化硬件中断,
| |-mlx5_load
| | |-mlx5_irq_table_create // 初始化硬中断
| | | |-pci_alloc_irq_vectors(MLX5_IRQ_VEC_COMP_BASE + 1, PCI_IRQ_MSIX);
| | | |-request_irqs
| | | for i in vectors:
| | | request_irq(irqn, mlx5_irq_int_handler) // 注册中断处理函数
| | |-mlx5_eq_table_create // 初始化事件队列(EventQueue)
| | |-create_comp_eqs(dev) // Completion EQ
| | for ncomp_eqs:
| | eq->irq_nb.notifier_call = mlx5_eq_comp_int;
| | create_map_eq()
| | mlx5_eq_enable()
使用的中断方式是 MSI-X。 当一个数据帧通过 DMA 写到内核内存 ringbuffer 后,网卡通过硬件中断(IRQ)通知其他系统。 设备有多种方式触发一个中断:
设备驱动的实现也因此而异。驱动必须判断出设备支持哪种中断方式,然后注册相应的中断处理函数,这些函数在中断发 生的时候会被执行。
irqbalance
方式,或者修改/proc/irq/IRQ_NUMBER/smp_affinity
)。后面会看到 ,处理中断的 CPU 也是随后处理这个包的 CPU。这样的话,从网卡硬件中断的层面就可 以设置让收到的包被不同的 CPU 处理。mlx5e_init()
PCI 功能初始化成功后,接下来执行以太网相关功能的初始化。
// en_main.c
void mlx5e_init(void) {
mlx5e_ipsec_build_inverse_table();
mlx5e_build_ptys2ethtool_map();
mlx5_register_interface(&mlx5e_interface);
}
static struct mlx5_interface mlx5e_interface = {
.add = mlx5e_add,
.remove = mlx5e_remove,
.attach = mlx5e_attach,
.detach = mlx5e_detach,
.protocol = MLX5_INTERFACE_PROTOCOL_ETH, // 接口运行以太网协议
};
mlx5e_nic
|-mlx5e_ipsec_build_inverse_table();
|-mlx5e_build_ptys2ethtool_map();
|-mlx5_register_interface(&mlx5e_interface)
|-list_add_tail(&intf->list, &intf_list);
|
|-for priv in mlx5_dev_list
mlx5_add_device(intf, priv)
/
/
mlx5_add_device(intf, priv)
|-if !mlx5_lag_intf_add
| return // if running in InfiniBand mode, directly return
|
|-dev_ctx = kzalloc()
|-dev_ctx->context = intf->add(dev)
|-mlx5e_add
|-netdev = mlx5e_create_netdev(mdev, &mlx5e_nic_profile, nch);
| |-mlx5e_nic_init
| |-mlx5e_build_nic_netdev(netdev);
| |-netdev->netdev_ops = &mlx5e_netdev_ops; // 注册 ethtool_ops, poll
|-mlx5e_attach
| |-mlx5e_attach_netdev
| |-profile->init_tx()
| |-profile->init_rx()
| |-profile->enable()
| |-mlx5e_nic_enable
| |-mlx5e_init_l2_addr
| |-queue_work(priv->wq, &priv->set_rx_mode_work)
| |-mlx5e_open(netdev);
| | |-mlx5e_open_locked
| | |-mlx5e_open_channels
| | | |-mlx5e_open_channel
| | | |-netif_napi_add(netdev, &c->napi, mlx5e_napi_poll, 64);
| | | |-mlx5e_open_queues
| | | |-mlx5e_open_cq
| | | | |-mlx5e_alloc_cq
| | | | |-mlx5e_alloc_cq_common
| | | | |-mcq->comp = mlx5e_completion_event;
| | | |-napi_enable(&c->napi)
| | |-mlx5e_activate_priv_channels
| | |-mlx5e_activate_channels
| | |-for ch in channels:
| | mlx5e_activate_channel
| | |-mlx5e_activate_rq
| | |-mlx5e_trigger_irq
| |-netif_device_attach(netdev);
Fig. mlx5_core driver ethernet functions initialization
mlx5e_init() -> mlx5_register_interface() -> mlx5_add_device()
// drivers/net/ethernet/mellanox/mlx5/core/dev.c
static LIST_HEAD(intf_list); // 全局 interface 双向链表
static LIST_HEAD(mlx5_dev_list); // 全局 mlx5 device 双向链表
int mlx5_register_interface(struct mlx5_interface *intf) {
list_add_tail(&intf->list, &intf_list);
list_for_each_entry(struct mlx5_priv *priv, &mlx5_dev_list, dev_list)
mlx5_add_device(intf, priv); // 如果网卡运行的是 InfiniBand 协议,里面会直接返回
}
intf_list
;static const struct mlx5e_profile mlx5e_nic_profile = {
.init = mlx5e_nic_init, // 网卡以太网相关内容初始化
.cleanup = mlx5e_nic_cleanup,
.init_rx = mlx5e_init_nic_rx, // 初始化接收队列(RX)
.cleanup_rx = mlx5e_cleanup_nic_rx,
.init_tx = mlx5e_init_nic_tx, // 初始化发送队列(TX)
.cleanup_tx = mlx5e_cleanup_nic_tx,
.enable = mlx5e_nic_enable, // 启用网卡时的回调
.disable = mlx5e_nic_disable,
.update_rx = mlx5e_update_nic_rx,
.update_stats = mlx5e_stats_update_ndo_stats,
.update_carrier = mlx5e_update_carrier,
.rx_handlers = &mlx5e_rx_handlers_nic, // 收包函数
.max_tc = MLX5E_MAX_NUM_TC,
.rq_groups = MLX5E_NUM_RQ_GROUPS(XSK),
.stats_grps = mlx5e_nic_stats_grps,
.stats_grps_num = mlx5e_nic_stats_grps_num,
};
mlx5_add_device() -> intf.add() -> mlx5e_add()
mlx5e_create_netdev()
:创建 netdev、注册 ethtool
方法ethtool
是一个命令行工具,可以查看和修改网卡配置,常用于收集网卡统计数据。 内核实现了一个通用 ethtool
接口,网卡驱动只要实现这些接口,就可以使用 ethtool
来查看或修改网络配置; 在底层,它是通过 ioctl 和设备驱动通信的。
// drivers/net/ethernet/mellanox/mlx5/core/en_main.c
const struct net_device_ops mlx5e_netdev_ops = {
.ndo_open = mlx5e_open,
.ndo_stop = mlx5e_close,
.ndo_start_xmit = mlx5e_xmit,
.ndo_setup_tc = mlx5e_setup_tc,
.ndo_select_queue = mlx5e_select_queue,
.ndo_get_stats64 = mlx5e_get_stats,
.ndo_set_rx_mode = mlx5e_set_rx_mode,
.ndo_set_mac_address = mlx5e_set_mac,
.ndo_vlan_rx_add_vid = mlx5e_vlan_rx_add_vid,
.ndo_vlan_rx_kill_vid = mlx5e_vlan_rx_kill_vid,
.ndo_set_features = mlx5e_set_features,
.ndo_fix_features = mlx5e_fix_features,
.ndo_change_mtu = mlx5e_change_nic_mtu,
.ndo_do_ioctl = mlx5e_ioctl,
.ndo_set_tx_maxrate = mlx5e_set_tx_maxrate,
.ndo_udp_tunnel_add = udp_tunnel_nic_add_port,
.ndo_udp_tunnel_del = udp_tunnel_nic_del_port,
.ndo_features_check = mlx5e_features_check,
.ndo_tx_timeout = mlx5e_tx_timeout,
.ndo_bpf = mlx5e_xdp, // BPF
.ndo_xdp_xmit = mlx5e_xdp_xmit, // XDP
.ndo_xsk_wakeup = mlx5e_xsk_wakeup,
.ndo_get_devlink_port = mlx5e_get_devlink_port,
};
mlx5e_attach()
今天的大部分网卡都使用 DMA 将数据直接写到内存,接下来操作系统可以直接从里 面读取。实现这一目的所使用的数据结构就是 ring buffer(环形缓冲区)。 要实现这一功能,设备驱动必须和操作系统合作,预留(reserve)出一段内存来给网卡使用。 预留成功后,网卡知道了这块内存的地址,接下来收到的包就会放到这里,进而被操作系统取走。
static int mlx5e_init_nic_rx(struct mlx5e_priv *priv) {
struct mlx5_core_dev *mdev = priv->mdev;
mlx5e_create_q_counters(priv);
mlx5e_open_drop_rq(priv, &priv->drop_rq);
mlx5e_create_indirect_rqt(priv);
mlx5e_create_direct_rqts(priv, priv->direct_tir);
mlx5e_create_indirect_tirs(priv, true);
mlx5e_create_direct_tirs(priv, priv->direct_tir);
mlx5e_create_direct_rqts(priv, priv->xsk_tir);
mlx5e_create_direct_tirs(priv, priv->xsk_tir);
mlx5e_create_flow_steering(priv);
mlx5e_tc_nic_init(priv); // 初始化 TC offload 功能
mlx5e_accel_init_rx(priv); // 初始化 kTLS offload
#ifdef CONFIG_MLX5_EN_ARFS
priv->netdev->rx_cpu_rmap = mlx5_eq_table_get_rmap(priv->mdev);
#endif
以及 profile 中注册的 rx_handlers:
const struct mlx5e_rx_handlers mlx5e_rx_handlers_nic = {
.handle_rx_cqe = mlx5e_handle_rx_cqe,
.handle_rx_cqe_mpwqe = mlx5e_handle_rx_cqe_mpwrq,
};
查看一台机器的 queue 数量:
$ ethtool -l eth0 # 能用 ethtool 看到这些信息,就是因为前面注册了 ethtool 的相应方法
Channel parameters for eth0:
Pre-set maximums:
RX: 0
TX: 0
Other: 0
Combined: 40 # 最多支持 40 个
Current hardware settings:
RX: 0
TX: 0
Other: 0
Combined: 40 # 目前配置 40 个
每个 Queue 的 descriptor 数量(或称 queue depth):
$ ethtool -g eth0
Ring parameters for eth0:
Pre-set maximums:
RX: 8192
RX Mini: 0
RX Jumbo: 0
TX: 8192
Current hardware settings:
RX: 1024
RX Mini: 0
RX Jumbo: 0
TX: 1024
Hash:
$ ethtool -x eth0
RX flow hash indirection table for eth0 with 40 RX ring(s):
0: 0 1 2 3 4 5 6 7
8: 8 9 10 11 12 13 14 15
16: 16 17 18 19 20 21 22 23
24: 24 25 26 27 28 29 30 31
32: 32 33 34 35 36 37 38 39
40: 0 1 2 3 4 5 6 7
48: 8 9 10 11 12 13 14 15
56: 16 17 18 19 20 21 22 23
64: 24 25 26 27 28 29 30 31
72: 32 33 34 35 36 37 38 39
80: 0 1 2 3 4 5 6 7
88: 8 9 10 11 12 13 14 15
96: 16 17 18 19 20 21 22 23
104: 24 25 26 27 28 29 30 31
112: 32 33 34 35 36 37 38 39
120: 0 1 2 3 4 5 6 7
RSS hash key:
9a:b0:e3:53:ed:d4:14:7a:a0:...:e5:57:e8:6a:ec
RSS hash function:
toeplitz: off
xor: on
crc32: off
内核有一种称为 NAPI(New API)的机制,允许网卡注册自己的 poll() 方法,执行这个方法就会从相应的网卡收包。 关于 NAPI 后面会有更详细介绍,这里只看一下注册时的调用栈:
mlx5e_open(netdev);
|-mlx5e_open_locked
|-mlx5e_open_channels
| |-mlx5e_open_channel
| |-netif_napi_add(netdev, &c->napi, mlx5e_napi_poll, 64); // 注册 NAPI
| |-mlx5e_open_queues
| |-mlx5e_open_cq
| | |-mlx5e_alloc_cq
| | |-mlx5e_alloc_cq_common
| | |-mcq->comp = mlx5e_completion_event;
| |-napi_enable(&c->napi) // 启用 NAPI
|-mlx5e_activate_priv_channels
|-mlx5e_activate_channels
|-for ch in channels:
mlx5e_activate_channel // 启用硬中断(IRQ)
|-mlx5e_activate_rq
|-mlx5e_trigger_irq
到这里,几乎所有的准备工作都就绪了,唯一剩下的就是打开硬中断, 等待数据包进来。
打开硬中断的方式因硬件而异,mxl5_core
驱动是在 mlx5e_trigger_irq()
中完成的。 调用栈见上面。
网卡启用后,驱动可能还会做一些额外的事情,例如启动定时器 或者其他硬件相关的设置。这些工作做完后,网卡就可以收包了。
从流程来说,网卡驱动初始化完成之后,我们就等着包从网卡上来了,也就是图中第 2 步:
Fig. Steps of Linux kernel receiving data process and the corresponding chapters in this post
这是物理层(L1)和数据链路层(L2)的行为,这里就不展开了。
Fig. Steps of Linux kernel receiving data process and the corresponding chapters in this post
数据从网线进入网卡,通过 DMA 直接写到 ring buffer(第 3 步),然后就该操作系统来收包了。
如果对其原理感兴趣,可以查看内核文档 DMA API HOWTO: Dynamic DMA mapping Guide。
在包从网卡到达应用层的过程中,会经历几次数据复制,这个对性能影响非常大,所以我们记录一下:
Fig. DMA, ring buffer and the data copy steps
这里需要停下来考虑一个问题:网卡是没有处理器的(近几年刚出现的所谓“ 智能网卡”除外,它们有自己独立的处理器、内存等等,可以与 CPU 并行处理),所以 如果到达网卡的数据没有进程或线程来处理,就只能被丢弃。那么,接下来谁来收这些包,怎么收这些包呢?
有两种方式,我们分别来看下。
中断方式针对高吞吐场景的改进是 NAPI 方式,简单来说它结合了轮询和中断两种方式。 绝大部分网卡都是这种模式,本文所用的 mlx5_core
就属于这一类。
Fig. NAPI working mechanism: poll (batch receiving) + IRQ
简单来说,NAPI 结合了轮询和中断两种方式。
假如此时 NAPI poll() 没有正在运行, 接下来我们看通过 IRQ 来通知 CPU(图中第 4 步)从 ring buffer 收包的。
DMA 将包复制到 ring buffer(内核内存)之后,网卡发起对应的中断(在 MSI-X 场景,中断和 RX 队列绑定)。 来个具体例子,下面是台 40 核的机器,
$ cat /proc/interrupts
CPU0 CPU1 ... CPU38 CPU39
0: 33 0 ... 0 0 IR-IO-APIC 2-edge timer
1: 0 0 ... 0 0 IR-IO-APIC 1-edge i8042
8: 0 0 ... 0 0 IR-IO-APIC 8-edge rtc0
9: 0 0 ... 0 0 IR-IO-APIC 9-fasteoi acpi
...
93: 0 0 ... 0 0 DMAR-MSI 1-edge dmar1
94: 0 0 ... 0 0 DMAR-MSI 0-edge dmar0
96: 68719 42920 ... 0 0 IR-PCI-MSI 9437189-edge mlx5_comp1@pci:0000:12:00.0
...
179: 5743 15415 ... 0 1 IR-PCI-MSI 9439275-edge mlx5_comp39@pci:0000:12:00.1
200: 0 0 ... 0 0 IR-PCI-MSI 67188736-edge ioat-msix
NMI: 8282 78282 ... 8281 8281 Non-maskable interrupts
PMI: 8282 78282 ... 8281 8281 Performance monitoring interrupts
IWI: 3768 86227 ... 3083 0136 IRQ work interrupts
CAL: 217210 245619 ... 425777 430686 Function call interrupts
其中的 mlx5_comp{id}@pci:xxx
就是我们网卡的中断处理函数, 它是在网卡初始化 IRQ 时生成的,后面会看到调用栈。
硬中断期间是不能再进行另外的硬中断的,也就是说不能嵌套。 所以硬中断处理函数(handler)执行时,会屏蔽部分或全部(新的)硬中断。
所以,所有耗时的操作都应该从硬中断处理逻辑中剥离出来, 硬中断因此能尽可能快地执行,然后再重新打开。软中断就是针对这一目的提出的。
内核中也有其他机制将耗时操作转移出去,不过对于网络栈,我们接下来只看软中断这种方式。
这个过程其实是在网卡驱动初始化(第一章)时完成的,但是第一章的内容太多了,所以我们放到这里看一下:
mlx5_load
|-mlx5_irq_table_create
| |-table->irq = kcalloc()
| |-pci_alloc_irq_vectors(dev->pdev, MLX5_IRQ_VEC_COMP_BASE + 1, nvec, PCI_IRQ_MSIX);
| |-request_irqs
| |-for i in vectors:
| irq = mlx5_irq_get(dev, i);
| irqn = pci_irq_vector(dev->pdev, i);
| irq_set_name(sprintf("mlx5_comp%d", vecidx-MLX5_IRQ_VEC_COMP_BASE), i);
| snprintf(irq->name, "%s@pci:%s", name, pci_name(dev->pdev));
| request_irq(irqn, mlx5_irq_int_handler, 0, irq->name, &irq->nh); // 注册中断处理函数
|
|-mlx5_eq_table_create // 初始化事件队列(EventQueue)
|-create_comp_eqs(dev) // Completion EQ
for ncomp_eqs:
eq->irq_nb.notifier_call = mlx5_eq_comp_int; // 每个 EQ 事件完成时的回调函数
create_map_eq()
mlx5_eq_enable()
mlx5_irq_int_handler()
,irq->name
,格式是 mlx5_comp{id}@pci:xxx
,正是 cat /proc/interrupts
看到的最后一列;request_irq()
已经跳出了 mlx5_core 驱动,是通用内核代码,见 include/linux/interrupt.h
。mlx5_eq_comp_int()
注意是先注册 NAPI poll,再打开硬件中断; 硬中断先执行到网卡注册的 IRQ handler,在 handler 里面再触发 NET_RX_SOFTIRQ
softirq。
mlx5_irq_int_handler()
mlx5_irq_int_handler()
非常简单,
// drivers/net/ethernet/mellanox/mlx5/core/pci_irq.c
static irqreturn_t mlx5_irq_int_handler(int irq, void *nh) {
atomic_notifier_call_chain(nh, 0, NULL); // 内核函数,见 kernel/notifier.c
return IRQ_HANDLED;
}
然后中断事件就进入了 Mellanox 的完成队列(EQ),会执行 EQ 完成时的回调方法。
irq_nb.notifier_call() -> mlx5_eq_comp_int()
// https://github.com/torvalds/linux/blob/v5.10/drivers/net/ethernet/mellanox/mlx5/core/eq.c
static int mlx5_eq_comp_int(struct notifier_block *nb) {
struct mlx5_eq_comp *eq_comp = container_of(nb, struct mlx5_eq_comp, irq_nb);
struct mlx5_eq *eq = &eq_comp->core;
u32 cqn = -1;
struct mlx5_eqe *eqe = next_eqe_sw(eq); // 从 event queue 拿出一个 entry
do {
dma_rmb(); // Make sure we read EQ entry contents after we've checked the ownership bit.
struct mlx5_core_cq *cq = mlx5_eq_cq_get(eq, cqn); // 获取 EQ completion queue
if (likely(cq)) {
++cq->arm_sn;
cq->comp(cq, eqe); // 执行 completion 方法,里面最主要的步骤是执行 napi_schedule()
mlx5_cq_put(cq); // refcnt--,释放空间
}
++eq->cons_index;
} while ((++num_eqes < MLX5_EQ_POLLING_BUDGET) && (eqe = next_eqe_sw(eq)));
eq_update_ci(eq, 1); // 更新一个硬件寄存器,记录 IRQ 频率
if (cqn != -1)
tasklet_schedule(&eq_comp->tasklet_ctx.task);
}
是个比较简单的循环,
最后,更新一个硬件寄存器,记录硬件中断频率。 可以用ethtool 调整 IRQ 触发频率。
cq->comp() -> mlx5e_completion_event() -> napi_schedule()
void mlx5e_completion_event(struct mlx5_core_cq *mcq, struct mlx5_eqe *eqe) {
struct mlx5e_cq *cq = container_of(mcq, struct mlx5e_cq, mcq);
napi_schedule(cq->napi);
cq->event_ctr++;
cq->channel->stats->events++;
}
napi_schedule() -> ____napi_schedule()
接下来看从硬件中断中调用的 napi_schedule
是如何工作的。
注意,NAPI 存在的意义是无需硬件中断通知就可以接收网络数据。前面提到, NAPI 的轮询循环(poll loop)是受硬件中断触发而跑起来的。换句话说,NAPI 功能启用了 ,但是默认是没有工作的,直到第一个包到达的时候,网卡触发的一个硬件将它唤醒。后面 会看到,也还有其他的情况,NAPI 功能也会被关闭,直到下一个硬中断再次将它唤起。
napi_schedule
只是一个简单的封装,内层调用 __napi_schedule
。
// https://github.com/torvalds/linux/blob/v5.10/net/core/dev.c
/**
* __napi_schedule - schedule for receive
* @n: entry to schedule
*
* The entry's receive function will be scheduled to run.
* Consider using __napi_schedule_irqoff() if hard irqs are masked.
*/
void __napi_schedule(struct napi_struct *n) {
unsigned long flags;
local_irq_save(flags);
____napi_schedule(this_cpu_ptr(&softnet_data), n);
local_irq_restore(flags);
}
获取对应这个 CPU 的 structure softnet_data
变量,作为参数传过去。
____napi_schedule() -> __raise_softirq_irqoff()
// https://github.com/torvalds/linux/blob/v5.10/net/core/dev.c
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd, struct napi_struct *napi) {
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ); // kernel/softirq.c
}
这段代码了做了两个重要的事情:
napi_struct
变量,添加到 poll list,后者 attach 到这个 CPU 上的 softnet_data
__raise_softirq_irqoff
触发一个 NET_RX_SOFTIRQ
类型软中断。这会触发执行 net_rx_action
(如果没有正在执行),后者是网络设备初始化的时候注册的(下一章会看到)。接下来会看到,软中断处理函数 net_rx_action
会调用 NAPI 的 poll 函数来收包。
注意到目前为止,我们从硬中断处理函数中转移到软中断处理函数的逻辑,都是使用的本 CPU 变量。
驱动的硬中断处理函数做的事情很少,但软中断将会在和硬中断相同的 CPU 上执行。这就 是为什么给每个 CPU 一个特定的硬中断非常重要:这个 CPU 不仅处理这个硬中断,而且通 过 NAPI 处理接下来的软中断来收包。
后面我们会看到,Receive Packet Steering 可以将软中断分给其他 CPU。
ksoftirqd
线程现在来到了图中第 5 步:
Fig. Steps of Linux kernel receiving data process and the corresponding chapters in this post
网络设备(netdev)的初始化在 net_dev_init()
,在系统启动期间执行:
// net/core/dev.c
// Initialize the DEV module. At boot time this walks the device list and
// unhooks any devices that fail to initialise (normally hardware not
// present) and leaves us with a valid list of present and active devices.
static int __init net_dev_init(void) {
dev_proc_init(); // 注册 /proc/net/{dev,softnet_stat,ptytpe}
netdev_kobject_init();
INIT_LIST_HEAD(&ptype_all);
for (i = 0; i < PTYPE_HASH_SIZE; i++)
INIT_LIST_HEAD(&ptype_base[i]);
INIT_LIST_HEAD(&offload_base);
register_pernet_subsys(&netdev_net_ops);
// 针对每个 CPU,初始化各种接收队列(packet receive queues)
for_each_possible_cpu(i) {
struct work_struct *flush = per_cpu_ptr(&flush_works, i);
struct softnet_data *sd = &per_cpu(softnet_data, i);
INIT_WORK(flush, flush_backlog);
skb_queue_head_init(&sd->input_pkt_queue);
skb_queue_head_init(&sd->process_queue);
skb_queue_head_init(&sd->xfrm_backlog);
INIT_LIST_HEAD(&sd->poll_list);
sd->output_queue_tailp = &sd->output_queue;
#ifdef CONFIG_RPS
sd->csd.func = rps_trigger_softirq;
sd->csd.info = sd;
sd->cpu = i;
#endif
init_gro_hash(&sd->backlog); // GRO
sd->backlog.poll = process_backlog;
sd->backlog.weight = weight_p;
}
dev_boot_phase = 0;
register_pernet_device(&loopback_net_ops)
register_pernet_device(&default_device_ops)
// 注册 RX/TX 软中断处理函数
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
cpuhp_setup_state_nocalls(CPUHP_NET_DEV_DEAD, "net/dev:dead", NULL, dev_cpu_dead);
}
/proc/net/{dev,softnet_data,ptype}
dev_proc_init()
会注册 /proc/net/{dev,softnet_stat,ptytpe}
:
$ cat /proc/net/dev # 网络设备收发统计
Inter-| Receive | Transmit
face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed
lo: 5354018267 34340262 0 0 0 0 0 0 5354018267 34340262 0 0 0 0 0 0
enp0s3: 58744000 428517 0 0 0 0 0 0 654185884 624670 0 0 0 0 0 0
$ cat /proc/net/softnet_stat # 网络设备收发、错误等详细信息
0212786d 00000000 00005674 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000000
$ cat /proc/net/ptype # 网络设备 RX 协议
Type Device Function
0800 ip_rcv
0806 arp_rcv
86dd ipv6_rcv
struct softnet_data
net_dev_init()
为每个 CPU 创建一个 struct softnet_data
变量。这个变量包含很多 重要信息:
net_dev_init
分别为接收和发送数据注册了一个软中断处理函数, 后面会看到网卡驱动的中断处理函数是如何触发 net_rx_action()
执行的。
内核的软中断系统是一种在硬中断处理上下文(驱动中)之外执行代码的机制。
调度执行到某个特定线程的调用栈:
smpboot_thread_fn
|-while (1) {
set_current_state(TASK_INTERRUPTIBLE); // 设置当前 CPU 为可中断状态
if !thread_should_run { // 无 pending 的软中断
preempt_enable_no_resched();
schedule();
} else { // 有 pending 的软中断
__set_current_state(TASK_RUNNING);
preempt_enable();
thread_fn(td->cpu); // 如果此时执行的是 ksoftirqd 线程,
|-run_ksoftirqd // 那会执行 run_ksoftirqd() 回调函数
|-local_irq_disable(); // 关闭所在 CPU 的所有硬中断
|
|-if local_softirq_pending() {
| __do_softirq();
| local_irq_enable(); // 重新打开所在 CPU 的所有硬中断
| cond_resched(); // 将 CPU 交还给调度器
| return;
|-}
|
|-local_irq_enable(); // 重新打开所在 CPU 的所有硬中断
}
}
如果此时调度到的是 ksoftirqd
线程,那 thread_fn()
执行的就是 run_ksoftirqd()
。
run_ksoftirqd()
软中断线程在内核启动的很早阶段就 spawn 出来了:
// https://github.com/torvalds/linux/blob/v5.10/kernel/softirq.c#L730
static struct smp_hotplug_thread softirq_threads = {
.store = &ksoftirqd,
.thread_should_run= ksoftirqd_should_run, // 调度到该线程是,判断能否执行
.thread_fn = run_ksoftirqd, // 调度到该线程时,执行的回调函数
.thread_comm = "ksoftirqd/%u", // 线程名字,ps -ef 最后一列可以看到
};
static __init int spawn_ksoftirqd(void) {
cpuhp_setup_state_nocalls(CPUHP_SOFTIRQ_DEAD, "softirq:dead", NULL, takeover_tasklets);
BUG_ON(smpboot_register_percpu_thread(&softirq_threads));
}
early_initcall(spawn_ksoftirqd);
smpboot_thread_fn() -> run_ksoftirqd()
每个 CPU 上的调度器会调度执行不同的线程,例如处理 OOM 的线程、处理 swap 的线程,以及 我们的软中断处理线程。每个线程分配一定的时间片:
查看 CPU 利用率时,si 字段对应的就是 softirq 开销
, 衡量(从硬中断转移过来的)软中断的 CPU 使用量:
$ top -n1 | head -n3
top - 18:14:05 up 86 days, 23:45, 2 users, load average: 5.01, 5.56, 6.26
Tasks: 969 total, 2 running, 733 sleeping, 0 stopped, 2 zombie
%Cpu(s): 13.9 us, 3.2 sy, 0.0 ni, 82.7 id, 0.0 wa, 0.0 hi, 0.1 si, 0.0 st
# ^
# |
# si: software interrupt overhead
代码:
// https://github.com/torvalds/linux/blob/v5.10/kernel/smpboot.c#L107
// smpboot_thread_fn - percpu hotplug thread loop function
// @data: thread data pointer
//
// Returns 1 when the thread should exit, 0 otherwise.
static int smpboot_thread_fn(void *data) {
struct smpboot_thread_data *td = data;
struct smp_hotplug_thread *ht = td->ht;
while (1) {
set_current_state(TASK_INTERRUPTIBLE); // 设置当前 CPU 为可中断状态
preempt_disable();
if (kthread_should_park()) {
...
continue; /* We might have been woken for stop */
}
switch (td->status) { /* Check for state change setup */
case HP_THREAD_NONE: ... continue;
case HP_THREAD_PARKED: ... continue;
}
if (!ht->thread_should_run(td->cpu)) { // 无 pending 的软中断
preempt_enable_no_resched();
schedule();
} else { // 有 pending 的软中断
__set_current_state(TASK_RUNNING);
preempt_enable();
ht->thread_fn(td->cpu); // 执行 `run_ksoftirqd()`
}
}
}
如果此时调度到的是 ksoftirqd
线程,并且有 pending 的软中断等待处理, 那 thread_fn()
执行的就是 run_ksoftirqd()
。
run_ksoftirqd()
// https://github.com/torvalds/linux/blob/v5.10/kernel/softirq.c#L730
static void run_ksoftirqd(unsigned int cpu) {
local_irq_disable(); // 关闭所在 CPU 的所有硬中断
if (local_softirq_pending()) {
// We can safely run softirq on inline stack, as we are not deep in the task stack here.
__do_softirq();
local_irq_enable(); // 重新打开所在 CPU 的所有硬中断
cond_resched(); // 将 CPU 交还给调度器
return;
}
local_irq_enable(); // 重新打开所在 CPU 的所有硬中断
}
首先调用 local_irq_disable()
,这是一个宏,最终会 展开成处理器架构相关的函数,功能是关闭所在 CPU 的所有硬中断;
接下来,判断如果有 pending softirq,则 执行 __do_softirq()
处理软中断,然后重新打开所在 CPU 的硬中断,然后返回;
否则直接打开所在 CPU 的硬中断,然后返回。
__do_softirq() -> net_rx_action()
Fig. run_ksoftirqd()
// https://github.com/torvalds/linux/blob/v5.10/kernel/softirq.c#L730
asmlinkage __visible void __softirq_entry
__do_softirq(void) {
unsigned long end = jiffies + MAX_SOFTIRQ_TIME; // 时间片最晚结束时刻
unsigned long old_flags = current->flags;
int max_restart = MAX_SOFTIRQ_RESTART; // 最大轮询 pending softirq 次数
// Mask out PF_MEMALLOC as the current task context is borrowed for the softirq.
// A softirq handler, such as network RX, might set PF_MEMALLOC again if the socket is related to swapping.
current->flags &= ~PF_MEMALLOC;
pending = local_softirq_pending(); // 获取 pending softirq 数量
account_irq_enter_time(current);
__local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);
in_hardirq = lockdep_softirq_start();
restart:
set_softirq_pending(0); // Reset the pending bitmask before enabling irqs
local_irq_enable(); // 打开 IRQ 中断
struct softirq_action *h = softirq_vec;
while ((softirq_bit = ffs(pending))) {
h += softirq_bit - 1;
unsigned int vec_nr = h - softirq_vec;
int prev_count = preempt_count();
kstat_incr_softirqs_this_cpu(vec_nr);
h->action(h); // 指向 net_rx_action()
h++;
pending >>= softirq_bit;
}
if (__this_cpu_read(ksoftirqd) == current)
rcu_softirq_qs();
local_irq_disable(); // 关闭 IRQ
pending = local_softirq_pending(); // 再次获取 pending softirq 的数量
if (pending) {
if (time_before(jiffies, end) && !need_resched() && --max_restart)
goto restart;
wakeup_softirqd();
}
lockdep_softirq_end(in_hardirq);
account_irq_exit_time(current);
__local_bh_enable(SOFTIRQ_OFFSET);
current_restore_flags(old_flags, PF_MEMALLOC);
}
一旦软中断代码判断出有 softirq 处于 pending 状态,就会开始处理, 执行 net_rx_action,从 ring buffer 收包
。
现在来到了图中第 6 步:
Fig. Steps of Linux kernel receiving data process and the corresponding chapters in this post
上一节看到,软中断线程 ksoftirqd
被处理器调度执行之后,会调用 net_rx_action()
方法。 这个函数的功能是 ring buffer 取出数据包,然后对其进行进入协议栈之前的大量处理。
net_rx_action() -> napi_poll()
:从 ring buffer 读数据Ring buffer 是内核内存,其中存放的包是网卡通过 DMA 直接送过来的, net_rx_action()
从处理 ring buffer 开始处理。
Fig. ksoftirqd: receiving packets from ring buffer with net_rx_action()
// net/core/dev.c
static __latent_entropy void
net_rx_action(struct softirq_action *h) {
struct softnet_data *sd = this_cpu_ptr(&softnet_data); // 该 CPU 的 softnet_data 统计
time_limit = jiffies + usecs_to_jiffies(netdev_budget_usecs); // 该 CPU 的所有 NAPI 变量的总 time limit
budget = netdev_budget; // 该 CPU 的所有 NAPI 变量的总预算
LIST_HEAD(list); // 声明 struct list_head list 变量并初始化;
LIST_HEAD(repoll); // 声明 struct list_head repoll 变量并初始化;
local_irq_disable();
list_splice_init(&sd->poll_list, &list);
local_irq_enable();
for (;;) {
if list_empty(&list) {
if !sd_has_rps_ipi_waiting(sd) && list_empty(&repoll) // RPS 相关逻辑,IPI 进程间中断
goto out;
break;
}
struct napi_struct *n = list_first_entry(&list, struct napi_struct, poll_list);
budget -= napi_poll(n, &repoll); // 执行网卡驱动注册的 poll() 方法,返回的是处理的数据帧数量,
// 函数返回时,那些数据帧都已经发送到上层栈进行处理了。
if (budget <= 0 || time_after_eq(jiffies, time_limit)) { // budget 或 time limit 用完了
sd->time_squeeze++; // 更新 softnet_data.time_squeeze 计数
break;
}
}
local_irq_disable();
list_splice_tail_init(&sd->poll_list, &list);
list_splice_tail(&repoll, &list);
list_splice(&list, &sd->poll_list);
if !list_empty(&sd->poll_list) // 在给定的 time/budget 内,没有能够处理完全部 napi
__raise_softirq_irqoff(NET_RX_SOFTIRQ); // 关闭 NET_RX_SOFTIRQ 类型软中断,将 CPU 让给其他任务用,
// 主动让出 CPU,不要让这种 softirq 独占 CPU 太久。
// Receive Packet Steering:唤醒其他 CPU 从 ring buffer 收包。
net_rps_action_and_irq_enable(sd);
out:
__kfree_skb_flush();
}
在给定的预算范围内,函数遍历当前 CPU 队列的 NAPI 变量列表,依次执行其 poll 方法。
有三种情况会退出循环:
list_empty(&list) == true
:说明 list 已经为空,没有 NAPI 需要 poll 了(这个 list 是怎么初始化和更新的,需要好好研究);budget <= 0
:说明收包的数量已经 >= netdev_budget(例如 2400 个,默认是 300), 或者time_after_eq(jiffies, time_limit) == true
:说明累计运行时间已经 >= netdev_budget_us(例如 8ms,默认是 2ms)netdev_budget
和 netdev_budget_usecs
这俩参数控制每次 softirq 线程的收包预算,
这两个预算限制中,任何一个达到后都将导致退出本次 softirq 处理。二者的默认值:
// net/core/dev.c
int netdev_budget __read_mostly = 300;
// Must be at least 2 jiffes to guarantee 1 jiffy timeout
unsigned int __read_mostly netdev_budget_usecs = 2 * USEC_PER_SEC / HZ; // 2000us
注意这些预算是每个 CPU 上所有 NAPI 实例共享的总预算。
另外,这两个变量都是 sysctl 配置项,可以按需调优:
$ sudo sysctl -a | grep netdev_budget
net.core.netdev_budget = 300 # 300 个包
net.core.netdev_budget_usecs = 2000 # 2ms
net_rx_action()
也体现出了内核是如何防止 softirq 收包过程霸占过多 CPU 的。 这也是多队列网卡应该精心调整 IRQ Affinity 的原因。
最终导致该 CPU 无法及时接收它负责的 RX 队列中的所有包。
如果没有足够的 CPU 来分散网卡硬中断,可以考虑增大以上两个 budget 配置项,使每个 CPU 能有更多的预算来处理收包,这会增加 CPU 使用量(top
等命令看到的 sitime
或 si
部分),但可以减少延迟,因为软中断收包更加及时。
time_squeeze
计数:ringbuffer 还有包,但 softirq 预算用完了 if (budget <= 0 || time_after_eq(jiffies, time_limit)) { // budget 或 time limit 用完了
sd->time_squeeze++; // 更新 softnet_data.time_squeeze 计数
break;
}
softnet_data.time_squeeze
字段记录的是满足如下条件的次数: ring buffer 中还有包等待接收,但本次 softirq 的 budget 已经用完了。 这对理解网络处理的瓶颈至关重要。
需要说明的是,time_squeeze 在 softnet_data 的所有字段中是非常特殊的一个:
xx_drop/xx_errors
,会在多个地方更新,所以不能从计数增 加直接定位到是哪段代码或哪个逻辑导致的;另外,time_squeeze
升高并不一定表示系统有丢包,只是表示 softirq 的收包预算用完时,RX queue 中仍然有包等待处理。只要 RX queue 在下次 softirq 处理之前没有溢出,那就不会因为 time_squeeze 而导致丢包;但如果有持续且 大量的 time_squeeze,那确实有 RX queue 溢出导致丢包的可能,需要结合其他监控指标 来看。在这种情况下,调大 budget 参数是更合理的选择:与其让网卡频繁触发 IRQ->SoftIRQ 来收包,不如让 SoftIRQ 一次多执行一会,处理掉 RX queue 中尽量多的 包再返回,因为中断和线程切换开销也是很大的。
接下来重新回到主线,看一次 napi poll() 调用到网卡 handler 的具体过程。
napi_poll() -> mlx5e_napi_poll() -> mlx5e_poll_rx_cq()
设备驱动在注册其 poll() 方法时,会传一个 weight 参数,在 Mellanox 中是 hardcode 64:
netif_napi_add(netdev, &c->napi, mlx5e_napi_poll, 64);
这个参数控制了网卡一次 poll() 时,最多允许处理的包数(下面会看到,变量名也叫 budget)。
$ sudo sysctl -a | grep dev_weight
net.core.dev_weight = 64
net.core.dev_weight_rx_bias = 1
net.core.dev_weight_tx_bias = 1
NAPI 子系统和设备驱动之间的就是否关闭 NAPI 有一份契约:
poll()
用完了它的全部 weight,那它不要更改 NAPI 状态。接下来 net_rx_action()
会做的;poll()
没有用完全部 weight,那它必须关闭 NAPI。下次有硬件 中断触发,驱动的硬件处理函数调用 napi_schedule()
时,NAPI 会被重新打开。mlx5e_napi_poll() -> mlx5e_poll_rx_cq()
对于本文所用的网卡,注册的 poll()
方法是 mlx5e_napi_poll()
:
// drivers/net/ethernet/mellanox/mlx5/core/en_txrx.c
int mlx5e_napi_poll(struct napi_struct *napi, int budget) { // 这里的 budget 就是上面的 weight
struct mlx5e_channel *c = container_of(napi, struct mlx5e_channel, napi);
struct mlx5e_ch_stats *ch_stats = c->stats;
struct mlx5e_xdpsq *xsksq = &c->xsksq;
struct mlx5e_rq *xskrq = &c->xskrq;
struct mlx5e_rq *rq = &c->rq;
bool aff_change = false;
bool busy_xsk = false;
bool busy = false;
int work_done = 0;
bool xsk_open;
xsk_open = test_bit(MLX5E_CHANNEL_STATE_XSK, c->state);
ch_stats->poll++;
for (i = 0; i < c->num_tc; i++) // 遍历 TX channel,发包,释放 ring buffer
busy |= mlx5e_poll_tx_cq(&c->sq[i].cq, budget); // 每次发送最多 MLX5E_TX_CQ_POLL_BUDGET 个包
// budget 结束时还有包没发完,则返回 busy=true。
busy |= mlx5e_poll_xdpsq_cq(&c->xdpsq.cq); // XDP TX 队列发包,有自己独立的 budget 预算
if (c->xdp)
busy |= mlx5e_poll_xdpsq_cq(&c->rq_xdpsq.cq); // XDP RX 队列收包,有自己独立的 budget 预算
if (likely(budget)) { // 如果 budget!=0,则 poll RX ring 来收包;否则就跳过 RX ring
if (xsk_open)
work_done = mlx5e_poll_rx_cq(&xskrq->cq, budget);
if (likely(budget - work_done))
work_done += mlx5e_poll_rx_cq(&rq->cq, budget - work_done);
busy |= work_done == budget;
}
mlx5e_poll_ico_cq(&c->icosq.cq);
if (mlx5e_poll_ico_cq(&c->async_icosq.cq))
clear_bit(MLX5E_SQ_STATE_PENDING_XSK_TX, &c->async_icosq.state);
busy |= INDIRECT_CALL_2(rq->post_wqes, mlx5e_post_rx_mpwqes, mlx5e_post_rx_wqes, rq);
if (xsk_open) {
busy |= mlx5e_poll_xdpsq_cq(&xsksq->cq);
busy_xsk |= mlx5e_napi_xsk_post(xsksq, xskrq);
}
busy |= busy_xsk;
if (busy) {
if (likely(mlx5e_channel_no_affinity_change(c))) {
work_done = budget;
goto out;
}
ch_stats->aff_change++;
aff_change = true;
if (budget && work_done == budget)
work_done--;
}
if (unlikely(!napi_complete_done(napi, work_done)))
goto out;
ch_stats->arm++;
for (i = 0; i < c->num_tc; i++) {
mlx5e_handle_tx_dim(&c->sq[i]);
mlx5e_cq_arm(&c->sq[i].cq);
}
mlx5e_handle_rx_dim(rq);
mlx5e_cq_arm(&rq->cq);
mlx5e_cq_arm(&c->icosq.cq);
mlx5e_cq_arm(&c->async_icosq.cq);
mlx5e_cq_arm(&c->xdpsq.cq);
if (xsk_open) {
mlx5e_handle_rx_dim(xskrq);
mlx5e_cq_arm(&xsksq->cq);
mlx5e_cq_arm(&xskrq->cq);
}
if (unlikely(aff_change && busy_xsk)) {
mlx5e_trigger_irq(&c->icosq);
ch_stats->force_irq++;
}
return work_done;
}
Mellanox 是 TX 和 RX 是一起的,成为 combined queue。所以在一个 poll 操作里会依次处理 TX 和 RX 队列。
接下来只看普通的 RX 队列处理。
mlx5e_poll_rx_cq() -> mlx5e_handle_rx_cqe()
Mellanox 术语:
// drivers/net/ethernet/mellanox/mlx5/core/en_rx.c
int mlx5e_poll_rx_cq(struct mlx5e_cq *cq, int budget) { // budget <= weight (hardcode 64)
struct mlx5e_rq *rq = container_of(cq, struct mlx5e_rq, cq);
struct mlx5_cqwq *cqwq = &cq->wq; // CQ 的 WQ
int work_done = 0;
cqe = mlx5_cqwq_get_cqe(cqwq);
do {
mlx5_cqwq_pop(cqwq); // 出队,解除 DMA 映射,释放 ring buffer
INDIRECT_CALL_2(rq->handle_rx_cqe, mlx5e_handle_rx_cqe_mpwrq, mlx5e_handle_rx_cqe, rq, cqe);
} while ((++work_done < budget) && (cqe = mlx5_cqwq_get_cqe(cqwq)));
out:
if (rcu_access_pointer(rq->xdp_prog)) // 如果 attach 了 XDP 程序,
mlx5e_xdp_rx_poll_complete(rq); // 处理 XDP RX 队列
mlx5_cqwq_update_db_record(cqwq); // 更新 ring buffer 元信息
wmb(); // 确保 CQ 空间(ring buffer)已经释放了,这样才能放新的 CQE 进来
return work_done; // 返回处理的包数。注意:XDP 的包并没有统计在这里面
}
也是个循环,退出条件很明确:
mlx5e_handle_rx_cqe() -> napi_gro_receive()
根据 rq->handle_rx_cqe
是哪种情况,分别调用相应的 handler,最终都是调用到 napi_gro_receive()
:
static void mlx5e_handle_rx_cqe(struct mlx5e_rq *rq, struct mlx5_cqe64 *cqe) {
struct mlx5_wq_cyc *wq = &rq->wqe.wq;
u16 ci = mlx5_wq_cyc_ctr2ix(wq, be16_to_cpu(cqe->wqe_counter));
struct mlx5e_wqe_frag_info *wi = get_frag(rq, ci);
u32 cqe_bcnt = be32_to_cpu(cqe->byte_cnt);
// 创建 skb,从 ringbuffer DMA 区域复制数据包;从这一步开始,才有了内核 skb 结构体
struct sk_buff *skb = INDIRECT_CALL_2(rq->wqe.skb_from_cqe,
mlx5e_skb_from_cqe_linear,
mlx5e_skb_from_cqe_nonlinear,
rq, cqe, wi, cqe_bcnt);
if (!skb) { // 可能是 XDP 包
if (__test_and_clear_bit(MLX5E_RQ_FLAG_XDP_XMIT, rq->flags)) {
goto wq_cyc_pop; // do not return page to cache, it will be returned on XDP_TX completion.
}
goto free_wqe;
}
mlx5e_complete_rx_cqe(rq, cqe, cqe_bcnt, skb);
if (mlx5e_cqe_regb_chain(cqe))
if (!mlx5e_tc_update_skb(cqe, skb)) { // TC 信息
dev_kfree_skb_any(skb);
goto free_wqe;
}
napi_gro_receive(rq->cq.napi, skb);
free_wqe:
mlx5e_free_rx_wqe(rq, wi, true);
wq_cyc_pop:
mlx5_wq_cyc_pop(wq);
}
skb = mlx5e_skb_from_cqe_linear()
:创建内核数据包这个函数从 ringbuffer DMA 区域复制数据,然后初始化一个 struct sk_buff *skb
结构体变量,也就是我们最常打交道的内核协议栈中的数据包 —— 也就是说,本文从开始到现在,网络处理流程中不出现任何错误的话,我们才终于在内核中 有了一个数据包(skb),至于那些依赖 skb 的邻居子系统、路由子系统、 Netfilter/iptables、TC、各种网络 BPF 程序(XDP 除外),都还在后面(甚至很远的地方)。 可以体会到内核网络栈是多么复杂(而精巧)。
// drivers/net/ethernet/mellanox/mlx5/core/en_rx.c
static struct sk_buff *
mlx5e_skb_from_cqe_linear(struct mlx5e_rq *rq, struct mlx5_cqe64 *cqe, struct mlx5e_wqe_frag_info *wi, u32 cqe_bcnt) {
struct mlx5e_dma_info *di = wi->di;
u16 rx_headroom = rq->buff.headroom;
struct xdp_buff xdp;
struct sk_buff *skb;
void *va, *data;
u32 frag_size;
va = page_address(di->page) + wi->offset;
data = va + rx_headroom;
frag_size = MLX5_SKB_FRAG_SZ(rx_headroom + cqe_bcnt);
dma_sync_single_range_for_cpu(rq->pdev, di->addr, wi->offset,
frag_size, DMA_FROM_DEVICE);
net_prefetchw(va); /* xdp_frame data area */
net_prefetch(data);
mlx5e_fill_xdp_buff(rq, va, rx_headroom, cqe_bcnt, &xdp);
if (mlx5e_xdp_handle(rq, di, &cqe_bcnt, &xdp))
return NULL; /* page/packet was consumed by XDP */
rx_headroom = xdp.data - xdp.data_hard_start;
frag_size = MLX5_SKB_FRAG_SZ(rx_headroom + cqe_bcnt);
skb = mlx5e_build_linear_skb(rq, va, frag_size, rx_headroom, cqe_bcnt);
if (unlikely(!skb))
return NULL;
/* queue up for recycling/reuse */
page_ref_inc(di->page);
return skb;
}
static inline
struct sk_buff *mlx5e_build_linear_skb(struct mlx5e_rq *rq, void *va, u32 frag_size, u16 headroom, u32 cqe_bcnt) {
struct sk_buff *skb = build_skb(va, frag_size); // 通用内核函数
skb_reserve(skb, headroom);
skb_put(skb, cqe_bcnt);
return skb;
}
这里就是第二次数据复制的地方。
Fig. Steps of Linux kernel receiving data process and the corresponding chapters in this post
mlx5e_complete_rx_cqe() -> mlx5e_build_rx_skb()
:初始化 L2 header、IPSec、时间戳等static inline void
mlx5e_complete_rx_cqe(struct mlx5e_rq *rq, struct mlx5_cqe64 *cqe, u32 cqe_bcnt, struct sk_buff *skb) {
struct mlx5e_rq_stats *stats = rq->stats;
stats->packets++;
stats->bytes += cqe_bcnt;
mlx5e_build_rx_skb(cqe, cqe_bcnt, rq, skb); // 初始化 L2 header
}
static inline void
mlx5e_build_rx_skb(struct mlx5_cqe64 *cqe, u32 cqe_bcnt, struct mlx5e_rq *rq, struct sk_buff *skb)
{
u8 lro_num_seg = be32_to_cpu(cqe->srqn) >> 24;
struct net_device *netdev = rq->netdev;
skb->mac_len = ETH_HLEN; // L2 header length
mlx5e_tls_handle_rx_skb(rq, skb, cqe, &cqe_bcnt);
if (unlikely(mlx5_ipsec_is_rx_flow(cqe)))
mlx5e_ipsec_offload_handle_rx_skb(netdev, skb, cqe);
if (lro_num_seg > 1) {
mlx5e_lro_update_hdr(skb, cqe, cqe_bcnt);
skb_shinfo(skb)->gso_size = DIV_ROUND_UP(cqe_bcnt, lro_num_seg);
stats->packets += lro_num_seg - 1;
stats->lro_packets++;
stats->lro_bytes += cqe_bcnt;
}
if (unlikely(mlx5e_rx_hw_stamp(rq->tstamp))) // 硬件打时间戳
skb_hwtstamps(skb)->hwtstamp = mlx5_timecounter_cyc2time(rq->clock, get_cqe_ts(cqe));
skb_record_rx_queue(skb, rq->ix);
if (likely(netdev->features & NETIF_F_RXHASH))
mlx5e_skb_set_hash(cqe, skb);
if (cqe_has_vlan(cqe)) { // 添加 VLAN header
__vlan_hwaccel_put_tag(skb, htons(ETH_P_8021Q), be16_to_cpu(cqe->vlan_info));
stats->removed_vlan_packets++;
}
skb->mark = be32_to_cpu(cqe->sop_drop_qpn) & MLX5E_TC_FLOW_ID_MASK;
mlx5e_handle_csum(netdev, cqe, rq, skb, !!lro_num_seg); // L2 校验和
if (unlikely(cqe->ml_path & MLX5E_CE_BIT_MASK))
mlx5e_enable_ecn(rq, skb); // L2 纠错
skb->protocol = eth_type_trans(skb, netdev); // 函数里面还会设置 skb->pkt_type
}
eth_type_trans()
定义在 net/ethernet/eth.c
,这个函数除了返回 skb 的 ethernet protocol 类型, 还会设置 skb->pkt_type,可能类型:
mlx5e_build_rx_skb() -> mlx5e_post_rx_wqes()
:重新填充 RX ring已经完成的 CQ WQ (简单来说就是 RX ringbuffer)会释放掉,所以最后需要重新分配 CQ WQ:
INDIRECT_CALLABLE_SCOPE bool
mlx5e_post_rx_wqes(struct mlx5e_rq *rq) {
struct mlx5_wq_cyc *wq = &rq->wqe.wq;
u8 wqe_bulk = rq->wqe.info.wqe_bulk;
if (mlx5_wq_cyc_missing(wq) < wqe_bulk)
return false;
do {
u16 head = mlx5_wq_cyc_get_head(wq);
mlx5e_alloc_rx_wqes(rq, head, wqe_bulk);
mlx5_wq_cyc_push_n(wq, wqe_bulk);
} while (mlx5_wq_cyc_missing(wq) >= wqe_bulk);
dma_wmb(); /* ensure wqes are visible to device before updating doorbell record */
mlx5_wq_cyc_update_db_record(wq);
return !!err;
}
执行完这些网卡相关的 poll() 逻辑之后,最后通过 napi_gro_receive()
重新回到内核通用逻辑。
napi_gro_receive()
Large Receive Offloading (LRO) 是一个硬件优化,GRO 是 LRO 的一种软件实现。
GRO 是一种较老的硬件特性(LRO)的软件实现,功能是对分片的包进行重组然后交给更上层,以提高吞吐。 GRO 给协议栈提供了一次将包交给网络协议栈之前,对其检查校验和 、修改协议头和发送应答包(ACK packets)的机会。
enqueue_backlog()
将这个分片放到某个 CPU 的包队列;当包重组完成后,会交会协议栈网上送;LRO 和 GRO 的主要思想都是通过合并“足够类似”的包来减少传送给网络栈的包数,这有 助于减少 CPU 的使用量。例如,考虑大文件传输的场景,包的数量非常多,大部分包都是一 段文件数据。相比于每次都将小包送到网络栈,可以将收到的小包合并成一个很大的包再送 到网络栈。GRO 使协议层只需处理一个 header,而将包含大量数据的整个大包送到用 户程序。
这类优化方式的缺点是信息丢失:包的 option 或者 flag 信息在合并时会丢 失。这也是为什么大部分人不使用或不推荐使用 LRO 的原因。LRO 的实现,一般来说,对 合并包的规则非常宽松。GRO 是 LRO 的软件实现,但是对于包合并的规则更严苛。
顺便说一下,如果用 tcpdump 抓包,有时会看到机器收到了看起来不现实的、非常大的包, 这很可能是你的系统开启了 GRO;后面会看到,tcpdump 的抓包点(捕获包的 tap)在整个栈的更后面一些,在 GRO 之后。
Fig. GRO receive process: heavy-lifting work before entering kernel stack
napi_gro_receive()
负责处理数据,并将数据送到协议栈,大部分 相关的逻辑在函数 dev_gro_receive()
里实现:
// net/core/dev.c
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb) {
skb_mark_napi_id(skb, napi);
skb_gro_reset_offset(skb, 0);
return napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));
}
dev_gro_receive()
// https://github.com/torvalds/linux/blob/v5.10/net/core/dev.c
static struct list_head offload_base __read_mostly; // offload list
static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb) {
struct list_head *gro_head = gro_list_prepare(napi, skb);
struct sk_buff *pp;
struct packet_offload *ptype; // packet offload 相关结构体
struct list_head *head = &offload_base;
list_for_each_entry_rcu(ptype, head, list) {
if (ptype->type != skb->protocol || !ptype->callbacks.gro_receive)
continue; // L2/L3 协议类型不同,或者没有回调方法,只能跳过
skb_set_network_header(skb, skb_gro_offset(skb));
skb_reset_mac_len(skb);
NAPI_GRO_CB(skb)->same_flow = 0;
NAPI_GRO_CB(skb)->flush = skb_is_gso(skb) || skb_has_frag_list(skb);
NAPI_GRO_CB(skb)->free = 0;
NAPI_GRO_CB(skb)->encap_mark = 0;
NAPI_GRO_CB(skb)->recursion_counter = 0;
NAPI_GRO_CB(skb)->is_fou = 0;
NAPI_GRO_CB(skb)->gro_remcsum_start = 0;
/* Setup for GRO checksum validation */
...
pp = INDIRECT_CALL_INET(ptype->callbacks.gro_receive, ipv6_gro_receive, inet_gro_receive, gro_head, skb);
break;
}
if (&ptype->list == head)
goto normal;
u32 hash = skb_get_hash_raw(skb) & (GRO_HASH_BUCKETS - 1);
if (pp) { // 如果 pp 非空,说明这已经是一个完成的包,可以送到更上层去处理了
skb_list_del_init(pp);
napi_gro_complete(napi, pp); // 将包送到协议栈
napi->gro_hash[hash].count--;
}
// 执行到这里说明这个包不完整,需要执行 GRO 合并
int same_flow = NAPI_GRO_CB(skb)->same_flow;
if (same_flow) // 这个包属于已经存在的一个 flow,需要和其他包合并
goto ok;
if (NAPI_GRO_CB(skb)->flush) // 合并完成了,已经拿到完整包
goto normal;
if (unlikely(napi->gro_hash[hash].count >= MAX_GRO_SKBS)) { // 新建一个 entry 加到本 CPU 的 NAPI 变量的 gro_list
gro_flush_oldest(napi, gro_head);
} else {
napi->gro_hash[hash].count++;
}
NAPI_GRO_CB(skb)->count = 1;
NAPI_GRO_CB(skb)->age = jiffies;
NAPI_GRO_CB(skb)->last = skb;
skb_shinfo(skb)->gso_size = skb_gro_len(skb);
list_add(&skb->list, gro_head);
ret = GRO_HELD;
pull:
int grow = skb_gro_offset(skb) - skb_headlen(skb);
if (grow > 0)
gro_pull_from_frag0(skb, grow);
ok:
if (napi->gro_hash[hash].count) {
if (!test_bit(hash, &napi->gro_bitmask))
__set_bit(hash, &napi->gro_bitmask);
} else if (test_bit(hash, &napi->gro_bitmask)) {
__clear_bit(hash, &napi->gro_bitmask);
}
return ret;
normal:
ret = GRO_NORMAL;
goto pull;
}
首先遍历 packet type 列表,找到合适的 receive 方法:
$ cat /proc/net/ptype # packet type (skb->protocol)
Type Device Function
0800 ip_rcv
0806 arp_rcv
86dd ipv6_rcv
然后遍历一个 offload filter 列表,如果高层协议(L3/L4)认为其中一些数据属于 GRO 处理的范围,就会允许其对数据进行操作。 协议层的返回是 struct sk_buff *
指针,指针非空表示这个 packet 需要做 GRO,非空表示可以送到协议栈 。另外,也可以通过这种方式传递一些协议相关的信息。例如,TCP 协 议需要判断是否以及合适应该将一个 ACK 包合并到其他包里。
如果协议层提示是时候 flush GRO packet 了,那就到下一步处理了。这发生在 napi_gro_complete()
,会进一步调用相应协议的 gro_complete()
回调方法,然后调用 netif_receive_skb_list_internal()
将包送到协议栈。
napi_gro_complete()
static int napi_gro_complete(struct napi_struct *napi, struct sk_buff *skb) {
if (NAPI_GRO_CB(skb)->count == 1) {
skb_shinfo(skb)->gso_size = 0;
goto out;
}
struct packet_offload *ptype;
struct list_head *head = &offload_base;
list_for_each_entry_rcu(ptype, head, list) {
if (ptype->type != skb->protocol || !ptype->callbacks.gro_complete)
continue;
INDIRECT_CALL_INET(ptype->callbacks.gro_complete, ipv6_gro_complete, inet_gro_complete, skb, 0);
break;
}
out:
gro_normal_one(napi, skb, NAPI_GRO_CB(skb)->count);
return NET_RX_SUCCESS;
}
// net/ipv4/af_inet.c
int inet_gro_complete(struct sk_buff *skb, int nhoff) {
struct iphdr *iph = (struct iphdr *)(skb->data + nhoff); // IP header 起始位置
if (skb->encapsulation) {
skb_set_inner_protocol(skb, cpu_to_be16(ETH_P_IP));
skb_set_inner_network_header(skb, nhoff);
}
__be16 newlen = htons(skb->len - nhoff);
csum_replace2(&iph->check, iph->tot_len, newlen); // IP header checksum
iph->tot_len = newlen;
const struct net_offload *ops = rcu_dereference(inet_offloads[iph->protocol]);
/* Only need to add sizeof(*iph) to get to the next hdr below
* because any hdr with option will have been flushed in
* inet_gro_receive().
*/
INDIRECT_CALL_2(ops->callbacks.gro_complete, tcp4_gro_complete, udp4_gro_complete, skb, nhoff + sizeof(*iph));
}
napi_skb_finish() -> gro_normal_one() -> gro_normal_list()
dev_gro_receive()
的返回结果是 napi_skb_finish()
的参数,
static gro_result_t napi_skb_finish(struct napi_struct *napi, struct sk_buff *skb, gro_result_t ret) {
switch (ret) {
case GRO_NORMAL:
gro_normal_one(napi, skb, 1); break;
case GRO_DROP:
kfree_skb(skb); break;
case GRO_MERGED_FREE: // 已经被合并,释放 skb
if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
napi_skb_free_stolen_head(skb);
else
__kfree_skb(skb);
break;
case GRO_HELD:
case GRO_MERGED:
case GRO_CONSUMED:
break;
}
return ret;
}
如果是 GRO_NORMAL,会调用 gro_normal_one()
,它会更新当前 napi->rx_count 计数, 当数量足够多时,将调用 gro_normal_list()
,后者会将包一次性都送到协议栈:
// Queue one GRO_NORMAL SKB up for list processing. If batch size exceeded, pass the whole batch up to the stack.
static void gro_normal_one(struct napi_struct *napi, struct sk_buff *skb, int segs) {
napi->rx_count += segs;
if (napi->rx_count >= gro_normal_batch) // int gro_normal_batch __read_mostly = 8;
gro_normal_list(napi); //
}
这里的阈值 gro_normal_batch 默认是 8,可以通过 sysctl 配置
:
$ sudo sysctl net.core.gro_normal_batch
net.core.gro_normal_batch = 8
gro_normal_list() -> netif_receive_skb_list_internal()
:批量将包送到协议栈gro_normal_list()
调用 netif_receive_skb_list_internal()
将一组包一次性送到协议栈:
// Pass the currently batched GRO_NORMAL SKBs up to the stack.
static void gro_normal_list(struct napi_struct *napi) {
if (!napi->rx_count) // 没有包的话直接返回,什么都不做
return;
netif_receive_skb_list_internal(&napi->rx_list); // 注意这个函数没有返回值,所有错误都在内部处理
INIT_LIST_HEAD(&napi->rx_list); // 把这个 NAPI 重新入队,并放到队首(下次最后一个被处理,因为刚处理过一次了)
napi->rx_count = 0; // 重置计数
}
每个 NAPI 变量都会运行在相应 CPU 的软中断的上下文中。而且,触发硬中断的这个 CPU 接下来会负责执行相应的软中断处理函数来收包。换言之,同一个 CPU 既处理硬中断,又 处理相应的软中断。
另一方面,DMA 区域是网卡与内核协商之后预留的内存,由于这块内存区域是有限的, 如果收到的包非常多,单个 CPU 来不及取走这些包,新来的包就会被丢弃。 一些网卡有能力将接收到的包写到多个不同的内存区域,每个区域都是独立的接收队列,即多队列功能。 这样操作系统就可以利用多个 CPU(硬件层面)并行处理收到的包。
如今大部分网卡都在硬件层支持多队列。这意味着收进来的包会被通过 DMA 放到 位于不同内存的队列上,而不同的队列有相应的 NAPI 变量管理软中断 poll()
过程。因此, 多个 CPU 同时处理从网卡来的中断,处理收包过程。 这个特性被称作 RSS(Receive Side Scaling,接收端水平扩展)。
RPS(Receive Packet Steering,接收包控制,接收包引导)是 RSS 的一种软件实现。
作者仅向会员开放了此篇文章。开通会员即可立即解锁此文章和其他会员专属权益。