@@ -188,6 +188,77 @@ static Connection *colo_proxy_enqueue_packet(GHashTable *unprocessed_packets,
return connection;
}
+static ssize_t colo_enqueue_primary_packet(NetFilterState *nf,
+ NetClientState *sender,
+ unsigned flags,
+ const struct iovec *iov,
+ int iovcnt,
+ NetPacketSent *sent_cb)
+{
+ /*
+ * 1. parse packet, try to get connection factor
+ * (src_ip, src_port, dest_ip, dest_port)
+ * 2. enqueue the packet to primary_packet_list by connection
+ */
+ ColoProxyState *s = FILTER_COLO_PROXY(nf);
+ char *buf;
+ ssize_t size = iov_size(iov, iovcnt);
+ buf = g_malloc0(size);
+ iov_to_buf(iov, iovcnt, 0, buf, size);
+
+ Connection_key key = { 0 };
+ Packet *pkt = packet_new(s, buf, size, &key, sender);
+ Connection *connection;
+
+ if (!pkt) {
+ qemu_net_queue_send(s->incoming_queue, sender, flags,
+ (const uint8_t *)buf, size, NULL);
+ g_free(buf);
+ return 0;
+ }
+
+ connection = colo_proxy_enqueue_packet(s->unprocessed_packets, &key,
+ pkt, PRIMARY_OUTPUT);
+
+ if (!connection->processing) {
+ g_queue_push_tail(&s->unprocessed_connections, connection);
+ connection->processing = true;
+ }
+
+ if (pkt->should_be_sent) {
+ qemu_net_queue_send(s->incoming_queue, sender, flags,
+ (const uint8_t *)buf, size, NULL);
+ }
+
+ g_free(buf);
+ return 1;
+}
+
+static ssize_t colo_enqueue_secondary_packet(NetFilterState *nf,
+ char *buf, int len)
+{
+ /*
+ * 1, parse packet, try to get connection factor
+ * (src_ip, src_port, dest_ip, dest_port)
+ * 2. enqueue the packet to secondary_packet_list by connection
+ */
+ ColoProxyState *s = FILTER_COLO_PROXY(nf);
+ Connection_key key = { 0 };
+ Packet *pkt = packet_new(s, buf, len, &key, NULL);
+ Connection *connection;
+ if (!pkt) {
+ return -1;
+ }
+
+ connection = colo_proxy_enqueue_packet(s->unprocessed_packets, &key,
+ pkt, SECONDARY_OUTPUT);
+
+ if (!connection->processing) {
+ g_queue_push_tail(&s->unprocessed_connections, connection);
+ connection->processing = true;
+ }
+ return 1;
+}
/*
* Packets to be sent by colo forward to