Message ID | 1297330258-20494-8-git-send-email-tamura.yoshiaki@lab.ntt.co.jp |
---|---|
State | New |
Headers | show |
Yoshiaki: I have one question about ram_save_live, during migration 3 stage(completation stage), it will call cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty pages. at the end of migrate_ft_trans_connect function, it will invoke vm_start(), at this time, cpu_physical_memory_set_dirty_tracking(1) is not called yet, so there may have some ram pages not recorded when qemu_savevm_trans_begin is called. I think you need calll cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect function, Am I right? BR Green. 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> > This code implements VM transaction protocol. Like buffered_file, it > sits between savevm and migration layer. With this architecture, VM > transaction protocol is implemented mostly independent from other > existing code. > > Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> > Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp> > --- > Makefile.objs | 1 + > ft_trans_file.c | 624 > +++++++++++++++++++++++++++++++++++++++++++++++++++++++ > ft_trans_file.h | 72 +++++++ > migration.c | 3 + > trace-events | 15 ++ > 5 files changed, 715 insertions(+), 0 deletions(-) > create mode 100644 ft_trans_file.c > create mode 100644 ft_trans_file.h > > diff --git a/Makefile.objs b/Makefile.objs > index 353b1a8..04148b5 100644 > --- a/Makefile.objs > +++ b/Makefile.objs > @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o > common-obj-y += qdev.o qdev-properties.o > common-obj-y += block-migration.o > common-obj-y += pflib.o > +common-obj-y += ft_trans_file.o > > common-obj-$(CONFIG_BRLAPI) += baum.o > common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o > migration-fd.o > diff --git a/ft_trans_file.c b/ft_trans_file.c > new file mode 100644 > index 0000000..2b42b95 > --- /dev/null > +++ b/ft_trans_file.c > @@ -0,0 +1,624 @@ > +/* > + * Fault tolerant VM transaction QEMUFile > + * > + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. > + * > + * This work is licensed under the terms of the GNU GPL, version 2. See > + * the COPYING file in the top-level directory. > + * > + * This source code is based on buffered_file.c. > + * Copyright IBM, Corp. 2008 > + * Authors: > + * Anthony Liguori <aliguori@us.ibm.com> > + */ > + > +#include "qemu-common.h" > +#include "qemu-error.h" > +#include "hw/hw.h" > +#include "qemu-timer.h" > +#include "sysemu.h" > +#include "qemu-char.h" > +#include "trace.h" > +#include "ft_trans_file.h" > + > +typedef struct FtTransHdr > +{ > + uint16_t cmd; > + uint16_t id; > + uint32_t seq; > + uint32_t payload_len; > +} FtTransHdr; > + > +typedef struct QEMUFileFtTrans > +{ > + FtTransPutBufferFunc *put_buffer; > + FtTransGetBufferFunc *get_buffer; > + FtTransPutReadyFunc *put_ready; > + FtTransGetReadyFunc *get_ready; > + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; > + FtTransCloseFunc *close; > + void *opaque; > + QEMUFile *file; > + > + enum QEMU_VM_TRANSACTION_STATE state; > + uint32_t seq; > + uint16_t id; > + > + int has_error; > + > + bool freeze_output; > + bool freeze_input; > + bool rate_limit; > + bool is_sender; > + bool is_payload; > + > + uint8_t *buf; > + size_t buf_max_size; > + size_t put_offset; > + size_t get_offset; > + > + FtTransHdr header; > + size_t header_offset; > +} QEMUFileFtTrans; > + > +#define IO_BUF_SIZE 32768 > + > +static void ft_trans_append(QEMUFileFtTrans *s, > + const uint8_t *buf, size_t size) > +{ > + if (size > (s->buf_max_size - s->put_offset)) { > + trace_ft_trans_realloc(s->buf_max_size, size + 1024); > + s->buf_max_size += size + 1024; > + s->buf = qemu_realloc(s->buf, s->buf_max_size); > + } > + > + trace_ft_trans_append(size); > + memcpy(s->buf + s->put_offset, buf, size); > + s->put_offset += size; > +} > + > +static void ft_trans_flush(QEMUFileFtTrans *s) > +{ > + size_t offset = 0; > + > + if (s->has_error) { > + error_report("flush when error %d, bailing", s->has_error); > + return; > + } > + > + while (offset < s->put_offset) { > + ssize_t ret; > + > + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - > offset); > + if (ret == -EAGAIN) { > + break; > + } > + > + if (ret <= 0) { > + error_report("error flushing data, %s", strerror(errno)); > + s->has_error = FT_TRANS_ERR_FLUSH; > + break; > + } else { > + offset += ret; > + } > + } > + > + trace_ft_trans_flush(offset, s->put_offset); > + memmove(s->buf, s->buf + offset, s->put_offset - offset); > + s->put_offset -= offset; > + s->freeze_output = !!s->put_offset; > +} > + > +static ssize_t ft_trans_put(void *opaque, void *buf, int size) > +{ > + QEMUFileFtTrans *s = opaque; > + size_t offset = 0; > + ssize_t len; > + > + /* flush buffered data before putting next */ > + if (s->put_offset) { > + ft_trans_flush(s); > + } > + > + while (!s->freeze_output && offset < size) { > + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - > offset); > + > + if (len == -EAGAIN) { > + trace_ft_trans_freeze_output(); > + s->freeze_output = 1; > + break; > + } > + > + if (len <= 0) { > + error_report("putting data failed, %s", strerror(errno)); > + s->has_error = 1; > + offset = -EINVAL; > + break; > + } > + > + offset += len; > + } > + > + if (s->freeze_output) { > + ft_trans_append(s, buf + offset, size - offset); > + offset = size; > + } > + > + return offset; > +} > + > +static int ft_trans_send_header(QEMUFileFtTrans *s, > + enum QEMU_VM_TRANSACTION_STATE state, > + uint32_t payload_len) > +{ > + int ret; > + FtTransHdr *hdr = &s->header; > + > + trace_ft_trans_send_header(state); > + > + hdr->cmd = s->state = state; > + hdr->id = s->id; > + hdr->seq = s->seq; > + hdr->payload_len = payload_len; > + > + ret = ft_trans_put(s, hdr, sizeof(*hdr)); > + if (ret < 0) { > + error_report("send header failed"); > + s->has_error = FT_TRANS_ERR_SEND_HDR; > + } > + > + return ret; > +} > + > +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t > pos, int size) > +{ > + QEMUFileFtTrans *s = opaque; > + ssize_t ret; > + > + trace_ft_trans_put_buffer(size, pos); > + > + if (s->has_error) { > + error_report("put_buffer when error %d, bailing", s->has_error); > + return -EINVAL; > + } > + > + /* assuming qemu_file_put_notify() is calling */ > + if (pos == 0 && size == 0) { > + trace_ft_trans_put_ready(); > + ft_trans_flush(s); > + > + if (!s->freeze_output) { > + trace_ft_trans_cb(s->put_ready); > + ret = s->put_ready(); > + } > + > + ret = 0; > + goto out; > + } > + > + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); > + if (ret < 0) { > + goto out; > + } > + > + ret = ft_trans_put(s, (uint8_t *)buf, size); > + if (ret < 0) { > + error_report("send palyload failed"); > + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; > + goto out; > + } > + > + s->seq++; > + > +out: > + return ret; > +} > + > +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) > +{ > + QEMUFileFtTrans *s = opaque; > + size_t offset = 0; > + ssize_t len; > + > + s->freeze_input = 0; > + > + while (offset < size) { > + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, > + 0, size - offset); > + if (len == -EAGAIN) { > + trace_ft_trans_freeze_input(); > + s->freeze_input = 1; > + break; > + } > + > + if (len <= 0) { > + error_report("fill buffer failed, %s", strerror(errno)); > + s->has_error = 1; > + return -EINVAL; > + } > + > + offset += len; > + } > + > + return offset; > +} > + > +static int ft_trans_recv_header(QEMUFileFtTrans *s) > +{ > + int ret; > + char *buf = (char *)&s->header + s->header_offset; > + > + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - > s->header_offset); > + if (ret < 0) { > + error_report("recv header failed"); > + s->has_error = FT_TRANS_ERR_RECV_HDR; > + goto out; > + } > + > + s->header_offset += ret; > + if (s->header_offset == sizeof(FtTransHdr)) { > + trace_ft_trans_recv_header(s->header.cmd); > + s->state = s->header.cmd; > + s->header_offset = 0; > + > + if (!s->is_sender) { > + s->id = s->header.id; > + s->seq = s->header.seq; > + } > + } > + > +out: > + return ret; > +} > + > +static int ft_trans_recv_payload(QEMUFileFtTrans *s) > +{ > + QEMUFile *f = s->file; > + int ret = -1; > + > + /* extend QEMUFile buf if there weren't enough space */ > + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { > + s->buf_max_size += (s->header.payload_len - > + (s->buf_max_size - s->get_offset)); > + s->buf = qemu_realloc_buffer(f, s->buf_max_size); > + } > + > + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, > + s->header.payload_len); > + if (ret < 0) { > + error_report("recv payload failed"); > + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; > + goto out; > + } > + > + trace_ft_trans_recv_payload(ret, s->header.payload_len, > s->get_offset); > + > + s->header.payload_len -= ret; > + s->get_offset += ret; > + s->is_payload = !!s->header.payload_len; > + > +out: > + return ret; > +} > + > +static int ft_trans_recv(QEMUFileFtTrans *s) > +{ > + int ret; > + > + /* get payload and return */ > + if (s->is_payload) { > + ret = ft_trans_recv_payload(s); > + goto out; > + } > + > + ret = ft_trans_recv_header(s); > + if (ret < 0 || s->freeze_input) { > + goto out; > + } > + > + switch (s->state) { > + case QEMU_VM_TRANSACTION_BEGIN: > + /* CONTINUE or COMMIT should come shortly */ > + s->is_payload = 0; > + break; > + > + case QEMU_VM_TRANSACTION_CONTINUE: > + /* get payload */ > + s->is_payload = 1; > + break; > + > + case QEMU_VM_TRANSACTION_COMMIT: > + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > + if (ret < 0) { > + goto out; > + } > + > + trace_ft_trans_cb(s->get_ready); > + ret = s->get_ready(s->opaque); > + if (ret < 0) { > + goto out; > + } > + > + qemu_clear_buffer(s->file); > + s->get_offset = 0; > + s->is_payload = 0; > + > + break; > + > + case QEMU_VM_TRANSACTION_ATOMIC: > + /* not implemented yet */ > + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", > + ret); > + break; > + > + case QEMU_VM_TRANSACTION_CANCEL: > + /* return -EINVAL until migrate cancel on recevier side is > supported */ > + ret = -EINVAL; > + break; > + > + default: > + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); > + s->has_error = FT_TRANS_ERR_STATE_INVALID; > + ret = -EINVAL; > + } > + > +out: > + return ret; > +} > + > +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, > + int64_t pos, int size) > +{ > + QEMUFileFtTrans *s = opaque; > + int ret; > + > + if (s->has_error) { > + error_report("get_buffer when error %d, bailing", s->has_error); > + return -EINVAL; > + } > + > + /* assuming qemu_file_get_notify() is calling */ > + if (pos == 0 && size == 0) { > + trace_ft_trans_get_ready(); > + s->freeze_input = 0; > + > + /* sender should be waiting for ACK */ > + if (s->is_sender) { > + ret = ft_trans_recv_header(s); > + if (s->freeze_input) { > + ret = 0; > + goto out; > + } > + if (ret < 0) { > + error_report("recv ack failed"); > + goto out; > + } > + > + if (s->state != QEMU_VM_TRANSACTION_ACK) { > + error_report("recv invalid state %d", s->state); > + s->has_error = FT_TRANS_ERR_STATE_INVALID; > + ret = -EINVAL; > + goto out; > + } > + > + trace_ft_trans_cb(s->get_ready); > + ret = s->get_ready(s->opaque); > + if (ret < 0) { > + goto out; > + } > + > + /* proceed trans id */ > + s->id++; > + > + return 0; > + } > + > + /* set QEMUFile buf at beginning */ > + if (!s->buf) { > + s->buf = buf; > + } > + > + ret = ft_trans_recv(s); > + goto out; > + } > + > + ret = s->get_offset; > + > +out: > + return ret; > +} > + > +static int ft_trans_close(void *opaque) > +{ > + QEMUFileFtTrans *s = opaque; > + int ret; > + > + trace_ft_trans_close(); > + ret = s->close(s->opaque); > + if (s->is_sender) { > + qemu_free(s->buf); > + } > + qemu_free(s); > + > + return ret; > +} > + > +static int ft_trans_rate_limit(void *opaque) > +{ > + QEMUFileFtTrans *s = opaque; > + > + if (s->has_error) { > + return 0; > + } > + > + if (s->rate_limit && s->freeze_output) { > + return 1; > + } > + > + return 0; > +} > + > +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) > +{ > + QEMUFileFtTrans *s = opaque; > + > + if (s->has_error) { > + goto out; > + } > + > + s->rate_limit = !!new_rate; > + > +out: > + return s->rate_limit; > +} > + > +int ft_trans_begin(void *opaque) > +{ > + QEMUFileFtTrans *s = opaque; > + int ret; > + s->seq = 0; > + > + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ > + if (!s->is_sender) { > + if (s->state != QEMU_VM_TRANSACTION_INIT) { > + error_report("invalid state %d", s->state); > + s->has_error = FT_TRANS_ERR_STATE_INVALID; > + ret = -EINVAL; > + } > + > + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > + goto out; > + } > + > + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */ > + if (s->state == QEMU_VM_TRANSACTION_INIT) { > +retry: > + ret = ft_trans_recv_header(s); > + if (s->freeze_input) { > + goto retry; > + } > + if (ret < 0) { > + error_report("recv ack failed"); > + goto out; > + } > + > + if (s->state != QEMU_VM_TRANSACTION_ACK) { > + error_report("recv invalid state %d", s->state); > + s->has_error = FT_TRANS_ERR_STATE_INVALID; > + ret = -EINVAL; > + goto out; > + } > + } > + > + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); > + if (ret < 0) { > + goto out; > + } > + > + s->state = QEMU_VM_TRANSACTION_CONTINUE; > + > +out: > + return ret; > +} > + > +int ft_trans_commit(void *opaque) > +{ > + QEMUFileFtTrans *s = opaque; > + int ret; > + > + if (!s->is_sender) { > + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > + goto out; > + } > + > + /* sender should flush buf before sending COMMIT */ > + qemu_fflush(s->file); > + > + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); > + if (ret < 0) { > + goto out; > + } > + > + while (!s->has_error && s->put_offset) { > + ft_trans_flush(s); > + if (s->freeze_output) { > + s->wait_for_unfreeze(s); > + } > + } > + > + if (s->has_error) { > + ret = -EINVAL; > + goto out; > + } > + > + ret = ft_trans_recv_header(s); > + if (s->freeze_input) { > + ret = -EAGAIN; > + goto out; > + } > + if (ret < 0) { > + error_report("recv ack failed"); > + goto out; > + } > + > + if (s->state != QEMU_VM_TRANSACTION_ACK) { > + error_report("recv invalid state %d", s->state); > + s->has_error = FT_TRANS_ERR_STATE_INVALID; > + ret = -EINVAL; > + goto out; > + } > + > + s->id++; > + ret = 0; > + > +out: > + return ret; > +} > + > +int ft_trans_cancel(void *opaque) > +{ > + QEMUFileFtTrans *s = opaque; > + > + /* invalid until migrate cancel on recevier side is supported */ > + if (!s->is_sender) { > + return -EINVAL; > + } > + > + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); > +} > + > +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, > + FtTransPutBufferFunc *put_buffer, > + FtTransGetBufferFunc *get_buffer, > + FtTransPutReadyFunc *put_ready, > + FtTransGetReadyFunc *get_ready, > + FtTransWaitForUnfreezeFunc > *wait_for_unfreeze, > + FtTransCloseFunc *close, > + bool is_sender) > +{ > + QEMUFileFtTrans *s; > + > + s = qemu_mallocz(sizeof(*s)); > + > + s->opaque = opaque; > + s->put_buffer = put_buffer; > + s->get_buffer = get_buffer; > + s->put_ready = put_ready; > + s->get_ready = get_ready; > + s->wait_for_unfreeze = wait_for_unfreeze; > + s->close = close; > + s->is_sender = is_sender; > + s->id = 0; > + s->seq = 0; > + s->rate_limit = 1; > + > + if (!s->is_sender) { > + s->buf_max_size = IO_BUF_SIZE; > + } > + > + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer, > + ft_trans_close, ft_trans_rate_limit, > + ft_trans_set_rate_limit, NULL); > + > + return s->file; > +} > diff --git a/ft_trans_file.h b/ft_trans_file.h > new file mode 100644 > index 0000000..5ca6b53 > --- /dev/null > +++ b/ft_trans_file.h > @@ -0,0 +1,72 @@ > +/* > + * Fault tolerant VM transaction QEMUFile > + * > + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. > + * > + * This work is licensed under the terms of the GNU GPL, version 2. See > + * the COPYING file in the top-level directory. > + * > + * This source code is based on buffered_file.h. > + * Copyright IBM, Corp. 2008 > + * Authors: > + * Anthony Liguori <aliguori@us.ibm.com> > + */ > + > +#ifndef QEMU_FT_TRANSACTION_FILE_H > +#define QEMU_FT_TRANSACTION_FILE_H > + > +#include "hw/hw.h" > + > +enum QEMU_VM_TRANSACTION_STATE { > + QEMU_VM_TRANSACTION_NACK = -1, > + QEMU_VM_TRANSACTION_INIT, > + QEMU_VM_TRANSACTION_BEGIN, > + QEMU_VM_TRANSACTION_CONTINUE, > + QEMU_VM_TRANSACTION_COMMIT, > + QEMU_VM_TRANSACTION_CANCEL, > + QEMU_VM_TRANSACTION_ATOMIC, > + QEMU_VM_TRANSACTION_ACK, > +}; > + > +enum FT_MODE { > + FT_ERROR = -1, > + FT_OFF, > + FT_INIT, > + FT_TRANSACTION_BEGIN, > + FT_TRANSACTION_ITER, > + FT_TRANSACTION_COMMIT, > + FT_TRANSACTION_ATOMIC, > + FT_TRANSACTION_RECV, > +}; > +extern enum FT_MODE ft_mode; > + > +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ > +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ > +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ > +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ > +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ > +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */ > +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ > + > +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, > size_t size); > +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t > pos, size_t size); > +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec > *iov, int iovcnt); > +typedef int (FtTransPutReadyFunc)(void); > +typedef int (FtTransGetReadyFunc)(void *opaque); > +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); > +typedef int (FtTransCloseFunc)(void *opaque); > + > +int ft_trans_begin(void *opaque); > +int ft_trans_commit(void *opaque); > +int ft_trans_cancel(void *opaque); > + > +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, > + FtTransPutBufferFunc *put_buffer, > + FtTransGetBufferFunc *get_buffer, > + FtTransPutReadyFunc *put_ready, > + FtTransGetReadyFunc *get_ready, > + FtTransWaitForUnfreezeFunc > *wait_for_unfreeze, > + FtTransCloseFunc *close, > + bool is_sender); > + > +#endif > diff --git a/migration.c b/migration.c > index dd3bf94..c5e0146 100644 > --- a/migration.c > +++ b/migration.c > @@ -15,6 +15,7 @@ > #include "migration.h" > #include "monitor.h" > #include "buffered_file.h" > +#include "ft_trans_file.h" > #include "sysemu.h" > #include "block.h" > #include "qemu_socket.h" > @@ -31,6 +32,8 @@ > do { } while (0) > #endif > > +enum FT_MODE ft_mode = FT_OFF; > + > /* Migration speed throttling */ > static int64_t max_throttle = (32 << 20); > > diff --git a/trace-events b/trace-events > index e6138ea..50ac840 100644 > --- a/trace-events > +++ b/trace-events > @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice > wrottn %lu of requested %zd > disable spice_vmc_read(int bytes, int len) "spice read %lu of requested > %zd" > disable spice_vmc_register_interface(void *scd) "spice vmc registered > interface %p" > disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered > interface %p" > + > +# ft_trans_file.c > +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing > buffer from %zu by %zu" > +disable ft_trans_append(size_t size) "buffering %zu bytes" > +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu bytes" > +disable ft_trans_send_header(uint16_t cmd) "send header %d" > +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" > +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes at > %"PRId64"" > +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) > "recv %d of %d total %d" > +disable ft_trans_close(void) "closing" > +disable ft_trans_freeze_output(void) "backend not ready, freezing output" > +disable ft_trans_freeze_input(void) "backend not ready, freezing input" > +disable ft_trans_put_ready(void) "file is ready to put" > +disable ft_trans_get_ready(void) "file is ready to get" > +disable ft_trans_cb(void *cb) "callback %p" > -- > 1.7.1.2 > > -- > To unsubscribe from this list: send the line "unsubscribe kvm" in > the body of a message to majordomo@vger.kernel.org > More majordomo info at http://vger.kernel.org/majordomo-info.html >
Hi Green, 2011/2/21 ya su <suya94335@gmail.com>: > Yoshiaki: > > I have one question about ram_save_live, during migration 3 > stage(completation stage), it will call > cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty pages. > at the end of migrate_ft_trans_connect function, it will invoke vm_start(), > at this time, cpu_physical_memory_set_dirty_tracking(1) is not called yet, > so there may have some ram pages not recorded when qemu_savevm_trans_begin > is called. I think you need calll > cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect > function, Am I right? Thank you for taking a look. When qemu_savevm_trans_begin is called for the first time, it calls ram_save_live with stage 1, that sends all pages and sets dirty tracking, so there won't be missing pages. Note that event-tap is turned on by then, meaning no outputs are sent before finishing the first transaction. I understand that this implementation is inefficient, and planning to introduce a new stage that is almost same as stage 3 but keeps dirty tracking in the future. Thanks, Yoshi > > BR > > Green. > > > 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> >> This code implements VM transaction protocol. Like buffered_file, it >> sits between savevm and migration layer. With this architecture, VM >> transaction protocol is implemented mostly independent from other >> existing code. >> >> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp> >> --- >> Makefile.objs | 1 + >> ft_trans_file.c | 624 >> +++++++++++++++++++++++++++++++++++++++++++++++++++++++ >> ft_trans_file.h | 72 +++++++ >> migration.c | 3 + >> trace-events | 15 ++ >> 5 files changed, 715 insertions(+), 0 deletions(-) >> create mode 100644 ft_trans_file.c >> create mode 100644 ft_trans_file.h >> >> diff --git a/Makefile.objs b/Makefile.objs >> index 353b1a8..04148b5 100644 >> --- a/Makefile.objs >> +++ b/Makefile.objs >> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o >> common-obj-y += qdev.o qdev-properties.o >> common-obj-y += block-migration.o >> common-obj-y += pflib.o >> +common-obj-y += ft_trans_file.o >> >> common-obj-$(CONFIG_BRLAPI) += baum.o >> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o >> migration-fd.o >> diff --git a/ft_trans_file.c b/ft_trans_file.c >> new file mode 100644 >> index 0000000..2b42b95 >> --- /dev/null >> +++ b/ft_trans_file.c >> @@ -0,0 +1,624 @@ >> +/* >> + * Fault tolerant VM transaction QEMUFile >> + * >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. >> + * >> + * This work is licensed under the terms of the GNU GPL, version 2. See >> + * the COPYING file in the top-level directory. >> + * >> + * This source code is based on buffered_file.c. >> + * Copyright IBM, Corp. 2008 >> + * Authors: >> + * Anthony Liguori <aliguori@us.ibm.com> >> + */ >> + >> +#include "qemu-common.h" >> +#include "qemu-error.h" >> +#include "hw/hw.h" >> +#include "qemu-timer.h" >> +#include "sysemu.h" >> +#include "qemu-char.h" >> +#include "trace.h" >> +#include "ft_trans_file.h" >> + >> +typedef struct FtTransHdr >> +{ >> + uint16_t cmd; >> + uint16_t id; >> + uint32_t seq; >> + uint32_t payload_len; >> +} FtTransHdr; >> + >> +typedef struct QEMUFileFtTrans >> +{ >> + FtTransPutBufferFunc *put_buffer; >> + FtTransGetBufferFunc *get_buffer; >> + FtTransPutReadyFunc *put_ready; >> + FtTransGetReadyFunc *get_ready; >> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; >> + FtTransCloseFunc *close; >> + void *opaque; >> + QEMUFile *file; >> + >> + enum QEMU_VM_TRANSACTION_STATE state; >> + uint32_t seq; >> + uint16_t id; >> + >> + int has_error; >> + >> + bool freeze_output; >> + bool freeze_input; >> + bool rate_limit; >> + bool is_sender; >> + bool is_payload; >> + >> + uint8_t *buf; >> + size_t buf_max_size; >> + size_t put_offset; >> + size_t get_offset; >> + >> + FtTransHdr header; >> + size_t header_offset; >> +} QEMUFileFtTrans; >> + >> +#define IO_BUF_SIZE 32768 >> + >> +static void ft_trans_append(QEMUFileFtTrans *s, >> + const uint8_t *buf, size_t size) >> +{ >> + if (size > (s->buf_max_size - s->put_offset)) { >> + trace_ft_trans_realloc(s->buf_max_size, size + 1024); >> + s->buf_max_size += size + 1024; >> + s->buf = qemu_realloc(s->buf, s->buf_max_size); >> + } >> + >> + trace_ft_trans_append(size); >> + memcpy(s->buf + s->put_offset, buf, size); >> + s->put_offset += size; >> +} >> + >> +static void ft_trans_flush(QEMUFileFtTrans *s) >> +{ >> + size_t offset = 0; >> + >> + if (s->has_error) { >> + error_report("flush when error %d, bailing", s->has_error); >> + return; >> + } >> + >> + while (offset < s->put_offset) { >> + ssize_t ret; >> + >> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - >> offset); >> + if (ret == -EAGAIN) { >> + break; >> + } >> + >> + if (ret <= 0) { >> + error_report("error flushing data, %s", strerror(errno)); >> + s->has_error = FT_TRANS_ERR_FLUSH; >> + break; >> + } else { >> + offset += ret; >> + } >> + } >> + >> + trace_ft_trans_flush(offset, s->put_offset); >> + memmove(s->buf, s->buf + offset, s->put_offset - offset); >> + s->put_offset -= offset; >> + s->freeze_output = !!s->put_offset; >> +} >> + >> +static ssize_t ft_trans_put(void *opaque, void *buf, int size) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + size_t offset = 0; >> + ssize_t len; >> + >> + /* flush buffered data before putting next */ >> + if (s->put_offset) { >> + ft_trans_flush(s); >> + } >> + >> + while (!s->freeze_output && offset < size) { >> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - >> offset); >> + >> + if (len == -EAGAIN) { >> + trace_ft_trans_freeze_output(); >> + s->freeze_output = 1; >> + break; >> + } >> + >> + if (len <= 0) { >> + error_report("putting data failed, %s", strerror(errno)); >> + s->has_error = 1; >> + offset = -EINVAL; >> + break; >> + } >> + >> + offset += len; >> + } >> + >> + if (s->freeze_output) { >> + ft_trans_append(s, buf + offset, size - offset); >> + offset = size; >> + } >> + >> + return offset; >> +} >> + >> +static int ft_trans_send_header(QEMUFileFtTrans *s, >> + enum QEMU_VM_TRANSACTION_STATE state, >> + uint32_t payload_len) >> +{ >> + int ret; >> + FtTransHdr *hdr = &s->header; >> + >> + trace_ft_trans_send_header(state); >> + >> + hdr->cmd = s->state = state; >> + hdr->id = s->id; >> + hdr->seq = s->seq; >> + hdr->payload_len = payload_len; >> + >> + ret = ft_trans_put(s, hdr, sizeof(*hdr)); >> + if (ret < 0) { >> + error_report("send header failed"); >> + s->has_error = FT_TRANS_ERR_SEND_HDR; >> + } >> + >> + return ret; >> +} >> + >> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t >> pos, int size) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + ssize_t ret; >> + >> + trace_ft_trans_put_buffer(size, pos); >> + >> + if (s->has_error) { >> + error_report("put_buffer when error %d, bailing", s->has_error); >> + return -EINVAL; >> + } >> + >> + /* assuming qemu_file_put_notify() is calling */ >> + if (pos == 0 && size == 0) { >> + trace_ft_trans_put_ready(); >> + ft_trans_flush(s); >> + >> + if (!s->freeze_output) { >> + trace_ft_trans_cb(s->put_ready); >> + ret = s->put_ready(); >> + } >> + >> + ret = 0; >> + goto out; >> + } >> + >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + ret = ft_trans_put(s, (uint8_t *)buf, size); >> + if (ret < 0) { >> + error_report("send palyload failed"); >> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; >> + goto out; >> + } >> + >> + s->seq++; >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + size_t offset = 0; >> + ssize_t len; >> + >> + s->freeze_input = 0; >> + >> + while (offset < size) { >> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, >> + 0, size - offset); >> + if (len == -EAGAIN) { >> + trace_ft_trans_freeze_input(); >> + s->freeze_input = 1; >> + break; >> + } >> + >> + if (len <= 0) { >> + error_report("fill buffer failed, %s", strerror(errno)); >> + s->has_error = 1; >> + return -EINVAL; >> + } >> + >> + offset += len; >> + } >> + >> + return offset; >> +} >> + >> +static int ft_trans_recv_header(QEMUFileFtTrans *s) >> +{ >> + int ret; >> + char *buf = (char *)&s->header + s->header_offset; >> + >> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - >> s->header_offset); >> + if (ret < 0) { >> + error_report("recv header failed"); >> + s->has_error = FT_TRANS_ERR_RECV_HDR; >> + goto out; >> + } >> + >> + s->header_offset += ret; >> + if (s->header_offset == sizeof(FtTransHdr)) { >> + trace_ft_trans_recv_header(s->header.cmd); >> + s->state = s->header.cmd; >> + s->header_offset = 0; >> + >> + if (!s->is_sender) { >> + s->id = s->header.id; >> + s->seq = s->header.seq; >> + } >> + } >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_recv_payload(QEMUFileFtTrans *s) >> +{ >> + QEMUFile *f = s->file; >> + int ret = -1; >> + >> + /* extend QEMUFile buf if there weren't enough space */ >> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { >> + s->buf_max_size += (s->header.payload_len - >> + (s->buf_max_size - s->get_offset)); >> + s->buf = qemu_realloc_buffer(f, s->buf_max_size); >> + } >> + >> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, >> + s->header.payload_len); >> + if (ret < 0) { >> + error_report("recv payload failed"); >> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; >> + goto out; >> + } >> + >> + trace_ft_trans_recv_payload(ret, s->header.payload_len, >> s->get_offset); >> + >> + s->header.payload_len -= ret; >> + s->get_offset += ret; >> + s->is_payload = !!s->header.payload_len; >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_recv(QEMUFileFtTrans *s) >> +{ >> + int ret; >> + >> + /* get payload and return */ >> + if (s->is_payload) { >> + ret = ft_trans_recv_payload(s); >> + goto out; >> + } >> + >> + ret = ft_trans_recv_header(s); >> + if (ret < 0 || s->freeze_input) { >> + goto out; >> + } >> + >> + switch (s->state) { >> + case QEMU_VM_TRANSACTION_BEGIN: >> + /* CONTINUE or COMMIT should come shortly */ >> + s->is_payload = 0; >> + break; >> + >> + case QEMU_VM_TRANSACTION_CONTINUE: >> + /* get payload */ >> + s->is_payload = 1; >> + break; >> + >> + case QEMU_VM_TRANSACTION_COMMIT: >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + trace_ft_trans_cb(s->get_ready); >> + ret = s->get_ready(s->opaque); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + qemu_clear_buffer(s->file); >> + s->get_offset = 0; >> + s->is_payload = 0; >> + >> + break; >> + >> + case QEMU_VM_TRANSACTION_ATOMIC: >> + /* not implemented yet */ >> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", >> + ret); >> + break; >> + >> + case QEMU_VM_TRANSACTION_CANCEL: >> + /* return -EINVAL until migrate cancel on recevier side is >> supported */ >> + ret = -EINVAL; >> + break; >> + >> + default: >> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + } >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, >> + int64_t pos, int size) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + int ret; >> + >> + if (s->has_error) { >> + error_report("get_buffer when error %d, bailing", s->has_error); >> + return -EINVAL; >> + } >> + >> + /* assuming qemu_file_get_notify() is calling */ >> + if (pos == 0 && size == 0) { >> + trace_ft_trans_get_ready(); >> + s->freeze_input = 0; >> + >> + /* sender should be waiting for ACK */ >> + if (s->is_sender) { >> + ret = ft_trans_recv_header(s); >> + if (s->freeze_input) { >> + ret = 0; >> + goto out; >> + } >> + if (ret < 0) { >> + error_report("recv ack failed"); >> + goto out; >> + } >> + >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> + error_report("recv invalid state %d", s->state); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + goto out; >> + } >> + >> + trace_ft_trans_cb(s->get_ready); >> + ret = s->get_ready(s->opaque); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + /* proceed trans id */ >> + s->id++; >> + >> + return 0; >> + } >> + >> + /* set QEMUFile buf at beginning */ >> + if (!s->buf) { >> + s->buf = buf; >> + } >> + >> + ret = ft_trans_recv(s); >> + goto out; >> + } >> + >> + ret = s->get_offset; >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_close(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + int ret; >> + >> + trace_ft_trans_close(); >> + ret = s->close(s->opaque); >> + if (s->is_sender) { >> + qemu_free(s->buf); >> + } >> + qemu_free(s); >> + >> + return ret; >> +} >> + >> +static int ft_trans_rate_limit(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + >> + if (s->has_error) { >> + return 0; >> + } >> + >> + if (s->rate_limit && s->freeze_output) { >> + return 1; >> + } >> + >> + return 0; >> +} >> + >> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + >> + if (s->has_error) { >> + goto out; >> + } >> + >> + s->rate_limit = !!new_rate; >> + >> +out: >> + return s->rate_limit; >> +} >> + >> +int ft_trans_begin(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + int ret; >> + s->seq = 0; >> + >> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ >> + if (!s->is_sender) { >> + if (s->state != QEMU_VM_TRANSACTION_INIT) { >> + error_report("invalid state %d", s->state); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + } >> + >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> + goto out; >> + } >> + >> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */ >> + if (s->state == QEMU_VM_TRANSACTION_INIT) { >> +retry: >> + ret = ft_trans_recv_header(s); >> + if (s->freeze_input) { >> + goto retry; >> + } >> + if (ret < 0) { >> + error_report("recv ack failed"); >> + goto out; >> + } >> + >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> + error_report("recv invalid state %d", s->state); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + goto out; >> + } >> + } >> + >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + s->state = QEMU_VM_TRANSACTION_CONTINUE; >> + >> +out: >> + return ret; >> +} >> + >> +int ft_trans_commit(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + int ret; >> + >> + if (!s->is_sender) { >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> + goto out; >> + } >> + >> + /* sender should flush buf before sending COMMIT */ >> + qemu_fflush(s->file); >> + >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + while (!s->has_error && s->put_offset) { >> + ft_trans_flush(s); >> + if (s->freeze_output) { >> + s->wait_for_unfreeze(s); >> + } >> + } >> + >> + if (s->has_error) { >> + ret = -EINVAL; >> + goto out; >> + } >> + >> + ret = ft_trans_recv_header(s); >> + if (s->freeze_input) { >> + ret = -EAGAIN; >> + goto out; >> + } >> + if (ret < 0) { >> + error_report("recv ack failed"); >> + goto out; >> + } >> + >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> + error_report("recv invalid state %d", s->state); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + goto out; >> + } >> + >> + s->id++; >> + ret = 0; >> + >> +out: >> + return ret; >> +} >> + >> +int ft_trans_cancel(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + >> + /* invalid until migrate cancel on recevier side is supported */ >> + if (!s->is_sender) { >> + return -EINVAL; >> + } >> + >> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); >> +} >> + >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, >> + FtTransPutBufferFunc *put_buffer, >> + FtTransGetBufferFunc *get_buffer, >> + FtTransPutReadyFunc *put_ready, >> + FtTransGetReadyFunc *get_ready, >> + FtTransWaitForUnfreezeFunc >> *wait_for_unfreeze, >> + FtTransCloseFunc *close, >> + bool is_sender) >> +{ >> + QEMUFileFtTrans *s; >> + >> + s = qemu_mallocz(sizeof(*s)); >> + >> + s->opaque = opaque; >> + s->put_buffer = put_buffer; >> + s->get_buffer = get_buffer; >> + s->put_ready = put_ready; >> + s->get_ready = get_ready; >> + s->wait_for_unfreeze = wait_for_unfreeze; >> + s->close = close; >> + s->is_sender = is_sender; >> + s->id = 0; >> + s->seq = 0; >> + s->rate_limit = 1; >> + >> + if (!s->is_sender) { >> + s->buf_max_size = IO_BUF_SIZE; >> + } >> + >> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer, >> + ft_trans_close, ft_trans_rate_limit, >> + ft_trans_set_rate_limit, NULL); >> + >> + return s->file; >> +} >> diff --git a/ft_trans_file.h b/ft_trans_file.h >> new file mode 100644 >> index 0000000..5ca6b53 >> --- /dev/null >> +++ b/ft_trans_file.h >> @@ -0,0 +1,72 @@ >> +/* >> + * Fault tolerant VM transaction QEMUFile >> + * >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. >> + * >> + * This work is licensed under the terms of the GNU GPL, version 2. See >> + * the COPYING file in the top-level directory. >> + * >> + * This source code is based on buffered_file.h. >> + * Copyright IBM, Corp. 2008 >> + * Authors: >> + * Anthony Liguori <aliguori@us.ibm.com> >> + */ >> + >> +#ifndef QEMU_FT_TRANSACTION_FILE_H >> +#define QEMU_FT_TRANSACTION_FILE_H >> + >> +#include "hw/hw.h" >> + >> +enum QEMU_VM_TRANSACTION_STATE { >> + QEMU_VM_TRANSACTION_NACK = -1, >> + QEMU_VM_TRANSACTION_INIT, >> + QEMU_VM_TRANSACTION_BEGIN, >> + QEMU_VM_TRANSACTION_CONTINUE, >> + QEMU_VM_TRANSACTION_COMMIT, >> + QEMU_VM_TRANSACTION_CANCEL, >> + QEMU_VM_TRANSACTION_ATOMIC, >> + QEMU_VM_TRANSACTION_ACK, >> +}; >> + >> +enum FT_MODE { >> + FT_ERROR = -1, >> + FT_OFF, >> + FT_INIT, >> + FT_TRANSACTION_BEGIN, >> + FT_TRANSACTION_ITER, >> + FT_TRANSACTION_COMMIT, >> + FT_TRANSACTION_ATOMIC, >> + FT_TRANSACTION_RECV, >> +}; >> +extern enum FT_MODE ft_mode; >> + >> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ >> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ >> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ >> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ >> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ >> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */ >> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ >> + >> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, >> size_t size); >> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t >> pos, size_t size); >> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec >> *iov, int iovcnt); >> +typedef int (FtTransPutReadyFunc)(void); >> +typedef int (FtTransGetReadyFunc)(void *opaque); >> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); >> +typedef int (FtTransCloseFunc)(void *opaque); >> + >> +int ft_trans_begin(void *opaque); >> +int ft_trans_commit(void *opaque); >> +int ft_trans_cancel(void *opaque); >> + >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, >> + FtTransPutBufferFunc *put_buffer, >> + FtTransGetBufferFunc *get_buffer, >> + FtTransPutReadyFunc *put_ready, >> + FtTransGetReadyFunc *get_ready, >> + FtTransWaitForUnfreezeFunc >> *wait_for_unfreeze, >> + FtTransCloseFunc *close, >> + bool is_sender); >> + >> +#endif >> diff --git a/migration.c b/migration.c >> index dd3bf94..c5e0146 100644 >> --- a/migration.c >> +++ b/migration.c >> @@ -15,6 +15,7 @@ >> #include "migration.h" >> #include "monitor.h" >> #include "buffered_file.h" >> +#include "ft_trans_file.h" >> #include "sysemu.h" >> #include "block.h" >> #include "qemu_socket.h" >> @@ -31,6 +32,8 @@ >> do { } while (0) >> #endif >> >> +enum FT_MODE ft_mode = FT_OFF; >> + >> /* Migration speed throttling */ >> static int64_t max_throttle = (32 << 20); >> >> diff --git a/trace-events b/trace-events >> index e6138ea..50ac840 100644 >> --- a/trace-events >> +++ b/trace-events >> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice >> wrottn %lu of requested %zd >> disable spice_vmc_read(int bytes, int len) "spice read %lu of requested >> %zd" >> disable spice_vmc_register_interface(void *scd) "spice vmc registered >> interface %p" >> disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered >> interface %p" >> + >> +# ft_trans_file.c >> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing >> buffer from %zu by %zu" >> +disable ft_trans_append(size_t size) "buffering %zu bytes" >> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu >> bytes" >> +disable ft_trans_send_header(uint16_t cmd) "send header %d" >> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" >> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes >> at %"PRId64"" >> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) >> "recv %d of %d total %d" >> +disable ft_trans_close(void) "closing" >> +disable ft_trans_freeze_output(void) "backend not ready, freezing output" >> +disable ft_trans_freeze_input(void) "backend not ready, freezing input" >> +disable ft_trans_put_ready(void) "file is ready to put" >> +disable ft_trans_get_ready(void) "file is ready to get" >> +disable ft_trans_cb(void *cb) "callback %p" >> -- >> 1.7.1.2 >> >> -- >> To unsubscribe from this list: send the line "unsubscribe kvm" in >> the body of a message to majordomo@vger.kernel.org >> More majordomo info at http://vger.kernel.org/majordomo-info.html > >
Yoshi: thanks for your explaining. if you introduce a new stage as 3, I think stage 1 also need to change as it will mark all pages dirty. looking forward to your new patch update. Green. 2011/2/21 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> > Hi Green, > > 2011/2/21 ya su <suya94335@gmail.com>: > > Yoshiaki: > > > > I have one question about ram_save_live, during migration 3 > > stage(completation stage), it will call > > cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty > pages. > > at the end of migrate_ft_trans_connect function, it will invoke > vm_start(), > > at this time, cpu_physical_memory_set_dirty_tracking(1) is not called > yet, > > so there may have some ram pages not recorded when > qemu_savevm_trans_begin > > is called. I think you need calll > > cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect > > function, Am I right? > > Thank you for taking a look. > When qemu_savevm_trans_begin is called for the first time, it > calls ram_save_live with stage 1, that sends all pages and sets > dirty tracking, so there won't be missing pages. Note that > event-tap is turned on by then, meaning no outputs are sent before > finishing the first transaction. I understand that this > implementation is inefficient, and planning to introduce a new > stage that is almost same as stage 3 but keeps dirty tracking in > the future. > > Thanks, > > Yoshi > > > > > BR > > > > Green. > > > > > > 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> > >> > >> This code implements VM transaction protocol. Like buffered_file, it > >> sits between savevm and migration layer. With this architecture, VM > >> transaction protocol is implemented mostly independent from other > >> existing code. > >> > >> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> > >> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp> > >> --- > >> Makefile.objs | 1 + > >> ft_trans_file.c | 624 > >> +++++++++++++++++++++++++++++++++++++++++++++++++++++++ > >> ft_trans_file.h | 72 +++++++ > >> migration.c | 3 + > >> trace-events | 15 ++ > >> 5 files changed, 715 insertions(+), 0 deletions(-) > >> create mode 100644 ft_trans_file.c > >> create mode 100644 ft_trans_file.h > >> > >> diff --git a/Makefile.objs b/Makefile.objs > >> index 353b1a8..04148b5 100644 > >> --- a/Makefile.objs > >> +++ b/Makefile.objs > >> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o > >> common-obj-y += qdev.o qdev-properties.o > >> common-obj-y += block-migration.o > >> common-obj-y += pflib.o > >> +common-obj-y += ft_trans_file.o > >> > >> common-obj-$(CONFIG_BRLAPI) += baum.o > >> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o > >> migration-fd.o > >> diff --git a/ft_trans_file.c b/ft_trans_file.c > >> new file mode 100644 > >> index 0000000..2b42b95 > >> --- /dev/null > >> +++ b/ft_trans_file.c > >> @@ -0,0 +1,624 @@ > >> +/* > >> + * Fault tolerant VM transaction QEMUFile > >> + * > >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. > >> + * > >> + * This work is licensed under the terms of the GNU GPL, version 2. > See > >> + * the COPYING file in the top-level directory. > >> + * > >> + * This source code is based on buffered_file.c. > >> + * Copyright IBM, Corp. 2008 > >> + * Authors: > >> + * Anthony Liguori <aliguori@us.ibm.com> > >> + */ > >> + > >> +#include "qemu-common.h" > >> +#include "qemu-error.h" > >> +#include "hw/hw.h" > >> +#include "qemu-timer.h" > >> +#include "sysemu.h" > >> +#include "qemu-char.h" > >> +#include "trace.h" > >> +#include "ft_trans_file.h" > >> + > >> +typedef struct FtTransHdr > >> +{ > >> + uint16_t cmd; > >> + uint16_t id; > >> + uint32_t seq; > >> + uint32_t payload_len; > >> +} FtTransHdr; > >> + > >> +typedef struct QEMUFileFtTrans > >> +{ > >> + FtTransPutBufferFunc *put_buffer; > >> + FtTransGetBufferFunc *get_buffer; > >> + FtTransPutReadyFunc *put_ready; > >> + FtTransGetReadyFunc *get_ready; > >> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; > >> + FtTransCloseFunc *close; > >> + void *opaque; > >> + QEMUFile *file; > >> + > >> + enum QEMU_VM_TRANSACTION_STATE state; > >> + uint32_t seq; > >> + uint16_t id; > >> + > >> + int has_error; > >> + > >> + bool freeze_output; > >> + bool freeze_input; > >> + bool rate_limit; > >> + bool is_sender; > >> + bool is_payload; > >> + > >> + uint8_t *buf; > >> + size_t buf_max_size; > >> + size_t put_offset; > >> + size_t get_offset; > >> + > >> + FtTransHdr header; > >> + size_t header_offset; > >> +} QEMUFileFtTrans; > >> + > >> +#define IO_BUF_SIZE 32768 > >> + > >> +static void ft_trans_append(QEMUFileFtTrans *s, > >> + const uint8_t *buf, size_t size) > >> +{ > >> + if (size > (s->buf_max_size - s->put_offset)) { > >> + trace_ft_trans_realloc(s->buf_max_size, size + 1024); > >> + s->buf_max_size += size + 1024; > >> + s->buf = qemu_realloc(s->buf, s->buf_max_size); > >> + } > >> + > >> + trace_ft_trans_append(size); > >> + memcpy(s->buf + s->put_offset, buf, size); > >> + s->put_offset += size; > >> +} > >> + > >> +static void ft_trans_flush(QEMUFileFtTrans *s) > >> +{ > >> + size_t offset = 0; > >> + > >> + if (s->has_error) { > >> + error_report("flush when error %d, bailing", s->has_error); > >> + return; > >> + } > >> + > >> + while (offset < s->put_offset) { > >> + ssize_t ret; > >> + > >> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - > >> offset); > >> + if (ret == -EAGAIN) { > >> + break; > >> + } > >> + > >> + if (ret <= 0) { > >> + error_report("error flushing data, %s", strerror(errno)); > >> + s->has_error = FT_TRANS_ERR_FLUSH; > >> + break; > >> + } else { > >> + offset += ret; > >> + } > >> + } > >> + > >> + trace_ft_trans_flush(offset, s->put_offset); > >> + memmove(s->buf, s->buf + offset, s->put_offset - offset); > >> + s->put_offset -= offset; > >> + s->freeze_output = !!s->put_offset; > >> +} > >> + > >> +static ssize_t ft_trans_put(void *opaque, void *buf, int size) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + size_t offset = 0; > >> + ssize_t len; > >> + > >> + /* flush buffered data before putting next */ > >> + if (s->put_offset) { > >> + ft_trans_flush(s); > >> + } > >> + > >> + while (!s->freeze_output && offset < size) { > >> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - > >> offset); > >> + > >> + if (len == -EAGAIN) { > >> + trace_ft_trans_freeze_output(); > >> + s->freeze_output = 1; > >> + break; > >> + } > >> + > >> + if (len <= 0) { > >> + error_report("putting data failed, %s", strerror(errno)); > >> + s->has_error = 1; > >> + offset = -EINVAL; > >> + break; > >> + } > >> + > >> + offset += len; > >> + } > >> + > >> + if (s->freeze_output) { > >> + ft_trans_append(s, buf + offset, size - offset); > >> + offset = size; > >> + } > >> + > >> + return offset; > >> +} > >> + > >> +static int ft_trans_send_header(QEMUFileFtTrans *s, > >> + enum QEMU_VM_TRANSACTION_STATE state, > >> + uint32_t payload_len) > >> +{ > >> + int ret; > >> + FtTransHdr *hdr = &s->header; > >> + > >> + trace_ft_trans_send_header(state); > >> + > >> + hdr->cmd = s->state = state; > >> + hdr->id = s->id; > >> + hdr->seq = s->seq; > >> + hdr->payload_len = payload_len; > >> + > >> + ret = ft_trans_put(s, hdr, sizeof(*hdr)); > >> + if (ret < 0) { > >> + error_report("send header failed"); > >> + s->has_error = FT_TRANS_ERR_SEND_HDR; > >> + } > >> + > >> + return ret; > >> +} > >> + > >> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, > int64_t > >> pos, int size) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + ssize_t ret; > >> + > >> + trace_ft_trans_put_buffer(size, pos); > >> + > >> + if (s->has_error) { > >> + error_report("put_buffer when error %d, bailing", > s->has_error); > >> + return -EINVAL; > >> + } > >> + > >> + /* assuming qemu_file_put_notify() is calling */ > >> + if (pos == 0 && size == 0) { > >> + trace_ft_trans_put_ready(); > >> + ft_trans_flush(s); > >> + > >> + if (!s->freeze_output) { > >> + trace_ft_trans_cb(s->put_ready); > >> + ret = s->put_ready(); > >> + } > >> + > >> + ret = 0; > >> + goto out; > >> + } > >> + > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + ret = ft_trans_put(s, (uint8_t *)buf, size); > >> + if (ret < 0) { > >> + error_report("send palyload failed"); > >> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; > >> + goto out; > >> + } > >> + > >> + s->seq++; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + size_t offset = 0; > >> + ssize_t len; > >> + > >> + s->freeze_input = 0; > >> + > >> + while (offset < size) { > >> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, > >> + 0, size - offset); > >> + if (len == -EAGAIN) { > >> + trace_ft_trans_freeze_input(); > >> + s->freeze_input = 1; > >> + break; > >> + } > >> + > >> + if (len <= 0) { > >> + error_report("fill buffer failed, %s", strerror(errno)); > >> + s->has_error = 1; > >> + return -EINVAL; > >> + } > >> + > >> + offset += len; > >> + } > >> + > >> + return offset; > >> +} > >> + > >> +static int ft_trans_recv_header(QEMUFileFtTrans *s) > >> +{ > >> + int ret; > >> + char *buf = (char *)&s->header + s->header_offset; > >> + > >> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - > >> s->header_offset); > >> + if (ret < 0) { > >> + error_report("recv header failed"); > >> + s->has_error = FT_TRANS_ERR_RECV_HDR; > >> + goto out; > >> + } > >> + > >> + s->header_offset += ret; > >> + if (s->header_offset == sizeof(FtTransHdr)) { > >> + trace_ft_trans_recv_header(s->header.cmd); > >> + s->state = s->header.cmd; > >> + s->header_offset = 0; > >> + > >> + if (!s->is_sender) { > >> + s->id = s->header.id; > >> + s->seq = s->header.seq; > >> + } > >> + } > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_recv_payload(QEMUFileFtTrans *s) > >> +{ > >> + QEMUFile *f = s->file; > >> + int ret = -1; > >> + > >> + /* extend QEMUFile buf if there weren't enough space */ > >> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { > >> + s->buf_max_size += (s->header.payload_len - > >> + (s->buf_max_size - s->get_offset)); > >> + s->buf = qemu_realloc_buffer(f, s->buf_max_size); > >> + } > >> + > >> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, > >> + s->header.payload_len); > >> + if (ret < 0) { > >> + error_report("recv payload failed"); > >> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; > >> + goto out; > >> + } > >> + > >> + trace_ft_trans_recv_payload(ret, s->header.payload_len, > >> s->get_offset); > >> + > >> + s->header.payload_len -= ret; > >> + s->get_offset += ret; > >> + s->is_payload = !!s->header.payload_len; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_recv(QEMUFileFtTrans *s) > >> +{ > >> + int ret; > >> + > >> + /* get payload and return */ > >> + if (s->is_payload) { > >> + ret = ft_trans_recv_payload(s); > >> + goto out; > >> + } > >> + > >> + ret = ft_trans_recv_header(s); > >> + if (ret < 0 || s->freeze_input) { > >> + goto out; > >> + } > >> + > >> + switch (s->state) { > >> + case QEMU_VM_TRANSACTION_BEGIN: > >> + /* CONTINUE or COMMIT should come shortly */ > >> + s->is_payload = 0; > >> + break; > >> + > >> + case QEMU_VM_TRANSACTION_CONTINUE: > >> + /* get payload */ > >> + s->is_payload = 1; > >> + break; > >> + > >> + case QEMU_VM_TRANSACTION_COMMIT: > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + trace_ft_trans_cb(s->get_ready); > >> + ret = s->get_ready(s->opaque); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + qemu_clear_buffer(s->file); > >> + s->get_offset = 0; > >> + s->is_payload = 0; > >> + > >> + break; > >> + > >> + case QEMU_VM_TRANSACTION_ATOMIC: > >> + /* not implemented yet */ > >> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", > >> + ret); > >> + break; > >> + > >> + case QEMU_VM_TRANSACTION_CANCEL: > >> + /* return -EINVAL until migrate cancel on recevier side is > >> supported */ > >> + ret = -EINVAL; > >> + break; > >> + > >> + default: > >> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + } > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, > >> + int64_t pos, int size) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + int ret; > >> + > >> + if (s->has_error) { > >> + error_report("get_buffer when error %d, bailing", > s->has_error); > >> + return -EINVAL; > >> + } > >> + > >> + /* assuming qemu_file_get_notify() is calling */ > >> + if (pos == 0 && size == 0) { > >> + trace_ft_trans_get_ready(); > >> + s->freeze_input = 0; > >> + > >> + /* sender should be waiting for ACK */ > >> + if (s->is_sender) { > >> + ret = ft_trans_recv_header(s); > >> + if (s->freeze_input) { > >> + ret = 0; > >> + goto out; > >> + } > >> + if (ret < 0) { > >> + error_report("recv ack failed"); > >> + goto out; > >> + } > >> + > >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { > >> + error_report("recv invalid state %d", s->state); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + goto out; > >> + } > >> + > >> + trace_ft_trans_cb(s->get_ready); > >> + ret = s->get_ready(s->opaque); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + /* proceed trans id */ > >> + s->id++; > >> + > >> + return 0; > >> + } > >> + > >> + /* set QEMUFile buf at beginning */ > >> + if (!s->buf) { > >> + s->buf = buf; > >> + } > >> + > >> + ret = ft_trans_recv(s); > >> + goto out; > >> + } > >> + > >> + ret = s->get_offset; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_close(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + int ret; > >> + > >> + trace_ft_trans_close(); > >> + ret = s->close(s->opaque); > >> + if (s->is_sender) { > >> + qemu_free(s->buf); > >> + } > >> + qemu_free(s); > >> + > >> + return ret; > >> +} > >> + > >> +static int ft_trans_rate_limit(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + > >> + if (s->has_error) { > >> + return 0; > >> + } > >> + > >> + if (s->rate_limit && s->freeze_output) { > >> + return 1; > >> + } > >> + > >> + return 0; > >> +} > >> + > >> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + > >> + if (s->has_error) { > >> + goto out; > >> + } > >> + > >> + s->rate_limit = !!new_rate; > >> + > >> +out: > >> + return s->rate_limit; > >> +} > >> + > >> +int ft_trans_begin(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + int ret; > >> + s->seq = 0; > >> + > >> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ > >> + if (!s->is_sender) { > >> + if (s->state != QEMU_VM_TRANSACTION_INIT) { > >> + error_report("invalid state %d", s->state); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + } > >> + > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > >> + goto out; > >> + } > >> + > >> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */ > >> + if (s->state == QEMU_VM_TRANSACTION_INIT) { > >> +retry: > >> + ret = ft_trans_recv_header(s); > >> + if (s->freeze_input) { > >> + goto retry; > >> + } > >> + if (ret < 0) { > >> + error_report("recv ack failed"); > >> + goto out; > >> + } > >> + > >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { > >> + error_report("recv invalid state %d", s->state); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + goto out; > >> + } > >> + } > >> + > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + s->state = QEMU_VM_TRANSACTION_CONTINUE; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +int ft_trans_commit(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + int ret; > >> + > >> + if (!s->is_sender) { > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > >> + goto out; > >> + } > >> + > >> + /* sender should flush buf before sending COMMIT */ > >> + qemu_fflush(s->file); > >> + > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + while (!s->has_error && s->put_offset) { > >> + ft_trans_flush(s); > >> + if (s->freeze_output) { > >> + s->wait_for_unfreeze(s); > >> + } > >> + } > >> + > >> + if (s->has_error) { > >> + ret = -EINVAL; > >> + goto out; > >> + } > >> + > >> + ret = ft_trans_recv_header(s); > >> + if (s->freeze_input) { > >> + ret = -EAGAIN; > >> + goto out; > >> + } > >> + if (ret < 0) { > >> + error_report("recv ack failed"); > >> + goto out; > >> + } > >> + > >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { > >> + error_report("recv invalid state %d", s->state); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + goto out; > >> + } > >> + > >> + s->id++; > >> + ret = 0; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +int ft_trans_cancel(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + > >> + /* invalid until migrate cancel on recevier side is supported */ > >> + if (!s->is_sender) { > >> + return -EINVAL; > >> + } > >> + > >> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); > >> +} > >> + > >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, > >> + FtTransPutBufferFunc *put_buffer, > >> + FtTransGetBufferFunc *get_buffer, > >> + FtTransPutReadyFunc *put_ready, > >> + FtTransGetReadyFunc *get_ready, > >> + FtTransWaitForUnfreezeFunc > >> *wait_for_unfreeze, > >> + FtTransCloseFunc *close, > >> + bool is_sender) > >> +{ > >> + QEMUFileFtTrans *s; > >> + > >> + s = qemu_mallocz(sizeof(*s)); > >> + > >> + s->opaque = opaque; > >> + s->put_buffer = put_buffer; > >> + s->get_buffer = get_buffer; > >> + s->put_ready = put_ready; > >> + s->get_ready = get_ready; > >> + s->wait_for_unfreeze = wait_for_unfreeze; > >> + s->close = close; > >> + s->is_sender = is_sender; > >> + s->id = 0; > >> + s->seq = 0; > >> + s->rate_limit = 1; > >> + > >> + if (!s->is_sender) { > >> + s->buf_max_size = IO_BUF_SIZE; > >> + } > >> + > >> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, > ft_trans_get_buffer, > >> + ft_trans_close, ft_trans_rate_limit, > >> + ft_trans_set_rate_limit, NULL); > >> + > >> + return s->file; > >> +} > >> diff --git a/ft_trans_file.h b/ft_trans_file.h > >> new file mode 100644 > >> index 0000000..5ca6b53 > >> --- /dev/null > >> +++ b/ft_trans_file.h > >> @@ -0,0 +1,72 @@ > >> +/* > >> + * Fault tolerant VM transaction QEMUFile > >> + * > >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. > >> + * > >> + * This work is licensed under the terms of the GNU GPL, version 2. > See > >> + * the COPYING file in the top-level directory. > >> + * > >> + * This source code is based on buffered_file.h. > >> + * Copyright IBM, Corp. 2008 > >> + * Authors: > >> + * Anthony Liguori <aliguori@us.ibm.com> > >> + */ > >> + > >> +#ifndef QEMU_FT_TRANSACTION_FILE_H > >> +#define QEMU_FT_TRANSACTION_FILE_H > >> + > >> +#include "hw/hw.h" > >> + > >> +enum QEMU_VM_TRANSACTION_STATE { > >> + QEMU_VM_TRANSACTION_NACK = -1, > >> + QEMU_VM_TRANSACTION_INIT, > >> + QEMU_VM_TRANSACTION_BEGIN, > >> + QEMU_VM_TRANSACTION_CONTINUE, > >> + QEMU_VM_TRANSACTION_COMMIT, > >> + QEMU_VM_TRANSACTION_CANCEL, > >> + QEMU_VM_TRANSACTION_ATOMIC, > >> + QEMU_VM_TRANSACTION_ACK, > >> +}; > >> + > >> +enum FT_MODE { > >> + FT_ERROR = -1, > >> + FT_OFF, > >> + FT_INIT, > >> + FT_TRANSACTION_BEGIN, > >> + FT_TRANSACTION_ITER, > >> + FT_TRANSACTION_COMMIT, > >> + FT_TRANSACTION_ATOMIC, > >> + FT_TRANSACTION_RECV, > >> +}; > >> +extern enum FT_MODE ft_mode; > >> + > >> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ > >> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ > >> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ > >> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ > >> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ > >> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed > */ > >> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ > >> + > >> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, > >> size_t size); > >> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t > >> pos, size_t size); > >> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec > >> *iov, int iovcnt); > >> +typedef int (FtTransPutReadyFunc)(void); > >> +typedef int (FtTransGetReadyFunc)(void *opaque); > >> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); > >> +typedef int (FtTransCloseFunc)(void *opaque); > >> + > >> +int ft_trans_begin(void *opaque); > >> +int ft_trans_commit(void *opaque); > >> +int ft_trans_cancel(void *opaque); > >> + > >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, > >> + FtTransPutBufferFunc *put_buffer, > >> + FtTransGetBufferFunc *get_buffer, > >> + FtTransPutReadyFunc *put_ready, > >> + FtTransGetReadyFunc *get_ready, > >> + FtTransWaitForUnfreezeFunc > >> *wait_for_unfreeze, > >> + FtTransCloseFunc *close, > >> + bool is_sender); > >> + > >> +#endif > >> diff --git a/migration.c b/migration.c > >> index dd3bf94..c5e0146 100644 > >> --- a/migration.c > >> +++ b/migration.c > >> @@ -15,6 +15,7 @@ > >> #include "migration.h" > >> #include "monitor.h" > >> #include "buffered_file.h" > >> +#include "ft_trans_file.h" > >> #include "sysemu.h" > >> #include "block.h" > >> #include "qemu_socket.h" > >> @@ -31,6 +32,8 @@ > >> do { } while (0) > >> #endif > >> > >> +enum FT_MODE ft_mode = FT_OFF; > >> + > >> /* Migration speed throttling */ > >> static int64_t max_throttle = (32 << 20); > >> > >> diff --git a/trace-events b/trace-events > >> index e6138ea..50ac840 100644 > >> --- a/trace-events > >> +++ b/trace-events > >> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) > "spice > >> wrottn %lu of requested %zd > >> disable spice_vmc_read(int bytes, int len) "spice read %lu of requested > >> %zd" > >> disable spice_vmc_register_interface(void *scd) "spice vmc registered > >> interface %p" > >> disable spice_vmc_unregister_interface(void *scd) "spice vmc > unregistered > >> interface %p" > >> + > >> +# ft_trans_file.c > >> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing > >> buffer from %zu by %zu" > >> +disable ft_trans_append(size_t size) "buffering %zu bytes" > >> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu > >> bytes" > >> +disable ft_trans_send_header(uint16_t cmd) "send header %d" > >> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" > >> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes > >> at %"PRId64"" > >> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) > >> "recv %d of %d total %d" > >> +disable ft_trans_close(void) "closing" > >> +disable ft_trans_freeze_output(void) "backend not ready, freezing > output" > >> +disable ft_trans_freeze_input(void) "backend not ready, freezing input" > >> +disable ft_trans_put_ready(void) "file is ready to put" > >> +disable ft_trans_get_ready(void) "file is ready to get" > >> +disable ft_trans_cb(void *cb) "callback %p" > >> -- > >> 1.7.1.2 > >> > >> -- > >> To unsubscribe from this list: send the line "unsubscribe kvm" in > >> the body of a message to majordomo@vger.kernel.org > >> More majordomo info at http://vger.kernel.org/majordomo-info.html > > > > >
2011/2/23 ya su <suya94335@gmail.com>: > Yoshi: > > thanks for your explaining. > if you introduce a new stage as 3, I think stage 1 also need to change as > it will mark all pages dirty. > looking forward to your new patch update. Unless there're strong comments from others, I won't put it in this series though because I don't want to touch other components as much as possible this time. Yoshi > > Green. > > > 2011/2/21 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> >> Hi Green, >> >> 2011/2/21 ya su <suya94335@gmail.com>: >> > Yoshiaki: >> > >> > I have one question about ram_save_live, during migration 3 >> > stage(completation stage), it will call >> > cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty >> > pages. >> > at the end of migrate_ft_trans_connect function, it will invoke >> > vm_start(), >> > at this time, cpu_physical_memory_set_dirty_tracking(1) is not called >> > yet, >> > so there may have some ram pages not recorded when >> > qemu_savevm_trans_begin >> > is called. I think you need calll >> > cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect >> > function, Am I right? >> >> Thank you for taking a look. >> When qemu_savevm_trans_begin is called for the first time, it >> calls ram_save_live with stage 1, that sends all pages and sets >> dirty tracking, so there won't be missing pages. Note that >> event-tap is turned on by then, meaning no outputs are sent before >> finishing the first transaction. I understand that this >> implementation is inefficient, and planning to introduce a new >> stage that is almost same as stage 3 but keeps dirty tracking in >> the future. >> >> Thanks, >> >> Yoshi >> >> > >> > BR >> > >> > Green. >> > >> > >> > 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> >> >> >> This code implements VM transaction protocol. Like buffered_file, it >> >> sits between savevm and migration layer. With this architecture, VM >> >> transaction protocol is implemented mostly independent from other >> >> existing code. >> >> >> >> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> >> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp> >> >> --- >> >> Makefile.objs | 1 + >> >> ft_trans_file.c | 624 >> >> +++++++++++++++++++++++++++++++++++++++++++++++++++++++ >> >> ft_trans_file.h | 72 +++++++ >> >> migration.c | 3 + >> >> trace-events | 15 ++ >> >> 5 files changed, 715 insertions(+), 0 deletions(-) >> >> create mode 100644 ft_trans_file.c >> >> create mode 100644 ft_trans_file.h >> >> >> >> diff --git a/Makefile.objs b/Makefile.objs >> >> index 353b1a8..04148b5 100644 >> >> --- a/Makefile.objs >> >> +++ b/Makefile.objs >> >> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o >> >> common-obj-y += qdev.o qdev-properties.o >> >> common-obj-y += block-migration.o >> >> common-obj-y += pflib.o >> >> +common-obj-y += ft_trans_file.o >> >> >> >> common-obj-$(CONFIG_BRLAPI) += baum.o >> >> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o >> >> migration-fd.o >> >> diff --git a/ft_trans_file.c b/ft_trans_file.c >> >> new file mode 100644 >> >> index 0000000..2b42b95 >> >> --- /dev/null >> >> +++ b/ft_trans_file.c >> >> @@ -0,0 +1,624 @@ >> >> +/* >> >> + * Fault tolerant VM transaction QEMUFile >> >> + * >> >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. >> >> + * >> >> + * This work is licensed under the terms of the GNU GPL, version 2. >> >> See >> >> + * the COPYING file in the top-level directory. >> >> + * >> >> + * This source code is based on buffered_file.c. >> >> + * Copyright IBM, Corp. 2008 >> >> + * Authors: >> >> + * Anthony Liguori <aliguori@us.ibm.com> >> >> + */ >> >> + >> >> +#include "qemu-common.h" >> >> +#include "qemu-error.h" >> >> +#include "hw/hw.h" >> >> +#include "qemu-timer.h" >> >> +#include "sysemu.h" >> >> +#include "qemu-char.h" >> >> +#include "trace.h" >> >> +#include "ft_trans_file.h" >> >> + >> >> +typedef struct FtTransHdr >> >> +{ >> >> + uint16_t cmd; >> >> + uint16_t id; >> >> + uint32_t seq; >> >> + uint32_t payload_len; >> >> +} FtTransHdr; >> >> + >> >> +typedef struct QEMUFileFtTrans >> >> +{ >> >> + FtTransPutBufferFunc *put_buffer; >> >> + FtTransGetBufferFunc *get_buffer; >> >> + FtTransPutReadyFunc *put_ready; >> >> + FtTransGetReadyFunc *get_ready; >> >> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; >> >> + FtTransCloseFunc *close; >> >> + void *opaque; >> >> + QEMUFile *file; >> >> + >> >> + enum QEMU_VM_TRANSACTION_STATE state; >> >> + uint32_t seq; >> >> + uint16_t id; >> >> + >> >> + int has_error; >> >> + >> >> + bool freeze_output; >> >> + bool freeze_input; >> >> + bool rate_limit; >> >> + bool is_sender; >> >> + bool is_payload; >> >> + >> >> + uint8_t *buf; >> >> + size_t buf_max_size; >> >> + size_t put_offset; >> >> + size_t get_offset; >> >> + >> >> + FtTransHdr header; >> >> + size_t header_offset; >> >> +} QEMUFileFtTrans; >> >> + >> >> +#define IO_BUF_SIZE 32768 >> >> + >> >> +static void ft_trans_append(QEMUFileFtTrans *s, >> >> + const uint8_t *buf, size_t size) >> >> +{ >> >> + if (size > (s->buf_max_size - s->put_offset)) { >> >> + trace_ft_trans_realloc(s->buf_max_size, size + 1024); >> >> + s->buf_max_size += size + 1024; >> >> + s->buf = qemu_realloc(s->buf, s->buf_max_size); >> >> + } >> >> + >> >> + trace_ft_trans_append(size); >> >> + memcpy(s->buf + s->put_offset, buf, size); >> >> + s->put_offset += size; >> >> +} >> >> + >> >> +static void ft_trans_flush(QEMUFileFtTrans *s) >> >> +{ >> >> + size_t offset = 0; >> >> + >> >> + if (s->has_error) { >> >> + error_report("flush when error %d, bailing", s->has_error); >> >> + return; >> >> + } >> >> + >> >> + while (offset < s->put_offset) { >> >> + ssize_t ret; >> >> + >> >> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset >> >> - >> >> offset); >> >> + if (ret == -EAGAIN) { >> >> + break; >> >> + } >> >> + >> >> + if (ret <= 0) { >> >> + error_report("error flushing data, %s", strerror(errno)); >> >> + s->has_error = FT_TRANS_ERR_FLUSH; >> >> + break; >> >> + } else { >> >> + offset += ret; >> >> + } >> >> + } >> >> + >> >> + trace_ft_trans_flush(offset, s->put_offset); >> >> + memmove(s->buf, s->buf + offset, s->put_offset - offset); >> >> + s->put_offset -= offset; >> >> + s->freeze_output = !!s->put_offset; >> >> +} >> >> + >> >> +static ssize_t ft_trans_put(void *opaque, void *buf, int size) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + size_t offset = 0; >> >> + ssize_t len; >> >> + >> >> + /* flush buffered data before putting next */ >> >> + if (s->put_offset) { >> >> + ft_trans_flush(s); >> >> + } >> >> + >> >> + while (!s->freeze_output && offset < size) { >> >> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - >> >> offset); >> >> + >> >> + if (len == -EAGAIN) { >> >> + trace_ft_trans_freeze_output(); >> >> + s->freeze_output = 1; >> >> + break; >> >> + } >> >> + >> >> + if (len <= 0) { >> >> + error_report("putting data failed, %s", strerror(errno)); >> >> + s->has_error = 1; >> >> + offset = -EINVAL; >> >> + break; >> >> + } >> >> + >> >> + offset += len; >> >> + } >> >> + >> >> + if (s->freeze_output) { >> >> + ft_trans_append(s, buf + offset, size - offset); >> >> + offset = size; >> >> + } >> >> + >> >> + return offset; >> >> +} >> >> + >> >> +static int ft_trans_send_header(QEMUFileFtTrans *s, >> >> + enum QEMU_VM_TRANSACTION_STATE state, >> >> + uint32_t payload_len) >> >> +{ >> >> + int ret; >> >> + FtTransHdr *hdr = &s->header; >> >> + >> >> + trace_ft_trans_send_header(state); >> >> + >> >> + hdr->cmd = s->state = state; >> >> + hdr->id = s->id; >> >> + hdr->seq = s->seq; >> >> + hdr->payload_len = payload_len; >> >> + >> >> + ret = ft_trans_put(s, hdr, sizeof(*hdr)); >> >> + if (ret < 0) { >> >> + error_report("send header failed"); >> >> + s->has_error = FT_TRANS_ERR_SEND_HDR; >> >> + } >> >> + >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, >> >> int64_t >> >> pos, int size) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + ssize_t ret; >> >> + >> >> + trace_ft_trans_put_buffer(size, pos); >> >> + >> >> + if (s->has_error) { >> >> + error_report("put_buffer when error %d, bailing", >> >> s->has_error); >> >> + return -EINVAL; >> >> + } >> >> + >> >> + /* assuming qemu_file_put_notify() is calling */ >> >> + if (pos == 0 && size == 0) { >> >> + trace_ft_trans_put_ready(); >> >> + ft_trans_flush(s); >> >> + >> >> + if (!s->freeze_output) { >> >> + trace_ft_trans_cb(s->put_ready); >> >> + ret = s->put_ready(); >> >> + } >> >> + >> >> + ret = 0; >> >> + goto out; >> >> + } >> >> + >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + ret = ft_trans_put(s, (uint8_t *)buf, size); >> >> + if (ret < 0) { >> >> + error_report("send palyload failed"); >> >> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; >> >> + goto out; >> >> + } >> >> + >> >> + s->seq++; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + size_t offset = 0; >> >> + ssize_t len; >> >> + >> >> + s->freeze_input = 0; >> >> + >> >> + while (offset < size) { >> >> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, >> >> + 0, size - offset); >> >> + if (len == -EAGAIN) { >> >> + trace_ft_trans_freeze_input(); >> >> + s->freeze_input = 1; >> >> + break; >> >> + } >> >> + >> >> + if (len <= 0) { >> >> + error_report("fill buffer failed, %s", strerror(errno)); >> >> + s->has_error = 1; >> >> + return -EINVAL; >> >> + } >> >> + >> >> + offset += len; >> >> + } >> >> + >> >> + return offset; >> >> +} >> >> + >> >> +static int ft_trans_recv_header(QEMUFileFtTrans *s) >> >> +{ >> >> + int ret; >> >> + char *buf = (char *)&s->header + s->header_offset; >> >> + >> >> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - >> >> s->header_offset); >> >> + if (ret < 0) { >> >> + error_report("recv header failed"); >> >> + s->has_error = FT_TRANS_ERR_RECV_HDR; >> >> + goto out; >> >> + } >> >> + >> >> + s->header_offset += ret; >> >> + if (s->header_offset == sizeof(FtTransHdr)) { >> >> + trace_ft_trans_recv_header(s->header.cmd); >> >> + s->state = s->header.cmd; >> >> + s->header_offset = 0; >> >> + >> >> + if (!s->is_sender) { >> >> + s->id = s->header.id; >> >> + s->seq = s->header.seq; >> >> + } >> >> + } >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_recv_payload(QEMUFileFtTrans *s) >> >> +{ >> >> + QEMUFile *f = s->file; >> >> + int ret = -1; >> >> + >> >> + /* extend QEMUFile buf if there weren't enough space */ >> >> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { >> >> + s->buf_max_size += (s->header.payload_len - >> >> + (s->buf_max_size - s->get_offset)); >> >> + s->buf = qemu_realloc_buffer(f, s->buf_max_size); >> >> + } >> >> + >> >> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, >> >> + s->header.payload_len); >> >> + if (ret < 0) { >> >> + error_report("recv payload failed"); >> >> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; >> >> + goto out; >> >> + } >> >> + >> >> + trace_ft_trans_recv_payload(ret, s->header.payload_len, >> >> s->get_offset); >> >> + >> >> + s->header.payload_len -= ret; >> >> + s->get_offset += ret; >> >> + s->is_payload = !!s->header.payload_len; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_recv(QEMUFileFtTrans *s) >> >> +{ >> >> + int ret; >> >> + >> >> + /* get payload and return */ >> >> + if (s->is_payload) { >> >> + ret = ft_trans_recv_payload(s); >> >> + goto out; >> >> + } >> >> + >> >> + ret = ft_trans_recv_header(s); >> >> + if (ret < 0 || s->freeze_input) { >> >> + goto out; >> >> + } >> >> + >> >> + switch (s->state) { >> >> + case QEMU_VM_TRANSACTION_BEGIN: >> >> + /* CONTINUE or COMMIT should come shortly */ >> >> + s->is_payload = 0; >> >> + break; >> >> + >> >> + case QEMU_VM_TRANSACTION_CONTINUE: >> >> + /* get payload */ >> >> + s->is_payload = 1; >> >> + break; >> >> + >> >> + case QEMU_VM_TRANSACTION_COMMIT: >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + trace_ft_trans_cb(s->get_ready); >> >> + ret = s->get_ready(s->opaque); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + qemu_clear_buffer(s->file); >> >> + s->get_offset = 0; >> >> + s->is_payload = 0; >> >> + >> >> + break; >> >> + >> >> + case QEMU_VM_TRANSACTION_ATOMIC: >> >> + /* not implemented yet */ >> >> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", >> >> + ret); >> >> + break; >> >> + >> >> + case QEMU_VM_TRANSACTION_CANCEL: >> >> + /* return -EINVAL until migrate cancel on recevier side is >> >> supported */ >> >> + ret = -EINVAL; >> >> + break; >> >> + >> >> + default: >> >> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + } >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, >> >> + int64_t pos, int size) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + int ret; >> >> + >> >> + if (s->has_error) { >> >> + error_report("get_buffer when error %d, bailing", >> >> s->has_error); >> >> + return -EINVAL; >> >> + } >> >> + >> >> + /* assuming qemu_file_get_notify() is calling */ >> >> + if (pos == 0 && size == 0) { >> >> + trace_ft_trans_get_ready(); >> >> + s->freeze_input = 0; >> >> + >> >> + /* sender should be waiting for ACK */ >> >> + if (s->is_sender) { >> >> + ret = ft_trans_recv_header(s); >> >> + if (s->freeze_input) { >> >> + ret = 0; >> >> + goto out; >> >> + } >> >> + if (ret < 0) { >> >> + error_report("recv ack failed"); >> >> + goto out; >> >> + } >> >> + >> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> >> + error_report("recv invalid state %d", s->state); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + goto out; >> >> + } >> >> + >> >> + trace_ft_trans_cb(s->get_ready); >> >> + ret = s->get_ready(s->opaque); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + /* proceed trans id */ >> >> + s->id++; >> >> + >> >> + return 0; >> >> + } >> >> + >> >> + /* set QEMUFile buf at beginning */ >> >> + if (!s->buf) { >> >> + s->buf = buf; >> >> + } >> >> + >> >> + ret = ft_trans_recv(s); >> >> + goto out; >> >> + } >> >> + >> >> + ret = s->get_offset; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_close(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + int ret; >> >> + >> >> + trace_ft_trans_close(); >> >> + ret = s->close(s->opaque); >> >> + if (s->is_sender) { >> >> + qemu_free(s->buf); >> >> + } >> >> + qemu_free(s); >> >> + >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_rate_limit(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + >> >> + if (s->has_error) { >> >> + return 0; >> >> + } >> >> + >> >> + if (s->rate_limit && s->freeze_output) { >> >> + return 1; >> >> + } >> >> + >> >> + return 0; >> >> +} >> >> + >> >> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + >> >> + if (s->has_error) { >> >> + goto out; >> >> + } >> >> + >> >> + s->rate_limit = !!new_rate; >> >> + >> >> +out: >> >> + return s->rate_limit; >> >> +} >> >> + >> >> +int ft_trans_begin(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + int ret; >> >> + s->seq = 0; >> >> + >> >> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ >> >> + if (!s->is_sender) { >> >> + if (s->state != QEMU_VM_TRANSACTION_INIT) { >> >> + error_report("invalid state %d", s->state); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + } >> >> + >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> >> + goto out; >> >> + } >> >> + >> >> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction >> >> */ >> >> + if (s->state == QEMU_VM_TRANSACTION_INIT) { >> >> +retry: >> >> + ret = ft_trans_recv_header(s); >> >> + if (s->freeze_input) { >> >> + goto retry; >> >> + } >> >> + if (ret < 0) { >> >> + error_report("recv ack failed"); >> >> + goto out; >> >> + } >> >> + >> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> >> + error_report("recv invalid state %d", s->state); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + goto out; >> >> + } >> >> + } >> >> + >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + s->state = QEMU_VM_TRANSACTION_CONTINUE; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +int ft_trans_commit(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + int ret; >> >> + >> >> + if (!s->is_sender) { >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> >> + goto out; >> >> + } >> >> + >> >> + /* sender should flush buf before sending COMMIT */ >> >> + qemu_fflush(s->file); >> >> + >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + while (!s->has_error && s->put_offset) { >> >> + ft_trans_flush(s); >> >> + if (s->freeze_output) { >> >> + s->wait_for_unfreeze(s); >> >> + } >> >> + } >> >> + >> >> + if (s->has_error) { >> >> + ret = -EINVAL; >> >> + goto out; >> >> + } >> >> + >> >> + ret = ft_trans_recv_header(s); >> >> + if (s->freeze_input) { >> >> + ret = -EAGAIN; >> >> + goto out; >> >> + } >> >> + if (ret < 0) { >> >> + error_report("recv ack failed"); >> >> + goto out; >> >> + } >> >> + >> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> >> + error_report("recv invalid state %d", s->state); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + goto out; >> >> + } >> >> + >> >> + s->id++; >> >> + ret = 0; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +int ft_trans_cancel(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + >> >> + /* invalid until migrate cancel on recevier side is supported */ >> >> + if (!s->is_sender) { >> >> + return -EINVAL; >> >> + } >> >> + >> >> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); >> >> +} >> >> + >> >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, >> >> + FtTransPutBufferFunc *put_buffer, >> >> + FtTransGetBufferFunc *get_buffer, >> >> + FtTransPutReadyFunc *put_ready, >> >> + FtTransGetReadyFunc *get_ready, >> >> + FtTransWaitForUnfreezeFunc >> >> *wait_for_unfreeze, >> >> + FtTransCloseFunc *close, >> >> + bool is_sender) >> >> +{ >> >> + QEMUFileFtTrans *s; >> >> + >> >> + s = qemu_mallocz(sizeof(*s)); >> >> + >> >> + s->opaque = opaque; >> >> + s->put_buffer = put_buffer; >> >> + s->get_buffer = get_buffer; >> >> + s->put_ready = put_ready; >> >> + s->get_ready = get_ready; >> >> + s->wait_for_unfreeze = wait_for_unfreeze; >> >> + s->close = close; >> >> + s->is_sender = is_sender; >> >> + s->id = 0; >> >> + s->seq = 0; >> >> + s->rate_limit = 1; >> >> + >> >> + if (!s->is_sender) { >> >> + s->buf_max_size = IO_BUF_SIZE; >> >> + } >> >> + >> >> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, >> >> ft_trans_get_buffer, >> >> + ft_trans_close, ft_trans_rate_limit, >> >> + ft_trans_set_rate_limit, NULL); >> >> + >> >> + return s->file; >> >> +} >> >> diff --git a/ft_trans_file.h b/ft_trans_file.h >> >> new file mode 100644 >> >> index 0000000..5ca6b53 >> >> --- /dev/null >> >> +++ b/ft_trans_file.h >> >> @@ -0,0 +1,72 @@ >> >> +/* >> >> + * Fault tolerant VM transaction QEMUFile >> >> + * >> >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. >> >> + * >> >> + * This work is licensed under the terms of the GNU GPL, version 2. >> >> See >> >> + * the COPYING file in the top-level directory. >> >> + * >> >> + * This source code is based on buffered_file.h. >> >> + * Copyright IBM, Corp. 2008 >> >> + * Authors: >> >> + * Anthony Liguori <aliguori@us.ibm.com> >> >> + */ >> >> + >> >> +#ifndef QEMU_FT_TRANSACTION_FILE_H >> >> +#define QEMU_FT_TRANSACTION_FILE_H >> >> + >> >> +#include "hw/hw.h" >> >> + >> >> +enum QEMU_VM_TRANSACTION_STATE { >> >> + QEMU_VM_TRANSACTION_NACK = -1, >> >> + QEMU_VM_TRANSACTION_INIT, >> >> + QEMU_VM_TRANSACTION_BEGIN, >> >> + QEMU_VM_TRANSACTION_CONTINUE, >> >> + QEMU_VM_TRANSACTION_COMMIT, >> >> + QEMU_VM_TRANSACTION_CANCEL, >> >> + QEMU_VM_TRANSACTION_ATOMIC, >> >> + QEMU_VM_TRANSACTION_ACK, >> >> +}; >> >> + >> >> +enum FT_MODE { >> >> + FT_ERROR = -1, >> >> + FT_OFF, >> >> + FT_INIT, >> >> + FT_TRANSACTION_BEGIN, >> >> + FT_TRANSACTION_ITER, >> >> + FT_TRANSACTION_COMMIT, >> >> + FT_TRANSACTION_ATOMIC, >> >> + FT_TRANSACTION_RECV, >> >> +}; >> >> +extern enum FT_MODE ft_mode; >> >> + >> >> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ >> >> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ >> >> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ >> >> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ >> >> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ >> >> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed >> >> */ >> >> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ >> >> + >> >> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, >> >> size_t size); >> >> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t >> >> pos, size_t size); >> >> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct >> >> iovec >> >> *iov, int iovcnt); >> >> +typedef int (FtTransPutReadyFunc)(void); >> >> +typedef int (FtTransGetReadyFunc)(void *opaque); >> >> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); >> >> +typedef int (FtTransCloseFunc)(void *opaque); >> >> + >> >> +int ft_trans_begin(void *opaque); >> >> +int ft_trans_commit(void *opaque); >> >> +int ft_trans_cancel(void *opaque); >> >> + >> >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, >> >> + FtTransPutBufferFunc *put_buffer, >> >> + FtTransGetBufferFunc *get_buffer, >> >> + FtTransPutReadyFunc *put_ready, >> >> + FtTransGetReadyFunc *get_ready, >> >> + FtTransWaitForUnfreezeFunc >> >> *wait_for_unfreeze, >> >> + FtTransCloseFunc *close, >> >> + bool is_sender); >> >> + >> >> +#endif >> >> diff --git a/migration.c b/migration.c >> >> index dd3bf94..c5e0146 100644 >> >> --- a/migration.c >> >> +++ b/migration.c >> >> @@ -15,6 +15,7 @@ >> >> #include "migration.h" >> >> #include "monitor.h" >> >> #include "buffered_file.h" >> >> +#include "ft_trans_file.h" >> >> #include "sysemu.h" >> >> #include "block.h" >> >> #include "qemu_socket.h" >> >> @@ -31,6 +32,8 @@ >> >> do { } while (0) >> >> #endif >> >> >> >> +enum FT_MODE ft_mode = FT_OFF; >> >> + >> >> /* Migration speed throttling */ >> >> static int64_t max_throttle = (32 << 20); >> >> >> >> diff --git a/trace-events b/trace-events >> >> index e6138ea..50ac840 100644 >> >> --- a/trace-events >> >> +++ b/trace-events >> >> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) >> >> "spice >> >> wrottn %lu of requested %zd >> >> disable spice_vmc_read(int bytes, int len) "spice read %lu of >> >> requested >> >> %zd" >> >> disable spice_vmc_register_interface(void *scd) "spice vmc registered >> >> interface %p" >> >> disable spice_vmc_unregister_interface(void *scd) "spice vmc >> >> unregistered >> >> interface %p" >> >> + >> >> +# ft_trans_file.c >> >> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing >> >> buffer from %zu by %zu" >> >> +disable ft_trans_append(size_t size) "buffering %zu bytes" >> >> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu >> >> bytes" >> >> +disable ft_trans_send_header(uint16_t cmd) "send header %d" >> >> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" >> >> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d >> >> bytes >> >> at %"PRId64"" >> >> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) >> >> "recv %d of %d total %d" >> >> +disable ft_trans_close(void) "closing" >> >> +disable ft_trans_freeze_output(void) "backend not ready, freezing >> >> output" >> >> +disable ft_trans_freeze_input(void) "backend not ready, freezing >> >> input" >> >> +disable ft_trans_put_ready(void) "file is ready to put" >> >> +disable ft_trans_get_ready(void) "file is ready to get" >> >> +disable ft_trans_cb(void *cb) "callback %p" >> >> -- >> >> 1.7.1.2 >> >> >> >> -- >> >> To unsubscribe from this list: send the line "unsubscribe kvm" in >> >> the body of a message to majordomo@vger.kernel.org >> >> More majordomo info at http://vger.kernel.org/majordomo-info.html >> > >> > > >
diff --git a/Makefile.objs b/Makefile.objs index 353b1a8..04148b5 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o common-obj-y += qdev.o qdev-properties.o common-obj-y += block-migration.o common-obj-y += pflib.o +common-obj-y += ft_trans_file.o common-obj-$(CONFIG_BRLAPI) += baum.o common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o diff --git a/ft_trans_file.c b/ft_trans_file.c new file mode 100644 index 0000000..2b42b95 --- /dev/null +++ b/ft_trans_file.c @@ -0,0 +1,624 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.c. + * Copyright IBM, Corp. 2008 + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + */ + +#include "qemu-common.h" +#include "qemu-error.h" +#include "hw/hw.h" +#include "qemu-timer.h" +#include "sysemu.h" +#include "qemu-char.h" +#include "trace.h" +#include "ft_trans_file.h" + +typedef struct FtTransHdr +{ + uint16_t cmd; + uint16_t id; + uint32_t seq; + uint32_t payload_len; +} FtTransHdr; + +typedef struct QEMUFileFtTrans +{ + FtTransPutBufferFunc *put_buffer; + FtTransGetBufferFunc *get_buffer; + FtTransPutReadyFunc *put_ready; + FtTransGetReadyFunc *get_ready; + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; + FtTransCloseFunc *close; + void *opaque; + QEMUFile *file; + + enum QEMU_VM_TRANSACTION_STATE state; + uint32_t seq; + uint16_t id; + + int has_error; + + bool freeze_output; + bool freeze_input; + bool rate_limit; + bool is_sender; + bool is_payload; + + uint8_t *buf; + size_t buf_max_size; + size_t put_offset; + size_t get_offset; + + FtTransHdr header; + size_t header_offset; +} QEMUFileFtTrans; + +#define IO_BUF_SIZE 32768 + +static void ft_trans_append(QEMUFileFtTrans *s, + const uint8_t *buf, size_t size) +{ + if (size > (s->buf_max_size - s->put_offset)) { + trace_ft_trans_realloc(s->buf_max_size, size + 1024); + s->buf_max_size += size + 1024; + s->buf = qemu_realloc(s->buf, s->buf_max_size); + } + + trace_ft_trans_append(size); + memcpy(s->buf + s->put_offset, buf, size); + s->put_offset += size; +} + +static void ft_trans_flush(QEMUFileFtTrans *s) +{ + size_t offset = 0; + + if (s->has_error) { + error_report("flush when error %d, bailing", s->has_error); + return; + } + + while (offset < s->put_offset) { + ssize_t ret; + + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - offset); + if (ret == -EAGAIN) { + break; + } + + if (ret <= 0) { + error_report("error flushing data, %s", strerror(errno)); + s->has_error = FT_TRANS_ERR_FLUSH; + break; + } else { + offset += ret; + } + } + + trace_ft_trans_flush(offset, s->put_offset); + memmove(s->buf, s->buf + offset, s->put_offset - offset); + s->put_offset -= offset; + s->freeze_output = !!s->put_offset; +} + +static ssize_t ft_trans_put(void *opaque, void *buf, int size) +{ + QEMUFileFtTrans *s = opaque; + size_t offset = 0; + ssize_t len; + + /* flush buffered data before putting next */ + if (s->put_offset) { + ft_trans_flush(s); + } + + while (!s->freeze_output && offset < size) { + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - offset); + + if (len == -EAGAIN) { + trace_ft_trans_freeze_output(); + s->freeze_output = 1; + break; + } + + if (len <= 0) { + error_report("putting data failed, %s", strerror(errno)); + s->has_error = 1; + offset = -EINVAL; + break; + } + + offset += len; + } + + if (s->freeze_output) { + ft_trans_append(s, buf + offset, size - offset); + offset = size; + } + + return offset; +} + +static int ft_trans_send_header(QEMUFileFtTrans *s, + enum QEMU_VM_TRANSACTION_STATE state, + uint32_t payload_len) +{ + int ret; + FtTransHdr *hdr = &s->header; + + trace_ft_trans_send_header(state); + + hdr->cmd = s->state = state; + hdr->id = s->id; + hdr->seq = s->seq; + hdr->payload_len = payload_len; + + ret = ft_trans_put(s, hdr, sizeof(*hdr)); + if (ret < 0) { + error_report("send header failed"); + s->has_error = FT_TRANS_ERR_SEND_HDR; + } + + return ret; +} + +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) +{ + QEMUFileFtTrans *s = opaque; + ssize_t ret; + + trace_ft_trans_put_buffer(size, pos); + + if (s->has_error) { + error_report("put_buffer when error %d, bailing", s->has_error); + return -EINVAL; + } + + /* assuming qemu_file_put_notify() is calling */ + if (pos == 0 && size == 0) { + trace_ft_trans_put_ready(); + ft_trans_flush(s); + + if (!s->freeze_output) { + trace_ft_trans_cb(s->put_ready); + ret = s->put_ready(); + } + + ret = 0; + goto out; + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); + if (ret < 0) { + goto out; + } + + ret = ft_trans_put(s, (uint8_t *)buf, size); + if (ret < 0) { + error_report("send palyload failed"); + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; + goto out; + } + + s->seq++; + +out: + return ret; +} + +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) +{ + QEMUFileFtTrans *s = opaque; + size_t offset = 0; + ssize_t len; + + s->freeze_input = 0; + + while (offset < size) { + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, + 0, size - offset); + if (len == -EAGAIN) { + trace_ft_trans_freeze_input(); + s->freeze_input = 1; + break; + } + + if (len <= 0) { + error_report("fill buffer failed, %s", strerror(errno)); + s->has_error = 1; + return -EINVAL; + } + + offset += len; + } + + return offset; +} + +static int ft_trans_recv_header(QEMUFileFtTrans *s) +{ + int ret; + char *buf = (char *)&s->header + s->header_offset; + + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - s->header_offset); + if (ret < 0) { + error_report("recv header failed"); + s->has_error = FT_TRANS_ERR_RECV_HDR; + goto out; + } + + s->header_offset += ret; + if (s->header_offset == sizeof(FtTransHdr)) { + trace_ft_trans_recv_header(s->header.cmd); + s->state = s->header.cmd; + s->header_offset = 0; + + if (!s->is_sender) { + s->id = s->header.id; + s->seq = s->header.seq; + } + } + +out: + return ret; +} + +static int ft_trans_recv_payload(QEMUFileFtTrans *s) +{ + QEMUFile *f = s->file; + int ret = -1; + + /* extend QEMUFile buf if there weren't enough space */ + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { + s->buf_max_size += (s->header.payload_len - + (s->buf_max_size - s->get_offset)); + s->buf = qemu_realloc_buffer(f, s->buf_max_size); + } + + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, + s->header.payload_len); + if (ret < 0) { + error_report("recv payload failed"); + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; + goto out; + } + + trace_ft_trans_recv_payload(ret, s->header.payload_len, s->get_offset); + + s->header.payload_len -= ret; + s->get_offset += ret; + s->is_payload = !!s->header.payload_len; + +out: + return ret; +} + +static int ft_trans_recv(QEMUFileFtTrans *s) +{ + int ret; + + /* get payload and return */ + if (s->is_payload) { + ret = ft_trans_recv_payload(s); + goto out; + } + + ret = ft_trans_recv_header(s); + if (ret < 0 || s->freeze_input) { + goto out; + } + + switch (s->state) { + case QEMU_VM_TRANSACTION_BEGIN: + /* CONTINUE or COMMIT should come shortly */ + s->is_payload = 0; + break; + + case QEMU_VM_TRANSACTION_CONTINUE: + /* get payload */ + s->is_payload = 1; + break; + + case QEMU_VM_TRANSACTION_COMMIT: + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + if (ret < 0) { + goto out; + } + + trace_ft_trans_cb(s->get_ready); + ret = s->get_ready(s->opaque); + if (ret < 0) { + goto out; + } + + qemu_clear_buffer(s->file); + s->get_offset = 0; + s->is_payload = 0; + + break; + + case QEMU_VM_TRANSACTION_ATOMIC: + /* not implemented yet */ + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", + ret); + break; + + case QEMU_VM_TRANSACTION_CANCEL: + /* return -EINVAL until migrate cancel on recevier side is supported */ + ret = -EINVAL; + break; + + default: + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + } + +out: + return ret; +} + +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, + int64_t pos, int size) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + if (s->has_error) { + error_report("get_buffer when error %d, bailing", s->has_error); + return -EINVAL; + } + + /* assuming qemu_file_get_notify() is calling */ + if (pos == 0 && size == 0) { + trace_ft_trans_get_ready(); + s->freeze_input = 0; + + /* sender should be waiting for ACK */ + if (s->is_sender) { + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + ret = 0; + goto out; + } + if (ret < 0) { + error_report("recv ack failed"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + + trace_ft_trans_cb(s->get_ready); + ret = s->get_ready(s->opaque); + if (ret < 0) { + goto out; + } + + /* proceed trans id */ + s->id++; + + return 0; + } + + /* set QEMUFile buf at beginning */ + if (!s->buf) { + s->buf = buf; + } + + ret = ft_trans_recv(s); + goto out; + } + + ret = s->get_offset; + +out: + return ret; +} + +static int ft_trans_close(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + trace_ft_trans_close(); + ret = s->close(s->opaque); + if (s->is_sender) { + qemu_free(s->buf); + } + qemu_free(s); + + return ret; +} + +static int ft_trans_rate_limit(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + + if (s->has_error) { + return 0; + } + + if (s->rate_limit && s->freeze_output) { + return 1; + } + + return 0; +} + +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) +{ + QEMUFileFtTrans *s = opaque; + + if (s->has_error) { + goto out; + } + + s->rate_limit = !!new_rate; + +out: + return s->rate_limit; +} + +int ft_trans_begin(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + s->seq = 0; + + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ + if (!s->is_sender) { + if (s->state != QEMU_VM_TRANSACTION_INIT) { + error_report("invalid state %d", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + goto out; + } + + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */ + if (s->state == QEMU_VM_TRANSACTION_INIT) { +retry: + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + goto retry; + } + if (ret < 0) { + error_report("recv ack failed"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); + if (ret < 0) { + goto out; + } + + s->state = QEMU_VM_TRANSACTION_CONTINUE; + +out: + return ret; +} + +int ft_trans_commit(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + if (!s->is_sender) { + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + goto out; + } + + /* sender should flush buf before sending COMMIT */ + qemu_fflush(s->file); + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); + if (ret < 0) { + goto out; + } + + while (!s->has_error && s->put_offset) { + ft_trans_flush(s); + if (s->freeze_output) { + s->wait_for_unfreeze(s); + } + } + + if (s->has_error) { + ret = -EINVAL; + goto out; + } + + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + ret = -EAGAIN; + goto out; + } + if (ret < 0) { + error_report("recv ack failed"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + + s->id++; + ret = 0; + +out: + return ret; +} + +int ft_trans_cancel(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + + /* invalid until migrate cancel on recevier side is supported */ + if (!s->is_sender) { + return -EINVAL; + } + + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); +} + +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, + FtTransPutBufferFunc *put_buffer, + FtTransGetBufferFunc *get_buffer, + FtTransPutReadyFunc *put_ready, + FtTransGetReadyFunc *get_ready, + FtTransWaitForUnfreezeFunc *wait_for_unfreeze, + FtTransCloseFunc *close, + bool is_sender) +{ + QEMUFileFtTrans *s; + + s = qemu_mallocz(sizeof(*s)); + + s->opaque = opaque; + s->put_buffer = put_buffer; + s->get_buffer = get_buffer; + s->put_ready = put_ready; + s->get_ready = get_ready; + s->wait_for_unfreeze = wait_for_unfreeze; + s->close = close; + s->is_sender = is_sender; + s->id = 0; + s->seq = 0; + s->rate_limit = 1; + + if (!s->is_sender) { + s->buf_max_size = IO_BUF_SIZE; + } + + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer, + ft_trans_close, ft_trans_rate_limit, + ft_trans_set_rate_limit, NULL); + + return s->file; +} diff --git a/ft_trans_file.h b/ft_trans_file.h new file mode 100644 index 0000000..5ca6b53 --- /dev/null +++ b/ft_trans_file.h @@ -0,0 +1,72 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.h. + * Copyright IBM, Corp. 2008 + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + */ + +#ifndef QEMU_FT_TRANSACTION_FILE_H +#define QEMU_FT_TRANSACTION_FILE_H + +#include "hw/hw.h" + +enum QEMU_VM_TRANSACTION_STATE { + QEMU_VM_TRANSACTION_NACK = -1, + QEMU_VM_TRANSACTION_INIT, + QEMU_VM_TRANSACTION_BEGIN, + QEMU_VM_TRANSACTION_CONTINUE, + QEMU_VM_TRANSACTION_COMMIT, + QEMU_VM_TRANSACTION_CANCEL, + QEMU_VM_TRANSACTION_ATOMIC, + QEMU_VM_TRANSACTION_ACK, +}; + +enum FT_MODE { + FT_ERROR = -1, + FT_OFF, + FT_INIT, + FT_TRANSACTION_BEGIN, + FT_TRANSACTION_ITER, + FT_TRANSACTION_COMMIT, + FT_TRANSACTION_ATOMIC, + FT_TRANSACTION_RECV, +}; +extern enum FT_MODE ft_mode; + +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */ +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ + +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size); +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t pos, size_t size); +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt); +typedef int (FtTransPutReadyFunc)(void); +typedef int (FtTransGetReadyFunc)(void *opaque); +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); +typedef int (FtTransCloseFunc)(void *opaque); + +int ft_trans_begin(void *opaque); +int ft_trans_commit(void *opaque); +int ft_trans_cancel(void *opaque); + +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, + FtTransPutBufferFunc *put_buffer, + FtTransGetBufferFunc *get_buffer, + FtTransPutReadyFunc *put_ready, + FtTransGetReadyFunc *get_ready, + FtTransWaitForUnfreezeFunc *wait_for_unfreeze, + FtTransCloseFunc *close, + bool is_sender); + +#endif diff --git a/migration.c b/migration.c index dd3bf94..c5e0146 100644 --- a/migration.c +++ b/migration.c @@ -15,6 +15,7 @@ #include "migration.h" #include "monitor.h" #include "buffered_file.h" +#include "ft_trans_file.h" #include "sysemu.h" #include "block.h" #include "qemu_socket.h" @@ -31,6 +32,8 @@ do { } while (0) #endif +enum FT_MODE ft_mode = FT_OFF; + /* Migration speed throttling */ static int64_t max_throttle = (32 << 20); diff --git a/trace-events b/trace-events index e6138ea..50ac840 100644 --- a/trace-events +++ b/trace-events @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice wrottn %lu of requested %zd disable spice_vmc_read(int bytes, int len) "spice read %lu of requested %zd" disable spice_vmc_register_interface(void *scd) "spice vmc registered interface %p" disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered interface %p" + +# ft_trans_file.c +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing buffer from %zu by %zu" +disable ft_trans_append(size_t size) "buffering %zu bytes" +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu bytes" +disable ft_trans_send_header(uint16_t cmd) "send header %d" +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes at %"PRId64"" +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) "recv %d of %d total %d" +disable ft_trans_close(void) "closing" +disable ft_trans_freeze_output(void) "backend not ready, freezing output" +disable ft_trans_freeze_input(void) "backend not ready, freezing input" +disable ft_trans_put_ready(void) "file is ready to put" +disable ft_trans_get_ready(void) "file is ready to get" +disable ft_trans_cb(void *cb) "callback %p"