@@ -201,6 +201,11 @@ struct dpif_handler {
struct nl_sock *sock; /* Each handler thread holds one netlink
socket. */
+ /* The receive handler thread deals with both normal and offload receive
+ * upcalls. To avoid starvation, the below flag is used to alternate the
+ * processing order. */
+ bool recv_offload_first;
+
#ifdef _WIN32
/* Pool of sockets. */
struct dpif_windows_vport_sock *vport_sock_pool;
@@ -3010,7 +3015,6 @@ dpif_netlink_recv_windows(struct dpif_netlink *dpif, uint32_t handler_id,
static int
dpif_netlink_recv_cpu_dispatch(struct dpif_netlink *dpif, uint32_t handler_id,
struct dpif_upcall *upcall, struct ofpbuf *buf)
- OVS_REQ_RDLOCK(dpif->upcall_lock)
{
struct dpif_handler *handler;
int read_tries = 0;
@@ -3061,7 +3065,6 @@ dpif_netlink_recv_vport_dispatch(struct dpif_netlink *dpif,
uint32_t handler_id,
struct dpif_upcall *upcall,
struct ofpbuf *buf)
- OVS_REQ_RDLOCK(dpif->upcall_lock)
{
struct dpif_handler *handler;
int read_tries = 0;
@@ -3135,13 +3138,12 @@ dpif_netlink_recv_vport_dispatch(struct dpif_netlink *dpif,
#endif
static int
-dpif_netlink_recv(struct dpif *dpif_, uint32_t handler_id,
- struct dpif_upcall *upcall, struct ofpbuf *buf)
+dpif_netlink_recv__(struct dpif_netlink *dpif, uint32_t handler_id,
+ struct dpif_upcall *upcall, struct ofpbuf *buf)
+ OVS_REQ_RDLOCK(dpif->upcall_lock)
{
- struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
int error;
- fat_rwlock_rdlock(&dpif->upcall_lock);
#ifdef _WIN32
error = dpif_netlink_recv_windows(dpif, handler_id, upcall, buf);
#else
@@ -3152,6 +3154,32 @@ dpif_netlink_recv(struct dpif *dpif_, uint32_t handler_id,
handler_id, upcall, buf);
}
#endif
+
+ return error;
+}
+
+static int
+dpif_netlink_recv(struct dpif *dpif_, uint32_t handler_id,
+ struct dpif_upcall *upcall, struct ofpbuf *buf)
+{
+ struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
+ struct dpif_handler *handler;
+ int error;
+
+ fat_rwlock_rdlock(&dpif->upcall_lock);
+ handler = &dpif->handlers[handler_id];
+ if (handler->recv_offload_first) {
+ error = netdev_offload_recv(upcall, buf, handler_id);
+ if (error == EAGAIN) {
+ error = dpif_netlink_recv__(dpif, handler_id, upcall, buf);
+ }
+ } else {
+ error = dpif_netlink_recv__(dpif, handler_id, upcall, buf);
+ if (error == EAGAIN) {
+ error = netdev_offload_recv(upcall, buf, handler_id);
+ }
+ }
+ handler->recv_offload_first = !handler->recv_offload_first;
fat_rwlock_unlock(&dpif->upcall_lock);
return error;
@@ -3217,6 +3245,7 @@ dpif_netlink_recv_wait(struct dpif *dpif_, uint32_t handler_id)
dpif_netlink_recv_wait_vport_dispatch(dpif, handler_id);
}
#endif
+ netdev_offload_recv_wait(handler_id);
fat_rwlock_unlock(&dpif->upcall_lock);
}
@@ -855,10 +855,17 @@ recv_upcalls(struct handler *handler)
break;
}
- upcall->fitness = odp_flow_key_to_flow(dupcall->key, dupcall->key_len,
- flow, NULL);
- if (upcall->fitness == ODP_FIT_ERROR) {
- goto free_dupcall;
+ /* If key and key_len are available, use them to construct flow.
+ * Otherwise, use upcall->flow. */
+ if (dupcall->key && dupcall->key_len) {
+ upcall->fitness = odp_flow_key_to_flow(dupcall->key,
+ dupcall->key_len,
+ flow, NULL);
+ if (upcall->fitness == ODP_FIT_ERROR) {
+ goto free_dupcall;
+ }
+ } else {
+ flow = &dupcall->flow;
}
if (dupcall->mru) {