diff mbox

[RFC,v2,2/6] block: add live block commit functionality

Message ID 812f2aca86713b00473144bb272f014cc5e8a7f4.1346351079.git.jcody@redhat.com
State New
Headers show

Commit Message

Jeff Cody Aug. 30, 2012, 6:47 p.m. UTC
This adds the live commit coroutine.  This iteration focuses on the
commit only below the active layer, and not the active layer itself.

The behaviour is similar to block streaming; the sectors are walked
through, and anything that exists above 'base' is committed back down
into base.  At the end, intermediate images are deleted, and the
chain stitched together.  Images are restored to their original open
flags upon completion.

Signed-off-by: Jeff Cody <jcody@redhat.com>
---
 block/Makefile.objs |   1 +
 block/commit.c      | 202 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 block_int.h         |  19 +++++
 trace-events        |   2 +
 4 files changed, 224 insertions(+)
 create mode 100644 block/commit.c

Comments

Kevin Wolf Sept. 6, 2012, 2 p.m. UTC | #1
Am 30.08.2012 20:47, schrieb Jeff Cody:
> This adds the live commit coroutine.  This iteration focuses on the
> commit only below the active layer, and not the active layer itself.
> 
> The behaviour is similar to block streaming; the sectors are walked
> through, and anything that exists above 'base' is committed back down
> into base.  At the end, intermediate images are deleted, and the
> chain stitched together.  Images are restored to their original open
> flags upon completion.
> 
> Signed-off-by: Jeff Cody <jcody@redhat.com>
> ---
>  block/Makefile.objs |   1 +
>  block/commit.c      | 202 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  block_int.h         |  19 +++++
>  trace-events        |   2 +
>  4 files changed, 224 insertions(+)
>  create mode 100644 block/commit.c
> 
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index b5754d3..4a136b8 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -4,6 +4,7 @@ block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>  block-obj-y += qed-check.o
>  block-obj-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
>  block-obj-y += stream.o
> +block-obj-y += commit.o
>  block-obj-$(CONFIG_WIN32) += raw-win32.o
>  block-obj-$(CONFIG_POSIX) += raw-posix.o
>  block-obj-$(CONFIG_LIBISCSI) += iscsi.o
> diff --git a/block/commit.c b/block/commit.c
> new file mode 100644
> index 0000000..bd3d882
> --- /dev/null
> +++ b/block/commit.c
> @@ -0,0 +1,202 @@
> +/*
> + * Live block commit
> + *
> + * Copyright Red Hat, Inc. 2012
> + *
> + * Authors:
> + *  Jeff Cody   <jcody@redhat.com>
> + *  Based on stream.c by Stefan Hajnoczi
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + *
> + */
> +
> +#include "trace.h"
> +#include "block_int.h"
> +#include "qemu/ratelimit.h"
> +
> +enum {
> +    /*
> +     * Size of data buffer for populating the image file.  This should be large
> +     * enough to process multiple clusters in a single call, so that populating
> +     * contiguous regions of the image is efficient.
> +     */
> +    COMMIT_BUFFER_SIZE = 512 * 1024, /* in bytes */
> +};
> +
> +#define SLICE_TIME 100000000ULL /* ns */
> +
> +typedef struct CommitBlockJob {
> +    BlockJob common;
> +    RateLimit limit;
> +    BlockDriverState *active;
> +    BlockDriverState *top;
> +    BlockDriverState *base;
> +    BlockErrorAction on_error;
> +    int base_flags;
> +    int top_flags;
> +} CommitBlockJob;
> +
> +static int coroutine_fn commit_populate(BlockDriverState *bs,
> +                                        BlockDriverState *base,
> +                                        int64_t sector_num, int nb_sectors,
> +                                        void *buf)
> +{
> +    if (bdrv_read(bs, sector_num, buf, nb_sectors)) {
> +        return -EIO;
> +    }
> +    if (bdrv_write(base, sector_num, buf, nb_sectors)) {
> +        return -EIO;
> +    }

Don't throw error codes away.

What should we do with backing files that are smaller than the image to
commit? In this version, data is copied up to the size of the backing
file, and then we get -EIO from bdrv_co_do_writev().

> +    return 0;
> +}
> +
> +static void coroutine_fn commit_run(void *opaque)
> +{
> +    CommitBlockJob *s = opaque;
> +    BlockDriverState *active = s->active;
> +    BlockDriverState *top = s->top;
> +    BlockDriverState *base = s->base;
> +    BlockDriverState *top_child = NULL;
> +    int64_t sector_num, end;
> +    int error = 0;
> +    int ret = 0;
> +    int n = 0;
> +    void *buf;
> +    int bytes_written = 0;
> +
> +    s->common.len = bdrv_getlength(top);
> +    if (s->common.len < 0) {
> +        block_job_complete(&s->common, s->common.len);
> +        return;
> +    }
> +
> +    top_child = bdrv_find_child(active, top);
> +
> +    end = s->common.len >> BDRV_SECTOR_BITS;
> +    buf = qemu_blockalign(top, COMMIT_BUFFER_SIZE);
> +
> +    for (sector_num = 0; sector_num < end; sector_num += n) {
> +        uint64_t delay_ns = 0;
> +        bool copy;
> +
> +wait:
> +        /* Note that even when no rate limit is applied we need to yield
> +         * with no pending I/O here so that qemu_aio_flush() returns.
> +         */
> +        block_job_sleep_ns(&s->common, rt_clock, delay_ns);
> +        if (block_job_is_cancelled(&s->common)) {
> +            break;
> +        }
> +        /* Copy if allocated above the base */
> +        ret = bdrv_co_is_allocated_above(top, base, sector_num,
> +                                         COMMIT_BUFFER_SIZE / BDRV_SECTOR_SIZE,
> +                                         &n);
> +        copy = (ret == 1);
> +        trace_commit_one_iteration(s, sector_num, n, ret);
> +        if (ret >= 0 && copy) {

If ret == 1, it's automatically >= 0...

By the way, I'm not sure if the interface is 0/1/-errno or
0/positive/-errno.

> +            if (s->common.speed) {
> +                delay_ns = ratelimit_calculate_delay(&s->limit, n);
> +                if (delay_ns > 0) {
> +                    goto wait;
> +                }
> +            }
> +            ret = commit_populate(top, base, sector_num, n, buf);
> +            bytes_written += n * BDRV_SECTOR_SIZE;
> +        }
> +        if (ret < 0) {
> +            if (s->on_error == BLOCK_ERR_STOP_ANY ||
> +                s->on_error == BLOCK_ERR_STOP_ENOSPC) {

Shouldn't there be a check for ret == -ENOSPC then...? And does this
error handling do anything useful if you can't pause the job? Wouldn't
it retry all the time?

> +                n = 0;
> +                continue;
> +            }
> +            if (error == 0) {
> +                error = ret;
> +            }
> +            if (s->on_error == BLOCK_ERR_REPORT) {
> +                break;
> +            }
> +        }
> +        ret = 0;
> +
> +        /* Publish progress */
> +        s->common.offset += n * BDRV_SECTOR_SIZE;
> +    }
> +
> +    if (!block_job_is_cancelled(&s->common) && sector_num == end && ret == 0) {
> +        /* success */
> +        if (bdrv_delete_intermediate(active, top, base)) {
> +            /* something went wrong! */
> +            /* TODO:add error reporting here */

Indeed :-)

> +        }
> +    }
> +
> +    /* restore base open flags here if appropriate (e.g., change the base back
> +     * to r/o). These reopens do not need to be atomic, since we won't abort
> +     * even on failure here */
> +
> +    if (s->base_flags != bdrv_get_flags(base)) {
> +        bdrv_reopen(base, s->base_flags, NULL);
> +    }
> +    if (s->top_flags != bdrv_get_flags(top_child)) {
> +        bdrv_reopen(top_child, s->top_flags, NULL);
> +    }
> +
> +    qemu_vfree(buf);
> +    block_job_complete(&s->common, ret);
> +}
> +
> +static void commit_set_speed(BlockJob *job, int64_t speed, Error **errp)
> +{
> +    CommitBlockJob *s = container_of(job, CommitBlockJob, common);
> +
> +    if (speed < 0) {
> +        error_set(errp, QERR_INVALID_PARAMETER, "speed");
> +        return;
> +    }
> +    ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE, SLICE_TIME);
> +}
> +
> +static BlockJobType commit_job_type = {
> +    .instance_size = sizeof(CommitBlockJob),
> +    .job_type      = "commit",
> +    .set_speed     = commit_set_speed,
> +};
> +
> +void commit_start(BlockDriverState *bs, BlockDriverState *base,
> +                 BlockDriverState *top, int64_t speed,
> +                 BlockErrorAction on_error, BlockDriverCompletionFunc *cb,
> +                 void *opaque, int orig_base_flags, int orig_top_flags,
> +                 Error **errp)
> +{
> +    CommitBlockJob *s;
> +
> +    if ((on_error == BLOCK_ERR_STOP_ANY ||
> +         on_error == BLOCK_ERR_STOP_ENOSPC) &&
> +        !bdrv_iostatus_is_enabled(bs)) {
> +        error_set(errp, QERR_INVALID_PARAMETER_COMBINATION);
> +        return;
> +    }
> +
> +    s = block_job_create(&commit_job_type, bs, speed, cb, opaque, errp);
> +    if (!s) {
> +        return;
> +    }
> +
> +    s->base   = base;
> +    s->top    = top;
> +    s->active = bs;
> +
> +    s->base_flags = orig_base_flags;
> +    s->top_flags  = orig_top_flags;

So it's the caller who is expected to reopen r/w and then pass the
original flags in? Can't we do both of it here?

> +
> +    s->on_error = on_error;
> +    s->common.co = qemu_coroutine_create(commit_run);
> +
> +    trace_commit_start(bs, base, top, s, s->common.co, opaque,
> +                       orig_base_flags, orig_top_flags);
> +    qemu_coroutine_enter(s->common.co, s);
> +
> +    return;

Unnecessary return.

Kevin
Jeff Cody Sept. 6, 2012, 8:37 p.m. UTC | #2
On 09/06/2012 10:00 AM, Kevin Wolf wrote:
> Am 30.08.2012 20:47, schrieb Jeff Cody:
>> This adds the live commit coroutine.  This iteration focuses on the
>> commit only below the active layer, and not the active layer itself.
>>
>> The behaviour is similar to block streaming; the sectors are walked
>> through, and anything that exists above 'base' is committed back down
>> into base.  At the end, intermediate images are deleted, and the
>> chain stitched together.  Images are restored to their original open
>> flags upon completion.
>>
>> Signed-off-by: Jeff Cody <jcody@redhat.com>
>> ---
>>  block/Makefile.objs |   1 +
>>  block/commit.c      | 202 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  block_int.h         |  19 +++++
>>  trace-events        |   2 +
>>  4 files changed, 224 insertions(+)
>>  create mode 100644 block/commit.c
>>
>> diff --git a/block/Makefile.objs b/block/Makefile.objs
>> index b5754d3..4a136b8 100644
>> --- a/block/Makefile.objs
>> +++ b/block/Makefile.objs
>> @@ -4,6 +4,7 @@ block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>>  block-obj-y += qed-check.o
>>  block-obj-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
>>  block-obj-y += stream.o
>> +block-obj-y += commit.o
>>  block-obj-$(CONFIG_WIN32) += raw-win32.o
>>  block-obj-$(CONFIG_POSIX) += raw-posix.o
>>  block-obj-$(CONFIG_LIBISCSI) += iscsi.o
>> diff --git a/block/commit.c b/block/commit.c
>> new file mode 100644
>> index 0000000..bd3d882
>> --- /dev/null
>> +++ b/block/commit.c
>> @@ -0,0 +1,202 @@
>> +/*
>> + * Live block commit
>> + *
>> + * Copyright Red Hat, Inc. 2012
>> + *
>> + * Authors:
>> + *  Jeff Cody   <jcody@redhat.com>
>> + *  Based on stream.c by Stefan Hajnoczi
>> + *
>> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
>> + * See the COPYING.LIB file in the top-level directory.
>> + *
>> + */
>> +
>> +#include "trace.h"
>> +#include "block_int.h"
>> +#include "qemu/ratelimit.h"
>> +
>> +enum {
>> +    /*
>> +     * Size of data buffer for populating the image file.  This should be large
>> +     * enough to process multiple clusters in a single call, so that populating
>> +     * contiguous regions of the image is efficient.
>> +     */
>> +    COMMIT_BUFFER_SIZE = 512 * 1024, /* in bytes */
>> +};
>> +
>> +#define SLICE_TIME 100000000ULL /* ns */
>> +
>> +typedef struct CommitBlockJob {
>> +    BlockJob common;
>> +    RateLimit limit;
>> +    BlockDriverState *active;
>> +    BlockDriverState *top;
>> +    BlockDriverState *base;
>> +    BlockErrorAction on_error;
>> +    int base_flags;
>> +    int top_flags;
>> +} CommitBlockJob;
>> +
>> +static int coroutine_fn commit_populate(BlockDriverState *bs,
>> +                                        BlockDriverState *base,
>> +                                        int64_t sector_num, int nb_sectors,
>> +                                        void *buf)
>> +{
>> +    if (bdrv_read(bs, sector_num, buf, nb_sectors)) {
>> +        return -EIO;
>> +    }
>> +    if (bdrv_write(base, sector_num, buf, nb_sectors)) {
>> +        return -EIO;
>> +    }
> 
> Don't throw error codes away.
> 

OK, I'll pass the return value from bdrv_read()/bdrv_write() through to
the caller.

> What should we do with backing files that are smaller than the image to
> commit? In this version, data is copied up to the size of the backing
> file, and then we get -EIO from bdrv_co_do_writev().
> 

We could leave it like that, and let it receive -EIO in that case.
Alternatively, we could try and determine before the commit if the data
will fit in the base, and return -ENOSPC if not.

>> +    return 0;
>> +}
>> +
>> +static void coroutine_fn commit_run(void *opaque)
>> +{
>> +    CommitBlockJob *s = opaque;
>> +    BlockDriverState *active = s->active;
>> +    BlockDriverState *top = s->top;
>> +    BlockDriverState *base = s->base;
>> +    BlockDriverState *top_child = NULL;
>> +    int64_t sector_num, end;
>> +    int error = 0;
>> +    int ret = 0;
>> +    int n = 0;
>> +    void *buf;
>> +    int bytes_written = 0;
>> +
>> +    s->common.len = bdrv_getlength(top);
>> +    if (s->common.len < 0) {
>> +        block_job_complete(&s->common, s->common.len);
>> +        return;
>> +    }
>> +
>> +    top_child = bdrv_find_child(active, top);
>> +
>> +    end = s->common.len >> BDRV_SECTOR_BITS;
>> +    buf = qemu_blockalign(top, COMMIT_BUFFER_SIZE);
>> +
>> +    for (sector_num = 0; sector_num < end; sector_num += n) {
>> +        uint64_t delay_ns = 0;
>> +        bool copy;
>> +
>> +wait:
>> +        /* Note that even when no rate limit is applied we need to yield
>> +         * with no pending I/O here so that qemu_aio_flush() returns.
>> +         */
>> +        block_job_sleep_ns(&s->common, rt_clock, delay_ns);
>> +        if (block_job_is_cancelled(&s->common)) {
>> +            break;
>> +        }
>> +        /* Copy if allocated above the base */
>> +        ret = bdrv_co_is_allocated_above(top, base, sector_num,
>> +                                         COMMIT_BUFFER_SIZE / BDRV_SECTOR_SIZE,
>> +                                         &n);
>> +        copy = (ret == 1);
>> +        trace_commit_one_iteration(s, sector_num, n, ret);
>> +        if (ret >= 0 && copy) {
> 
> If ret == 1, it's automatically >= 0...
> 
> By the way, I'm not sure if the interface is 0/1/-errno or
> 0/positive/-errno.
> 

The header comment for bdrv_co_is_allocated() states that it returns true
if allocated, and false otherwise.  However, the function actually returns
true if allocated, false if not, and a negative value on failure.

But either way, you are right, I can just use 'copy' here.

>> +            if (s->common.speed) {
>> +                delay_ns = ratelimit_calculate_delay(&s->limit, n);
>> +                if (delay_ns > 0) {
>> +                    goto wait;
>> +                }
>> +            }
>> +            ret = commit_populate(top, base, sector_num, n, buf);
>> +            bytes_written += n * BDRV_SECTOR_SIZE;
>> +        }
>> +        if (ret < 0) {
>> +            if (s->on_error == BLOCK_ERR_STOP_ANY ||
>> +                s->on_error == BLOCK_ERR_STOP_ENOSPC) {
> 
> Shouldn't there be a check for ret == -ENOSPC then...? And does this
> error handling do anything useful if you can't pause the job? Wouldn't
> it retry all the time?
> 

Yes, we should check for ret == -ENOSPC.  As for the error handling, we
should probably just stop the job completely here and return the error,
rather than retrying.

>> +                n = 0;
>> +                continue;
>> +            }
>> +            if (error == 0) {
>> +                error = ret;
>> +            }
>> +            if (s->on_error == BLOCK_ERR_REPORT) {
>> +                break;
>> +            }
>> +        }
>> +        ret = 0;
>> +
>> +        /* Publish progress */
>> +        s->common.offset += n * BDRV_SECTOR_SIZE;
>> +    }
>> +
>> +    if (!block_job_is_cancelled(&s->common) && sector_num == end && ret == 0) {
>> +        /* success */
>> +        if (bdrv_delete_intermediate(active, top, base)) {
>> +            /* something went wrong! */
>> +            /* TODO:add error reporting here */
> 
> Indeed :-)
> 
>> +        }
>> +    }
>> +
>> +    /* restore base open flags here if appropriate (e.g., change the base back
>> +     * to r/o). These reopens do not need to be atomic, since we won't abort
>> +     * even on failure here */
>> +
>> +    if (s->base_flags != bdrv_get_flags(base)) {
>> +        bdrv_reopen(base, s->base_flags, NULL);
>> +    }
>> +    if (s->top_flags != bdrv_get_flags(top_child)) {
>> +        bdrv_reopen(top_child, s->top_flags, NULL);
>> +    }
>> +
>> +    qemu_vfree(buf);
>> +    block_job_complete(&s->common, ret);
>> +}
>> +
>> +static void commit_set_speed(BlockJob *job, int64_t speed, Error **errp)
>> +{
>> +    CommitBlockJob *s = container_of(job, CommitBlockJob, common);
>> +
>> +    if (speed < 0) {
>> +        error_set(errp, QERR_INVALID_PARAMETER, "speed");
>> +        return;
>> +    }
>> +    ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE, SLICE_TIME);
>> +}
>> +
>> +static BlockJobType commit_job_type = {
>> +    .instance_size = sizeof(CommitBlockJob),
>> +    .job_type      = "commit",
>> +    .set_speed     = commit_set_speed,
>> +};
>> +
>> +void commit_start(BlockDriverState *bs, BlockDriverState *base,
>> +                 BlockDriverState *top, int64_t speed,
>> +                 BlockErrorAction on_error, BlockDriverCompletionFunc *cb,
>> +                 void *opaque, int orig_base_flags, int orig_top_flags,
>> +                 Error **errp)
>> +{
>> +    CommitBlockJob *s;
>> +
>> +    if ((on_error == BLOCK_ERR_STOP_ANY ||
>> +         on_error == BLOCK_ERR_STOP_ENOSPC) &&
>> +        !bdrv_iostatus_is_enabled(bs)) {
>> +        error_set(errp, QERR_INVALID_PARAMETER_COMBINATION);
>> +        return;
>> +    }
>> +
>> +    s = block_job_create(&commit_job_type, bs, speed, cb, opaque, errp);
>> +    if (!s) {
>> +        return;
>> +    }
>> +
>> +    s->base   = base;
>> +    s->top    = top;
>> +    s->active = bs;
>> +
>> +    s->base_flags = orig_base_flags;
>> +    s->top_flags  = orig_top_flags;
> 
> So it's the caller who is expected to reopen r/w and then pass the
> original flags in? Can't we do both of it here?
> 

Yes, that would be cleaner - I'll move the reopen() to commit_start().


>> +
>> +    s->on_error = on_error;
>> +    s->common.co = qemu_coroutine_create(commit_run);
>> +
>> +    trace_commit_start(bs, base, top, s, s->common.co, opaque,
>> +                       orig_base_flags, orig_top_flags);
>> +    qemu_coroutine_enter(s->common.co, s);
>> +
>> +    return;
> 
> Unnecessary return.
> 
> Kevin
>
Eric Blake Sept. 6, 2012, 9:16 p.m. UTC | #3
On 09/06/2012 02:37 PM, Jeff Cody wrote:
> On 09/06/2012 10:00 AM, Kevin Wolf wrote:
>> Am 30.08.2012 20:47, schrieb Jeff Cody:
>>> This adds the live commit coroutine.  This iteration focuses on the
>>> commit only below the active layer, and not the active layer itself.
>>>
>>> The behaviour is similar to block streaming; the sectors are walked
>>> through, and anything that exists above 'base' is committed back down
>>> into base.  At the end, intermediate images are deleted, and the
>>> chain stitched together.  Images are restored to their original open
>>> flags upon completion.
>>>

> 
>> What should we do with backing files that are smaller than the image to
>> commit? In this version, data is copied up to the size of the backing
>> file, and then we get -EIO from bdrv_co_do_writev().
>>
> 
> We could leave it like that, and let it receive -EIO in that case.
> Alternatively, we could try and determine before the commit if the data
> will fit in the base, and return -ENOSPC if not.

Neither sounds appealing.  Why can't we first try to resize the base to
the new data size being committed, and only fall back to -ENOSPC or -EIO
if the resize fails?
Jeff Cody Sept. 7, 2012, 3:56 p.m. UTC | #4
On 09/06/2012 05:16 PM, Eric Blake wrote:
> On 09/06/2012 02:37 PM, Jeff Cody wrote:
>> On 09/06/2012 10:00 AM, Kevin Wolf wrote:
>>> Am 30.08.2012 20:47, schrieb Jeff Cody:
>>>> This adds the live commit coroutine.  This iteration focuses on the
>>>> commit only below the active layer, and not the active layer itself.
>>>>
>>>> The behaviour is similar to block streaming; the sectors are walked
>>>> through, and anything that exists above 'base' is committed back down
>>>> into base.  At the end, intermediate images are deleted, and the
>>>> chain stitched together.  Images are restored to their original open
>>>> flags upon completion.
>>>>
> 
>>
>>> What should we do with backing files that are smaller than the image to
>>> commit? In this version, data is copied up to the size of the backing
>>> file, and then we get -EIO from bdrv_co_do_writev().
>>>
>>
>> We could leave it like that, and let it receive -EIO in that case.
>> Alternatively, we could try and determine before the commit if the data
>> will fit in the base, and return -ENOSPC if not.
> 
> Neither sounds appealing.  Why can't we first try to resize the base to
> the new data size being committed, and only fall back to -ENOSPC or -EIO
> if the resize fails?
> 

OK - we will attempt to resize the base first, and return the appropriate
error if the resize fails or if resizing is not supported by the base
image's format.
diff mbox

Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index b5754d3..4a136b8 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -4,6 +4,7 @@  block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
 block-obj-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
 block-obj-y += stream.o
+block-obj-y += commit.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o
 block-obj-$(CONFIG_POSIX) += raw-posix.o
 block-obj-$(CONFIG_LIBISCSI) += iscsi.o
diff --git a/block/commit.c b/block/commit.c
new file mode 100644
index 0000000..bd3d882
--- /dev/null
+++ b/block/commit.c
@@ -0,0 +1,202 @@ 
+/*
+ * Live block commit
+ *
+ * Copyright Red Hat, Inc. 2012
+ *
+ * Authors:
+ *  Jeff Cody   <jcody@redhat.com>
+ *  Based on stream.c by Stefan Hajnoczi
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ *
+ */
+
+#include "trace.h"
+#include "block_int.h"
+#include "qemu/ratelimit.h"
+
+enum {
+    /*
+     * Size of data buffer for populating the image file.  This should be large
+     * enough to process multiple clusters in a single call, so that populating
+     * contiguous regions of the image is efficient.
+     */
+    COMMIT_BUFFER_SIZE = 512 * 1024, /* in bytes */
+};
+
+#define SLICE_TIME 100000000ULL /* ns */
+
+typedef struct CommitBlockJob {
+    BlockJob common;
+    RateLimit limit;
+    BlockDriverState *active;
+    BlockDriverState *top;
+    BlockDriverState *base;
+    BlockErrorAction on_error;
+    int base_flags;
+    int top_flags;
+} CommitBlockJob;
+
+static int coroutine_fn commit_populate(BlockDriverState *bs,
+                                        BlockDriverState *base,
+                                        int64_t sector_num, int nb_sectors,
+                                        void *buf)
+{
+    if (bdrv_read(bs, sector_num, buf, nb_sectors)) {
+        return -EIO;
+    }
+    if (bdrv_write(base, sector_num, buf, nb_sectors)) {
+        return -EIO;
+    }
+    return 0;
+}
+
+static void coroutine_fn commit_run(void *opaque)
+{
+    CommitBlockJob *s = opaque;
+    BlockDriverState *active = s->active;
+    BlockDriverState *top = s->top;
+    BlockDriverState *base = s->base;
+    BlockDriverState *top_child = NULL;
+    int64_t sector_num, end;
+    int error = 0;
+    int ret = 0;
+    int n = 0;
+    void *buf;
+    int bytes_written = 0;
+
+    s->common.len = bdrv_getlength(top);
+    if (s->common.len < 0) {
+        block_job_complete(&s->common, s->common.len);
+        return;
+    }
+
+    top_child = bdrv_find_child(active, top);
+
+    end = s->common.len >> BDRV_SECTOR_BITS;
+    buf = qemu_blockalign(top, COMMIT_BUFFER_SIZE);
+
+    for (sector_num = 0; sector_num < end; sector_num += n) {
+        uint64_t delay_ns = 0;
+        bool copy;
+
+wait:
+        /* Note that even when no rate limit is applied we need to yield
+         * with no pending I/O here so that qemu_aio_flush() returns.
+         */
+        block_job_sleep_ns(&s->common, rt_clock, delay_ns);
+        if (block_job_is_cancelled(&s->common)) {
+            break;
+        }
+        /* Copy if allocated above the base */
+        ret = bdrv_co_is_allocated_above(top, base, sector_num,
+                                         COMMIT_BUFFER_SIZE / BDRV_SECTOR_SIZE,
+                                         &n);
+        copy = (ret == 1);
+        trace_commit_one_iteration(s, sector_num, n, ret);
+        if (ret >= 0 && copy) {
+            if (s->common.speed) {
+                delay_ns = ratelimit_calculate_delay(&s->limit, n);
+                if (delay_ns > 0) {
+                    goto wait;
+                }
+            }
+            ret = commit_populate(top, base, sector_num, n, buf);
+            bytes_written += n * BDRV_SECTOR_SIZE;
+        }
+        if (ret < 0) {
+            if (s->on_error == BLOCK_ERR_STOP_ANY ||
+                s->on_error == BLOCK_ERR_STOP_ENOSPC) {
+                n = 0;
+                continue;
+            }
+            if (error == 0) {
+                error = ret;
+            }
+            if (s->on_error == BLOCK_ERR_REPORT) {
+                break;
+            }
+        }
+        ret = 0;
+
+        /* Publish progress */
+        s->common.offset += n * BDRV_SECTOR_SIZE;
+    }
+
+    if (!block_job_is_cancelled(&s->common) && sector_num == end && ret == 0) {
+        /* success */
+        if (bdrv_delete_intermediate(active, top, base)) {
+            /* something went wrong! */
+            /* TODO:add error reporting here */
+        }
+    }
+
+    /* restore base open flags here if appropriate (e.g., change the base back
+     * to r/o). These reopens do not need to be atomic, since we won't abort
+     * even on failure here */
+
+    if (s->base_flags != bdrv_get_flags(base)) {
+        bdrv_reopen(base, s->base_flags, NULL);
+    }
+    if (s->top_flags != bdrv_get_flags(top_child)) {
+        bdrv_reopen(top_child, s->top_flags, NULL);
+    }
+
+    qemu_vfree(buf);
+    block_job_complete(&s->common, ret);
+}
+
+static void commit_set_speed(BlockJob *job, int64_t speed, Error **errp)
+{
+    CommitBlockJob *s = container_of(job, CommitBlockJob, common);
+
+    if (speed < 0) {
+        error_set(errp, QERR_INVALID_PARAMETER, "speed");
+        return;
+    }
+    ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE, SLICE_TIME);
+}
+
+static BlockJobType commit_job_type = {
+    .instance_size = sizeof(CommitBlockJob),
+    .job_type      = "commit",
+    .set_speed     = commit_set_speed,
+};
+
+void commit_start(BlockDriverState *bs, BlockDriverState *base,
+                 BlockDriverState *top, int64_t speed,
+                 BlockErrorAction on_error, BlockDriverCompletionFunc *cb,
+                 void *opaque, int orig_base_flags, int orig_top_flags,
+                 Error **errp)
+{
+    CommitBlockJob *s;
+
+    if ((on_error == BLOCK_ERR_STOP_ANY ||
+         on_error == BLOCK_ERR_STOP_ENOSPC) &&
+        !bdrv_iostatus_is_enabled(bs)) {
+        error_set(errp, QERR_INVALID_PARAMETER_COMBINATION);
+        return;
+    }
+
+    s = block_job_create(&commit_job_type, bs, speed, cb, opaque, errp);
+    if (!s) {
+        return;
+    }
+
+    s->base   = base;
+    s->top    = top;
+    s->active = bs;
+
+    s->base_flags = orig_base_flags;
+    s->top_flags  = orig_top_flags;
+
+    s->on_error = on_error;
+    s->common.co = qemu_coroutine_create(commit_run);
+
+    trace_commit_start(bs, base, top, s, s->common.co, opaque,
+                       orig_base_flags, orig_top_flags);
+    qemu_coroutine_enter(s->common.co, s);
+
+    return;
+}
diff --git a/block_int.h b/block_int.h
index 7a4e226..5c936ee 100644
--- a/block_int.h
+++ b/block_int.h
@@ -469,4 +469,23 @@  void stream_start(BlockDriverState *bs, BlockDriverState *base,
                   BlockDriverCompletionFunc *cb,
                   void *opaque, Error **errp);
 
+/**
+ * commit_start:
+ * @bs: Active layer block device, which the job is attached to
+ * @base: Block device that is written into, and replaces @top in the chain
+ * @top: Topmost block device to commit into @base; below the active layer
+ * @speed: The maximum speed, in bytes per second, or 0 for unlimited.
+ * @on_error: The action to take upon error.
+ * @cb: Completion function for the job.
+ * @opaque: Opaque pointer value passed to @cb.
+ * @orig_base_flags: Original open flags restored on @base at completion
+ * @orig_top_flags: Original flags restored on bdrv_find_child(@bs, @top)
+ * @errp: Error object.
+ */
+void commit_start(BlockDriverState *bs, BlockDriverState *base,
+                 BlockDriverState *top, int64_t speed,
+                 BlockErrorAction on_error, BlockDriverCompletionFunc *cb,
+                 void *opaque, int orig_base_flags, int orig_top_flags,
+                 Error **errp);
+
 #endif /* BLOCK_INT_H */
diff --git a/trace-events b/trace-events
index 04b0723..9eb8f10 100644
--- a/trace-events
+++ b/trace-events
@@ -74,6 +74,8 @@  bdrv_co_do_copy_on_readv(void *bs, int64_t sector_num, int nb_sectors, int64_t c
 # block/stream.c
 stream_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
 stream_start(void *bs, void *base, void *s, void *co, void *opaque) "bs %p base %p s %p co %p opaque %p"
+commit_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
+commit_start(void *bs, void *base, void *top, void *s, void *co, void *opaque, int base_flags, int top_flags) "bs %p base %p top %p s %p co %p opaque %p base_flags %d top_flags %d"
 
 # blockdev.c
 qmp_block_job_cancel(void *job) "job %p"