From patchwork Tue Nov 6 12:20:21 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 993657 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (mailfrom) smtp.mailfrom=nongnu.org (client-ip=2001:4830:134:3::11; helo=lists.gnu.org; envelope-from=qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org; receiver=) Authentication-Results: ozlabs.org; dmarc=fail (p=none dis=none) header.from=gmail.com Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=gmail.com header.i=@gmail.com header.b="Ed2frqWQ"; dkim-atps=neutral Received: from lists.gnu.org (lists.gnu.org [IPv6:2001:4830:134:3::11]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 42q7yY2RG3z9sBN for ; Tue, 6 Nov 2018 23:24:21 +1100 (AEDT) Received: from localhost ([::1]:40675 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0P4-00079B-Rk for incoming@patchwork.ozlabs.org; Tue, 06 Nov 2018 07:24:18 -0500 Received: from eggs.gnu.org ([2001:4830:134:3::10]:43318) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0LZ-0002mQ-4H for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:41 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1gK0LX-0001fE-88 for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:41 -0500 Received: from mail-pf1-x444.google.com ([2607:f8b0:4864:20::444]:43086) by eggs.gnu.org with esmtps (TLS1.0:RSA_AES_128_CBC_SHA1:16) (Exim 4.71) (envelope-from ) id 1gK0LW-0001cT-3U for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:38 -0500 Received: by mail-pf1-x444.google.com with SMTP id g7-v6so3773271pfo.10 for ; Tue, 06 Nov 2018 04:20:37 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=8yuKyyw8d4l95Odfc54RiKR7Oe6eGv/wZcUX85zjzVA=; b=Ed2frqWQ0WCOTbULrLvVYoXHc41bwO1VJwCbNB+rA5kQlcT9Wo9de6oxqBWmjoggQw K1zZsZsZiB+T9+jt8UKO+w7vqpRqK1c35f5DCNoH6B4fRGNKG6V+frn3gyvQb7rfA8GO FU1GEtlIK6yCoU4+QUcu7q4xTLHRaO/WUNMo4P2chDrMX+qIPg6ngcV4WLSoxF+hPg6g o8dZ8q5WTySeLkQtG10a3YX5H9FPhzlZFZUhTLxr2KtsrQ5lT/EXDBBJ/xZyiOz3E/3L +imawpKFwcF/8jrhZrYXaAXuGSIH0VoqUeVWKOCGtEnDj28LfS+zZdLJiSoo0tA4fTcO 1Ltg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=8yuKyyw8d4l95Odfc54RiKR7Oe6eGv/wZcUX85zjzVA=; b=TExyphePlirQD5++iDhCd1sxRLG2GgGR3TxoIB63o82xu0mdHrIP13R38c8PwmA+WP 3jlUSZ5GlLB7cfcnxEiZQKLAXcbMLQ2ngijTnMTA1Z5EsIr63fau9QnNEKogTjwymRqW GZfzJUfvHZfAfLZPTeyzOPfdSA7Kok988cW6FzaKueb72jN4PqGzHi9rq1OIgeu8aYto hxB2vdyrY7bIszK+pYnu7mRMUS75OobjzxT+9KKWT8ZaOwaqyiXlINmfHnVddHH/hY9b esFKLfrV6iHWF9Y4xHdMwVjQwGoP/X9rwbeOQIAB1NRIUxYzaKSHlq6YLiGhryY/qeGI BUow== X-Gm-Message-State: AGRZ1gKQNLHnOc31BgbgkJ5PT1w//cW8UxOYMyhRdQ4hbrfmp2TspGtQ 49ugOZ/Yb7rI0C6/fA+dHaQ= X-Google-Smtp-Source: AJdET5ezJWyALmDe6vfgs+Wdj/dGOO5zNbcGBQeByVDaueU5LxLPjp9ooyHLH83/Rw0OVJqm41ch6Q== X-Received: by 2002:a65:4049:: with SMTP id h9mr23246694pgp.304.1541506837191; Tue, 06 Nov 2018 04:20:37 -0800 (PST) Received: from localhost.localdomain ([203.205.141.52]) by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.33 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Tue, 06 Nov 2018 04:20:36 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Date: Tue, 6 Nov 2018 20:20:21 +0800 Message-Id: <20181106122025.3487-2-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com> References: <20181106122025.3487-1-xiaoguangrong@tencent.com> X-detected-operating-system: by eggs.gnu.org: Genre and OS details not recognized. X-Received-From: 2607:f8b0:4864:20::444 Subject: [Qemu-devel] [PATCH v2 1/5] bitops: introduce change_bit_atomic X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.21 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: "Qemu-devel" From: Xiao Guangrong It will be used by threaded workqueue Signed-off-by: Xiao Guangrong --- include/qemu/bitops.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/include/qemu/bitops.h b/include/qemu/bitops.h index 3f0926cf40..c522958852 100644 --- a/include/qemu/bitops.h +++ b/include/qemu/bitops.h @@ -79,6 +79,19 @@ static inline void change_bit(long nr, unsigned long *addr) *p ^= mask; } +/** + * change_bit_atomic - Toggle a bit in memory atomically + * @nr: Bit to change + * @addr: Address to start counting from + */ +static inline void change_bit_atomic(long nr, unsigned long *addr) +{ + unsigned long mask = BIT_MASK(nr); + unsigned long *p = addr + BIT_WORD(nr); + + atomic_xor(p, mask); +} + /** * test_and_set_bit - Set a bit and return its old value * @nr: Bit to set From patchwork Tue Nov 6 12:20:22 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 993654 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (mailfrom) smtp.mailfrom=nongnu.org (client-ip=2001:4830:134:3::11; helo=lists.gnu.org; envelope-from=qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org; receiver=) Authentication-Results: ozlabs.org; dmarc=fail (p=none dis=none) header.from=gmail.com Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=gmail.com header.i=@gmail.com header.b="anuNOmWx"; dkim-atps=neutral Received: from lists.gnu.org (lists.gnu.org [IPv6:2001:4830:134:3::11]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 42q7vH3FRRz9sBN for ; Tue, 6 Nov 2018 23:21:31 +1100 (AEDT) Received: from localhost ([::1]:40661 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0MK-0002rY-Vi for incoming@patchwork.ozlabs.org; Tue, 06 Nov 2018 07:21:29 -0500 Received: from eggs.gnu.org ([2001:4830:134:3::10]:43359) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0Ld-0002pc-EA for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:47 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1gK0Lb-0001kW-Fn for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:45 -0500 Received: from mail-pf1-x441.google.com ([2607:f8b0:4864:20::441]:41241) by eggs.gnu.org with esmtps (TLS1.0:RSA_AES_128_CBC_SHA1:16) (Exim 4.71) (envelope-from ) id 1gK0Lb-0001im-6d for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:43 -0500 Received: by mail-pf1-x441.google.com with SMTP id e22-v6so6035089pfn.8 for ; Tue, 06 Nov 2018 04:20:42 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references :mime-version:content-transfer-encoding; bh=DCSTE3C9H/zKxT8TB1q3TndGZmpdq5RyqNFTw5hO3zg=; b=anuNOmWxfOy3f2DH/MY4mAxXWGYhrt1Tre7MniieHKC1wQ5EhlHTsT6TN6kR1Mdt8j J/HmVzUR5E0BVEiB88ffVnsGNeZWW304X6JtjWcy7F6Z7D4zQTCBG09mA6IfD2aTZYWY ayjM4ICdm/eM4xdP/jjv8TZdikjIw5rxv6pgneDaNdiOnEk4v9ZBRHlQqlj90LBnrUUS YuDxMrcPPqexNnq9CA4KDb1gxhLgc/Vil31Nh69SwkvpVfq7UnfWHkis7UwNI49EdAyU AdYzVtVArGslhsV2E/tWREeCqb4eVKhnGyV0WciM5nxbLBtmuYKixc8YcRzD89kKuLew zf7A== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references:mime-version:content-transfer-encoding; bh=DCSTE3C9H/zKxT8TB1q3TndGZmpdq5RyqNFTw5hO3zg=; b=e/Vo4wjEoXl2mJQuGiymUWiWFp43cA6gl0V5Kxqd+9mIhhBcvsPpPqwWMhlBHEwMHr ab98/BvPEBrn2G3K0XJGpXnJWAAO/OS0IrKIrBbbv8D7GADlEjmV7Jyfye804C6IHE9U 2SSkpuGGxPa6d+VypVTpQfTdkmY6al0Wo5YZCGhcvB8joAd8Qw4gYnw7LnveX0Bt1JrR UKgs9k3Tx6tpFMZLwSis61AUWU+Rqsz15fnLnqpVnA6PDKe56cP3Lp9t0rF1gwZNld6X ECZEZC1Tvsc5EXzErAdUpimlY6RokGiUaoH4v4+VGVNIcDoRqhPKw2WWCyhDhKFLeaTb vI+w== X-Gm-Message-State: AGRZ1gK9RZEonuEMy+1ezcomDNu80eGuwegN6mNda7TXxp4yK6QPqjf/ u+b1szuqhN0fx4XHd/ghC/M= X-Google-Smtp-Source: AJdET5f85IzNkvkkGdSPAhocuDKHr/YmwYSttORpWD/Pf9miGwRko+AA/0cyyfG0GbW2L9WHC8HK8g== X-Received: by 2002:a63:4d1d:: with SMTP id a29-v6mr20926506pgb.408.1541506841299; Tue, 06 Nov 2018 04:20:41 -0800 (PST) Received: from localhost.localdomain ([203.205.141.52]) by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.37 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Tue, 06 Nov 2018 04:20:40 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Date: Tue, 6 Nov 2018 20:20:22 +0800 Message-Id: <20181106122025.3487-3-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com> References: <20181106122025.3487-1-xiaoguangrong@tencent.com> MIME-Version: 1.0 X-detected-operating-system: by eggs.gnu.org: Genre and OS details not recognized. X-Received-From: 2607:f8b0:4864:20::441 Subject: [Qemu-devel] [PATCH v2 2/5] util: introduce threaded workqueue X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.21 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: "Qemu-devel" From: Xiao Guangrong This modules implements the lockless and efficient threaded workqueue. Three abstracted objects are used in this module: - Request. It not only contains the data that the workqueue fetches out to finish the request but also offers the space to save the result after the workqueue handles the request. It's flowed between user and workqueue. The user fills the request data into it when it is owned by user. After it is submitted to the workqueue, the workqueue fetched data out and save the result into it after the request is handled. All the requests are pre-allocated and carefully partitioned between threads so there is no contention on the request, that make threads be parallel as much as possible. - User, i.e, the submitter It's the one fills the request and submits it to the workqueue, the result will be collected after it is handled by the work queue. The user can consecutively submit requests without waiting the previous requests been handled. It only supports one submitter, you should do serial submission by yourself if you want more, e.g, use lock on you side. - Workqueue, i.e, thread Each workqueue is represented by a running thread that fetches the request submitted by the user, do the specified work and save the result to the request. Signed-off-by: Xiao Guangrong --- include/qemu/threaded-workqueue.h | 94 ++++++++ util/Makefile.objs | 1 + util/threaded-workqueue.c | 466 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 561 insertions(+) create mode 100644 include/qemu/threaded-workqueue.h create mode 100644 util/threaded-workqueue.c diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h new file mode 100644 index 0000000000..d7eb66c8d2 --- /dev/null +++ b/include/qemu/threaded-workqueue.h @@ -0,0 +1,94 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#ifndef QEMU_THREADED_WORKQUEUE_H +#define QEMU_THREADED_WORKQUEUE_H + +#include "qemu/queue.h" +#include "qemu/thread.h" + +/* + * This modules implements the lockless and efficient threaded workqueue. + * + * Three abstracted objects are used in this module: + * - Request. + * It not only contains the data that the workqueue fetches out + * to finish the request but also offers the space to save the result + * after the workqueue handles the request. + * + * It's flowed between user and workqueue. The user fills the request + * data into it when it is owned by user. After it is submitted to the + * workqueue, the workqueue fetched data out and save the result into + * it after the request is handled. + * + * All the requests are pre-allocated and carefully partitioned between + * threads so there is no contention on the request, that make threads + * be parallel as much as possible. + * + * - User, i.e, the submitter + * It's the one fills the request and submits it to the workqueue, + * the result will be collected after it is handled by the work queue. + * + * The user can consecutively submit requests without waiting the previous + * requests been handled. + * It only supports one submitter, you should do serial submission by + * yourself if you want more, e.g, use lock on you side. + * + * - Workqueue, i.e, thread + * Each workqueue is represented by a running thread that fetches + * the request submitted by the user, do the specified work and save + * the result to the request. + */ + +typedef struct Threads Threads; + +struct ThreadedWorkqueueOps { + /* return the size of each request */ + int (*thread_get_request_size)(void); + + /* constructor of the request */ + int (*thread_request_init)(void *request); + /* destructor of the request */ + void (*thread_request_uninit)(void *request); + + /* the handler of the request that is called by the thread */ + void (*thread_request_handler)(void *request); + /* called by the user after the request has been handled */ + void (*thread_request_done)(void *request); +}; +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps; + +/* the default number of requests that thread need handle */ +#define DEFAULT_THREAD_REQUEST_NR 4 + +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + int thread_request_nr, + ThreadedWorkqueueOps *ops); + +void threaded_workqueue_destroy(Threads *threads); + +/* + * find a free request where the user can store the data that is needed to + * finish the request + * + * If all requests are used up, return NULL + */ +void *threaded_workqueue_get_request(Threads *threads); +/* submit the request and notify the thread */ +void threaded_workqueue_submit_request(Threads *threads, void *request); + +/* + * wait all threads to complete the request to make sure there is no + * previous request exists. + */ +void threaded_workqueue_wait_for_requests(Threads *threads); +#endif diff --git a/util/Makefile.objs b/util/Makefile.objs index 0820923c18..f26dfe5182 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -50,5 +50,6 @@ util-obj-y += range.o util-obj-y += stats64.o util-obj-y += systemd.o util-obj-y += iova-tree.o +util-obj-y += threaded-workqueue.o util-obj-$(CONFIG_LINUX) += vfio-helpers.o util-obj-$(CONFIG_OPENGL) += drm.o diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c new file mode 100644 index 0000000000..966479631a --- /dev/null +++ b/util/threaded-workqueue.c @@ -0,0 +1,466 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/bitmap.h" +#include "qemu/threaded-workqueue.h" + +#define SMP_CACHE_BYTES 64 +#define BITS_ALIGNED_TO_CACHE(_bits_) \ + QEMU_ALIGN_UP(_bits_, SMP_CACHE_BYTES * BITS_PER_BYTE) + +/* + * the request representation which contains the internally used mete data, + * it is the header of user-defined data. + * + * It should be aligned to the nature size of CPU. + */ +struct ThreadRequest { + /* + * the request has been handled by the thread and need the user + * to fetch result out. + */ + bool done; + /* + * the index to Threads::requests. + * Save it to the padding space although it can be calculated at runtime. + */ + int index; +}; +typedef struct ThreadRequest ThreadRequest; + +struct ThreadLocal { + struct Threads *threads; + + /* + * the request region in Threads::requests that the thread + * need handle + */ + int start_request_index; + int end_request_index; + + /* + * the interim bitmap used by the thread to avoid frequent + * memory allocation + */ + unsigned long *result_bitmap; + + /* the index of the thread */ + int self; + + /* thread is useless and needs to exit */ + bool quit; + + QemuThread thread; + + /* the event used to wake up the thread */ + QemuEvent ev; +}; +typedef struct ThreadLocal ThreadLocal; + +/* + * the main data struct represents multithreads which is shared by + * all threads + */ +struct Threads { + /* + * in order to avoid contention, the @requests is partitioned to + * @threads_nr pieces, each thread exclusively handles + * @thread_request_nr requests in the array. + */ + void *requests; + + /* + * the bit in these two bitmaps indicates the index of the ï¼ requests + * respectively. If it's the same, the corresponding request is free + * and owned by the user, i.e, where the user fills a request. Otherwise, + * it is valid and owned by the thread, i.e, where the thread fetches + * the request and write the result. + */ + + /* after the user fills the request, the bit is flipped. */ + unsigned long *request_fill_bitmap; + /* after handles the request, the thread flips the bit. */ + unsigned long *request_done_bitmap; + + /* + * the interim bitmap used by the user to avoid frequent + * memory allocation + */ + unsigned long *result_bitmap; + + /* the request header, ThreadRequest, is contained */ + unsigned int request_size; + + /* the number of requests that each thread need handle */ + unsigned int thread_request_nr; + unsigned int total_requests; + + unsigned int threads_nr; + + /* the request is pushed to the thread with round-robin manner */ + unsigned int current_thread_index; + + ThreadedWorkqueueOps *ops; + + const char *name; + QemuEvent ev; + + ThreadLocal per_thread_data[0]; +}; +typedef struct Threads Threads; + +static ThreadRequest *index_to_request(Threads *threads, int request_index) +{ + ThreadRequest *request; + + request = threads->requests + request_index * threads->request_size; + + assert(request->index == request_index); + return request; +} + +static int request_to_index(ThreadRequest *request) +{ + return request->index; +} + +static int thread_to_first_request_index(Threads *threads, int thread_id) +{ + thread_id %= threads->threads_nr; + return thread_id * threads->thread_request_nr; +} + +static int request_index_to_thread(Threads *threads, int request_index) +{ + return request_index / threads->thread_request_nr; +} + +/* + * free request: the request is not used by any thread, however, it might + * contian the result need the user to call thread_request_done() + * + * valid request: the request contains the request data and it's commited + * to the thread, i,e. it's owned by thread. + */ +static unsigned long *get_free_request_bitmap(Threads *threads) +{ + bitmap_xor(threads->result_bitmap, threads->request_fill_bitmap, + threads->request_done_bitmap, threads->total_requests); + + /* + * paired with smp_wmb() in mark_request_free() to make sure that we + * read request_done_bitmap before fetch the result out. + */ + smp_rmb(); + + return threads->result_bitmap; +} + +static int find_free_request_index(Threads *threads) +{ + unsigned long *result_bitmap = get_free_request_bitmap(threads); + int index, cur_index; + + cur_index = thread_to_first_request_index(threads, + threads->current_thread_index); + +retry: + index = find_next_zero_bit(result_bitmap, threads->total_requests, + cur_index); + if (index < threads->total_requests) { + return index; + } + + /* if we get nothing, start it over. */ + if (cur_index != 0) { + cur_index = 0; + goto retry; + } + + return -1; +} + +static void mark_request_valid(Threads *threads, int request_index) +{ + /* + * paired with smp_rmb() in find_first_valid_request_index() to make + * sure the request has been filled before the bit is flipped that + * will make the request be visible to the thread + */ + smp_wmb(); + + change_bit(request_index, threads->request_fill_bitmap); +} + +static int thread_find_first_valid_request_index(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + int index; + + bitmap_xor(thread->result_bitmap, threads->request_fill_bitmap, + threads->request_done_bitmap, threads->total_requests); + /* + * paired with smp_wmb() in mark_request_valid() to make sure that + * we read request_fill_bitmap before fetch the request out. + */ + smp_rmb(); + + index = find_next_bit(thread->result_bitmap, threads->total_requests, + thread->start_request_index); + return index > thread->end_request_index ? -1 : index; +} + +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request) +{ + int index = request_to_index(request); + + /* + * smp_wmb() is implied in change_bit_atomic() that is paired with + * smp_rmb() in get_free_request_bitmap() to make sure the result + * has been saved before the bit is flipped. + */ + change_bit_atomic(index, thread->threads->request_done_bitmap); +} + +/* retry to see if there is available request before actually go to wait. */ +#define BUSY_WAIT_COUNT 1000 + +static ThreadRequest * +thread_busy_wait_for_request(ThreadLocal *thread) +{ + int index, count = 0; + + for (count = 0; count < BUSY_WAIT_COUNT; count++) { + index = thread_find_first_valid_request_index(thread); + if (index >= 0) { + assert(index >= thread->start_request_index && + index <= thread->end_request_index); + return index_to_request(thread->threads, index); + } + + cpu_relax(); + } + + return NULL; +} + +static void *thread_run(void *opaque) +{ + ThreadLocal *self_data = (ThreadLocal *)opaque; + Threads *threads = self_data->threads; + void (*handler)(void *request) = threads->ops->thread_request_handler; + ThreadRequest *request; + + for ( ; !atomic_read(&self_data->quit); ) { + qemu_event_reset(&self_data->ev); + + request = thread_busy_wait_for_request(self_data); + if (!request) { + qemu_event_wait(&self_data->ev); + continue; + } + + assert(!request->done); + + handler(request + 1); + request->done = true; + mark_request_free(self_data, request); + qemu_event_set(&threads->ev); + } + + return NULL; +} + +static void uninit_requests(Threads *threads, int free_nr) +{ + ThreadRequest *request; + int i; + + for (request = threads->requests, i = 0; i < free_nr; i++) { + threads->ops->thread_request_uninit(request + 1); + request = (void *)request + threads->request_size; + } + + g_free(threads->result_bitmap); + g_free(threads->request_fill_bitmap); + g_free(threads->request_done_bitmap); + g_free(threads->requests); +} + +static int init_requests(Threads *threads) +{ + ThreadRequest *request; + int aligned_requests, free_nr = 0, ret = -1; + + aligned_requests = BITS_ALIGNED_TO_CACHE(threads->total_requests); + threads->request_fill_bitmap = bitmap_new(aligned_requests); + threads->request_done_bitmap = bitmap_new(aligned_requests); + threads->result_bitmap = bitmap_new(threads->total_requests); + + QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long))); + + threads->request_size = threads->ops->thread_get_request_size(); + threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long)); + threads->request_size += sizeof(ThreadRequest); + threads->requests = g_try_malloc0_n(threads->total_requests, + threads->request_size); + if (!threads->requests) { + goto exit; + } + + for (request = threads->requests; free_nr < threads->total_requests; + free_nr++) { + ret = threads->ops->thread_request_init(request + 1); + if (ret < 0) { + goto exit; + } + + request->index = free_nr; + request = (void *)request + threads->request_size; + } + + return 0; + +exit: + uninit_requests(threads, free_nr); + return ret; +} + +static void uninit_thread_data(Threads *threads) +{ + ThreadLocal *thread_local = threads->per_thread_data; + int i; + + for (i = 0; i < threads->threads_nr; i++) { + thread_local[i].quit = true; + qemu_event_set(&thread_local[i].ev); + qemu_thread_join(&thread_local[i].thread); + qemu_event_destroy(&thread_local[i].ev); + g_free(thread_local[i].result_bitmap); + } +} + +static void init_thread_data(Threads *threads) +{ + ThreadLocal *thread_local = threads->per_thread_data; + char *name; + int start_index, end_index, i; + + for (i = 0; i < threads->threads_nr; i++) { + thread_local[i].threads = threads; + thread_local[i].self = i; + + start_index = thread_to_first_request_index(threads, i); + end_index = start_index + threads->thread_request_nr - 1; + thread_local[i].start_request_index = start_index; + thread_local[i].end_request_index = end_index; + + thread_local[i].result_bitmap = bitmap_new(threads->total_requests); + + qemu_event_init(&thread_local[i].ev, false); + + name = g_strdup_printf("%s/%d", threads->name, thread_local[i].self); + qemu_thread_create(&thread_local[i].thread, name, + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); + g_free(name); + } +} + +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + int thread_request_nr, ThreadedWorkqueueOps *ops) +{ + Threads *threads; + + threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal)); + threads->name = name; + threads->ops = ops; + + threads->threads_nr = threads_nr; + threads->thread_request_nr = thread_request_nr; + + threads->total_requests = thread_request_nr * threads_nr; + if (init_requests(threads) < 0) { + g_free(threads); + return NULL; + } + + qemu_event_init(&threads->ev, false); + init_thread_data(threads); + return threads; +} + +void threaded_workqueue_destroy(Threads *threads) +{ + uninit_thread_data(threads); + uninit_requests(threads, threads->total_requests); + qemu_event_destroy(&threads->ev); + g_free(threads); +} + +static void request_done(Threads *threads, ThreadRequest *request) +{ + if (!request->done) { + return; + } + + threads->ops->thread_request_done(request + 1); + request->done = false; +} + +void *threaded_workqueue_get_request(Threads *threads) +{ + ThreadRequest *request; + int index; + + index = find_free_request_index(threads); + if (index < 0) { + return NULL; + } + + request = index_to_request(threads, index); + request_done(threads, request); + return request + 1; +} + +void threaded_workqueue_submit_request(Threads *threads, void *request) +{ + ThreadRequest *req = request - sizeof(ThreadRequest); + int request_index = request_to_index(req); + int thread_index = request_index_to_thread(threads, request_index); + ThreadLocal *thread_local = &threads->per_thread_data[thread_index]; + + assert(!req->done); + + mark_request_valid(threads, request_index); + + threads->current_thread_index = ++thread_index; + qemu_event_set(&thread_local->ev); +} + +void threaded_workqueue_wait_for_requests(Threads *threads) +{ + unsigned long *result_bitmap; + int index = 0; + +retry: + qemu_event_reset(&threads->ev); + result_bitmap = get_free_request_bitmap(threads); + for (; index < threads->total_requests; index++) { + if (test_bit(index, result_bitmap)) { + qemu_event_wait(&threads->ev); + goto retry; + }; + + request_done(threads, index_to_request(threads, index)); + } +} From patchwork Tue Nov 6 12:20:23 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 993658 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (mailfrom) smtp.mailfrom=nongnu.org (client-ip=2001:4830:134:3::11; helo=lists.gnu.org; envelope-from=qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org; receiver=) Authentication-Results: ozlabs.org; dmarc=fail (p=none dis=none) header.from=gmail.com Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=gmail.com header.i=@gmail.com header.b="O5xjg9+j"; dkim-atps=neutral Received: from lists.gnu.org (lists.gnu.org [IPv6:2001:4830:134:3::11]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 42q818294mz9sBN for ; Tue, 6 Nov 2018 23:26:36 +1100 (AEDT) Received: from localhost ([::1]:40693 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0RF-0000Xk-TZ for incoming@patchwork.ozlabs.org; Tue, 06 Nov 2018 07:26:33 -0500 Received: from eggs.gnu.org ([2001:4830:134:3::10]:43383) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0Li-0002tR-1h for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:52 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1gK0Le-0001oJ-M9 for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:50 -0500 Received: from mail-pg1-x542.google.com ([2607:f8b0:4864:20::542]:42760) by eggs.gnu.org with esmtps (TLS1.0:RSA_AES_128_CBC_SHA1:16) (Exim 4.71) (envelope-from ) id 1gK0Le-0001nV-Dd for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:46 -0500 Received: by mail-pg1-x542.google.com with SMTP id i4-v6so5749943pgq.9 for ; Tue, 06 Nov 2018 04:20:46 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=h+93I4KEYaqBzpYCU5F/5aYLsBjGY/tiNdC2Huafomg=; b=O5xjg9+jM7XNdO3GsYeN5Q6ox2ForyZCFx47oAYMudMkGMUc/s2mJOMXq3QhsgvYlx v1AQJNNbjPVjnW6HwwjkhwRArRrzMmmL5qDNDUf0nJ3Fr+WRYh15LcOXS1y3Yf4itcwb NS/ehuKDIiW0tJalJO95zhih4oGHCv4Wg0FgjwynR3huyngjuHw9X9yVrgUfO+kiMR12 37zerEpx8X9s3/sgX8soxj0RBe5oIQbBalARnldc0BxRnzwGklKOOHyQOUtYlaejzrp/ dh9OIDRKde/tddfaMfHAj4t2gEg6qQcJ88WvKyQMM2R7UtZOQ7H5Vhb7Uk8P3LoTAB0R S00A== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=h+93I4KEYaqBzpYCU5F/5aYLsBjGY/tiNdC2Huafomg=; b=gQ1V1JQuLJDXuhhPw1h229lBQOAt/DW1KLX4b6peNXAUFrPNfGvQHA6X9KK/Sz+2NI 7fIJQUucZnTJagGJLLbIWxLVTJjeMaV6puMwNY7xV046ZHL4K6PZR0xxmG2IjrbgoIxe dwtKy/HVcyXZgxYFtmX8xJQYb5ZHVs2BzK2pQy5gniCCa/zYZd73eFbZdRVMx4gAuT75 Uufg5xFdA/AdmQa2YD9Xcyz4u/VjkLxZAfQABsPH7P7bW4BTgCB7J2HLNj7y3TWv/8rM V+8BXFJcNmP/iEJa9bi8GnGZ1u2/5jj/ZT/zobpTkmoVXxCEvK59X9cKeo5eMF6ntw1W XrhQ== X-Gm-Message-State: AGRZ1gLrra2Em1cPjcDnkyOr2tWGcNsy/LXvUOthCFZs0LTSDkYHcHze YNnCuivcyPcDNAq3OWV0AmE= X-Google-Smtp-Source: AJdET5f1VyAO7c9EHVXOgX8ycJBta0RCDvr9P+2BoyAAFo9l4flep9NsA5dSzMEh0zMqxgd3rhGAGw== X-Received: by 2002:a62:4896:: with SMTP id q22-v6mr26153401pfi.248.1541506845496; Tue, 06 Nov 2018 04:20:45 -0800 (PST) Received: from localhost.localdomain ([203.205.141.52]) by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.41 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Tue, 06 Nov 2018 04:20:44 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Date: Tue, 6 Nov 2018 20:20:23 +0800 Message-Id: <20181106122025.3487-4-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com> References: <20181106122025.3487-1-xiaoguangrong@tencent.com> X-detected-operating-system: by eggs.gnu.org: Genre and OS details not recognized. X-Received-From: 2607:f8b0:4864:20::542 Subject: [Qemu-devel] [PATCH v2 3/5] migration: use threaded workqueue for compression X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.21 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: "Qemu-devel" From: Xiao Guangrong Adapt the compression code to the threaded workqueue Signed-off-by: Xiao Guangrong --- migration/ram.c | 313 +++++++++++++++++++++----------------------------------- 1 file changed, 115 insertions(+), 198 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 7e7deec4d8..acca842aff 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -57,6 +57,7 @@ #include "qemu/uuid.h" #include "savevm.h" #include "qemu/iov.h" +#include "qemu/threaded-workqueue.h" /***********************************************************/ /* ram save/restore */ @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; CompressionStats compression_counters; -struct CompressParam { - bool done; - bool quit; - bool zero_page; - QEMUFile *file; - QemuMutex mutex; - QemuCond cond; - RAMBlock *block; - ram_addr_t offset; - - /* internally used fields */ - z_stream stream; - uint8_t *originbuf; -}; -typedef struct CompressParam CompressParam; - struct DecompressParam { bool done; bool quit; @@ -377,15 +362,6 @@ struct DecompressParam { }; typedef struct DecompressParam DecompressParam; -static CompressParam *comp_param; -static QemuThread *compress_threads; -/* comp_done_cond is used to wake up the migration thread when - * one of the compression threads has finished the compression. - * comp_done_lock is used to co-work with comp_done_cond. - */ -static QemuMutex comp_done_lock; -static QemuCond comp_done_cond; -/* The empty QEMUFileOps will be used by file in CompressParam */ static const QEMUFileOps empty_ops = { }; static QEMUFile *decomp_file; @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf); - -static void *do_data_compress(void *opaque) -{ - CompressParam *param = opaque; - RAMBlock *block; - ram_addr_t offset; - bool zero_page; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->block) { - block = param->block; - offset = param->offset; - param->block = NULL; - qemu_mutex_unlock(¶m->mutex); - - zero_page = do_compress_ram_page(param->file, ¶m->stream, - block, offset, param->originbuf); - - qemu_mutex_lock(&comp_done_lock); - param->done = true; - param->zero_page = zero_page; - qemu_cond_signal(&comp_done_cond); - qemu_mutex_unlock(&comp_done_lock); - - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } - } - qemu_mutex_unlock(¶m->mutex); - - return NULL; -} - -static void compress_threads_save_cleanup(void) -{ - int i, thread_count; - - if (!migrate_use_compression() || !comp_param) { - return; - } - - thread_count = migrate_compress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!comp_param[i].file) { - break; - } - - qemu_mutex_lock(&comp_param[i].mutex); - comp_param[i].quit = true; - qemu_cond_signal(&comp_param[i].cond); - qemu_mutex_unlock(&comp_param[i].mutex); - - qemu_thread_join(compress_threads + i); - qemu_mutex_destroy(&comp_param[i].mutex); - qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); - g_free(comp_param[i].originbuf); - qemu_fclose(comp_param[i].file); - comp_param[i].file = NULL; - } - qemu_mutex_destroy(&comp_done_lock); - qemu_cond_destroy(&comp_done_cond); - g_free(compress_threads); - g_free(comp_param); - compress_threads = NULL; - comp_param = NULL; -} - -static int compress_threads_save_setup(void) -{ - int i, thread_count; - - if (!migrate_use_compression()) { - return 0; - } - thread_count = migrate_compress_threads(); - compress_threads = g_new0(QemuThread, thread_count); - comp_param = g_new0(CompressParam, thread_count); - qemu_cond_init(&comp_done_cond); - qemu_mutex_init(&comp_done_lock); - for (i = 0; i < thread_count; i++) { - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); - if (!comp_param[i].originbuf) { - goto exit; - } - - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { - g_free(comp_param[i].originbuf); - goto exit; - } - - /* comp_param[i].file is just used as a dummy buffer to save data, - * set its ops to empty. - */ - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); - comp_param[i].done = true; - comp_param[i].quit = false; - qemu_mutex_init(&comp_param[i].mutex); - qemu_cond_init(&comp_param[i].cond); - qemu_thread_create(compress_threads + i, "compress", - do_data_compress, comp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; - -exit: - compress_threads_save_cleanup(); - return -1; -} - /* Multiple fd's */ #define MULTIFD_MAGIC 0x11223344U @@ -1909,12 +1766,25 @@ exit: return zero_page; } +struct CompressData { + /* filled by migration thread.*/ + RAMBlock *block; + ram_addr_t offset; + + /* filled by compress thread. */ + QEMUFile *file; + z_stream stream; + uint8_t *originbuf; + bool zero_page; +}; +typedef struct CompressData CompressData; + static void -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) +update_compress_thread_counts(CompressData *cd, int bytes_xmit) { ram_counters.transferred += bytes_xmit; - if (param->zero_page) { + if (cd->zero_page) { ram_counters.duplicate++; return; } @@ -1924,81 +1794,128 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) compression_counters.pages++; } +static int compress_thread_get_data_size(void) +{ + return sizeof(CompressData); +} + +static int compress_thread_data_init(void *request) +{ + CompressData *cd = request; + + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); + if (!cd->originbuf) { + return -1; + } + + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { + g_free(cd->originbuf); + return -1; + } + + cd->file = qemu_fopen_ops(NULL, &empty_ops); + return 0; +} + +static void compress_thread_data_fini(void *request) +{ + CompressData *cd = request; + + qemu_fclose(cd->file); + deflateEnd(&cd->stream); + g_free(cd->originbuf); +} + +static void compress_thread_data_handler(void *request) +{ + CompressData *cd = request; + + /* + * if compression fails, it will be indicated by + * migrate_get_current()->to_dst_file. + */ + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, + cd->offset, cd->originbuf); +} + +static void compress_thread_data_done(void *request) +{ + CompressData *cd = request; + RAMState *rs = ram_state; + int bytes_xmit; + + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); + update_compress_thread_counts(cd, bytes_xmit); +} + +static ThreadedWorkqueueOps compress_ops = { + .thread_get_request_size = compress_thread_get_data_size, + .thread_request_init = compress_thread_data_init, + .thread_request_uninit = compress_thread_data_fini, + .thread_request_handler = compress_thread_data_handler, + .thread_request_done = compress_thread_data_done, +}; + +static Threads *compress_threads; + static bool save_page_use_compression(RAMState *rs); static void flush_compressed_data(RAMState *rs) { - int idx, len, thread_count; - if (!save_page_use_compression(rs)) { return; } - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!comp_param[idx].done) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - } - } - qemu_mutex_unlock(&comp_done_lock); + threaded_workqueue_wait_for_requests(compress_threads); +} - for (idx = 0; idx < thread_count; idx++) { - qemu_mutex_lock(&comp_param[idx].mutex); - if (!comp_param[idx].quit) { - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); - /* - * it's safe to fetch zero_page without holding comp_done_lock - * as there is no further request submitted to the thread, - * i.e, the thread should be waiting for a request at this point. - */ - update_compress_thread_counts(&comp_param[idx], len); - } - qemu_mutex_unlock(&comp_param[idx].mutex); +static void compress_threads_save_cleanup(void) +{ + if (!compress_threads) { + return; } + + threaded_workqueue_destroy(compress_threads); + compress_threads = NULL; } -static inline void set_compress_params(CompressParam *param, RAMBlock *block, - ram_addr_t offset) +static int compress_threads_save_setup(void) { - param->block = block; - param->offset = offset; + if (!migrate_use_compression()) { + return 0; + } + + compress_threads = threaded_workqueue_create("compress", + migrate_compress_threads(), + DEFAULT_THREAD_REQUEST_NR, &compress_ops); + return compress_threads ? 0 : -1; } static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, ram_addr_t offset) { - int idx, thread_count, bytes_xmit = -1, pages = -1; + CompressData *cd; bool wait = migrate_compress_wait_thread(); - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); retry: - for (idx = 0; idx < thread_count; idx++) { - if (comp_param[idx].done) { - comp_param[idx].done = false; - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); - qemu_mutex_lock(&comp_param[idx].mutex); - set_compress_params(&comp_param[idx], block, offset); - qemu_cond_signal(&comp_param[idx].cond); - qemu_mutex_unlock(&comp_param[idx].mutex); - pages = 1; - update_compress_thread_counts(&comp_param[idx], bytes_xmit); - break; + cd = threaded_workqueue_get_request(compress_threads); + if (!cd) { + /* + * wait for the free thread if the user specifies + * 'compress-wait-thread', otherwise we will post + * the page out in the main thread as normal page. + */ + if (wait) { + cpu_relax(); + goto retry; } - } - /* - * wait for the free thread if the user specifies 'compress-wait-thread', - * otherwise we will post the page out in the main thread as normal page. - */ - if (pages < 0 && wait) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - goto retry; - } - qemu_mutex_unlock(&comp_done_lock); - - return pages; + return -1; + } + cd->block = block; + cd->offset = offset; + threaded_workqueue_submit_request(compress_threads, cd); + return 1; } /** From patchwork Tue Nov 6 12:20:24 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 993655 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (mailfrom) smtp.mailfrom=nongnu.org (client-ip=2001:4830:134:3::11; helo=lists.gnu.org; envelope-from=qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org; receiver=) Authentication-Results: ozlabs.org; dmarc=fail (p=none dis=none) header.from=gmail.com Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=gmail.com header.i=@gmail.com header.b="H84aoVrl"; dkim-atps=neutral Received: from lists.gnu.org (lists.gnu.org [IPv6:2001:4830:134:3::11]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 42q7y66Fc1z9sBN for ; Tue, 6 Nov 2018 23:23:57 +1100 (AEDT) Received: from localhost ([::1]:40671 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0Og-0006Zd-JC for incoming@patchwork.ozlabs.org; Tue, 06 Nov 2018 07:23:54 -0500 Received: from eggs.gnu.org ([2001:4830:134:3::10]:43401) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0Lk-0002uy-3q for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:53 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1gK0Li-0001vu-Oq for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:52 -0500 Received: from mail-pf1-x444.google.com ([2607:f8b0:4864:20::444]:33374) by eggs.gnu.org with esmtps (TLS1.0:RSA_AES_128_CBC_SHA1:16) (Exim 4.71) (envelope-from ) id 1gK0Li-0001tj-Gm for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:50 -0500 Received: by mail-pf1-x444.google.com with SMTP id v68-v6so2453476pfk.0 for ; Tue, 06 Nov 2018 04:20:50 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=bSrHkivhHJMtQkgfkJcIW0le1CciJhfpKRO9fbQYbd8=; b=H84aoVrlmJgc9brppb8J39vP52aoWpHArMya35+I82V6fUJLeG25JbeTE0MW+nhhp3 SFt5WGNDz+KB5HQZoPnztqzwDvz34Amge50GpvTj/NZt1oad/VuWQlWHypILfZD1IXLw S1eNA5DshtZp6JxH2f/EM34GFlxR6xUd72r4o/WDkv+SFOJpdrHjj7d0KSTtT0TPU2gJ aS17bw3YN4FjTb3Ikghmg9A9DTkNyWO5vtPQwcRylvH+qF/zX4RMrCBwoJx8CMCMLv1v 7HI4Xbv52ruRugG2sn7d23iNxcjjBubyABVsjTsMo/qat7AksJ1/wriFyQVPtzvEsFcB p/Hw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=bSrHkivhHJMtQkgfkJcIW0le1CciJhfpKRO9fbQYbd8=; b=KR+2Yw5aFZ/ABV6Ur4GMhkhVUAlminlm1zFd2risn1FZpGVblw9iQTpOUjZkkjhU99 YN6FBkIWPvxXYnx4CMjGZeNZivtjuU4YdRP69mJfjDkHppg1s4OoOQghj8eZ0OF+qPQm Y5uNlFSO/4RtM3I/E/uP36nCQTHy2j+IdCf3HiYtJ9bxJiWMim415jEw/GmikpI5yZoS 91awfZ4eXyyJh9rHLnZJFYfDbWpgFEeJgyfU/0bZ7AI0aob/nwXqEJfvJzhiVgzk8L7z JQOphbuV1as96LiLOHiobAzbRuocYpmq4bs3ENO5sEu/wDFqsyVDdEkVlMtfK8+n2kVo 6aLg== X-Gm-Message-State: AGRZ1gL3PehyzrFwk5WoOcTK3ulggyoEfKWR0evbldiGlFyNaD5AKumm 98fyMBj0qLL45jT3uaPmEsw= X-Google-Smtp-Source: AJdET5dJGXd5Wv1IOAUylvx06lh5T2hCdHmZ7njJQtdVcpiiIcLUhSTQJ8lwYCPr1Ww1L0jRWIkheQ== X-Received: by 2002:a63:eb42:: with SMTP id b2-v6mr23221907pgk.348.1541506849676; Tue, 06 Nov 2018 04:20:49 -0800 (PST) Received: from localhost.localdomain ([203.205.141.52]) by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.45 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Tue, 06 Nov 2018 04:20:49 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Date: Tue, 6 Nov 2018 20:20:24 +0800 Message-Id: <20181106122025.3487-5-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com> References: <20181106122025.3487-1-xiaoguangrong@tencent.com> X-detected-operating-system: by eggs.gnu.org: Genre and OS details not recognized. X-Received-From: 2607:f8b0:4864:20::444 Subject: [Qemu-devel] [PATCH v2 4/5] migration: use threaded workqueue for decompression X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.21 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: "Qemu-devel" From: Xiao Guangrong Adapt the compression code to the threaded workqueue Signed-off-by: Xiao Guangrong --- migration/ram.c | 225 ++++++++++++++++++++------------------------------------ 1 file changed, 81 insertions(+), 144 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index acca842aff..834198f11c 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -350,25 +350,9 @@ typedef struct PageSearchStatus PageSearchStatus; CompressionStats compression_counters; -struct DecompressParam { - bool done; - bool quit; - QemuMutex mutex; - QemuCond cond; - void *des; - uint8_t *compbuf; - int len; - z_stream stream; -}; -typedef struct DecompressParam DecompressParam; - static const QEMUFileOps empty_ops = { }; static QEMUFile *decomp_file; -static DecompressParam *decomp_param; -static QemuThread *decompress_threads; -static QemuMutex decomp_done_lock; -static QemuCond decomp_done_cond; /* Multiple fd's */ @@ -3404,6 +3388,7 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } + /* return the size after decompression, or negative value on error */ static int qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, @@ -3429,166 +3414,118 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, return stream->total_out; } -static void *do_data_decompress(void *opaque) +struct DecompressData { + /* filled by migration thread.*/ + void *des; + uint8_t *compbuf; + size_t len; + + z_stream stream; +}; +typedef struct DecompressData DecompressData; + +static Threads *decompress_threads; + +static int decompress_thread_get_data_size(void) { - DecompressParam *param = opaque; - unsigned long pagesize; - uint8_t *des; - int len, ret; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->des) { - des = param->des; - len = param->len; - param->des = 0; - qemu_mutex_unlock(¶m->mutex); - - pagesize = TARGET_PAGE_SIZE; - - ret = qemu_uncompress_data(¶m->stream, des, pagesize, - param->compbuf, len); - if (ret < 0 && migrate_get_current()->decompress_error_check) { - error_report("decompress data failed"); - qemu_file_set_error(decomp_file, ret); - } + return sizeof(DecompressData); +} - qemu_mutex_lock(&decomp_done_lock); - param->done = true; - qemu_cond_signal(&decomp_done_cond); - qemu_mutex_unlock(&decomp_done_lock); +static int decompress_thread_data_init(void *request) +{ + DecompressData *dd = request; - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } + if (inflateInit(&dd->stream) != Z_OK) { + return -1; } - qemu_mutex_unlock(¶m->mutex); - return NULL; + dd->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + return 0; } -static int wait_for_decompress_done(void) +static void decompress_thread_data_fini(void *request) { - int idx, thread_count; + DecompressData *dd = request; - if (!migrate_use_compression()) { - return 0; - } + inflateEnd(&dd->stream); + g_free(dd->compbuf); +} - thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!decomp_param[idx].done) { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } +static void decompress_thread_data_handler(void *request) +{ + DecompressData *dd = request; + unsigned long pagesize = TARGET_PAGE_SIZE; + int ret; + + ret = qemu_uncompress_data(&dd->stream, dd->des, pagesize, + dd->compbuf, dd->len); + if (ret < 0 && migrate_get_current()->decompress_error_check) { + error_report("decompress data failed"); + qemu_file_set_error(decomp_file, ret); } - qemu_mutex_unlock(&decomp_done_lock); - return qemu_file_get_error(decomp_file); } -static void compress_threads_load_cleanup(void) +static void decompress_thread_data_done(void *request) { - int i, thread_count; +} + +static ThreadedWorkqueueOps decompress_ops = { + .thread_get_request_size = decompress_thread_get_data_size, + .thread_request_init = decompress_thread_data_init, + .thread_request_uninit = decompress_thread_data_fini, + .thread_request_handler = decompress_thread_data_handler, + .thread_request_done = decompress_thread_data_done, +}; +static int decompress_init(QEMUFile *f) +{ if (!migrate_use_compression()) { - return; + return 0; } - thread_count = migrate_decompress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!decomp_param[i].compbuf) { - break; - } - qemu_mutex_lock(&decomp_param[i].mutex); - decomp_param[i].quit = true; - qemu_cond_signal(&decomp_param[i].cond); - qemu_mutex_unlock(&decomp_param[i].mutex); - } - for (i = 0; i < thread_count; i++) { - if (!decomp_param[i].compbuf) { - break; - } + decomp_file = f; + decompress_threads = threaded_workqueue_create("decompress", + migrate_decompress_threads(), + DEFAULT_THREAD_REQUEST_NR, &decompress_ops); + return decompress_threads ? 0 : -1; +} - qemu_thread_join(decompress_threads + i); - qemu_mutex_destroy(&decomp_param[i].mutex); - qemu_cond_destroy(&decomp_param[i].cond); - inflateEnd(&decomp_param[i].stream); - g_free(decomp_param[i].compbuf); - decomp_param[i].compbuf = NULL; +static void decompress_fini(void) +{ + if (!decompress_threads) { + return; } - g_free(decompress_threads); - g_free(decomp_param); + + threaded_workqueue_destroy(decompress_threads); decompress_threads = NULL; - decomp_param = NULL; decomp_file = NULL; } -static int compress_threads_load_setup(QEMUFile *f) +static int flush_decompressed_data(void) { - int i, thread_count; - if (!migrate_use_compression()) { return 0; } - thread_count = migrate_decompress_threads(); - decompress_threads = g_new0(QemuThread, thread_count); - decomp_param = g_new0(DecompressParam, thread_count); - qemu_mutex_init(&decomp_done_lock); - qemu_cond_init(&decomp_done_cond); - decomp_file = f; - for (i = 0; i < thread_count; i++) { - if (inflateInit(&decomp_param[i].stream) != Z_OK) { - goto exit; - } - - decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); - qemu_mutex_init(&decomp_param[i].mutex); - qemu_cond_init(&decomp_param[i].cond); - decomp_param[i].done = true; - decomp_param[i].quit = false; - qemu_thread_create(decompress_threads + i, "decompress", - do_data_decompress, decomp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; -exit: - compress_threads_load_cleanup(); - return -1; + threaded_workqueue_wait_for_requests(decompress_threads); + return qemu_file_get_error(decomp_file); } static void decompress_data_with_multi_threads(QEMUFile *f, - void *host, int len) + void *host, size_t len) { - int idx, thread_count; + DecompressData *dd; - thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - while (true) { - for (idx = 0; idx < thread_count; idx++) { - if (decomp_param[idx].done) { - decomp_param[idx].done = false; - qemu_mutex_lock(&decomp_param[idx].mutex); - qemu_get_buffer(f, decomp_param[idx].compbuf, len); - decomp_param[idx].des = host; - decomp_param[idx].len = len; - qemu_cond_signal(&decomp_param[idx].cond); - qemu_mutex_unlock(&decomp_param[idx].mutex); - break; - } - } - if (idx < thread_count) { - break; - } else { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } +retry: + dd = threaded_workqueue_get_request(decompress_threads); + if (!dd) { + goto retry; } - qemu_mutex_unlock(&decomp_done_lock); + + dd->des = host; + dd->len = len; + qemu_get_buffer(f, dd->compbuf, len); + threaded_workqueue_submit_request(decompress_threads, dd); } /* @@ -3683,7 +3620,7 @@ void colo_release_ram_cache(void) */ static int ram_load_setup(QEMUFile *f, void *opaque) { - if (compress_threads_load_setup(f)) { + if (decompress_init(f)) { return -1; } @@ -3704,7 +3641,7 @@ static int ram_load_cleanup(void *opaque) } xbzrle_load_cleanup(); - compress_threads_load_cleanup(); + decompress_fini(); RAMBLOCK_FOREACH_MIGRATABLE(rb) { g_free(rb->receivedmap); @@ -4106,7 +4043,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } } - ret |= wait_for_decompress_done(); + ret |= flush_decompressed_data(); rcu_read_unlock(); trace_ram_load_complete(ret, seq_iter); From patchwork Tue Nov 6 12:20:25 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 993656 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (mailfrom) smtp.mailfrom=nongnu.org (client-ip=2001:4830:134:3::11; helo=lists.gnu.org; envelope-from=qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org; receiver=) Authentication-Results: ozlabs.org; dmarc=fail (p=none dis=none) header.from=gmail.com Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=gmail.com header.i=@gmail.com header.b="XKxVl5zD"; dkim-atps=neutral Received: from lists.gnu.org (lists.gnu.org [IPv6:2001:4830:134:3::11]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 42q7yJ3256z9sBN for ; Tue, 6 Nov 2018 23:24:08 +1100 (AEDT) Received: from localhost ([::1]:40673 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0Or-0006i8-VV for incoming@patchwork.ozlabs.org; Tue, 06 Nov 2018 07:24:06 -0500 Received: from eggs.gnu.org ([2001:4830:134:3::10]:43415) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1gK0Lo-0002xW-80 for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:59 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1gK0Lm-00020l-Vx for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:56 -0500 Received: from mail-pf1-x442.google.com ([2607:f8b0:4864:20::442]:41243) by eggs.gnu.org with esmtps (TLS1.0:RSA_AES_128_CBC_SHA1:16) (Exim 4.71) (envelope-from ) id 1gK0Lm-0001za-Ny for qemu-devel@nongnu.org; Tue, 06 Nov 2018 07:20:54 -0500 Received: by mail-pf1-x442.google.com with SMTP id e22-v6so6035325pfn.8 for ; Tue, 06 Nov 2018 04:20:54 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=jznZJVjlRLoD9TNH6b8XfAsUK78wEZWIsv5uQU7dmjQ=; b=XKxVl5zDkr+Jqt/XwiYms5HgLgo5bc4trnulWq73QfHgBX9d+P0CG+WYUM6jPiwr+W EESPJkX0arpFOOwe+oXjtUPgHGtnWuDIbKQjGNi7lwpZRLLHVCZ4lcJK/diUEt40tRPO k4rXegWgZdT8x9sFvuivHTPZDOLq/3CTGIEH7vKeOtwf59AAppx5h0WMwYWR1zhNkas+ 7Tn/k5qvPdyF0umOAaPLE+fDrWDUXaKqUNsSVuc/AGh53o8A0QiuHqHi9jo2+F00YqLG UKEHodWfx7QTdGzJ1XQVeiBeUOHeNTJS4MJGxzAInbQS2/ewODhQkEtBPH0aIX15KgLO Qefg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=jznZJVjlRLoD9TNH6b8XfAsUK78wEZWIsv5uQU7dmjQ=; b=YIlxZRgYCyCqI3/KIRddX6oQ8hs1/fATD21LIBZ62rr6m8g2XZOkIV0PacpqEVFX96 bdhbPfB37tFl40uentmcLOoDGtoQ/NTWyHldfSPBkZ3JzPw8rPZYyPwHrSv0nlX5w+hY 2gc/N5lr4NHx8/pHAEzAQ4I+RDq0nCLiPiyTB/p5Pz8Eadp+4IFO9aA7vbJOitRGOpem RXCWuqn1ux+DyWqSoNJqxKUC+Rl52HlDSnEek5ihEr0BznjEF2JIjUmvBGlkGXYPfAMf 1IWqGIRheOjSKaaGXcdSI3goBHHgyx+rEes0IDsp9uMZpm8snPwexWr89uHr07Go6YxG +J+w== X-Gm-Message-State: AGRZ1gLtzxXeIMS6cfRn6+uQBGquoCVs/+VkVl/9nSEtz09opE9ezc/J swbmgrcPRJQr/vV3dKeEVkw= X-Google-Smtp-Source: AJdET5dP8D0zZIR+KN+46nAH9f//8cE3SqKupptEdNFlDdaFsAdZjjkh0XKDywC/ocMbsPH1fvUVrw== X-Received: by 2002:a63:6205:: with SMTP id w5mr23315591pgb.53.1541506853788; Tue, 06 Nov 2018 04:20:53 -0800 (PST) Received: from localhost.localdomain ([203.205.141.52]) by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.49 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Tue, 06 Nov 2018 04:20:53 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Date: Tue, 6 Nov 2018 20:20:25 +0800 Message-Id: <20181106122025.3487-6-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com> References: <20181106122025.3487-1-xiaoguangrong@tencent.com> X-detected-operating-system: by eggs.gnu.org: Genre and OS details not recognized. X-Received-From: 2607:f8b0:4864:20::442 Subject: [Qemu-devel] [PATCH v2 5/5] tests: add threaded-workqueue-bench X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.21 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: kvm@vger.kernel.org, quintela@redhat.com, Xiao Guangrong , qemu-devel@nongnu.org, peterx@redhat.com, dgilbert@redhat.com, wei.w.wang@intel.com, cota@braap.org, jiang.biao2@zte.com.cn Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: "Qemu-devel" From: Xiao Guangrong It's the benhcmark of threaded-workqueue, also it's a good example to show how threaded-workqueue is used Signed-off-by: Xiao Guangrong --- tests/Makefile.include | 5 +- tests/threaded-workqueue-bench.c | 256 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 260 insertions(+), 1 deletion(-) create mode 100644 tests/threaded-workqueue-bench.c diff --git a/tests/Makefile.include b/tests/Makefile.include index d2e577eabb..a4deb210ab 100644 --- a/tests/Makefile.include +++ b/tests/Makefile.include @@ -499,7 +499,8 @@ test-obj-y = tests/check-qnum.o tests/check-qstring.o tests/check-qdict.o \ tests/test-rcu-tailq.o \ tests/test-qdist.o tests/test-shift128.o \ tests/test-qht.o tests/qht-bench.o tests/test-qht-par.o \ - tests/atomic_add-bench.o tests/atomic64-bench.o + tests/atomic_add-bench.o tests/atomic64-bench.o \ + tests/threaded-workqueue-bench.o $(test-obj-y): QEMU_INCLUDES += -Itests QEMU_CFLAGS += -I$(SRC_PATH)/tests @@ -555,6 +556,8 @@ tests/qht-bench$(EXESUF): tests/qht-bench.o $(test-util-obj-y) tests/test-bufferiszero$(EXESUF): tests/test-bufferiszero.o $(test-util-obj-y) tests/atomic_add-bench$(EXESUF): tests/atomic_add-bench.o $(test-util-obj-y) tests/atomic64-bench$(EXESUF): tests/atomic64-bench.o $(test-util-obj-y) +tests/threaded-workqueue-bench$(EXESUF): tests/threaded-workqueue-bench.o migration/qemu-file.o \ + $(test-util-obj-y) tests/fp/%: $(MAKE) -C $(dir $@) $(notdir $@) diff --git a/tests/threaded-workqueue-bench.c b/tests/threaded-workqueue-bench.c new file mode 100644 index 0000000000..88026f1a8f --- /dev/null +++ b/tests/threaded-workqueue-bench.c @@ -0,0 +1,256 @@ +/* + * Threaded Workqueue Benchmark + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ +#include + +#include "qemu/osdep.h" +#include "exec/cpu-common.h" +#include "qemu/error-report.h" +#include "migration/qemu-file.h" +#include "qemu/threaded-workqueue.h" + +#define PAGE_SHIFT 12 +#define PAGE_SIZE (1 << PAGE_SHIFT) +#define DEFAULT_THREAD_NR 2 +#define DEFAULT_MEM_SIZE 1 +#define DEFAULT_REPEATED_COUNT 3 + +static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt, + int64_t pos) +{ + int i, size = 0; + + for (i = 0; i < iovcnt; i++) { + size += iov[i].iov_len; + } + return size; +} + +static int test_fclose(void *opaque) +{ + return 0; +} + +static const QEMUFileOps test_write_ops = { + .writev_buffer = test_writev_buffer, + .close = test_fclose +}; + +static QEMUFile *dest_file; + +static const QEMUFileOps empty_ops = { }; + +struct CompressData { + uint8_t *ram_addr; + QEMUFile *file; + z_stream stream; +}; +typedef struct CompressData CompressData; + +static int compress_request_size(void) +{ + return sizeof(CompressData); +} + +static int compress_request_init(void *request) +{ + CompressData *cd = request; + + if (deflateInit(&cd->stream, 1) != Z_OK) { + return -1; + } + cd->file = qemu_fopen_ops(NULL, &empty_ops); + return 0; +} + +static void compress_request_uninit(void *request) +{ + CompressData *cd = request; + + qemu_fclose(cd->file); + deflateEnd(&cd->stream); +} + +static void compress_thread_data_handler(void *request) +{ + CompressData *cd = request; + int blen; + + blen = qemu_put_compression_data(cd->file, &cd->stream, cd->ram_addr, + PAGE_SIZE); + if (blen < 0) { + error_report("compressed data failed!"); + qemu_file_set_error(dest_file, blen); + } +} + +struct CompressStats { + unsigned long pages; + unsigned long compressed_size; +}; +typedef struct CompressStats CompressStats; + +static CompressStats comp_stats; + +static void compress_thread_data_done(void *request) +{ + CompressData *cd = request; + int bytes_xmit; + + bytes_xmit = qemu_put_qemu_file(dest_file, cd->file); + + comp_stats.pages++; + comp_stats.compressed_size += bytes_xmit; +} + +static ThreadedWorkqueueOps ops = { + .thread_get_request_size = compress_request_size, + .thread_request_init = compress_request_init, + .thread_request_uninit = compress_request_uninit, + .thread_request_handler = compress_thread_data_handler, + .thread_request_done = compress_thread_data_done, +}; + +static void compress_threads_save_cleanup(Threads *threads) +{ + threaded_workqueue_destroy(threads); + qemu_fclose(dest_file); +} + +static Threads *compress_threads_save_setup(int threads_nr, int requests_nr) +{ + Threads *compress_threads; + + dest_file = qemu_fopen_ops(NULL, &test_write_ops); + compress_threads = threaded_workqueue_create("compress", threads_nr, + requests_nr, &ops); + assert(compress_threads); + return compress_threads; +} + +static void compress_page_with_multi_thread(Threads *threads, uint8_t *addr) +{ + CompressData *cd; + +retry: + cd = threaded_workqueue_get_request(threads); + if (!cd) { + goto retry; + } + + cd->ram_addr = addr; + threaded_workqueue_submit_request(threads, cd); +} + +static void run(Threads *threads, uint8_t *mem, unsigned long mem_size, + int repeated_count) +{ + uint8_t *ptr = mem, *end = mem + mem_size; + uint64_t start_ts, spend, total_ts = 0, pages = mem_size >> PAGE_SHIFT; + double rate; + int i; + + for (i = 0; i < repeated_count; i++) { + ptr = mem; + memset(&comp_stats, 0, sizeof(comp_stats)); + + start_ts = g_get_monotonic_time(); + for (ptr = mem; ptr < end; ptr += PAGE_SIZE) { + *ptr = 0x10; + compress_page_with_multi_thread(threads, ptr); + } + threaded_workqueue_wait_for_requests(threads); + spend = g_get_monotonic_time() - start_ts; + total_ts += spend; + + if (comp_stats.pages != pages) { + printf("ERROR: pages are compressed %ld, expect %ld.\n", + comp_stats.pages, pages); + exit(-1); + } + + rate = (double)(comp_stats.pages * PAGE_SIZE) / + comp_stats.compressed_size; + printf("RUN %d: Request # %ld Cost %ld, Compression Rate %f.\n", i, + comp_stats.pages, spend, rate); + } + + printf("AVG: Time Cost %ld.\n", total_ts / repeated_count); +} + +static void usage(const char *arg0) +{ + printf("\nThreaded Workqueue Benchmark.\n"); + printf("Usage:\n"); + printf(" %s [OPTIONS]\n", arg0); + printf("Options:\n"); + printf(" -t the number of threads (default %d).\n", + DEFAULT_THREAD_NR); + printf(" -r: the number of requests handled by each thread (default %d).\n", + DEFAULT_THREAD_REQUEST_NR); + printf(" -m: the size of the memory (G) used to test (default %dG).\n", + DEFAULT_MEM_SIZE); + printf(" -c: the repeated count (default %d).\n", + DEFAULT_REPEATED_COUNT); + printf(" -h show this help info.\n"); +} + +int main(int argc, char *argv[]) +{ + int c, threads_nr, requests_nr, repeated_count; + unsigned long mem_size; + uint8_t *mem; + Threads *threads; + + threads_nr = DEFAULT_THREAD_NR; + requests_nr = DEFAULT_THREAD_REQUEST_NR; + mem_size = DEFAULT_MEM_SIZE; + repeated_count = DEFAULT_REPEATED_COUNT; + + for (;;) { + c = getopt(argc, argv, "t:r:m:c:h"); + if (c < 0) { + break; + } + + switch (c) { + case 't': + threads_nr = atoi(optarg); + break; + case 'r': + requests_nr = atoi(optarg); + break; + case 'm': + mem_size = atol(optarg); + break; + case 'c': + repeated_count = atoi(optarg); + break; + default: + printf("Unkown option: %c.\n", c); + case 'h': + usage(argv[0]); + return -1; + } + } + + printf("Run the benchmark: threads %d requests-per-thread: %d memory %ldG repeat %d.\n", + threads_nr, requests_nr, mem_size, repeated_count); + + mem_size = mem_size << 30; + mem = qemu_memalign(PAGE_SIZE, mem_size); + memset(mem, 0, mem_size); + + threads = compress_threads_save_setup(threads_nr, requests_nr); + run(threads, mem, mem_size, repeated_count); + compress_threads_save_cleanup(threads); + return 0; +}