diff mbox

[ovs-dev,v8,2/2] dpif-netdev: Add per pmd cmap of available tx queues.

Message ID 1450872982-597-3-git-send-email-i.maximets@samsung.com
State Deferred
Headers show

Commit Message

Ilya Maximets Dec. 23, 2015, 12:16 p.m. UTC
Introduced a per-pmd-thread hash map 'tx_queues', in which all
available tx queues for that pmd thread will be stored, with
port_no as the key (hash). All tx_qids will be unique per port.

The implemented infrastructure may be used in the future to
distribute traffic among all available tx queues.

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---
 lib/dpif-netdev.c | 173 ++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 154 insertions(+), 19 deletions(-)
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index c300e8a..28f2667 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -372,6 +372,13 @@  struct dp_netdev_pmd_cycles {
     atomic_ullong n[PMD_N_CYCLES];
 };
 
+struct dp_netdev_pmd_txq {
+    struct cmap_node node;        /* In owning dp_netdev_pmd_thread's */
+                                  /* 'tx_queues'. */
+    struct dp_netdev_port *port;
+    int tx_qid;
+};
+
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing.  Therefore netdev can
  * not implement rx-wait for these devices.  dpif-netdev needs to poll
@@ -430,8 +437,8 @@  struct dp_netdev_pmd_thread {
                                      * calculate tx_qid.*/
     unsigned core_id;               /* CPU core id of this pmd thread. */
     int numa_id;                    /* numa node id of this pmd thread. */
-    int tx_qid;                     /* Queue id used by this pmd thread to
-                                     * send packets on all netdevs */
+    struct cmap tx_queues;          /* Queue ids used by this pmd thread to
+                                     * send packets to ports */
 
     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
@@ -473,6 +480,15 @@  static void dp_netdev_input(struct dp_netdev_pmd_thread *,
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
 void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
+                                  struct dp_netdev_port *port, int queue_id);
+static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
+                                  struct dp_netdev_pmd_txq *txq);
+static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd);
+static struct dp_netdev_pmd_txq *
+dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
+                         odp_port_t port_no);
 static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
                                     struct dp_netdev *dp, int index,
                                     unsigned core_id, int numa_id);
@@ -1054,6 +1070,7 @@  do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     struct netdev_saved_flags *sf;
     struct dp_netdev_port *port;
     struct netdev *netdev;
+    struct dp_netdev_pmd_thread *non_pmd;
     enum netdev_flags flags;
     const char *open_type;
     int error;
@@ -1130,10 +1147,16 @@  do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     ovs_refcount_init(&port->ref_cnt);
     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
 
+    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
+    if (non_pmd) {
+        dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores());
+        dp_netdev_pmd_unref(non_pmd);
+    }
     if (netdev_is_pmd(netdev)) {
         dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
-        dp_netdev_reload_pmds(dp);
     }
+    dp_netdev_reload_pmds(dp);
+
     seq_change(dp->port_seq);
 
     return 0;
@@ -1318,18 +1341,33 @@  static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
 {
+    struct dp_netdev_pmd_thread *non_pmd;
+
     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
     seq_change(dp->port_seq);
+
+    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
+    if (non_pmd) {
+        /* There is only one txq for each port for non pmd thread */
+        struct dp_netdev_pmd_txq *txq;
+        txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no);
+        if (OVS_LIKELY(txq)) {
+            dp_netdev_pmd_del_txq(non_pmd, txq);
+        }
+        dp_netdev_pmd_unref(non_pmd);
+    }
+
     if (netdev_is_pmd(port->netdev)) {
         int numa_id = netdev_get_numa_id(port->netdev);
 
         /* If there is no netdev on the numa node, deletes the pmd threads
-         * for that numa.  Else, just reloads the queues.  */
+         * for that numa. */
         if (!has_pmd_port_for_numa(dp, numa_id)) {
             dp_netdev_del_pmds_on_numa(dp, numa_id);
         }
-        dp_netdev_reload_pmds(dp);
     }
+    /* Reload queues of pmd threads. */
+    dp_netdev_reload_pmds(dp);
 
     port_unref(port);
 }
@@ -2593,6 +2631,81 @@  dpif_netdev_wait(struct dpif *dpif)
     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
 }
 
+static void
+dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
+                      struct dp_netdev_port *port, int queue_id)
+{
+    if (port_try_ref(port)) {
+        struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq);
+        txq->port = port;
+        txq->tx_qid = queue_id;
+        cmap_insert(&pmd->tx_queues, &txq->node,
+                        hash_port_no(port->port_no));
+    }
+}
+
+/* Configures tx_queues for non pmd thread. */
+static void
+dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd)
+{
+    struct dp_netdev_port *port;
+
+    if (!cmap_is_empty(&pmd->tx_queues)) {
+        dp_netdev_pmd_detach_tx_queues(pmd);
+    }
+    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
+        dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores());
+    }
+}
+
+static void
+dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
+                      struct dp_netdev_pmd_txq *txq)
+{
+    cmap_remove(&pmd->tx_queues, &txq->node,
+                hash_port_no(txq->port->port_no));
+    port_unref(txq->port);
+    free(txq);
+}
+
+/* Removes all queues from 'tx_queues' of pmd thread. */
+static void
+dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd)
+{
+    struct dp_netdev_pmd_txq *txq;
+
+    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
+        dp_netdev_pmd_del_txq(pmd, txq);
+    }
+}
+
+static void OVS_UNUSED
+dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd)
+{
+    struct dp_netdev_pmd_txq *txq;
+
+    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
+        VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n",
+                   pmd->core_id, netdev_get_name(txq->port->netdev),
+                   txq->tx_qid);
+    }
+}
+
+static struct dp_netdev_pmd_txq *
+dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
+                         odp_port_t port_no)
+{
+    struct dp_netdev_pmd_txq *txq;
+
+    CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no),
+                             &pmd->tx_queues) {
+        if (txq->port->port_no == port_no) {
+            return txq;
+        }
+    }
+    return NULL;
+}
+
 struct rxq_poll {
     struct dp_netdev_port *port;
     struct netdev_rxq *rx;
@@ -2604,18 +2717,19 @@  pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
 {
     struct rxq_poll *poll_list = *ppoll_list;
     struct dp_netdev_port *port;
-    int n_pmds_on_numa, index, i;
+    int n_pmds, n_pmds_on_numa, rx_index, i, n_txq;
 
     /* Simple scheduler for netdev rx polling. */
+    dp_netdev_pmd_detach_tx_queues(pmd);
+
     for (i = 0; i < poll_cnt; i++) {
         port_unref(poll_list[i].port);
     }
 
     poll_cnt = 0;
+    n_pmds = get_n_pmd_threads(pmd->dp);
     n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
-    index = 0;
-
-    pmd->tx_qid = pmd->global_index;
+    rx_index = 0;
 
     CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
         /* Calls port_try_ref() to prevent the main thread
@@ -2626,7 +2740,7 @@  pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                 int i;
 
                 for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                    if ((index % n_pmds_on_numa) == pmd->index) {
+                    if ((rx_index % n_pmds_on_numa) == pmd->index) {
                         poll_list = xrealloc(poll_list,
                                         sizeof *poll_list * (poll_cnt + 1));
 
@@ -2635,9 +2749,26 @@  pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                         poll_list[poll_cnt].rx = port->rxq[i];
                         poll_cnt++;
                     }
-                    index++;
+                    rx_index++;
                 }
+
             }
+
+            n_txq = netdev_n_txq(port->netdev);
+            /* If multi-queue isn't supported by netdev, add txq 0
+             * to that pmd thread. Otherwise, add all queues available
+             * for that thread. pmd->global_index used to prevent
+             * assigning same txq to multiple pmd threads on different
+             * NUMA sockets.*/
+            if (n_txq == 1) {
+                dp_netdev_pmd_add_txq(pmd, port, 0);
+            } else {
+                /* Last queue reserved for non pmd threads */
+                for (i = pmd->global_index; i < n_txq - 1; i += n_pmds) {
+                    dp_netdev_pmd_add_txq(pmd, port, i);
+                }
+            }
+
             /* Unrefs the port_try_ref(). */
             port_unref(port);
         }
@@ -2706,6 +2837,8 @@  reload:
         goto reload;
     }
 
+    dp_netdev_pmd_detach_tx_queues(pmd);
+
     for (i = 0; i < poll_cnt; i++) {
          port_unref(poll_list[i].port);
     }
@@ -2830,10 +2963,6 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->core_id = core_id;
     pmd->numa_id = numa_id;
 
-    if (core_id == NON_PMD_CORE_ID) {
-        pmd->tx_qid = ovs_numa_get_n_cores();
-    }
-
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
     atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
@@ -2842,9 +2971,11 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     ovs_mutex_init(&pmd->flow_mutex);
     dpcls_init(&pmd->cls);
     cmap_init(&pmd->flow_table);
-    /* init the 'flow_cache' since there is no
+    cmap_init(&pmd->tx_queues);
+    /* init the 'flow_cache' and 'tx_queues' since there is no
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
+        dp_netdev_configure_non_pmd_txqs(pmd);
         emc_cache_init(&pmd->flow_cache);
     }
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
@@ -2857,6 +2988,7 @@  dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     dp_netdev_pmd_flow_flush(pmd);
     dpcls_destroy(&pmd->cls);
     cmap_destroy(&pmd->flow_table);
+    cmap_destroy(&pmd->tx_queues);
     ovs_mutex_destroy(&pmd->flow_mutex);
     latch_destroy(&pmd->exit_latch);
     xpthread_cond_destroy(&pmd->cond);
@@ -2873,6 +3005,7 @@  dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
      * no actual thread uninit it for NON_PMD_CORE_ID. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
         emc_cache_uninit(&pmd->flow_cache);
+        dp_netdev_pmd_detach_tx_queues(pmd);
     } else {
         latch_set(&pmd->exit_latch);
         dp_netdev_reload_pmd__(pmd);
@@ -3514,13 +3647,15 @@  dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
     struct dp_netdev *dp = pmd->dp;
     int type = nl_attr_type(a);
     struct dp_netdev_port *p;
+    struct dp_netdev_pmd_txq *txq;
     int i;
 
     switch ((enum ovs_action_attr)type) {
     case OVS_ACTION_ATTR_OUTPUT:
-        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
-        if (OVS_LIKELY(p)) {
-            netdev_send(p->netdev, pmd->tx_qid, packets, cnt, may_steal);
+        txq = dp_netdev_pmd_lookup_txq(pmd, u32_to_odp(nl_attr_get_u32(a)));
+        if (OVS_LIKELY(txq)) {
+            netdev_send(txq->port->netdev, txq->tx_qid,
+                        packets, cnt, may_steal);
             return;
         }
         break;