RXE中的定时器

1. RDMA_RXE中结构体的定义和初始化

  1. 每个 rxe_qp 包含两个重传定时器:一个用于处理未收到对端 ACK 的情况,即 retrans_timer;另一个用于处理收到 RNR NAK 的情况(收到对端 receiver 未准备好的 NAK 包后,同样要定时重传),即 rnr_nak_timer

    1. RNR NAK 可参考IB Spec 1.3 Sec. 9.7.5.2.8 RNR NAK
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // drivers/infiniband/sw/rxe/rxe_verbs.h
    struct rxe_qp{
    ...
    /* Timer for retranmitting packet when ACKs have been lost. RC
    * only. The requester sets it when it is not already
    * started. The responder resets it whenever an ack is
    * received.
    */
    struct timer_list retrans_timer;
    u64 qp_timeout_jiffies;

    /* Timer for handling RNR NAKS. */
    struct timer_list rnr_nak_timer;

    ...
    };
  2. 初始化QP的时候,初始化两个定时器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    // drivers/infiniband/sw/rxe/rxe_qp.c
    static int rxe_qp_init_req(struct rxe_dev *rxe, struct rxe_qp *qp,
    struct ib_qp_init_attr *init, struct ib_udata *udata,
    struct rxe_create_qp_resp __user *uresp)
    {
    ...

    qp->qp_timeout_jiffies = 0; /* Can't be set for UD/UC in modify_qp */
    if (init->qp_type == IB_QPT_RC) {
    timer_setup(&qp->rnr_nak_timer, rnr_nak_timer, 0);
    timer_setup(&qp->retrans_timer, retransmit_timer, 0);
    }
    return 0;
    }

    // drivers/infiniband/sw/rxe/rxe_comp.c
    /* Timer callback for qp->retrans_timer. When the QP is still marked valid,
    * it flags a timeout on the completer side and runs the completer task.
    */
    void retransmit_timer(struct timer_list *t)
    {
    /* recover the rxe_qp that embeds this timer_list (container_of) */
    struct rxe_qp *qp = from_timer(qp, t, retrans_timer);

    pr_debug("%s: fired for qp#%d\n", __func__, qp->elem.index);

    if (qp->valid) {
    /* rxe_completer reads this flag and turns it into comp.timeout_retry */
    qp->comp.timeout = 1;
    rxe_run_task(&qp->comp.task, 1);
    }
    }

    // drivers/infiniband/sw/rxe/rxe_req.c
    /* Timer callback for qp->rnr_nak_timer. It requests a send queue retry,
    * clears the "waiting for RNR timer" flag and runs the requester task.
    */
    void rnr_nak_timer(struct timer_list *t)
    {
    /* recover the rxe_qp that embeds this timer_list (container_of) */
    struct rxe_qp *qp = from_timer(qp, t, rnr_nak_timer);

    pr_debug("%s: fired for qp#%d\n", __func__, qp_num(qp));

    /* request a send queue retry */
    qp->req.need_retry = 1;
    /* rxe_requester only starts the retry flow once this flag is clear */
    qp->req.wait_for_rnr_timer = 0;
    rxe_run_task(&qp->req.task, 1);
    }

    // include/linux/timer.h
    /* Map the timer_list pointer handed to a timer callback back to the
    * structure that embeds it: container_of() on member timer_fieldname,
    * with the container type taken from typeof(*var).
    */
    #define from_timer(var, callback_timer, timer_fieldname) \
    container_of(callback_timer, typeof(*var), timer_fieldname)
    1. timer_setup API

      • 这是kernel 4.14之后新引入的一个设置定时器的API

        参考 https://lwn.net/Articles/735887/

        功能就是把 timer 的回调函数设为 callback,并把 timer->flags 设为传入的 flags 与当前 CPU ID 按位或的结果

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      // include/linux/timer.h
      /**
      * timer_setup - prepare a timer for first use
      * @timer: the timer in question
      * @callback: the function to call when timer expires
      * @flags: any TIMER_* flags
      *
      * Regular timer initialization should use either DEFINE_TIMER() above,
      * or timer_setup(). For timers on the stack, timer_setup_on_stack() must
      * be used and must be balanced with a call to destroy_timer_on_stack().
      *
      * Expands to __init_timer() with the same three arguments.
      */
      #define timer_setup(timer, callback, flags) \
      __init_timer((timer), (callback), (flags))

      // include/linux/timer.h
      /* Forward to init_timer_key(), passing NULL for both the name and the
      * lock class key arguments.
      */
      #define __init_timer(_timer, _fn, _flags) \
      init_timer_key((_timer), (_fn), (_flags), NULL, NULL)

      // kernel/time/timer.c
      /* Initialize a timer: calls debug_init() on it, then hands the callback,
      * flags, name and lock class key to do_init_timer().
      */
      void init_timer_key(struct timer_list *timer,
      void (*func)(struct timer_list *), unsigned int flags,
      const char *name, struct lock_class_key *key)
      {
      debug_init(timer);
      do_init_timer(timer, func, flags, name, key);
      }

      // kernel/time/timer.c
      /* Fill in a timer_list: clear entry.pprev, store the callback, record the
      * flags combined with the current CPU id, and initialize the lockdep map.
      */
      static void do_init_timer(struct timer_list *timer,
      void (*func)(struct timer_list *),
      unsigned int flags,
      const char *name, struct lock_class_key *key)
      {
      timer->entry.pprev = NULL;
      timer->function = func;
      /* warn once and keep only the bits covered by TIMER_INIT_FLAGS */
      if (WARN_ON_ONCE(flags & ~TIMER_INIT_FLAGS))
      flags &= TIMER_INIT_FLAGS;
      /* the value of raw_smp_processor_id() is ORed into the same flags word */
      timer->flags = flags | raw_smp_processor_id();
      lockdep_init_map(&timer->lockdep_map, name, key, 0);
      }
    2. from_timer 是通过 container_of 使用 timer_list* 的地址,获得它所在的结构体基地址(即rxe_qp*)

2. retrans_timer重传定时器

  1. 设置重传时间:用户 ibv_modify_qp 的时候,传入的 timeout 代表重传的超时时间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // drivers/infiniband/sw/rxe/rxe_qp.c
    /* called by the modify qp verb */
    int rxe_qp_from_attr(struct rxe_qp *qp, struct ib_qp_attr *attr, int mask,
    struct ib_udata *udata)
    {
    ...
    if (mask & IB_QP_TIMEOUT) {
    qp->attr.timeout = attr->timeout;
    if (attr->timeout == 0) {
    qp->qp_timeout_jiffies = 0;
    } else {
    /* According to the spec, timeout = 4.096 * 2 ^ attr->timeout [us] */
    int j = nsecs_to_jiffies(4096ULL << attr->timeout);

    qp->qp_timeout_jiffies = j ? j : 1;
    }
    }
    ...
    }
  2. 启动重传定时器

    每发送一个 rxe packet 后,如果 qp_timeout_jiffies 不为 0 且重传定时器尚未处于 pending 状态,就启动重传定时器

    mod_timer() 主要修改 timer->expires

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    // drivers/infiniband/sw/rxe/rxe_req.c
    int rxe_requester(void *arg)
    {
    ...
    err = rxe_xmit_packet(qp, &pkt, skb);
    ...

    update_state(qp, &pkt);
    }

    /* Update requester state for pkt: record its opcode, step req.wqe_index to
    * the next send queue index when pkt carries RXE_END_MASK, clear
    * need_req_skb, and arm the retransmit timer if it is not already pending.
    */
    static void update_state(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
    {
    qp->req.opcode = pkt->opcode;

    if (pkt->mask & RXE_END_MASK)
    qp->req.wqe_index = queue_next_index(qp->sq.queue,
    qp->req.wqe_index);

    qp->need_req_skb = 0;

    /* a zero qp_timeout_jiffies leaves the timer unarmed; an already
    * pending timer keeps its current expiry
    */
    if (qp->qp_timeout_jiffies && !timer_pending(&qp->retrans_timer))
    mod_timer(&qp->retrans_timer,
    jiffies + qp->qp_timeout_jiffies);
    }

    // drivers/infiniband/sw/rxe/rxe_comp.c
    /* Timer callback for qp->retrans_timer. When the QP is still marked valid,
    * it flags a timeout on the completer side and runs the completer task.
    */
    void retransmit_timer(struct timer_list *t)
    {
    /* recover the rxe_qp that embeds this timer_list (container_of) */
    struct rxe_qp *qp = from_timer(qp, t, retrans_timer);

    pr_debug("%s: fired for qp#%d\n", __func__, qp->elem.index);

    if (qp->valid) {
    /* rxe_completer reads this flag and turns it into comp.timeout_retry */
    qp->comp.timeout = 1;
    rxe_run_task(&qp->comp.task, 1);
    }
    }
    1. 超时重传时,rxe_completer FSM中状态的转换
      • COMPST_GET_ACK 状态中取 skb 为空,切到 COMPST_GET_WQE 态
      • COMPST_GET_WQE 中看到 wqe 为空或 wqe->state == wqe_state_posted,且没有收到响应包(pkt 为空),切到 COMPST_EXIT
      • COMPST_EXIT 看到 qp->comp.timeout_retry && wqe 满足条件,切到 COMPST_ERROR_RETRY
      • COMPST_ERROR_RETRY 中设置 qp->req.need_retry = 1,并通过 rxe_run_task(&qp->req.task, 0) 运行 requester 任务(rxe_requester)做重传
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      static inline enum comp_state get_wqe(struct rxe_qp *qp,
      struct rxe_pkt_info *pkt,
      struct rxe_send_wqe **wqe_p)
      {
      struct rxe_send_wqe *wqe;

      /* we come here whether or not we found a response packet to see if
      * there are any posted WQEs
      */
      wqe = queue_head(qp->sq.queue, QUEUE_TYPE_FROM_CLIENT);
      *wqe_p = wqe;

      /* no WQE or requester has not started it yet */
      if (!wqe || wqe->state == wqe_state_posted)
      return pkt ? COMPST_DONE : COMPST_EXIT;
      ...
      }

      int rxe_completer(void *arg)
      {
      ...
      if (qp->comp.timeout) {
      qp->comp.timeout_retry = 1;
      qp->comp.timeout = 0;
      } else {
      qp->comp.timeout_retry = 0;
      }
      state = COMPST_GET_ACK;

      while (1) {
      pr_debug("qp#%d state = %s\n", qp_num(qp),
      comp_state_name[state]);
      switch (state) {
      case COMPST_GET_ACK:
      skb = skb_dequeue(&qp->resp_pkts);
      if (skb) {
      pkt = SKB_TO_PKT(skb);
      qp->comp.timeout_retry = 0;
      }
      state = COMPST_GET_WQE;
      break;

      case COMPST_GET_WQE:
      state = get_wqe(qp, pkt, &wqe);
      break;
      ...
      case COMPST_EXIT:
      if (qp->comp.timeout_retry && wqe) {
      state = COMPST_ERROR_RETRY;
      break;
      }

      /* re reset the timeout counter if
      * (1) QP is type RC
      * (2) the QP is alive
      * (3) there is a packet sent by the requester that
      * might be acked (we still might get spurious
      * timeouts but try to keep them as few as possible)
      * (4) the timeout parameter is set
      */
      if ((qp_type(qp) == IB_QPT_RC) &&
      (qp->req.state == QP_STATE_READY) &&
      (psn_compare(qp->req.psn, qp->comp.psn) > 0) &&
      qp->qp_timeout_jiffies)
      mod_timer(&qp->retrans_timer,
      jiffies + qp->qp_timeout_jiffies);
      goto exit;
      case COMPST_ERROR_RETRY:
      /* we come here if the retry timer fired and we did
      * not receive a response packet. try to retry the send
      * queue if that makes sense and the limits have not
      * been exceeded. remember that some timeouts are
      * spurious since we do not reset the timer but kick
      * it down the road or let it expire
      */

      /* there is nothing to retry in this case */
      if (!wqe || (wqe->state == wqe_state_posted))
      goto exit;

      /* if we've started a retry, don't start another
      * retry sequence, unless this is a timeout.
      */
      if (qp->comp.started_retry &&
      !qp->comp.timeout_retry)
      goto done;

      if (qp->comp.retry_cnt > 0) {
      if (qp->comp.retry_cnt != 7)
      qp->comp.retry_cnt--;

      /* no point in retrying if we have already
      * seen the last ack that the requester could
      * have caused
      */
      if (psn_compare(qp->req.psn,
      qp->comp.psn) > 0) {
      /* tell the requester to retry the
      * send queue next time around
      */
      rxe_counter_inc(rxe,
      RXE_CNT_COMP_RETRY);
      qp->req.need_retry = 1;
      qp->comp.started_retry = 1;
      rxe_run_task(&qp->req.task, 0);
      }
      goto done;

      } else {
      rxe_counter_inc(rxe, RXE_CNT_RETRY_EXCEEDED);
      wqe->status = IB_WC_RETRY_EXC_ERR;
      state = COMPST_ERROR;
      }
      break;
      ...
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      // drivers/infiniband/sw/rxe/rxe_req.c
      int rxe_requester(void *arg)
      {
      ...

      /* we come here if the retransmit timer has fired
      * or if the rnr timer has fired. If the retransmit
      * timer fires while we are processing an RNR NAK wait
      * until the rnr timer has fired before starting the
      * retry flow
      */
      if (unlikely(qp->req.need_retry && !qp->req.wait_for_rnr_timer)) {
      req_retry(qp);
      qp->req.need_retry = 0;
      }
      }

      /* Rewind the requester for a retry: reset req.wqe_index to the send queue
      * consumer index and req.psn to the completer's PSN, then walk the WQEs
      * from consumer to producer and put the ones already started back into
      * wqe_state_posted.
      */
      static void req_retry(struct rxe_qp *qp)
      {
      struct rxe_send_wqe *wqe;
      unsigned int wqe_index;
      unsigned int mask;
      int npsn;
      int first = 1;
      struct rxe_queue *q = qp->sq.queue;
      unsigned int cons;
      unsigned int prod;

      cons = queue_get_consumer(q, QUEUE_TYPE_FROM_CLIENT);
      prod = queue_get_producer(q, QUEUE_TYPE_FROM_CLIENT);

      /* restart from the consumer index and from the completer's PSN */
      qp->req.wqe_index = cons;
      qp->req.psn = qp->comp.psn;
      qp->req.opcode = -1;

      for (wqe_index = cons; wqe_index != prod;
      wqe_index = queue_next_index(q, wqe_index)) {
      wqe = queue_addr_from_index(qp->sq.queue, wqe_index);
      mask = wr_opcode_mask(wqe->wr.opcode, qp);

      /* stop at the first WQE that is still in the posted state */
      if (wqe->state == wqe_state_posted)
      break;

      /* WQEs already marked done are left as they are */
      if (wqe->state == wqe_state_done)
      continue;

      /* reset iova to the remote address stored in the work request
      * (atomic or rdma variant), or 0 for other opcodes
      */
      wqe->iova = (mask & WR_ATOMIC_MASK) ?
      wqe->wr.wr.atomic.remote_addr :
      (mask & WR_READ_OR_WRITE_MASK) ?
      wqe->wr.wr.rdma.remote_addr :
      0;

      /* reset the DMA progress, except when the first retried WQE is a
      * read: its dma.resid is still needed for the npsn computation below
      */
      if (!first || (mask & WR_READ_MASK) == 0) {
      wqe->dma.resid = wqe->dma.length;
      wqe->dma.cur_sge = 0;
      wqe->dma.sge_offset = 0;
      }

      /* only the first retried WQE gets the npsn-based adjustment */
      if (first) {
      first = 0;

      if (mask & WR_WRITE_OR_SEND_MASK) {
      /* PSN distance between the WQE's first PSN and comp.psn */
      npsn = (qp->comp.psn - wqe->first_psn) &
      BTH_PSN_MASK;
      retry_first_write_send(qp, wqe, npsn);
      }

      if (mask & WR_READ_MASK) {
      /* whole MTU-sized units already transferred; advance iova past them */
      npsn = (wqe->dma.length - wqe->dma.resid) /
      qp->mtu;
      wqe->iova += npsn * qp->mtu;
      }
      }

      wqe->state = wqe_state_posted;
      }
      }

      req_retry 中根据Send Queue中从 consumer index到 producer index,读WQE

      consumer index 只有当收到ACK包的时候才更新

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      /*
      * IBA Spec. Section 10.7.3.1 SIGNALED COMPLETIONS
      * ---------8<---------8<-------------
      * ...Note that if a completion error occurs, a Work Completion
      * will always be generated, even if the signaling
      * indicator requests an Unsignaled Completion.
      * ---------8<---------8<-------------
      */
      static void do_complete(struct rxe_qp *qp, struct rxe_send_wqe *wqe)
      {
      struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
      struct rxe_cqe cqe;
      bool post;

      /* do we need to post a completion */
      post = ((qp->sq_sig_type == IB_SIGNAL_ALL_WR) ||
      (wqe->wr.send_flags & IB_SEND_SIGNALED) ||
      wqe->status != IB_WC_SUCCESS);

      if (post)
      make_send_cqe(qp, wqe, &cqe);

      queue_advance_consumer(qp->sq.queue, QUEUE_TYPE_FROM_CLIENT);

      ...
      }

3. rnr_nak_timer 定时器

  1. 收到RNR NAK包后,启动 rnr_nak_timer
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    int rxe_completer(void *arg)
    {
    ...
    case COMPST_RNR_RETRY:
    /* we come here if we received an RNR NAK */
    if (qp->comp.rnr_retry > 0) {
    if (qp->comp.rnr_retry != 7)
    qp->comp.rnr_retry--;

    /* don't start a retry flow until the
    * rnr timer has fired
    */
    qp->req.wait_for_rnr_timer = 1;
    pr_debug("qp#%d set rnr nak timer\n",
    qp_num(qp));
    mod_timer(&qp->rnr_nak_timer,
    jiffies + rnrnak_jiffies(aeth_syn(pkt)
    & ~AETH_TYPE_MASK));
    goto exit;
    } else {
    rxe_counter_inc(rxe,
    RXE_CNT_RNR_RETRY_EXCEEDED);
    wqe->status = IB_WC_RNR_RETRY_EXC_ERR;
    state = COMPST_ERROR;
    }
    break;
    ...
    }

    // drivers/infiniband/sw/rxe/rxe_req.c
    /* Timer callback for qp->rnr_nak_timer. It requests a send queue retry,
    * clears the "waiting for RNR timer" flag and runs the requester task.
    */
    void rnr_nak_timer(struct timer_list *t)
    {
    /* recover the rxe_qp that embeds this timer_list (container_of) */
    struct rxe_qp *qp = from_timer(qp, t, rnr_nak_timer);

    pr_debug("%s: fired for qp#%d\n", __func__, qp_num(qp));

    /* request a send queue retry */
    qp->req.need_retry = 1;
    /* rxe_requester only starts the retry flow once this flag is clear */
    qp->req.wait_for_rnr_timer = 0;
    rxe_run_task(&qp->req.task, 1);
    }
    之后流程依然是rxe_requester→ req_retry

RXE中的定时器
https://gwzlchn.github.io/202210/rdma-stack-03/
作者
Zelin Wang
发布于
2022年10月15日
更新于
2022年10月23日
许可协议