diff mbox

[V19,06/12] quorum: Add quorum mechanism.

Message ID 1393017681-12794-7-git-send-email-benoit.canet@irqsave.net
State New
Headers show

Commit Message

Benoît Canet Feb. 21, 2014, 9:21 p.m. UTC
From: Benoît Canet <benoit@irqsave.net>

This patchset enables the core of the quorum mechanism.
The num_children reads are compared to get the majority version and if this
version exists more than threshold times the guest won't see the error at all.

If a block is corrupted or if an error occurs during an IO or if the quorum
cannot be established QMP events are used to report to the management.

Use gnutls's SHA-256 to compare versions.

--enable-quorum must be used to enable the feature.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
Reviewed-by: Max Reitz <mreitz@redhat.com>
---
 block/Makefile.objs       |   2 +-
 block/quorum.c            | 391 +++++++++++++++++++++++++++++++++++++++++++++-
 configure                 |  36 +++++
 docs/qmp/qmp-events.txt   |  36 +++++
 include/monitor/monitor.h |   2 +
 monitor.c                 |   2 +
 6 files changed, 467 insertions(+), 2 deletions(-)

Comments

Eric Blake Feb. 21, 2014, 10:09 p.m. UTC | #1
On 02/21/2014 02:21 PM, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
> 
> This patchset enables the core of the quorum mechanism.
> The num_children reads are compared to get the majority version and if this
> version exists more than threshold times the guest won't see the error at all.

> +++ b/docs/qmp/qmp-events.txt
> @@ -500,3 +500,39 @@ Example:
>  
>  Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
>  followed respectively by the RESET, SHUTDOWN, or STOP events.
> +
> +QUORUM_FAILURE
> +--------------

This should have been inserted in sorted order to match the rest of the
file, rather than at the end.  Can be rearranged in a followup patch,
now that this series is queued.

> +
> +Emitted by the Quorum block driver if it fails to establish a quorum.
> +
> +Data:
> +
> +- "reference":    device name if defined else node name.
> +- "sector-num":   Number of the first sector of the failed read operation.

I'm assuming sector-num is relative to the guest's view of the data?

> +
> +QUORUM_REPORT_BAD
> +-----------------
> +
> +Emitted to report a corruption of a Quorum file.
> +
> +Data:
> +
> +- "ret":          The IO return code.

What values is this likely to contain?  Is it a finite set, in which
case it would be nice to have a QAPI enum that describes the set of
return codes, rather than a raw number?

I'd like to make sure we are happy with the wire format of this QMP
event before it gets baked into 2.0.
Benoît Canet Feb. 21, 2014, 10:30 p.m. UTC | #2
The Friday 21 Feb 2014 à 15:09:42 (-0700), Eric Blake wrote :
> On 02/21/2014 02:21 PM, Benoît Canet wrote:
> > From: Benoît Canet <benoit@irqsave.net>
> > 
> > This patchset enables the core of the quorum mechanism.
> > The num_children reads are compared to get the majority version and if this
> > version exists more than threshold times the guest won't see the error at all.
> 
> > +++ b/docs/qmp/qmp-events.txt
> > @@ -500,3 +500,39 @@ Example:
> >  
> >  Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
> >  followed respectively by the RESET, SHUTDOWN, or STOP events.
> > +
> > +QUORUM_FAILURE
> > +--------------
> 
> This should have been inserted in sorted order to match the rest of the
> file, rather than at the end.  Can be rearranged in a followup patch,
> now that this series is queued.
> 
> > +
> > +Emitted by the Quorum block driver if it fails to establish a quorum.
> > +
> > +Data:
> > +
> > +- "reference":    device name if defined else node name.
> > +- "sector-num":   Number of the first sector of the failed read operation.
> 
> I'm assuming sector-num is relative to the guest's view of the data?
Yes,

> 
> > +
> > +QUORUM_REPORT_BAD
> > +-----------------
> > +
> > +Emitted to report a corruption of a Quorum file.
> > +
> > +Data:
> > +
> > +- "ret":          The IO return code.
> 
> What values is this likely to contain?  Is it a finite set, in which
> case it would be nice to have a QAPI enum that describes the set of
> return codes, rather than a raw number?

It's anything that the block stack could return as an error.

> 
> I'd like to make sure we are happy with the wire format of this QMP
> event before it gets baked into 2.0.

Ok

> 
> -- 
> Eric Blake   eblake redhat com    +1-919-301-3266
> Libvirt virtualization library http://libvirt.org
>
Kevin Wolf Feb. 21, 2014, 10:38 p.m. UTC | #3
Am 21.02.2014 um 23:30 hat Benoît Canet geschrieben:
> The Friday 21 Feb 2014 à 15:09:42 (-0700), Eric Blake wrote :
> > On 02/21/2014 02:21 PM, Benoît Canet wrote:
> > > From: Benoît Canet <benoit@irqsave.net>
> > > 
> > > This patchset enables the core of the quorum mechanism.
> > > The num_children reads are compared to get the majority version and if this
> > > version exists more than threshold times the guest won't see the error at all.
> > 
> > > +++ b/docs/qmp/qmp-events.txt
> > > @@ -500,3 +500,39 @@ Example:
> > >  
> > >  Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
> > >  followed respectively by the RESET, SHUTDOWN, or STOP events.
> > > +
> > > +QUORUM_FAILURE
> > > +--------------
> > 
> > This should have been inserted in sorted order to match the rest of the
> > file, rather than at the end.  Can be rearranged in a followup patch,
> > now that this series is queued.
> > 
> > > +
> > > +Emitted by the Quorum block driver if it fails to establish a quorum.
> > > +
> > > +Data:
> > > +
> > > +- "reference":    device name if defined else node name.
> > > +- "sector-num":   Number of the first sector of the failed read operation.
> > 
> > I'm assuming sector-num is relative to the guest's view of the data?
> Yes,

To be more precise: It's the sector number specified by the layer above
quorum. This can be the guest, but in a different configuration it could
be another block driver.

> > > +
> > > +QUORUM_REPORT_BAD
> > > +-----------------
> > > +
> > > +Emitted to report a corruption of a Quorum file.
> > > +
> > > +Data:
> > > +
> > > +- "ret":          The IO return code.
> > 
> > What values is this likely to contain?  Is it a finite set, in which
> > case it would be nice to have a QAPI enum that describes the set of
> > return codes, rather than a raw number?
> 
> It's anything that the block stack could return as an error.

In other words, it's meaning depends on the host and the value is only
suitable for human readers. Perhaps we could change this to strerror(),
which is, I believe, the same as error_setg_errno() does.

Kevin
Eric Blake Feb. 21, 2014, 10:41 p.m. UTC | #4
On 02/21/2014 03:30 PM, Benoît Canet wrote:

>>> +
>>> +- "ret":          The IO return code.
>>
>> What values is this likely to contain?  Is it a finite set, in which
>> case it would be nice to have a QAPI enum that describes the set of
>> return codes, rather than a raw number?
> 
> It's anything that the block stack could return as an error.

Yes, but returning a raw integer is not friendly.  What do those
integers mean?  In this patch, I found only two callers that set this
parameter:

quorum_report_bad_versions:
+            quorum_report_bad(acb, s->bs[item->index]->node_name, 0);

So the code means nothing there, because you always pass 0.

quroum_aio_cb:
     if (ret == 0) {
         acb->success_count++;
+    } else {
+        quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);

Here, ret is non-zero - but will it ever be anything other than -1?

Seriously, I think you should change this to NOT be an integer, but
instead be an enum value where the enum is tracked in qapi-schema.json.
 Change the signature of quorum_report_bad to take the enum value,
rather than a raw int.  And over the wire, the QMP event will appear
with a string encoding of the enum name, where we can add named errors
to the enum type as we come up with what different error codes we want
to distinguish.  I'm opposed to the current nebulous "it's whatever the
block stack wanted to report" without knowing what it means.  If you
can't switch to an enum, it may be better to just delete the field
entirely, if you can't justify what it is doing.  QMP-wise, we can
always add more information later, but once the release happens, we
can't take away information, even if later refactoring makes it harder
to support.
Eric Blake Feb. 21, 2014, 10:44 p.m. UTC | #5
On 02/21/2014 03:38 PM, Kevin Wolf wrote:
>>>> +- "ret":          The IO return code.
>>>
>>> What values is this likely to contain?  Is it a finite set, in which
>>> case it would be nice to have a QAPI enum that describes the set of
>>> return codes, rather than a raw number?
>>
>> It's anything that the block stack could return as an error.
> 
> In other words, it's meaning depends on the host and the value is only
> suitable for human readers. Perhaps we could change this to strerror(),
> which is, I believe, the same as error_setg_errno() does.

Ah, so you're saying that 'ret' would be a '-errno' value - in that
case, yes, converting it to string, and documenting this field as a
human-only strerror() representation of the error would also work (using
'str', rather than an enum type).
Benoît Canet Feb. 21, 2014, 10:50 p.m. UTC | #6
The Friday 21 Feb 2014 à 15:44:06 (-0700), Eric Blake wrote :
> On 02/21/2014 03:38 PM, Kevin Wolf wrote:
> >>>> +- "ret":          The IO return code.
> >>>
> >>> What values is this likely to contain?  Is it a finite set, in which
> >>> case it would be nice to have a QAPI enum that describes the set of
> >>> return codes, rather than a raw number?
> >>
> >> It's anything that the block stack could return as an error.
> > 
> > In other words, it's meaning depends on the host and the value is only
> > suitable for human readers. Perhaps we could change this to strerror(),
> > which is, I believe, the same as error_setg_errno() does.
> 
> Ah, so you're saying that 'ret' would be a '-errno' value - in that
> case, yes, converting it to string, and documenting this field as a
> human-only strerror() representation of the error would also work (using
> 'str', rather than an enum type).

How do I proceed ?

Should I respin the serie ? Or do a follow up patch ? 

> 
> -- 
> Eric Blake   eblake redhat com    +1-919-301-3266
> Libvirt virtualization library http://libvirt.org
>
Eric Blake Feb. 21, 2014, 11:32 p.m. UTC | #7
On 02/21/2014 03:50 PM, Benoît Canet wrote:
> The Friday 21 Feb 2014 à 15:44:06 (-0700), Eric Blake wrote :
>> On 02/21/2014 03:38 PM, Kevin Wolf wrote:
>>>>>> +- "ret":          The IO return code.
>>>>>
>>>>> What values is this likely to contain?  Is it a finite set, in which
>>>>> case it would be nice to have a QAPI enum that describes the set of
>>>>> return codes, rather than a raw number?
>>>>
>>>> It's anything that the block stack could return as an error.
>>>
>>> In other words, it's meaning depends on the host and the value is only
>>> suitable for human readers. Perhaps we could change this to strerror(),
>>> which is, I believe, the same as error_setg_errno() does.
>>
>> Ah, so you're saying that 'ret' would be a '-errno' value - in that
>> case, yes, converting it to string, and documenting this field as a
>> human-only strerror() representation of the error would also work (using
>> 'str', rather than an enum type).
> 
> How do I proceed ?
> 
> Should I respin the serie ? Or do a follow up patch ? 

Followup is fine, as long as we get it done before 2.0.
diff mbox

Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index 716556f..425d7fb 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -3,7 +3,7 @@  block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
 block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
-block-obj-y += quorum.o
+block-obj-$(CONFIG_QUORUM) += quorum.o
 block-obj-y += parallels.o blkdebug.o blkverify.o
 block-obj-y += snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
diff --git a/block/quorum.c b/block/quorum.c
index fd19662..4beee96 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -13,7 +13,43 @@ 
  * See the COPYING file in the top-level directory.
  */
 
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
 #include "block/block_int.h"
+#include "qapi/qmp/qjson.h"
+
+#define HASH_LENGTH 32
+
+/* This union holds a vote hash value */
+typedef union QuorumVoteValue {
+    char h[HASH_LENGTH];       /* SHA-256 hash */
+    int64_t l;                 /* simpler 64 bits hash */
+} QuorumVoteValue;
+
+/* A vote item */
+typedef struct QuorumVoteItem {
+    int index;
+    QLIST_ENTRY(QuorumVoteItem) next;
+} QuorumVoteItem;
+
+/* this structure is a vote version. A version is the set of votes sharing the
+ * same vote value.
+ * The set of votes will be tracked with the items field and its cardinality is
+ * vote_count.
+ */
+typedef struct QuorumVoteVersion {
+    QuorumVoteValue value;
+    int index;
+    int vote_count;
+    QLIST_HEAD(, QuorumVoteItem) items;
+    QLIST_ENTRY(QuorumVoteVersion) next;
+} QuorumVoteVersion;
+
+/* this structure holds a group of vote versions together */
+typedef struct QuorumVotes {
+    QLIST_HEAD(, QuorumVoteVersion) vote_list;
+    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
+} QuorumVotes;
 
 /* the following structure holds the state of one quorum instance */
 typedef struct BDRVQuorumState {
@@ -65,10 +101,14 @@  struct QuorumAIOCB {
     int count;                  /* number of completed AIOCB */
     int success_count;          /* number of successfully completed AIOCB */
 
+    QuorumVotes votes;
+
     bool is_read;
     int vote_ret;
 };
 
+static void quorum_vote(QuorumAIOCB *acb);
+
 static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
@@ -94,6 +134,10 @@  static void quorum_aio_finalize(QuorumAIOCB *acb)
     BDRVQuorumState *s = acb->common.bs->opaque;
     int i, ret = 0;
 
+    if (acb->vote_ret) {
+        ret = acb->vote_ret;
+    }
+
     acb->common.cb(acb->common.opaque, ret);
 
     if (acb->is_read) {
@@ -107,6 +151,16 @@  static void quorum_aio_finalize(QuorumAIOCB *acb)
     qemu_aio_release(acb);
 }
 
+static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
+{
+    return !memcmp(a->h, b->h, HASH_LENGTH);
+}
+
+static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
+{
+    return a->l == b->l;
+}
+
 static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
                                    BlockDriverState *bs,
                                    QEMUIOVector *qiov,
@@ -125,6 +179,8 @@  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
     acb->count = 0;
     acb->success_count = 0;
+    acb->votes.compare = quorum_sha256_compare;
+    QLIST_INIT(&acb->votes.vote_list);
     acb->is_read = false;
     acb->vote_ret = 0;
 
@@ -137,6 +193,48 @@  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     return acb;
 }
 
+static void quorum_report_bad(QuorumAIOCB *acb, char *node_name, int ret)
+{
+    QObject *data;
+    assert(node_name);
+    data = qobject_from_jsonf("{ 'ret': %d"
+                              ", 'node-name': %s"
+                              ", 'sector-num': %" PRId64
+                              ", 'sectors-count': %d }",
+                              ret, node_name, acb->sector_num, acb->nb_sectors);
+    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
+    qobject_decref(data);
+}
+
+static void quorum_report_failure(QuorumAIOCB *acb)
+{
+    QObject *data;
+    const char *reference = acb->common.bs->device_name[0] ?
+                            acb->common.bs->device_name :
+                            acb->common.bs->node_name;
+    data = qobject_from_jsonf("{ 'reference': %s"
+                              ", 'sector-num': %" PRId64
+                              ", 'sectors-count': %d }",
+                              reference, acb->sector_num, acb->nb_sectors);
+    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
+    qobject_decref(data);
+}
+
+static int quorum_vote_error(QuorumAIOCB *acb);
+
+static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
+{
+    BDRVQuorumState *s = acb->common.bs->opaque;
+
+    if (acb->success_count < s->threshold) {
+        acb->vote_ret = quorum_vote_error(acb);
+        quorum_report_failure(acb);
+        return true;
+    }
+
+    return false;
+}
+
 static void quorum_aio_cb(void *opaque, int ret)
 {
     QuorumChildRequest *sacb = opaque;
@@ -147,6 +245,8 @@  static void quorum_aio_cb(void *opaque, int ret)
     acb->count++;
     if (ret == 0) {
         acb->success_count++;
+    } else {
+        quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
     }
     assert(acb->count <= s->num_children);
     assert(acb->success_count <= s->num_children);
@@ -154,9 +254,298 @@  static void quorum_aio_cb(void *opaque, int ret)
         return;
     }
 
+    /* Do the vote on read */
+    if (acb->is_read) {
+        quorum_vote(acb);
+    } else {
+        quorum_has_too_much_io_failed(acb);
+    }
+
     quorum_aio_finalize(acb);
 }
 
+static void quorum_report_bad_versions(BDRVQuorumState *s,
+                                       QuorumAIOCB *acb,
+                                       QuorumVoteValue *value)
+{
+    QuorumVoteVersion *version;
+    QuorumVoteItem *item;
+
+    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
+        if (acb->votes.compare(&version->value, value)) {
+            continue;
+        }
+        QLIST_FOREACH(item, &version->items, next) {
+            quorum_report_bad(acb, s->bs[item->index]->node_name, 0);
+        }
+    }
+}
+
+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
+{
+    int i;
+    assert(dest->niov == source->niov);
+    assert(dest->size == source->size);
+    for (i = 0; i < source->niov; i++) {
+        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
+        memcpy(dest->iov[i].iov_base,
+               source->iov[i].iov_base,
+               source->iov[i].iov_len);
+    }
+}
+
+static void quorum_count_vote(QuorumVotes *votes,
+                              QuorumVoteValue *value,
+                              int index)
+{
+    QuorumVoteVersion *v = NULL, *version = NULL;
+    QuorumVoteItem *item;
+
+    /* look if we have something with this hash */
+    QLIST_FOREACH(v, &votes->vote_list, next) {
+        if (votes->compare(&v->value, value)) {
+            version = v;
+            break;
+        }
+    }
+
+    /* It's a version not yet in the list add it */
+    if (!version) {
+        version = g_new0(QuorumVoteVersion, 1);
+        QLIST_INIT(&version->items);
+        memcpy(&version->value, value, sizeof(version->value));
+        version->index = index;
+        version->vote_count = 0;
+        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
+    }
+
+    version->vote_count++;
+
+    item = g_new0(QuorumVoteItem, 1);
+    item->index = index;
+    QLIST_INSERT_HEAD(&version->items, item, next);
+}
+
+static void quorum_free_vote_list(QuorumVotes *votes)
+{
+    QuorumVoteVersion *version, *next_version;
+    QuorumVoteItem *item, *next_item;
+
+    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
+        QLIST_REMOVE(version, next);
+        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
+            QLIST_REMOVE(item, next);
+            g_free(item);
+        }
+        g_free(version);
+    }
+}
+
+static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
+{
+    int j, ret;
+    gnutls_hash_hd_t dig;
+    QEMUIOVector *qiov = &acb->qcrs[i].qiov;
+
+    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    for (j = 0; j < qiov->niov; j++) {
+        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
+        if (ret < 0) {
+            break;
+        }
+    }
+
+    gnutls_hash_deinit(dig, (void *) hash);
+    return ret;
+}
+
+static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
+{
+    int max = 0;
+    QuorumVoteVersion *candidate, *winner = NULL;
+
+    QLIST_FOREACH(candidate, &votes->vote_list, next) {
+        if (candidate->vote_count > max) {
+            max = candidate->vote_count;
+            winner = candidate;
+        }
+    }
+
+    return winner;
+}
+
+/* qemu_iovec_compare is handy for blkverify mode because it returns the first
+ * differing byte location. Yet it is handcoded to compare vectors one byte
+ * after another so it does not benefit from the libc SIMD optimizations.
+ * quorum_iovec_compare is written for speed and should be used in the non
+ * blkverify mode of quorum.
+ */
+static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
+{
+    int i;
+    int result;
+
+    assert(a->niov == b->niov);
+    for (i = 0; i < a->niov; i++) {
+        assert(a->iov[i].iov_len == b->iov[i].iov_len);
+        result = memcmp(a->iov[i].iov_base,
+                        b->iov[i].iov_base,
+                        a->iov[i].iov_len);
+        if (result) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
+                                          const char *fmt, ...)
+{
+    va_list ap;
+
+    va_start(ap, fmt);
+    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
+            acb->sector_num, acb->nb_sectors);
+    vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
+    va_end(ap);
+    exit(1);
+}
+
+static bool quorum_compare(QuorumAIOCB *acb,
+                           QEMUIOVector *a,
+                           QEMUIOVector *b)
+{
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    ssize_t offset;
+
+    /* This driver will replace blkverify in this particular case */
+    if (s->is_blkverify) {
+        offset = qemu_iovec_compare(a, b);
+        if (offset != -1) {
+            quorum_err(acb, "contents mismatch in sector %" PRId64,
+                       acb->sector_num +
+                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
+        }
+        return true;
+    }
+
+    return quorum_iovec_compare(a, b);
+}
+
+/* Do a vote to get the error code */
+static int quorum_vote_error(QuorumAIOCB *acb)
+{
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    QuorumVoteVersion *winner = NULL;
+    QuorumVotes error_votes;
+    QuorumVoteValue result_value;
+    int i, ret = 0;
+    bool error = false;
+
+    QLIST_INIT(&error_votes.vote_list);
+    error_votes.compare = quorum_64bits_compare;
+
+    for (i = 0; i < s->num_children; i++) {
+        ret = acb->qcrs[i].ret;
+        if (ret) {
+            error = true;
+            result_value.l = ret;
+            quorum_count_vote(&error_votes, &result_value, i);
+        }
+    }
+
+    if (error) {
+        winner = quorum_get_vote_winner(&error_votes);
+        ret = winner->value.l;
+    }
+
+    quorum_free_vote_list(&error_votes);
+
+    return ret;
+}
+
+static void quorum_vote(QuorumAIOCB *acb)
+{
+    bool quorum = true;
+    int i, j, ret;
+    QuorumVoteValue hash;
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    QuorumVoteVersion *winner;
+
+    if (quorum_has_too_much_io_failed(acb)) {
+        return;
+    }
+
+    /* get the index of the first successful read */
+    for (i = 0; i < s->num_children; i++) {
+        if (!acb->qcrs[i].ret) {
+            break;
+        }
+    }
+
+    assert(i < s->num_children);
+
+    /* compare this read with all other successful reads stopping at quorum
+     * failure
+     */
+    for (j = i + 1; j < s->num_children; j++) {
+        if (acb->qcrs[j].ret) {
+            continue;
+        }
+        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
+        if (!quorum) {
+            break;
+       }
+    }
+
+    /* Every successful read agrees */
+    if (quorum) {
+        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
+        return;
+    }
+
+    /* compute hashes for each successful read, also store indexes */
+    for (i = 0; i < s->num_children; i++) {
+        if (acb->qcrs[i].ret) {
+            continue;
+        }
+        ret = quorum_compute_hash(acb, i, &hash);
+        /* if ever the hash computation failed */
+        if (ret < 0) {
+            acb->vote_ret = ret;
+            goto free_exit;
+        }
+        quorum_count_vote(&acb->votes, &hash, i);
+    }
+
+    /* vote to select the most represented version */
+    winner = quorum_get_vote_winner(&acb->votes);
+
+    /* if the winner count is smaller than threshold the read fails */
+    if (winner->vote_count < s->threshold) {
+        quorum_report_failure(acb);
+        acb->vote_ret = -EIO;
+        goto free_exit;
+    }
+
+    /* we have a winner: copy it */
+    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);
+
+    /* some versions are bad print them */
+    quorum_report_bad_versions(s, acb, &winner->value);
+
+free_exit:
+    /* free lists */
+    quorum_free_vote_list(&acb->votes);
+}
+
 static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                          int64_t sector_num,
                                          QEMUIOVector *qiov,
@@ -178,7 +567,7 @@  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
     }
 
     for (i = 0; i < s->num_children; i++) {
-        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
+        bdrv_aio_readv(s->bs[i], sector_num, &acb->qcrs[i].qiov, nb_sectors,
                        quorum_aio_cb, &acb->qcrs[i]);
     }
 
diff --git a/configure b/configure
index 39f2a1a..6143acb 100755
--- a/configure
+++ b/configure
@@ -264,6 +264,7 @@  gtkabi="2.0"
 tpm="no"
 libssh2=""
 vhdx=""
+quorum="no"
 
 # parse CC options first
 for opt do
@@ -1005,6 +1006,10 @@  for opt do
   ;;
   --disable-vhdx) vhdx="no"
   ;;
+  --disable-quorum) quorum="no"
+  ;;
+  --enable-quorum) quorum="yes"
+  ;;
   *) echo "ERROR: unknown option $opt"; show_help="yes"
   ;;
   esac
@@ -1261,6 +1266,8 @@  Advanced options (experts only):
   --enable-libssh2         enable ssh block device support
   --disable-vhdx           disables support for the Microsoft VHDX image format
   --enable-vhdx            enable support for the Microsoft VHDX image format
+  --disable-quorum         disable quorum block filter support
+  --enable-quorum          enable quorum block filter support
 
 NOTE: The object files are built at the place where configure is launched
 EOF
@@ -1937,6 +1944,30 @@  EOF
 fi
 
 ##########################################
+# Quorum probe (check for gnutls)
+if test "$quorum" != "no" ; then
+cat > $TMPC <<EOF
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
+int main(void) {char data[4096], digest[32];
+gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
+return 0;
+}
+EOF
+quorum_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
+quorum_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
+if compile_prog "$quorum_tls_cflags" "$quorum_tls_libs" ; then
+  qcow_tls=yes
+  libs_softmmu="$quorum_tls_libs $libs_softmmu"
+  libs_tools="$quorum_tls_libs $libs_softmmu"
+  QEMU_CFLAGS="$QEMU_CFLAGS $quorum_tls_cflags"
+else
+  echo "gnutls > 2.10.0 required to compile Quorum"
+  exit 1
+fi
+fi
+
+##########################################
 # VNC SASL detection
 if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
   cat > $TMPC <<EOF
@@ -3893,6 +3924,7 @@  echo "libssh2 support   $libssh2"
 echo "TPM passthrough   $tpm_passthrough"
 echo "QOM debugging     $qom_cast_debug"
 echo "vhdx              $vhdx"
+echo "Quorum            $quorum"
 
 if test "$sdl_too_old" = "yes"; then
 echo "-> Your SDL version is too old - please upgrade to have SDL support"
@@ -4295,6 +4327,10 @@  if test "$libssh2" = "yes" ; then
   echo "CONFIG_LIBSSH2=y" >> $config_host_mak
 fi
 
+if test "$quorum" = "yes" ; then
+  echo "CONFIG_QUORUM=y" >> $config_host_mak
+fi
+
 if test "$virtio_blk_data_plane" = "yes" ; then
   echo 'CONFIG_VIRTIO_BLK_DATA_PLANE=$(CONFIG_VIRTIO)' >> $config_host_mak
 fi
diff --git a/docs/qmp/qmp-events.txt b/docs/qmp/qmp-events.txt
index a378c87..00f9515 100644
--- a/docs/qmp/qmp-events.txt
+++ b/docs/qmp/qmp-events.txt
@@ -500,3 +500,39 @@  Example:
 
 Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
 followed respectively by the RESET, SHUTDOWN, or STOP events.
+
+QUORUM_FAILURE
+--------------
+
+Emitted by the Quorum block driver if it fails to establish a quorum.
+
+Data:
+
+- "reference":    device name if defined else node name.
+- "sector-num":   Number of the first sector of the failed read operation.
+- "sector-count": Failed read operation sector count.
+
+Example:
+
+{ "event": "QUORUM_FAILURE",
+     "data": { "reference": "usr1", "sector-num": 345435, "sector-count": 5 },
+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
+
+QUORUM_REPORT_BAD
+-----------------
+
+Emitted to report a corruption of a Quorum file.
+
+Data:
+
+- "ret":          The IO return code.
+- "node-name":    The graph node name of the block driver state.
+- "sector-num":   Number of the first sector of the failed read operation.
+- "sector-count": Failed read operation sector count.
+
+Example:
+
+{ "event": "QUORUM_REPORT_BAD",
+     "data": { "ret": 0, "node-name": "1.raw", "sector-num": 345435,
+               "sector-count": 5 },
+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h
index 7e5f752..a49ea11 100644
--- a/include/monitor/monitor.h
+++ b/include/monitor/monitor.h
@@ -49,6 +49,8 @@  typedef enum MonitorEvent {
     QEVENT_SPICE_MIGRATE_COMPLETED,
     QEVENT_GUEST_PANICKED,
     QEVENT_BLOCK_IMAGE_CORRUPTED,
+    QEVENT_QUORUM_FAILURE,
+    QEVENT_QUORUM_REPORT_BAD,
 
     /* Add to 'monitor_event_names' array in monitor.c when
      * defining new events here */
diff --git a/monitor.c b/monitor.c
index de90fba..8ae095f 100644
--- a/monitor.c
+++ b/monitor.c
@@ -508,6 +508,8 @@  static const char *monitor_event_names[] = {
     [QEVENT_SPICE_MIGRATE_COMPLETED] = "SPICE_MIGRATE_COMPLETED",
     [QEVENT_GUEST_PANICKED] = "GUEST_PANICKED",
     [QEVENT_BLOCK_IMAGE_CORRUPTED] = "BLOCK_IMAGE_CORRUPTED",
+    [QEVENT_QUORUM_FAILURE] = "QUORUM_FAILURE",
+    [QEVENT_QUORUM_REPORT_BAD] = "QUORUM_REPORT_BAD",
 };
 QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)