diff mbox

[v2,3/3] Add rate limiting of RTC_CHANGE, BALLOON_CHANGE & WATCHDOG events

Message ID 1337619593-25823-4-git-send-email-berrange@redhat.com
State New
Headers show

Commit Message

Daniel P. Berrangé May 21, 2012, 4:59 p.m. UTC
From: "Daniel P. Berrange" <berrange@redhat.com>

Allow certain event types to be rate limited to avoid flooding
monitor clients. The monitor_protocol_event() method is changed
such that instead of immediately emitting the event to Monitor
instances, it will call a new monitor_protocol_event_queue()
method.

This will check to see if the rate limit for the event has been
exceeded, and if so schedule a timer to wakeup at the end of the
rate limit period. If further events arrive before the timer fires,
the previously queued event will be discarded in favour of the new
event. The event will eventually be emitted when the timer fires.

This logic is applied to RTC_CHANGE, BALLOON_CHANGE & WATCHDOG
events, since the data associated with these events is stateless

 * monitor.c: Add support for rate limiting
 * monitor.h: Define monitor_global_init for one-time setup tasks
 * vl.c: Invoke monitor_global_init
 * trace-events: Add hooks for monitor event tracing

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
---
 monitor.c    |  162 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 monitor.h    |    1 +
 trace-events |    5 ++
 vl.c         |    2 +
 4 files changed, 164 insertions(+), 6 deletions(-)

Comments

Luiz Capitulino May 30, 2012, 6:50 p.m. UTC | #1
On Mon, 21 May 2012 17:59:53 +0100
"Daniel P. Berrange" <berrange@redhat.com> wrote:

> From: "Daniel P. Berrange" <berrange@redhat.com>
> 
> Allow certain event types to be rate limited to avoid flooding
> monitor clients. The monitor_protocol_event() method is changed
> such that instead of immediately emitting the event to Monitor
> instances, it will call a new monitor_protocol_event_queue()
> method.
> 
> This will check to see if the rate limit for the event has been
> exceeded, and if so schedule a timer to wakeup at the end of the
> rate limit period. If further events arrive before the timer fires,
> the previously queued event will be discarded in favour of the new
> event. The event will eventually be emitted when the timer fires.
> 
> This logic is applied to RTC_CHANGE, BALLOON_CHANGE & WATCHDOG
> events, since the data associated with these events is stateless
> 
>  * monitor.c: Add support for rate limiting
>  * monitor.h: Define monitor_global_init for one-time setup tasks
>  * vl.c: Invoke monitor_global_init
>  * trace-events: Add hooks for monitor event tracing
> 
> Signed-off-by: Daniel P. Berrange <berrange@redhat.com>

I've talked with Anthony and he doesn't have objections. I've reviewed this again
and have some minor comments below.

> ---
>  monitor.c    |  162 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  monitor.h    |    1 +
>  trace-events |    5 ++
>  vl.c         |    2 +
>  4 files changed, 164 insertions(+), 6 deletions(-)
> 
> diff --git a/monitor.c b/monitor.c
> index 75fd4cf..5135966 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -66,6 +66,7 @@
>  #include "memory.h"
>  #include "qmp-commands.h"
>  #include "hmp.h"
> +#include "qemu-thread.h"
>  
>  /* for pic/irq_info */
>  #if defined(TARGET_SPARC)
> @@ -145,6 +146,19 @@ typedef struct MonitorControl {
>      int command_mode;
>  } MonitorControl;
>  
> +/*
> + * To prevent flooding clients, events can be throttled. The
> + * throttling is calculating globally, rather than per-Monitor
> + * instance.
> + */

calculated?

> +typedef struct MonitorEventState {
> +    MonitorEvent event; /* Event being tracked */
> +    int64_t rate;       /* Period over which to throttle. 0 to disable */
> +    int64_t last;       /* Time at which event was last emitted */
> +    QEMUTimer *timer;   /* Timer for handling delayed events */
> +    QObject *data;      /* Event pending delayed dispatch */
> +} MonitorEventState;
> +
>  struct Monitor {
>      CharDriverState *chr;
>      int mux_out;
> @@ -447,6 +461,141 @@ static const char *monitor_event_names[] = {
>  };
>  QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)
>  
> +MonitorEventState monitor_event_state[QEVENT_MAX];
> +QemuMutex monitor_event_state_lock;
> +
> +/*
> + * Emits the event to every monitor instance
> + */
> +static void
> +monitor_protocol_event_emit(MonitorEvent event,
> +                            QObject *data)
> +{
> +    Monitor *mon;
> +
> +    trace_monitor_protocol_event_emit(event, data);
> +    QLIST_FOREACH(mon, &mon_list, entry) {
> +        if (monitor_ctrl_mode(mon) && qmp_cmd_mode(mon)) {
> +            monitor_json_emitter(mon, data);
> +        }
> +    }
> +}
> +
> +
> +/*
> + * Queue a new event for emission to Monitor instances,
> + * applying any rate limiting if required.
> + */
> +static void
> +monitor_protocol_event_queue(MonitorEvent event,
> +                             QObject *data)
> +{
> +    MonitorEventState *evstate;
> +    int64_t now = qemu_get_clock_ns(rt_clock);
> +    assert(event < QEVENT_MAX);
> +
> +    qemu_mutex_lock(&monitor_event_state_lock);
> +    evstate = &(monitor_event_state[event]);
> +    trace_monitor_protocol_event_queue(event,
> +                                       data,
> +                                       evstate->rate,
> +                                       evstate->last,
> +                                       now);
> +
> +    /* Rate limit of 0 indicates no throttling */
> +    if (!evstate->rate) {
> +        monitor_protocol_event_emit(event, data);
> +        evstate->last = now;
> +    } else {
> +        int64_t delta = now - evstate->last;
> +        if (evstate->data ||
> +            delta < evstate->rate) {
> +            /* If there's an existing event pending, replace
> +             * it with the new event, otherwise schedule a
> +             * timer for delayed emission
> +             */
> +            if (evstate->data) {
> +                qobject_decref(evstate->data);
> +            } else {
> +                int64_t then = evstate->last + evstate->rate;
> +                qemu_mod_timer_ns(evstate->timer, then);
> +            }
> +            evstate->data = data;
> +            qobject_incref(evstate->data);
> +        } else {
> +            monitor_protocol_event_emit(event, data);
> +            evstate->last = now;
> +        }
> +    }
> +    qemu_mutex_unlock(&monitor_event_state_lock);
> +}
> +
> +
> +/*
> + * The callback invoked by QemuTimer when a delayed
> + * event is ready to be emitted
> + */
> +static void monitor_protocol_event_handler(void *opaque)
> +{
> +    MonitorEventState *evstate = opaque;
> +    int64_t now = qemu_get_clock_ns(rt_clock);
> +
> +    qemu_mutex_lock(&monitor_event_state_lock);
> +
> +    trace_monitor_protocol_event_handler(evstate->event,
> +                                         evstate->data,
> +                                         evstate->last,
> +                                         now);
> +    if (evstate->data) {
> +        monitor_protocol_event_emit(evstate->event, evstate->data);
> +        qobject_decref(evstate->data);
> +        evstate->data = NULL;
> +    }
> +    evstate->last = now;
> +    qemu_mutex_unlock(&monitor_event_state_lock);
> +}
> +
> +
> +/*
> + * @event: the event ID to be limited
> + * @rate: the rate limit in milliseconds
> + *
> + * Sets a rate limit on a particular event, so no
> + * more than 1 event will be emitted within @rate
> + * milliseconds
> + */
> +static void
> +monitor_protocol_event_throttle(MonitorEvent event,
> +                                int64_t rate)
> +{
> +    MonitorEventState *evstate;
> +    assert(event < QEVENT_MAX);
> +
> +    evstate = &(monitor_event_state[event]);
> +
> +    trace_monitor_protocol_event_throttle(event, rate);
> +    evstate->event = event;
> +    evstate->rate = rate * SCALE_MS;
> +    evstate->timer = qemu_new_timer(rt_clock,
> +                                    SCALE_MS,
> +                                    monitor_protocol_event_handler,
> +                                    evstate);
> +    evstate->last = 0;
> +    evstate->data = NULL;
> +}
> +
> +
> +/* Global, one-time initializer to configure the rate limiting
> + * and initialize state */
> +static void monitor_protocol_event_init(void)
> +{
> +    qemu_mutex_init(&monitor_event_state_lock);
> +    /* Limit RTC & BALLOON events to 1 per second */
> +    monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000);
> +    monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000);
> +    monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000);

What about SUSPENDED and BLOCK_IO_ERROR? Couldn't the former be also
used by a malicious guest to cause a DoS? The former is already emitted
several times for virtio.

> +}
> +
>  /**
>   * monitor_protocol_event(): Generate a Monitor event
>   *
> @@ -456,7 +605,6 @@ void monitor_protocol_event(MonitorEvent event, QObject *data)
>  {
>      QDict *qmp;
>      const char *event_name;
> -    Monitor *mon;
>  
>      assert(event < QEVENT_MAX);
>  
> @@ -471,11 +619,8 @@ void monitor_protocol_event(MonitorEvent event, QObject *data)
>          qdict_put_obj(qmp, "data", data);
>      }
>  
> -    QLIST_FOREACH(mon, &mon_list, entry) {
> -        if (monitor_ctrl_mode(mon) && qmp_cmd_mode(mon)) {
> -            monitor_json_emitter(mon, QOBJECT(qmp));
> -        }
> -    }
> +    trace_monitor_protocol_event(event, event_name, qmp);
> +    monitor_protocol_event_queue(event, QOBJECT(qmp));
>      QDECREF(qmp);
>  }
>  
> @@ -4564,6 +4709,11 @@ static void sortcmdlist(void)
>   * End:
>   */
>  
> +void monitor_global_init(void)
> +{

It's better to call it monitor_early_init() (or monitor_init_early()).

> +    monitor_protocol_event_init();
> +}
> +
>  void monitor_init(CharDriverState *chr, int flags)
>  {
>      static int is_first_init = 1;
> diff --git a/monitor.h b/monitor.h
> index 5f4de1b..3342cb4 100644
> --- a/monitor.h
> +++ b/monitor.h
> @@ -51,6 +51,7 @@ typedef enum MonitorEvent {
>  
>  int monitor_cur_is_qmp(void);
>  
> +void monitor_global_init(void);
>  void monitor_protocol_event(MonitorEvent event, QObject *data);
>  void monitor_init(CharDriverState *chr, int flags);
>  
> diff --git a/trace-events b/trace-events
> index 87cb96c..449b8ee 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -641,6 +641,11 @@ esp_mem_writeb_cmd_ensel(uint32_t val) "Enable selection (%2.2x)"
>  # monitor.c
>  handle_qmp_command(void *mon, const char *cmd_name) "mon %p cmd_name \"%s\""
>  monitor_protocol_emitter(void *mon) "mon %p"
> +monitor_protocol_event(uint32_t event, const char *evname, void *data) "event=%d name \"%s\" data %p"
> +monitor_protocol_event_handler(uint32_t event, void *data, uint64_t last, uint64_t now) "event=%d data=%p last=%" PRId64 " now=%" PRId64
> +monitor_protocol_event_emit(uint32_t event, void *data) "event=%d data=%p"
> +monitor_protocol_event_queue(uint32_t event, void *data, uint64_t rate, uint64_t last, uint64_t now) "event=%d data=%p rate=%" PRId64 " last=%" PRId64 " now=%" PRId64
> +monitor_protocol_event_throttle(uint32_t event, uint64_t rate) "event=%d rate=%" PRId64
>  
>  # hw/opencores_eth.c
>  open_eth_mii_write(unsigned idx, uint16_t v) "MII[%02x] <- %04x"
> diff --git a/vl.c b/vl.c
> index 23ab3a3..70eedfa 100644
> --- a/vl.c
> +++ b/vl.c
> @@ -3208,6 +3208,8 @@ int main(int argc, char **argv, char **envp)
>      }
>      loc_set_none();
>  
> +    monitor_global_init();
> +
>      /* Init CPU def lists, based on config
>       * - Must be called after all the qemu_read_config_file() calls
>       * - Must be called before list_cpus()
Daniel P. Berrangé June 8, 2012, 4:48 p.m. UTC | #2
On Wed, May 30, 2012 at 03:50:37PM -0300, Luiz Capitulino wrote:
> On Mon, 21 May 2012 17:59:53 +0100
> "Daniel P. Berrange" <berrange@redhat.com> wrote:

> > +/* Global, one-time initializer to configure the rate limiting
> > + * and initialize state */
> > +static void monitor_protocol_event_init(void)
> > +{
> > +    qemu_mutex_init(&monitor_event_state_lock);
> > +    /* Limit RTC & BALLOON events to 1 per second */
> > +    monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000);
> > +    monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000);
> > +    monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000);
> 
> What about SUSPENDED and BLOCK_IO_ERROR? Couldn't the former be also
> used by a malicious guest to cause a DoS? The former is already emitted
> several times for virtio.

This can't be used to filter BLOCK_IO_ERROR, since that event
contains per-device state information. Filtering this would
need to be done in the block layer, so it can be done per device.

I don't think SUSPEND can be used to DoS, since once the VM
is in the suspend state, a monitor command is required to wake
it up again before the guest OS can trigger a new suspend.

> > @@ -4564,6 +4709,11 @@ static void sortcmdlist(void)
> >   * End:
> >   */
> >  
> > +void monitor_global_init(void)
> > +{
> 
> It's better to call it monitor_early_init() (or monitor_init_early()).

Hmm, I chose this name because I wanted to make it clear
that this applied to all monitor instances, vs monitor_init
which is per-monitor.

Daniel
Luiz Capitulino June 11, 2012, 5:22 p.m. UTC | #3
On Fri, 8 Jun 2012 17:48:56 +0100
"Daniel P. Berrange" <berrange@redhat.com> wrote:

> On Wed, May 30, 2012 at 03:50:37PM -0300, Luiz Capitulino wrote:
> > On Mon, 21 May 2012 17:59:53 +0100
> > "Daniel P. Berrange" <berrange@redhat.com> wrote:
> 
> > > +/* Global, one-time initializer to configure the rate limiting
> > > + * and initialize state */
> > > +static void monitor_protocol_event_init(void)
> > > +{
> > > +    qemu_mutex_init(&monitor_event_state_lock);
> > > +    /* Limit RTC & BALLOON events to 1 per second */
> > > +    monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000);
> > > +    monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000);
> > > +    monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000);
> > 
> > What about SUSPENDED and BLOCK_IO_ERROR? Couldn't the former be also
> > used by a malicious guest to cause a DoS? The former is already emitted
> > several times for virtio.
> 
> This can't be used to filter BLOCK_IO_ERROR, since that event
> contains per-device state information. Filtering this would
> need to be done in the block layer, so it can done per device.

That's right.

> I don't think SUSPEND can be used to DoS, since once the VM
> is in the suspend state, a monitor command is required to wake
> it up again before the guest OS can trigger a new suspend.

Can't the guest OS awake itself?
Daniel P. Berrangé June 13, 2012, 2:53 p.m. UTC | #4
On Mon, Jun 11, 2012 at 02:22:05PM -0300, Luiz Capitulino wrote:
> On Fri, 8 Jun 2012 17:48:56 +0100
> "Daniel P. Berrange" <berrange@redhat.com> wrote:
> 
> > On Wed, May 30, 2012 at 03:50:37PM -0300, Luiz Capitulino wrote:
> > > On Mon, 21 May 2012 17:59:53 +0100
> > > "Daniel P. Berrange" <berrange@redhat.com> wrote:
> > 
> > > > +/* Global, one-time initializer to configure the rate limiting
> > > > + * and initialize state */
> > > > +static void monitor_protocol_event_init(void)
> > > > +{
> > > > +    qemu_mutex_init(&monitor_event_state_lock);
> > > > +    /* Limit RTC & BALLOON events to 1 per second */
> > > > +    monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000);
> > > > +    monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000);
> > > > +    monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000);
> > > 
> > > What about SUSPENDED and BLOCK_IO_ERROR? Couldn't the former be also
> > > used by a malicious guest to cause a DoS? The former is already emitted
> > > several times for virtio.
> > 
> > This can't be used to filter BLOCK_IO_ERROR, since that event
> > contains per-device state information. Filtering this would
> > need to be done in the block layer, so it can done per device.
> 
> That's right.
> 
> > I don't think SUSPEND can be used to DoS, since once the VM
> > is in the suspend state, a monitor command is required to wake
> > it up again before the guest OS can trigger a new suspend.
> 
> Can't the guest OS awake itself?

I didn't think so. Even if it can, we can't rate limit SUSPEND
events in isolation, because they must be strictly ordered
wrt RESUME events.

Daniel
Paolo Bonzini June 13, 2012, 2:57 p.m. UTC | #5
Il 13/06/2012 16:53, Daniel P. Berrange ha scritto:
>>> > > I don't think SUSPEND can be used to DoS, since once the VM
>>> > > is in the suspend state, a monitor command is required to wake
>>> > > it up again before the guest OS can trigger a new suspend.
>> > 
>> > Can't the guest OS awake itself?
> I didn't think so. Even if it can, we can't rate limit SUSPEND
> events in isolation, because they must be strictly ordered
> wrt RESUME events.

It can program the RTC to awake the OS, but only at 1 wakeup/second.

Paolo
Daniel P. Berrangé June 13, 2012, 3:04 p.m. UTC | #6
On Wed, May 30, 2012 at 03:50:37PM -0300, Luiz Capitulino wrote:
> On Mon, 21 May 2012 17:59:53 +0100
> "Daniel P. Berrange" <berrange@redhat.com> wrote:
> > @@ -4564,6 +4709,11 @@ static void sortcmdlist(void)
> >   * End:
> >   */
> >  
> > +void monitor_global_init(void)
> > +{
> 
> It's better to call it monitor_early_init() (or monitor_init_early()).

This was a bit of stupidity on my part. The existing
'monitor_init' function (which is called per monitor
instance), already has a code block where one-time
global initialization is done. So I'm removing this
function in the next patch posting.

Daniel
Daniel P. Berrangé June 13, 2012, 3:06 p.m. UTC | #7
On Wed, Jun 13, 2012 at 04:57:28PM +0200, Paolo Bonzini wrote:
> Il 13/06/2012 16:53, Daniel P. Berrange ha scritto:
> >>> > > I don't think SUSPEND can be used to DoS, since once the VM
> >>> > > is in the suspend state, a monitor command is required to wake
> >>> > > it up again before the guest OS can trigger a new suspend.
> >> > 
> >> > Can't the guest OS awake itself?
> > I didn't think so. Even if it can, we can't rate limit SUSPEND
> > events in isolation, because they must be strictly ordered
> > wrt RESUME events.
> 
> It can program the RTC to awake the OS, but only at 1 wakeup/second.

Ah that hardware rate limit is good - we don't need to do separate
throttling with this event then.

Daniel
Paolo Bonzini June 13, 2012, 3:35 p.m. UTC | #8
Il 13/06/2012 17:06, Daniel P. Berrange ha scritto:
> On Wed, Jun 13, 2012 at 04:57:28PM +0200, Paolo Bonzini wrote:
>> Il 13/06/2012 16:53, Daniel P. Berrange ha scritto:
>>>>>>> I don't think SUSPEND can be used to DoS, since once the VM
>>>>>>> is in the suspend state, a monitor command is required to wake
>>>>>>> it up again before the guest OS can trigger a new suspend.
>>>>>
>>>>> Can't the guest OS awake itself?
>>> I didn't think so. Even if it can, we can't rate limit SUSPEND
>>> events in isolation, because they must be strictly ordered
>>> wrt RESUME events.
>>
>> It can program the RTC to awake the OS, but only at 1 wakeup/second.
> 
> Ah that hardware rate limit is good - we don't need todo separate
> throttling with this event then.

Hmm, the ACPI PMTimer could be worse though.  Perhaps we could write a
kvm-unit-test and time how many wakeups we can do per second.

Paolo
diff mbox

Patch

diff --git a/monitor.c b/monitor.c
index 75fd4cf..5135966 100644
--- a/monitor.c
+++ b/monitor.c
@@ -66,6 +66,7 @@ 
 #include "memory.h"
 #include "qmp-commands.h"
 #include "hmp.h"
+#include "qemu-thread.h"
 
 /* for pic/irq_info */
 #if defined(TARGET_SPARC)
@@ -145,6 +146,19 @@  typedef struct MonitorControl {
     int command_mode;
 } MonitorControl;
 
+/*
+ * To prevent flooding clients, events can be throttled. The
+ * throttling is calculating globally, rather than per-Monitor
+ * instance.
+ */
+typedef struct MonitorEventState {
+    MonitorEvent event; /* Event being tracked */
+    int64_t rate;       /* Period over which to throttle. 0 to disable */
+    int64_t last;       /* Time at which event was last emitted */
+    QEMUTimer *timer;   /* Timer for handling delayed events */
+    QObject *data;      /* Event pending delayed dispatch */
+} MonitorEventState;
+
 struct Monitor {
     CharDriverState *chr;
     int mux_out;
@@ -447,6 +461,141 @@  static const char *monitor_event_names[] = {
 };
 QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)
 
+MonitorEventState monitor_event_state[QEVENT_MAX];
+QemuMutex monitor_event_state_lock;
+
+/*
+ * Emits the event to every monitor instance
+ */
+static void
+monitor_protocol_event_emit(MonitorEvent event,
+                            QObject *data)
+{
+    Monitor *mon;
+
+    trace_monitor_protocol_event_emit(event, data);
+    QLIST_FOREACH(mon, &mon_list, entry) {
+        if (monitor_ctrl_mode(mon) && qmp_cmd_mode(mon)) {
+            monitor_json_emitter(mon, data);
+        }
+    }
+}
+
+
+/*
+ * Queue a new event for emission to Monitor instances,
+ * applying any rate limiting if required.
+ */
+static void
+monitor_protocol_event_queue(MonitorEvent event,
+                             QObject *data)
+{
+    MonitorEventState *evstate;
+    int64_t now = qemu_get_clock_ns(rt_clock);
+    assert(event < QEVENT_MAX);
+
+    qemu_mutex_lock(&monitor_event_state_lock);
+    evstate = &(monitor_event_state[event]);
+    trace_monitor_protocol_event_queue(event,
+                                       data,
+                                       evstate->rate,
+                                       evstate->last,
+                                       now);
+
+    /* Rate limit of 0 indicates no throttling */
+    if (!evstate->rate) {
+        monitor_protocol_event_emit(event, data);
+        evstate->last = now;
+    } else {
+        int64_t delta = now - evstate->last;
+        if (evstate->data ||
+            delta < evstate->rate) {
+            /* If there's an existing event pending, replace
+             * it with the new event, otherwise schedule a
+             * timer for delayed emission
+             */
+            if (evstate->data) {
+                qobject_decref(evstate->data);
+            } else {
+                int64_t then = evstate->last + evstate->rate;
+                qemu_mod_timer_ns(evstate->timer, then);
+            }
+            evstate->data = data;
+            qobject_incref(evstate->data);
+        } else {
+            monitor_protocol_event_emit(event, data);
+            evstate->last = now;
+        }
+    }
+    qemu_mutex_unlock(&monitor_event_state_lock);
+}
+
+
+/*
+ * The callback invoked by QemuTimer when a delayed
+ * event is ready to be emitted
+ */
+static void monitor_protocol_event_handler(void *opaque)
+{
+    MonitorEventState *evstate = opaque;
+    int64_t now = qemu_get_clock_ns(rt_clock);
+
+    qemu_mutex_lock(&monitor_event_state_lock);
+
+    trace_monitor_protocol_event_handler(evstate->event,
+                                         evstate->data,
+                                         evstate->last,
+                                         now);
+    if (evstate->data) {
+        monitor_protocol_event_emit(evstate->event, evstate->data);
+        qobject_decref(evstate->data);
+        evstate->data = NULL;
+    }
+    evstate->last = now;
+    qemu_mutex_unlock(&monitor_event_state_lock);
+}
+
+
+/*
+ * @event: the event ID to be limited
+ * @rate: the rate limit in milliseconds
+ *
+ * Sets a rate limit on a particular event, so no
+ * more than 1 event will be emitted within @rate
+ * milliseconds
+ */
+static void
+monitor_protocol_event_throttle(MonitorEvent event,
+                                int64_t rate)
+{
+    MonitorEventState *evstate;
+    assert(event < QEVENT_MAX);
+
+    evstate = &(monitor_event_state[event]);
+
+    trace_monitor_protocol_event_throttle(event, rate);
+    evstate->event = event;
+    evstate->rate = rate * SCALE_MS;
+    evstate->timer = qemu_new_timer(rt_clock,
+                                    SCALE_MS,
+                                    monitor_protocol_event_handler,
+                                    evstate);
+    evstate->last = 0;
+    evstate->data = NULL;
+}
+
+
+/* Global, one-time initializer to configure the rate limiting
+ * and initialize state */
+static void monitor_protocol_event_init(void)
+{
+    qemu_mutex_init(&monitor_event_state_lock);
+    /* Limit RTC & BALLOON events to 1 per second */
+    monitor_protocol_event_throttle(QEVENT_RTC_CHANGE, 1000);
+    monitor_protocol_event_throttle(QEVENT_BALLOON_CHANGE, 1000);
+    monitor_protocol_event_throttle(QEVENT_WATCHDOG, 1000);
+}
+
 /**
  * monitor_protocol_event(): Generate a Monitor event
  *
@@ -456,7 +605,6 @@  void monitor_protocol_event(MonitorEvent event, QObject *data)
 {
     QDict *qmp;
     const char *event_name;
-    Monitor *mon;
 
     assert(event < QEVENT_MAX);
 
@@ -471,11 +619,8 @@  void monitor_protocol_event(MonitorEvent event, QObject *data)
         qdict_put_obj(qmp, "data", data);
     }
 
-    QLIST_FOREACH(mon, &mon_list, entry) {
-        if (monitor_ctrl_mode(mon) && qmp_cmd_mode(mon)) {
-            monitor_json_emitter(mon, QOBJECT(qmp));
-        }
-    }
+    trace_monitor_protocol_event(event, event_name, qmp);
+    monitor_protocol_event_queue(event, QOBJECT(qmp));
     QDECREF(qmp);
 }
 
@@ -4564,6 +4709,11 @@  static void sortcmdlist(void)
  * End:
  */
 
+void monitor_global_init(void)
+{
+    monitor_protocol_event_init();
+}
+
 void monitor_init(CharDriverState *chr, int flags)
 {
     static int is_first_init = 1;
diff --git a/monitor.h b/monitor.h
index 5f4de1b..3342cb4 100644
--- a/monitor.h
+++ b/monitor.h
@@ -51,6 +51,7 @@  typedef enum MonitorEvent {
 
 int monitor_cur_is_qmp(void);
 
+void monitor_global_init(void);
 void monitor_protocol_event(MonitorEvent event, QObject *data);
 void monitor_init(CharDriverState *chr, int flags);
 
diff --git a/trace-events b/trace-events
index 87cb96c..449b8ee 100644
--- a/trace-events
+++ b/trace-events
@@ -641,6 +641,11 @@  esp_mem_writeb_cmd_ensel(uint32_t val) "Enable selection (%2.2x)"
 # monitor.c
 handle_qmp_command(void *mon, const char *cmd_name) "mon %p cmd_name \"%s\""
 monitor_protocol_emitter(void *mon) "mon %p"
+monitor_protocol_event(uint32_t event, const char *evname, void *data) "event=%d name \"%s\" data %p"
+monitor_protocol_event_handler(uint32_t event, void *data, uint64_t last, uint64_t now) "event=%d data=%p last=%" PRId64 " now=%" PRId64
+monitor_protocol_event_emit(uint32_t event, void *data) "event=%d data=%p"
+monitor_protocol_event_queue(uint32_t event, void *data, uint64_t rate, uint64_t last, uint64_t now) "event=%d data=%p rate=%" PRId64 " last=%" PRId64 " now=%" PRId64
+monitor_protocol_event_throttle(uint32_t event, uint64_t rate) "event=%d rate=%" PRId64
 
 # hw/opencores_eth.c
 open_eth_mii_write(unsigned idx, uint16_t v) "MII[%02x] <- %04x"
diff --git a/vl.c b/vl.c
index 23ab3a3..70eedfa 100644
--- a/vl.c
+++ b/vl.c
@@ -3208,6 +3208,8 @@  int main(int argc, char **argv, char **envp)
     }
     loc_set_none();
 
+    monitor_global_init();
+
     /* Init CPU def lists, based on config
      * - Must be called after all the qemu_read_config_file() calls
      * - Must be called before list_cpus()