From patchwork Wed Mar 30 08:35:50 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Zhang Chen X-Patchwork-Id: 603249 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [208.118.235.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 3qZgy8409Pz9sBg for ; Wed, 30 Mar 2016 19:37:52 +1100 (AEDT) Received: from localhost ([::1]:52441 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1alBdO-0007aj-Rf for incoming@patchwork.ozlabs.org; Wed, 30 Mar 2016 04:37:50 -0400 Received: from eggs.gnu.org ([2001:4830:134:3::10]:58965) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1alBbx-0004wL-Io for qemu-devel@nongnu.org; Wed, 30 Mar 2016 04:36:22 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1alBbt-0006TH-DK for qemu-devel@nongnu.org; Wed, 30 Mar 2016 04:36:21 -0400 Received: from [59.151.112.132] (port=30171 helo=heian.cn.fujitsu.com) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1alBbo-0006QS-H6 for qemu-devel@nongnu.org; Wed, 30 Mar 2016 04:36:17 -0400 X-IronPort-AV: E=Sophos;i="5.22,518,1449504000"; d="scan'208";a="5097724" Received: from unknown (HELO cn.fujitsu.com) ([10.167.33.5]) by heian.cn.fujitsu.com with ESMTP; 30 Mar 2016 16:35:37 +0800 Received: from G08CNEXCHPEKD02.g08.fujitsu.local (unknown [10.167.33.83]) by cn.fujitsu.com (Postfix) with ESMTP id EAFD6408D262; Wed, 30 Mar 2016 16:35:36 +0800 (CST) Received: from G08FNSTD140215.g08.fujitsu.local (10.167.226.56) by G08CNEXCHPEKD02.g08.fujitsu.local (10.167.33.89) with Microsoft SMTP Server (TLS) id 14.3.279.2; Wed, 30 Mar 2016 16:35:36 +0800 From: Zhang Chen To: qemu devel , Jason Wang Date: Wed, 30 Mar 2016 16:35:50 +0800 Message-ID: <1459326950-17708-4-git-send-email-zhangchen.fnst@cn.fujitsu.com> X-Mailer: git-send-email 1.9.1 In-Reply-To: <1459326950-17708-1-git-send-email-zhangchen.fnst@cn.fujitsu.com> References: <1459326950-17708-1-git-send-email-zhangchen.fnst@cn.fujitsu.com> MIME-Version: 1.0 X-Originating-IP: [10.167.226.56] X-yoursite-MailScanner-ID: EAFD6408D262.AE4DC X-yoursite-MailScanner: Found to be clean X-yoursite-MailScanner-From: zhangchen.fnst@cn.fujitsu.com X-detected-operating-system: by eggs.gnu.org: Genre and OS details not recognized. X-Received-From: 59.151.112.132 Cc: Li Zhijian , Gui jianfeng , "eddie.dong" , zhanghailiang , "Dr. David Alan Gilbert" , Zhang Chen , Yang Hongyang Subject: [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org if packets are same, we send primary packet and drop secondary packet, otherwise notify COLO do checkpoint. Signed-off-by: Zhang Chen Signed-off-by: Li Zhijian Signed-off-by: Wen Congyang --- net/colo-compare.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 121 insertions(+), 1 deletion(-) diff --git a/net/colo-compare.c b/net/colo-compare.c index 0bb5a51..1debc0e 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -36,6 +36,7 @@ static QTAILQ_HEAD(, CompareState) net_compares = QTAILQ_HEAD_INITIALIZER(net_compares); static ssize_t hashtable_max_size; +static int colo_need_checkpoint; typedef struct ReadState { int state; /* 0 = getting length, 1 = getting data */ @@ -91,6 +92,13 @@ typedef struct CompareState { GQueue unprocessed_connections; /* proxy current hash size */ ssize_t hashtable_size; + + /* notify compare thread */ + QemuEvent event; + /* compare thread, a thread for each NIC */ + QemuThread thread; + int thread_status; + } CompareState; typedef struct Packet { @@ -129,6 +137,15 @@ enum { SECONDARY_IN, }; +enum { + /* compare thread isn't started */ + COMPARE_THREAD_NONE, + /* compare thread is running */ + COMPARE_THREAD_RUNNING, + /* compare thread exit */ + COMPARE_THREAD_EXIT, +}; + static void packet_destroy(void *opaque, void *user_data); static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size); @@ -340,6 +357,88 @@ static inline void colo_flush_connection(void *opaque, void *user_data) qemu_mutex_unlock(&conn->list_lock); } +static void colo_notify_checkpoint(void) +{ + colo_need_checkpoint = true; +} + +/* TODO colo_do_checkpoint() { + * we flush the connections and reset 'colo_need_checkpoint' + * } + */ + +static inline void colo_dump_packet(Packet *pkt) +{ + int i; + for (i = 0; i < pkt->size; i++) { + printf("%02x ", ((uint8_t *)pkt->data)[i]); + } + printf("\n"); +} + +/* + * The IP packets sent by primary and secondary + * will be compared in here + * TODO support ip fragment, Out-Of-Order + * return: 0 means packet same + * > 0 || < 0 means packet different + */ +static int colo_packet_compare(Packet *ppkt, Packet *spkt) +{ + colo_dump_packet(ppkt); + colo_dump_packet(spkt); + + if (ppkt->size == spkt->size) { + return memcmp(ppkt->data, spkt->data, spkt->size); + } else { + return -1; + } +} + +static void colo_compare_connection(void *opaque, void *user_data) +{ + Connection *conn = opaque; + Packet *pkt = NULL; + GList *result = NULL; + int ret; + + qemu_mutex_lock(&conn->list_lock); + while (!g_queue_is_empty(&conn->primary_list) && + !g_queue_is_empty(&conn->secondary_list)) { + pkt = g_queue_pop_head(&conn->primary_list); + result = g_queue_find_custom(&conn->secondary_list, + pkt, (GCompareFunc)colo_packet_compare); + + if (result) { + ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size); + if (ret < 0) { + error_report("colo_send_primary_packet failed"); + } + g_queue_remove(&conn->secondary_list, result); + } else { + g_queue_push_head(&conn->primary_list, pkt); + colo_notify_checkpoint(); + break; + } + } + qemu_mutex_unlock(&conn->list_lock); +} + +static void *colo_compare_thread(void *opaque) +{ + CompareState *s = opaque; + + while (s->thread_status == COMPARE_THREAD_RUNNING) { + qemu_event_wait(&s->event); + qemu_event_reset(&s->event); + qemu_mutex_lock(&s->conn_list_lock); + g_queue_foreach(&s->conn_list, colo_compare_connection, NULL); + qemu_mutex_unlock(&s->conn_list_lock); + } + + return NULL; +} + static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size) { int ret = 0; @@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) if (ret == 1) { if (packet_enqueue(s, PRIMARY_IN)) { error_report("primary: unsupported packet in"); - compare_chr_send(s->chr_out, buf, size); + compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len); + } else { + qemu_event_set(&s->event); } } else if (ret == -1) { qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL); @@ -449,6 +550,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) if (ret == 1) { if (packet_enqueue(s, SECONDARY_IN)) { error_report("secondary: unsupported packet in"); + } else { + qemu_event_set(&s->event); } } else if (ret == -1) { qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL); @@ -504,6 +607,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) { CompareState *s = COLO_COMPARE(uc); struct sysinfo si; + char thread_name[64]; + static int compare_id; if (!s->pri_indev || !s->sec_indev || !s->outdev) { error_setg(errp, "colo compare needs 'primary_in' ," @@ -552,6 +657,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) g_queue_init(&s->conn_list); qemu_mutex_init(&s->conn_list_lock); + colo_need_checkpoint = false; s->hashtable_size = 0; /* * Idea from kernel tcp.c: use 1/16384 of memory. On i386: 32MB @@ -572,6 +678,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) g_free, connection_destroy); + s->thread_status = COMPARE_THREAD_RUNNING; + sprintf(thread_name, "proxy compare %d", compare_id); + qemu_thread_create(&s->thread, thread_name, + colo_compare_thread, s, + QEMU_THREAD_JOINABLE); + compare_id++; + return; out: @@ -615,6 +728,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data) QTAILQ_REMOVE(&net_compares, s, next); } qemu_mutex_destroy(&s->conn_list_lock); + + if (s->thread.thread) { + s->thread_status = COMPARE_THREAD_EXIT; + qemu_event_set(&s->event); + qemu_thread_join(&s->thread); + } + qemu_event_destroy(&s->event); } static void colo_compare_init(Object *obj)