From patchwork Tue Oct 5 06:55:31 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Bernard Metzler X-Patchwork-Id: 66764 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 0A426B6EEE for ; Tue, 5 Oct 2010 17:55:37 +1100 (EST) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S932216Ab0JEGze (ORCPT ); Tue, 5 Oct 2010 02:55:34 -0400 Received: from mtagate4.de.ibm.com ([195.212.17.164]:34859 "EHLO mtagate4.de.ibm.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S932206Ab0JEGzd (ORCPT ); Tue, 5 Oct 2010 02:55:33 -0400 Received: from d12nrmr1607.megacenter.de.ibm.com (d12nrmr1607.megacenter.de.ibm.com [9.149.167.49]) by mtagate4.de.ibm.com (8.13.1/8.13.1) with ESMTP id o956tWpa014937; Tue, 5 Oct 2010 06:55:32 GMT Received: from d12av03.megacenter.de.ibm.com (d12av03.megacenter.de.ibm.com [9.149.165.213]) by d12nrmr1607.megacenter.de.ibm.com (8.13.8/8.13.8/NCO v10.0) with ESMTP id o956tWiw3154030; Tue, 5 Oct 2010 08:55:32 +0200 Received: from d12av03.megacenter.de.ibm.com (loopback [127.0.0.1]) by d12av03.megacenter.de.ibm.com (8.12.11.20060308/8.13.3) with ESMTP id o956tV3G026085; Tue, 5 Oct 2010 08:55:31 +0200 Received: from inn.zurich.ibm.com (inn.zurich.ibm.com [9.4.4.229]) by d12av03.megacenter.de.ibm.com (8.12.11.20060308/8.12.11) with ESMTP id o956tVMD026036; Tue, 5 Oct 2010 08:55:31 +0200 Received: from localhost.localdomain (achilles.zurich.ibm.com [9.4.243.2]) by inn.zurich.ibm.com (AIX5.3/8.13.4/8.13.4) with ESMTP id o956tVH3286758; Tue, 5 Oct 2010 08:55:31 +0200 From: Bernard Metzler To: netdev@vger.kernel.org Cc: linux-rdma@vger.kernel.org, Bernard Metzler Subject: [PATCH] SIW: Transmit path Date: Tue, 5 Oct 2010 08:55:31 +0200 Message-Id: <1286261731-5233-1-git-send-email-bmt@zurich.ibm.com> X-Mailer: git-send-email 1.5.4.3 Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org --- drivers/infiniband/hw/siw/siw_qp_tx.c | 1309 +++++++++++++++++++++++++++++++++ 1 files changed, 1309 insertions(+), 0 deletions(-) create mode 100644 drivers/infiniband/hw/siw/siw_qp_tx.c diff --git a/drivers/infiniband/hw/siw/siw_qp_tx.c b/drivers/infiniband/hw/siw/siw_qp_tx.c new file mode 100644 index 0000000..ef774eb --- /dev/null +++ b/drivers/infiniband/hw/siw/siw_qp_tx.c @@ -0,0 +1,1309 @@ +/* + * Software iWARP device driver for Linux + * + * Authors: Bernard Metzler + * + * Copyright (c) 2008-2010, IBM Corporation + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of IBM nor the names of its contributors may be + * used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "siw.h" +#include "siw_obj.h" +#include "siw_cm.h" + +static int zcopy_tx = 1; +module_param(zcopy_tx, int, 0644); +MODULE_PARM_DESC(zcopy_tx, "Zero copy user data transmit if possible"); + +DEFINE_PER_CPU(atomic_t, siw_workq_len); + +static inline int siw_crc_txhdr(struct siw_iwarp_tx *ctx) +{ + crypto_hash_init(&ctx->mpa_crc_hd); + return siw_crc_array(&ctx->mpa_crc_hd, (u8 *)&ctx->pkt, + ctx->ctrl_len); +} + +#define PKT_FRAGMENTED 1 +#define PKT_COMPLETE 0 + +/* + * siw_qp_prepare_tx() + * + * Prepare tx state for sending out one fpdu. Builds complete pkt + * if no user data or only immediate data are present. + * + * returns PKT_COMPLETE if complete pkt built, PKT_FRAGMENTED otherwise. + */ +static int siw_qp_prepare_tx(struct siw_iwarp_tx *c_tx) +{ + struct siw_wqe *wqe = c_tx->wqe; + u32 *crc = NULL; + + dprint(DBG_TX, "(QP%d):\n", TX_QPID(c_tx)); + + switch (wr_type(wqe)) { + + case SIW_WR_RDMA_READ_REQ: + memcpy(&c_tx->pkt.ctrl, + &iwarp_pktinfo[RDMAP_RDMA_READ_REQ].ctrl, + sizeof(struct iwarp_ctrl)); + + c_tx->pkt.rreq.rsvd = 0; + c_tx->pkt.rreq.ddp_qn = htonl(RDMAP_UNTAGGED_QN_RDMA_READ); + c_tx->pkt.rreq.ddp_msn = + htonl(++c_tx->ddp_msn[RDMAP_UNTAGGED_QN_RDMA_READ]); + c_tx->pkt.rreq.ddp_mo = 0; + c_tx->pkt.rreq.sink_stag = htonl(wqe->wr.rread.sge[0].lkey); + c_tx->pkt.rreq.sink_to = + cpu_to_be64(wqe->wr.rread.sge[0].addr); /* abs addr! */ + c_tx->pkt.rreq.source_stag = htonl(wqe->wr.rread.rtag); + c_tx->pkt.rreq.source_to = cpu_to_be64(wqe->wr.rread.raddr); + c_tx->pkt.rreq.read_size = htonl(wqe->bytes); + + dprint(DBG_TX, ": RREQ: Sink: %x, 0x%016llx\n", + wqe->wr.rread.sge[0].lkey, wqe->wr.rread.sge[0].addr); + + c_tx->ctrl_len = sizeof(struct iwarp_rdma_rreq); + crc = &c_tx->pkt.rreq_pkt.crc; + break; + + case SIW_WR_SEND: + if (wr_flags(wqe) & IB_SEND_SOLICITED) + memcpy(&c_tx->pkt.ctrl, + &iwarp_pktinfo[RDMAP_SEND_SE].ctrl, + sizeof(struct iwarp_ctrl)); + else + memcpy(&c_tx->pkt.ctrl, + &iwarp_pktinfo[RDMAP_SEND].ctrl, + sizeof(struct iwarp_ctrl)); + + c_tx->pkt.send.ddp_qn = RDMAP_UNTAGGED_QN_SEND; + c_tx->pkt.send.ddp_msn = + htonl(++c_tx->ddp_msn[RDMAP_UNTAGGED_QN_SEND]); + c_tx->pkt.send.ddp_mo = 0; + c_tx->pkt.send.rsvd = 0; + + c_tx->ctrl_len = sizeof(struct iwarp_send); + + if (!wqe->bytes) + crc = &c_tx->pkt.send_pkt.crc; + break; + + case SIW_WR_RDMA_WRITE: + memcpy(&c_tx->pkt.ctrl, &iwarp_pktinfo[RDMAP_RDMA_WRITE].ctrl, + sizeof(struct iwarp_ctrl)); + + c_tx->pkt.rwrite.sink_stag = htonl(wqe->wr.write.rtag); + c_tx->pkt.rwrite.sink_to = cpu_to_be64(wqe->wr.write.raddr); + c_tx->ctrl_len = sizeof(struct iwarp_rdma_write); + + if (!wqe->bytes) + crc = &c_tx->pkt.write_pkt.crc; + break; + + case SIW_WR_RDMA_READ_RESP: + memcpy(&c_tx->pkt.ctrl, + &iwarp_pktinfo[RDMAP_RDMA_READ_RESP].ctrl, + sizeof(struct iwarp_ctrl)); + + /* NBO */ + c_tx->pkt.rresp.sink_stag = wqe->wr.rresp.rtag; + c_tx->pkt.rresp.sink_to = cpu_to_be64(wqe->wr.rresp.raddr); + + c_tx->ctrl_len = sizeof(struct iwarp_rdma_rresp); + + dprint(DBG_TX, ": RRESP: Sink: %x, 0x%016llx\n", + wqe->wr.rresp.rtag, wqe->wr.rresp.raddr); + + if (!wqe->bytes) + crc = &c_tx->pkt.rresp_pkt.crc; + break; + + default: + dprint(DBG_ON, "Unsupported WQE type %d\n", wr_type(wqe)); + BUG(); + break; + } + c_tx->ctrl_sent = 0; + c_tx->sge_idx = 0; + c_tx->sge_off = 0; + c_tx->pg_idx = 0; + c_tx->umem_chunk = NULL; + + /* + * Do complete CRC if enabled and short packet + */ + if (crc) { + *crc = 0; + if (c_tx->crc_enabled) { + if (siw_crc_txhdr(c_tx) != 0) + return -EINVAL; + crypto_hash_final(&c_tx->mpa_crc_hd, (u8 *)crc); + } + } + c_tx->ctrl_len += MPA_CRC_SIZE; + + /* + * Allow direct sending out of user buffer if WR is non signalled + * and payload is over threshold and no CRC is enabled. + * Per RDMA verbs, the application should not change the send buffer + * until the work completed. In iWarp, work completion is only + * local delivery to TCP. TCP may reuse the buffer for + * retransmission or may even did not yet sent the data. Changing + * unsent data also breaks the CRC, if applied. + */ + if (zcopy_tx && + !(wr_flags(wqe) & IB_SEND_SIGNALED) && + wqe->bytes > SENDPAGE_THRESH && + wr_type(wqe) != SIW_WR_RDMA_READ_REQ) + c_tx->use_sendpage = 1; + else + c_tx->use_sendpage = 0; + + return crc == NULL ? PKT_FRAGMENTED : PKT_COMPLETE; +} + +/* + * Send out one complete FPDU. Used for fixed sized packets like + * Read Requests or zero length SENDs, WRITEs, READ.responses. + * Also used for pushing an FPDU hdr only. + */ +static inline int siw_tx_ctrl(struct siw_iwarp_tx *c_tx, struct socket *s, + int flags) +{ + struct msghdr msg = {.msg_flags = flags}; + struct kvec iov = { + .iov_base = (char *)&c_tx->pkt.ctrl + c_tx->ctrl_sent, + .iov_len = c_tx->ctrl_len - c_tx->ctrl_sent}; + + int rv = kernel_sendmsg(s, &msg, &iov, 1, + c_tx->ctrl_len - c_tx->ctrl_sent); + + dprint(DBG_TX, " (QP%d): op=%d, %d of %d sent (%d)\n", + TX_QPID(c_tx), c_tx->pkt.ctrl.opcode, + c_tx->ctrl_sent + rv, c_tx->ctrl_len, rv); + + if (rv >= 0) { + c_tx->ctrl_sent += rv; + + if (c_tx->ctrl_sent == c_tx->ctrl_len) { + siw_dprint_hdr(&c_tx->pkt.hdr, TX_QPID(c_tx), + "CTRL sent"); + if (!(flags & MSG_MORE)) + c_tx->new_tcpseg = 1; + rv = 0; + } else if (c_tx->ctrl_sent < c_tx->ctrl_len) + rv = -EAGAIN; + else + BUG(); + } + return rv; +} + +/* + * 0copy TCP transmit interface. + * + * Push page array page by page or in one shot. + * Pushing the whole page array requires the inner do_tcp_sendpages + * function to be exported by the kernel. + */ +static int siw_tcp_sendpages(struct socket *s, struct page **page, + int offset, size_t size) +{ + int rv = 0; + +#ifdef SIW_SENDPAGES_EXPORT + struct sock *sk = s->sk; + + if (!(sk->sk_route_caps & NETIF_F_SG) || + !(sk->sk_route_caps & NETIF_F_ALL_CSUM)) { + /* FIXME: + * This should also be handled in a + * loop + */ + return -EFAULT; + } + + lock_sock(sk); + TCP_CHECK_TIMER(sk); + + /* + * just return what sendpages has return + */ + rv = do_tcp_sendpages(sk, page, offset, size, MSG_MORE|MSG_DONTWAIT); + + TCP_CHECK_TIMER(sk); + release_sock(sk); + if (rv == -EAGAIN) + rv = 0; +#else + /* + * If do_tcp_sendpages() function is not exported + * push page by page + */ + size_t todo = size; + int i; + + for (i = 0; size > 0; i++) { + size_t bytes = min_t(size_t, PAGE_SIZE - offset, size); + + rv = s->ops->sendpage(s, page[i], offset, bytes, + MSG_MORE|MSG_DONTWAIT); + if (rv <= 0) + break; + + size -= rv; + + if (rv != bytes) + break; + + offset = 0; + } + if (rv >= 0 || rv == -EAGAIN) + rv = todo - size; +#endif + return rv; +} + +/* + * siw_0copy_tx() + * + * Pushes list of pages to TCP socket. If pages from multiple + * SGE's, all referenced pages of each SGE are pushed in one + * shot. + */ +static int siw_0copy_tx(struct socket *s, struct page **page, + struct siw_sge *sge, unsigned int offset, + unsigned int size) +{ + int i = 0, sent = 0, rv; + int sge_bytes = min(sge->len - offset, size); + + offset = (sge->addr + offset) & ~PAGE_MASK; + + while (sent != size) { + + rv = siw_tcp_sendpages(s, &page[i], offset, sge_bytes); + if (rv >= 0) { + sent += rv; + if (size == sent || sge_bytes > rv) + break; + + i += PAGE_ALIGN(sge_bytes + offset) >> PAGE_SHIFT; + sge++; + sge_bytes = min(sge->len, size - sent); + offset = sge->addr & ~PAGE_MASK; + } else { + sent = rv; + break; + } + } + return sent; +} + +/* + * siw_tx_umem_init() + * + * Resolve memory chunk and update page index pointer + * + * @chunk: Umem Chunk to be updated + * @p_idx Page Index to be updated + * @mr: Memory Region + * @va: Virtual Address within MR + * + */ +static void siw_tx_umem_init(struct ib_umem_chunk **chunk, int *page_index, + struct siw_mr *mr, u64 va) +{ + struct ib_umem_chunk *cp; + int p_ix; + + BUG_ON(va < mr->mem.va); + va -= mr->mem.va & PAGE_MASK; + /* + * equivalent to + * va += mr->umem->offset; + * va = va >> PAGE_SHIFT; + */ + + p_ix = va >> PAGE_SHIFT; + + list_for_each_entry(cp, &mr->umem->chunk_list, list) { + if (p_ix < cp->nents) + break; + p_ix -= cp->nents; + } + BUG_ON(p_ix >= cp->nents); + + dprint(DBG_MM, "(): New chunk 0x%p: Page idx %d, nents %d\n", + cp, p_ix, cp->nents); + + *chunk = cp; + *page_index = p_ix; + + return; +} + +/* + * update memory chunk and page index from given starting point + * before current transmit described by: c_tx->sge_off, + * sge->addr, c_tx->pg_idx, and c_tx->umem_chunk + */ +static inline void +siw_umem_chunk_update(struct siw_iwarp_tx *c_tx, struct siw_mr *mr, + struct siw_sge *sge, unsigned int off) +{ + struct ib_umem_chunk *chunk = c_tx->umem_chunk; + u64 va_start = sge->addr + c_tx->sge_off; + + off += (unsigned int)(va_start & ~PAGE_MASK); /* + first page offset */ + off >>= PAGE_SHIFT; /* bytes offset becomes pages offset */ + + list_for_each_entry_from(chunk, &mr->umem->chunk_list, list) { + if (c_tx->pg_idx + off < chunk->nents) + break; + off -= chunk->nents - c_tx->pg_idx; + c_tx->pg_idx = 0; + } + c_tx->pg_idx += off; + + c_tx->umem_chunk = chunk; +} + +#define MAX_TRAILER 8 +#define MAX_ARRAY 130 /* Max number of kernel_sendmsg elements */ + +static inline void +siw_save_txstate(struct siw_iwarp_tx *c_tx, struct ib_umem_chunk *chunk, + unsigned int pg_idx, unsigned int sge_idx, + unsigned int sge_off) +{ + c_tx->umem_chunk = chunk; + c_tx->pg_idx = pg_idx; + c_tx->sge_idx = sge_idx; + c_tx->sge_off = sge_off; +} +/* + * Write out iov referencing hdr, data and trailer of current FPDU. + * Update transmit state dependent on write return status + */ +static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s) +{ + struct siw_wqe *wqe = c_tx->wqe; + struct siw_sge *sge = &wqe->wr.sgl.sge[c_tx->sge_idx], + *first_sge = sge; + struct siw_mr *mr = siw_mem2mr(sge->mem.obj); + struct ib_umem_chunk *chunk = c_tx->umem_chunk; + + struct kvec iov[MAX_ARRAY]; + struct page *page_array[MAX_ARRAY]; + struct msghdr msg = {.msg_flags = MSG_DONTWAIT}; + + int seg = 0, do_crc = c_tx->do_crc, kbuf = 0, + rv; + unsigned int data_len = c_tx->bytes_unsent, + hdr_len = 0, + trl_len = 0, + sge_off = c_tx->sge_off, + sge_idx = c_tx->sge_idx, + pg_idx = c_tx->pg_idx; + + if (SIW_INLINED_DATA(wqe)) { + kbuf = 1; + chunk = 0; + } + + if (c_tx->state == SIW_SEND_HDR) { + if (c_tx->use_sendpage) { + rv = siw_tx_ctrl(c_tx, s, MSG_DONTWAIT|MSG_MORE); + if (rv) + goto done; + + c_tx->state = SIW_SEND_DATA; + } else { + iov[0].iov_base = + (char *)&c_tx->pkt.ctrl + c_tx->ctrl_sent; + iov[0].iov_len = hdr_len = + c_tx->ctrl_len - c_tx->ctrl_sent; + seg = 1; + siw_dprint_hdr(&c_tx->pkt.hdr, TX_QPID(c_tx), + "HDR to send: "); + } + } + + wqe->processed += data_len; + + while (data_len) { /* walk the list of SGE's */ + unsigned int sge_len = min(sge->len - sge_off, data_len); + unsigned int fp_off = (sge->addr + sge_off) & ~PAGE_MASK; + + BUG_ON(!sge_len); + + if (kbuf) { + /* + * In kernel buffers to be tx'ed. + */ + iov[seg].iov_base = + (void *)(unsigned long)(sge->addr + sge_off); + iov[seg].iov_len = sge_len; + if (do_crc) + siw_crc_array(&c_tx->mpa_crc_hd, + iov[seg].iov_base, sge_len); + sge_off += sge_len; + data_len -= sge_len; + seg++; + goto sge_done; + } + while (sge_len) { + struct scatterlist *sl; + size_t plen; + + if (!chunk) { + mr = siw_mem2mr(sge->mem.obj); + siw_tx_umem_init(&chunk, &pg_idx, mr, + sge->addr + sge_off); + + if (!c_tx->umem_chunk) + /* Starting first tx for this WQE */ + siw_save_txstate(c_tx, chunk, pg_idx, + sge_idx, sge_off); + } + sl = &chunk->page_list[pg_idx]; + plen = min((int)PAGE_SIZE - fp_off, sge_len); + + BUG_ON(plen <= 0); + + page_array[seg] = sg_page(sl); + + if (!c_tx->use_sendpage) { + iov[seg].iov_base = kmap(sg_page(sl)) + fp_off; + iov[seg].iov_len = plen; + } + if (do_crc) + siw_crc_sg(&c_tx->mpa_crc_hd, sl, fp_off, plen); + + sge_len -= plen; + sge_off += plen; + data_len -= plen; + + if (plen + fp_off == PAGE_SIZE && + sge_off < sge->len && ++pg_idx == chunk->nents) { + chunk = mem_chunk_next(chunk); + pg_idx = 0; + } + fp_off = 0; + if (++seg > MAX_ARRAY) { + dprint(DBG_ON, "(QP%d): Too many fragments\n", + TX_QPID(c_tx)); + if (!kbuf) { + int i = (hdr_len > 0) ? 1 : 0; + seg--; + while (i < seg) + kunmap(page_array[i++]); + } + wqe->processed = 0; + rv = -EINVAL; + goto done_crc; + } + } +sge_done: + /* Update SGE variables at end of SGE */ + if (sge_off == sge->len && wqe->processed < wqe->bytes) { + sge_idx++; + sge++; + sge_off = 0; + chunk = NULL; + } + } + /* trailer */ + if (likely(c_tx->state != SIW_SEND_TRAILER)) { + iov[seg].iov_base = &c_tx->trailer.pad[4 - c_tx->pad]; + iov[seg].iov_len = trl_len = MAX_TRAILER - (4 - c_tx->pad); + } else { + iov[seg].iov_base = &c_tx->trailer.pad[c_tx->ctrl_sent]; + iov[seg].iov_len = trl_len = MAX_TRAILER - c_tx->ctrl_sent; + } + + if (c_tx->pad) { + *(u32 *)c_tx->trailer.pad = 0; + if (do_crc) + siw_crc_array(&c_tx->mpa_crc_hd, + (u8 *)&c_tx->trailer.crc - c_tx->pad, + c_tx->pad); + } + if (!c_tx->crc_enabled) + c_tx->trailer.crc = 0; + else if (do_crc) + crypto_hash_final(&c_tx->mpa_crc_hd, (u8 *)&c_tx->trailer.crc); + + data_len = c_tx->bytes_unsent; + + if (c_tx->tcp_seglen >= (int)MPA_MIN_FRAG && TX_MORE_WQE(TX_QP(c_tx))) { + msg.msg_flags |= MSG_MORE; + c_tx->new_tcpseg = 0; + } else + c_tx->new_tcpseg = 1; + + if (c_tx->use_sendpage) { + rv = siw_0copy_tx(s, page_array, first_sge, c_tx->sge_off, + data_len); + if (rv == data_len) { + rv = kernel_sendmsg(s, &msg, &iov[seg], 1, trl_len); + if (rv > 0) + rv += data_len; + else + rv = data_len; + } + } else { + rv = kernel_sendmsg(s, &msg, iov, seg + 1, + hdr_len + data_len + trl_len); + if (!kbuf) { + int i = (hdr_len > 0) ? 1 : 0; + while (i < seg) + kunmap(page_array[i++]); + } + } + if (rv < (int)hdr_len) { + /* Not even complete hdr pushed or negative rv */ + wqe->processed -= data_len; + if (rv >= 0) { + c_tx->ctrl_sent += rv; + rv = -EAGAIN; + } + goto done_crc; + } + + rv -= hdr_len; + + if (rv >= (int)data_len) { + /* all user data pushed to TCP or no data to push */ + if (data_len > 0 && wqe->processed < wqe->bytes) + /* Save the current state for next tx */ + siw_save_txstate(c_tx, chunk, pg_idx, sge_idx, sge_off); + + rv -= data_len; + + if (rv == trl_len) /* all pushed */ + rv = 0; + else { + c_tx->state = SIW_SEND_TRAILER; + c_tx->ctrl_len = MAX_TRAILER; + c_tx->ctrl_sent = rv + 4 - c_tx->pad; + c_tx->bytes_unsent = 0; + rv = -EAGAIN; + } + + } else if (data_len > 0) { + /* Maybe some user data pushed to TCP */ + c_tx->state = SIW_SEND_DATA; + wqe->processed -= data_len - rv; + + if (rv) { + /* + * Some bytes out. Recompute tx state based + * on old state and bytes pushed + */ + c_tx->bytes_unsent -= rv; + sge = &wqe->wr.sgl.sge[c_tx->sge_idx]; + + if (c_tx->sge_idx == sge_idx && c_tx->umem_chunk) + /* + * same SGE as starting SGE for this FPDU + */ + siw_umem_chunk_update(c_tx, mr, sge, rv); + else { + while (sge->len <= c_tx->sge_off + rv) { + rv -= sge->len - c_tx->sge_off; + sge = &wqe->wr.sgl.sge[++c_tx->sge_idx]; + c_tx->sge_off = 0; + } + c_tx->umem_chunk = NULL; + } + c_tx->sge_off += rv; + BUG_ON(c_tx->sge_off >= sge->len); + } + rv = -EAGAIN; + } +done_crc: + c_tx->do_crc = 0; +done: + return rv; +} + +static void siw_calculate_tcpseg(struct siw_iwarp_tx *c_tx, struct socket *s) +{ + /* + * refresh TCP segement len if we start a new segment or + * remaining segment len is less than MPA_MIN_FRAG or + * the socket send buffer is empty. + */ + if (c_tx->new_tcpseg || c_tx->tcp_seglen < (int)MPA_MIN_FRAG || + !tcp_send_head(s->sk)) + + c_tx->tcp_seglen = get_tcp_mss(s->sk); +} + + +/* + * siw_unseg_txlen() + * + * Compute complete tcp payload len if packet would not + * get fragmented + */ +static inline int siw_unseg_txlen(struct siw_iwarp_tx *c_tx) +{ + int pad = c_tx->bytes_unsent ? -c_tx->bytes_unsent & 0x3 : 0; + + return c_tx->bytes_unsent + c_tx->ctrl_len + pad + MPA_CRC_SIZE; +} + + +/* + * siw_prepare_fpdu() + * + * Prepares transmit context to send out one FPDU if FPDU will contain + * user data and user data are not immediate data. + * Checks and locks involved memory segments of data to be sent. + * Computes maximum FPDU length to fill up TCP MSS if possible. + * + * @qp: QP from which to transmit + * @wqe: Current WQE causing transmission + * + * TODO: Take into account real available sendspace on socket + * to avoid header misalignment due to send pausing within + * fpdu transmission + */ +int siw_prepare_fpdu(struct siw_qp *qp, struct siw_wqe *wqe) +{ + struct siw_iwarp_tx *c_tx = &qp->tx_ctx; + int rv = 0; + + /* + * TODO: TCP Fragmentation dynamics needs for further investigation. + * Resuming SQ processing may start with full-sized packet + * or short packet which resets MSG_MORE and thus helps + * to synchronize. + * This version resumes with short packet. + */ + c_tx->ctrl_len = iwarp_pktinfo[c_tx->pkt.ctrl.opcode].hdr_len; + c_tx->ctrl_sent = 0; + + /* + * Update target buffer offset if any + */ + if (!c_tx->pkt.ctrl.t) { + /* Untagged message */ + c_tx->pkt.c_untagged.ddp_mo = cpu_to_be32(wqe->processed); + } else { + /* Tagged message */ + if (wr_type(wqe) == SIW_WR_RDMA_READ_RESP) { + c_tx->pkt.c_tagged.ddp_to = + cpu_to_be64(wqe->wr.rresp.raddr + wqe->processed); + } else { + c_tx->pkt.c_tagged.ddp_to = + cpu_to_be64(wqe->wr.write.raddr + wqe->processed); + } + } + + /* First guess: one big unsegmented DDP segment */ + c_tx->bytes_unsent = wqe->bytes - wqe->processed; + c_tx->tcp_seglen -= siw_unseg_txlen(c_tx); + + if (c_tx->tcp_seglen >= 0) { + /* Whole DDP segment fits into current TCP segment */ + c_tx->pkt.ctrl.l = 1; + c_tx->pad = -c_tx->bytes_unsent & 0x3; + } else { + /* Trim DDP payload to fit into current TCP segment */ + c_tx->bytes_unsent += c_tx->tcp_seglen; + c_tx->bytes_unsent &= ~0x3; + c_tx->pad = 0; + c_tx->pkt.ctrl.l = 0; + } + c_tx->pkt.ctrl.mpa_len = + htons(c_tx->ctrl_len + c_tx->bytes_unsent - MPA_HDR_SIZE); + +#ifdef SIW_TX_FULLSEGS + c_tx->fpdu_len = + c_tx->ctrl_len + c_tx->bytes_unsent + c_tx->pad + MPA_CRC_SIZE; +#endif + /* + * Init MPA CRC computation + */ + if (c_tx->crc_enabled) { + siw_crc_txhdr(c_tx); + c_tx->do_crc = 1; + } + if (c_tx->bytes_unsent && !SIW_INLINED_DATA(wqe)) { + struct siw_sge *sge = &wqe->wr.sgl.sge[c_tx->sge_idx]; + /* + * Reference memory to be tx'd + */ + BUG_ON(c_tx->sge_idx > wqe->wr.sgl.num_sge - 1); + + if (wr_type(wqe) != SIW_WR_RDMA_READ_RESP) + rv = siw_check_sgl(qp->pd, sge, SR_MEM_LREAD, + c_tx->sge_off, c_tx->bytes_unsent); + else + rv = siw_check_sge(qp->pd, sge, SR_MEM_RREAD, + c_tx->sge_off, c_tx->bytes_unsent); + } + return rv; +} + +#ifdef SIW_TX_FULLSEGS +static inline int siw_test_wspace(struct socket *s, struct siw_iwarp_tx *c_tx) +{ + struct sock *sk = s->sk; + int rv = 0; + + lock_sock(sk); + if (sk_stream_wspace(sk) < (int)c_tx->fpdu_len) { + set_bit(SOCK_NOSPACE, &s->flags); + rv = -EAGAIN; + } + release_sock(sk); + + return rv; +} +#endif +/* + * siw_qp_sq_proc_tx() + * + * Process one WQE which needs transmission on the wire. + * Return with: + * -EAGAIN, if handover to tcp remained incomplete + * 0, if handover to tcp complete + * < 0, if other errors happend. + * + * @qp: QP to send from + * @wqe: WQE causing transmission + */ +static int siw_qp_sq_proc_tx(struct siw_qp *qp, struct siw_wqe *wqe) +{ + struct siw_iwarp_tx *c_tx = &qp->tx_ctx; + struct socket *s = qp->attrs.llp_stream_handle; + int rv = 0; + + + if (wqe->wr_status == SR_WR_QUEUED) { + wqe->wr_status = SR_WR_INPROGRESS; + + siw_calculate_tcpseg(c_tx, s); + + rv = siw_qp_prepare_tx(c_tx); + if (rv == PKT_FRAGMENTED) { + c_tx->state = SIW_SEND_HDR; + rv = siw_prepare_fpdu(qp, wqe); + if (rv) + return rv; + } else if (rv == PKT_COMPLETE) + c_tx->state = SIW_SEND_SHORT_FPDU; + else + goto tx_done; + } +next_segment: +#ifdef SIW_TX_FULLSEGS + rv = siw_test_wspace(s, c_tx); + if (rv < 0) + goto tx_done; +#endif + + if (c_tx->state == SIW_SEND_SHORT_FPDU) { + enum siw_wr_opcode tx_type = wr_type(wqe); + + /* + * Always end current TCP segment (no MSG_MORE flag): + * trying to fill segment would result in excessive delay. + */ + rv = siw_tx_ctrl(c_tx, s, MSG_DONTWAIT); + + if (!rv && tx_type != SIW_WR_RDMA_READ_REQ) + wqe->processed = wqe->bytes; + + goto tx_done; + + } else + rv = siw_tx_hdt(c_tx, s); + + if (!rv) { + /* Verbs, 6.4.: Try stopping sending after a full DDP segment + * if the connection goes down (== peer halfclose) + */ + if (unlikely(c_tx->tx_suspend)) { + rv = -ECONNABORTED; + goto tx_done; + } + /* + * One segment sent. Processing completed if last segment. + * Do next segment otherwise. Stop if tx error. + */ + if (c_tx->pkt.ctrl.l == 1) { + dprint(DBG_TX, "(QP%d): WR completed\n", QP_ID(qp)); + goto tx_done; + } + c_tx->state = SIW_SEND_HDR; + + siw_calculate_tcpseg(c_tx, s); + + rv = siw_prepare_fpdu(qp, wqe); + if (!rv) + goto next_segment; + } +tx_done: + return rv; +} + + +/* + * siw_wqe_sq_processed() + * + * Called after WQE processing completed. + * If WQE is not of signalled typ, it can be released. + * If the ORQ is empty, a signalled WQE is attached to the CQ. + * Otherwise, it is appended to the end of the ORQ for later + * completion. To keep WQE ordering, the ORQ is always consumed FIFO. + */ +static void siw_wqe_sq_processed(struct siw_wqe *wqe, struct siw_qp *qp) +{ + unsigned long flags; + LIST_HEAD(c_list); + + if (!(wr_flags(wqe) & IB_SEND_SIGNALED)) { + atomic_inc(&qp->sq_space); + siw_wqe_put(wqe); + return; + } + lock_orq_rxsave(qp, flags); + + if (ORQ_EMPTY(qp)) { + unlock_orq_rxsave(qp, flags); + dprint(DBG_WR|DBG_TX, + "(QP%d): Immediate completion, wr_type %d\n", + QP_ID(qp), wr_type(wqe)); + list_add_tail(&wqe->list, &c_list); + siw_sq_complete(&c_list, qp, 1, wr_flags(wqe)); + } else { + list_add_tail(&wqe->list, &qp->orq); + dprint(DBG_WR|DBG_TX, + "(QP%d): Defer completion, wr_type %d\n", + QP_ID(qp), wr_type(wqe)); + } +} + +int siw_qp_sq_proc_local(struct siw_qp *qp, struct siw_wqe *wqe) +{ + printk(KERN_ERR "local WR's not yet implemented\n"); + BUG(); + return 0; +} + + +/* + * siw_qp_sq_process() + * + * Core TX path routine for RDMAP/DDP/MPA using a TCP kernel socket. + * Sends RDMAP payload for the current SQ WR @wqe of @qp in one or more + * MPA FPDUs, each containing a DDP segment. + * + * SQ processing may occur in user context as a result of posting + * new WQE's or from siw_sq_work_handler() context. + * + * SQ processing may get paused anytime, possibly in the middle of a WR + * or FPDU, if insufficient send space is available. SQ processing + * gets resumed from siw_sq_work_handler(), if send space becomes + * available again. + * + * Must be called with the QP state read-locked. + * + * TODO: + * To be solved more seriously: an outbound RREQ can be satisfied + * by the corresponding RRESP _before_ it gets assigned to the ORQ. + * This happens regularly in RDMA READ via loopback case. Since both + * outbound RREQ and inbound RRESP can be handled by the same CPU + * locking the ORQ is dead-lock prone and thus not an option. + * Tentatively, the RREQ gets assigned to the ORQ _before_ being + * sent (and pulled back in case of send failure). + */ +int siw_qp_sq_process(struct siw_qp *qp, int user_ctx) +{ + struct siw_wqe *wqe; + enum siw_wr_opcode tx_type; + unsigned long flags; + int rv = 0; + int max_burst; + + if (user_ctx) + max_burst = SQ_USER_MAXBURST; + else + max_burst = max(qp->attrs.sq_size, qp->attrs.ird); + + atomic_inc(&qp->tx_ctx.in_use); + + wait_event(qp->tx_ctx.waitq, atomic_read(&qp->tx_ctx.in_use) == 1); + + wqe = tx_wqe(qp); + BUG_ON(wqe == NULL); + +next_wqe: + /* + * Stop QP processing if SQ state changed + */ + if (unlikely(qp->tx_ctx.tx_suspend)) { + dprint(DBG_WR|DBG_TX, "(QP%d): tx suspend\n", QP_ID(qp)); + goto done; + } + tx_type = wr_type(wqe); + + dprint(DBG_WR|DBG_TX, + " QP(%d): WR type %d, state %d, data %u, sent %u, id %llu\n", + QP_ID(qp), wr_type(wqe), wqe->wr_status, wqe->bytes, + wqe->processed, (unsigned long long)wr_id(wqe)); + + if (SIW_WQE_IS_TX(wqe)) + rv = siw_qp_sq_proc_tx(qp, wqe); + else + rv = siw_qp_sq_proc_local(qp, wqe); + + if (!rv) { + /* + * WQE processing done + */ + switch (tx_type) { + + case SIW_WR_SEND: + case SIW_WR_RDMA_WRITE: + + wqe->wc_status = IB_WC_SUCCESS; + wqe->wr_status = SR_WR_DONE; + siw_wqe_sq_processed(wqe, qp); + break; + + case SIW_WR_RDMA_READ_REQ: + /* + * already enqueued to ORQ queue + */ + break; + + case SIW_WR_RDMA_READ_RESP: + /* + * silently recyclye wqe + */ + /* XXX DEBUG AID, please remove */ + wqe->wr_status = SR_WR_DONE; + siw_wqe_put(wqe); + break; + default: + BUG(); + } + + lock_sq_rxsave(qp, flags); + + wqe = siw_next_tx_wqe(qp); + if (!wqe) { + tx_wqe(qp) = NULL; + unlock_sq_rxsave(qp, flags); + goto done; + } + if (wr_type(wqe) == SIW_WR_RDMA_READ_REQ) { + if (ORD_SUSPEND_SQ(qp)) { + tx_wqe(qp) = NULL; + unlock_sq_rxsave(qp, flags); + dprint(DBG_WR|DBG_TX, + " QP%d PAUSE SQ: ORD limit\n", + QP_ID(qp)); + goto done; + } else { + tx_wqe(qp) = wqe; + siw_rreq_queue(wqe, qp); + } + } else { + list_del_init(&wqe->list); + tx_wqe(qp) = wqe; + } + unlock_sq_rxsave(qp, flags); + + if (--max_burst == 0) { + if (user_ctx) { + /* + * Avoid to keep the user sending from its + * context for too long (blocking user thread) + */ + siw_sq_queue_work(qp); + goto done; + } else { + /* + * Avoid to starve other QP's tx if consumer + * keeps posting new tx work for current cpu. + */ + int workq_len = + atomic_read(&get_cpu_var(siw_workq_len)); + + put_cpu_var(siw_workq_len); + + if (workq_len) { + /* Another QP's work on same WQ */ + siw_sq_queue_work(qp); + goto done; + } + } + max_burst = max(qp->attrs.sq_size, qp->attrs.ird); + } + goto next_wqe; + + } else if (rv == -EAGAIN) { + dprint(DBG_WR|DBG_TX, + "(QP%d): SQ paused: hd/tr %d of %d, data %d\n", + QP_ID(qp), qp->tx_ctx.ctrl_sent, qp->tx_ctx.ctrl_len, + qp->tx_ctx.bytes_unsent); + rv = 0; + goto done; + } else { + /* + * WQE processing failed. + * Verbs 8.3.2: + * o It turns any WQE into a signalled WQE. + * o Local catastrophic error must be surfaced + * o QP must be moved into Terminate state: done by code + * doing socket state change processing + * + * o TODO: Termination message must be sent. + * o TODO: Implement more precise work completion errors, + * see enum ib_wc_status in ib_verbs.h + */ + dprint(DBG_ON, " (QP%d): WQE type %d processing failed: %d\n", + QP_ID(qp), wr_type(wqe), rv); + + lock_sq_rxsave(qp, flags); + /* + * RREQ may have already been completed by inbound RRESP! + */ + if (tx_type == RDMAP_RDMA_READ_REQ) { + lock_orq(qp); + if (!ORQ_EMPTY(qp) && + wqe == list_entry_wqe(qp->orq.prev)) { + /* + * wqe still on the ORQ + * TODO: fix a potential race condition if the + * rx path is currently referencing the wqe(!) + */ + dprint(DBG_ON, " (QP%d): Bad RREQ in ORQ\n", + QP_ID(qp)); + list_del_init(&wqe->list); + unlock_orq(qp); + } else { + /* + * already completed by inbound RRESP + */ + dprint(DBG_ON, + " (QP%d): Bad RREQ already Completed\n", + QP_ID(qp)); + unlock_orq(qp); + tx_wqe(qp) = NULL; + unlock_sq_rxsave(qp, flags); + + goto done; + } + } + tx_wqe(qp) = NULL; + unlock_sq_rxsave(qp, flags); + /* + * immediately suspends further TX processing + */ + if (!qp->tx_ctx.tx_suspend) + siw_qp_cm_drop(qp, 0); + + switch (tx_type) { + + case SIW_WR_SEND: + case SIW_WR_RDMA_WRITE: + case SIW_WR_RDMA_READ_REQ: + wqe->wr_status = SR_WR_DONE; + wqe->wc_status = IB_WC_LOC_QP_OP_ERR; + wqe->error = rv; + wr_flags(wqe) |= IB_SEND_SIGNALED; + if (tx_type != SIW_WR_RDMA_READ_REQ) + /* + * RREQ already enqueued to ORQ queue + */ + siw_wqe_sq_processed(wqe, qp); + + siw_async_ev(qp, NULL, IB_EVENT_QP_FATAL); + + break; + + case SIW_WR_RDMA_READ_RESP: + /* + * Recyclye wqe + */ + dprint(DBG_WR|DBG_TX|DBG_ON, "(QP%d): " + "Processing RRESPONSE failed with %d\n", + QP_ID(qp), rv); + + siw_async_ev(qp, NULL, IB_EVENT_QP_REQ_ERR); + + siw_wqe_put(wqe); + break; + + default: + BUG(); + } + } +done: + atomic_dec(&qp->tx_ctx.in_use); + wake_up(&qp->tx_ctx.waitq); + + return rv; +} + +static struct workqueue_struct *siw_sq_wq; + +int __init siw_sq_worker_init(void) +{ + siw_sq_wq = create_workqueue("siw_sq_wq"); + if (!siw_sq_wq) + return -ENOMEM; + + dprint(DBG_TX|DBG_OBJ, " Init WQ\n"); + return 0; +} + + +void __exit siw_sq_worker_exit(void) +{ + dprint(DBG_TX|DBG_OBJ, " Destroy WQ\n"); + if (siw_sq_wq) { + flush_workqueue(siw_sq_wq); + destroy_workqueue(siw_sq_wq); + } +} + + +/* + * siw_sq_work_handler() + * + * Scheduled by siw_qp_llp_write_space() socket callback if socket + * send space became available again. This function resumes SQ + * processing. + */ +static void siw_sq_work_handler(struct work_struct *w) +{ + struct siw_sq_work *this_work; + struct siw_qp *qp; + int rv; + + atomic_dec(&get_cpu_var(siw_workq_len)); + put_cpu_var(siw_workq_len); + + this_work = container_of(w, struct siw_sq_work, work); + qp = container_of(this_work, struct siw_qp, sq_work); + + dprint(DBG_TX|DBG_OBJ, "(QP%d)\n", QP_ID(qp)); + + if (down_read_trylock(&qp->state_lock)) { + if (likely(qp->attrs.state == SIW_QP_STATE_RTS && + !qp->tx_ctx.tx_suspend)) { + + rv = siw_qp_sq_process(qp, 0); + up_read(&qp->state_lock); + + if (rv < 0) { + dprint(DBG_TX, "(QP%d): failed: %d\n", + QP_ID(qp), rv); + + if (!qp->tx_ctx.tx_suspend) + siw_qp_cm_drop(qp, 0); + } + } else { + dprint(DBG_ON|DBG_TX, "(QP%d): state: %d %d\n", + QP_ID(qp), qp->attrs.state, + qp->tx_ctx.tx_suspend); + up_read(&qp->state_lock); + } + } else { + dprint(DBG_ON|DBG_TX, "(QP%d): QP locked\n", QP_ID(qp)); + } + siw_qp_put(qp); +} + + +int siw_sq_queue_work(struct siw_qp *qp) +{ + int cpu, rv; + + dprint(DBG_TX|DBG_OBJ, "(QP%d)\n", QP_ID(qp)); + + siw_qp_get(qp); + + INIT_WORK(&qp->sq_work.work, siw_sq_work_handler); + + cpu = get_cpu(); + + if (in_softirq()) { + if (cpu == qp->cpu) { + /* + * Try not to use the current CPU for tx traffic. + */ + for_each_online_cpu(cpu) { + if (cpu != qp->cpu) + break; + } + } else + cpu = qp->cpu; + } + atomic_inc(&per_cpu(siw_workq_len, cpu)); + rv = queue_work_on(cpu, siw_sq_wq, &qp->sq_work.work); + /* + * Remember CPU: Avoid spreading SQ work of QP over WQ's + */ + qp->cpu = cpu; + + put_cpu(); + + return rv; +}