RDMA Stack实现分析
希望通过这篇文章来梳理
- libibverbs提供的用户态Verbs API是如何进行设备操作/数据通信的,以Soft-RXE实现为例。
- 基于kernel: 5.19-rc1
- 基于rdma-core: v34.0
RDMA Stack的内核支持
参考:https://www.kernel.org/doc/html/latest/translations/zh_CN/infiniband/user_verbs.html
RDMA Stack分为控制面和数据面,图1中给出了Mellanox MLX5 RDMA stack 涉及到的用户态库以及内核模块。图二中给出了一个控制面API在用户态libibverbs与内核之间的调用关系。
- 控制面:
ib_uverbs.ko
内核模块会对每个IB设备生成一个字符设备文件。通常的形式为/dev/infiniband/uverbs%d
,用户态程序通过ioctl对字符设备进行属性的读写。- libibverbs 中的控制面相关Verbs API利用了
ib_uverbs.ko
暴露出来的字符设备对RDMA设备操作。
- 数据面
- 通常是通过直接写入
mmap()
到用户空间的硬件寄存器来完成 的,没有系统调用或上下文切换到内核。
- 通常是通过直接写入
图1: Mellanox OFED stack arch.)
图2: libibverbs中的控制面API与内核的调用关系: 以ibv_create_qp为例- 控制面:
1. ibv_open_device
用户态打开IB设备的API
1.1 RXE调用栈分析
1 |
|
open_cdev
会打开uverbs字符设备,文件描述符保存在cmd_fd中
1
2
3
4
5
6
7// rdma-core/libibverbs/device.c
cmd_fd = open_cdev(verbs_device->sysfs->sysfs_name,
verbs_device->sysfs->sysfs_cdev);
// rdma-core/util/open_cdev.c
// in open_cdev funcion
fd = open("/dev/infiniband/uverbs%d", O_RDWR | O_CLOEXEC)rxe_alloc_context
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// rdma-core/libibverbs/device.c
/*
* cmd_fd ownership is transferred into alloc_context, if it fails
* then it closes cmd_fd and returns NULL
*/
context_ex = verbs_device->ops->alloc_context(device, cmd_fd, private_data);
// rdma-core/providers/rxe/rxe.c
static struct verbs_context *rxe_alloc_context(struct ibv_device *ibdev,
int cmd_fd,
void *private_data)
{
struct rxe_context *context;
struct ibv_get_context cmd;
struct ib_uverbs_get_context_resp resp;
context = verbs_init_and_alloc_context(ibdev, cmd_fd, context, ibv_ctx,
RDMA_DRIVER_RXE);
...
ibv_cmd_get_context(&context->ibv_ctx, &cmd, sizeof(cmd),
&resp, sizeof(resp))
...
verbs_set_ops(&context->ibv_ctx, &rxe_ctx_ops);
...
}verbs_init_and_alloc_context
&((typeof(context))((void *)0))->ibv_ctx
, 类似于offset_of
宏,计算ibv_ctx
在rxe_context
结构体内偏移
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35// rdma-core/providers/rxe/rxe.h
struct rxe_context {
struct verbs_context ibv_ctx;
};
// rdma-core/providers/rxe/rxe.c
// expand macro
context = ((typeof(context))_verbs_init_and_alloc_context(
ibdev,
cmd_fd,
sizeof(*context),
&((typeof(context))((void *)0))->ibv_ctx,
(RDMA_DRIVER_RXE)))
// rdma-core/libibverbs/device.c
/*
* Allocate and initialize a context structure. This is called to create the
* driver wrapper, and context_offset is the number of bytes into the wrapper
* structure where the verbs_context starts.
*/
void *_verbs_init_and_alloc_context(struct ibv_device *device, int cmd_fd,
size_t alloc_size,
struct verbs_context *context_offset,
uint32_t driver_id)
{
void *drv_context;
struct verbs_context *context;
drv_context = calloc(1, alloc_size);
...
context = drv_context + (uintptr_t)context_offset;
verbs_init_context(context, device, cmd_fd, driver_id)
...
return drv_context;
}verbs_init_context
context->cmd_fd = cmd_fd;
记录uverbs设备文件的描述符- 用
verbs_dummy_ops
初始化ibv_context->ibv_context_ops
回调函数结构体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// rdma-core/libibverbs/device.c
int verbs_init_context(struct verbs_context *context_ex,
struct ibv_device *device, int cmd_fd,
uint32_t driver_id)
{
struct ibv_context *context = &context_ex->context;
ibverbs_device_hold(device);
context->device = device;
context->cmd_fd = cmd_fd;
context->async_fd = -1;
pthread_mutex_init(&context->mutex, NULL);
context_ex->context.abi_compat = __VERBS_ABI_IS_EXTENDED;
context_ex->sz = sizeof(*context_ex);
context_ex->priv = calloc(1, sizeof(*context_ex->priv));
...
context_ex->priv->driver_id = driver_id;
verbs_set_ops(context_ex, &verbs_dummy_ops);
context_ex->priv->use_ioctl_write = has_ioctl_write(context);
return 0;
}ibv_cmd_get_context
- ioctl 传递给kernel的参数都会封装在ibv_command_buffer 结构体中,这里声明arg buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// in rdma-core/providers/rxe/rxe.c, call function
ibv_cmd_get_context(&context->ibv_ctx, &cmd, sizeof(cmd),
&resp, sizeof(resp));
// rdma-core/libibverbs/cmd_device.c
int ibv_cmd_get_context(struct verbs_context *context_ex,
struct ibv_get_context *cmd, size_t cmd_size,
struct ib_uverbs_get_context_resp *resp,
size_t resp_size)
{
// DECLARE_CMD_BUFFER_COMPAT(cmdb, UVERBS_OBJECT_DEVICE,
// UVERBS_METHOD_GET_CONTEXT, cmd, cmd_size,
// resp, resp_size);
// expands macro
struct ibv_command_buffer cmdb[((sizeof(struct ibv_command_buffer) +
sizeof(struct ib_uverbs_attr) * (__cmdbtotal) +
sizeof(struct ibv_command_buffer) - 1) /
sizeof(struct ibv_command_buffer))];
return cmd_get_context(context_ex, cmdb);
}verbs_set_ops
- 将
verbs_context
结构体中的verbs_context
中的回调函数,设置为rxe声明的回调函数(默认为dummy_ops) - 这里不太清楚为什么
verbs_context
内部的verbs_ex_private
结构体又保存了一份回调函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// in rdma-core/providers/rxe/rxe.c, call function
verbs_set_ops(&context->ibv_ctx, &rxe_ctx_ops);
// rdma-core/libibverbs/dummy_ops.c
void verbs_set_ops(struct verbs_context *vctx,
const struct verbs_context_ops *ops)
{
struct verbs_ex_private *priv = vctx->priv;
struct ibv_context_ops *ctx = &vctx->context.ops;
...
// expand macro
if (ops->alloc_dm) {
priv->ops.alloc_dm = ops->alloc_dm;
(vctx)->alloc_dm = ops->alloc_dm;
}
...
- 将
总结ibv_open_device
- 分配context结构体,打开uverbs字符设备
- 设置用于传递ioctl参数的 arg buffer
- 设置回调函数,这些回调函数会被ibv_xxx Verbs调用
2. ibv_create_qp
Verbs
2.1 RXE调用栈分析
1 |
|
ibv_create_qp
1
2
3
4
5
6
7
8
9
10// rdma-core/libibverbs/verbs.c
LATEST_SYMVER_FUNC(ibv_create_qp, 1_1, "IBVERBS_1.1",
struct ibv_qp *, // return type
struct ibv_pd *pd, // arg1
struct ibv_qp_init_attr *qp_init_attr) // arg2
{
struct ibv_qp *qp = get_ops(pd->context)->create_qp(pd, qp_init_attr);
return qp;
}rxe_create_qp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// rdma-core/providers/rxe/rxe.c
static struct ibv_qp *rxe_create_qp(struct ibv_pd *ibpd,
struct ibv_qp_init_attr *attr)
{
struct ibv_create_qp cmd = {};
struct urxe_create_qp_resp resp = {};
struct rxe_qp *qp;
int ret;
...
ret = ibv_cmd_create_qp(ibpd, &qp->vqp.qp, attr, &cmd, sizeof(cmd),
&resp.ibv_resp, sizeof(resp));
...
ret = map_queue_pair(ibpd->context->cmd_fd, qp, attr,
&resp.drv_payload);
...
}- 其中 ibv_create_qp/urxe_create_qp_resp 这两个结构体由宏声明,分别将二者展开后
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// rdma-core/cmake-build-debug/include/infiniband/kern-abi.h
DECLARE_CMD(IB_USER_VERBS_CMD_CREATE_CQ, ibv_create_cq, ib_uverbs_create_cq);
// expands macro
struct ibv_create_qp {
struct ib_uverbs_cmd_hdr hdr;
union {
_STRUCT_ib_uverbs_create_qp;
struct ib_uverbs_create_qp core_payload;
};
};
typedef struct ibv_create_qp _ABI_REQ_STRUCT_IB_USER_VERBS_CMD_CREATE_QP;
typedef struct ib_uverbs_create_qp _KABI_REQ_STRUCT_IB_USER_VERBS_CMD_CREATE_QP;
typedef struct ib_uverbs_create_qp_resp
_KABI_RESP_STRUCT_IB_USER_VERBS_CMD_CREATE_QP;
enum { _ABI_ALIGN_IB_USER_VERBS_CMD_CREATE_QP = 4 };1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// rdma-core/providers/rxe/rxe-abi.h
DECLARE_DRV_CMD(urxe_create_qp, IB_USER_VERBS_CMD_CREATE_QP,
empty, rxe_create_qp_resp);
// expands macro
struct urxe_create_qp {
_ABI_REQ_STRUCT_IB_USER_VERBS_CMD_CREATE_QP ibv_cmd;
union {
struct {};
struct empty drv_payload;
};
};
struct urxe_create_qp_resp {
_KABI_RESP_STRUCT_IB_USER_VERBS_CMD_CREATE_QP ibv_resp;
union {
struct {
struct mminfo rq_mi;
struct mminfo sq_mi;
};
struct rxe_create_qp_resp drv_payload;
};
};
- 其中 ibv_create_qp/urxe_create_qp_resp 这两个结构体由宏声明,分别将二者展开后
ibv_cmd_create_qp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// rdma-core/libibverbs/cmd_qp.c
int ibv_cmd_create_qp(struct ibv_pd *pd,
struct ibv_qp *qp, struct ibv_qp_init_attr *attr,
struct ibv_create_qp *cmd, size_t cmd_size,
struct ib_uverbs_create_qp_resp *resp, size_t resp_size)
{
DECLARE_CMD_BUFFER_COMPAT(cmdb, UVERBS_OBJECT_QP,
UVERBS_METHOD_QP_CREATE, cmd, cmd_size, resp,
resp_size);
struct ibv_qp_init_attr_ex attr_ex = {};
int ret;
attr_ex.qp_context = attr->qp_context;
attr_ex.send_cq = attr->send_cq;
attr_ex.recv_cq = attr->recv_cq;
attr_ex.srq = attr->srq;
attr_ex.cap = attr->cap;
attr_ex.qp_type = attr->qp_type;
attr_ex.sq_sig_all = attr->sq_sig_all;
attr_ex.comp_mask = IBV_QP_INIT_ATTR_PD;
attr_ex.pd = pd;
ret = ibv_icmd_create_qp(pd->context, NULL, qp, &attr_ex, cmdb);
if (!ret)
memcpy(&attr->cap, &attr_ex.cap, sizeof(attr_ex.cap));
return ret;
}ibv_icmd_create_qp
- libibverbs使用了两套与内核通信的方式:传统的write方式,以及ioctl方式。
- 如果当前的ib device context不支持ioctl方式与内核暴露的字符设备通信的话,则需要fallback回write方式
- 可以参考之前版本的libibverbs对应实现:https://github.com/hustcat/rdma-core/blob/v15/libibverbs/cmd.c#L1063
- _CMD_BIT(cmd_name) 宏作用:把verbs_context_ops看成bitmap,每个回调函数占一个bit,计算传入的cmd_name占哪个bit
- sizeof(void*) 由编译时target platform决定,amd64下值等于8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// rdma-core/libibverbs/cmd_qp.c
static int ibv_icmd_create_qp(struct ibv_context *context,
struct verbs_qp *vqp,
struct ibv_qp *qp_in,
struct ibv_qp_init_attr_ex *attr_ex,
struct ibv_command_buffer *link) {
...
execute_ioctl_fallback(context, create_qp, cmdb, &ret)
...
}
enum write_fallback _execute_ioctl_fallback(struct ibv_context *ctx,
unsigned int cmd_bit,
struct ibv_command_buffer *cmdb,
int *ret);
#define execute_ioctl_fallback(ctx, cmd_name, cmdb, ret) \
_execute_ioctl_fallback(ctx, _CMD_BIT(cmd_name), cmdb, ret)
#define _CMD_BIT(cmd_name) \
(offsetof(struct verbs_context_ops, cmd_name) / sizeof(void *))_execute_ioctl_fallback
- 检查当前设备是否可以使用ioctl方式进行操作,是的话直接ioctl传给内核响应命令,否则fallback回调用函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// rdma-core/libibverbs/cmd_fallback.c
/*
* Used to support callers that have a fallback to the old write ABI
* interface.
*/
enum write_fallback _execute_ioctl_fallback(struct ibv_context *ctx,
unsigned int cmd_bit,
struct ibv_command_buffer *cmdb,
int *ret)
{
struct verbs_ex_private *priv = get_priv(ctx);
if (bitmap_test_bit(priv->unsupported_ioctls, cmd_bit))
return _check_legacy(cmdb, ret);
*ret = execute_ioctl(ctx, cmdb);
...
}kernel: rxe_create_qp
在内核新建一个QP:根据QPN号hash后得到对应的src port,用于填充要发送的UDP报文
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// linux-kernel/rust-kernel/drivers/infiniband/sw/rxe/rxe_verbs.c
static int rxe_create_qp(struct ib_qp *ibqp, struct ib_qp_init_attr *init,
struct ib_udata *udata)
{
int err;
struct rxe_dev *rxe = to_rdev(ibqp->device);
struct rxe_pd *pd = to_rpd(ibqp->pd);
struct rxe_qp *qp = to_rqp(ibqp);
struct rxe_create_qp_resp __user *uresp = NULL;
if (udata) {
if (udata->outlen < sizeof(*uresp))
return -EINVAL;
uresp = udata->outbuf;
}
err = rxe_qp_chk_init(rxe, init);
...
qp->is_user = true;
...
err = rxe_add_to_pool(&rxe->qp_pool, qp);
...
err = rxe_qp_from_init(rxe, qp, pd, init, uresp, ibqp->pd, udata);
...
return 0;
}rxe_qp_init_req
- 在 rxe_queue_init 函数中申请一段连续的虚拟地址空间,用作SQ buffer
- 将SQ buffer的地址+大小设置为QP中RQ的mmap info
- 根据QPN号hash后得到对应的src port,用于填充UDP报文
rxe_qp_init_resp
- 在 rxe_queue_init 函数中申请一段连续的虚拟地址空间,用作RQ buffer
- 将RQ buffer的地址+大小设置为QP中RQ的mmap info
kernel: rxe_create_mmap_info
- 将QP中记录的SQ/RQ的mmap info,传回userspace,userspace根据内核指定的offset+size做mmap之后,使得kernel与userspace可以共享QP内存空间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// linux-kernel/rust-kernel/drivers/infiniband/sw/rxe/rxe_mmap.c
/*
* Allocate information for rxe_mmap
*/
struct rxe_mmap_info *rxe_create_mmap_info(struct rxe_dev *rxe, u32 size,
struct ib_udata *udata, void *obj)
{
struct rxe_mmap_info *ip;
ip = kmalloc(sizeof(*ip), GFP_KERNEL);
...
ip->info.offset = rxe->mmap_offset;
ip->info.size = size;
ip->context =
container_of(udata, struct uverbs_attr_bundle, driver_udata)
->context;
...
ip->obj = obj;
kref_init(&ip->ref);
return ip;
}最后
ip->info
会被传给userspace
,作为rxe_create_qp_resp
map_queue_pair
map_queue_pair
中第一个参数是ibpd->context->cmd_fd
, 字符设备的文件描述符。- mmap 的offset + size 是由kernel传到的用户空间的resp设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36// in rdma-core/providers/rxe/rxe.c, call function
ret = map_queue_pair(ibpd->context->cmd_fd, qp, attr,
&resp.drv_payload);
static int map_queue_pair(int cmd_fd, struct rxe_qp *qp,
struct ibv_qp_init_attr *attr,
struct rxe_create_qp_resp *resp)
{
if (attr->srq) {
qp->rq.max_sge = 0;
qp->rq.queue = NULL;
qp->rq_mmap_info.size = 0;
} else {
qp->rq.max_sge = attr->cap.max_recv_sge;
qp->rq.queue = mmap(NULL, resp->rq_mi.size, PROT_READ | PROT_WRITE,
MAP_SHARED,
cmd_fd, resp->rq_mi.offset);
if ((void *)qp->rq.queue == MAP_FAILED)
return errno;
qp->rq_mmap_info = resp->rq_mi;
pthread_spin_init(&qp->rq.lock, PTHREAD_PROCESS_PRIVATE);
}
qp->sq.max_sge = attr->cap.max_send_sge;
qp->sq.max_inline = attr->cap.max_inline_data;
qp->sq.queue = mmap(NULL, resp->sq_mi.size, PROT_READ | PROT_WRITE,
MAP_SHARED,
cmd_fd, resp->sq_mi.offset);
...
qp->sq_mmap_info = resp->sq_mi;
pthread_spin_init(&qp->sq.lock, PTHREAD_PROCESS_PRIVATE);
return 0;
}总结ibv_create_qp
- libRXE部分通过ioctl/write的方式请求kernel
- kernel创建SQ/RQ两端缓冲区,将地址+大小传回用户态
- libRXE根据传回信息,映射相同的地址到用户态,使得内核与用户态共享Queue Pair的地址空间。
3. ibv_post_send
Verbs
- RDMAmojo: ibv_post_send
- 将一组链式的ibv_send_wr发送到QP中的Send Queue中
3.1 RXE调用栈分析
1 |
|
rxe_post_send
- 将wr_list链表中的work request依次放到QP send queue buffer中
- 如果有失败的,则将bad_wr指向第一个失败的work request,bad_wr会返回给应用程序
- 所有WR放到Send Queue之后,敲一下door bell通知kernel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// rdma-core/providers/rxe/rxe.c
static int rxe_post_send(struct ibv_qp *ibqp,
struct ibv_send_wr *wr_list,
struct ibv_send_wr **bad_wr)
{
int rc = 0;
int err;
struct rxe_qp *qp = to_rqp(ibqp);
struct rxe_wq *sq = &qp->sq;
if (!bad_wr)
return EINVAL;
*bad_wr = NULL;
...
pthread_spin_lock(&sq->lock);
while (wr_list) {
rc = post_one_send(qp, sq, wr_list);
if (rc) {
*bad_wr = wr_list;
break;
}
wr_list = wr_list->next;
}
pthread_spin_unlock(&sq->lock);
err = post_send_db(ibqp);
return err ? err : rc;
}post_one_send
- 对于RXE而言,libRXE中的
post_one_send
作为生产者,向Send Queue中放置Work request。 - 读当前Send queue空闲区域起始地址需要原子操作(避免使用锁)
- 更新
producer_index
指针需要原子操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// rdma-core/providers/rxe/rxe.c
static int post_one_send(struct rxe_qp *qp, struct rxe_wq *sq,
struct ibv_send_wr *ibwr)
{
int err;
struct rxe_send_wqe *wqe;
unsigned int length = 0;
int i;
for (i = 0; i < ibwr->num_sge; i++)
length += ibwr->sg_list[i].length;
err = validate_send_wr(qp, ibwr, length);
if (err) {
verbs_err(verbs_get_ctx(qp->vqp.qp.context),
"validate send failed\n");
return err;
}
wqe = (struct rxe_send_wqe *)producer_addr(sq->queue);
err = init_send_wqe(qp, sq, ibwr, length, wqe);
if (err)
return err;
if (queue_full(sq->queue))
return -ENOMEM;
advance_producer(sq->queue);
return 0;
}- 对于RXE而言,libRXE中的
init_send_wqe
- 这个函数比较容易理解,就是将一个WR中所有
sg_list
拷贝到wqe buffer中,kernel再去根据SGList读内存。(RXE这里没有DMA操作) - 如果设置了 IBV_SEND_INLINE,则在这里直接拷贝数据本身到wqe buffer中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48// rdma-core/providers/rxe/rxe.c
static int init_send_wqe(struct rxe_qp *qp, struct rxe_wq *sq,
struct ibv_send_wr *ibwr, unsigned int length,
struct rxe_send_wqe *wqe)
{
int num_sge = ibwr->num_sge;
int i;
unsigned int opcode = ibwr->opcode;
convert_send_wr(qp, &wqe->wr, ibwr);
if (qp_type(qp) == IBV_QPT_UD) {
struct rxe_ah *ah = to_rah(ibwr->wr.ud.ah);
if (!ah->ah_num)
/* old kernels only */
memcpy(&wqe->wr.wr.ud.av, &ah->av, sizeof(struct rxe_av));
}
if (ibwr->send_flags & IBV_SEND_INLINE) {
uint8_t *inline_data = wqe->dma.inline_data;
for (i = 0; i < num_sge; i++) {
memcpy(inline_data,
(uint8_t *)(long)ibwr->sg_list[i].addr,
ibwr->sg_list[i].length);
inline_data += ibwr->sg_list[i].length;
}
} else
memcpy(wqe->dma.sge, ibwr->sg_list,
num_sge*sizeof(struct ibv_sge));
if ((opcode == IBV_WR_ATOMIC_CMP_AND_SWP)
|| (opcode == IBV_WR_ATOMIC_FETCH_AND_ADD))
wqe->iova = ibwr->wr.atomic.remote_addr;
else
wqe->iova = ibwr->wr.rdma.remote_addr;
wqe->dma.length = length;
wqe->dma.resid = length;
wqe->dma.num_sge = num_sge;
wqe->dma.cur_sge = 0;
wqe->dma.sge_offset = 0;
wqe->state = 0;
wqe->ssn = qp->ssn++;
return 0;
}- 这个函数比较容易理解,就是将一个WR中所有
post_send_db
- 敲doorbell 在RXE这里变为使用write的方式通知kernel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// rdma-core/providers/rxe/rxe.c
/* send a null post send as a doorbell */
static int post_send_db(struct ibv_qp *ibqp)
{
struct ibv_post_send cmd;
struct ib_uverbs_post_send_resp resp;
cmd.hdr.command = IB_USER_VERBS_CMD_POST_SEND;
cmd.hdr.in_words = sizeof(cmd) / 4;
cmd.hdr.out_words = sizeof(resp) / 4;
cmd.response = (uintptr_t)&resp;
cmd.qp_handle = ibqp->handle;
cmd.wr_count = 0;
cmd.sge_count = 0;
cmd.wqe_size = sizeof(struct ibv_send_wr);
if (write(ibqp->context->cmd_fd, &cmd, sizeof(cmd)) != sizeof(cmd))
return errno;
return 0;
}kernel: rxe_post_send
- libRXE将传递给内核的命令设置为 IB_USER_VERBS_CMD_POST_SEND
ib_uverbs_post_send
->ib_device->post_send
->rxe_post_send
->rxe_requester
->ip_local_out
- kernel在初始化Send Queue的时候,将 qp->req.task 回调函数注册为
rxe_requester
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// kernel/drivers/infiniband/sw/rxe/rxe_verbs.c
static int rxe_post_send(struct ib_qp *ibqp, const struct ib_send_wr *wr,
const struct ib_send_wr **bad_wr)
{
struct rxe_qp *qp = to_rqp(ibqp);
...
if (qp->is_user) {
/* Utilize process context to do protocol processing */
rxe_run_task(&qp->req.task, 0);
return 0;
...
}
// kernel/drivers/infiniband/sw/rxe/rxe_qp.c
// in func rxe_qp_init_req
rxe_init_task(rxe, &qp->req.task, qp,
rxe_requester, "req");
rxe_init_task(rxe, &qp->comp.task, qp,
rxe_completer, "comp");kernel: rxe_requester
- rxe_requester从rxe_qp队列取出wqe,生成对应的skb_buff,然后下发给对应的rxe_dev设备后发送IP包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54// kernel/drivers/infiniband/sw/rxe/rxe_req.c
int rxe_requester(void *arg)
{
struct rxe_qp *qp = (struct rxe_qp *)arg;
struct rxe_pkt_info pkt;
struct sk_buff *skb;
struct rxe_send_wqe *wqe;
...
wqe = req_next_wqe(qp);
opcode = next_opcode(qp, wqe, wqe->wr.opcode);
mask = rxe_opcode[opcode].mask;
// read payload from mem or inline
...
// set ib packet
pkt.rxe = rxe;
pkt.opcode = opcode;
pkt.qp = qp;
pkt.psn = qp->req.psn;
pkt.mask = rxe_opcode[opcode].mask;
pkt.wqe = wqe;
av = rxe_get_av(&pkt, &ah);
...
// set sk buffer
skb = init_req_packet(qp, wqe, opcode, payload, &pkt);
...
ret = finish_packet(qp, av, wqe, &pkt, skb, payload);
ret = rxe_xmit_packet(qp, &pkt, skb);
...
}
int rxe_xmit_packet(struct rxe_qp *qp, struct rxe_pkt_info *pkt,
struct sk_buff *skb)
{
int err;
int is_request = pkt->mask & RXE_REQ_MASK;
struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
...
rxe_icrc_generate(skb, pkt);
if (pkt->mask & RXE_LOOPBACK_MASK)
err = rxe_loopback(skb, pkt);
else
err = rxe_send(skb, pkt);
...
if ((qp_type(qp) != IB_QPT_RC) &&
(pkt->mask & RXE_END_MASK)) {
pkt->wqe->state = wqe_state_done;
rxe_run_task(&qp->comp.task, 1);
}
rxe_counter_inc(rxe, RXE_CNT_SENT_PKTS);
...
}
4.2 MLX5调用栈分析
- 将Work Request链表写入mmap到用户态地址空间的QP send queue buffer中
- 这里通过MMIO的方式访问位于PCI上的内存区间
- 全部Work Request写完后,敲一下硬件的doorbell
将一个ctrl结构体的地址通过MMIO的方式写入硬件的doorbell寄存器
// rdma-core/providers/mlx5/qp.c
mmio_write64_be(bf->reg + bf->offset, *(__be64 *)ctrl);
4. ibv_post_recv
4.1 RXE调用栈分析
1 |
|
rxe_post_recv
- 和post_send大同小异,向Recv Queue中写入recv work request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// rdma-core/providers/rxe/rxe.c
static int rxe_post_recv(struct ibv_qp *ibqp,
struct ibv_recv_wr *recv_wr,
struct ibv_recv_wr **bad_wr)
{
int rc = 0;
struct rxe_qp *qp = to_rqp(ibqp);
struct rxe_wq *rq = &qp->rq;
...
pthread_spin_lock(&rq->lock);
while (recv_wr) {
rc = rxe_post_one_recv(rq, recv_wr);
if (rc) {
*bad_wr = recv_wr;
break;
}
recv_wr = recv_wr->next;
}
pthread_spin_unlock(&rq->lock);
return rc;
}rxe_post_one_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31// rdma-core/providers/rxe/rxe.c
static int rxe_post_one_recv(struct rxe_wq *rq, struct ibv_recv_wr *recv_wr)
{
int i;
struct rxe_recv_wqe *wqe;
struct rxe_queue_buf *q = rq->queue;
int length = 0;
int rc = 0;
...
wqe = (struct rxe_recv_wqe *)producer_addr(q);
wqe->wr_id = recv_wr->wr_id;
wqe->num_sge = recv_wr->num_sge;
memcpy(wqe->dma.sge, recv_wr->sg_list,
wqe->num_sge*sizeof(*wqe->dma.sge));
for (i = 0; i < wqe->num_sge; i++)
length += wqe->dma.sge[i].length;
wqe->dma.length = length;
wqe->dma.resid = length;
wqe->dma.cur_sge = 0;
wqe->dma.num_sge = wqe->num_sge;
wqe->dma.sge_offset = 0;
advance_producer(q);
out:
return rc;
}
5. ibv_poll_cq
5.1 RXE调用栈分析
1 |
|
rxe_poll_cq
- 从CQ buffer中读取多个Work Completions,写入到应用程序传递下来的 wc buffer中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// rdma-core/providers/rxe/rxe.c
static int rxe_poll_cq(struct ibv_cq *ibcq, int ne, struct ibv_wc *wc)
{
struct rxe_cq *cq = to_rcq(ibcq);
struct rxe_queue_buf *q;
int npolled;
uint8_t *src;
pthread_spin_lock(&cq->lock);
q = cq->queue;
for (npolled = 0; npolled < ne; ++npolled, ++wc) {
if (queue_empty(q))
break;
src = consumer_addr(q);
memcpy(wc, src, sizeof(*wc));
advance_consumer(q);
}
pthread_spin_unlock(&cq->lock);
return npolled;
}
6. RXE对于收包的处理
UDP隧道建立时设置回调函数
- rdma_rxe module在初始化的时候,会创建一个UDP tunnel,其回调函数被设置为rxe_udp_encap_rec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// kernel/drivers/infiniband/sw/rxe/rxe_net.c
static struct socket *rxe_setup_udp_tunnel(struct net *net, __be16 port,
bool ipv6)
{
...
udp_cfg.local_udp_port = port;
/* Create UDP socket */
err = udp_sock_create(net, &udp_cfg, &sock);
...
tnl_cfg.encap_type = 1;
tnl_cfg.encap_rcv = rxe_udp_encap_recv;
/* Setup UDP tunnel */
setup_udp_tunnel_sock(net, sock, &tnl_cfg);
return sock;
}rxe_udp_encap_recv → rxe_rcv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33// kernel/drivers/infiniband/sw/rxe/rxe_recv.c
/* rxe_rcv is called from the interface driver */
void rxe_rcv(struct sk_buff *skb)
{
int err;
struct rxe_pkt_info *pkt = SKB_TO_PKT(skb);
struct rxe_dev *rxe = pkt->rxe;
...
pkt->opcode = bth_opcode(pkt);
pkt->psn = bth_psn(pkt);
pkt->qp = NULL;
pkt->mask |= rxe_opcode[pkt->opcode].mask;
...
err = rxe_icrc_check(skb, pkt);
...
rxe_counter_inc(rxe, RXE_CNT_RCVD_PKTS);
if (unlikely(bth_qpn(pkt) == IB_MULTICAST_QPN))
rxe_rcv_mcast_pkt(rxe, skb);
else
rxe_rcv_pkt(pkt, skb);
return;
...
}
static inline void rxe_rcv_pkt(struct rxe_pkt_info *pkt, struct sk_buff *skb)
{
if (pkt->mask & RXE_REQ_MASK)
rxe_resp_queue_pkt(pkt->qp, skb);
else
rxe_comp_queue_pkt(pkt->qp, skb);
}rxe_responder
如果收到的包是RDMA请求的话(RDMA_READ/RDMA_WRITE/RDMA_SEND_REQ)
如果收到包的状态是RESPST_COMPLETE的话,说明一次RDMA通信完成,RXE kernel模块会在CQ中写入一个WR
1
2
3
4// kernel/drivers/infiniband/sw/rxe/rxe_resp.c
static enum resp_states do_complete(struct rxe_qp *qp,
struct rxe_pkt_info *pkt)
{...}