@@ -144,6 +144,8 @@ static inline void colo_proxy_dump_packet(Packet *pkt)
printf("\n");
}
+static void packet_destroy(void *opaque, void *user_data);
+
static uint32_t connection_key_hash(const void *opaque)
{
const ConnectionKey *key = opaque;
@@ -190,6 +192,28 @@ static Connection *connection_new(ConnectionKey *key)
return conn;
}
+static void colo_send_primary_packet(void *opaque, void *user_data)
+{
+ Packet *pkt = opaque;
+ qemu_net_queue_send(pkt->s->incoming_queue, pkt->sender, 0,
+ (const uint8_t *)pkt->data, pkt->size, NULL);
+}
+
+static void colo_flush_connection(void *opaque, void *user_data)
+{
+ Connection *conn = opaque;
+ Packet *pkt = NULL;
+
+ while (!g_queue_is_empty(&conn->primary_list)) {
+ pkt = g_queue_pop_head(&conn->primary_list);
+ colo_send_primary_packet(pkt, NULL);
+ }
+ while (!g_queue_is_empty(&conn->secondary_list)) {
+ pkt = g_queue_pop_head(&conn->secondary_list);
+ packet_destroy(pkt, NULL);
+ }
+}
+
/*
* Clear hashtable, stop this hash growing really huge
*/
@@ -205,6 +229,52 @@ bool colo_proxy_query_checkpoint(void)
return colo_do_checkpoint;
}
+static int colo_proxy_primary_checkpoint(COLOProxyState *s)
+{
+ g_queue_foreach(&s->conn_list, colo_flush_connection, NULL);
+ return 0;
+}
+
+static int colo_proxy_secondary_checkpoint(COLOProxyState *s)
+{
+ return 0;
+}
+
+static void colo_proxy_checkpoint_one(NetFilterState *nf,
+ void *opaque, Error **errp)
+{
+ COLOProxyState *s;
+ int mode;
+
+ if (strcmp(object_get_typename(OBJECT(nf)), TYPE_FILTER_COLO_PROXY)) {
+ return;
+ }
+
+ s = FILTER_COLO_PROXY(nf);
+ mode = *(int *)opaque;
+ assert(s->colo_mode == mode);
+
+ if (s->colo_mode == COLO_MODE_PRIMARY) {
+ colo_proxy_primary_checkpoint(s);
+ } else {
+ /* secondary do checkpoint */
+ colo_proxy_secondary_checkpoint(s);
+ }
+}
+
+int colo_proxy_do_checkpoint(int mode)
+{
+ Error *err = NULL;
+ qemu_foreach_netfilter(colo_proxy_checkpoint_one, &mode, &err);
+ if (err) {
+ error_report("colo proxy do checkpoint failed");
+ return -1;
+ }
+
+ colo_do_checkpoint = false;
+ return 0;
+}
+
/* Return 0 on success, or return -1 if the pkt is corrupted */
static int parse_packet_early(Packet *pkt, ConnectionKey *key)
{
@@ -294,6 +364,7 @@ static Connection *colo_proxy_get_conn(COLOProxyState *s,
if (s->hashtable_size > hashtable_max_size) {
trace_colo_proxy("colo proxy connection hashtable full, clear it");
clear_connection_hashtable(s);
+ /* TODO:clear conn_list */
} else {
g_hash_table_insert(colo_conn_hash, new_key, conn);
}
@@ -751,6 +822,8 @@ void colo_proxy_stop(int mode)
static void colo_proxy_setup(NetFilterState *nf, Error **errp)
{
COLOProxyState *s = FILTER_COLO_PROXY(nf);
+ ssize_t factor = 8;
+ struct sysinfo si;
if (!s->addr) {
error_setg(errp, "filter colo_proxy needs 'addr' property set!");
@@ -768,6 +841,21 @@ static void colo_proxy_setup(NetFilterState *nf, Error **errp)
colo_do_checkpoint = false;
qemu_event_init(&s->need_compare_ev, false);
+ /*
+ * Idea from kernel tcp.c: use 1/16384 of memory. On i386: 32MB
+ * machine has 512 buckets. >= 1GB machines have 16384 buckets.
+ * default factor = 8
+ */
+ sysinfo(&si);
+ hashtable_max_size = 16384;
+ if (si.totalram > (1024 * 1024 * 1024)) {
+ hashtable_max_size = 16384;
+ }
+ if (hashtable_max_size < 32) {
+ hashtable_max_size = 32;
+ }
+
+ hashtable_max_size = hashtable_max_size * factor;
s->incoming_queue = qemu_new_net_queue(qemu_netfilter_pass_to_next, nf);
colo_conn_hash = g_hash_table_new_full(connection_key_hash,
connection_key_equal,