@@ -172,6 +172,73 @@ bool colo_proxy_query_checkpoint(void)
return colo_do_checkpoint;
}
+static ssize_t colo_proxy_enqueue_primary_packet(NetFilterState *nf,
+ NetClientState *sender,
+ unsigned flags,
+ const struct iovec *iov,
+ int iovcnt,
+ NetPacketSent *sent_cb)
+{
+ /*
+ * 1. parse packet, try to get connection factor
+ * (src_ip, src_port, dest_ip, dest_port)
+ * 2. enqueue the packet to primary_packet_list by connection
+ */
+ COLOProxyState *s = FILTER_COLO_PROXY(nf);
+ ssize_t size = iov_size(iov, iovcnt);
+ char *buf = g_malloc0(size); /* free by packet destory */
+ ConnectionKey key = {{ 0 } };
+ Packet *pkt;
+ Connection *conn;
+
+ iov_to_buf(iov, iovcnt, 0, buf, size);
+ pkt = packet_new(s, buf, size, &key, sender);
+ if (!pkt) {
+ return 0;
+ }
+
+ conn = colo_proxy_get_conn(s, &key);
+ if (!conn->processing) {
+ g_queue_push_tail(&s->conn_list, conn);
+ conn->processing = true;
+ }
+
+ g_queue_push_tail(&conn->primary_list, pkt);
+ qemu_event_set(&s->need_compare_ev);
+ return 1;
+}
+
+static ssize_t
+colo_proxy_enqueue_secondary_packet(NetFilterState *nf,
+ char *buf, int len)
+{
+ /*
+ * 1, parse packet, try to get connection factor
+ * (src_ip, src_port, dest_ip, dest_port)
+ * 2. enqueue the packet to secondary_packet_list by connection
+ */
+ COLOProxyState *s = FILTER_COLO_PROXY(nf);
+ Connection *conn;
+ ConnectionKey key = {{ 0 } };
+ Packet *pkt = packet_new(s, buf, len, &key, NULL);
+
+ if (!pkt) {
+ error_report("%s paket_new failed", __func__);
+ return -1;
+ }
+
+ conn = colo_proxy_get_conn(s, &key);
+ if (!conn->processing) {
+ g_queue_push_tail(&s->conn_list, conn);
+ conn->processing = true;
+ }
+
+ /* In primary notify compare thead */
+ g_queue_push_tail(&conn->secondary_list, pkt);
+ qemu_event_set(&s->need_compare_ev);
+ return 0;
+}
+
/*
* send a packet to peer
* >=0: success
@@ -235,6 +302,75 @@ static void colo_proxy_sock_receive(void *opaque)
}
}
+/*
+ * colo primary handle host's normal send and
+ * recv packets to primary guest
+ * return: >= 0 success
+ * < 0 failed
+ */
+static ssize_t colo_proxy_primary_handler(NetFilterState *nf,
+ NetClientState *sender,
+ unsigned flags,
+ const struct iovec *iov,
+ int iovcnt,
+ NetPacketSent *sent_cb)
+{
+ ssize_t ret = 0;
+
+ /*
+ * if packet's direction=rx
+ * enqueue packets to primary queue
+ * and wait secondary queue to compare
+ * if packet's direction=tx
+ * enqueue packets then send packets to
+ * secondary and flush queued packets
+ */
+ if (sender == nf->netdev) {
+ /* This packet is sent by netdev itself */
+ ret = colo_proxy_sock_send(nf, iov, iovcnt);
+ if (ret > 0) {
+ ret = 0;
+ }
+ } else {
+ ret = colo_proxy_enqueue_primary_packet(nf, sender, flags, iov,
+ iovcnt, sent_cb);
+ }
+
+ return ret;
+}
+
+/*
+ * colo secondary handle host's normal send and
+ * recv packets to secondary guest
+ * return: >= 0 success
+ * < 0 failed
+ */
+static ssize_t colo_proxy_secondary_handler(NetFilterState *nf,
+ NetClientState *sender,
+ unsigned flags,
+ const struct iovec *iov,
+ int iovcnt,
+ NetPacketSent *sent_cb)
+{
+ ssize_t ret = 0;
+
+ /*
+ * if packet's direction=rx
+ * enqueue packets and send to
+ * primary QEMU
+ * if packet's direction=tx
+ * record PVM's packet inital seq & adjust
+ * client's ack,send adjusted packets to SVM(next version will be do)
+ */
+ if (sender == nf->netdev) {
+ /* This packet is sent by netdev itself */
+ } else {
+ ret = colo_proxy_sock_send(nf, iov, iovcnt);
+ }
+
+ return ret;
+}
+
static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
NetClientState *sender,
unsigned flags,
@@ -256,9 +392,17 @@ static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
}
if (s->colo_mode == COLO_MODE_PRIMARY) {
- /* colo_proxy_primary_handler */
+ ret = colo_proxy_primary_handler(nf, sender, flags,
+ iov, iovcnt, sent_cb);
+ if (ret == 0) {
+ return 0;
+ }
} else {
- /* colo_proxy_secondary_handler */
+ ret = colo_proxy_secondary_handler(nf, sender, flags,
+ iov, iovcnt, sent_cb);
+ }
+ if (ret < 0) {
+ trace_colo_proxy("colo_proxy_receive_iov running failed");
}
return iov_size(iov, iovcnt);
}