diff mbox

[4/6] throttle: Add throttle group support

Message ID bfb885fd48da3e13a60ef962f0e0a7cbbd757480.1426000801.git.berto@igalia.com
State New
Headers show

Commit Message

Alberto Garcia March 10, 2015, 3:30 p.m. UTC
The throttle group support use a cooperative round robin scheduling
algorithm.

The principles of the algorithm are simple:
- Each BDS of the group is used as a token in a circular way.
- The active BDS computes if a wait must be done and arms the right
  timer.
- If a wait must be done the token timer will be armed so the token
  will become the next active BDS.

Signed-off-by: Alberto Garcia <berto@igalia.com>
---
 block.c                         |  65 ++++++++++----------
 block/qapi.c                    |   5 +-
 block/throttle-groups.c         | 127 ++++++++++++++++++++++++++++++++++++++++
 blockdev.c                      |  19 +++++-
 hmp.c                           |   4 +-
 include/block/block.h           |   3 +-
 include/block/block_int.h       |   9 +--
 include/block/throttle-groups.h |   4 ++
 qapi/block-core.json            |   4 +-
 qemu-options.hx                 |   1 +
 qmp-commands.hx                 |   3 +-
 11 files changed, 198 insertions(+), 46 deletions(-)

Comments

Stefan Hajnoczi March 24, 2015, 4:03 p.m. UTC | #1
On Tue, Mar 10, 2015 at 05:30:48PM +0200, Alberto Garcia wrote:
> @@ -179,10 +179,11 @@ static void bdrv_throttle_write_timer_cb(void *opaque)
>  }
>  
>  /* should be called before bdrv_set_io_limits if a limit is set */
> -void bdrv_io_limits_enable(BlockDriverState *bs)
> +void bdrv_io_limits_enable(BlockDriverState *bs, const char *group)
>  {
>      assert(!bs->io_limits_enabled);
> -    throttle_init(&bs->throttle_state);
> +
> +    throttle_group_register_bs(bs, group ? group : bdrv_get_device_name(bs));

Please don't allow a NULL group argument.  blockdev_init() should pick
the group name instead of requiring bdrv_io_limits_enable() to generate
a unique name.

This way we avoid silently adding all BDS without a BlockBackend to a ""
throttling group.  I think that's a bug waiting to happen :).

> @@ -201,29 +219,7 @@ static void bdrv_io_limits_intercept(BlockDriverState *bs,
>                                       unsigned int bytes,
>                                       bool is_write)
>  {
> -    /* does this io must wait */
> -    bool must_wait = throttle_schedule_timer(&bs->throttle_state,
> -                                             &bs->throttle_timers,
> -                                             is_write);
> -
> -    /* if must wait or any request of this type throttled queue the IO */
> -    if (must_wait ||
> -        !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
> -        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
> -    }
> -
> -    /* the IO will be executed, do the accounting */
> -    throttle_account(&bs->throttle_state, is_write, bytes);
> -
> -
> -    /* if the next request must wait -> do nothing */
> -    if (throttle_schedule_timer(&bs->throttle_state, &bs->throttle_timers,
> -                                is_write)) {
> -        return;
> -    }
> -
> -    /* else queue next request for execution */
> -    qemu_co_queue_next(&bs->throttled_reqs[is_write]);
> +    throttle_group_io_limits_intercept(bs, bytes, is_write);
>  }

bdrv_io_limits_intercept() is no longer useful.  Can you replace
bdrv_io_limits_intercept() calls with
throttle_group_io_limits_intercept() to eliminate this indirection?

>  
>  size_t bdrv_opt_mem_align(BlockDriverState *bs)
> @@ -2050,15 +2046,16 @@ static void bdrv_move_feature_fields(BlockDriverState *bs_dest,
>      bs_dest->enable_write_cache = bs_src->enable_write_cache;
>  
>      /* i/o throttled req */
> -    memcpy(&bs_dest->throttle_state,
> -           &bs_src->throttle_state,
> -           sizeof(ThrottleState));
> +    bs_dest->throttle_state     = bs_src->throttle_state,
> +    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
> +    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
> +    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
> +    memcpy(&bs_dest->round_robin,
> +           &bs_src->round_robin,
> +           sizeof(bs_dest->round_robin));

The bdrv_swap() code is tricky to think about...I think this is okay
because bs_dest and bs_src linked list pointers will be unchanged
(although they are now pointing to a different block driver instance).

Just highlighting this in case anyone else spots a problem with the
pointer swapping.

> @@ -145,6 +147,131 @@ static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
>      return next;
>  }
>  
> +/* Return the next BlockDriverState in the round-robin sequence with
> + * pending I/O requests.
> + *
> + * @bs:        the current BlockDriverState
> + * @is_write:  the type of operation (read/write)
> + * @ret:       the next BlockDriverState with pending requests, or bs
> + *             if there is none.

Assumes tg->lock is held.  It's helpful to document this.
Stefan Hajnoczi March 24, 2015, 4:31 p.m. UTC | #2
On Tue, Mar 10, 2015 at 05:30:48PM +0200, Alberto Garcia wrote:
> +/* Return the next BlockDriverState in the round-robin sequence with
> + * pending I/O requests.
> + *
> + * @bs:        the current BlockDriverState
> + * @is_write:  the type of operation (read/write)
> + * @ret:       the next BlockDriverState with pending requests, or bs
> + *             if there is none.
> + */
> +static BlockDriverState *next_throttle_token(BlockDriverState *bs,
> +                                             bool is_write)
> +{
> +    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
> +    BlockDriverState *token, *start;
> +
> +    start = token = tg->tokens[is_write];
> +
> +    /* get next bs round in round robin style */
> +    token = throttle_group_next_bs(token);
> +    while (token != start  &&
> +           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {

It's not safe to access bs->throttled_reqs[].  There are many of other
places that access bs->throttled_reqs[] without holding tg->lock (e.g.
block.c).

throttled_reqs needs to be protected by tg->lock in order for this to
work.

> +/* Check if an I/O request needs to be throttled, wait and set a timer
> + * if necessary, and schedule the next request using a round robin
> + * algorithm.
> + *
> + * @bs:        the current BlockDriverState
> + * @bytes:     the number of bytes for this I/O
> + * @is_write:  the type of operation (read/write)
> + */
> +void throttle_group_io_limits_intercept(BlockDriverState *bs,
> +                                        unsigned int bytes,
> +                                        bool is_write)

This function must be called from coroutine context because it uses
qemu_co_queue_wait() and may yield.  Please mark the function with
coroutine_fn (see include/block/coroutine.h).  This serves as
documentation.

> +{
> +    bool must_wait;
> +    BlockDriverState *token;
> +
> +    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
> +    qemu_mutex_lock(&tg->lock);
> +
> +    /* First we check if this I/O has to be throttled. */
> +    token = next_throttle_token(bs, is_write);
> +    must_wait = throttle_group_schedule_timer(token, is_write);
> +
> +    /* Wait if there's a timer set or queued requests of this type */
> +    if (must_wait || !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
> +        qemu_mutex_unlock(&tg->lock);
> +        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);

Here bs->throttled_reqs[] is used without holding tg->lock.  It can race
with token->throttled_reqs[] users.
Alberto Garcia March 24, 2015, 5:48 p.m. UTC | #3
On Tue, Mar 24, 2015 at 04:31:45PM +0000, Stefan Hajnoczi wrote:

> > +    /* get next bs round in round robin style */
> > +    token = throttle_group_next_bs(token);
> > +    while (token != start  &&
> > +           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
> 
> It's not safe to access bs->throttled_reqs[].  There are many of
> other places that access bs->throttled_reqs[] without holding
> tg->lock (e.g. block.c).
>
> throttled_reqs needs to be protected by tg->lock in order for this
> to work.

Good catch, but I don't think that's the solution. throttled_reqs
cannot be protected by that lock because it must release it before
calling qemu_co_queue_wait(&bs->throttled_reqs[is_write]). Otherwise
it will cause a deadlock.

My assumption was that throttled_reqs would be protected by the BDS's
AioContext lock, but I overlooked the fact that this part of the
algorithm needs to access other BDSs' queues so we indeed have a
problem.

I will give it a thought to see what I can come up with. Since we only
need to check whether other BDSs have pending requests maybe I can
implement this with a flag.

Thanks!

Berto
Stefan Hajnoczi March 25, 2015, 12:03 p.m. UTC | #4
On Tue, Mar 24, 2015 at 06:48:37PM +0100, Alberto Garcia wrote:
> On Tue, Mar 24, 2015 at 04:31:45PM +0000, Stefan Hajnoczi wrote:
> 
> > > +    /* get next bs round in round robin style */
> > > +    token = throttle_group_next_bs(token);
> > > +    while (token != start  &&
> > > +           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
> > 
> > It's not safe to access bs->throttled_reqs[].  There are many of
> > other places that access bs->throttled_reqs[] without holding
> > tg->lock (e.g. block.c).
> >
> > throttled_reqs needs to be protected by tg->lock in order for this
> > to work.
> 
> Good catch, but I don't think that's the solution. throttled_reqs
> cannot be protected by that lock because it must release it before
> calling qemu_co_queue_wait(&bs->throttled_reqs[is_write]). Otherwise
> it will cause a deadlock.
> 
> My assumption was that throttled_reqs would be protected by the BDS's
> AioContext lock, but I overlooked the fact that this part of the
> algorithm needs to access other BDSs' queues so we indeed have a
> problem.
> 
> I will give it a thought to see what I can come up with. Since we only
> need to check whether other BDSs have pending requests maybe I can
> implement this with a flag.

A flag is a good solution since it is not possible to acquire other BDS'
AioContext lock from arbitrary block layer code (it's only allowed when
the QEMU global mutex is held).

Stefan
diff mbox

Patch

diff --git a/block.c b/block.c
index da0cb48..cd8582f 100644
--- a/block.c
+++ b/block.c
@@ -36,6 +36,7 @@ 
 #include "qmp-commands.h"
 #include "qemu/timer.h"
 #include "qapi-event.h"
+#include "block/throttle-groups.h"
 
 #ifdef CONFIG_BSD
 #include <sys/types.h>
@@ -130,7 +131,7 @@  void bdrv_set_io_limits(BlockDriverState *bs,
 {
     int i;
 
-    throttle_config(&bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_config(bs->throttle_state, &bs->throttle_timers, cfg);
 
     for (i = 0; i < 2; i++) {
         qemu_co_enter_next(&bs->throttled_reqs[i]);
@@ -160,9 +161,8 @@  static bool bdrv_start_throttled_reqs(BlockDriverState *bs)
 void bdrv_io_limits_disable(BlockDriverState *bs)
 {
     bs->io_limits_enabled = false;
-
     bdrv_start_throttled_reqs(bs);
-
+    throttle_group_unregister_bs(bs);
     throttle_timers_destroy(&bs->throttle_timers);
 }
 
@@ -179,10 +179,11 @@  static void bdrv_throttle_write_timer_cb(void *opaque)
 }
 
 /* should be called before bdrv_set_io_limits if a limit is set */
-void bdrv_io_limits_enable(BlockDriverState *bs)
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group)
 {
     assert(!bs->io_limits_enabled);
-    throttle_init(&bs->throttle_state);
+
+    throttle_group_register_bs(bs, group ? group : bdrv_get_device_name(bs));
     throttle_timers_init(&bs->throttle_timers,
                          bdrv_get_aio_context(bs),
                          QEMU_CLOCK_VIRTUAL,
@@ -192,6 +193,23 @@  void bdrv_io_limits_enable(BlockDriverState *bs)
     bs->io_limits_enabled = true;
 }
 
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group)
+{
+    /* this bs is not part of any group */
+    if (!bs->throttle_state) {
+        return;
+    }
+
+    /* this bs is a part of the same group than the one we want */
+    if (!g_strcmp0(throttle_group_get_name(bs->throttle_state), group)) {
+        return;
+    }
+
+    /* need to change the group this bs belong to */
+    bdrv_io_limits_disable(bs);
+    bdrv_io_limits_enable(bs, group);
+}
+
 /* This function makes an IO wait if needed
  *
  * @nb_sectors: the number of sectors of the IO
@@ -201,29 +219,7 @@  static void bdrv_io_limits_intercept(BlockDriverState *bs,
                                      unsigned int bytes,
                                      bool is_write)
 {
-    /* does this io must wait */
-    bool must_wait = throttle_schedule_timer(&bs->throttle_state,
-                                             &bs->throttle_timers,
-                                             is_write);
-
-    /* if must wait or any request of this type throttled queue the IO */
-    if (must_wait ||
-        !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
-        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
-    }
-
-    /* the IO will be executed, do the accounting */
-    throttle_account(&bs->throttle_state, is_write, bytes);
-
-
-    /* if the next request must wait -> do nothing */
-    if (throttle_schedule_timer(&bs->throttle_state, &bs->throttle_timers,
-                                is_write)) {
-        return;
-    }
-
-    /* else queue next request for execution */
-    qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+    throttle_group_io_limits_intercept(bs, bytes, is_write);
 }
 
 size_t bdrv_opt_mem_align(BlockDriverState *bs)
@@ -2050,15 +2046,16 @@  static void bdrv_move_feature_fields(BlockDriverState *bs_dest,
     bs_dest->enable_write_cache = bs_src->enable_write_cache;
 
     /* i/o throttled req */
-    memcpy(&bs_dest->throttle_state,
-           &bs_src->throttle_state,
-           sizeof(ThrottleState));
+    bs_dest->throttle_state     = bs_src->throttle_state,
+    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
+    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
+    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
+    memcpy(&bs_dest->round_robin,
+           &bs_src->round_robin,
+           sizeof(bs_dest->round_robin));
     memcpy(&bs_dest->throttle_timers,
            &bs_src->throttle_timers,
            sizeof(ThrottleTimers));
-    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
-    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
-    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
 
     /* r/w error */
     bs_dest->on_read_error      = bs_src->on_read_error;
diff --git a/block/qapi.c b/block/qapi.c
index 1808e67..abeeb38 100644
--- a/block/qapi.c
+++ b/block/qapi.c
@@ -24,6 +24,7 @@ 
 
 #include "block/qapi.h"
 #include "block/block_int.h"
+#include "block/throttle-groups.h"
 #include "block/write-threshold.h"
 #include "qmp-commands.h"
 #include "qapi-visit.h"
@@ -63,7 +64,9 @@  BlockDeviceInfo *bdrv_block_device_info(BlockDriverState *bs)
 
     if (bs->io_limits_enabled) {
         ThrottleConfig cfg;
-        throttle_get_config(&bs->throttle_state, &cfg);
+
+        throttle_group_get_config(bs->throttle_state, &cfg);
+
         info->bps     = cfg.buckets[THROTTLE_BPS_TOTAL].avg;
         info->bps_rd  = cfg.buckets[THROTTLE_BPS_READ].avg;
         info->bps_wr  = cfg.buckets[THROTTLE_BPS_WRITE].avg;
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index e355fed..9075c46 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -23,6 +23,8 @@ 
  */
 
 #include "block/throttle-groups.h"
+#include "qemu/queue.h"
+#include "qemu/thread.h"
 
 /* The ThrottleGroup structure (with its ThrottleState) is shared
  * among different BlockDriverState and it's independent from
@@ -145,6 +147,131 @@  static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
     return next;
 }
 
+/* Return the next BlockDriverState in the round-robin sequence with
+ * pending I/O requests.
+ *
+ * @bs:        the current BlockDriverState
+ * @is_write:  the type of operation (read/write)
+ * @ret:       the next BlockDriverState with pending requests, or bs
+ *             if there is none.
+ */
+static BlockDriverState *next_throttle_token(BlockDriverState *bs,
+                                             bool is_write)
+{
+    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+    BlockDriverState *token, *start;
+
+    start = token = tg->tokens[is_write];
+
+    /* get next bs round in round robin style */
+    token = throttle_group_next_bs(token);
+    while (token != start  &&
+           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = throttle_group_next_bs(token);
+    }
+
+    /* If no IO are queued for scheduling on the next round robin token
+     * then decide the token is the current bs because chances are
+     * the current bs get the current request queued.
+     */
+    if (token == start &&
+        qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = bs;
+    }
+
+    return token;
+}
+
+/* Check if the next I/O request for a BlockDriverState needs to be
+ * throttled or not. If there's no timer set in this group, set one
+ * and update the token accordingly.
+ *
+ * @bs:         the current BlockDriverState
+ * @is_write:   the type of operation (read/write)
+ * @ret:        whether the I/O request needs to be throttled or not
+ */
+static bool throttle_group_schedule_timer(BlockDriverState *bs,
+                                          bool is_write)
+{
+    ThrottleState *ts = bs->throttle_state;
+    ThrottleTimers *tt = &bs->throttle_timers;
+    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+    bool must_wait;
+    BlockDriverState *iter;
+
+    /* Check if any of the timers in this group is already armed */
+    QLIST_FOREACH(iter, &tg->head, round_robin) {
+        if (timer_pending(iter->throttle_timers.timers[is_write])) {
+            return true;
+        }
+    }
+
+    must_wait = throttle_schedule_timer(ts, tt, is_write);
+
+    /* If a timer just got armed, set bs as the current token */
+    if (must_wait) {
+        tg->tokens[is_write] = bs;
+    }
+
+    return must_wait;
+}
+
+/* Check if an I/O request needs to be throttled, wait and set a timer
+ * if necessary, and schedule the next request using a round robin
+ * algorithm.
+ *
+ * @bs:        the current BlockDriverState
+ * @bytes:     the number of bytes for this I/O
+ * @is_write:  the type of operation (read/write)
+ */
+void throttle_group_io_limits_intercept(BlockDriverState *bs,
+                                        unsigned int bytes,
+                                        bool is_write)
+{
+    bool must_wait;
+    BlockDriverState *token;
+
+    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+    qemu_mutex_lock(&tg->lock);
+
+    /* First we check if this I/O has to be throttled. */
+    token = next_throttle_token(bs, is_write);
+    must_wait = throttle_group_schedule_timer(token, is_write);
+
+    /* Wait if there's a timer set or queued requests of this type */
+    if (must_wait || !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+        qemu_mutex_unlock(&tg->lock);
+        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
+        qemu_mutex_lock(&tg->lock);
+    }
+
+    /* The I/O will be executed, so do the accounting */
+    throttle_account(bs->throttle_state, is_write, bytes);
+
+    /* Now we check if there's any pending request to schedule next */
+    token = next_throttle_token(bs, is_write);
+    if (!qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+
+        /* If it doesn't have to wait, queue it for immediate execution */
+        must_wait = throttle_group_schedule_timer(token, is_write);
+
+        if (!must_wait) {
+            /* Give preference to requests from the current bs */
+            if (!qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+                token = bs;
+                qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+            } else {
+                ThrottleTimers *tt = &token->throttle_timers;
+                int64_t now = qemu_clock_get_ns(tt->clock_type);
+                timer_mod(tt->timers[is_write], now + 1);
+            }
+            tg->tokens[is_write] = token;
+        }
+    }
+
+    qemu_mutex_unlock(&tg->lock);
+}
+
 /* Update the throttle configuration for a particular group. Similar
  * to throttle_config(), but guarantees atomicity within the
  * throttling group.
diff --git a/blockdev.c b/blockdev.c
index b9c1c0c..89fc7a8 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -357,6 +357,7 @@  static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
     const char *id;
     bool has_driver_specific_opts;
     BlockdevDetectZeroesOptions detect_zeroes;
+    const char *throttling_group;
 
     /* Check common options by copying from bs_opts to opts, all other options
      * stay in bs_opts for processing by bdrv_open(). */
@@ -459,6 +460,8 @@  static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
 
     cfg.op_size = qemu_opt_get_number(opts, "throttling.iops-size", 0);
 
+    throttling_group = qemu_opt_get(opts, "throttling.group");
+
     if (!check_throttle_config(&cfg, &error)) {
         error_propagate(errp, error);
         goto early_err;
@@ -547,7 +550,7 @@  static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
 
     /* disk I/O throttling */
     if (throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, throttling_group);
         bdrv_set_io_limits(bs, &cfg);
     }
 
@@ -711,6 +714,8 @@  DriveInfo *drive_new(QemuOpts *all_opts, BlockInterfaceType block_default_type)
 
         { "iops_size",      "throttling.iops-size" },
 
+        { "group",          "throttling.group" },
+
         { "readonly",       "read-only" },
     };
 
@@ -1877,7 +1882,9 @@  void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
                                bool has_iops_wr_max,
                                int64_t iops_wr_max,
                                bool has_iops_size,
-                               int64_t iops_size, Error **errp)
+                               int64_t iops_size,
+                               bool has_group,
+                               const char *group, Error **errp)
 {
     ThrottleConfig cfg;
     BlockDriverState *bs;
@@ -1929,9 +1936,11 @@  void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
     aio_context_acquire(aio_context);
 
     if (!bs->io_limits_enabled && throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, has_group ? group : NULL);
     } else if (bs->io_limits_enabled && !throttle_enabled(&cfg)) {
         bdrv_io_limits_disable(bs);
+    } else if (bs->io_limits_enabled && throttle_enabled(&cfg)) {
+        bdrv_io_limits_update_group(bs, has_group ? group : NULL);
     }
 
     if (bs->io_limits_enabled) {
@@ -2991,6 +3000,10 @@  QemuOptsList qemu_common_drive_opts = {
             .type = QEMU_OPT_NUMBER,
             .help = "when limiting by iops max size of an I/O in bytes",
         },{
+            .name = "throttling.group",
+            .type = QEMU_OPT_STRING,
+            .help = "name of the block throttling group",
+        },{
             .name = "copy-on-read",
             .type = QEMU_OPT_BOOL,
             .help = "copy read data from backing file into image file",
diff --git a/hmp.c b/hmp.c
index 71c28bc..b81e4ba 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1261,7 +1261,9 @@  void hmp_block_set_io_throttle(Monitor *mon, const QDict *qdict)
                               false,
                               0,
                               false, /* No default I/O size */
-                              0, &err);
+                              0,
+                              false,
+                              NULL, &err);
     hmp_handle_error(mon, &err);
 }
 
diff --git a/include/block/block.h b/include/block/block.h
index 649c269..2bb6222 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -162,8 +162,9 @@  void bdrv_stats_print(Monitor *mon, const QObject *data);
 void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
 /* disk I/O throttling */
-void bdrv_io_limits_enable(BlockDriverState *bs);
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group);
 void bdrv_io_limits_disable(BlockDriverState *bs);
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group);
 
 void bdrv_init(void);
 void bdrv_init_with_whitelist(void);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 1314fd3..c02e963 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -358,12 +358,13 @@  struct BlockDriverState {
     /* number of in-flight serialising requests */
     unsigned int serialising_in_flight;
 
-    /* I/O throttling */
-    ThrottleState throttle_state;
-    ThrottleTimers throttle_timers;
-    CoQueue      throttled_reqs[2];
+    /* I/O throttling - following elements protected by ThrottleGroup lock */
+    ThrottleState *throttle_state;
     bool         io_limits_enabled;
+    CoQueue      throttled_reqs[2];
     QLIST_ENTRY(BlockDriverState) round_robin;
+    /* timers have their own locking */
+    ThrottleTimers throttle_timers;
 
     /* I/O stats (display with "info blockstats"). */
     BlockAcctStats stats;
diff --git a/include/block/throttle-groups.h b/include/block/throttle-groups.h
index 23a172f..ddb962b 100644
--- a/include/block/throttle-groups.h
+++ b/include/block/throttle-groups.h
@@ -38,4 +38,8 @@  void throttle_group_get_config(ThrottleState *ts, ThrottleConfig *cfg);
 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname);
 void throttle_group_unregister_bs(BlockDriverState *bs);
 
+void throttle_group_io_limits_intercept(BlockDriverState *bs,
+                                        unsigned int bytes,
+                                        bool is_write);
+
 #endif
diff --git a/qapi/block-core.json b/qapi/block-core.json
index a3fdaf0..487b147 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -989,6 +989,8 @@ 
 #
 # @iops_size: #optional an I/O size in bytes (Since 1.7)
 #
+# @group: #optional throttle group name (Since 2.3)
+#
 # Returns: Nothing on success
 #          If @device is not a valid block device, DeviceNotFound
 #
@@ -1000,7 +1002,7 @@ 
             '*bps_max': 'int', '*bps_rd_max': 'int',
             '*bps_wr_max': 'int', '*iops_max': 'int',
             '*iops_rd_max': 'int', '*iops_wr_max': 'int',
-            '*iops_size': 'int' } }
+            '*iops_size': 'int', '*group': 'str' } }
 
 ##
 # @block-stream:
diff --git a/qemu-options.hx b/qemu-options.hx
index 837624d..6db0613 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -454,6 +454,7 @@  DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n"
     "       [[,iops_max=im]|[[,iops_rd_max=irm][,iops_wr_max=iwm]]]\n"
     "       [[,iops_size=is]]\n"
+    "       [[,group=g]]\n"
     "                use 'file' as a drive image\n", QEMU_ARCH_ALL)
 STEXI
 @item -drive @var{option}[,@var{option}[,@var{option}[,...]]]
diff --git a/qmp-commands.hx b/qmp-commands.hx
index c12334a..d866d57 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -1704,7 +1704,7 @@  EQMP
 
     {
         .name       = "block_set_io_throttle",
-        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?",
+        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?,group:s?",
         .mhandler.cmd_new = qmp_marshal_input_block_set_io_throttle,
     },
 
@@ -1730,6 +1730,7 @@  Arguments:
 - "iops_rd_max":  read I/O operations max (json-int)
 - "iops_wr_max":  write I/O operations max (json-int)
 - "iops_size":  I/O size in bytes when limiting (json-int)
+- "group": throttle group name (json-string)
 
 Example: