diff mbox

[7/9] throttle: Add throttle group support

Message ID 142929b6b5adbf371309043d23864c22b0afdec4.1423842044.git.berto@igalia.com
State New
Headers show

Commit Message

Alberto Garcia Feb. 13, 2015, 4:06 p.m. UTC
The throttle group support use a cooperative round robin scheduling algorithm.

The principles of the algorithm are simple:
- Each BDS of the group is used as a token in a circular way.
- The active BDS compute if a wait must be done and arm the right timer.
- If a wait must be done the token timer will be armed so the token will become
  the next active BDS.

Signed-off-by: Alberto Garcia <berto@igalia.com>
---
 block.c                   | 212 +++++++++++++++++++++++++++++++++++++++-------
 block/qapi.c              |   7 +-
 block/throttle-groups.c   |   2 +-
 blockdev.c                |  19 ++++-
 hmp.c                     |   4 +-
 include/block/block.h     |   3 +-
 include/block/block_int.h |   9 +-
 qapi/block-core.json      |   5 +-
 qemu-options.hx           |   1 +
 qmp-commands.hx           |   3 +-
 10 files changed, 220 insertions(+), 45 deletions(-)

Comments

Eric Blake Feb. 24, 2015, 4:45 p.m. UTC | #1
On 02/13/2015 09:06 AM, Alberto Garcia wrote:
> The throttle group support use a cooperative round robin scheduling algorithm.
> 
> The principles of the algorithm are simple:
> - Each BDS of the group is used as a token in a circular way.
> - The active BDS compute if a wait must be done and arm the right timer.
> - If a wait must be done the token timer will be armed so the token will become
>   the next active BDS.
> 
> Signed-off-by: Alberto Garcia <berto@igalia.com>
> ---
>  block.c                   | 212 +++++++++++++++++++++++++++++++++++++++-------
>  block/qapi.c              |   7 +-
>  block/throttle-groups.c   |   2 +-
>  blockdev.c                |  19 ++++-
>  hmp.c                     |   4 +-
>  include/block/block.h     |   3 +-
>  include/block/block_int.h |   9 +-
>  qapi/block-core.json      |   5 +-
>  qemu-options.hx           |   1 +
>  qmp-commands.hx           |   3 +-

Just a qapi review for now...


> +++ b/qapi/block-core.json
> @@ -989,6 +989,9 @@
>  #
>  # @iops_size: #optional an I/O size in bytes (Since 1.7)
>  #
> +#

Why the extra blank line?

> +# @group: #optional throttle group name (Since 2.3)
> +#
>  # Returns: Nothing on success
>  #          If @device is not a valid block device, DeviceNotFound
>  #
> @@ -1000,7 +1003,7 @@
>              '*bps_max': 'int', '*bps_rd_max': 'int',
>              '*bps_wr_max': 'int', '*iops_max': 'int',
>              '*iops_rd_max': 'int', '*iops_wr_max': 'int',
> -            '*iops_size': 'int' } }
> +            '*iops_size': 'int', '*group': 'str' } }

What is the default if no 'group' is specified, unthrottled?

I hate write-only interfaces.  What is the corresponding 'query-*'
command that I can invoke to learn which group, if any, a node is
associated with?
Eric Blake Feb. 24, 2015, 4:47 p.m. UTC | #2
On 02/24/2015 09:45 AM, Eric Blake wrote:
> On 02/13/2015 09:06 AM, Alberto Garcia wrote:
>> The throttle group support use a cooperative round robin scheduling algorithm.
>>
>>  qapi/block-core.json      |   5 +-
>>  qemu-options.hx           |   1 +
>>  qmp-commands.hx           |   3 +-
> 
> Just a qapi review for now...
> 

> 
> I hate write-only interfaces.  What is the corresponding 'query-*'
> command that I can invoke to learn which group, if any, a node is
> associated with?

Okay, I hit send too soon.  I see this is part of BlockDeviceInfo, which
is in turn part of BlockInfo returned by 'query-block'.  So this
addition is the output side, and there is nothing that yet exists to
associate a node with a throttle group on the input side (assuming that
comes later in 8/9 or 9/9).
Stefan Hajnoczi March 3, 2015, 9 p.m. UTC | #3
On Fri, Feb 13, 2015 at 06:06:15PM +0200, Alberto Garcia wrote:
> The throttle group support use a cooperative round robin scheduling algorithm.
> 
> The principles of the algorithm are simple:
> - Each BDS of the group is used as a token in a circular way.
> - The active BDS compute if a wait must be done and arm the right timer.
> - If a wait must be done the token timer will be armed so the token will become
>   the next active BDS.
> 
> Signed-off-by: Alberto Garcia <berto@igalia.com>
> ---
>  block.c                   | 212 +++++++++++++++++++++++++++++++++++++++-------
>  block/qapi.c              |   7 +-
>  block/throttle-groups.c   |   2 +-
>  blockdev.c                |  19 ++++-
>  hmp.c                     |   4 +-
>  include/block/block.h     |   3 +-
>  include/block/block_int.h |   9 +-
>  qapi/block-core.json      |   5 +-
>  qemu-options.hx           |   1 +
>  qmp-commands.hx           |   3 +-
>  10 files changed, 220 insertions(+), 45 deletions(-)
> 
> diff --git a/block.c b/block.c
> index 642ef04..625f1c8 100644
> --- a/block.c
> +++ b/block.c
> @@ -36,6 +36,7 @@
>  #include "qmp-commands.h"
>  #include "qemu/timer.h"
>  #include "qapi-event.h"
> +#include "block/throttle-groups.h"
>  
>  #ifdef CONFIG_BSD
>  #include <sys/types.h>
> @@ -130,7 +131,9 @@ void bdrv_set_io_limits(BlockDriverState *bs,
>  {
>      int i;
>  
> -    throttle_config(&bs->throttle_state, &bs->throttle_timers, cfg);
> +    throttle_group_lock(bs->throttle_state);
> +    throttle_config(bs->throttle_state, &bs->throttle_timers, cfg);
> +    throttle_group_unlock(bs->throttle_state);
>  
>      for (i = 0; i < 2; i++) {
>          qemu_co_enter_next(&bs->throttled_reqs[i]);
> @@ -157,34 +160,99 @@ static bool bdrv_start_throttled_reqs(BlockDriverState *bs)
>      return drained;
>  }
>  
> +static void bdrv_throttle_group_add(BlockDriverState *bs)
> +{
> +    int i;
> +    BlockDriverState *token;
> +
> +    for (i = 0; i < 2; i++) {
> +        /* Get the BlockDriverState having the round robin token */
> +        token = throttle_group_token(bs->throttle_state, i);
> +
> +        /* If the ThrottleGroup is new set the current BlockDriverState as
> +         * token
> +         */
> +        if (!token) {
> +            throttle_group_set_token(bs->throttle_state, bs, i);
> +        }
> +
> +    }

Does it make sense to move this inside throttle_group_register_bs()?
Then bdrv_throttle_group_add() could be dropped and
throttle_group_register_bs() is called directly.

> +
> +    throttle_group_register_bs(bs->throttle_state, bs);
> +}
> +
> +static void bdrv_throttle_group_remove(BlockDriverState *bs)

The name is a hint that this doesn't belong in block.c.  This function
should be throttle_group_unregister_bs().

> +{
> +    BlockDriverState *token;
> +    int i;
> +
> +    for (i = 0; i < 2; i++) {
> +        /* Get the BlockDriverState having the round robin token */
> +        token = throttle_group_token(bs->throttle_state, i);
> +        /* if this bs is the current token set the next bs as token */
> +        if (token == bs) {
> +            token = throttle_group_next_bs(token);
> +            /* take care of the case where bs is the only bs of the group */
> +            if (token == bs) {
> +                token = NULL;
> +            }
> +            throttle_group_set_token(bs->throttle_state, token, i);
> +        }
> +    }
> +
> +    /* remove the current bs from the list */
> +    QLIST_REMOVE(bs, round_robin);
> +}
> +
>  void bdrv_io_limits_disable(BlockDriverState *bs)
>  {
> +
> +    throttle_group_lock(bs->throttle_state);
>      bs->io_limits_enabled = false;
> +    throttle_group_unlock(bs->throttle_state);

What are the locking rules?  I don't understand why throttle_state must
be locked to modify bs->io_limits_enabled.

>  
>      bdrv_start_throttled_reqs(bs);
>  
> +    throttle_group_lock(bs->throttle_state);
> +    bdrv_throttle_group_remove(bs);
> +    throttle_group_unlock(bs->throttle_state);
> +
> +    throttle_group_unref(bs->throttle_state);
> +    bs->throttle_state = NULL;
> +
>      throttle_timers_destroy(&bs->throttle_timers);
>  }
>  
>  static void bdrv_throttle_read_timer_cb(void *opaque)
>  {
>      BlockDriverState *bs = opaque;
> -    throttle_timer_fired(&bs->throttle_state, false);
> +
> +    throttle_group_lock(bs->throttle_state);
> +    throttle_timer_fired(bs->throttle_state, false);
> +    throttle_group_unlock(bs->throttle_state);

This pattern suggests throttle_timer_fired() should acquire the lock
internally instead.
Alberto Garcia March 4, 2015, 1:53 p.m. UTC | #4
On Tue, Mar 03, 2015 at 03:00:06PM -0600, Stefan Hajnoczi wrote:

> > +    throttle_group_lock(bs->throttle_state);
> > +    bdrv_throttle_group_remove(bs);
> > +    throttle_group_unlock(bs->throttle_state);
> > +
> > +    throttle_group_unref(bs->throttle_state);
> > +    bs->throttle_state = NULL;
> > +
> >      throttle_timers_destroy(&bs->throttle_timers);
> >  }
> >  
> >  static void bdrv_throttle_read_timer_cb(void *opaque)
> >  {
> >      BlockDriverState *bs = opaque;
> > -    throttle_timer_fired(&bs->throttle_state, false);
> > +
> > +    throttle_group_lock(bs->throttle_state);
> > +    throttle_timer_fired(bs->throttle_state, false);
> > +    throttle_group_unlock(bs->throttle_state);
> 
> This pattern suggests throttle_timer_fired() should acquire the lock
> internally instead.

The idea is that the ThrottleState code itself doesn't know anything
about locks or groups. As I understood it Benoît designed the
ThrottleState code to be independent from the block layer and reusable
for other things (that's why it's in util/).

Berto
Stefan Hajnoczi March 4, 2015, 4:04 p.m. UTC | #5
On Wed, Mar 04, 2015 at 02:53:42PM +0100, Alberto Garcia wrote:
> On Tue, Mar 03, 2015 at 03:00:06PM -0600, Stefan Hajnoczi wrote:
> 
> > > +    throttle_group_lock(bs->throttle_state);
> > > +    bdrv_throttle_group_remove(bs);
> > > +    throttle_group_unlock(bs->throttle_state);
> > > +
> > > +    throttle_group_unref(bs->throttle_state);
> > > +    bs->throttle_state = NULL;
> > > +
> > >      throttle_timers_destroy(&bs->throttle_timers);
> > >  }
> > >  
> > >  static void bdrv_throttle_read_timer_cb(void *opaque)
> > >  {
> > >      BlockDriverState *bs = opaque;
> > > -    throttle_timer_fired(&bs->throttle_state, false);
> > > +
> > > +    throttle_group_lock(bs->throttle_state);
> > > +    throttle_timer_fired(bs->throttle_state, false);
> > > +    throttle_group_unlock(bs->throttle_state);
> > 
> > This pattern suggests throttle_timer_fired() should acquire the lock
> > internally instead.
> 
> The idea is that the ThrottleState code itself doesn't know anything
> about locks or groups. As I understood it Benoît designed the
> ThrottleState code to be independent from the block layer and reusable
> for other things (that's why it's in util/).

Then ThrottleGroup could offer an API throttle_group_timer_fired() that
does the locking.

The advantage of encapsulating locking in ThrottleGroup is that callers
don't have to remember the take the lock.  (But they must still be
careful about sequences of calls which will not be atomic.)

Stefan
Alberto Garcia March 4, 2015, 4:16 p.m. UTC | #6
On Wed, Mar 04, 2015 at 10:04:27AM -0600, Stefan Hajnoczi wrote:

> > > This pattern suggests throttle_timer_fired() should acquire the
> > > lock internally instead.
> > 
> > The idea is that the ThrottleState code itself doesn't know
> > anything about locks or groups. As I understood it Benoît
> > designed the ThrottleState code to be independent from the block
> > layer and reusable for other things (that's why it's in util/).
> 
> Then ThrottleGroup could offer an API throttle_group_timer_fired()
> that does the locking.
> 
> The advantage of encapsulating locking in ThrottleGroup is that
> callers don't have to remember the take the lock.  (But they must
> still be careful about sequences of calls which will not be atomic.)

No other code in ThrottleGroup takes the lock directly, so making this
an exception could be confusing.

Truth to be told I'm not a very big fan of the timer callback having
to deal with that in any case. The any_timer_armed flag should be
something internal to the throttling code. I'll try to figure out a
more elegant way to solve this.

Berto
Stefan Hajnoczi March 5, 2015, 5:41 p.m. UTC | #7
On Wed, Mar 04, 2015 at 05:16:51PM +0100, Alberto Garcia wrote:
> On Wed, Mar 04, 2015 at 10:04:27AM -0600, Stefan Hajnoczi wrote:
> 
> > > > This pattern suggests throttle_timer_fired() should acquire the
> > > > lock internally instead.
> > > 
> > > The idea is that the ThrottleState code itself doesn't know
> > > anything about locks or groups. As I understood it Benoît
> > > designed the ThrottleState code to be independent from the block
> > > layer and reusable for other things (that's why it's in util/).
> > 
> > Then ThrottleGroup could offer an API throttle_group_timer_fired()
> > that does the locking.
> > 
> > The advantage of encapsulating locking in ThrottleGroup is that
> > callers don't have to remember the take the lock.  (But they must
> > still be careful about sequences of calls which will not be atomic.)
> 
> No other code in ThrottleGroup takes the lock directly, so making this
> an exception could be confusing.

It shouldn't be an exception.  I'm proposing that the ThrottleGroup API
should handle locking internally instead of requiring callers to do it.

Stefan
diff mbox

Patch

diff --git a/block.c b/block.c
index 642ef04..625f1c8 100644
--- a/block.c
+++ b/block.c
@@ -36,6 +36,7 @@ 
 #include "qmp-commands.h"
 #include "qemu/timer.h"
 #include "qapi-event.h"
+#include "block/throttle-groups.h"
 
 #ifdef CONFIG_BSD
 #include <sys/types.h>
@@ -130,7 +131,9 @@  void bdrv_set_io_limits(BlockDriverState *bs,
 {
     int i;
 
-    throttle_config(&bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_lock(bs->throttle_state);
+    throttle_config(bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_unlock(bs->throttle_state);
 
     for (i = 0; i < 2; i++) {
         qemu_co_enter_next(&bs->throttled_reqs[i]);
@@ -157,34 +160,99 @@  static bool bdrv_start_throttled_reqs(BlockDriverState *bs)
     return drained;
 }
 
+static void bdrv_throttle_group_add(BlockDriverState *bs)
+{
+    int i;
+    BlockDriverState *token;
+
+    for (i = 0; i < 2; i++) {
+        /* Get the BlockDriverState having the round robin token */
+        token = throttle_group_token(bs->throttle_state, i);
+
+        /* If the ThrottleGroup is new set the current BlockDriverState as
+         * token
+         */
+        if (!token) {
+            throttle_group_set_token(bs->throttle_state, bs, i);
+        }
+
+    }
+
+    throttle_group_register_bs(bs->throttle_state, bs);
+}
+
+static void bdrv_throttle_group_remove(BlockDriverState *bs)
+{
+    BlockDriverState *token;
+    int i;
+
+    for (i = 0; i < 2; i++) {
+        /* Get the BlockDriverState having the round robin token */
+        token = throttle_group_token(bs->throttle_state, i);
+        /* if this bs is the current token set the next bs as token */
+        if (token == bs) {
+            token = throttle_group_next_bs(token);
+            /* take care of the case where bs is the only bs of the group */
+            if (token == bs) {
+                token = NULL;
+            }
+            throttle_group_set_token(bs->throttle_state, token, i);
+        }
+    }
+
+    /* remove the current bs from the list */
+    QLIST_REMOVE(bs, round_robin);
+}
+
 void bdrv_io_limits_disable(BlockDriverState *bs)
 {
+
+    throttle_group_lock(bs->throttle_state);
     bs->io_limits_enabled = false;
+    throttle_group_unlock(bs->throttle_state);
 
     bdrv_start_throttled_reqs(bs);
 
+    throttle_group_lock(bs->throttle_state);
+    bdrv_throttle_group_remove(bs);
+    throttle_group_unlock(bs->throttle_state);
+
+    throttle_group_unref(bs->throttle_state);
+    bs->throttle_state = NULL;
+
     throttle_timers_destroy(&bs->throttle_timers);
 }
 
 static void bdrv_throttle_read_timer_cb(void *opaque)
 {
     BlockDriverState *bs = opaque;
-    throttle_timer_fired(&bs->throttle_state, false);
+
+    throttle_group_lock(bs->throttle_state);
+    throttle_timer_fired(bs->throttle_state, false);
+    throttle_group_unlock(bs->throttle_state);
+
     qemu_co_enter_next(&bs->throttled_reqs[0]);
 }
 
 static void bdrv_throttle_write_timer_cb(void *opaque)
 {
     BlockDriverState *bs = opaque;
-    throttle_timer_fired(&bs->throttle_state, true);
+
+    throttle_group_lock(bs->throttle_state);
+    throttle_timer_fired(bs->throttle_state, true);
+    throttle_group_unlock(bs->throttle_state);
+
     qemu_co_enter_next(&bs->throttled_reqs[1]);
 }
 
 /* should be called before bdrv_set_io_limits if a limit is set */
-void bdrv_io_limits_enable(BlockDriverState *bs)
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group)
 {
     assert(!bs->io_limits_enabled);
-    throttle_init(&bs->throttle_state);
+    bs->throttle_state = throttle_group_incref(group ? group : bdrv_get_device_name(bs));
+
+    throttle_group_lock(bs->throttle_state);
+    bdrv_throttle_group_add(bs);
     throttle_timers_init(&bs->throttle_timers,
                          bdrv_get_aio_context(bs),
                          QEMU_CLOCK_VIRTUAL,
@@ -192,6 +260,53 @@  void bdrv_io_limits_enable(BlockDriverState *bs)
                          bdrv_throttle_write_timer_cb,
                          bs);
     bs->io_limits_enabled = true;
+    throttle_group_unlock(bs->throttle_state);
+}
+
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group)
+{
+    /* this bs is not part of any group */
+    if (!bs->throttle_state) {
+        return;
+    }
+
+    /* this bs is a part of the same group than the one we want */
+    if (throttle_group_compare(bs->throttle_state, group)) {
+        return;
+    }
+
+    /* need to change the group this bs belong to */
+    bdrv_io_limits_disable(bs);
+    bdrv_io_limits_enable(bs, group);
+}
+
+/* This implement the round robin policy and must be called under ThrottleGroup
+ * lock
+ */
+static BlockDriverState *bdrv_next_throttle_token(BlockDriverState *bs,
+                                                  bool is_write)
+{
+    BlockDriverState *token, *start;
+
+    start = token = throttle_group_token(bs->throttle_state, is_write);
+
+    /* get next bs round in round robin style */
+    token = throttle_group_next_bs(token);
+    while (token != start  &&
+           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = throttle_group_next_bs(token);
+    }
+
+    /* If no IO are queued for scheduling on the next round robin token
+     * then decide the token is the current bs because chances are
+     * the current bs get the current request queued.
+     */
+    if (token == start &&
+        qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = bs;
+    }
+
+    return token;
 }
 
 /* This function makes an IO wait if needed
@@ -204,31 +319,63 @@  static void bdrv_io_limits_intercept(BlockDriverState *bs,
                                      bool is_write)
 {
     bool armed;
-
-    /* does this io must wait */
-    bool must_wait = throttle_schedule_timer(&bs->throttle_state,
-                                             &bs->throttle_timers,
-                                             is_write,
-                                             &armed);
-
-    /* if must wait or any request of this type throttled queue the IO */
-    if (must_wait ||
-        !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+    bool must_wait;
+    BlockDriverState *token;
+
+    throttle_group_lock(bs->throttle_state);
+
+    /* First we check if this I/O has to be throttled.
+     * If that's the case and we need to set a new timer for that,
+     * we have to do it in the next bs according to the scheduling
+     * algorithm. In that case that bs will be used as the new
+     * token. */
+    token = bdrv_next_throttle_token(bs, is_write);
+    must_wait = throttle_schedule_timer(bs->throttle_state,
+                                        &token->throttle_timers,
+                                        is_write,
+                                        &armed);
+    if (armed) {
+        throttle_group_set_token(bs->throttle_state, token, is_write);
+    }
+
+    /* Wait if there's a timer set or queued requests of this type */
+    if (must_wait || !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+        throttle_group_unlock(bs->throttle_state);
         qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
-    }
-
-    /* the IO will be executed, do the accounting */
-    throttle_account(&bs->throttle_state, is_write, bytes);
-
+        throttle_group_lock(bs->throttle_state);
+    }
+
+    /* The I/O will be executed, so do the accounting */
+    throttle_account(bs->throttle_state, is_write, bytes);
+
+    /* Now we check if there's any pending request to schedule next.
+     * If that's the case and it doesn't have to wait, we queue it for
+     * immediate execution. */
+    token = bdrv_next_throttle_token(bs, is_write);
+    if (!qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        must_wait = throttle_schedule_timer(bs->throttle_state,
+                                            &token->throttle_timers,
+                                            is_write,
+                                            &armed);
+
+        if (!must_wait) {
+            /* If we don't need to do any throttling, give preference
+             * to requests from the current bs */
+            if (!qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+                token = bs;
+                qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+            } else {
+                throttle_fire_timer(&token->throttle_timers, is_write);
+            }
+        }
 
-    /* if the next request must wait -> do nothing */
-    if (throttle_schedule_timer(&bs->throttle_state, &bs->throttle_timers,
-                                is_write, &armed)) {
-        return;
+        /* Set the new token unless a timer had already been armed */
+        if (armed || !must_wait) {
+            throttle_group_set_token(bs->throttle_state, token, is_write);
+        }
     }
 
-    /* else queue next request for execution */
-    qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+    throttle_group_unlock(bs->throttle_state);
 }
 
 size_t bdrv_opt_mem_align(BlockDriverState *bs)
@@ -2056,15 +2203,16 @@  static void bdrv_move_feature_fields(BlockDriverState *bs_dest,
     bs_dest->enable_write_cache = bs_src->enable_write_cache;
 
     /* i/o throttled req */
-    memcpy(&bs_dest->throttle_state,
-           &bs_src->throttle_state,
-           sizeof(ThrottleState));
+    bs_dest->throttle_state     = bs_src->throttle_state,
+    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
+    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
+    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
+    memcpy(&bs_dest->round_robin,
+           &bs_src->round_robin,
+           sizeof(bs_dest->round_robin));
     memcpy(&bs_dest->throttle_timers,
            &bs_src->throttle_timers,
            sizeof(ThrottleTimers));
-    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
-    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
-    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
 
     /* r/w error */
     bs_dest->on_read_error      = bs_src->on_read_error;
diff --git a/block/qapi.c b/block/qapi.c
index 1808e67..9ed3e68 100644
--- a/block/qapi.c
+++ b/block/qapi.c
@@ -24,6 +24,7 @@ 
 
 #include "block/qapi.h"
 #include "block/block_int.h"
+#include "block/throttle-groups.h"
 #include "block/write-threshold.h"
 #include "qmp-commands.h"
 #include "qapi-visit.h"
@@ -63,7 +64,11 @@  BlockDeviceInfo *bdrv_block_device_info(BlockDriverState *bs)
 
     if (bs->io_limits_enabled) {
         ThrottleConfig cfg;
-        throttle_get_config(&bs->throttle_state, &cfg);
+
+        throttle_group_lock(bs->throttle_state);
+        throttle_get_config(bs->throttle_state, &cfg);
+        throttle_group_unlock(bs->throttle_state);
+
         info->bps     = cfg.buckets[THROTTLE_BPS_TOTAL].avg;
         info->bps_rd  = cfg.buckets[THROTTLE_BPS_READ].avg;
         info->bps_wr  = cfg.buckets[THROTTLE_BPS_WRITE].avg;
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index ea5baca..399ae5e 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -154,7 +154,7 @@  void throttle_group_register_bs(ThrottleState *ts, BlockDriverState *bs)
  */
 BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
 {
-    ThrottleState *ts = &bs->throttle_state;
+    ThrottleState *ts = bs->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
     BlockDriverState *next = QLIST_NEXT(bs, round_robin);
 
diff --git a/blockdev.c b/blockdev.c
index 7d34960..135cdeb 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -361,6 +361,7 @@  static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
     bool has_driver_specific_opts;
     BlockdevDetectZeroesOptions detect_zeroes;
     BlockDriver *drv = NULL;
+    const char *throttling_group;
 
     /* Check common options by copying from bs_opts to opts, all other options
      * stay in bs_opts for processing by bdrv_open(). */
@@ -463,6 +464,8 @@  static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
 
     cfg.op_size = qemu_opt_get_number(opts, "throttling.iops-size", 0);
 
+    throttling_group = qemu_opt_get(opts, "throttling.group");
+
     if (!check_throttle_config(&cfg, &error)) {
         error_propagate(errp, error);
         goto early_err;
@@ -518,7 +521,7 @@  static BlockBackend *blockdev_init(const char *file, QDict *bs_opts,
 
     /* disk I/O throttling */
     if (throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, throttling_group);
         bdrv_set_io_limits(bs, &cfg);
     }
 
@@ -721,6 +724,8 @@  DriveInfo *drive_new(QemuOpts *all_opts, BlockInterfaceType block_default_type)
 
         { "iops_size",      "throttling.iops-size" },
 
+        { "group",          "throttling.group" },
+
         { "readonly",       "read-only" },
     };
 
@@ -1889,7 +1894,9 @@  void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
                                bool has_iops_wr_max,
                                int64_t iops_wr_max,
                                bool has_iops_size,
-                               int64_t iops_size, Error **errp)
+                               int64_t iops_size,
+                               bool has_group,
+                               const char *group, Error **errp)
 {
     ThrottleConfig cfg;
     BlockDriverState *bs;
@@ -1941,9 +1948,11 @@  void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
     aio_context_acquire(aio_context);
 
     if (!bs->io_limits_enabled && throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, has_group ? group : NULL);
     } else if (bs->io_limits_enabled && !throttle_enabled(&cfg)) {
         bdrv_io_limits_disable(bs);
+    } else if (bs->io_limits_enabled && throttle_enabled(&cfg)) {
+        bdrv_io_limits_update_group(bs, has_group ? group : NULL);
     }
 
     if (bs->io_limits_enabled) {
@@ -3004,6 +3013,10 @@  QemuOptsList qemu_common_drive_opts = {
             .type = QEMU_OPT_NUMBER,
             .help = "when limiting by iops max size of an I/O in bytes",
         },{
+            .name = "throttling.group",
+            .type = QEMU_OPT_STRING,
+            .help = "name of the block throttling group",
+        },{
             .name = "copy-on-read",
             .type = QEMU_OPT_BOOL,
             .help = "copy read data from backing file into image file",
diff --git a/hmp.c b/hmp.c
index b47f331..47663ce 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1228,7 +1228,9 @@  void hmp_block_set_io_throttle(Monitor *mon, const QDict *qdict)
                               false,
                               0,
                               false, /* No default I/O size */
-                              0, &err);
+                              0,
+                              false,
+                              NULL, &err);
     hmp_handle_error(mon, &err);
 }
 
diff --git a/include/block/block.h b/include/block/block.h
index 321295e..c74771f 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -162,8 +162,9 @@  void bdrv_stats_print(Monitor *mon, const QObject *data);
 void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
 /* disk I/O throttling */
-void bdrv_io_limits_enable(BlockDriverState *bs);
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group);
 void bdrv_io_limits_disable(BlockDriverState *bs);
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group);
 
 void bdrv_init(void);
 void bdrv_init_with_whitelist(void);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 28d754c..5d49ed2 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -358,12 +358,13 @@  struct BlockDriverState {
     /* number of in-flight serialising requests */
     unsigned int serialising_in_flight;
 
-    /* I/O throttling */
-    ThrottleState throttle_state;
-    ThrottleTimers throttle_timers;
-    CoQueue      throttled_reqs[2];
+    /* I/O throttling - following elements protected by ThrottleGroup lock */
+    ThrottleState *throttle_state;
     bool         io_limits_enabled;
+    CoQueue      throttled_reqs[2];
     QLIST_ENTRY(BlockDriverState) round_robin;
+    /* timers have their own locking */
+    ThrottleTimers throttle_timers;
 
     /* I/O stats (display with "info blockstats"). */
     BlockAcctStats stats;
diff --git a/qapi/block-core.json b/qapi/block-core.json
index a3fdaf0..563b11f 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -989,6 +989,9 @@ 
 #
 # @iops_size: #optional an I/O size in bytes (Since 1.7)
 #
+#
+# @group: #optional throttle group name (Since 2.3)
+#
 # Returns: Nothing on success
 #          If @device is not a valid block device, DeviceNotFound
 #
@@ -1000,7 +1003,7 @@ 
             '*bps_max': 'int', '*bps_rd_max': 'int',
             '*bps_wr_max': 'int', '*iops_max': 'int',
             '*iops_rd_max': 'int', '*iops_wr_max': 'int',
-            '*iops_size': 'int' } }
+            '*iops_size': 'int', '*group': 'str' } }
 
 ##
 # @block-stream:
diff --git a/qemu-options.hx b/qemu-options.hx
index 85ca3ad..ef8e76c 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -442,6 +442,7 @@  DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n"
     "       [[,iops_max=im]|[[,iops_rd_max=irm][,iops_wr_max=iwm]]]\n"
     "       [[,iops_size=is]]\n"
+    "       [[,group=g]]\n"
     "                use 'file' as a drive image\n", QEMU_ARCH_ALL)
 STEXI
 @item -drive @var{option}[,@var{option}[,@var{option}[,...]]]
diff --git a/qmp-commands.hx b/qmp-commands.hx
index a85d847..2ce6af6 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -1704,7 +1704,7 @@  EQMP
 
     {
         .name       = "block_set_io_throttle",
-        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?",
+        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?,group:s?",
         .mhandler.cmd_new = qmp_marshal_input_block_set_io_throttle,
     },
 
@@ -1730,6 +1730,7 @@  Arguments:
 - "iops_rd_max":  read I/O operations max (json-int)
 - "iops_wr_max":  write I/O operations max (json-int)
 - "iops_size":  I/O size in bytes when limiting (json-int)
+- "group": throttle group name (json-string)
 
 Example: