@@ -30,6 +30,7 @@
#define QUORUM_OPT_BLKVERIFY "blkverify"
#define QUORUM_OPT_REWRITE "rewrite-corrupted"
#define QUORUM_OPT_READ_PATTERN "read-pattern"
+#define QUORUM_CHILDREN_OPT_IGNORE_ERRORS "ignore-errors"
/* This union holds a vote hash value */
typedef union QuorumVoteValue {
@@ -65,6 +66,7 @@ typedef struct QuorumVotes {
/* the following structure holds the state of one quorum instance */
typedef struct BDRVQuorumState {
BlockDriverState **bs; /* children BlockDriverStates */
+ bool *ignore_errors; /* ignore children's error? */
int num_children; /* children count */
int threshold; /* if less than threshold children reads gave the
* same result a quorum error occurs.
@@ -97,6 +99,7 @@ typedef struct QuorumChildRequest {
uint8_t *buf;
int ret;
QuorumAIOCB *parent;
+ int index;
} QuorumChildRequest;
/* Quorum will use the following structure to track progress of each read/write
@@ -209,6 +212,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
acb->qcrs[i].buf = NULL;
acb->qcrs[i].ret = 0;
acb->qcrs[i].parent = acb;
+ acb->qcrs[i].index = i;
}
return acb;
@@ -305,7 +309,7 @@ static void quorum_aio_cb(void *opaque, int ret)
acb->count++;
if (ret == 0) {
acb->success_count++;
- } else {
+ } else if (!s->ignore_errors[sacb->index]) {
quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
}
assert(acb->count <= s->num_children);
@@ -720,19 +724,31 @@ static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
static int64_t quorum_getlength(BlockDriverState *bs)
{
BDRVQuorumState *s = bs->opaque;
- int64_t result;
+ int64_t result = -EIO;
int i;
/* check that all file have the same length */
- result = bdrv_getlength(s->bs[0]);
- if (result < 0) {
- return result;
- }
- for (i = 1; i < s->num_children; i++) {
+ for (i = 0; i < s->num_children; i++) {
int64_t value = bdrv_getlength(s->bs[i]);
+
if (value < 0) {
return value;
}
+
+ if (value == 0 && s->ignore_errors[i]) {
+ /*
+ * If the child is not ready, it cannot return -errno,
+ * otherwise refresh_total_sectors() will fail when
+ * we open the child.
+ */
+ continue;
+ }
+
+ if (result == -EIO) {
+ result = value;
+ continue;
+ }
+
if (value != result) {
return -EIO;
}
@@ -770,6 +786,9 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
for (i = 0; i < s->num_children; i++) {
result = bdrv_co_flush(s->bs[i]);
+ if (result < 0 && s->ignore_errors[i]) {
+ result = 0;
+ }
result_value.l = result;
quorum_count_vote(&error_votes, &result_value, i);
}
@@ -844,6 +863,19 @@ static QemuOptsList quorum_runtime_opts = {
},
};
+static QemuOptsList quorum_children_common_opts = {
+ .name = "quorum children",
+ .head = QTAILQ_HEAD_INITIALIZER(quorum_children_common_opts.head),
+ .desc = {
+ {
+ .name = QUORUM_CHILDREN_OPT_IGNORE_ERRORS,
+ .type = QEMU_OPT_BOOL,
+ .help = "ignore child I/O error",
+ },
+ { /* end of list */ }
+ },
+};
+
static int parse_read_pattern(const char *opt)
{
int i;
@@ -939,11 +971,14 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
/* allocate the children BlockDriverState array */
s->bs = g_new0(BlockDriverState *, s->num_children);
opened = g_new0(bool, s->num_children);
+ s->ignore_errors = g_new0(bool, s->num_children);
for (i = 0, lentry = qlist_first(list); lentry;
lentry = qlist_next(lentry), i++) {
QDict *d;
QString *string;
+ QemuOpts *children_opts = NULL;
+ bool value;
switch (qobject_type(lentry->value))
{
@@ -951,6 +986,20 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
case QTYPE_QDICT:
d = qobject_to_qdict(lentry->value);
QINCREF(d);
+
+ children_opts = qemu_opts_create(&quorum_children_common_opts,
+ NULL, 0, &error_abort);
+ qemu_opts_absorb_qdict(children_opts, d, &local_err);
+ if (local_err) {
+ ret = -EINVAL;
+ qemu_opts_del(children_opts);
+ goto close_exit;
+ }
+ value = qemu_opt_get_bool(children_opts,
+ QUORUM_CHILDREN_OPT_IGNORE_ERRORS,
+ false);
+ s->ignore_errors[i] = value;
+ qemu_opts_del(children_opts);
ret = bdrv_open(&s->bs[i], NULL, NULL, d, flags, NULL,
&local_err);
break;
@@ -1008,6 +1057,7 @@ static void quorum_close(BlockDriverState *bs)
}
g_free(s->bs);
+ g_free(s->ignore_errors);
}
static void quorum_detach_aio_context(BlockDriverState *bs)