RDMA Stack实现分析

希望通过这篇文章来梳理

  1. libibverbs提供的用户态Verbs API是如何进行设备操作/数据通信的,以Soft-RXE实现为例。
  2. 基于kernel: 5.19-rc1
  3. 基于rdma-core: v34.0

RDMA Stack的内核支持

  1. 参考:https://www.kernel.org/doc/html/latest/translations/zh_CN/infiniband/user_verbs.html

  2. RDMA Stack分为控制面和数据面,图1中给出了Mellanox MLX5 RDMA stack 涉及到的用户态库以及内核模块。图2中给出了一个控制面API在用户态libibverbs与内核之间的调用关系。

    1. 控制面:
      1. ib_uverbs.ko 内核模块会对每个IB设备生成一个字符设备文件。通常的形式为 /dev/infiniband/uverbs%d ,用户态程序通过ioctl对字符设备进行属性的读写。
      2. libibverbs 中的控制面相关Verbs API利用了ib_uverbs.ko 暴露出来的字符设备对RDMA设备操作。
    2. 数据面
      1. 通常是通过直接写入mmap()到用户空间的硬件寄存器来完成的,不需要系统调用,也不会发生到内核的上下文切换。

    Mellanox OFED stack arch.
    图1: Mellanox OFED stack arch.


    图2: libibverbs中的控制面API与内核的调用关系: 以ibv_create_qp为例

1. ibv_open_device

用户态打开IB设备的API

  1. 参考:RDMAmojo: ibv_open_device

1.1 RXE调用栈分析

1
2
3
4
5
6
7
8
ibv_open_device
|-- verbs_open_device
|-- open_cdev
|-- rxe_alloc_context
|-- verbs_init_and_alloc_context
|-- verbs_init_context
|-- ibv_cmd_get_context
|-- verbs_set_ops
  1. open_cdev

    会打开uverbs字符设备,文件描述符保存在cmd_fd中

    1
    2
    3
    4
    5
    6
    7
    // rdma-core/libibverbs/device.c
    cmd_fd = open_cdev(verbs_device->sysfs->sysfs_name,
    verbs_device->sysfs->sysfs_cdev);

    // rdma-core/util/open_cdev.c
    // in open_cdev function
    fd = open("/dev/infiniband/uverbs%d", O_RDWR | O_CLOEXEC)
  2. rxe_alloc_context

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // rdma-core/libibverbs/device.c
    /*
    * cmd_fd ownership is transferred into alloc_context, if it fails
    * then it closes cmd_fd and returns NULL
    */
    context_ex = verbs_device->ops->alloc_context(device, cmd_fd, private_data);

    // rdma-core/providers/rxe/rxe.c
    static struct verbs_context *rxe_alloc_context(struct ibv_device *ibdev,
    int cmd_fd,
    void *private_data)
    {
    struct rxe_context *context;
    struct ibv_get_context cmd;
    struct ib_uverbs_get_context_resp resp;

    context = verbs_init_and_alloc_context(ibdev, cmd_fd, context, ibv_ctx,
    RDMA_DRIVER_RXE);
    ...
    ibv_cmd_get_context(&context->ibv_ctx, &cmd, sizeof(cmd),
    &resp, sizeof(resp))
    ...

    verbs_set_ops(&context->ibv_ctx, &rxe_ctx_ops);
    ...
    }

  3. verbs_init_and_alloc_context

    1. &((typeof(context))((void *)0))->ibv_ctx 类似于offsetof 宏,用来计算ibv_ctx 在rxe_context 结构体内的偏移
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // rdma-core/providers/rxe/rxe.h
    struct rxe_context {
    struct verbs_context ibv_ctx;
    };

    // rdma-core/providers/rxe/rxe.c
    // expand macro
    context = ((typeof(context))_verbs_init_and_alloc_context(
    ibdev,
    cmd_fd,
    sizeof(*context),
    &((typeof(context))((void *)0))->ibv_ctx,
    (RDMA_DRIVER_RXE)))

    // rdma-core/libibverbs/device.c
    /*
    * Allocate and initialize a context structure. This is called to create the
    * driver wrapper, and context_offset is the number of bytes into the wrapper
    * structure where the verbs_context starts.
    */
    void *_verbs_init_and_alloc_context(struct ibv_device *device, int cmd_fd,
    size_t alloc_size,
    struct verbs_context *context_offset,
    uint32_t driver_id)
    {
    void *drv_context;
    struct verbs_context *context;

    drv_context = calloc(1, alloc_size);
    ...
    context = drv_context + (uintptr_t)context_offset;
    verbs_init_context(context, device, cmd_fd, driver_id)
    ...
    return drv_context;
    }
  4. verbs_init_context

    1. context->cmd_fd = cmd_fd; 记录uverbs设备文件的描述符
    2. 用verbs_dummy_ops 初始化ibv_context->ibv_context_ops 回调函数结构体
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // rdma-core/libibverbs/device.c
    int verbs_init_context(struct verbs_context *context_ex,
    struct ibv_device *device, int cmd_fd,
    uint32_t driver_id)
    {
    struct ibv_context *context = &context_ex->context;

    ibverbs_device_hold(device);

    context->device = device;
    context->cmd_fd = cmd_fd;
    context->async_fd = -1;
    pthread_mutex_init(&context->mutex, NULL);

    context_ex->context.abi_compat = __VERBS_ABI_IS_EXTENDED;
    context_ex->sz = sizeof(*context_ex);

    context_ex->priv = calloc(1, sizeof(*context_ex->priv));
    ...
    context_ex->priv->driver_id = driver_id;
    verbs_set_ops(context_ex, &verbs_dummy_ops);
    context_ex->priv->use_ioctl_write = has_ioctl_write(context);

    return 0;
    }
  5. ibv_cmd_get_context

    1. ioctl 传递给kernel的参数都会封装在ibv_command_buffer 结构体中,这里声明arg buffer
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // in rdma-core/providers/rxe/rxe.c, call function
    ibv_cmd_get_context(&context->ibv_ctx, &cmd, sizeof(cmd),
    &resp, sizeof(resp));

    // rdma-core/libibverbs/cmd_device.c
    int ibv_cmd_get_context(struct verbs_context *context_ex,
    struct ibv_get_context *cmd, size_t cmd_size,
    struct ib_uverbs_get_context_resp *resp,
    size_t resp_size)
    {
    // DECLARE_CMD_BUFFER_COMPAT(cmdb, UVERBS_OBJECT_DEVICE,
    // UVERBS_METHOD_GET_CONTEXT, cmd, cmd_size,
    // resp, resp_size);
    // expands macro
    struct ibv_command_buffer cmdb[((sizeof(struct ibv_command_buffer) +
    sizeof(struct ib_uverbs_attr) * (__cmdbtotal) +
    sizeof(struct ibv_command_buffer) - 1) /
    sizeof(struct ibv_command_buffer))];
    return cmd_get_context(context_ex, cmdb);
    }

  6. verbs_set_ops

    1. 将verbs_context 结构体内嵌的ibv_context 中的回调函数,设置为rxe声明的回调函数(默认为dummy_ops)
    2. 这里不太清楚为什么verbs_context内部的verbs_ex_private 结构体又保存了一份回调函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // in rdma-core/providers/rxe/rxe.c, call function
    verbs_set_ops(&context->ibv_ctx, &rxe_ctx_ops);

    // rdma-core/libibverbs/dummy_ops.c
    void verbs_set_ops(struct verbs_context *vctx,
    const struct verbs_context_ops *ops)
    {
    struct verbs_ex_private *priv = vctx->priv;
    struct ibv_context_ops *ctx = &vctx->context.ops;
    ...
    // expand macro
    if (ops->alloc_dm) {
    priv->ops.alloc_dm = ops->alloc_dm;
    (vctx)->alloc_dm = ops->alloc_dm;
    }
    ...

  7. 总结ibv_open_device

    1. 分配context结构体,打开uverbs字符设备
    2. 设置用于传递ioctl参数的 arg buffer
    3. 设置回调函数,这些回调函数会被ibv_xxx Verbs调用

2. ibv_create_qp

Verbs

  1. RDMAmojo: ibv_create_qp

2.1 RXE调用栈分析

1
2
3
4
5
6
7
8
ibv_create_qp
|-- rxe_create_qp
|-- ibv_cmd_create_qp
|-- ibv_icmd_create_qp
|-- execute_ioctl_fallback
|-- execute_ioctl
|-- kernel: rxe_create_qp
|-- map_queue_pair
  1. ibv_create_qp

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // rdma-core/libibverbs/verbs.c
    LATEST_SYMVER_FUNC(ibv_create_qp, 1_1, "IBVERBS_1.1",
    struct ibv_qp *, // return type
    struct ibv_pd *pd, // arg1
    struct ibv_qp_init_attr *qp_init_attr) // arg2
    {
    struct ibv_qp *qp = get_ops(pd->context)->create_qp(pd, qp_init_attr);

    return qp;
    }
  2. rxe_create_qp

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // rdma-core/providers/rxe/rxe.c
    static struct ibv_qp *rxe_create_qp(struct ibv_pd *ibpd,
    struct ibv_qp_init_attr *attr)
    {
    struct ibv_create_qp cmd = {};
    struct urxe_create_qp_resp resp = {};
    struct rxe_qp *qp;
    int ret;

    ...
    ret = ibv_cmd_create_qp(ibpd, &qp->vqp.qp, attr, &cmd, sizeof(cmd),
    &resp.ibv_resp, sizeof(resp));
    ...
    ret = map_queue_pair(ibpd->context->cmd_fd, qp, attr,
    &resp.drv_payload);

    ...
    }
    1. 其中 ibv_create_qp/urxe_create_qp_resp 这两个结构体由宏声明,分别将二者展开后如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      // rdma-core/cmake-build-debug/include/infiniband/kern-abi.h 
      DECLARE_CMD(IB_USER_VERBS_CMD_CREATE_QP, ibv_create_qp, ib_uverbs_create_qp);

      // expands macro
      struct ibv_create_qp {
      struct ib_uverbs_cmd_hdr hdr;
      union {
      _STRUCT_ib_uverbs_create_qp;
      struct ib_uverbs_create_qp core_payload;
      };
      };
      typedef struct ibv_create_qp _ABI_REQ_STRUCT_IB_USER_VERBS_CMD_CREATE_QP;
      typedef struct ib_uverbs_create_qp _KABI_REQ_STRUCT_IB_USER_VERBS_CMD_CREATE_QP;
      typedef struct ib_uverbs_create_qp_resp
      _KABI_RESP_STRUCT_IB_USER_VERBS_CMD_CREATE_QP;
      enum { _ABI_ALIGN_IB_USER_VERBS_CMD_CREATE_QP = 4 };
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23

      // rdma-core/providers/rxe/rxe-abi.h
      DECLARE_DRV_CMD(urxe_create_qp, IB_USER_VERBS_CMD_CREATE_QP,
      empty, rxe_create_qp_resp);

      // expands macro
      struct urxe_create_qp {
      _ABI_REQ_STRUCT_IB_USER_VERBS_CMD_CREATE_QP ibv_cmd;
      union {
      struct {};
      struct empty drv_payload;
      };
      };
      struct urxe_create_qp_resp {
      _KABI_RESP_STRUCT_IB_USER_VERBS_CMD_CREATE_QP ibv_resp;
      union {
      struct {
      struct mminfo rq_mi;
      struct mminfo sq_mi;
      };
      struct rxe_create_qp_resp drv_payload;
      };
      };
  3. ibv_cmd_create_qp

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // rdma-core/libibverbs/cmd_qp.c
    int ibv_cmd_create_qp(struct ibv_pd *pd,
    struct ibv_qp *qp, struct ibv_qp_init_attr *attr,
    struct ibv_create_qp *cmd, size_t cmd_size,
    struct ib_uverbs_create_qp_resp *resp, size_t resp_size)
    {
    DECLARE_CMD_BUFFER_COMPAT(cmdb, UVERBS_OBJECT_QP,
    UVERBS_METHOD_QP_CREATE, cmd, cmd_size, resp,
    resp_size);

    struct ibv_qp_init_attr_ex attr_ex = {};
    int ret;

    attr_ex.qp_context = attr->qp_context;
    attr_ex.send_cq = attr->send_cq;
    attr_ex.recv_cq = attr->recv_cq;
    attr_ex.srq = attr->srq;
    attr_ex.cap = attr->cap;
    attr_ex.qp_type = attr->qp_type;
    attr_ex.sq_sig_all = attr->sq_sig_all;
    attr_ex.comp_mask = IBV_QP_INIT_ATTR_PD;
    attr_ex.pd = pd;
    ret = ibv_icmd_create_qp(pd->context, NULL, qp, &attr_ex, cmdb);
    if (!ret)
    memcpy(&attr->cap, &attr_ex.cap, sizeof(attr_ex.cap));

    return ret;
    }
  4. ibv_icmd_create_qp

    1. libibverbs使用了两套与内核通信的方式:传统的write方式,以及ioctl方式。
    2. 如果当前的ib device context不支持ioctl方式与内核暴露的字符设备通信的话,则需要fallback回write方式
      1. 可以参考之前版本的libibverbs对应实现:https://github.com/hustcat/rdma-core/blob/v15/libibverbs/cmd.c#L1063
    3. _CMD_BIT(cmd_name) 宏作用:把verbs_context_ops看成bitmap,每个回调函数占一个bit,计算传入的cmd_name占哪个bit
      1. sizeof(void*) 由编译时target platform决定,amd64下值等于8
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // rdma-core/libibverbs/cmd_qp.c
    static int ibv_icmd_create_qp(struct ibv_context *context,
    struct verbs_qp *vqp,
    struct ibv_qp *qp_in,
    struct ibv_qp_init_attr_ex *attr_ex,
    struct ibv_command_buffer *link) {
    ...
    execute_ioctl_fallback(context, create_qp, cmdb, &ret)
    ...
    }

    enum write_fallback _execute_ioctl_fallback(struct ibv_context *ctx,
    unsigned int cmd_bit,
    struct ibv_command_buffer *cmdb,
    int *ret);

    #define execute_ioctl_fallback(ctx, cmd_name, cmdb, ret) \
    _execute_ioctl_fallback(ctx, _CMD_BIT(cmd_name), cmdb, ret)

    #define _CMD_BIT(cmd_name) \
    (offsetof(struct verbs_context_ops, cmd_name) / sizeof(void *))

  5. _execute_ioctl_fallback

    1. 检查当前命令是否被标记为不支持ioctl方式(priv->unsupported_ioctls 位图):未被标记的话直接通过ioctl把相应命令传给内核,否则fallback回调用者,由调用者改用传统的write方式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // rdma-core/libibverbs/cmd_fallback.c
    /*
    * Used to support callers that have a fallback to the old write ABI
    * interface.
    */
    enum write_fallback _execute_ioctl_fallback(struct ibv_context *ctx,
    unsigned int cmd_bit,
    struct ibv_command_buffer *cmdb,
    int *ret)
    {
    struct verbs_ex_private *priv = get_priv(ctx);

    if (bitmap_test_bit(priv->unsupported_ioctls, cmd_bit))
    return _check_legacy(cmdb, ret);

    *ret = execute_ioctl(ctx, cmdb);
    ...
    }
  6. kernel: rxe_create_qp

    1. 在内核新建一个QP:根据QPN号hash后得到对应的src port,用于填充要发送的UDP报文

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      // linux-kernel/rust-kernel/drivers/infiniband/sw/rxe/rxe_verbs.c
      static int rxe_create_qp(struct ib_qp *ibqp, struct ib_qp_init_attr *init,
      struct ib_udata *udata)
      {
      int err;
      struct rxe_dev *rxe = to_rdev(ibqp->device);
      struct rxe_pd *pd = to_rpd(ibqp->pd);
      struct rxe_qp *qp = to_rqp(ibqp);
      struct rxe_create_qp_resp __user *uresp = NULL;
      if (udata) {
      if (udata->outlen < sizeof(*uresp))
      return -EINVAL;
      uresp = udata->outbuf;
      }
      err = rxe_qp_chk_init(rxe, init);
      ...
      qp->is_user = true;
      ...
      err = rxe_add_to_pool(&rxe->qp_pool, qp);
      ...
      err = rxe_qp_from_init(rxe, qp, pd, init, uresp, ibqp->pd, udata);
      ...
      return 0;
      }
    2. rxe_qp_init_req

      • 在 rxe_queue_init 函数中申请一段连续的虚拟地址空间,用作SQ buffer
      • 将SQ buffer的地址+大小设置为QP中SQ的mmap info
      • 根据QPN号hash后得到对应的src port,用于填充UDP报文
    3. rxe_qp_init_resp

      • 在 rxe_queue_init 函数中申请一段连续的虚拟地址空间,用作RQ buffer
      • 将RQ buffer的地址+大小设置为QP中RQ的mmap info
  7. kernel: rxe_create_mmap_info

    1. 将QP中记录的SQ/RQ的mmap info,传回userspace,userspace根据内核指定的offset+size做mmap之后,使得kernel与userspace可以共享QP内存空间。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // linux-kernel/rust-kernel/drivers/infiniband/sw/rxe/rxe_mmap.c
    /*
    * Allocate information for rxe_mmap
    */
    struct rxe_mmap_info *rxe_create_mmap_info(struct rxe_dev *rxe, u32 size,
    struct ib_udata *udata, void *obj)
    {
    struct rxe_mmap_info *ip;
    ip = kmalloc(sizeof(*ip), GFP_KERNEL);
    ...
    ip->info.offset = rxe->mmap_offset;
    ip->info.size = size;
    ip->context =
    container_of(udata, struct uverbs_attr_bundle, driver_udata)
    ->context;
    ...
    ip->obj = obj;
    kref_init(&ip->ref);
    return ip;
    }

    最后ip->info 会被传给userspace,作为rxe_create_qp_resp

  8. map_queue_pair

    1. map_queue_pair 中第一个参数是ibpd->context->cmd_fd, 字符设备的文件描述符。
    2. mmap 的offset + size 由kernel传回用户空间的resp指定
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    // in rdma-core/providers/rxe/rxe.c, call function
    ret = map_queue_pair(ibpd->context->cmd_fd, qp, attr,
    &resp.drv_payload);

    static int map_queue_pair(int cmd_fd, struct rxe_qp *qp,
    struct ibv_qp_init_attr *attr,
    struct rxe_create_qp_resp *resp)
    {
    if (attr->srq) {
    qp->rq.max_sge = 0;
    qp->rq.queue = NULL;
    qp->rq_mmap_info.size = 0;
    } else {
    qp->rq.max_sge = attr->cap.max_recv_sge;
    qp->rq.queue = mmap(NULL, resp->rq_mi.size, PROT_READ | PROT_WRITE,
    MAP_SHARED,
    cmd_fd, resp->rq_mi.offset);
    if ((void *)qp->rq.queue == MAP_FAILED)
    return errno;

    qp->rq_mmap_info = resp->rq_mi;
    pthread_spin_init(&qp->rq.lock, PTHREAD_PROCESS_PRIVATE);
    }

    qp->sq.max_sge = attr->cap.max_send_sge;
    qp->sq.max_inline = attr->cap.max_inline_data;
    qp->sq.queue = mmap(NULL, resp->sq_mi.size, PROT_READ | PROT_WRITE,
    MAP_SHARED,
    cmd_fd, resp->sq_mi.offset);
    ...
    qp->sq_mmap_info = resp->sq_mi;
    pthread_spin_init(&qp->sq.lock, PTHREAD_PROCESS_PRIVATE);

    return 0;
    }

  9. 总结ibv_create_qp

    1. libRXE部分通过ioctl/write的方式请求kernel
    2. kernel创建SQ/RQ两个队列缓冲区,将mmap所需的offset+大小传回用户态
    3. libRXE根据传回信息,把同一段内存映射到用户态,使得内核与用户态共享Queue Pair的内存空间。

3. ibv_post_send

Verbs

  1. RDMAmojo: ibv_post_send
  2. 将一组链式的ibv_send_wr发送到QP中的Send Queue中

3.1 RXE调用栈分析

1
2
3
4
5
6
ibv_post_send
|-- rxe_post_send
|-- post_one_send
|-- post_send_db
|-- kernel: rxe_post_send
|-- kernel: rxe_requester
  1. rxe_post_send

    • 将wr_list链表中的work request依次放到QP send queue buffer中
    • 如果有失败的,则将bad_wr指向第一个失败的work request,bad_wr会返回给应用程序
    • 所有WR放到Send Queue之后,敲一下door bell通知kernel
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // rdma-core/providers/rxe/rxe.c
    static int rxe_post_send(struct ibv_qp *ibqp,
    struct ibv_send_wr *wr_list,
    struct ibv_send_wr **bad_wr)
    {
    int rc = 0;
    int err;
    struct rxe_qp *qp = to_rqp(ibqp);
    struct rxe_wq *sq = &qp->sq;

    if (!bad_wr)
    return EINVAL;

    *bad_wr = NULL;
    ...
    pthread_spin_lock(&sq->lock);

    while (wr_list) {
    rc = post_one_send(qp, sq, wr_list);
    if (rc) {
    *bad_wr = wr_list;
    break;
    }

    wr_list = wr_list->next;
    }

    pthread_spin_unlock(&sq->lock);

    err = post_send_db(ibqp);
    return err ? err : rc;
    }
  2. post_one_send

    • 对于RXE而言,libRXE中的post_one_send作为生产者,向Send Queue中放置Work request。
    • 读当前Send queue空闲区域起始地址需要原子操作(避免使用锁)
    • 更新 producer_index 指针需要原子操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // rdma-core/providers/rxe/rxe.c
    static int post_one_send(struct rxe_qp *qp, struct rxe_wq *sq,
    struct ibv_send_wr *ibwr)
    {
    int err;
    struct rxe_send_wqe *wqe;
    unsigned int length = 0;
    int i;

    for (i = 0; i < ibwr->num_sge; i++)
    length += ibwr->sg_list[i].length;

    err = validate_send_wr(qp, ibwr, length);
    if (err) {
    verbs_err(verbs_get_ctx(qp->vqp.qp.context),
    "validate send failed\n");
    return err;
    }

    wqe = (struct rxe_send_wqe *)producer_addr(sq->queue);

    err = init_send_wqe(qp, sq, ibwr, length, wqe);
    if (err)
    return err;

    if (queue_full(sq->queue))
    return -ENOMEM;

    advance_producer(sq->queue);

    return 0;
    }
  3. init_send_wqe

    • 这个函数比较容易理解,就是将一个WR中所有sg_list 拷贝到wqe buffer中,kernel再去根据SGList读内存。(RXE这里没有DMA操作)
    • 如果设置了 IBV_SEND_INLINE,则在这里直接拷贝数据本身到wqe buffer中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    // rdma-core/providers/rxe/rxe.c
    static int init_send_wqe(struct rxe_qp *qp, struct rxe_wq *sq,
    struct ibv_send_wr *ibwr, unsigned int length,
    struct rxe_send_wqe *wqe)
    {
    int num_sge = ibwr->num_sge;
    int i;
    unsigned int opcode = ibwr->opcode;

    convert_send_wr(qp, &wqe->wr, ibwr);

    if (qp_type(qp) == IBV_QPT_UD) {
    struct rxe_ah *ah = to_rah(ibwr->wr.ud.ah);

    if (!ah->ah_num)
    /* old kernels only */
    memcpy(&wqe->wr.wr.ud.av, &ah->av, sizeof(struct rxe_av));
    }

    if (ibwr->send_flags & IBV_SEND_INLINE) {
    uint8_t *inline_data = wqe->dma.inline_data;

    for (i = 0; i < num_sge; i++) {
    memcpy(inline_data,
    (uint8_t *)(long)ibwr->sg_list[i].addr,
    ibwr->sg_list[i].length);
    inline_data += ibwr->sg_list[i].length;
    }
    } else
    memcpy(wqe->dma.sge, ibwr->sg_list,
    num_sge*sizeof(struct ibv_sge));

    if ((opcode == IBV_WR_ATOMIC_CMP_AND_SWP)
    || (opcode == IBV_WR_ATOMIC_FETCH_AND_ADD))
    wqe->iova = ibwr->wr.atomic.remote_addr;
    else
    wqe->iova = ibwr->wr.rdma.remote_addr;

    wqe->dma.length = length;
    wqe->dma.resid = length;
    wqe->dma.num_sge = num_sge;
    wqe->dma.cur_sge = 0;
    wqe->dma.sge_offset = 0;
    wqe->state = 0;
    wqe->ssn = qp->ssn++;

    return 0;
    }
  4. post_send_db

    • 敲doorbell 在RXE这里变为使用write的方式通知kernel
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // rdma-core/providers/rxe/rxe.c
    /* send a null post send as a doorbell */
    static int post_send_db(struct ibv_qp *ibqp)
    {
    struct ibv_post_send cmd;
    struct ib_uverbs_post_send_resp resp;

    cmd.hdr.command = IB_USER_VERBS_CMD_POST_SEND;
    cmd.hdr.in_words = sizeof(cmd) / 4;
    cmd.hdr.out_words = sizeof(resp) / 4;
    cmd.response = (uintptr_t)&resp;
    cmd.qp_handle = ibqp->handle;
    cmd.wr_count = 0;
    cmd.sge_count = 0;
    cmd.wqe_size = sizeof(struct ibv_send_wr);

    if (write(ibqp->context->cmd_fd, &cmd, sizeof(cmd)) != sizeof(cmd))
    return errno;

    return 0;
    }
  5. kernel: rxe_post_send

    1. libRXE将传递给内核的命令设置为 IB_USER_VERBS_CMD_POST_SEND
    2. ib_uverbs_post_send -> ib_device->post_send -> rxe_post_send -> rxe_requester -> ip_local_out
    3. kernel在初始化Send Queue的时候,将 qp->req.task 回调函数注册为rxe_requester
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // kernel/drivers/infiniband/sw/rxe/rxe_verbs.c
    static int rxe_post_send(struct ib_qp *ibqp, const struct ib_send_wr *wr,
    const struct ib_send_wr **bad_wr)
    {
    struct rxe_qp *qp = to_rqp(ibqp);
    ...
    if (qp->is_user) {
    /* Utilize process context to do protocol processing */
    rxe_run_task(&qp->req.task, 0);
    return 0;
    ...
    }

    // kernel/drivers/infiniband/sw/rxe/rxe_qp.c
    // in func rxe_qp_init_req
    rxe_init_task(rxe, &qp->req.task, qp,
    rxe_requester, "req");
    rxe_init_task(rxe, &qp->comp.task, qp,
    rxe_completer, "comp");
  6. kernel: rxe_requester

    • rxe_requester从rxe_qp队列取出wqe,生成对应的sk_buff,然后下发给对应的rxe_dev设备后发送IP包
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    // kernel/drivers/infiniband/sw/rxe/rxe_req.c
    int rxe_requester(void *arg)
    {
    struct rxe_qp *qp = (struct rxe_qp *)arg;
    struct rxe_pkt_info pkt;
    struct sk_buff *skb;
    struct rxe_send_wqe *wqe;
    ...
    wqe = req_next_wqe(qp);
    opcode = next_opcode(qp, wqe, wqe->wr.opcode);
    mask = rxe_opcode[opcode].mask;
    // read payload from mem or inline
    ...
    // set ib packet
    pkt.rxe = rxe;
    pkt.opcode = opcode;
    pkt.qp = qp;
    pkt.psn = qp->req.psn;
    pkt.mask = rxe_opcode[opcode].mask;
    pkt.wqe = wqe;
    av = rxe_get_av(&pkt, &ah);
    ...
    // set sk buffer
    skb = init_req_packet(qp, wqe, opcode, payload, &pkt);
    ...
    ret = finish_packet(qp, av, wqe, &pkt, skb, payload);
    ret = rxe_xmit_packet(qp, &pkt, skb);
    ...
    }

    int rxe_xmit_packet(struct rxe_qp *qp, struct rxe_pkt_info *pkt,
    struct sk_buff *skb)
    {
    int err;
    int is_request = pkt->mask & RXE_REQ_MASK;
    struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
    ...
    rxe_icrc_generate(skb, pkt);

    if (pkt->mask & RXE_LOOPBACK_MASK)
    err = rxe_loopback(skb, pkt);
    else
    err = rxe_send(skb, pkt);
    ...
    if ((qp_type(qp) != IB_QPT_RC) &&
    (pkt->mask & RXE_END_MASK)) {
    pkt->wqe->state = wqe_state_done;
    rxe_run_task(&qp->comp.task, 1);
    }

    rxe_counter_inc(rxe, RXE_CNT_SENT_PKTS);
    ...
    }

3.2 MLX5调用栈分析

  1. 将Work Request链表写入mmap到用户态地址空间的QP send queue buffer中
    • 这里通过MMIO的方式访问位于PCI上的内存区间
  2. 全部Work Request写完后,敲一下硬件的doorbell
    • 将一个ctrl结构体的地址通过MMIO的方式写入硬件的doorbell寄存器

      // rdma-core/providers/mlx5/qp.c

      mmio_write64_be(bf->reg + bf->offset, *(__be64 *)ctrl);

4. ibv_post_recv

4.1 RXE调用栈分析

1
2
3
ibv_post_recv
|-- rxe_post_recv
|-- rxe_post_one_recv
  1. rxe_post_recv

    • 和post_send大同小异,向Recv Queue中写入recv work request
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // rdma-core/providers/rxe/rxe.c
    static int rxe_post_recv(struct ibv_qp *ibqp,
    struct ibv_recv_wr *recv_wr,
    struct ibv_recv_wr **bad_wr)
    {
    int rc = 0;
    struct rxe_qp *qp = to_rqp(ibqp);
    struct rxe_wq *rq = &qp->rq;
    ...
    pthread_spin_lock(&rq->lock);

    while (recv_wr) {
    rc = rxe_post_one_recv(rq, recv_wr);
    if (rc) {
    *bad_wr = recv_wr;
    break;
    }

    recv_wr = recv_wr->next;
    }

    pthread_spin_unlock(&rq->lock);

    return rc;
    }
  2. rxe_post_one_recv

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    // rdma-core/providers/rxe/rxe.c
    static int rxe_post_one_recv(struct rxe_wq *rq, struct ibv_recv_wr *recv_wr)
    {
    int i;
    struct rxe_recv_wqe *wqe;
    struct rxe_queue_buf *q = rq->queue;
    int length = 0;
    int rc = 0;
    ...
    wqe = (struct rxe_recv_wqe *)producer_addr(q);

    wqe->wr_id = recv_wr->wr_id;
    wqe->num_sge = recv_wr->num_sge;

    memcpy(wqe->dma.sge, recv_wr->sg_list,
    wqe->num_sge*sizeof(*wqe->dma.sge));

    for (i = 0; i < wqe->num_sge; i++)
    length += wqe->dma.sge[i].length;

    wqe->dma.length = length;
    wqe->dma.resid = length;
    wqe->dma.cur_sge = 0;
    wqe->dma.num_sge = wqe->num_sge;
    wqe->dma.sge_offset = 0;

    advance_producer(q);

    out:
    return rc;
    }

5. ibv_poll_cq

5.1 RXE调用栈分析

1
2
ibv_poll_cq
|-- rxe_poll_cq
  1. rxe_poll_cq

    • 从CQ buffer中读取多个Work Completions,写入到应用程序传递下来的 wc buffer中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // rdma-core/providers/rxe/rxe.c
    static int rxe_poll_cq(struct ibv_cq *ibcq, int ne, struct ibv_wc *wc)
    {
    struct rxe_cq *cq = to_rcq(ibcq);
    struct rxe_queue_buf *q;
    int npolled;
    uint8_t *src;

    pthread_spin_lock(&cq->lock);
    q = cq->queue;

    for (npolled = 0; npolled < ne; ++npolled, ++wc) {
    if (queue_empty(q))
    break;

    src = consumer_addr(q);
    memcpy(wc, src, sizeof(*wc));
    advance_consumer(q);
    }

    pthread_spin_unlock(&cq->lock);
    return npolled;
    }

6. RXE对于收包的处理

  1. UDP隧道建立时设置回调函数

    • rdma_rxe module在初始化的时候,会创建一个UDP tunnel,其回调函数被设置为rxe_udp_encap_recv
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // kernel/drivers/infiniband/sw/rxe/rxe_net.c
    static struct socket *rxe_setup_udp_tunnel(struct net *net, __be16 port,
    bool ipv6)
    {
    ...
    udp_cfg.local_udp_port = port;

    /* Create UDP socket */
    err = udp_sock_create(net, &udp_cfg, &sock);
    ...
    tnl_cfg.encap_type = 1;
    tnl_cfg.encap_rcv = rxe_udp_encap_recv;

    /* Setup UDP tunnel */
    setup_udp_tunnel_sock(net, sock, &tnl_cfg);

    return sock;
    }
  2. rxe_udp_encap_recv → rxe_rcv

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // kernel/drivers/infiniband/sw/rxe/rxe_recv.c
    /* rxe_rcv is called from the interface driver */
    void rxe_rcv(struct sk_buff *skb)
    {
    int err;
    struct rxe_pkt_info *pkt = SKB_TO_PKT(skb);
    struct rxe_dev *rxe = pkt->rxe;
    ...
    pkt->opcode = bth_opcode(pkt);
    pkt->psn = bth_psn(pkt);
    pkt->qp = NULL;
    pkt->mask |= rxe_opcode[pkt->opcode].mask;
    ...
    err = rxe_icrc_check(skb, pkt);
    ...
    rxe_counter_inc(rxe, RXE_CNT_RCVD_PKTS);

    if (unlikely(bth_qpn(pkt) == IB_MULTICAST_QPN))
    rxe_rcv_mcast_pkt(rxe, skb);
    else
    rxe_rcv_pkt(pkt, skb);

    return;
    ...
    }

    static inline void rxe_rcv_pkt(struct rxe_pkt_info *pkt, struct sk_buff *skb)
    {
    if (pkt->mask & RXE_REQ_MASK)
    rxe_resp_queue_pkt(pkt->qp, skb);
    else
    rxe_comp_queue_pkt(pkt->qp, skb);
    }
  3. rxe_responder

    • 如果收到的包是RDMA请求的话(RDMA_READ/RDMA_WRITE/RDMA_SEND_REQ),即上面rxe_rcv_pkt 中命中RXE_REQ_MASK 的分支,包会经rxe_resp_queue_pkt 入队,交给responder处理

    • 如果收到包的状态是RESPST_COMPLETE的话,说明一次RDMA通信完成,RXE kernel模块会在CQ中写入一个WC(Work Completion)

      1
      2
      3
      4
      // kernel/drivers/infiniband/sw/rxe/rxe_resp.c 
      static enum resp_states do_complete(struct rxe_qp *qp,
      struct rxe_pkt_info *pkt)
      {...}

RDMA Stack实现分析
https://gwzlchn.github.io/202207/rdma-stack-01/
作者
Zelin Wang
发布于
2022年7月4日
更新于
2022年10月23日
许可协议