[RFC,bpf-next,5/7] net: add busy-poll support for XDP sockets

Message ID 1556786363-28743-6-git-send-email-magnus.karlsson@intel.com
State RFC
Delegated to: BPF Maintainers
Series busy poll support for AF_XDP sockets

Commit Message

Magnus Karlsson May 2, 2019, 8:39 a.m. UTC
This patch adds busy-poll support for XDP sockets (AF_XDP). With
busy-poll, the driver is executed in process context by calling the
poll() syscall. The main advantage of this is that all processing
occurs on a single core. This eliminates the core-to-core cache
transfers that otherwise occur between the application and the
softirq processing running on another core. From a systems point of
view, it also has the advantage that we do not have to provision
extra cores to handle ksoftirqd/softirq processing, as all processing
is done on the single core that executes the application. The
drawback of busy-poll is that the max throughput seen from a single
application will be lower (due to the syscall), but on a per-core
basis it will often be higher, as the normal mode runs on two cores
and busy-poll on a single one.

The semantics of busy-poll from the application point of view are the
following:

* The application is required to call poll() to drive rx and tx
  processing. There is no guarantee that softirq and interrupts will
  do this for you.

* It should be enabled on a per socket basis. No global enablement, i.e.
  the XDP socket busy-poll will not care about the current
  /proc/sys/net/core/busy_poll and busy_read global enablement
  mechanisms.

* The batch size (how many packets are processed every time the
  napi function in the driver is called, i.e. the weight parameter)
  should be configurable. Currently, the busy-poll size of AF_INET
  sockets is set to 8, but for AF_XDP sockets this is too small as the
  amount of processing per packet is much smaller with AF_XDP. This
  should be configurable on a per socket basis.

* If you put multiple AF_XDP busy-poll enabled sockets into a poll()
  call, the napi contexts of all of them should be executed. This is in
  contrast to the AF_INET busy-poll that quits after the first one that
  finds any packets. We need all napi contexts to be executed due to
  the first requirement in this list. The behaviour we want is much more
  like regular sockets in that all of them are checked in the poll
  call.

* It should be possible to mix AF_XDP busy-poll sockets with any other
  sockets including busy-poll AF_INET ones in a single poll() call
  without any change to semantics or the behaviour of any of those
  socket types.

* As suggested by Maxim Mikityanskiy, poll() will in the busy-poll
  mode return POLLERR if the fill ring is empty or the completion
  queue is full.

Busy-poll support is enabled through a new setsockopt option called
XDP_BUSY_POLL_BATCH_SIZE that takes the batch size as an argument. A
value between 1 and NAPI_POLL_WEIGHT (64) will turn it on, 0 will turn
it off, and any other value will return an error.
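
The intended option semantics can be modelled in a few lines of
userspace C. This is only a sketch of the behaviour described above,
not code from the patch: bp_apply_batch_size() and BP_MAX_BATCH are
hypothetical names, and BP_MAX_BATCH is assumed to match the kernel's
NAPI_POLL_WEIGHT (64).

```c
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumption: same value as the kernel's NAPI_POLL_WEIGHT. */
#define BP_MAX_BATCH 64

/* Hypothetical model of the socket option: 0 disables busy-poll,
 * 1..BP_MAX_BATCH enables it with that batch size, and anything
 * larger is rejected without touching the current state.
 */
static int bp_apply_batch_size(uint32_t value, bool *enabled,
                               uint16_t *batch)
{
    if (value > BP_MAX_BATCH)
        return -EINVAL;

    *enabled = value != 0;
    *batch = (uint16_t)value;
    return 0;
}
```

A rejected value leaves the previous setting in place, so a caller can
retry with a smaller batch size without first re-enabling busy-poll.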

A typical packet processing rxdrop loop with busy-poll will look something
like this:

for (i = 0; i < num_socks; i++) {
    fds[i].fd = xsk_socket__fd(xsks[i]->xsk);
    fds[i].events = POLLIN;
}

for (;;) {
    ret = poll(fds, num_socks, 0);
    if (ret <= 0)
        continue;

    for (i = 0; i < num_socks; i++)
        rx_drop(xsks[i], fds); /* The actual application */
}
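
Since poll() may also report POLLERR in busy-poll mode, an application
will probably want to look at revents per socket before processing.
The helper below is a rough sketch of one way to do that; the
xsk_actions() function and the XSK_DO_* flags are hypothetical and not
part of this patch or of libbpf.

```c
#include <poll.h>

/* Hypothetical action flags derived from a pollfd's revents. */
#define XSK_DO_RX     0x1u /* descriptors are waiting on the Rx ring */
#define XSK_DO_RINGS  0x2u /* fill ring empty or completion ring full */

static unsigned int xsk_actions(short revents)
{
    unsigned int actions = 0;

    if (revents & POLLIN)
        actions |= XSK_DO_RX;
    /* POLLERR can be set together with POLLIN, so check it separately
     * instead of treating it as an exclusive error state.
     */
    if (revents & POLLERR)
        actions |= XSK_DO_RINGS;

    return actions;
}
```

In the loop above, this would be called with fds[i].revents for each
socket; XSK_DO_RINGS signals that the fill ring should be replenished
or the completion ring drained before the next poll() call.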

Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
---
 include/net/xdp_sock.h |   3 ++
 net/xdp/Kconfig        |   1 +
 net/xdp/xsk.c          | 122 ++++++++++++++++++++++++++++++++++++++++++++++++-
 net/xdp/xsk_queue.h    |  18 ++++++--
 4 files changed, 138 insertions(+), 6 deletions(-)

Comments

Samudrala, Sridhar May 3, 2019, 12:23 a.m. UTC | #1
On 5/2/2019 1:39 AM, Magnus Karlsson wrote:
> diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
> index d074b6d..2e956b37 100644
> --- a/include/net/xdp_sock.h
> +++ b/include/net/xdp_sock.h
> @@ -57,7 +57,10 @@ struct xdp_sock {
>   	struct net_device *dev;
>   	struct xdp_umem *umem;
>   	struct list_head flush_node;
> +	unsigned int napi_id_rx;
> +	unsigned int napi_id_tx;
>   	u16 queue_id;
> +	u16 bp_batch_size;
>   	struct xsk_queue *tx ____cacheline_aligned_in_smp;
>   	struct list_head list;
>   	bool zc;
> diff --git a/net/xdp/Kconfig b/net/xdp/Kconfig
> index 0255b33..219baaa 100644
> --- a/net/xdp/Kconfig
> +++ b/net/xdp/Kconfig
> @@ -1,6 +1,7 @@
>   config XDP_SOCKETS
>   	bool "XDP sockets"
>   	depends on BPF_SYSCALL
> +	select NET_RX_BUSY_POLL
>   	default n
>   	help
>   	  XDP sockets allows a channel between XDP programs and
> diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
> index a14e886..bd3d0fe 100644
> --- a/net/xdp/xsk.c
> +++ b/net/xdp/xsk.c
> @@ -22,6 +22,7 @@
>   #include <linux/net.h>
>   #include <linux/netdevice.h>
>   #include <linux/rculist.h>
> +#include <net/busy_poll.h>
>   #include <net/xdp_sock.h>
>   #include <net/xdp.h>
>   
> @@ -302,16 +303,107 @@ static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
>   	return (xs->zc) ? xsk_zc_xmit(sk) : xsk_generic_xmit(sk, m, total_len);
>   }
>   
> +static unsigned int xsk_check_rx_poll_err(struct xdp_sock *xs)
> +{
> +	return xskq_consumer_empty(xs->umem->fq) ? POLLERR : 0;
> +}
> +
> +static unsigned int xsk_check_tx_poll_err(struct xdp_sock *xs)
> +{
> +	return xskq_producer_full(xs->umem->cq) ? POLLERR : 0;
> +}
> +
> +static bool xsk_busy_loop_end(void *p, unsigned long start_time)
> +{
> +	return true;

This function should be updated to return true on busy-poll timeout, or
it should check xskq_full_desc() on TX and xskq_empty_desc() on RX.

> +}
> +

Patch

diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index d074b6d..2e956b37 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -57,7 +57,10 @@  struct xdp_sock {
 	struct net_device *dev;
 	struct xdp_umem *umem;
 	struct list_head flush_node;
+	unsigned int napi_id_rx;
+	unsigned int napi_id_tx;
 	u16 queue_id;
+	u16 bp_batch_size;
 	struct xsk_queue *tx ____cacheline_aligned_in_smp;
 	struct list_head list;
 	bool zc;
diff --git a/net/xdp/Kconfig b/net/xdp/Kconfig
index 0255b33..219baaa 100644
--- a/net/xdp/Kconfig
+++ b/net/xdp/Kconfig
@@ -1,6 +1,7 @@ 
 config XDP_SOCKETS
 	bool "XDP sockets"
 	depends on BPF_SYSCALL
+	select NET_RX_BUSY_POLL
 	default n
 	help
 	  XDP sockets allows a channel between XDP programs and
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index a14e886..bd3d0fe 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -22,6 +22,7 @@ 
 #include <linux/net.h>
 #include <linux/netdevice.h>
 #include <linux/rculist.h>
+#include <net/busy_poll.h>
 #include <net/xdp_sock.h>
 #include <net/xdp.h>
 
@@ -302,16 +303,107 @@  static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
 	return (xs->zc) ? xsk_zc_xmit(sk) : xsk_generic_xmit(sk, m, total_len);
 }
 
+static unsigned int xsk_check_rx_poll_err(struct xdp_sock *xs)
+{
+	return xskq_consumer_empty(xs->umem->fq) ? POLLERR : 0;
+}
+
+static unsigned int xsk_check_tx_poll_err(struct xdp_sock *xs)
+{
+	return xskq_producer_full(xs->umem->cq) ? POLLERR : 0;
+}
+
+static bool xsk_busy_loop_end(void *p, unsigned long start_time)
+{
+	return true;
+}
+
+static unsigned int xsk_get_napi_id_rx(struct xdp_sock *xs)
+{
+	if (xs->napi_id_rx)
+		return xs->napi_id_rx;
+	if (xs->dev->_rx[xs->queue_id].xdp_rxq.napi_id) {
+		xs->napi_id_rx = xdp_rxq_info_get(xs->dev,
+						  xs->queue_id)->napi_id;
+		return xs->napi_id_rx;
+	}
+
+	WARN_ON_ONCE(true);
+	return 0;
+}
+
+static unsigned int xsk_get_napi_id_tx(struct xdp_sock *xs)
+{
+	if (xs->napi_id_tx)
+		return xs->napi_id_tx;
+	if (xs->dev->_tx[xs->queue_id].xdp_txq.napi_id) {
+		xs->napi_id_tx = xdp_txq_info_get(xs->dev,
+						  xs->queue_id)->napi_id;
+		return xs->napi_id_tx;
+	}
+
+	WARN_ON_ONCE(true);
+	return 0;
+}
+
+static void xsk_exec_poll_generic(struct sock *sk, struct xdp_sock *xs,
+				  __poll_t events)
+{
+	if (events & (POLLIN | POLLRDNORM))
+		/* NAPI id filled in by the generic XDP code */
+		napi_busy_loop(xsk_get_napi_id_rx(xs), xsk_busy_loop_end, NULL,
+			       xs->bp_batch_size);
+	if (events & (POLLOUT | POLLWRNORM))
+		/* Use the regular send path as we do not have any
+		 * NAPI id for the Tx path. It is only in the driver
+		 * and not communicated upwards in the skb case.
+		 */
+		xsk_generic_xmit(sk, NULL, 0);
+}
+
+static void xsk_exec_poll_zc(struct xdp_sock *xs, __poll_t events)
+{
+	unsigned int napi_id_rx = xsk_get_napi_id_rx(xs);
+	unsigned int napi_id_tx = xsk_get_napi_id_tx(xs);
+
+	if (events & (POLLIN | POLLRDNORM))
+		napi_busy_loop(xs->napi_id_rx, xsk_busy_loop_end, NULL,
+			       xs->bp_batch_size);
+	if (napi_id_rx != napi_id_tx)
+		if (events & (POLLOUT | POLLWRNORM))
+			/* Tx has its own napi so we need to call it too */
+			napi_busy_loop(xs->napi_id_tx, xsk_busy_loop_end, NULL,
+				       xs->bp_batch_size);
+}
+
 static unsigned int xsk_poll(struct file *file, struct socket *sock,
 			     struct poll_table_struct *wait)
 {
 	unsigned int mask = datagram_poll(file, sock, wait);
 	struct sock *sk = sock->sk;
 	struct xdp_sock *xs = xdp_sk(sk);
+	__poll_t events = poll_requested_events(wait);
+
+	if (xs->bp_batch_size) {
+		if (xs->zc)
+			xsk_exec_poll_zc(xs, events);
+		else
+			xsk_exec_poll_generic(sk, xs, events);
+
+		if (events & (POLLIN | POLLRDNORM))
+			mask |= xsk_check_rx_poll_err(xs);
+		if (events & (POLLOUT | POLLWRNORM))
+			mask |= xsk_check_tx_poll_err(xs);
+
+		/* Clear the busy_loop flag so that any further fds in
+		 * the pollfd struct will have their napis scheduled.
+		 */
+		mask &= ~POLL_BUSY_LOOP;
+	}
 
-	if (xs->rx && !xskq_empty_desc(xs->rx))
+	if (xs->rx && !xskq_producer_empty(xs->rx))
 		mask |= POLLIN | POLLRDNORM;
-	if (xs->tx && !xskq_full_desc(xs->tx))
+	if (xs->tx && !xskq_consumer_full(xs->tx))
 		mask |= POLLOUT | POLLWRNORM;
 
 	return mask;
@@ -572,6 +664,21 @@  static int xsk_setsockopt(struct socket *sock, int level, int optname,
 		mutex_unlock(&xs->mutex);
 		return err;
 	}
+	case XDP_BUSY_POLL_BATCH_SIZE:
+	{
+		u16 batch_size;
+
+		if (copy_from_user(&batch_size, optval, sizeof(batch_size)))
+			return -EFAULT;
+
+		if (batch_size > NAPI_POLL_WEIGHT)
+			return -EINVAL;
+
+		mutex_lock(&xs->mutex);
+		xs->bp_batch_size = batch_size;
+		mutex_unlock(&xs->mutex);
+		return 0;
+	}
 	default:
 		break;
 	}
@@ -644,6 +751,17 @@  static int xsk_getsockopt(struct socket *sock, int level, int optname,
 
 		return 0;
 	}
+	case XDP_BUSY_POLL_BATCH_SIZE:
+		if (len < sizeof(xs->bp_batch_size))
+			return -EINVAL;
+
+		if (copy_to_user(optval, &xs->bp_batch_size,
+				 sizeof(xs->bp_batch_size)))
+			return -EFAULT;
+		if (put_user(sizeof(xs->bp_batch_size), optlen))
+			return -EFAULT;
+
+		return 0;
 	default:
 		break;
 	}
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 88b9ae2..ebbd996 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -292,14 +292,24 @@  static inline void xskq_produce_flush_desc(struct xsk_queue *q)
 	WRITE_ONCE(q->ring->producer, q->prod_tail);
 }
 
-static inline bool xskq_full_desc(struct xsk_queue *q)
+static inline bool xskq_consumer_full(struct xsk_queue *q)
 {
-	return xskq_nb_avail(q, q->nentries) == q->nentries;
+	return READ_ONCE(q->ring->producer) - q->cons_tail == q->nentries;
 }
 
-static inline bool xskq_empty_desc(struct xsk_queue *q)
+static inline bool xskq_producer_empty(struct xsk_queue *q)
 {
-	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
+	return READ_ONCE(q->ring->consumer) == q->prod_tail;
+}
+
+static inline bool xskq_consumer_empty(struct xsk_queue *q)
+{
+	return READ_ONCE(q->ring->producer) == q->cons_tail;
+}
+
+static inline bool xskq_producer_full(struct xsk_queue *q)
+{
+	return q->prod_tail - READ_ONCE(q->ring->consumer) == q->nentries;
 }
 
 void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
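
The full/empty helpers in xsk_queue.h rely on free-running 32-bit
producer and consumer indices. The standalone model below is a sketch
of that arithmetic only; struct ring_model and its helpers are
hypothetical and leave out the cached head/tail copies and the
READ_ONCE() accesses used by the real code.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of a ring with free-running indices. */
struct ring_model {
    uint32_t producer; /* total number of entries ever produced */
    uint32_t consumer; /* total number of entries ever consumed */
    uint32_t nentries; /* ring capacity */
};

static bool ring_empty(const struct ring_model *r)
{
    return r->producer == r->consumer;
}

static bool ring_full(const struct ring_model *r)
{
    /* Unsigned subtraction gives the fill level even after the
     * producer index has wrapped past UINT32_MAX.
     */
    return (uint32_t)(r->producer - r->consumer) == r->nentries;
}
```

Because the indices are never masked when compared, no slot has to be
sacrificed to tell a full ring from an empty one.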